Tag Archives: AWS Glue

Building an ad-to-order conversion engine with Amazon Kinesis, AWS Glue, and Amazon QuickSight

Post Syndicated from Gandhi Raketla original https://aws.amazon.com/blogs/big-data/building-an-ad-to-order-conversion-engine-with-aws-glue-amazon-kinesis-data-streams-and-amazon-quicksight/

Businesses in ecommerce have the challenge of measuring their ad-to-order conversion ratio for ads or promotional campaigns displayed on a webpage. Tracking the number of users that clicked on a particular promotional ad and the number of users who actually added items to their cart or placed an order helps measure the ad’s effectiveness. Utilizing promotional ads that have higher conversion rates enables you to effectively utilize limited space on your ecommerce websites and applications.

This post demonstrates how to sessionize and aggregate clickstream and order data, compute the conversion ratio in real time, and generate data visualizations. We use Amazon Kinesis Data Streams to ingest and send data to Amazon Simple Storage Service (Amazon S3), and AWS Glue, Amazon Athena, and Amazon QuickSight to catalog, analyze, and visualize the data, respectively.

Solution overview

To measure ad-to-order conversion, you need two important pieces of data: user clicks and orders. Clickstream data is captured as users navigate through the site, each time users click on the webpage, and the metadata associated with those clicks. Depending on the user base and number of active users at any moment, clickstream data can be a large amount of data generated per second. Typically, every ecommerce system has a centralized order management system that captures orders created from different channels like a web portal or mobile app. To compute an ad-to-order conversion rate, you join clickstream data and order data over time: (total number of orders/total number of clicks) *100.

The following diagram illustrates the architecture of our solution.

The solution has six main categories.

  • Data generators – Clickstream and order data is generated with the help of an AWS Lambda function. The function is triggered by a scheduled Amazon CloudWatch Events event every minute and generates random clicks for ingestion into a Kinesis data stream. Similarly, another function triggered by a CloudWatch event generates random orders for ingestion into a second data stream. In a production environment, this data comes from clickstream generators and a centralized order management system.
  • Data ingestion – Kinesis data streams ingest clickstream and order data as they are generated.
  • Data sessionization – Data sessionization helps group related data. For clickstream data, we can group clicks on an ad by different users or time periods. For order data, we can group orders by different ads. We use Amazon Kinesis Data Analytics for SQL to analyze streaming data in real time with standard SQL. Sessionized clickstream and order data is ingested into another in-application stream.
  • Data processing and storage – The sessionization stream from Kinesis Data Analytics for SQL is ingested into an Amazon Kinesis Data Firehose delivery stream, which delivers the data to a pre-configured S3 bucket.
  • Data Catalog – You use AWS Glue to crawl the clickstream and orders data in their respective S3 buckets, as well as build metadata definitions and tables in Athena. AWS Glue crawlers run every hour to update table definitions, and Athena views are built to compute the ad-to-order conversion.
  • Data visualization – You use QuickSight to generate visualizations.

Prerequisites

Before getting started, you must provision your resources with AWS CloudFormation. 

  1. Choose Launch Stack.
  1. Choose Next.
  2. For Stack name, enter a name for the stack.
  3. For Bucket Name for Clicks, enter the name of the S3 bucket that holds clickstream data (for this post, click-stream).
  4. For Bucket Name for Orders, enter the name of the S3 bucket that holds order data (order-stream).
  5. Enter any tags you wish to assign to the stack.
  6. Choose Next.
  7. Verify that the stack has been created successfully.

If you have never used QuickSight in this account before, sign up for QuickSight before moving on to the next step. Keep in mind that admin access to the Enterprise Edition QuickSight instance is needed to complete setup. 

Generating and ingesting clickstream data

On the Lambda console, view your function ingest-clickstream for ingesting clickstream data. The clickstream data attributes include UserId, Device, Event, EventType, and Timestamp. The event contains promotional ad information on the webpage clicked by the user. This function generates random clickstreams and ingests it into the data stream ClickStream. The following screenshot shows your function details on the console.

A CloudWatch Events rule invokes this function every minute. The following screenshot shows sample data that was ingested into the data stream. The Event column represents the portion of the webpage the user clicked; every click on the webpage has a unique ID and type assigned (for example, P601 has the event type Promotion, C301 has the event type Checkout).

Generating and ingesting order data

On the AWS Lambda console, view your function ingest-order for ingesting order data. This function ingests random orders.

Each order has order lines, which contain the attributes ItemId, Promotion, UnitPrice, and Quantity (see the following screenshot). The promotion attribute indicates the ad the user clicked before adding the item to their shopping cart. This function generates random orders and ingests it into OrderStream. The Promotion attribute joins clickstream data and order data.

Sessionizing the data

To sessionize the data, complete the following steps:

  1. On the Kinesis Data Analytics console, select <Stack Name>-ClickStreamApplication.
  2. Choose Run.
  3. Repeat the same step for <Stack Name>-OrderAnalysisApp.
  4. When the status changes to Running, choose the application name.
  5. Under Real time analytics, choose Go to SQL results.
  6. Choose the Real-time analytics

The application groups clicks in 1-minute intervals. Let’s take the ad P701 as an example. If this ad is clicked by multiple users, this SQL function adds all the clicks by different users in the last minute. If five users clicked on P701 in the last minute, the function outputs a ClickCount of 5. A stagger window is used because it’s well-suited for analyzing groups of data that arrive at inconsistent times.

  1. On the Kinesis Data Analytics console, choose OrderAnalysisApp.
  2. Choose Go to SQL results.
    This application groups orders by Promotion, as shown in the following screenshot.

Processing and storing the data

In the data processing and storage stage, aggregated clickstream and order data is delivered to a Kinesis Data Firehose delivery stream. Kinesis Data Firehose delivers clickstream aggregated records and orders to the click-stream and order-stream buckets, respectively. The data is partitioned by year, month, and day. The following screenshot shows the delivery streams on the console.

Analyzing the data

To analyze your data, complete the following steps:

  1. Verify that the S3 bucket was created for clickstream and orders.

The data in the bucket is partitioned by year, month, date, and hour.

  1. On the AWS Glue console, view the clickstream and orders crawlers.

These two crawlers crawl the click-stream and order-stream buckets every 15 minutes and create tables.

  1. To run the crawlers on demand, choose Run crawler.

When the crawler is finished, the Tables added column displays 1.

  1. In the navigation pane, choose Tables.
  2. Verify that the crawlers created the tables.
  3. On the Athena console, choose Saved queries.

You can see three queries have been created.

  1. Select view_clicks_aggregate to load it in the query editor.
  2. Select ad_to_order_conversion and choose Run Query.

If the Amazon S3 bucket name has -, the crawler replaces - with _ while creating the table.

  1. Replace - with _ in the table name when creating the view.
  2. Repeat the same process for view_orders_aggregate and view_conversion_ratio.

Make sure you run view_clicks_aggregate and view_orders_aggregate before running view_conversion_ratio.

  1. Choose view_conversion_ratio and choose Preview.

Orders and clicks for each promotion and the corresponding conversion ratio are displayed.

Visualizing the data

To visualize your data, you first load it into QuickSight. You can then create visualizations. In this section, we also configure a scheduled data refresh.

Loading the data

To visualize your data, you must first load your data into QuickSight.

  1. On the QuickSight console, from the Admin drop-down menu, choose Manage QuickSight.
  2. In the navigation pane, choose Security & Permissions.
  3. Choose Add or remove.
  4. Select Amazon Athena.
  5. Select Amazon S3 to edit QuickSight access to your S3 buckets.
  6. Choose the Details link next to Amazon S3.
  7. Choose Select S3 buckets.
  8. Select the bucket names you provided for clicks and orders.
  9. Choose Finish.
  10. Choose Update.
  11. Choose the QuickSight icon on the top left of the admin panel to proceed back to the home screen.
  12. In the navigation pane, choose Datasets.
  13. Choose New dataset.
  14. Choose Athena.
  15. For Data source name, enter Ad-To-Order-Conversion.
  16. Choose Validate Connection.
  17. After your connection is validated, choose Create data source.
  18. For Database, choose ad-to-order-conversion.
  19. For Tables, select view_conversion_ratio.
  20. Choose Select.
  21. Choose Visualize.

Creating visualizations

In this section, we create two visualizations of our data. We first make a horizontal bar chart.

  1. From the Add menu, choose Add Calculated Field.
  2. Enter Clicks_to_Orders.
  3. Enter the formula sum(orders)/sum(clicks).
  4. Choose Save.
  5. Choose next to Click to orders.
  6. For Show as, choose Percent.
  7. For Visual type, choose Horizontal bar chart.
  8. Drag promotion to Y-axis.
  9. Drag clicks_to_orders to Value.
  10.  Drag date to Group/Color.

The following screenshot shows our visualization.

We now make our second visualization, a vertical bar chart.

  1. Choose the + icon next to Sheet1.
  2. For Visual types, choose Vertical bar chart.
  3. Drag promotions to Y-axis.
  4. Drag clicks and orders to Value.

This graph displays clicks and orders for each promotion.

  1. Choose Insights on the left panel to see a summary of your insights.

Refreshing the data

We can also set up a scheduled refresh for our data.

  1. Choose Manage Data.
  2. Choose view_conversion_ratio.
  3. Choose Schedule refresh.
  4. Choose Create.
  5. For Repeats, choose Hourly.
  6. Choose Create.

You see a confirmation message that you configured a refresh one time per hour.

Conclusion

In this post, we showed you how to use AWS analytics and storage services to address business challenges that require handling large volumes of data. Kinesis Data Streams and Kinesis Data Analytics let you ingest large volumes of data and sessionize the data. We also showed you how to analyze and visualize the clickstream and order data using AWS Glue, Athena, and QuickSight.


About the Authors

Gandhi Raketla is a Senior Solutions Architect for AWS. He works with AWS customers and partners on cloud adoption, architecting solutions that help customers foster agility and innovation.

 

 

 

Nick Sack is a DevOps Consultant for AWS Professional Services. He is passionate about working with customers and building automated solutions to help customers on their cloud journeys. When not working, Nick enjoys hiking, playing soccer, reading, and learning about technology.

Preparing data for ML models using AWS Glue DataBrew in a Jupyter notebook

Post Syndicated from Zayd Simjee original https://aws.amazon.com/blogs/big-data/preparing-data-for-ml-models-using-aws-glue-databrew-in-a-jupyter-notebook/

AWS Glue DataBrew is a new visual data preparation tool that makes it easy for data analysts and data scientists to clean and normalize data to prepare it for analytics and machine learning (ML). In this post, we examine a sample ML use case and show how to use DataBrew and a Jupyter notebook to upload a dataset, clean and normalize the data, and train and publish an ML model. We look for anomalies by applying the Amazon SageMaker Random Cut Forest (RCF) anomaly detection algorithm on a public dataset that records power consumption for more than 300 random households.

Deploying your resources

To make it easier for you to get started, we created an AWS CloudFormation template that automatically configures a Jupyter notebook instance with the required libraries and installs the plugin. We used Amazon Deep Learning AMI to configure the out-of-the-box Jupyter server. This easy deployment is intended to get you started on DataBrew from within a Jupyter environment. The source code for the DataBrew plugin and the CloudFormation template are available in the GitHub repo.

To deploy the solution, you must have a subnet that has internet access and an Amazon Simple Storage Service (Amazon S3) bucket where you want store the data for DataBrew. Select the VPC, subnet, security group, and the S3 bucket that you want to use store the data for DataBrew processing. Provide the Amazon Elastic Compute Cloud (Amazon EC2) key pair if you plan to SSH to the instance.

  1. Launch the following stack:
  2. When the template deployment is complete, on the Outputs tab, choose the URL to open JupyterLab.

Because the Jupyter server is configured with a self-signed SSL certificate, your browser warns you and prompts you to avoid continuing to this website. But because you set this up yourself, it’s safe to continue.

  1. Choose Advanced.
  2. Choose Proceed.
  3. Use the password databrew_demo to log in.

For more information about securing and configuring your Jupyter server, see Set up a Jupyter Notebook Server.

  1. In the Jupyter environment’s left panel, choose the DataBrew logo.
  2. Choose Launch AWS Glue DataBrew.
  3. When the extension loads, choose the Datasets tab in the navigation bar.

Preparing your data using DataBrew

Now that the DataBrew extension is ready to go, we can begin to explore how DataBrew can make data preparation easy. Our source dataset contains data points at 15-minute intervals, and is organized as a series of columns for each household. The dataset is really wide, and the RCF algorithm expects data tuples of date/time, client ID, and consumption value. Additionally, we want to normalize our data to 1-hour intervals. All of this is achieved through DataBrew.

Setting up your dataset and project

To get started, you set up your dataset, import your data, and create a new project.

  1. Download power.consumption.csv from the GitHub repo.
  2. On the Datasets page, choose Connect new dataset.
  3. For Dataset name, enter a name.
  4. In the Connect to new dataset section, choose File upload.
  5. Upload power.consumption.csv.
  6. For Enter S3 destination, enter an S3 path where you can save the file.
  7. Choose Create dataset.

The file may take a few minutes to upload, depending on your internet speed.

  1. On the Datasets page, filter for your created dataset.
  2. Select your dataset and choose Create project with this dataset.

  1. In the Create project wizard, give your project a name.
  2. In the Permissions section, choose the AWS Identity and Access Management (IAM) role created from the CloudFormation template.

You can find the role on the CloudFormation stack’s Resources tab. If you use the default stack name, the role should begin with databrew-jupyter-plugin-demo.

After you create the project, the project view loads, and you’re ready to prepare your data.

Building a recipe for data transformation

A recipe is a series of steps that prepare your data for the RCF algorithm. The algorithm requires three columns: date, client ID, and an integer value. To transform our dataset to contain those three columns, we configure our recipe to do the following:

  1. Unpivot the data to collapse measurements from multiple clients into one column.
  2. Apply the window function to average the 15-minute data points into 1-hour data points.
  3. Filter to keep only values at each hour.
  4. Multiply and floor the results.

Unpivoting the data

To unpivot the data and collapse measurements, complete the following steps:

  1. On the toolbar, choose Pivot.

  1. In the Pivot wizard, select Unpivot: Columns to rows.
  2. For Unpivot columns, choose MT_012, MT_013, MT_131, and MT_132.
  3. For Column name, enter client_id.
  4. For Value column name, enter quarter_hour_consumption.

In the Recipe pane, you can see the action that unpivots the columns. This action can be revisited later and changed. The new columns may not be visible immediately.

  1. To see them and narrow down the visible data to only the relevant columns, choose the arrow next to Viewing.
  2. Deselect all items and select only _c0 and our two new columns, client_id and
    quarter_hour_consumption.

Applying the window function

To apply the window function to average the 15-minute data points into 1-hour data points, complete the following steps:

  1. Choose the quarter_hour_consumption
  2. Choose Functions.
  3. Choose Window functions.
  4. Choose Rolling average.

  1. In the Create column pane, for Number of rows before, enter 0.
  2. For Number of rows after, enter 3.
  3. For Name of column to order by with, choose client_id.
  4. For Destination column, enter hourly_consumption_raw.
  5. Choose Apply.

Filtering to keep only values at each hour

In this step, you rename the date/time column, convert it to string type so that you can do simple filtering, and filter the dataset on the string column for times ending in :00:00.

  1. For the _c0 column, choose the ellipsis icon (…) and choose Rename.

  1. Rename the column to timestamp.
  2. Choose the clock icon and choose string.

  1. With the column selected, choose Filter.
  2. Choose By condition.
  3. Choose Ends with.
  4. Enter the value :00:00.
  5. Choose Apply.

Filtering the column for only values that end with :00:00 leaves you with hourly averages of power consumption per client for every hour.

Multiplying and flooring the results

In this step, you multiply the data by 100 to increase precision and floor the data so that it can be accepted by the RCF algorithm, which only accepts integers.

  1. Choose Functions.
  2. Choose Math functions.
  3. Choose Multiply.
  4. For Value using¸ choose Source columns and value.
  5. For Source column, choose hourly_consumption_raw.
  6. For Destination column, enter hourly_consumption_raw_times_a_hundred.
  7. Choose Apply.

  1. Choose Functions.
  2. Choose Math functions.
  3. Choose Floor.
  4. For Source column, choose hourly_consumption_raw_times_a_hundred.
  5. For Destination column, enter hourly_consumption.
  6. Choose Apply.

This column contains the final, normalized data.

Running the job to transform the data

You’re now ready to transform the data.

  1. Choose Create job.

  1. Enter a job name and choose the dataset we created.
  2. Specify the S3 bucket you provided in the CloudFormation template.
  3. Choose the IAM role that AWS CloudFormation created (which we used earlier to create the project).
  4. Choose Create and run job.

The job may take up to 5 minutes to complete.

On the Job run history page for the job, view the output on the Amazon S3 console by choosing the link in the table.

That’s it, the data is now ready to use when training and deploying our ML model.

Training and deploying the ML model using prepared data

The data was already prepared using DataBrew via the plugin, so the next step is to train an ML model using that data. We provided a sample anomaly detection notebook that you can download.

In this sample notebook, you need to specify the S3 data location where you stored the output data from DataBrew. The notebook uses the IAM role attached to the EC2 instance profile that was created by AWS CloudFormation. You can follow through the notebook and when you provide the right S3 paths, the first step is to filter the specific columns we’re interested in and visualize the time series power consumption data.

The next step is to train a sample anomaly detection model using the SageMaker Random Cut Forest algorithm. We pick one of the time series available in the input Pandas DataFrame and train the anomaly detection model with the hyperparameter feature_dim set to 1, leaving the default values for other hyperparameters. We then create an estimator for Random Cut Forest and fit the model. In a few minutes, the training should be complete. In the next step, we create a predictor and deploy the model to a SageMaker endpoint.

Using the prepared data, we run the prediction and plot the results.

We use the anomaly detection baseline that is two standard deviations away from the mean score. The data shows an anomaly towards the end of the time series. With this information, that timeframe can be further investigated.

Finally, we clean up by deleting the SageMaker endpoint to prevent any ongoing charges.

Conclusion

We’ve walked you through the process of setting up the AWS Glue DataBrew Jupyter plugin in a Jupyter notebook environment. We used the plugin to prepare data, then trained and deployed an ML model in the same Jupyter environment using SageMaker.

Although we used a DLAMI Jupyter environment in this post, the DataBrew Jupyter extension also works on SageMaker notebooks. For installation instructions, see the GitHub repo.

DataBrew makes it easy to iterate through data preparation workflows. The resultant recipes and jobs are duplicable and can be run over discrete, large datasets. The DataBrew Jupyter plugin allows you to prepare your data seamlessly, in context, within your Jupyter notebook.


About the Authors

Zayd Simjee is a software engineer at Amazon. He’s interested in distributed systems, big data, and simplifying developer experience on the Cloud. He’s worked on a few Big Data services at AWS, and most recently completed work on the AWS Glue DataBrew release.

 

 

 

 

As a Principal Solutions Architect at Amazon Web Services, Karthik Sonti works with GSI partners to help enterprises realize transformational business outcomes using artificial intelligence, machine learning and data analytics

Enabling self-service data publication to your data lake using AWS Glue DataBrew

Post Syndicated from Jason Hunter original https://aws.amazon.com/blogs/big-data/enable-self-service-data-publication-to-your-data-lake-using-aws-glue-databrew/

Data lakes have been providing a level of flexibility to organizations unparalleled to anything before them. Having the ability to load and query data in place—and in its natural form—has led to an explosion of data lake deployments that have allowed organizations to accelerate against their data strategy faster than ever before.

Most organizations have a latent demand for business data that already exists within the organization—but is not yet broadly advertised or accessible. Building data lakes with AWS Lake Formation allows you to create an open and portable data platform that allows data consumers (users and applications) to discover and source high-quality and trusted data from across business data domains.

In a typical scenario, a data lake is run and managed by a data team that is responsible for the onboarding of datasets, application and enforcement of business rules (ETL/ELT), and publishing the transformed dataset into a company-wide data catalog for consumption. The following diagram illustrates a typical process flow of how data transmits through a data lake.

It most cases, the data team writes the code and uses platform tools to run the pipeline, including running business logic against the data that is to be shared to the wider organization data users.

The ability for users to self-serve the creation of datasets (such as data owners and business units) for general applicability across the business is an ongoing request for most organizations. If you run a centralized operating model across your data lake, you may require that all data pass through this centralized team or where the facility exists for self-serve data publication, this typically requires a high technical bar to participate in the data onboarding process, which is normally outside the capability of data stewards and owners.

AWS Glue DataBrew is a visual data preparation tool that helps solve some of these challenges by allowing you to inspect datasets and apply transformations against them in a visual and no code way that allows users of all skill sets to participate as part of the data preparation pipeline process. An obvious benefit is data owners and SMEs (subject matter experts) can define and implement data quality and governance checks against their data before publishing it into the data lake.

This accelerates and simplifies the process of producing trusted and quality outputs because the data owners are integrated as part of the onboarding process.

For this post, we discuss a use case of a project team in a consultancy organization that wants to combine first-party data they have acquired against an existing dataset that exists within their data lake.

Overview of solution

The workflow includes the following steps:

Step 1: Create a new AWS Glue DataBrew dataset by uploading a file to, or choosing and existing dataset, from their organization data lake.
Step 2: Create a DataBrew project to load and visually inspect and validate the datasets that you will transform.
Step 3: Using DataBrew, visually transform, augment and define the business rules you want to be applied to your new dataset. (These business rules are called steps, which collectively are called a Recipe)
Step 4: Create a DataBrew job to execute your Rules (Recipe) against the datasets in their entirety.
Step 5: Run a AWS Glue crawler to discover the resulting dataset, publish it to the AWS Glue data catalog and apply a secure permission model to it.
Step 6: Use a preferred analysis or machine learning (ML) tool to browse and consume the data in the Data Catalog based on user permissio

Let’s now go through these steps in a bit more detail.

Creating a project

A project in DataBrew defines two key aspects: the primary dataset you’re working with and the steps you want to run against that dataset to produce a trusted and validated dataset that is specific to your business need.

To create your project, complete the following steps:

  1. On the DataBrew console, choose Create project.

 

  1. For Project name, enter a name that is representative of the source and intent of the output.
  2. For Attached recipe, choose Create new recipe.

In subsequent projects with a similar shape of data, you can reuse your recipe of business rules.

You have several options when connecting your dataset. You can upload a new data source (such as a local Excel or JSON file on my computer) or to reuse an existing dataset that exists within your data lake. If you’re sourcing data from the data lake, you can filter by the origin of the source, such as our cloud data warehouse Amazon Redshift, or transactional systems hosted in an Amazon Relation Database Service (Amazon RDS) instance. For more information about copying data into your data lake using AWS Glue, see Which Data Stores Can I Crawl? For this post, I upload a new dataset.

  1. Select New dataset.

  1. For Dataset name, enter a name.
  2. For File upload, choose Choose file.
  3. Choose the file to upload.
  4. For Enter S3 destination, enter the location in our data lake where the file is saved to.

  1. For Role name, choose a security role that grants access to the data lake location specified.

Typically, this is an area that you have access to that is driven by project or department privileges.

If no role is provided by default, a data platform admin or AWS account admin can create one for you. For more information, see Identity-based policy examples for AWS Glue DataBrew.

  1. Optionally, modify the sample size of the data to inspect.
  2. Choose Create project.

Analyzing and transforming your dataset

With a project created, I can load a sample of my dataset in a familiar tabular grid view and start to explore the data in greater detail.

This view allows for quick inspection of the data and the ability to run some sample and survey methods to determine if the data being viewed has statistical significance to our project. This way, we can make sure we’re making informed, data-driven decisions on behalf of our clients.

The following screenshot shows that the review_id column has duplicate values.

To maintain integrity and trust of the output, I can remove the duplicate rows as a step by choosing Duplicates and Remove duplicates in column.

This adds a new step to my recipe, which is enforced on this column type whenever it runs. DataBrew shows a preview of the change before you apply the changes. 

If I want to dig deeper into the shape and validity of my data, I can simply choose the Schema tab to get a coalesced view of the data, or create a data profile job on the Profile tab to get further insights, such as correlations across column types.

The rest of the data looks good, so I now complete the rest of my transformation functions:

  1. Clean the survey result summary text by removing invalid characters and quotes, and enforcing sentence casing.
  2. For downstream ML tasks, apply a hot encode function to the text value of a customer’s sentiment (categorical variable) to that of an integer value (binary vector).
  3. Join to a company dataset that contains our customer data so I can bring in identifiable information of the locations surveyed.
  4. Hide columns not require for the final output.

Cleaning the summary text

We first clean the survey result summary text by removing invalid characters and quotes, and enforce sentence casing.

  1. Choose Format.
  2. Choose Change to sentence casing.

  1. On the recipe bar, choose Clean.
  2. For Specify values to remove, select Special characters.

Applying a hot encode function

Next, apply a hot encode function to the categorical variable (sentiment) to a binary vector for downstream ML purposes.

  1. Choose Encode.
  2. Choose One-Hot-Encode column.
  3. Choose your source column.
  4. Choose Apply.

Joining and enriching our dataset

We now join our dataset against a primary customer dataset so we can enrich our dataset with additional columns of value.

The initial dataset we created contains a number of key data elements, such as a review rating, customer comments, associated sentiment rating, a number of ordinal values specifying the value of the review, emotional attributes from the interviewee, and various restaurant and location identifiers.

As part of our full analysis, we want to reference the additional data elements associated with these restaurant and location identifiers so we can form a complete view of the data.

DataBrew makes this enrichment task easy by letting you join to an additional company dataset that exists within your data lake. In the following steps, I choose to bring across only those columns that I require by deselecting those that are not required. (This existing dataset contains additional data of businesses that are part of the wider survey group.)

  1. On the recipe menu bar, choose Join.
  2. Choose Dataset join.
  3. For Select dataset, choose the dataset to join against.

The page displays a summary view for inspection.

  1. Choose Next.

For this post, we perform a left join to ensure that we’re only bring in matching records against our own dataset (Table A) from the paired dataset (Table B).

  1. Choose the columns between both tables that we want to match against (for this post, business_id).
  2. Select the columns from the joined table to include in our dataset.

  1. Choose Finish.

When this step is complete, we have a complete dataset that is applicable towards our final requirements for analysis.

Removing unnecessary columns

Finally, remove columns not required for the final dataset and change the column names. ­­

At the conclusion of our data preparation, we can see the number and types of steps we have run.

For similar datasets, you can choose Publish in the Recipe pane to reuse this recipe and enforce the steps taken instead of having to complete the steps again. This allows you to build up your own repository of recipes that you can replay against various datasets to ensure a consistent level of data quality and outputs.

You then provide a description and publish the recipe for broader accessibility.

As a final validation step before publishing my dataset, I can choose to visually inspect the data lineage to confirm my sources selected are as intended and to confirm the size of my datasets to process.

For this post, I can confirm that the source and datasets are correct and the recipe being created has the expected number of steps. The data lineage view is useful for when you’re reading unfamiliar data sources because it allows you to understand the origins and transformations that have taken place beforehand.

Creating a job to publish your dataset

Now we have our dataset defined and validated by way of the visual steps we created in our recipe. It’s time to create a job that runs them against the entirety of our dataset. The output is saved back into an area of our data lake for additional processing, leading to publication within our Data Catalog.

  1. Choose Create job.

  1. For Job name, enter a name.

  1. In the Job output settings section, specify the data lake location where the final output is saved to.

If required, we can optionally choose a file format for the dataset. If you have multiple requirements for different file formats, you can simply choose to save multiple outputs by choosing Add another output and specifying the details. For this post, I opt to save the output in two formats: one in CSV and another in the optimized open file format Apache Parquet.

  1. For Role name, choose the security role that has access to our client project folder to make sure DataBrew has permission to save the dataset to the location specified.

The role that you choose here is different than the DataBrew role you chose when creating the project. Each role serves a different purpose. For this post, we choose a role that has permissions to read, run, and save the resulting files into data lake locations (as governed by the role) of which I have been delegated rights to access and save data to.

  1. Choose Create and run job.

 

The project is now visible on the main page, detailing the recipe used against the dataset and the job name.
style=”margin: 20px 0px 20px 0px; border: 1px solid #CCCCCC;”

To view details about the job run and inspect completion details, choose the value in the Jobs column.

To see additional details about the history of the job, choose the job name.

The output provides details of the job.

The lineage view shows the end-to-end process of all input files, transformation steps, and final output locations.

Consuming your dataset

After you save the new dataset to your data lake, we can use AWS Glue to crawl the new dataset and publish it to our Data Catalog. For more information, see Populating the AWS Glue Data Catalog. After the data is indexed, it’s available and visible through AWS Glue and Lake Formation. 

Crawling the output files from DataBrew

To crawl the output files and publish them to our Data Catalog, complete the following steps:

  1. On the AWS Glue console, choose Crawlers.
  2. Choose Add crawler.

  1. For Crawler name, enter a name.
  2. Choose Next.

  1. For Crawler source type, select Data stores.
  2. For Repeat crawls of S3 data stores, select Crawl new folders only.
  3. Choose Next.

  1. For Choose a data store, choose S3.
  2. For Crawl data in, select Specified path in my account.
  3. For Include path, enter the path of the DataBrew output files.
  4. Choose Next.

  1. Select Choose an existing IAM role.
  2. For IAM role, choose or create a role that has access to the location of your DataBrew output.

If you don’t have such a role, your data platform admin can set one up for you.

  1. Choose Next.

  1. For Frequency, specify how often the crawler should check for new output files. For this post, we choose Run on demand. 

This normally aligns with the schedule you have set up in DataBrew.

  1. Choose Next.
  2. For Database, choose (or create) a database to publish the business view of your output files.

The crawler creates one or more data tables that users can browse and, if permissions allow, run select queries against.

  1. Choose Next.

  1. Review the summary page and choose Finish.
  2. Select your crawler and choose Run crawler.

When the crawler is complete, it details the number of tables it created and the time it took to discover and index your output dataset. The next step is to view and set up permissions in the Data Catalog.

Publishing and securing your DataBrew output files

In this post, we use the Lake Formation security and permission model enforced for data consumers using supported services. For more information, see Security and Access Control to Metadata and Data in Lake Formation.

  1. On the Lake Formation console, under Data catalog, choose Databases.
  2. Enter the database name you used when you created the crawler earlier (for this post, hospitalitydb).

 

  1. Select the database and choose View tables.

We can see that a table called results was created in this database.

We can do several things, such as grant or revoke items from the Actions menu, or choose the table name to inspect the schema.

As a data owner, I grant an analyst with the username lfanalyst select permissions to my dataset, and restrict access to only a limited set of columns. For this post, I exclude the userid and reviewid columns.

After I save this change, I can let the analyst know that they have access to my survey results.

Consumption access to DataBrew output files

The analyst user who has been granted limited access to this dataset can query the data using Amazon Athena, either via the console or external analytical tools that support the JDBC or ODBC drivers it provides.

If using the console, the analyst can choose the database hospitalitydb to view the results table.

 

If the analyst expands the results table to view the columns, userid and reviewid aren’t present.

The following screenshot shows the results of a SQL statement to browse a sample of the dataset.

Conclusion

DataBrew makes it easy for data owners, analysts, and data SMEs to easily curate and publish data of value to their organization data lake in a self-service manner.

This visual interface, combined with ML assistance to help you better understand and validate your data, allows all types of users to easily create, consume, and publish data into an organization’s data lake in a visual, no code, or low code way while working within an existing centralized governance model.


About the Author

Jason Hunter is a Principal Solutions Architect at AWS with a focus on Big Data & Analytics solutions. With over 20 years of experience in information technology, he helps customers architect and build highly scalable, secure and performant cloud-based data platforms, accelerating their journey to become more data driven.

 

 

 

 

Keeping your data lake clean and compliant with Amazon Athena

Post Syndicated from David Roberts original https://aws.amazon.com/blogs/big-data/keeping-your-data-lake-clean-and-compliant-with-amazon-athena/

With the introduction of CTAS support for Amazon Athena (see Use CTAS statements with Amazon Athena to reduce cost and improve performance), you can not only query but also create tables using Athena with the associated data objects stored in Amazon Simple Storage Service (Amazon S3). These tables are often temporary in nature and used to filter or aggregate data that already exists in another Athena table. Although this offers great flexibility to perform exploratory analytics, when tables are dropped, the underlying Amazon S3 data remains indefinitely. Over time, the accumulation of these objects can increase Amazon S3 costs, become administratively challenging to manage, and may inadvertently preserve data that should have been deleted for privacy or compliance reasons. Furthermore, the AWS Glue table entry is purged so there is no convenient way to trace back which Amazon S3 path was mapped to a deleted table.

This post shows how you can automate  deleting Amazon S3 objects associated with a table  after dropping it using Athena. AWS Glue is required to be the metadata store for Athena.

Overview of solution

The solution requires that the AWS Glue table record (database, table, Amazon S3 path) history is preserved outside of AWS Glue, because it’s removed immediately  after a table is dropped. Without this record, you can’t delete the associated Amazon S3 object entries after the fact.

When Athena CTAS statements  are issued, AWS Glue generates Amazon CloudWatch events that specify the database and table names. These events are available from Amazon EventBridge and can be used to trigger an AWS Lambda function (autoCleanS3) to fetch the new or updated Amazon S3 path from AWS Glue and write the database, table, and Amazon S3 path into an AWS Glue history table stored in Amazon DynamoDB (GlueHistoryDDB). When Athena drop table queries are detected, CloudWatch events are generated that trigger autoCleanS3 to look up the Amazon S3 path from GlueHistoryDDB and delete all related objects from Amazon S3.

Not all dropped tables should trigger Amazon S3 object deletion. For example, when you create a table using existing Amazon S3 data (not CTAS), it’s not advisable to automatically delete the associated Amazon S3 tables, because other analysts may have other tables referring to the same source data. For this reason, you must include a user-defined comment (--dropstore ) in the Athena drop table query to cause autoCleanS3 to purge the Amazon S3 objects.

Lastly, after objects are successfully deleted, the corresponding entry in GlueHistoryDDB  is updated for historical and audit purposes. The overall workflow is described in the following diagram.

The workflow contains the following steps:

  1. A user creates a table either via Athena or the AWS Glue console or API.
  2. AWS Glue generates a CloudWatch event, and an EventBridge rule triggers the Lambda function.
  3. The function creates an entry in DynamoDB containing a copy of the AWS Glue record and Amazon S3 path.
  4. The user drops the table from Athena, including the special comment --dropstore.
  5. The Lambda function fetches the dropped table entry from DynamoDB, including the Amazon S3 path.
  6. The function deletes data from the Amazon S3 path, including manifest files, and marks the DynamoDB entry as purged.

Walkthrough overview

To implement this solution, we complete the following steps:

  1. Create the required AWS Identity and Access Management (IAM) policy and role.
  2. Create the AWS Glue history DynamoDB table.
  3. Create the Lambda autoCleanS3 function.
  4. Create the EventBridge rules.
  5. Test the solution.

If you prefer to use a preconfigured CloudFormation template, launch one of the following stacks depending on your Region.

Region Launch Button
us-east-1 (N. Virginia)
us-west-2 (Oregon)
eu-west-1 (Ireland)

Prerequisites

Before implementing this solution, create an AWS Glue database and table with the data residing in  Amazon S3. Be sure your user has the necessary permissions to access Athena and  perform CTAS operations writing  in a sample Amazon S3 location.

For more information about building a data lake, see Build a Data Lake Foundation with AWS Glue and Amazon S3.

Creating an IAM policy and role

