Tag Archives: AWS Glue

How SikSin improved customer engagement with AWS Data Lab and Amazon Personalize

Post Syndicated from Byungjun Choi original https://aws.amazon.com/blogs/big-data/how-siksin-improved-customer-engagement-with-aws-data-lab-and-amazon-personalize/

This post is co-written with Byungjun Choi and Sangha Yang from SikSin.

SikSin is a technology platform connecting customers with restaurant partners serving their multiple needs. Customers use the SikSin platform to search and discover restaurants, read and write reviews, and view photos. From the restaurateurs’ perspective, SikSin enables restaurant partners to engage and acquire customers in order to grow their business. SikSin has a partnership with 850 corporate companies and more than 50,000 restaurants. They issue restaurant e-vouchers to more than 220,000 members, including individuals as well as corporate members. The SikSin platform receives more than 3 million users in a month. SikSin was listed in the top 100 of the Financial Times’s Asia-Pacific region’s high-growth companies in 2022.

SikSin was looking to deliver improved customer experiences and increase customer engagement. SikSin confronted two business challenges:

  • Customer engagement – SikSin maintains data on more than 750,000 restaurants and has more than 4,000 restaurant articles (and growing). SikSin was looking for a personalized and customized approach to provide restaurant recommendations for their customers and get them engaged with the content, thereby providing a personalized customer experience.
  • Data analysis activities – The SikSin Food Service team had difficulty generating reports because data was scattered across multiple systems. The team previously had to submit a request to the IT team and then wait for answers that might be outdated. The IT team, in turn, had to manually pull data out of files, databases, and applications and combine it for every request, which was time-consuming. The SikSin Food Service team wanted to view web analytics log data by multiple dimensions, such as customer profiles and places. Examples include page view, conversion rate, and channels.

To overcome these two challenges, SikSin participated in the AWS Data Lab program to assist them in building a prototype solution. The AWS Data Lab offers accelerated, joint-engineering engagements between customers and AWS technical resources to create tangible deliverables that accelerate data and analytics modernization initiatives. The Build Lab is a 2–5-day intensive build with a technical customer team.

In this post, we share how SikSin built the basis for accelerating their data project with the help of the Data Lab and Amazon Personalize.

Use cases

The Data Lab team and SikSin team had three consecutive meetings to discuss business and technical requirements, and decided to work on two use cases to resolve their two business challenges:

  • Build personalized recommendations – SikSin wanted to deploy a machine learning (ML) model to produce personalized content on the landing page of the platform, particularly restaurants and restaurant articles. The success criteria were to increase the number of page views per session and membership subscriptions, reduce the bounce rate, and ultimately engage more visitors and members in SikSin’s content.
  • Establish self-service analytics – SikSin’s business users wanted to reduce time to insight by making data more accessible while removing the reliance on the IT team by giving business users the ability to query data. The key was to consolidate web logs from BigQuery and operational business data from Amazon Relational Database Service (Amazon RDS) into a single place and analyze data whenever they need.

Solution overview

The following architecture depicts what the SikSin team built in the 4-day Build Lab. There are two parts in the solution to address SikSin’s business and technical requirements. The first part (1–8) is for building personalized recommendations, and the second part (A–D) is for establishing self-service analytics.

SikSin Solution Architecture

SikSin deployed an ML model to produce personalized content recommendations by using the following AWS services:

  1. AWS Database Migration Service (AWS DMS) helps migrate databases to AWS quickly and securely with minimal downtime. The SikSin team used AWS DMS to perform full load to bring data from the database tables into Amazon Simple Storage Service (Amazon S3) as a target. Amazon S3 is an object storage service offering industry-leading scalability, data availability, security, and performance. An AWS Glue crawler populates the AWS Glue Data Catalog with the data schema definitions (in a landing folder).
  2. An AWS Lambda function checks if any previous files still exist in the landing folder and archives the files into a backup folder, if any.
  3. AWS Glue is a serverless data integration service that makes it easier to discover, prepare, move, and integrate data from multiple sources for analytics, ML, and application development. The SikSin team created AWS Glue Spark extract, transform, and load (ETL) jobs to prepare input datasets for ML models. These datasets are used to train ML models in bulk mode. There are a total of five datasets for training and two datasets for batch inference jobs.
  4. Amazon Personalize allows developers to quickly build and deploy curated recommendations and intelligent user segmentation at scale using ML. Because Amazon Personalize can be tailored to your individual needs, you can deliver the right customer experience at the right time and in the right place. Users select existing ML models (also known as recipes), train them, and run batch inference to make recommendations.
  5. An Amazon Personalize job predicts for each line of input data (restaurants and restaurant articles) and produces ML-generated recommendations in the designated S3 output folder. The recommendation records are surfaced using interaction data, product data, and predictive models. An AWS Glue crawler populates the AWS Glue Data Catalog with the data schema definitions (in an output folder).
  6. The SikSin team applied business logic and filters in an AWS Glue job to prepare the final datasets for recommendations.
  7. AWS Step Functions enables you to build scalable, distributed applications using state machines. The SikSin team used AWS Step Functions Workflow Studio to visually create, run, and debug workflow runs. This workflow is triggered based on a schedule. The process includes data ingestion, cleansing, processing, and all steps defined in Amazon Personalize. This also involves managing run dependencies, scheduling, error-catching, and concurrency in accordance with the logical flow of the pipeline.
  8. Amazon Simple Notification Service (Amazon SNS) sends notifications. The SikSin team used Amazon SNS to send a notification via email and Google Hangouts with a Lambda function as a target.
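Step 2 of the pipeline above (archiving leftover files before a new load) can be sketched roughly as follows. This is a minimal illustration, not SikSin's actual function: the `landing/` and `backup/` prefixes, the timestamped backup layout, and the `bucket` key in the event are all assumptions. The S3 client is passed in so the logic can be exercised with a stub.

```python
from datetime import datetime, timezone


def backup_key(key: str, landing_prefix: str, backup_prefix: str, stamp: str) -> str:
    """Map 'landing/a/b.csv' to 'backup/<stamp>/a/b.csv' (hypothetical layout)."""
    relative = key[len(landing_prefix):]
    return f"{backup_prefix}{stamp}/{relative}"


def archive_landing_files(s3, bucket: str, landing_prefix: str = "landing/",
                          backup_prefix: str = "backup/") -> list:
    """Move every object under landing_prefix into a timestamped backup folder.

    `s3` is expected to behave like boto3.client("s3").
    """
    stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    moved = []
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=landing_prefix):
        for obj in page.get("Contents", []):
            key = obj["Key"]
            if key.endswith("/"):  # skip folder placeholder objects
                continue
            target = backup_key(key, landing_prefix, backup_prefix, stamp)
            # S3 has no rename operation, so a move is a copy followed by a delete.
            s3.copy_object(Bucket=bucket, Key=target,
                           CopySource={"Bucket": bucket, "Key": key})
            s3.delete_object(Bucket=bucket, Key=key)
            moved.append(target)
    return moved


def lambda_handler(event, context):
    import boto3  # imported here so the helpers above stay testable offline
    moved = archive_landing_files(boto3.client("s3"), event["bucket"])
    return {"archived": len(moved)}
```

Returning the number of archived objects gives the surrounding workflow something to log or branch on.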

To establish a self-service analytics environment to enable business users to perform data analysis, SikSin used the following services:

  1. The Google BigQuery Connector for AWS Glue simplifies the process of connecting AWS Glue jobs to extract data from BigQuery. The SikSin team used the connector to extract web analytics logs from BigQuery and load them to an S3 bucket.
  2. AWS Glue DataBrew is a visual data preparation tool that makes it easy for data analysts and data scientists to clean and normalize data to prepare it for analytics and ML. You can choose from over 250 pre-built transformations to automate data preparation tasks, all without the need to write any code. The SikSin Food Service team used it to visually inspect large datasets and shape the data for their data analysis activities. An S3 bucket (in the intermediate folder) contains business operational data, such as customers, places, articles, products, and reference data, loaded by AWS DMS, along with web analytics logs loaded by AWS Glue jobs.
  3. An AWS Glue Python shell runs a job to cleanse and join data, and apply business rules to prepare the data for queries. The SikSin team used AWS SDK for pandas, an AWS Professional Services open-source Python initiative, which extends the power of the pandas library to AWS, connecting DataFrames and AWS data-related services. The output files are stored in Apache Parquet format in a single folder. An AWS Glue crawler populates the data schema definitions (in an output folder) into the AWS Glue Data Catalog.
  4. The SikSin Food Service team used Amazon Athena and Amazon QuickSight to query and visualize the data analysis. Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. QuickSight is an ML-powered business intelligence service built for the cloud.
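To make the cleanse-and-join step more concrete, here is a small pandas sketch of the kind of preparation a Python shell job might do before the data is queried. The column names, sample rows, and the conversion-rate definition are invented for illustration and are not SikSin's actual schema or rules.

```python
import pandas as pd

# Hypothetical web analytics log rows (for example, as extracted from BigQuery).
logs = pd.DataFrame({
    "customer_id": [1, 1, 2, 3, 3, None],
    "channel": ["organic", "organic", "paid", "paid", "paid", "organic"],
    "event": ["page_view", "conversion", "page_view",
              "page_view", "page_view", "page_view"],
})

# Hypothetical customer profile rows (for example, as loaded by AWS DMS).
customers = pd.DataFrame({
    "customer_id": [1, 2, 3],
    "segment": ["corporate", "individual", "individual"],
})


def summarize(logs: pd.DataFrame, customers: pd.DataFrame) -> pd.DataFrame:
    """Cleanse the logs, join customer profiles, and aggregate by dimension."""
    # Cleanse: drop rows without a customer and restore the integer key type.
    cleaned = logs.dropna(subset=["customer_id"]).astype({"customer_id": "int64"})
    joined = cleaned.merge(customers, on="customer_id", how="left")
    flagged = joined.assign(
        is_view=joined["event"].eq("page_view"),
        is_conversion=joined["event"].eq("conversion"),
    )
    summary = flagged.groupby(["segment", "channel"], as_index=False).agg(
        page_views=("is_view", "sum"),
        conversions=("is_conversion", "sum"),
    )
    # Assumed definition: conversions per page view within each group.
    summary["conversion_rate"] = summary["conversions"] / summary["page_views"]
    return summary


summary = summarize(logs, customers)
# In a Glue job, the result could then be written as Parquet with AWS SDK for
# pandas, e.g. wr.s3.to_parquet(df=summary, path="s3://<bucket>/output/", dataset=True)
```

A group with no page views would produce a division by zero here, so a real job would need a rule for that case.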

Business outcomes

The SikSin Food Service team is now able to access the available data for performing data analysis and manipulation operations efficiently, as well as for getting insights on their own. This immediately allows the team as well as other lines of business to understand how customers are interacting with SikSin’s contents and services on the platform and make decisions sooner. For example, with the data output, the Food Service team was able to provide insights and data points for their external stakeholder and customer to initiate a new business idea. Moreover, the team shared, “We anticipate the recommendations and personalized content will increase conversion rates and customer engagement.”

The AWS Data Lab enabled SikSin to review and assess thoroughly what data is actually usable and available. With SikSin’s objective to successfully build a data pipeline for data analytics purposes, the SikSin team came to realize the importance of data cleansing, categorization, and standardization. “Only fruitful analysis and recommendation are possible when data is intact and properly cleansed,” said Byungjun Choi (the Head of SikSin’s Food Service Team). After completing the Data Lab, SikSin completed and set up an internal process that can streamline the data cleansing pipeline.

SikSin was stuck in the research phase of looking for a solution to solve their personalization challenges. The AWS Data Lab enabled the SikSin IT Team to get hands-on with the technology and build a minimum viable product (MVP) to explore how Amazon Personalize would work in their environment with their data. They achieved this via the Data Lab by adopting AWS DMS, AWS Glue, Amazon Personalize, and Step Functions. “Though it is still the early stage of building a prototype, I am very confident with the right enablement provided from AWS that an effective recommendation system can be adopted on production level very soon,” commented Sangha Yang (the Head of SikSin IT Team).

Conclusion

As a result of the 4-day Build Lab, the SikSin team left with a working prototype that is custom fit to their needs, gaining a clear path forward for enabling end-users to gain valuable insights into its data. The Data Lab allowed the SikSin team to accelerate the architectural design and prototype build of this solution by months. Based on the lessons and learnings obtained from Data Lab, SikSin is planning to launch a Global News Content Platform equipped with a recommendation feature in FY23.

As demonstrated by SikSin’s achievements, Amazon Personalize allows developers to quickly build and deploy curated recommendations and intelligent user segmentation at scale using ML. Because Amazon Personalize can be tailored to your individual needs, you can deliver the right customer experience at the right time and in the right place, whether you want to optimize recommendations, target customers more accurately, maximize your data’s value, or promote items using business rules.

To accelerate your digital transformation with ML, the Data Lab program is available to support you by providing prescriptive architectural guidance on a particular use case, sharing best practices, and removing technical roadblocks. You’ll leave the engagement with an architecture or working prototype that is custom fit to your needs, a path to production, and deeper knowledge of AWS services.

Please contact your AWS Account Manager or Solutions Architect to get started. If you don’t have an AWS Account Manager, please contact Sales.


About the Authors

Byungjun Choi is the Head of SikSin Food Service at SikSin.

Sangha Yang is the Head of IT team at SikSin.

Younggu Yun is a Senior Data Lab Architect at AWS. He works with customers around the APAC region to help them achieve business goals and solve technical problems by providing prescriptive architectural guidance, sharing best practices, and building innovative solutions together.

Junwoo Lee is an Account Manager at AWS. He provides technical and business support to help customers resolve their problems and enrich the customer journey by introducing local and global programs for his customers.

Jinwoo Park is a Senior Solutions Architect at AWS. He provides technical support for AWS customers to succeed with their cloud journey. He helps customers build more secure, efficient, and cost-optimized architectures and solutions, and delivers best practices and workshops.

Text analytics on AWS: implementing a data lake architecture with OpenSearch

Post Syndicated from Francisco Losada original https://aws.amazon.com/blogs/architecture/text-analytics-on-aws-implementing-a-data-lake-architecture-with-opensearch/

Text data is a common type of unstructured data found in analytics. It is often stored without a predefined format and can be hard to obtain and process.

For example, web pages contain text data that data analysts collect through web scraping and pre-process using lowercasing, stemming, and lemmatization. After pre-processing, the cleaned text is analyzed by data scientists and analysts to extract relevant insights.

This blog post covers how to effectively handle text data using a data lake architecture on Amazon Web Services (AWS). We explain how data teams can independently extract insights from text documents using OpenSearch as the central search and analytics service. We also discuss how to index and update text data in OpenSearch and evolve the architecture towards automation.

Architecture overview

This architecture outlines the use of AWS services to create an end-to-end text analytics solution, starting from the data collection and ingestion up to the data consumption in OpenSearch (Figure 1).

Figure 1. Data lake architecture with OpenSearch

  1. Collect data from various sources, such as SaaS applications, edge devices, logs, streaming media, and social networks.
  2. Use tools like AWS Database Migration Service (AWS DMS), AWS DataSync, Amazon Kinesis, Amazon Managed Streaming for Apache Kafka (Amazon MSK), AWS IoT Core, and Amazon AppFlow to ingest the data into the AWS data lake, depending on the data source type.
  3. Store the ingested data in the raw zone of the Amazon Simple Storage Service (Amazon S3) data lake—a temporary area where data is kept in its original form.
  4. Validate, clean, normalize, transform, and enrich the data through a series of pre-processing steps using AWS Glue or Amazon EMR.
  5. Place the data that is ready to be indexed in the indexing zone.
  6. Use AWS Lambda to index the documents into OpenSearch and store them back in the data lake with a unique identifier.
  7. Use the clean zone as the source of truth for teams to consume the data and calculate additional metrics.
  8. Develop, train, and generate new metrics using machine learning (ML) models with Amazon SageMaker or artificial intelligence (AI) services like Amazon Comprehend.
  9. Store the new metrics in the enriching zone along with the identifier of the OpenSearch document.
  10. Use the identifier column from the initial indexing phase to identify the correct documents and update them in OpenSearch with the newly calculated metrics using AWS Lambda.
  11. Use OpenSearch to search through the documents and visualize them with metrics using OpenSearch Dashboards.

Considerations

Data lake orchestration among teams

This architecture allows data teams to work independently on text documents at different stages of their lifecycles. The data engineering team manages the raw and indexing zones and also handles data ingestion and preprocessing for indexing in OpenSearch.

The cleaned data is stored in the clean zone, where data analysts and data scientists generate insights and calculate new metrics. These metrics are stored in the enrich zone and indexed as new fields in the OpenSearch documents by the data engineering team (Figure 2).

Figure 2. Data lake orchestration among teams

Let’s explore an example. Consider a company that periodically retrieves blog site comments and performs sentiment analysis using Amazon Comprehend. In this case:

  1. The comments are ingested into the raw zone of the data lake.
  2. The data engineering team processes the comments and stores them in the indexing zone.
  3. A Lambda function indexes the comments into OpenSearch, enriches the comments with the OpenSearch document ID, and saves them in the clean zone.
  4. The data science team consumes the comments and performs sentiment analysis using Amazon Comprehend.
  5. The sentiment analysis metrics are stored in the enriching zone of the data lake. A second Lambda function updates the comments in OpenSearch with the new metrics.

If the raw data does not require any preprocessing steps, the indexing and clean zones can be combined. You can explore this specific example, along with code implementation, in the AWS samples repository.
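As a rough illustration of the last step in this example, the second Lambda function could turn the enriched rows into a request body for the OpenSearch Bulk API, where each `update` action line is followed by a partial document. The index name `comments` and the field names `id`, `sentiment`, and `sentiment_score` are assumptions made for this sketch. The code in the samples repository may differ.

```python
import json


def build_bulk_update_body(index: str, enriched_rows: list) -> str:
    """Build a newline-delimited JSON body for the OpenSearch _bulk endpoint.

    Each row must carry the OpenSearch document ID under "id". Every other
    key is treated as a metric to merge into the existing document.
    """
    lines = []
    for row in enriched_rows:
        metrics = {name: value for name, value in row.items() if name != "id"}
        lines.append(json.dumps({"update": {"_index": index, "_id": row["id"]}}))
        lines.append(json.dumps({"doc": metrics}))
    # The Bulk API requires the body to end with a newline.
    return "\n".join(lines) + "\n"


# Hypothetical rows carrying sentiment labels in the style Amazon Comprehend returns.
rows = [
    {"id": "a1", "sentiment": "POSITIVE", "sentiment_score": 0.98},
    {"id": "b2", "sentiment": "NEGATIVE", "sentiment_score": 0.91},
]
body = build_bulk_update_body("comments", rows)
```

Because `update` with `doc` performs a partial update, the text field that was indexed earlier is left untouched and only the new metric fields are added.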

Schema evolution

As your data progresses through data lake stages, the schema changes and gets enriched accordingly. Continuing with our previous example, Figure 3 explains how the schema evolves.

Figure 3. Schema evolution through the data lake stages

  1. In the raw zone, there is a raw text field received directly from the ingestion phase. It’s best practice to keep a raw version of the data as a backup, or in case the processing steps need to be repeated later.
  2. In the indexing zone, the clean text field replaces the raw text field after being processed.
  3. In the clean zone, we add a new ID field that is generated during indexing and identifies the OpenSearch document of the text field.
  4. In the enrich zone, the ID field is required. Other fields with metric names are optional and represent new metrics calculated by other teams that will be added to OpenSearch.
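The four stages can be mimicked with plain dictionaries to show which fields exist at each point. The field names (`raw_text`, `text`, `id`) and the lowercasing-only cleaning step are placeholders chosen for this sketch, not the schema from the figure.

```python
def to_indexing_zone(raw_record: dict) -> dict:
    """Raw -> indexing: the cleaned text field replaces the raw text field."""
    cleaned = " ".join(raw_record["raw_text"].lower().split())
    return {"text": cleaned}


def to_clean_zone(indexed_record: dict, document_id: str) -> dict:
    """Indexing -> clean: add the ID that OpenSearch assigned during indexing."""
    return {**indexed_record, "id": document_id}


def to_enrich_zone(clean_record: dict, **metrics) -> dict:
    """Clean -> enrich: the ID is required, metric fields are optional."""
    return {"id": clean_record["id"], **metrics}


raw = {"raw_text": "  Great POST,   thanks! "}
indexed = to_indexing_zone(raw)                     # {"text": "great post, thanks!"}
clean = to_clean_zone(indexed, "doc-001")           # adds "id"
enriched = to_enrich_zone(clean, sentiment="POSITIVE")
```

Keeping the enrich-zone record down to the ID plus metrics means each team can add fields without rewriting the text itself.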

Consumption layer with OpenSearch

In OpenSearch, data is organized into indices, which can be thought of as tables in a relational database. Each index consists of documents—similar to table rows—and multiple fields, similar to table columns. You can add documents to an index by indexing and updating them using various client APIs for popular programming languages.

Now, let’s explore how our architecture integrates with OpenSearch in the indexing and updating stage.

Indexing and updating documents using Python

The index document API operation allows you to index a document with a custom ID, or assigns one if none is provided. To speed up indexing, we can use the bulk index API to index multiple documents in one call.

We need to store the IDs back from the index operation to later identify the documents we’ll update with new metrics. Let’s explore two ways of doing this:

  • Use the requests library to call the REST Bulk Index API (preferred): the response returns the auto-generated IDs we need.
  • Use the Python Low-Level Client for OpenSearch: The IDs are not returned and need to be pre-assigned to later store them. We can use an atomic counter in Amazon DynamoDB to do so. This allows multiple Lambda functions to index documents in parallel without ID collisions.
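The first option can be sketched as follows. The Bulk API takes a newline-delimited JSON body with one action line per document, and the response lists the resulting items in the same order, so the generated IDs can be paired with the source documents. The endpoint, index name, and authentication method are placeholders. Your domain may require a different auth mechanism, such as request signing.

```python
import json


def build_bulk_index_body(index: str, documents: list) -> str:
    """Build the NDJSON body: an action line, then the document itself."""
    lines = []
    for document in documents:
        # No "_id" in the action, so OpenSearch generates one.
        lines.append(json.dumps({"index": {"_index": index}}))
        lines.append(json.dumps(document))
    return "\n".join(lines) + "\n"


def extract_ids(bulk_response: dict) -> list:
    """Return the generated IDs in request order, failing on any item error."""
    items = [item["index"] for item in bulk_response["items"]]
    failed = [item for item in items if "error" in item]
    if failed:
        raise RuntimeError(f"{len(failed)} documents failed to index")
    return [item["_id"] for item in items]


def attach_ids(documents: list, ids: list) -> list:
    """Pair each document with its OpenSearch ID before saving it to the data lake."""
    return [{**document, "id": doc_id} for document, doc_id in zip(documents, ids)]


def post_bulk(endpoint: str, body: str, auth=None) -> dict:
    """Send the body to <endpoint>/_bulk (endpoint and auth are placeholders)."""
    import requests  # third-party dependency, imported only when a call is made
    response = requests.post(
        f"{endpoint}/_bulk",
        data=body,
        headers={"Content-Type": "application/x-ndjson"},
        auth=auth,
        timeout=30,
    )
    response.raise_for_status()
    return response.json()
```

A typical call chain would be `attach_ids(docs, extract_ids(post_bulk(endpoint, build_bulk_index_body("comments", docs))))`.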

As in Figure 4, the Lambda function:

  1. Increases the atomic counter by the number of documents that will be indexed into OpenSearch.
  2. Gets the value of the counter back from the API call.
  3. Indexes the documents using the IDs in the range [current counter value – number of documents + 1, current counter value].

Figure 4. Storing the IDs back from the bulk index operation using the Python Low-Level Client for OpenSearch
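A minimal sketch of the counter-based approach is shown below, assuming a DynamoDB table whose partition key is `counter_name` and whose counter attribute is `current_value` (both names are made up for this example). The `table` argument is expected to behave like a boto3 `Table` resource. DynamoDB applies the `ADD` update atomically, so each caller receives a block of IDs that no other caller can get.

```python
def reserve_id_range(table, counter_name: str, count: int) -> range:
    """Atomically reserve `count` consecutive IDs and return them as a range.

    The counter is increased by `count` in a single UpdateItem call. The
    returned value is the last ID of the reserved block, so the block starts
    at (new value - count + 1).
    """
    response = table.update_item(
        Key={"counter_name": counter_name},
        UpdateExpression="ADD #v :n",
        ExpressionAttributeNames={"#v": "current_value"},
        ExpressionAttributeValues={":n": count},
        ReturnValues="UPDATED_NEW",
    )
    # The resource API returns numbers as Decimal, so convert before arithmetic.
    last_id = int(response["Attributes"]["current_value"])
    first_id = last_id - count + 1
    return range(first_id, last_id + 1)


def assign_ids(documents: list, ids: range) -> list:
    """Attach one pre-assigned ID to each document before indexing."""
    return [{**document, "id": str(doc_id)} for document, doc_id in zip(documents, ids)]
```

The pre-assigned IDs can then be passed as `_id` values in the bulk index actions, which is what lets several Lambda functions index in parallel without collisions.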

Data flow automation

As architectures evolve towards automation, the data flow between data lake stages becomes event-driven. Following our previous example, we can automate the processing steps of the data when moving from the raw to the indexing zone (Figure 5).

Figure 5. Event-driven automation for data flow

With Amazon EventBridge and AWS Step Functions, we can automatically trigger our pre-processing AWS Glue jobs so our data gets pre-processed without manual intervention.

The same approach can be applied to the other data lake stages to achieve a fully automated architecture. Explore this implementation for an automated language use case.
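One possible shape for this automation is an EventBridge rule that matches new objects under the raw prefix and starts a Step Functions state machine, which in turn runs the AWS Glue job and waits for it to finish. The sketch below only builds the two JSON documents. The bucket name, `raw/` prefix, job name, and the `--source_key` job argument are hypothetical, and the bucket would need EventBridge notifications turned on.

```python
import json


def raw_zone_event_pattern(bucket: str, prefix: str = "raw/") -> dict:
    """EventBridge pattern matching S3 'Object Created' events under a prefix."""
    return {
        "source": ["aws.s3"],
        "detail-type": ["Object Created"],
        "detail": {
            "bucket": {"name": [bucket]},
            "object": {"key": [{"prefix": prefix}]},
        },
    }


def preprocessing_state_machine(glue_job_name: str) -> dict:
    """Amazon States Language definition that runs one Glue job synchronously."""
    return {
        "Comment": "Pre-process new raw-zone objects (illustrative sketch)",
        "StartAt": "PreprocessRawData",
        "States": {
            "PreprocessRawData": {
                "Type": "Task",
                # The .sync suffix makes the state wait until the job run completes.
                "Resource": "arn:aws:states:::glue:startJobRun.sync",
                "Parameters": {
                    "JobName": glue_job_name,
                    # Pass the object key from the triggering event to the job.
                    "Arguments": {"--source_key.$": "$.detail.object.key"},
                },
                "End": True,
            }
        },
    }


pattern_json = json.dumps(raw_zone_event_pattern("my-data-lake"), indent=2)
definition_json = json.dumps(preprocessing_state_machine("clean-comments"), indent=2)
```

The resulting strings could be supplied to whichever deployment tool you use to create the rule and the state machine.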

Conclusion

In this blog post, we covered designing an architecture to effectively handle text data using a data lake on AWS. We explained how different data teams can work independently to extract insights from text documents at different lifecycle stages using OpenSearch as the search and analytics service.

How BookMyShow saved 80% in costs by migrating to an AWS modern data architecture

Post Syndicated from Mahesh Vandi Chalil original https://aws.amazon.com/blogs/big-data/how-bookmyshow-saved-80-in-costs-by-migrating-to-an-aws-modern-data-architecture/

This is a guest post co-authored by Mahesh Vandi Chalil, Chief Technology Officer of BookMyShow.

BookMyShow (BMS), a leading entertainment company in India, provides an online ticketing platform for movies, plays, concerts, and sporting events. Selling up to 200 million tickets on an annual run rate basis (pre-COVID) to customers in India, Sri Lanka, Singapore, Indonesia, and the Middle East, BookMyShow also offers an online media streaming service and end-to-end management for virtual and on-ground entertainment experiences across all genres.

The pandemic gave BMS the opportunity to migrate and modernize our 15-year-old analytics solution to a modern data architecture on AWS. This architecture is secure, governed, and cost-optimized, with the ability to scale to petabytes. BMS migrated and modernized from on-premises and other cloud platforms to AWS in just four months. This project was run in parallel with our application migration project and achieved 90% cost savings in storage and 80% cost savings in analytics spend.

The BMS analytics platform caters to business needs for sales and marketing, finance, and business partners (e.g., cinemas and event owners), and provides application functionality for audience, personalization, pricing, and data science teams. The prior analytics solution had multiple copies of data, for a total of over 40 TB, with approximately 80 TB of data in other cloud storage. Data was stored on‑premises and in the cloud in various data stores. Growing organically, the teams had the freedom to choose their technology stack for individual projects, which led to the proliferation of various tools, technology, and practices. Individual teams for personalization, audience, data engineering, data science, and analytics used a variety of products for ingestion, data processing, and visualization.

This post discusses BMS’s migration and modernization journey, and how BMS, AWS, and the AWS Partner Minfy Technologies team worked together to complete the migration in four months and save costs. The migration tenets using the AWS modern data architecture made the project a huge success.

Challenges in the prior analytics platform

  • Varied Technology: Multiple teams used various products, languages, and versions of software.
  • Larger Migration Project: Because the analytics modernization was a parallel project with application migration, planning was crucial in order to consider the changes in core applications and project timelines.
  • Resources: Experienced resource churn from the application migration project, and had very little documentation of current systems.
  • Data: Had multiple copies of data and no single source of truth; each data store provided a view for the business unit.
  • Ingestion Pipelines: Complex data pipelines moved data across various data stores at varied frequencies. We had multiple approaches in place to ingest data to Cloudera, via over 100 Kafka consumers from transaction systems and MQTT (Message Queue Telemetry Transport messaging protocol) for clickstreams, stored procedures, and Spark jobs. We had approximately 100 jobs for data ingestion across Spark, Alteryx, Beam, NiFi, and more.
  • Hadoop Clusters: The Hadoop clusters were configured on large dedicated hardware, incurring fixed costs. The on-premises Cloudera setup catered to most of the data engineering, audience, and personalization batch processing workloads. Teams had their own implementations of HBase and Hive for our audience and personalization applications.
  • Data warehouse: The data engineering team used TiDB as their on-premises data warehouse. However, each consumer team had their own perspective of data needed for analysis. As this siloed architecture evolved, it resulted in expensive storage and operational costs to maintain these separate environments.
  • Analytics Database: The analytics team used data sourced from other transactional systems and denormalized data. The team had their own extract, transform, and load (ETL) pipeline, using Alteryx with a visualization tool.

Migration tenets followed which led to project success:

  • Prioritize by business functionality.
  • Apply best practices when building a modern data architecture from Day 1.
  • Move only required data, canonicalize the data, and store it in the most optimal format in the target. Remove data redundancy as much as possible. Mark scope for optimization for the future when changes are intrusive.
  • Build the data architecture while keeping data formats, volumes, governance, and security in mind.
  • Simplify ELT and processing jobs by categorizing the jobs as rehosted, rewritten, and retired. Finalize canonical data format, transformation, enrichment, compression, and storage format as Parquet.
  • Rehost machine learning (ML) jobs that were critical for business.
  • Work backward to achieve our goals, and clear roadblocks and alter decisions to move forward.
  • Use serverless options as a first option and pay per use. Assess the cost and effort for rearchitecting to select the right approach. Execute a proof of concept to validate this for each component and service.

Strategies applied to succeed in this migration:

  • Team – We created a unified team with people from data engineering, analytics, and data science as part of the analytics migration project. Site reliability engineering (SRE) and application teams were involved when critical decisions were needed regarding data or timeline for alignment. The analytics, data engineering, and data science teams spent considerable time planning, understanding the code, and iteratively looking at the existing data sources, data pipelines, and processing jobs. The AWS team, with the partner team from Minfy Technologies, helped BMS arrive at a migration plan after a proof of concept for each of the components in data ingestion, data processing, data warehouse, ML, and analytics dashboards.
  • Workshops – The AWS team conducted a series of workshops and immersion days, and coached the BMS team on the technology and best practices to deploy the analytics services. The AWS team helped BMS explore the configuration and benefits of the migration approach for each scenario (data migration, data pipeline, data processing, visualization, and machine learning) via proofs of concept (POCs). The team captured the changes required in the existing code for migration. The BMS team also got acquainted with the following AWS services:
  • Proof of concept – The BMS team, with help from the partner and AWS team, implemented multiple proofs of concept to validate the migration approach:
    • Performed batch processing of Spark jobs in Amazon EMR, in which we checked the runtime, required code changes, and cost.
    • Ran clickstream analysis jobs in Amazon EMR, testing the end-to-end pipeline. The team also conducted proofs of concept on AWS IoT Core for the MQTT protocol and streaming to Amazon S3.
    • Migrated ML models to Amazon SageMaker and orchestrated with Amazon MWAA.
    • Created sample QuickSight reports and dashboards, in which features and time to build were assessed.
    • Configured for key scenarios for Amazon Redshift, in which time for loading data, query performance, and cost were assessed.
  • Effort vs. cost analysis – The team performed the following assessments:
    • Compared the ingestion pipelines, the difference in data structure in each store, the basis of the current business need for the data source, the activity for preprocessing the data before migration, data migration to Amazon S3, and change data capture (CDC) from the migrated applications in AWS.
    • Assessed the effort to migrate approximately 200 jobs, determined which jobs were redundant or needed improvement from a functional perspective, and completed a migration list for the target state. Because modernizing the MQTT workflow code to serverless was time-consuming, we decided to rehost it on Amazon Elastic Compute Cloud (Amazon EC2) and defer its modernization to Amazon Kinesis to the next phase.
    • Reviewed over 400 reports and dashboards, prioritized development in phases, and reassessed business user needs.

AWS cloud services chosen for proposed architecture:

  • Data lake – We used Amazon S3 as the data lake to serve as the single source of truth for all raw and processed data, thereby reducing the copies of data storage and storage costs.
  • Ingestion – Because we had multiple sources of truth in the current architecture, we arrived at a common structure before migration to Amazon S3, and existing pipelines were modified to do preprocessing. These one-time preprocessing jobs were run in Cloudera, because the source data was on-premises, and on Amazon EMR for data in the cloud. We designed new data pipelines for ingestion from transactional systems on the AWS cloud using AWS Glue ETL.
  • Processing – Processing jobs were segregated based on runtime into two categories: batch and near-real time. Batch processes were further divided into transient Amazon EMR clusters with varying runtimes and Hadoop application requirements like HBase. Near-real-time jobs were provisioned in an Amazon EMR permanent cluster for clickstream analytics, and a data pipeline from transactional systems. We adopted a serverless approach using AWS Glue ETL for new data pipelines from transactional systems on the AWS cloud.
  • Data warehouse – We chose Amazon Redshift as our data warehouse, and planned on how the data would be distributed based on query patterns.
  • Visualization – We built the reports in Amazon QuickSight in phases and prioritized them based on business demand. We discussed with business users their current needs and identified the immediate reports required. We defined the phases of report and dashboard creation and built the reports in Amazon QuickSight. We plan to use embedded reports for external users in the future.
  • Machine learning – Custom ML models were deployed on Amazon SageMaker. Existing Airflow DAGs were migrated to Amazon MWAA.
  • Governance, security, and compliance – Governance with AWS Lake Formation was adopted from Day 1. We configured the AWS Glue Data Catalog to reference data used as sources and targets. We had to comply with Payment Card Industry (PCI) guidelines because payment information was in the data lake, so we ensured the necessary security policies were in place.

Solution overview

BMS modern data architecture

The following diagram illustrates our modern data architecture.

The architecture includes the following components:

  1. Source systems – These include the following:
    • Data from transactional systems stored in MariaDB (booking and transactions).
    • User interaction clickstream data via Kafka consumers to DataOps MariaDB.
    • Members and seat allocation information from MongoDB.
    • SQL Server for specific offers and payment information.
  2. Data pipeline – Spark jobs on an Amazon EMR permanent cluster process the clickstream data from Kafka clusters.
  3. Data lake – Data from source systems was stored in their respective Amazon S3 buckets, with prefixes for optimized data querying. For Amazon S3, we followed a hierarchy to store raw, summarized, and team or service-related data in different parent folders as per the source and type of data. Lifecycle policies were added to logs and temp folders of different services as per teams’ requirements.
  4. Data processing – Transient Amazon EMR clusters are used for processing data into a curated format for the audience, personalization, and analytics teams. Small file merger jobs merge the clickstream data to a larger file size, which saved costs for one-time queries.
  5. Governance – AWS Lake Formation enables the usage of AWS Glue crawlers to capture the schema of data stored in the data lake and version changes in the schema. The Data Catalog and security policy in AWS Lake Formation enable access to data for roles and users in Amazon Redshift, Amazon Athena, Amazon QuickSight, and data science jobs. AWS Glue ETL jobs load the processed data to Amazon Redshift at scheduled intervals.
  6. Queries – The analytics team used Amazon Athena to perform one-time queries raised from business teams on the data lake. Because report development is in phases, Amazon Athena was used for exporting data.
  7. Data warehouse – Amazon Redshift was used as the data warehouse, where the reports for the sales teams, management, and third parties (i.e., theaters and events) are processed and stored for quick retrieval. Views to analyze the total sales, movie sale trends, member behavior, and payment modes are configured here. We use materialized views for denormalized tables, different schemas for metadata, and transactional and behavior data.
  8. Reports – We used Amazon QuickSight reports for various business, marketing, and product use cases.
  9. Machine learning – Some of the models deployed on Amazon SageMaker are as follows:
    • Content popularity – Decides the recommended content for users.
    • Live event popularity – Calculates the popularity of live entertainment events in different regions.
    • Trending searches – Identifies trending searches across regions.
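
The prefix hierarchy described for the data lake (raw, summarized, and team or service-related data under different parent folders) can be illustrated with a small key builder. This is only a sketch: the zone names, the source and dataset names, and the Hive-style `year=/month=/day=` partition layout are assumptions chosen for illustration, not the layout BMS actually used.

```python
from datetime import date

# Hypothetical zone names. The post only says that raw, summarized, and
# team- or service-related data live under different parent folders.
ZONES = {"raw", "summarized", "team"}


def build_key(zone: str, source: str, dataset: str, day: date, filename: str) -> str:
    """Return an S3 object key with date partitions in the prefix (assumed layout)."""
    if zone not in ZONES:
        raise ValueError(f"unknown zone: {zone!r}")
    return (
        f"{zone}/{source}/{dataset}/"
        f"year={day.year:04d}/month={day.month:02d}/day={day.day:02d}/"
        f"{filename}"
    )


key = build_key("raw", "mariadb", "bookings", date(2022, 3, 7), "part-0000.parquet")
print(key)
```

Keeping the date in the prefix lets query engines that understand partitioned prefixes skip data outside the requested range, which is one way prefixes can optimize querying.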

Walkthrough

Migration execution steps

We standardized tools, services, and processes for data engineering, analytics, and data science:

  • Data lake
    • Identified the source data to be migrated from Archival DB, BigQuery, TiDB, and the analytics database.
    • Built a canonical data model that catered to multiple business teams and reduced the copies of data, and therefore storage and operational costs. Modified existing jobs to facilitate migration to a canonical format.
    • Identified the source systems, capacity required, anticipated growth, owners, and access requirements.
    • Ran the bulk data migration to Amazon S3 from various sources.
  • Ingestion
    • Transaction systems – Retained the existing Kafka queues and consumers.
    • Clickstream data – Successfully conducted a proof of concept to use AWS IoT Core for MQTT protocol. But because we needed to make changes in the application to publish to AWS IoT Core, we decided to implement it as part of mobile application modernization at a later time. We decided to rehost the MQTT server on Amazon EC2.
  • Processing
    • Listed the data pipelines relevant to business and migrated them with minimal modification.
    • Categorized workloads into critical jobs, redundant jobs, or jobs that can be optimized:
      • Spark jobs were migrated to Amazon EMR.
      • HBase jobs were migrated to Amazon EMR with HBase.
      • Hive-based jobs were modified to store their metadata in the AWS Glue Data Catalog.
      • NiFi jobs were simplified and rewritten as Spark jobs that run on Amazon EMR.
    • Amazon EMR clusters were configured as one persistent cluster for streaming the clickstream and personalization workloads, and multiple transient clusters for running all other Spark ETL or processing jobs. We used Spot Instances for task nodes to save costs. We optimized data storage with specific jobs that merge small files and convert data to compressed file formats.
    • AWS Glue crawlers identified new data in Amazon S3. AWS Glue ETL jobs transformed and uploaded processed data to the Amazon Redshift data warehouse.
  • Data warehouse
    • Defined the data warehouse schema by categorizing the critical reports required by the business, keeping in mind the workload and reports required in future.
    • Defined the staging area for incremental data loaded into Amazon Redshift, materialized views, and tuning the queries based on usage. The transaction and primary metadata are stored in Amazon Redshift to cater to all data analysis and reporting requirements. We created materialized views and denormalized tables in Amazon Redshift to use as data sources for Amazon QuickSight dashboards and segmentation jobs, respectively.
    • Used the Amazon Redshift cluster optimally by loading only the last two years of data into Amazon Redshift, and used Amazon Redshift Spectrum to query historical data through external tables. This helped balance the usage and cost of the Amazon Redshift cluster.
  • Visualization
    • Amazon QuickSight dashboards were created for the sales and marketing team in Phase 1:
      • Sales summary report – An executive summary dashboard to get an overview of sales across the country by region, city, movie, theatre, genre, and more.
      • Live entertainment – A dedicated report for live entertainment vertical events.
      • Coupons – A report for coupons purchased and redeemed.
      • BookASmile – A dashboard to analyze the data for BookASmile, a charity initiative.
  • Machine learning
    • Listed the ML workloads to be migrated based on current business needs.
    • Priority ML processing jobs were deployed on Amazon EMR. Models were modified to use Amazon S3 as source and target, and new APIs were exposed to use the functionality. ML models were deployed on Amazon SageMaker for movies, live event clickstream analysis, and personalization.
    • Existing artifacts in Airflow orchestration were migrated to Amazon MWAA.
  • Security
    • AWS Lake Formation was the foundation of the data lake, with the AWS Glue Data Catalog as the foundation for the central catalog for the data stored in Amazon S3. This provided access to the data by various functionalities, including the audience, personalization, analytics, and data science teams.
    • Personally identifiable information (PII) and payment data were stored in the data lake and data warehouse, so we had to comply with PCI guidelines. Encryption of data at rest and in transit was considered and configured at each service level (Amazon S3, AWS Glue Data Catalog, Amazon EMR, AWS Glue, Amazon Redshift, and QuickSight). Clear roles, responsibilities, and access permissions for different user groups and privileges were listed and configured in AWS Identity and Access Management (IAM) and individual services.
    • Existing single sign-on (SSO) integration with Microsoft Active Directory was used for Amazon QuickSight user access.
  • Automation
    • We used AWS CloudFormation for the creation and modification of all the core and analytics services.
    • AWS Step Functions was used to orchestrate Spark jobs on Amazon EMR.
    • Scheduled jobs were configured in AWS Glue for uploading data in Amazon Redshift based on business needs.
    • Monitoring of the analytics services was done using Amazon CloudWatch metrics, and right-sizing of instances and configuration was achieved. Spark job performance on Amazon EMR was analyzed using the native Spark logs and Spark user interface (UI).
    • Lifecycle policies were applied to the data lake to optimize the data storage costs over time.
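
As a rough illustration of the lifecycle policies mentioned above, the following sketch builds a lifecycle configuration in the dictionary shape that S3 lifecycle rules use (for example, as the `LifecycleConfiguration` argument of boto3's `put_bucket_lifecycle_configuration`). The prefixes, rule IDs, and day counts are hypothetical; the post doesn't state the values BMS chose.

```python
import json


def lifecycle_rule(rule_id, prefix, expire_days=None, transition_days=None,
                   storage_class="STANDARD_IA"):
    """Build one lifecycle rule for objects under the given prefix."""
    if expire_days is None and transition_days is None:
        raise ValueError("a rule needs an expiration or a transition")
    rule = {"ID": rule_id, "Filter": {"Prefix": prefix}, "Status": "Enabled"}
    if transition_days is not None:
        # Move objects to a cheaper storage class after N days.
        rule["Transitions"] = [{"Days": transition_days, "StorageClass": storage_class}]
    if expire_days is not None:
        # Delete objects after N days.
        rule["Expiration"] = {"Days": expire_days}
    return rule


# Hypothetical prefixes and retention periods, for illustration only.
lifecycle_configuration = {
    "Rules": [
        lifecycle_rule("expire-emr-logs", "logs/emr/", expire_days=30),
        lifecycle_rule("expire-temp", "temp/", expire_days=7),
        lifecycle_rule("tier-raw-clickstream", "raw/clickstream/", transition_days=90),
    ]
}
print(json.dumps(lifecycle_configuration, indent=2))
```

Building the rules from a helper keeps the retention choices for logs, temp folders, and raw data in one reviewable place before they are applied to a bucket.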

Benefits of a modern data architecture

A modern data architecture offered us the following benefits:

  • Scalability – We moved from a fixed infrastructure to the minimal infrastructure required, with configuration to scale on demand. Services like Amazon EMR and Amazon Redshift enable us to do this with just a few clicks.
  • Agility – We use purpose-built managed services instead of reinventing the wheel. Automation and monitoring were key considerations, which enable us to make changes quickly.
  • Serverless – Adoption of serverless services like Amazon S3, AWS Glue, Amazon Athena, AWS Step Functions, and AWS Lambda support us when our business has sudden spikes with new movies or events launched.
  • Cost savings – Our storage size was reduced by 90%. Our overall spend on analytics and ML was reduced by 80%.

Conclusion

In this post, we showed you how a modern data architecture on AWS helped BMS to easily share data across organizational boundaries. This allowed BMS to make decisions with speed and agility at scale; ensure compliance via unified data access, security, and governance; and to scale systems at a low cost without compromising performance. Working with the AWS and Minfy Technologies teams helped BMS choose the correct technology services and complete the migration in four months. BMS achieved the scalability and cost-optimization goals with this updated architecture, which has set the stage for innovation using graph databases and enhanced our ML projects to improve customer experience.


About the Authors

Mahesh Vandi Chalil is Chief Technology Officer at BookMyShow, India’s leading entertainment destination. Mahesh has over two decades of global experience and is passionate about building scalable products that delight customers, keeping innovation as the top goal that motivates his team. Mahesh invests his energies in creating and nurturing the next generation of technology leaders and entrepreneurs, both within the organization and outside of it. He is a proud husband and father of two daughters, and plays cricket in his leisure time.

Priya Jathar is a Solutions Architect working in the Digital Native Business segment at AWS. She has more than two decades of IT experience, with expertise in Application Development, Database, and Analytics. She is a builder who enjoys innovating with new technologies to achieve business goals. She currently helps customers Migrate, Modernise, and Innovate in the Cloud. In her free time she likes to paint, and hone her gardening and cooking skills.

Vatsal Shah is a Senior Solutions Architect at AWS based out of Mumbai, India. He has more than nine years of industry experience, including leadership roles in product engineering, SRE, and cloud architecture. He currently focuses on enabling large startups to streamline their cloud operations and help them scale on the cloud. He also specializes in AI and Machine Learning use cases.

How Novo Nordisk built a modern data architecture on AWS

Post Syndicated from Jonatan Selsing original https://aws.amazon.com/blogs/big-data/how-novo-nordisk-built-a-modern-data-architecture-on-aws/

Novo Nordisk is a leading global pharmaceutical company, responsible for producing life-saving medicines that reach more than 34 million patients each day. They do this following their triple bottom line—that they must strive to be environmentally sustainable, socially sustainable, and financially sustainable. The combination of using AWS and data supports all these targets.

Data is pervasive throughout the entire value chain of Novo Nordisk, from foundational research, manufacturing lines, sales and marketing, clinical trials, and pharmacovigilance through to patient-facing data-driven applications. Therefore, getting the foundation right for how data is stored, safeguarded, and used in a way that provides the most value is one of the central drivers of improved business outcomes.

Together with AWS Professional Services, we’re building a data and analytics solution using a modern data architecture. The collaboration between Novo Nordisk and AWS Professional Services is a strategic, long-term engagement, where developers from both organizations have worked together closely for years. The data and analytics environments are built around the core tenets of the data mesh: decentralized domain ownership of data, data as a product, self-service data infrastructure, and federated computational governance. This enables the users of the environment to work with data in the way that drives the best business outcomes. We have combined this with elements from evolutionary architectures that will allow us to adapt different functionalities as AWS continuously develops new services and capabilities.

In this series of posts, you will learn how Novo Nordisk and AWS Professional Services built a data and analytics ecosystem to speed up innovation at petabyte scale:

  • In this first post, you will learn how the overall design has enabled the individual components to come together in a modular way. We dive deep into how we built a data management solution based on the data mesh architecture.
  • The second post discusses how we built a trust network between the systems that comprise the entire solution. We show how we use event-driven architectures, coupled with the use of attribute-based access controls, to ensure permission boundaries are respected at scale.
  • In the third post, we show how end-users can consume data from their tool of choice, without compromising data governance. This includes how to configure Okta, AWS Lake Formation, and Microsoft Power BI to enable SAML-based federated use of Amazon Athena for an enterprise business intelligence (BI) activity.

Pharma-compliant environment

As a pharmaceutical company, Novo Nordisk is required to be GxP compliant. GxP is a general abbreviation for the “Good x Practice” quality guidelines and regulations defined by regulators such as the European Medicines Agency, the U.S. Food and Drug Administration, and others. These guidelines are designed to ensure that medicinal products are safe and effective for their intended use. In the context of a data environment, GxP compliance involves implementing integrity controls for data used in decision making and processes, and it guides how change management processes are implemented to continuously ensure compliance over time.

Because this data environment supports teams across the whole organization, each individual data owner must retain accountability on their data. Features were designed to provide data owners autonomy and transparency when managing their data, enabling them to take this responsibility. This includes the capability to handle personally identifiable information (PII) data and other sensitive workloads. To provide traceability on the environment, audit capabilities were added, which we describe more in this post.

Solution overview

The full solution is a sprawling landscape of independent services that work together to enable data and analytics with a decentralized data governance model at petabyte scale. Schematically, it can be represented as in the following figure.

Novo Nordisk Modern Data Architecture on AWS

The architecture is split into three independent layers: data management, virtualization, and consumption. The end-user sits in the consumption layer and works with their tool of choice. It’s meant to abstract as much of the AWS-native resources to application primitives. The consumption layer is integrated into the virtualization layer, which abstracts the access to data. The purpose of the virtualization layer is to translate between data consumption and data management solutions. The access to data is managed by what we refer to as data management solutions. We discuss one of our versatile data management solutions later in this post. Each layer in this architecture is independent of each other and instead only relies on well-defined interfaces.

Central to this architecture is that access is encapsulated in an AWS Identity and Access Management (IAM) role session. The data management layer focuses on providing the IAM role with the right permissions and governance, the virtualization layer provides access to the role, and the consumption layer abstracts the use of the roles in the tools of choice.

Technical architecture

Each of the three layers in the overall architecture has a distinct responsibility, but no singular implementation. Think of them as abstract classes. They can be implemented in concrete classes, and in our case they rely on foundational AWS services and capabilities. Let’s go through each of the three layers.

Data management layer

The data management layer is responsible for providing access to and governance of data. As illustrated in the following diagram, a minimal construct in the data management layer is the combination of an Amazon Simple Storage Service (Amazon S3) bucket and an IAM role that gives access to the S3 bucket. This construct can be expanded to include granular permission with Lake Formation, auditing with AWS CloudTrail, and security response capabilities from AWS Security Hub. The following diagram also shows that a single data management solution has no singular span. It can cross many AWS accounts and be comprised of any number of IAM role combinations.

Data Management Architecture

We have purposely not illustrated the trust policy of these roles in this figure, because those are a collaborative responsibility between the virtualization layer and the data management layer. We go into detail of how that works in the next post in this series. Data engineering professionals often interface directly with the data management layer, where they curate and prepare data for consumption.
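
To make the minimal construct concrete (an S3 bucket plus an IAM role that gives access to it), the following sketch builds the permissions policy such a role might carry. The bucket name and the split between read-only and read-write actions are assumptions for illustration; the trust policy is deliberately left out, in line with the figure.

```python
import json


def bucket_access_policy(bucket: str, read_only: bool = True) -> dict:
    """Return an IAM permissions policy granting access to a single bucket."""
    object_actions = ["s3:GetObject"]
    if not read_only:
        object_actions += ["s3:PutObject", "s3:DeleteObject"]
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Listing is a bucket-level action, so it targets the bucket ARN.
                "Sid": "ListBucket",
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": f"arn:aws:s3:::{bucket}",
            },
            {
                # Object-level actions target the objects inside the bucket.
                "Sid": "ObjectAccess",
                "Effect": "Allow",
                "Action": object_actions,
                "Resource": f"arn:aws:s3:::{bucket}/*",
            },
        ],
    }


# "example-domain-bucket" is a placeholder name.
policy = bucket_access_policy("example-domain-bucket")
print(json.dumps(policy, indent=2))
```

Granular permissions with Lake Formation or auditing with CloudTrail would be layered on top of this construct rather than expressed in the policy itself.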

Virtualization layer

The purpose of the virtualization layer is to keep track of who can do what. It doesn’t have any capabilities in itself, but translates the requirements from the data management ecosystems to the consumption layers and vice versa. It enables end-users on the consumption layer to access and manipulate data on one or more data management ecosystems, according to their permissions. This layer abstracts from end-users the technical details on data access, such as permission model, role assumptions, and storage location. It owns the interfaces to the other layers and enforces the logic of the abstraction. In the context of hexagonal architectures (see Developing evolutionary architecture with AWS Lambda), the interface layer plays the role of the domain logic, ports, and adapters. The other two layers are actors. The data management layer communicates the state of the layer to the virtualization layer and conversely receives information about the service landscape to trust. The virtualization layer architecture is shown in the following diagram.

Virtualization Layer Architecture

Consumption layer

The consumption layer is where the end-users of the data products are sitting. This can be data scientists, business intelligence analysts, or any third party that generates value from consuming the data. It’s important for this type of architecture that the consumption layer has a hook-based sign-in flow, where the authorization into the application can be modified at sign-in time. This is to translate the AWS-specific requirement into the target applications. After the session in the client-side application has successfully been started, it’s up to the application itself to instrument for data layer abstraction, because this will be application specific. And this is an additional important decoupling, where some responsibility is pushed to the decentralized units. Many modern software as a service (SaaS) applications support these built-in mechanisms, such as Databricks or Domino Data Lab, whereas more traditional client-side applications like RStudio Server have more limited native support for this. In the case where native support is missing, a translation down to the OS user session can be done to enable the abstraction. The consumption layer is shown schematically in the following diagram.

Consumption Layer Architecture

When using the consumption layer as intended, the users don’t know that the virtualization layer exists. The following diagram illustrates the data access patterns.

Data Access Patterns

Modularity

One of the main advantages of adopting the hexagonal architecture pattern, and of treating the consumption layer and the data management layer as primary and secondary actors, is that they can be changed or replaced as new functionalities are released that require new solutions. This gives a hub-and-spoke type pattern, where many different types of producer and consumer systems can be connected and work simultaneously in unison. For example, the current solution running in Novo Nordisk supports multiple simultaneous data management solutions that are exposed in a homogeneous way in the consumption layer. These include a data lake, the data mesh solution presented in this post, and several independent data management solutions. They are exposed to multiple types of consuming applications, from custom managed, self-hosted applications to SaaS offerings.
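
The modularity argument can be sketched with the abstract-class analogy used earlier in this post. In the following illustration, every class name, method name, and ARN is hypothetical; it only shows how several data management solutions could sit behind one interface so that the consumption layer sees them in a uniform way.

```python
from abc import ABC, abstractmethod


class DataManagementSolution(ABC):
    """Hypothetical port that each data management solution implements."""

    name: str

    @abstractmethod
    def role_arn_for(self, dataset: str) -> str:
        """Return the IAM role that gives access to the dataset."""


class DataLake(DataManagementSolution):
    name = "data-lake"

    def role_arn_for(self, dataset: str) -> str:
        # Placeholder account ID and role naming scheme.
        return f"arn:aws:iam::111111111111:role/lake-{dataset}-reader"


class DataMesh(DataManagementSolution):
    name = "data-mesh"

    def role_arn_for(self, dataset: str) -> str:
        return f"arn:aws:iam::222222222222:role/mesh-{dataset}-reader"


class VirtualizationLayer:
    """Owns the interface and hides which solution serves a dataset."""

    def __init__(self) -> None:
        self._solutions: dict[str, DataManagementSolution] = {}

    def register(self, solution: DataManagementSolution) -> None:
        self._solutions[solution.name] = solution

    def resolve(self, solution_name: str, dataset: str) -> str:
        return self._solutions[solution_name].role_arn_for(dataset)


layer = VirtualizationLayer()
layer.register(DataLake())
layer.register(DataMesh())
print(layer.resolve("data-mesh", "sales"))
```

Adding or replacing a data management solution then means registering another implementation; the consuming side keeps calling the same `resolve` method.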

Data management ecosystem

To scale the usage of the data and increase the freedom, Novo Nordisk, jointly with AWS Professional Services, built a data management and governance environment, named Novo Nordisk Enterprise DataHub (NNEDH). NNEDH implements a decentralized distributed data architecture, and data management capabilities such as an enterprise business data catalog and data sharing workflow. NNEDH is an example of a data management ecosystem in the conceptual framework introduced earlier.

Decentralized architecture: From a centralized data lake to a distributed architecture

Novo Nordisk’s centralized data lake consists of 2.3 PB of data from more than 30 business data domains worldwide, serving more than 2,000 internal users throughout the value chain. It has been running successfully for several years. It is one of the data management ecosystems currently supported.

Within the centralized data architecture, data from each data domain is copied, stored, and processed in one central location: a central data lake hosted in one data storage. This pattern has challenges at scale because it retains the data ownership with the central team. At scale, this model slows down the journey toward a data-driven organization, because ownership of the data isn’t sufficiently anchored with the professionals closest to the domain.

The monolithic data lake architecture is shown in the following diagram.

Monolithic Data Lake Architecture

Within the decentralized distributed data architecture, the data from each domain is kept within the domain on its own data storage and compute account. In this case, the data is kept close to domain experts, because they’re the ones who know their own data best and are ultimately the owner of any data products built around their data. They often work closely with business analysts to build the data product and therefore know what good data means to consumers of their data products. In this case, the data responsibility is also decentralized, where each domain has its own data owner, putting the accountability onto the true owners of the data. Nevertheless, this model might not work at small scale, for example an organization with only one business unit and tens of users, because it would introduce more overhead on the IT team to manage the organization data. It better suits large organizations, or small and medium ones that would like to grow and scale.

The Novo Nordisk data mesh architecture is shown in the following diagram.

Novo Nordisk Data Mesh Architecture

Data domains and data assets

To enable the scalability of data domains across the organization, it’s mandatory to have a standard permission model and data access pattern. This standard must not be too restrictive in such a way that it may be a blocker for specific use cases, but it should be standardized in such a way to use the same interface between the data management and virtualization layers.

The data domains on NNEDH are implemented by a construct called an environment. An environment is composed of at least one AWS account and one AWS Region. It’s a workplace where data domain teams can work and collaborate to build data products. It links the NNEDH control plane to the AWS accounts where the data and compute of the domain reside. The data access permissions are also defined at the environment level, managed by the owner of the data domain. The environments have three main components: a data management and governance layer, data assets, and optional blueprints for data processing.

For data management and governance, the data domains rely on Lake Formation, AWS Glue, and CloudTrail. The deployment method and setup of these components is standardized across data domains. This way, the NNEDH control plane can provide connectivity and management to data domains in a standardized way.

The data assets of each domain residing in an environment are organized in a dataset, which is a collection of related data used for building a data product. It includes technical metadata such as data format, size, and creation time, and business metadata such as the producer, data classification, and business definition. A data product can use one or several datasets. It is implemented through managed S3 buckets and the AWS Glue Data Catalog.

Data processing can be implemented in different ways. NNEDH provides blueprints for data pipelines with predefined connectivity to data assets to speed up the delivery of data products. Data domain users have the freedom to use any other compute capability on their domain, for example using AWS services not predefined on the blueprints or accessing the datasets from other analytics tools implemented in the consumption layer, as mentioned earlier in this post.

Data domain personas and roles

On NNEDH, the permission levels on data domains are managed through predefined personas, for example data owner, data stewards, developers, and readers. Each persona is associated with an IAM role that has a predefined permission level. These permissions are based on the typical needs of users on these roles. Nevertheless, to give more flexibility to data domains, these permissions can be customized and extended as needed.

The permissions associated with each persona are related only to actions allowed on the AWS account of the data domain. For the accountability on data assets, the data access to the assets is managed by specific resource policies instead of IAM roles. Only the owner of each dataset, or data stewards delegated by the owner, can grant or revoke data access.

On the dataset level, a required persona is the data owner. Typically, they work closely with one or many data stewards as data products managers. The data steward is the data subject matter expert of the data product domain, responsible for interpreting collected data and metadata to derive deep business insights and build the product. The data steward bridges between business users and technical teams on each data domain.

Enterprise business data catalog

To enable freedom and make the organization data assets discoverable, a web-based portal data catalog is implemented. It indexes in a single repository metadata from datasets built on data domains, breaking data silos across the organization. The data catalog enables data search and discovery across different domains, as well as automation and governance on data sharing.

The business data catalog implements data governance processes within the organization. It ensures the data ownership—someone in the organization is responsible for the data origin, definition, business attributes, relationships, and dependencies.

The central construct of a business data catalog is a dataset. It’s the search unit within the business catalog, having both technical and business metadata. To collect technical metadata from structured data, it relies on AWS Glue crawlers to recognize and extract data structures from the most popular data formats, including CSV, JSON, Avro, and Apache Parquet. It provides information such as data type, creation date, and format. The metadata can be enriched by business users by adding a description of the business context, tags, and data classification.

The dataset definition and related metadata are stored in an Amazon Aurora Serverless database and Amazon OpenSearch Service, enabling you to run textual queries on the data catalog.
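
The dataset construct, with its technical and business metadata, can be pictured as a small record type plus a text search. The sketch below is an illustration only: the field names and sample datasets are made up, and the in-memory substring search is a stand-in for the textual queries that the post says run on Amazon OpenSearch Service.

```python
from dataclasses import dataclass, field


@dataclass
class Dataset:
    name: str
    # Technical metadata (in the post, collected by AWS Glue crawlers).
    data_format: str
    size_bytes: int
    created: str
    # Business metadata (in the post, added by business users).
    producer: str
    classification: str
    description: str = ""
    tags: list[str] = field(default_factory=list)

    def matches(self, term: str) -> bool:
        """Case-insensitive substring match over the searchable fields."""
        term = term.lower()
        haystack = [self.name, self.producer, self.description, *self.tags]
        return any(term in text.lower() for text in haystack)


def search(catalog: list[Dataset], term: str) -> list[str]:
    """Return the names of datasets matching the term, in catalog order."""
    return [d.name for d in catalog if d.matches(term)]


# Made-up sample entries.
catalog = [
    Dataset("batch_records", "parquet", 1024, "2022-01-01",
            "manufacturing", "confidential", "Batch release records", ["gxp"]),
    Dataset("trial_sites", "csv", 2048, "2022-02-01",
            "clinical", "internal", "Clinical trial site list", ["trials"]),
]
print(search(catalog, "gxp"))
```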

Data sharing

NNEDH implements a data sharing workflow, enabling peer-to-peer data sharing across AWS accounts using Lake Formation. The workflow is as follows:

  1. A data consumer requests access to the dataset.
  2. The data owner grants access by approving the access request. They can delegate the approval of access requests to the data steward.
  3. Upon the approval of an access request, a new permission is added to the specific dataset in Lake Formation of the producer account.
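
The three steps above can be sketched as a small approval function. This is a simplified illustration, not the NNEDH implementation: the class names, status values, and the choice of a table-level `SELECT` permission are assumptions. The returned dictionary follows the argument shape of the Lake Formation `GrantPermissions` API (for example, boto3's `grant_permissions`), but nothing is sent to AWS here.

```python
from dataclasses import dataclass, field


@dataclass
class SharedDataset:
    database: str
    table: str
    owner: str
    stewards: set = field(default_factory=set)  # approvers delegated by the owner


@dataclass
class AccessRequest:
    dataset: SharedDataset
    consumer_principal: str  # for example, an IAM role ARN in the consumer account
    status: str = "REQUESTED"  # step 1: the consumer has requested access


def approve(request: AccessRequest, approver: str) -> dict:
    """Step 2: approve as owner or delegated steward. Step 3: build the grant."""
    ds = request.dataset
    if approver != ds.owner and approver not in ds.stewards:
        raise PermissionError(f"{approver} cannot approve access to {ds.table}")
    request.status = "APPROVED"
    return {
        "Principal": {"DataLakePrincipalIdentifier": request.consumer_principal},
        "Resource": {"Table": {"DatabaseName": ds.database, "Name": ds.table}},
        "Permissions": ["SELECT"],
    }


# Placeholder names and ARN.
dataset = SharedDataset("sales_db", "orders", owner="alice", stewards={"bob"})
request = AccessRequest(dataset, "arn:aws:iam::333333333333:role/analyst")
grant = approve(request, "bob")
print(request.status, grant["Permissions"])
```

Keeping the approval check next to the grant construction makes it explicit that only the owner, or a steward the owner delegated to, can cause a permission to be added in the producer account.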

The data sharing workflow is shown schematically in the following figure.

Data Sharing Workflow

Security and audit

The data in the Novo Nordisk data mesh lies in AWS accounts owned by Novo Nordisk business accounts. The configuration and the states of the data mesh are stored in Amazon Relational Database Service (Amazon RDS). The Novo Nordisk security architecture is shown in the following figure.

Novo Nordisk Distributed Security and Audit Architecture

Access and edits to the data in NNEDH need to be logged for audit purposes. We need to be able to tell who modified data, when the modification happened, and what modifications were applied. In addition, we need to be able to answer why the modification was allowed by that person at that time.

To meet these requirements, we use the following components:

  • CloudTrail to log API calls. We specifically enable CloudTrail data event logging for S3 buckets and objects. By activating the logging, we can trace back any modification to any files in the data lake to the person who made the modification. We enforce usage of source identity for IAM role sessions to ensure user traceability.
  • Amazon RDS to store the configuration of the data mesh. We log queries against the RDS database. Together with CloudTrail, this log allows us to answer the question of why a modification to a file in Amazon S3 by a specific person at a specific time was possible.
  • Amazon CloudWatch to log activities across the mesh.

In addition to those logging mechanisms, the S3 buckets are created using the following properties:

  • The bucket is encrypted using server-side encryption with AWS Key Management Service (AWS KMS) and customer managed keys
  • Amazon S3 versioning is activated by default
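
The two bucket properties listed above can be written down as configuration dictionaries. The following sketch assumes a placeholder KMS key ARN; the dictionaries follow the shapes used by the S3 `PutBucketEncryption` and `PutBucketVersioning` APIs (in boto3, the `ServerSideEncryptionConfiguration` and `VersioningConfiguration` arguments), but the code only builds them and doesn't call AWS.

```python
def bucket_properties(kms_key_arn: str) -> dict:
    """Return default-encryption and versioning settings for a new bucket."""
    return {
        # Server-side encryption with a customer managed AWS KMS key.
        "encryption": {
            "Rules": [
                {
                    "ApplyServerSideEncryptionByDefault": {
                        "SSEAlgorithm": "aws:kms",
                        "KMSMasterKeyID": kms_key_arn,
                    }
                }
            ]
        },
        # Versioning keeps earlier object versions after an overwrite or delete.
        "versioning": {"Status": "Enabled"},
    }


# Placeholder key ARN, for illustration only.
props = bucket_properties("arn:aws:kms:eu-west-1:111111111111:key/example-key-id")
print(props["versioning"])
```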

Access to the data in NNEDH is controlled at the group level instead of at the level of individual users. Each group corresponds to a group defined in the Novo Nordisk directory. To keep track of the person who modified the data in the data lakes, we use the source identity mechanism explained in the post How to relate IAM role activity to corporate identity.
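
One way to enforce a source identity is in the trust policy of the role. The sketch below builds such a trust policy statement; it is an illustration under assumptions, not the policy Novo Nordisk uses. The principal ARN is a placeholder, and the default pattern `*` simply requires that some source identity is set, because a `StringLike` condition doesn't match when the key is absent.

```python
import json


def source_identity_trust_policy(principal_arn: str, pattern: str = "*") -> dict:
    """Return a role trust policy that requires a source identity on the session."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"AWS": principal_arn},
                # sts:SetSourceIdentity must be allowed for the caller to set it.
                "Action": ["sts:AssumeRole", "sts:SetSourceIdentity"],
                # The condition fails when no source identity is supplied.
                "Condition": {"StringLike": {"sts:SourceIdentity": pattern}},
            }
        ],
    }


# Placeholder principal, for illustration only.
trust = source_identity_trust_policy("arn:aws:iam::111111111111:role/virtualization-layer")
print(json.dumps(trust, indent=2))
```

With a policy like this, a session that assumes the role without a source identity is denied, so every CloudTrail record for the role can be related back to a person.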

Conclusion

In this post, we showed how Novo Nordisk built a modern data architecture to speed up the delivery of data-driven use cases. It includes a distributed data architecture, to scale the usage to petabyte scale for over 2,000 internal users throughout the value chain, as well as a distributed security and audit architecture handling data accountability and traceability on the environment to meet their compliance requirements.

The next post in this series describes the implementation of distributed data governance and control at scale of Novo Nordisk’s modern data architecture.


About the Authors

Jonatan Selsing is a former research scientist with a PhD in astrophysics who has turned to the cloud. He is currently the Lead Cloud Engineer at Novo Nordisk, where he enables data and analytics workloads at scale. With an emphasis on reducing the total cost of ownership of cloud-based workloads, while giving full benefit of the advantages of cloud, he designs, builds, and maintains solutions that enable research for future medicines.

Hassen Riahi is a Sr. Data Architect at AWS Professional Services. He holds a PhD in Mathematics & Computer Science on large-scale data management. He works with AWS customers on building data-driven solutions.

Anwar Rizal is a Senior Machine Learning consultant based in Paris. He works with AWS customers to develop data and AI solutions to sustainably grow their business.

Moses Arthur comes from a mathematics and computational research background and holds a PhD in Computational Intelligence specialized in Graph Mining. He is currently a Cloud Product Engineer at Novo Nordisk building GxP-compliant enterprise data lakes and analytics platforms for Novo Nordisk global factories producing digitalized medical products.

Alessandro Fior is a Sr. Data Architect at AWS Professional Services. With over 10 years of experience delivering data and analytics solutions, he is passionate about designing and building modern and scalable data platforms that accelerate companies to get value from their data.

Kumari Ramar is an Agile certified and PMP certified Senior Engagement Manager at AWS Professional Services. She delivers data and AI/ML solutions that speed up cross-system analytics and machine learning models, which enable enterprises to make data-driven decisions and drive new innovations.

Create your own reusable visual transforms for AWS Glue Studio

Post Syndicated from Gonzalo Herreros original https://aws.amazon.com/blogs/big-data/create-your-own-reusable-visual-transforms-for-aws-glue-studio/

AWS Glue Studio recently added support for custom visual transforms, which you can use to build visual jobs in combination with the AWS Glue Studio components provided out of the box. You can now define a custom visual transform by simply dropping a JSON file and a Python script onto Amazon S3, which define the component and the processing logic, respectively.

Custom visual transform lets you define, reuse, and share business-specific ETL logic among your teams. With this new feature, data engineers can write reusable transforms for the AWS Glue visual job editor. Reusable transforms increase consistency between teams and help keep jobs up-to-date by minimizing duplicate effort and code.

In this blog post, I will show you a fictional use case that requires the creation of two custom transforms to illustrate what you can accomplish with this new feature. One component will generate synthetic data on the fly for testing purposes, and the other will prepare the data to store it partitioned.

Use case: Generate synthetic data on the fly

There are multiple reasons why you would want to have a component that generates synthetic data. Maybe the real data is heavily restricted or not yet available, or there is not enough quantity or variety at the moment to test performance. Or maybe using the real data imposes some cost or load to the real system, and we want to reduce its usage during development.

Using the new custom visual transforms framework, let’s create a component that builds synthetic data for fictional sales during a natural year.

Define the generator component

First, define the component by giving it a name, description, and parameters. In this case, use salesdata_generator for both the name and the function, with two parameters: how many rows to generate and for which year.

For the parameters, we define them both as int, and you can add a regex validation to make sure the parameters provided by the user are in the correct format.

There are further configuration options available; to learn more, refer to the AWS Glue User Guide.

This is what the component definition looks like. Save it as salesdata_generator.json. For convenience, we’ll match the name of the Python file, so it’s important to choose a name that doesn’t conflict with an existing Python module.
If the year is not specified, the script will default to last year.

{
  "name": "salesdata_generator",
  "displayName": "Synthetic Sales Data Generator",
  "description": "Generate synthetic order datasets for testing purposes.",
  "functionName": "salesdata_generator",
  "parameters": [
    {
      "name": "numSamples",
      "displayName": "Number of samples",
      "type": "int",
      "description": "Number of samples to generate"
    },
    {
      "name": "year",
      "displayName": "Year",
      "isOptional": true,
      "type": "int",
      "description": "Year for which to generate randomly distributed data; defaults to last year",
      "validationRule": "^\\d{4}$",
      "validationMessage": "Please enter a valid year number"
    }
  ]
}
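
To check a validationRule before uploading the definition, you can try the regular expression locally. The sketch below is only an approximation: it assumes the rule is applied to the text the user types into the field, and the validate helper is hypothetical rather than part of AWS Glue Studio.

```python
import json
import re

# The "year" parameter from the definition above, parsed the same way
# any JSON reader would parse it (note the escaped backslash).
year_param = json.loads(r'''
{
  "name": "year",
  "type": "int",
  "validationRule": "^\\d{4}$",
  "validationMessage": "Please enter a valid year number"
}
''')


def validate(param, raw_value):
    """Return None if raw_value passes the rule, otherwise the validation message."""
    rule = param.get("validationRule")
    if rule is None or re.search(rule, raw_value):
        return None
    return param.get("validationMessage", "Invalid value")


for candidate in ["2022", "22", "20221", "year"]:
    print(candidate, "->", validate(year_param, candidate) or "ok")
```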

Implement the generator logic

Now, you need to create a Python script file with the implementation logic.
Save the following script as salesdata_generator.py. Notice the name is the same as the JSON, just with a different extension.

from awsglue import DynamicFrame
import pyspark.sql.functions as F
import datetime
import time

def salesdata_generator(self, numSamples, year=None):
    if not year:
        # Use last year
        year = datetime.datetime.now().year - 1
    
    year_start_ts = int(time.mktime((year,1,1,0,0,0,0,0,0)))
    year_end_ts = int(time.mktime((year + 1,1,1,0,0,0,0,0,0)))
    ts_range = year_end_ts - year_start_ts
    
    departments = ["bargain", "checkout", "food hall", "sports", "menswear", "womenwear", "health and beauty", "home"]
    dep_array = F.array(*[F.lit(x) for x in departments])
    # floor(rand * n) picks each index 0..n-1 with equal probability
    dep_randomizer = F.floor(F.rand() * len(departments)).cast("int")

    df = self.glue_ctx.sparkSession.range(numSamples) \
      .withColumn("sale_date", F.from_unixtime(F.lit(year_start_ts) + F.rand() * ts_range)) \
      .withColumn("amount_dollars", F.round(F.rand() * 1000, 2)) \
      .withColumn("department", dep_array.getItem(dep_randomizer))  
    return DynamicFrame.fromDF(df, self.glue_ctx, "sales_synthetic_data")

DynamicFrame.salesdata_generator = salesdata_generator

The function salesdata_generator in the script receives the source DynamicFrame as “self”, and the parameters must match the definition in the JSON file. Notice that “year” is an optional parameter, so it is assigned a default value of None, which the function detects and replaces with the previous year. The function returns the transformed DynamicFrame. In the common case, the result is derived from the source DynamicFrame; in this case, the source is ignored and replaced by a new one.

The transform leverages Spark functions as well as Python libraries in order to implement this generator.
To keep things simple, this example only generates four columns, but we could do the same for many more by either hardcoding values, assigning them from a list, looking for some other input, or doing whatever makes sense to make the data realistic.
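
If you want to see what the sale_date expression does without starting a Spark session, the following plain Python sketch reproduces the same arithmetic. It is an illustration only: random.Random stands in for Spark’s rand function, and the helper names year_bounds and random_sale_timestamp are hypothetical.

```python
import datetime
import random
import time


def year_bounds(year=None):
    """Return (start, end) epoch seconds for the year, defaulting to last year."""
    if not year:
        year = datetime.datetime.now().year - 1
    start = int(time.mktime((year, 1, 1, 0, 0, 0, 0, 0, 0)))
    end = int(time.mktime((year + 1, 1, 1, 0, 0, 0, 0, 0, 0)))
    return start, end


def random_sale_timestamp(year, rng):
    """Pick a random instant between the start of the year and the start of the next."""
    start, end = year_bounds(year)
    # rng.random() is in [0, 1), so the offset stays below the length of the year
    return start + rng.random() * (end - start)


rng = random.Random(42)  # fixed seed so repeated runs print the same dates
samples = [random_sale_timestamp(2021, rng) for _ in range(5)]
for ts in samples:
    print(datetime.datetime.fromtimestamp(ts).isoformat(timespec="seconds"))
```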

Deploy and use the generator transform

Now that we have both files ready, all we have to do is upload them to Amazon S3 under the following path.

s3://aws-glue-assets-<account id>-<region name>/transforms/

If AWS Glue has never been used in the account and Region, then that bucket might not exist and needs to be created. AWS Glue will automatically create this bucket when you create your first job.

You will need to manually create a folder called “transforms” in that bucket to upload the files into.
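
If you prefer to script the upload, the following sketch shows one way to do it with a boto3 S3 client. The function names are hypothetical, and the account ID and Region in the usage line are placeholders. Because S3 folders are key prefixes, uploading with a key that starts with transforms/ places the files under that folder.

```python
def transform_upload_targets(account_id, region, transform_name):
    """Map each local file name of a transform to its (bucket, key) destination."""
    bucket = f"aws-glue-assets-{account_id}-{region}"
    return {
        f"{transform_name}{ext}": (bucket, f"transforms/{transform_name}{ext}")
        for ext in (".json", ".py")
    }


def upload_transform(s3_client, local_dir, account_id, region, transform_name):
    """Upload the JSON definition and the Python script of one transform."""
    targets = transform_upload_targets(account_id, region, transform_name)
    for filename, (bucket, key) in targets.items():
        s3_client.upload_file(f"{local_dir}/{filename}", bucket, key)


# Placeholder account ID and Region, for illustration only
targets = transform_upload_targets("123456789012", "us-east-1", "salesdata_generator")
for filename, (bucket, key) in targets.items():
    print(f"{filename} -> s3://{bucket}/{key}")
```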

Once you have uploaded both files, the next time we open (or refresh) the page on AWS Glue Studio visual editor, the transform should be listed among the other transforms. You can search for it by name or description.

Because this is a transform and not a source, when we try to use the component, the UI will demand a parent node. You can use as a parent the real data source (so you can easily remove the generator and use the real data) or just use a placeholder. I’ll show you how:

  1. Go to the AWS Glue console, and in the left menu, select Jobs under AWS Glue Studio.
  2. Leave the default options (Visual with a source and target and S3 source and destination), and choose Create.
  3. Give the job a name by editing Untitled job at the top left; for example, CustomTransformsDemo.
  4. Go to the Job details tab and select a role with AWS Glue permissions as the IAM role. If no role is listed on the dropdown, then follow these instructions to create one.
    For this demo, you can also reduce Requested number of workers to 2 and Number of retries to 0 to minimize costs.
  5. Delete the Data target node S3 bucket at the bottom of the graph by selecting it and choosing Remove. We will restore it later when we need it.
  6. Select the S3 source node and, in the Data source properties tab, select S3 location as the source type.
    In the S3 URL box, enter a path that doesn’t exist on a bucket the role selected can access, for instance: s3://aws-glue-assets-<account id>-<region name>/file_that_doesnt_exist. Notice there is no trailing slash.
    Choose JSON as the data format with default settings; it doesn’t matter.
    You might get a warning that it cannot infer schema because the file doesn’t exist; that’s OK, we don’t need it.
  7. Now search for the transform by typing “synthetic” in the search box of transforms. Once the result appears (or you scroll and find it in the list), choose it so it is added to the job.
  8. Set the parent of the transform just added to be S3 bucket source in the Node properties tab. Then for the ApplyMapping node, replace the parent S3 bucket with transforms Synthetic Sales Data Generator. Notice this long name is coming from the displayName defined in the JSON file uploaded before.
  9. After these changes, your job diagram should look as follows (if you tried to save, there might be some warnings; that’s OK, we’ll complete the configuration next).
  10. Select the Synthetic Sales node and go to the Transform tab. Enter 10000 as the number of samples and leave the year by default, so it uses last year.
  11. Now we need the generated schema to be applied. This would not be needed if we had a source that matches the generator schema.
    In the same node, select the tab Data preview and start a session. Once it is running, you should see sample synthetic data. Notice the sale dates are randomly distributed across the year.
  12. Now select the tab Output schema and choose Use datapreview schema. That way, the four fields generated by the node will be propagated, and we can do the mapping based on this schema.
  13. Now we want to convert the generated sale_date timestamp into a date column, so we can use it to partition the output by day. Select the ApplyMapping node and go to the Transform tab. For the sale_date field, select date as the target type. This will truncate the timestamp to just the date.
  14. Now it’s a good time to save the job. It should let you save successfully.

Finally, we need to configure the sink. Follow these steps:

  1. With the ApplyMapping node selected, go to the Target dropdown and choose Amazon S3. The sink will be added to the ApplyMapping node. If you didn’t select the parent node before adding the sink, you can still set it in the Node details tab of the sink.
  2. Create an S3 bucket in the same Region as where the job will run. We’ll use it to store the output data, so we can clean up easily at the end. If you create it via the console, the default bucket config is OK.
    You can read more information about bucket creation in the Amazon S3 documentation.
  3. In the Data target properties tab, enter in S3 Target Location the URL of the bucket and some path and a trailing slash, for instance: s3://<your output bucket here>/output/
    Leave the rest with the default values provided.
  4. Choose Add partition key at the bottom and select the field sale_date.

We could create a partitioned table at the same time just by selecting the corresponding catalog update option. For simplicity, generate the partitioned files at this time without updating the catalog, which is the default option.

You can now save and then run the job.

Once the job has completed, after a couple of minutes (you can verify this in the Runs tab), explore the S3 target location entered above. You can use the Amazon S3 console or the AWS CLI. You will see files named like this: s3://<your output bucket here>/output/sale_date=<some date yyyy-mm-dd>/<filename>.

If you count the files, there should be close to but not more than 1,460 (depending on the year used and assuming you are using 2 G.1X workers and AWS Glue version 3.0).

Use case: Improve the data partitioning

In the previous section, you created a job using a custom visual component that produced synthetic data, did a small transformation on the date, and saved it partitioned on S3 by day.

You might be wondering why this job generated so many files for the synthetic data. This is not ideal, especially when they are as small as in this case. If this data was saved as a table with years of history, generating small files has a detrimental impact on tools that consume it, like Amazon Athena.

The reason for this is that when the generator calls the “range” function in Apache Spark without specifying a number of memory partitions (notice they are a different kind from the output partitions saved to S3), it defaults to the number of cores in the cluster, which in this example is just 4.

Because the dates are random, each memory partition is likely to contain rows representing all days of the year. When the sink splits the rows into output directories by date, each memory partition needs to create one file for each day present, so you can end up with as many as 4 * 365 = 1,460 files (in a non-leap year).
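
The arithmetic can be checked with a small simulation in plain Python. This is a simplified model, not the Spark execution itself: it assumes the rows are split evenly across the memory partitions and that each row gets a uniformly random day, and the function names are hypothetical.

```python
import calendar
import random


def max_output_files(memory_partitions, year):
    """Upper bound: every memory partition writes one file per day of the year."""
    days = 366 if calendar.isleap(year) else 365
    return memory_partitions * days


def simulate_output_files(num_rows, memory_partitions, year, seed=0):
    """Count the distinct (memory partition, day) pairs, one output file each."""
    rng = random.Random(seed)
    days = 366 if calendar.isleap(year) else 365
    rows_per_partition = num_rows // memory_partitions
    files = set()
    for partition in range(memory_partitions):
        for _ in range(rows_per_partition):
            files.add((partition, rng.randrange(days)))
    return len(files)


print(max_output_files(4, 2021))  # 1460
print(simulate_output_files(10000, 4, 2021))
```

With 10,000 rows, each of the four partitions holds about 2,500 rows spread over 365 days, so only a handful of days are likely to be missing from any one partition. That is why the file count lands close to, but not above, the upper bound.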

This example is a bit extreme, and normally data read from the source is not so spread over time. The issue can often be found when you add other dimensions, such as output partition columns.

Now you are going to build a component that optimizes this, trying to reduce the number of output files as much as possible: one per output directory.
Also, let’s imagine that on your team, you have the policy of generating S3 date partitions separated by year, month, and day as strings, so the files can be selected efficiently whether using a table on top or not.

We don’t want individual users to have to deal with these optimizations and conventions individually but instead have a component they can just add to their jobs.

Define the repartitioner transform

For this new transform, create a separate JSON file, let’s call it repartition_date.json, where we define the new transform and the parameters it needs.

{
  "name": "repartition_date",
  "displayName": "Repartition by date",
  "description": "Split a date into partition columns and reorganize the data to save them as partitions.",
  "functionName": "repartition_date",
  "parameters": [
    {
      "name": "dateCol",
      "displayName": "Date column",
      "type": "str",
      "description": "Column with the date to split into year, month and day partitions. The column won't be removed"
    },
    {
      "name": "partitionCols",
      "displayName": "Partition columns",
      "type": "str",
      "isOptional": true,
      "description": "In addition to the year, month and day, you can specify additional columns to partition by, separated by commas"
    },
    {
      "name": "numPartitionsExpected",
      "displayName": "Number partitions expected",
      "isOptional": true,
      "type": "int",
      "description": "The number of partition column value combinations expected, if not specified the system will calculate it."
    }
  ]
}

Implement the transform logic

The script splits the date into multiple columns with leading zeros and then reorganizes the data in memory according to the output partitions. Save the code in a file named repartition_date.py:

from awsglue import DynamicFrame
import pyspark.sql.functions as F

def repartition_date(self, dateCol, partitionCols="", numPartitionsExpected=None):
    partition_list = partitionCols.split(",") if partitionCols else []
    partition_list += ["year", "month", "day"]
    
    date_col = F.col(dateCol)
    df = self.toDF()\
      .withColumn("year", F.year(date_col).cast("string"))\
      .withColumn("month", F.format_string("%02d", F.month(date_col)))\
      .withColumn("day", F.format_string("%02d", F.dayofmonth(date_col)))
    
    if not numPartitionsExpected:
        numPartitionsExpected = df.selectExpr(f"COUNT(DISTINCT {','.join(partition_list)})").collect()[0][0]
    
    # Reorganize the data so the partitions in memory are aligned with the file partitioning on S3,
    # so the rows for each combination of partition column values land in a single memory partition
    df = df.repartition(numPartitionsExpected, partition_list)    
    return DynamicFrame.fromDF(df, self.glue_ctx, self.name)

DynamicFrame.repartition_date = repartition_date

Upload the two new files onto the S3 transforms folder like you did for the previous transform.
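
To make the column splitting concrete, here is a plain Python sketch of the same idea, without Spark. It mirrors the zero-padded year, month, and day strings the script produces and builds the key=value directory names that the partitioned output uses. The helper names are hypothetical.

```python
import datetime


def date_partition_values(d):
    """Mirror the year/month/day string columns produced by repartition_date."""
    return {"year": str(d.year), "month": f"{d.month:02d}", "day": f"{d.day:02d}"}


def partition_prefix(d, base=""):
    """Build the key=value directory prefix for one date."""
    values = date_partition_values(d)
    return base + "".join(f"{key}={value}/" for key, value in values.items())


dates = [
    datetime.date(2021, 9, 1),
    datetime.date(2021, 9, 1),
    datetime.date(2021, 12, 25),
]

print(partition_prefix(dates[0]))  # year=2021/month=09/day=01/

# The number of distinct combinations is what the script computes
# when numPartitionsExpected is not provided.
distinct = {tuple(date_partition_values(d).values()) for d in dates}
print(len(distinct))  # 2
```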

Deploy and use the repartition transform

Now edit the job to make use of the new component to generate a different output.
Refresh the page in the browser if the new transform is not listed.

  1. Select the generator transform and from the transforms dropdown, find Repartition by date and choose it; it should be added as a child of the generator.
    Now change the parent of the Data target node to the new node added and remove the ApplyMapping; we no longer need it.
  2. Repartition by date needs you to enter the column that contains the timestamp.
    Enter sale_date (the framework doesn’t yet allow field selection using a dropdown) and leave the other two as defaults.
  3. Now we need to update the output schema with the new date split fields. To do so, use the Data preview tab to check it’s working correctly (or start a session if the previous one has expired). Then in the Output schema, choose Use datapreview schema so the new fields get added. Notice the transform doesn’t remove the original column, but it could if you change it to do so.
  4. Finally, edit the S3 target to enter a different location so the folders don’t mix with the previous run, and it’s easier to compare and use. Change the path to /output2/.
    Remove the existing partition column and instead add year, month, and day.

Save and run the job. After one or two minutes, once it completes, examine the output files. They should be much closer to the optimal number of one per day, maybe two. Consider that in this example, we only have four partitions. In a real dataset, the number of files without this repartitioning would explode very easily.
Also, now the path follows the traditional date partition structure, for instance: output2/year=2021/month=09/day=01/run-AmazonS3_node1669816624410-4-part-r-00292

Notice that at the end of the file name is the partition number. While we now have more partitions, we have fewer output files because the data is organized in memory more aligned with the desired output.

The repartition transform has additional configuration options that we have left empty. You can now go ahead and try different values and see how they affect the output.
For instance, you can specify “department” as “Partition columns” in the transform and then add it in the sink partition column list. Or you can enter a “Number partitions expected” and see how it affects the runtime (the job no longer needs to determine this at runtime) and the number of files produced as you enter a higher number, for instance, 3,000.

How this feature works under the hood

  1. Upon loading the AWS Glue Studio visual job authoring page, all your transforms stored in the aforementioned S3 bucket will be loaded in the UI. AWS Glue Studio will parse the JSON definition file to display transform metadata such as name, description, and list of parameters.
  2. Once the user is done creating and saving their job using custom visual transforms, AWS Glue Studio will generate the job script and update the Python library path (also referred to as the --extra-py-files job parameter) with the list of transform Python file S3 paths, separated by commas.
  3. Before running your script, AWS Glue will add all file paths stored in the --extra-py-files job parameter to the Python path, allowing your script to run all custom visual transform functions you defined.
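
As a rough illustration of step 2, the sketch below assembles a comma-separated value for the --extra-py-files job parameter from the two transforms created in this post. The exact value that AWS Glue Studio generates is not shown in this post, so treat the result as an assumption based on the description above; the account ID and Region are placeholders and the function name is hypothetical.

```python
def extra_py_files_value(account_id, region, transform_names):
    """Join the S3 paths of the transform scripts with commas."""
    base = f"s3://aws-glue-assets-{account_id}-{region}/transforms/"
    return ",".join(f"{base}{name}.py" for name in transform_names)


# Placeholder account ID and Region, for illustration only
value = extra_py_files_value(
    "123456789012",
    "us-east-1",
    ["salesdata_generator", "repartition_date"],
)

# How the parameter would appear among the job's default arguments
default_arguments = {"--extra-py-files": value}
print(default_arguments["--extra-py-files"])
```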

Cleanup

In order to avoid running costs, if you don’t want to keep the generated files, you can empty and delete the output bucket created for this demo. You might also want to delete the AWS Glue job created.

Conclusion

In this post, you have seen how you can create your own reusable visual transforms and then use them in AWS Glue Studio to enhance your jobs and your team’s productivity.

You first created a component to use synthetically generated data on demand and then another transform to optimize the data for partitioning on Amazon S3.


About the authors

Gonzalo Herreros is a Senior Big Data Architect on the AWS Glue team.

Michael Benattar is a Senior Software Engineer on the AWS Glue Studio team. He has led the design and implementation of the custom visual transform feature.

Introducing native Delta Lake table support with AWS Glue crawlers

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/introducing-native-delta-lake-table-support-with-aws-glue-crawlers/

Delta Lake is an open-source project that helps implement modern data lake architectures commonly built on Amazon S3 or other cloud storages. With Delta Lake, you can achieve ACID transactions, time travel queries, CDC, and other common use cases on the cloud. Delta Lake is available with multiple AWS services, such as AWS Glue Spark jobs, Amazon EMR, Amazon Athena, and Amazon Redshift Spectrum.

AWS Glue includes Delta crawler, a capability that makes discovering datasets simpler by scanning Delta Lake transaction logs in Amazon Simple Storage Service (Amazon S3), extracting their schema, creating manifest files in Amazon S3, and automatically populating the AWS Glue Data Catalog, which keeps the metadata current. The newly created AWS Glue Data Catalog table has format SymlinkTextInputFormat. Delta crawler creates a manifest file, which is a text file containing the list of data files that query engines such as Presto, Trino, or Athena can use to query the table rather than finding the files with the directory listing. A previous blog post demonstrated how it works. Manifest files needed to be regenerated on a periodic basis to include newer transactions in the original Delta Lake tables, which resulted in expensive I/O operations, longer processing times, and increased storage footprint.

With today’s launch, Glue crawler is adding support for creating AWS Glue Data Catalog tables for native Delta Lake tables and does not require generating manifest files. This improves customer experience because now you don’t have to regenerate manifest files whenever a new partition becomes available or a table’s metadata changes. With the native Delta Lake tables and automatic schema evolution with no additional manual intervention, this reduces the time to insight by making newly ingested data quickly available for analysis with your preferred analytics and machine learning (ML) tools.

Amazon Athena SQL engine version 3 supports the Delta Lake native connector. AWS Glue for Apache Spark supports the Delta Lake native connector in Glue version 3.0 and later, and Amazon EMR supports Delta Lake in release version 6.9.0 and later. This means that you can query the Delta transaction log directly in Amazon Athena, AWS Glue for Apache Spark, and Amazon EMR, which makes the experience of working with native Delta Lake tables seamless across the platforms.

This post demonstrates how AWS Glue crawlers work with native Delta Lake tables and describes typical use cases to query native Delta Lake tables.

How AWS Glue crawler works with native Delta Lake tables

Now AWS Glue crawler has two different options:

  • Native table: Create a native Delta Lake table definition on AWS Glue Data Catalog.
  • Symlink table: Create a symlink-based manifest table definition on AWS Glue Data Catalog from a Delta Lake table, and generate its symlink files on Amazon S3.

Native table

Native Delta Lake tables are accessible from Amazon Athena (engine version 3), AWS Glue for Apache Spark (Glue version 3.0 and later), Amazon EMR (release version 6.9.0 and later), and other platforms that support Delta Lake tables. With native Delta Lake tables, you get capabilities such as ACID transactions, all while needing to maintain just a single source of truth.

Symlink table

Symlink tables are a consistent snapshot of a native Delta Lake table, represented in SymlinkTextInputFormat and backed by Parquet files. The symlink tables are accessible from Amazon Athena and Amazon Redshift Spectrum.

Since the symlink tables are a snapshot of the original native Delta Lake tables, you need to maintain both the original native Delta Lake tables and the symlink tables. When the data or schema in an original Delta Lake table is updated, the symlink tables in the AWS Glue Data Catalog may become out of sync. This means that you can still query the symlink table and get a consistent result, but the result reflects a previous point in time.

Crawl native Delta Lake tables using AWS Glue crawler

In this section, let’s go through how to crawl native Delta Lake tables using AWS Glue crawler.

Prerequisite

Here are the prerequisites for this tutorial:

  1. Install and configure AWS Command Line Interface (AWS CLI).
  2. Create your S3 bucket if you do not have it.
  3. Create your IAM role for AWS Glue crawler if you do not have it.
  4. Run the following command to copy the sample Delta Lake table into your S3 bucket. (Replace your_s3_bucket with your S3 bucket name.)
$ aws s3 sync s3://aws-bigdata-blog/artifacts/delta-lake-crawler/sample_delta_table/ s3://your_s3_bucket/data/sample_delta_table

Create a Delta Lake crawler

A Delta Lake crawler can be created through the AWS Glue console, AWS Glue SDK, or AWS CLI. Specify a DeltaTarget with the following configurations:

  • DeltaTables – A list of S3 DeltaPaths where the Delta Lake tables are located. (Note that each path must be the parent of a _delta_log folder. If the Delta transaction log is located at s3://bucket/sample_delta_table/_delta_log, then the path s3://bucket/sample_delta_table/ should be provided.)
  • WriteManifest – A Boolean value indicating whether or not the crawler should write the manifest files for each DeltaPath. This parameter is only applicable for Delta Lake tables created via manifest files.
  • CreateNativeDeltaTable – A Boolean value indicating whether the crawler should create a native Delta Lake table. If set to False, the crawler would create a symlink table instead. Note that the WriteManifest and CreateNativeDeltaTable options can’t both be set to True.
  • ConnectionName – An optional connection name stored in the Data Catalog that the crawler should use to access Delta Lake tables backed by a VPC.
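
For reference, the same configuration can be expressed as the Targets argument of the AWS Glue CreateCrawler API. The following sketch builds the request with the constraints described above checked up front. The delta_target helper and the IAM role name are hypothetical, and the boto3 call is commented out because it needs AWS credentials and an existing role.

```python
def delta_target(delta_tables, create_native=True, write_manifest=False, connection_name=None):
    """Build one DeltaTarget entry, enforcing the constraints listed above."""
    if create_native and write_manifest:
        raise ValueError("WriteManifest and CreateNativeDeltaTable can't both be True")
    for path in delta_tables:
        if path.rstrip("/").endswith("_delta_log"):
            raise ValueError(f"Provide the parent of the _delta_log folder, not {path}")
    target = {
        "DeltaTables": list(delta_tables),
        "WriteManifest": write_manifest,
        "CreateNativeDeltaTable": create_native,
    }
    if connection_name:
        target["ConnectionName"] = connection_name
    return target


crawler_params = {
    "Name": "delta-lake-native-crawler",
    "Role": "ExampleGlueCrawlerRole",  # hypothetical role name
    "DatabaseName": "delta_lake_native",
    "Targets": {
        "DeltaTargets": [
            delta_target(["s3://your_s3_bucket/data/sample_delta_table/"])
        ]
    },
}

# import boto3
# boto3.client("glue").create_crawler(**crawler_params)
```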

In this instruction, create the crawler through the console. Complete the following steps to create a Delta Lake crawler:

  1. Open the AWS Glue console.
  2. Choose Crawlers.
  3. Choose Create crawler.
  4. For Name, enter delta-lake-native-crawler, and choose Next.
  5. Under Data sources, choose Add a data source.
  6. For Data source, select Delta Lake.
  7. For Include delta lake table path(s), enter s3://your_s3_bucket/data/sample_delta_table/.
  8. For Create tables for querying, choose Create Native tables.
  9. Choose Add a Delta Lake data source.
  10. Choose Next.
  11. For Existing IAM role, choose your IAM role, then choose Next.
  12. For Target database, choose Add database; the Add database dialog appears. For Database name, enter delta_lake_native, then choose Create. Choose Next.
  13. Choose Create crawler.
  14. The Delta Lake crawler can be triggered to run through the console or through the SDK or AWS CLI using the StartCrawler API. It can also be scheduled through the console to run at specific times. In this instruction, run the crawler through the console.
  15. Select delta-lake-native-crawler, and choose Run.
  16. Wait for the crawler to complete.
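
If you would rather trigger the crawler from code than from the console, the following sketch starts it and polls until it returns to the READY state. It assumes a boto3 AWS Glue client is passed in; the function name, polling interval, and timeout are arbitrary choices for illustration.

```python
import time


def run_crawler_and_wait(glue_client, name, poll_seconds=15, timeout_seconds=1800):
    """Start a crawler and block until it is READY again.

    Returns the status of the last crawl as reported by GetCrawler,
    or None if the response carries no LastCrawl information.
    """
    glue_client.start_crawler(Name=name)
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        crawler = glue_client.get_crawler(Name=name)["Crawler"]
        # A crawler moves through RUNNING and STOPPING before returning to READY
        if crawler["State"] == "READY":
            return crawler.get("LastCrawl", {}).get("Status")
        time.sleep(poll_seconds)
    raise TimeoutError(f"Crawler {name} did not finish within {timeout_seconds} seconds")


# Usage, with credentials configured:
# import boto3
# status = run_crawler_and_wait(boto3.client("glue"), "delta-lake-native-crawler")
# print(status)
```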

After the crawler has run, you can see the Delta Lake table definition in the AWS Glue console:

You can also verify an AWS Glue table definition through the following AWS CLI command:

$ aws glue get-table --database-name delta_lake_native --name sample_delta_table
{
    "Table": {
        "Name": "sample_delta_table",
        "DatabaseName": "delta_lake_native",
        "Owner": "owner",
        "CreateTime": "2022-11-08T12:11:20+09:00",
        "UpdateTime": "2022-11-08T13:19:06+09:00",
        "LastAccessTime": "2022-11-08T13:19:06+09:00",
        "Retention": 0,
        "StorageDescriptor": {
            "Columns": [
                {
                    "Name": "product_id",
                    "Type": "string"
                },
                {
                    "Name": "product_name",
                    "Type": "string"
                },
                {
                    "Name": "price",
                    "Type": "bigint"
                },
                {
                    "Name": "currency",
                    "Type": "string"
                },
                {
                    "Name": "category",
                    "Type": "string"
                },
                {
                    "Name": "updated_at",
                    "Type": "double"
                }
            ],
            "Location": "s3://your_s3_bucket/data/sample_delta_table/",
            "AdditionalLocations": [],
            "InputFormat": "org.apache.hadoop.mapred.SequenceFileInputFormat",
            "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat",
            "Compressed": false,
            "NumberOfBuckets": -1,
            "SerdeInfo": {
                "SerializationLibrary": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
                "Parameters": {
                    "serialization.format": "1",
                    "path": "s3://your_s3_bucket/data/sample_delta_table/"
                }
            },
            "BucketColumns": [],
            "SortColumns": [],
            "Parameters": {
                "EXTERNAL": "true",
                "UPDATED_BY_CRAWLER": "delta-lake-native-connector",
                "spark.sql.sources.schema.part.0": "{\"type\":\"struct\",\"fields\":[{\"name\":\"product_id\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"product_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"price\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"CURRENCY\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"category\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"updated_at\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}}]}",
                "CrawlerSchemaSerializerVersion": "1.0",
                "CrawlerSchemaDeserializerVersion": "1.0",
                "spark.sql.partitionProvider": "catalog",
                "classification": "delta",
                "spark.sql.sources.schema.numParts": "1",
                "spark.sql.sources.provider": "delta",
                "delta.lastCommitTimestamp": "1653462383292",
                "delta.lastUpdateVersion": "6",
                "table_type": "delta"
            },
            "StoredAsSubDirectories": false
        },
        "PartitionKeys": [],
        "TableType": "EXTERNAL_TABLE",
        "Parameters": {
            "EXTERNAL": "true",
            "UPDATED_BY_CRAWLER": "delta-lake-native-connector",
            "spark.sql.sources.schema.part.0": "{\"type\":\"struct\",\"fields\":[{\"name\":\"product_id\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"product_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"price\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"CURRENCY\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"category\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"updated_at\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}}]}",
            "CrawlerSchemaSerializerVersion": "1.0",
            "CrawlerSchemaDeserializerVersion": "1.0",
            "spark.sql.partitionProvider": "catalog",
            "classification": "delta",
            "spark.sql.sources.schema.numParts": "1",
            "spark.sql.sources.provider": "delta",
            "delta.lastCommitTimestamp": "1653462383292",
            "delta.lastUpdateVersion": "6",
            "table_type": "delta"
        },
        "CreatedBy": "arn:aws:sts::012345678901:assumed-role/AWSGlueServiceRole/AWS-Crawler",
        "IsRegisteredWithLakeFormation": false,
        "CatalogId": "012345678901",
        "IsRowFilteringEnabled": false,
        "VersionId": "1",
        "DatabaseId": "0bd458e335a2402c828108f267bc770c"
    }
}

After you create the table definition in the AWS Glue Data Catalog, AWS analytics services such as Athena and AWS Glue Spark jobs can query the Delta Lake table.
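As a rough illustration of how a client could consume a definition like the one above, the following Python sketch inspects a get-table style response and reassembles the Spark schema stored in the table parameters. The helper names (is_delta_table, delta_columns) and the trimmed SAMPLE_TABLE dictionary are our own; only the parameter keys come from the output shown above.

```python
import json


def is_delta_table(table: dict) -> bool:
    """Return True if the catalog entry is marked as a Delta Lake table."""
    params = table.get("Parameters", {})
    return params.get("table_type") == "delta" or params.get("classification") == "delta"


def delta_columns(table: dict) -> list:
    """Rebuild (name, type) pairs from the spark.sql.sources.schema.part.N parameters."""
    params = table.get("Parameters", {})
    num_parts = int(params.get("spark.sql.sources.schema.numParts", "0"))
    # The schema JSON may be split across several numbered parts; join them in order.
    schema_json = "".join(
        params[f"spark.sql.sources.schema.part.{i}"] for i in range(num_parts)
    )
    if not schema_json:
        return []
    return [(field["name"], field["type"]) for field in json.loads(schema_json)["fields"]]


# Trimmed, hand-written sample that mimics the "Table" element of the response.
SAMPLE_TABLE = {
    "Name": "sample_delta_table",
    "Parameters": {
        "classification": "delta",
        "table_type": "delta",
        "spark.sql.sources.schema.numParts": "1",
        "spark.sql.sources.schema.part.0": json.dumps({
            "type": "struct",
            "fields": [
                {"name": "product_id", "type": "string", "nullable": True, "metadata": {}},
                {"name": "price", "type": "long", "nullable": True, "metadata": {}},
            ],
        }),
    },
}

if __name__ == "__main__":
    print(is_delta_table(SAMPLE_TABLE))
    print(delta_columns(SAMPLE_TABLE))
```

With AWS credentials in place, the same helpers could be applied to boto3.client("glue").get_table(DatabaseName="delta_lake_native", Name="sample_delta_table")["Table"].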

Query Delta Lake tables using Amazon Athena

Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon Simple Storage Service (Amazon S3) using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run on datasets at petabyte scale. You can use Athena to query your S3 data lake for use cases such as data exploration for machine learning (ML) and AI, business intelligence (BI) reporting, and ad hoc querying.

There are now two ways to use Delta Lake tables in Athena:

  • For native table: Use Athena’s newly launched native support for Delta Lake tables. You can learn more in Querying Delta Lake tables. This method no longer requires regenerating manifest files after every transaction. Data updates are available for queries in Athena as soon as they are performed in the original Delta Lake tables, and you get up to 40 percent improvement in query performance over querying manifest files. Since Athena optimizes data scans in native Delta Lake queries using statistics in Delta Lake files, you get the advantage of reduced cost for Athena queries. This post focuses on this approach.
  • For symlink table: Use SymlinkTextInputFormat to query symlink tables through manifest files generated from Delta Lake tables. This was previously the only manner in which Delta Lake table querying was supported via Athena and is no longer recommended when you use only Athena to query the Delta Lake tables.

To use the native Delta Lake connector in Athena, you need to use Athena engine version 3. If you are using an older engine version, change the engine version.

Complete the following steps to start queries on Athena:

  1. Open the Amazon Athena console.
  2. Run the following query.
SELECT * FROM "delta_lake_native"."sample_delta_table" limit 10;

The following screenshot shows our output:
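If you prefer to submit the same query programmatically, a minimal sketch with boto3 could look like the following. The function names and the results location s3://your_s3_bucket/athena-results/ are assumptions made for illustration, and the workgroup you use must be on Athena engine version 3.

```python
def build_athena_request(sql: str, database: str, output_location: str,
                         workgroup: str = "primary") -> dict:
    """Assemble the keyword arguments for Athena's StartQueryExecution call."""
    return {
        "QueryString": sql,
        "QueryExecutionContext": {"Database": database},
        "ResultConfiguration": {"OutputLocation": output_location},
        "WorkGroup": workgroup,
    }


def run_query(request: dict) -> str:
    """Submit the query and return its execution ID (requires AWS credentials)."""
    import boto3  # imported lazily so the builder above works without AWS access

    athena = boto3.client("athena")
    return athena.start_query_execution(**request)["QueryExecutionId"]


REQUEST = build_athena_request(
    sql='SELECT * FROM "delta_lake_native"."sample_delta_table" limit 10',
    database="delta_lake_native",
    # Hypothetical results prefix; replace it with a location you own.
    output_location="s3://your_s3_bucket/athena-results/",
)
```

Calling run_query(REQUEST) returns a query execution ID that you can pass to get_query_execution and get_query_results to poll for and read the rows.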

Query Delta Lake tables using AWS Glue for Apache Spark

AWS Glue for Apache Spark natively supports Delta Lake. AWS Glue version 3.0 (Apache Spark 3.1.1) supports Delta Lake 1.0.0, and AWS Glue version 4.0 (Apache Spark 3.3.0) supports Delta Lake 2.1.0. With this native support, the only configuration Delta Lake requires is a single job parameter, --datalake-formats delta. There is no need to configure a separate Delta Lake connector from AWS Marketplace, which reduces the number of configuration steps required to use the framework in AWS Glue for Apache Spark.
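For jobs created through the API rather than the console, the job parameter goes into the job's default arguments. The sketch below only assembles the request; the job name, role ARN, and script location are placeholders we made up.

```python
def delta_job_definition(name: str, role_arn: str, script_location: str,
                         glue_version: str = "4.0") -> dict:
    """Build CreateJob arguments for a Spark job with native Delta Lake support."""
    return {
        "Name": name,
        "Role": role_arn,
        "GlueVersion": glue_version,
        "Command": {
            "Name": "glueetl",  # Spark ETL job type
            "ScriptLocation": script_location,
            "PythonVersion": "3",
        },
        # The single parameter that enables the Delta Lake libraries.
        "DefaultArguments": {"--datalake-formats": "delta"},
    }


# Placeholder values for illustration only.
JOB = delta_job_definition(
    name="delta-sql-job",
    role_arn="arn:aws:iam::012345678901:role/YourGlueJobRole",
    script_location="s3://your_s3_bucket/scripts/delta_sql.py",
)
```

You could then pass the dictionary to boto3.client("glue").create_job(**JOB).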

AWS Glue also provides a serverless notebook interface called AWS Glue Studio notebook to query and process data interactively. Complete the following steps to launch AWS Glue Studio notebook and query a Delta Lake table:

  1. On the AWS Glue console, choose Jobs in the navigation pane.
  2. Under Create job, select Jupyter Notebook.
  3. Choose Create a new notebook from scratch, and choose Create.
  4. For Job name, enter delta-sql.
  5. For IAM role, choose your IAM role. If you don’t have your own role for the AWS Glue job, create it by following the steps documented in the AWS Glue Developer Guide.
  6. Choose Start notebook job.
  7. Copy and paste the following code to the first cell and run the cell.
    %glue_version 3.0
    %%configure
    {
      "--datalake-formats": "delta"
    }

  8. Run the existing cell containing the following code.
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
      
    sc = SparkContext.getOrCreate()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)

  9. Copy and paste the following code to the third cell and run the cell.
    %%sql
    SELECT * FROM `delta_lake_native`.`sample_delta_table` limit 10

The following screenshot shows our output:

Clean up

Now for the final step, cleaning up the resources:

  • Delete your data under your S3 path: s3://your_s3_bucket/data/sample_delta_table/.
  • Delete the AWS Glue crawler delta-lake-native-crawler.
  • Delete the AWS Glue database delta_lake_native.
  • Delete the AWS Glue notebook job delta-sql.

Conclusion

This post demonstrated how to crawl native Delta Lake tables using an AWS Glue crawler and how to query the crawled tables from Athena and Glue Spark jobs. Start using AWS Glue crawlers for your own native Delta Lake tables.

If you have comments or feedback, please feel free to leave them in the comments.


About the authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is based in Tokyo, Japan. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.

Kyle Duong is a Software Development Engineer on the AWS Glue and Lake Formation team. He is passionate about building big data technologies and distributed systems. In his free time, he enjoys cycling or playing basketball.

Sandeep Adwankar is a Senior Technical Product Manager at AWS. Based in the California Bay Area, he works with customers around the globe to translate business and technical requirements into products that enable customers to improve how they manage, secure, and access data.

Getting started with AWS Glue Data Quality for ETL Pipelines

Post Syndicated from Deenbandhu Prasad original https://aws.amazon.com/blogs/big-data/getting-started-with-aws-glue-data-quality-for-etl-pipelines/

Today, hundreds of thousands of customers use data lakes for analytics and machine learning. However, data engineers have to cleanse and prepare this data before it can be used. The underlying data has to be accurate and recent for customers to make confident business decisions. Otherwise, data consumers lose trust in the data and make suboptimal or incorrect decisions. Evaluating whether data is accurate and recent is a common task for data engineers. Various data quality tools exist today, but they usually require manual processes to monitor data quality.

AWS Glue Data Quality is a preview feature of AWS Glue that measures and monitors the data quality of Amazon Simple Storage Service (Amazon S3) data lakes and of AWS Glue extract, transform, and load (ETL) jobs. This is an open preview feature, so it is already enabled in your account in the available Regions. You can define and measure data quality checks in the AWS Glue Studio console without writing code, which simplifies managing data quality.

This post is Part 2 of a four-post series to explain how AWS Glue Data Quality works. Check out the previous post in this series:

Getting started with AWS Glue Data Quality

In this post, we show how to create an AWS Glue job that measures and monitors the data quality of a data pipeline. We also show how to take action based on the data quality results.

Solution overview

Let’s consider an example use case in which a data engineer needs to build a data pipeline to ingest the data from a raw zone to a curated zone in a data lake. As a data engineer, one of your key responsibilities—along with extracting, transforming, and loading data—is validating the quality of data. Identifying data quality issues upfront helps you prevent placing bad data in the curated zone and avoid arduous data corruption incidents.

In this post, you’ll learn how to easily set up built-in and custom data validation checks in your AWS Glue job to prevent bad data from corrupting the downstream high-quality data.

The dataset used for this post is synthetically generated; the following screenshot shows an example of the data.

Set up resources with AWS CloudFormation

This post includes an AWS CloudFormation template for a quick setup. You can review and customize it to suit your needs.

The CloudFormation template generates the following resources:

  • An Amazon Simple Storage Service (Amazon S3) bucket (gluedataqualitystudio-*).
  • The following prefixes and objects in the S3 bucket:
    • datalake/raw/customer/customer.csv
    • datalake/curated/customer/
    • scripts/
    • sparkHistoryLogs/
    • temporary/
  • AWS Identity and Access Management (IAM) users, roles, and policies. The IAM role (GlueDataQualityStudio-*) has permission to read and write from the S3 bucket.
  • AWS Lambda functions and IAM policies required by those functions to create and delete this stack.

To create your resources, complete the following steps:

  1. Sign in to the AWS CloudFormation console in the us-east-1 Region.
  2. Choose Launch Stack:

  3. Select I acknowledge that AWS CloudFormation might create IAM resources.
  4. Choose Create stack and wait for the stack creation step to complete.

Implement the solution

To start configuring your solution, complete the following steps:

  1. On the AWS Glue Studio console, choose Jobs in the navigation pane.
  2. Select Visual with a blank canvas and choose Create.
  3. Choose the Job Details tab to configure the job.
  4. For Name, enter GlueDataQualityStudio.
  5. For IAM Role, choose the role starting with GlueDataQualityStudio-*.
  6. For Glue version, choose Glue 3.0.
  7. For Job bookmark, choose Disable. This allows you to run this job multiple times with the same input dataset.
  8. For Number of retries, enter 0.
  9. In the Advanced properties section, provide the S3 bucket created by the CloudFormation template (starting with gluedataqualitystudio-*).
  10. Choose Save.
  11. After the job is saved, choose the Visual tab and on the Source menu, choose Amazon S3.
  12. On the Data source properties – S3 tab, for S3 source type, select S3 location.
  13. Choose Browse S3 and navigate to prefix /datalake/raw/customer/ in the S3 bucket starting with gluedataqualitystudio-* .
  14. Choose Infer schema.
  15. On the Action menu, choose Evaluate Data Quality.
  16. Choose the Evaluate Data Quality node.

    On the Transform tab, you can now start building data quality rules. The first rule you create is to check if Customer_ID is unique and not null using the isPrimaryKey rule.
  17. On the Rule types tab of the DQDL rule builder, search for isprimarykey and choose the plus sign.
  18. On the Schema tab of the DQDL rule builder, choose the plus sign next to Customer_ID.
  19. In the rule editor, delete id.

    The next rule we add checks that the First_Name column value is present for all the rows.
  20. You can also enter the data quality rules directly in the rule editor. Add a comma (,) and enter IsComplete "First_Name", after the first rule.

    Next, you add a custom rule to validate that no row exists without Telephone or Email.
  21. Enter the following custom rule in the rule editor:
    CustomSql "select count(*) from primary where Telephone is null and Email is null" = 0


    The Evaluate Data Quality feature provides actions to manage the outcome of a job based on the job quality results.

  22. For this post, select Fail job when data quality fails and choose Fail job without loading target data actions. In the Data quality output setting section, choose Browse S3 and navigate to prefix dqresults in the S3 bucket starting with gluedataqualitystudio-*.
  23. On the Target menu, choose Amazon S3.
  24. Choose the Data target – S3 bucket node.
  25. On the Data target properties – S3 tab, for Format, choose Parquet, and for Compression Type, choose Snappy.
  26. For S3 Target Location, choose Browse S3 and navigate to the prefix /datalake/curated/customer/ in the S3 bucket starting with gluedataqualitystudio-*.
  27. Choose Save, then choose Run.
    You can view the job run details on the Runs tab. In our example, the job fails with the error message “AssertionError: The job failed due to failing DQ rules for node: <node>.”
    You can review the data quality result on the Data quality tab. In our example, the custom data quality validation failed because one of the rows in the dataset had no Telephone or Email value. The Evaluate Data Quality results are also written to the S3 bucket in JSON format, based on the data quality result location parameter of the node.
  28. Navigate to the dqresults prefix under the S3 bucket starting with gluedataqualitystudio-*. You will see that the data quality result is partitioned by date.

The following is the output of the JSON file. You can use this file output to build custom data quality visualization dashboards.

You can also monitor the Evaluate Data Quality node through Amazon CloudWatch metrics and set alarms to send notifications about data quality results. To learn more on how to set up CloudWatch alarms, refer to Using Amazon CloudWatch alarms.
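To make the three rules from the steps above more concrete, the following Python sketch assembles them into a single DQDL ruleset string and mirrors their intent with plain Python checks over a couple of invented rows. The helper names and SAMPLE_ROWS are hypothetical, and the local checks only approximate what the Evaluate Data Quality node does; they are not the AWS Glue implementation.

```python
RULES = [
    'IsPrimaryKey "Customer_ID"',
    'IsComplete "First_Name"',
    'CustomSql "select count(*) from primary where Telephone is null and Email is null" = 0',
]


def build_ruleset(rules) -> str:
    """Join individual DQDL rules into one ruleset definition."""
    return "Rules = [\n" + ",\n".join(f"    {rule}" for rule in rules) + "\n]"


def is_primary_key(rows, column) -> bool:
    """True when every value in the column is present and unique."""
    values = [row.get(column) for row in rows]
    return None not in values and len(set(values)) == len(values)


def is_complete(rows, column) -> bool:
    """True when no row has a missing value in the column."""
    return all(row.get(column) is not None for row in rows)


def rows_without_contact(rows) -> int:
    """Count rows where both Telephone and Email are missing (the custom rule expects 0)."""
    return sum(
        1 for row in rows
        if row.get("Telephone") is None and row.get("Email") is None
    )


# Invented rows; the second one violates the custom rule.
SAMPLE_ROWS = [
    {"Customer_ID": 1, "First_Name": "Ana", "Telephone": "555-0100", "Email": None},
    {"Customer_ID": 2, "First_Name": "Ben", "Telephone": None, "Email": None},
]

if __name__ == "__main__":
    print(build_ruleset(RULES))
    print(is_primary_key(SAMPLE_ROWS, "Customer_ID"))
    print(is_complete(SAMPLE_ROWS, "First_Name"))
    print(rows_without_contact(SAMPLE_ROWS))
```

In this sample, the first two checks hold and the third finds one offending row, which corresponds to the kind of failure described in the job run above.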

Clean up

To avoid incurring future charges and to clean up unused roles and policies, delete the resources you created:

  1. Delete the GlueDataQualityStudio job you created as part of this post.
  2. On the AWS CloudFormation console, delete the GlueDataQualityStudio stack.

Conclusion

AWS Glue Data Quality offers an easy way to measure and monitor the data quality of your ETL pipeline. In this post, you learned how to take necessary actions based on the data quality results, which helps you maintain high data standards and make confident business decisions.

To learn more about AWS Glue Data Quality, check out the documentation:


About the Authors

Deenbandhu Prasad is a Senior Analytics Specialist at AWS, specializing in big data services. He is passionate about helping customers build modern data architecture on the AWS Cloud. He has helped customers of all sizes implement data management, data warehouse, and data lake solutions.

Yannis Mentekidis is a Senior Software Development Engineer on the AWS Glue team.

Getting started with AWS Glue Data Quality from the AWS Glue Data Catalog

Post Syndicated from Aniket Jiddigoudar original https://aws.amazon.com/blogs/big-data/getting-started-with-aws-glue-data-quality-from-the-aws-glue-data-catalog/

AWS Glue is a serverless data integration service that makes it simple to discover, prepare, and combine data for analytics, machine learning (ML), and application development. You can use AWS Glue to create, run, and monitor data integration and ETL (extract, transform, and load) pipelines and catalog your assets across multiple data stores.

Hundreds of thousands of customers use data lakes for analytics and machine learning to make data-driven business decisions. Data consumers lose trust in data if it is not accurate and recent, making data quality essential for making optimal and correct decisions.

Evaluation of the accuracy and freshness of data is a common task for engineers. Currently, there are various tools available to evaluate data quality. However, these tools often require manual processes of data discovery and expertise in data engineering and coding.

We are pleased to announce the public preview launch of AWS Glue Data Quality. You can access this feature today without requesting any additional access in the available Regions. AWS Glue Data Quality is a new preview feature of AWS Glue that measures and monitors the data quality of Amazon S3-based data lakes and in AWS Glue ETL jobs. It does not require any expertise in data engineering or coding. It simplifies your experience of monitoring and evaluating the quality of your data.

This is Part 1 of a four-part series of posts to explain how AWS Glue Data Quality works. Check out the next posts in the series:

Getting started with AWS Glue Data Quality

In this post, we will go over the simplicity of using the AWS Glue Data Quality feature by:

  1. Starting data quality recommendations and runs on your data in AWS Glue Data Catalog.
  2. Creating an Amazon CloudWatch alarm for getting notifications when data quality results are below a certain threshold.
  3. Analyzing your AWS Glue Data Quality run results through Amazon Athena.

Set up resources with AWS CloudFormation

The provided CloudFormation script creates the following resources for you:

  1. The IAM role required to run AWS Glue Data Quality runs
  2. An Amazon Simple Storage Service (Amazon S3) bucket to store the NYC Taxi dataset
  3. An S3 bucket to store and analyze the results of AWS Glue Data Quality runs
  4. An AWS Glue database and table created from the NYC Taxi dataset

Steps:

  1. Open the AWS CloudFormation console.
  2. Choose Create stack and then select With new resources (standard).
  3. For Template source, choose Upload a template File, and provide the above attached template file. Then choose Next.
  4. For Stack name, DataQualityDatabase, and DataQualityTable, leave as default. For DataQualityS3BucketName, enter the name of your S3 bucket. Then choose Next.
  5. On the final screen, make sure to acknowledge that this stack would create IAM resources for you, and choose Submit.
  6. Once the stack is successfully created, navigate to the S3 bucket created by the stack and upload the yellow_tripdata_2022-01.parquet file.

Start an AWS Glue Data Quality run on your data in AWS Glue Data Catalog

In this first section, we will generate data quality rule recommendations from the AWS Glue Data Quality service. Using these recommendations, we will then run a data quality task against our dataset to obtain an analysis of our data.

To get started, complete the following steps:

  1. Open the AWS Glue console.
  2. Choose Tables under Data Catalog.
  3. Select the DataQualityTable table created via the CloudFormation stack.
  4. Select the Data quality tab.
  5. Choose Recommend ruleset.
  6. On the Recommend data quality rules page, check Save recommended rules as a ruleset. This will allow us to save the recommended rules in a ruleset automatically, for use in the next steps.
  7. For IAM Role, choose the IAM role that was created from the CloudFormation stack.
  8. For Additional configurations -optional, leave the default number of workers and timeout.
  9. Choose Recommend ruleset. This will start a data quality recommendation run, with the given number of workers.
  10. Wait for the ruleset to be completed.
  11. Once completed, navigate back to the Rulesets tab. You should see a successful recommendation run and a ruleset created.

Understand AWS Glue Data Quality recommendations

AWS Glue Data Quality recommendations are suggestions generated by the AWS Glue Data Quality service based on the shape of your data. These recommendations automatically take into account aspects of your data such as row counts, mean, and standard deviation, and generate a set of rules for you to use as a starting point.

The dataset used here is the NYC Taxi dataset. Based on the columns in this dataset and the values of those columns, AWS Glue Data Quality recommends a set of rules. In total, the recommendation service took all the columns of the dataset into consideration and recommended 55 rules.

Some of these rules are:

  • RowCount between <> and <> → Expect the number of rows to fall within a range based on the data it saw
  • ColumnValues “VendorID” in [ ] → Expect the “VendorID” column to be within a specific set of values
  • IsComplete “VendorID” → Expect the “VendorID” column to be non-null

How do I use the recommended AWS Glue Data Quality rules?

  1. From the Rulesets section, you should see your generated ruleset. Select the generated ruleset, and choose Evaluate ruleset.
    • If you didn’t check the box to Save recommended rules as a ruleset when you ran the recommendation, you can still click on the recommendation task run and copy the rules to create a new ruleset
  2. For Data quality actions under Data quality properties, select Publish metrics to Amazon CloudWatch. If this box isn’t checked, the data quality run will not publish metrics to Amazon CloudWatch.
  3. For IAM role, select the GlueDataQualityBlogRole created in the AWS CloudFormation stack.
  4. For Requested number of workers under Advanced properties, leave as default. 
  5. For Data quality results location, select the value of the GlueDataQualityResultsS3Bucket location that was created via the AWS CloudFormation stack
  6. Choose Evaluate ruleset.
  7. Once the run begins, you can see the status of the run on the Data quality results tab.
  8. After the run reaches a successful stage, select the completed data quality task run, and view the data quality results shown in Run results.

Our recommendation service suggested that we enforce 55 rules, based on the column values and the data within our NYC Taxi dataset. We then converted the collection of 55 rules into a RuleSet. Then, we ran a Data Quality Evaluation task run using our RuleSet against our dataset. In our results above, we see the status of each rule within the RuleSet.

You can also utilize the AWS Glue Data Quality APIs to carry out these steps.
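As a hedged sketch of what that could look like with boto3, the helpers below assemble the arguments for a recommendation run and an evaluation run. The parameter names follow the AWS Glue API as we understand it, so verify them against the current API reference; the database name, ruleset name, and results prefix are placeholders, not values from this post.

```python
def glue_table_source(database: str, table: str) -> dict:
    """Describe a Data Catalog table as the data source of a data quality run."""
    return {"GlueTable": {"DatabaseName": database, "TableName": table}}


def recommendation_run_request(database: str, table: str, role: str,
                               ruleset_name: str) -> dict:
    """Arguments for glue.start_data_quality_rule_recommendation_run."""
    return {
        "DataSource": glue_table_source(database, table),
        "Role": role,
        # Saves the recommended rules as a ruleset, like the console checkbox.
        "CreatedRulesetName": ruleset_name,
    }


def evaluation_run_request(database: str, table: str, role: str,
                           ruleset_names, results_s3_prefix=None) -> dict:
    """Arguments for glue.start_data_quality_ruleset_evaluation_run."""
    options = {"CloudWatchMetricsEnabled": True}  # publish metrics to CloudWatch
    if results_s3_prefix:
        options["ResultsS3Prefix"] = results_s3_prefix
    return {
        "DataSource": glue_table_source(database, table),
        "Role": role,
        "RulesetNames": list(ruleset_names),
        "AdditionalRunOptions": options,
    }


# Placeholder names for illustration only.
EVALUATION = evaluation_run_request(
    database="your_database",
    table="demo_nyc_taxi_data_input",
    role="GlueDataQualityBlogRole",
    ruleset_names=["your_recommended_ruleset"],
    results_s3_prefix="s3://your_results_bucket/",
)
```

Each dictionary can be unpacked into the corresponding boto3 Glue client method, for example boto3.client("glue").start_data_quality_ruleset_evaluation_run(**EVALUATION).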

Get Amazon SNS notifications for my failing data quality runs through Amazon CloudWatch alarms

Each AWS Glue Data Quality evaluation run from the Data Catalog emits a pair of metrics: glue.data.quality.rules.passed (the number of rules that passed) and glue.data.quality.rules.failed (the number of rules that failed). You can use these metrics to create alarms that alert users when a data quality run falls below a threshold.

To get started with setting up an alarm that would send an email via an Amazon SNS notification, follow the steps below:

  1. Open the Amazon CloudWatch console.
  2. Choose All metrics under Metrics. You will see an additional namespace under Custom namespaces titled Glue Data Quality.

Note: When starting an AWS Glue Data Quality run, make sure the Publish metrics to Amazon CloudWatch checkbox is enabled, as shown below. Otherwise, metrics for that particular run will not be published to Amazon CloudWatch.

  1. Under the Glue Data Quality namespace, you should be able to see metrics being emitted per table, per ruleset. For this post, we use the glue.data.quality.rules.failed metric and raise an alarm when its value is 1 or greater (that is, we want to be notified as soon as at least one rule evaluation fails).
  2. In order to create the alarm, choose All alarms under Alarms.
  3. Choose Create alarm.
  4. Choose Select metric.
  5. Select the glue.data.quality.rules.failed metric corresponding to the table you’ve created, then choose Select metric.
  6. Under the Specify metric and conditions tab, under the Metrics section:
    1. For Statistic, select Sum.
    2. For Period, select 1 minute.
  7. Under the Conditions section:
    1. For Threshold type, choose Static.
    2. For Whenever glue.data.quality.rules.failed is…, select Greater/Equal.
    3. For than…, enter 1 as the threshold value.
    4. Expand the Additional configurations dropdown and select Treat missing data as good

These selections imply that if the glue.data.quality.rules.failed metric emits a value greater than or equal to 1, we will trigger an alarm. However, if there is no data, we will treat it as acceptable.

  1. Choose Next.
  2. On Configure actions:
    1. For the Alarm state trigger section, select In alarm .
    2. For Send a notification to the following SNS topic, choose Create a new topic to send a notification via a new SNS topic.
    3. For Email endpoints that will receive the notification…, enter your email address. Choose Next.
  3. For Alarm name, enter myFirstDQAlarm, then choose Next.
  4. Finally, you should see a summary of all the selections on the Preview and create screen. Choose Create alarm at the bottom.
  5. You should now be able to see the alarm being created from the Amazon CloudWatch alarms dashboard.
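The console selections above map onto a single CloudWatch PutMetricAlarm request. The following sketch builds that request in Python; the SNS topic ARN and the dimension names are placeholders we invented (copy the real dimensions from the metric as it appears in your account), and EvaluationPeriods of 1 is our assumption.

```python
def failed_rules_alarm(alarm_name: str, sns_topic_arn: str, dimensions: list) -> dict:
    """Build PutMetricAlarm arguments matching the console settings described above."""
    return {
        "AlarmName": alarm_name,
        "Namespace": "Glue Data Quality",
        "MetricName": "glue.data.quality.rules.failed",
        "Dimensions": dimensions,
        "Statistic": "Sum",
        "Period": 60,                 # 1 minute, in seconds
        "EvaluationPeriods": 1,       # assumption: alarm on a single breaching period
        "Threshold": 1.0,
        "ComparisonOperator": "GreaterThanOrEqualToThreshold",
        "TreatMissingData": "notBreaching",  # "Treat missing data as good"
        "AlarmActions": [sns_topic_arn],     # notify when the alarm is in the In alarm state
    }


ALARM = failed_rules_alarm(
    alarm_name="myFirstDQAlarm",
    # Hypothetical topic ARN and dimension names, shown for illustration only.
    sns_topic_arn="arn:aws:sns:us-east-1:012345678901:your-dq-topic",
    dimensions=[{"Name": "YourDimensionName", "Value": "your_dimension_value"}],
)
```

With credentials configured, boto3.client("cloudwatch").put_metric_alarm(**ALARM) would create the alarm; the SNS topic and its email subscription need to exist beforehand.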

To demonstrate AWS Glue Data Quality alarms, we walk through a scenario in which corrupted data is ingested, and show how the alarm we created in the previous steps notifies us. For this purpose, we use the provided file malformed_yellow_taxi.parquet, which contains data that has been tweaked on purpose.

  1. Navigate to the S3 location DataQualityS3BucketName mentioned in the CloudFormation template supplied at the beginning of the blog post.
  2. Upload the malformed_yellow_tripdata.parquet file to this location. This will help us simulate a flow where we have a file with poor data quality coming into our data lakes via our ETL processes.
  3. Navigate to the AWS Glue Data Catalog console, select the demo_nyc_taxi_data_input that was created via the provided AWS CloudFormation template and then navigate to the Data quality tab.
  4. Select the RuleSet we had created in the first section. Then select Evaluate ruleset.
  5. From the Evaluate data quality screen:
    1. Check the box to Publish metrics to Amazon CloudWatch. This checkbox is needed to ensure that the failure metrics are emitted to Amazon CloudWatch.
    2. Select the IAM role created via the AWS CloudFormation template.
    3. Optionally, select an S3 location to publish your AWS Glue Data Quality results.
    4. Select Evaluate ruleset.
  6. Navigate to the Data Quality results tab. You should now see two runs, one from the previous steps of this blog and one that we currently triggered. Wait for the current run to complete.
  7. As you see, we have a failed AWS Glue Data Quality run result, with only 52 of our original 55 rules passing. These failures are attributed to the new file we uploaded to S3.
  8. Navigate to the Amazon CloudWatch console and select the alarm we created at the beginning of this section.
  9. As you can see, we configured the alarm to fire every time the glue.data.quality.rules.failed metric crosses a threshold of 1. After the above AWS Glue Data Quality run, we see 3 rules failing, which triggered the alarm. Further, you also should have gotten an email detailing the alarm’s firing.

We have thus demonstrated how malformed data coming into our data lakes can be identified via AWS Glue Data Quality rules, and how alerting mechanisms can then be set up to notify the appropriate personas.

Analyze your AWS Glue Data Quality run results through Amazon Athena

When you have multiple AWS Glue Data Quality run results against a dataset, you might want to track trends in the dataset’s quality over time. To achieve this, we can export our AWS Glue Data Quality run results to S3 and use Amazon Athena to run analytical queries against the exported runs. The results can then be used in Amazon QuickSight to build dashboards that give a graphical representation of your data quality trends.

In the third part of this post, we will see the steps needed to start tracking data on your dataset’s quality:

  1. For our data quality runs that we set up in the previous sections, we set the Data quality results location parameter to the bucket location specified by the AWS CloudFormation stack.
  2. After each successful run, you should see a single JSONL file being exported to your selected S3 location, corresponding to that particular run.
  3. Open the Amazon Athena console.
  4. In the query editor, run the following CREATE TABLE statement (replace the <my_table_name> with a relevant value, and <GlueDataQualityResultsS3Bucket_from_cfn> section with the GlueDataQualityResultsS3Bucket value from the provided AWS CloudFormation template):
    CREATE EXTERNAL TABLE `<my_table_name>`(
    `catalogid` string,
    `databasename` string,
    `tablename` string,
    `dqrunid` string,
    `evaluationstartedon` timestamp,
    `evaluationcompletedon` timestamp,
    `rule` string,
    `outcome` string,
    `failurereason` string,
    `evaluatedmetrics` string)
    PARTITIONED BY (
    `year` string,
    `month` string,
    `day` string)
    ROW FORMAT SERDE
    'org.openx.data.jsonserde.JsonSerDe'
    WITH SERDEPROPERTIES (
    'paths'='catalogId,databaseName,dqRunId,evaluatedMetrics,evaluationCompletedOn,evaluationStartedOn,failureReason,outcome,rule,tableName')
    STORED AS INPUTFORMAT
    'org.apache.hadoop.mapred.TextInputFormat'
    OUTPUTFORMAT
    'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION
    's3://<GlueDataQualityResultsS3Bucket_from_cfn>/'
    TBLPROPERTIES (
    'classification'='json',
    'compressionType'='none',
    'typeOfData'='file');
    
    -- Run as a separate query after the table is created, to load its partitions:
    MSCK REPAIR TABLE `<my_table_name>`;

  5. Once the above table is created, you should be able to run queries to analyze your data quality results.

For example, consider the following query that shows me the failed AWS Glue Data Quality runs against my table demo_nyc_taxi_data_input within a time window:

SELECT * from "<my_table_name>"
WHERE "outcome" = 'Failed'
AND "tablename" = '<my_source_table>'
AND "evaluationcompletedon" between
parse_datetime('2022-12-05 17:00:00:000', 'yyyy-MM-dd HH:mm:ss:SSS') AND parse_datetime('2022-12-05 20:00:00:000', 'yyyy-MM-dd HH:mm:ss:SSS');

The output of the above query shows me details about all the runs with “outcome” = ‘Failed’ that ran against my NYC Taxi dataset table ( “tablename” = ‘demo_nyc_taxi_data_input’ ). The output also gives me information about the failure reason ( failurereason ) and the values it was evaluated against ( evaluatedmetrics ).

As you can see, the run results uploaded to S3 give us detailed information about our AWS Glue Data Quality runs, which we can use to perform more detailed analysis and build dashboards on top of the data.
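For quick checks outside Athena, the exported JSONL files can also be read directly. The sketch below parses result lines and summarizes failures per rule; the field names follow the 'paths' property of the CREATE TABLE statement above, while the helper names and the three sample records are invented for illustration.

```python
import json
from collections import Counter


def load_results(jsonl_text: str) -> list:
    """Parse one JSON document per non-empty line."""
    return [json.loads(line) for line in jsonl_text.splitlines() if line.strip()]


def failed_results(records: list, table_name: str) -> list:
    """Keep only failed rule evaluations for the given table."""
    return [
        record for record in records
        if record.get("outcome") == "Failed" and record.get("tableName") == table_name
    ]


def failures_by_rule(records: list) -> Counter:
    """Count how often each rule appears in the given records."""
    return Counter(record["rule"] for record in records)


# Invented sample lines; real exports carry more fields (dqRunId, evaluatedMetrics, ...).
SAMPLE_JSONL = "\n".join(json.dumps(record) for record in [
    {"tableName": "demo_nyc_taxi_data_input", "rule": 'IsComplete "VendorID"',
     "outcome": "Passed"},
    {"tableName": "demo_nyc_taxi_data_input", "rule": 'ColumnValues "VendorID" in [1, 2]',
     "outcome": "Failed", "failureReason": "example failure reason"},
    {"tableName": "another_table", "rule": 'IsComplete "id"', "outcome": "Failed"},
])

if __name__ == "__main__":
    failed = failed_results(load_results(SAMPLE_JSONL), "demo_nyc_taxi_data_input")
    print(failures_by_rule(failed))
```

The same filtering logic corresponds to the WHERE clause of the Athena query above, without the time window.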

Clean up

  • Navigate to the Amazon Athena console and delete the table created for data quality analysis.
  • Navigate to the Amazon CloudWatch console and delete the alarms created.
  • If you deployed the sample CloudFormation stack, delete the CloudFormation stack via the AWS CloudFormation console. You will need to empty the S3 bucket before you delete the bucket.
  • If you have enabled your AWS Glue Data Quality runs to output to S3, empty those buckets as well.

Conclusion

In this post, we talked about the ease and speed of incorporating data quality rules into your AWS Glue Data Catalog tables using the AWS Glue Data Quality feature. We also talked about how to run recommendations and evaluate data quality against your tables. We then discussed analyzing the data quality results via Amazon Athena, and the process for setting up alarms via Amazon CloudWatch to notify users of failed data quality runs.

To dive into the AWS Glue Data Quality APIs, take a look at the AWS Glue Data Quality API documentation.
To learn more about AWS Glue Data Quality, check out the AWS Glue Data Quality Developer Guide.


About the authors

Aniket Jiddigoudar is a Big Data Architect on the AWS Glue team.

Joseph Barlan is a Frontend Engineer at AWS Glue. He has over 5 years of experience helping teams build reusable UI components and is passionate about frontend design systems. In his spare time, he enjoys pencil drawing and binge-watching TV shows.

Build an AWS Lake Formation permissions inventory dashboard using AWS Glue and Amazon QuickSight

Post Syndicated from Srividya Parthasarathy original https://aws.amazon.com/blogs/big-data/build-an-aws-lake-formation-permissions-inventory-dashboard-using-aws-glue-and-amazon-quicksight/

AWS Lake Formation is an integrated data lake service that makes it easy for you to ingest, clean, catalog, transform, and secure your data and make it available for analysis and machine learning (ML). Lake Formation provides a single place to define fine-grained access control on catalog resources. These permissions are granted to the principals by a data lake admin, and integrated engines like Amazon Athena, AWS Glue, Amazon EMR, and Amazon Redshift Spectrum enforce the access controls defined in Lake Formation. It also allows principals to securely share data catalog resources across multiple AWS accounts and AWS organizations through a centralized approach.

As organizations adopt Lake Formation to scale their permissions, there is a steady increase in the access policies established and managed within the enterprise. As the number of policies grows, it becomes more difficult to analyze and understand the permissions for auditing. Therefore, customers are looking for a simple way to collect, analyze, and visualize permissions data so that they can inspect and validate the policies. Such an inventory also enables organizations to take actions that help them meet compliance requirements.

This solution offers the ability to consolidate and create a central inventory of Lake Formation permissions that are registered in the given AWS account and Region. It provides a high-level view of various permissions that Lake Formation manages and aims at answering questions like:

  • Who has select access on a given table?
  • Which tables have delete permission granted?
  • Which databases or tables does a given principal have select access to?

In this post, we walk through how to set up and collect the permissions granted on resources in a given account using the Lake Formation API. AWS Glue makes it straightforward to set up and run jobs for collecting the permission data and creating an external table on the collected data. We use Amazon QuickSight to create a permissions dashboard using an Athena data source and dataset.

Overview of solution

The following diagram illustrates the architecture of this solution.

In this solution, we walk through the following tasks:

  1. Create an AWS Glue job to collect and store permissions data, and create external tables using Boto3.
  2. Verify the external tables created using Athena.
  3. Sign up for a QuickSight Enterprise account and enable Athena access.
  4. Create a dataset using an Athena data source.
  5. Use the datasets for analysis.
  6. Publish the analyses as a QuickSight dashboard.

The collected JSON data is flattened and written into an Amazon Simple Storage Service (Amazon S3) bucket as Parquet files partitioned by account ID, date, and resource type. After the data is stored in Amazon S3, external tables are created on them and filters are added for different types of resource permissions. These datasets can be imported into SPICE, an in-memory query engine that is part of QuickSight, or queried directly from QuickSight to create analyses. Later, you can publish these analyses as a dashboard and share it with other users.

Dashboards are created for the following use cases:

  • Database permissions
  • Table permissions
  • Principal permissions

Prerequisites

You should have the following prerequisites:

  • An S3 bucket to store the permissions inventory data
  • An AWS Glue database for permissions inventory metadata
  • An AWS Identity and Access Management (IAM) role for the AWS Glue job with access to the inventory AWS Glue database and S3 bucket and added as a data lake admin
  • A QuickSight account with access to Athena
  • An IAM role for QuickSight with access to the inventory AWS Glue database and S3 bucket

Set up and run the AWS Glue job

We create an AWS Glue job to collect Lake Formation permissions data for the account and Region that are provided as job parameters. The collected data is flattened before storage, partitioned by account ID, date, and permissions type, and stored as Parquet in an S3 bucket using Boto3. We create external tables on the data and add filters for different types of resource permissions.
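
The downloadable script isn't reproduced in this post, so the following Python sketch only illustrates the flattening step under stated assumptions. It takes entries shaped like the PrincipalResourcePermissions list returned by the Lake Formation ListPermissions API and emits one row per principal, resource, and permission. The function name flatten_permissions is hypothetical, the row keys simply mirror the column names used later in this post, and representing a table wildcard as "*" is an assumption rather than the script's behavior.

```python
from typing import Any, Dict, Iterator, List

# Resource keys in a ListPermissions entry mapped to the lftype values used
# in this post. Other resource types are skipped in this sketch.
RESOURCE_TYPES = {
    "Database": "DATABASE",
    "Table": "TABLE",
    "DataLocation": "DATA_LOCATION",
}


def flatten_permissions(entries: List[Dict[str, Any]],
                        grantable: bool = False) -> Iterator[Dict[str, str]]:
    """Yield one flat row per (principal, resource, permission).

    grantable=False reads "Permissions"; grantable=True reads
    "PermissionsWithGrantOption".
    """
    perm_key = "PermissionsWithGrantOption" if grantable else "Permissions"
    for entry in entries:
        principal = entry["Principal"]["DataLakePrincipalIdentifier"]
        for resource_key, detail in entry["Resource"].items():
            lftype = RESOURCE_TYPES.get(resource_key)
            if lftype is None:
                continue
            if lftype == "DATABASE":
                database, table = detail.get("Name", ""), ""
            elif lftype == "TABLE":
                database = detail.get("DatabaseName", "")
                # A wildcard entry has no Name; "*" is an assumption here.
                table = detail.get("Name", "*")
            else:
                database, table = "", ""
            for permission in entry.get(perm_key, []):
                yield {
                    "catalog_id": detail.get("CatalogId", ""),
                    "lftype": lftype,
                    "principal": principal,
                    "databasename": database,
                    "tablename": table,
                    "permission": permission,
                }


if __name__ == "__main__":
    # Made-up sample entry for illustration only.
    sample = [{
        "Principal": {"DataLakePrincipalIdentifier":
                      "arn:aws:iam::111122223333:role/analyst"},
        "Resource": {"Table": {"CatalogId": "111122223333",
                               "DatabaseName": "sales", "Name": "orders"}},
        "Permissions": ["SELECT", "DESCRIBE"],
        "PermissionsWithGrantOption": ["SELECT"],
    }]
    for row in flatten_permissions(sample):
        print(row)
```

Rows in this shape can be written to Parquet with the library of your choice; the partitioning itself is not shown here.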

To create the AWS Glue job, complete the following steps:

  1. Download the Python script file to your local machine.
  2. On the AWS Glue console, under Data Integration and ETL in the navigation pane, choose Jobs.
  3. Under Create job, select Python Shell script editor.
  4. For Options, select Upload and edit an existing script.
  5. For File upload, choose Choose file.
  6. Choose the downloaded file (lf-permissions-inventory.py).
  7. Choose Create.

GlueJob

  8. After the job is created, enter a name for the job (for this post, lf-inventory-builder) and choose Save.

Glue Job save

  9. Choose the Job details tab.
  10. For Name, enter a name for the job.
  11. For IAM Role, choose an IAM role that has access to the inventory S3 bucket and inventory schema and is registered as a data lake admin.
  12. For Type, choose Python Shell.
  13. For Python version, choose Python 3.9.

Glue Job Details

  14. You can leave other property values at their default.
  15. Under Advanced properties, configure the following job parameters and values:
    1. catalog-id – The ID of the current AWS account whose permissions data is collected.
    2. databasename – The AWS Glue database where the inventory-related schema objects are created.
    3. region – The current Region where the job is configured and whose permissions data is collected.
    4. s3bucket – The S3 bucket where the collected permissions data is written.
    5. createtable – Set to yes to enable external table creation on the data.

Job Parameters

  16. Choose Save to save the job settings.

Glue Job Save

  17. Choose Run to start the job.

When the job is complete, the run status changes to Succeeded. You can view the log messages in Amazon CloudWatch Logs.
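
If you'd rather start the job from code than from the console, the job parameters can be passed as run arguments. The following Python sketch only builds the arguments dictionary. The function name build_job_arguments is hypothetical, the "--" prefix follows the usual AWS Glue job parameter convention, and treating any value other than yes as "no" for createtable is an assumption, because this post only documents yes.

```python
from typing import Dict


def build_job_arguments(catalog_id: str, databasename: str, region: str,
                        s3bucket: str, createtable: bool = True) -> Dict[str, str]:
    """Return the job parameters described above as AWS Glue run arguments."""
    if not (catalog_id.isdigit() and len(catalog_id) == 12):
        raise ValueError("catalog_id should be a 12-digit AWS account ID")
    return {
        "--catalog-id": catalog_id,
        "--databasename": databasename,
        "--region": region,
        "--s3bucket": s3bucket,
        # Only "yes" is documented in this post; "no" is an assumption.
        "--createtable": "yes" if createtable else "no",
    }


if __name__ == "__main__":
    # Placeholder values for illustration only.
    print(build_job_arguments("111122223333", "lfinventory",
                              "us-east-1", "DOC-EXAMPLE-BUCKET"))
```

With Boto3, a dictionary like this can be passed as the Arguments parameter of the AWS Glue client's start_job_run call, together with JobName set to lf-inventory-builder.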

Job Run

Permissions data is collected and stored in the S3 bucket (under lfpermissions-data) that you provided in the job parameters.
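
To make the partitioned layout concrete, the following Python sketch builds a Hive-style key prefix for one partition. Only the lfpermissions-data root comes from this post. The function name partition_prefix and the partition key names (accountid, date, resourcetype) are illustrative assumptions and may differ from what the script actually writes.

```python
from datetime import date


def partition_prefix(account_id: str, run_date: date, resource_type: str,
                     root: str = "lfpermissions-data") -> str:
    """Build a Hive-style S3 key prefix for one partition of inventory data.

    The partition key names are illustrative, not the script's own.
    """
    return (
        f"{root}/accountid={account_id}"
        f"/date={run_date.isoformat()}"
        f"/resourcetype={resource_type.lower()}/"
    )


if __name__ == "__main__":
    print(partition_prefix("111122223333", date(2023, 1, 15), "TABLE"))
```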

S3 Structure

The following external tables are created on the permissions data and can be queried using Athena:

  • lfpermissions – A summary of resource permissions
  • lfpermissionswithgrant – A summary of grantable resource permissions

For both tables, the schema structure is the same and the lftype column indicates what type of permissions the row applies to.

Athena Table Schema

Verify the tables using Athena

You can use Athena to verify the data using the following queries.

For more information, refer to Running SQL queries using Amazon Athena.

  • List the database permissions:
Select * from lfpermissions where lftype = 'DATABASE'
  • List the table permissions:
Select * from lfpermissions where lftype = 'TABLE'
  • List the data location permissions:
Select * from lfpermissions where lftype = 'DATA_LOCATION'
  • List the grantable database permissions:
Select * from lfpermissionswithgrant where lftype = 'DATABASE'
  • List the grantable table permissions:
Select * from lfpermissionswithgrant where lftype = 'TABLE'
  • List the grantable data location permissions:
Select * from lfpermissionswithgrant where lftype = 'DATA_LOCATION'
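
The audit questions from the introduction, such as who has select access on a given table, map onto the same table. The following Python sketch builds such a query. The function name who_has_permission_query is hypothetical, and the column names (principal, databasename, tablename, permission, lftype) are the ones referenced elsewhere in this post, so adjust them if your table schema differs.

```python
def who_has_permission_query(databasename: str, tablename: str,
                             permission: str = "SELECT") -> str:
    """Build an Athena query listing principals that hold a table permission."""
    for value in (databasename, tablename, permission):
        # Reject quotes so the values can be embedded as SQL string literals.
        if "'" in value:
            raise ValueError(f"unexpected quote in {value!r}")
    return (
        "SELECT DISTINCT principal FROM lfpermissions "
        "WHERE lftype = 'TABLE' "
        f"AND databasename = '{databasename}' "
        f"AND tablename = '{tablename}' "
        f"AND permission = '{permission}'"
    )


if __name__ == "__main__":
    # "sales" and "orders" are placeholder names.
    print(who_has_permission_query("sales", "orders"))
```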

As the next step, we create a QuickSight dashboard with three sheets, each focused on different sets of permissions (database, table, principal) to slice and dice the data.

Sign up for a QuickSight account

If you haven’t signed up for QuickSight, complete the following steps:

  1. Sign in to the AWS Management Console as Admin, search for QuickSight and choose Sign up for QuickSight.

QuickSight signup

  2. For Edition, select Enterprise.
  3. Choose Continue.
  4. For Authentication method, select Use IAM federated identities & QuickSight-managed users.
  5. Under QuickSight Region, choose the same Region as your inventory S3 bucket.
  6. Under Account info, enter a QuickSight account name and email address for notification.

QuickSight Form

  7. In the Quick access to AWS services section, for IAM Role, select Use QuickSight-managed role (default).
  8. Allow access to IAM, Athena, and Amazon S3.
  9. Specify the S3 bucket that contains the permissions data.
  10. Choose Finish to complete the signup process.

QuickSight configuration

Note: If the inventory bucket and database are managed by Lake Formation, grant database and table access to the created QuickSight IAM role. For instructions, refer to Granting and revoking permissions on Data Catalog resources.

Configure your dataset in QuickSight

QuickSight is configured with an Athena data source in the same Region as the S3 bucket. To set up your dataset, complete the following steps:

  1. On the QuickSight console, choose Datasets in the navigation pane.
  2. Choose New dataset.

Quicksight DataSet

  3. Choose Athena as your data source.

QuickSight Datasource

  4. Enter LF_DASHBOARD_DS as the name of your data source.
  5. Choose Create data source.
  6. For Catalog, leave it as AwsDataCatalog.
  7. For Database, choose the database name provided as a parameter to the AWS Glue job.
  8. For Tables, select lfpermissions.
  9. Choose Select.

QuickSight Catalog Info

  10. Select Directly query your data and choose Visualize to take you to the analysis.

Quicksight data mode

Create analyses

We create three sheets for our dashboard to view different levels of permissions.

Sheet 1: Database permission view

To view database permissions, complete the following steps:

  1. On the QuickSight console, choose the plus sign to create a new sheet.
  2. Choose Add, then choose Add title.

QuickSight Title

  3. Name the sheet Database Permissions.
  4. Repeat steps 5-7 to add the following parameters:
    • catalogid
    • databasename
    • permission
    • tablename
  5. On the Add menu, choose Add parameter.
  6. Enter a name for the parameter.
  7. Leave the other values as default and choose Create.
  8. Choose Insights in the navigation pane, then choose Add control.

QuickSight Control

  9. Add a control for each parameter:
    1. For each parameter, for Style, choose List, and for Values, select Link to a dataset field.
      QuickSight Dependency
    2. Provide additional information for each parameter according to the following table.
Parameter | Display Name | Dataset | Field
catalogid | AccountID | lfpermissions | catalog_id
databasename | DatabaseName | lfpermissions | databasename
permission | Permission | lfpermissions | permission
  10. Add a control dependency and for Database, choose the options menu and choose Edit.

QuickSight Dependency

  11. Under Format control, choose Control options.
  12. Change the relevant values, choose AccountID, and choose Update.
  13. Similarly, under Permission control, choose Control options.
  14. Change the relevant values, choose AccountID, and choose Update.

We create two visuals for this view.

  1. For the first visual, choose Visualize and choose pivot table as the visual type.
  2. Drag and drop catalog_id and databasename into Rows.
  3. Drag and drop permission into Column.
  4. Drag and drop principal into Values and change the aggregation to Count distinct.

QuickSight Database View1

  5. Add a filter on the lftype field with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. DATABASE as the value.
  6. Add a filter on catalog_id with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. Select Use parameters and choose catalogid.
  7. Add a filter on databasename with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. Select Use parameters and choose databasename.
  8. Add a filter on permission with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. Select Use parameters and choose permission.
  9. Choose Actions in the navigation pane.
  10. Define a new action with the following parameters:
    1. For Activation, select Select.
    2. For Filter action, select All fields.
    3. For Target visuals, select Select visuals and check principal.

Now we add our second visual.

  1. Add a second visual and choose the table visual type.
  2. Drag and drop principal to Group by.
  3. Add a filter on the lftype field with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. DATABASE as the value.
  4. Add a filter on catalog_id with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. Select Use parameters and choose catalogid.
  5. Add a filter on databasename with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. Select Use parameters and choose databasename.
  6. Add a filter on permission with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. Select Use parameters and choose permission.

Now the Database and Permission drop-down menus are populated based on the relevant attributes and change dynamically.

QuickSight Database View2

Sheet 2: Table permission view

Now that we have created the database permissions sheet, we can add a table permissions sheet.

  1. Choose the plus sign to add a new sheet.
  2. On the QuickSight console, choose Add, then choose Add title.
  3. Name the sheet Table Permissions.
  4. Choose Insights in the navigation pane, then choose Add control.
  5. Add a control for each parameter:
    1. For each parameter, for Style, choose List, and for Values, select Link to a dataset field.
    2. Provide the additional information for each parameter according to the following table.
Parameter | Display Name | Dataset | Field
catalogid | AccountID | lfpermissions | catalog_id
databasename | DatabaseName | lfpermissions | databasename
permission | Permission | lfpermissions | permission
tablename | TableName | lfpermissions | tablename

We create two visuals for this view.

  1. For the first visual, choose Visualize and choose pivot table as the visual type.
  2. Drag and drop catalog_id, databasename, and tablename into Rows.
  3. Drag and drop permission into Column.
  4. Drag and drop principal into Values and change the aggregation to Count distinct.
  5. Add a filter on the lftype field with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. TABLE as the value.
  6. Add a filter on catalog_id with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. Select Use parameters and choose catalogid.
  7. Add a filter on the databasename with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. Select Use parameters and choose databasename.
  8. Add a filter on tablename with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. Select Use parameters and choose tablename.
  9. Add a filter on permission with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. Select Use parameters and choose permission.
  10. Choose Actions in the navigation pane.
  11. Define a new action with the following parameters:
    1. For Activation, select Select.
    2. For Filter action, select All fields.
    3. For Target visuals, select Select visuals and Check principal.

Now we add our second visual.

  1. Add a second visual and choose the table visual type.
  2. Drag and drop principal to Group by.
  3. Add a filter on the lftype field with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. TABLE as the value.
  4. Add a filter on catalog_id with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. Select Use parameters and choose catalogid.
  5. Add a filter on the databasename with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. Select Use parameters and choose databasename.
  6. Add a filter on tablename with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. Select Use parameters and choose tablename.
  7. Add a filter on permission with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. Select Use parameters and choose permission.

Now the Databasename, Tablename, and Permission drop-down menus are populated based on the relevant attributes.

QuickSight Table Permissions

Sheet 3: Principal permission view

Now we add a third sheet for principal permissions.

  1. Choose the plus sign to add a new sheet.
  2. On the QuickSight console, choose Add, then choose Add title.
  3. Name the sheet Principal Permissions.
  4. Choose Insights in the navigation pane, then choose Add control.
  5. Add a control for the catalogid parameter:
    1. For Style, choose List, and for Values, select Link to a dataset field.
    2. Provide the additional information for the parameter according to the following table.
Parameter | Display Name | Dataset | Field
catalogid | AccountID | lfpermissions | catalog_id

We create four visuals for this view.

  1. For the first visual, choose Visualize and choose pivot table as the visual type.
  2. Drag and drop catalog_id and principal into Rows.
  3. Drag and drop permission into Column.
  4. Drag and drop databasename into Values and change the aggregation to Count distinct.
  5. Add a filter on the lftype field with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. DATABASE as the value.
  6. Add a filter on the catalog_id field with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. Select Use parameters and choose catalogid.
  7. Choose Actions in the navigation pane.
  8. Define a new action with the following parameters:
    1. For Activation, select Select.
    2. For Filter action, select All fields.
    3. For Target visuals, select Select visuals and Check Databasename.
  9. For the second visual, choose Visualize and choose table as the visual type.
  10. Drag and drop databasename into Group by.
  11. Add a filter on the lftype field with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. DATABASE as the value.
  12. Add a filter on the catalog_id field with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. Select Use parameters and choose catalogid.
  13. For the third visual, choose Visualize and choose pivot table as the visual type.
  14. Drag and drop catalog_id and principal into Rows.
  15. Drag and drop permission into Column.
  16. Drag and drop tablename into Values and change the aggregation to Count distinct.
  17. Add a filter on the lftype field with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. TABLE as the value.
  18. Add a filter on the catalog_id field with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. Select Use parameters and choose catalogid.
  19. Choose Actions in the navigation pane.
  20. Define a new action with the following parameters:
    1. For Activation, select Select.
    2. For Filter action, select All fields.
    3. For Target visuals, select Select visuals and Check Tablename.
  21. For the final visual, choose Visualize and choose table as the visual type.
  22. Drag and drop tablename into Group by.
  23. Add a filter on the lftype field with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. TABLE as the value.
  24. Add a filter on the catalog_id field with the following options:
    1. Custom filter as the filter type.
    2. Equals as the filter condition.
    3. Select Use parameters and choose catalogid.

The following screenshot shows our sheet.

QuickSight Principal View

Create a dashboard

Now that the analysis is ready, you can publish it as a dashboard and share it with other users. For instructions, refer to the tutorial Create an Amazon QuickSight dashboard.

Clean up

To clean up the resources created in this post, complete the following steps:

  1. Delete the AWS Glue job lf-inventory-builder.
  2. Delete the data stored under the bucket provided as the value of the s3bucket job parameter.
  3. Drop the external tables created under the schema provided as the value of the databasename job parameter.
  4. If you signed up for QuickSight to follow along with this post, you can delete the account.
  5. For an existing QuickSight account, delete the following resources:
    1. lfpermissions dataset
    2. lfpermissions analysis
    3. lfpermissions dashboard

Conclusion

In this post, we provided the design and implementation steps for a solution to collect Lake Formation permissions in a given Region of an account and consolidate them for analysis. We also walked through the steps to create a dashboard using Amazon QuickSight. You can use other QuickSight visuals to create more sophisticated dashboards based on your requirements.

You can also expand this solution to consolidate permissions for a multi-account setup. You can use a shared bucket across organizations and accounts and configure an AWS Glue job in each account or organization to write their permission data. With this solution, you can maintain a unified dashboard view of all the Lake Formation permissions within your organization, thereby providing a central audit mechanism to comply with business requirements.

Thanks for reading this post! If you have any comments or questions, please don’t hesitate to leave them in the comments section.


About the Author

Srividya Parthasarathy is a Senior Big Data Architect on the AWS Lake Formation team. She enjoys building analytics and data mesh solutions on AWS and sharing them with the community.

How to use Amazon Macie to preview sensitive data in S3 buckets

Post Syndicated from Koulick Ghosh original https://aws.amazon.com/blogs/security/how-to-use-amazon-macie-to-preview-sensitive-data-in-s3-buckets/

Security teams use Amazon Macie to discover and protect sensitive data, such as names, payment card data, and AWS credentials, in Amazon Simple Storage Service (Amazon S3). When Macie discovers sensitive data, these teams will want to see examples of the actual sensitive data found. Reviewing a sampling of the discovered data helps them quickly confirm that the object is truly sensitive according to their data protection and privacy policies.

In this post, we walk you through how your data security teams can use a new capability in Amazon Macie to retrieve up to 10 examples of sensitive data found in your S3 objects, so that you can confirm the nature of the data at a glance. We also discuss how you can control who can use this capability, so that only authorized personnel have permissions to view these examples.

The challenge customers face

After a Macie sensitive data discovery job is run, security teams start their work. The security team will review the Macie findings to investigate the discovered sensitive data and decide what actions to take to protect such data. The findings provide details that include the severity of the finding, information on the affected S3 object, and a summary of the type, location, and amount of sensitive data found. However, Macie findings only contain pointers to data that Macie found in the object. In order to complete their investigation, customers in the past had to do additional work to extract the contents of a sensitive object, such as navigating to a different AWS account where the object is located, downloading and manually searching for keywords in a file editor, or writing and refining SQL queries by using Amazon S3 Select. The investigations are further slowed down when the object type is one that is not easily readable without additional tooling, such as big-data file types like Avro and Parquet. By using the Macie capability to retrieve sensitive data samples, you are able to review the discovered data and make decisions concerning the finding remediation.

Prerequisites

To implement the ability to retrieve and reveal samples of sensitive data, you’ll need the following prerequisites:

  • Enable Amazon Macie in your AWS account. For instructions, see Getting started with Amazon Macie.
  • Set your account as the delegated Macie administrator account and enable Macie in at least one member account by using AWS Organizations. In this post, we will refer to the delegated administrator account as Account A and the member account as Account B.
  • Configure Macie detailed classification results in Account A.

    Note: The detailed classification results contain a record for each Amazon S3 object that you configure the job to analyze, and include the location of up to 1,000 occurrences of each type of sensitive data that Macie found in an object. Macie uses the location information in the detailed classification results to retrieve the examples of sensitive data. The detailed classification results are stored in an S3 bucket of your choice. In this post, we will refer to this bucket as DOC-EXAMPLE-BUCKET1.

  • Create an S3 bucket that contains sensitive data in Account B. In this post, we will refer to this bucket as DOC-EXAMPLE-BUCKET2.

    Note: You should enable server-side encryption on this bucket by using customer managed AWS Key Management Service (AWS KMS) keys (a type of encryption known as SSE-KMS).

  • (Optional) Add sensitive data to DOC-EXAMPLE-BUCKET2. This post uses a sample dataset that contains fake sensitive data. You are able to download this sample dataset, unarchive the .zip folder, and follow these steps to upload the objects to S3. This is a synthetic dataset generated by AWS that we will use for the examples in this post. All data in this blog post has been artificially created by AWS for demonstration purposes and has not been collected from any individual person. Similarly, such data does not relate back to any individual person, nor is it intended to.
  • Create and run a sensitive data discovery job from Account A to analyze the contents of DOC-EXAMPLE-BUCKET2.
  • (Optional) Set up the AWS Command Line Interface (AWS CLI).

Configure Macie to retrieve and reveal examples of sensitive data

In this section, we’ll describe how to configure Macie so that you are able to retrieve and view examples of sensitive data from Macie findings.

To configure Macie (console)

  • In the AWS Management Console, in the Macie delegated administrator account (Account A), follow these steps from the Amazon Macie User Guide.

To configure Macie (AWS CLI)

  1. Confirm that you have Macie enabled.
    $ aws macie2 get-macie-session --query 'status'
    // The expected response is "ENABLED"

  2. Confirm that you have configured the detailed classification results bucket.
    $ aws macie2 get-classification-export-configuration
    
    // The expected response is:
    {
        "configuration": {
            "s3Destination": {
                "bucketName": "DOC-EXAMPLE-BUCKET1",
                "kmsKeyArn": "arn:aws:kms:<YOUR-REGION>:<YOUR-ACCOUNT-ID>:key/<KEY-USED-TO-ENCRYPT-DOC-EXAMPLE-BUCKET1>"
            }
        }
    }

  3. Create a new KMS key to encrypt the retrieved examples of sensitive data. Make sure that the key is created in the same AWS Region where you are operating Macie.
    $ aws kms create-key
    {
        "KeyMetadata": {
            "Origin": "AWS_KMS",
            "KeyId": "<YOUR-KEY-ID>",
            "Description": "",
            "KeyManager": "CUSTOMER",
            "Enabled": true,
            "KeySpec": "SYMMETRIC_DEFAULT",
            "CustomerMasterKeySpec": "SYMMETRIC_DEFAULT",
            "KeyUsage": "ENCRYPT_DECRYPT",
            "KeyState": "Enabled",
            "CreationDate": 1502910355.475,
            "Arn": "arn:aws:kms:<YOUR-AWS-REGION>:<AWS-ACCOUNT-A>:key/<YOUR-KEY-ID>",
            "AWSAccountId": "<AWS-ACCOUNT-A>",
            "MultiRegion": false,
            "EncryptionAlgorithms": [
                "SYMMETRIC_DEFAULT"
            ]
        }
    }

  4. Give this key the alias REVEAL-KMS-KEY.
    $ aws kms create-alias --alias-name alias/<REVEAL-KMS-KEY> --target-key-id <YOUR-KEY-ID>
    // This command produces no output on success.

  5. Enable the feature in Macie and configure it to encrypt the data by using REVEAL-KMS-KEY. You do not specify a key policy for your new KMS key in this step. The key policy will be discussed later in the post.
    $ aws macie2 update-reveal-configuration --configuration '{"status":"ENABLED","kmsKeyId":"alias/<REVEAL-KMS-KEY>"}'
    
    // The expected response is:
    {
        "configuration": {
            "kmsKeyId": "arn:aws:kms:<YOUR-REGION>:<YOUR-ACCOUNT-ID>:key/<REVEAL-KMS-KEY>",
            "status": "ENABLED"
        }
    }
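
The enablement check and the final configuration step can also be scripted. The following Python sketch assumes a Boto3 Macie client, created with boto3.client("macie2"), is passed in. The function name enable_reveal is hypothetical, and the sketch covers only steps 1 and 5 above; it does not create the KMS key or alias.

```python
from typing import Any, Dict


def enable_reveal(macie_client: Any, kms_key_alias: str) -> Dict[str, Any]:
    """Enable retrieval of sensitive data samples, encrypted with the alias.

    macie_client is expected to behave like boto3.client("macie2").
    """
    if not kms_key_alias.startswith("alias/"):
        kms_key_alias = "alias/" + kms_key_alias
    # Mirrors step 1: confirm that Macie is enabled before changing settings.
    session = macie_client.get_macie_session()
    if session.get("status") != "ENABLED":
        raise RuntimeError("Macie is not enabled in this account and Region")
    # Mirrors step 5: turn the feature on and point it at the KMS key alias.
    response = macie_client.update_reveal_configuration(
        configuration={"status": "ENABLED", "kmsKeyId": kms_key_alias}
    )
    return response["configuration"]
```

Passing the client in as an argument keeps the function easy to exercise with a stub before you run it against a real account.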

Control access to read sensitive data and protect data displayed in Macie

This new Macie capability uses the AWS Identity and Access Management (IAM) policies, S3 bucket policies, and AWS KMS key policies that you have defined in your accounts. This means that in order to see examples through the Macie console or by invoking the Macie API, the IAM principal needs to have read access to the S3 object and to decrypt the object if it is server-side encrypted. It’s important to note that Macie uses the IAM permissions of the AWS principal to locate, retrieve, and reveal the samples and does not use the Macie service-linked role to perform these tasks.

Using the setup discussed in the previous section, you will walk through how to control access to the ability to retrieve and reveal sensitive data examples. To recap, you created and ran a discovery job from the Amazon Macie delegated administrator account (Account A) to analyze the contents of DOC-EXAMPLE-BUCKET2 in a member account (Account B). You configured Macie to retrieve examples and to encrypt the examples of sensitive data with the REVEAL-KMS-KEY.

The next step is to create and use an IAM role that will be assumed by other users in Account A to retrieve and reveal examples of sensitive data discovered by Macie. In this post, we’ll refer to this role as MACIE-REVEAL-ROLE.

To apply the principle of least privilege and allow only authorized personnel to view the sensitive data samples, grant the following permissions so that Macie users who assume MACIE-REVEAL-ROLE will be able to successfully retrieve and reveal examples of sensitive data:

  • Step 1 – Update the IAM policy for MACIE-REVEAL-ROLE.
  • Step 2 – Update the KMS key policy for REVEAL-KMS-KEY.
  • Step 3 – Update the S3 bucket policy for DOC-EXAMPLE-BUCKET2 and the KMS key policy used for its server-side encryption in Account B.

After you grant these permissions, MACIE-REVEAL-ROLE can successfully retrieve and reveal examples of sensitive data in DOC-EXAMPLE-BUCKET2, as shown in Figure 1.

Figure 1: Macie runs the discovery job from the delegated administrator account in a member account, and MACIE-REVEAL-ROLE retrieves examples of sensitive data


Step 1: Update the IAM policy

Provide the following required permissions to MACIE-REVEAL-ROLE:

  1. Allow GetObject from DOC-EXAMPLE-BUCKET2 in Account B.
  2. Allow decryption of DOC-EXAMPLE-BUCKET2 if it is server-side encrypted with a customer managed key (SSE-KMS).
  3. Allow GetObject from DOC-EXAMPLE-BUCKET1.
  4. Allow decryption of the Macie discovery results.
  5. Allow the necessary Macie actions to retrieve and reveal sensitive data examples.

To set up the required permissions

  • Attach the following IAM policy to MACIE-REVEAL-ROLE. Make sure to replace the placeholders with your own data.
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowGetFromCompanyDataBucket",
                "Effect": "Allow",
                "Action": "s3:GetObject",
                "Resource": "arn:aws:s3:::<DOC-EXAMPLE-BUCKET2>/*"
            },
            {
                "Sid": "AllowKMSDecryptForCompanyDataBucket",
                "Effect": "Allow",
                "Action": [
                    "kms:Decrypt"
                ],
                "Resource": "arn:aws:kms:<AWS-Region>:<AWS-Account-B>:key/<KEY-USED-TO-ENCRYPT-DOC-EXAMPLE-BUCKET2>"
            },
            {
                "Sid": "AllowGetObjectfromMacieResultsBucket",
                "Effect": "Allow",
                "Action": "s3:GetObject",
                "Resource": "arn:aws:s3:::<DOC-EXAMPLE-BUCKET1>/*"
            },
            {
                "Sid": "AllowKMSDecryptForMacieRoleDiscoveryBucket",
                "Effect": "Allow",
                "Action": [
                    "kms:Decrypt"
                ],
                "Resource": "arn:aws:kms:<AWS-REGION>:<AWS-ACCOUNT-A>:key/<KEY-USED-TO-ENCRYPT-DOC-EXAMPLE-BUCKET1>"
            },
            {
                "Sid": "AllowActionsRetrieveAndReveal",
                "Effect": "Allow",
                "Action": [
                    "macie2:GetMacieSession",
                    "macie2:GetFindings",
                    "macie2:GetSensitiveDataOccurrencesAvailability",
                    "macie2:GetSensitiveDataOccurrences",
                    "macie2:ListFindingsFilters",
                    "macie2:GetBucketStatistics",
                    "macie2:ListMembers",
                    "macie2:ListFindings",
                    "macie2:GetFindingStatistics",
                    "macie2:GetAdministratorAccount",
                    "macie2:GetClassificationExportConfiguration",
                    "macie2:GetRevealConfiguration",
                    "macie2:DescribeBuckets"
                ],
                "Resource": "*"
            }
        ]
    }

Step 2: Update the KMS key policy

Next, update the KMS key policy that is used to encrypt sensitive data samples that you retrieve and reveal in your delegated administrator account.

To update the key policy

  • Allow MACIE-REVEAL-ROLE access to the KMS key that you created for protecting the retrieved sensitive data by adding the following statement to its key policy. Make sure to replace the placeholders with your own data.
            {
                "Sid": "AllowMacieRoleDecrypt",
                "Effect": "Allow",
                "Principal": {
                    "AWS": "arn:aws:iam::<AWS-ACCOUNT-A>:role/<MACIE-REVEAL-ROLE>"
                },
                "Action": [
                    "kms:Decrypt",
                    "kms:DescribeKey",
                    "kms:GenerateDataKey"
                ],
                "Resource": "arn:aws:kms:<AWS-REGION>:<AWS-ACCOUNT-A>:key/<REVEAL-KMS-KEY>"
            }

Step 3: Update the bucket policy of the S3 bucket

Finally, update the bucket policy of the S3 bucket in member accounts, and update the key policy of the key used for SSE-KMS.

To update the S3 bucket policy and KMS key policy

  1. Add the following statement to the key policy of the KMS key used for server-side encryption of the DOC-EXAMPLE-BUCKET2 bucket in Account B.
            {
                "Sid": "AllowMacieRoleDecrypt",
                "Effect": "Allow",
                "Principal": {
                    "AWS": "arn:aws:iam::<AWS-ACCOUNT-A>:role/<MACIE-REVEAL-ROLE>"
                },
                "Action": "kms:Decrypt",
                "Resource": "arn:aws:kms:<AWS-REGION>:<AWS-ACCOUNT-B>:key/<KEY-USED-TO-ENCRYPT-DOC-EXAMPLE-BUCKET2>"
            }

  2. Apply the following bucket policy to DOC-EXAMPLE-BUCKET2 to allow cross-account access for MACIE-REVEAL-ROLE to get objects from this bucket.
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowMacieRoleGet",
                "Effect": "Allow",
                "Principal": {
                    "AWS": "arn:aws:iam::<AWS-ACCOUNT-A>:role/<MACIE-REVEAL-ROLE>"
                },
                "Action": "s3:GetObject",
                "Resource": "arn:aws:s3:::<DOC-EXAMPLE-BUCKET2>/*"
            }
        ]
    }
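The policies in Steps 1 through 3 mix IAM, KMS, and S3 ARNs, and a malformed ARN is an easy mistake to make when filling in placeholders. The following is a minimal sketch, not part of the original walkthrough, of a local sanity check you could run on a parsed policy before applying it. The helper names check_arn and check_policy are hypothetical, and the check only covers two facts about ARN layout: IAM and S3 bucket ARNs leave the Region field empty, while KMS key ARNs include a Region.

```python
import re

# Generic ARN layout: arn:partition:service:region:account-id:resource
ARN_PATTERN = re.compile(
    r"^arn:(?P<partition>[^:]+):(?P<service>[^:]+):"
    r"(?P<region>[^:]*):(?P<account>[^:]*):(?P<resource>.+)$"
)

# IAM is a global service and S3 bucket ARNs omit the Region field.
REGIONLESS_SERVICES = {"iam", "s3"}


def check_arn(arn: str) -> list[str]:
    """Return a list of problems found in one ARN (empty if none were found)."""
    problems = []
    if any(ch.isspace() for ch in arn):
        problems.append("contains whitespace")
    match = ARN_PATTERN.match(arn.strip())
    if match is None:
        return problems + ["does not match arn:partition:service:region:account:resource"]
    service, region = match["service"], match["region"]
    if service in REGIONLESS_SERVICES and region:
        problems.append(f"{service} ARNs must leave the Region field empty")
    if service == "kms" and not region:
        problems.append("kms key ARNs need a Region")
    return problems


def check_policy(document) -> dict[str, list[str]]:
    """Walk a parsed policy (dicts, lists, strings) and report every problematic ARN."""
    findings = {}
    stack = [document]
    while stack:
        node = stack.pop()
        if isinstance(node, dict):
            stack.extend(node.values())
        elif isinstance(node, list):
            stack.extend(node)
        elif isinstance(node, str) and node.startswith("arn:"):
            problems = check_arn(node)
            if problems:
                findings[node] = problems
    return findings
```

You could load a policy file with json.load and pass the result to check_policy; an empty dictionary means none of these specific mistakes were found. It does not prove that the policy grants what you intend.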

Retrieve and reveal sensitive data samples

Now that you’ve put in place the necessary permissions, users who assume MACIE-REVEAL-ROLE will be able to conveniently retrieve and reveal sensitive data samples.

To retrieve and reveal sensitive data samples

  1. In the Macie console, in the left navigation pane, choose Findings, and select a specific finding. Under Sensitive Data, choose Review.
    Figure 2: The finding details panel


  2. On the Reveal sensitive data page, choose Reveal samples.
    Figure 3: The Reveal sensitive data page


  3. Under Sensitive data, you will be able to view up to 10 examples of the sensitive data found by Amazon Macie.
    Figure 4: Examples of sensitive data revealed in the Amazon Macie console


You can find additional information on setting up the Macie Reveal function in the Amazon Macie User Guide.
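The console steps above have API equivalents, which correspond to the macie2:GetSensitiveDataOccurrencesAvailability and macie2:GetSensitiveDataOccurrences actions granted in Step 1. The following is a minimal sketch, assuming a boto3 Macie client created with credentials for MACIE-REVEAL-ROLE. The function name reveal_samples and the polling defaults are illustrative choices and do not come from the post.

```python
import time


def reveal_samples(macie_client, finding_id, poll_seconds=2.0, max_polls=15):
    """Return {detection type: [sample values]} for one finding, or raise."""
    # First ask Macie whether samples can be retrieved for this finding at all.
    availability = macie_client.get_sensitive_data_occurrences_availability(
        findingId=finding_id
    )
    if availability["code"] != "AVAILABLE":
        raise RuntimeError(f"Samples unavailable: {availability.get('reasons', [])}")

    # Retrieval is asynchronous: poll until the status leaves PROCESSING.
    for _ in range(max_polls):
        response = macie_client.get_sensitive_data_occurrences(findingId=finding_id)
        status = response["status"]
        if status == "SUCCESS":
            return {
                detection_type: [occurrence["value"] for occurrence in occurrences]
                for detection_type, occurrences in response["sensitiveDataOccurrences"].items()
            }
        if status == "ERROR":
            raise RuntimeError(response.get("error", "Macie returned an error"))
        time.sleep(poll_seconds)
    raise TimeoutError("Macie was still processing the request")


# Example usage (requires boto3 and credentials for MACIE-REVEAL-ROLE):
#   import boto3
#   macie = boto3.client("macie2")
#   print(reveal_samples(macie, "<FINDING-ID>"))
```

Passing the client in as a parameter keeps the function easy to exercise with a stub before pointing it at a real account.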

Conclusion

In this post, we showed how you can retrieve and review examples of sensitive data that were found in Amazon S3 using Amazon Macie. This capability makes it easier for your data protection teams to review the sensitive contents found in S3 buckets across the accounts in your AWS environment. With this information, security teams can quickly take remediation actions, such as updating the configuration of sensitive buckets, quarantining files with sensitive information, or sending a notification to the owner of the account where the sensitive data resides. In certain cases, you can add the examples to an allow list in Macie if you don’t want Macie to report those as sensitive data (for example, corporate addresses or sample data that is used for testing).

The following are links to additional resources that you can use to expand your knowledge of Amazon Macie capabilities and features:

If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, start a new thread on Amazon Macie re:Post.

Want more AWS Security news? Follow us on Twitter.

Koulick Ghosh


Koulick is a Senior Product Manager in AWS Security based in Seattle, WA. He loves speaking with customers on how AWS Security services can help make them more secure. In his free time, he enjoys playing the guitar, reading, and exploring the Pacific Northwest.


Michael Ingoldby

Michael is a Senior Security Solutions Architect at AWS based in Frisco, Texas. He provides guidance and helps customers to implement AWS native security services. Michael has been working in the security domain since 2006. When he is not working, he enjoys spending time outdoors.

Robert Wu


Robert is the Software Development Engineer for AWS Macie, working on enabling customers with more sensitive data discovery capabilities. In his free time, he enjoys exploring and contributing to various open-source projects to widen his domain knowledge.

Introducing the Cloud Shuffle Storage Plugin for Apache Spark

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/introducing-the-cloud-shuffle-storage-plugin-for-apache-spark/

AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning (ML), and application development. In AWS Glue, you can use Apache Spark, an open-source, distributed processing system for your data integration tasks and big data workloads.

Apache Spark utilizes in-memory caching and optimized query execution for fast analytic queries against your datasets, which are split into multiple Spark partitions on different nodes so that you can process a large amount of data in parallel. In Apache Spark, shuffling happens when data needs to be redistributed across the cluster. During a shuffle, data is written to local disk and transferred across the network. The shuffle operation is often constrained by the available local disk capacity, or data skew, which can cause straggling executors. Spark often throws a No space left on device or MetadataFetchFailedException error when there isn’t enough disk space left on the executor and there is no recovery. Such Spark jobs typically can’t succeed without additional compute and attached storage, and that extra compute often sits idle, which results in additional cost.

In 2021, we launched Amazon S3 shuffle for AWS Glue 2.0 with Spark 2.4. This feature disaggregated Spark compute and shuffle storage by utilizing Amazon Simple Storage Service (Amazon S3) to store Spark shuffle files. Using Amazon S3 for Spark shuffle storage enabled you to run data-intensive workloads more reliably. After the launch, we continued investing in this area, and collected customer feedback.

Today, we’re pleased to release Cloud Shuffle Storage Plugin for Apache Spark. It supports the latest Apache Spark 3.x distribution so you can take advantage of the plugin in AWS Glue or any other Spark environment. It’s now also natively available to use in AWS Glue Spark jobs on AWS Glue 3.0 and the latest AWS Glue version 4.0 without requiring any extra setup or bringing external libraries. Like the Amazon S3 shuffle for AWS Glue 2.0, the Cloud Shuffle Storage Plugin helps you solve constrained disk space errors during shuffle in serverless Spark environments.

We’re also excited to announce the release of software binaries for the Cloud Shuffle Storage Plugin for Apache Spark under the Apache 2.0 license. You can download the binaries and run them on any Spark environment. The new plugin is open-cloud, comes with out-of-the-box support for Amazon S3, and can be easily configured to use other forms of cloud storage such as Google Cloud Storage and Microsoft Azure Blob Storage.

Understanding a shuffle operation in Apache Spark

In Apache Spark, there are two types of transformations:

  • Narrow transformation – This includes map, filter, union, and mapPartitions, where each input partition contributes to only one output partition.
  • Wide transformation – This includes join, groupByKey, reduceByKey, and repartition, where each input partition contributes to many output partitions. Spark SQL queries that include JOIN, ORDER BY, or GROUP BY require wide transformations.

A wide transformation triggers a shuffle, which occurs whenever data is reorganized into new partitions with each key assigned to one of them. During a shuffle phase, all Spark map tasks write shuffle data to a local disk that is then transferred across the network and fetched by Spark reduce tasks. The volume of data shuffled is visible in the Spark UI. When shuffle writes take up more space than the local available disk capacity, it causes a No space left on device error.
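To make the difference between narrow and wide transformations concrete, here is a toy model in plain Python. It is a sketch for intuition only and is not how Spark implements a shuffle: partitions are ordinary lists, the function names are made up for this illustration, and CRC32 stands in for Spark's partitioner.

```python
import zlib
from collections import defaultdict


def narrow_map(partitions, fn):
    """Narrow transformation: each input partition yields exactly one output partition."""
    return [[fn(record) for record in partition] for partition in partitions]


def partition_for(key, num_partitions):
    """Pick the output partition that owns a key (CRC32 keeps this stable across runs)."""
    return zlib.crc32(repr(key).encode("utf-8")) % num_partitions


def shuffle_by_key(partitions, num_output_partitions):
    """Wide transformation: every input partition may feed every output partition."""
    outputs = [[] for _ in range(num_output_partitions)]
    for partition in partitions:  # the "map side" writes shuffle data
        for key, value in partition:
            outputs[partition_for(key, num_output_partitions)].append((key, value))
    return outputs  # the "reduce side" reads these blocks


def reduce_by_key(partitions, num_output_partitions):
    """Shuffle first, then sum the values per key inside each output partition."""
    reduced = []
    for partition in shuffle_by_key(partitions, num_output_partitions):
        totals = defaultdict(int)
        for key, value in partition:
            totals[key] += value
        reduced.append(dict(totals))
    return reduced


if __name__ == "__main__":
    sales = [[("store_1", 10), ("store_2", 5)], [("store_1", 7), ("store_3", 1)]]
    doubled = narrow_map(sales, lambda kv: (kv[0], kv[1] * 2))  # no data movement
    print(reduce_by_key(doubled, num_output_partitions=2))      # data is redistributed
```

In the toy model the intermediate lists built by shuffle_by_key play the role of the shuffle files: in Spark they are written to local disk by map tasks, which is where the space pressure described above comes from.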

To illustrate one of the typical scenarios, let’s use the query q80.sql from the standard TPC-DS 3 TB dataset as an example. This query attempts to calculate the total sales, returns, and eventual profit realized during a specific time frame. It involves multiple wide transformations (shuffles) caused by left outer join and group by.

Let’s run the following query on an AWS Glue 3.0 job with 10 G.1X workers, where a total of 640 GB of local disk space is available:

with ssr as
 (select  s_store_id as store_id,
          sum(ss_ext_sales_price) as sales,
          sum(coalesce(sr_return_amt, 0)) as returns,
          sum(ss_net_profit - coalesce(sr_net_loss, 0)) as profit
  from store_sales left outer join store_returns on
         (ss_item_sk = sr_item_sk and ss_ticket_number = sr_ticket_number),
     date_dim, store, item, promotion
 where ss_sold_date_sk = d_date_sk
       and d_date between cast('2000-08-23' as date)
                  and (cast('2000-08-23' as date) + interval '30' day)
       and ss_store_sk = s_store_sk
       and ss_item_sk = i_item_sk
       and i_current_price > 50
       and ss_promo_sk = p_promo_sk
       and p_channel_tv = 'N'
 group by s_store_id),
 csr as
 (select  cp_catalog_page_id as catalog_page_id,
          sum(cs_ext_sales_price) as sales,
          sum(coalesce(cr_return_amount, 0)) as returns,
          sum(cs_net_profit - coalesce(cr_net_loss, 0)) as profit
  from catalog_sales left outer join catalog_returns on
         (cs_item_sk = cr_item_sk and cs_order_number = cr_order_number),
     date_dim, catalog_page, item, promotion
 where cs_sold_date_sk = d_date_sk
       and d_date between cast('2000-08-23' as date)
                  and (cast('2000-08-23' as date) + interval '30' day)
       and cs_catalog_page_sk = cp_catalog_page_sk
       and cs_item_sk = i_item_sk
       and i_current_price > 50
       and cs_promo_sk = p_promo_sk
       and p_channel_tv = 'N'
 group by cp_catalog_page_id),
 wsr as
 (select  web_site_id,
          sum(ws_ext_sales_price) as sales,
          sum(coalesce(wr_return_amt, 0)) as returns,
          sum(ws_net_profit - coalesce(wr_net_loss, 0)) as profit
  from web_sales left outer join web_returns on
         (ws_item_sk = wr_item_sk and ws_order_number = wr_order_number),
     date_dim, web_site, item, promotion
 where ws_sold_date_sk = d_date_sk
       and d_date between cast('2000-08-23' as date)
                  and (cast('2000-08-23' as date) + interval '30' day)
       and ws_web_site_sk = web_site_sk
       and ws_item_sk = i_item_sk
       and i_current_price > 50
       and ws_promo_sk = p_promo_sk
       and p_channel_tv = 'N'
 group by web_site_id)
 select channel, id, sum(sales) as sales, sum(returns) as returns, sum(profit) as profit
 from (select
        'store channel' as channel, concat('store', store_id) as id, sales, returns, profit
      from ssr
      union all
      select
        'catalog channel' as channel, concat('catalog_page', catalog_page_id) as id,
        sales, returns, profit
      from csr
      union all
      select
        'web channel' as channel, concat('web_site', web_site_id) as id, sales, returns, profit
      from  wsr) x
 group by rollup (channel, id)
 order by channel, id

The following screenshot shows the Executor tab in the Spark UI.

Spark UI Executor Tab

The following screenshot shows the status of Spark jobs included in the AWS Glue job run.

Spark UI Jobs

In the failed Spark job (job ID=7), we can see the failed Spark stage in the Spark UI.

Spark UI Failed stage

There was 167.8 GiB of shuffle write during the stage, and 14 tasks failed with the error java.io.IOException: No space left on device because the host 172.34.97.212 ran out of local disk.

Spark UI Tasks
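A rough back-of-the-envelope calculation helps explain why a single host can run out of disk even though the stage wrote far less than the total disk available. The sketch below uses only the figures quoted above and rests on two assumptions that the post does not state: that the 640 GB is in decimal gigabytes, and that it is split evenly across the 10 workers.

```python
GIB = 1024 ** 3  # bytes in one gibibyte
GB = 1000 ** 3   # bytes in one (decimal) gigabyte

workers = 10
total_disk_gb = 640        # total local disk quoted for the job
shuffle_write_gib = 167.8  # shuffle write reported for the failed stage

# Assumption: the disk is evenly divided across workers.
disk_per_worker_gb = total_disk_gb / workers
shuffle_write_gb = shuffle_write_gib * GIB / GB
even_share_gb = shuffle_write_gb / workers
ratio_to_one_worker = shuffle_write_gb / disk_per_worker_gb

print(f"disk per worker:             {disk_per_worker_gb:.1f} GB")
print(f"stage shuffle write:         {shuffle_write_gb:.1f} GB")
print(f"even share per worker:       {even_share_gb:.1f} GB")
print(f"stage write / one worker:    {ratio_to_one_worker:.1f}x")
```

Under these assumptions the stage's shuffle write is several times the disk of any one worker, so skewed data, or shuffle files left over from earlier stages, can fill one host while other hosts still have free space.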

Cloud Shuffle Storage for Apache Spark

Cloud Shuffle Storage for Apache Spark allows you to store Spark shuffle files on Amazon S3 or other cloud storage services. This gives complete elasticity to Spark jobs, thereby allowing you to run your most data intensive workloads reliably. The following figure illustrates how Spark map tasks write the shuffle files to the Cloud Shuffle Storage. Reducer tasks consider the shuffle blocks as remote blocks and read them from the same shuffle storage.

This architecture enables your serverless Spark jobs to use Amazon S3 without the overhead of running, operating, and maintaining additional storage or compute nodes.

Chopper diagram

The following AWS Glue job parameters enable and tune Spark to use S3 buckets for storing shuffle data. You can also enable at-rest encryption when writing shuffle data to Amazon S3 by using security configuration settings.

Key | Value | Explanation
--write-shuffle-files-to-s3 | TRUE | This is the main flag, which tells Spark to use S3 buckets for writing and reading shuffle data.
--conf | spark.shuffle.storage.path=s3://<shuffle-bucket> | This is optional, and specifies the S3 bucket where the plugin writes the shuffle files. By default, we use --TempDir/shuffle-data.
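If you start job runs programmatically, the same parameters can be passed as job arguments. The following is a minimal sketch, assuming a boto3 AWS Glue client and an existing job. The helper names are hypothetical, and the argument values mirror the table above.

```python
def shuffle_job_arguments(shuffle_bucket_uri=None):
    """Build the AWS Glue job arguments that turn on S3-backed shuffle."""
    arguments = {"--write-shuffle-files-to-s3": "TRUE"}
    if shuffle_bucket_uri is not None:
        if not shuffle_bucket_uri.startswith("s3://"):
            raise ValueError("shuffle_bucket_uri must start with s3://")
        # Optional: override the default shuffle location.
        arguments["--conf"] = f"spark.shuffle.storage.path={shuffle_bucket_uri}"
    return arguments


def start_job_with_s3_shuffle(glue_client, job_name, shuffle_bucket_uri=None):
    """Start a run of an existing Glue job with the shuffle arguments applied."""
    response = glue_client.start_job_run(
        JobName=job_name,
        Arguments=shuffle_job_arguments(shuffle_bucket_uri),
    )
    return response["JobRunId"]


# Example usage (requires boto3 and AWS credentials; the job name is a placeholder):
#   import boto3
#   run_id = start_job_with_s3_shuffle(boto3.client("glue"), "<your-glue-job>",
#                                      "s3://<shuffle-bucket>")
```

Arguments passed to a job run are merged with the job's default arguments, so you can also set these keys once on the job definition instead of on every run.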

The shuffle files are written to this location with names such as the following:

s3://<shuffle-storage-path>/<Spark application ID>/[0-9]/<Shuffle ID>/shuffle_<Shuffle ID>_<Mapper ID>_0.data
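When you inspect or clean up the shuffle location, it can help to split an object URI back into the components of the pattern above. The sketch below is one way to do that with a regular expression. It assumes that shuffle and mapper IDs are plain integers, which the pattern suggests but the post does not state, and the field names are invented for this example.

```python
import re

# Mirrors: <storage path>/<Spark application ID>/[0-9]/<Shuffle ID>/shuffle_<Shuffle ID>_<Mapper ID>_0.data
SHUFFLE_FILE_PATTERN = re.compile(
    r"^(?P<storage_path>s3://.+)/(?P<app_id>[^/]+)/(?P<prefix_digit>[0-9])/"
    r"(?P<shuffle_id>\d+)/shuffle_(?P<file_shuffle_id>\d+)_(?P<mapper_id>\d+)_0\.data$"
)


def parse_shuffle_file(uri):
    """Return the components of a shuffle file URI, or None if it does not fit the pattern."""
    match = SHUFFLE_FILE_PATTERN.match(uri)
    if match is None:
        return None
    # The shuffle ID appears twice in the pattern; both occurrences must agree.
    if match["shuffle_id"] != match["file_shuffle_id"]:
        return None
    return {
        "storage_path": match["storage_path"],
        "app_id": match["app_id"],
        "prefix_digit": int(match["prefix_digit"]),
        "shuffle_id": int(match["shuffle_id"]),
        "mapper_id": int(match["mapper_id"]),
    }


if __name__ == "__main__":
    example = "s3://my-bucket/shuffle-data/spark-application-1/3/12/shuffle_12_45_0.data"
    print(parse_shuffle_file(example))
```

Grouping parsed objects by app_id is a simple way to see which Spark applications left shuffle data behind.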

With the Cloud Shuffle Storage plugin enabled and using the same AWS Glue job setup, the TPC-DS query now succeeded without any job or stage failures.
Spark UI Jobs with Chopper plugin

Software binaries for the Cloud Shuffle Storage Plugin

You can now also download and use the plugin in your own Spark environments and with other cloud storage services. The plugin binaries are available for use under the Apache 2.0 license.

Bundle the plugin with your Spark applications

You can bundle the plugin with your Spark applications by adding it as a dependency in your Maven pom.xml as you develop your Spark applications, as shown in the following code. For more details on the plugin and Spark versions, refer to Plugin versions.

<repositories>
   ...
    <repository>
        <id>aws-glue-etl-artifacts</id>
        <url>https://aws-glue-etl-artifacts.s3.amazonaws.com/release/</url>
    </repository>
</repositories>
...
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>chopper-plugin</artifactId>
    <version>3.1-amzn-LATEST</version>
</dependency>

You can alternatively download the binaries from AWS Glue Maven artifacts directly and include them in your Spark application as follows:

#!/bin/bash
sudo wget -v https://aws-glue-etl-artifacts.s3.amazonaws.com/release/com/amazonaws/chopper-plugin/3.1-amzn-LATEST/chopper-plugin-3.1-amzn-LATEST.jar -P /usr/lib/spark/jars

Submit the Spark application by including the JAR files on the classpath and specifying the two Spark configs for the plugin:

spark-submit --deploy-mode cluster \
--conf spark.shuffle.sort.io.plugin.class=com.amazonaws.spark.shuffle.io.cloud.ChopperPlugin \
--conf spark.shuffle.storage.path=s3://<s3 bucket>/<shuffle-dir> \
--class <your class> <your application jar>

The following Spark parameters enable and configure Spark to use an external storage URI such as Amazon S3 for storing shuffle files; the URI protocol determines which storage system to use.

Key | Value | Explanation
spark.shuffle.storage.path | s3://<shuffle-storage-path> | Specifies a URI where the shuffle files are stored, which must be a valid Hadoop FileSystem URI and be configured as needed.
spark.shuffle.sort.io.plugin.class | com.amazonaws.spark.shuffle.io.cloud.ChopperPlugin | The entry class in the plugin.
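If you launch Spark applications from a script, the two settings can be assembled into a spark-submit command line. The following sketch builds the argument list only. The function name is hypothetical, and the URI check is just a basic guard that the value has a scheme such as s3:// and a bucket or container name.

```python
from urllib.parse import urlparse

PLUGIN_CLASS = "com.amazonaws.spark.shuffle.io.cloud.ChopperPlugin"


def build_spark_submit(storage_uri, main_class, app_jar):
    """Return a spark-submit argument list with the shuffle plugin configured."""
    parsed = urlparse(storage_uri)
    if not parsed.scheme or not parsed.netloc:
        raise ValueError(f"expected a URI such as s3://bucket/dir, got {storage_uri!r}")
    return [
        "spark-submit",
        "--deploy-mode", "cluster",
        "--conf", f"spark.shuffle.sort.io.plugin.class={PLUGIN_CLASS}",
        "--conf", f"spark.shuffle.storage.path={storage_uri}",
        "--class", main_class,
        app_jar,
    ]


if __name__ == "__main__":
    command = build_spark_submit("s3://my-bucket/shuffle-dir", "com.example.Main", "app.jar")
    print(" ".join(command))
    # To run it: subprocess.run(command, check=True)
```

Building the command as a list avoids shell quoting issues when it is passed to subprocess.run.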

Other cloud storage integration

This plugin comes with out-of-the-box support for Amazon S3 and can also be configured to use other forms of cloud storage such as Google Cloud Storage and Microsoft Azure Blob Storage. To enable other Hadoop FileSystem compatible cloud storage services, you can simply add a storage URI for the corresponding service scheme, such as gs:// for Google Cloud Storage instead of s3:// for Amazon S3, add the FileSystem JAR files for the service, and set the appropriate authentication configurations.

For more information about how to integrate the plugin with Google Cloud Storage and Microsoft Azure Blob Storage, refer to Using AWS Glue Cloud Shuffle Plugin for Apache Spark with Other Cloud Storage Services.

Best practices and considerations

Note the following considerations:

  • This feature replaces local shuffle storage with Amazon S3. You can use it to address common failures with price/performance benefits for your serverless analytics jobs and pipelines. We recommend enabling this feature when you want to ensure reliable runs of your data-intensive workloads that create a large amount of shuffle data or when you’re getting a No space left on device error. You can also use this plugin if your job encounters fetch failures (org.apache.spark.shuffle.MetadataFetchFailedException) or if your data is skewed.
  • We recommend setting S3 bucket lifecycle policies on the shuffle bucket (spark.shuffle.storage.path) in order to clean up old shuffle data automatically.
  • The shuffle data on Amazon S3 is encrypted by default. You can also encrypt the data with your own AWS Key Management Service (AWS KMS) keys.
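As an illustration of the lifecycle recommendation above, the following sketch builds an expiration rule for a shuffle prefix and applies it with a boto3 S3 client. The rule ID, the shuffle-data/ prefix, and the seven-day retention are placeholder assumptions. Choose values that comfortably exceed your longest-running job.

```python
def shuffle_cleanup_rule(prefix="shuffle-data/", expire_after_days=7):
    """Build an S3 lifecycle configuration that expires objects under a prefix."""
    if expire_after_days < 1:
        raise ValueError("expire_after_days must be at least 1")
    return {
        "Rules": [
            {
                "ID": "expire-spark-shuffle-data",
                "Filter": {"Prefix": prefix},
                "Status": "Enabled",
                "Expiration": {"Days": expire_after_days},
            }
        ]
    }


def apply_shuffle_cleanup(s3_client, bucket, prefix="shuffle-data/", expire_after_days=7):
    """Apply the rule. Note: this call replaces the bucket's existing lifecycle configuration."""
    s3_client.put_bucket_lifecycle_configuration(
        Bucket=bucket,
        LifecycleConfiguration=shuffle_cleanup_rule(prefix, expire_after_days),
    )


# Example usage (requires boto3 and AWS credentials; the bucket name is a placeholder):
#   import boto3
#   apply_shuffle_cleanup(boto3.client("s3"), "<shuffle-bucket>")
```

Because put_bucket_lifecycle_configuration overwrites whatever rules are already on the bucket, merge this rule into the existing configuration first if the bucket is shared with other data.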

Conclusion

This post introduced the new Cloud Shuffle Storage Plugin for Apache Spark and described how it lets you scale storage independently in your Spark jobs without adding workers. With this plugin, you can expect jobs processing terabytes of data to run much more reliably.

The plugin is available in AWS Glue 3.0 and 4.0 Spark jobs in all AWS Glue supported Regions. We’re also releasing the plugin’s software binaries under the Apache 2.0 license. You can use the plugin in AWS Glue or other Spark environments. We look forward to hearing your feedback.


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts that help customers build data lakes on the cloud.

Rajendra Gujja is a Senior Software Development Engineer on the AWS Glue team. He is passionate about distributed computing and everything and anything about the data.

Chuhan Liu is a Software Development Engineer on the AWS Glue team.

Gonzalo Herreros is a Senior Big Data Architect on the AWS Glue team.

Mohit Saxena is a Senior Software Development Manager on the AWS Glue team. His team focuses on building distributed systems to enable customers with data integration and connectivity to a variety of sources, efficiently managing data lakes on Amazon S3, and optimizing Apache Spark for fault tolerance with ETL workloads.

New — Amazon SageMaker Data Wrangler Supports SaaS Applications as Data Sources

Post Syndicated from Donnie Prakoso original https://aws.amazon.com/blogs/aws/new-amazon-sagemaker-data-wrangler-supports-saas-applications-as-data-sources/

Data fuels machine learning. In machine learning, data preparation is the process of transforming raw data into a format that is suitable for further processing and analysis. The common process for data preparation starts with collecting data, then cleaning it, labeling it, and finally validating and visualizing it. Getting the data right with high quality can often be a complex and time-consuming process.

This is why customers who build machine learning (ML) workloads on AWS appreciate the capabilities of Amazon SageMaker Data Wrangler. With SageMaker Data Wrangler, customers can simplify the process of data preparation and complete the required steps of the data preparation workflow in a single visual interface. Amazon SageMaker Data Wrangler helps to reduce the time it takes to aggregate and prepare data for ML.

However, due to the proliferation of data, customers generally have data spread across multiple systems, including external software-as-a-service (SaaS) applications like SAP OData for manufacturing data, Salesforce for customer pipeline, and Google Analytics for web application data. To solve business problems using ML, customers have to bring all of these data sources together. They currently have to build their own solution or use third-party solutions to ingest data into Amazon S3 or Amazon Redshift. These solutions can be complex to set up and not cost-effective.

Introducing Amazon SageMaker Data Wrangler Supports SaaS Applications as Data Sources
I’m happy to share that starting today, you can aggregate external SaaS application data in Amazon SageMaker Data Wrangler to prepare it for ML. With this feature, you can use more than 40 SaaS applications as data sources via Amazon AppFlow and have this data available in Amazon SageMaker Data Wrangler. Once the data sources are registered in AWS Glue Data Catalog by AppFlow, you can browse tables and schemas from these data sources using Data Wrangler SQL explorer. This feature provides seamless data integration between SaaS applications and SageMaker Data Wrangler using Amazon AppFlow.

Here is a quick preview of this new feature:

This new feature of Amazon SageMaker Data Wrangler works through integration with Amazon AppFlow, a fully managed integration service that enables you to securely exchange data between SaaS applications and AWS services. With Amazon AppFlow, you can establish bidirectional data integration between SaaS applications (such as Salesforce, SAP, and Amplitude) and all supported services, and bring that data into Amazon S3 or Amazon Redshift.

Then, with Amazon AppFlow, you can catalog the data in AWS Glue Data Catalog. This is a new Amazon AppFlow feature that lets you create an integration with AWS Glue Data Catalog for the Amazon S3 destination connector. With this new integration, customers can catalog SaaS application data in AWS Glue Data Catalog with a few clicks, directly from the Amazon AppFlow flow configuration, without the need to run any crawlers.

Once you’ve established a flow and inserted it into the AWS Glue Data Catalog, you can use this data inside the Amazon SageMaker Data Wrangler. Then, you can do the data preparation as you usually do. You can write Amazon Athena queries to preview data, join data from multiple sources, or import data to prepare for ML model training.

With this feature, a few simple steps are all you need to integrate data from SaaS applications into Amazon SageMaker Data Wrangler via Amazon AppFlow. This integration supports more than 40 SaaS applications; for a complete list, check the Supported source and destination applications documentation.

Get Started with Amazon SageMaker Data Wrangler Support for Amazon AppFlow
Let’s see how this feature works in detail. In my scenario, I need to get data from Salesforce, and do the data preparation using Amazon SageMaker Data Wrangler.

To start using this feature, the first thing I need to do is to create a flow in Amazon AppFlow that registers the data source into the AWS Glue Data Catalog. I already have an existing connection with my Salesforce account, and all I need now is to create a flow.

One important thing to note is that to make SaaS application data available in Amazon SageMaker Data Wrangler, I need to create a flow with Amazon S3 as the destination. Then, I need to enable Create a Data Catalog table in the AWS Glue Data Catalog settings. This option will automatically catalog my Salesforce data into AWS Glue Data Catalog.

On this page, I need to select a user role with the required AWS Glue Data Catalog permissions and define the database name and the table name prefix. In addition, in this section, I can define the data format preference, be it in JSON, CSV, or Apache Parquet formats, and filename preference if I want to add a timestamp into the file name section.

To learn more about how to register SaaS data in Amazon AppFlow and AWS Glue Data Catalog, you can read the Cataloging the data output from an Amazon AppFlow flow documentation page.

Once I’ve finished registering SaaS data, I need to make sure the IAM role can view the data sources in Data Wrangler from AppFlow. Here is an example of a policy in the IAM role:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "glue:SearchTables",
            "Resource": [
                "arn:aws:glue:*:*:table/*/*",
                "arn:aws:glue:*:*:database/*",
                "arn:aws:glue:*:*:catalog"
            ]
        }
    ]
} 

By enabling data cataloging with AWS Glue Data Catalog, from this point on, Amazon SageMaker Data Wrangler will be able to automatically discover this new data source and I can browse tables and schema using the Data Wrangler SQL Explorer.
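Outside the Data Wrangler UI, you can confirm what AppFlow registered by listing the tables and columns in the catalog database. The following is a minimal sketch, assuming a boto3 AWS Glue client. The function name is hypothetical, the database name is whatever you chose in the AppFlow flow settings, and the calling role needs glue:GetTables, which the example policy above does not grant.

```python
def list_catalog_tables(glue_client, database_name):
    """Return {table name: [column names]} for every table in a Glue database."""
    tables = {}
    kwargs = {"DatabaseName": database_name}
    while True:
        page = glue_client.get_tables(**kwargs)
        for table in page["TableList"]:
            columns = table.get("StorageDescriptor", {}).get("Columns", [])
            tables[table["Name"]] = [column["Name"] for column in columns]
        token = page.get("NextToken")
        if not token:
            return tables
        kwargs["NextToken"] = token  # fetch the next page of results


# Example usage (requires boto3 and AWS credentials; the database name is a placeholder):
#   import boto3
#   for name, columns in list_catalog_tables(boto3.client("glue"), "<database-name>").items():
#       print(name, columns)
```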

Now it’s time to switch to the Amazon SageMaker Data Wrangler dashboard and select Connect to data sources.

On the following page, I choose Create connection and select the data source I want to import. In this section, I can see all the available connections. Here I see that the Salesforce connection is already available for me to use.

If I would like to add additional data sources, I can see a list of external SaaS applications that I can integrate in the Set up new data sources section. To learn how to make external SaaS applications available as data sources, I can select How to enable access.

Now I will import datasets and select the Salesforce connection.

On the next page, I can define connection settings and import data from Salesforce. When I’m done with this configuration, I select Connect.

On the following page, I see my Salesforce data that I already configured with Amazon AppFlow and AWS Glue Data Catalog called appflowdatasourcedb. I can also see a table preview and schema for me to review if this is the data I need.

Then, I start building my dataset using this data by performing SQL queries inside the SageMaker Data Wrangler SQL Explorer. Then, I select Import query.

Then, I define a name for my dataset.

At this point, I can start the data preparation process. I can navigate to the Analysis tab to run the data insight report. The analysis provides a report on data quality issues and on which transforms I should use next to fix them, based on the ML problem I want to solve. To learn more about how to use the data analysis feature, see the blog post Accelerate data preparation with data quality and insights in Amazon SageMaker Data Wrangler.

In my case, there are several columns I don’t need, and I need to drop these columns. I select Add step.

One feature I like is that Amazon SageMaker Data Wrangler provides numerous ML data transforms. It helps me to streamline the process of cleaning, transforming, and feature engineering my data in one dashboard. For more about the data transformations SageMaker Data Wrangler provides, please read the Transform Data documentation page.

In this list, I select Manage columns.

Then, in the Transform section, I select the Drop column option. Then, I select a few columns that I don’t need.

Once I’m done, the columns I don’t need are removed and the Drop column data preparation step I just created is listed in the Add step section.

I can also see the visual of my data flow inside the Amazon SageMaker Data Wrangler. In this example, my data flow is quite basic. But when my data preparation process becomes complex, this visual view makes it easy for me to see all the data preparation steps.

From this point on, I can do what I require with my Salesforce data. For example, I can export data directly to Amazon S3 by selecting Export to and choosing Amazon S3 from the Add destination menu. In my case, I configure Data Wrangler to store the data in Amazon S3 after processing by selecting Add destination and then Amazon S3.

Amazon SageMaker Data Wrangler gives me the flexibility to automate the same data preparation flow using scheduled jobs. I can also automate feature engineering with SageMaker Pipelines (via Jupyter Notebook) and SageMaker Feature Store (via Jupyter Notebook), and deploy to an inference endpoint with SageMaker Inference Pipeline (via Jupyter Notebook).

Things to Know
Related news – This feature makes it easy for you to do data aggregation and preparation with Amazon SageMaker Data Wrangler. Because this feature is an integration with Amazon AppFlow and AWS Glue Data Catalog, you might want to read the Amazon AppFlow now supports AWS Glue Data Catalog integration and provides enhanced data preparation page.

Availability – Amazon SageMaker Data Wrangler supports SaaS applications as data sources available in all the Regions currently supported by Amazon AppFlow.

Pricing – There is no additional cost to use the SaaS application support in Amazon SageMaker Data Wrangler, but there is a cost for running Amazon AppFlow to get the data into Amazon SageMaker Data Wrangler.

Visit the Import Data From Software as a Service (SaaS) Platforms documentation page to learn more about this feature, and follow the getting started guide to start aggregating and preparing SaaS application data with Amazon SageMaker Data Wrangler.

Happy building!
Donnie

Join the Preview – AWS Glue Data Quality

Post Syndicated from Jeff Barr original https://aws.amazon.com/blogs/aws/join-the-preview-aws-glue-data-quality/

Back in 1980, at my second professional programming job, I was working on a project that analyzed driver’s license data from a bunch of US states. At that time data of that type was generally stored in fixed-length records, with values carefully (or not) encoded into each field. Although we were given schemas for the data, we would invariably find that the developers had to resort to tricks in order to represent values that were not anticipated up front. For example, coding for someone with heterochromia, eyes of different colors. We ended up doing a full scan of the data ahead of our actual time-consuming and expensive analytics run in order to make sure that we were dealing with known data. This was my introduction to data quality, or the lack thereof.

AWS makes it easier for you to build data lakes and data warehouses at any scale. We want to make it easier than ever before for you to measure and maintain the desired quality level of the data that you ingest, process, and share.

Introducing AWS Glue Data Quality
Today I would like to tell you about AWS Glue Data Quality, a new set of features for AWS Glue that we are launching in preview form. It can analyze your tables and recommend a set of rules automatically based on what it finds. You can fine-tune those rules if necessary and you can also write your own rules. In this blog post I will show you a few highlights, and will save the details for a full post when these features progress from preview to generally available.

Each data quality rule references a Glue table or selected columns in a Glue table and checks for specific types of properties: timeliness, accuracy, integrity, and so forth. For example, a rule can indicate that a table must have the expected number of columns, that the column names match a desired pattern, and that a specific column is usable as a primary key.
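To make the three example rules concrete, here is a minimal plain-Python sketch of what each one checks. It is not Glue code and does not use the Glue rule language; the function names and the sample rows are hypothetical and exist only for illustration.

```python
import re


def check_column_count(rows, expected):
    """Every row must expose exactly `expected` columns."""
    return all(len(row) == expected for row in rows)


def check_column_names(rows, pattern):
    """Every column name must fully match the regular expression `pattern`."""
    regex = re.compile(pattern)
    return all(regex.fullmatch(name) is not None for row in rows for name in row)


def check_primary_key(rows, column):
    """`column` must be non-null and unique across all rows."""
    values = [row.get(column) for row in rows]
    return None not in values and len(set(values)) == len(values)


# Hypothetical sample rows; the third row reuses license_id 2 on purpose.
licenses = [
    {"license_id": 1, "state": "WA", "eye_color": "BLU"},
    {"license_id": 2, "state": "OR", "eye_color": "BRN"},
    {"license_id": 2, "state": "CA", "eye_color": "GRN"},
]

results = {
    "column_count": check_column_count(licenses, 3),
    "column_names": check_column_names(licenses, r"[a-z_]+"),
    "primary_key": check_primary_key(licenses, "license_id"),
}
print(results)
```

In this sample the column count and column name checks pass, and the primary key check fails because of the duplicate identifier.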

Getting Started
I can open the new Data quality tab on one of my Glue tables to get started. From there I can create a ruleset manually, or I can click Recommend ruleset to get started:

Then I enter a name for my Ruleset (RS1), choose an IAM Role that has permission to access it, and click Recommend ruleset:

My click initiates a Glue Recommendation task (a specialized type of Glue job) that scans the data and makes recommendations. Once the task has run to completion I can examine the recommendations:

I click Evaluate ruleset to check on the quality of my data.

The data quality task runs and I can examine the results:

In addition to creating Rulesets that are attached to tables, I can use them as part of a Glue job. I create my job as usual and then add an Evaluate Data Quality node:

Then I use the Data Quality Definition Language (DQDL) builder to create my rules. I can choose between 20 different rule types:

For this blog post, I made these rules more strict than necessary so that I could show you what happens when the data quality evaluation fails.

I can set the job options, and choose the original data or the data quality results as the output of the transform. I can also write the data quality results to an S3 bucket:

After I have created my Ruleset, I set any other desired options for the job, save it, and then run it. After the job completes I can find the results in the Data quality tab. Because I made some overly strict rules, the evaluation correctly flagged my data with a 0% score:
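As a rough illustration of how a ruleset can end up at 0%, here is a small sketch that assumes the score is simply the share of rules that pass. That formula is my assumption for this example, and the rule labels and outcomes are hypothetical rather than taken from the job above.

```python
def quality_score(rule_outcomes):
    """Return the percentage of rules that passed (0.0 for an empty ruleset)."""
    if not rule_outcomes:
        return 0.0
    passed = sum(1 for ok in rule_outcomes.values() if ok)
    return 100.0 * passed / len(rule_outcomes)


# Hypothetical outcomes: deliberately strict rules that the data cannot meet.
strict_rules = {
    "RowCount > 1000000": False,
    'IsComplete "middle_name"': False,
}

# The same checks with one threshold relaxed.
relaxed_rules = {
    "RowCount > 10": True,
    'IsComplete "middle_name"': False,
}

print(quality_score(strict_rules))   # every rule failed
print(quality_score(relaxed_rules))  # one of two rules passed
```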

There’s a lot more, but I will save that for the next blog post!

Things to Know
Preview Regions – This is an open preview and you can access it today in the US East (Ohio, N. Virginia), US West (Oregon), Asia Pacific (Tokyo), and Europe (Ireland) AWS Regions.

Pricing – Evaluating data quality consumes Glue Data Processing Units (DPU) in the same manner and at the same per-DPU pricing as any other Glue job.

Jeff;

New – Amazon Redshift Integration with Apache Spark

Post Syndicated from Channy Yun original https://aws.amazon.com/blogs/aws/new-amazon-redshift-integration-with-apache-spark/

Apache Spark is an open-source, distributed processing system commonly used for big data workloads. Spark application developers working in Amazon EMR, Amazon SageMaker, and AWS Glue often use third-party Apache Spark connectors that allow them to read and write the data with Amazon Redshift. These third-party connectors are not regularly maintained, supported, or tested with various versions of Spark for production.

Today we are announcing the general availability of Amazon Redshift integration for Apache Spark, which makes it easy to build and run Spark applications on Amazon Redshift and Redshift Serverless, enabling customers to open up the data warehouse for a broader set of AWS analytics and machine learning (ML) solutions.

With Amazon Redshift integration for Apache Spark, you can get started in seconds and effortlessly build Apache Spark applications in a variety of languages, such as Java, Scala, and Python.

Your applications can read from and write to your Amazon Redshift data warehouse without compromising on the performance of the applications or the transactional consistency of the data, and they benefit from performance improvements through pushdown optimizations.

Amazon Redshift integration for Apache Spark builds on an existing open source connector project and enhances it for performance and security, helping customers gain up to 10x faster application performance. We thank the original contributors on the project who collaborated with us to make this happen. As we make further enhancements we will continue to contribute back into the open source project.

Getting Started with Spark Connector for Amazon Redshift
To get started, you can go to AWS analytics and ML services, use data frame or Spark SQL code in a Spark job or Notebook to connect to the Amazon Redshift data warehouse, and start running queries in seconds.

In this launch, Amazon EMR 6.9, EMR Serverless, and AWS Glue 4.0 come with the pre-packaged connector and JDBC driver, and you can just start writing code. EMR 6.9 provides a sample notebook, and EMR Serverless provides a sample Spark Job too.

First, you should set AWS Identity and Access Management (AWS IAM) authentication between Redshift and Spark, between Amazon Simple Storage Service (Amazon S3) and Spark, and between Redshift and Amazon S3. The following diagram describes the authentication between Amazon S3, Redshift, the Spark driver, and Spark executors.

For more information, see Identity and access management in Amazon Redshift in the AWS documentation.

Amazon EMR
If you already have an Amazon Redshift data warehouse and the data available, you can create the database user and provide the right level of grants to the database user. To use this with Amazon EMR, you need to upgrade to Amazon EMR 6.9, which includes the packaged spark-redshift connector. Select the emr-6.9.0 release when you create an EMR cluster on Amazon EC2.

You can use EMR Serverless to create your Spark application using the emr-6.9.0 release to run your workload.

EMR Studio also provides an example Jupyter Notebook configured to connect to an Amazon Redshift Serverless endpoint leveraging sample data that you can use to get started quickly.

Here is a Scala example to build your applications with both Spark Dataframe and Spark SQL. Use IAM-based credentials for connecting to Redshift and use an IAM role for unloading and loading data from S3.

import org.apache.spark.sql.functions.col

// Create the JDBC connection URL and define the Redshift context
// (tempS3Dir and roleARN are assumed to be String values defined earlier)
val jdbcURL = "jdbc:redshift:iam://<RedshiftEndpoint>:<Port>/<Database>?DbUser=<RsUser>"
val rsOptions = Map(
  "url" -> jdbcURL,
  "tempdir" -> tempS3Dir,
  "aws_iam_role" -> roleARN
)
// Reference the sales table from Redshift
val sales_df = spark
  .read
  .format("io.github.spark_redshift_community.spark.redshift")
  .options(rsOptions)
  .option("dbtable", "sales")
  .load()
sales_df.createOrReplaceTempView("sales")
// Reference the date table from Redshift using Data Frame
val date_df = spark
  .read
  .format("io.github.spark_redshift_community.spark.redshift")
  .options(rsOptions)
  .option("dbtable", "date")
  .load()
// Show the total sales on a given date using the Data Frame API
sales_df.join(date_df, sales_df("dateid") === date_df("dateid"))
  .where(col("caldate") === "2008-01-05")
  .groupBy().sum("qtysold")
  .select(col("sum(qtysold)"))
  .show()
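If you prefer Python, the same connection options can be assembled as a plain dictionary. The following is a minimal sketch: the helper name, endpoint, bucket, and role ARN are hypothetical placeholders, and only the option keys and the IAM-based JDBC URL format come from the Scala example above.

```python
def redshift_options(endpoint, port, database, db_user, temp_s3_dir, role_arn):
    """Build the option map shown in the Scala example (IAM-based JDBC URL)."""
    jdbc_url = f"jdbc:redshift:iam://{endpoint}:{port}/{database}?DbUser={db_user}"
    return {
        "url": jdbc_url,
        "tempdir": temp_s3_dir,
        "aws_iam_role": role_arn,
    }


# All values below are hypothetical placeholders.
options = redshift_options(
    endpoint="example-cluster.abc123.us-east-1.redshift.amazonaws.com",
    port=5439,
    database="dev",
    db_user="awsuser",
    temp_s3_dir="s3://example-bucket/tmp/",
    role_arn="arn:aws:iam::123456789012:role/ExampleRedshiftRole",
)
print(options["url"])

# With an active SparkSession named `spark`, the options could be used like this:
# sales_df = (
#     spark.read
#     .format("io.github.spark_redshift_community.spark.redshift")
#     .options(**options)
#     .option("dbtable", "sales")
#     .load()
# )
```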

If Amazon Redshift and Amazon EMR are in different VPCs, you have to configure VPC peering or enable cross-VPC access. Assuming both Amazon Redshift and Amazon EMR are in the same virtual private cloud (VPC), you can create a Spark job or Notebook and connect to the Amazon Redshift data warehouse and write Spark code to use the Amazon Redshift connector.

To learn more, see Use Spark on Amazon Redshift with a connector in the AWS documentation.

AWS Glue
When you use AWS Glue 4.0, the spark-redshift connector is available both as a source and target. In Glue Studio, you can use a visual ETL job to read or write to a Redshift data warehouse simply by selecting a Redshift connection to use within a built-in Redshift source or target node.

The Redshift connection contains Redshift connection details along with the credentials needed to access Redshift with the proper permissions.

To get started, choose Jobs in the left menu of the Glue Studio console. Using either of the Visual modes, you can easily add and edit a source or target node and define a range of transformations on the data without writing any code.

Choose Create and you can easily add and edit a source, target node, and the transform node in the job diagram. At this time, you will choose Amazon Redshift as Source and Target.

Once completed, the Glue job can be executed on Glue for the Apache Spark engine, which will automatically use the latest spark-redshift connector.

The following Python script shows an example job that reads from and writes to Redshift with a DynamicFrame using the spark-redshift connector.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

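# Placeholder values for illustration; replace them with your own settings
dbtable = "<Schema>.<Table>"
redshiftTmpDir = "s3://<Bucket>/<Prefix>/"
aws_iam_role = "arn:aws:iam::<AccountId>:role/<RoleName>"

print("================ DynamicFrame Read ===============")
url = "jdbc:redshift://<RedshiftEndpoint>:<Port>/dev"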
read_options = {
    "url": url,
    "dbtable": dbtable,
    "redshiftTmpDir": redshiftTmpDir,
    "tempdir": redshiftTmpDir,
    "aws_iam_role": aws_iam_role,
    "autopushdown": "true",
    "include_column_list": "false"
}

redshift_read = glueContext.create_dynamic_frame.from_options(
    connection_type="redshift",
    connection_options=read_options
) 

print("================ DynamicFrame Write ===============")

write_options = {
    "url": url,
    "dbtable": dbtable,
    "user": "awsuser",
    "password": "Password1",
    "redshiftTmpDir": redshiftTmpDir,
    "tempdir": redshiftTmpDir,
    "aws_iam_role": aws_iam_role,
    "autopushdown": "true",
    "DbUser": "awsuser"
}

print("================ dyf write result: check redshift table ===============")
redshift_write = glueContext.write_dynamic_frame.from_options(
    frame=redshift_read,
    connection_type="redshift",
    connection_options=write_options
)

# Mark the job run as complete
job.commit()

When you set up your job detail, you can only use the Glue 4.0 – Supports spark 3.3 Python 3 version for this integration.

To learn more, see Creating ETL jobs with AWS Glue Studio and Using connectors and connections with AWS Glue Studio in the AWS documentation.

Gaining the Best Performance
In the Amazon Redshift integration for Apache Spark, the Spark connector automatically applies predicate and query pushdown to optimize for performance. You can gain additional performance by unloading data in Parquet format, which is the connector's default unload format in this integration.

As the following sample code shows, the Spark connector turns supported functions into a SQL query and runs the query in Amazon Redshift.

import sqlContext.implicits._
val sample = sqlContext.read
.format("io.github.spark_redshift_community.spark.redshift")
.option("url",jdbcURL )
.option("tempdir", tempS3Dir)
.option("unload_s3_format", "PARQUET")
.option("dbtable", "event")
.load()

// Create temporary views for data frames created earlier so they can be accessed via Spark SQL
sales_df.createOrReplaceTempView("sales")
date_df.createOrReplaceTempView("date")
// Show the total sales on a given date using Spark SQL API
spark.sql(
"""SELECT sum(qtysold)
| FROM sales, date
| WHERE sales.dateid = date.dateid
| AND caldate = '2008-01-05'""".stripMargin).show()

Amazon Redshift integration for Apache Spark adds pushdown capabilities for operations such as sort, aggregate, limit, join, and scalar functions so that only the relevant data is moved from the Redshift data warehouse to the consuming Spark application, thereby improving performance.
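To see why pushdown reduces data movement, here is a small self-contained sketch that uses Python's built-in sqlite3 module as a stand-in for the data warehouse. It is an analogy only and does not use the Spark connector; the table contents are hypothetical, and the date table is named date_dim here to keep the example unambiguous.

```python
import sqlite3

# An in-memory database stands in for the warehouse in this sketch.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE date_dim (dateid INTEGER PRIMARY KEY, caldate TEXT);
    CREATE TABLE sales (salesid INTEGER PRIMARY KEY, dateid INTEGER, qtysold INTEGER);
    INSERT INTO date_dim VALUES (1, '2008-01-04'), (2, '2008-01-05');
    INSERT INTO sales VALUES (1, 1, 4), (2, 2, 3), (3, 2, 5), (4, 1, 1);
    """
)


def without_pushdown(conn):
    """Fetch both tables in full, then join, filter, and aggregate on the client."""
    sales = conn.execute("SELECT salesid, dateid, qtysold FROM sales").fetchall()
    dates = conn.execute("SELECT dateid, caldate FROM date_dim").fetchall()
    wanted = {dateid for dateid, caldate in dates if caldate == "2008-01-05"}
    total = sum(qty for _, dateid, qty in sales if dateid in wanted)
    return total, len(sales) + len(dates)


def with_pushdown(conn):
    """Let the database run the join, filter, and aggregate; fetch only the result."""
    rows = conn.execute(
        """
        SELECT sum(qtysold)
        FROM sales JOIN date_dim ON sales.dateid = date_dim.dateid
        WHERE caldate = '2008-01-05'
        """
    ).fetchall()
    return rows[0][0], len(rows)


print(without_pushdown(conn))  # (total, rows moved to the client)
print(with_pushdown(conn))
```

Both functions compute the same total, but the second one moves a single row to the client instead of every row of both tables.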

Available Now
The Amazon Redshift integration for Apache Spark is now available in all Regions that support Amazon EMR 6.9, AWS Glue 4.0, and Amazon Redshift. You can start using the feature directly from EMR 6.9 and Glue Studio 4.0 with the new Spark 3.3.0 version.

Give it a try, and please send us feedback either in the AWS re:Post for Amazon Redshift or through your usual AWS support contacts.

Channy

Scale AWS SDK for pandas workloads with AWS Glue for Ray

Post Syndicated from Abdel Jaidi original https://aws.amazon.com/blogs/big-data/scale-aws-sdk-for-pandas-workloads-with-aws-glue-for-ray/

AWS SDK for pandas is an open-source library that extends the popular Python pandas library, enabling you to connect to AWS data and analytics services using pandas data frames. We’ve seen customers use the library in combination with pandas for both data engineering and AI workloads. Although pandas data frames are simple to use, they have a limitation on the size of data that can be processed. Because pandas is single-threaded, jobs are bounded by the available resources. If the data you need to process is small, this won’t be a problem, and pandas makes analysis and manipulation simple, as well as interactions with many other tools that support machine learning (ML) and visualization. However, as your data size scales, you may run into problems. This can be especially frustrating if you’ve created a promising prototype that can’t be moved to production. In our work with customers, we’ve seen many projects, both in data science and data engineering, that are stuck while they wait for someone to rewrite using a big data framework such as Apache Spark.

We are excited to announce that AWS SDK for pandas now supports Ray and Modin, enabling you to scale your pandas workflows from a single machine to a multi-node environment, with no code changes. The simplest way to do this is to use AWS Glue with Ray, the new serverless option to run distributed Python code announced at AWS re:Invent 2022. AWS SDK for pandas also supports self-managed Ray on Amazon Elastic Compute Cloud (Amazon EC2).

In this post, we show you how you can use pandas to connect to AWS data and analytics services and manipulate data at scale by running on an AWS Glue with Ray job.

Overview of solution

Ray is a unified framework that enables you to scale AI and Python applications. The goal of the project is to take any Python code that’s written on a laptop and scale the workload on a cluster. This innovative framework opens the door to big data processing to a new audience. Previously, the only way to process large datasets on a cluster was to use tools such as Apache Hadoop, Apache Spark, or Apache Flink. These frameworks require additional skills because they provide their own programming model and often require languages such as Scala or Java to fully take advantage of the advanced capabilities. With Ray, you can just use Python to parallelize your code with few modifications.

Although Ray opens the door to big data processing, it’s not enough on its own to distribute pandas-specific methods. That task falls to Modin, a drop-in replacement of pandas, optimized to run in a distributed environment, such as Ray. Modin has the same API as pandas, so you can keep your code the same, but it parallelizes workloads to improve performance.

With today’s announcement, AWS SDK for pandas customers can use both Ray and Modin for their workloads. You have the option of loading data into Modin data frames, instead of regular pandas data frames. By configuring the library to use Ray and Modin, your existing data processing scripts can distribute operations end-to-end, with no code changes. AWS SDK for pandas takes care of parallelizing the read and write operations for your files across the compute cluster.

To use this feature, you can install the release candidate version of awswrangler with the ray and modin extras:

pip install "awswrangler[modin,ray]==3.0.0rc2"

Once installed, you can use the library in your code by importing it with the following statement:

import awswrangler as wr

When you run this code, the SDK for pandas looks for an environment variable called WR_ADDRESS. If it finds it, it uses this value to send the commands to a remote cluster. If it doesn’t find it, it starts a local Ray runtime on your machine.
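The decision described above can be sketched in a few lines of plain Python. This is only an illustration of the documented behavior, not the library's internal code; the function name and the example address are hypothetical.

```python
import os


def resolve_ray_target(environ=None):
    """Mimic the described lookup: a remote cluster if WR_ADDRESS is set, else local."""
    environ = os.environ if environ is None else environ
    address = environ.get("WR_ADDRESS")
    if address:
        return ("remote", address)
    return ("local", None)


# Hypothetical address of a remote Ray cluster.
print(resolve_ray_target({"WR_ADDRESS": "ray://10.0.0.1:10001"}))
print(resolve_ray_target({}))
```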

The following diagram shows what is happening when you run code that uses AWS SDK for pandas to read data from Amazon Simple Storage Service (Amazon S3) into a Modin data frame, perform a filtering operation, and write the data back to Amazon S3, using a multi-node cluster.

In the first phase, each node reads one or more input files and stores them in memory as blocks. During this phase, the head node builds a mapping reference that tracks the location of each block on the worker nodes. In the second phase, a filter operation is submitted to each node, creating a subset of the data. Finally, each worker node writes its blocks to Amazon S3.
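The three phases can be simulated on a single machine with plain Python data structures. The following sketch is a toy model under stated assumptions: files are assigned to nodes round-robin, the node and file names are hypothetical, and nothing here calls Ray, Modin, or Amazon S3.

```python
from collections import defaultdict


def read_phase(files, nodes):
    """Phase 1: assign files to nodes (round-robin here) and record block locations."""
    node_blocks = defaultdict(dict)  # node -> {block_id: rows}
    block_locations = {}             # mapping kept by the head node: block_id -> node
    for i, (name, rows) in enumerate(sorted(files.items())):
        node = nodes[i % len(nodes)]
        node_blocks[node][name] = rows
        block_locations[name] = node
    return node_blocks, block_locations


def filter_phase(node_blocks, predicate):
    """Phase 2: every node filters its own blocks; no data moves between nodes."""
    return {
        node: {block: [r for r in rows if predicate(r)] for block, rows in blocks.items()}
        for node, blocks in node_blocks.items()
    }


def write_phase(node_blocks, prefix):
    """Phase 3: every node writes its own blocks (modeled as a dict of output keys)."""
    return {
        f"{prefix}{node}/{block}": rows
        for node, blocks in node_blocks.items()
        for block, rows in blocks.items()
    }


# Hypothetical input files holding star ratings.
files = {"part-0": [5, 3], "part-1": [5, 5], "part-2": [1]}
blocks, locations = read_phase(files, ["worker-1", "worker-2"])
five_star = filter_phase(blocks, lambda rating: rating == 5)
written = write_phase(five_star, "out/")
print(locations)
print(written)
```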

It’s important to note that certain data frame operations (for example groupby or join) may result in the data being shuffled across nodes. Shuffling will also happen if you do partitioned or bucketed writes. This tends to slow down the job because data needs to move between nodes.
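To get a feel for the cost of a shuffle, the sketch below counts how many rows would have to change nodes so that all rows with the same key end up together. The ownership rule (keys dealt to nodes round-robin in sorted order) and the sample data are assumptions made for this example and do not describe how Ray or Modin place data.

```python
def rows_to_shuffle(node_rows, key):
    """Count rows that sit on a node other than the one that owns their key."""
    nodes = sorted(node_rows)
    keys = sorted({row[key] for rows in node_rows.values() for row in rows})
    # Assumption: each key is owned by one node, assigned round-robin in sorted order.
    owner = {k: nodes[i % len(nodes)] for i, k in enumerate(keys)}
    return sum(
        1
        for node, rows in node_rows.items()
        for row in rows
        if owner[row[key]] != node
    )


# Hypothetical cluster state: both marketplaces appear on both workers.
cluster = {
    "worker-1": [{"marketplace": "US"}, {"marketplace": "UK"}],
    "worker-2": [{"marketplace": "US"}, {"marketplace": "UK"}],
}
moved = rows_to_shuffle(cluster, "marketplace")
print(moved)
```

A filter needs no such movement because each node can decide row by row, which is why the earlier filtering step stays local to each node.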

If you want to create your own Ray cluster on Amazon EC2, refer to the tutorial Distributing Calls on Ray Remote Cluster. The rest of this post shows you how to run AWS SDK for pandas and Modin on an AWS Glue with Ray job.

Use AWS Glue with Ray

Because AWS Glue with Ray is a fully managed environment, it’s a simple way to run jobs. Both AWS SDK for pandas and Modin are pre-loaded, so you don’t need to worry about cluster management or installing the right set of dependencies, and the job auto scales with your workload. To get started, complete the following steps:

  1. Choose Launch Stack to provision an AWS CloudFormation stack in your AWS account:
    Note that while in preview, AWS Glue with Ray is available in a limited set of AWS Regions. The stack takes about 3 minutes to complete. You can verify that everything was successfully deployed by checking that the CloudFormation stack shows the status CREATE_COMPLETE.
  2. Navigate to AWS Glue Studio to find an AWS Glue job named GlueRayJob with the following script.
  3. Choose Run to start the job and navigate to the Runs tab to monitor progress.

Here, we break down the script and show you what happens at each stage when we run this code on AWS Glue with Ray. First, we import the library:

import awswrangler as wr

At import, AWS SDK for pandas detects if the runtime supports Ray, and automatically initializes a Ray cluster with the default parameters. In this case, because we’re running on AWS Glue with Ray, AWS SDK for pandas automatically uses the Ray cluster with no extra configuration needed. Advanced users can override this process, however, by starting the Ray runtime before the import command.

Next, we read Amazon product data in Parquet format from Amazon S3 and load it into a distributed Modin data frame:

# Read Parquet data (1.2 GB Parquet compressed)
df = wr.s3.read_parquet(
    path=f"s3://amazon-reviews-pds/parquet/product_category={category.title()}/",
)

Simple data transformations on the data frame are applied next. Modin data frames implement the same interface as pandas data frames, allowing you to perform familiar pandas operations at scale. First, we drop the customer_id column, then we filter for a subset of the reviews that received five-star ratings:

# Drop the customer_id column
df.drop("customer_id", axis=1, inplace=True)

# Filter reviews with 5-star rating
df5 = df[df["star_rating"] == 5]

The data is written back to Amazon S3 in Parquet format, partitioned by year and marketplace. With dataset=True and the database and table arguments set, an associated Hive table is also created in the AWS Glue Data Catalog:

# Write partitioned five-star reviews to S3 in Parquet format
wr.s3.to_parquet(
    df5,
    path=f"s3://{bucket_name}/{category}/",
    partition_cols=["year", "marketplace"],
    dataset=True, 
    database=glue_database,
    table=glue_table, 
)
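Partitioned writes use a Hive-style layout, where each partition column becomes a column=value folder under the base path. The following plain-Python sketch shows how rows map to those prefixes; the helper names, bucket, and rows are hypothetical, and the library handles this layout for you.

```python
def partition_prefix(base_path, row, partition_cols):
    """Return the Hive-style prefix (col=value/...) under which a row is stored."""
    parts = "/".join(f"{col}={row[col]}" for col in partition_cols)
    return f"{base_path.rstrip('/')}/{parts}/"


def group_by_partition(rows, base_path, partition_cols):
    """Group rows by the prefix they would be written to."""
    groups = {}
    for row in rows:
        groups.setdefault(partition_prefix(base_path, row, partition_cols), []).append(row)
    return groups


# Hypothetical five-star reviews.
reviews = [
    {"review_id": "R1", "year": 2015, "marketplace": "US"},
    {"review_id": "R2", "year": 2015, "marketplace": "UK"},
    {"review_id": "R3", "year": 2015, "marketplace": "US"},
]
layout = group_by_partition(reviews, "s3://example-bucket/toys/", ["year", "marketplace"])
for prefix, rows in sorted(layout.items()):
    print(prefix, [r["review_id"] for r in rows])
```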

Finally, a query is run in Amazon Athena, and the S3 objects resulting from this operation are read in parallel into a Modin data frame:

# Read the data back to a Modin df via Athena
df5_athena = wr.athena.read_sql_query(
    f"SELECT * FROM {glue_table}",
    database=glue_database,
    ctas_approach=False, 
    unload_approach=True, 
    workgroup=workgroup_name,
    s3_output=f"s3://{bucket_name}/unload/{category}/",
)

The Amazon CloudWatch logs of the job provide insights into the performance achieved from reading blocks in parallel in a multi-node Ray cluster.

For simplicity, this example showcased Amazon S3 and Athena APIs only, but AWS SDK for pandas supports other services, including Amazon Timestream and Amazon Redshift. For a full list of the APIs that support distribution, refer to Supported APIs.

Clean up AWS resources

To prevent unwanted charges to your AWS account, you can delete the AWS resources that you used for this example:

  1. On the Amazon S3 console, empty data from both buckets with prefix glue-ray-.
  2. On the AWS CloudFormation console, delete the SDKPandasOnGlueRay stack.

The resources created as part of the stack are automatically deleted with it.

Conclusion

In this post, we demonstrated how you can run your workloads at scale using AWS SDK for pandas. When used in combination with AWS Glue with Ray, this gives you access to a fully managed environment to distribute your Python scripts. We hope this solution can help with migrating your existing pandas jobs to achieve higher performance and speedups across multiple data stores on AWS.

For more examples, check out the tutorials in the AWS SDK for pandas documentation.


About the Authors

Abdel Jaidi is a Senior Cloud Engineer for AWS Professional Services. He works on open-source projects focused on AWS Data & Analytics services. In his spare time, he enjoys playing tennis and hiking.

Anton Kukushkin is a Data Engineer for AWS Professional Services based in London, United Kingdom. He works with AWS customers, helping them build and scale their data and analytics.

Leon Luttenberger is a Data Engineer for AWS Professional Services based in Austin, Texas. He works on AWS open-source solutions that help our customers analyze their data at scale.

Lucas Hanson is a Senior Cloud Engineer for AWS Professional Services. He focuses on helping customers with infrastructure management and DevOps processes for data management solutions. Outside of work, he enjoys music production and practicing yoga.

New AWS Glue 4.0 – New and Updated Engines, More Data Formats, and More

Post Syndicated from Jeff Barr original https://aws.amazon.com/blogs/aws/new-aws-glue-4-0-new-and-updated-engines-more-data-formats-and-more/

AWS Glue is a scalable, serverless tool that helps you to accelerate the development and execution of your data integration and ETL workloads. Today we are launching Glue 4.0, with updated engines, support for additional data formats, Ray support, and a lot more.

Before I dive in, just a word about versioning. Unlike most AWS services, where the service team owns and has full control over the APIs, Glue includes a collection of libraries, engines, and tools developed by the open source community. Some of these components do not maintain strict backward compatibility, often in pursuit of efficiency. In order to make sure that changes to the components do not impact your Glue jobs, you must select a particular Glue version when you create the job.

Each version of Glue includes performance and reliability benefits in addition to the added features, and you should plan to upgrade your jobs over time to take advantage of all that Glue has to offer.
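For example, when a job is created through the API, the version is pinned with the GlueVersion field. The sketch below only builds the request as a dictionary; the job name, role ARN, and script location are hypothetical placeholders, and the boto3 call is left commented out because it needs AWS credentials.

```python
def glue_job_definition(name, role_arn, script_location, glue_version="4.0"):
    """Build the arguments for a Spark ETL job pinned to a specific Glue version."""
    return {
        "Name": name,
        "Role": role_arn,
        "Command": {
            "Name": "glueetl",
            "ScriptLocation": script_location,
            "PythonVersion": "3",
        },
        "GlueVersion": glue_version,
    }


# All values below are hypothetical placeholders.
job = glue_job_definition(
    name="example-etl-job",
    role_arn="arn:aws:iam::123456789012:role/ExampleGlueRole",
    script_location="s3://example-bucket/scripts/job.py",
)
print(job["GlueVersion"])

# With credentials configured, the job could then be created like this:
# import boto3
# boto3.client("glue").create_job(**job)
```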

Dive in to Glue
Let’s take a look at what’s new in Glue 4.0:

Updated Engines – This version of Glue includes Python 3.10 and Apache Spark 3.3.0. Both engines include bug fixes and performance enhancements; Spark includes new features such as row-level runtime filtering, improved error messages, additional built-in functions, and much more. Glue and Amazon EMR make use of the same optimized Spark runtime, which has been optimized to run in the AWS cloud and can be 2-3 times faster than the basic open source version.

New Engine Plugins – Glue 4.0 adds native support for the Cloud Shuffle Service Plugin for Spark to help you scale your disk usage, and Adaptive Query Execution to dynamically optimize your queries as they run.

Pandas Support – Pandas is an open source data analysis and manipulation tool that is built on top of Python. It is easy to learn and includes all kinds of interesting and useful data manipulation functions.

New Data Formats – Whether you are building a data lake or a data warehouse, Glue 4.0 now handles new open source data formats for sources and targets, with support for Apache Hudi, Apache Iceberg, and Delta Lake. To learn more about these new options and formats, read Get Started with Apache Hudi using AWS Glue by Implementing Key Design Concepts.

Everything Else – In addition to the above items, Glue 4.0 also includes the Parquet vectorized reader, with support for additional data types and encodings. It has been upgraded to use log4j 2 and is no longer dependent on log4j 1.

Available Now
Glue 4.0 is available today in the US East (Ohio, N. Virginia), US West (N. California, Oregon), Africa (Cape Town), Asia Pacific (Hong Kong, Jakarta, Mumbai, Osaka, Seoul, Singapore, Sydney, Tokyo), Canada (Central), Europe (Frankfurt, Ireland, London, Milan, Paris, Stockholm), Middle East (Bahrain), and South America (Sao Paulo) AWS Regions.

Jeff;

Introducing AWS Glue for Ray: Scaling your data integration workloads using Python

Post Syndicated from Zach Mitchell original https://aws.amazon.com/blogs/big-data/introducing-aws-glue-for-ray-scaling-your-data-integration-workloads-using-python/

AWS Glue is a serverless data integration service that makes it simple to discover, prepare, move, and integrate data from multiple sources for analytics, machine learning (ML), and application development. Today, AWS Glue processes customer jobs using either Apache Spark’s distributed processing engine for large workloads or Python’s single-node processing engine for smaller workloads. Customers like Python for its ease of use and rich collection of built-in data-processing libraries but might find it difficult to scale Python beyond a single compute node. This limitation makes it difficult for customers to process large datasets. Customers want a solution that allows them to continue using familiar Python tools and AWS Glue jobs on data sets of all sizes, even those that can’t fit on a single instance.

We are happy to announce the release of a new AWS Glue job type: Ray. Ray is an open-source unified compute framework that makes it simple to scale AI and Python workloads. Ray started as an open-source project at RISELab in UC Berkeley. If your application is written in Python, you can scale it with Ray in a distributed cluster in a multi-node environment. Ray is Python native, and you can combine it with the AWS SDK for pandas to prepare, integrate, and transform your data for your data analytics and ML workloads.

This post provides an introduction to AWS Glue for Ray and shows you how to start using Ray to distribute your Python workloads.

What is AWS Glue for Ray?

Customers like the serverless experience and fast start time offered by AWS Glue. With the introduction of Ray, we have ensured that you get the same experience. We have also ensured that you can use the AWS Glue job and AWS Glue interactive session primitives to access the Ray engine. AWS Glue jobs are fire-and-forget systems where customers submit their Ray code to the AWS Glue jobs API and AWS Glue automatically provisions the required compute resources and runs the job. AWS Glue interactive session APIs allow interactive exploration of the data for the purpose of job development. Regardless of the option used, you are only billed for the duration of the compute used. With AWS Glue for Ray, we are also introducing a new Graviton2-based worker (Z.2x), which offers 8 virtual CPUs and 64 GB of RAM.

AWS Glue for Ray consists of two major components:

  1. Ray Core – The distributed computing framework
  2. Ray Dataset – The distributed data framework based on Apache Arrow

When running a Ray job, AWS Glue provisions the Ray cluster for you and runs these distributed Python jobs on a serverless auto-scaling infrastructure. The cluster in AWS Glue for Ray consists of exactly one head node and one or more worker nodes.

The head node is identical to the other worker nodes with the exception that it runs singleton processes for cluster management and the Ray driver process.  The driver is a special worker process in the head node that runs the top-level application in Python that starts the Ray job.  The worker node has processes that are responsible for submitting and running tasks.

The following figure provides a simple introduction to the Ray architecture.  The architecture illustrates how Ray is able to schedule jobs through processes called Raylets.  The Raylet manages the shared resources on each node and is shared between the concurrently running jobs.  For more information on how Ray works, see Ray.io.

The following figure shows the components of the worker node and the shared-memory object store:

There is a Global Control Store in the head node that treats each separate machine as a node, similar to how Apache Spark treats workers as nodes. The following figure shows the components of the head node and the Global Control Store managing the cluster-level metadata.

AWS Glue for Ray comes with Ray Core, Ray Dataset, Modin (distributed pandas), and the AWS SDK for pandas (on Modin) for seamless distributed integration with other AWS services. Ray Core is the foundation of Ray and the basic framework for distributing Python functions and classes. Ray Dataset is a distributed data framework based on Apache Arrow and is most closely analogous to a dataframe in Apache Spark. Modin is a library designed to distribute pandas applications across a Ray cluster without any modification and is compatible with data in Ray Datasets. The included AWS SDK for pandas (formerly AWS Data Wrangler) is an abstraction layer on top of Modin that allows you to create pandas dataframes from (and write to) many AWS sources such as Amazon Simple Storage Service (Amazon S3), Amazon Redshift, Amazon DynamoDB, Amazon OpenSearch Service, and others.

You can also install your own ARM-compatible Python libraries via pip, either through Ray’s runtime environment configuration in @ray.remote or via --additional-python-modules.

To learn more about Ray, please visit the GitHub repo.

Why use AWS Glue for Ray?

Many of us start our data journey on AWS with Python, looking to prepare data for ML and data science, and move data at scale with AWS APIs and Boto3. Ray allows you to bring those familiar skills, paradigms, frameworks and libraries to AWS Glue and make them scale to handle massive datasets with minimal code changes. You can use the same data processing tools you currently have (such as Python libraries for data cleansing, computation, and ML) on datasets of all sizes. AWS Glue for Ray enables the distributed run of your Python scripts over multi-node clusters.

AWS Glue for Ray is designed for the following:

  • Task parallel applications (for example, when you want to apply multiple transforms in parallel)
  • Speeding up your Python workload as well as using Python native libraries
  • Running the same workload across hundreds of data sources
  • ML ingestion and parallel batch inference on data

Solution overview

For this post, you will use the Parquet Amazon Customer Reviews Dataset stored in the public S3 bucket. The objective is to perform transformations using the Ray dataset and then write it back to Amazon S3 in the Parquet file format.

Configure Amazon S3

The first step is to create an Amazon S3 bucket to store the transformed Parquet dataset as the end result.

  1. On the Amazon S3 console, choose Buckets in the navigation pane.
  2. Choose Create bucket.
  3. For Bucket name, enter a name for your Amazon S3 bucket.
  4. Choose Create.

Set up a Jupyter notebook with an AWS Glue interactive session

For our development environment, we use a Jupyter notebook to run the code.

You need to either install AWS Glue interactive sessions locally or run interactive sessions with an AWS Glue Studio notebook. Using AWS Glue interactive sessions will help you follow and run the series of demonstration steps.

Refer to Getting started with AWS Glue interactive sessions for instructions to spin up a notebook on an AWS Glue interactive session.

Run your code using Ray in a Jupyter notebook

This section walks you through several notebook paragraphs on how to use AWS Glue for Ray. In this exercise, we look at the customer reviews from the Amazon Customer Review Parquet dataset, perform some Ray transformations, and write the results to Amazon S3 in a Parquet format.

  1. On the Jupyter console, under New, choose Glue Python.
  2. Signify you want to use Ray as the engine by using the %glue_ray magic.
  3. Import the Ray library along with additional Python libraries:
    %glue_ray
    
    import ray
    import pandas
    import pyarrow
    from ray import data
    import time
    from ray.data import ActorPoolStrategy

  4. Initialize a Ray Cluster with AWS Glue.
    ray.init('auto')

  5. Next, we read a single partition from the dataset, which is in Parquet file format:
    start = time.time()
    ds = ray.data.read_parquet("s3://amazon-reviews-pds/parquet/product_category=Wireless/")
    end = time.time()
    print(f"Reading the data to dataframe: {end - start} seconds")

  6. Parquet files store the number of rows per file in the metadata, so we can get the total number of records in ds without performing a full data read:
    ds.count()

  7. Next, we can check the schema of this dataset. We don’t have to read the actual data to get the schema; we can read it from the metadata:
    ds.schema()

  8. We can check the total size in bytes for the full Ray dataset:
    # Calculate the size in bytes of the full dataset. For Parquet files, this size is pulled
    # from the Parquet metadata (without triggering a data read).
    ds.size_bytes()

  9. We can see a sample record from the Ray dataset:
    #Show sample records from the underlying Parquet dataset  
    start = time.time()
    ds.show(1)
    end = time.time()
    print(f"Time taken to show the data from dataframe : {end - start} seconds")

Applying dataset transformations with Ray

There are primarily two types of transformations that can be applied to Ray datasets:

  • One-to-one transformations – Each input block contributes to only one output block, such as add_column(), map_batches(), and drop_columns().
  • All-to-all transformations – Input blocks can contribute to multiple output blocks, such as sort() and groupby().
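
To make the distinction concrete, here is a minimal sketch in plain Python (no Ray required). It models blocks as ordinary lists, which is an assumption made purely for illustration; `one_to_one` and `all_to_all_sort` are hypothetical helper names, not Ray APIs.

```python
from typing import Callable, List

Block = List[int]  # stand-in for a dataset block (a plain Python list here)


def one_to_one(blocks: List[Block], fn: Callable[[int], int]) -> List[Block]:
    """Apply fn inside each block; output block i depends only on input block i."""
    return [[fn(x) for x in block] for block in blocks]


def all_to_all_sort(blocks: List[Block]) -> List[Block]:
    """Globally sort all rows; any input block may feed any output block."""
    if not blocks:
        return []
    rows = sorted(x for block in blocks for x in block)
    size = -(-len(rows) // len(blocks))  # ceiling division: rows per output block
    return [rows[i * size:(i + 1) * size] for i in range(len(blocks))]


blocks = [[5, 1], [4, 2], [3, 0]]
scaled = one_to_one(blocks, lambda x: x * 10)
ordered = all_to_all_sort(blocks)
```

In the one-to-one case each block can be processed independently, whereas the sort has to see rows from every block before it can emit any output block, which is why all-to-all operations tend to involve moving data between workers.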

In the next series of steps we will apply some of these transformations on our resultant Ray datasets from the previous section.

  1. We can add a new column and check the schema to verify the newly added column, followed by retrieving a sample record. This transformation is only available for the datasets that can be converted to pandas format.
    # Add the given new column to the dataset and show the sample record after adding a new column
    
    start = time.time()
    ds = ds.add_column("helpful_votes_ratio", lambda df: df["helpful_votes"] / df["total_votes"])
    end = time.time()
    print(f"Time taken to add a new column : {end - start} seconds")
    ds.show(1)

  2. Let’s drop a few columns we don’t need using a drop_columns transformation and then check the schema to verify if those columns are dropped from the Ray dataset:
    # Drop a few columns from the underlying dataset
    start = time.time()
    ds = ds.drop_columns(["review_body", "vine", "product_parent", "verified_purchase", "review_headline"])
    end = time.time()
    print(f"Time taken to drop a few columns : {end - start} seconds")
    ds.schema()


    Ray datasets have built-in transformations such as sorting the dataset by the specified key column or key function.

  3. Next, we apply the sort transformation using one of the columns present in the dataset (total_votes):
    # Sort the dataset by total votes
    start = time.time()
    ds = ds.sort("total_votes")
    end = time.time()
    print(f"Time taken for sort operation : {end - start} seconds")
    ds.show(3)

  4. Next, we create a Python UDF, which lets you write customized business logic in transformations. Our UDF finds the products that are rated low (that is, total votes less than 100). We create the UDF as a function on pandas DataFrame batches. For the supported input batch formats, see the UDF Input Batch Format. We also demonstrate using map_batches(), which applies the given function to batches of records of this dataset. map_batches() uses the default compute strategy (tasks), which distributes the data processing across multiple Ray workers that run the tasks. For more information on the map_batches() transformation, see the following documentation.
    # UDF as a function on pandas DataFrame - To Find products with total_votes < 100 
    def low_rated_products(df: pandas.DataFrame) -> pandas.DataFrame:
        return df[(df["total_votes"] < 100)]
        
    # Keep only the products that are rated low, that is, with fewer than 100 total votes.
    # This technique is called batch inference processing with Ray tasks (the default compute strategy).
    ds = ds.map_batches(low_rated_products)
    
    # See a sample record for the products that are rated low
    ds.show(1)

    # Count the total number of products that are rated low
    ds.count()

  5. If you have complex transformations that require more resources for data processing, we recommend utilizing Ray actors using additional configurations with applicable transformations. We have demonstrated with map_batches() below:
    # Batch inference processing with Ray actors. Autoscale the actors between 2 and 4.
    
    class LowRatedProducts:
        def __init__(self):
            self._model = low_rated_products
    
        def __call__(self, batch: pandas.DataFrame) -> pandas.DataFrame:
            return self._model(batch)
    
    start = time.time()
    predicted = ds.map_batches(
        LowRatedProducts, compute=ActorPoolStrategy(2, 4), batch_size=4)
    end = time.time()
    print(f"Time taken for actor-based map_batches : {end - start} seconds")
    

  6. Next, before writing the final resultant Ray dataset, we apply map_batches() transformations to keep only the customer reviews where the total votes for a given product is greater than 0 and the review belongs to the “US” marketplace. Using map_batches() for the filter operation performs better than the filter() transformation.
    # Filter out records with total_votes == 0
    ds = ds.map_batches(lambda df: df[df["total_votes"] > 0])
    
    # Filter and select records with marketplace equals US only
    ds = ds.map_batches(lambda df: df[df["marketplace"] == 'US'])
    
    ds.count()

  7. Finally, we write the resultant data to the S3 bucket you created in a Parquet file format. You can use different dataset APIs available, such as write_csv() or write_json() for different file formats.  Additionally, you can convert the resultant dataset to another DataFrame type such as Mars, Modin or pandas.
    ds.write_parquet("s3://<your-own-s3-bucket>/manta/Output/Raydemo/")

Clean up

To avoid incurring future charges, delete the Amazon S3 bucket and Jupyter notebook.

  1. On the Amazon S3 console, choose Buckets.
  2. Choose the bucket you created.
  3. Choose Empty and enter your bucket name.
  4. Choose Confirm.
  5. Choose Delete and enter your bucket name.
  6. Choose Delete bucket.
  7. On the AWS Glue console, choose Interactive Sessions.
  8. Choose the interactive session you created.
  9. Choose Delete to remove the interactive session.

Conclusion

In this post, we demonstrated how you can use AWS Glue for Ray to run your Python code in a distributed environment.  You can now run your data and ML applications in a multi-node environment.

Refer to the Ray documentation for additional information and use cases.


About the authors

Zach Mitchell is a Sr. Big Data Architect. He works within the product team to enhance understanding between product engineers and their customers while guiding customers through their journey to develop data lakes and other data solutions on AWS analytics services.

Ishan Gaur works as a Sr. Big Data Cloud Engineer (ETL) specialized in AWS Glue. He’s passionate about helping customers build out scalable distributed ETL workloads and implement scalable data processing and analytics pipelines on AWS. When not at work, Ishan likes to cook, travel with his family, or listen to music.

Derek Liu is a Solutions Architect on the Enterprise team based out of Vancouver, BC.  He is part of the AWS Analytics field community and enjoys helping customers solve big data challenges through AWS analytic services.

Kinshuk Pahare is a Principal Product Manager on AWS Glue.

Optimize your modern data architecture for sustainability: Part 2 – unified data governance, data movement, and purpose-built analytics

Post Syndicated from Sam Mokhtari original https://aws.amazon.com/blogs/architecture/optimize-your-modern-data-architecture-for-sustainability-part-2-unified-data-governance-data-movement-and-purpose-built-analytics/

In the first part of this blog series, Optimize your modern data architecture for sustainability: Part 1 – data ingestion and data lake, we focused on the 1) data ingestion, and 2) data lake pillars of the modern data architecture. In this blog post, we will provide guidance and best practices to optimize the components within the 3) unified data governance, 4) data movement, and 5) purpose-built analytics pillars.
Figure 1 shows the different pillars of the modern data architecture. It includes data ingestion, data lake, unified data governance, data movement, and purpose-built analytics pillars.

Figure 1. Modern Data Analytics Reference Architecture on AWS

3. Unified data governance

A centralized Data Catalog is responsible for storing business and technical metadata about datasets in the storage layer. Administrators apply permissions in this layer and track events for security audits.

Data discovery

To increase data sharing and reduce data movement and duplication, enable data discovery and well-defined access controls for different user personas. This reduces redundant data processing activities. Separate teams within an organization can rely on this central catalog. It provides first-party data (such as sales data) or third-party data (such as stock prices or climate change datasets). You’ll only need to access data once, rather than having to pull from the source repeatedly.

AWS Glue Data Catalog can simplify the process for adding and searching metadata. Use AWS Glue crawlers to update the existing schemas and discover new datasets. Carefully plan schedules to reduce unnecessary crawling.

Data sharing

Establish well-defined access control mechanisms for different data consumers using services such as AWS Lake Formation. This will enable datasets to be shared between organizational units with fine-grained access control, which reduces redundant copying and movement. Use Amazon Redshift data sharing to avoid copying the data across data warehouses.

Well-defined datasets

Create well-defined datasets and associated metadata to avoid unnecessary data wrangling and manipulation. This will reduce resource usage that might result from additional data manipulation.

4. Data movement

AWS Glue provides serverless, pay-per-use data movement capability, without having to stand up and manage servers or clusters. Set up ETL pipelines that can process tens of terabytes of data.

To minimize idle resources without sacrificing performance, use auto scaling for AWS Glue.

You can create and share AWS Glue workflows for similar use cases by using AWS Glue blueprints, rather than creating an AWS Glue workflow for each use case. AWS Glue job bookmarks can track previously processed data so that it isn’t reprocessed.

Consider using Glue Flex Jobs for non-urgent or non-time sensitive data integration workloads such as pre-production jobs, testing, and one-time data loads. With Flex, AWS Glue jobs run on spare compute capacity instead of dedicated hardware.
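
As a rough sketch of how a pipeline might choose between the two execution classes, the helper below only builds the keyword arguments for the AWS Glue `start_job_run` API, whose `ExecutionClass` parameter accepts `FLEX` or `STANDARD`. The function name, the `urgent` flag, and the job name are hypothetical and exist only for illustration.

```python
from typing import Any, Dict


def build_job_run_request(job_name: str, urgent: bool) -> Dict[str, Any]:
    """Build start_job_run arguments, using Flex for non-urgent workloads."""
    if not job_name:
        raise ValueError("job_name must not be empty")
    return {
        "JobName": job_name,
        # Flex runs on spare capacity; Standard is for time-sensitive jobs.
        "ExecutionClass": "STANDARD" if urgent else "FLEX",
    }


# Hypothetical job name for a one-time data load.
request = build_job_run_request("nightly-backfill", urgent=False)
```

With boto3 installed and credentials configured, the request could then be submitted with `boto3.client("glue").start_job_run(**request)`.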

Joining several dataframes is a common operation in Spark jobs. To reduce shuffling of data between nodes, use broadcast joins when one of the joined dataframes is small enough to be duplicated on all the executing nodes.
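
The idea behind a broadcast join can be sketched in plain Python. This is a conceptual model, not Spark code: partitions are modeled as lists and the small table as a dictionary, and `broadcast_hash_join` is a hypothetical name. In PySpark, the equivalent hint is the `broadcast()` function in `pyspark.sql.functions`.

```python
from typing import Dict, List, Tuple

Row = Tuple[str, int]          # (key, value) from the large side
Joined = Tuple[str, int, str]  # (key, value, attribute from the small side)


def broadcast_hash_join(
    partitions: List[List[Row]], small_table: Dict[str, str]
) -> List[List[Joined]]:
    """Join each partition of the large side against a full copy of the small table.

    Every partition is processed independently, so rows of the large side
    never move between partitions; only the small table is copied.
    """
    joined = []
    for partition in partitions:  # in Spark, each executor does this locally
        lookup = dict(small_table)  # the "broadcast" copy held by this worker
        joined.append(
            [(key, value, lookup[key]) for key, value in partition if key in lookup]
        )
    return joined


large = [[("us", 1), ("ca", 2)], [("us", 3), ("mx", 4)]]
small = {"us": "United States", "ca": "Canada"}
result = broadcast_hash_join(large, small)
```

The output keeps the same partitioning as the large input, which is the property that avoids the shuffle.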

The latest AWS Glue version provides more new and efficient features for your workload.

5. Purpose-built analytics

Data Processing modes

Real-time data processing options need continuous computing resources and consume more energy. For the most favorable sustainability impact, evaluate trade-offs and choose the optimal batch data processing option.

Identify the batch and interactive workload requirements and design transient clusters in Amazon EMR. Using Spot Instances and configuring instance fleets can maximize utilization.

To improve energy efficiency, Amazon EMR Serverless can help you avoid over- or under-provisioning resources for your data processing jobs. Amazon EMR Serverless automatically determines the resources that the application needs, gathers these resources to process your jobs, and releases the resources when the jobs finish.

Amazon Redshift RA3 nodes can improve compute efficiency. With RA3 nodes, you can scale compute up and down without having to scale storage. You can choose Amazon Redshift Serverless to intelligently scale data warehouse capacity. This will deliver faster performance for the most demanding and unpredictable workloads.

Energy efficient transformation and data model design

Data processing and data modeling best practices can reduce your organization’s environmental impact.

To avoid unnecessary data movement between nodes in an Amazon Redshift cluster, follow best practices for table design.

You can also use automatic table optimization (ATO) for Amazon Redshift to self-tune tables based on usage patterns.

Use the EXPLAIN feature in Amazon Athena or Amazon Redshift to tune and optimize the queries.

The Amazon Redshift Advisor provides specific, tailored recommendations to optimize the data warehouse based on performance statistics and operations data.

Consider migrating Amazon EMR or Amazon OpenSearch Service to a more power-efficient processor such as AWS Graviton. AWS Graviton 3 delivers 2.5–3 times better performance over other CPUs. Graviton 3-based instances use up to 60% less energy for the same performance than comparable EC2 instances.

Minimize idle resources

Use auto scaling features in EMR Clusters or employ Amazon Kinesis Data Streams On-Demand to minimize idle resources without sacrificing performance.

AWS Trusted Advisor can help you identify underutilized Amazon Redshift Clusters. Pause Amazon Redshift clusters when not in use and resume when needed.

Energy efficient consumption patterns

Consider querying the data in place with Amazon Athena or Amazon Redshift Spectrum for one-off analysis, rather than copying the data to Amazon Redshift.

Enable a caching layer for frequent queries as needed. This is in addition to the result caching that comes built-in with services such as Amazon Redshift. Also, use Amazon Athena Query Result Reuse for every query where the source data doesn’t change frequently.
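
As a minimal sketch, the helper below builds the arguments for the Athena `start_query_execution` API with result reuse enabled through `ResultReuseConfiguration`. The function name, the one-hour default, and the database and bucket names in the usage lines are assumptions for illustration only.

```python
from typing import Any, Dict


def build_reusable_query_request(
    query: str, database: str, output_location: str, max_age_minutes: int = 60
) -> Dict[str, Any]:
    """Build start_query_execution arguments that allow cached results to be reused."""
    if max_age_minutes < 0:
        raise ValueError("max_age_minutes must be non-negative")
    return {
        "QueryString": query,
        "QueryExecutionContext": {"Database": database},
        "ResultConfiguration": {"OutputLocation": output_location},
        "ResultReuseConfiguration": {
            "ResultReuseByAgeConfiguration": {
                "Enabled": True,
                # Reuse a previous result only if it is at most this old.
                "MaxAgeInMinutes": max_age_minutes,
            }
        },
    }


# Hypothetical database and results bucket.
request = build_reusable_query_request(
    "SELECT COUNT(*) FROM sales", "analytics_db", "s3://example-results-bucket/athena/"
)
```

With boto3 available, the request could be passed to `boto3.client("athena").start_query_execution(**request)`. Choose the maximum age based on how often the source data changes.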

Use materialized views capabilities available in Amazon Redshift or Amazon Aurora Postgres to avoid unnecessary computation.

Use federated queries across data stores powered by Amazon Athena federated query or Amazon Redshift federated query to reduce data movement. For querying across separate Amazon Redshift clusters, consider using Amazon Redshift data sharing feature that decreases data movement between these clusters.

Track and assess improvement for environmental sustainability

The optimal way to evaluate success in optimizing your workloads for sustainability is to use proxy measures and unit-of-work KPIs, such as GB per transaction for storage or vCPU minutes per transaction for compute.
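
For example, a compute KPI of this kind could be calculated as follows. This is a sketch with made-up numbers; the function name and the sample figures are illustrative assumptions, not measurements.

```python
def vcpu_minutes_per_transaction(
    vcpus: int, runtime_minutes: float, transactions: int
) -> float:
    """Return vCPU minutes consumed per unit of work (transaction)."""
    if transactions <= 0:
        raise ValueError("transactions must be positive")
    return vcpus * runtime_minutes / transactions


# Illustrative figures: the same 12,000 transactions before and after tuning a job.
before = vcpu_minutes_per_transaction(vcpus=8, runtime_minutes=30, transactions=12_000)
after = vcpu_minutes_per_transaction(vcpus=8, runtime_minutes=24, transactions=12_000)
improvement_pct = (before - after) / before * 100
```

Tracking the ratio rather than raw resource usage keeps the metric meaningful when the volume of work changes between measurement periods.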

In Table 1, we list certain metrics you could collect on analytics services as proxies to measure improvement. These fall under each pillar of the modern data architecture covered in this post.

Pillar Metrics
Unified data governance
Data movement
Purpose-built Analytics

Table 1. Metrics for the Modern data architecture pillars

Conclusion

In this blog post, we provided best practices to optimize processes under the unified data governance, data movement, and purpose-built analytics pillars of modern architecture.

If you want to learn more, check out the Sustainability Pillar of the AWS Well-Architected Framework and other blog posts on architecting for sustainability.

If you are looking for more architecture content, refer to the AWS Architecture Center for reference architecture diagrams, vetted architecture solutions, Well-Architected best practices, patterns, icons, and more.

How GoDaddy built a data mesh to decentralize data ownership using AWS Lake Formation

Post Syndicated from Ankit Jhalaria original https://aws.amazon.com/blogs/big-data/how-godaddy-built-a-data-mesh-to-decentralize-data-ownership-using-aws-lake-formation/

This is a guest post co-written with Ankit Jhalaria from GoDaddy.

GoDaddy is empowering everyday entrepreneurs by providing all the help and tools to succeed online. With more than 20 million customers worldwide, GoDaddy is the place people come to name their idea, build a professional website, attract customers, and manage their work.

GoDaddy is a data-driven company, and getting meaningful insights from data helps them drive business decisions to delight their customers. In 2018, GoDaddy began a large infrastructure revamp and partnered with AWS to innovate faster than ever before to meet the needs of its customer growth around the world. As part of this revamp, the GoDaddy Data Platform team wanted to set the company up for long-term success by creating a well-defined data strategy and setting goals to decentralize the ownership and processing of data.

In this post, we discuss how GoDaddy uses AWS Lake Formation to simplify security management and data governance at scale, and enable data as a service (DaaS) supporting organization-wide data accessibility with cross-account data sharing using a data mesh architecture.

The challenge

In the vast ocean of data, deriving useful insights is an art. Prior to the AWS partnership, GoDaddy had a shared Hadoop cluster on premises that various teams used to create and share datasets with other analysts for collaboration. As the teams grew, copies of data started to grow in the Hadoop Distributed File System (HDFS). Several teams started to build tooling to manage this challenge independently, duplicating efforts. Managing permissions on these data assets became harder. Making data discoverable across a growing number of data catalogs and systems had started to become a big challenge. Although storage is relatively inexpensive these days, having several copies of the same data asset makes it harder for analysts to efficiently and reliably use the data available to them. Business analysts need robust pipelines on key datasets that they rely upon to make business decisions.

Solution overview

In GoDaddy’s data mesh hub and spoke model, a central data catalog contains information about all the data products that exist in the company. In AWS terminology, this is the AWS Glue Data Catalog. The data platform team provides APIs, SDKs, and Airflow Operators as components that different teams use to interact with the catalog. Activities such as updating the metastore to reflect a new partition for a given data product, and occasionally running MSCK repair operations, are all handled in the central governance account, and Lake Formation is used to secure access to the Data Catalog.

The data platform team introduced a layer of data governance that ensures best practices for building data products are followed throughout the company. We provide the tooling to support data engineers and business analysts while leaving the domain experts to run their data pipelines. With this approach, we have well-curated data products that are intuitive and easy to understand for our business analysts.

A data product refers to an entity that powers insights for analytical purposes. In simple terms, this could refer to an actual dataset pointing to a location in Amazon Simple Storage Service (Amazon S3). Data producers are responsible for the processing of data and creating new snapshots or partitions depending on the business needs. In some cases, data is refreshed every 24 hours, and in other cases, every hour. Data consumers come to the data mesh to consume data, and permissions are managed in the central governance account through Lake Formation. Lake Formation uses AWS Resource Access Manager (AWS RAM) to send resource shares to different consumer accounts so that they can access the data from the central governance account. We go into details about this functionality later in the post.

The following diagram illustrates the solution architecture.

Solution architecture illustrated

Defining metadata with the central schema repository

Data is only useful if end-users can derive meaningful insights from it—otherwise, it’s just noise. As part of onboarding with the data platform, a data producer registers their schema with the data platform along with relevant metadata. This is reviewed by the data governance team that ensures best practices for creating datasets are followed. We have automated some of the most common data governance review items. This is also the place where producers define a contract about reliable data deliveries, often referred to as Service Level Objective (SLO). After a contract is in place, the data platform team’s background processes monitor and send out alerts when data producers fail to meet their contract or SLO.

When managing permissions with Lake Formation, you register the Amazon S3 location of different S3 buckets. Lake Formation uses AWS RAM to share the named resource.

When managing resources with AWS RAM, the central governance account creates AWS RAM shares. The data platform provides a custom AWS Service Catalog product to accept AWS RAM shares in consumer accounts.

Having consistent schemas with meaningful names and descriptions makes the discovery of datasets easy. Every data producer who is a domain expert is responsible for creating well-defined schemas that business users use to generate insights to make key business decisions. Data producers register their schemas along with additional metadata with the data lake repository. Metadata includes information about the team responsible for the dataset, such as their SLO contract, description, and contact information. This information gets checked into a Git repository where automation kicks in and validates the request to make sure it conforms to standards and best practices. We use AWS CloudFormation templates to provision resources. The following code is a sample of what the registration metadata looks like.

Sample code of what the registration metadata looks like

As part of the registration process, automation steps run in the background to take care of the following on behalf of the data producer:

  • Register the producer’s Amazon S3 location of the data with Lake Formation – This allows us to use Lake Formation for fine-grained access control on the table in the AWS Glue Data Catalog that refers to this location as well as on the underlying data.
  • Create the underlying AWS Glue database and table – Based on the schema specified by the data producer along with the metadata, we create the underlying AWS Glue database and table in the central governance account. As part of this, we also use table properties of AWS Glue to store additional metadata to use later for analysis.
  • Define the SLO contract – Any business-critical dataset needs to have a well-defined SLO contract. As part of dataset registration, the data producer defines a contract with a cron expression that gets used by the data platform to create an event rule in Amazon EventBridge. This rule triggers an AWS Lambda function to watch for deliveries of the data and triggers an alert to the data producer’s Slack channel if they breach the contract.
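
The SLO check described in the last item can be sketched as a small function of the kind a scheduled Lambda function might call. This is not GoDaddy’s implementation: the contract is simplified to a maximum allowed age for the latest delivery, and `slo_breached`, `build_alert`, and the dataset name are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def slo_breached(
    last_delivery: Optional[datetime], now: datetime, max_age: timedelta
) -> bool:
    """Return True when no delivery has arrived within the allowed window."""
    if last_delivery is None:
        return True
    return now - last_delivery > max_age


def build_alert(
    dataset: str, last_delivery: Optional[datetime], now: datetime, max_age: timedelta
) -> Optional[str]:
    """Return an alert message for the producer, or None if the SLO is met."""
    if not slo_breached(last_delivery, now, max_age):
        return None
    seen = last_delivery.isoformat() if last_delivery else "never"
    return f"SLO breach for {dataset}: last delivery {seen}, allowed age {max_age}"


now = datetime(2022, 12, 1, 12, 0, tzinfo=timezone.utc)
on_time = build_alert("orders_daily", now - timedelta(hours=20), now, timedelta(hours=24))
late = build_alert("orders_daily", now - timedelta(hours=30), now, timedelta(hours=24))
```

In a real setup the resulting message would be posted to the producer’s notification channel; that integration is left out here.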

Consuming data from the data mesh catalog

When a data consumer belonging to a given line of business (LOB) identifies the data product that they’re interested in, they submit a request to the central governance team containing their AWS account ID that they use to query the data. The data platform provides a portal to discover datasets across the company. After the request is approved, automation runs to create an AWS RAM share with the consumer account covering the AWS Glue database and tables mapped to the data product registered in the AWS Glue Data Catalog of the central governance account.

The following screenshot shows an example of a resource share.

Example of a resource share

The consumer data lake admin needs to accept the AWS RAM share and create a resource link in Lake Formation to start querying the shared dataset within their account. We automated this process by building an AWS Service Catalog product that runs in the consumer’s account as a Lambda function that accepts shares on behalf of consumers.
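
A minimal sketch of the share-acceptance step follows. It is not GoDaddy’s Lambda code; it only assumes a client object exposing the AWS RAM operations `get_resource_share_invitations` and `accept_resource_share_invitation`, such as the one returned by `boto3.client("ram")`. The function name is hypothetical, and creating the resource link is out of scope here.

```python
from typing import Any, Dict, List


def accept_pending_shares(ram_client: Any) -> List[str]:
    """Accept every pending AWS RAM invitation and return the accepted ARNs."""
    accepted: List[str] = []
    kwargs: Dict[str, str] = {}
    while True:
        page = ram_client.get_resource_share_invitations(**kwargs)
        for invitation in page.get("resourceShareInvitations", []):
            if invitation.get("status") != "PENDING":
                continue  # skip invitations already accepted, rejected, or expired
            arn = invitation["resourceShareInvitationArn"]
            ram_client.accept_resource_share_invitation(resourceShareInvitationArn=arn)
            accepted.append(arn)
        token = page.get("nextToken")
        if not token:
            return accepted
        kwargs = {"nextToken": token}  # fetch the next page of invitations
```

Passing the client in as an argument keeps the function easy to exercise with a stub before running it against a real account.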

When the resource linked datasets are available in the consumer account, the consumer data lake admin provides grants to IAM users and roles mapping to data consumers within the account. These consumers (application or user persona) can now query the datasets using AWS analytics services of their choice like Amazon Athena and Amazon EMR based on the access privileges granted by the consumer data lake admin.

Day-to-day operations and metrics

Managing permissions using Lake Formation is one part of the overall ecosystem. After permissions have been granted, data producers create new snapshots of the data at a certain cadence that can vary from every 15 minutes to a day. Data producers are integrated with the data platform APIs that inform the platform about any new refreshes of the data. The data platform automatically writes a 0-byte _SUCCESS file for every dataset that gets refreshed, and notifies the subscribed consumer account via an Amazon Simple Notification Service (Amazon SNS) topic in the central governance account. Consumers use this as a signal to trigger their data pipelines and processes to start processing newer versions of the data utilizing an event-driven approach.
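
On the consumer side, the event-driven trigger could look roughly like the sketch below. The `Records[].Sns.Message` envelope is the standard shape of an SNS event delivered to AWS Lambda, but the message body is an assumption: the post does not describe the payload, so the `dataset` and `location` fields are hypothetical, as are the function name and sample values.

```python
import json
from typing import Any, Dict, List


def extract_refreshes(event: Dict[str, Any]) -> List[Dict[str, str]]:
    """Pull refreshed-dataset details out of an SNS-triggered Lambda event."""
    refreshes = []
    for record in event.get("Records", []):
        # SNS delivers the published message as a string; here we assume JSON.
        message = json.loads(record["Sns"]["Message"])
        refreshes.append(
            {"dataset": message["dataset"], "location": message["location"]}
        )
    return refreshes


# Hypothetical notification for one refreshed data product.
sample_event = {
    "Records": [
        {
            "Sns": {
                "Message": json.dumps(
                    {"dataset": "orders_daily", "location": "s3://example-bucket/orders/"}
                )
            }
        }
    ]
}
refreshes = extract_refreshes(sample_event)
```

Each returned entry could then be used to start the downstream pipeline for that dataset.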

There are over 2,000 data products built on the GoDaddy data mesh on AWS. Every day, there are thousands of updates to the AWS Glue metastore in the central data governance account. There are hundreds of data producers generating data every hour in a wide array of S3 buckets, and thousands of data consumers consuming data across a wide array of tools, including Athena, Amazon EMR, and Tableau from different AWS accounts.

Business outcomes

With the move to AWS, GoDaddy’s Data Platform team laid the foundations to build a modern data platform that has increased our velocity of building data products and delighting our customers. The data platform has successfully transitioned from a monolithic platform to a model where ownership of data has been decentralized. We accelerated the data platform adoption to over 10 lines of business and over 300 teams globally, and are successfully managing multiple petabytes of data spread across hundreds of accounts to help our business derive insights faster.

Conclusion

GoDaddy’s hub and spoke data mesh architecture built using Lake Formation simplifies security management and data governance at scale, to deliver data as a service supporting company-wide data accessibility. Our data mesh manages multiple petabytes of data across hundreds of accounts, enabling decentralized ownership of well-defined datasets with automation in place, which helps the business discover data assets quicker and derive business insights faster.

This post illustrates the use of Lake Formation to build a data mesh architecture that enables a DaaS model for a modernized enterprise data platform. For more information, see Design a data mesh architecture using AWS Lake Formation and AWS Glue.


About the Authors

Ankit Jhalaria is the Director of Engineering on the Data Platform at GoDaddy. He has over 10 years of experience working in big data technologies. Outside of work, Ankit loves hiking, playing board games, building IoT projects, and contributing to open-source projects.

Harsh Vardhan is an AWS Solutions Architect, specializing in Analytics. He has over 6 years of experience working in the field of big data and data science. He is passionate about helping customers adopt best practices and discover insights from their data.

Kyle Tedeschi is a Principal Solutions Architect at AWS. He enjoys helping customers innovate, transform, and become leaders in their respective domains. Outside of work, Kyle is an avid snowboarder, car enthusiast, and traveler.

Get started with data integration from Amazon S3 to Amazon Redshift using AWS Glue interactive sessions

Post Syndicated from Vikas Omer original https://aws.amazon.com/blogs/big-data/get-started-with-data-integration-from-amazon-s3-to-amazon-redshift-using-aws-glue-interactive-sessions/

Organizations are placing a high priority on data integration, especially to support analytics, machine learning (ML), business intelligence (BI), and application development initiatives. Data is growing exponentially and is generated by increasingly diverse data sources. Data integration becomes challenging when you process data at scale and take on the heavy lifting of managing the infrastructure it requires. This is one of the key reasons why organizations are constantly looking for easy-to-use and low-maintenance data integration solutions to move data from one location to another or to consolidate their business data from several sources into a centralized location to make strategic business decisions.

Most organizations use Spark for their big data processing needs. If you’re looking to simplify data integration, and don’t want the hassle of spinning up servers, managing resources, or setting up Spark clusters, we have the solution for you.

AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, ML, and application development. AWS Glue provides both visual and code-based interfaces to make data integration simple and accessible for everyone.

If you prefer a code-based experience and want to interactively author data integration jobs, we recommend interactive sessions. Interactive sessions is a recently launched AWS Glue feature that allows you to interactively develop AWS Glue processes, run and test each step, and view the results.

There are different options to use interactive sessions. You can create and work with interactive sessions through the AWS Command Line Interface (AWS CLI) and API. You can also use Jupyter-compatible notebooks to visually author and test your notebook scripts. Interactive sessions provide a Jupyter kernel that integrates almost anywhere that Jupyter does, including integrating with IDEs such as PyCharm, IntelliJ, and Visual Studio Code. This enables you to author code in your local environment and run it seamlessly on the interactive session backend. You can also start a notebook through AWS Glue Studio; all the configuration steps are done for you so that you can explore your data and start developing your job script after only a few seconds. When the code is ready, you can configure, schedule, and monitor job notebooks as AWS Glue jobs.
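
For the API route, a session request might look roughly like the following. This is a minimal sketch that only assembles the keyword arguments for the boto3 Glue client's create_session call and doesn't contact AWS. The session ID, role ARN, connection name, and worker settings are placeholder assumptions, not values prescribed by the service.

```python
# Sketch: build keyword arguments for boto3's glue.create_session().
# All concrete values below are illustrative placeholders.

def build_session_request(session_id, role_arn, workers=3, worker_type="G.1X",
                          glue_version="3.0", idle_timeout_minutes=60,
                          connections=None):
    """Return a dict of keyword arguments for glue.create_session()."""
    request = {
        "Id": session_id,
        "Role": role_arn,
        # "glueetl" selects a Spark session; PythonVersion is passed as a string.
        "Command": {"Name": "glueetl", "PythonVersion": "3"},
        "GlueVersion": glue_version,
        "NumberOfWorkers": workers,
        "WorkerType": worker_type,
        "IdleTimeout": idle_timeout_minutes,
    }
    if connections:
        # The API nests the list of connection names under a "Connections" key.
        request["Connections"] = {"Connections": list(connections)}
    return request


request = build_session_request(
    session_id="my-first-session",                                # hypothetical ID
    role_arn="arn:aws:iam::123456789012:role/ExampleGlueRole",    # placeholder ARN
    connections=["exampleConnection"],                            # hypothetical name
)
print(request)

# With boto3 installed and credentials configured, the request could be sent as:
#   import boto3
#   boto3.client("glue").create_session(**request)
```

Keeping the request as plain data makes it easy to review or version before anything is created in your account.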

If you haven’t tried AWS Glue interactive sessions before, this post is highly recommended. We work through a simple scenario where you might need to incrementally load data from Amazon Simple Storage Service (Amazon S3) into Amazon Redshift or transform and enrich your data before loading into Amazon Redshift. In this post, we use interactive sessions within an AWS Glue Studio notebook to load the NYC Taxi dataset into an Amazon Redshift Serverless cluster, query the loaded dataset, save our Jupyter notebook as a job, and schedule it to run using a cron expression. Let’s get started.

Solution overview

We walk you through the following steps:

  1. Set up an AWS Glue Jupyter notebook with interactive sessions.
  2. Use the notebook’s magics, including the AWS Glue connection and job bookmarks.
  3. Read data from Amazon S3, and transform and load it into Redshift Serverless.
  4. Save the notebook as an AWS Glue job and schedule it to run.

Prerequisites

For this walkthrough, we must complete the following prerequisites:

  1. Upload Yellow Taxi Trip Records data and the taxi zone lookup table datasets into Amazon S3. Steps to do that are listed in the next section.
  2. Prepare the necessary AWS Identity and Access Management (IAM) policies and roles to work with AWS Glue Studio Jupyter notebooks, interactive sessions, and AWS Glue.
  3. Create the AWS Glue connection for Redshift Serverless.

Upload datasets into Amazon S3

Download Yellow Taxi Trip Records data and taxi zone lookup table data to your local environment. For this post, we download the January 2022 data for yellow taxi trip records data in Parquet format. The taxi zone lookup data is in CSV format. You can also download the data dictionary for the trip record dataset.

  1. On the Amazon S3 console, create a bucket called my-first-aws-glue-is-project-<random number> in the us-east-1 Region to store the data.
    S3 bucket names must be unique across all AWS accounts in all the Regions.
  2. Create folders nyc_yellow_taxi and taxi_zone_lookup in the bucket you just created and upload the files you downloaded.
    Your folder structures should look like the following screenshots.
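
Because bucket names are global, a random suffix helps avoid collisions. The sketch below generates a candidate name with the prefix used in this post and checks it against a simplified subset of the S3 naming rules (3 to 63 characters; lowercase letters, digits, and hyphens; starting and ending with a letter or digit). The helper names are hypothetical, and the check can't tell you whether the name is actually available.

```python
import random
import re

BUCKET_PREFIX = "my-first-aws-glue-is-project"  # prefix used in this post

# Simplified subset of the S3 naming rules; dots are deliberately excluded.
_BUCKET_NAME_RE = re.compile(r"[a-z0-9][a-z0-9-]{1,61}[a-z0-9]")


def is_plausible_bucket_name(name):
    """Return True if the name passes the simplified naming check."""
    return _BUCKET_NAME_RE.fullmatch(name) is not None


def make_bucket_name(prefix=BUCKET_PREFIX, rng=None):
    """Append a random six-digit number to the prefix."""
    rng = rng or random.Random()
    return f"{prefix}-{rng.randint(100000, 999999)}"


def folder_uris(bucket):
    """S3 URIs of the two folders this walkthrough expects."""
    return {
        folder: f"s3://{bucket}/{folder}/"
        for folder in ("nyc_yellow_taxi", "taxi_zone_lookup")
    }


bucket = make_bucket_name()
print(bucket, is_plausible_bucket_name(bucket))
for folder, uri in folder_uris(bucket).items():
    print(folder, "->", uri)
```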

Prepare IAM policies and role

Let’s prepare the necessary IAM policies and role to work with AWS Glue Studio Jupyter notebooks and interactive sessions. To get started with notebooks in AWS Glue Studio, refer to Getting started with notebooks in AWS Glue Studio.

Create IAM policies for the AWS Glue notebook role

Create the policy AWSGlueInteractiveSessionPassRolePolicy with the following permissions:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": "arn:aws:iam::<AWS account ID>:role/AWSGlueServiceRole-GlueIS"
        }
    ]
}

This policy allows the AWS Glue notebook role to be passed to interactive sessions so that the same role can be used in both places. Note that AWSGlueServiceRole-GlueIS is the role that we create for the AWS Glue Studio Jupyter notebook in a later step. Next, create the policy AmazonS3Access-MyFirstGlueISProject with the following permissions:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::<your s3 bucket name>",
                "arn:aws:s3:::<your s3 bucket name>/*"
            ]
        }
    ]
}

This policy allows the AWS Glue notebook role to access data in the S3 bucket.
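
If you prefer to generate these policy documents rather than edit the placeholders by hand, a small script can fill in the account ID and bucket name. The sketch below reproduces the two documents above with Python's json module. The function names and the example account ID and bucket name are placeholders for illustration.

```python
import json

ROLE_NAME = "AWSGlueServiceRole-GlueIS"  # role name used in this post


def pass_role_policy(account_id, role_name=ROLE_NAME):
    """Policy document allowing iam:PassRole on the notebook role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "iam:PassRole",
                "Resource": f"arn:aws:iam::{account_id}:role/{role_name}",
            }
        ],
    }


def s3_read_policy(bucket_name):
    """Policy document allowing list and read access to one bucket."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["s3:GetObject", "s3:ListBucket"],
                "Resource": [
                    f"arn:aws:s3:::{bucket_name}",      # the bucket itself
                    f"arn:aws:s3:::{bucket_name}/*",    # objects in the bucket
                ],
            }
        ],
    }


# Placeholder values; substitute your own account ID and bucket name.
pass_role = pass_role_policy("123456789012")
s3_read = s3_read_policy("my-first-aws-glue-is-project-123456")
print(json.dumps(pass_role, indent=4))
print(json.dumps(s3_read, indent=4))
```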

Create an IAM role for the AWS Glue notebook

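Create a new AWS Glue role called AWSGlueServiceRole-GlueIS and attach the policies it needs, including the two you created in the previous section (AWSGlueInteractiveSessionPassRolePolicy and AmazonS3Access-MyFirstGlueISProject).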

Create the AWS Glue connection for Redshift Serverless

Now we’re ready to configure a Redshift Serverless security group to connect with AWS Glue components.

  1. On the Redshift Serverless console, open the workgroup you’re using.
    You can find all the namespaces and workgroups on the Redshift Serverless dashboard.
  2. Under Data access, choose Network and security.
  3. Choose the link for the Redshift Serverless VPC security group.
    You’re redirected to the Amazon Elastic Compute Cloud (Amazon EC2) console.
  4. In the Redshift Serverless security group details, under Inbound rules, choose Edit inbound rules.
  5. Add a self-referencing rule to allow AWS Glue components to communicate:
    1. For Type, choose All TCP.
    2. For Protocol, choose TCP.
    3. For Port range, include all ports.
    4. For Source, use the same security group as the group ID.
  6. Similarly, add the following outbound rules:
    1. A self-referencing rule with Type as All TCP, Protocol as TCP, Port range including all ports, and Destination as the same security group as the group ID.
    2. An HTTPS rule for Amazon S3 access. The s3-prefix-list-id value is required in the security group rule to allow traffic from the VPC to the Amazon S3 VPC endpoint.

If you don’t have an Amazon S3 VPC endpoint, you can create one on the Amazon Virtual Private Cloud (Amazon VPC) console.

You can check the value for s3-prefix-list-id on the Managed prefix lists page on the Amazon VPC console.
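
The security group rules described above can also be written down as data. The sketch below builds the permission structures that the EC2 API accepts for a self-referencing all-TCP rule and for an HTTPS rule that targets the Amazon S3 prefix list. The security group ID and prefix list ID are placeholders, the helper names are hypothetical, and nothing is sent to AWS.

```python
def self_referencing_tcp_rule(group_id):
    """All TCP ports, with the security group itself as source or destination."""
    return {
        "IpProtocol": "tcp",
        "FromPort": 0,
        "ToPort": 65535,
        "UserIdGroupPairs": [{"GroupId": group_id}],
    }


def https_to_prefix_list_rule(prefix_list_id):
    """HTTPS (port 443) traffic to the given managed prefix list."""
    return {
        "IpProtocol": "tcp",
        "FromPort": 443,
        "ToPort": 443,
        "PrefixListIds": [{"PrefixListId": prefix_list_id}],
    }


SECURITY_GROUP_ID = "sg-0123456789abcdef0"  # placeholder
S3_PREFIX_LIST_ID = "pl-0123abcd"           # placeholder; look up your own value

inbound_rules = [self_referencing_tcp_rule(SECURITY_GROUP_ID)]
outbound_rules = [
    self_referencing_tcp_rule(SECURITY_GROUP_ID),
    https_to_prefix_list_rule(S3_PREFIX_LIST_ID),
]
print(inbound_rules)
print(outbound_rules)

# With boto3 installed and credentials configured, these could be applied as:
#   import boto3
#   ec2 = boto3.client("ec2")
#   ec2.authorize_security_group_ingress(
#       GroupId=SECURITY_GROUP_ID, IpPermissions=inbound_rules)
#   ec2.authorize_security_group_egress(
#       GroupId=SECURITY_GROUP_ID, IpPermissions=outbound_rules)
```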

Next, go to the Connectors page on AWS Glue Studio and create a new JDBC connection called redshiftServerless to your Redshift Serverless cluster (unless one already exists). You can find the Redshift Serverless endpoint details under your workgroup’s General Information section. The connection setting looks like the following screenshot.

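
A JDBC connection needs a URL built from the workgroup endpoint. As a rough sketch, the helper below assembles a Redshift JDBC URL from a host name, port, and database. The function name and the example host are placeholders; copy the real endpoint from your workgroup page. The defaults (port 5439 and the dev database) are assumptions that match the database name used later in this post.

```python
def redshift_jdbc_url(endpoint_host, database="dev", port=5439):
    """Build a JDBC URL of the form jdbc:redshift://host:port/database."""
    if not endpoint_host or "/" in endpoint_host or ":" in endpoint_host:
        raise ValueError("pass the bare host name, without scheme, port, or path")
    return f"jdbc:redshift://{endpoint_host}:{port}/{database}"


# Placeholder endpoint for illustration only.
example_host = (
    "example-workgroup.123456789012.us-east-1.redshift-serverless.amazonaws.com"
)
url = redshift_jdbc_url(example_host)
print(url)
```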

Write interactive code on an AWS Glue Studio Jupyter notebook powered by interactive sessions

Now you can get started with writing interactive code using AWS Glue Studio Jupyter notebook powered by interactive sessions. Note that it’s a good practice to keep saving the notebook at regular intervals while you work through it.

  1. On the AWS Glue Studio console, create a new job.
  2. Select Jupyter Notebook and select Create a new notebook from scratch.
  3. Choose Create.
  4. For Job name, enter a name (for example, myFirstGlueISProject).
  5. For IAM Role, choose the role you created (AWSGlueServiceRole-GlueIS).
  6. Choose Start notebook job.
    After the notebook is initialized, you can see some of the available magics and a cell with boilerplate code. To view all the magics of interactive sessions, run %help in a cell to print a full list. With the exception of %%sql, running a cell of only magics doesn’t start a session, but sets the configuration for the session that starts when you run your first cell of code.
    For this post, we use the available magics to configure AWS Glue version 3.0, three G.1X workers, a 60-minute idle timeout, and an Amazon Redshift connection.
  7. Let’s enter the following magics into our first cell and run it:
    %glue_version 3.0
    %number_of_workers 3
    %worker_type G.1X
    %idle_timeout 60
    %connections redshiftServerless

    We get the following response:

    Welcome to the Glue Interactive Sessions Kernel
    For more information on available magic commands, please type %help in any new cell.
    
    Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
    Installed kernel version: 0.35 
    Setting Glue version to: 3.0
    Previous number of workers: 5
    Setting new number of workers to: 3
    Previous worker type: G.1X
    Setting new worker type to: G.1X
    Current idle_timeout is 2880 minutes.
    idle_timeout has been set to 60 minutes.
    Connections to be included:
    redshiftServerless

  8. Let’s run our first code cell (boilerplate code) to start an interactive notebook session within a few seconds:
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
      
    sc = SparkContext.getOrCreate()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)

    We get the following response:

    Authenticating with environment variables and user-defined glue_role_arn:arn:aws:iam::xxxxxxxxxxxx:role/AWSGlueServiceRole-GlueIS
    Attempting to use existing AssumeRole session credentials.
    Trying to create a Glue session for the kernel.
    Worker Type: G.1X
    Number of Workers: 3
    Session ID: 7c9eadb1-9f9b-424f-9fba-d0abc57e610d
    Applying the following default arguments:
    --glue_kernel_version 0.35
    --enable-glue-datacatalog true
    --job-bookmark-option job-bookmark-enable
    Waiting for session 7c9eadb1-9f9b-424f-9fba-d0abc57e610d to get into ready status...
    Session 7c9eadb1-9f9b-424f-9fba-d0abc57e610d has been created

  9. Next, read the NYC yellow taxi data from the S3 bucket into an AWS Glue dynamic frame:
    nyc_taxi_trip_input_dyf = glueContext.create_dynamic_frame.from_options(
        connection_type = "s3", 
        connection_options = {
            "paths": ["s3://<your-s3-bucket-name>/nyc_yellow_taxi/"]
        }, 
        format = "parquet",
        transformation_ctx = "nyc_taxi_trip_input_dyf"
    )

    Let’s count the number of rows, look at the schema and a few rows of the dataset.

  10. Count the rows with the following code:
    nyc_taxi_trip_input_df = nyc_taxi_trip_input_dyf.toDF()
    nyc_taxi_trip_input_df.count()

    We get the following response:

    2463931

  11. View the schema with the following code:
    nyc_taxi_trip_input_df.printSchema()

    We get the following response:

    root
     |-- VendorID: long (nullable = true)
     |-- tpep_pickup_datetime: timestamp (nullable = true)
     |-- tpep_dropoff_datetime: timestamp (nullable = true)
     |-- passenger_count: double (nullable = true)
     |-- trip_distance: double (nullable = true)
     |-- RatecodeID: double (nullable = true)
     |-- store_and_fwd_flag: string (nullable = true)
     |-- PULocationID: long (nullable = true)
     |-- DOLocationID: long (nullable = true)
     |-- payment_type: long (nullable = true)
     |-- fare_amount: double (nullable = true)
     |-- extra: double (nullable = true)
     |-- mta_tax: double (nullable = true)
     |-- tip_amount: double (nullable = true)
     |-- tolls_amount: double (nullable = true)
     |-- improvement_surcharge: double (nullable = true)
     |-- total_amount: double (nullable = true)
     |-- congestion_surcharge: double (nullable = true)
     |-- airport_fee: double (nullable = true)

  12. View a few rows of the dataset with the following code:
    nyc_taxi_trip_input_df.show(5)

    We get the following response:

    +--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
    |VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
    +--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
    |       2| 2022-01-18 15:04:43|  2022-01-18 15:12:51|            1.0|         1.13|       1.0|                 N|         141|         229|           2|        7.0|  0.0|    0.5|       0.0|         0.0|                  0.3|        10.3|                 2.5|        0.0|
    |       2| 2022-01-18 15:03:28|  2022-01-18 15:15:52|            2.0|         1.36|       1.0|                 N|         237|         142|           1|        9.5|  0.0|    0.5|      2.56|         0.0|                  0.3|       15.36|                 2.5|        0.0|
    |       1| 2022-01-06 17:49:22|  2022-01-06 17:57:03|            1.0|          1.1|       1.0|                 N|         161|         229|           2|        7.0|  3.5|    0.5|       0.0|         0.0|                  0.3|        11.3|                 2.5|        0.0|
    |       2| 2022-01-09 20:00:55|  2022-01-09 20:04:14|            1.0|         0.56|       1.0|                 N|         230|         230|           1|        4.5|  0.5|    0.5|      1.66|         0.0|                  0.3|        9.96|                 2.5|        0.0|
    |       2| 2022-01-24 16:16:53|  2022-01-24 16:31:36|            1.0|         2.02|       1.0|                 N|         163|         234|           1|       10.5|  1.0|    0.5|       3.7|         0.0|                  0.3|        18.5|                 2.5|        0.0|
    +--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
    only showing top 5 rows

  13. Now, read the taxi zone lookup data from the S3 bucket into an AWS Glue dynamic frame:
    nyc_taxi_zone_lookup_dyf = glueContext.create_dynamic_frame.from_options(
        connection_type = "s3", 
        connection_options = {
            "paths": ["s3://<your-s3-bucket-name>/taxi_zone_lookup/"]
        }, 
        format = "csv",
        format_options= {
            'withHeader': True
        },
        transformation_ctx = "nyc_taxi_zone_lookup_dyf"
    )

    Let’s count the number of rows, look at the schema and a few rows of the dataset.

  14. Count the rows with the following code:
    nyc_taxi_zone_lookup_df = nyc_taxi_zone_lookup_dyf.toDF()
    nyc_taxi_zone_lookup_df.count()

    We get the following response:

    265

  15. View the schema with the following code:
    nyc_taxi_zone_lookup_df.printSchema()

    We get the following response:

    root
     |-- LocationID: string (nullable = true)
     |-- Borough: string (nullable = true)
     |-- Zone: string (nullable = true)
     |-- service_zone: string (nullable = true)

  16. View a few rows with the following code:
    nyc_taxi_zone_lookup_df.show(5)

    We get the following response:

    +----------+-------------+--------------------+------------+
    |LocationID|      Borough|                Zone|service_zone|
    +----------+-------------+--------------------+------------+
    |         1|          EWR|      Newark Airport|         EWR|
    |         2|       Queens|         Jamaica Bay|   Boro Zone|
    |         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
    |         4|    Manhattan|       Alphabet City| Yellow Zone|
    |         5|Staten Island|       Arden Heights|   Boro Zone|
    +----------+-------------+--------------------+------------+
    only showing top 5 rows

  17. Based on the data dictionary, let’s recalibrate the data types of the attributes in both dynamic frames:
    nyc_taxi_trip_apply_mapping_dyf = ApplyMapping.apply(
        frame = nyc_taxi_trip_input_dyf, 
        mappings = [
            ("VendorID","Long","VendorID","Integer"), 
            ("tpep_pickup_datetime","Timestamp","tpep_pickup_datetime","Timestamp"), 
            ("tpep_dropoff_datetime","Timestamp","tpep_dropoff_datetime","Timestamp"), 
            ("passenger_count","Double","passenger_count","Integer"), 
            ("trip_distance","Double","trip_distance","Double"),
            ("RatecodeID","Double","RatecodeID","Integer"), 
            ("store_and_fwd_flag","String","store_and_fwd_flag","String"), 
            ("PULocationID","Long","PULocationID","Integer"), 
            ("DOLocationID","Long","DOLocationID","Integer"),
            ("payment_type","Long","payment_type","Integer"), 
            ("fare_amount","Double","fare_amount","Double"),
            ("extra","Double","extra","Double"), 
            ("mta_tax","Double","mta_tax","Double"),
            ("tip_amount","Double","tip_amount","Double"), 
            ("tolls_amount","Double","tolls_amount","Double"), 
            ("improvement_surcharge","Double","improvement_surcharge","Double"), 
            ("total_amount","Double","total_amount","Double"), 
            ("congestion_surcharge","Double","congestion_surcharge","Double"), 
            ("airport_fee","Double","airport_fee","Double")
        ],
        transformation_ctx = "nyc_taxi_trip_apply_mapping_dyf"
    )

    nyc_taxi_zone_lookup_apply_mapping_dyf = ApplyMapping.apply(
        frame = nyc_taxi_zone_lookup_dyf, 
        mappings = [ 
            ("LocationID","String","LocationID","Integer"), 
            ("Borough","String","Borough","String"), 
            ("Zone","String","Zone","String"), 
            ("service_zone","String", "service_zone","String")
        ],
        transformation_ctx = "nyc_taxi_zone_lookup_apply_mapping_dyf"
    )

  18. Now let’s check their schema:
    nyc_taxi_trip_apply_mapping_dyf.toDF().printSchema()

    We get the following response:

    root
     |-- VendorID: integer (nullable = true)
     |-- tpep_pickup_datetime: timestamp (nullable = true)
     |-- tpep_dropoff_datetime: timestamp (nullable = true)
     |-- passenger_count: integer (nullable = true)
     |-- trip_distance: double (nullable = true)
     |-- RatecodeID: integer (nullable = true)
     |-- store_and_fwd_flag: string (nullable = true)
     |-- PULocationID: integer (nullable = true)
     |-- DOLocationID: integer (nullable = true)
     |-- payment_type: integer (nullable = true)
     |-- fare_amount: double (nullable = true)
     |-- extra: double (nullable = true)
     |-- mta_tax: double (nullable = true)
     |-- tip_amount: double (nullable = true)
     |-- tolls_amount: double (nullable = true)
     |-- improvement_surcharge: double (nullable = true)
     |-- total_amount: double (nullable = true)
     |-- congestion_surcharge: double (nullable = true)
     |-- airport_fee: double (nullable = true)

    nyc_taxi_zone_lookup_apply_mapping_dyf.toDF().printSchema()

    We get the following response:

    root
     |-- LocationID: integer (nullable = true)
     |-- Borough: string (nullable = true)
     |-- Zone: string (nullable = true)
     |-- service_zone: string (nullable = true)

  19. Let’s add a trip_duration column to the taxi trip dynamic frame that holds the duration of each trip in minutes:
    # Function to calculate trip duration in minutes
    def trip_duration(start_timestamp, end_timestamp):
        minutes_diff = (end_timestamp - start_timestamp).total_seconds() / 60.0
        return minutes_diff

    # Transformation function for each record
    def transformRecord(rec):
        rec["trip_duration"] = trip_duration(rec["tpep_pickup_datetime"], rec["tpep_dropoff_datetime"])
        return rec

    nyc_taxi_trip_final_dyf = Map.apply(
        frame = nyc_taxi_trip_apply_mapping_dyf, 
        f = transformRecord, 
        transformation_ctx = "nyc_taxi_trip_final_dyf"
    )

    Let’s count the number of rows, look at the schema and a few rows of the dataset after applying the above transformation.

  20. Get a record count with the following code:
    nyc_taxi_trip_final_df = nyc_taxi_trip_final_dyf.toDF()
    nyc_taxi_trip_final_df.count()

    We get the following response:

    2463931

  21. View the schema with the following code:
    nyc_taxi_trip_final_df.printSchema()

    We get the following response:

    root
     |-- extra: double (nullable = true)
     |-- tpep_dropoff_datetime: timestamp (nullable = true)
     |-- trip_duration: double (nullable = true)
     |-- trip_distance: double (nullable = true)
     |-- mta_tax: double (nullable = true)
     |-- improvement_surcharge: double (nullable = true)
     |-- DOLocationID: integer (nullable = true)
     |-- congestion_surcharge: double (nullable = true)
     |-- total_amount: double (nullable = true)
     |-- airport_fee: double (nullable = true)
     |-- payment_type: integer (nullable = true)
     |-- fare_amount: double (nullable = true)
     |-- RatecodeID: integer (nullable = true)
     |-- tpep_pickup_datetime: timestamp (nullable = true)
     |-- VendorID: integer (nullable = true)
     |-- PULocationID: integer (nullable = true)
     |-- tip_amount: double (nullable = true)
     |-- tolls_amount: double (nullable = true)
     |-- store_and_fwd_flag: string (nullable = true)
     |-- passenger_count: integer (nullable = true)

  22. View a few rows with the following code:
    nyc_taxi_trip_final_df.show(5)

    We get the following response:

    +-----+---------------------+------------------+-------------+-------+---------------------+------------+--------------------+------------+-----------+------------+-----------+----------+--------------------+--------+------------+----------+------------+------------------+---------------+
    |extra|tpep_dropoff_datetime|     trip_duration|trip_distance|mta_tax|improvement_surcharge|DOLocationID|congestion_surcharge|total_amount|airport_fee|payment_type|fare_amount|RatecodeID|tpep_pickup_datetime|VendorID|PULocationID|tip_amount|tolls_amount|store_and_fwd_flag|passenger_count|
    +-----+---------------------+------------------+-------------+-------+---------------------+------------+--------------------+------------+-----------+------------+-----------+----------+--------------------+--------+------------+----------+------------+------------------+---------------+
    |  0.0|  2022-01-18 15:12:51| 8.133333333333333|         1.13|    0.5|                  0.3|         229|                 2.5|        10.3|        0.0|           2|        7.0|         1| 2022-01-18 15:04:43|       2|         141|       0.0|         0.0|                 N|              1|
    |  0.0|  2022-01-18 15:15:52|              12.4|         1.36|    0.5|                  0.3|         142|                 2.5|       15.36|        0.0|           1|        9.5|         1| 2022-01-18 15:03:28|       2|         237|      2.56|         0.0|                 N|              2|
    |  3.5|  2022-01-06 17:57:03| 7.683333333333334|          1.1|    0.5|                  0.3|         229|                 2.5|        11.3|        0.0|           2|        7.0|         1| 2022-01-06 17:49:22|       1|         161|       0.0|         0.0|                 N|              1|
    |  0.5|  2022-01-09 20:04:14| 3.316666666666667|         0.56|    0.5|                  0.3|         230|                 2.5|        9.96|        0.0|           1|        4.5|         1| 2022-01-09 20:00:55|       2|         230|      1.66|         0.0|                 N|              1|
    |  1.0|  2022-01-24 16:31:36|14.716666666666667|         2.02|    0.5|                  0.3|         234|                 2.5|        18.5|        0.0|           1|       10.5|         1| 2022-01-24 16:16:53|       2|         163|       3.7|         0.0|                 N|              1|
    +-----+---------------------+------------------+-------------+-------+---------------------+------------+--------------------+------------+-----------+------------+-----------+----------+--------------------+--------+------------+----------+------------+------------------+---------------+
    only showing top 5 rows

  23. Next, load both the dynamic frames into our Amazon Redshift Serverless cluster:
    nyc_taxi_trip_sink_dyf = glueContext.write_dynamic_frame.from_jdbc_conf(
        frame = nyc_taxi_trip_final_dyf, 
        catalog_connection = "redshiftServerless", 
        connection_options =  {"dbtable": "public.f_nyc_yellow_taxi_trip","database": "dev"}, 
        redshift_tmp_dir = "s3://aws-glue-assets-<AWS-account-ID>-us-east-1/temporary/", 
        transformation_ctx = "nyc_taxi_trip_sink_dyf"
    )

    nyc_taxi_zone_lookup_sink_dyf = glueContext.write_dynamic_frame.from_jdbc_conf(
        frame = nyc_taxi_zone_lookup_apply_mapping_dyf, 
        catalog_connection = "redshiftServerless", 
        connection_options = {"dbtable": "public.d_nyc_taxi_zone_lookup", "database": "dev"}, 
        redshift_tmp_dir = "s3://aws-glue-assets-<AWS-account-ID>-us-east-1/temporary/", 
        transformation_ctx = "nyc_taxi_zone_lookup_sink_dyf"
    )

    Now let’s validate the data loaded into the Amazon Redshift Serverless cluster by running a few queries in Amazon Redshift query editor v2. You can also use your preferred query editor.

  24. First, we count the number of records and select a few rows in both the target tables (f_nyc_yellow_taxi_trip and d_nyc_taxi_zone_lookup):
    SELECT 'f_nyc_yellow_taxi_trip' AS table_name, COUNT(1) FROM "public"."f_nyc_yellow_taxi_trip"
    UNION ALL
    SELECT 'd_nyc_taxi_zone_lookup' AS table_name, COUNT(1) FROM "public"."d_nyc_taxi_zone_lookup";

    The record counts in f_nyc_yellow_taxi_trip (2,463,931) and d_nyc_taxi_zone_lookup (265) match the record counts of our input dynamic frames. This validates that all records from files in Amazon S3 have been successfully loaded into Amazon Redshift.

    You can view some of the records for each table with the following commands:

    SELECT * FROM public.f_nyc_yellow_taxi_trip LIMIT 10;

    SELECT * FROM public.d_nyc_taxi_zone_lookup LIMIT 10;

  25. One of the insights that we want to generate from the datasets is the five most frequent routes along with their total trip duration. Let’s run the SQL for that on Amazon Redshift:
    SELECT 
        CASE WHEN putzl.zone >= dotzl.zone 
            THEN putzl.zone || ' - ' || dotzl.zone 
            ELSE  dotzl.zone || ' - ' || putzl.zone 
        END AS "Route",
        COUNT(1) AS "Frequency",
        ROUND(SUM(trip_duration),1) AS "Total Trip Duration (mins)"
    FROM 
        public.f_nyc_yellow_taxi_trip ytt
    INNER JOIN 
        public.d_nyc_taxi_zone_lookup putzl ON ytt.pulocationid = putzl.locationid
    INNER JOIN 
        public.d_nyc_taxi_zone_lookup dotzl ON ytt.dolocationid = dotzl.locationid
    GROUP BY 
        "Route"
    ORDER BY 
        "Frequency" DESC, "Total Trip Duration (mins)" DESC
    LIMIT 5;

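
Two pieces of logic in this walkthrough, the duration calculation in step 19 and the direction-agnostic route label in step 25, can be checked locally without Spark or Amazon Redshift. The sketch below reuses trip_duration from the notebook and adds a hypothetical route_label helper that mirrors the SQL CASE expression. The sample timestamps come from the first row of the output shown earlier. Python string comparison and the database's collation may differ for some inputs, so treat route_label as an approximation of the SQL behavior.

```python
from datetime import datetime


def trip_duration(start_timestamp, end_timestamp):
    """Trip duration in minutes, as computed in the notebook."""
    return (end_timestamp - start_timestamp).total_seconds() / 60.0


def route_label(pickup_zone, dropoff_zone):
    """Mirror of the SQL CASE expression: the zone name that compares
    greater goes first, so A -> B and B -> A share one label."""
    if pickup_zone >= dropoff_zone:
        return f"{pickup_zone} - {dropoff_zone}"
    return f"{dropoff_zone} - {pickup_zone}"


# First row of the sample output: 15:04:43 to 15:12:51 is 8 minutes 8 seconds.
pickup = datetime(2022, 1, 18, 15, 4, 43)
dropoff = datetime(2022, 1, 18, 15, 12, 51)
duration = trip_duration(pickup, dropoff)
print(round(duration, 3))

# Both directions of the same pair of zones produce the same label.
forward = route_label("Alphabet City", "Jamaica Bay")
backward = route_label("Jamaica Bay", "Alphabet City")
print(forward)
print(backward)
```

Because the label ignores direction, the Frequency column in the query counts trips between two zones regardless of which one was the pickup.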

Transform the notebook into an AWS Glue job and schedule it

Now that we have authored the code and tested its functionality, let’s save it as a job and schedule it.

Let’s first enable job bookmarks. Job bookmarks help AWS Glue maintain state information and prevent the reprocessing of old data. With job bookmarks, you can process new data when rerunning on a scheduled interval.
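
Conceptually, a bookmark is a record of what a job has already processed. The toy model below illustrates only that idea, using an in-memory set of object keys. It is not how AWS Glue implements bookmarks, and the class, function, and file names are made up for this example.

```python
class ToyBookmark:
    """Remembers which input keys earlier runs have already handled."""

    def __init__(self):
        self._processed = set()

    def new_items(self, available):
        """Return the keys that no earlier run has committed."""
        return [key for key in available if key not in self._processed]

    def commit(self, items):
        """Record the keys as processed, loosely analogous to job.commit()."""
        self._processed.update(items)


def run_job(bookmark, available):
    """Process only unseen keys, then commit them to the bookmark."""
    todo = bookmark.new_items(available)
    # ... the real work (read, transform, load) would happen here ...
    bookmark.commit(todo)
    return todo


bookmark = ToyBookmark()
files = ["nyc_yellow_taxi/part-01.parquet"]      # hypothetical object key
first = run_job(bookmark, files)                 # processes part-01
second = run_job(bookmark, files)                # nothing new to process
files.append("nyc_yellow_taxi/part-02.parquet")  # a new file arrives
third = run_job(bookmark, files)                 # processes only part-02
print(first, second, third)
```

If the commit step is skipped, the next run sees every key as new again, which is why the initialization and commit calls both matter in a real job.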

  1. Add the following magic command in a new cell after the first cell, which contains the magic commands you set up while authoring the code:
    %%configure
    {
        "--job-bookmark-option": "job-bookmark-enable"
    }

    To initialize job bookmarks, we run the following code with the name of the job as the default argument (myFirstGlueISProject for this post). Job bookmarks store the states for a job. You should always have job.init() at the beginning of the script and job.commit() at the end of the script. These two functions initialize the bookmark service and update the state change to the service. Bookmarks won’t work without calling them.

  2. Add the following piece of code after the boilerplate code:
    params = []
    if '--JOB_NAME' in sys.argv:
        params.append('JOB_NAME')
    args = getResolvedOptions(sys.argv, params)
    if 'JOB_NAME' in args:
        jobname = args['JOB_NAME']
    else:
        jobname = "myFirstGlueISProject"
    job.init(jobname, args)

  3. Then comment out all the lines of code that were authored to verify the desired outcome and aren’t necessary for the job to deliver its purpose:
    #nyc_taxi_trip_input_df = nyc_taxi_trip_input_dyf.toDF()
    #nyc_taxi_trip_input_df.count()
    #nyc_taxi_trip_input_df.printSchema()
    #nyc_taxi_trip_input_df.show(5)
    
    #nyc_taxi_zone_lookup_df = nyc_taxi_zone_lookup_dyf.toDF()
    #nyc_taxi_zone_lookup_df.count()
    #nyc_taxi_zone_lookup_df.printSchema()
    #nyc_taxi_zone_lookup_df.show(5)
    
    #nyc_taxi_trip_apply_mapping_dyf.toDF().printSchema()
    #nyc_taxi_zone_lookup_apply_mapping_dyf.toDF().printSchema()
    
    #nyc_taxi_trip_final_df = nyc_taxi_trip_final_dyf.toDF()
    #nyc_taxi_trip_final_df.count()
    #nyc_taxi_trip_final_df.printSchema()
    #nyc_taxi_trip_final_df.show(5)

  4. Save the notebook.
    You can check the corresponding script on the Script tab. Note that job.commit() is automatically added at the end of the script.
    Let’s run the notebook as a job.
  5. First, truncate the f_nyc_yellow_taxi_trip and d_nyc_taxi_zone_lookup tables in Amazon Redshift using the query editor v2 so that neither table ends up with duplicate records:
    truncate "public"."f_nyc_yellow_taxi_trip";
    truncate "public"."d_nyc_taxi_zone_lookup";

  6. Choose Run to run the job.
    You can check its status on the Runs tab. The job completed in less than 5 minutes using the G.1X worker type with 3 DPUs.
  7. Check the record counts in the f_nyc_yellow_taxi_trip and d_nyc_taxi_zone_lookup tables in Amazon Redshift:
    SELECT 'f_nyc_yellow_taxi_trip' AS table_name, COUNT(1) FROM "public"."f_nyc_yellow_taxi_trip"
    UNION ALL
    SELECT 'd_nyc_taxi_zone_lookup' AS table_name, COUNT(1) FROM "public"."d_nyc_taxi_zone_lookup";


    With job bookmarks enabled, running the job again with no new files in the corresponding folders in the S3 bucket doesn’t process the same files again. A subsequent job run in my environment completed in less than 2 minutes because there were no new files to process.

    Now let’s schedule the job.

  8. On the Schedules tab, choose Create schedule.
  9. For Name, enter a name (for example, myFirstGlueISProject-testSchedule).
  10. For Frequency, choose Custom.
  11. Enter a cron expression so the job runs every Monday at 6:00 AM.
  12. Add an optional description.
  13. Choose Create schedule.

The schedule has been saved and activated. You can edit, pause, resume, or delete the schedule from the Actions menu.
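
For reference, the cron expression entered in the steps above can be written in AWS Glue's six-field format (minutes, hours, day-of-month, month, day-of-week, year, evaluated in UTC) as cron(0 6 ? * MON *). The following is a minimal sketch of a programmatic equivalent using the boto3 create_trigger call. It assumes the job and schedule names used in this post; the helper function names are hypothetical and are not part of the original walkthrough.

```python
def build_weekly_trigger_request(job_name: str, trigger_name: str) -> dict:
    """Build keyword arguments for the AWS Glue CreateTrigger API.

    Helper name and structure are illustrative assumptions, not from the post.
    """
    return {
        "Name": trigger_name,
        "Type": "SCHEDULED",
        # Fields: minutes hours day-of-month month day-of-week year (UTC).
        # "?" goes in day-of-month because day-of-week (MON) is specified.
        "Schedule": "cron(0 6 ? * MON *)",
        "Actions": [{"JobName": job_name}],
        "StartOnCreation": True,
    }


def create_weekly_trigger(job_name: str, trigger_name: str) -> None:
    """Create the scheduled trigger; requires AWS credentials and permissions."""
    import boto3  # imported lazily so the request can be built without AWS access

    glue = boto3.client("glue")
    glue.create_trigger(**build_weekly_trigger_request(job_name, trigger_name))


request = build_weekly_trigger_request(
    "myFirstGlueISProject", "myFirstGlueISProject-testSchedule"
)
print(request["Schedule"])
```

Calling create_weekly_trigger with the same two names would submit the request. Because the schedule is evaluated in UTC, adjust the hour field if 6:00 AM should refer to a local time zone.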


Clean up

To avoid incurring future charges, delete the AWS resources you created.

  • Delete the AWS Glue job (myFirstGlueISProject for this post).
  • Delete the Amazon S3 objects and bucket (my-first-aws-glue-is-project-<random number> for this post).
  • Delete the AWS IAM policies and roles (AWSGlueInteractiveSessionPassRolePolicy, AmazonS3Access-MyFirstGlueISProject and AWSGlueServiceRole-GlueIS).
  • Delete the Amazon Redshift tables (f_nyc_yellow_taxi_trip and d_nyc_taxi_zone_lookup).
  • Delete the AWS Glue JDBC Connection (redshiftServerless).
  • Also delete the self-referencing Redshift Serverless security group and the Amazon S3 endpoint (if you created them while following the steps for this post).
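
If you prefer to script part of the cleanup, the AWS Glue items in the list above could be removed with boto3 along the following lines. This is a sketch under assumptions: the function name is hypothetical, it covers only the Glue job and the Glue connection, and it takes the client as a parameter so you control which account and Region it acts on. The remaining resources (S3 bucket, IAM policies and roles, Redshift tables, security group, and S3 endpoint) still need to be deleted separately.

```python
def delete_glue_resources(glue_client, job_name: str, connection_name: str) -> list:
    """Delete the AWS Glue job and JDBC connection created for this post.

    glue_client is expected to be a boto3 Glue client, for example
    boto3.client("glue"). Returns the names of the resources it requested
    deletion for, in order.
    """
    deleted = []

    # DeleteJob removes the job definition.
    glue_client.delete_job(JobName=job_name)
    deleted.append(job_name)

    # DeleteConnection removes the connection from the Data Catalog.
    glue_client.delete_connection(ConnectionName=connection_name)
    deleted.append(connection_name)

    return deleted


# Example usage (requires AWS credentials; this permanently deletes resources):
# import boto3
# delete_glue_resources(boto3.client("glue"), "myFirstGlueISProject", "redshiftServerless")
```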

Conclusion

In this post, we demonstrated how to do the following:

  • Set up an AWS Glue Jupyter notebook with interactive sessions
  • Use the notebook’s magics, including the AWS Glue connection onboarding and bookmarks
  • Read the data from Amazon S3, and transform and load it into Amazon Redshift Serverless
  • Configure magics to enable job bookmarks, save the notebook as an AWS Glue job, and schedule it using a cron expression

The goal of this post is to give you step-by-step fundamentals to get you going with AWS Glue Studio Jupyter notebooks and interactive sessions. You can set up an AWS Glue Jupyter notebook in minutes, start an interactive session in seconds, and greatly improve the development experience with AWS Glue jobs. Interactive sessions have a 1-minute billing minimum with cost control features that reduce the cost of developing data preparation applications. You can build and test applications from the environment of your choice, even on your local environment, using the interactive sessions backend.

Interactive sessions provide a faster, cheaper, and more flexible way to build and run data preparation and analytics applications. To learn more about interactive sessions, refer to Job development (interactive sessions), and start exploring a whole new development experience with AWS Glue.


About the Authors

Vikas Omer is a principal analytics specialist solutions architect at Amazon Web Services. Vikas has a strong background in analytics, customer experience management (CEM), and data monetization, with over 13 years of experience in the industry globally. With six AWS Certifications, including Analytics Specialty, he is a trusted analytics advocate to AWS customers and partners. He loves traveling, meeting customers, and helping them become successful in what they do.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He enjoys collaborating with different teams to deliver results like this post. In his spare time, he enjoys playing video games with his family.

Gal Heyne is a Product Manager for AWS Glue and has over 15 years of experience as a product manager, data engineer, and data architect. She is passionate about developing a deep understanding of customers’ business needs and collaborating with engineers to design elegant, powerful, and easy-to-use data products. Gal has a Master’s degree in Data Science from UC Berkeley, and she enjoys traveling, playing board games, and going to music concerts.