You need to first create the required IAM policy for the Lambda function role to use to query AWS Glue and write to DynamoDB.

  1. On the IAM console, choose Policies.
  2. Choose Create policy.
  3. On the JSON tab, enter the following code (update the Region, account ID, and S3 bucket accordingly, and the table name GlueHistoryDDB if you choose to change it):
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Action": [
                    "s3:GetObject",
                    "s3:ListBucket",
                    "s3:ListObjectsV2",
                    "s3:DeleteObjectVersion",
                    "s3:DeleteObject"
                ],
                "Resource": [
                    "arn:aws:s3:::<athena-query-results-s3-bucket>",
                    "arn:aws:s3:::<athena-query-results-s3-bucket>/*"
                ],
                "Effect": "Allow"
            },
            {
                "Action": [
                    "dynamodb:PutItem",
                    "dynamodb:Scan",
                    "dynamodb:Query",
                    "dynamodb:UpdateItem"
                ],
                "Resource": [
                    "arn:aws:dynamodb:<region>:<accountId>:table/GlueHistoryDDB"
                ],
                "Effect": "Allow"
            },
            {
                "Action": [
                    "glue:GetTable"
                ],
                "Resource": [
                    "arn:aws:glue:<region>:<accountId>:catalog",
                    "arn:aws:glue:<region>:<accountId>:database/*",
                    "arn:aws:glue:<region>:<accountId>:table/*"
                ],
                "Effect": "Allow"
            },
            {
                "Action": [
                    "logs:CreateLogGroup"
                ],
                "Resource": [
                    "arn:aws:logs:<region>:<accountId>:*"
                ],
                "Effect": "Allow"
            },
            {
                "Action": [
                    "logs:CreateLogStream",
                    "logs:PutLogEvents",
                    "logs:DescribeLogStreams"
                ],
                "Resource": [
                    "arn:aws:logs:<region>:<accountId>:log-group:/aws/lambda/autoCleanS3:*"
                ],
                "Effect": "Allow"
            }
        ]
    }

  1. Choose Review policy.
  2. For Name, enter autoCleanS3-LambdaPolicy.
  3. For Description, enter Policy used by Lambda role to purge S3 objects when an Amazon Athena table is dropped.
  4. Choose Create policy.

Next, you need to create an IAM role and attach this policy.

  1. On the IAM console, choose Roles.
  2. Choose Create role.
  3. Choose AWS service.
  4. Choose Lambda.
  5. Choose Next: Permissions.

  1. For Filter policies, enter autoCleanS3-LambdaPolicy.
  2. Choose Next: Tags.
  3. Choose Next: Review.
  4. For Role name, enter autoCleanS3-LambdaRole.
  5. For Description, enter Role used by Lambda to purge S3 objects when an Amazon Athena table is dropped.
  6. Choose Create role.

Creating the AWS Glue history DynamoDB table

You use this DynamoDB table to hold the current and historical list of AWS Glue tables and their corresponding Amazon S3 path. Create the table as follows:

  1. On the DynamoDB console, choose Dashboard.
  2. Choose Create table.
  3. For Table name, enter GlueHistoryDDB.
  4. For Partition key, enter database (leave type as String).
  5. Select Add sort key.
  6. Enter table_date (leave type as String).
  7. For Table settings, select Use default settings.
  8. Choose Create.

The following table summarizes the GlueHistoryDDB table attributes that the Lambda function creates.

Column Type Description
database partition key The name of the AWS Glue database.
table_date sort key A composite attribute of AWS Glue table name plus date created. Because the same database and table name can be created again, the date must be used to ensure uniqueness.
created_by attribute The user or Amazon EC2 instance ARN from which the table was created.
owner attribute The owner of the table or account number.
purged attribute A boolean indicating whether the Amazon S3 objects have been deleted (True/False).
s3_path attribute The Amazon S3 path containing objects associated with the table.
table attribute The AWS Glue table name.
update_time attribute The last time the table was updated (the Amazon S3 path changed or objects purged).
view_sql attribute The view DDL if a view was created.

Creating the Lambda function autoCleanS3

A CloudWatch event triggers the Lambda function autoCleanS3 when a new table is created, updated, or dropped. If the --dropstore keyword is included in the Athena query comments, the associated Amazon S3 objects are also removed.

  1. On the Lambda console, choose Create function.
  2. Select Author from scratch.
  3. For Function name¸ enter autoCleanS3.
  4. For Runtime, choose Python 3.8.
  5. Under Permissions, for Execution role, select Use an existing role.
  6. Choose the role you created (service-role/autoCleanS3-LambdaRole).
  7. Choose Create function.
  8. Scroll down to the Function code section.
  9. If using Region us-west-2, on the Actions menu, choose Upload a file to Amazon S3.

  1. Enter the following:
    https://aws-bigdata-blog.s3.amazonaws.com/artifacts/aws-blog-keep-your-data-lake-clean-and-compliant-with-amazon-athena/autoCleanS3.zip

  2. Choose Save.

If using a Region other than us-west-2, download the Lambda .zip file locally. Then choose Upload a .zip file and choose the file from your computer to upload the Lambda function.

  1. In the Environment variables section, choose Edit.
  2. Choose Add environment variable.
  3. Enter the following key-values in the following table (customize as desired):
Key Value Purpose
Athena_SQL_Drop_Phrase --dropstore String to embed in Athena drop table queries to cause associated Amazon S3 objects to be removed
db_list

Comma-separated regex filter

<.*>

Allows you to limit which databases may contain tables that autoCleanS3 is allowed to purge
ddb_history_table GlueHistoryDDB The name of the AWS Glue history DynamoDB table
disable_s3_cleanup False If set to True, it disables the Amazon S3 purge, still recording attempts in the history table
log_level INFO Set to DEBUG to troubleshoot if needed

You must use a standard regex expression, which can be a simple comma-separated list of the AWS Glue databases that you want autoCleanS3 to evaluate.

 The following table shows example patterns for db_list.

Example Regex Pattern Result
.* Default, includes all databases
clickstream_web, orders_web, default Includes only clickstream_web, orders_web, default
.*_web Includes all databases having names ending in _web
.*stream.* Includes all databases containing stream in their name

For a complete list or supported patterns, see https://docs.python.org/3/library/re.html#re.Pattern.match

  1. Choose Save.

Creating EventBridge rules

You need to create EventBridge rules that invoke your Lambda function whenever Athena query events and AWS Glue CreateTable and UpdateTable events are generated.

Creating the Athena event rule

To create the Athena query event rule, complete the following steps:

  1. On the EventBridge console, choose Create rule.
  2. For Name, enter autoCleanS3-AthenaQueryEvent.
  3. For Description, enter Amazon Athena event for any query to trigger autoCleanS3.
  4. For Define pattern, choose Event pattern.
  5. For Event matching pattern, choose Custom pattern.
  6. For Event pattern, enter the following:
    {
    	"detail-type": [
    		"AWS API Call via CloudTrail"
    	],
    	"source": [
    		"aws.athena"
    	],
    	"detail": {
    		"eventName": [
    			"StartQueryExecution"
    		]
    	}
    }

  1. Choose Save.
  2. For Select targets, choose Lambda function.
  3. For Function¸ choose autoClean3.
  4. Choose Create.

Creating the AWS Glue event rule

To create the AWS Glue table event  rule, complete the following steps:

  1. On the EventBridge console, choose Create rule.
  2. For Name, enter autoCleanS3-GlueTableEvent.
  3. For Description, enter AWS Glue event for any table creation or update to trigger autoCleanS3.
  4. For Define pattern, choose Event pattern.
  5. For Event matching pattern, choose Custom pattern.
  6. For Event pattern, enter the following:
    {
    	"detail-type": [
    		"Glue Data Catalog Database State Change"
    	],
    	"source": [
    		"aws.glue"
    	],
    	"detail": {
    		"typeOfChange": [
    			"CreateTable",
    			"UpdateTable"
    		]
    	}
    }

  1. Choose Save.
  2. For Select targets, choose Lambda function.
  3. For Function¸ choose autoClean3.
  4. Choose Create.

You’re finished!

Testing the solution

Make sure you already have a data lake with tables defined in your AWS Glue Data Catalog and permission to access Athena. For this post, we use NYC taxi ride data. For more information, see Build a Data Lake Foundation with AWS Glue and Amazon S3.

  1. Create a new table using Athena CTAS.

Next, verify that the entry appears in the new GlueHistoryDDB table.

  1. On the DynamoDB console, open the GlueHistoryDDB table.
  2. Choose Items.
  3. Confirm the s3_path value for the table.

You can also view  the Amazon S3 table path and objects associated with the table.

  1. On the Amazon S3 console, navigate to the s3_path found in GlueHistoryDDB.
  2. Confirm the table and path containing the data folder and associated manifest and metadata objects.

  1. Drop the table using the keyword --dropstore.

  1. Check the Amazon S3 path to verify both the table folder and associated manifest and metadata files have been removed.

You can also see the purged attribute for the entry in GlueHistoryDDB is now set to True, and update_time has been updated, which you can use if you ever need to look back and understand when a purge event occurred.

Considerations

The Lambda timeout may need to be increased for very large tables, because the object deletion operations may not complete in time.

To prevent accidental data deletion, it’s recommended to carefully limit which databases may participate (Lambda environment variable db_list) and to enable versioning on the Athena bucket path and set up Amazon S3 lifecycle policies to eventually remove older versions. For more information, see Deleting object versions. 

Conclusion

In this post, we demonstrated how to automate the process of  deleting Amazon S3 objects associated with dropped AWS Glue tables. Deleting Amazon S3 objects that are no longer associated with an AWS Glue table reduces ongoing storage expense, management overhead, and unnecessary exposure of potentially private data no longer needed within the organization, allowing you to meet regulatory requirements.

This serverless solution monitors Athena and AWS Glue table creation and drop events via CloudWatch, and triggers Lambda to perform Amazon S3 object deletion. We use DynamoDB to store the audit history of all AWS Glue tables that have been dropped over time. It’s strongly recommended to enable Amazon S3 bucket versioning to prevent accidental data deletion.

To restore the Amazon S3 objects for the deleted table, you first identify the s3_path value for the relevant table entry in GlueHistoryDDB and either copy or remove the delete marker from objects in that path. For more information, see How do I undelete a deleted S3 object?


About the Author

David Roberts is a Senior Solutions Architect at AWS. His passion is building efficient and effective solutions on the cloud, especially involving analytics and data lake governance. Besides spending time with his wife and two daughters, he likes drumming and watching movies, and is an avid video gamer.

Optimizing Spark applications with workload partitioning in AWS Glue

Post Syndicated from Avijit Goswami original https://aws.amazon.com/blogs/big-data/optimizing-spark-applications-with-workload-partitioning-in-aws-glue/

AWS Glue provides a serverless environment to prepare (extract and transform) and load large amounts of datasets from a variety of sources for analytics and data processing with Apache Spark ETL jobs. This posts discusses a new AWS Glue Spark runtime optimization that helps developers of Apache Spark applications and ETL jobs, big data architects, data engineers, and business analysts scale their data processing and batch jobs running on AWS Glue automatically.

Customers use Spark for a wide variety of ETL and analytics workloads on datasets with diverse characteristics. They want to ensure fast and error-free execution of these workloads. Errors in Spark applications commonly arise from inefficient Spark scripts, distributed in-memory execution of large-scale transformations, and dataset abnormalities. Spark’s distributed execution uses a Master/Slave architecture with driver and executor processes perform parallel computation over partitions of input dataset. Inspite of this data-parallel architecture, Spark applications commonly run into out-of-memory (OOM) exceptions on driver and executors due to skew in input data, large number of input files, or large joins and shuffle operations.

In this blog post, we introduce a new Spark runtime optimization on Glue – Workload/Input Partitioning for data lakes built on Amazon S3. Customers on Glue have been able to automatically track the files and partitions processed in a Spark application using Glue job bookmarks. Now, this feature gives them another simple yet powerful construct to bound the execution of their Spark applications. Bounded execution allows customers to partition their workloads by limiting the maximum number of files or dataset size processed incrementally within Glue Spark applications that can be orchestrated sequentially or in parallel.

Specifically, this feature makes it easy for customers to make their complex ETL pipelines significantly more resilient to errors. This is achieved by breaking down the monolithic Spark applications processing a large backlog of tens to hundreds of millions of files into simpler modular Spark applications that can process a bounded number of files or dataset size incrementally.

This Spark runtime optimization also works together with existing Glue features such as push down predicates, AWS Glue S3 lister, grouping, exclusions for S3 paths, and other optimizations .

Setup and Use Cases

One of the common use cases of data warehousing is processing a large number of records from a fact table (employees, sales or items) and joining the same with multiple dimension tables (departments, stores, catalog), and loading the output to the final destination. The following diagram illustrates an ETL architecture used commonly by several customers.

 

ETL pipelines using Apache Spark applications for this use case or similar backlog ingestion can encounter 3 common errors. First, the Spark driver can run out-of-memory while listing millions of files in S3 for the fact table. Second, the Spark executors can run out-of-memory if there is skew in the dataset resulting in imbalanced shuffles or join operations across the different partitions of the fact table. Third, any data abnormality or malformed records can cause the Spark application to fail during any of the three stages – read from S3, application of join transform, or write to S3. In this blog post, we would show how workload partitioning can help you mitigate these errors by bounding the execution of the Spark application, and also detect abnormalities or skews in your data.

Our setup uses a fact table consisting of employee badge access data stored in S3 with 1.34 million objects and files, and a record count of 1.3 billion. This dataset is joined with two other datasets (dimension tables – employee and badge data), which are smaller in size, one with 107 records and another with a record count of 12,249 in 10 files. We use native Spark 2.4 and Python 3. We will monitor the memory profile of Spark driver and executors over time. We find that both the Spark driver and executors get prone to OOM exceptions. We would use the AWS Glue Workload Partitioning feature to show how we can automatically mitigate those errors automatically with minimal changes to the Spark application.

We enable AWS Glue job bookmarks with the use of AWS Glue Dynamic Frames as it helps to incrementally load unprocessed data from S3. Vanilla Spark applications using Spark Dataframes do not support Glue job bookmarks and therefore can not incrementally load data out-of-the-box. We find that Spark applications using both Glue Dynamic Frames and Spark Dataframes can run into the above 3 error scenarios while loading tables with large number of input files or distributed transformations such as join resulting in large shuffles. Following is the code snippet of the Spark application used for our setup.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
## args = getResolvedOptions(sys.argv, ['JOB_NAME', 'year_partition_key'])
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "spark-oom-test", table_name = "oom_test_3_agoswami0915_glue_spark_oom_1", transformation_ctx = "datasource0")
##datasource0 schema : |-- BadgeID|-- EmployeeID|-- Date-Month|-- Date-Day|-- Date-Year|-- Hours_Logged|-- partition_2|-- partition_1|-- partition_3|-- partition_0
datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "employee-productivity-database", table_name = "lake-formation-workshop_hr_employees", transformation_ctx = "datasource0")
##datasource1 schema: |-- job_id|-- employee_id|-- salary|-- hire_date|-- department_id|-- last_name|-- email|-- phone_number|-- first_name|-- manager_id|-- commission_pct
datasource2 = glueContext.create_dynamic_frame.from_catalog(database = "employee-productivity-database", table_name = "dynamodb", transformation_ctx = "datasource2")
##datasource2 schema:|-- col_dateyear|— col_dateday|-- employeeid|-- badgeid|-- hours_logged|-- col_datemonth
## ApplyMappings to check and convert the data types to avoid type mismatch during join operation
datasource_0 = ApplyMapping.apply(frame = datasource0, mappings = [("badgeid", "string", "badgeid", "string"), ("employeeid", "long", "employeeid", "long"), ("date-month", "string", "date-month", "string"), ("date-day", "int", "date-day", "int"), ("date-year", "int", "date-year", "int"), ("hours_logged", "int", "hours_logged", "int"), ("partition_0", "string", "partition_0", "string"), ("partition_1", "string", "partition_1", "string"), ("partition_2", "string", "partition_2", "string"), ("partition_3", "string", "partition_3", "string")], transformation_ctx = "applymapping1")
datasource_1 = ApplyMapping.apply(frame = datasource1, mappings = [("job_id", "string", "job_id", "string"), ("employee_id", "int", "employee_id", "int"), ("salary", "double", "salary", "double"), ("hire_date", "string", "hire_date", "string"), ("department_id", "long", "department_id", "long"), ("last_name", "string", "last_name", "string"), ("email", "string", "email", "string"), ("phone_number", "string", "phone_number", "string"), ("first_name", "string", "first_name", "string"), ("commission_pct", "double", "commission_pct", "double"), ("manager_id", "long", "manager_id", "long")], transformation_ctx = "applymapping1")
datasource_2 = ApplyMapping.apply(frame = datasource2, mappings = [("col_dateyear", "int", "col_dateyear", "int"), ("col_dateday", "int", "col_dateday", "int"), ("employeeid", "int", "employeeid", "int"), ("badgeid", "string", "badgeid", "string"), ("hours_logged", "int", "hours_logged", "int"), ("col_datemonth", "string", "col_datemonth", "string")], transformation_ctx = "applymapping1")
## Apply Join and drop fields that we don't need in target dataset
datasource3 = Join.apply(datasource_0, Join.apply(datasource_1, datasource_2, 'employee_id', 'employeeid'), 'badgeid','badgeid').drop_fields(['job_id', 'employee_id', 'salary', 'hire_date', 'department_id', 'last_name', 'email', 'phone_number', 'first_name', 'commission_pct', 'manager_id', 'col_dateyear', 'col_dateday',  'col_datemonth',  'partition_2', 'partition_1', 'partition_3', 'partition_0'])
## @type: ApplyMapping
## @return: applymapping1
## @inputs: [frame = datasource3]
applymapping1 = ApplyMapping.apply(frame = datasource3, mappings = [("badgeid", "decimal(19,0)", "badgeid", "decimal(19,0)"), ("employeeid", "long", "employeeid", "long"), ("date-month", "string", "date-month", "string"), ("date-day", "int", "date-day", "int"), ("date-year", "int", "date-year", "int"), ("hours_logged", "int", "hours_logged", "int")], transformation_ctx = "applymapping1")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://agoswami0915-spark-oom-test-oct12/target-tablle"}, format = "json", transformation_ctx = "datasink2"]
## @return: datasink2
## @inputs: [frame = applymapping1]
datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://agoswami0915-spark-oom-test-oct12/target-tablle"}, format = "json", transformation_ctx = "datasink2")
job.commit()

We have used AWS Glue crawlers to infer the schema of the datasets and create the AWS Glue Data Catalog objects referred in the Spark application. The sample Spark code creates DynamicFrames for each dataset in an S3 bucket, joins the three DynamicFrames, and writes the transformed data to a target location in an S3 bucket.

Spark application without bounded execution

When we ran the Spark application to join three datasets with their common keys, it ran for about 4 hours to read and iterate over the large dataset. It eventually failed with a Spark driver OOM error:

Exception in thread "spark-listener-group-appStatus" 
java.lang.OutOfMemoryError: Java heap space

When checking the memory profile of the driver and executors (see the following graph) using Glue job metrics, it’s apparent that the driver memory utilization gradually increases over the 50% threshold as it reads data from a large data source, and finally goes out of memory while trying to join with the two smaller datasets.

Rerunning the Spark application with bounded execution

To overcome this Spark driver OOM, we modified the previous code to use workload partitioning by simply including the boundedFiles parameter as an additional_options (see the following code). In this changed code, we used the job to process 100,000 files from datasource0. Bounded execution works in conjunction with job bookmarks. Job bookmarks tracks processed files and partitions based on timestamp and path hashes. In addition, bounded execution applies filters to track files and partitions with a specified bound on the number of files or the dataset size.

datasource0 = glueContext.create_dynamic_frame.from_catalog(database =
"spark-oom-test", table_name =
"oom_test_3_agoswami0915_glue_spark_oom_1", transformation_ctx =
"datasource0", additional_options = {"boundedFiles" : "100000"})

After this change, the driver memory utilization stayed consistently low, with a peak utilization of about 26%, as seen in the following graph (blue line). However, the job encountered heavy memory usage by the executors during the join operations resulting from the shuffle (different colored lines showing high executor memory usage). This caused the job to eventually fail after four retries with an executor OOM.

Detecting OOM issues: Data skews and straggler tasks

In many cases, customer’s Spark jobs can run for hours before finally failing with errors. Instead of waiting for the jobs to fail after running for long hours and then analyze the root cause, we can check the job progress using Glue’s job metrics available through Amazon CloudWatch, or the Spark UI to identify straggler tasks that could potentially cause failures.

With Spark UI, we examined the Spark execution timeline and found that some of the executors are straggling with long-running tasks, resulting in eventual failures of those executors (Executor IDs 19, 11, 6, and 22 in the following event timeline graph)

Looking into the executor summary details, it was evident that these four executors contributed to many failed tasks during the job.

Diving deep into the executors revealed that the tasks are straggling during the shuffle phase, taking the longest runtime, and contributing to most of the job runtime. The following event timeline shows a consistent pattern of failures for all four executors performing straggler tasks that started with Executor 19.

In this scenario, the job ran for more than 10 hours before finally failing due to an executor OOM. Looking into the trend of the job from Spark UI or memory profiles from CloudWatch shows that executors in this job were involved in straggler tasks and this job was potentially on a path to failure. Instead of waiting for the job to run for hours and waste valuable resources, the job can be cancelled after looking at these trends after Executor 19 failed or automatically after a job-level timeout.

The first failed stage from the Spark UI shows Executor 19 was involved in many failed tasks and finally timed out and was replaced by another executor by the Spark driver.

Finally, investigating the details of the final stage of the job that failed showed that Executor 22, like the other three executors (19,11, and 6), was involved in straggler tasks during the shuffle phase and eventually failed with an OOM error.

Rerunning the job with a tighter bound

Now, we chang the boundedFiles parameter value to process 50,000 files:

datasource0 = glueContext.create_dynamic_frame.from_catalog(database =
"spark-oom-test", table_name = 
"oom_test_3_agoswami0915_glue_spark_oom_1", transformation_ctx = 
"datasource0", additional_options = {"boundedFiles" : "50000"})

The job ran successfully without any driver or executor memory issues.

Considering that each input file is about 1 MB size in our use case, we concluded that we can process about 50 GB of data from the fact dataset and join the same with two other datasets that have 10 additional files.

You can further convert AWS Glue DynamicFrames to Spark DataFrames and also use additional Spark transformations.

Running jobs in parallel on different partitions with tighter bounds

In production scenarios, data engineering pipelines generally have strict SLAs to complete data processing with ETL. For example, if we need to complete our job in 1.5 hours and process 50,000 files from the input dataset, the previous job would miss the SLA easily because the job takes more than 2 hours to complete. Another scenario could be if we have to process 100,000 input files, which might take more than 4 hours to finish if we run the same job sequentially, with each run processing 50,000 files with bounded execution.

To address these issues, we can optimize the pipeline by creating multiple copies of the job. We can use Glue’s push down predicates to process a subset of the data from different S3 partitions with bounded execution. In the following code, we create two copies of the same job that we ran earlier, but with the same boundedFiles parameter for both jobs to process 50,000 files. In one of the jobs, we pass a push down predicate with an even number as the partition value. In the other job, we process odd numbered partition values.

The following code shows the job with an even partition value:

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "spark-oom-
test", table_name = "oom_test_3_agoswami0915_glue_spark_oom_1", transformation_ctx 
= "datasource0", push_down_predicate = "(partition_0 == '2020')", 
additional_options = {"boundedFiles" : "50000"})

The following code shows the job with an odd partition value:

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "spark-oom-
test", table_name = "oom_test_3_agoswami0915_glue_spark_oom_1", transformation_ctx 
= "datasource0", push_down_predicate = "(partition_0 == '2019')", 
additional_options = {"boundedFiles" : "50000"})

On the AWS Glue console, we can create an AWS Glue Workflow to run both jobs in parallel. Because our input files have unique keys, even when running the jobs in parallel, the output doesn’t have any duplicates. If the input data can have duplicate keys, but the downstream application expects only unique records, we need to create a successor data deduplication job in the workflow to meet the business requirement. The following screenshot shows our workflow running both jobs in parallel.

After running the workflow, we can go to the AWS Glue console and CloudWatch page to check the progress of the jobs triggered by the workflow.

We find that both jobs started and ended at the same time (within 2 hours), and were triggered by the same workflow trigger, bounded-exec-parallel-run-1. Both of them had safe Spark driver and executor memory usage throughout the job execution.

Conclusion

AWS Glue effectively manages Spark memory while running Spark applications. The workload partitioning feature provides the ability to bound execution of Spark applications and effectively improve the reliability of ETL pipelines susceptible to encounter errors arising due to large input sources, large-scale transformations, and data skews or abnormalities. Combining this feature with other optimization mechanisms, including push down predicates, can help avoid these issues and meet data pipeline SLAs for your ETL jobs.


About the Authors

Avijit Goswami is a Principal Solutions Architect at AWS, helping startup customers become tomorrow’s enterprises using AWS services. He is part of the Analytics Specialist community at AWS. When not at work, Avijit likes to cook, travel, hike, watch sports, and listen to music.

 

 

Xiaorun Yu is a Software Development Engineer at AWS Glue who works on Glue Spark runtime. When not at work, Xiaorun enjoys hiking around the Bay Area and trying local restaurants.

 

 

 

Mohit Saxena is a Technical Lead Manager at AWS Glue. His team works on Glue’s Spark runtime to enable new customer use cases for efficiently managing data lakes on AWS and optimize Apache Spark for performance and reliability.

Data preprocessing for machine learning on Amazon EMR made easy with AWS Glue DataBrew

Post Syndicated from Kartik Kannapur original https://aws.amazon.com/blogs/big-data/data-preprocessing-for-machine-learning-on-amazon-emr-made-easy-with-aws-glue-databrew/

The machine learning (ML) lifecycle consists of several key phases: data collection, data preparation, feature engineering, model training, model evaluation, and model deployment. The data preparation and feature engineering phases ensure an ML model is given high-quality data that is relevant to the model’s purpose. Because most raw datasets require multiple cleaning steps (such as addressing missing values and imbalanced data) and numerous data transformations to produce useful features ready for model training, these phases are often considered the most time-consuming in the ML lifecycle. Additionally, producing well-prepared training datasets has typically required extensive knowledge of multiple data analysis libraries and frameworks. This has presented a barrier to entry for new ML practitioners and reduced iteration speed for more experienced practitioners.

In this post, we show you how to address this challenge with the newly released AWS Glue DataBrew. DataBrew is a visual data preparation service, with over 250 pre-built transformations to automate data preparation tasks, without the need to write any code. We show you how to use DataBrew to analyze, prepare, and extract features from a dataset for ML, and subsequently train an ML model using PySpark on Amazon EMR. Amazon EMR is a managed cluster platform that provides the ability to process and analyze large amounts of data using frameworks such as Apache Spark and Apache Hadoop.

For more details about DataBrew, Amazon EMR, and each phase of the ML lifecycle, see the following:

Solution overview

The following diagram illustrates the architecture of our solution.

Loading the dataset to Amazon S3

We use the Census Income dataset from the UCI Machine Learning Repository to train an ML model that predicts whether a person’s income is above $50,000 a year. This multivariate dataset contains 48,842 observations and 14 attributes, such as age, nature of employment, educational background, and marital status.

For this post, we download the Adult dataset. The data folder contains five files, of which adult.data and adult.test are the train and test datasets, and adult.names contains the column names and description. Because the raw dataset doesn’t contain the column names, we add them to the first row of the train and test datasets and save the files with the extension .csv:

Column Names
age,workclass,fnlwgt,education,education-num,marital-status,occupation,relationship,race, sex,capital-gain,capital-loss,hours-per-week,native-country,target

Create a new bucket in Amazon Simple Storage Service (Amazon S3) and upload the train and test data files under a new folder titled raw-data.

Data preparation and feature engineering using DataBrew

In this section, we use DataBrew to explore a sample of the dataset uploaded to Amazon S3 and prepare the dataset to train an ML model.

Creating a DataBrew project

To get started with DataBrew, complete the following steps:

  1. On the DataBrew console, choose Projects.
  2. Choose Create a project.
  3. For Name, enter census-income.
  4. For Attached recipe, choose Create a new recipe.
  5. For Recipe name, enter census-income-recipe.
  6. For Select a dataset, select New dataset.
  7. For Dataset name¸ enter adult.data.

  1. Import the train dataset adult.data.csv from Amazon S3.
  2. Create a new AWS Identity and Access Management (IAM) policy and IAM role by following the steps on the DataBrew console, which provides DataBrew the necessary permissions to access the source data in Amazon S3.
  3. In the Sampling section, for Type, choose Random rows.
  4. Select 1,000.

Exploratory data analysis

The first step in the data preparation phase is to perform exploratory data analysis (EDA). EDA allows us to gain an intuitive understanding of the dataset by summarizing its main characteristics. Example outputs from EDA include identifying data types across columns, plotting the distribution of data points, and creating visuals that describe the relationship between columns. This process informs the data transformations and feature engineering steps that you need to apply prior to building an ML model.

After you create the project, DataBrew provides three different views of the dataset:

  • Grid view – Presents the 15 columns and first 1,000 rows sampled from the dataset and the distribution of data across each column
  • Schema view – In addition to information in the grid view, presents information about the data types (such as double, integer, or string) and the data quality that indicates the presence of missing or invalid values
  • Data profile view – Supported by a data profile job, generates summary statistics such as quartiles, standard deviation, variance, most frequently occurring values, and the correlation between columns

The following screenshot shows our view of the dataset.

Each view presents a unique piece of information that helps us gain a better understanding of the dataset. For instance, in the grid view, we can observe the distribution of data across the 15 columns and spot erroneous data points, such as those with ? in the workclass, occupation, or native-country columns.

In the schema view, we can observe six columns with continuous data, nine columns with categorical or binary data, and no missing or invalid observations in the sample of our dataset. The columns with continuous data also contain the corresponding minimum, maximum, mean, median, and mode values represented as a box plot.

In the data profile view, after running a data profile job, we can observe the summary statistics from the first 20,000 rows, such as the five-number summary, measures of central tendency, variance, skewness, kurtosis, correlations, and the most frequently occurring values in each column. For instance, we can combine the information from the grid view and the data profile view to replace erroneous data points such as ? by the most frequently occurring value in that column as a form of data cleaning. To run a data profile job on more than 20,000 rows, request for a limit increase at [email protected]

As part of the EDA phase, we can look at the distribution of data in the target column, which represents whether a person’s income is above $50,000 per year. The ratio of people whose income is greater than $50,000 per year to those whose income is less than or equal to $50,000 per year is 1:3, indicating that the distribution of the target classes is not imbalanced.

Building a set of data transformation steps and publishing a recipe

Now that we have an intuitive understanding of the dataset, let’s build out the data transformation steps. Based on our EDA, we replace the ? observation with the most frequently occurring value in each column.

  1. Choose the Replace value or pattern transformation.
  2. Replace ? with Private in the workclass column.
  3. Replace ? with United-States in the native-country column.

The occupation column also contains observations with ?, but the data points are spread across categories without a clear frequently occurring category. Therefore, we can categorically encode the observations in the occupation column, including those with ? observation, thereby treating ? as a separate category. The occupation column in the adult.data training dataset contains 15 categories, of which Protective-serv, Priv-house-serv, and Armed-Forces occur infrequently. To avoid excessive granularity in ML modeling, we can group these three categories into a single category named Other.

During ML model evaluation and prediction, we can also map categories that the model hasn’t encountered during model training to the Other category.

With that as the background, let’s apply the categorical mapping transformation to only the top 12 distinct values.

  1. Select Map top 12 values.
  2. Select Map values to numeric values.

This selects the top 12 categories and combines the other categories into a single category named Other. We now have a new column named occupation_mapped.

  1. Delete the occupation column to avoid redundancy.
  2. Similarly, apply the categorical mapping transformation to the top five values in the workclass column and the top one value in the native-country Remember to select Map values to numeric values.

This groups the remaining categories into a single category named Other.

  1. Delete the columns workclass and native-country.

The other four columns with categorical data—marital-status, relationship, race, and sex—have few categories with most of them occurring frequently. Let’s apply the categorical mapping transformation to these columns as well.

  1. Apply categorical mapping, with the following differences:
    1. Select Map all values.
    2. Select Map values to numeric values.
  2. Delete the original columns to avoid redundancy.
  3. Delete the fnlwgt column, because it represents the sampling weight and isn’t related to the target
  4. Delete the education column, because it has already been categorically mapped to education-num.
  5. Map the target column to numeric values, where income less than or equal to $50,000 per year is mapped to class 0 and income greater than $50,000 per year is mapped to class 1.
  6. Rename the destination column to label in order to align with our downstream PySpark model training code.

  1. Delete the original target column.

The data preparation phase is now complete, and the set of 20 transformations that consist of data cleaning and categorical mapping is combined into a recipe.

Because the data preparation and ML model training phases are highly iterative, we can save the set of data transformation steps applied by publishing the recipe. This provides version control, and allows us to maintain the data transformation steps and experiment with multiple versions of the recipe in order to determine the version with the best ML model performance. For more information about DataBrew recipes, see Creating and using AWS Glue DataBrew recipes.

Creating and running a DataBrew recipe job

The exploratory data analysis phase helped us gain an intuitive understanding of the dataset, from which we built a recipe to prepare and transform our data for ML modeling. We have been working with a random sample of 1,000 rows from the adult.data training dataset, and we need to apply the same set of data transformation steps to the over 32,000 rows in the adult.data dataset. A DataBrew recipe job provides the ability to scale the transformation steps from a sample of data to the entire dataset. To create our recipe job, complete the following steps:

  1. On the DataBrew console, choose Jobs.
  2. Choose Create recipe job.
  3. For Job name, enter a name.
  4. Create a new folder in Amazon S3 (s3://<YOUR-S3-BUCKET-NAME>/transformed-data/) for the recipe job to save the transformed dataset.

The recipe job should take under 2 minutes to complete.

Training an ML model on the transformed dataset using PySpark

With the data transformation job complete, we can use the transformed dataset to train a binary classification model to predict whether a person’s income is above $50,000 per year.

  1. Create an Amazon EMR notebook.
  2. When the notebook’s status is Ready, open the notebook in a JupyterLab or Jupyter Notebook environment.
  3. Choose the PySpark kernel.

For this post, we use Spark version 2.4.6.

  1. Load the transformed dataset into a PySpark DataFrame within the notebook:
    train_dataset = spark.read.csv(path='s3://<YOUR-S3-BUCKET-NAME>/transformed-data/<YOUR-RECIPE-JOB-NAME>_<TIMESTAMP>/<YOUR-RECIPE-JOB-NAME>_<TIMESTAMP>_part00000.csv', header=True, inferSchema=True)
    print('The transformed train dataset has {n_rows} rows and {n_cols} columns'.format(n_rows=train_dataset.count(), n_cols=len(train_dataset.columns)))
    The transformed train dataset has 32561 rows and 13 columns

  2. Inspect the schema of the transformed dataset:
    train_dataset.printSchema()
    root 
    |-- age: integer (nullable = true) 
    |-- workclass_mapped: double (nullable = true) 
    |-- education-num: double (nullable = true) 
    |-- marital_status_mapped: double (nullable = true) 
    |-- occupation_mapped: double (nullable = true) 
    |-- relationship_mapped: double (nullable = true) 
    |-- race_mapped: double (nullable = true) 
    |-- sex_mapped: double (nullable = true) 
    |-- capital-gain: double (nullable = true) 
    |-- capital-loss: double (nullable = true) 
    |-- hours-per-week: double (nullable = true) 
    |-- native_country_mapped: double (nullable = true) 
    |-- label: double (nullable = true)

Of the 13 columns in the dataset, we use the first 12 columns as features for the model and the label column as the final target value for prediction.

  1. Use the VectorAssembler method within PySpark to combine the 12 columns into a single feature vector column, which makes it convenient to train the ML model:
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import VectorAssembler
    stages = []
    arr_features = train_dataset.columns[:-1]
    # Transform input features into a vector using VectorAssembler
    features_vector_assembler = VectorAssembler(inputCols=arr_features, outputCol='features')
    stages.append(features_vector_assembler)
    # Run the train dataset through the pipeline
    pipeline = Pipeline(stages=stages)
    train_dataset_pipeline = pipeline.fit(train_dataset).transform(train_dataset)
    # Select the feature vector and label column
    train_dataset_pipeline = train_dataset_pipeline.select('features', 'label')

  2. To estimate the model performance on the unseen test dataset (test) split the transformed train dataset (train_dataset_pipline) into 70% for model training and 30% for model validation:
    df_train, df_val = train_dataset_pipeline.randomSplit([0.7, 0.3], seed=42)
    print('The train dataset has {n_rows} rows and {n_cols} columns'.format(n_rows=df_train.count(), n_cols=len(df_train.columns)))
    print('The validation dataset has {n_rows} rows and {n_cols} columns'.format(n_rows=df_val.count(), n_cols=len(df_val.columns)))
    The train dataset has 22841 rows and 2 columns
    The validation dataset has 9720 rows and 2 columns

  3. Train a Random Forest classifier on the training dataset df_train and evaluate its performance on the validation dataset df_val using the area under the ROC curve (AUC), which is a measure of model performance for binary classifiers at different classification thresholds:
    from pyspark.ml.classification import RandomForestClassifier
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    # Train a Random Forest classifier
    rf_classifier = RandomForestClassifier(featuresCol = 'features', labelCol = 'label')
    model = rf_classifier.fit(df_train)
    # Model predictions on the validation dataset
    preds = model.transform(df_val)
    # Evaluate model performance
    evaluator = BinaryClassificationEvaluator()
    auc = evaluator.evaluate(preds, {evaluator.metricName: "areaUnderROC"})
    print('Validation AUC: {}'.format(auc))
    Validation AUC: 0.8909629419656796

A validation AUC of 0.89 indicates strong model performance for the classifier. Because the data transformation and model training phases are highly iterative in nature, in order to improve the model performance, we can experiment with different data transformation steps, additional features, and other classification models. After we achieve a satisfactory model performance, we can evaluate the model predictions on the unseen test dataset, adult.test.

Evaluating the ML model on the test dataset

In the data transformation and ML model training sections, we have developed a reusable pipeline that we can use to evaluate the model predictions on the unseen test dataset.

  1. Create a new DataBrew project and load the raw test dataset (adult.test.csv) from Amazon S3, as we did in the data preparation section.
  2. Import the recipe we created earlier with the 20 data transformation steps to apply them on the adult.test dataset.



We can observe that all the columns have been transformed successfully, apart from the label column, which contains null values. This is because the adult.test dataset contains messy data in the target column, namely an extra punctuation mark at the end of the classes <=50k and >50k. To correct this, we can remove the last step of the recipe.

  1. Delete the column target.
  2. Edit the prior step in creating a categorical map to account for the extra punctuation mark.
  3. Delete the original target column to avoid redundancy.
  4. Create and run the recipe job to transform and store the over 16,000 rows in the adult.test dataset under s3://<YOUR-S3-BUCKET-NAME>/transformed-data/.

This job should take approximately 1 minute to complete.

When the train and test datasets don’t have any variation in the types of categories, we can create and run a recipe job directly from the DataBrew console, without having to create a separate project.

  1. When the data transformation job on the adult.test dataset is complete, load the transformed dataset into a PySpark dataframe to evaluate the performance of the binary classification model:
    # Load the transformed test dataset
    test_dataset = spark.read.csv(path='s3://<YOUR-S3-BUCKET-NAME>/transformed-data/<YOUR-RECIPE-JOB-NAME>_<TIMESTAMP>/<YOUR-RECIPE-JOB-NAME>_<TIMESTAMP>_part00000.csv', header=True, inferSchema=True)
    
    print('The transformed test dataset has {n_rows} rows and {n_cols} columns'.format(n_rows=test_dataset.count(), n_cols=len(test_dataset.columns)))

The transformed test dataset has 16281 rows and 13 columns

# Run the test dataset through the same feature vector pipeline
test_dataset_pipeline = pipeline.fit(test_dataset).transform(test_dataset)

# Select the feature vector and label column
test_dataset_pipeline = test_dataset_pipeline.select('features', 'label')

# Model predictions on the test dataset
preds_test = model.transform(test_dataset_pipeline)

# Evaluate model performance
evaluator = BinaryClassificationEvaluator()
auc = evaluator.evaluate(preds_test, {evaluator.metricName: "areaUnderROC"})
print('Test AUC: {}'.format(auc))
Test AUC: 0.8947235975486465

The model performance with an AUC of 0.89 on the unseen test dataset is about the same as the model performance on the validation set, which demonstrates strong model performance on the unseen test dataset as well.

Summary

In this post, we showed you how to use DataBrew and Amazon EMR to streamline and speed up the data preparation and feature engineering stages of the ML lifecycle. We explored a binary classification problem, but the wide selection of DataBrew pre-built transformations and PySpark ML libraries make this approach extendable to numerous ML use cases.

Get started today! Explore your use case with the services mentioned in this post and many others on the AWS Management Console


About the Authors

Kartik Kannapur is a Data Scientist with AWS Professional Services. He holds a Master’s degree in Applied Mathematics and Statistics from Stony Brook University and focuses on using machine learning to solve customer business problems.

 

 

 

Prithiviraj Jothikumar, PhD, is a Data Scientist with AWS Professional Services, where he helps customers build solutions using machine learning. He enjoys watching movies and sports and spending time to meditate.

 

 

 

Bala Krishnamoorthy is a Data Scientist with AWS Professional Services, where he helps customers solve problems and run machine learning workloads on AWS. He has worked with customers across diverse industries, including software, finance, and healthcare. In his free time, he enjoys spending time outdoors, running with his dog, beating his family and friends at board games and keeping up with the stock market.

Building Python modules from a wheel for Spark ETL workloads using AWS Glue 2.0

Post Syndicated from Rumeshkrishan Mohan original https://aws.amazon.com/blogs/big-data/building-python-modules-from-a-wheel-for-spark-etl-workloads-using-aws-glue-2-0/

AWS Glue is a fully managed extract, transform, and load (ETL) service that makes it easy to prepare and load your data for analytics. AWS Glue 2.0 features an upgraded infrastructure for running Apache Spark ETL jobs in AWS Glue with reduced startup times. With reduced startup delay time and lower minimum billing duration, overall jobs complete faster, enabling you to run micro-batching and time-sensitive workloads more cost-effectively. To use this feature with your AWS Glue Spark ETL jobs, choose 2.0 for the AWS Glue version when creating your jobs.

AWS Glue 2.0 also lets you provide additional Python modules at the job level. You can use the --additional-python-modules option with a list of comma-separated Python modules to add a new module or change the version of an existing module. AWS Glue uses the Python Package Installer (pip3) to install the additional modules. You can pass additional options specified by the --python-modules-installer-option to pip3 to install the modules. Any incompatibly or limitations from pip3 apply. AWS Glue supports Python modules out of the box. For more information, see Running Spark ETL Jobs with Reduced Startup Times.

In this post, we go through the steps needed to create an AWS Glue Spark ETL job with the new capability to install or upgrade Python modules from a wheel file, from a PyPI repository, or from an Amazon Simple Storage Service (Amazon S3) bucket. We discuss approaches to install additional python modules for an AWS Glue Spark ETL job from a PyPI repository or from a wheel file on Amazon S3 in a VPC with and without internet access.

Setting up an AWS Glue job in a VPC with internet access

To set up your AWS Glue job in a VPC with internet access, you have two options:

  • Install Python modules from a PyPI repository
  • Install Python modules using a wheel file on Amazon S3

To setup an Internet Gateway and attach to a VPC, please refer the documentation here.

The following diagram illustrates the final architecture.

Installing Python modules from a PyPI repository

You can create an AWS Glue Spark ETL job with job parameters --additional-python-modules and --python-modules-installer-option to install a new Python module or update an existing Python module from a PyPI repository.

The following screenshot shows the Amazon CloudWatch logs for the job.

The AWS Glue job successfully uninstalled the previous version of scikit-learn and installed the provided version. We can also see that the nltk requirement was already satisfied.

Installing Python modules using a wheel file from Amazon S3

To install a new Python module or update an existing Python module using a wheel file from Amazon S3, create an AWS Glue Spark ETL job with job parameters --additional-python-modules and --python-modules-installer-option.

The following screenshot shows the CloudWatch logs for the job.

The AWS Glue job successfully installed the psutil Python module using a wheel file from Amazon S3.

Setting up an AWS Glue job in a VPC without internet access

In this section, we discuss the steps to set up an AWS Glue job in a VPC without internet access. The following diagram illustrates this architecture.

Setting up a VPC and a VPC endpoint for Amazon S3

As our first step, we will set up a VPC.

  1. Create a VPC with at least one private subnet, and make sure that DNS hostnames are enabled.

For more information about creating a private VPC, see VPC with a private subnet only and AWS Site-to-Site VPN access.

  1. Create an Amazon S3 endpoint. During the setup, associate the endpoint with the route table of your private subnet.

For more information about creating an Amazon S3 endpoint, see Amazon VPC Endpoints for Amazon S3.

Setting up an S3 bucket for Python repository

You now configure your S3 bucket for your Python repository.

  1. Create an S3 bucket.
  2. Configure the bucket to host a static website for Python repository.

You want to qualify that the S3 bucket holds the Python packages and acts as a repository. For more information, see Enabling website hosting.

  1. Record the Amazon S3 website endpoint.
  2. Configure the bucket policy with restricted access to a specific Amazon VPC (AWS Glue VPC).

Creating a Python repository on Amazon S3

To create your Python repository on Amazon S3, complete the following steps:

  1. If you haven’t already, install Docker for Linux, Windows, or macOS on your computer.
  2. Create a modules_to_install.txt file with required Python modules and their versions. For example, see the following code:
    psutil==5.7.2
    scikit-learn==0.23.0
    scikit-learn==0.23.1
    scikit-learn==0.23.2
    geopy==2.0.0
    Shapely==1.7.1
    googleads==25.0.0
    nltk==3.5

  3. Create a script.sh file with the following code:
    #!/bin/bash
    # install required lib python3.7 and gcc
    yum -y install gcc python3-devel python3
    # create the virtual environment
    python3.7 -m venv wheel-env
    # activate the virtual environment
    source wheel-env/bin/activate
    # install wheel package for creating wheel files
    pip install wheel
    # create folder for package and cache
    mkdir wheelhouse cache
    # run pip command on cache location
    cd cache
    for f in $(cat ../modules_to_install.txt); do pip wheel $f -w ../wheelhouse; done
    cd ..
    # create the index.html file
    cd wheelhouse
    INDEXFILE="<html><head><title>Links</title></head><body><h1>Links</h1>"
    for f in *.whl; do INDEXFILE+="<a href='$f'>$f</a><br>"; done
    INDEXFILE+="</body></html>"
    echo "$INDEXFILE" > index.html
    cd ..
    # cleanup environment
    deactivate
    rm -rf cache wheel-env
    # exit the docker container
    exit

  4. Create a wheelhouse using the following Docker command:
    docker run -v "$PWD":/tmp amazonlinux:latest /bin/bash -c "cd /tmp;sh script.sh"

The expected outcome looks like the following:

|- modules_to_install.txt
|- script.sh
|- wheelhouse/
  |- PyYAML-5.3.1-cp37-cp37m-linux_x86_64.whl
  |- psutil-5.7.2-cp37-cp37m-linux_x86_64.whl
  |- scikit_learn-0.23.0-cp37-cp37m-manylinux1_x86_64.whl
  |- scikit_learn-0.23.1-cp37-cp37m-manylinux1_x86_64.whl
  ....
  |- index.html
  1. Copy the wheelhouse directory into the S3 bucket using following code:
    S3_BUCKET="MY-PYTHON-REPO-BUCKET"
    S3_GLUE_SCRIPT_BUCKET="MY-SCRIPT-BUCKET"
    aws s3 cp wheelhouse/ "s3://$S3_BUCKET/wheelhouse/" --recursive --profile default

For more information, see Named profiles.

Creating an AWS Glue connection

To enable AWS Glue to access resources inside your VPC, you must provide additional VPC-specific configuration information that includes VPC subnet IDs and security group IDs. For instructions, see Creating the Connection to Amazon S3.

Test if the AWS Glue connection to the S3 bucket MY-PYTHON-REPO-BUCKET is working properly. For instructions, see Testing an AWS Glue Connection.

The following screenshot shows the message that your connection is successful.

Creating an AWS Glue Spark ETL job with an AWS Glue connection

Finally, create an AWS Glue Spark ETL job with job parameters --additional-python-modules and --python-modules-installer-option to install a new Python module or update the existing Python module using Amazon S3 as the Python repository.

The following code is an example job parameter:

{
"--additional-python-modules" : "psutil==5.7.2,scikit-learn==0.23.1,geopy==2.0.0,Shapely==1.7.1,googleads==25.0.0,nltk==3.5",
"--python-modules-installer-option" : "--no-index --find-links=http://MY-BUCKET.s3-website-us-east-1.amazonaws.com/wheelhouse --trusted-host MY-BUCKET.s3-website-us-east-1.amazonaws.com"
}

For this use case, we create a sample S3 bucket, a VPC, and an AWS Glue ETL Spark job in the US East (N. Virginia) Region, us-east-1.

To view the CloudWatch logs for the job, complete the following steps:

  1. Choose your AWS Glue job.
  2. Select the run ID.
  3. Choose Error logs.
  4. Select the driver log stream for that run ID.
  5. Check the status of the pip installation step.

The logs show that the AWS Glue job successfully installed all the Python modules and its dependencies from the Amazon S3 PyPI repository using Amazon S3 static web hosting.

Limitation: It is currently not supported to install a python module with a C binding that relies on a native library (compiled) from a rpm package that is not available at runtime.

Summary

In this post, you learned how to configure AWS Glue Spark ETL jobs to install additional Python modules and its dependencies in an environment that has access to internet and in a secure environment that doesn’t have access to the internet.


About the Authors

Rumeshkrishnan Mohan is a Big Data Consultant with Amazon Web Services. He works with Global Customers in building their data lakes.

 

 

 

Krithivasan Balasubramaniyan is Senior Consultant at Amazon Web Services. He enables global enterprise customers in their digital transformation journey and helps architect cloud native solutions.

 

 

 

 

 

Creating a source to Lakehouse data replication pipe using Apache Hudi, AWS Glue, AWS DMS, and Amazon Redshift

Post Syndicated from Vishal Pathak original https://aws.amazon.com/blogs/big-data/creating-a-source-to-lakehouse-data-replication-pipe-using-apache-hudi-aws-glue-aws-dms-and-amazon-redshift/

Most customers have their applications backed by various sql and nosql systems on prem and on cloud. Since the data is in various independent systems, customers struggle to derive meaningful info by combining data from all of these sources. Hence, customers create data lakes to bring their data in a single place.

Typically, a replication tool such as AWS Database Migration Service (AWS DMS) can replicate the data from your source systems to Amazon Simple Storage Service (Amazon S3). When the data is in Amazon S3, customers process it based on their requirements. A typical requirement is to sync the data in Amazon S3 with the updates on the source systems. Although it’s easy to apply updates on a relational database management system (RDBMS) that backs an online source application, it’s tough to apply this change data capture (CDC) process on your data lakes. Apache Hudi is a good way to solve this problem. Currently, you can use Hudi on Amazon EMR to create Hudi tables.

In this post, we use Apache Hudi to create tables in the AWS Glue Data Catalog using AWS Glue jobs. AWS Glue is a fully managed extract, transform, and load (ETL) service that makes it easy to prepare and load your data for analytics. This post enables you to take advantage of the serverless architecture of AWS Glue while upserting data in your data lake, hassle-free.

To write to Hudi tables using AWS Glue jobs, we use a JAR file created using open-source Apache Hudi. This JAR file is used as a dependency in the AWS Glue jobs created through the AWS CloudFormation template provided in this post. Steps to create the JAR file are included in the appendix.

The following diagram illustrates the architecture the CloudFormation template implements.

Prerequisites

The CloudFormation template requires you to select an Amazon Elastic Compute Cloud (Amazon EC2) key pair. This key is configured on an EC2 instance that lives in the public subnet. We use this EC2 instance to get to the Aurora cluster that lives in the private subnet. Make sure you have a key in the Region where you deploy the template. If you don’t have one, you can create a new key pair.

Solution overview

The following are the high-level implementation steps:

  1. Create a CloudFormation stack using the provided template.
  2. Connect to the Amazon Aurora cluster used as a source for this post.
  3. Run InitLoad_TestStep1.sql, in the source Amazon Aurora cluster, to create a schema and a table.

AWS DMS replicates the data from the Aurora cluster to the raw S3 bucket. AWS DMS supports a variety of sources.
The CloudFormation stack creates an AWS Glue job (HudiJob) that is scheduled to run at a frequency set in the ScheduleToRunGlueJob parameter of the CloudFormation stack. This job reads the data from the raw S3 bucket, writes to the Curated S3 bucket, and creates a Hudi table in the Data Catalog. The job also creates an Amazon Redshift external schema in the Amazon Redshift cluster created by the CloudFormation stack.

  1. You can now query the Hudi table in Amazon Athena or Amazon Redshift. Visit Creating external tables for data managed in Apache Hudi or Considerations and Limitations to query Apache Hudi datasets in Amazon Athena for details.
  2. Run IncrementalUpdatesAndInserts_TestStep2.sql on the source Aurora cluster.

This incremental data is also replicated to the raw S3 bucket through AWS DMS. HudiJob picks up the incremental data, using AWS Glue bookmarks, and applies it to the Hudi table created earlier.

  1. You can now query the changed data.

Creating your CloudFormation stack

Click on the Launch Stack button to get started and provide the following parameters:

Parameter Description
VpcCIDR CIDR range for the VPC.
PrivateSubnet1CIDR CIDR range for the first private subnet.
PrivateSubnet2CIDR CIDR range for the second private subnet.
PublicSubnetCIDR CIDR range for the public subnet.
AuroraDBMasterUserPassword Primary user password for the Aurora cluster.
RedshiftDWMasterUserPassword Primary user password for the Amazon Redshift data warehouse.
KeyName The EC2 key pair to be configured in the EC2 instance on the public subnet. This EC2 instance is used to get to the Aurora cluster in the private subnet. Select the value from the dropdown.
ClientIPCIDR Your IP address in CIDR notation. The CloudFormation template creates a security group rule that grants ingress on port 22 to this IP address. On a Mac, you can run the following command to get your IP address: curl ipecho.net/plain ; echo /32
EC2ImageId The image ID used to create the EC2 instance in the public subnet to be a jump box to connect to the source Aurora cluster. If you supply your image ID, the template uses it to create the EC2 instance.
HudiStorageType This is used by the AWS Glue job to determine if you want to create a CoW or MoR storage type table. Enter MoR if you want to create MoR storage type tables.
ScheduleToRunGlueJob The AWS Glue job runs on a schedule to pick the new files and load to the curated bucket. This parameter sets the schedule of the job.
DMSBatchUnloadIntervalInSecs AWS DMS batches the inputs from the source and loads the output to the taw bucket. This parameter defines the frequency in which the data is loaded to the raw bucket.
GlueJobDPUs The number of DPUs that are assigned to the two AWS Glue jobs.

To simplify running the template, your account is given permissions on the key used to encrypt the resources in the CloudFormation template. You can restrict that to the role if desired.

Granting Lake Formation permissions

AWS Lake Formation enables customers to set up fine grained access control for their Datalake. Detail steps to set up AWS Lake Formation can be found here.

Setting up AWS Lake Formation is out of scope for this post. However, if you have Lake Formation configured in the Region where you’re deploying this template, grant Create database permission to the LakeHouseExecuteGlueHudiJobRole role after the CloudFormation stack is successfully created.

This will ensure that you don’t get the following error while running your AWS Glue job.

org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Insufficient Lake Formation permission(s) on global_temp

Similarly grant Describe permission to the LakeHouseExecuteGlueHudiJobRole role on default database.

This will ensure that you don’t get the following error while running your AWS Glue job.

AnalysisException: 'java.lang.RuntimeException: MetaException(message:Unable to verify existence of default database: com.amazonaws.services.glue.model.AccessDeniedException: Insufficient Lake Formation permission(s) on default (Service: AWSGlue; Status Code: 400; Error Code: AccessDeniedException;

Connecting to source Aurora cluster

To connect to source Aurora cluster using SQL Workbench, complete the following steps:

  1. On SQL Workbench, under File, choose Connect window.

  1. Choose Manage Drivers.

  1. Choose PostgreSQL.
  2. For Library, use the driver JAR file.
  3. For Classname, enter org.postgresql.Driver.
  4. For Sample URL, enter jdbc:postgresql://host:port/name_of_database.

  1. Click the Create a new connection profile button.
  2. For Driver, choose your new PostgreSQL driver.
  3. For URL, enter lakehouse_source_db after port/.
  4. For Username, enter postgres.
  5. For Password, enter the same password that you used for the AuroraDBMasterUserPassword parameter while creating the CloudFormation stack.
  6. Choose SSH.
  7. On the Outputs tab of your CloudFormation stack, copy the IP address next to PublicIPOfEC2InstanceForTunnel and enter it for SSH hostname.
  8. For SSH port, enter 22.
  9. For Username, enter ec2-user.
  10. For Private key file, enter the private key for the public key chosen in the KeyName parameter of the CloudFormation stack.
  11. For Local port, enter any available local port number.
  12. On the Outputs tab of your stack, copy the value next to EndpointOfAuroraCluster and enter it for DB hostname.
  13. For DB port, enter 5432.
  14. Select Rewrite JDBC URL.


Checking the Rewrite JDBC URL checkbox will automatically feed in the value of host and port in the URL text box as shown below.

  1. Test the connection and make sure that you get a message that the connection was successful.

 

Troubleshooting

Complete the following steps if you receive this message: Could not initialize SSH tunnel: java.net.ConnectException: Operation timed out (Connection timed out)

  1. Go to your CloudFormation stack and search for LakeHouseSecurityGroup under Resources .
  2. Choose the link in the Physical ID.

  1. Select your security group.
  2. From the Actions menu, choose Edit inbound rules.

  1. Look for the rule with the description:Rule to allow connection from the SQL client to the EC2 instance used as jump box for SSH tunnel
  2. From the Source menu, choose My IP.
  3. Choose Save rules.

  1. Test the connection from your SQL Workbench again and make sure that you get a successful message.

Running the initial load script

You’re now ready to run the InitLoad_TestStep1.sql script to create some test data.

  1. Open InitLoad_TestStep1.sql in your SQL client and run it.

The output shows that 11 statements have been run.

AWS DMS replicates these inserts to your raw S3 bucket at the frequency set in the DMSBatchUnloadIntervalInSecs parameter of your CloudFormation stack.

  1. On the AWS DMS console, choose the lakehouse-aurora-src-to-raw-s3-tgt task:
  2. On the Table statistics tab, you should see the seven full load rows of employee_details have been replicated.

The lakehouse-aurora-src-to-raw-s3-tgt replication task has the following table mapping with transformation to add a schema name and a table name as additional columns:

{
   "rules":[
      {
         "rule-type":"selection",
         "rule-id":"1",
         "rule-name":"1",
         "object-locator":{
            "schema-name":"human_resources",
            "table-name":"%"
         },
         "rule-action":"include",
         "filters":[
            
         ]
      },
      {
         "rule-type":"transformation",
         "rule-id":"2",
         "rule-name":"2",
         "rule-target":"column",
         "object-locator":{
            "schema-name":"%",
            "table-name":"%"
         },
         "rule-action":"add-column",
         "value":"schema_name",
         "expression":"$SCHEMA_NAME_VAR",
         "data-type":{
            "type":"string",
            "length":50
         }
      },
      {
         "rule-type":"transformation",
         "rule-id":"3",
         "rule-name":"3",
         "rule-target":"column",
         "object-locator":{
            "schema-name":"%",
            "table-name":"%"
         },
         "rule-action":"add-column",
         "value":"table_name",
         "expression":"$TABLE_NAME_VAR",
         "data-type":{
            "type":"string",
            "length":50
         }
      }
   ]
}

These settings put the name of the source schema and table as two additional columns in the output Parquet file of AWS DMS.
These columns are used in the AWS Glue HudiJob to find out the tables that have new inserts, updates, or deletes.

  1. On the Resources tab of the CloudFormation stack, locate RawS3Bucket.
  2. Choose the Physical ID link.

  1. Navigate to human_resources/employee_details.

The LOAD00000001.parquet file is created under human_resources/employee_details. (The name of your raw bucket is different from the following screenshot).

You can also see the time of creation of this file. You should have at least one successful run of the AWS Glue job (HudiJob) after this time for the Hudi table to be created. The AWS Glue job is configured to load this data into the curated bucket at the frequency set in the ScheduleToRunGlueJob parameter of your CloudFormation stack. The default is 5 minutes.

AWS Glue job HudiJob

The following code is the script for HudiJob:

import sys
import os
import json

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import concat, col, lit, to_timestamp

from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

import boto3
from botocore.exceptions import ClientError

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer').getOrCreate()
glueContext = GlueContext(spark.sparkContext)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
logger = glueContext.get_logger()

logger.info('Initialization.')
glueClient = boto3.client('glue')
ssmClient = boto3.client('ssm')
redshiftDataClient = boto3.client('redshift-data')

logger.info('Fetching configuration.')
region = os.environ['AWS_DEFAULT_REGION']

curatedS3BucketName = ssmClient.get_parameter(Name='lakehouse-curated-s3-bucket-name')['Parameter']['Value']
rawS3BucketName = ssmClient.get_parameter(Name='lakehouse-raw-s3-bucket-name')['Parameter']['Value']
hudiStorageType = ssmClient.get_parameter(Name='lakehouse-hudi-storage-type')['Parameter']['Value']

dropColumnList = ['db','table_name','Op']

logger.info('Getting list of schema.tables that have changed.')
changeTableListDyf = glueContext.create_dynamic_frame_from_options(connection_type = 's3', connection_options = {'paths': ['s3://'+rawS3BucketName], 'groupFiles': 'inPartition', 'recurse':True}, format = 'parquet', format_options={}, transformation_ctx = 'changeTableListDyf')

logger.info('Processing starts.')
if(changeTableListDyf.count() > 0):
    logger.info('Got new files to process.')
    changeTableList = changeTableListDyf.toDF().select('schema_name','table_name').distinct().rdd.map(lambda row : row.asDict()).collect()

    for dbName in set([d['schema_name'] for d in changeTableList]):
        spark.sql('CREATE DATABASE IF NOT EXISTS ' + dbName)
        redshiftDataClient.execute_statement(ClusterIdentifier='lakehouse-redshift-cluster', Database='lakehouse_dw', DbUser='rs_admin', Sql='CREATE EXTERNAL SCHEMA IF NOT EXISTS ' + dbName + ' FROM DATA CATALOG DATABASE \'' + dbName + '\' REGION \'' + region + '\' IAM_ROLE \'' + boto3.client('iam').get_role(RoleName='LakeHouseRedshiftGlueAccessRole')['Role']['Arn'] + '\'')

    for i in changeTableList:
        logger.info('Looping for ' + i['schema_name'] + '.' + i['table_name'])
        dbName = i['schema_name']
        tableNameCatalogCheck = ''
        tableName = i['table_name']
        if(hudiStorageType == 'MoR'):
            tableNameCatalogCheck = i['table_name'] + '_ro' #Assumption is that if _ro table exists then _rt table will also exist. Hence we are checking only for _ro.
        else:
            tableNameCatalogCheck = i['table_name'] #The default config in the CF template is CoW. So assumption is that if the user hasn't explicitly requested to create MoR storage type table then we will create CoW tables. Again, if the user overwrites the config with any value other than 'MoR' we will create CoW storage type tables.
        isTableExists = False
        isPrimaryKey = False
        isPartitionKey = False
        primaryKey = ''
        partitionKey = ''
        try:
            glueClient.get_table(DatabaseName=dbName,Name=tableNameCatalogCheck)
            isTableExists = True
            logger.info(dbName + '.' + tableNameCatalogCheck + ' exists.')
        except ClientError as e:
            if e.response['Error']['Code'] == 'EntityNotFoundException':
                isTableExists = False
                logger.info(dbName + '.' + tableNameCatalogCheck + ' does not exist. Table will be created.')
        try:
            table_config = json.loads(ssmClient.get_parameter(Name='lakehouse-table-' + dbName + '.' + tableName)['Parameter']['Value'])
            try:
                primaryKey = table_config['primaryKey']
                isPrimaryKey = True
                logger.info('Primary key:' + primaryKey)
            except KeyError as e:
                isPrimaryKey = False
                logger.info('Primary key not found. An append only glueparquet table will be created.')
            try:
                partitionKey = table_config['partitionKey']
                isPartitionKey = True
                logger.info('Partition key:' + partitionKey)
            except KeyError as e:
                isPartitionKey = False
                logger.info('Partition key not found. Partitions will not be created.')
        except ClientError as e:    
            if e.response['Error']['Code'] == 'ParameterNotFound':
                isPrimaryKey = False
                isPartitionKey = False
                logger.info('Config for ' + dbName + '.' + tableName + ' not found in parameter store. Non partitioned append only table will be created.')

        inputDyf = glueContext.create_dynamic_frame_from_options(connection_type = 's3', connection_options = {'paths': ['s3://' + rawS3BucketName + '/' + dbName + '/' + tableName], 'groupFiles': 'none', 'recurse':True}, format = 'parquet',transformation_ctx = tableName)
        
        inputDf = inputDyf.toDF().withColumn('update_ts_dms',to_timestamp(col('update_ts_dms')))
        
        targetPath = 's3://' + curatedS3BucketName + '/' + dbName + '/' + tableName

        morConfig = {'hoodie.datasource.write.storage.type': 'MERGE_ON_READ', 'hoodie.compact.inline': 'false', 'hoodie.compact.inline.max.delta.commits': 20, 'hoodie.parquet.small.file.limit': 0}

        commonConfig = {'className' : 'org.apache.hudi', 'hoodie.datasource.hive_sync.use_jdbc':'false', 'hoodie.datasource.write.precombine.field': 'update_ts_dms', 'hoodie.datasource.write.recordkey.field': primaryKey, 'hoodie.table.name': tableName, 'hoodie.consistency.check.enabled': 'true', 'hoodie.datasource.hive_sync.database': dbName, 'hoodie.datasource.hive_sync.table': tableName, 'hoodie.datasource.hive_sync.enable': 'true'}

        partitionDataConfig = {'hoodie.datasource.write.partitionpath.field': partitionKey, 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor', 'hoodie.datasource.hive_sync.partition_fields': partitionKey}
                     
        unpartitionDataConfig = {'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor', 'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator'}
        
        incrementalConfig = {'hoodie.upsert.shuffle.parallelism': 20, 'hoodie.datasource.write.operation': 'upsert', 'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS', 'hoodie.cleaner.commits.retained': 10}
        
        initLoadConfig = {'hoodie.bulkinsert.shuffle.parallelism': 3, 'hoodie.datasource.write.operation': 'bulk_insert'}
        
        deleteDataConfig = {'hoodie.datasource.write.payload.class': 'org.apache.hudi.common.model.EmptyHoodieRecordPayload'}

        if(hudiStorageType == 'MoR'):
            commonConfig = {**commonConfig, **morConfig}
            logger.info('MoR config appended to commonConfig.')
        
        combinedConf = {}

        if(isPrimaryKey):
            logger.info('Going the Hudi way.')
            if(isTableExists):
                logger.info('Incremental load.')
                outputDf = inputDf.filter("Op != 'D'").drop(*dropColumnList)
                if outputDf.count() > 0:
                    logger.info('Upserting data.')
                    if (isPartitionKey):
                        logger.info('Writing to partitioned Hudi table.')
                        outputDf = outputDf.withColumn(partitionKey,concat(lit(partitionKey+'='),col(partitionKey)))
                        combinedConf = {**commonConfig, **partitionDataConfig, **incrementalConfig}
                        outputDf.write.format('org.apache.hudi').options(**combinedConf).mode('Append').save(targetPath)
                    else:
                        logger.info('Writing to unpartitioned Hudi table.')
                        combinedConf = {**commonConfig, **unpartitionDataConfig, **incrementalConfig}
                        outputDf.write.format('org.apache.hudi').options(**combinedConf).mode('Append').save(targetPath)
                outputDf_deleted = inputDf.filter("Op = 'D'").drop(*dropColumnList)
                if outputDf_deleted.count() > 0:
                    logger.info('Some data got deleted.')
                    if (isPartitionKey):
                        logger.info('Deleting from partitioned Hudi table.')
                        outputDf_deleted = outputDf_deleted.withColumn(partitionKey,concat(lit(partitionKey+'='),col(partitionKey)))
                        combinedConf = {**commonConfig, **partitionDataConfig, **incrementalConfig, **deleteDataConfig}
                        outputDf_deleted.write.format('org.apache.hudi').options(**combinedConf).mode('Append').save(targetPath)
                    else:
                        logger.info('Deleting from unpartitioned Hudi table.')
                        combinedConf = {**commonConfig, **unpartitionDataConfig, **incrementalConfig, **deleteDataConfig}
                        outputDf_deleted.write.format('org.apache.hudi').options(**combinedConf).mode('Append').save(targetPath)
            else:
                outputDf = inputDf.drop(*dropColumnList)
                if outputDf.count() > 0:
                    logger.info('Inital load.')
                    if (isPartitionKey):
                        logger.info('Writing to partitioned Hudi table.')
                        outputDf = outputDf.withColumn(partitionKey,concat(lit(partitionKey+'='),col(partitionKey)))
                        combinedConf = {**commonConfig, **partitionDataConfig, **initLoadConfig}
                        outputDf.write.format('org.apache.hudi').options(**combinedConf).mode('Overwrite').save(targetPath)
                    else:
                        logger.info('Writing to unpartitioned Hudi table.')
                        combinedConf = {**commonConfig, **unpartitionDataConfig, **initLoadConfig}
                        outputDf.write.format('org.apache.hudi').options(**combinedConf).mode('Overwrite').save(targetPath)
        else:
            if (isPartitionKey):
                logger.info('Writing to partitioned glueparquet table.')
                sink = glueContext.getSink(connection_type = 's3', path= targetPath, enableUpdateCatalog = True, updateBehavior = 'UPDATE_IN_DATABASE', partitionKeys=[partitionKey])
            else:
                logger.info('Writing to unpartitioned glueparquet table.')
                sink = glueContext.getSink(connection_type = 's3', path= targetPath, enableUpdateCatalog = True, updateBehavior = 'UPDATE_IN_DATABASE')
            sink.setFormat('glueparquet')
            sink.setCatalogInfo(catalogDatabase = dbName, catalogTableName = tableName)
            outputDyf = DynamicFrame.fromDF(inputDf.drop(*dropColumnList), glueContext, 'outputDyf')
            sink.writeFrame(outputDyf)

job.commit()

Hudi tables need a primary key to perform upserts. Hudi tables can also be partitioned based on a certain key. We get the names of the primary key and the partition key from AWS Systems Manager Parameter Store.

The HudiJob script looks for an AWS Systems Manager Parameter with the naming format lakehouse-table-<schema_name>.<table_name>. It compares the name of the parameter with the name of the schema and table columns, added by AWS DMS, to get the primary key and the partition key for the Hudi table.

The CloudFormation template creates lakehouse-table-human_resources.employee_details AWS Systems Manager Parameter, as shown on the Resources tab.

If you choose the Physical ID link, you can locate the value of the AWS Systems Manager Parameter. The AWS Systems Manager Parameter has {"primaryKey": "emp_no", "partitionKey": "department"} value in it.

Because of the value in the lakehouse-table-human_resources.employee_details AWS Systems Manager Parameter, the AWS Glue script creates a human_resources.employee_details Hudi table partitioned on the department column for the employee_details table created in the source using the InitLoad_TestStep1.sql script. The HudiJob also uses the emp_no column as the primary key for upserts.

If you reuse this CloudFormation template and create your own table, you have to create an associated AWS Systems Manager Parameter with the naming convention lakehouse-table-<schema_name>.<table_name>. Keep in mind the following:

  • If you don’t create a parameter, the script creates an unpartitioned glueparquet append-only table.
  • If you create a parameter that only has the primaryKey part in the value, the script creates an unpartitioned Hudi table.
  • If you create a parameter that only has the partitionKey part in the value, the script creates a partitioned glueparquet append-only table.

If you have too many tables to replicate, you can also store the primary key and partition key configuration in Amazon DynamoDB or Amazon S3 and change the code accordingly.

In the InitLoad_TestStep1.sql script, replica identity for human_resources.employee_details table is set to full. This makes sure that AWS DMS transfers the full delete record to Amazon S3. Having this delete record is important for the HudiJob script to delete the record from the Hudi table. A full delete record from AWS DMS for the human_resources.employee_details table looks like the following:

{ "Op": "D", "update_ts_dms": "2020-10-25 07:57:48.589284", "emp_no": 3, "name": "Jeff", "department": "Finance", "city": "Tokyo", "salary": 55000, "schema_name": "human_resources", "table_name": "employee_details"}

The schema_name, and table_name columns are added by AWS DMS because of the task configuration shared previously.update_ts_dms has been set as the value for TimestampColumnName S3 setting in AWS DMS S3 Endpoint.Op is added by AWS DMS for cdc and it indicates source DB operations in migrated S3 data.

We also set spark.serializer in the script. This setting is required for Hudi.

In HudiJob script, you can also find a few Python dict that store various Hudi configuration properties. These configurations are just for demo purposes; you have to adjust them based on your workload. For more information about Hudi configurations, see Configurations.

HudiJob is scheduled to run every 5 minutes by default. The frequency is set by the ScheduleToRunGlueJob parameter of the CloudFormation template. Make sure that you successfully run HudiJob at least one time after the source data lands in the raw S3 bucket. The screenshot in Step 6 of Running the initial load script section confirms that AWS DMS put the LOAD00000001.parquet file in the raw bucket at 11:54:41 AM and following screenshot confirms that the job execution started at 11:55 AM.

The job creates a Hudi table in the AWS Glue Data Catalog (see the following screenshot). The table is partitioned on the department column.

Granting AWS Lake Formation permissions

If you have AWS Lake Formation enabled, make sure that you grant Select permission on the human_resources.employee_details table to the role/user used to run Athena query. Similarly, you also have to grant Select permission on the human_resources.employee_details table to the LakeHouseRedshiftGlueAccessRole role so you can query human_resources.employee_details in Amazon Redshift.

Grant Drop permission on the human_resources database to LakeHouseExecuteLambdaFnsRole so that the template can delete the database when you delete the template. Also, the CloudFormation template does not roll back any AWS Lake Formation grants or changes that are manually applied.

Granting access to KMS key

The curated S3 bucket is encrypted by lakehouse-key, which is an AWS Key Management Service (AWS KMS) customer managed key created by AWS CloudFormation template.

To run the query in Athena, you have to add the ARN of the role/user used to run the Athena query in the Allow use of the key section in the key policy.

This will ensure that you don’t get com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; error while running your Athena query.

You might not have to execute the above KMS policy change if you have kept the default of granting access to the AWS account and the role/user used to run Athena query has the necessary KMS related policies attached to it.

Confirming job completion

When HudiJob is complete, you can see the files in the curated bucket.

  1. On the Resources tab, search for CuratedS3Bucket.
  2. Choose the Physical ID link.

The following screenshot shows the timestamp on the initial load.

  1. Navigate to the department=Finance prefix and select the Parquet file.
  2. Choose Select from.
  1. For File format, select Parquet.
  2. Choose Show file preview.

You can see the value of the timestamp in the update_ts_dms column.

Querying the Hudi table

You can now query your data in Amazon Athena or Amazon Redshift.

Querying in Amazon Athena

Query the human_resources.employee_details table in Amazon Athena with the following code:

SELECT emp_no,
         name,
         city,
         salary,
         department,
         from_unixtime(update_ts_dms/1000000,'America/Los_Angeles') update_ts_dms_LA,
         from_unixtime(update_ts_dms/1000000,'UTC') update_ts_dms_UTC         
FROM "human_resources"."employee_details"
ORDER BY emp_no

The timestamp for all the records matches the timestamp in the update_ts_dms column in the earlier screenshot.

Querying in Redshift Spectrum

Read query your table in Redshift Spectrum for Apache Hudi support in Amazon Redshift.

  1. On the Amazon Redshift console, locate lakehouse-redshift-cluster.
  2. Choose Query cluster.

  1. For Database name, enter lakehouse_dw.
  2. For Database user, enter rs_admin.
  3. For Database password, enter the password that you used for the RedshiftDWMasterUserPassword parameter in the CloudFormation template.

  1. Enter the following query for the human_resources.employee_details table:
    SELECT emp_no,
             name,
             city,
             salary,
             department,
             (TIMESTAMP 'epoch' + update_ts_dms/1000000 * interval '1 second') AT TIME ZONE 'utc' AT TIME ZONE 'america/los_angeles' update_ts_dms_LA,
             (TIMESTAMP 'epoch' + update_ts_dms/1000000 * interval '1 second') AT TIME ZONE 'utc' update_ts_dms_UTC
    FROM human_resources.employee_details
    ORDER BY emp_no 

The following screenshot shows the query output.

Running the incremental load script

We now run the IncrementalUpdatesAndInserts_TestStep2.sql script. The output shows that 6 statements were run.

AWS DMS now shows that it has replicated the new incremental changes. The changes are replicated at a frequency set in DMSBatchUnloadIntervalInSecs parameter of the CloudFormation stack.

This creates another Parquet file in the raw S3 bucket.

The incremental updates are loaded into the Hudi table according to the chosen frequency to run the job (the ScheduleToRunGlueJob parameter). The HudiJobscript uses job bookmarks to find out the incremental load so it only processes the new files brought in through AWS DMS.

Confirming job completion

Make sure that HudiJob runs successfully at least one time after the incremental file arrives in the raw bucket. The previous screenshot shows that the incremental file arrived in the raw bucket at 1:18:38 PM and the following screenshot shows that the job started at 1:20 PM.

Querying the changed data

You can now check the table in Athena and Amazon Redshift. Both results show that emp_no 3 is deleted, 8 and 9 have been added, and 2 and 5 have been updated.

The following screenshot shows the results in Athena.

The following screenshot shows the results in Redshift Spectrum.

AWS Glue Job HudiMoRCompactionJob

The CloudFormation template also deploys the AWS Glue job HudiMoRCompactionJob. This job is not scheduled; you only use it if you choose the MoR storage type. To execute the pipe for MoR storage type instead of CoW storage type, delete the CloudFormation stack and create it again. After creation, replace CoW in lakehouse-hudi-storage-type AWS Systems Manager Parameter with MoR.

If you use MoR storage type, the incremental updates are stored in log files. You can’t see the updates in the _ro (read optimized) view, but can see them in the _rt view. Amazon Athena documentation and Amazon Redshift documentation gives more details about support and considerations for Apache Hudi.

To see the incremental data in the _ro view, run the HudiMoRCompactionJob job. For more information about Hudi storage types and views, see Hudi Dataset Storage Types and Storage Types & Views. The following code is an example of the CLI command used to run HudiMoRCompactionJob job:

aws glue start-job-run --job-name HudiMoRCompactionJob --arguments="--DB_NAME=human_resources","--TABLE_NAME=employee_details","--IS_PARTITIONED=true"

You can decide on the frequency of running this job. You don’t have to run the job immediately after the HudiJob. You should run this job when you want the data to be available in the _ro view. You have to pass the schema name and the table name to this script so it knows the table to compact.

Additional considerations

The JAR file we use in this post has not been tested for AWS Glue streaming jobs. Additionally, there are some hardcoded Hudi options in the HudiJob script. These options are set for the sample table that we create for this post. Update the options based on your workload. 

Conclusion

In this post, we created AWS Glue 2.0 jobs that moved the source upserts and deletes into Hudi tables. The code creates tables in the AWS GLue Data Catalog and updates partitions so you don’t have to run the crawlers to update them.

This post simplified your LakeHouse code base by giving you the benefits of Apache Hudi along with serverless AWS Glue. We also showed how to create an source to LakeHouse replication system using AWS Glue, AWS DMS, and Amazon Redshift with minimum overhead.


Appendix

We can write to Hudi tables because of the hudi-spark.jar file that we downloaded to our DependentJarsAndTempS3Bucket S3 bucket with the CloudFormation template. The path to this file is added as a dependency in both the AWS Glue jobs. This file is based on open-source Hudi. To create the JAR file, complete the following steps:

  1. Get Hudi 0.5.3 and unzip it using the following code:
    wget https://github.com/apache/hudi/archive/release-0.5.3.zip
    unzip hudi-release-0.5.3.zip

  2. Edit Hudi pom.xml:
    vi hudi-release-0.5.3/pom.xml

    1. Remove the following code to make the build process faster:
      <module>packaging/hudi-hadoop-mr-bundle</module>
      <module>packaging/hudi-hive-bundle</module>
      <module>packaging/hudi-presto-bundle</module>
      <module>packaging/hudi-utilities-bundle</module>
      <module>packaging/hudi-timeline-server-bundle</module>
      <module>docker/hoodie/hadoop</module>
      <module>hudi-integ-test</module>

    2. Change the versions of all three dependencies of httpcomponents to 4.4.1. The following is the original code:
      <!-- Httpcomponents -->
            <dependency>
              <groupId>org.apache.httpcomponents</groupId>
              <artifactId>fluent-hc</artifactId>
              <version>4.3.2</version>
            </dependency>
            <dependency>
              <groupId>org.apache.httpcomponents</groupId>
              <artifactId>httpcore</artifactId>
              <version>4.3.2</version>
            </dependency>
            <dependency>
              <groupId>org.apache.httpcomponents</groupId>
              <artifactId>httpclient</artifactId>
              <version>4.3.6</version>
            </dependency>

      The following is the replacement code:

      <!-- Httpcomponents -->
            <dependency>
              <groupId>org.apache.httpcomponents</groupId>
              <artifactId>fluent-hc</artifactId>
              <version>4.4.1</version>
            </dependency>
            <dependency>
              <groupId>org.apache.httpcomponents</groupId>
              <artifactId>httpcore</artifactId>
              <version>4.4.1</version>
            </dependency>
            <dependency>
              <groupId>org.apache.httpcomponents</groupId>
              <artifactId>httpclient</artifactId>
              <version>4.4.1</version>
            </dependency>

  3. Build the JAR file:
    mvn clean package -DskipTests -DskipITs -f <Full path of the hudi-release-0.5.3 dir>

  4. You can now get the JAR from the following location:
hudi-release-0.5.3/packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.11-0.5.3-rc2.jar

The other JAR dependency used in the AWS Glue jobs is spark-avro_2.11-2.4.4.jar.


About the Author

Vishal Pathak is a Data Lab Solutions Architect at AWS. Vishal works with the customers on their use cases, architects a solution to solve their business problems and helps the customers build an scalable prototype. Prior to his journey in AWS, Vishal helped customers implement BI, DW and DataLake projects in US and Australia.

Announcing AWS Glue DataBrew – A Visual Data Preparation Tool That Helps You Clean and Normalize Data Faster

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/announcing-aws-glue-databrew-a-visual-data-preparation-tool-that-helps-you-clean-and-normalize-data-faster/

To be able to run analytics, build reports, or apply machine learning, you need to be sure the data you’re using is clean and in the right format. That’s the data preparation step that requires data analysts and data scientists to write custom code and do many manual activities. First, you need to look at the data, understand which possible values are present, and build some simple visualizations to understand if there are correlations between the columns. Then, you need to check for strange values outside of what you’re expecting, such as weather temperature above 200℉ (93℃) or speed of a truck above 200 mph (322 km/h), or for data that is missing. Many algorithms need values to be rescaled to a specific range, for example between 0 and 1, or normalized around the mean. Text fields need to be set to a standard format, and may require advanced transformations such as stemming.

That’s a lot of work. For this reason, I am happy to announce that today AWS Glue DataBrew is available, a visual data preparation tool that helps you clean and normalize data up to 80% faster so you can focus more on the business value you can get.

DataBrew provides a visual interface that quickly connects to your data stored in Amazon Simple Storage Service (S3), Amazon Redshift, Amazon Relational Database Service (RDS), any JDBC accessible data store, or data indexed by the AWS Glue Data Catalog. You can then explore the data, look for patterns, and apply transformations. For example, you can apply joins and pivots, merge different data sets, or use functions to manipulate data.

Once your data is ready, you can immediately use it with AWS and third-party services to gain further insights, such as Amazon SageMaker for machine learning, Amazon Redshift and Amazon Athena for analytics, and Amazon QuickSight and Tableau for business intelligence.

How AWS Glue DataBrew Works
To prepare your data with DataBrew, you follow these steps:

  • Connect one or more datasets from S3 or the Glue data catalog (S3, Redshift, RDS). You can also upload a local file to S3 from the DataBrew console. CSV, JSON, Parquet, and .XLSX formats are supported.
  • Create a project to visually explore, understand, combine, clean, and normalize data in a dataset. You can merge or join multiple datasets. From the console, you can quickly spot anomalies in your data with value distributions, histograms, box plots, and other visualizations.
  • Generate a rich data profile for your dataset with over 40 statistics by running a job in the profile view.
  • When you select a column, you get recommendations on how to improve data quality.
  • You can clean and normalize data using more than 250 built-in transformations. For example, you can remove or replace null values, or create encodings. Each transformation is automatically added as a step to build a recipe.
  • You can then save, publish, and version recipes, and automate the data preparation tasks by applying recipes on all incoming data. To apply recipes to or generate profiles for large datasets, you can run jobs.
  • At any point in time, you can visually track and explore how datasets are linked to projects, recipes, and job runs. In this way, you can understand how data flows and what are the changes. This information is called data lineage and can help you find the root cause in case of errors in your output.

Let’s see how this works with a quick demo!

Preparing a Sample Dataset with AWS Glue DataBrew
In the DataBrew console, I select the Projects tab and then Create project. I name the new project Comments. A new recipe is also created and will be automatically updated with the data transformations that I will apply next.

I choose to work on a New dataset and name it Comments.

Here, I select Upload file and in the next dialog I upload a comments.csv file I prepared for this demo. In a production use case, here you will probably connect an existing source on S3 or in the Glue Data Catalog. For this demo, I specify the S3 destination for storing the uploaded file. I leave Encryption disabled.

The comments.csv file is very small, but will help show some common data preparation needs and how to complete them quickly with DataBrew. The format of the file is comma-separated values (CSV). The first line contains the name of the columns. Then, each line contains a text comment and a numerical rating made by a customer (customer_id) about an item (item_id). Each item is part of a category. For each text comment, there is an indication of the overall sentiment (comment_sentiment). Optionally, when giving the comment, customers can enable a flag to ask to be contacted for further support (support_needed).

Here’s the content of the comments.csv file:

customer_id,item_id,category,rating,comment,comment_sentiment,support_needed
234,2345,"Electronics;Computer", 5,"I love this!",Positive,False
321,5432,"Home;Furniture",1,"I can't make this work... Help, please!!!",negative,true
123,3245,"Electronics;Photography",3,"It works. But I'd like to do more",,True
543,2345,"Electronics;Computer",4,"Very nice, it's going well",Positive,False
786,4536,"Home;Kitchen",5,"I really love it!",positive,false
567,5432,"Home;Furniture",1,"I doesn't work :-(",negative,true
897,4536,"Home;Kitchen",3,"It seems OK...",,True
476,3245,"Electronics;Photography",4,"Let me say this is nice!",positive,false

In the Access permissions, I select a AWS Identity and Access Management (IAM) role which provides DataBrew read permissions to my input S3 bucket. Only roles where DataBrew is the service principal for the trust policy are shown in the DataBrew console. To create one in the IAM console, select DataBrew as trusted entity.

If the dataset is big, you can use Sampling to limit the number of rows to use in the project. These rows can be selected at the beginning, at the end, or randomly through the data. You are going to use projects to create recipes, and then jobs to apply recipes to all the data. Depending on your dataset, you may not need access to all the rows to define the data preparation recipe.

Optionally, you can use Tagging to manage, search, or filter resources you create with AWS Glue DataBrew.

The project is now being prepared and in a few minutes I can start exploring my dataset.

In the Grid view, the default when I create a new project, I see the data as it has been imported. For each column, there is a summary of the range of values that have been found. For numerical columns, the statistical distribution is given.

In the Schema view, I can drill down on the schema that has been inferred, and optionally hide some of the columns.

In the Profile view, I can run a data profile job to examine and collect statistical summaries about the data. This is an assessment in terms of structure, content, relationships, and derivation. For a large dataset, this is very useful to understand the data. For this small example the benefits are limited, but I run it nonetheless, sending the output of the profile job to a different folder in the same S3 bucket I use to store the source data.

When the profile job has succeeded, I can see a summary of the rows and columns in my dataset, how many columns and rows are valid, and correlations between columns.

Here, if I select a column, for example rating, I can drill down into specific statistical information and correlations for that column.

Now, let’s do some actual data preparation. In the Grid view, I look at the columns. The category contains two pieces of information, separated by a semicolon. For example, the category of the first row is “Electronics;Computers.” I select the category column, then click on the column actions (the three small dots on the right of the column name) and there I have access to many transformations that I can apply to the column. In this case, I select to split the column on a single delimiter. Before applying the changes, I quickly preview them in the console.

I use the semicolon as delimiter, and now I have two columns, category_1 and category_2. I use the column actions again to rename them to category and subcategory. Now, for the first row, category contains Electronics and subcategory Computers. All these changes are added as steps to the project recipe, so that I’ll be able to apply them to similar data.

The rating column contains values between 1 and 5. For many algorithms, I prefer to have these kind of values normalized. In the column actions, I use min-max normalization to rescale the values between 0 and 1. More advanced techniques are available, such as mean or Z-score normalization. A new rating_normalized column is added.

I look into the recommendations that DataBrew gives for the comment column. Since it’s text, the suggestion is to use a standard case format, such as lowercase, capital case, or sentence case. I select lowercase.

The comments contain free text written by customers. To simplify further analytics, I use word tokenization on the column to remove stop words (such as “a,” “an,” “the”), expand contractions (so that “don’t” becomes “do not”), and apply stemming. The destination for these changes is a new column, comment_tokenized.

I still have some special characters in the comment_tokenized column, such as an emoticon :-). In the column actions, I select to clean and remove special characters.

I look into the recommendations for the comment_sentiment column. There are some missing values. I decide to fill the missing values with a neutral sentiment. Now, I still have values written with a different case, so I follow the recommendation to use lowercase for this column.

The comment_sentiment column now contains three different values (positive, negative, or neutral), but many algorithms prefer to have one-hot encoding, where there is a column for each of the possible values, and these columns contain 1, if that is the original value, or 0 otherwise. I select the Encode icon in the menu bar and then One-hot encode column. I leave the defaults and apply. Three new columns for the three possible values are added.

The support_needed column is recognized as boolean, and its values are automatically formatted to a standard format. I don’t have to do anything here.

The recipe for my dataset is now ready to be published and can be used in a recurring job processing similar data. I didn’t have a lot of data, but the recipe can be used with much larger datasets.

In the recipe, you can find a list of all the transformations that I just applied. When running a recipe job, output data is available in S3 and ready to be used with analytics and machine learning platforms, or to build reports and visualization with BI tools. The output can be written in a different format than the input, for example using a columnar storage format like Apache Parquet.

Available Now

AWS Glue DataBrew is available today in US East (N. Virginia), US East (Ohio), US West (Oregon), Europe (Ireland), Europe (Frankfurt), Asia Pacific (Tokyo), Asia Pacific (Sydney).

It’s never been easier to prepare you data for analytics, machine learning, or for BI. In this way, you can really focus on getting the right insights for your business instead of writing custom code that you then have to maintain and update.

To practice with DataBrew, you can create a new project and select one of the sample datasets that are provided. That’s a great way to understand all the available features and how you can apply them to your data.

Learn more and get started with AWS Glue DataBrew today.

Danilo

Handling data erasure requests in your data lake with Amazon S3 Find and Forget

Post Syndicated from Chris Deigan original https://aws.amazon.com/blogs/big-data/handling-data-erasure-requests-in-your-data-lake-with-amazon-s3-find-and-forget/

Data lakes are a popular choice for organizations to store data around their business activities. Best practice design of data lakes impose that data is immutable once stored, but new regulations such as the European General Data Protection Regulation (GDPR), California Consumer Privacy Act (CCPA), and others have created new obligations that operators now need to be able to erase private data from their data lake when requested.

When asked to erase an individual’s private data, as a data lake operator you have to find all the objects in your Amazon Simple Storage Service (Amazon S3) buckets that contain data relating to that individual. This can be complex because data lakes contain many S3 objects (each of which may contain multiple rows), as shown in the following diagram. You often can’t predict which objects contain data relating to an individual, so you need to check each object. For example, if the user mary34 asks to be removed, you need to check each object to determine if it contains data relating to mary34. This is the first challenge operators face: identifying which objects contain data of interest.

After you identify objects containing data of interest, you face a second challenge: you need to retrieve the object from the S3 bucket, remove relevant rows from the file, put a new version of the object into S3, and make sure you delete any older versions.

Locating and removing data manually can be time-consuming and prone to mistakes, considering the large number of objects typically in data lakes.

Amazon S3 Find and Forget solves these challenges with ready-to-use automations. It allows you to remove records from data lakes of any size that are in AWS Glue Data Catalog. The solution includes a web user interface that you can use and an API that you can use to integrate with your own applications.

Solution overview

Amazon S3 Find and Forget enables you to find and delete records automatically in data lakes on Amazon S3. Using the solution, you can:

  • Define which tables from your AWS Glue Data Catalog contain data you want to erase
  • Manage a queue of identifiers (such as unique customer identifiers) to erase
  • Erase rows from your data lake matching the queued record identifiers
  • Access a log of all actions taken by the solution

You can use Amazon S3 Find and Forget to work with data lakes stored on Amazon S3 in a supported file format.

The solution is developed and distributed as open-source software that you deploy and run inside your own AWS account. When deploying this solution, you only pay for the AWS services consumed to run it. We recommend reviewing the Cost Estimate guide and creating Amazon CloudWatch Billing Alarms to monitor charges before deploying the solution in your own account.

When you handle requests to remove data, you add the identifiers through the web interface or API to a Deletion Queue. The identifiers remain in the queue until you start a Deletion Job. The Deletion Job processes the queue and removes matching rows from objects in your data lake.

Where your requirements allow it, batching deletions can provide significant cost savings by minimizing the number of times the data lake needs to be re-scanned and processed. For example, you could start a Deletion Job once a week to process all requests received in the preceding week.

Solution demonstration

This section provides a demonstration of using Amazon S3 Find and Forget’s main features. To deploy the solution in your own account, refer to the User Guide.

For this demonstration, I have prepared in advance:

The first step is to deploy the solution using AWS CloudFormation by following the instructions in the User Guide. The CloudFormation stack can take 20-30 minutes to deploy depending on the options chosen when deploying.

Once deployed, I visit the web user interface by going to the address in the WebUIUrl CloudFormation stack output. Using a temporary password emailed to the address I provided in my CloudFormation parameters, I login and set a password for future use. I then see a dashboard with some base metrics for my Amazon S3 Find and Forget deployment:

I now need to create a Data Mapper so that Amazon S3 Find and Forget can find my data lake. To do this, I select Data Mappers, then Create Data Mapper:

On this screen, I give my Data Mapper a name, choose the AWS Glue database and table in my account that I want to operate on, and the columns that I want my deletions to match. In this demonstration, I’m using a copy of the Amazon Customer Reviews Dataset that I copied to my own S3 bucket. I’ll be using the customer_id column to remove data. In the dataset, this field contains a unique identifier for each customer who has created a product review.

I then specify the IAM role to be used when modifying the objects in S3. I also choose whether I want the old S3 object versions to be deleted for me. I can turn this off if I want to implement my own strategy to manage deleting old object versions, such as by using S3 lifecycle policies.

After choosing Create Data Mapper the Data Mapper is created, and I am prompted to grant permissions for S3 Find and Forget to operate in my bucket. In the Data Mapper list, I select my new Data Mapper, then choose Generate Access Policies. The interface displays a sample bucket policy that I copy and paste into the bucket policy for my S3 bucket in the AWS Management Console.

With the Data Mapper set up, I’m now able to add the customers who have requested to have their data deleted to the Deletion Queue. Using their Customer IDs, I go to the Deletion Queue section and select Add Match to the Deletion Queue.

I’ve chosen to delete from all the available Data Mappers, but I can also choose specific ones. Once I’ve added my matches, I can see a list of them on Deletion Queue page:

I can now run a deletion job that will cause the matches to be deleted from the data lake. To do this, I select Deletion Jobs then Start a Deletion Job.

After a few minutes the Deletion Job completes, and I can see metrics collected during the job including that the job took just over two-and-a-half minutes:

There is an Export to JSON option that includes all the metrics shown, more granular information about the Deletion Job, and which S3 objects were modified.

At this point the Deletion Queue is empty, and ready for me to use for future requests.

Solution design

This section includes a brief introduction to how the solution works. More comprehensive design documentation is available in the Amazon S3 Find and Forget GitHub repository.

The following diagram illustrates the architecture of this solution.

Amazon S3 Find and Forget uses AWS Serverless services to optimize for cost and scalability. The user interface and API are built using Amazon S3, Amazon Cognito, AWS Lambda, Amazon DynamoDB, and Amazon API Gateway, which automatically scale down when not in use so that there is no expensive baseline cost just for having the solution installed. These AWS services are always available and scale in concert with when the solution is used with a pay-for-what-you-use price model.

The Deletion Job workflow is coordinated using AWS Step Functions, Lambda, and Amazon Simple Queue Service (Amazon SQS). The solution uses Step Functions for high-level coordination and state tracking in the workflow, Lambda functions for discrete computation tasks, and Amazon SQS to store queues of repetitive work.

A deletion job has two phases: Find and Forget. In the Find phase, the solution uses Amazon Athena to scan the data lake for objects containing rows matching the identifiers in the deletion queue. For this to work at scale, we built a query planner Lambda function that uses the partition list in the AWS Glue Data Catalog for each data mapper to run an Athena query on each partition, returning the path to S3 objects that contain matches with the identifiers in the Deletion Queue. The object keys are then added to an SQS queue that we refer to as the Object Deletion Queue.

In the Forget phase, deletion workers are started as a service running on AWS Fargate. These workers process each object in the Object Deletion Queue by downloading the objects from the S3 bucket into memory, deleting the rows that contain matched identifiers, then putting a new version of the object to the S3 bucket using the same key. By default, older versions of the object are then deleted from the S3 bucket to make the deletion irreversible. You can alternatively disable this feature to implement your own strategy for deleting old object versions, such as by using an S3 Lifecycle policy.

Note that during the Forget phase, affected S3 objects are replaced at the time they are processed and are subject to the Amazon S3 data consistency model. We recommend that you avoid running a Deletion Job in parallel to a workload that reads from the data lake unless it has been designed to handle temporary inconsistencies between objects.

When the object deletion queue is empty, the Forget phase is complete and a final status is determined for the Deletion Job based on whether any errors occurred (for example, due to missing permissions for S3 objects).

Logs are generated for all actions throughout the Deletion Job, which you can use for reporting or troubleshooting. These are stored in DynamoDB, along with other persistent data including the Data Mappers and Deletion Queue.

Conclusion

In this post, we introduced the Amazon S3 Find and Forget solution, which assists data lake operators to handle data erasure requests they may receive pursuant to regulations such as GDPR, CCPA, and others. We then described features of the solution and how to use it for a basic use case.

You can get started today by deploying the solution from the GitHub repository, where you can also find more documentation of how the solution works, its features, and limits. We are continuing to develop the solution and welcome you to send feedback, feature requests, or questions through GitHub Issues.

 


About the Authors

Chris Deigan is an AWS Solution Engineer in London, UK. Chris works with AWS Solution Architects to create standardized tools, code samples, demonstrations, and quick starts.

 

 

 

Matteo Figus is an AWS Solution Engineer based in the UK. Matteo works with the AWS Solution Architects to create standardized tools, code samples, demonstrations and quickstarts. He is passionate about open-source software and in his spare time he likes to cook and play the piano.

 

 

 

Nick Lee is an AWS Solution Engineer based in the UK. Nick works with the AWS Solution Architects to create standardized tools, code samples, demonstrations and quickstarts. In his spare time he enjoys playing football and squash, and binge-watching TV shows.

 

 

 

Adir Sharabi is a Solutions Architect with Amazon Web Services. He works with AWS customers to help them architect secure, resilient, scalable and high performance applications in the cloud. He is also passionate about Data and helping customers to get the most out of it.

 

 

 

Cristina Fuia is a Specialist Solutions Architect for Analytics at AWS. She works with customers across EMEA helping them to solve complex problems, design and build data architectures so that they can get business value from analyzing their data.

 

AWS serverless data analytics pipeline reference architecture

Post Syndicated from Praful Kava original https://aws.amazon.com/blogs/big-data/aws-serverless-data-analytics-pipeline-reference-architecture/

Onboarding new data or building new analytics pipelines in traditional analytics architectures typically requires extensive coordination across business, data engineering, and data science and analytics teams to first negotiate requirements, schema, infrastructure capacity needs, and workload management.

For a large number of use cases today however, business users, data scientists, and analysts are demanding easy, frictionless, self-service options to build end-to-end data pipelines because it’s hard and inefficient to predefine constantly changing schemas and spend time negotiating capacity slots on shared infrastructure. The exploratory nature of machine learning (ML) and many analytics tasks means you need to rapidly ingest new datasets and clean, normalize, and feature engineer them without worrying about operational overhead when you have to think about the infrastructure that runs data pipelines.

A serverless data lake architecture enables agile and self-service data onboarding and analytics for all data consumer roles across a company. By using AWS serverless technologies as building blocks, you can rapidly and interactively build data lakes and data processing pipelines to ingest, store, transform, and analyze petabytes of structured and unstructured data from batch and streaming sources, all without needing to manage any storage or compute infrastructure.

In this post, we first discuss a layered, component-oriented logical architecture of modern analytics platforms and then present a reference architecture for building a serverless data platform that includes a data lake, data processing pipelines, and a consumption layer that enables several ways to analyze the data in the data lake without moving it (including business intelligence (BI) dashboarding, exploratory interactive SQL, big data processing, predictive analytics, and ML).

Logical architecture of modern data lake centric analytics platforms

The following diagram illustrates the architecture of a data lake centric analytics platform.

You can envision a data lake centric analytics architecture as a stack of six logical layers, where each layer is composed of multiple components. A layered, component-oriented architecture promotes separation of concerns, decoupling of tasks, and flexibility. These in turn provide the agility needed to quickly integrate new data sources, support new analytics methods, and add tools required to keep up with the accelerating pace of changes in the analytics landscape. In the following sections, we look at the key responsibilities, capabilities, and integrations of each logical layer.

Ingestion layer

The ingestion layer is responsible for bringing data into the data lake. It provides the ability to connect to internal and external data sources over a variety of protocols. It can ingest batch and streaming data into the storage layer. The ingestion layer is also responsible for delivering ingested data to a diverse set of targets in the data storage layer (including the object store, databases, and warehouses).

Storage layer

The storage layer is responsible for providing durable, scalable, secure, and cost-effective components to store vast quantities of data. It supports storing unstructured data and datasets of a variety of structures and formats. It supports storing source data as-is without first needing to structure it to conform to a target schema or format. Components from all other layers provide easy and native integration with the storage layer. To store data based on its consumption readiness for different personas across organization, the storage layer is organized into the following zones:

  • Landing zone – The storage area where components from the ingestion layer land data. This is a transient area where data is ingested from sources as-is. Typically, data engineering personas interact with the data stored in this zone.
  • Raw zone – After the preliminary quality checks, the data from the landing zone is moved to the raw zone for permanent storage. Here, data is stored in its original format. Having all data from all sources permanently stored in the raw zone provides the ability to “replay” downstream data processing in case of errors or data loss in downstream storage zones. Typically, data engineering and data science personas interact with the data stored in this zone.
  • Curated zone – This zone hosts data that is in the most consumption-ready state and conforms to organizational standards and data models. Datasets in the curated zone are typically partitioned, cataloged, and stored in formats that support performant and cost-effective access by the consumption layer. The processing layer creates datasets in the curated zone after cleaning, normalizing, standardizing, and enriching data from the raw zone. All personas across organizations use the data stored in this zone to drive business decisions.

Cataloging and search layer

The cataloging and search layer is responsible for storing business and technical metadata about datasets hosted in the storage layer. It provides the ability to track schema and the granular partitioning of dataset information in the lake. It also supports mechanisms to track versions to keep track of changes to the metadata. As the number of datasets in the data lake grows, this layer makes datasets in the data lake discoverable by providing search capabilities.

Processing layer

The processing layer is responsible for transforming data into a consumable state through data validation, cleanup, normalization, transformation, and enrichment. It’s responsible for advancing the consumption readiness of datasets along the landing, raw, and curated zones and registering metadata for the raw and transformed data into the cataloging layer. The processing layer is composed of purpose-built data-processing components to match the right dataset characteristic and processing task at hand. The processing layer can handle large data volumes and support schema-on-read, partitioned data, and diverse data formats. The processing layer also provides the ability to build and orchestrate multi-step data processing pipelines that use purpose-built components for each step.

Consumption layer

The consumption layer is responsible for providing scalable and performant tools to gain insights from the vast amount of data in the data lake. It democratizes analytics across all personas across the organization through several purpose-built analytics tools that support analysis methods, including SQL, batch analytics, BI dashboards, reporting, and ML. The consumption layer natively integrates with the data lake’s storage, cataloging, and security layers. Components in the consumption layer support schema-on-read, a variety of data structures and formats, and use data partitioning for cost and performance optimization.

Security and governance layer

The security and governance layer is responsible for protecting the data in the storage layer and processing resources in all other layers. It provides mechanisms for access control, encryption, network protection, usage monitoring, and auditing. The security layer also monitors activities of all components in other layers and generates a detailed audit trail. Components of all other layers provide native integration with the security and governance layer.

Serverless data lake centric analytics architecture

To compose the layers described in our logical architecture, we introduce a reference architecture that uses AWS serverless and managed services. In this approach, AWS services take over the heavy lifting of the following:

  • Providing and managing scalable, resilient, secure, and cost-effective infrastructural components
  • Ensuring infrastructural components natively integrate with each other

This reference architecture allows you to focus more time on rapidly building data and analytics pipelines. It significantly accelerates new data onboarding and driving insights from your data. The AWS serverless and managed components enable self-service across all data consumer roles by providing the following key benefits:

  • Easy configuration-driven use
  • Freedom from infrastructure management
  • Pay-per-use pricing model

The following diagram illustrates this architecture.

Ingestion layer

The ingestion layer in our serverless architecture is composed of a set of purpose-built AWS services to enable data ingestion from a variety of sources. Each of these services enables simple self-service data ingestion into the data lake landing zone and provides integration with other AWS services in the storage and security layers. Individual purpose-built AWS services match the unique connectivity, data format, data structure, and data velocity requirements of operational database sources, streaming data sources, and file sources.

Operational database sources

Typically, organizations store their operational data in various relational and NoSQL databases. AWS Data Migration Service (AWS DMS) can connect to a variety of operational RDBMS and NoSQL databases and ingest their data into Amazon Simple Storage Service (Amazon S3) buckets in the data lake landing zone. With AWS DMS, you can first perform a one-time import of the source data into the data lake and replicate ongoing changes happening in the source database. AWS DMS encrypts S3 objects using AWS Key Management Service (AWS KMS) keys as it stores them in the data lake. AWS DMS is a fully managed, resilient service and provides a wide choice of instance sizes to host database replication tasks.

AWS Lake Formation provides a scalable, serverless alternative, called blueprints, to ingest data from AWS native or on-premises database sources into the landing zone in the data lake. A Lake Formation blueprint is a predefined template that generates a data ingestion AWS Glue workflow based on input parameters such as source database, target Amazon S3 location, target dataset format, target dataset partitioning columns, and schedule. A blueprint-generated AWS Glue workflow implements an optimized and parallelized data ingestion pipeline consisting of crawlers, multiple parallel jobs, and triggers connecting them based on conditions. For more information, see Integrating AWS Lake Formation with Amazon RDS for SQL Server.

Streaming data sources

The ingestion layer uses Amazon Kinesis Data Firehose to receive streaming data from internal and external sources. With a few clicks, you can configure a Kinesis Data Firehose API endpoint where sources can send streaming data such as clickstreams, application and infrastructure logs and monitoring metrics, and IoT data such as devices telemetry and sensor readings. Kinesis Data Firehose does the following:

  • Buffers incoming streams
  • Batches, compresses, transforms, and encrypts the streams
  • Stores the streams as S3 objects in the landing zone in the data lake

Kinesis Data Firehose natively integrates with the security and storage layers and can deliver data to Amazon S3, Amazon Redshift, and Amazon Elasticsearch Service (Amazon ES) for real-time analytics use cases. Kinesis Data Firehose is serverless, requires no administration, and has a cost model where you pay only for the volume of data you transmit and process through the service. Kinesis Data Firehose automatically scales to adjust to the volume and throughput of incoming data.

File sources

Many applications store structured and unstructured data in files that are hosted on Network Attached Storage (NAS) arrays. Organizations also receive data files from partners and third-party vendors. Analyzing data from these file sources can provide valuable business insights.

Internal file shares

AWS DataSync can ingest hundreds of terabytes and millions of files from NFS and SMB enabled NAS devices into the data lake landing zone. DataSync automatically handles scripting of copy jobs, scheduling and monitoring transfers, validating data integrity, and optimizing network utilization. DataSync can perform one-time file transfers and monitor and sync changed files into the data lake. DataSync is fully managed and can be set up in minutes.

Partner data files

FTP is most common method for exchanging data files with partners. The AWS Transfer Family is a serverless, highly available, and scalable service that supports secure FTP endpoints and natively integrates with Amazon S3. Partners and vendors transmit files using SFTP protocol, and the AWS Transfer Family stores them as S3 objects in the landing zone in the data lake. The AWS Transfer Family supports encryption using AWS KMS and common authentication methods including AWS Identity and Access Management (IAM) and Active Directory.

Data APIs

Organizations today use SaaS and partner applications such as Salesforce, Marketo, and Google Analytics to support their business operations. Analyzing SaaS and partner data in combination with internal operational application data is critical to gaining 360-degree business insights. Partner and SaaS applications often provide API endpoints to share data.

SaaS APIs

The ingestion layer uses AWS AppFlow to easily ingest SaaS applications data into the data lake. With a few clicks, you can set up serverless data ingestion flows in AppFlow. Your flows can connect to SaaS applications (such as SalesForce, Marketo, and Google Analytics), ingest data, and store it in the data lake. You can schedule AppFlow data ingestion flows or trigger them by events in the SaaS application. Ingested data can be validated, filtered, mapped and masked before storing in the data lake. AppFlow natively integrates with authentication, authorization, and encryption services in the security and governance layer.

Partner APIs

To ingest data from partner and third-party APIs, organizations build or purchase custom applications that connect to APIs, fetch data, and create S3 objects in the landing zone by using AWS SDKs. These applications and their dependencies can be packaged into Docker containers and hosted on AWS Fargate. Fargate is a serverless compute engine for hosting Docker containers without having to provision, manage, and scale servers. Fargate natively integrates with AWS security and monitoring services to provide encryption, authorization, network isolation, logging, and monitoring to the application containers.

AWS Glue Python shell jobs also provide serverless alternative to build and schedule data ingestion jobs that can interact with partner APIs by using native, open-source, or partner-provided Python libraries. AWS Glue provides out-of-the-box capabilities to schedule singular Python shell jobs or include them as part of a more complex data ingestion workflow built on AWS Glue workflows.

Third-party data sources

Your organization can gain a business edge by combining your internal data with third-party datasets such as historical demographics, weather data, and consumer behavior data. AWS Data Exchange provides a serverless way to find, subscribe to, and ingest third-party data directly into S3 buckets in the data lake landing zone. You can ingest a full third-party dataset and then automate detecting and ingesting revisions to that dataset. AWS Data Exchange is serverless and lets you find and ingest third-party datasets with a few clicks.

Storage layer

Amazon S3 provides the foundation for the storage layer in our architecture. Amazon S3 provides virtually unlimited scalability at low cost for our serverless data lake. Data is stored as S3 objects organized into landing, raw, and curated zone buckets and prefixes. Amazon S3 encrypts data using keys managed in AWS KMS. IAM policies control granular zone-level and dataset-level access to various users and roles. Amazon S3 provides 99.99 % of availability and 99.999999999 % of durability, and charges only for the data it stores. To significantly reduce costs, Amazon S3 provides colder tier storage options called Amazon S3 Glacier and S3 Glacier Deep Archive. To automate cost optimizations, Amazon S3 provides configurable lifecycle policies and intelligent tiering options to automate moving older data to colder tiers. AWS services in our ingestion, cataloging, processing, and consumption layers can natively read and write S3 objects. Additionally, hundreds of third-party vendor and open-source products and services provide the ability to read and write S3 objects.

Data of any structure (including unstructured data) and any format can be stored as S3 objects without needing to predefine any schema. This enables services in the ingestion layer to quickly land a variety of source data into the data lake in its original source format. After the data is ingested into the data lake, components in the processing layer can define schema on top of S3 datasets and register them in the cataloging layer. Services in the processing and consumption layers can then use schema-on-read to apply the required structure to data read from S3 objects. Datasets stored in Amazon S3 are often partitioned to enable efficient filtering by services in the processing and consumption layers.

Cataloging and search layer

A data lake typically hosts a large number of datasets, and many of these datasets have evolving schema and new data partitions. A central Data Catalog that manages metadata for all the datasets in the data lake is crucial to enabling self-service discovery of data in the data lake. Additionally, separating metadata from data into a central schema enables schema-on-read for the processing and consumption layer components.

In our architecture, Lake Formation provides the central catalog to store and manage metadata for all datasets hosted in the data lake. Organizations manage both technical metadata (such as versioned table schemas, partitioning information, physical data location, and update timestamps) and business attributes (such as data owner, data steward, column business definition, and column information sensitivity) of all their datasets in Lake Formation. Services such as AWS Glue, Amazon EMR, and Amazon Athena natively integrate with Lake Formation and automate discovering and registering dataset metadata into the Lake Formation catalog. Additionally, Lake Formation provides APIs to enable metadata registration and management using custom scripts and third-party products. AWS Glue crawlers in the processing layer can track evolving schemas and newly added partitions of datasets in the data lake, and add new versions of corresponding metadata in the Lake Formation catalog.

Lake Formation provides the data lake administrator a central place to set up granular table- and column-level permissions for databases and tables hosted in the data lake. After Lake Formation permissions are set up, users and groups can access only authorized tables and columns using multiple processing and consumption layer services such as Athena, Amazon EMR, AWS Glue, and Amazon Redshift Spectrum.

Processing layer

The processing layer in our architecture is composed of two types of components:

  • Components used to create multi-step data processing pipelines
  • Components to orchestrate data processing pipelines on schedule or in response to event triggers (such as ingestion of new data into the landing zone)

AWS Glue and AWS Step Functions provide serverless components to build, orchestrate, and run pipelines that can easily scale to process large data volumes. Multi-step workflows built using AWS Glue and Step Functions can catalog, validate, clean, transform, and enrich individual datasets and advance them from landing to raw and raw to curated zones in the storage layer.

AWS Glue is a serverless, pay-per-use ETL service for building and running Python or Spark jobs (written in Scala or Python) without requiring you to deploy or manage clusters. AWS Glue automatically generates the code to accelerate your data transformations and loading processes. AWS Glue ETL builds on top of Apache Spark and provides commonly used out-of-the-box data source connectors, data structures, and ETL transformations to validate, clean, transform, and flatten data stored in many open-source formats such as CSV, JSON, Parquet, and Avro. AWS Glue ETL also provides capabilities to incrementally process partitioned data.

Additionally, you can use AWS Glue to define and run crawlers that can crawl folders in the data lake, discover datasets and their partitions, infer schema, and define tables in the Lake Formation catalog. AWS Glue provides more than a dozen built-in classifiers that can parse a variety of data structures stored in open-source formats. AWS Glue also provides triggers and workflow capabilities that you can use to build multi-step end-to-end data processing pipelines that include job dependencies and running parallel steps. You can schedule AWS Glue jobs and workflows or run them on demand. AWS Glue natively integrates with AWS services in storage, catalog, and security layers.

Step Functions is a serverless engine that you can use to build and orchestrate scheduled or event-driven data processing workflows. You use Step Functions to build complex data processing pipelines that involve orchestrating steps implemented by using multiple AWS services such as AWS Glue, AWS Lambda, Amazon Elastic Container Service (Amazon ECS) containers, and more. Step Functions provides visual representations of complex workflows and their running state to make them easy to understand. It manages state, checkpoints, and restarts of the workflow for you to make sure that the steps in your data pipeline run in order and as expected. Built-in try/catch, retry, and rollback capabilities deal with errors and exceptions automatically.

Consumption layer

The consumption layer in our architecture is composed using fully managed, purpose-built, analytics services that enable interactive SQL, BI dashboarding, batch processing, and ML.

Interactive SQL

Athena is an interactive query service that enables you to run complex ANSI SQL against terabytes of data stored in Amazon S3 without needing to first load it into a database. Athena queries can analyze structured, semi-structured, and columnar data stored in open-source formats such as CSV, JSON, XML Avro, Parquet, and ORC. Athena uses table definitions from Lake Formation to apply schema-on-read to data read from Amazon S3.

Athena is serverless, so there is no infrastructure to set up or manage, and you pay only for the amount of data scanned by the queries you run. Athena provides faster results and lower costs by reducing the amount of data it scans by using dataset partitioning information stored in the Lake Formation catalog. You can run queries directly on the Athena console of submit them using Athena JDBC or ODBC endpoints.

Athena natively integrates with AWS services in the security and monitoring layer to support authentication, authorization, encryption, logging, and monitoring. It supports table- and column-level access controls defined in the Lake Formation catalog.

Data warehousing and batch analytics

Amazon Redshift is a fully managed data warehouse service that can host and process petabytes of data and run thousands highly performant queries in parallel. Amazon Redshift uses a cluster of compute nodes to run very low-latency queries to power interactive dashboards and high-throughput batch analytics to drive business decisions. You can run Amazon Redshift queries directly on the Amazon Redshift console or submit them using the JDBC/ODBC endpoints provided by Amazon Redshift.

Amazon Redshift provides the capability, called Amazon Redshift Spectrum, to perform in-place queries on structured and semi-structured datasets in Amazon S3 without needing to load it into the cluster. Amazon Redshift Spectrum can spin up thousands of query-specific temporary nodes to scan exabytes of data to deliver fast results. Organizations typically load most frequently accessed dimension and fact data into an Amazon Redshift cluster and keep up to exabytes of structured, semi-structured, and unstructured historical data in Amazon S3. Amazon Redshift Spectrum enables running complex queries that combine data in a cluster with data on Amazon S3 in the same query.

Amazon Redshift provides native integration with Amazon S3 in the storage layer, Lake Formation catalog, and AWS services in the security and monitoring layer.

Business intelligence

Amazon QuickSight provides a serverless BI capability to easily create and publish rich, interactive dashboards. QuickSight enriches dashboards and visuals with out-of-the-box, automatically generated ML insights such as forecasting, anomaly detection, and narrative highlights. QuickSight natively integrates with Amazon SageMaker to enable additional custom ML model-based insights to your BI dashboards. You can access QuickSight dashboards from any device using a QuickSight app, or you can embed the dashboard into web applications, portals, and websites.

QuickSight allows you to directly connect to and import data from a wide variety of cloud and on-premises data sources. These include SaaS applications such as Salesforce, Square, ServiceNow, Twitter, GitHub, and JIRA; third-party databases such as Teradata, MySQL, Postgres, and SQL Server; native AWS services such as Amazon Redshift, Athena, Amazon S3, Amazon Relational Database Service (Amazon RDS), and Amazon Aurora; and private VPC subnets. You can also upload a variety of file types including XLS, CSV, JSON, and Presto.

To achieve blazing fast performance for dashboards, QuickSight provides an in-memory caching and calculation engine called SPICE. SPICE automatically replicates data for high availability and enables thousands of users to simultaneously perform fast, interactive analysis while shielding your underlying data infrastructure. QuickSight automatically scales to tens of thousands of users and provides a cost-effective, pay-per-session pricing model.

QuickSight allows you to securely manage your users and content via a comprehensive set of security features, including role-based access control, active directory integration, AWS CloudTrail auditing, single sign-on (IAM or third-party), private VPC subnets, and data backup.

Predictive analytics and ML

Amazon SageMaker is a fully managed service that provides components to build, train, and deploy ML models using an interactive development environment (IDE) called Amazon SageMaker Studio. In Amazon SageMaker Studio, you can upload data, create new notebooks, train and tune models, move back and forth between steps to adjust experiments, compare results, and deploy models to production, all in one place by using a unified visual interface. Amazon SageMaker also provides managed Jupyter notebooks that you can spin up with just a few clicks. Amazon SageMaker notebooks provide elastic compute resources, git integration, easy sharing, pre-configured ML algorithms, dozens of out-of-the-box ML examples, and AWS Marketplace integration, which enables easy deployment of hundreds of pre-trained algorithms. Amazon SageMaker notebooks are preconfigured with all major deep learning frameworks, including TensorFlow, PyTorch, Apache MXNet, Chainer, Keras, Gluon, Horovod, Scikit-learn, and Deep Graph Library.

ML models are trained on Amazon SageMaker managed compute instances, including highly cost-effective Amazon Elastic Compute Cloud (Amazon EC2) Spot Instances. You can organize multiple training jobs by using Amazon SageMaker Experiments. You can build training jobs using Amazon SageMaker built-in algorithms, your custom algorithms, or hundreds of algorithms you can deploy from AWS Marketplace. Amazon SageMaker Debugger provides full visibility into model training jobs. Amazon SageMaker also provides automatic hyperparameter tuning for ML training jobs.

You can deploy Amazon SageMaker trained models into production with a few clicks and easily scale them across a fleet of fully managed EC2 instances. You can choose from multiple EC2 instance types and attach cost-effective GPU-powered inference acceleration. After the models are deployed, Amazon SageMaker can monitor key model metrics for inference accuracy and detect any concept drift.

Amazon SageMaker provides native integrations with AWS services in the storage and security layers.

Security and governance layer

Components across all layers of our architecture protect data, identities, and processing resources by natively using the following capabilities provided by the security and governance layer.

Authentication and authorization

IAM provides user-, group-, and role-level identity to users and the ability to configure fine-grained access control for resources managed by AWS services in all layers of our architecture. IAM supports multi-factor authentication and single sign-on through integrations with corporate directories and open identity providers such as Google, Facebook, and Amazon.

Lake Formation provides a simple and centralized authorization model for tables hosted in the data lake. After implemented in Lake Formation, authorization policies for databases and tables are enforced by other AWS services such as Athena, Amazon EMR, QuickSight, and Amazon Redshift Spectrum. In Lake Formation, you can grant or revoke database-, table-, or column-level access for IAM users, groups, or roles defined in the same account hosting the Lake Formation catalog or another AWS account. The simple grant/revoke-based authorization model of Lake Formation considerably simplifies the previous IAM-based authorization model that relied on separately securing S3 data objects and metadata objects in the AWS Glue Data Catalog.

Encryption

AWS KMS provides the capability to create and manage symmetric and asymmetric customer-managed encryption keys. AWS services in all layers of our architecture natively integrate with AWS KMS to encrypt data in the data lake. It supports both creating new keys and importing existing customer keys. Access to the encryption keys is controlled using IAM and is monitored through detailed audit trails in CloudTrail.

Network protection

Our architecture uses Amazon Virtual Private Cloud (Amazon VPC) to provision a logically isolated section of the AWS Cloud (called VPC) that is isolated from the internet and other AWS customers. AWS VPC provides the ability to choose your own IP address range, create subnets, and configure route tables and network gateways. AWS services from other layers in our architecture launch resources in this private VPC to protect all traffic to and from these resources.

Monitoring and logging

AWS services in all layers of our architecture store detailed logs and monitoring metrics in AWS CloudWatch. CloudWatch provides the ability to analyze logs, visualize monitored metrics, define monitoring thresholds, and send alerts when thresholds are crossed.

All AWS services in our architecture also store extensive audit trails of user and service actions in CloudTrail. CloudTrail provides event history of your AWS account activity, including actions taken through the AWS Management Console, AWS SDKs, command line tools, and other AWS services. This event history simplifies security analysis, resource change tracking, and troubleshooting. In addition, you can use CloudTrail to detect unusual activity in your AWS accounts. These capabilities help simplify operational analysis and troubleshooting.

Additional considerations

In this post, we talked about ingesting data from diverse sources and storing it as S3 objects in the data lake and then using AWS Glue to process ingested datasets until they’re in a consumable state. This architecture enables use cases needing source-to-consumption latency of a few minutes to hours. In a future post, we will evolve our serverless analytics architecture to add a speed layer to enable use cases that require source-to-consumption latency in seconds, all while aligning with the layered logical architecture we introduced.

Conclusion

With AWS serverless and managed services, you can build a modern, low-cost data lake centric analytics architecture in days. A decoupled, component-driven architecture allows you to start small and quickly add new purpose-built components to one of six architecture layers to address new requirements and data sources.

We invite you to read the following posts that contain detailed walkthroughs and sample code for building the components of the serverless data lake centric analytics architecture:


About the Authors

Praful Kava is a Sr. Specialist Solutions Architect at AWS. He guides customers to design and engineer Cloud scale Analytics pipelines on AWS. Outside work, he enjoys travelling with his family and exploring new hiking trails.

 

 

 

Changbin Gong is a Senior Solutions Architect at Amazon Web Services (AWS). He engages with customers to create innovative solutions that address customer business problems and accelerate the adoption of AWS services. In his spare time, Changbin enjoys reading, running, and traveling.

Big data processing in a data warehouse environment using AWS Glue 2.0 and PySpark

Post Syndicated from Kaushik Krishnamurthi original https://aws.amazon.com/blogs/big-data/big-data-processing-in-a-data-warehouse-environment-using-aws-glue-2-0-and-pyspark/

The AWS Marketing Data Science and Engineering team enables AWS Marketing to measure the effectiveness and impact of various marketing initiatives and campaigns. This is done through a data platform and infrastructure strategy that consists of maintaining data warehouse, data lake, and data transformation (ETL) pipelines, and designing software tools and services to run related operations. While providing various business intelligence (BI) and machine learning (ML) solutions for marketers, there is particular focus on the timely delivery of error-free, reliable, self-served, reusable, and scalable ways to measure and report business metrics. In this post, we discuss one such example of improving operational efficiency and how we optimized our ETL process using AWS Glue 2.0 and PySpark SQL to achieve huge parallelism and reduce the runtime significantly—under 45 minutes—to deliver data to business much sooner.

Solution overview

Our team maintained an ETL pipeline to process the entire history of a dataset. We did this by running a SQL query repeatedly in Amazon Redshift, incrementally processing 2 months at a time to account for several years of historical data, with several hundreds of billions of rows in total. The input to this query is detailed service billing metrics across various AWS products, and the output is aggregated and summarized usage data. We wanted to move this heavy ETL process outside of our data warehouse environment, so that business users and our other relatively smaller ETL processes can use the Amazon Redshift resources fully for complex analytical queries.

Over the years, raw data feeds were captured in Amazon Redshift into separate tables, with 2 months of data in each. We first UNLOAD these to Amazon Simple Storage Service (Amazon S3) as Parquet formatted files and create AWS Glue tables on top of them by running CREATE TABLE DDLs in Amazon Athena as a one-time exercise. The source data is now available to be used as a DataFrame or DynamicFrame in an AWS Glue script.

Our query is dependent on a few more dimension tables that we UNLOAD again but in an automated fashion daily because we need the most recent version of these tables.

Next, we convert Amazon Redshift SQL queries to equivalent PySpark SQL. The data generated from the query output is written back to Amazon Redshift using AWS Glue DynamicFrame and DataSink. For more information, see Moving Data to and from Amazon Redshift.

We perform development and testing using Amazon SageMaker notebooks attached to an AWS Glue development endpoint.

After completing the tests, the script is deployed as a Spark application on the serverless Spark platform of AWS Glue. We do this by creating a job in AWS Glue and attaching our ETL script. We use the recently announced version AWS Glue 2.0.

The job can now be triggered via the AWS Command Line Interface (AWS CLI) using any workflow management or job scheduling tool. We use an internal distributed job scheduling tool to run the AWS Glue job periodically.

Design choices

We made a few design choices based on a few different factors. Firstly, we used the same Amazon Redshift SQL queries with minimal changes by relying on Spark SQL, due to Spark SQL’s language syntax being very similar to traditional ANSI-SQL.

We also used several techniques to optimize our Spark script for better memory management and speed. For example, we used broadcast joins for smaller tables involved in joins. See the following code:

-- Join Hints for broadcast join
SELECT /*+ BROADCAST(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
-- https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-hints.html#join-hints

AWS Glue DynamicFrame allowed us to create an AWS Glue DataSink pointed to our Amazon Redshift destination and write the output of our Spark SQL directly to Amazon Redshift without having to export to Amazon S3 first, which requires an additional ETL to copy from Amazon S3 to Amazon Redshift. See the following code:

# Convert Spark DataFrame to Glue DynamicFrame:
myDyF = DynamicFrame.fromDF(myDF, glueContext, "dynamic_df")

# Connecting to destination Redshift database:
connection_options = {
    "dbtable": "example.redshift_destination",
    "database": "aws_marketing_redshift_db",
    "preactions": "delete from example.redshift_destination where date between '"+start_dt+"' AND '"+end_dt+"';",
    "postactions": "insert into example.job_status select 'example' as schema_name, 'redshift_destination' as table_name, to_date("+run_dt[:8]+",'YYYYMMDD') as run_date;",
}

# Glue DataSink:
datasink = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=myDyF,
    catalog_connection="aws_marketing_redshift_db_read_write",
    connection_options=connection_options,
    redshift_tmp_dir=args["TempDir"],
    transformation_ctx="datasink"
)

We also considered horizontal scaling vs. vertical scaling. Based on the results observed during our tests for performance tuning, we chose to go with 75 as the number of workers and G.2X as the worker type. This translates to 150 data processing units (DPU) in AWS Glue. With G.2X, each worker maps to 2 DPU (8 vCPU, 32 GB of memory, 128 GB of disk) and provides one executor per worker. The performance was nearly twice as fast when compared to G.1X for our dataset’s partitioning scheme, SQL aggregate functions, filters, and more put together. Each G.2X worker maps to 2 DPUs and runs twice the number of concurrent tasks compared to G.1X. This worker type is recommended for memory-intensive jobs and jobs that run intensive transforms. For more information, see the section Understanding AWS Glue Worker types in Best practices to scale Apache Spark jobs and partition data with AWS Glue.

We tested various choices of worker types between Standard, G.1X, and G.2X while also tweaking the number of workers. The job run time reduced proportionally as we added more G.2X instances.

Before AWS Glue 2.0, earlier versions involved AWS Glue jobs spending several minutes for the cluster to become available. We observed an approximate average startup time of 8–10 minutes for our AWS Glue job with 75 or more workers. With AWS Glue 2.0, you can see much faster startup times. We noticed startup times of less than 1 minute on average in almost all our AWS Glue 2.0 jobs, and the ETL workload began within 1 minute from when the job run request was made. For more information, see Running Spark ETL Jobs with Reduced Startup Times.

Although cost is a factor to consider while running a large ETL, you’re billed only for the duration of the AWS Glue job. For Spark jobs with AWS Glue 2.0, you’re billed in 1-second increments (with a 1-minute minimum). For more information, see AWS Glue Pricing.

Additional design considerations

During implementation, we also considered additional optimizations and alternatives in case we ran into issues. For example, if you want to allocate more resources to the write operations into Amazon Redshift, you can modify the workload management (WLM) configuration in Amazon Redshift accordingly so sufficient compute power from Amazon Redshift is available for the AWS Glue jobs to write data into Amazon Redshift.

To complement our ETL process, we can also perform an elastic resize of the Amazon Redshift cluster to a larger size, making it more powerful in a matter of minutes and allowing more parallelism, which helps improve the speed of our ETL load operations.

To submit an elastic resize of an Amazon Redshift cluster using Bash, see the following code:

cmd=$(aws redshift --region 'us-east-1' resize-cluster --cluster-identifier ${REDSHIFT_CLUSTER_NAME} --number-of-nodes ${NUMBER_OF_NODES} --no-classic)

To monitor the elastic resize of an Amazon Redshift cluster using Bash, see the following code:

cluster_status=$(aws redshift --region 'us-east-1' describe-clusters --cluster-identifier ${REDSHIFT_CLUSTER_NAME} | jq -r ".Clusters[0].ClusterStatus")
cluster_availability_status=$(aws redshift --region 'us-east-1' describe-clusters --cluster-identifier ${REDSHIFT_CLUSTER_NAME} | jq -r ".Clusters[0].ClusterAvailabilityStatus")

while [ "$cluster_status" != "available" ] || [ "$cluster_availability_status" != "Available" ]
do
	echo "$cluster_status" | ts
	echo "$cluster_availability_status" | ts
	echo "Waiting for Redshift resize cluster to complete..."
	sleep 60
	cluster_status=$(aws redshift --region 'us-east-1' describe-clusters --cluster-identifier ${REDSHIFT_CLUSTER_NAME} | jq -r ".Clusters[0].ClusterStatus")
	cluster_availability_status=$(aws redshift --region 'us-east-1' describe-clusters --cluster-identifier ${REDSHIFT_CLUSTER_NAME} | jq -r ".Clusters[0].ClusterAvailabilityStatus")
done

echo "$cluster_status" | ts
echo "$cluster_availability_status" | ts
echo "Done"

ETL overview

To submit AWS Glue jobs using Python, we use the following code:

jobs = []
# 'glue' is an authenticated boto3 client object
jobs.append((glue_job_name, glue.start_job_run(
    JobName=glue_job_name,
    Arguments=arguments
)))

For our use case, we have multiple jobs. Each job can have multiple job runs, and each job run can have multiple retries. To monitor jobs, we use the following pseudo code:

while overall batch is still in progress:
      loop over all job runs of all submitted jobs:
            if job run is still in progress:
                  print job run status
                  wait
            else if job run has completed:
                  print success
            else job run has failed:
                  wait for retry to begin
                  loop over up to 10 retries of this job run:
                        if retry is still in progress:
                              print retry status     
                              wait
                              break
                        else if retry has completed:
                              print success
                              break
                        else retry has failed:
                              wait for next retry to begin
                        if this is the 10th i.e. final retry that failed:
                              print failure
                              loop over all job runs of all submitted jobs:
                                    loop over all retries of job run:
                                          build all job runs list
                                    kill all job runs list
                              wait for kill job runs to complete
                              send failure signal back to caller
      update overall batch status

The Python code is as follows:

job_run_status_overall = 'STARTING'
while job_run_status_overall in ['STARTING', 'RUNNING', 'STOPPING']:
    print("")
    job_run_status_temp = 'SUCCEEDED'
    for job, response in jobs:
        glue_job_name = job
        job_run_id = response['JobRunId']
        job_run_response = glue.get_job_run(JobName=glue_job_name, RunId=job_run_id)
        job_run_status = job_run_response['JobRun']['JobRunState']
        if job_run_status in ['STARTING', 'RUNNING', 'STOPPING']:
            job_run_status_temp = job_run_status
            logger.info("Glue job ({}) with run id {} has status : {}".format(glue_job_name, job_run_id, job_run_status))
            time.sleep(120)
        elif job_run_status == 'SUCCEEDED':
            logger.info("Glue job ({}) with run id {} has status : {}".format(glue_job_name, job_run_id, job_run_status))
        else:
            time.sleep(30)
            for i in range(1, 11):
                try:
                    job_run_id_temp = job_run_id+'_attempt_'+str(i)
                    # print("Checking for " + job_run_id_temp)
                    job_run_response = glue.get_job_run(JobName=glue_job_name, RunId=job_run_id_temp)
                    # print("Found " + job_run_id_temp)
                    job_run_id = job_run_id_temp
                    job_run_status = job_run_response['JobRun']['JobRunState']
                    if job_run_status in ['STARTING', 'RUNNING', 'STOPPING']:
                        job_run_status_temp = job_run_status
                        logger.info("Glue job ({}) with run id {} has status : {}".format(glue_job_name, job_run_id, job_run_status))
                        time.sleep(30)
                        break
                    elif job_run_status == 'SUCCEEDED':
                        logger.info("Glue job ({}) with run id {} has status : {}".format(glue_job_name, job_run_id, job_run_status))
                        break
                    else:
                        logger.info("Glue job ({}) with run id {} has status : {}".format(glue_job_name, job_run_id, job_run_status))
                        time.sleep(30)
                except Exception as e:
                    pass
                if i == 10:
                    logger.info("All attempts failed: Glue job ({}) with run id {} and status: {}".format(glue_job_name, job_run_id, job_run_status))
                    logger.info("Cleaning up: Stopping all jobs and job runs...")
                    for job_to_stop, response_to_stop in jobs:
                        glue_job_name_to_stop = job_to_stop
                        job_run_id_to_stop = response_to_stop['JobRunId']
                        job_run_id_to_stop_temp = []
                        for j in range(0, 11):
                            job_run_id_to_stop_temp.append(job_run_id_to_stop if j == 0 else job_run_id_to_stop+'_attempt_'+str(j))
                        job_to_stop_response = glue.batch_stop_job_run(JobName=glue_job_name_to_stop, JobRunIds=job_run_id_to_stop_temp)
                    time.sleep(30)
                    raise ValueError("Glue job ({}) with run id {} and status: {}".format(glue_job_name, job_run_id, job_run_status))
    job_run_status_overall = job_run_status_temp

Our set of source data feeds consists of multiple historical AWS Glue tables, with 2 months’ data in each, spanning across the past few years and a year into the future:

  • Tables for year 2016: table_20160101, table_20160301, table_20160501, …, table_20161101. (6 tables)
  • Tables for year 2017: table_20170101, table_20170301, table_20170501, …, table_20171101. (6 tables)
  • Tables for year 2018: table_20180101, table_20180301, table_20180501, …, table_20181101. (6 tables)
  • Tables for year 2019: table_20190101, table_20190301, table_20190501, …, table_20191101. (6 tables)
  • Tables for year 2020: table_20200101, table_20200301, table_20200501, …, table_20201101. (6 tables)
  • Tables for year 2021: table_20210101, table_20210301, table_20210501, …, table_20211101. (6 tables)

The tables add up to 36 tables (therefore, 36 SQL queries) with about 800 billion rows to process (excluding the later months of year 2020 and year 2021, the tables for which are placeholders and empty at the time of writing).

Due to the high volume of data, we want to trigger our AWS Glue job multiple times: one job run request for each table, all at once to achieve parallelism (as opposed to sequential, stacked, or staggered-over-time job runs), resulting in 36 total job runs needed to process 6 years of data. In AWS Glue, we created 12 identical jobs with three maximum concurrent runs of each, thereby allowing the 12 * 3 = 36 job run requests that we needed. However, we encountered a few bottlenecks and limitations, which we addressed by the workarounds we discuss in the following section.

Limitations and workarounds

We needed to increase the limit on how many IP addresses we can have within one VPC. To do this, we made sure the VPC’s CIDR was configured to allow as many IP addresses as needed to launch the over 2,000 workers expected when running all the AWS Glue jobs. The following table shows an example configuration.

IPv4 CIDR Available IPv4
10.0.0.0/20 4091

For better availability and parallelism, we spread our jobs across multiple AWS Glue connections by doing the following:

  • Splitting our VPC into multiple subnets, with each subnet in a different Availability Zone
  • Creating one AWS Glue connection for each subnet (one each of us-east-1a, 1c, and 1d) so our request for over 2,000 worker nodes wasn’t made within one Availability Zone

This VPC splitting approach makes sure the job requests are evenly distributed across the three Availability Zones we chose. The following table shows an example configuration.

Subnet VPC IPv4 CIDR Available IPv4 Availability Zone
my-subnet-glue-etl-us-east-1c my-vpc 10.0.0.0/20 4091 us-east-1c
my-subnet-glue-etl-us-east-1a my-vpc 10.0.16.0/20 4091 us-east-1a
my-subnet-glue-etl-us-east-1d my-vpc 10.0.32.0/20 4091 us-east-1d

The following diagram illustrates our architecture.

Summary

In this post, we shared our experience exploring the features and capabilities of AWS Glue 2.0 for our data processing needs. We consumed over 4,000 DPUs across all our AWS Glue jobs because we used over 2,000 workers of G.2X type. We spread our jobs across multiple connections mapped to different Availability Zones of our Region: us-east-1a, 1c, and 1d, for better availability and parallelism.

Using AWS Glue 2.0, we could run all our PySpark SQLs in parallel and independently without resource contention between each other. With earlier AWS Glue versions, launching each job took an extra 8–10 minutes for the cluster to boot up, but with the reduced startup time in AWS Glue 2.0, each job is ready to start processing data in less than 1 minute. And each AWS Glue job runs a Spark version of our original SQL query and directly writes output back to our Amazon Redshift destination configured via AWS Glue DynamicFrame and DataSink.

Each job takes approximately 30 minutes. And when jobs are submitted together, due to high parallelism, all our jobs still finish within 40 minutes. Although the job durations of AWS Glue 2.0 are similar to 1.0, saving an additional 8–10 minutes of startup time previously observed for a large sized cluster is a huge benefit. The duration of our long-running ETL process was reduced from several hours to under an hour, resulting in significant improvement in runtime.

Based on our experience, we plan to migrate to AWS Glue 2.0 for a large number of our current and future data platform ETL needs.


About the Author

Kaushik Krishnamurthi is a Senior Data Engineer at Amazon Web Services (AWS), where he focuses on building scalable platforms for business analytics and machine learning. Prior to AWS, he worked in several business intelligence and data engineering roles for many years.

 

 

 

 

Mercado Libre: How to Block Malicious Traffic in a Dynamic Environment

Post Syndicated from Gaston Ansaldo original https://aws.amazon.com/blogs/architecture/mercado-libre-how-to-block-malicious-traffic-in-a-dynamic-environment/

Blog post contributors: Pablo Garbossa and Federico Alliani of Mercado Libre

Introduction

Mercado Libre (MELI) is the leading e-commerce and FinTech company in Latin America. We have a presence in 18 countries across Latin America, and our mission is to democratize commerce and payments to impact the development of the region.

We manage an ecosystem of more than 8,000 custom-built applications that process an average of 2.2 million requests per second. To support the demand, we run between 50,000 to 80,000 Amazon Elastic Cloud Compute (EC2) instances, and our infrastructure scales in and out according to the time of the day, thanks to the elasticity of the AWS cloud and its auto scaling features.

Mercado Libre

As a company, we expect our developers to devote their time and energy building the apps and features that our customers demand, without having to worry about the underlying infrastructure that the apps are built upon. To achieve this separation of concerns, we built Fury, our platform as a service (PaaS) that provides an abstraction layer between our developers and the infrastructure. Each time a developer deploys a brand new application or a new version of an existing one, Fury takes care of creating all the required components such as Amazon Virtual Private Cloud (VPC), Amazon Elastic Load Balancing (ELB), Amazon EC2 Auto Scaling group (ASG), and EC2) instances. Fury also manages a per-application Git repository, CI/CD pipeline with different deployment strategies, such like blue-green and rolling upgrades, and transparent application logs and metrics collection.

Fury- MELI PaaS

For those of us on the Cloud Security team, Fury represents an opportunity to enforce critical security controls across our stack in a way that’s transparent to our developers. For instance, we can dictate what Amazon Machine Images (AMIs) are vetted for use in production (such as those that align with the Center for Internet Security benchmarks). If needed, we can apply security patches across all of our fleet from a centralized location in a very scalable fashion.

But there are also other attack vectors that every organization that has a presence on the public internet is exposed to. The AWS recent Threat Landscape Report shows a 23% YoY increase in the total number of Denial of Service (DoS) events. It’s evident that organizations need to be prepared to quickly react under these circumstances.

The variety and the number of attacks are increasing, testing the resilience of all types of organizations. This is why we started working on a solution that allows us to contain application DoS attacks, and complements our perimeter security strategy, which is based on services such as AWS Shield and AWS Web Application Firewall (WAF). In this article, we will walk you through the solution we built to automatically detect and block these events.

The strategy we implemented for our solution, Network Behavior Anomaly Detection (NBAD), consists of four stages that we repeatedly execute:

  1. Analyze the execution context of our applications, like CPU and memory usage
  2. Learn their behavior
  3. Detect anomalies, gather relevant information and process it
  4. Respond automatically

Step 1: Establish a baseline for each application

End user traffic enters through different AWS CloudFront distributions that route to multiple Elastic Load Balancers (ELBs). Behind the ELBs, we operate a fleet of NGINX servers from where we connect back to the myriad of applications that our developers create via Fury.

MELI Architecture - nomaly detection project-step 1

Step 1: MELI Architecture – Anomaly detection project

We collect logs and metrics for each application that we ship to Amazon Simple Storage Service (S3) and Datadog. We then partition these logs using AWS Glue to make them available for consumption via Amazon Athena. On average, we send 3 terabytes (TB) of log files in parquet format to S3.

Based on this information, we developed processes that we complement with commercial solutions, such as Datadog’s Anomaly Detection, which allows us to learn the normal behavior or baseline of our applications and project expected adaptive growth thresholds for each one of them.

Anomaly detection

Step 2: Anomaly detection

When any of our apps receives a number of requests that fall outside the limits set by our anomaly detection algorithms, an Amazon Simple Notification Service (SNS) event is emitted, which triggers a workflow in the Anomaly Analyzer, a custom-built component of this solution.

Upon receiving such an event, the Anomaly Analyzer starts composing the so-called event context. In parallel, the Data Extractor retrieves vital insights via Athena from the log files stored in S3.

The output of this process is used as the input for the data enrichment process. This is responsible for consulting different threat intelligence sources that are used to further augment the analysis and determine if the event is an actual incident or not.

At this point, we build the context that will allow us not only to have greater certainty in calculating the score, but it will also help us validate and act quicker. This context includes:

  • Application’s owner
  • Affected business metrics
  • Error handling statistics of our applications
  • Reputation of IP addresses and associated users
  • Use of unexpected URL parameters
  • Distribution by origin of the traffic that generated the event (cloud providers, geolocation, etc.)
  • Known behavior patterns of vulnerability discovery or exploitation
Step 2: MELI Architecture - Anomaly detection project

Step 2: MELI Architecture – Anomaly detection project

Step 3: Incident response

Once we reconstruct the context of the event, we calculate a score for each “suspicious actor” involved.

Step 3: MELI Architecture - Anomaly detection project

Step 3: MELI Architecture – Anomaly detection project

Based on these analysis results we carry out a series of verifications in order to rule out false positives. Finally, we execute different actions based on the following criteria:

Manual review

If the outcome of the automatic analysis results in a medium risk scoring, we activate a manual review process:

  1. We send a report to the application’s owners with a summary of the context. Based on their understanding of the business, they can activate the Incident Response Team (IRT) on-call and/or provide feedback that allows us to improve our automatic rules.
  2. In parallel, our threat analysis team receives and processes the event. They are equipped with tools that allow them to add IP addresses, user-agents, referrers, or regular expressions into Amazon WAF to carry out temporary blocking of “bad actors” in situations where the attack is in progress.

Automatic response

If the analysis results in a high risk score, an automatic containment process is triggered. The event is sent to our block API, which is responsible for adding a temporary rule designed to mitigate the attack in progress. Behind the scenes, our block API leverages AWS WAF to create IPSets. We reference these IPsets from our custom rule groups in our web ACLs, in order to block IPs that source the malicious traffic. We found many benefits in the new release of AWS WAF, like support for Amazon Managed Rules, larger capacity units per web ACL as well as an easier to use API.

Conclusion

By leveraging the AWS platform and its powerful APIs, and together with the AWS WAF service team and solutions architects, we were able to build an automated incident response solution that is able to identify and block malicious actors with minimal operator intervention. Since launching the solution, we have reduced YoY application downtime over 92% even when the time under attack increased over 10x. This has had a positive impact on our users and therefore, on our business.

Not only was our downtime drastically reduced, but we also cut the number of manual interventions during this type of incident by 65%.

We plan to iterate over this solution to further reduce false positives in our detection mechanisms as well as the time to respond to external threats.

About the authors

Pablo Garbossa is an Information Security Manager at Mercado Libre. His main duties include ensuring security in the software development life cycle and managing security in MELI’s cloud environment. Pablo is also an active member of the Open Web Application Security Project® (OWASP) Buenos Aires chapter, a nonprofit foundation that works to improve the security of software.

Federico Alliani is a Security Engineer on the Mercado Libre Monitoring team. Federico and his team are in charge of protecting the site against different types of attacks. He loves to dive deep into big architectures to drive performance, scale operational efficiency, and increase the speed of detection and response to security events.

Architecting a Data Lake for Higher Education Student Analytics

Post Syndicated from Craig Jordan original https://aws.amazon.com/blogs/architecture/architecting-data-lake-for-higher-education-student-analytics/

One of the keys to identifying timely and impactful actions is having enough raw material to work with. However, this up-to-date information typically lives in the databases that sit behind several different applications. One of the first steps to finding data-driven insights is gathering that information into a single store that an analyst can use without interfering with those applications.

For years, reporting environments have relied on a data warehouse stored in a single, separate relational database management system (RDBMS). But now, due to the growing use of Software as a service (SaaS) applications and NoSQL database options, data may be stored outside the data center and in formats other than tables of rows and columns. It’s increasingly difficult to access the data these applications maintain, and a data warehouse may not be flexible enough to house the gathered information.

For these reasons, reporting teams are building data lakes, and those responsible for using data analytics at universities and colleges are no different. However, it can be challenging to know exactly how to start building this expanded data repository so it can be ready to use quickly and still expandable as future requirements are uncovered. Helping higher education institutions address these challenges is the topic of this post.

About Maryville University

Maryville University is a nationally recognized private institution located in St. Louis, Missouri, and was recently named the second fastest growing private university by The Chronicle of Higher Education. Even with its enrollment growth, the university is committed to a highly personalized education for each student, which requires reliable data that is readily available to multiple departments. University leaders want to offer the right help at the right time to students who may be having difficulty completing the first semester of their course of study. To get started, the data experts in the Office of Strategic Information and members of the IT Department needed to create a data environment to identify students needing assistance.

Critical data sources

Like most universities, Maryville’s student-related data centers around two significant sources: the student information system (SIS), which houses student profiles, course completion, and financial aid information; and the learning management system (LMS) in which students review course materials, complete assignments, and engage in online discussions with faculty and fellow students.

The first of these, the SIS, stores its data in an on-premises relational database, and for several years, a significant subset of its contents had been incorporated into the university’s data warehouse. The LMS, however, contains data that the team had not tried to bring into their data warehouse. Moreover, that data is managed by a SaaS application from Instructure, called “Canvas,” and is not directly accessible for traditional extract, transform, and load (ETL) processing. The team recognized they needed a new approach and began down the path of creating a data lake in AWS to support their analysis goals.

Getting started on the data lake

The first step the team took in building their data lake made use of an open source solution that Harvard’s IT department developed. The solution, comprised of AWS Lambda functions and Amazon Simple Storage Service (S3) buckets, is deployed using AWS CloudFormation. It enables any university that uses Canvas for their LMS to implement a solution that moves LMS data into an S3 data lake on a daily basis. The following diagram illustrates this portion of Maryville’s data lake architecture:

The data lake for the Learning Management System data

Diagram 1: The data lake for the Learning Management System data

The AWS Lambda functions invoke the LMS REST API on a daily schedule resulting in Maryville’s data, which has been previously unloaded and compressed by Canvas, to be securely stored into S3 objects. AWS Glue tables are defined to provide access to these S3 objects. Amazon Simple Notification Service (SNS) informs stakeholders the status of the data loads.

Expanding the data lake

The next step was deciding how to copy the SIS data into S3. The team decided to use the AWS Database Migration Service (DMS) to create daily snapshots of more than 2,500 tables from this database. DMS uses a source endpoint for secure access to the on-premises database instance over VPN. A target endpoint determines the specific S3 bucket into which the data should be written. A migration task defines which tables to copy from the source database along with other migration options. Finally, a replication instance, a fully managed virtual machine, runs the migration task to copy the data. With this configuration in place, the data lake architecture for SIS data looks like this:

Diagram 2: Migrating data from the Student Information System

Diagram 2: Migrating data from the Student Information System

Handling sensitive data

In building a data lake you have several options for handling sensitive data including:

  • Leaving it behind in the source system and avoid copying it through the data replication process
  • Copying it into the data lake, but taking precautions to ensure that access to it is limited to authorized staff
  • Copying it into the data lake, but applying processes to eliminate, mask, or otherwise obfuscate the data before it is made accessible to analysts and data scientists

The Maryville team decided to take the first of these approaches. Building the data lake gave them a natural opportunity to assess where this data was stored in the source system and then make changes to the source database itself to limit the number of highly sensitive data fields.

Validating the data lake

With these steps completed, the team turned to the final task, which was to validate the data lake. For this process they chose to make use of Amazon Athena, AWS Glue, and Amazon Redshift. AWS Glue provided multiple capabilities including metadata extraction, ETL, and data orchestration. Metadata extraction, completed by Glue crawlers, quickly converted the information that DMS wrote to S3 into metadata defined in the Glue data catalog. This enabled the data in S3 to be accessed using standard SQL statements interactively in Athena. Without the added cost and complexity of a database, Maryville’s data analyst was able to confirm that the data loads were completing successfully. He was also able to resolve specific issues encountered on particular tables. The SQL queries, written in Athena, could later be converted to ETL jobs in AWS Glue, where they could be triggered on a schedule to create additional data in S3. Athena and Glue enabled the ETL that was needed to transform the raw data delivered to S3 into prepared datasets necessary for existing dashboards.

Once curated datasets were created and stored in S3, the data was loaded into an AWS Redshift data warehouse, which supported direct access by tools outside of AWS using ODBC/JDBC drivers. This capability enabled Maryville’s team to further validate the data by attaching the data in Redshift to existing dashboards that were running in Maryville’s own data center. Redshift’s stored procedure language allowed the team to port some key ETL logic so that the engineering of these datasets could follow a process similar to approaches used in Maryville’s on-premises data warehouse environment.

Conclusion

The overall data lake/data warehouse architecture that the Maryville team constructed currently looks like this:

The complete architecture

Diagram 3: The complete architecture

Through this approach, Maryville’s two-person team has moved key data into position for use in a variety of workloads. The data in S3 is now readily accessible for ad hoc interactive SQL workloads in Athena, ETL jobs in Glue, and ultimately for machine learning workloads running in EC2, Lambda or Amazon Sagemaker. In addition, the S3 storage layer is easy to expand without interrupting prior workloads. At the time of this writing, the Maryville team is both beginning to use this environment for machine learning models described earlier as well as adding other data sources into the S3 layer.

Acknowledgements

The solution described in this post resulted from the collaborative effort of Christine McQuie, Data Engineer, and Josh Tepen, Cloud Engineer, at Maryville University, with guidance from Travis Berkley and Craig Jordan, AWS Solutions Architects.

Crafting serverless streaming ETL jobs with AWS Glue

Post Syndicated from Radhika Ravirala original https://aws.amazon.com/blogs/big-data/crafting-serverless-streaming-etl-jobs-with-aws-glue/

Organizations across verticals have been building streaming-based extract, transform, and load (ETL) applications to more efficiently extract meaningful insights from their datasets. Although streaming ingest and stream processing frameworks have evolved over the past few years, there is now a surge in demand for building streaming pipelines that are completely serverless. Since 2017, AWS Glue has made it easy to prepare and load your data for analytics using ETL.

In this post, we dive into streaming ETL in AWS Glue, a feature that allows you to build continuous ETL applications on streaming data. Streaming ETL in AWS Glue is based on Apache Spark’s Structured Streaming engine, which provides a fault-tolerant, scalable, and easy way to achieve end-to-end stream processing with exactly-once semantics. This post walks you through an example of building a stream processing pipeline using AWS Glue that includes reading streaming data from Amazon Kinesis Data Streams, schema discovery, running streaming ETL, and writing out the results to a sink.

Serverless streaming ETL architecture

For this post, our use case is relevant to our current situation with the COVID-19 pandemic. Ventilators are in high demand and are increasingly used in different settings: hospitals, nursing homes, and even private residences. Ventilators generate data that must be monitored, and an increase in ventilator usage means there is a tremendous amount of streaming data that needs to be processed promptly, so patients can be attended to as quickly as possible as the need arises. In this post, we build a streaming ETL job on ventilator metrics and enhance the data with details to raise the alert level if the metrics fall outside of the normal range. After you enrich the data, you can use it to visualize on monitors.

In our streaming ETL architecture, a Python script generates sample ventilator metrics and publishes them as a stream into Kinesis Data Streams. We create a streaming ETL job in AWS Glue that consumes continuously generated ventilator metrics in micro-batches, applies transformations, performs aggregations, and delivers the data to a sink, so the results can be visualized or used in downstream processes.

Because businesses often augment their data lakes built on Amazon Simple Storage Service (Amazon S3) with streaming data, our first use case applies transformations on the streaming JSON data ingested via Kinesis Data Streams and loads the results in Parquet format to an Amazon S3 data lake. After ingested to Amazon S3, you can query the data with Amazon Athena and build visual dashboards using Amazon QuickSight.

For the second use case, we ingest the data from Kinesis Data Streams, join it with reference data in Amazon DynamoDB to calculate alert levels, and write the results to an Amazon DynamoDB sink. This approach allows you to build near real-time dashboards with alert notifications.

The following diagram illustrates this architecture.

AWS Glue streaming ETL jobs

With AWS Glue, you can now create ETL pipelines on streaming data using continuously running jobs. You can ingest streaming data from Kinesis Data Streams and Amazon Managed Streaming for Kafka (Amazon MSK). AWS Glue streaming jobs can perform aggregations on data in micro-batches and deliver the processed data to Amazon S3. You can read from the data stream and write to Amazon S3 using the AWS Glue DynamicFrame API. You can also write to arbitrary sinks using native Apache Spark Structured Streaming APIs.

The following sections walk you through building a streaming ETL job in AWS Glue.

Creating a Kinesis data stream

First, we need a streaming ingestion source to consume continuously generated streaming data. For this post, we create a Kinesis data stream with five shards, which allows us to push 5,000 records per second into the stream.

  1. On the Amazon Kinesis dashboard, choose Data streams.
  2. Choose Create data stream.
  3. For Data stream name, enter ventilatorsstream.
  4. For Number of open shards, choose 5.

If you prefer to use the AWS Command Line Interface (AWS CLI), you can create the stream with the following code:

aws kinesis create-stream \
    --stream-name ventilatorstream \
    --shard-count 5

Generating streaming data

We can synthetically generate ventilator data in JSON format using a simple Python application (see the GitHub repo) or the Kinesis Data Generator (KDG).

Using a Python-based data generator

To generate streaming data using a Python script, you can run the following command from your laptop or Amazon Elastic Compute Cloud (Amazon EC2) instance. Make sure you have installed the faker library on your system and set up the boto3 credentials correctly before you run the script.

python3 generate_data.py --streamname glue_ventilator_stream

Using the Kinesis Data Generator

Alternatively, you can also use the Kinesis Data Generator with the ventilator template available on the GitHub repo. The following screenshot shows the template on the KDG console.

We start pushing the data after we create our AWS Glue streaming job.

Defining the schema

We need to specify a schema for our streaming data, which we can do one of two ways:

  • Retrieve a small batch of the data (before starting your streaming job) from the streaming source, infer the schema in batch mode, and use the extracted schema for your streaming job
  • Use the AWS Glue Data Catalog to manually create a table

For this post, we use the AWS Glue Data Catalog to create a ventilator schema.

  1. On the AWS Glue console, choose Data Catalog.
  2. Choose Tables.
  3. From the Add Table drop-down menu, choose Add table manually.
  4. For the table name, enter ventilators_table.
  5. Create a database with the name ventilatordb.
  6. Choose Kinesis as the type of source.
  7. Enter the name of the stream and https://kinesis.<aws-region>.amazonaws.com.
  8. For the classification, choose JSON.
  9. Define the schema according to the following table.
Column name Data type
ventilatorid int
eventtime string
serialnumber string
pressurecontrol int
o2stats int
minutevolume int
manufacturer string

 

  1. Choose Finish.

Creating an AWS Glue streaming job to hydrate a data lake on Amazon S3

With the streaming source and schema prepared, we’re now ready to create our AWS Glue streaming jobs. We first create a job to ingest data from the streaming source using AWS Glue DataFrame APIs.

  1. On the AWS Glue console, under ETL, choose Jobs.
  2. Choose Add job.
  3. For Name, enter a UTF-8 String with no more than 255 characters.
  4. For IAM role¸ specify a role that is used for authorization to resources used to run the job and access data stores. Because streaming jobs require connecting to sources and sinks, you need to make sure that the AWS Identity and Access Management (IAM) role has permissions to read from Kinesis Data Stream, write to Amazon S3 and read, write to Amazon DynamoDB. Refer to Managing Access Permissions for AWS Glue Resources for details.
  5. For Type, choose Spark Streaming.
  6. For Glue Version, choose Spark 2.4, Python 3.
  7. For This job runs, select A new script authored by you.

You can have AWS Glue generate the streaming ETL code for you, but for this post, we author one from scratch.

  1. For Script file name, enter GlueStreaming-S3.
  2. For S3 path where script is stored, enter your S3 path.
  3. For Job bookmark, choose Disable.

For this post, we use the checkpointing mechanism of AWS Glue to keep track of the data read instead of a job bookmark.

  1. For Monitoring options, select Job metrics and Continuous logging.
  2. For Log filtering, select Standard filter and Spark UI.
  3. For Amazon S3 prefix for Spark event logs, enter the S3 path for the event logs.
  4. For Job parameters, enter the following key-values:
    1. –output path – The S3 path where the final aggregations are persisted
    2. –aws_region – The Region where you run the job

  5. Skip the connections part and choose Save and edit the script.

Streaming ETL to an Amazon S3 sink

We use the AWS Glue DynamicFrameReader class’s from_catalog method to read the streaming data. We specify the table name that has been associated with the data stream as the source of data (see the section Defining the schema). We add additional_options to indicate the starting position to read from in Kinesis Data Streams. TRIM_HORIZON allows us to start reading from the oldest record in the shard.

# Read from Kinesis Data Stream
sourceData = glueContext.create_data_frame.from_catalog( \
    database = "ventilatordb", \
    table_name = "ventilatortable", \
    transformation_ctx = "datasource0", \
    additional_options = {"startingPosition": "TRIM_HORIZON", "inferSchema": "true"})

In the preceding code, sourceData represents a streaming DataFrame. We use the foreachBatch API to invoke a function (processBatch) that processes the data represented by this streaming DataFrame. The processBatch function receives a static DataFrame, which holds streaming data for a window size of 100s (default). It creates a DynamicFrame from the static DataFrame and writes out partitioned data to Amazon S3. See the following code:

glueContext.forEachBatch(frame = sourceData, batch_function = processBatch, options = {"windowSize": "100 seconds", "checkpoint_locationation": checkpoint_location})

To transform the DynamicFrame to fix the data type for eventtime (from string to timestamp) and write the ventilator metrics to Amazon S3 in Parquet format, enter the following code:

def processBatch(data_frame, batchId):
    now = datetime.datetime.now()
    year = now.year
    month = now.month
    day = now.day
    hour = now.hour
    minute = now.minute
    if (data_frame.count() > 0):
        dynamic_frame = DynamicFrame.fromDF(data_frame, glueContext, "from_data_frame")
        apply_mapping = ApplyMapping.apply(frame = dynamic_frame, mappings = [ \
            ("ventilatorid", "long", "ventilatorid", "long"), \
            ("eventtime", "string", "eventtime", "timestamp"), \
            ("serialnumber", "string", "serialnumber", "string"), \
            ("pressurecontrol", "long", "pressurecontrol", "long"), \
            ("o2stats", "long", "o2stats", "long"), \
            ("minutevolume", "long", "minutevolume", "long"), \
            ("manufacturer", "string", "manufacturer", "string")],\
            transformation_ctx = "apply_mapping")

        dynamic_frame.printSchema()

        # Write to S3 Sink
        s3path = s3_target + "/ingest_year=" + "{:0>4}".format(str(year)) + "/ingest_month=" + "{:0>2}".format(str(month)) + "/ingest_day=" + "{:0>2}".format(str(day)) + "/ingest_hour=" + "{:0>2}".format(str(hour)) + "/"
        s3sink = glueContext.write_dynamic_frame.from_options(frame = apply_mapping, connection_type = "s3", connection_options = {"path": s3path}, format = "parquet", transformation_ctx = "s3sink")

Putting it all together

In the Glue ETL code editor, enter the following code, then save and run the job:

import sys
import datetime
import boto3
import base64
from pyspark.sql import DataFrame, Row
from pyspark.context import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import DynamicFrame

args = getResolvedOptions(sys.argv, \
                            ['JOB_NAME', \
                            'aws_region', \
                            'output_path'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# S3 sink locations
aws_region = args['aws_region']
output_path = args['output_path']

s3_target = output_path + "ventilator_metrics"
checkpoint_location = output_path + "cp/"
temp_path = output_path + "temp/"


def processBatch(data_frame, batchId):
    now = datetime.datetime.now()
    year = now.year
    month = now.month
    day = now.day
    hour = now.hour
    minute = now.minute
    if (data_frame.count() > 0):
        dynamic_frame = DynamicFrame.fromDF(data_frame, glueContext, "from_data_frame")
        apply_mapping = ApplyMapping.apply(frame = dynamic_frame, mappings = [ \
            ("ventilatorid", "long", "ventilatorid", "long"), \
            ("eventtime", "string", "eventtime", "timestamp"), \
            ("serialnumber", "string", "serialnumber", "string"), \
            ("pressurecontrol", "long", "pressurecontrol", "long"), \
            ("o2stats", "long", "o2stats", "long"), \
            ("minutevolume", "long", "minutevolume", "long"), \
            ("manufacturer", "string", "manufacturer", "string")],\
            transformation_ctx = "apply_mapping")

        dynamic_frame.printSchema()

        # Write to S3 Sink
        s3path = s3_target + "/ingest_year=" + "{:0>4}".format(str(year)) + "/ingest_month=" + "{:0>2}".format(str(month)) + "/ingest_day=" + "{:0>2}".format(str(day)) + "/ingest_hour=" + "{:0>2}".format(str(hour)) + "/"
        s3sink = glueContext.write_dynamic_frame.from_options(frame = apply_mapping, connection_type = "s3", connection_options = {"path": s3path}, format = "parquet", transformation_ctx = "s3sink")

# Read from Kinesis Data Stream
sourceData = glueContext.create_data_frame.from_catalog( \
    database = "ventilatordb", \
    table_name = "ventilatortable", \
    transformation_ctx = "datasource0", \
    additional_options = {"startingPosition": "TRIM_HORIZON", "inferSchema": "true"})

sourceData.printSchema()

glueContext.forEachBatch(frame = sourceData, batch_function = processBatch, options = {"windowSize": "100 seconds", "checkpoint_locationation": checkpoint_location})
job.commit()

Querying with Athena

When the processed streaming data is written in Parquet format to Amazon S3, we can run queries on Athena. Run the AWS Glue crawler on the Amazon S3 location where the streaming data is written out. The following screenshot shows our query results.

For instructions on building visual dashboards with the streaming data in Amazon S3, see Quick Start: Create an Analysis with a Single Visual Using Sample Data. The following dashboards show distribution of metrics, averages, and alerts based on anomalies on an hourly basis, but you can create more advanced dashboards with much granular (minute) intervals.

Streaming ETL to a DynamoDB sink

For the second use case, we transform the streaming data as it arrives without micro-batching and persist the data to a DynamoDB table. Scripts to create DynamoDB tables are available in the GitHub repo. We use Apache Spark’s Structured Streaming API to read ventilator-generated data from the data stream, join it with reference data for normal metrics range in a DynamoDB table, compute the status based on the deviation from normal metric values, and write the processed data to a DynamoDB table. See the following code:

import sys
import datetime
import base64
import decimal
import boto3
from pyspark.sql import DataFrame, Row
from pyspark.context import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import DynamicFrame

args = getResolvedOptions(sys.argv, \
                            ['JOB_NAME', \
                            'aws_region', \
                            'checkpoint_location', \
                            'dynamodb_sink_table', \
                            'dynamodb_static_table'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read parameters
checkpoint_location = args['checkpoint_location']
aws_region = args['aws_region']

# DynamoDB config
dynamodb_sink_table = args['dynamodb_sink_table']
dynamodb_static_table = args['dynamodb_static_table']

def write_to_dynamodb(row):
    '''
    Add row to DynamoDB.
    '''
    dynamodb = boto3.resource('dynamodb', region_name=aws_region)
    start = str(row['window'].start)
    end = str(row['window'].end)
    dynamodb.Table(dynamodb_sink_table).put_item(
      Item = { 'ventilatorid': row['ventilatorid'], \
                'status': str(row['status']), \
                'start': start, \
                'end': end, \
                'avg_o2stats': decimal.Decimal(str(row['avg_o2stats'])), \
                'avg_pressurecontrol': decimal.Decimal(str(row['avg_pressurecontrol'])), \
                'avg_minutevolume': decimal.Decimal(str(row['avg_minutevolume']))})

dynamodb_dynamic_frame = glueContext.create_dynamic_frame.from_options( \
    "dynamodb", \
    connection_options={
    "dynamodb.input.tableName": dynamodb_static_table,
    "dynamodb.throughput.read.percent": "1.5"
  }
)

dynamodb_lookup_df = dynamodb_dynamic_frame.toDF().cache()

# Read from Kinesis Data Stream
streaming_data = spark.readStream \
                    .format("kinesis") \
                    .option("streamName","glue_ventilator_stream") \
                    .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com") \
                    .option("startingPosition", "TRIM_HORIZON") \
                    .load()

# Retrieve Sensor columns and do a simple projection
ventilator_fields = streaming_data \
    .select(from_json(col("data") \
    .cast("string"),glueContext.get_catalog_schema_as_spark_schema("ventilatordb","ventilators_table")) \
    .alias("ventilatordata")) \
    .select("ventilatordata.*") \
    .withColumn("event_time", to_timestamp(col('eventtime'), "yyyy-MM-dd HH:mm:ss")) \
    .withColumn("ts", to_timestamp(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))

# Stream static join, ETL to augment with status
ventilator_joined_df = ventilator_fields.join(dynamodb_lookup_df, "ventilatorid") \
    .withColumn('status', when( \
    ((ventilator_fields.o2stats < dynamodb_lookup_df.o2_stats_min) | \
    (ventilator_fields.o2stats > dynamodb_lookup_df.o2_stats_max)) & \
    ((ventilator_fields.pressurecontrol < dynamodb_lookup_df.pressure_control_min) | \
    (ventilator_fields.pressurecontrol > dynamodb_lookup_df.pressure_control_max)) & \
    ((ventilator_fields.minutevolume < dynamodb_lookup_df.minute_volume_min) | \
    (ventilator_fields.minutevolume > dynamodb_lookup_df.minute_volume_max)), "RED") \
    .when( \
    ((ventilator_fields.o2stats >= dynamodb_lookup_df.o2_stats_min) |
    (ventilator_fields.o2stats <= dynamodb_lookup_df.o2_stats_max)) & \
    ((ventilator_fields.pressurecontrol >= dynamodb_lookup_df.pressure_control_min) | \
    (ventilator_fields.pressurecontrol <= dynamodb_lookup_df.pressure_control_max)) & \
    ((ventilator_fields.minutevolume >= dynamodb_lookup_df.minute_volume_min) | \
    (ventilator_fields.minutevolume <= dynamodb_lookup_df.minute_volume_max)), "GREEN") \
    .otherwise("ORANGE"))

ventilator_joined_df.printSchema()

# Drop the normal metric values
ventilator_transformed_df = ventilator_joined_df \
                            .drop('eventtime', 'o2_stats_min', 'o2_stats_max', \
                            'pressure_control_min', 'pressure_control_max', \
                            'minute_volume_min', 'minute_volume_max')

ventilator_transformed_df.printSchema()

ventilators_df = ventilator_transformed_df \
    .groupBy(window(col('ts'), '10 minute', '5 minute'), \
    ventilator_transformed_df.status, ventilator_transformed_df.ventilatorid) \
    .agg( \
    avg(col('o2stats')).alias('avg_o2stats'), \
    avg(col('pressurecontrol')).alias('avg_pressurecontrol'), \
    avg(col('minutevolume')).alias('avg_minutevolume') \
    )

ventilators_df.printSchema()

# Write to DynamoDB sink
ventilator_query = ventilators_df \
    .writeStream \
    .foreach(write_to_dynamodb) \
    .outputMode("update") \
    .option("checkpointLocation", checkpoint_location) \
    .start()

ventilator_query.awaitTermination()

job.commit()

After the above code is run, ventilator metric aggregations get persisted in the Amazon DynamoDB table as follows. You can build custom user interface applications with the data in Amazon DynamoDB to create dashboards.

Conclusion

Streaming applications have become a core component of data lake architectures. With AWS Glue streaming, you can create serverless ETL jobs that run continuously, consuming data from streaming services like Kinesis Data Streams and Amazon MSK. You can load the results of streaming processing into an Amazon S3-based data lake, JDBC data stores, or arbitrary sinks using the Structured Streaming API.

For more information about streaming AWS Glue ETL jobs, see the following:

We encourage you to build a serverless streaming application using AWS Glue streaming ETL and share your experience with us. If you have any questions or suggestions, share them in the comments.


About the Author


Radhika Ravirala is a specialist solutions architect at Amazon Web Services, where she helps customers craft distributed analytics applications on the AWS platform. Prior to her cloud journey, she worked as a software engineer and designer for technology companies in Silicon Valley.

Event-driven refresh of SPICE datasets in Amazon QuickSight

Post Syndicated from Dylan Qu original https://aws.amazon.com/blogs/big-data/event-driven-refresh-of-spice-datasets-in-amazon-quicksight/

Businesses are increasingly harnessing data to improve their business outcomes. To enable this transformation to a data-driven business, customers are bringing together data from structured and unstructured sources into a data lake. Then they use business intelligence (BI) tools, such as Amazon QuickSight, to unlock insights from this data.

To provide fast access to datasets, QuickSight provides a fully managed calculation engine called SPICE—the Super-fast, Parallel, In-Memory Calculation Engine. At the time of writing, SPICE enables you to cache up to 250 million rows or 500 GB of data per dataset.

To extract value from the data quickly, you need access to new data as soon as it’s available. In this post, we describe how to achieve this by refreshing SPICE datasets as part of your extract, transform, and load (ETL) pipelines.

Solution architecture

In this post, you automate the refresh of SPICE datasets by implementing the following architecture.

This architecture consists of two parts: an example ETL job and a decoupled event-driven process to refresh SPICE.

For the ETL job, you use Amazon Simple Storage Service (Amazon S3) as your primary data store. Data lands in an S3 bucket, which we refer to as the raw zone. An Amazon S3 trigger configured on this bucket triggers an AWS Lambda function, which starts an AWS Glue ETL job. This job processes the raw data and outputs processed data into another S3 bucket, which we refer to as the processed zone.

This sample ETL job converts the data to Apache Parquet format and stores it in the processed S3 bucket. You can modify the ETL job to achieve other objectives, like more granular partitioning, compression, or enriching of the data. The Glue Data Catalog stores the metadata and QuickSight datasets are created using Amazon Athena data sources.

To trigger the SPICE dataset refresh, after the ETL job finishes, an Amazon EventBridge rule triggers a Lambda function that initiates the refresh.

In summary, this pipeline transforms your data and updates QuickSight SPICE datasets upon completion.

Deploying the automated data pipeline using AWS CloudFormation

Before deploying the AWS CloudFormation template, make sure you have signed up for QuickSight in one of the 11 supported Regions:

  • US East (Ohio)
  • US East (N. Virginia)
  • US West (Oregon)
  • Asia Pacific (Mumbai)
  • Asia Pacific (Seoul)
  • Asia Pacific (Singapore)
  • Asia Pacific (Sydney)
  • Asia Pacific (Tokyo)
  • EU (Frankfurt)
  • EU (Ireland)
  • EU (London)

This post works with both Standard and Enterprise editions of QuickSight. Enterprise Edition provides richer features and higher limits compared to Standard Edition.

  1. After you sign up for QuickSight, you can use CloudFormation templates to create all the necessary resources by choosing Launch stack:
  2. Enter a stack name; for example, SpiceRefreshBlog.
  3. Acknowledge the AWS Identity and Access Management (IAM) resource creation.
  4. Choose Create stack.

The CloudFormation template creates the following resources in your AWS account:

  • Three S3 buckets to store the following:
    • AWS Glue ETL job script
    • Raw data
    • Processed data
  • Three Lambda functions to do the following:
    • Create the ETL job
    • Initiate the ETL job upon upload of new data in the raw zone
    • Initiate the SPICE dataset refresh when the ETL job is complete
  • An AWS Glue database
  • Two AWS Glue tables to store the following:
    • Raw data
    • Processed data
  • An ETL job to convert the raw data from CSV into Apache Parquet format
  • Four IAM roles: One each for the Lambda functions and one for the ETL job
  • An EventBridge rule that triggers on an AWS Glue job state change event with a state of Succeeded and invokes a Lambda function that performs the SPICE dataset refresh

Importing the dataset

For this post, you use the taxi Trip Record Data dataset publicly available from the NYC Taxi & Limousine Commission Trip Record Data dataset. You upload monthly data in CSV format to the raw zone S3 bucket.

This data is available in Amazon S3 through Open Data on AWS, a service designed to let you spend more time on data analysis rather than data acquisition.

You start by copying the For Hire Vehicle (FHV) data for March 2020. Because the data is already available in Amazon S3 through Open Data, run the following command to copy the data into the raw zone. Make sure you replace <raw bucket name> with the name of the raw bucket created by the CloudFormation template:

aws s3 cp "s3://nyc-tlc/trip data/fhv_tripdata_2020-03.csv" s3://<raw bucket name>

After you copy the data into the raw zone, the Amazon S3 event trigger invokes the Lambda function that triggers the ETL job. You can see the job status on the AWS Glue console by choosing Jobs in the navigation pane. The process takes about 2 minutes.

When the job is complete, check that you can see the Parquet files in the processed zone S3 bucket.

Creating a QuickSight analysis of the data

To visualize the taxi data, we create a QuickSight analysis.

First, you need to give QuickSight the necessary permissions to access the processed zone S3 bucket. For instructions, see I Can’t Connect to Amazon S3.

Then complete the following steps to create an analysis of the taxi data:

  1. On the QuickSight console, choose Datasets.
  2. Choose New dataset.
  3. Choose Athena and provide a name for the data source (such as Athena).
  4. Choose Create data source.
  5. For Database, choose the name of the taxi AWS Glue database (starting with taxigluedatabase).
  6. For Tables, select processed_taxi_data as the table to visualize.
  7. Choose Select.
  8. Ensure Import to SPICE for quicker analytics is selected and choose Visualize.

After the data is imported into SPICE, you can create visuals to display the data. For example, the following screenshot shows a key performance indicator (KPI) of the number of taxi journeys aggregated at the month level and the number of journeys over time.

We use this dashboard to visualize the dataset again after we refresh SPICE with more data.

Automating the SPICE refresh

To refresh the SPICE dataset when the ETL job is complete, the CloudFormation template we deployed created an EventBridge rule that triggers a Lambda function each time an AWS Glue ETL job successfully completes. The following screenshot shows the code for the event pattern.

We need to configure the Lambda function with the ETL job name and the ID of the SPICE dataset we created in QuickSight.

  1. Locate the ETL job name on the AWS Glue console, named TaxiTransformationGlueJob-<unique id>.
  2. To find the SPICE dataset ID, run the following command using the AWS Command Line Interface (AWS CLI):
    aws quicksight list-data-sets --aws-account-id <your AWS account id> 

    The following screenshot shows the output with the dataset ID.

  3. On the Lambda console, open the Lambda function named SpiceRefreshBlog-QuicksightUpdateLambda-<unique id>.
  4. Update line 9 of the code to replace ReplaceWithGlueJobName with the AWS Glue job name and ReplaceWithYourDatasetID with the dataset ID.

Once a Glue job succeeds, this Lambda function is triggered. The EventBridge event that triggers the Lambda contains the name of the job. You can access this from the event as follows, as seen on line 25 of the function:

succeededJob = event[‘detail’][‘jobName’]

The Lambda function looks up the job name in the data_set_map dictionary. If the dictionary contains the job name, the dataset ID is accessed and the function calls the QuickSight Create Ingestion API to refresh the SPICE datasets.

You can extend the data_set_map dictionary to include additional job names and associated SPICE dataset IDs to be refreshed. If using this approach at scale, you might choose to move this configuration information to an Amazon DynamoDB table.

  1. Save the Lambda function by choosing Deploy.

Testing the automated refresh

Now that you have configured the Lambda function, we can test the ETL end-to-end process and make the next month’s data available for analysis.

To add the FHV data for April, run the following AWS CLI command:

aws s3 cp "s3://nyc-tlc/trip data/fhv_tripdata_2020-04.csv" s3://<raw bucket name>

As before, this upload to the raw zone triggers the Lambda function that starts the ETL job. You can to see the progress of the job on the AWS Glue console.

When the job is complete, navigate to QuickSight and open the taxi analysis (or, if you still have it open, refresh the window).

You can now see that both months’ data is available for analysis. This step might take 1–2 minutes to load.

To see the status of each SPICE refresh, navigate back to the dataset on the QuickSight console and choose View History.

The following screenshot shows the status of previous refreshes and the number of rows that have been ingested into SPICE.

Now that you have tested the end-to-end process, you can try copying more FHV data to the raw zone and see the data within your QuickSight analysis.

Cleaning up

To clean up the resources you created by following along with this post, complete the following steps:

  1. Delete the QuickSight analysis you created.
  2. Delete the QuickSight dataset that you created.
  3. Delete the QuickSight data source:
    1. Choose New dataset.
    2. Select the data source and choose Delete data source.
  4. On the Amazon S3 console, delete the contents of the raw and processed S3 buckets.
  5. On the AWS CloudFormation console, select the stack SpiceRefreshBlog and choose Delete.

Conclusion

Using an event-based architecture to automate the refresh of your SPICE datasets makes sure that your business analysts are always viewing the latest available data. This reduction in time to analysis can help your business unlock insights quicker without having to wait for a manual or scheduled process. Additionally, by only refreshing SPICE when new data is available, the underlying data storage resources are used efficiently, so you only pay for what you need!

Get started with QuickSight today!


About the Authors

Rob Craig is a Senior Solutions Architect with AWS. He supports customers in the UK with their cloud journey, providing them with architectural advice and guidance to help them achieve their business outcomes.

 

 

 

 

Dylan Qu is an AWS solutions architect responsible for providing architectural guidance across the full AWS stack with a focus on Data Analytics, AI/ML and DevOps.

Making ETL easier with AWS Glue Studio

Post Syndicated from Leonardo Gomez original https://aws.amazon.com/blogs/big-data/making-etl-easier-with-aws-glue-studio/

AWS Glue Studio is an easy-to-use graphical interface that speeds up the process of authoring, running, and monitoring extract, transform, and load (ETL) jobs in AWS Glue. The visual interface allows those who don’t know Apache Spark to design jobs without coding experience and accelerates the process for those who do.

AWS Glue Studio was designed to help you create ETL jobs easily. After you design a job in the graphical interface, it generates Apache Spark code for you, abstracting users from the challenges of coding. When the job is ready, you can run it and monitor the job status using the integrated UI.

AWS Glue Studio supports different types of data sources, both structured and semi-structured, and offers data processing in real time and batch. You can extract data from sources like Amazon Simple Storage Service (Amazon S3), Amazon Relational Database Service (Amazon RDS), Amazon Kinesis, and Apache Kafka. It also offers Amazon S3 and tables defined in the AWS Glue Data Catalog as destinations.

This post shows you how to create an ETL job to extract, filter, join, and aggregate data easily using AWS Glue Studio.

About this blog post
Time to read 15 minutes
Time to complete 45 minutes
Cost to complete (estimated) Amazon S3: $0.023
AWS Glue: 0.036
AWS Identity & Access Management: $0
Total Cost: $0.059
Learning level Intermediate (200)
Services used AWS Glue, Amazon S3, AWS Identity and Access Management

Overview of solution

To demonstrate how to create an ETL job using AWS Glue Studio, we use the Toronto parking tickets dataset, specifically the data about parking tickets issued in the city of Toronto in 2018, and the trials dataset, which contains all the information about the trials related to those parking tickets. The goal is to filter, join, and aggregate the two datasets to get the number of parking tickets handled per court in the city of Toronto during that year.

Prerequisites

For this walkthrough, you should have an AWS account. For this post, you launch the required AWS resources using AWS CloudFormation in the us-east-1 Region. If you haven’t signed up for AWS, complete the following tasks:

  1. Create an account.
  2. Create an AWS Identity and Access Management (IAM) user. For instructions, see Create an IAM User.

Important: If the AWS account you use to follow this guide uses AWS Lake Formation to manage permissions on the Glue data catalog, make sure that you log in as a user that is both a Data lake administrator and a Database creator, as described in the documentation.

Launching your CloudFormation stack

To create your resources for this use case, complete the following steps:

  1. Launch your stack in us-east-1:
  2. Select the I acknowledge that AWS CloudFormation might create IAM resources with custom names option.
  3. Choose Create stack.

Launching this stack creates AWS resources. The following resources shown in the AWS CloudFormation output are the ones you need in the next steps:

  • Key – Description
  • AWSGlueStudioRole – IAM role to run AWS Glue jobs
  • AWSGlueStudioS3Bucket – Name of the S3 bucket to store blog-related files
  • AWSGlueStudioTicketsYYZDB – AWS Glue Data Catalog database
  • AWSGlueStudioTableTickets – Data Catalog table to use as a source
  • AWSGlueStudioTableTrials – Data Catalog table to use as a source
  • AWSGlueStudioParkingTicketCount –Data Catalog table to use as the destination

Creating a job

A job is the AWS Glue component that allows the implementation of business logic to transform data as part of the ETL process. For more information, see Adding Jobs in AWS Glue.

To create an AWS Glue job using AWS Glue Studio, complete the following steps:

  1. On the AWS Management Console, choose Services.
  2. Under Analytics, choose AWS Glue.
  3. In the navigation pane, choose AWS Glue Studio.
  4. On the AWS Glue Studio home page, choose Create and manage jobs.

AWS Glue Studio supports different sources, including Amazon S3, Amazon RDS, Amazon Kinesis, and Apache Kafka. For this post, you use two AWS Glue tables as data sources and one S3 bucket as the destination.

  1. In the Create Job section, select Blank graph.
  2. Choose Create.

This takes you to the Visual Canvas to create an AWS Glue job.

  1. Change the Job name from Untitled Job to YYZ-Tickets-Job.

You now have an AWS Glue job ready to filter, join, and aggregate data from two different sources.

Adding sources

For this post, you use two AWS Glue tables as data sources: Tickets and Trials, which the CloudFormation template created. The data is located in an external S3 bucket in Parquet format. To add these tables as sources, complete the following steps:

  1. Choose the (+) icon.
  2. On the Node properties tab, for Name, enter Tickets.
  3. For Node type, choose S3.
  4. On the Data Source properties -S3 tab, for Database, choose yyz-tickets.
  5. For Table, choose tickets.
  6. For Partition predicate (optional), leave blank.

Before adding the second data source to the ETL job, be sure that the node you just created isn’t selected.

  1. Choose the (+) icon.
  2. On the Node properties tab, for Name, enter Trials.
  3. For Node type, choose S3.
  4. On the Data Source properties -S3 tab, for Database, choose yyz-tickets.
  5. For Table, choose trials.
  6. For Partition predicate (optional), leave blank.

You now have two AWS Glue tables as the data sources for the AWS Glue job.

Adding transforms

A transform is the AWS Glue Studio component were the data is modified. You have the option of using different transforms that are part of this service or custom code. To add transforms, complete the following steps:

  1. Choose the Tickets node.
  2. Choose the (+) icon.

  3. On the Node properties tab, for Name, enter Ticket_Mapping.
  4. For Node type, choose ApplyMapping.
  5. For Node parents, choose Tickets.
  6. On the Transform tab, change the ticket_number data type from decimal to int.
  7. Drop the following columns:
    • Location1
    • Location2
    • Location3
    • Location4
    • Province

Now you add a second ApplyMapping transform to modify the Trials data source.

  1. Choose the Trials data source node.
  2. Choose the (+) icon.
  3. On the Node properties tab, for Name, enter Trial_Mapping.
  4. For Node type, choose ApplyMapping.
  5. For Node parents, leave at default value (Trials).
  1. On the Transform tab, change the parking_ticket_number data type from long to int.

Now that you have set the right data types and removed some of the columns, it’s time to join the data sources using the Join transform.

  1. Choose the Ticket_Mapping transform.
  2. Choose the (+) icon.

  3. On the Node properties tab, for Name, enter Join_Ticket_Trial.
  4. For Node type, choose Join.
  5. For Node parents, choose Ticket_Mapping and Trial_Mapping.
  6. On the Transform tab, for Join type, choose Inner join.
  7. For Join conditions, choose Add condition.
  8. For Ticket_Mapping, choose ticket_number.
  9. For Trial_Mapping, choose parking_ticket_number.

Now the two data sources are joined by the ticket_number and parking_ticket_number columns.

Performing data aggregation

In this step, you do some data aggregation to see the number of tickets handled per court in Toronto.

AWS Glue Studio offers the option of adding custom code for those use cases that need a more complex transformation. For this post, we use PySpark code to do the data transformation. It contains Sparksql code and a combination of dynamic frames and data frames.

  1. Choose the Join_Tickets_Trial transform.
  2. Choose the (+) icon.

  3. On the Node properties tab, for Name, enter Aggregate_Tickets.
  4. For Node type, choose Custom transform.
  5. For Node parents, leave Join_Ticket_Trial selected.
  6. On the Transform tab, for Code block, change the function name from MyTransform to Aggregate_Tickets.
  7. Enter the following code:
    selected = dfc.select(list(dfc.keys())[0]).toDF()
    selected.createOrReplaceTempView("ticketcount")
    totals = spark.sql("select court_location as location, infraction_description as infraction, count(infraction_code) as total  FROM ticketcount group by infraction_description, infraction_code, court_location order by court_location asc")
    results = DynamicFrame.fromDF(totals, glueContext, "results")
    return DynamicFrameCollection({"results": results}, glueContext)
    

After adding the custom transformation to the AWS Glue job, you want to store the result of the aggregation in the S3 bucket. To do this, you need a Select from collection transform to read the output from the Aggregate_Tickets node and send it to the destination.

  1. Choose the New node node.
  2. Leave the Transform tab with the default values.
  3. On the Node Properties tab, change the name of the transform to Select_Aggregated_Data.
  4. Leave everything else with the default values.
  5. Choose the Select_Aggregated_Data node.
  6. Choose the (+) icon.

  7. On the Node properties tab, for Name, enter Ticket_Count_Dest.
  8. For Node type, choose S3 in the Data target section.
  9. For Node parents, choose Select_Aggregated_Data.
  10. On the Data Target Properties-S3 tab, for Format, choose Parquet.
  11. For Compression Type, choose GZIP.
  12. For S3 Target Location, enter s3://glue-studio-blog-{Your Account ID as a 12-digit number}/parking_tickets_count/.

The job should look like the following screenshot.

You now have three transforms to do data mapping, filtering, and aggregation.

Configuring the job

When the logic behind the job is complete, you must set the parameters for the job run. In this section, you configure the job by selecting components such as the IAM role and the AWS Glue version you use to run the job.

  1. On the Job details tab, for Description, enter Glue Studio blog post job.
  2. For IAM Role, choose AWSGlueStudioRole (which the CloudFormation template created).
  3. For Job Bookmark, choose Disable.
  4. For Number of retries, optionally enter 1.
  5. Choose Save.
  6. When the job is saved, choose Run.

Monitoring the job

AWS Glue Studio offers a job monitoring dashboard that provides comprehensive information about your jobs. You can get job statistics and see detailed info about the job and the job status when running.

  1. In the AWS Glue Studio navigation panel, choose Monitoring.
  2. Choose the entry with the job name YYZ-Tickets_Job.
  3. For get more details about the job run, choose View run details.
  4. Wait until Run Status changes to Succeeded.

You can verify that the job ran successfully on the Amazon Athena console.

  1. On the Athena console, choose the yyz-tickets database.
  2. Choose the icon next to the parking_tickets_count table (which the CloudFormation template created).

For more information about creating AWS Glue tables, see Defining Tables in the AWS Glue Data Catalog.

  1. Choose Preview table.

As you can see in the following screenshot, the information that the job generated is available and you can query the number of tickets types per court issued in the city of Toronto in 2018.

Cleaning up

To avoid incurring future charges and to clean up unused roles and policies, delete the resources you created: the CloudFormation stack, S3 bucket, and AWS Glue job.

Conclusion

In this post, you learned how to use AWS Glue Studio to create an ETL job. You can use AWS Glue Studio to speed up the ETL job creation process and allow different personas to transform data without any previous coding experience. For more information about AWS Glue Studio, see the AWS Glue Studio documentation and What’s New with AWS.


About the author

Leonardo Gómez is a Senior Analytics Specialist Solution Architect at AWS. Based in Toronto, Canada, He works with customers across Canada to design and build big data solutions.

 

 

 

 

 

 

 

Building an AWS Glue ETL pipeline locally without an AWS account

Post Syndicated from Adnan Alvee original https://aws.amazon.com/blogs/big-data/building-an-aws-glue-etl-pipeline-locally-without-an-aws-account/

If you’re new to AWS Glue and looking to understand its transformation capabilities without incurring an added expense, or if you’re simply wondering if AWS Glue ETL is the right tool for your use case and want a holistic view of AWS Glue ETL functions, then please continue reading. In this post, we walk you through several AWS Glue ETL functions with supporting examples, using a local PySpark shell in a containerized environment with no AWS artifact dependency. If you’re already familiar with AWS Glue and Apache Spark, you can use this solution as a quick cheat sheet for AWS Glue PySpark validations.

You don’t need an AWS account to follow along with this walkthrough. We use small example datasets for our use case and go through the transformations of several AWS Glue ETL PySpark functions: ApplyMapping, Filter, SplitRows, SelectFields, Join, DropFields, Relationalize, SelectFromCollection, RenameField, Unbox, Unnest, DropNullFields, SplitFields, Spigot and Write Dynamic Frame.

This post provides an introduction of the transformation capabilities of AWS Glue and provides insights towards possible uses of the supported functions. The goal is to get up and running with AWS Glue ETL functions in the shortest possible time, at no cost and without any AWS environment dependency.

Prerequisites

To follow along, you should have the following resources:

  • Basic programming experience
  • Basic Python and Spark knowledge (not required but good to have)
  • A desktop or workstation with Docker installed and running

If you prefer to set up the environment locally outside of a Docker container, you can follow the instructions provided in the GitHub repo, which hosts libraries used in AWS Glue. These libraries extend Apache Spark with additional data types and operations for ETL workflows.

Setting up resources

For this post, we use the amazon/aws-glue-libs:glue_libs_1.0.0_image_01 image from Dockerhub. This image has only been tested for AWS Glue 1.0 spark shell (PySpark). Additionally, this image also supports Jupyter and Zeppelin notebooks and a CLI interpreter. For the purpose of this post, we use the CLI interpreter. For more information on the container, please read Developing AWS Glue ETL jobs locally using a container.

To pull the relevant image from the Docker repository, enter the following command in a terminal prompt:

docker pull amazon/aws-glue-libs:glue_libs_1.0.0_image_01

To test on the command prompt, enter the following code:

docker run -itd --name glue_without_notebook amazon/aws-glue-libs:glue_libs_1.0.0_image_01
docker exec -it glue_without_notebook bash
/home/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/bin/pyspark

To test on Jupyter notebooks, enter the following code:

docker run -itd -p 8888:8888 -p 4040:4040 -v ~/.aws:/root/.aws:ro --name glue_jupyter \amazon/aws-glue-libs:glue_libs_1.0.0_image_01 \
/home/jupyter/jupyter_start.sh

Browse to ‘localhost:8888’ in a browser to open Jupyter notebooks.

Importing GlueContext

To get started, enter the following import statements in the PySpark shell. We import GlueContext, which wraps the Spark SQLContext, thereby providing mechanisms to interact with Apache Spark:

import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.types import *
from pyspark.sql import Row
glueContext = GlueContext(SparkContext.getOrCreate())

Dataset 1

We first generate a Spark DataFrame consisting of dummy data of an order list for a fictional company. We process the data using AWS Glue PySpark functions.

Enter the following code into the shell:

order_list = [
               ['1005', '623', 'YES', '1418901234', '75091'],\
               ['1006', '547', 'NO', '1418901256', '75034'],\
               ['1007', '823', 'YES', '1418901300', '75023'],\
               ['1008', '912', 'NO', '1418901400', '82091'],\
               ['1009', '321', 'YES', '1418902000', '90093']\
             ]

# Define schema for the order_list
order_schema = StructType([  
                      StructField("order_id", StringType()),
                      StructField("customer_id", StringType()),
                      StructField("essential_item", StringType()),
                      StructField("timestamp", StringType()),
                      StructField("zipcode", StringType())
                    ])

# Create a Spark Dataframe from the python list and the schema
df_orders = spark.createDataFrame(order_list, schema = order_schema)

The following .show() command allows us to view the DataFrame in the shell:

df_orders.show()

# Output
+--------+-----------+--------------+----------+-------+
|order_id|customer_id|essential_item| timestamp|zipcode|
+--------+-----------+--------------+----------+-------+
|    1005|        623|           YES|1418901234|  75091|
|    1006|        547|            NO|1418901256|  75034|
|    1007|        823|           YES|1418901300|  75023|
|    1008|        912|            NO|1418901400|  82091|
|    1009|        321|           YES|1418902000|  90093|
+--------+-----------+--------------+----------+-------+

DynamicFrame

A DynamicFrame is similar to a DataFrame, except that each record is self-describing, so no schema is required initially. Instead, AWS Glue computes a schema on-the-fly when required. We convert the df_orders DataFrame into a DynamicFrame.

Enter the following code in the shell:

dyf_orders = DynamicFrame.fromDF(df_orders, glueContext, "dyf") 

Now that we have our Dynamic Frame, we can start working with the datasets with AWS Glue transform functions.

ApplyMapping

The columns in our data might be in different formats, and you may want to change their respective names. ApplyMapping is the best option for changing the names and formatting all the columns collectively. For our dataset, we change some of the columns to Long from String format to save storage space later. We also shorten the column zipcode to zip. See the following code:

# Input 
dyf_applyMapping = ApplyMapping.apply( frame = dyf_orders, mappings = [ 
  ("order_id","String","order_id","Long"), 
  ("customer_id","String","customer_id","Long"),
  ("essential_item","String","essential_item","String"),
  ("timestamp","String","timestamp","Long"),
  ("zipcode","String","zip","Long")
])

dyf_applyMapping.printSchema()

# Output
root
|-- order_id: long
|-- customer_id: long
|-- essential_item: string
|-- timestamp: long
|-- zip: long

Filter

We now want to prioritize our order delivery for essential items. We can achieve that using the Filter function:

# Input 
dyf_filter = Filter.apply(frame = dyf_applyMapping, f = lambda x: x["essential_item"] == 'YES')

dyf_filter.toDF().show()

# Output 
+--------------+-----------+-----+----------+--------+
|essential_item|customer_id|  zip| timestamp|order_id|
+--------------+-----------+-----+----------+--------+
|           YES|        623|75091|1418901234|    1005|
|           YES|        823|75023|1418901300|    1007|
|           YES|        321|90093|1418902000|    1009|
+--------------+-----------+-----+----------+--------+

Map

Map allows us to apply a transformation to each record of a Dynamic Frame. For our case, we want to target a certain zip code for next day air shipping. We implement a simple “next_day_air” function and pass it to the Dynamic Frame:

# Input 

# This function takes in a dynamic frame record and checks if zipcode # 75034 is present in it. If present, it adds another column 
# “next_day_air” with value as True

def next_day_air(rec):
  if rec["zip"] == 75034:
    rec["next_day_air"] = True
  return rec

mapped_dyF =  Map.apply(frame = dyf_applyMapping, f = next_day_air)

mapped_dyF.toDF().show()

# Output
+--------------+-----------+-----+----------+--------+------------+
|essential_item|customer_id|  zip| timestamp|order_id|next_day_air|
+--------------+-----------+-----+----------+--------+------------+
|           YES|        623|75091|1418901234|    1005|        null|
|            NO|        547|75034|1418901256|    1006|        TRUE|
|           YES|        823|75023|1418901300|    1007|        null|
|            NO|        912|82091|1418901400|    1008|        null|
|           YES|        321|90093|1418902000|    1009|        null|
+--------------+-----------+-----+----------+--------+------------+

Dataset 2

To ship essential orders to the appropriate addresses, we need customer data. We demonstrate this by generating a custom JSON dataset consisting of zip codes and customer addresses. In this use case, this data represents the customer data of the company that we want to join later on.

We generate JSON strings consisting of customer data and use the Spark json function to convert them to a JSON structure (enter each jsonStr variable one at a time in case the terminal errors out):

# Input 
jsonStr1 = u'{ "zip": 75091, "customers": [{ "id": 623, "address": "108 Park Street, TX"}, { "id": 231, "address": "763 Marsh Ln, TX" }]}'
jsonStr2 = u'{ "zip": 82091, "customers": [{ "id": 201, "address": "771 Peek Pkwy, GA" }]}'
jsonStr3 = u'{ "zip": 75023, "customers": [{ "id": 343, "address": "66 P Street, NY" }]}'
jsonStr4 = u'{ "zip": 90093, "customers": [{ "id": 932, "address": "708 Fed Ln, CA"}, { "id": 102, "address": "807 Deccan Dr, CA" }]}'
df_row = spark.createDataFrame([
  Row(json=jsonStr1),
  Row(json=jsonStr2),
  Row(json=jsonStr3),
  Row(json=jsonStr4)
])

df_json = spark.read.json(df_row.rdd.map(lambda r: r.json))
df_json.show()

# Output
+-----------------------------------------------------+-----+
|customers                                            |zip  |
+-----------------------------------------------------+-----+
|[[108 Park Street, TX, 623], [763 Marsh Ln, TX, 231]]|75091|
|[[771 Peek Pkwy, GA, 201]]                           |82091|
|[[66 P Street, NY, 343]]                             |75023|
|[[708 Fed Ln, CA, 932], [807 Deccan Dr, CA, 102]]    |90093|
+-----------------------------------------------------+-----+
# Input
df_json.printSchema()

# Output
root
 |-- customers: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- address: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |-- zip: long (nullable = true)

To convert the DataFrame back to a DynamicFrame to continue with our operations, enter the following code:

# Input
dyf_json = DynamicFrame.fromDF(df_json, glueContext, "dyf_json")

SelectFields

To join with the order list, we don’t need all the columns, so we use the SelectFields function to shortlist the columns we need. In our use case, we need the zip code column, but we can add more columns as the argument paths accepts a list:

# Input
dyf_selectFields = SelectFields.apply(frame = dyf_filter, paths=['zip'])

dyf_selectFields.toDF().show()

# Output
+-----+
|  zip|
+-----+
|75091|
|75023|
|90093|
+-----+

Join

The Join function is straightforward and manages duplicate columns. We had two columns named zip from both datasets. AWS Glue added a period (.) in one of the duplicate column names to avoid errors:

# Input
dyf_join = Join.apply(dyf_json, dyf_selectFields, 'zip', 'zip')
dyf_join.toDF().show()

# Output
+--------------------+-----+-----+
|           customers| .zip|  zip|
+--------------------+-----+-----+
|[[108 Park Street...|75091|75091|
|[[66 P Street, NY...|75023|75023|
|[[708 Fed Ln, CA,...|90093|90093|
+--------------------+-----+-----+

DropFields

Because we don’t need two columns with the same name, we can use DropFields to drop one or multiple columns all at once. The backticks (`) around .zip inside the function call are needed because the column name contains a period (.):

# Input
dyf_dropfields = DropFields.apply(
  frame = dyf_join,
  paths = "`.zip`"
)

dyf_dropfields.toDF().show()

# Output
+--------------------+-----+
|           customers|  zip|
+--------------------+-----+
|[[108 Park Street...|75091|
|[[66 P Street, NY...|75023|
|[[708 Fed Ln, CA,...|90093|
+--------------------+-----+

Relationalize

The Relationalize function can flatten nested structures and create multiple dynamic frames. Our customer column from the previous operation is a nested structure, and Relationalize can convert it into multiple flattened DynamicFrames:

# Input
dyf_relationize = dyf_dropfields.relationalize("root", "/home/glue/GlueLocalOutput")

To see the DynamicFrames, we can’t run a .show() yet because it’s a collection. We need to check what keys are present. See the following code:

# Input
dyf_relationize.keys()

# Output
dict_keys(['root', 'root_customers'])

In the follow-up function in the next section, we show how to pick the DynamicFrame from a collection of multiple DynamicFrames.

SelectFromCollection

The SelectFromCollection function allows us to retrieve the specific DynamicFrame from a collection of DynamicFrames. For this use case, we retrieve both DynamicFrames from the previous operation using this function.

To retrieve the first DynamicFrame, enter the following code:

# Input
dyf_selectFromCollection = SelectFromCollection.apply(dyf_relationize, 'root')

dyf_selectFromCollection.toDF().show()

# Output
+---------+-----+
|customers|  zip|
+---------+-----+
|        1|75091|
|        2|75023|
|        3|90093|
+---------+-----+

To retrieve the second DynamicFrame, enter the following code:

# Input
dyf_selectFromCollection = SelectFromCollection.apply(dyf_relationize, 'root_customers')

dyf_selectFromCollection.toDF().show()

# Output
+---+-----+---------------------+----------------+
| id|index|customers.val.address|customers.val.id|
+---+-----+---------------------+----------------+
|  2|    0|      66 P Street, NY|             343|
|  3|    0|       708 Fed Ln, CA|             932|
|  3|    1|    807 Deccan Dr, CA|             102|
|  1|    0|  108 Park Street, TX|             623|
|  1|    1|     763 Marsh Ln, TX|             231|
+---+-----+---------------------+----------------+

RenameField

The second DynamicFrame we retrieved from the previous operation introduces a period (.) into our column names and is very lengthy. We can change that using the RenameField function:

# Input
dyf_renameField_1 = RenameField.apply(dyf_selectFromCollection, "`customers.val.address`", "address")

dyf_renameField_2 = RenameField.apply(dyf_renameField_1, "`customers.val.id`", "cust_id")

dyf_dropfields_rf = DropFields.apply(
  frame = dyf_renameField_2,
  paths = ["index", "id"]
)

dyf_dropfields_rf.toDF().show()

# Output
+-------------------+-------+
|            address|cust_id|
+-------------------+-------+
|    66 P Street, NY|    343|
|     708 Fed Ln, CA|    932|
|  807 Deccan Dr, CA|    102|
|108 Park Street, TX|    623|
|   763 Marsh Ln, TX|    231|
+-------------------+-------+

ResolveChoice

ResloveChoice can gracefully handle column type ambiguities. For more information about the full capabilities of ResolveChoice, see the GitHub repo.

# Input
dyf_resolveChoice = dyf_dropfields_rf.resolveChoice(specs = [('cust_id','cast:String')])

dyf_resolveChoice.printSchema()

# Output
root
|-- address: string
|-- cust_id: string

Dataset 3

We generate another dataset to demonstrate a few other functions. In this use case, the company’s warehouse inventory data is in a nested JSON structure, which is initially in a String format. See the following code:

# Input
warehouse_inventory_list = [
              ['TX_WAREHOUSE', '{\
                          "strawberry":"220",\
                          "pineapple":"560",\
                          "mango":"350",\
                          "pears":null}'
               ],\
              ['CA_WAREHOUSE', '{\
                         "strawberry":"34",\
                         "pineapple":"123",\
                         "mango":"42",\
                         "pears":null}\
              '],
    		   ['CO_WAREHOUSE', '{\
                         "strawberry":"340",\
                         "pineapple":"180",\
                         "mango":"2",\
                         "pears":null}'
              ]
            ]


warehouse_schema = StructType([StructField("warehouse_loc", StringType())\
                              ,StructField("data", StringType())])

df_warehouse = spark.createDataFrame(warehouse_inventory_list, schema = warehouse_schema)
dyf_warehouse = DynamicFrame.fromDF(df_warehouse, glueContext, "dyf_warehouse")

dyf_warehouse.printSchema()

# Output
root
|-- warehouse_location: string
|-- data: string

Unbox

We use Unbox to extract JSON from String format for the new data. Compare the preceding printSchema() output with the following code:

# Input
dyf_unbox = Unbox.apply(frame = dyf_warehouse, path = "data", format="json")
dyf_unbox.printSchema()
# Output
root
|-- warehouse_loc: string
|-- data: struct
|    |-- strawberry: int
|    |-- pineapple: int
|    |-- mango: int
|    |-- pears: null

# Input 
dyf_unbox.toDF().show()

# Output
+-------------+----------------+
|warehouse_loc|            data|
+-------------+----------------+
| TX_WAREHOUSE|[220, 560, 350,]|
| CA_WAREHOUSE|  [34, 123, 42,]|
| CO_WAREHOUSE|  [340, 180, 2,]|
+-------------+----------------+

Unnest

Unnest allows us to flatten a single DynamicFrame to a more relational table format. We apply Unnest to the nested structure from the previous operation and flatten it:

# Input
dyf_unnest = UnnestFrame.apply(frame = dyf_unbox)

dyf_unnest.printSchema()

# Output 
root
|-- warehouse_loc: string
|-- data.strawberry: int
|-- data.pineapple: int
|-- data.mango: int
|-- data.pears: null

dyf_unnest.toDF().show()

# Output
+-------------+---------------+--------------+----------+----------+
|warehouse_loc|data.strawberry|data.pineapple|data.mango|data.pears|
+-------------+---------------+--------------+----------+----------+
| TX_WAREHOUSE|            220|           560|       350|      null|
| CA_WAREHOUSE|             34|           123|        42|      null|
| CO_WAREHOUSE|            340|           180|         2|      null|
+-------------+---------------+--------------+----------+----------+

DropNullFields

The DropNullFields function makes it easy to drop columns with all null values. Our warehouse data indicated that it was out of pears and can be dropped. We apply the DropNullFields function on the DynamicFrame, which automatically identifies the columns with null values and drops them:

# Input
dyf_dropNullfields = DropNullFields.apply(frame = dyf_unnest)

dyf_dropNullfields.toDF().show()

# Output
+-------------+---------------+--------------+----------+
|warehouse_loc|data.strawberry|data.pineapple|data.mango|
+-------------+---------------+--------------+----------+
| TX_WAREHOUSE|            220|           560|       350|
| CA_WAREHOUSE|             34|           123|        42|
| CO_WAREHOUSE|            340|           180|         2|
+-------------+---------------+--------------+----------+

SplitFields

SplitFields allows us to split a DyanmicFrame into two. The function takes the field names of the first DynamicFrame that we want to generate followed by the names of the two DynamicFrames:

# Input
dyf_splitFields = SplitFields.apply(frame = dyf_dropNullfields, paths = ["`data.strawberry`", "`data.pineapple`"], name1 = "a", name2 = "b")

For the first DynamicFrame, see the following code:

# Input
dyf_retrieve_a = SelectFromCollection.apply(dyf_splitFields, "a")
dyf_retrieve_a.toDF().show()

# Output
+---------------+--------------+
|data.strawberry|data.pineapple|
+---------------+--------------+
|            220|           560|
|             34|           123|
|            340|           180|
+---------------+--------------+

For the second Dynamic Frame, see the following code:

# Input
dyf_retrieve_b = SelectFromCollection.apply(dyf_splitFields, "b")
dyf_retrieve_b.toDF().show()

# Output
+-------------+----------+
|warehouse_loc|data.mango|
+-------------+----------+
| TX_WAREHOUSE|       350|
| CA_WAREHOUSE|        42|
| CO_WAREHOUSE|         2|
+-------------+----------+

SplitRows

SplitRows allows us to filter our dataset within a specific range of counts and split them into two DynamicFrames:

# Input
dyf_splitRows = SplitRows.apply(frame = dyf_dropNullfields, comparison_dict = {"`data.pineapple`": {">": "100", "<": "200"}}, name1 = 'pa_200_less', name2 = 'pa_200_more')

For the first Dynamic Frame, see the following code:

# Input
dyf_pa_200_less = SelectFromCollection.apply(dyf_splitRows, 'pa_200_less')
dyf_pa_200_less.toDF().show()

# Output
+-------------+---------------+--------------+----------+
|warehouse_loc|data.strawberry|data.pineapple|data.mango|
+-------------+---------------+--------------+----------+
| CA_WAREHOUSE|             34|           123|        42|
| CO_WAREHOUSE|            340|           180|         2|
+-------------+---------------+--------------+----------+

For the second Dynamic Frame, see the following code:

# Input
dyf_pa_200_more = SelectFromCollection.apply(dyf_splitRows, 'pa_200_more')
dyf_pa_200_more.toDF().show()

# Output
+-------------+---------------+--------------+----------+
|warehouse_loc|data.strawberry|data.pineapple|data.mango|
+-------------+---------------+--------------+----------+
| TX_WAREHOUSE|            220|           560|       350|
+-------------+---------------+--------------+----------+

Spigot

Spigot allows you to write a sample dataset to a destination during transformation. For our use case, we write the top 10 records locally:

# Input
dyf_splitFields = Spigot.apply(dyf_pa_200_less, '/home/glue/GlueLocalOutput/Spigot/', 'top10')

Depending on your local environment configuration, Spigot may run into errors. Alternatively, you can use an AWS Glue endpoint or an AWS Glue ETL job to run this function.

Write Dynamic Frame

The write_dynamic_frame function writes a DynamicFrame using the specified connection and format. For our use case, we write locally (we use a connection_type of S3 with a POSIX path argument in connection_options, which allows writing to local storage):

# Input
glueContext.write_dynamic_frame.from_options(\
frame = dyf_splitFields,\
connection_options = {'path': '/home/glue/GlueLocalOutput/'},\
connection_type = 's3',\
format = 'json')

Conclusion

This article discussed the PySpark ETL capabilities of AWS Glue. Further testing with an AWS Glue development endpoint or directly adding jobs in AWS Glue is a good pivot to take the learning forward. For more information, see General Information about Programming AWS Glue ETL Scripts.


About the Authors

Adnan Alvee is a Big Data Architect for AWS ProServe Remote Consulting Services. He helps build solutions for customers leveraging their data and AWS services. Outside of AWS, he enjoys playing badminton and drinking chai.

 

 

Imtiaz (Taz) Sayed is the World Wide Tech Leader for Data Analytics at AWS. He is an ardent data engineer and relishes connecting with the data analytics community.

 

Developing AWS Glue ETL jobs locally using a container

Post Syndicated from Vishal Pathak original https://aws.amazon.com/blogs/big-data/developing-aws-glue-etl-jobs-locally-using-a-container/

AWS Glue is a fully managed extract, transform, and load (ETL) service that makes it easy to prepare and load your data for analytics. In the fourth post of the series, we discussed optimizing memory management. In this post, we focus on writing ETL scripts for AWS Glue jobs locally. AWS Glue is built on top of Apache Spark and therefore uses all the strengths of open-source technologies. AWS Glue comes with many improvements on top of Apache Spark and has its own ETL libraries that can fast-track the development process and reduce boilerplate code.

The AWS Glue team released the AWS Glue binaries and let you set up an environment on your desktop to test your code. We have used these libraries to create an image with all the right dependencies packaged together. The image has AWS Glue 1.0, Apache Spark, OpenJDK, Maven, Python3, the AWS Command Line Interface (AWS CLI), and boto3. We have also bundled Jupyter and Zeppelin notebook servers in the image so you don’t have to configure an IDE and can start developing AWS Glue code right away.

The AWS Glue team will release new images for various AWS Glue updates. The tags of the new images will follow the following convention: glue_libs_<glue-version>_image_<image-version>. For example, glue_libs_1.0.0_image_01. In this name, 1.0 is the AWS Glue major version, .0 is the patch version, and 01 is the image version. The patch version will be incremented for updates to the AWS Glue libraries of a major release. Image version will be incremented for the release of a new image of a major AWS Glue release. Both these increments will be reset with every major AWS Glue release. So, the first image released for AWS Glue 2.0 will be glue_libs_2.0.0_image_01.

We recommend pulling the highest image version for an AWS Glue major version to get the latest updates.

Prerequisites

Before you start, make sure that Docker is installed and the Docker daemon is running. For installation instructions, see the Docker documentation for Mac, Windows, or Linux. The machine running the Docker hosts the AWS Glue container. Also make sure that you have at least 7 GB of disk space for the image on the host running the Docker.

For more information about restrictions when developing AWS Glue code locally, see Local Development Restrictions.

Solution overview

In this post, we use amazon/aws-glue-libs:glue_libs_1.0.0_image_01 from Docker Hub. This image has only been tested for an AWS Glue 1.0 Spark shell (both for PySpark and Scala). It hasn’t been tested for an AWS Glue 1.0 Python shell.

We organize this post into the following three sections. You only have to complete one of the three sections (not all three) depending on your requirement:

  • Setting up the container to use Jupyter or Zeppelin notebooks
  • Setting up the Docker image with PyCharm Professional
  • Running against the CLI interpreter

This post uses the following two terms frequently:

  • Client – The system from which you access the notebook. You open a web browser on this system and put the notebook URL.
  • Host – The system that hosts the Docker daemon. The container runs on this system.

Sometimes, your client and host can be the same system.

Setting up the container to use Jupyter or Zeppelin notebooks

Setting up the container to run PySpark code in a notebook includes three high-level steps:

  1. Pulling the image from Docker Hub.
  2. Running the container.
  3. Opening the notebook.

Pulling the image from Docker Hub

If you’re running Docker on Windows, choose the Docker icon (right-click) and choose Switch to Linux containers… before pulling the image.

Open cmd on Windows or terminal on Mac and run the following command:

docker pull amazon/aws-glue-libs:glue_libs_1.0.0_image_01

Running the container

We pulled the image from Docker Hub in the previous step. We now run a container using this image.

The general format of the run command is:

docker run -itd -p <port_on_host>:<port_on_container_either_8888_or_8080> -p 4040:4040 <credential_setup_to_access_AWS_resources> --name <container_name> amazon/aws-glue-libs:glue_libs_1.0.0_image_01 <command_to_start_notebook_server>

The code includes the following information:

  • <port_on_host> – The local port of your host that is mapped to the port of the container. For our use case, the container port is either 8888 (for a Jupyter notebook) or 8080 (for a Zeppelin notebook). To keep things simple, we use the same port number as the notebook server ports on the container in the following examples.
  • <port_on_container_either_8888_or_8080> – The port of the notebook server on the container. The default port of Jupyter is 8888; the default port of Zeppelin is 8080.
  • 4040:4040 – This is required for SparkUI. 4040 is the default port for SparkUI. For more information, see Web Interfaces.
  • <credential_setup_to_access_AWS_resources> – In this section, we go with the typical case of mounting the host’s directory, containing the credentials. We assume that your host has the credentials configured using aws configure. The flow chart in the Appendix section explains various ways to set the credentials if the assumption doesn’t hold for your environment.
  • <container_name> – The name of the container. You can use any text here.

  • amazon/aws-glue-libs:glue_libs_1.0.0_image_01 – The name of the image that we pulled in the previous step.
  • <command_to_start_notebook_server> – We run /home/zeppelin/bin/zeppelin.sh for a Zeppelin notebook and /home/jupyter/jupyter_start.sh for a Jupyter notebook. If you want to run your code against the CLI interpreter, you don’t need a notebook server and can leave this argument blank.
The following example code starts a Jupyter notebook and passes read-only credentials from a Mac or Linux host:

docker run -itd -p 8888:8888 -p 4040:4040 -v ~/.aws:/root/.aws:ro --name glue_jupyter amazon/aws-glue-libs:glue_libs_1.0.0_image_01 /home/jupyter/jupyter_start.sh
The following example code starts a Jupyter notebook and passes read-write credentials from a Windows host:

docker run -itd -p 8888:8888 -p 4040:4040 -v %UserProfile%\.aws:/root/.aws:rw --name glue_jupyter amazon/aws-glue-libs:glue_libs_1.0.0_image_01 /home/jupyter/jupyter_start.sh

To run a Zeppelin notebook, replace 8888:8888 with 8080:8080, glue_jupyter with glue_zeppelin, and /home/jupyter/jupyter_start.sh with /home/zeppelin/bin/zeppelin.sh. For example, the following command starts a Zeppelin notebook server and passes read-only credentials from a Mac or Linux host:

docker run -itd -p 8080:8080 -p 4040:4040 -v ~/.aws:/root/.aws:ro --name glue_zeppelin amazon/aws-glue-libs:glue_libs_1.0.0_image_01 /home/zeppelin/bin/zeppelin.sh

You can now run the following command to make sure that the container is running:

docker ps

The Jupyter notebook is configured to allow connections from all IP addresses without authentication, and the Zeppelin notebook is configured to use anonymous access. This configuration makes sure that you can start working on your local machine with just two commands (docker pull and docker run). If your scenario mandates a different configuration, run the container without running the notebook startup script (/home/jupyter/jupyter_start.sh or /home/zeppelin/bin/zeppelin.sh). This starts the container but not the notebook server. You can then run the bash shell on the container using the following command, edit the required notebook configurations, and start the notebook server:

docker exec -it <container_name> bash

For example,

docker exec -it glue_jupyter bash.

The following example code is the docker run command without the notebook server startup:

docker run -itd -p 8888:8888 -p 4040:4040 -v ~/.aws:/root/.aws:ro --name glue_jupyter amazon/aws-glue-libs:glue_libs_1.0.0_image_01

If you’re running the container on Amazon Elastic Compute Cloud (Amazon EC2) instance, you have to set up your inbound rules in the security group to allow communication on the ports used by the notebook server. A broad inbound rule can create security risks. For more information, see AWS Security Best Practices.

Opening the notebook

If your client and host are the same machine, enter the following URL for Jupyter: http://localhost:8888.

You can write PySpark code in the notebook as shown here. You can also use SQL magic (%%sql) to directly write SQL against the tables in the AWS Glue Data Catalog. If your catalog table is on top of JSON data, you have to place json-serde.jar in the /home/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/jars directory of the container and restart the kernel in your Jupyter notebook. You can place the jar in this directory by first running the bash shell on the container using the following command:

docker exec -it <container_name> bash

If you have a local directory that holds your notebooks, you can mount it to /home/jupyter/jupyter_default_dir using the -v option. These notebooks are available to you when you open the Jupyter notebook URL. For example, see the following code:

docker run -itd -p 8888:8888 -p 4040:4040 -v ~/.aws:/root/.aws:ro -v C:\Users\admin\Documents\notebooks:/home/jupyter/jupyter_default_dir --name glue_jupyter amazon/aws-glue-libs:glue_libs_1.0.0_image_01 /home/jupyter/jupyter_start.sh

The URL for Zeppelin is http://localhost:8080.

For Zeppelin notebooks, include %spark.pyspark on the top to run PySpark code.

If your host is Amazon EC2 and your client is your laptop, replace localhost in the preceding URLs with your host’s public IP.

Depending on your network or if you’re on a VPN, you might have to set an SSH tunnel. The general format of the tunnel is the following code:

ssh -i <absolute_path_to_your_private_key_for_EC2> -v -N -L <port_on_client>:<ip_of_the_container>:<port_8888_or_8080> ec2-user@<public_ip_address_of_ec2_host>

Your security group controlling the EC2 instance should allow inbound on port 22 from the client. A broad inbound rule can create security risks. For more information, see AWS Security Best Practices.

You can get the <ip_of_the_container> under the IPAddress field when you run docker inspect <container_name>. For example: docker inspect glue_jupyter.

If you set up the tunnel, the URL to access the notebook is: http://localhost:<port_on_client>.

Use 8888 or 8080 for <port_8888_or_8080>, depending on if you’re running a Jupyter or Zeppelin notebook.

You can now use the following sample code to test your notebook:

from pyspark import SparkContext
from awsglue.context import GlueContext

glueContext = GlueContext(SparkContext.getOrCreate()) 
inputDF = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": ["s3://awsglue-datasets/examples/us-legislators/all/memberships.json"]}, format = "json")
inputDF.toDF().show()

Although awsglue-datasets is a public bucket, you at least need the following permissions, attached to the AWS Identity and Access Management (IAM) user used for your container, to view the data:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "S3ReadOnly",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": "arn:aws:s3:::awsglue-datasets/*"
        }
    ]
}

You can also see the databases in your AWS Glue Data Catalog using the following code:

spark.sql("show databases").show()

You need AWS Glue permissions to run the preceding command. The following are the minimum permissions required to run the code. Replace <account_number> with your account number and <region> with your Region:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "GlueAccess",
            "Effect": "Allow",
            "Action": [
                "glue:GetDatabase",
                "glue:GetDatabases"
            ],
            "Resource": [
                "arn:aws:glue:<region>:<account_number>:database/*",
                "arn:aws:glue:<region>:<account_number>:catalog"
            ]
        }
    ]
}

Similarly, you can query the AWS Glue Data Catalog tables too. If your host is Amazon EC2 instance, you see the catalog of the Region of your EC2 instance. If your host is local, you see the catalog of the Region set in your aws configure or your AWS_REGION variable.

You can stop here if you want to develop AWS Glue code locally using only notebooks.

Setting up the Docker image with PyCharm Professional

This section talks about setting up PyCharm Professional to use the image. For this post, we use Windows. There may be a few differences when using PyCharm on a Mac.

  1. Open cmd (or terminal for Mac) and pull amazon/aws-glue-libs:glue_libs_1.0.0_image_01 using the following command:
    docker pull amazon/aws-glue-libs:glue_libs_1.0.0_image_01

    If you’re running Docker on Windows, choose the Docker icon (right-click) and choose Switch to Linux containers… before pulling the image.

  2. Choose the Docker icon (right-click) and choose Settings (this step isn’t required for Mac or Linux).
  3. In the General section, select Expose daemon on tcp://localhost:2375 without TLS (this step isn’t required for Mac or Linux). Note the warning listed under the checkbox. This step is based on PyCharm documentation.
  4. Choose Apply & Restart (this step isn’t required for Mac or Linux).
  5. Choose the Docker icon (right-click) and choose Restart… if the Docker doesn’t restart automatically (this step isn’t required for Mac or Linux).
  6. Open PyCharm and create a Pure Python project (if you don’t have one).
  7. Under File, choose Settings… (for Mac, under PyCharm, choose Preferences).
  8. Under Settings, choose Project Interpreter. In the following screenshot, GlueProject is the name of my project. Your project name might be different.
  9. Choose Show All… from the drop-down menu.
  10. Choose the + icon.

  11. Choose Docker.
  12. Choose New.
  13. For Name, enter a name (for example, Docker-Glue).
  14. Keep other settings at their default.
  15. If running on Windows, for Connect to Docker daemon with, select TCP socket and enter the Engine API URL.
    For this post, we enter tcp://localhost:2375 because Docker and PyCharm are on the same Windows machine.
    If running on a Mac, select Docker for Mac. No API URL is required.
  16. Make sure you see the message Connection successful.

For Windows, if you don’t see this message, Docker may not have restarted after you changed the settings in Step 4. Restart the Docker and repeat these steps again. For more information about connection settings, see PyCharm documentation.

The following screenshots show steps 13-16 in Windows and Mac.

  1. Choose OK.

You should now see the image listed in the drop-down menu.

  1. Choose the image that you pulled from Docker Hub (amazon/aws-glue-libs:glue_libs_1.0.0_image_01).
  2. Choose OK.

You now see the interpreter listed.

  1. Choose OK.

This lists all the packages in the image.

  1. Choose OK.

Steps 22-27 help you get AWS Glue-related code completion suggestions from PyCharm.

  1. Download the following file: https://s3.amazonaws.com/aws-glue-jes-prod-us-east-1-assets/etl-1.0/python/PyGlue.zip.
  2. Under File, choose Settings (for Mac, under PyCharm, choose Preferences).
  3. Under Project: <Project name>, choose Project Structure.
  4. Choose Add Content Root.
  5. Choose the newly downloaded PyGlue.zip file.
  6. In the Settings window, choose OK.
  7. Choose the project (right-click) and choose New, Python File.
  8. Enter a name for the Python file and press Enter.
  9. Enter the following code in the file and save it. For more information about the minimum permissions required to run this code, see this section.
    from pyspark import SparkContext
    from awsglue.context import GlueContext
    
    glueContext = GlueContext(SparkContext.getOrCreate()) 
    inputDF = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": ["s3://awsglue-datasets/examples/us-legislators/all/memberships.json"]}, format = "json")
    inputDF.toDF().show()
    

  10. Choose Add Configuration.
  11. Choose the +icon.
  12. Under Add New Configuration, choose Python.
  13. For Name, enter a name.
  14. For Environment variables, enter the following:
    PYTHONPATH=/home/aws-glue-libs/awsglue.zip:/home/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/python/lib/pyspark.zip:/home/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/python/lib/py4j-0.10.7-src.zip:/home/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/python

  15. For Script path, select the newly created script in Step 29.
  16. For Python interpreter, choose the newly created interpreter.
  17. Choose Docker Container Settings.
  18. Under Volume bindings, choose the +icon.
  19. For Host path, add the absolute path .aws folder that holds the credentials and the config files.
  20. For Container path, add /root/.aws.
  21. Choose OK.
  22. For Run/Debug Configurations, choose OK.
  23. Run the code by choosing the green button on the top right.

You can also see the databases in your AWS Glue Data Catalog using the following code. For more information about the minimum permissions required to run this code, see this section.

spark.sql("show databases").show()

Similarly, you can also query the catalog tables. If your host is Amazon EC2 instance, you see the catalog of the Region of your EC2 instance. If your host is local, you see the catalog of the Region set in your aws configure or your AWS_REGION variable.

PyCharm gives code completion suggestions for AWS Glue (see the following screenshot). This is possible because of the steps you completed earlier.

Running against the CLI interpreter

You can always run the bash shell on the container and run your PySpark code directly against the CLI interpreter in the container.

  1. Complete Pulling the image from Docker Hub step and Running the container step in the section Setting up the container to use Jupyter of Zeppelin notebooks.
  2. Run the bash shell on the container by entering the following code. Replace <container_name> with the name (--name argument) you used earlier.
    docker exec -it <container_name> bash

  3. Run one of the following commands:
    1. For PySpark, enter the following code:
      /home/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/bin/pyspark

    2. For Scala, enter the following code:
      /home/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/bin/spark-shell

Conclusion

In this post, we learned about a three-step process to get started on AWS Glue and Jupyter or Zeppelin notebook. Although notebooks are a great way to get started and a great asset to data scientists and data wranglers, data engineers generally have a source control repository, an IDE, and a well-defined CI/CD process. Because PyCharm is a widely used IDE for PySpark development, we showed how to use the image with PyCharm Professional. You can develop your code locally in your IDE and test it locally using the container, and your CI/CD process can run as it does with any other IDE and source control tool in your organization. Although we showed integration with PyCharm, you can similarly integrate the container with any IDE that you use to complete your CI/CD story with AWS Glue.


Appendix

The following section discusses various ways to set the credentials to access AWS resources (such as Amazon Simple Storage Service (Amazon S3), AWS Step Functions, and more) from the container.

You need to provide your AWS credentials to connect to an AWS service from the container. The AWS SDKs and CLIs use provider chains to look for AWS credentials in several different places, including system or user environment variables and in local AWS configuration files. For more information about how to set up credentials, see https://docs.aws.amazon.com/sdk-for-java/v2/developer-guide/credentials.html. To generate the credentials using the AWS Management Console, see Managing Access Keys (Console). For instructions on generating credentials with the AWS CLI, see create-access-key. For more information about generating credentials with an API, see CreateAccessKey.

The following flow chart shows the various ways to set up AWS credentials for the container. Most of these mechanisms don’t work with PyCharm because we use the image there and not the container. You can use the container as an SSH interpreter in PyCharm and then use one of the credential setting mechanisms listed here. However, that discussion is out of the scope of this post.

Note that the numbers, in brackets, match the code snippets that follow the chart.

(1) To find more info about the syntax of setting up the tunnel, see this.

(2) To set credentials using the docker cp command to copy credentials from the Windows host to the container, enter the following code (this example code uses the container name glue_jupyter):

docker cp %UserProfile%\.aws\.  glue_jupyter:/root/.aws

(3) To mount the host’s .aws directory on the container with rw option, see this.

(4) To mount the host’s .aws directory on the container with ro option, see this.

(5) To set the credentials in a file, enter the following code:

docker run -itd -p 8888:8888 -p 4040:4040 --env-file /datalab_pocs/glue_local/env_variables.txt --name glue_jupyter amazon/aws-glue-libs:glue_libs_1.0.0_image_01 /home/jupyter/jupyter_start.sh

/datalab_pocs/glue_local/env_variables.txt is the absolute path of the file holding the environment variables. The file should have the following variables:

  • AWS_ACCESS_KEY_ID=<Access_id>
  • AWS_SECRET_ACCESS_KEY=<Access_key>
  • AWS_REGION=<Region>

For more information about Regions, see Regions, Availability Zones, and Local Zones.

(6) To set the credentials in the docker run command, enter the following code:

docker run -itd -p 8888:8888 -p 4040:4040 -e AWS_ACCESS_KEY_ID=<ID> -e AWS_SECRET_ACCESS_KEY=<Key> -e AWS_REGION=<Region>  --name glue_jupyter amazon/aws-glue-libs:glue_libs_1.0.0_image_01 /home/jupyter/jupyter_start.sh

(7) To set credentials using aws configure on the container, enter the following code:

docker run -itd -p 8888:8888 -p 4040:4040 --name glue_jupyter amazon/aws-glue-libs:glue_libs_1.0.0_image_01 /home/jupyter/jupyter_start.sh
docker exec -it glue_jupyter bash
aws configure


About the Author

Vishal Pathak is a Data Lab Solutions Architect at AWS. Vishal works with the customers on their use cases, architects a solution to solve their business problems and helps the customers build an scalable prototype. Prior to his journey in AWS, Vishal helped customers implement BI, DW and DataLake projects in US and Australia.

 

 

 

How Aruba Networks built a cost analysis solution using AWS Glue, Amazon Redshift, and Amazon QuickSight

Post Syndicated from Siddharth Thacker original https://aws.amazon.com/blogs/big-data/how-aruba-networks-built-a-cost-analysis-solution-using-aws-glue-amazon-redshift-and-amazon-quicksight/

This is a guest post co-written by Siddharth Thacker and Swatishree Sahu from Aruba Networks.

Aruba Networks is a Silicon Valley company based in Santa Clara that was founded in 2002 by Keerti Melkote and Pankaj Manglik. Aruba is the industry leader in wired, wireless, and network security solutions. Hewlett-Packard acquired Aruba in 2015, making it a wireless networking subsidiary with a wide range of next-generation network access solutions.

Aruba Networks provides cloud-based platform called Aruba Central for network management and AI Ops. Aruba cloud platform supports thousands of workloads to support customer facing production environment and also a separate development platform for Aruba engineering.

The motivation to build the solution presented in this post was to understand the unit economics of the AWS resources used by multiple product lines across different organization pillars. Aruba wanted a faster, effective, and reliable way to analyze cost and usage data and visualize that into a dashboard. This solution has helped Aruba in multiple ways, including:

  • Visibility into costs – Multiple Aruba teams can now analyze the cost of their application via data surfaced with this solution
  • Cost optimization – The solution helps teams identify new cost-optimization opportunities by making them aware of the higher-cost resources with low utilization so they can optimize accordingly
  • Cost management – The Cloud DevOps organization, the group who built this solution, can effectively plan at the application level and have a direct positive impact on gross margins
  • Cost savings – With daily cost data available, engineers can see the monetary impact of right-sizing compute and other AWS resources almost immediately
  • Big picture as well as granular – Users can visualize cost data from the top down and track cost at a business level and a specific resource level

Overview of the solution

This post describes how Aruba Networks automated the solution, from generating the AWS Cost & Usage Report (AWS CUR) to its final visualization on Amazon QuickSight. In this solution, they start by configuring the CUR on their primary payer account, which publishes the billing reports to an Amazon Simple Storage Service (Amazon S3) bucket. Then they use an AWS Glue crawler to define and catalog the CUR data. As the new CUR data is delivered daily, the data catalog is updated, and the data is loaded into an Amazon Redshift database using Amazon Redshift Spectrum and SQL. The reporting and visualization layer is built using QuickSight. Finally, the entire pipeline is automated by using AWS Data Pipeline.

The following diagram illustrates this architecture.

Aruba prefers the AWS CUR Report to AWS Cost Explorer because AWS Cost Explorer provides usage information at a high level, and not enough granularity for detailed operations, such as data transfer cost. AWS CUR provides the most detailed information available about your AWS costs and usage at an hourly granularity. This allows the Aruba team to drill down the costs by the hour or day, product or product resource, or custom tags, enabling them to achieve their goals.

Aruba implemented the solution with the following steps:

  1. Set up the CUR delivery to a primary S3 bucket from the billing dashboard.
  2. Use Amazon S3 replication to copy the primary payer S3 bucket to the analytics bucket. Having a separate analytics account helps prevent direct access to the primary account.
  3. Create and schedule the crawler to crawl the CUR data. This is required to make the metadata available in the Data Catalog and update it quickly when new data arrives.
  4. Create respective Amazon Redshift schema and tables.
  5. Orchestrate an ETL flow to load data to Amazon Redshift using Data Pipeline.
  6. Create and publish dashboards using QuickSight for executives and stakeholders.

Insights generated

The Aruba DevOps team built various reports that provide the cost classifications on AWS services, weekly cost by applications, cost by product, infrastructure, resource type, and much more using the detailed CUR data as shown by the following screenshot.

For example, using the following screenshot, Aruba can conveniently figure out that compute cost is the biggest contributor compared to other costs. To reduce the cost, they can consider using various cost-optimization methods like buying reserved instances, savings plans, or Spot Instances wherever applicable.

Similarly, the following screenshot highlights the cost doubled compared to the first week of April. This helps Aruba to identify anomalies quickly and make informed decisions.

Setting up the CUR delivery

For instructions on setting up a CUR, see Creating Cost and Usage Reports.

To reduce complexity in the workflow, Aruba chose to create resources in the same region with hourly granularity, mainly to see metrics more frequently.

To lower the storage costs for data files and maximize the effectiveness of querying data with serverless technologies like Amazon Athena, Amazon Redshift Spectrum, and Amazon S3 data lake, save the CUR in Parquet format. The following screenshot shows the configuration for delivery options.

The following table shows some example CUR data.

bill_payer_account_id line_item_usage_account_id line_item_usage_start_date line_item_usage_end_date line_item_product_code line_item_usage_type line_item_operation
123456789 111222333444 00:00.0 00:00.0 AmazonEC2 USW2-EBS:VolumeP-IOPS.piops CreateVolume-P-IOPS
123456789 111222333444 00:00.0 00:00.0 AmazonEC2 USW2-APN1-AWS-In-Bytes LoadBalancing-PublicIP-In
123456789 111222333444 00:00.0 00:00.0 AmazonEC2 USW2-DataProcessing-Bytes LoadBalancing
123456789 111222333444 00:00.0 00:00.0 AmazonEC2 USW2-EBS:SnapshotUsage CreateSnapshot
123456789 555666777888 00:00.0 00:00.0 AmazonEC2 USW2-EBS:SnapshotUsage CreateSnapshot
123456789 555666777888 00:00.0 00:00.0 AmazonEC2 USW2-EBS:SnapshotUsage CreateSnapshot
123456789 555666777888 00:00.0 00:00.0 AmazonEC2 USW2-DataTransfer-Regional-Bytes InterZone-In
123456789 555666777888 00:00.0 00:00.0 AmazonS3 USW2-Requests-Tier2 ReadLocation
123456789 555666777888 00:00.0 00:00.0 AmazonEC2 USW2-DataTransfer-Regional-Bytes InterZone-In

Replicating the CUR data to your analytics account

For security purposes, other teams aren’t allowed to access the primary (payer) account, and therefore can’t access CUR data generated from that account. Aruba replicated the data to their analytics account and build the cost analysis solution there. Other teams can access the cost data without getting access permission for the primary account. The data is replicated across accounts by adding an Amazon S3 replication rule in the bucket. For more information, see Adding a replication rule when the destination bucket is in a different AWS account.

Cataloging the data with a crawler and scheduling it to run daily

Because AWS delivers all daily reports in a report date range report-prefix/report-name/yyyymmdd-yyyymmdd folder, Aruba uses AWS Glue crawlers to crawl through the data and update the catalog.

AWS Glue is a fully managed ETL service that makes it easy to prepare and load the data for analytics. Once the AWS Glue is pointed to the data stored on AWS, it discovers the data and stores the associated metadata (such as table definition and schema) in the Data Catalog. After the data is cataloged, the data is immediately searchable, queryable, and available for ETL. For more information, see Populating the AWS Glue Data Catalog.

The following screenshot shows the crawler created on Amazon S3 location of the CUR data.

The following code is an example table definition populated by the crawler.:

CREATE EXTERNAL TABLE `cur_parquet`(
  `identity_line_item_id` string, 
  `identity_time_interval` string, 
  `bill_invoice_id` string, 
………
………
  `resource_tags_user_infra_role` string)

PARTITIONED BY ( 
  `year` string, 
  `month` string )

ROW FORMAT SERDE  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://curS3bucket/Parquet/'

Transforming and loading using Amazon Redshift

Next in the analytics service, Aruba chose Amazon Redshift over Athena. Aruba has a use case to integrate cost data together with other tables already present in Amazon Redshift and hence using the same service makes it easy to integrate with their existing data. To further filter and transform data at the same time, and simplify the multi-step ETL, Aruba chose Amazon Redshift Spectrum. It helps to efficiently query and load CUR data from Amazon S3. For more information, see Getting started with Amazon Redshift Spectrum.

Use the following query to create an external schema and map it to the AWS Glue database created earlier in the Data Catalog:

--Choose a schema name of your choice, cur_redshift_external_schema name is just an example--
 create external schema cur_redshift_spectrum_external_schema from data catalog database 
 'aruba_curr_db' iam_role 'arn:aws:iam::xxxxxxxxxxxxx:role/redshiftclusterrole' 
 create external database if not exists;

The table created in the Data Catalog appears under the Amazon Redshift Spectrum schema. The schema, table, and records created can be verified with the following SQL code:

SELECT Count(*) 
FROM   cur_redshift_spectrum_external_schema.<TABLE>; 

--Query the right partition, year=2020 and month=2 is used an example
SELECT Count(*) 
FROM   cur_redshift_spectrum_external_schema.<TABLE> 
WHERE  year=2020 
AND    month=2;

Next, transform and load the data into the Amazon Redshift table. Aruba started by creating an Amazon Redshift table to contain the data. The following SQL code can be used to create the production table with the desired columns:

CREATE TABLE redshift_schema.redshift_table 
  ( 
     usage_start_date TIMESTAMP, 
     usage_end_date   TIMESTAMP, 
     service_region   VARCHAR (256), 
     service_az       VARCHAR (256), 
     aws_resource_id  VARCHAR (256), 
     usage_amount     FLOAT (17), 
     charge_currency  VARCHAR (256), 
     aws_product_name VARCHAR (256), 
     instance_family  VARCHAR (256), 
     instance_type    VARCHAR (256), 
     unblended_cost   FLOAT (17), 
     usage_cost       FLOAT (17)
  ); 

CUR is dynamic in nature, which means that some columns may appear or disappear with each update. When creating the table, we take static columns only. For more information, see Line item details.

Next, insert and update to ingest the data from Amazon S3 to the Amazon Redshift table. Each CUR update is cumulative, which means that each version of the CUR includes all the line items and information from the previous version.

The reports generated throughout the month are estimated and subject to change during the rest of the month. AWS finalizes the report at the end of each month. Finalized reports have the calculations for the blended and unblended costs, and cover all the usage for the month. For this use case, Aruba updates the last 45 days of data to make sure the finalized cost is captured. The below sample query can be used to verify the updated data:

-- Create Table Statement
 INSERT INTO redshift_schema.redshift_table
            (usage_start_date, 
             usage_end_date, 
             service_region, 
             service_az, 
             aws_resource_id, 
             usage_amount, 
             charge_currency, 
             aws_product_name, 
             instance_family, 
             instance_type, 
             unblended_cost,
             Usage_Cost ) 
 SELECT line_item_usage_start_date, 
       line_item_usage_end_date, 
       line_item_operation, 
       line_item_availability_zone, 
       line_item_resource_id, 
       line_item_usage_amount, 
       line_item_currency_code, 
       product_product_name, 
       product_instance_family, 
       product_instance_type, 
       line_item_unblended_cost,
       case when line_item_type='Usage' then line_item_unblended_cost
            else 0
            end as usage_cost 
 FROM   cur_redshift_external_schema.cur_parquet_parquet
 WHERE  line_item_usage_start_date >= date_add('day', -45, getdate()) 
       AND line_item_usage_start_date < date_add('day', 1, getdate()); 

Using Data Pipeline to orchestrate the ETL workflow

To automate this ETL workflow, Aruba chose Data Pipeline. Data Pipeline helps to reliably process and move data between different AWS compute and storage services, as well as on-premises data sources. With Data Pipeline, Aruba can regularly access their data where it’s stored, transform and process it at scale, and efficiently transfer the results to AWS services such as Amazon S3, Amazon Relational Database Service (Amazon RDS), Amazon DynamoDB, and Amazon EMR. Although the detailed steps of setting up this pipeline are out of scope for this blog, there is a sample workflow definition JSON file, which can be imported after making the necessary changes.

Data Pipeline workflow

The following screenshot shows the multi-step ETL workflow using Data Pipeline. Data Pipeline is used to run the INSERT query daily, which inserts and updates the latest CUR data into our Amazon Redshift table from the external table.

In order to copy data to Amazon Redshift,  RedshiftDataNode and RedshiftCopyActivity can be used, and then scheduled to run periodically.

Sharing metrics and creating visuals with QuickSight

To share the cost and usage with other teams, Aruba choose QuickSight using Amazon Redshift as the data source. QuickSight is a native AWS service that seamlessly integrates with other AWS services such as Amazon Redshift, Athena, Amazon S3, and many other data sources.

As a fully managed service, QuickSight lets Aruba to easily create and publish interactive dashboards that include ML Insights. In addition to building powerful visualizations, QuickSight provides data preparation tools that makes it easy to filter and transform the data into the exact needed dataset. As a cloud-native service, dashboards can be accessed from any device and embedded into applications and portals, allowing other teams to monitor their resource usage easily. For more information about creating a dataset, see Creating a Dataset from a Database. Quicksight Visuals can then be created from this dataset.

The following screenshot shows a visual comparison of device cost and count to help find the cost per device. This visual helped Aruba quickly identify the cost per device increase in April and take necessary actions.

Similarly, the following visualization helped Aruba identify an increase in data transfer cost and helped them decide to invest in rearchitecting their application.

The following visualization classifies the cost spend per resource.

Conclusion

In this post, we discussed how Aruba Networks was able to successfully achieve the following:

  • Generate CUR and use AWS Glue to define data, catalog the data, and update the metadata
  • Use Amazon Redshift Spectrum to transform and load the data to Amazon Redshift tables
  • Query, visualize, and share the data stored using QuickSight
  • Automate and orchestrate the entire solution using Data Pipeline

Aruba use this solution to automatically generate a daily cost report and share it with their stakeholders, including executives and cloud operations team.

 


About the Authors

Siddharth Thacker works in Business & Finance Strategy in Cloud Software division at Aruba Networks. Siddharth has Master’s in Finance with experience in industries like banking, investment management, cloud software and focuses on business analytics, margin improvement and strategic partnerships at Aruba. In his spare time, he likes exploring outdoors and participate in team sports.

Swatishree Sahu is a Technical Data Analyst at Aruba Networks. She has lived and worked in India for 7 years as an SME for SOA-based integration tools before coming to US to pursue her master’s in Business Analytics from UT Dallas. Breaking down and analyzing data is her passion. She is a Star Wars geek, and in her free time, she loves gardening, painting, and traveling.

Ritesh Chaman is a Technical Account Manager at Amazon Web Services. With 10 years of experience in the IT industry, Ritesh has a strong background in Data Analytics, Data Management, and Big Data systems. In his spare time, he loves cooking (spicy Indian food), watching sci-fi movies, and playing sports.

 

 

 

Kunal Ghosh is a Solutions Architect at AWS. His passion is to build efficient and effective solutions on the cloud, especially involving Analytics, AI, Data Science, and Machine Learning. Besides family time, he likes reading and watching movies, and is a foodie.