Tag Archives: AWS Glue

How Wind Mobility built a serverless data architecture

Post Syndicated from Pablo Giner original https://aws.amazon.com/blogs/big-data/how-wind-mobility-built-a-serverless-data-architecture/

Guest post by Pablo Giner, Head of BI, Wind Mobility.

Over the past few years, urban micro-mobility has become a trending topic. With pollution indexes hitting historic highs, cities and companies worldwide have been introducing regulations and working on a wide spectrum of solutions to alleviate the situation.

We at Wind Mobility strive to make commuters’ lives more sustainable and convenient by bringing short-distance urban transportation to cities worldwide.

At Wind Mobility, we scale our services at the same pace as our users demand them, and we do it in an economically and environmentally viable way. We optimize our fleet distribution to avoid overcrowding cities with more scooters than those that are actually going to be used, and we position them just meters away from where our users need them and at the time of the day when they want them.

How do we do that? By optimizing our operations to their fullest. To do so, we need to be very well informed about our users’ behavior under varying conditions and understand our fleet’s potential.

Scalability and flexibility for rapid growth

We knew that before we could solve this challenge, we needed to collect data from many different sources, such as user interactions with our application, user demand, IoT signals from our scooters, and operational metrics. To analyze the numerous datasets collected and extract actionable insights, we needed to build a data lake. While the high-level goal was clear, the scope was less so. We were working hard to scale our operation as we continued to launch new markets. The rapid growth and expansion made it very difficult to predict the volume of data we would need to consume. We were also launching new microservices to support our growth, which resulted in more data sources to ingest. We needed an architecture that allowed us to be agile and quickly adapt to meet our growth. It became clear that a serverless architecture was best positioned to meet those needs, so we started to design our 100% serverless infrastructure.

The first challenge was ingesting and storing data from our scooters in the field, events from our mobile app, operational metrics, and partner APIs. We use AWS Lambda to capture changes in our operational databases and mobile app and push the events to Amazon Kinesis Data Streams, which allows us to take action in real time. We also use Amazon Kinesis Data Firehose to write the data to Amazon Simple Storage Service (Amazon S3), which we use for analytics.
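As a simplified illustration (not our production code; the stream name and partition key field are placeholders), a Lambda handler that forwards a captured change event to Kinesis Data Streams can be as small as the following:

    import json
    import boto3

    kinesis = boto3.client("kinesis")

    def handler(event, context):
        # Forward the captured change event to the stream for downstream
        # real-time consumers and for delivery to Amazon S3 for analytics.
        kinesis.put_record(
            StreamName="scooter-events",  # placeholder stream name
            Data=json.dumps(event).encode("utf-8"),
            PartitionKey=str(event.get("scooter_id", "unknown")),
        )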

After the data was in Amazon S3 and adequately partitioned for its most common use cases (we partition by date, region, and business line, depending on the data source), we had to find a way to query it for both data profiling (understanding structure, content, and interrelationships) and ad hoc analysis. For that we chose AWS Glue crawlers to catalog our data and Amazon Athena to read from the AWS Glue Data Catalog and run queries. However, ad hoc analysis and data profiling are relatively sporadic tasks for our team; most of our data processing compute hours are dedicated to transforming the multiple data sources into our data warehouse: consolidating the raw data, modeling it, adding new attributes, and picking the data elements that constitute 95% of our analytics and predictive needs.
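For the ad hoc side, the following is a hedged illustration (the database, table, and result location are placeholders rather than our actual names) of launching an Athena profiling query against the AWS Glue Data Catalog from Python:

    import boto3

    athena = boto3.client("athena")

    # Run an ad hoc profiling query against a table cataloged by the Glue crawlers.
    athena.start_query_execution(
        QueryString="SELECT region, COUNT(*) AS rides FROM app_events GROUP BY region",
        QueryExecutionContext={"Database": "data_lake"},  # placeholder catalog database
        ResultConfiguration={"OutputLocation": "s3://example-athena-results/"},
    )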

This is where all the heavy lifting takes place. We parse through millions of scooter and user events generated daily (over 300 events per second) to extract actionable insight. We selected AWS Glue to perform this task. Our primary ETL job reads the newly added raw event data from Amazon S3, processes it using Apache Spark, and writes the results to our Amazon Redshift data warehouse. AWS Glue plays a critical role in our ability to scale on demand. After careful evaluation and testing, we concluded that AWS Glue ETL jobs meet all our needs and free us from procuring and managing infrastructure.
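The following is a simplified sketch of such an AWS Glue ETL job rather than our exact code; every path, format, connection, and table name is a placeholder:

    import sys
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext

    args = getResolvedOptions(sys.argv, ["JOB_NAME"])
    glue_context = GlueContext(SparkContext.getOrCreate())
    job = Job(glue_context)
    job.init(args["JOB_NAME"], args)

    # Read raw event data landed in S3 by Kinesis Data Firehose.
    events = glue_context.create_dynamic_frame_from_options(
        connection_type="s3",
        connection_options={"paths": ["s3://example-raw-events/"]},
        format="json",
        transformation_ctx="events",
    )

    # ...Spark transformations: consolidation, modeling, derived attributes...

    # Append the transformed result to the Amazon Redshift data warehouse.
    glue_context.write_dynamic_frame.from_jdbc_conf(
        frame=events,
        catalog_connection="example-redshift-connection",
        connection_options={"dbtable": "analytics.events", "database": "dwh"},
        redshift_tmp_dir="s3://example-temp/redshift/",
    )

    job.commit()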

Architecture overview

The following diagram represents our current data architecture, showing two serverless data collection, processing, and reporting pipelines:

  • Operational databases from Amazon Relational Database Service (Amazon RDS) and MongoDB
  • IoT and application events, followed by Athena for data profiling and Amazon Redshift for reporting

Our data is curated and transformed multiple times a day using an automated pipeline running on AWS Glue. The team can now focus on analyzing the data and building machine learning (ML) applications.

We chose Amazon QuickSight as our business intelligence tool to help us visualize and better understand our operational KPIs. Additionally, we use Amazon Elastic Container Registry (Amazon ECR) to store our Docker images containing our custom ML algorithms and Amazon Elastic Container Service (Amazon ECS) where we train, evaluate, and host our ML models. We schedule our models to be trained and evaluated multiple times a day. Taking as input curated data about demand, conversion, and flow of scooters, we run the models to help us optimize fleet utilization for a particular city at any given time.

The following diagram represents how data from the data lake is incorporated into our ML training, testing, and serving system. First, our developers work on the application code and commit their changes, which are built into new Docker images by our CI/CD pipeline and stored in the Amazon ECR registry. These images are pushed into Amazon ECS and tested in DEV and UAT environments before moving to PROD (where they are triggered by the Amazon ECS task scheduler). During their execution, the Amazon ECS tasks (some train the demand and usage forecasting models, some produce the daily and hourly predictions, and others optimize the fleet distribution to satisfy the forecast) read their configuration and pull data from Amazon S3 (previously produced by scheduled AWS Glue jobs), and finally store their results back into Amazon S3. Executions of these pipelines are tracked via MLFlow (on a dedicated Amazon Elastic Compute Cloud (Amazon EC2) server), and the final result indicating the required fleet operations is fed into a Kepler map, which is then consumed by the operators in the field.
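As a hedged illustration of what one of these ECS tasks might do (none of the bucket names, URIs, or metric values below are our real ones), the pattern looks roughly like this:

    import boto3
    import mlflow
    import pandas as pd

    s3 = boto3.client("s3")

    # Pull curated demand data previously produced by the scheduled AWS Glue jobs.
    s3.download_file("example-curated-bucket", "demand/latest.parquet", "/tmp/demand.parquet")
    demand = pd.read_parquet("/tmp/demand.parquet")

    mlflow.set_tracking_uri("http://mlflow.example.internal:5000")  # MLFlow server on EC2
    with mlflow.start_run(run_name="demand-forecast"):
        # ...fit the forecasting model on `demand` and compute a validation error...
        validation_error = 0.0  # placeholder; replace with the real metric
        mlflow.log_metric("validation_error", validation_error)
        # Predictions are written back to Amazon S3 for the fleet-optimization
        # task and the Kepler map consumed by field operators.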

Conclusion

We at Wind Mobility place data at the forefront of our operations. For that, we need our data infrastructure to be as flexible as the industry and the context we operate in, which is why we chose serverless. Over the course of a year, we have built a data lake, a data warehouse, a BI suite, and a variety of (production) data science applications. All of that with a very small team.

Also, within the last 12 months, we have scaled up several of our data pipelines by a factor of 10, without slowing our momentum or redesigning any part of our architecture. When we had to double our fleet in 1 week and increase the frequency at which we capture data from scooters by a factor of 10, our serverless data architecture scaled with no issues. This allowed us to focus on adding value by simplifying our operation, reacting to changes quickly, and delighting our users.

We have measured our success in multiple dimensions:

  • Speed – Serverless is faster to deploy and expand; we believe we have reduced our time to market for the entire infrastructure by a factor of 2
  • Visibility – We have 360 degree visibility of our operations worldwide, accessible by our city managers, finance team, and management board
  • Optimized fleet deployment – We know, at any minute of the day, the number of scooters that our customers need over the next few hours, which reduces unsatisfied demand by more than 50%

If you face a similar challenge, our advice is clear: go fully serverless and use the spectrum of solutions available from AWS.

Follow us and discover more about Wind Mobility on Facebook, Instagram and LinkedIn.

 


About the Author

Pablo Giner is Head of BI at Wind Mobility. Pablo’s background is in wheels (motorcycle racing > vehicle engineering > collision insurance > eScooter sharing…) and for the last few years he has specialized in forming and developing data teams. At Wind Mobility, he leads the data function (data engineering + analytics + data science), and the project he is most proud of is what they call smart fleet rebalancing, an AI-backed solution to reposition their fleet in real time. “In God we trust. All others must bring data.” – W. Edwards Deming


Process data with varying data ingestion frequencies using AWS Glue job bookmarks

Post Syndicated from Dipankar Kushari original https://aws.amazon.com/blogs/big-data/process-data-with-varying-data-ingestion-frequencies-using-aws-glue-job-bookmarks/

We often have data processing requirements in which we need to merge multiple datasets with varying data ingestion frequencies. Some of these datasets are ingested one time in full, received infrequently, and always used in their entirety, whereas other datasets are incremental, received at certain intervals, and joined with the full datasets to generate output. To address this requirement, this post demonstrates how to build an extract, transform, and load (ETL) pipeline using AWS Glue.

Using AWS Glue

AWS Glue provides a serverless environment to extract, transform, and load a large number of datasets from several sources for analytics purposes. It has a feature called job bookmarks to process incremental data when rerunning a job on a scheduled interval. A job bookmark is composed of the states for various job elements, such as sources, transformations, and targets. AWS Glue persists this state information from each job run, which prevents reprocessing of old data.

For this use case, we use an AWS Glue job with job bookmarks enabled to process files received at varying frequencies (a full dataset signifying files that are received one time, and incremental datasets signifying files that are received at certain regular intervals). These files are merged together. In addition to enabling job bookmarks, we also use an optional parameter transformation_ctx (transformation context) in an AWS Glue PySpark dynamic frame. This acts as a unique identifier for the ETL operator instance, identifying state information within a job bookmark for a given operator. AWS Glue uses transformation_ctx to index the key to the bookmark state.

You can capture and maintain state information for incremental datasets and avoid reprocessing by using transformation context. Transformation context is omitted for the full dataset file, which results in the job run state information not getting captured for the full dataset and allowing it to participate in the next processing event in its entirety. Even though the job bookmark flag is enabled at the AWS Glue job level, because transformation context is omitted for the full dataset, every time the job runs, the entire data from the full dataset is used as part of the job. In contrast, only the newly arrived datasets are processed for the incremental datasets.
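For reference, job bookmarks are turned on through the --job-bookmark-option special job parameter. The following is a hedged boto3 sketch of starting such a job; the job name is a placeholder:

    import boto3

    glue = boto3.client("glue")

    # Start the job with bookmarks enabled so processed incremental files are tracked.
    glue.start_job_run(
        JobName="example-bookmark-job",  # placeholder job name
        Arguments={"--job-bookmark-option": "job-bookmark-enable"},
    )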

Solution overview

To demonstrate the job bookmark utility of AWS Glue, we use TLC Trip Record Data datasets. We use the NYC yellow taxi trip data monthly files as the incremental dataset, and the NYC taxi zone lookup as the full dataset. The monthly yellow taxi trip data has a field named PULocationID (where a customer was picked up), which is joined with the LocationID field from the NYC taxi zone lookup file. The output dataset contains Borough, Zone, and service_zone from the NYC taxi zone lookup dataset and all the fields (except the PULocationID field) from the monthly NYC taxi trip data file.

The following diagram depicts a high-level architecture of the process.

Descriptions of Diagram

  • Two Amazon S3 raw bucket locations are used for storing the incoming CSV source data: the NYC taxi monthly files (incremental dataset) and the NYC taxi zone lookup file (full dataset).
  • A bookmark-enabled AWS Glue job joins the monthly trip data file with the taxi zone lookup file to generate output Parquet files, and creates the NYC taxi trip table in the AWS Glue Data Catalog and the Amazon Redshift database.
  • A curated S3 bucket is used to store the processed monthly NYC taxi Parquet files.

Creating the AWS CloudFormation stack

You use an AWS CloudFormation template to create the required resources in your preferred AWS account and Region.

Additionally, make sure you have an Amazon EC2 key pair created in the account and Region you’re launching the stack from.

To provide the stack parameters, complete the following steps:

  1. For Stack name, enter BigDataBlog-GlueBookmark-Stack.

  2. For RedshiftClusterIdentifier, enter bigdatablogrscluster.
  3. For NodeType, choose large.
  4. For NumberOfNodes, choose 2.
  5. For DatabaseName, enter bigdatablogdev.

  6. For MasterUserName, enter bigdatabloguser.
  7. For MasterUserPassword, enter a password for the master user account.
  8. For Maintenancewindow, enter sun:05:00-sun:05:30.
  9. For EC2InstanceType, choose micro.
  10. For SubscriptionEmail, enter your preferred email.
  11. For MyIPAddressCidr, enter your IP address.

You can find your IP Address by browsing https://www.whatismyip.com/ and looking up the value for My Public IPv4 is:. Add /32 at the end to make it CIDR-compatible and most restrictive.

  12. For DestinationPrefixListId, enter your prefix list ID.

To find your ID, set AWS credentials by entering aws configure in the command prompt. Run aws ec2 describe-prefix-lists to get the PrefixListId where PrefixListName is com.amazonaws.<<AWS region>>.s3 from the output.

  13. For NewS3BucketName, enter the name of your S3 bucket.

  14. For gluedatabase, enter bigdatabloggluedb.
  15. For EC2KeyName, enter the name of your key pair.

For instructions on creating a stack, see Creating a Stack on the AWS CloudFormation Console.

Make sure the stack is complete before moving to the next steps.

Creating the AWS Glue job

To create your AWS Glue job, complete the following steps:

  1. Download NYC yellow monthly trip data for October 2019 and November 2019 and save them under the s3://<<Your S3 Bucket>>/tripdata/ prefix.
  2. Download the NYC Taxi Zone lookup table and save it under the s3://<<Your S3 Bucket>>/tripdata-lookup/ prefix.
  3. Use the following PySpark script and change the pieces of the code enclosed inside <<…>>.

You can find the values for the following keys on the Outputs tab for the CloudFormation stack:

    • S3Bucket
    • Snstopic

You can find the values for the following keys on the Parameters tab for the CloudFormation stack:

    • EC2KeyName
    • MyIPAddressCidr
    • NewS3BucketName
    • SubscriptionEmail

  4. When the AWS Glue script is ready, upload it to the S3 bucket under the s3://<<Your S3 Bucket>>/glue-script/ prefix.

You refer to this when you create the AWS Glue job.

  5. On the AWS Glue console, under ETL, choose Jobs.
  6. Choose Create job.
  7. For Name, enter a name for the job. For more information about AWS Glue job naming, see Jobs.
  8. For IAM role, choose the role the CloudFormation template created. Use the value for the key Glueaccessrole from the stack outputs.
  9. For Type, choose Spark.
  10. For Glue version, choose Spark 2.4, Python 3 (Glue Version 1.0).
  11. For This job runs, choose An existing script that you provide.
  12. For S3 path where the script is stored, choose the script file that you saved earlier under the s3://<<Your S3 Bucket>>/glue-script/ prefix.
  13. In the Advanced properties section, for Job bookmark, choose Enable.
  14. For Catalog options, select Use Glue Data Catalog as the Hive metastore.
  15. For Connections, enter the value of the key GlueConnection from the stack outputs.
  16. Choose Save job and edit script.

Creating an Amazon Redshift database schema

Before you run the AWS Glue job, you need to connect to the Amazon Redshift cluster and create an Amazon Redshift database schema named Glue_bookmark_redshift_schema. To connect to the cluster, use one of the JDBC client-based SQL tools, such as SQL Workbench/J. For instructions, see How can I access a private Amazon Redshift cluster from my local machine?

To access the cluster, you use the Amazon Redshift master user bigdatabloguser (the value for MasterUserName on the Parameters tab of the CloudFormation stack) and the password you provided when creating the stack.

Running the AWS Glue job

The AWS Glue job takes only one argument: the name of the file being processed. Pass the file name, such as yellow_tripdata_2019-10.csv, while processing that file. This enables you to track the records that belong to a specific file so that it’s easier to evaluate the result of multiple job runs using different files.
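Inside the script, the argument can be read with getResolvedOptions. The following is a hedged sketch; the argument name file_name is illustrative and may differ from the name used in the actual script:

    import sys
    from awsglue.utils import getResolvedOptions

    # Read the single job argument carrying the name of the file to process.
    args = getResolvedOptions(sys.argv, ["JOB_NAME", "file_name"])
    file_to_process = args["file_name"]  # for example, yellow_tripdata_2019-10.csv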

When the Glue job run is successful, you can see the output Parquet files under the /tripdata-joined-output/ prefix inside the S3 bucket you created by running the CloudFormation template. You can also use Amazon Athena to query the data from the table created in the Data Catalog. For more information, see Running SQL Queries Using Amazon Athena.

Query the Amazon Redshift database table named redshift_bookmark_table and review the output.

Explaining the solution

A bookmark-enabled AWS Glue job (in PySpark) is created that reads the NYC yellow taxi monthly trip file, joins it with the NYC taxi zone lookup file, produces files in Parquet format, and saves them in an Amazon S3 location.

A Data Catalog table is created that refers to the Parquet files’ location in Amazon S3. The resulting dataset is also loaded into an Amazon Redshift table using the AWS Glue PySpark job.

The AWS Glue job bookmark transformation context is used when the AWS Glue dynamic frame is created by reading a monthly NYC taxi file, whereas the transformation context is omitted when reading and creating the dynamic frame for the taxi zone lookup file (because the entire file is required for processing each monthly trip file). This allows you to process each monthly trip file exactly one time and reuse the entire taxi zone lookup file as many times as required, because without a transformation context for the lookup file, no bookmark state is recorded for it.

When a new NYC trip data monthly file arrives and the AWS Glue job runs, it only processes the newly arrived monthly file and ignores any previously processed monthly files. Similarly, when the Data Catalog table data is copied into Amazon Redshift, it only copies the newly processed underlying Parquet files’ data and appends it to the Amazon Redshift table. At this time the transformation context is enabled to utilize the job bookmark, and the AWS Glue dynamic frame is created by reading the Data Catalog table.

The following PySpark code uses the transformation context to create an AWS Glue dynamic frame while reading the monthly incremental file:

taxidata = GlueContext.create_dynamic_frame_from_options(
    connection_type="s3",
    connection_options={"paths": [InputDir]},
    format="csv",
    format_options={"withHeader": True, "separator": ",", "quoteChar": '"', "escaper": '"'},
    transformation_ctx="taxidata")

However, the following code omits transformation context when creating the AWS Glue dynamic frame for the lookup file:

Lookupdata = GlueContext.create_dynamic_frame_from_options(
    connection_type="s3",
    connection_options={"paths": [InputLookupDir]},
    format="csv",
    format_options={"withHeader": True, "separator": ",", "quoteChar": '"', "escaper": '"'})

Additionally, the following code uses the transformation context while reading the Data Catalog table, which is loaded into an Amazon Redshift table:

datasource0 = GlueContext.create_dynamic_frame.from_catalog(
    database=Glue_catalog_database,
    table_name=Glue_table_name,
    transformation_ctx="datasource0")
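The load into Amazon Redshift isn’t reproduced in this post, but it can be expressed with a write similar to the following hedged sketch; the connection name and temporary directory are placeholders, and the schema and table names match those referenced earlier:

    # Append the newly processed data to the Amazon Redshift table through the
    # Glue connection created by the CloudFormation stack.
    GlueContext.write_dynamic_frame.from_jdbc_conf(
        frame=datasource0,
        catalog_connection="<<GlueConnection value from the stack outputs>>",
        connection_options={
            "dbtable": "Glue_bookmark_redshift_schema.redshift_bookmark_table",
            "database": "bigdatablogdev",
        },
        redshift_tmp_dir="s3://<<Your S3 Bucket>>/redshift-temp/",
        transformation_ctx="write_redshift",
    )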

You can see in the screenshot below that the 2019 October yellow taxi trip data file has arrived for processing (the incremental dataset).

To process each month’s data, you need the taxi zone lookup (full dataset).

The following screenshot shows the output of the AWS Glue job after processing the 2019 October trip data, saved in Parquet format.

The following two screenshots show the Amazon Redshift table: the first displays the record count for the October 2019 taxi data, and the second shows that only the October 2019 taxi data file has been processed so far.

The following screenshot shows that the November 2019 NYC taxi data file has arrived for processing.

The following screenshot shows the output of the AWS Glue job after processing the 2019 November trip data, saved in Parquet format. The job processed only the November data and did not reprocess the October data, because the job bookmark and transformation context were enabled.

The following screenshot shows that the Amazon Redshift table now has both October and November data and shows the total record count.

The following screenshot shows individual record count for each month.

Querying with Athena

You can also review the dataset in Athena, which uses the same Glue Data Catalog. The following screenshot of an Athena query shows the Data Catalog table has both October and November data, with the total record count.

The following screenshot of an Athena query shows the individual record count for each month.

The following screenshot shows the location information, including borough, zone, and service zone, which is available in the taxi zone lookup and is joined with the October taxi trip data.

The following screenshot shows the output for the same query on the November data.

Cleaning up

When you’re done using this solution, you should delete the CloudFormation stack to avoid incurring any further charges.

Conclusion

This post describes how you can merge datasets received in different frequencies as part of your ETL pipeline processing using AWS Glue job bookmarks. The use case demonstrated how to use job bookmarks and transformation context to build an ETL pipeline for processing several incremental datasets.

 


About the Authors

Dipankar is a Senior Data Architect with AWS Professional Services, helping customers build analytics platforms and solutions. He has a keen interest in distributed computing. Dipankar enjoys spending time playing chess and watching old Hollywood movies.


Ashok Padmanabhan is a big data consultant with AWS Professional Services, helping customers build big data and analytics platforms and solutions. When not building and designing data lakes, Ashok enjoys spending time at beaches near his home in Florida.

Moovit embraces data lake architecture by extending their Amazon Redshift cluster to analyze billions of data points every day

Post Syndicated from Yonatan Dolan original https://aws.amazon.com/blogs/big-data/moovit-embraces-data-lake-architecture-by-extending-their-amazon-redshift-cluster-to-analyze-billions-of-data-points-every-day/

Amazon Redshift is a fast, fully managed, cloud-native data warehouse that makes it simple and cost-effective to analyze all your data using standard SQL and your existing business intelligence tools.

Moovit is a leading Mobility as a Service (MaaS) solutions provider and maker of the top urban mobility app. Guiding over 800 million users in more than 3,200 cities across 103 countries to get around town effectively and conveniently, Moovit has experienced exponential growth of their service in the last few years. The company amasses up to 6 billion anonymous data points a day to add to the world’s largest repository of transit and urban mobility data, aided by Moovit’s network of more than 685,000 local editors that help map and maintain local transit information in cities that would otherwise be unserved.

Like Moovit, many companies today are using Amazon Redshift to analyze data and perform various transformations on the data. However, as data continues to grow and become even more important, companies are looking for more ways to extract valuable insights from it, such as big data analytics, numerous machine learning (ML) applications, and a range of tools to drive new use cases and business processes. Companies want all of their data to be accessible to all of their users, all the time, with fast answers. The best solution for all those requirements is to build a data lake, which is a centralized repository that allows you to store all your structured, semi-structured, and unstructured data at any scale.

With a data lake built on Amazon Simple Storage Service (Amazon S3), you can easily run big data analytics using services such as Amazon EMR and AWS Glue. You can also query structured data (such as CSV, Avro, and Parquet) and semi-structured data (such as JSON and XML) by using Amazon Athena and Amazon Redshift Spectrum. You can also use a data lake with ML services such as Amazon SageMaker to gain insights.

Moovit uses an Amazon Redshift cluster to allow different company teams to analyze vast amounts of data. They wanted a way to extend the collected data into the data lake and allow additional analytical teams to access more data to explore new ideas and business cases.

Additionally, Moovit was looking to manage their storage costs and evolve to a model that allowed cooler data to be maintained at the lowest cost in S3, and maintain the hottest data in Redshift for the most efficient query performance. The proposed solution implemented a hot/cold storage pattern using Amazon Redshift Spectrum and reduced the local disk utilization on the Amazon Redshift cluster to make sure costs are maintained. Moovit is currently evaluating the new RA3 node with managed storage as an additional level of flexibility that will allow them to easily scale the amount of hot/cold storage without limit.

In this post we demonstrate how Moovit, with the support of AWS, implemented a lake house architecture by employing the following best practices:

  • Unloading data into Amazon Simple Storage Service (Amazon S3)
  • Instituting a hot/cold pattern using Amazon Redshift Spectrum
  • Using AWS Glue to crawl and catalog the data
  • Querying data using Athena

Solution overview

The following diagram illustrates the solution architecture.

The solution includes the following steps:

  1. Unload data from Amazon Redshift to Amazon S3
  2. Create an AWS Glue Data Catalog using an AWS Glue crawler
  3. Query the data lake in Amazon Athena
  4. Query Amazon Redshift and the data lake with Amazon Redshift Spectrum

Prerequisites

To complete this walkthrough, you must have the following prerequisites:

  1. An AWS account.
  2. An Amazon Redshift cluster.
  3. The following AWS services and access: Amazon Redshift, Amazon S3, AWS Glue, and Athena.
  4. The appropriate AWS Identity and Access Management (IAM) permissions for Amazon Redshift Spectrum and AWS Glue to access Amazon S3 buckets. For more information, see IAM policies for Amazon Redshift Spectrum and Setting up IAM Permissions for AWS Glue.

Walkthrough

To demonstrate the process Moovit used during their data architecture, we use the industry-standard TPC-H dataset provided publicly by the TPC organization.

The Orders table has the following columns:

Column            Type
O_ORDERKEY        int4
O_CUSTKEY         int4
O_ORDERSTATUS     varchar
O_TOTALPRICE      numeric
O_ORDERDATE       date
O_ORDERPRIORITY   varchar
O_CLERK           varchar
O_SHIPPRIORITY    int4
O_COMMENT         varchar
SKIP              varchar

Unloading data from Amazon Redshift to Amazon S3

Amazon Redshift allows you to unload your data using a data lake export to an Apache Parquet file format. Parquet is an efficient open columnar storage format for analytics. Parquet format is up to twice as fast to unload and consumes up to six times less storage in Amazon S3, compared with text formats.

To unload cold or historical data from Amazon Redshift to Amazon S3, you need to run an UNLOAD statement similar to the following code (substitute your IAM role ARN):

UNLOAD ('select o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, skip
FROM tpc.orders
ORDER BY o_orderkey, o_orderdate') 
TO 's3://tpc-bucket/orders/' 
CREDENTIALS 'aws_iam_role=arn:aws:iam::<account_number>:role/<Role>'
FORMAT AS parquet allowoverwrite PARTITION BY (o_orderdate);

It is important to define a partition key or column that minimizes Amazon S3 scans as much as possible based on the query patterns intended. The query pattern is often by date ranges; for this use case, use the o_orderdate field as the partition key.

Another important recommendation when unloading is to have file sizes between 128 MB and 512 MB. By default, the UNLOAD command splits the results into one or more files per node slice (virtual worker in the Amazon Redshift cluster), which allows you to use the Amazon Redshift MPP architecture. However, this can potentially cause the files created by every slice to be small. In Moovit’s use case, the default UNLOAD using PARALLEL ON yielded dozens of small files (a few MBs each). For Moovit, PARALLEL OFF yielded the best results because it aggregates all the slices’ work on the leader node and writes the output as a single stream, controlling the file size with the MAXFILESIZE option.
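As a hedged illustration of this approach (cluster identifier, database, user, and the 256 MB target are placeholders within the 128–512 MB guidance above), the adjusted UNLOAD could be submitted through the Amazon Redshift Data API:

    import boto3

    redshift_data = boto3.client("redshift-data")

    # UNLOAD as a single stream (PARALLEL OFF) while capping file size with MAXFILESIZE.
    unload_sql = """
    UNLOAD ('SELECT * FROM tpc.orders ORDER BY o_orderkey, o_orderdate')
    TO 's3://tpc-bucket/orders/'
    IAM_ROLE 'arn:aws:iam::<account_number>:role/<Role>'
    FORMAT AS PARQUET
    ALLOWOVERWRITE
    PARALLEL OFF
    MAXFILESIZE 256 MB;
    """

    redshift_data.execute_statement(
        ClusterIdentifier="example-redshift-cluster",  # placeholder cluster
        Database="dev",
        DbUser="awsuser",
        Sql=unload_sql,
    )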

Another performance enhancement applied in this use case was the use of Parquet’s min and max statistics. Parquet files have min_value and max_value column statistics for each row group that allow Amazon Redshift Spectrum to prune (skip) row groups that are out of scope for a query (range-restricted scan). To use row group pruning, you should sort the data by frequently-used columns. Min/max pruning helps scan less data from Amazon S3, which results in improved performance and reduced cost.

After unloading the data to your data lake, you can view your Parquet file’s content in Amazon S3 (assuming it’s under 128 MB). From the Actions drop-down menu, choose Select from.

You’re now ready to populate your Data Catalog using an AWS Glue crawler.

Creating a Data Catalog with an AWS Glue crawler

To query your data lake using Athena, you must catalog the data. The Data Catalog is an index of the location, schema, and runtime metrics of the data.

An AWS Glue crawler accesses your data store, extracts metadata (such as field types), and creates a table schema in the Data Catalog. For instructions, see Working with Crawlers on the AWS Glue Console.
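If you prefer to create the crawler programmatically, a hedged boto3 sketch follows; the crawler name, IAM role, and S3 path are placeholders, and the target database matches the datalake database used for the external schema later in this post:

    import boto3

    glue = boto3.client("glue")

    # Create the catalog database for the data lake (skip if it already exists).
    glue.create_database(DatabaseInput={"Name": "datalake"})

    # Crawl the unloaded Parquet files and register the table in the Data Catalog.
    glue.create_crawler(
        Name="orders-datalake-crawler",
        Role="arn:aws:iam::<account_number>:role/ExampleGlueCrawlerRole",
        DatabaseName="datalake",
        Targets={"S3Targets": [{"Path": "s3://tpc-bucket/orders/"}]},
    )
    glue.start_crawler(Name="orders-datalake-crawler")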

Querying the data lake in Athena

After you create the crawler, you can view the schema and tables in AWS Glue and Athena, and can immediately start querying the data in Athena. The following screenshot shows the table in the Athena Query Editor.

Querying Amazon Redshift and the data lake using a unified view with Amazon Redshift Spectrum

Amazon Redshift Spectrum is a feature of Amazon Redshift that allows multiple Amazon Redshift clusters to query the same data in the lake. It enables the lake house architecture and allows data warehouse queries to reference data in the data lake as they would any other table. Amazon Redshift clusters transparently use the Amazon Redshift Spectrum feature when the SQL query references an external table stored in Amazon S3. Large queries can run in parallel by using Amazon Redshift Spectrum on external tables to scan, filter, aggregate, and return rows from Amazon S3 back to the Amazon Redshift cluster.

Following best practices, Moovit decided to persist all their data in their Amazon S3 data lake and only store hot data in Amazon Redshift. They could query both hot and cold datasets in a single query with Amazon Redshift Spectrum.

The first step is creating an external schema in Amazon Redshift that maps a database in the Data Catalog. See the following code:

CREATE EXTERNAL SCHEMA spectrum 
FROM data catalog 
DATABASE 'datalake' 
iam_role 'arn:aws:iam::<account_number>:role/mySpectrumRole'
CREATE EXTERNAL DATABASE IF NOT EXISTS;

After the crawler creates the external table, you can start querying in Amazon Redshift using the mapped schema that you created earlier. See the following code:

SELECT * FROM spectrum.orders;

Lastly, create a late binding view that unions the hot and cold data:

CREATE OR REPLACE VIEW lake_house_joint_view AS (SELECT * FROM public.orders WHERE o_orderdate >= dateadd('day',-90,date_trunc('day',getdate()))
UNION ALL SELECT * FROM spectrum.orders WHERE o_orderdate < dateadd('day',-90,date_trunc('day',getdate()))) WITH NO SCHEMA BINDING;

Summary

In this post, we showed how Moovit unloaded data from Amazon Redshift to a data lake. By doing that, they exposed the data to many additional groups within the organization and democratized the data. These benefits of data democratization are substantial because various teams within Moovit can access the data, analyze it with various tools, and come up with new insights.

As an additional benefit, Moovit reduced their Amazon Redshift utilized storage, which allowed them to maintain cluster size and avoid additional spending by keeping all historical data within the data lake and only hot data in the Amazon Redshift cluster. Keeping only hot data on the Amazon Redshift cluster prevents Moovit from deleting data frequently, which saves IT resources, time, and effort.

If you are looking to extend your data warehouse to a data lake and leverage various tools for big data analytics and machine learning (ML) applications, we invite you to try out this walkthrough.

 


About the Authors

Yonatan Dolan is a Business Development Manager at Amazon Web Services. He is located in Israel and helps customers harness AWS analytical services to leverage data, gain insights, and derive value.


Alon Gendler is a Startup Solutions Architect at Amazon Web Services. He works with AWS customers to help them architect secure, resilient, scalable and high performance applications in the cloud.


Vincent Gromakowski is a Specialist Solutions Architect for Amazon Web Services.


Build an end to end, automated inventory forecasting capability with AWS Lake Formation and Amazon Forecast

Post Syndicated from Syed Jaffry original https://aws.amazon.com/blogs/big-data/build-an-end-to-end-automated-inventory-forecasting-capability-with-aws-lake-formation-and-amazon-forecast/

Amazon Forecast is a fully managed service that uses machine learning (ML) to generate highly accurate forecasts without requiring any prior ML experience. Forecast is applicable in a wide variety of use cases, including estimating product demand, inventory planning, and workforce planning.

With Forecast, there are no servers to provision or ML models to build manually. Additionally, you only pay for what you use, and there is no minimum fee or upfront commitment. To use Forecast, you only need to provide historical data for what you want to forecast, and any additional related data that may influence your forecasts. The latter may include both time-varying data, such as price, events, and weather, and categorical data, such as color, genre, or region. The service automatically trains and deploys ML models based on your data and provides you with a custom API to retrieve forecasts.

This post demonstrates how you can automate the data extraction, transformation, and use of Forecast for the use case of a retailer that requires recurring replenishment of inventory. You achieve this by using AWS Lake Formation to build a secure data lake and ingest data into it, orchestrate the data transformation using an AWS Glue workflow, and visualize the forecast results in Amazon QuickSight.

Use case background

Retailers have a recurring need to replenish inventory. For example, consider a clothing retailer that typically sells through its e-commerce and physical store channels. You need to maintain optimum levels of inventory on hand to meet demand while minimizing warehouse costs. Some of the common questions you need to answer for effective inventory management are:

  • What is the optimal quantity of inventory to reorder from my supplier for my next sales cycle?
  • What should the composition of product SKUs be in the purchase order to the supplier?
  • How do I most effectively determine the right mix and quantity of products to stock at individual retail store locations?

You can use Forecast to answer these questions. You extract data from the source systems, apply transformations to make the data ready for use in Forecast, and use Forecast to load, train, and forecast.

The following diagram shows the end-to-end system architecture of the proposed solution using Lake Formation, AWS Glue, and Amazon QuickSight.

You use Lake Formation to manage governance and access control on the data lake. Additionally, you use the following resources:

  1. Lake Formation blueprint to ingest sales data into a data lake
  2. AWS Lambda and Amazon S3 event notification to trigger an AWS Glue workflow
  3. AWS Glue workflow to trigger the execution of the data transform AWS Glue job
  4. AWS Glue workflow to orchestrate the three steps within Forecast (load, train, forecast)
  5. Forecast to export the forecast results into the data lake
  6. AWS Glue to trigger a crawler on the exported forecast results
  7. Amazon Athena and Amazon QuickSight to visualize the exported forecast results

Setting up the required IAM policies

Before you get started, you need to set up the required IAM policies. Complete the following steps:

  1. Sign in to the IAM console as a user with the AdministratorAccess AWS managed policy.
  2. Create an IAM user named report_builder to use when building your Amazon QuickSight analysis report and dashboard for visualization.
  3. Grant the AmazonAthenaFullAccess policy to the user. In the following steps, you create an IAM role for the AWS Glue jobs, crawler, and workflow to assume during their execution.
  4. On the IAM console, choose Roles.
  5. Choose Create role.
  6. On the Create role page, choose AWS service, and then choose Glue.
  7. Choose Next: Permissions.
  8. From the list of available policies, search for the AWSGlueServiceRole policy and select it.
  9. Name the role GLUE_WORKFLOW_ROLE.
  10. Choose Create role.
  11. On the Roles page, search for and choose GLUE_WORKFLOW_ROLE.
  12. On the Trust relationships tab, choose Edit trust relationship.
  13. Add the following statement to the trust policy to allow Forecast to assume the role:
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "forecast.amazonaws.com"
          },
          "Action": "sts:AssumeRole"
        }

  14. Choose Update Trust Policy.
  15. On the role Summary page, on the Permissions tab, choose Add inline policy.
  16. Add the following inline policy:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "lakeformation:GetDataAccess",
                    "lakeformation:GrantPermissions"
                ],
                "Resource": "*"
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:DeleteObject",
                    "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::forecast-blog-processed/*",
                    "arn:aws:s3:::forecast-blog-published/*"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
      "forecast:CreateDataset",
      "forecast:CreateDatasetGroup",
      "forecast:CreateDatasetImportJob",
      "forecast:CreateForecast",
      "forecast:CreateForecastExportJob",
      "forecast:CreatePredictor",
      "forecast:DescribeDataset",
      "forecast:DescribeDatasetGroup",
      "forecast:DescribeDatasetImportJob",
      "forecast:DescribeForecast",
      "forecast:DescribeForecastExportJob",
      "forecast:DescribePredictor",
      "forecast:ListDatasetGroups",
      "forecast:ListDatasetImportJobs",
      "forecast:ListDatasets",
      "forecast:ListForecastExportJobs",
      "forecast:ListForecasts",
      "forecast:ListPredictors",
      "forecast:UpdateDatasetGroup"
                ],
                "Resource": "*"
            },
            {
                "Effect": "Allow",
                "Action": [
                    "iam:PassRole",
                    "iam:GetRole"
                ],
                "Resource": [
                    "arn:aws:iam::*:role/AWSGlueServiceRole*",
                    "arn:aws:iam::[your aws account id]:role/GLUE_WORKFLOW_ROLE"
                ],
                "Condition": {
                    "StringLike": {
                        "iam:PassedToService": [
                            "glue.amazonaws.com"
                        ]
                    }
                }
            },
            {
                "Effect": "Allow",
                "Action": [
                    "iam:PassRole",
                    "iam:GetRole"
                ],
                "Resource": "arn:aws:iam::[your aws account id]:role/GLUE_WORKFLOW_ROLE",
                "Condition": {
                    "StringEquals": {
                        "iam:PassedToService": "forecast.amazonaws.com"
                    }
                }
            }
        ]
    }

Setting up your data lake storage

Next, you need a data lake to use in this automation. You create S3 buckets for data storage and apply appropriate security and governance. If you already have a data lake on Amazon S3, you can continue to use those S3 buckets with Lake Formation.

On the Amazon S3 console, create the following buckets:

  • forecast-blog-landing (for raw data ingest)
  • forecast-blog-processed (for the transformed data)
  • forecast-blog-published (for end consumers to access results)

This post includes walkthrough instructions for the landing bucket, which you can repeat for the other two buckets.

Enabling centralized access control on your data lake

You use Lake Formation’s centralized access control to enable access to the underlying S3 buckets for users and roles. With this model, you don’t need to create any additional IAM access policies or S3 bucket policies for your users and roles.

  1. Sign in to the AWS Lake Formation console as a data lake administrator IAM user. For instructions on setting up a data lake administrator user, see Create a Data Lake Administrator. To manage access control from Lake Formation, you register the three S3 buckets you created earlier as data lake locations with Lake Formation.
  2. On the Lake Formation dashboard, choose Register Location.
  3. For Amazon S3 path, enter your S3 bucket location (s3://forecast-blog-landing).
  4. Choose Register location.
  5. Repeat these steps for the processed and published buckets.

Setting up the Lake Formation data catalog for your data lake

You create three databases in the Lake Formation data catalog, one for each S3 bucket you created earlier. All transformations done via AWS Glue operate on the databases in this catalog.

  1. On the Lake Formation console, under Data catalog, choose Databases.
  2. Choose Create database.
  3. In the Database details section, for Name, enter the name for the database (forecast-blog-landing-db).
  4. For Location, enter the location to the corresponding S3 bucket.
  5. For Description, add an optional description.
  6. Deselect Use only IAM access control for new tables in this database.
  7. Choose Create database.
  8. Repeat the above steps for the processed and published buckets.
  9. On the Databases page, select the database you just created.
  10. From the Actions drop-down menu, choose Grant.
  11. For IAM users and roles, choose the GLUE_WORKFLOW_ROLE you created earlier.
  12. For Database permissions, select Create table.
  13. Choose Grant. (If you prefer to script this grant, see the boto3 sketch after this list.)

At this stage, you have set up your data lake with Lake Formation. The following diagram illustrates your resources so far.
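The following is a hedged boto3 sketch of the grant from the last step; the account ID is a placeholder, and the database and role names match the ones created earlier in this walkthrough:

    import boto3

    lakeformation = boto3.client("lakeformation")

    # Allow the Glue workflow role to create tables in the landing database.
    lakeformation.grant_permissions(
        Principal={"DataLakePrincipalIdentifier": "arn:aws:iam::<account_id>:role/GLUE_WORKFLOW_ROLE"},
        Resource={"Database": {"Name": "forecast-blog-landing-db"}},
        Permissions=["CREATE_TABLE"],
    )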

You’re now ready to ingest sales data into your data lake.

Ingesting data

This post uses an example MySQL table called sales, which contains 2 years of sales history of a single product. The schema of the table is as follows:

Column Name      Column Type
InvoiceNo        int
StockCode        int
Description      varchar(200)
Quantity         int
InvoiceDate      varchar(50)
UnitPrice        float
CustomerID       int
StoreLocation    varchar(100)
CustomerName     varchar(200)
  1. To begin the ingestion, create the source database. The following CloudFormation template creates a free-tier RDS MySQL instance with a database named sales_schema within a new VPC with the required sample data for this post. The template deploys in the us-west-2 Region.

  2. Create a new AWS Glue connection for the source database.
  3. On the Lake Formation console, choose Blueprints.
  4. Choose Use a blueprint.
  5. Select Incremental database as the blueprint type for a regular ingest of sales data from the source relational database.
  6. Follow the prompts to complete the incremental blueprint setup.
  7. For Database connection, choose forecast-blog-db.
  8. For Source data path, enter sales_schema/sales.
  9. For Target database, choose forecast-blog-landing-db.
  10. For Target storage location, enter s3://forecast-blog-landing.
  11. For Data format, choose Parquet.
  12. For Workflow name, enter forecast-blog-wf.
  13. For IAM role, choose GLUE_WORKFLOW_ROLE.
  14. For Table prefix, enter blog.

The target location must be the landing S3 bucket that you created earlier, and table prefix must be set to blog. This is the raw data that subsequent AWS Glue jobs transform and process. For information about using an incremental database blueprint, see Importing Data Using Workflows.

Start the blueprint after you create it.

Orchestrating data transformation and forecast generation

When you have the raw sales data ingested into the data lake landing bucket, you execute a custom AWS Glue workflow to orchestrate the automation of the data transformation, the Forecast load/train/forecast execution, and the publication of the forecasts for business dashboards. Complete the following steps:

  1. Create an AWS Glue crawler to crawl the published S3 bucket that you created earlier.
    1. Use the GLUE_WORKFLOW_ROLE that you created earlier.
  2. Create the following AWS Glue jobs to use in the forecasting automation.
    1. Create the data transformation job as a Spark job, and create the remaining jobs as Python shell (Python 3) jobs.
    2. For IAM role, use GLUE_WORKFLOW_ROLE.
  3. On the Connections page, choose Save job and edit script without selecting any connections. The following jobs are available on GitHub:

Next, you create a new AWS Glue workflow to orchestrate the entire automation. The workflow lets you build and orchestrate a sequence of AWS Glue jobs and crawlers via triggers to complete a complex process.

  1. On the AWS Glue console, choose Workflows.
  2. Choose Add workflow.
  3. For Workflow name, enter AmazonForecastWorkflow.
  4. For Description, add an optional description.
  5. For Default run properties, enter the keys and values in the following table.
Key                Value
landingDB          forecast-blog-landing-db
landingDBTable     blog_sales_schema_sales
processedBucket    forecast-blog-processed
publishedBucket    forecast-blog-published
  6. Choose Add workflow.

After you create the workflow, you add triggers, jobs, and a crawler to your workflow.

  7. Choose the workflow you created earlier.
  8. Choose Add trigger.
  9. For Name, enter StartWorkflow.
  10. For Trigger type, select On demand.
  11. Choose Add.
  12. On the Jobs tab, choose the job you created earlier.
  13. Choose Add.
  14. Choose Add node.
  15. Create a new trigger to watch the end of the transform job and start the data import job.
  16. For Name, enter StartDataImport.
  17. For Trigger type, select Event.
  18. For Trigger logic, leave as Start after ANY watched event.
  19. Choose Add.
  20. Choose Add node to the left of the trigger and choose the data transformation job you created earlier.
  21. Choose Add node to the right of the trigger and choose the import data job you created earlier.
  22. Repeat these steps to create the following triggers:
Trigger                    Glue job to complete                                Glue job to start
CheckImportTrigger         Importing data into Forecast                        Checking the load data job status
StartPredictorTrigger      Checking the load data job status                   Training the Forecast predictor
CheckPredictorTrigger      Training the Forecast predictor                     Checking the train predictor job status
GenerateForecastTrigger    Checking the train predictor job status             Generating forecast
CheckForecastTrigger       Generating forecast                                 Checking the forecast job status
ExportForecastTrigger      Checking the forecast job status                    Exporting forecast to data lake published bucket
CheckExportTrigger         Exporting forecast to data lake published bucket    Checking the export forecast job status
StartCrawlerTrigger        Checking the export forecast job status             Crawler you created to crawl the published S3 bucket
  23. From the Actions drop-down menu, choose Run.

This starts the end-to-end forecasting process.
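For context, the Python shell jobs in this workflow call the Forecast APIs. The following is a hedged sketch of the data import step rather than the actual code from the GitHub jobs; the dataset ARN, job name, and timestamp format are placeholders, while the bucket and role names match those created earlier:

    import boto3

    forecast = boto3.client("forecast")

    # Import the transformed sales data from the processed bucket into a
    # Forecast dataset; a later job polls DescribeDatasetImportJob for status.
    forecast.create_dataset_import_job(
        DatasetImportJobName="sales-import-example",
        DatasetArn="arn:aws:forecast:us-west-2:<account_id>:dataset/sales_dataset",
        DataSource={
            "S3Config": {
                "Path": "s3://forecast-blog-processed/sales/",
                "RoleArn": "arn:aws:iam::<account_id>:role/GLUE_WORKFLOW_ROLE",
            }
        },
        TimestampFormat="yyyy-MM-dd",
    )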

Visualizing your forecasts

To provide your users with a dashboard that refreshes regularly with new forecasts, set up an Amazon QuickSight report and a dashboard and connection to the data lake via Athena. You can use Amazon QuickSight and the Athena data source to access the forecast data and make visualizations.

First, you need to grant access to the forecast data to QuickSight. Identify the Amazon QuickSight service role in your account. Amazon QuickSight assumes the service role (aws-quicksight-service-role-v0) to interact with other AWS services. The service role is automatically created when you start using Amazon QuickSight.

  1. As the data lake administrator user, on the Lake Formation console, locate the table created by the AWS Glue workflow under your published database (forecast-blog-published-db). This is the table that has the exported forecast data to visualize.
  2. Grant Select access on this table to the Amazon QuickSight service role using the Grant action within Lake Formation. For more information, see Granting Data Catalog Permissions. You now create a dashboard to visualize the forecast data in Amazon QuickSight.
  3. On the Amazon QuickSight console, choose Manage data.
  4. Create a data source for Athena.
  5. For Data source name, enter forecast-blog-published-db.
  6. Choose Create data source.
  7. For Database, choose forecast-blog-published-db.
  8. Select the table that your crawler created for the exported forecast (forecast_blog_published).
  9. Choose Select.
  10. Choose Visualize.
  11. Create a new analysis on the dataset.
  12. Publish a visualization dashboard. The following screenshot is a QuickSight dashboard displaying your exported forecast. The dashboard was created from a line chart analysis with the p10, p50, and p90 columns selected for the y-axis and date selected for the x-axis.

Forecast generates probabilistic forecasts so you can generate forecasts at different percentiles depending on your specific use case (for example, if under-stocking or over-stocking is critical to the business). The preceding graph represents the upper and lower bands of forecasted inventory values for the product in your sample data and a selected location (applied as a filter) for a 10-day period. Using this, you can decide on the optimum levels of stock to hold or order for that week.

The p10 is the lower boundary, meaning that there’s only a 10% chance that the actual value is below this line. The p90, in contrast, is the upper bound, meaning that there’s a 90% chance that the actual value is below this line. As your training data becomes more comprehensive, the p10 and p90 start to converge. You can also generate forecasts on custom quantiles of your choosing.
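As a hedged example (the predictor ARN and forecast name are placeholders), custom quantiles are requested through the ForecastTypes parameter when creating a forecast with boto3:

    import boto3

    forecast = boto3.client("forecast")

    # Request the p10, p50, and p90 quantiles (or any custom quantiles you need).
    forecast.create_forecast(
        ForecastName="inventory-forecast-example",
        PredictorArn="arn:aws:forecast:us-west-2:<account_id>:predictor/sales_predictor",
        ForecastTypes=["0.10", "0.50", "0.90"],
    )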

For conservative planning, choose a value closer to the p90, which means you’re willing to purchase more inventory than what you actually sell. For aggressive planning, choose a value closer to the p10, which means that you’re willing to accept the risk of running out of inventory.

Conclusion

In this post, you learned how to build an automated inventory forecasting capability for your business on AWS using AI through Forecast and Lake Formation. You learned how to set up a data lake on AWS with the required security governance using Lake Formation. You also learned how to automate the end-to-end process of ingesting sales data into your data lake and automating the data transformation; loading, training, and generating forecasts with Forecast; and making the forecasts accessible to your end-users via Amazon QuickSight visualizations.

 


About the Author

Syed Jaffry is a solutions architect with Amazon Web Services. He works with Financial Services customers to help them deploy secure, resilient, scalable and high performance applications in the cloud.


Build an AWS Well-Architected environment with the Analytics Lens

Post Syndicated from Nikki Rouda original https://aws.amazon.com/blogs/big-data/build-an-aws-well-architected-environment-with-the-analytics-lens/

Building a modern data platform on AWS enables you to collect data of all types, store it in a central, secure repository, and analyze it with purpose-built tools. Yet you may be unsure of how to get started and of the impact of certain design decisions. To address the need for advice tailored to specific technology and application domains, AWS added the concept of well-architected lenses in 2017. AWS is now happy to announce the Analytics Lens for the AWS Well-Architected Framework. This post provides an introduction to its purpose, topics covered, common scenarios, and services included.

The new Analytics Lens offers comprehensive guidance to make sure that your analytics applications are designed in accordance with AWS best practices. The goal is to give you a consistent way to design and evaluate cloud architectures, based on the following five pillars:

  • Operational excellence
  • Security
  • Reliability
  • Performance efficiency
  • Cost optimization

The tool can help you assess the analytics workloads you have deployed in AWS by identifying potential risks and offering suggestions for improvements.

Using the Analytics Lens to address common requirements

The Analytics Lens models both the data architecture at the core of the analytics applications and the application behavior itself. These models are organized into the following six areas, which encompass the vast majority of analytics workloads deployed on AWS:

  1. Data ingestion
  2. Security and governance
  3. Catalog and search
  4. Central storage
  5. Processing and analytics
  6. User access

The following diagram illustrates these areas and their related AWS services.

There are a number of common scenarios where the Analytics Lens applies, such as the following:

  • Building a data lake as the foundation for your data and analytics initiatives
  • Efficient batch data processing at scale
  • Building a platform for streaming ingest and real-time event processing
  • Handling big data processing and streaming
  • Data-preparation operations

Whichever of these scenarios fits your needs, building to the principles of the Analytics Lens in the AWS Well-Architected Framework can help you implement best practices for success.

The Analytics Lens explains when and how to use the core services in the AWS analytics portfolio. These include Amazon Kinesis, Amazon Redshift, Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation. It also explains how Amazon Simple Storage Service (Amazon S3) can serve as the storage for your data lake and how to integrate with relevant AWS security services. With reference architectures, best practices advice, and answers to common questions, the Analytics Lens can help you make the right design decisions.

Conclusion

Applying the lens to your existing architectures can validate the stability and efficiency of your design (or provide recommendations to address the gaps that are identified). AWS is committed to the Analytics Lens as a living tool; as the analytics landscape evolves and new AWS services come online, we’ll update the Analytics Lens appropriately. Our mission will always be to help you design and deploy well-architected applications.

For more information about building your own Well-Architected environment using the Analytics Lens, see the Analytics Lens whitepaper.

Special thanks to the following individuals who contributed to building this resource, among many others who helped with review and implementation: Radhika Ravirala, Laith Al-Saadoon, Wallace Printz, Ujjwal Ratan, and Neil Mukerje.

Are there questions you’d like to see answered in the tool? Share your thoughts and questions in the comments.

 


About the Authors

Nikki Rouda is the principal product marketing manager for data lakes and big data at Amazon Web Services. Nikki has spent 20+ years helping enterprises in 40+ countries develop and implement solutions to their analytics and IT infrastructure challenges. Nikki holds an MBA from the University of Cambridge and an ScB in geophysics and math from Brown University.

 

 


Radhika Ravirala is a specialist solutions architect at Amazon Web Services, where she helps customers craft distributed analytics applications on the AWS platform. Prior to her cloud journey, she worked as a software engineer and designer for technology companies in Silicon Valley.

Optimize memory management in AWS Glue

Post Syndicated from Mohit Saxena original https://aws.amazon.com/blogs/big-data/optimize-memory-management-in-aws-glue/

AWS Glue provides a serverless environment to prepare and process datasets for analytics using the power of Apache Spark. In the third post of the series, we discussed how AWS Glue can automatically generate code to perform common data transformations. We also looked at how you can use AWS Glue Workflows to build data pipelines that enable you to easily ingest, transform, and load data for analytics.

Apache Spark provides several knobs to control how memory is managed for different workloads. However, this is not an exact science, and applications may still run into a variety of out of memory (OOM) exceptions because of inefficient transformation logic, unoptimized data partitioning, or other quirks in the underlying Spark engine. In this post of the series, we go deeper into the inner workings of a Glue Spark ETL job and discuss how we can combine AWS Glue capabilities with Spark best practices to scale jobs so that they efficiently handle the variety and volume of our data.

Scaling the Apache Spark driver

The Apache Spark driver is responsible for analyzing the job, coordinating, and distributing work to tasks to complete the job in the most efficient way possible. In the majority of ETL jobs, the driver is typically involved in listing table partitions and the data files in Amazon S3 before it computes file splits and work for individual tasks. The driver then coordinates the tasks that run the transformations to process each file split. In addition, the driver needs to track the progress each task is making and collect the results at the end. The Spark driver may become a bottleneck when a job needs to process a large number of files and partitions. AWS Glue offers five different mechanisms to efficiently manage memory on the Spark driver when dealing with a large number of files.

  1. Push down predicates: Glue jobs allow the use of push down predicates to prune the unnecessary partitions from the table before the underlying data is read. This is useful when you have a large number of partitions in a table and you only want to process a subset of them in your Glue ETL job. Pruning catalog partitions reduces both the memory footprint of the driver and the time required to list the files in the pruned partitions. Push down predicates are applied first to ignore unnecessary partitions before the job bookmark and other exclusions further filter the list of files to be read from each partition. The following example shows how to use push down predicates to process only data for events logged on weekends.
    partitionPredicate ="date_format(to_date(concat(year, '-', month, '-', day)), 'E') in ('Sat', 'Sun')"
    
    datasource = glue_context.create_dynamic_frame.from_catalog(
        database = "githubarchive_month", 
        table_name = "data", 
        push_down_predicate = partitionPredicate)

  2. Glue S3 Lister: AWS Glue provides an optimized mechanism to list files on S3 while reading data into a DynamicFrame. The Glue S3 Lister can be enabled by setting the DynamicFrame's additional_options parameter useS3ListImplementation to True. The Glue S3 Lister offers an advantage over the default S3 list implementation by strictly iterating over the final list of filtered files to be read.
    datasource = glue_context.create_dynamic_frame.from_catalog(
        database = "githubarchive_month", 
        table_name = "data", 
        push_down_predicate = partitionPredicate,
        additional_options = {"useS3ListImplementation":True}
    )  

  3. Grouping: AWS Glue allows you to consolidate multiple files per Spark task using the file grouping feature. Grouping files together reduces the memory footprint on the Spark driver as well as simplifying file split orchestration. Without grouping, a Spark application must process each file using a different Spark task. Each task must then send a mapStatus object containing its location information to the Spark driver. In our testing with the AWS Glue standard worker type, we found that Spark applications processing more than roughly 650,000 files often cause the Spark driver to crash with an out of memory exception, as shown by the following error message:
    # java.lang.OutOfMemoryError: Java heap space
    # -XX:OnOutOfMemoryError="kill -9 %p"
    # Executing /bin/sh -c "kill -9 12039"...

    • groupFiles allows you to group files within a Hive-style S3 partition (inPartition) and across S3 partitions (acrossPartition). groupSize is an optional field that allows you to configure the amount of data to be read from each file and processed by individual Spark tasks.
      dyf = glueContext.create_dynamic_frame_from_options("s3",
          {'paths': ["s3://input-s3-path/"],
          'recurse':True,
          'groupFiles': 'inPartition',
          'groupSize': '1048576'}, 
          format="json")

  4. Exclusions for S3 Paths: To further aid in filtering out files that are not required by the job, AWS Glue introduced a mechanism for users to provide a glob expression for S3 paths to be excluded. This speeds job processing while reducing the memory footprint on the Spark driver. The following code snippet shows how to exclude all objects ending with _metadata in the selected S3 path.
    dyf = glueContext.create_dynamic_frame_from_options("s3",
        {'paths': ["s3://input-s3-path/"],
        'exclusions': "[\"input-s3-path/**_metadata\"]"}, 
        format="json")

    Exclusions for S3 Storage Classes: AWS Glue offers the ability to exclude objects based on their underlying S3 storage class. As the lifecycle of data evolves, hot data becomes cold and automatically moves to lower cost storage based on the configured S3 lifecycle policy, so it's important to make sure ETL jobs process the correct data. This is particularly useful when working with large datasets that span multiple S3 storage classes and use the Apache Parquet file format, where Spark tries to read the schema from the file footers in these storage classes. Amazon S3 offers several storage classes: STANDARD, INTELLIGENT_TIERING, STANDARD_IA, ONEZONE_IA, GLACIER, DEEP_ARCHIVE, and REDUCED_REDUNDANCY. When reading data using DynamicFrames, you can specify a list of S3 storage classes you want to exclude. This feature leverages the optimized AWS Glue S3 Lister. The following example shows how to exclude files stored in the GLACIER and DEEP_ARCHIVE storage classes.

    glueContext.create_dynamic_frame.from_catalog(
        database = "my_database",
        table_name = "my_table_name",
        redshift_tmp_dir = "",
        transformation_ctx = "my_transformation_context",
        additional_options = {
            "excludeStorageClasses" : ["GLACIER", "DEEP_ARCHIVE"]
        }
    )

    GLACIER and DEEP_ARCHIVE storage classes only allow listing files and require an asynchronous S3 restore process to read the actual data. The following is the exception you will see when trying to access Glacier and Deep Archive storage classes from your Glue ETL job:

    java.io.IOException: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
    The operation is not valid for the object's storage class (Service: Amazon S3; Status Code: 403; 
    Error Code: InvalidObjectState; Request ID: ), S3 Extended Request ID: (1)

  5. Optimize Spark queries: Inefficient queries or transformations can have a significant impact on Apache Spark driver memory utilization. Common examples include:
    • collect is a Spark action that collects the results from workers and returns them to the driver. In some cases, the results may be very large and overwhelm the driver. It is recommended to be careful when using collect, as it can frequently cause Spark driver OOM exceptions, as shown below:
      An error occurred while calling 
      z:org.apache.spark.api.python.PythonRDD.collectAndServe.
      Job aborted due to stage failure:
      Total size of serialized results of tasks is bigger than spark.driver.maxResultSize

    • Shared Variables: Apache Spark offers two different ways to share variables between the Spark driver and executors: broadcast variables and accumulators. Broadcast variables are useful to provide a read-only copy of data or fact tables shared across Spark workers to improve map-side joins. Accumulators are useful to provide a writeable copy to implement distributed counters across Spark executors. Both should be used carefully and destroyed when no longer needed, as they can frequently result in Spark driver OOM exceptions.
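
      The following is a minimal PySpark sketch of both mechanisms; the lookup dictionary, column names, and enrich function are hypothetical and only illustrate the lifecycle of a broadcast variable and an accumulator:

      from pyspark.context import SparkContext

      sc = SparkContext.getOrCreate()

      # Small, read-only lookup table shipped once to every executor.
      country_lookup = sc.broadcast({"US": "United States", "DE": "Germany"})

      # Distributed counter incremented on executors and read on the driver.
      bad_records = sc.accumulator(0)

      def enrich(record):
          code = record.get("country_code")
          if code not in country_lookup.value:
              bad_records.add(1)
          record["country_name"] = country_lookup.value.get(code, "Unknown")
          return record

      # ... apply enrich with a map over the job's RDD or DynamicFrame ...

      # Release the driver and executor memory held by the broadcast when done.
      country_lookup.unpersist()
      country_lookup.destroy()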

Scaling Apache Spark executors

Apache Spark executors process data in parallel. However, unoptimized reads from JDBC sources, unbalanced shuffles, buffering of rows with PySpark UDFs, exceeding off-heap memory on each Spark worker, and skew in the size of partitions can all result in Spark executor OOM exceptions. The following are some best practices with AWS Glue and Apache Spark for avoiding the conditions that result in OOM exceptions.

  1. JDBC Optimizations: Apache Spark uses JDBC drivers to fetch data from JDBC sources such as MySQL, PostgreSQL, and Oracle.
    • Fetchsize: By default, the Spark JDBC drivers configure the fetch size to zero. This means that the JDBC driver on the Spark executor tries to fetch all the rows from the database in one network round trip and cache them in memory, even though Spark transformations only stream through the rows one at a time. This may result in the Spark executor running out of memory with the following exception:
      WARN YarnAllocator: Container killed by YARN for exceeding memory limits. 5.5 GB of 5.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
      WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Container killed by YARN for exceeding memory limits. 5.5 GB of 5.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
      ERROR YarnClusterScheduler: Lost executor 4 on ip-10-1-2-96.ec2.internal: Container killed by YARN for exceeding memory limits. 5.5 GB of 5.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
      WARN TaskSetManager: Lost task 0.3 in stage 0.0 (TID 3, ip-10-1-2-96.ec2.internal, executor 4): ExecutorLostFailure (executor 4 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 5.5 GB of 5.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

      In Spark, you can avoid this scenario by explicitly setting the fetch size parameter to a non-zero default value. With AWS Glue, dynamic frames automatically use a fetch size of 1,000 rows, which bounds the size of cached rows in the JDBC driver and also amortizes the overhead of network round-trip latencies between the Spark executor and the database instance. The following example shows how to read from a JDBC source using Glue dynamic frames.

      // Placeholders for the JDBC connection; `format` is the connection type (for example, "mysql")
      // and `sourceJson` holds the JDBC connection options (url, dbtable, user, password) as JsonOptions.
      val (url, database, tableName) = ("jdbc_url", "db_name", "table_name")
      val source = glueContext.getSource(format, sourceJson)
      val dyf = source.getDynamicFrame

  • Spark’s Read Partitioning: By default, Apache Spark uses only one executor to open a JDBC connection with the database and reads the entire table into a Spark DataFrame. This can result in an unbalanced distribution of data processed across executors. It is therefore usually recommended to set partitionColumn, lowerBound, upperBound, and numPartitions to enable reading in parallel from different executors. This allows for more balanced partitioning if there is a column with a uniform value distribution. However, Apache Spark restricts the partitionColumn to numeric, date, or timestamp data types. For example:
    val df = spark.read.format("jdbc")
        .option("url", jdbcUrl)
        .option("dbtable", "employees")
        .option("partitionColumn", "emp_no")
        .option("lowerBound", "1")
        .option("upperBound", "100000")
        .option("numPartitions", "100")
        .option("fetchsize", "1000")
        .load()

  • Glue’s Read Partitioning: AWS Glue enables partitioning JDBC tables based on columns with generic types, such as string. This enables you to read from JDBC sources using non-overlapping parallel SQL queries executed against logical partitions of your table from different Spark executors. You control partitioning by setting a hashfield or hashexpression, and you control the number of parallel reads used to access your data by specifying hashpartitions. For best results, the chosen column should have an even distribution of values to spread the data between partitions. For example, if your data is evenly distributed by month, you can use the month column to read each month of data in parallel. Based on the database instance type, you may want to tune the number of parallel connections by adjusting hashpartitions. For example:
    glueContext.create_dynamic_frame.from_catalog(
        database = "my_database",
        table_name = "my_table_name",
        transformation_ctx = "my_transformation_context",
        additional_options = {
            'hashfield': 'month',
            'hashpartitions': '5'
        }
    )

  • Bulk Inserts: AWS Glue offers parallel inserts for speeding up bulk loads into JDBC targets. The following example uses a bulk size of two, which allows two inserts to happen in parallel. This is helpful for improving the performance of writes into databases such as Amazon Aurora.
    val optionsMap = Map(
      "user" -> user,
      "password" -> pwd,
      "url" -> postgresEndpoint,
      "dbtable" -> table,
      "bulkSize" -> "2")
    val options = JsonOptions(optionsMap)
    val jdbcWrapper = JDBCWrapper(glueContext, options)
    glueContext.getSink("postgresql", options).writeDynamicFrame(dyf)

  2. Join Optimizations: One common reason for Apache Spark applications running out of memory is the use of unoptimized joins across two or more tables. This is typically a result of data skew in the distribution of join columns or an inefficient choice of join transforms. Additionally, the ordering of transforms and filters in the user script may limit the Spark query planner's ability to optimize. There are three popular approaches to optimize joins on AWS Glue.
    • Filter tables before Join: You should pre-filter your tables as much as possible before joining. This helps to minimize the data shuffled between the executors over the network. You can use AWS Glue push down predicates for filtering based on partition columns, AWS Glue exclusions for filtering based on file names, AWS Glue storage class exclusions for filtering based on S3 storage classes, and use columnar storage formats such as Parquet and ORC that support discarding row groups based on column statistics such as min/max of column values.
    • Broadcast Small Tables: Joining tables can result in large amounts of data being shuffled or moved over the network between executors running on different workers. Because of this, Spark may run out of memory and spill the data to physical disk on the worker. This behavior can be observed in the following log message:
      INFO [UnsafeExternalSorter] — Thread 168 spilling sort data of 3.1 GB to disk (0 time so far)

      In cases where one of the tables in the join is small, a few tens of MBs, we can tell Spark to handle it differently, reducing the overhead of shuffling data. This is done by hinting to Apache Spark that the smaller table should be broadcast instead of partitioned and shuffled across the network. The Spark parameter spark.sql.autoBroadcastJoinThreshold configures the maximum size, in bytes, of a table that is broadcast to all worker nodes when performing a join. Apache Spark automatically broadcasts a table when it is smaller than 10 MB. You can also explicitly tell Spark which table you want to broadcast, as shown in the following example:

      val employeesDF = employeesRDD.toDF
      val departmentsDF = departmentsRDD.toDF
      
      // materializing the department data
      val tmpDepartments = broadcast(departmentsDF.as("departments"))
      
      val joinedDF = employeesDF.join(broadcast(tmpDepartments), 
         $"depId" === $"id",  // join by employees.depID == departments.id 
         "inner")
      
      // Show the explain plan and confirm the table is marked for broadcast
      joinedDF.explain()
      
      == Physical Plan ==
      *BroadcastHashJoin [depId#14L], [id#18L], Inner, BuildRight
      :- *Range (0, 100, step=1, splits=8)
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
         +- *Range (0, 100, step=1, splits=8)

  3. PySpark User Defined Functions (UDFs): Using PySpark UDFs can turn out to be costly for executor memory. This is because data must be serialized and deserialized when it is exchanged between the Spark executor JVM and the Python interpreter. The Python interpreter processes the serialized data in the Spark executor's off-heap memory. For datasets with large or nested records, or when using complex UDFs, this processing can consume large amounts of off-heap memory and can lead to OOM exceptions from exceeding the YARN memoryOverhead. Here is what the error message looks like:
    ERROR YarnClusterScheduler: Lost executor 1 on ip-xxx:
    Container killed by YARN for exceeding memory limits. 5.5 GB of 5.5 GB physical memory used.
    Consider boosting spark.yarn.executor.memoryOverhead

    Similarly, data serialization can be slow and often leads to longer job execution times. To avoid such OOM exceptions, it is a best practice to write the UDFs in Scala or Java instead of Python. They can be imported by providing the S3 path of the dependent JARs in the Glue job configuration. Another optimization to avoid buffering large records in off-heap memory with PySpark UDFs is to move selects and filters upstream to earlier execution stages in an AWS Glue script, as shown in the sketch after this list.

  4. Incremental processing: Processing large datasets in S3 can result in costly network shuffles, spilling data from memory to disk, and OOM exceptions. To avoid these scenarios, it is a best practice to incrementally process large datasets using AWS Glue job bookmarks, push-down predicates, and exclusions. Concurrent job runs can process separate S3 partitions and also minimize the possibility of OOMs caused by large Spark partitions or unbalanced shuffles resulting from data skew. Vertical scaling with higher-memory instances can also mitigate the chance of OOM exceptions when off-heap memory is insufficient or when an Apache Spark application cannot be readily optimized.
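
The following is a minimal sketch of the select/filter pushdown mentioned in the PySpark UDF item above; the dataset, column names, and UDF are hypothetical, and the point is that only the rows and columns the Python UDF actually needs cross the JVM/Python boundary:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, udf
    from pyspark.sql.types import StringType

    spark = SparkSession.builder.getOrCreate()  # in a Glue job, reuse glueContext.spark_session

    # Hypothetical UDF that only needs a single, small string column.
    normalize = udf(lambda s: s.strip().lower() if s else None, StringType())

    df = spark.read.parquet("s3://example-bucket/events/")  # placeholder input path

    # Push select and filter upstream so only the required rows and columns
    # are serialized to the Python workers, limiting off-heap memory pressure.
    slim = df.select("event_id", "category").filter(col("category").isNotNull())
    result = slim.withColumn("category_norm", normalize(col("category")))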

You can also use Glue’s G.1X and G.2X worker types, which provide more memory and disk space, to vertically scale Glue jobs that need high memory or disk space to store intermediate shuffle output. Vertical scaling for Glue jobs is discussed in the first post of this series.
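
As an illustration only (the job name, script location, and role below are placeholders), a job can request the G.2X worker type when it is created, for example with boto3:

    import boto3

    glue = boto3.client("glue")

    # Hypothetical job definition; G.2X workers provide more memory and disk
    # per worker than the standard worker type.
    glue.create_job(
        Name="memory-intensive-etl",
        Role="arn:aws:iam::123456789012:role/GlueJobRole",
        Command={
            "Name": "glueetl",
            "ScriptLocation": "s3://example-bucket/scripts/etl_job.py",
            "PythonVersion": "3",
        },
        GlueVersion="1.0",
        WorkerType="G.2X",
        NumberOfWorkers=10,
    )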

Conclusion

In this post, we discussed a number of techniques to enable efficient memory management for Apache Spark applications when reading data from Amazon S3 and compatible databases using a JDBC connector. We described how Glue ETL jobs can use the partitioning information available in the AWS Glue Data Catalog to prune large datasets, manage a large number of small files, and use JDBC optimizations for partitioned reads and batch record fetches from databases. You can use some or all of these techniques to help ensure your ETL jobs perform well.

In the next post, we will describe how you can develop Apache Spark applications and ETL scripts locally from your laptop with the Glue Spark Runtime containing these optimizations. You can build against the Glue Spark Runtime available from Maven or use a Docker container for cross-platform support. You can develop using Jupyter or Zeppelin notebooks, or your favorite IDE such as PyCharm, and then deploy those Spark applications on AWS Glue's serverless Spark platform.

 


About the Author

Mohit Saxena is a technical lead manager at AWS Glue. His passion is building scalable distributed systems for efficiently managing data in the cloud. He also enjoys watching movies and reading about the latest technology.

 

 

Build an automatic data profiling and reporting solution with Amazon EMR, AWS Glue, and Amazon QuickSight

Post Syndicated from Francesco Marelli original https://aws.amazon.com/blogs/big-data/build-an-automatic-data-profiling-and-reporting-solution-with-amazon-emr-aws-glue-and-amazon-quicksight/

In typical analytics pipelines, one of the first tasks that you perform after importing data into your data lake is data profiling and high-level data quality analysis to check the content of the datasets. In this way, you can enrich the basic metadata, which contains information such as table and column names and their types.

The results of data profiling help you determine whether the datasets contain the expected information and how to use them downstream in your analytics pipeline. Moreover, you can use these results as one of the inputs to an optional data semantics analysis stage.

The great quantity and variety of data in modern data lakes make unstructured manual data profiling and data semantics analysis impractical and time-consuming. This post shows how to implement a process for the automatic creation of a data profiling repository, as an extension of AWS Glue Data Catalog metadata, and a reporting system that can help you in your analytics pipeline design process by providing a reliable tool for further analysis.

This post describes in detail the application Data Profiler for AWS Glue Data Catalog and provides step-by-step instructions for an example implementation.

Overview and architecture

The following diagram illustrates the architecture of this solution.

Data Profiler for AWS Glue Data Catalog is an Apache Spark Scala application that profiles all the tables defined in a database in the Data Catalog using the profiling capabilities of the Amazon Deequ library and saves the results in the Data Catalog and an Amazon S3 bucket in a partitioned Parquet format. You can use other analytics services such as Amazon Athena and Amazon QuickSight to query and visualize the data.

For more information about the Amazon Deequ data library, see Test data quality at scale with Deequ or the source code on the GitHub repo.

Metadata can be defined as data about data. Metadata for a table contains information like the table name and other attributes, column names and types, and the physical location of the files that contain the data. The Data Catalog is the metadata repository in AWS, and you can use it with other AWS services like Athena, Amazon EMR, and Amazon Redshift.

After you create or update the metadata for tables in a database (for example, adding new data to the table), either with an AWS Glue crawler or manually, you can run the application to profile each table. The results are stored as new versions of the tables’ metadata in the Data Catalog, which you can view interactively via the AWS Lake Formation console or query programmatically via the AWS CLI for AWS Glue.
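
For example, the following boto3 sketch (the programmatic equivalent of the AWS CLI calls used later in this post) lists the version IDs recorded for a profiled table; the database and table names are the ones used in the walkthrough below:

import boto3

glue = boto3.client("glue")

# List the metadata versions that the crawler and the profiler have created for a table.
response = glue.get_table_versions(DatabaseName="nyctlcdb", TableName="trip_data_yellow")
for version in response["TableVersions"]:
    print(version["VersionId"])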

For more information about the Data Profiler, see the GitHub repo.

The Deequ library does not support tables with nested data (such as JSON). If you want to run the application on a table with nested data, this must be un-nested/flattened or relationalized before profiling. For more information about useful transforms for this task, see AWS Glue Scala DynamicFrame APIs or AWS Glue PySpark Transforms Reference.
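
For example, a nested table could be flattened with a short PySpark step like the following before profiling; the database, table, and staging path below are placeholders:

from pyspark.context import SparkContext
from awsglue.context import GlueContext

glue_context = GlueContext(SparkContext.getOrCreate())

# Read the nested table and relationalize it into a collection of flat DynamicFrames.
nested_dyf = glue_context.create_dynamic_frame.from_catalog(
    database="my_database",
    table_name="my_nested_table")
flattened = nested_dyf.relationalize("root", "s3://example-bucket/temp-dir/")

# Each resulting frame ("root", one per exploded array, ...) can be written out and profiled separately.
for name in flattened.keys():
    print(name, flattened.select(name).count())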

The following table shows the profiling metrics the application computes for column data types Text and Numeric. The computation of some profiling metrics for Text columns can be costly and is disabled by default. You can enable it by setting the compExp input parameter to true (see next section).

Metric | Description | Data Type
ApproxCountDistinct | Approximate number of distinct values, computed with HyperLogLogPlusPlus sketches. | Text / Numeric
Completeness | Fraction of non-null values in a column. | Text / Numeric
Distinctness | Fraction of distinct values of a column over the number of all values of a column. Distinct values occur at least one time. For example, [a, a, b] contains two distinct values a and b, so distinctness is 2/3. | Text / Numeric
MaxLength | Maximum length of the column. | Text
MinLength | Minimum length of the column. | Text
CountDistinct | Exact number of distinct values. | Text
Entropy | Entropy is a measure of the level of information contained in an event (value in a column) when considering all possible events (values in a column). It is measured in nats (natural units of information). Entropy is estimated using observed value counts as the negative sum of (value_count/total_count) * log(value_count/total_count). For example, [a, b, b, c, c] has three distinct values with counts [1, 2, 2]. Entropy is then (-1/5*log(1/5)-2/5*log(2/5)-2/5*log(2/5)) = 1.055. | Text
Histogram | The summary of values in a column of a table. Groups the given column’s values and calculates the number of rows with that specific value and the fraction of this value. | Text
UniqueValueRatio | Fraction of unique values over the number of all distinct values of a column. Unique values occur exactly one time; distinct values occur at least one time. Example: [a, a, b] contains one unique value b, and two distinct values a and b, so the unique value ratio is 1/2. | Text
Uniqueness | Fraction of unique values over the number of all values of a column. Unique values occur exactly one time. Example: [a, a, b] contains one unique value b, so uniqueness is 1/3. | Text
ApproxQuantiles | Approximate quantiles of a distribution. | Numeric
Maximum | Maximum value of the column. | Numeric
Mean | Mean value of the column. | Numeric
Minimum | Minimum value of the column. | Numeric
StandardDeviation | Standard deviation value of the column. | Numeric
Sum | Sum of the column. | Numeric

Application description

You can run the application via spark-submit on a transient or permanent EMR cluster (see the “Creating an EMR cluster” section in this post for minimum version specification) with Spark installed and configured with the Data Catalog settings option Use for Spark table metadata enabled.

The following example code executes the application:

 $ spark-submit \
  --class awsdataprofiler.DataProfilerForAWSGlueDataCatalog \
  --master yarn \
  --deploy-mode cluster \
  --name data-profiler-for-aws-glue-data-catalog \
  /home/hadoop/data-profiler-for-aws-glue-data-catalog-assembly-1.0.jar \
  --dbName nyctlcdb \
  --region eu-west-1 \
  --compExp true \
  --statsPrefix DQP \
  --s3BucketPrefix deequ-profiler/deequ-profiler-metrics \
  --profileUnsupportedTypes true \
  --noOfBins 30 \
  --quantiles 10

The following table summarizes the input parameters that the application accepts.

Name | Type | Required | Default | Description
--dbName (-d) | String | Yes | N/A | Data Catalog database name. The database must be defined in the Catalog owned by the same account where the application is executed.
--region (-r) | String | Yes | N/A | AWS Region endpoint where the Data Catalog database is defined, for example us-west-1 or us-east-1. For more information, see Regional Endpoints.
--compExp (-c) | Boolean | No | false | If true, the application also executes “expensive” profiling analyzers on Text columns. These are CountDistinct, Entropy, Histogram, UniqueValueRatio, and Uniqueness. If false, only the following default analyzers are executed: ApproxCountDistinct, Completeness, Distinctness, MaxLength, MinLength. All analyzers for Numeric columns are always executed.
--statsPrefix (-p) | String | No | DQP | String prepended to the statistics names in the Data Catalog. The application also adds two underscores (__). This is useful to identify metrics calculated by the application.
--s3BucketPrefix (-s) | String | No | blank | Format must be s3BucketName/prefix. If specified, the application writes Parquet files with metrics in the prefixes db_name=…/table_name=….
--profileUnsupportedTypes (-u) | Boolean | No | false | By default, the Amazon Deequ library only supports Text and Numeric columns. If this parameter is set to true, the application also profiles columns of type Boolean and Date.
--noOfBins (-b) | Integer | No | 10 | When --compExp (-c) is true, sets the number of maximum values to create for the Histogram analyzer for String columns.
--quantiles (-q) | Integer | No | 10 | Sets the number of quantiles to calculate for the ApproxQuantiles analyzer for numeric columns.

Setting up your environment

The following walkthrough demonstrates how to create and populate a Data Catalog database with three tables, which simulates a process with monthly updates. For this post, you simulate three monthly runs: February 2, 2019, March 2, 2019, and April 2, 2019.

After table creation and after each monthly table update, you run the application to generate and update the profiling information for each table in the Data Catalog and Amazon S3 repository. The post also provides examples of how to query the data using the AWS CLI and Athena, and build a simple Amazon QuickSight dashboard.

This post uses the New York City Taxi and Limousine Commission (TLC) Trip Record Data on the Registry of Open Data on AWS. In particular, you use the tables yellow_tripdata, fhv_tripdata, and green_tripdata.

The following steps explain how to set up the environment.

Creating an EMR cluster

The first step is to create an EMR cluster. Connect to the cluster master node and execute the code via spark-submit. Make sure that the cluster version is at least 5.28.0 with at least Hadoop and Spark installed and that you use the Data Catalog as table metadata for Spark.

The master node should also be accessible via SSH. For instructions, see Connect to the Master Node Using SSH.
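
The following boto3 sketch shows one way such a cluster could be created; the instance types, key pair, and roles are placeholders, and the spark-hive-site classification is what points Spark at the Data Catalog for table metadata:

import boto3

emr = boto3.client("emr")

emr.run_job_flow(
    Name="data-profiler-cluster",
    ReleaseLabel="emr-5.28.0",
    Applications=[{"Name": "Hadoop"}, {"Name": "Spark"}],
    Configurations=[{
        "Classification": "spark-hive-site",
        "Properties": {
            "hive.metastore.client.factory.class":
                "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
        }
    }],
    Instances={
        "InstanceGroups": [
            {"InstanceRole": "MASTER", "InstanceType": "m5.xlarge", "InstanceCount": 1},
            {"InstanceRole": "CORE", "InstanceType": "m5.xlarge", "InstanceCount": 2}
        ],
        "Ec2KeyName": "my-key-pair",
        "KeepJobFlowAliveWhenNoSteps": True
    },
    JobFlowRole="EMR_EC2_DefaultRole",
    ServiceRole="EMR_DefaultRole")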

Downloading the application

You can download the source code from the application GitHub repo and build it as an uber JAR with all dependencies using the Scala Build Tool (sbt) with the following command (adjust the memory values according to your needs):

$ export SBT_OPTS="-Xms1G -Xmx3G -Xss2M -XX:MaxMetaspaceSize=3G" && sbt assembly

By default, the .jar file is created in the following path, relative to the project root directory:

./target/scala-2.11/data-profiler-for-aws-glue-data-catalog-assembly-1.0.jar

When the .jar file is available, copy it to the master node of the EMR cluster. One way to copy the file to the master node is to copy it to Amazon S3 from the client where the file was created and then download it from Amazon S3 to the master node.

For this post, copy the file in the /home/hadoop directory of the master node. The full path is /home/hadoop/data-profiler-for-aws-glue-data-catalog-assembly-1.0.jar.

Setting up S3 buckets and copying the initial data

You use an S3 bucket to store the data that you profile. For this post, the bucket name is

s3://aws-big-data-blog-samples/

You need to create a bucket with a unique name in your account and store the data there. When the bucket is created and available, use the AWS CLI to copy the first set of files for January 2019 (therefore simulating the February 2, 2019, run) from the s3://nyc-tlc/ bucket. See the following code:

$ DEST_BUCKET=aws-big-data-blog-samples
$ MONTH=2019-01
 
$ aws s3 cp "s3://nyc-tlc/trip data/yellow_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-yellow/yellow_tripdata_${MONTH}.csv"
 
$ aws s3 cp "s3://nyc-tlc/trip data/fhv_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-fhv/fhv_tripdata_${MONTH}.csv"
 
$ aws s3 cp "s3://nyc-tlc/trip data/green_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-green/green_tripdata_${MONTH}.csv"

After you copy the data to your destination bucket, create a second bucket to store the files with the profiling metrics created by the application. This step is optional because you can write the metrics to a prefix in an existing bucket. See the following code:

$ aws s3 mb s3://deequ-profiler/

You are now ready to create the database and tables metadata in the Data Catalog.

Creating metadata in the Data Catalog

The first step is to create a database in AWS Glue. You can create the database using the AWS CLI. See the following code:

$ aws glue create-database \
    --database-input '{"Name": "nyctlcdb"}'

Alternatively, on the AWS Glue console, choose Databases, Add database.

After you create the database, create a new AWS Glue Crawler to infer the schema of the data in the files you copied in the previous step. Complete the following steps:

  1. On the AWS Glue console, choose Crawler.
  2. Choose Add crawler.
  3. For Crawler name, enter nyc-tlc-db-raw.
  4. Choose Next.
  5. For Choose a data store, choose S3.
  6. For Crawl data in, choose Specified path in my account.
  7. For Include path, enter the S3 bucket and prefix where you copied the data earlier.
  8. Choose Next.
  9. In the Choose an IAM role section, select Choose an existing IAM role.
  10. Choose an IAM role that provides access to the S3 bucket and allows writing to the Data Catalog, or create a new one while creating this crawler.
  11. Choose Next.
  12. Choose the database you created earlier to store the tables’ metadata.
  13. Review the crawler properties and choose Finish.
    You can run the crawler when it’s ready. It creates three new tables in the database. The following screenshot shows the update you receive that the crawler is complete.
  14. You can now use the Lake Formation console to check that the tables are correct. See the following screenshot of the Tables page. If you select one of the tables, the table version is now 0. See the following screenshot. You can also perform the same check using the AWS CLI. See the following code:
    $ aws glue get-table-versions \
    --database-name nyctlcdb \
    --table-name trip_data_yellow \
    --query 'TableVersions[*].VersionId' 

    [
        "0"
    ]

  15. Check the parameters in the table metadata to verify which values the crawler generated. See the following code:
    $ aws glue get-table \
    	--database-name nyctlcdb \
    	--name trip_data_yellow \
       	--query 'Table.Parameters' 

    {
        "CrawlerSchemaDeserializerVersion": "1.0",
        "CrawlerSchemaSerializerVersion": "1.0",
        "UPDATED_BY_CRAWLER": "nyc-tlc-db-raw",
        "areColumnsQuoted": "false",
        "averageRecordSize": "144",
        "classification": "csv",
        "columnsOrdered": "true",
        "compressionType": "none",
        "delimiter": ",",
        "objectCount": "1",
        "recordCount": "4771445",
        "sizeKey": "687088084",
        "skip.header.line.count": "1",
        "typeOfData": "file"
    }

  16. Check the metadata attributes for three columns in the same table. This post chooses the following columns because they have different data types, though any other column is suitable for this check. In the following code, the only attributes currently available for the columns are "Name" and "Type":
    $ aws glue get-table \
    --database-name nyctlcdb \
    --name trip_data_yellow \
    --query 'Table.StorageDescriptor.Columns[?Name==`store_and_fwd_flag`]'

    [
        {
            "Name": "store_and_fwd_flag",
            "Type": "string"
        }
    ]

    $ aws glue get-table \
    --database-name nyctlcdb \
    --name trip_data_yellow \
    --query 'Table.StorageDescriptor.Columns[?Name==`fare_amount`]'

    [
        {
            "Name": "fare_amount",
            "Type": "double"
        }
    ]

    $ aws glue get-table \
    --database-name nyctlcdb \
    --name trip_data_yellow \
    --query 'Table.StorageDescriptor.Columns[?Name==`passenger_count`]'

    [
        {
            "Name": "passenger_count",
            "Type": "bigint"
        }
    ]

You can display the same information via the Lake Formation console. See the following screenshot.

You are now ready to execute the application.

First application execution

Connect to the EMR cluster master node via SSH and run the application with the following code (change the input parameters as needed, especially the value for the s3BucketPrefix parameter):

$ spark-submit \
    --class awsdataprofiler.DataProfilerForAWSGlueDataCatalog \
    --master yarn \
    --deploy-mode cluster \
    --name data-profiler-for-aws-glue-data-catalog \
    /home/hadoop/data-profiler-for-aws-glue-data-catalog-assembly-1.0.jar \
    --dbName nyctlcdb \
    --region eu-west-1 \
    --compExp true \
    --statsPrefix DQP \
    --s3BucketPrefix deequ-profiler/deequ-profiler-metrics \
    --profileUnsupportedTypes true \
    --noOfBins 30 \
    --quantiles 10

Profiling information in the metadata in the Data Catalog

When the application is complete, you can recheck the metadata via the Lake Formation console for the tables and verify that a new table version was created. See the following screenshot.

You can verify the same information via the AWS CLI. See the following code:

$ aws glue get-table-versions \
    --database-name nyctlcdb \
    --table-name trip_data_yellow \
    --query 'TableVersions[*].VersionId'
[
    "1",
    "0"
]

Check the metadata for the table and verify that the profiling information the application generated was successfully stored. In the following code, the parameter “DQP__Size” was generated, which contains the number of records in the table as calculated by the Deequ library:

$ aws glue get-table \ 
    --database-name nyctlcdb \
    --name trip_data_yellow \
    --query 'Table.Parameters'
{
    "CrawlerSchemaDeserializerVersion": "1.0-",
    "CrawlerSchemaSerializerVersion": "1.0",
    "DQP__Size": "7667793.0",
    "UPDATED_BY_CRAWLER": "nyc-tlc-db-raw",
    "areColumnsQuoted": "false",
    "averageRecordSize": "144",
    "classification": "csv",
    "columnsOrdered": "true",
    "compressionType": "none",
    "delimiter": ",",
    "objectCount": "1",
    "recordCount": "4771445",
    "sizeKey": "687088084",
    "skip.header.line.count": "1",
    "typeOfData": "file"
}

Similarly, you can verify that the metadata for the columns you checked previously contains the profiling information the application generated. This is stored in the “Parameters” object for each column. Each new attribute starts with the string “DQP” as specified in the statsPrefix input parameter. See the following code:

$ aws glue get-table \
    --database-name nyctlcdb \
    --name trip_data_yellow \
    --query 'Table.StorageDescriptor.Columns[?Name==`store_and_fwd_flag`]' 
[
    {
        "Name": "store_and_fwd_flag",
        "Type": "string",
        "Parameters": {
            "DQP__ApproxCountDistinct": "3.0",
            "DQP__Completeness": "1.0",
            "DQP__CountDistinct": "3.0",
            "DQP__Distinctness": "3.912468685578758E-7",
            "DQP__Entropy": "0.03100483390393341",
            "DQP__Histogram.abs.N": "7630142.0",
            "DQP__Histogram.abs.Y": "37650.0",
            "DQP__Histogram.abs.store_and_fwd_flag": "1.0",
            "DQP__Histogram.bins": "3.0",
            "DQP__Histogram.ratio.N": "0.9950897213839758",
            "DQP__Histogram.ratio.Y": "0.004910148200401341",
            "DQP__Histogram.ratio.store_and_fwd_flag": "1.3041562285262527E-7",
            "DQP__MaxLength": "18.0",
            "DQP__MinLength": "1.0",
            "DQP__UniqueValueRatio": "0.3333333333333333",
            "DQP__Uniqueness": "1.3041562285262527E-7"
        }
    }
]
$ aws glue get-table \
    --database-name nyctlcdb \
    --name trip_data_yellow \
    --query 'Table.StorageDescriptor.Columns[?Name==`fare_amount`]'
[
    {
        "Name": "fare_amount",
        "Type": "double",
        "Parameters": {
            "DQP__ApproxCountDistinct": "6125.0",
            "DQP__Completeness": "0.9999998695843771",
            "DQP__Distinctness": "8.187492802687814E-4",
            "DQP__Maximum": "623259.86",
            "DQP__Mean": "12.40940884025023",
            "DQP__Minimum": "-362.0",
            "DQP__StandardDeviation": "262.0720412055651",
            "DQP__Sum": "9.515276582999998E7",
            "DQP__name-0.1": "5.0",
            "DQP__name-0.2": "6.0",
            "DQP__name-0.3": "7.0",
            "DQP__name-0.4": "8.0",
            "DQP__name-0.5": "9.0",
            "DQP__name-0.6": "10.5",
            "DQP__name-0.7": "12.5",
            "DQP__name-0.8": "15.5",
            "DQP__name-0.9": "23.5",
            "DQP__name-1.0": "623259.86"
        }
    }
]

The parameters named “DQP__name-x.x” are the results of the ApproxQuantiles Deequ analyzer for numeric columns; the number of quantiles is set via the --quantiles (-q) input parameter of the application. See the following code:

$ aws glue get-table \
    --database-name nyctlcdb \
    --name trip_data_yellow \
    --query 'Table.StorageDescriptor.Columns[?Name==`passenger_count`]'
[
    {
        "Name": "passenger_count",
        "Type": "bigint",
        "Parameters": {
            "DQP__ApproxCountDistinct": "10.0",
            "DQP__Completeness": "0.9999998695843771",
            "DQP__Distinctness": "1.3041562285262527E-6",
            "DQP__Maximum": "9.0",
            "DQP__Mean": "1.5670782410373156",
            "DQP__Minimum": "0.0",
            "DQP__StandardDeviation": "1.2244305354114957",
            "DQP__Sum": "1.201603E7",
            "DQP__name-0.1": "1.0",
            "DQP__name-0.2": "1.0",
            "DQP__name-0.3": "1.0",
            "DQP__name-0.4": "1.0",
            "DQP__name-0.5": "1.0",
            "DQP__name-0.6": "1.0",
            "DQP__name-0.7": "1.0",
            "DQP__name-0.8": "2.0",
            "DQP__name-0.9": "3.0",
            "DQP__name-1.0": "9.0"
        }
    }
]

Profiling information in Amazon S3

You can now also verify that the profiling information was saved in Parquet format in the S3 bucket you specified in the s3BucketPrefix application input parameter. The following screenshot shows the buckets via the Amazon S3 console.

The data is stored using prefixes that are compatible with Apache Hive partitions. This is useful to optimize performance and costs when you use analytics services like Athena. The partitions are defined on db_name and table_name. The following screenshot shows the details of table_name=trip_data_yellow.

Each execution of the application generates one Parquet file for each profiled table, appending data to that table’s metrics.

Second execution after monthly table updates

To run the application after monthly table updates, complete the following steps:

  1. Copy the new files for February 2019 to simulate the March 2 monthly update of the system. See the following code:
    $ DEST_BUCKET=aws-big-data-blog-samples
    $ MONTH=2019-02
     
    $ aws s3 cp "s3://nyc-tlc/trip data/yellow_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-yellow/yellow_tripdata_${MONTH}.csv"
     
    $ aws s3 cp "s3://nyc-tlc/trip data/fhv_tripdata_${MONTH}" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-fhv/fhv_tripdata_${MONTH}.csv"
     
    $ aws s3 cp "s3://nyc-tlc/trip data/green_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-green/green_tripdata_${MONTH}.csv"

  2. Run the nyc-tlc-db-raw crawler to update the table metadata to include the new files. The following screenshot shows that the three tables were updated successfully.
  3. Check that the crawler created a third version of the table. See the following code:
    $ aws glue get-table-versions \
        --database-name nyctlcdb \
        --table-name trip_data_yellow \
        --query 'TableVersions[*].VersionId'

    [
        "2",
        "1",
        "0"
    ]

  4. Rerun the application to generate the new profiling metadata, entering the same code as before. To keep the information clean, before storing new profiling information in the metadata, the application removes all custom attributes starting with the string specified in the statsPrefix parameter. See the following code:
    $ spark-submit \
        --class awsdataprofiler.DataProfilerForAWSGlueDataCatalog \
        --master yarn \
        --deploy-mode cluster \
        --name data-profiler-for-aws-glue-data-catalog \
        /home/hadoop/data-profiler-for-aws-glue-data-catalog-assembly-1.0.jar \
        --dbName nyctlcdb \
        --region eu-west-1 \
        --compExp true \
        --statsPrefix DQP \
        --s3BucketPrefix deequ-profiler/deequ-profiler-metrics \
        --profileUnsupportedTypes true \
        --noOfBins 30 \
        --quantiles 10

    Following a successful execution, a new version of the table was created. See the following code:

    $ aws glue get-table-versions \
        --database-name nyctlcdb \
        --table-name trip_data_yellow \
        --query 'TableVersions[*].VersionId'

    [
        "3",
        "2",
        "1",
        "0"
    ]

  5. Check the value of the DQP__Size attribute; its value has changed. See the following code:
    $ aws glue get-table \
        --database-name nyctlcdb \
        --name trip_data_yellow \
        --query 'Table.Parameters.{DQP__Size:DQP__Size}'

    {
        "DQP__Size": "1.4687169E7"
    }

  6. Check one of the columns you saw earlier to see the updated profiling property values. See the following code:
    $ aws glue get-table \
        --database-name nyctlcdb \
        --name trip_data_yellow \
        --query 'Table.StorageDescriptor.Columns[?Name==`store_and_fwd_flag`]'

    [
        {
            "Name": "store_and_fwd_flag",
            "Type": "string",
            "Parameters": {
                "DQP__ApproxCountDistinct": "3.0",
                "DQP__Completeness": "1.0",
                "DQP__CountDistinct": "3.0",
                "DQP__Distinctness": "2.042599223853147E-7",
                "DQP__Entropy": "0.0317381414905775",
                "DQP__Histogram.abs.N": "1.4613018E7",
                "DQP__Histogram.abs.Y": "74149.0",
                "DQP__Histogram.abs.store_and_fwd_flag": "2.0",
                "DQP__Histogram.bins": "3.0",
                "DQP__Histogram.ratio.N": "0.9949513074984022",
                "DQP__Histogram.ratio.Y": "0.005048556328316233",
                "DQP__Histogram.ratio.store_and_fwd_flag": "1.361732815902098E-7",
                "DQP__MaxLength": "18.0",
                "DQP__MinLength": "1.0",
                "DQP__UniqueValueRatio": "0.0",
                "DQP__Uniqueness": "0.0"
            }
        }
    ]

Third execution after monthly tables updates

To run the application a third time, complete the following steps:

  1. Copy the new files for March 2019 to simulate the April 2 monthly update of the system. See the following code:
    $ DEST_BUCKET=aws-big-data-blog-samples
    $ MONTH=2019-03
     
    $ aws s3 cp "s3://nyc-tlc/trip data/yellow_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-yellow/yellow_tripdata_${MONTH}.csv"
     
    $ aws s3 cp "s3://nyc-tlc/trip data/fhv_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-fhv/fhv_tripdata_${MONTH}.csv"
     
    $ aws s3 cp "s3://nyc-tlc/trip data/green_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-green/green_tripdata_${MONTH}.csv"

  2. Run the nyc-tlc-db-raw crawler to update the table metadata to include the new files. You now have five versions of the table metadata. See the following code:
    $ aws glue get-table-versions \
        --database-name nyctlcdb \
        --table-name trip_data_yellow \
        --query 'TableVersions[*].VersionId'

    [
        "4",
        "3",
        "2",
        "1",
        "0"
    ]

  3. Rerun the application to update the profiling information. See the following code:
    $ spark-submit \
        --class awsdataprofiler.DataProfilerForAWSGlueDataCatalog \
        --master yarn \
        --deploy-mode cluster \
        --name data-profiler-for-aws-glue-data-catalog \
        /home/hadoop/data-profiler-for-aws-glue-data-catalog-assembly-1.0.jar \
        --dbName nyctlcdb \
        --region eu-west-1 \
        --compExp true \
        --statsPrefix DQP \
        --s3BucketPrefix deequ-profiler/deequ-profiler-metrics \
        --profileUnsupportedTypes true \
        --noOfBins 30 \
        --quantiles 10

  4. Check the DQP__Size parameter to see its new updated value. See the following code:
    $ aws glue get-table \
        --database-name nyctlcdb \
        --name trip_data_yellow \
        --query 'Table.Parameters.{DQP__Size:DQP__Size}'

    {
        "DQP__Size": "2.2519715E7"
    }

  5. Check one of the columns you saw earlier to see the updated profiling property values. See the following code:
    $ aws glue get-table \
        --database-name nyctlcdb \
        --name trip_data_yellow \
        --query 'Table.StorageDescriptor.Columns[?Name==`store_and_fwd_flag`]' 

    [
        {
            "Name": "store_and_fwd_flag",
            "Type": "string",
            "Parameters": {
                "DQP__ApproxCountDistinct": "3.0",
                "DQP__Completeness": "1.0",
                "DQP__CountDistinct": "3.0",
                "DQP__Distinctness": "1.3321660598280218E-7",
                "DQP__Entropy": "0.030948463301702846",
                "DQP__Histogram.abs.N": "2.2409376E7",
                "DQP__Histogram.abs.Y": "110336.0",
                "DQP__Histogram.abs.store_and_fwd_flag": "3.0",
                "DQP__Histogram.bins": "3.0",
                "DQP__Histogram.ratio.N": "0.9951003376374878",
                "DQP__Histogram.ratio.Y": "0.004899529145906154",
                "DQP__Histogram.ratio.store_and_fwd_flag": "1.3321660598280218E-7",
                "DQP__MaxLength": "18.0",
                "DQP__MinLength": "1.0",
                "DQP__UniqueValueRatio": "0.0",
                "DQP__Uniqueness": "0.0"
            }
        }
    ]

You can view and manage the same values via the Lake Formation console. See the following screenshot of the Edit column section.

Data profiling reporting with Athena and Amazon QuickSight

As demonstrated earlier, the application saves profiling information in Parquet format to the S3 bucket and prefix you specify, partitioned by db_name and table_name. See the following code:

$ aws s3 ls s3://deequ-profiler/deequ-profiler-metrics/ --recursive
2020-01-28 09:30:12          0 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_fhv/_SUCCESS
2020-01-28 09:17:15       6506 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_fhv/part-00000-760dafb1-fc37-4700-a506-a9dc71b7a745-c000.snappy.parquet
2020-01-28 09:01:19       6498 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_fhv/part-00000-78dd2c4a-83c2-44c4-aa71-30e7a9fb0089-c000.snappy.parquet
2020-01-28 09:30:11       6505 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_fhv/part-00000-cff4f2de-64b4-4338-a0f6-a50ed34a378f-c000.snappy.parquet
2020-01-28 09:30:08          0 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_green/_SUCCESS
2020-01-28 09:01:15       6355 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_green/part-00000-0d5969c9-70a7-4cd4-ac64-8f16e35e23b5-c000.snappy.parquet
2020-01-28 09:17:11       6353 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_green/part-00000-12a7b0b0-6a2a-45d5-a241-645148af41d7-c000.snappy.parquet
2020-01-28 09:30:08       6415 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_green/part-00000-adecccd6-a884-403f-aa80-c574647a10f9-c000.snappy.parquet
2020-01-28 09:29:56          0 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_yellow/_SUCCESS
2020-01-28 09:16:59       6408 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_yellow/part-00000-2e5e3280-29db-41b9-be67-a68ef8cb9777-c000.snappy.parquet
2020-01-28 09:01:02       6424 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_yellow/part-00000-c4972037-7d3c-4279-8b77-361741133816-c000.snappy.parquet
2020-01-28 09:29:55       6398 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_yellow/part-00000-f2d6076e-7019-4b03-97ba-a6aab8a677b8-c000.snappy.parquet

The application generates one Parquet file per execution.
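
Besides the Athena and Amazon QuickSight reporting described next, you can also inspect these metrics directly with Spark on the EMR cluster; the following sketch simply reads the partitioned Parquet output shown above:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

# Read all profiling metrics and filter on the Hive-style partitions.
metrics = spark.read.parquet("s3://deequ-profiler/deequ-profiler-metrics/")
yellow = metrics.filter((col("db_name") == "nyctlcdb") & (col("table_name") == "trip_data_yellow"))
yellow.select("profiler_run_ts", "instance", "name", "value").show(20, truncate=False)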

Preparing metadata for profiler metrics data

To prepare the metadata for profiler metrics data, complete the following steps:

  1. On the Lake Formation console, create a new database with the name deequprofilerdb to contain the metadata.
  2. On the AWS Glue console, create a new crawler with the name deequ-profiler-metrics to infer the schema of the profiling information stored in Amazon S3.

The following screenshot shows the properties of the new crawler.

After you run the crawler, a table with the name deequ_profiler_metrics is created in the database. The table has the following columns.

Name | Data Type | Partition | Description
instance | string | | Column name the statistic in column “name” refers to. Set to “*” if entity is “Dataset”.
entity | string | | Entity the statistic refers to. Valid values are “Column” and “Dataset”.
name | string | | Metric name, derived from the Deequ analyzer used for the calculation.
value | double | | Value of the metric.
type | string | | Data type of the column if entity is “Column”, blank otherwise.
db_name_embed | string | | Database name, same values as in partition “db_name”.
table_name_embed | string | | Table name, same values as in partition “table_name”.
profiler_run_dt | date | | Date the profiler application was run.
profiler_run_ts | timestamp | | Date/time the profiler application was run; it can also be used as an execution identifier.
db_name | string | 1 | Database name.
table_name | string | 2 | Table name.

Reporting with Athena

You can use Athena to run a query that checks the statistics for a column in the database for the simulated March 2, 2019, execution. See the following code:

SELECT db_name, 
    profiler_run_dt,
    profiler_run_ts,
    table_name, 
    entity,
    instance,
    name,
    value
FROM "deequprofilerdb"."deequ_profiler_metrics" 
WHERE db_name = 'nyctlcdb' AND
    table_name = 'trip_data_yellow' AND 
    entity = 'Column' AND
    instance = 'extra' AND
    profiler_run_dt = date_parse('2019-03-02','%Y-%m-%d')
ORDER BY name
;

The following screenshot shows the query results.

Reporting with Amazon QuickSight

To create a dashboard in Amazon QuickSight based on the profiling metrics data the application generated, complete the following steps:

  1. Create a new QuickSight dataset called deequ_profiler_metrics with Athena as the data source.
  2. In the Choose your table section, select the profiling metrics table that you created earlier.
  3. Import the data into SPICE.

After you create the dataset, you can view it and edit its properties. For this post, leave the properties unchanged.

You are now ready to build visualizations and dashboards.

The following images in this section show a simple analysis with controls that allow for the selection of the Database, Table profiled, Entity, Column, and Profiling Metric.

Control Name | Mapped Column
Database | db_name
Table | table_name
Entity | entity
Column | instance
Profiling Metric | name

For more information about adding controls, see Create Amazon QuickSight dashboards that have impact with parameters, on-screen controls, and URL actions.

For example, you can select the Size metric of a specific table to see how many records are available in the table after each monthly load. See the following screenshot.

Similarly, you can use the same analysis to see how a specific metric changes over time for a column. The following screenshot shows that the mean of the fare_amount column changes after each monthly load.

You can select any metric calculated on any column, which makes for a very flexible profiling data reporting system.

Conclusion

This post demonstrated how to extend the metadata contained in the Data Catalog with profiling information calculated with an Apache Spark application based on the Amazon Deequ library running on an EMR cluster.

You can query the Data Catalog using the AWS CLI. You can also build a reporting system with Athena and Amazon QuickSight to query and visualize the data stored in Amazon S3.

Special thanks go to Sebastian Schelter at Amazon Search and to Sven Hansen and Vincent Gromakowski at AWS for their help and support.

 


About the Author

Francesco Marelli is a senior solutions architect at Amazon Web Services. He lived and worked in London for 10 years and has since worked in Italy, Switzerland, and other countries in EMEA. He specializes in the design and implementation of analytics, data management, and big data systems, mainly for enterprise and FSI customers. Francesco also has strong experience in systems integration and in the design and implementation of web applications. He loves sharing his professional knowledge, collecting vinyl records, and playing bass.

Simplify data pipelines with AWS Glue automatic code generation and Workflows

Post Syndicated from Mohit Saxena original https://aws.amazon.com/blogs/big-data/simplify-data-pipelines-with-aws-glue-automatic-code-generation-and-workflows/

In the previous post of the series, we discussed how AWS Glue job bookmarks help you to incrementally load data from Amazon S3 and relational databases. We also saw how using the AWS Glue optimized Apache Parquet writer can help improve performance and manage schema evolution.

In the third post of the series, we’ll discuss three topics. First, we’ll look at how AWS Glue can automatically generate code to help transform data in common use cases such as selecting specific columns, flattening deeply nested records, efficiently parsing nested fields, and handling column data type evolution.

Second, we’ll outline how to use AWS Glue Workflows to build and orchestrate data pipelines using different Glue components such as Crawlers, Apache Spark and Python Shell ETL jobs.

Third, we’ll see how to leverage SparkSQL in your ETL jobs to perform SQL based transformations on datasets stored in Amazon S3 and relational databases.

Automatic Code Generation & Transformations: ApplyMapping, Relationalize, Unbox, ResolveChoice

AWS Glue can automatically generate code to help perform a variety of useful data transformation tasks. These transformations provide a simple-to-use interface for working with complex and deeply nested datasets. For example, some relational databases and data warehouses do not natively support nested data structures. AWS Glue can automatically generate the code necessary to flatten those nested data structures before loading them into the target database, saving time and enabling non-technical users to work with data.

The following is a list of the popular transformations AWS Glue provides to simplify data processing:

  1. ApplyMapping is a transformation used to perform column projection and to convert between data types. In this example, we use it to unnest several fields, such as actor.id, which we map to the top-level actor.id field, and to set the data type of each output column, such as long for actor.gravatar_id.
    medicare_output = medicare_src.apply_mapping(
        [('id', 'string', 'id', 'string'),
        ('type', 'string', 'type', 'string'),
        ('actor.id', 'int', 'actor.id', 'int'),
        ('actor.login', 'string', 'actor.login', 'string'),
        ('actor.display_login', 'string', 'actor.display_login', 'string'),
        ('actor.gravatar_id', 'long', 'actor.gravatar_id', 'long'),
        ('actor.url', 'string', 'actor.url', 'string'),
        ('actor.avatar_url', 'string', 'actor.avatar_url', 'string')]
    )

  2. Relationalize converts a nested dataset stored in a DynamicFrame to a relational (rows and columns) format. Nested structures are unnested into top-level columns, and arrays are decomposed into separate tables with appropriate primary and foreign keys inserted. The result is a collection of DynamicFrames representing a set of tables that can be directly inserted into a relational database. More detail about relationalize can be found here.
    ## An example relationalizing and writing to Redshift
    dfc = history.relationalize("hist_root", redshift_temp_dir)
    ## Cycle through results and write to Redshift.
    for df_name in dfc.keys():
        df = dfc.select(df_name)
        print "Writing to Redshift table: ", df_name, " ..."
        glueContext.write_dynamic_frame.from_jdbc_conf(frame = df, 
            catalog_connection = "redshift3", 
            connection_options = {"dbtable": df_name, "database": "testdb"}, 
            redshift_tmp_dir = redshift_temp_dir)

  3. Unbox parses a string field of a certain type, such as JSON, into individual fields with their corresponding data types and stores the result in a DynamicFrame. For example, you may have a CSV file with one field that is in JSON format, such as {"a": 3, "b": "foo", "c": 1.2}. Unbox reformats the JSON string into three distinct fields: an int, a string, and a double. The Unbox transformation is commonly used to replace costly Python user-defined functions for reformatting data, which can result in Apache Spark out-of-memory exceptions. The following example shows how to use Unbox:
    df_result = df_json.unbox('json', "json")

  4. ResolveChoice: AWS Glue Dynamic Frames support data where a column can have fields with different types. These columns are represented with the Dynamic Frame’s choice type. For example, the Dynamic Frame schema for the medicare dataset shows up as follows:
    root
     |-- drg definition: string
     |-- provider id: choice
     |    |-- long
     |    |-- string
     |-- provider name: string
     |-- provider street address: string

    This is because the “provider id” column could be either a long or a string. The Apache Spark DataFrame considers the whole dataset and is forced to cast the column to the most general type, namely string. Dynamic Frames allow you to cast the type using the ResolveChoice transform. For example, you can cast the column to long as follows.

    medicare_res = medicare_dynamicframe.resolveChoice(specs = [('provider id','cast:long')])
    
    medicare_res.printSchema()
     
    root
     |-- drg definition: string
     |-- provider id: long
     |-- provider name: string
     |-- provider street address: string

    This transform also inserts a null wherever a string value could not be cast, so you can now identify the records whose values were replaced with nulls. Alternatively, the choice type can be resolved to a struct, which keeps the values of both types.
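
    If you want to keep both representations instead of casting, a small variation (a sketch under the same assumptions as the example above) resolves the choice type into a struct, so no values are lost:

    # Resolve the choice by keeping both candidate types in a nested struct.
    medicare_struct = medicare_dynamicframe.resolveChoice(specs = [('provider id', 'make_struct')])
    medicare_struct.printSchema()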

Build and orchestrate data pipelines using AWS Glue Workflows

AWS Glue Workflows provide a visual tool to author data pipelines by combining Glue crawlers for schema discovery with Glue Spark and Python jobs to transform the data. Relationships can be defined and parameters passed between task nodes so you can build pipelines of varying complexity. Workflows can run on a schedule or be triggered programmatically. You can track the progress of each node independently or of the entire workflow, making it easier to troubleshoot your pipelines.

A typical workflow for ETL workloads is organized as follows:

  1. A Glue Python command is triggered manually, on a schedule, or on an external CloudWatch event. It pre-processes or lists the partitions in Amazon S3 for a table under a base location. For example, a CloudTrail logs partition to process could be s3://AWSLogs/ACCOUNTID/CloudTrail/REGION/YEAR/MONTH/DAY/HOUR/. The Python command can list all the Regions and schedule crawlers to create different Glue Data Catalog tables for each Region.
  2. Glue crawlers are triggered next to populate new partitions for every hour in the Glue Data Catalog for data recently ingested into Amazon S3.
  3. Concurrent Glue ETL jobs are triggered to separately filter and process each partition or group of partitions. For example, CloudTrail events corresponding to the last week can be read by a Glue ETL job by passing in the partition prefix as Glue job parameters and using Glue ETL push down predicates to read just the partitions under that prefix (see the sketch after this list). Partitioning and orchestrating concurrent Glue ETL jobs allows you to scale and reliably execute individual Apache Spark applications by processing only a subset of partitions in the Glue Data Catalog table. The transformed data can then be concurrently written back by all individual Glue ETL jobs to a common target table in an Amazon S3 data lake, Amazon Redshift, or other databases.

Finally, a Glue Python command can be triggered to capture the completion status of the different Glue entities, including Glue crawlers and parallel Glue ETL jobs, and to post-process or retry any failed components.
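
As an illustration of the push down predicate mentioned above, the following is a minimal sketch (the database, table, and partition column names are assumptions, not taken from a real deployment) of a Glue ETL job that reads only the partitions under a given prefix:

import sys
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from pyspark.context import SparkContext

# Partition values are passed in as Glue job parameters by the orchestrating workflow.
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'year', 'month'])
glueContext = GlueContext(SparkContext.getOrCreate())

# The push down predicate prunes partitions in the Data Catalog before Spark reads any data,
# so only the matching S3 prefixes are listed and loaded.
cloudtrail_events = glueContext.create_dynamic_frame.from_catalog(
    database="cloudtrail_db",        # assumed database name
    table_name="cloudtrail_logs",    # assumed table name
    push_down_predicate="year == '{}' and month == '{}'".format(args['year'], args['month']),
    transformation_ctx="cloudtrail_events")

print("Records in selected partitions:", cloudtrail_events.count())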

Executing SQL using SparkSQL in AWS Glue

AWS Glue Data Catalog as Hive Compatible Metastore

The AWS Glue Data Catalog is a managed metadata repository compatible with the Apache Hive Metastore API. You can follow the detailed instructions here to configure your AWS Glue ETL jobs and development endpoints to use the Glue Data Catalog. You also need to add the Hive SerDes to the class path of AWS Glue Jobs to serialize/deserialize data for the corresponding formats. You can then natively run Apache Spark SQL queries against your tables stored in the Data Catalog.

The following example assumes that you have crawled the US legislators dataset available at s3://awsglue-datasets/examples/us-legislators. We’ll use the Spark shell running on an AWS Glue development endpoint to execute SparkSQL queries directly on the legislators’ tables cataloged in the AWS Glue Data Catalog.

>>> spark.sql("use legislators")
DataFrame[]
>>> spark.sql("show tables").show()
+-----------+------------------+-----------+
|   database|         tableName|isTemporary|
+-----------+------------------+-----------+
|legislators|        areas_json|      false|
|legislators|    countries_json|      false|
|legislators|       events_json|      false|
|legislators|  memberships_json|      false|
|legislators|organizations_json|      false|
|legislators|      persons_json|      false|

>>> spark.sql("select distinct organization_id from memberships_json").show()
+--------------------+
|     organization_id|
+--------------------+
|d56acebe-8fdc-47b...|
|8fa6c3d2-71dc-478...|
+--------------------+

A similar approach would be to use the AWS Glue DynamicFrame API to read the data from S3. The DynamicFrame is then converted to a Spark DataFrame using the toDF method. Next, a temporary view can be registered for the DataFrame, which can be queried using SparkSQL. The key difference between the two approaches is the use of Hive SerDes for the first approach, and native Glue/Spark readers for the second. The native Glue/Spark readers provide performance and flexibility benefits such as computation of the schema at runtime, schema evolution, and job bookmarks support for Glue Dynamic Frames.

>>> memberships = glueContext.create_dynamic_frame.from_catalog(database="legislators", table_name="memberships_json")
>>> memberships.toDF().createOrReplaceTempView("memberships")
>>> spark.sql("select distinct organization_id from memberships").show()
+--------------------+
|     organization_id|
+--------------------+
|d56acebe-8fdc-47b...|
|8fa6c3d2-71dc-478...|
+--------------------+

Workflows and S3 Consistency

If you have a workflow of external processes ingesting data into S3, or upstream AWS Glue jobs generating input for a table used by downstream jobs in a workflow, you can encounter the following Apache Spark errors.

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in stage 16.0 failed 4 times, most recent failure: Lost task 10.3 in stage 16.0 (TID 761, ip-<>.ec2.internal, executor 1): 
java.io.FileNotFoundException: No such file or directory 's3://<bucket>/fileprefix-c000.snappy.parquet'
It is possible the underlying files have been updated.
You can explicitly invalidate the cache in Spark by running 
'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.

These errors happen when upstream jobs overwrite the same S3 objects that downstream jobs are concurrently listing or reading. They can also happen due to the eventual consistency of S3, where overwritten or deleted objects are only reflected at a later time while the downstream jobs are reading. A common manifestation of this error occurs when you create a SparkSQL view and execute SQL queries in the downstream job. To avoid these errors, the best practice is to set up a workflow with upstream and downstream jobs scheduled at different times, and to read from and write to different S3 partitions based on time.

You can also enable the S3-optimized output committer for your Glue jobs by passing in a special job parameter: “--enable-s3-parquet-optimized-committer” set to true. This committer improves application performance by avoiding list and rename operations in Amazon S3 during job and task commit phases. It also avoids issues that can occur with Amazon S3’s eventual consistency during job and task commit phases, and helps to minimize task failures.
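
One way to set this parameter by default is at job creation time; the following boto3 sketch is illustrative only (the job name, role, and script location are placeholders):

import boto3

glue = boto3.client("glue")

# Create a Glue Spark job with the S3-optimized committer enabled through a default argument.
glue.create_job(
    Name="downstream-etl-job",                       # placeholder job name
    Role="AWSGlueServiceRole-demo",                  # placeholder IAM role
    GlueVersion="1.0",
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://my-glue-scripts/etl_job.py",  # placeholder script path
        "PythonVersion": "3",
    },
    DefaultArguments={
        "--enable-s3-parquet-optimized-committer": "true",
    },
)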

Conclusion

In this post, we discussed how to leverage the automatic code generation process in AWS Glue ETL to simplify common data manipulation tasks such as data type conversion and flattening complex structures. We also explored using AWS Glue Workflows to build and orchestrate data pipelines of varying complexity. Lastly, we looked at how you can leverage the power of SQL, with the use of AWS Glue ETL and Glue Data Catalog, to query and transform your data.

In the final post, we will explore specific capabilities in AWS Glue and best practices to help you better manage the performance, scalability and operation of AWS Glue Apache Spark jobs.

 


About the Authors

Mohit Saxena is a technical lead manager at AWS Glue. His passion is building scalable distributed systems for efficiently managing data in the cloud. He also enjoys watching movies and reading about the latest technology.

 

 

Exploring the public AWS COVID-19 data lake

Post Syndicated from Jason Berkowitz original https://aws.amazon.com/blogs/big-data/exploring-the-public-aws-covid-19-data-lake/

The AWS COVID-19 data lake—a centralized repository of up-to-date and curated datasets on or related to the spread and characteristics of the novel coronavirus (SARS-CoV-2) and its associated illness, COVID-19—is now publicly available. For more information, see A public data lake for analysis of COVID-19 data. Globally, there are several efforts underway to gather this data, and AWS is working with partners to make this crucial data freely available and keep it up-to-date.

This data is readily available for you to ask questions, blend it with your own datasets, and create new insights in your own data lake. AWS is supporting Northwestern University in performing research to develop pandemic-surveillance methods. Ariel Chandler, a health informatics PhD candidate, says, “The AWS COVID-19 data lake provided me access to public data easily so I didn’t have to do the heavy lifting to get access to information that should be at everyone’s fingertips. Access to the AWS Data Exchange and these processing tools are helping to track, report, and visualize the spread of COVID-19 across the state to aid with the Illinois public health response. The data lake uses a wide range of data sources, including consumer and location data, to inform which communities are most at risk. That information is used to guide the provision of medical and social services to those who need them the most during this crisis.”

You can also produce new ways to query the information and publish those insights back into the data lake. Data may come from public websites, data purchased via data providers on AWS Data Exchange, or internal systems.

This post walks you through accessing the AWS COVID-19 data lake through the AWS Glue Data Catalog via Amazon SageMaker or Jupyter and using the open-source AWS Data Wrangler library. AWS Data Wrangler is an open-source Python package that extends the power of the pandas library to AWS and connects DataFrames with AWS data-related services (such as Amazon Redshift, Amazon S3, AWS Glue, Amazon Athena, and Amazon EMR). For more information about what you can build by using this data lake, see the associated public Jupyter notebook on GitHub.
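
As a small taste of that workflow, the following sketch assumes the data lake’s Glue database is named covid-19 and contains an enigma_jhu table (the names created by the CloudFormation template may differ) and that you have a recent AWS Data Wrangler release installed; it runs an Athena query and returns the result as a pandas DataFrame:

import awswrangler as wr

# Query the COVID-19 data lake through Athena and load the result into pandas.
# The database and table names are assumptions; adjust them to match your catalog.
df = wr.athena.read_sql_query(
    sql="SELECT * FROM enigma_jhu LIMIT 10",
    database="covid-19",
)
print(df.head())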

The data for this post is from the following sources:

  • Enigma – Global Coronavirus (COVID-19) Data (Johns Hopkins)
  • Foursquare – COVID-19 Foot Traffic Data
  • Rearc – USA Hospital Beds – COVID-19 | Definitive Healthcare
  • US Census County-Based Data

This data lake comprises data in a publicly readable Amazon S3 bucket. For a complete selection of COVID-19 data, see Data related to COVID-19 available for Research & Development. For instructions on subscribing to data products, see AWS Data Exchange – Find, Subscribe To, and Use Data Products.

Solution overview

This walkthrough includes the following steps:

  1. Installing the AWS CLI
  2. Configuring Amazon SageMaker
  3. Exploring the data through the Data Catalog

You also explore four analyses and their visualizations:

  • County-level percent changes
  • Foot traffic to public venues
  • Impact of number of cases on hospital beds
  • Impact of population density on hospital beds

Prerequisites

This post assumes that you have configured access to the data using an AWS CloudFormation template. For instructions, see A public data lake for analysis of COVID-19 data.

You also need access to an AWS account with permissions to do the following:

  • Create a CloudFormation stack
  • Create AWS Glue resources (catalog databases and tables)
  • Launch Amazon SageMaker notebooks

Installing the AWS CLI

Your first step is to install the AWS CLI and configure it for the us-east-2 Region. This is where the COVID-19 public data lake exists.

If you plan to work locally in Jupyter, you should set up a virtual environment for installing Python packages. Make sure that the following Python packages are installed: plotly, pandas, numpy, and awswrangler.

Configuring Amazon SageMaker

To configure Amazon SageMaker, complete the following steps:

  1. Create your Amazon SageMaker notebook instance in us-east-2 (the database and tables you created in the post A public data lake for analysis of COVID-19 data are in that Region).
  2. Record the IAM role you use for the notebook instance.
  3. Modify the IAM role assigned to the notebook instance to add the policies AmazonAthenaFullAccess and AWSDataExchangeSubscriberFullAccess.
  4. Create a Jupyter notebook on your new notebook instance.

Make sure the following Python packages are installed: plotly, pandas, numpy, awswrangler. For more information about installing external Python packages, see Install External Libraries and Kernels in Notebook Instances.

Exploring the data through the Data Catalog

When the CloudFormation stack shows the status CREATE_COMPLETE, you can view the tables the template created. You’re now ready to explore the data and its visualizations. This post provides four examples of visualizations.

County-level percent changes

Enigma – Global Coronavirus (COVID-19) Data (Johns Hopkins) tracks the global number of cases, recoveries, and deaths per day. Data sources include the World Health Organization (WHO), the US Centers for Disease Control and Prevention (CDC), and the National Health Commission of the People’s Republic of China (NHC). The data is collected by Johns Hopkins University and supported by the ESRI Living Atlas Team.

You can visualize the percent increase of a US county’s infected population over a day with this data. For example, if a county has a population of 1,000 and its infected population increases from 10 to 100 from Monday to Tuesday, then its infected population increased from 1% to 10%.

The following visualizations show the increase in the percentage of the population infected from March 29, 2020, to March 30, 2020, in New York City and surrounding areas. The more yellow a county, the larger the increase in cases. Gray counties increased less than 0.01% from March 29 to March 30.

The following visualization zooms in on New York counties. The yellow county is New York County (the borough of Manhattan), with an increase of 0.23% of its population with COVID-19 from March 29 to March 30. The blue counties to its east are Nassau County and Suffolk County, with 0.07% and 0.05% increases, respectively. The blue-green county north of New York County is Westchester County, with an increase of 0.08%.

The accompanying notebook allows you to vary the date parameter to visualize various rates of increase and zoom out of the map to visualize the entire United States.

Foot traffic to public venues

Foursquare – COVID-19 Foot Traffic Data is a daily aggregated and anonymized percentage dataset that demonstrates how foot traffic to various venues (such as airports, gyms, and grocery stores) has changed since February 19, 2020, in different metro areas. To obtain the following visualizations, you download the data from AWS Data Exchange and use Amazon SageMaker notebooks to visualize.

The following visualization uses the foot traffic data to plot the change in foot traffic to various venues after February 19. The plot shows that foot traffic to shopping malls, clothing stores, casual dining chains, and airports declined sharply after the National Emergency Declaration on March 13. On the other hand, traffic to grocery stores, warehouse stores, and drug stores increased sharply in the same period.

With the accompanying notebook, you can make similar plots for various metro areas, including New York City, San Francisco/Oakland, Los Angeles, and Seattle, and for 19 different venues.

Impact of number of cases on hospital beds

The following visualizations use Enigma – Global Coronavirus (COVID-19) Data (Johns Hopkins) and Rearc – USA Hospital Beds – COVID-19 | Definitive Healthcare to analyze how the growing number of COVID-19 cases affects local hospitals. The hospital bed dataset contains the numbers of licensed beds, staffed beds, and ICU beds, and the bed utilization rate for hospitals in the United States.

The first plot shows the growth in the number of hospitalized cases over 10 days in New York County. The hospitalized cases are calculated with a 10% hospitalization rate, which means 10% of all COVID-19 cases for Manhattan result in hospitalization. In the accompanying notebook, the hospitalization rate is a parameter, so you can visualize how various hospitalization rates have different healthcare needs. To generate this plot, you use the daily COVID-19 case information from Johns Hopkins to simulate the number of hospitalized cases and use the Definitive Healthcare hospital bed data to calculate the total hospital capacity for Manhattan.

The second visualization is a hospital utilization plot by county for the entire United States. The more yellow a county, the more its healthcare resources are burdened with a 20% COVID-19 hospitalization rate. Counties in gray have less than a 5% hospitalization rate.

As with the previous visualization, you can simulate various hospitalization rates in the accompanying notebook to visualize how COVID-19 burdens healthcare resources around the country. You can also change the date parameter to visualize how healthcare resource requirements change over time.

Impact of population density on hospital beds

The following visualization uses Enigma – Global Coronavirus (COVID-19) Data (Johns Hopkins), Rearc – USA Hospital Beds – COVID-19 | Definitive Healthcare, and US Census County-Based Data to compare the confirmed cases and available hospital beds per square kilometer for two different counties. The Enigma dataset provides case data; the Rearc dataset provides hospital bed information throughout the country, which is aggregated at the county level. The number of cases and beds is normalized by the land area in square kilometers using the US Census County data.

In the accompanying notebook, you can change the scope of visualization, the number of entities, and the bed resources. The following visualization provides cases and licensed beds per square kilometer at the county level for Alameda and San Diego counties.

These examples are a few of the innumerable analyses you can run on the public data lake.

Cleaning up

You incur no additional cost for accessing the AWS COVID-19 data lake beyond the standard charges for the AWS services that you use. For example, if you use Athena, you incur the costs for running queries and the data storage for the query result in Amazon S3, but incur no costs for accessing the data lake. Depending on the Amazon SageMaker instance you choose, you may incur Amazon SageMaker fees. For more information, see Amazon SageMaker Pricing.

To avoid recurring charges, shut down and delete the Amazon SageMaker instance, delete any S3 buckets you created, and disable auto-subscriptions for AWS Data Exchange.

Conclusion

Combining our efforts across organizations and scientific disciplines can help us win the fight against the COVID-19 pandemic. With the AWS COVID-19 data lake, you can experiment with and analyze curated data related to the virus, and share your own data and results. We believe that through an open and collaborative effort that combines data, technology, and science, we can inspire insights and foster breakthroughs necessary to contain, curtail, and ultimately cure COVID-19.

For more information about the public AWS COVID-19 data lake, visit https://aws.amazon.com/covid-19-data-lake/.

 


About the Authors

Jason Berkowitz is the Americas Data & Analytics Professional Services Practice Lead. He comes from a background in machine learning, data lake architectures, and helping customers become data-driven. He currently helps customers shape their data lakes and analytics journeys on AWS within Professional Services.

 

 

Colby Wise is a senior data scientist and manager at Amazon Machine Learning Solutions Lab, where he helps AWS customers across different industries accelerate their AI and cloud adoption.

 

 

 

 

 

Ninad Kulkarni is a data scientist in the Amazon Machine Learning Solutions Lab. He helps customers adopt ML and AI solutions by building solutions to address their business problems. Most recently, he has built predictive models for sports customers for on-screen consumption to improve fan engagement.

 

 

New – Serverless Streaming ETL with AWS Glue

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/new-serverless-streaming-etl-with-aws-glue/

When you have applications in production, you want to understand what is happening, and how the applications are being used. To analyze data, a first approach is a batch processing model: a set of data is collected over a period of time, then run through analytics tools. To be able to react quickly, you can use a streaming model, where data is processed as it arrives, a record at a time or in micro-batches of tens, hundreds, or thousands of records.

Managing continuous ingestion pipelines and processing data on-the-fly is quite complex, because it’s an always-on system that needs to be managed, patched, scaled, and generally taken care of. Today, we are making this easier and more cost-effective to implement by extending AWS Glue jobs, based on Apache Spark, to run continuously and consume data from streaming platforms such as Amazon Kinesis Data Streams and Apache Kafka (including the fully-managed Amazon MSK).

In this way, Glue can provision, manage, and scale the infrastructure needed to ingest data to data lakes on Amazon S3, data warehouses such as Amazon Redshift, or other data stores. For example, you can store streaming data in a DynamoDB table for quick lookups, or in Elasticsearch to look for specific patterns. This procedure is usually referred to as extract, transform, load (ETL).

As you process streaming data in a Glue job, you have access to the full capabilities of Spark Structured Streaming to implement data transformations, such as aggregating, partitioning, and formatting as well as joining with other data sets to enrich or cleanse the data for easier analysis. For example, you can access an external system to identify fraud in real-time, or use machine learning algorithms to classify data, or detect anomalies and outliers.

Processing Streaming Data with AWS Glue
To try this new feature, I want to collect data from IoT sensors and store all data points in an S3 data lake. I am using a Raspberry Pi with a Sense HAT to collect temperature, humidity, barometric pressure, and its position in space in real-time (using the integrated gyroscope, accelerometer, and magnetometer). Here’s an architectural view of what I am building:

First, I register the device with AWS IoT Core, and run the following Python code to send, once per second, a JSON message with sensor data to the streaming-data MQTT topic. I have a single device in this setup; with more devices, I would use a subtopic per device, for example streaming-data/{client_id}.

import sys
import time
import datetime
import json
from sense_hat import SenseHat
from awscrt import io, mqtt, auth, http
from awsiot import mqtt_connection_builder

sense = SenseHat()

topic = "streaming-data"
client_id = "raspberrypi"

# Callback when connection is accidentally lost.


def on_connection_interrupted(connection, error, **kwargs):
    print("Connection interrupted. error: {}".format(error))


# Callback when an interrupted connection is re-established.
def on_connection_resumed(connection, return_code, session_present, **kwargs):
    print("Connection resumed. return_code: {} session_present: {}".format(
        return_code, session_present))

    if return_code == mqtt.ConnectReturnCode.ACCEPTED and not session_present:
        print("Session did not persist. Resubscribing to existing topics...")
        resubscribe_future, _ = connection.resubscribe_existing_topics()

        # Cannot synchronously wait for resubscribe result because we're on the connection's event-loop thread,
        # evaluate result with a callback instead.
        resubscribe_future.add_done_callback(on_resubscribe_complete)


def on_resubscribe_complete(resubscribe_future):
    resubscribe_results = resubscribe_future.result()
    print("Resubscribe results: {}".format(resubscribe_results))

    for topic, qos in resubscribe_results['topics']:
        if qos is None:
            sys.exit("Server rejected resubscribe to topic: {}".format(topic))


# Callback when the subscribed topic receives a message
def on_message_received(topic, payload, **kwargs):
    print("Received message from topic '{}': {}".format(topic, payload))


def collect_and_send_data():
    publish_count = 0
    while(True):

        humidity = sense.get_humidity()
        print("Humidity: %s %%rH" % humidity)

        temp = sense.get_temperature()
        print("Temperature: %s C" % temp)

        pressure = sense.get_pressure()
        print("Pressure: %s Millibars" % pressure)

        orientation = sense.get_orientation_degrees()
        print("p: {pitch}, r: {roll}, y: {yaw}".format(**orientation))

        timestamp = datetime.datetime.fromtimestamp(
            time.time()).strftime('%Y-%m-%d %H:%M:%S')

        message = {
            "client_id": client_id,
            "timestamp": timestamp,
            "humidity": humidity,
            "temperature": temp,
            "pressure": pressure,
            "pitch": orientation['pitch'],
            "roll": orientation['roll'],
            "yaw": orientation['yaw'],
            "count": publish_count
        }
        print("Publishing message to topic '{}': {}".format(topic, message))

        mqtt_connection.publish(
            topic=topic,
            payload=json.dumps(message),
            qos=mqtt.QoS.AT_LEAST_ONCE)
        time.sleep(1)
        publish_count += 1


if __name__ == '__main__':
    # Spin up resources
    event_loop_group = io.EventLoopGroup(1)
    host_resolver = io.DefaultHostResolver(event_loop_group)
    client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)

    mqtt_connection = mqtt_connection_builder.mtls_from_path(
        endpoint="a1b2c3d4e5f6g7-ats.iot.us-east-1.amazonaws.com",
        cert_filepath="rapberrypi.cert.pem",
        pri_key_filepath="rapberrypi.private.key",
        client_bootstrap=client_bootstrap,
        ca_filepath="root-CA.crt",
        on_connection_interrupted=on_connection_interrupted,
        on_connection_resumed=on_connection_resumed,
        client_id=client_id,
        clean_session=False,
        keep_alive_secs=6)

    connect_future = mqtt_connection.connect()

    # Future.result() waits until a result is available
    connect_future.result()
    print("Connected!")

    # Subscribe
    print("Subscribing to topic '{}'...".format(topic))
    subscribe_future, packet_id = mqtt_connection.subscribe(
        topic=topic,
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=on_message_received)

    subscribe_result = subscribe_future.result()
    print("Subscribed with {}".format(str(subscribe_result['qos'])))

    collect_and_send_data()

This is an example of the JSON messages sent by the device:

{
    "client_id": "raspberrypi",
    "timestamp": "2020-04-16 11:33:23",
    "humidity": 39.35261535644531,
    "temperature": 30.10732078552246,
    "pressure": 1020.447509765625,
    "pitch": 4.044007304723748,
    "roll": 7.533848064912158,
    "yaw": 77.01560798660883,
    "count": 104
}

In the Kinesis console, I create the my-data-stream data stream (1 shard is more than enough for my workload). Back in the AWS IoT console, I create an IoT rule to send all data from the MQTT topic to this Kinesis data stream.

Now that all sensor data is sent to Kinesis, I can leverage the new Glue integration to process data as it arrives. In the Glue console, I manually add a table in the Glue Data Catalog. I select Kinesis as the type of source, and enter my stream name and the endpoint of the Kinesis Data Streams service. Note that for Kafka streams, before creating the table, you need to create a Glue connection.

I select JSON as data format, and define the schema for the streaming data. If I don’t specify a column here, it will be ignored when processing the stream.

After that, I confirm the final recap step, and create the my_streaming_data table. We are working to add schema inference to streaming ETL jobs. With that, specifying the full schema up front won’t be necessary. Stay tuned.

To process the streaming data, I create a Glue job. For the IAM role, I create a new one attaching the AWSGlueServiceRole and AmazonKinesisReadOnlyAccess managed policies. Depending on your use case and the set up of your AWS accounts, you may want to use a role providing more fine-grained access.

For the data source, I select the table I just created, receiving data from the Kinesis stream.

To get a script generated by Glue, I select the Change schema transform type. As target, I create a new table in the Glue Data Catalog, using an efficient format like Apache Parquet. The Parquet files generated by this job are going to be stored in an S3 bucket whose name starts with aws-glue- (including the final hyphen). By following the naming convention for resources specified in the AWSGlueServiceRole policy, this job has the required permissions to access those resources.

I leave the default mapping that keeps in output all the columns in the source stream. In this way, I can ingest all the records using the proposed script, without having to write a single line of code.

I quickly review the proposed script and save. Each record is processed as a DynamicFrame, and I can apply any of the Glue PySpark Transforms or any transforms supported by Spark Structured Streaming. By default with this configuration, only ApplyMapping is used.
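
For reference, the generated streaming script follows the micro-batch pattern sketched below. This is an abbreviated, illustrative sketch rather than the exact code Glue produces; the table, column, and S3 path names are the ones used (or assumed) in this walkthrough:

from awsglue.transforms import ApplyMapping
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext

glueContext = GlueContext(SparkContext.getOrCreate())

# Read the Kinesis-backed Data Catalog table as a streaming Spark DataFrame.
sensor_stream = glueContext.create_data_frame.from_catalog(
    database="mydatabase",                      # assumed database name
    table_name="my_streaming_data",
    additional_options={"startingPosition": "TRIM_HORIZON", "inferSchema": "false"},
    transformation_ctx="sensor_stream")

def process_batch(data_frame, batch_id):
    # Convert each micro-batch to a DynamicFrame, apply the mapping, and write Parquet to S3.
    if data_frame.count() > 0:
        batch = DynamicFrame.fromDF(data_frame, glueContext, "batch")
        mapped = ApplyMapping.apply(
            frame=batch,
            mappings=[("client_id", "string", "client_id", "string"),
                      ("timestamp", "string", "timestamp", "string"),
                      ("temperature", "double", "temperature", "double")],
            transformation_ctx="mapped")
        glueContext.write_dynamic_frame.from_options(
            frame=mapped,
            connection_type="s3",
            connection_options={"path": "s3://aws-glue-streaming-demo/output/"},  # assumed bucket
            format="parquet")

glueContext.forEachBatch(
    frame=sensor_stream,
    batch_function=process_batch,
    options={"windowSize": "100 seconds",
             "checkpointLocation": "s3://aws-glue-streaming-demo/checkpoint/"})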

I start the job, and after a few minutes I see the Parquet files containing the output of the job appearing in the output S3 bucket. They are partitioned by ingest date (year, month, day, and hour).

To populate the Glue Data Catalog with tables based on the content of the S3 bucket, I add and run a crawler. In the crawler configuration, I exclude the checkpoint folder used by Glue to keep track of the data that has been processed. After less than a minute, a new table has been added.

In the Amazon Athena console, I refresh database and tables, and select to preview the output_my_data containing ingest data from this year. In this way, I see the first ten records in the table, and get a confirmation that my setup is working!

Now, as data is being ingested, I can run more complex queries. For example, I can get the minimum and maximum temperature, collected from the device sensors, and the overall number of records stored in the Parquet files.

Looking at the results, I see that more than 8,000 records have been processed, with a maximum temperature of 31 degrees Celsius (about 88 degrees Fahrenheit). Actually, it was never really this hot. Temperature is measured by these sensors very close to the device, and rises as the device warms up with usage.

I am using a single device in this set up, but the solution implemented here can easily scale up with the number of data sources.

Available Now
Support for streaming sources is available in all regions where Glue is offered, as described in the AWS Region table. For more information, please have a look at the documentation.

Managing a serverless ETL pipeline with Glue makes it easier and more cost-effective to set up and manage streaming ingestion processes, reducing implementation efforts so you can focus on the business outcomes of analytics. You can set up a whole ingestion pipeline without writing code, as I did in this walkthrough, or customize the proposed script based on your needs.

Let me know what you are going to use this new feature for!

Danilo

Query, visualize, and forecast TruFactor web session intelligence with AWS Data Exchange

Post Syndicated from Jay Park original https://aws.amazon.com/blogs/big-data/query-visualize-and-forecast-trufactor-web-session-intelligence-with-aws-data-exchange/

Given the infinite nature of data, finding the right data set to gain business insights can be a challenge. You can improve your business by having access to a central repository of various data sets to query, visualize, and forecast. With AWS Data Exchange, finding the right data set has become much simpler. As an example, you can use data sets on web session visitation and demographics to help you understand which demographic groups visit your website most frequently. You can then improve your business through machine learning (ML) models and visitation forecasts.

AWS Data Exchange makes it easy to find, subscribe to, and use third-party data in the cloud. After you subscribe to a data product within AWS Data Exchange, you can use the AWS Data Exchange API, AWS CLI, or the AWS Management Console to load data into Amazon S3 directly. You can then analyze the imported data with a wide variety of AWS services, ranging from analytics to machine learning.
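
Under the hood, the console export uses the AWS Data Exchange API. The following boto3 sketch shows the equivalent call; the data set, revision, and asset IDs are placeholders you would look up first (for example, with list_data_sets, list_data_set_revisions, and list_revision_assets):

import boto3

dx = boto3.client("dataexchange", region_name="us-east-1")

# Placeholder identifiers for the entitled data set you subscribed to.
data_set_id = "<data-set-id>"
revision_id = "<revision-id>"
asset_id = "<asset-id>"

# Create and start an export job that copies one asset into your S3 bucket.
job = dx.create_job(
    Type="EXPORT_ASSETS_TO_S3",
    Details={
        "ExportAssetsToS3": {
            "DataSetId": data_set_id,
            "RevisionId": revision_id,
            "AssetDestinations": [
                {"AssetId": asset_id, "Bucket": "my-data-bucket", "Key": "exports/asset.parquet"}
            ],
        }
    },
)
dx.start_job(JobId=job["Id"])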

This post showcases TruFactor Intelligence-as-a-Service data on AWS Data Exchange. TruFactor’s anonymization platform and proprietary AI ingests, filters, and transforms more than 85 billion high-quality raw signals daily from wireless carriers, OEMs, and mobile apps into a unified phygital consumer graph across physical and digital dimensions. TruFactor intelligence is application-ready for use within any AWS analytics or ML service to power your models and applications running on AWS, with no additional processing required. Common use cases include the following:

  • Consumer segmentation – Web intelligence on internet browsing behavior in the US provides a complete view of the consumer, including interests, opinions, values, digital behavior, and sentiment, to inform segmentation of your customers and those of your competitors.
  • Customer acquisition or churn campaigns – Internet browsing behavior can identify affinity properties for new prospects as well as switching to competitors’ websites.

This walkthrough uses TruFactor’s Daily Mobile Web Session Index and Daily Demographics by Mobile Web Sessions data sets, which are both available for free subscription through the AWS Data Exchange console. While there are commercial data sets available for purchase in AWS Data Exchange, this post uses trial data sets to showcase the breadth and depth of analytics possible with TruFactor’s intelligence.

This TruFactor intelligence is aggregated on over 3 billion records from telco carrier networks and mobile apps per day, originating from approximately 30 million consistent users, distilled into session-level information that provides a complete view of user digital interests. The accuracy, breadth of data provided, and the persistency of the panel deliver a unified view of consumers that can inform insights or power analytic models or applications on AWS.

These two data sets have applications across verticals such as retail, financial services, and advertising. Common use cases include creating detailed customer segmentation (for example, full DNA maps of consumers based on visits to specific web HTTP hosts), identifying affinity properties, and estimating demand for apps or services. This intelligence is also ideal for identifying trends and changes over time.

Solution overview

The following diagram illustrates the architecture of the solution.

The workflow is comprised of the following steps:

  1. Subscribe to a data set from AWS Data Exchange and export to Amazon S3
  2. Run an AWS Glue crawler to load product data
  3. Perform queries with Amazon Athena
  4. Visualize the queries and tables with Amazon QuickSight
  5. Run an ETL job with AWS Glue
  6. Create a time series forecast with Amazon Forecast
  7. Visualize the forecasted data with Amazon QuickSight

This post looks at the demographic distributions across various websites and how to use ML to forecast website visitation.

Walkthrough overview

The walkthrough includes the following steps:

  1. Subscribe to a TruFactor data set from the AWS Data Exchange console and export the data set to Amazon S3
  2. Use an AWS Glue crawler to load the product data into an AWS Glue Data Catalog
  3. Use Amazon Athena for SQL querying
  4. Visualize the query views and tables with Amazon QuickSight
  5. Use AWS Glue jobs to extract, transform, and load your data for forecasting with Amazon Forecast
  6. Use Amazon Forecast to create a time series forecast of the transformed data
  7. Visualize the forecasted web visitation data with Amazon QuickSight

You do not have to perform additional processing or manipulation of the TruFactor intelligence for this walkthrough.

The data sets

The TruFactor data sets this post uses are in Parquet format and snappy compression. The following section provides additional details and schema for each data set.

TruFactor Daily Mobile Web Session Index (US – Nationwide) — Trial

The TruFactor Daily Mobile Web Session Index (US – Nationwide) — Trial data set provides aggregate information per HTTP host as a view of the internet browsing behavior in the US. TruFactor generates the data from high-quality packet layer data sourced from mobile carriers that includes the mobile internet traffic originating from a user’s device. TruFactor derives the projected counts from observed counts that are filtered for exclusion and anonymized to make sure users cannot be re-identified. It extrapolates values from US Census data using a proprietary algorithm. For the avoidance of doubt, this data set does not include user-level data.

The following screenshot shows the schema for the mobile web session data set by HTTP host, session time, MB transferred, number of events, sessions, users, and dates.

TruFactor Daily Demographics by Mobile Web Session (US) — Trial

The TruFactor Daily Demographics by Mobile Web Session (US) — Trial data set includes aggregate demographics: a projected distribution of users per HTTP host as a view of the internet browsing behavior in the US. TruFactor generates the data from high-quality packet layer data sourced from mobile carriers that includes the mobile internet traffic originating from a user’s device. TruFactor derives the distribution from observed counts that are filtered for exclusion and anonymized to make sure users cannot be re-identified. It extrapolates values from US Census data using a proprietary algorithm. Demographics include gender, age range, ethnicity, and income range.

The following screenshot shows the partial schema for the demographics by web session data set. The full schema includes the following attributes: HTTP host, age ranges, genders, ethnicity, income ranges, and date.

Prerequisites

To complete this walkthrough successfully, you must have the following resources:

  • An AWS account.
  • Familiarity with AWS core services and concepts.
  • The ability to launch new resources in your account. Some resources may not be eligible for Free Tier usage and might incur costs.
  • Subscription to TruFactor’s Daily Mobile Web Session Index (US – Nationwide) – Trial and Daily Demographics by Mobile Web Session (US) – Trial data sets. For instructions on subscribing to a data set on AWS Data Exchange, see AWS Data Exchange – Find, Subscribe To, and Use Data Products.

Using AWS Data Exchange, Amazon S3, AWS Glue, Amazon Athena, and Amazon QuickSight

This section examines the key demographics of visitors to the top seven e-commerce websites. This information can help you understand which demographic groups are visiting your website most frequently and also help you target ads and cater to certain demographics groups. You use AWS Glue crawlers to crawl your data sets in Amazon S3, populate your AWS Glue Data Catalog, query the AWS Glue Data Catalog using Amazon Athena, and use Amazon QuickSight to visualize the queries.

Step 1: Exporting the data from AWS Data Exchange to Amazon S3

To export your TruFactor data set subscriptions into an Amazon S3 bucket, complete the following steps:

  1. Create an Amazon S3 bucket in your working account. For the purposes of our demo, we have named our S3 bucket trufactor-data-exchange-bucket.
  2. Create two folders within the S3 bucket: web_sess and demo_by_web_sess.

This post uses a trial data set with a sample of 14 days. A paid subscription to TruFactor’s Web Sessions data on AWS Data Exchange includes 6 months of historical data, which refreshes daily.

The following screenshot shows the two folders within the S3 bucket. You are now ready to export the data sets.

  1. On the AWS Data Exchange console, under Subscriptions, locate TruFactor Daily Mobile Web Sessions Index (US – Nationwide) – Trial.
  2. Under Revisions, choose the most recent Revision ID.
  3. Choose all assets except the manifest.json files.
  4. Choose Export to Amazon S3.
  5. In the window that opens, choose the S3 bucket and folder to export the product data into.
    • Export all the assets into the S3 bucket’s web_sess folder.
  6. Repeat the previous steps for the TruFactor Daily Demographics by Mobile Web Sessions (US) – Trial data set, with the following change:
    • Export the assets into the demo_by_web_sess folder in your S3 bucket.
  7. Check the Overview to make sure the TruFactor data sets exported successfully. The following screenshot shows that the data sets are partitioned into folders by date. Each folder contains Parquet files of web session data for each day.

Step 2: Populating your AWS Glue Data Catalog with the TruFactor data sets

Now that you have successfully exported the TruFactor data sets into an Amazon S3 bucket, you create and run an AWS Glue crawler to crawl your Amazon S3 bucket and populate the AWS Glue Data Catalog. Complete the following steps:

  1. On the AWS Glue console, under Data Catalog, choose Crawlers.
  2. Choose Add crawler.
  3. For Crawler name, enter a name; for example, trufactor-data-exchange-crawler.
  4. For Crawler source type, choose Data stores.
  5. Choose Next.
  6. For Choose a data store, choose S3.
  7. For Crawl data in, select Specified path in my account.
  8. For Include path, enter the path for the web_sess data set folder. The crawler points to the following path: s3://<trufactor-data-exchange-bucket>/web_sess.
  9. Choose Next.
  10. Select Yes to Add another data store.
  11. Choose Next.
  12. For Include path, enter the path for the demo_by_web_sess data set folder. The crawler points to the following path: s3://<trufactor-data-exchange-bucket>/demo_by_web_sess.
  13. Choose Next.
  14. In the Choose an IAM role section, select Create an IAM role. This is the role that the AWS Glue crawler and AWS Glue jobs use to access the Amazon S3 bucket and its content.
  15. For IAM role, enter the suffix demo-data-exchange.
  16. Choose Next.
  17. In the schedule section, leave the Frequency with the default Run on Demand.
  18. Choose Next.
  19. In the Output section, choose Add database.
  20. Enter a name for the database; for example, trufactor-db.
  21. Choose Next, then choose Finish. This database contains the tables that the crawler discovers and populates. With these data sets separated into different tables, you can join and relationalize the data.
  22. In the Review all steps section, review the crawler settings and choose Finish.
  23. Under Data Catalog, choose Crawlers.
  24. Select the crawler you just created.
  25. Choose Run crawler. The AWS Glue crawler crawls the data sources and populates your AWS Glue Data Catalog. This process can take up to a few minutes. When the crawler is finished, you can see two tables added to your crawler details. See the following screenshot. You can now view your new tables.
  26. Under Databases, choose Tables.
  27. Choose your database.
  28. Choose View the tables. The table names correspond to the Amazon S3 folder directory you used to point your AWS Glue crawler. See the following screenshot.
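
If you prefer to script this step instead of using the console, the following boto3 sketch starts the same crawler and lists the tables it created, using the crawler and database names chosen earlier in this walkthrough:

import time
import boto3

glue = boto3.client("glue")

# Kick off the crawler created in the previous steps.
glue.start_crawler(Name="trufactor-data-exchange-crawler")

# Poll until the crawler returns to the READY state.
while glue.get_crawler(Name="trufactor-data-exchange-crawler")["Crawler"]["State"] != "READY":
    time.sleep(30)

# List the tables the crawler populated in the database.
for table in glue.get_tables(DatabaseName="trufactor-db")["TableList"]:
    print(table["Name"])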

Step 3: Querying the data using Amazon Athena

After you populate the AWS Glue Data Catalog with TruFactor’s Mobile Web Session and Demographics data, you can use Amazon Athena to run SQL queries and create views for visualization. Complete the following steps:

  1. On the Amazon Athena console, choose Query Editor.
  2. On the Database drop-down menu, choose the database you created.
  3. To preview one of the tables in Amazon Athena, choose Preview table.
    In the Results section, you should see 10 records from the web_sess table. See the following screenshot. In the next step, you run a query that creates a view of the Web Session Index and Demographics data across a group of e-commerce HTTP hosts. This is broken down by the percentage of users categorized by age and gender, the number of users, MB transferred, and the number of sessions, ordered by date.
  4. Run the following SQL query in Amazon Athena:
    CREATE OR REPLACE VIEW e_commerce_web_sess_data AS 
    SELECT
      "date_parse"("a"."partition_0", '%Y%m%d') "date",
      "a"."http_host",
      "a"."users",
      "a"."mb_transferred",
      "a"."number_of_sessions",
      "b"."18_to_25",
      "b"."26_to_35",
      "b"."36_to_45",
      "b"."46_to_55",
      "b"."56_to_65",
      "b"."66_to_75",
      "b"."76_plus",
      "b"."male",
      "b"."female"
    FROM  
      ((
       SELECT
         "partition_0",
         "http_host",
         "users",
         "mb_transferred",
         "number_of_sessions"
       FROM
         "trufactor-db"."web_sess"
       WHERE ("http_host" IN ('www.amazon.com', 'www.walmart.com', 'www.ebay.com', 'www.aliexpress.com', 'www.etsy.com', 'www.rakuten.com', 'www.craigslist.com'))
    )  a
    LEFT JOIN (
       SELECT
         "http_host" "http_host_2",
         "partition_0" "partition_2",
         "age_ranges"."18_to_25",
         "age_ranges"."26_to_35",
         "age_ranges"."36_to_45",
         "age_ranges"."46_to_55",
         "age_ranges"."56_to_65",
         "age_ranges"."66_to_75",
         "age_ranges"."76_plus",
         "genders"."male",
         "genders"."female"
       FROM
         "trufactor-db"."demo_by_web_sess"
       WHERE ("http_host" IN ('www.amazon.com', 'www.walmart.com', 'www.ebay.com', 'www.aliexpress.com', 'www.etsy.com', 'www.rakuten.com', 'www.craigslist.com'))
    )  b ON (("a"."http_host" = "b"."http_host_2") AND ("a"."partition_0" = "b"."partition_2")))
    ORDER BY "date" ASC

  5. After you create the view, you can preview it by repeating the above steps for previewing a table. The following screenshot shows the results, which include the number of users, user percentages by age group and gender, and a list of e-commerce hosts listed by date.

Step 4: Visualizing with Amazon QuickSight

After you query your data sets in Amazon Athena, you can use Amazon QuickSight to visualize your results. You must first grant Amazon QuickSight access to the Amazon S3 bucket that holds the TruFactor data sets, which you can do through the Manage QuickSight setting on the Amazon QuickSight console. After you grant access to the Amazon S3 bucket, you visualize the tables and queries with Amazon QuickSight. Complete the following steps:

  1. In the Amazon QuickSight console, choose New Analysis.
  2. Choose New data set.
  3. Choose Athena as the data source.
  4. For Data source name, enter trufactor-data-exchange-source.
  5. From the drop-down menu, choose the database and view you created.
  6. Choose Directly query your data.
  7. Choose Visualize. Because TruFactor intelligence is application-ready, you can gain immediate insights by using Amazon Athena to query and Amazon QuickSight to visualize. This post includes visualizations of the data set for the first two weeks of October 2019. The following graph visualizes the number of users on different HTTP hosts. The following pie charts further filter the HTTP hosts by age range. The following bar chart offers another visualization of users by age range. You could add other fields such as income range, ethnicity, and gender.

Running AWS Glue Jobs and Amazon Forecast

This section discusses how to use AWS Glue jobs to query and export your data set for forecasting with Amazon Forecast. This walkthrough examines user visitation over 14 days across the top 50 HTTP hosts, ranked by user visitation. From there, you forecast user visitation for these HTTP hosts for the next three days.

Step 1: Creating and running an AWS Glue job

To create and run your AWS Glue job, complete the following steps:

  1. On the AWS Glue console, under ETL, choose Jobs.
  2. Choose Add job.
  3. For Name, enter a name for the AWS Glue job; for example, demo-glue-job.
  4. For Type and Glue version, keep the default values.
  5. For This job runs, select A new script to be authored by you.
  6. In the Security configuration, script libraries, and job parameters (optional) section, set the Maximum capacity cluster size to 2. This reduces the cost of running the AWS Glue job. By default, the cluster size is set to 10 Data Processing Units (DPU).
  7. Choose Next.
  8. In the Connections section, keep the default values.
  9. Choose Save job and edit script.
  10. Enter the following code in the script section, and replace YOUR_BUCKET_NAME on line 42 with the name of your bucket.
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.dynamicframe import DynamicFrame
    from awsglue.job import Job
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType
    
    ## @params: [JOB_NAME]
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    
    db_name = "trufactor-db"
    tbl_name = "web_sess"
    
    web_sess_dyf = glueContext.create_dynamic_frame.from_catalog(database = db_name, table_name = tbl_name, transformation_ctx = "web_sess_dyf")
    web_sess_df = web_sess_dyf.toDF()
    web_sess_df.createOrReplaceTempView("webSessionTable")
    web_sess_sql_df = spark.sql("""
    SELECT to_date(partition_0, 'yyyyMMdd') AS date,
             http_host,
             users
    FROM 
        (SELECT partition_0,
             http_host,
             users,
             row_number()
            OVER ( PARTITION By partition_0
        ORDER BY users DESC ) AS rn
        FROM webSessionTable )
    WHERE rn<=50
    ORDER BY date""")
    
    web_sess_sql_df.coalesce(1).write.format("csv").option("header","false").save("s3://YOUR_BUCKET_NAME/amazon_forecast_demo/dataset/sampleset")
    job.commit()

    This code queries the top 50 HTTP hosts, ranked by users’ visitation during the first half of October, and returns the date, http_host, and users columns. The query results are uploaded to your Amazon S3 bucket in CSV format (you need CSV files to use Amazon Forecast).

  11. Choose Save and close the AWS Glue job screen. Before you can run the AWS Glue job, you need to modify the IAM role associated with AWS Glue. Currently, the IAM role only has permission to get and put objects in the directories you specified earlier. You need to update the IAM policy to allow permission to get and put objects in all subdirectories of the Amazon S3 bucket.
  12. On the IAM console, choose the role you used for this walkthrough: AWSGlueServiceRole-demo-data-exchange.
  13. In the Summary section for the IAM role, on the Permissions tab, choose the IAM policy associated with the Managed policy.
  14. Choose Edit policy.
  15. Change the view from Visual editor to JSON.
  16. Within this JSON object, under Resource, add another resource into the list of values. The following code is the updated IAM policy:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject"
                ],
                "Resource": [
                    "arn:aws:s3:::trufactor-data-exchange-bucket/web_sess*",
                    "arn:aws:s3:::trufactor-data-exchange-bucket/demo_by_web_sess*",
                    "arn:aws:s3:::trufactor-data-exchange-bucket/*"
                ]
            }
        ]
    }

  17. Choose Review policy and Save changes.
  18. On the AWS Glue console, under ETL, choose Jobs. Select the job you created earlier.
  19. From the Action drop-down menu, choose Run job. The job can take 15–20 minutes to complete. On the History tab, you can see when the status changes to Succeeded. See the following screenshot.
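If you prefer to run and monitor the job programmatically instead of through the console, the following is a minimal sketch using boto3; the job name matches the demo-glue-job created earlier, and the polling interval is arbitrary.

import time
import boto3

glue = boto3.client("glue")

# Start the AWS Glue job created earlier in this walkthrough
run_id = glue.start_job_run(JobName="demo-glue-job")["JobRunId"]

# Poll until the run reaches a terminal state
while True:
    state = glue.get_job_run(JobName="demo-glue-job", RunId=run_id)["JobRun"]["JobRunState"]
    print(f"Job run {run_id} is {state}")
    if state in ("SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT"):
        break
    time.sleep(60)  # the job typically takes 15-20 minutes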

Step 2: Creating a dataset group, training a predictor, and creating forecasts in Amazon Forecast

To create your dataset group, train a predictor, and create forecasts, complete the following steps:

  1. On the Amazon Forecast console, choose View dataset groups.
  2. Choose Create dataset group.
  3. For Dataset group name, enter a name; for example, users_visitation_sample_dataset_group.
  4. For Forecasting domain, choose Web traffic.
  5. Choose Next.
  6. On the Create target time series dataset page, for Dataset name, enter the name of your dataset; for example, users_visitation_sample_dataset.
  7. For Frequency of your data, choose 1 day.
  8. For Data schema, update the data schema JSON object with the following code:
    {
      "Attributes":[
        {
          "AttributeName": "timestamp",
          "AttributeType": "timestamp"
        },
        {
          "AttributeName": "item_id",
          "AttributeType": "string"
        },
        {
          "AttributeName": "value",
          "AttributeType": "float"
        }
      ]
    }

  9. Choose Next.
  10. On the Import target time series data page, for Dataset import name, enter your dataset name; for example, users_visitation_sample_dataset_import.
  11. For Timestamp format, enter yyyy-MM-dd.
  12. For IAM Role, create a new role and grant Amazon Forecast access to the S3 bucket that you are using for this demo.
  13. For Data Location, use the S3 path that you exported your CSV file to after the AWS Glue job: s3://<trufactor-data-exchange-bucket>/amazon_forecast_demo/dataset/sampleset.
  14. Review the settings for import target time series data and choose Start import. The process of importing the data can take approximately 10 minutes. When the status changes to Active, you can begin training a predictor.
  1. On the Dashboard page, choose Start next to Predictor training.
  2. On the Train predictor page, for Predictor name, enter a name for the predictor; for example, users_visitation_sample_dataset_predictor.
  3. For Forecast horizon, choose 3.
  4. For Forecast frequency, choose day.
  5. For Algorithm selection, select Manual. If you use the other algorithm option, AutoML, you allow Amazon Forecast to choose the right algorithm based on a pre-defined objective function, which is not necessary for this walkthrough.
  6. For Algorithm, choose Deep_AR_Plus (you use deep learning to forecast users’ visitation across 50 HTTP hosts).
  7. Leave all other options with the default values.
  8. Review the settings and choose Train predictor. The predictor training process can take 20–30 minutes. When the training completes, the status changes to Active. To evaluate the predictor’s (ML model) accuracy, Amazon Forecast splits the input time series data into two datasets: training and test. This process of testing a predictive model on historical data is called backtesting. When Amazon Forecast splits the input time series data, it maintains the data’s order, which is crucial for time series data. After training on the dataset, Amazon Forecast calculates the root mean square error (RMSE) and weighted quantile losses to determine how well the predictor performed. For more detailed information about backtesting and predictor metrics, see Evaluating Predictor Accuracy. When the predictor is finished training, you can create a forecast.
  9. On the Dashboard page, under Generate forecasts, choose Start.
  10. For Forecast name, enter a forecast name; for example, users_visitation_sample_forecast.
  11. For Predictor, choose your trained predictor.
  12. For Forecast types, you can enter any quantile values between 0.01 and 0.99, as well as the mean. These quantiles represent the probability of satisfying the original demand. This post enters .50, .90, .99, mean.
  13. Choose Create a forecast. The forecast creation process can take 15–20 minutes.
  14. When the forecast is complete, choose Forecasts.
    You should see a single forecast. See the following screenshot.
    You can now export the generated forecast to a new folder within your existing Amazon S3 bucket for visualization with Amazon QuickSight.
  1. Choose the newly generated forecast.
  2. Under Exports, choose Create forecast export.
  3. For Export name, enter a name for the export; for example, users_visitation_sample_forecast_export.
  4. For Generated forecast, choose users_visitation_sample_forecast.
  5. For IAM Role, choose the role you created earlier.
  6. For S3 forecast export location, enter the S3 path to store the forecasts: s3://<trufactor-data-exchange-bucket>/amazon_forecast_demo/forecasts/sampleset.
  7. Choose Create forecast export. The exporting process can take up to 5 minutes. Alternatively, you can visualize the user visitation forecasts for the 50 HTTP hosts directly through the Amazon Forecast console or Query API.
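If you want to script the Amazon Forecast steps in this section instead of using the console, the following is a condensed sketch with boto3. The role ARN is a placeholder for a role that grants Amazon Forecast access to your bucket, and every create call is asynchronous, so in practice you wait for each resource to reach ACTIVE status (for example, with the corresponding describe call) before starting the next step.

import boto3

forecast = boto3.client("forecast")

role_arn = "arn:aws:iam::123456789012:role/ForecastDemoRole"  # placeholder role with S3 access
bucket = "trufactor-data-exchange-bucket"

# Dataset group and target time series dataset (web traffic domain, daily frequency)
dsg_arn = forecast.create_dataset_group(
    DatasetGroupName="users_visitation_sample_dataset_group",
    Domain="WEB_TRAFFIC")["DatasetGroupArn"]

ds_arn = forecast.create_dataset(
    DatasetName="users_visitation_sample_dataset",
    Domain="WEB_TRAFFIC",
    DatasetType="TARGET_TIME_SERIES",
    DataFrequency="D",
    Schema={"Attributes": [
        {"AttributeName": "timestamp", "AttributeType": "timestamp"},
        {"AttributeName": "item_id", "AttributeType": "string"},
        {"AttributeName": "value", "AttributeType": "float"}]})["DatasetArn"]

forecast.update_dataset_group(DatasetGroupArn=dsg_arn, DatasetArns=[ds_arn])

# Import the CSV files exported by the AWS Glue job
forecast.create_dataset_import_job(
    DatasetImportJobName="users_visitation_sample_dataset_import",
    DatasetArn=ds_arn,
    DataSource={"S3Config": {
        "Path": f"s3://{bucket}/amazon_forecast_demo/dataset/sampleset",
        "RoleArn": role_arn}},
    TimestampFormat="yyyy-MM-dd")

# ...wait for the import job to become ACTIVE before training...

predictor_arn = forecast.create_predictor(
    PredictorName="users_visitation_sample_dataset_predictor",
    AlgorithmArn="arn:aws:forecast:::algorithm/Deep_AR_Plus",
    ForecastHorizon=3,
    PerformAutoML=False,
    InputDataConfig={"DatasetGroupArn": dsg_arn},
    FeaturizationConfig={"ForecastFrequency": "D"})["PredictorArn"]

forecast_arn = forecast.create_forecast(
    ForecastName="users_visitation_sample_forecast",
    PredictorArn=predictor_arn,
    ForecastTypes=["0.50", "0.90", "0.99", "mean"])["ForecastArn"]

forecast.create_forecast_export_job(
    ForecastExportJobName="users_visitation_sample_forecast_export",
    ForecastArn=forecast_arn,
    Destination={"S3Config": {
        "Path": f"s3://{bucket}/amazon_forecast_demo/forecasts/sampleset",
        "RoleArn": role_arn}})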

Step 3: Querying a view using Amazon Athena and downloading the forecast file

Before you visualize users’ visitation forecast data, create a view in Amazon Athena for the top 50 HTTP hosts ranked by users’ visitation over 14 days. Complete the following steps:

  1. Run the following query in Amazon Athena:
    CREATE OR REPLACE VIEW "top_50_users" AS
    SELECT date_format(date_parse(partition_0,
             '%Y%m%d'),'%Y-%m-%d') AS "date", http_host, users
    FROM 
        (SELECT partition_0,
             http_host,
             users,
             row_number()
            OVER (PARTITION By partition_0
        ORDER BY  users DESC ) AS rn
        FROM "trufactor-db"."web_sess")
    WHERE rn<=50
    ORDER BY date

    The code queries the top 50 HTTP hosts ranked by users’ visitation sorted by date.

  2. In the Amazon S3 console, navigate to the S3 bucket and directory holding the files: s3://<trufactor-data-exchange-bucket>/amazon_forecast_demo/forecasts/sampleset. The following screenshot shows three different files inside the folder.
  1. Download the CSV file.

Step 4: Visualizing in Amazon QuickSight

To visualize the data in Amazon QuickSight, complete the following steps:

  1. On the Amazon QuickSight console, choose Manage data.
  2. Choose New data set.
  3. Choose Upload a file.
  4. Upload the CSV file that you downloaded.
  5. On the Confirm file upload settings page, choose Next.
  6. Choose Visualize.
  7. Return to the Amazon QuickSight console and choose Manage data.
  8. Choose New data set for the top 50 HTTP hosts view you queried earlier.
  9. On the Create a Data set page, find the data source you created earlier: trufactor-data-exchange-source.
  10. From the drop-down list, choose the database and view you created.
  11. Choose Directly query your data.
  12. Choose Visualize.
  13. On the new Amazon QuickSight analysis page, choose the pencil icon next to Data set.
  14. Choose Add data set.
  15. Choose the CSV file you uploaded.

You now have a single Amazon QuickSight analysis with multiple data sets to visualize.

The following graphs highlight the historical data for the users’ visitation across 50 HTTP hosts for the first two weeks of October and the mean forecast for users’ visitation for the next three days.

The following graphs highlight the historical data and forecasted P50, P90, and P99 quantile values for www.google.com.

Amazon Forecast makes it easier to get started with machine learning without having to create your own ML models from scratch. You can use this information to anticipate the web traffic for the upcoming week, which can aid in scaling your resources and applications accordingly.

Cleaning up

To avoid incurring future charges, delete the following resources that you created in this walkthrough:

  • The Amazon S3 bucket trufactor-data-exchange-bucket
  • The AWS Glue crawler trufactor-data-exchange-crawler
  • The AWS Glue job demo-glue-job
  • The AWS IAM role AWSGlueServiceRole-demo-data-exchange
  • The AWS Glue database trufactor-db
  • The Amazon QuickSight demo data sets and analysis
  • The following Amazon Forecast resources (in this order) for users_visitation_sample_dataset_group via the console:
    • Existing forecasts under Forecasts
    • Existing predictors under Predictors
    • Existing datasets under Datasets
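Most of these resources can also be deleted programmatically. The following is a rough boto3 sketch for the Amazon Forecast and AWS Glue portions only; the Forecast ARNs are placeholders, each delete must finish before the next dependent resource can be deleted, and the S3 bucket, IAM role, and Amazon QuickSight resources are simplest to remove from their consoles.

import boto3

forecast = boto3.client("forecast")
glue = boto3.client("glue")

# Delete Amazon Forecast resources in dependency order (ARNs are placeholders)
forecast.delete_forecast_export_job(ForecastExportJobArn="arn:aws:forecast:...:forecast-export-job/...")
forecast.delete_forecast(ForecastArn="arn:aws:forecast:...:forecast/users_visitation_sample_forecast")
forecast.delete_predictor(PredictorArn="arn:aws:forecast:...:predictor/users_visitation_sample_dataset_predictor")
forecast.delete_dataset_import_job(DatasetImportJobArn="arn:aws:forecast:...:dataset-import-job/...")
forecast.delete_dataset(DatasetArn="arn:aws:forecast:...:dataset/users_visitation_sample_dataset")
forecast.delete_dataset_group(DatasetGroupArn="arn:aws:forecast:...:dataset-group/users_visitation_sample_dataset_group")

# Delete the AWS Glue job, crawler, and database created in this walkthrough
glue.delete_job(JobName="demo-glue-job")
glue.delete_crawler(Name="trufactor-data-exchange-crawler")
glue.delete_database(Name="trufactor-db")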

Conclusion

This walkthrough detailed how to import a data set to Amazon S3 from AWS Data Exchange, use AWS Glue to run crawlers and an ETL job on the data, run SQL queries with Amazon Athena, create a time series forecast of the queried data with Amazon Forecast, and visualize the queried and forecasted data with Amazon QuickSight.

This post used TruFactor Intelligence-as-a-Service, one of the AWS Data Exchange launch partners, to power this walkthrough. TruFactor intelligence on AWS Data Exchange highlighted the ease of loading directly into Amazon S3 and layering advanced AWS services.

For more information about TruFactor and the AWS Data Exchange, see TruFactor on AWS Data Exchange on the TruFactor website. You can subscribe to TruFactor Intelligence directly on AWS Data Exchange or engage with TruFactor directly to identify the right offering from the larger product portfolio of anonymized consumer intelligence.


About the Authors

Jay Park is a solutions architect at AWS.

Ariana Rahgozar is a solutions architect at AWS.

Build a cloud-native network performance analytics solution on AWS for wireless service providers

Post Syndicated from Angelo Sampietro original https://aws.amazon.com/blogs/big-data/build-a-cloud-native-network-performance-analytics-solution-on-aws-for-wireless-service-providers/

This post demonstrates a serverless, cloud-based approach to building a network performance analytics solution using AWS services that can provide flexibility and performance while keeping costs under control with pay-per-use AWS services.

Without good network performance, you may struggle to meet the demands of real-time, low-latency services and the growth in the total bandwidth your customers consume.

Considering the large volume of data that you need to ingest, store, and process every second for optimal performance monitoring, standard on-premises monitoring approaches are no longer efficient.

A cloud-native approach allows you to invest in the solutions that generate business value and move from the typical capital expenditure model to an operational expenditure model by avoiding upfront costs and over-provisioning of infrastructure.

Data and voice network complexity for mobile service providers

According to Cisco’s global mobile data traffic forecast, there will be 13.1 billion mobile-connected devices by 2023, and 1.4 billion of those will be 5G capable.

As a mobile service provider, you must understand how to perform accurate network planning and sizing for your access and core networks.

The increase in the global demand for network throughput and the number of services such as VoLTE, IoT, and video streaming on mobile networks is forcing mobile service providers to implement new architectures to match the desired quality of service (QoS).

Addressing optimal QoS when a multitude of services are running on a converged network is not an easy task. The workflow is complex, starting from collecting counters and statistics from many different network elements and ending with transforming the collected data into key performance indicators (KPIs) that you can link to the quality of one of the multiple services delivered over the network.

Modern mobile networks, with the deployment of 4G, 5G, and IoT services, have given rise to an increased number of cells deployed in the field, so you must collect counters and generate KPIs for thousands of different network elements.

Considering that every network element can generate a few thousand counters, the network performance system has to manage millions of measurements at every collection cycle.

This is difficult to manage at scale with on-premises deployments without a high-cost solution. Instead, you can use AWS services to design a modern network performance analytics solution that covers all the requirements of different departments of telecommunications service providers (TSPs).

Data and voice network architecture

The main problem that you may face as a service provider is the complexity of modern mobile networks, which originates from several evolutions of communication standards in the last few decades (from 2G to 5G for the data core, from CS to VoLTE for the voice core) and from the varying hardware and capabilities of the network elements.

The following diagram illustrates a simplified schema of a mobile wireless network element currently deployed, where you can find:

  • Access network, with the network elements needed for coverage from 2G to 5G
  • Core network, which includes the network elements for all the functions needed to deliver the services, authentication, and the database of all the users on the network
  • Services delivered by the network, which include voice (PSTN/PLMN), internet (data service), and application services

IoT network architecture and service differentiation

The IoT traffic paradigm is completely different from the traffic patterns of smartphones and tablets. These mobile devices normally open an always-on session (PDP context or bearer) and generate a considerable amount of user plane traffic. An IoT device, by contrast, opens a session, sends a few bytes, and closes the session to limit power consumption and avoid allocating resources on cells when not necessary.

The following diagram depicts a real-world use case in which a service provider deploys IoT services. In most cases, IoT traffic is delivered to a separate 4G core for the following reasons:

  • EPC optimization needed to manage IoT traffic
  • Avoiding having IoT traffic managed by the same core network as customer traffic, for security reasons
  • IoT devices generate huge volumes of control plane traffic that can impact customer performance if managed by the same network

This usage pattern requires a different traffic model and forces mobile service providers to perform firmware upgrades to the current cells or deploy new cells into the network. This has an impact on the QoS due to the increase in the number of counters and cells.

What is network performance, and why is it important?

You can use network performance monitoring systems to generate reports and insights and track network performance. Multiple organizations like Operations, Network Planning, and Engineering use these tools to view the overall quality of networks and services and control important KPIs like availability, response times, and download and upload speeds.

Network performance is strictly connected to the QoS that, by definition, is the mechanism to control the performance, reliability, and usability of a telecommunications service.

Pain points for service providers

Every vendor of network elements usually also provides the element managers for its hardware. These systems are proprietary, but most of the time they comply with the 3rd Generation Partnership Project (3GPP) standard for the XML format of performance measurement (PM) files, which export the counters you need to measure network performance accurately.

Service providers normally also use multiple vendors for core and access network elements, so monitoring this heterogeneous, multi-vendor environment is one of the main challenges in today’s deployments.

Data types and 3GPP standards

Counters and statistics about the performance on a network element are exported in PM files. 3GPP standardized the format in a specific document, which you can download from the 3GPP website.

Network vendors are not guaranteed to follow this standard, but even when there are some customizations, the structure and format in most cases remain very similar to the open standard.

The following schema from 3GPP describes the XML format for PM that is exported from network elements.

The XML schema root element is "measCollecFile" and it has three child elements:

  1. “fileHeader”: contains information such as file sender and the beginning time of the measurements.
  2. “fileFooter”: contains the end time of the measurements.
  3. “measData”: contains all the measurements information such as measurement types and their values.

The most important tags are:

  • measInfo – Contains the family of the measurements, the granularity, and the counters list for each measValue
  • measValue – Contains multiple measResults with the results of the measurements

Building a network performance solution on AWS

In this section, we describe a possible architecture for implementing the solution on AWS following a serverless approach.

Prerequisites

To implement this solution, you must have the following prerequisites:

  • An AWS account in the same Region
  • The AdministratorAccess policy granted to your AWS account (for production, you should restrict access as needed)

This post uses the EU (Ireland) Region. However, you can choose another Region where the services used in this post are available.

For more information about AWS Regions and where AWS services are available, see Region Table.

The following diagram illustrates the high-level, end-to-end solution and the AWS services it uses. The workflow begins with the ingestion of the files using AWS Transfer for SFTP or Kinesis Data Firehose; the data is stored in Amazon S3 and processed with a Lambda function and AWS Glue to create a Data Catalog. Data querying is done with Athena, and visualization with Amazon QuickSight.

Collecting data using Kinesis Data Firehose or AWS SFTP

For information about collecting PM files via element managers, see chapter 5 of Technical Specification 3GPP TS 31.432 on the 3GPP website.

Element managers act as collectors of XML measurement files that the network elements provide. They can send the files to an S3 bucket via AWS SFTP or Kinesis Data Firehose.

For the data transfer, see the documentation on creating a Kinesis Data Firehose delivery stream and sending data to a delivery stream. If you plan to use SFTP transfer instead, see how AWS Transfer for SFTP works.
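For illustration only, the following is a minimal boto3 sketch of how a collector process could push a small PM file to a Kinesis Data Firehose delivery stream. The delivery stream name and file path are placeholders, and large XML files are better transferred with AWS Transfer for SFTP because a single Firehose record is limited to roughly 1 MB.

import boto3

firehose = boto3.client("firehose")

# Read a small PM file exported by the element manager (path is illustrative)
with open("pm_export_from_element_manager.xml", "rb") as f:
    pm_payload = f.read()

# Deliver the record; the stream's destination is the S3 bucket used by this solution
firehose.put_record(
    DeliveryStreamName="pm-xml-stream",   # placeholder delivery stream name
    Record={"Data": pm_payload},
)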

Transforming data using a Lambda function

This post provides a Lambda function that is associated with the ObjectCreated event type of the destination S3 bucket rawxml prefix (s3://wireless-pm/rawxml). The function runs every time a new XML file is saved in this location.

The Lambda function is written in Python 3.7. You can download it from the GitHub repository for this blog post. The function uses a layer to resolve the dependency for the xmltodict library used in the code.

The following screenshot of the AWS Management Console shows some of the main properties of the Lambda function:

  1. The Lambda name and its associated Lambda Layer.
  2. The S3 bucket trigger event on the ObjectCreated event.

The function transposes the XML files, converts them into CSV or JSON (depending on the value set in the function’s output_format environment variable), and formats the files with one record per measurement (measData), measure type, and value (measInfo). The technical specification 3GPP TS 32.435 document on the 3GPP website provides three example XML files in the Annex section.

The following code is an example A.1 XML file:

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="MeasDataCollection.xsl"?>
<measCollecFile xmlns="http://www.3gpp.org/ftp/specs/archive/32_series/32.435#measCollec"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.3gpp.org/ftp/specs/archive/32_series/32.435#measCollec http://www.3gpp.org/ftp/specs/archive/32_series/32.435#measCollec">
    <fileHeader fileFormatVersion="32.435 V7.0" vendorName="Company NN" dnPrefix="DC=a1.companyNN.com,SubNetwork=1,IRPAgent=1">
        <fileSender localDn="SubNetwork=CountryNN,MeContext=MEC-Gbg-1,ManagedElement=RNC-Gbg-1" elementType="RNC"/>
        <measCollec beginTime="2000-03-01T14:00:00+02:00"/>
    </fileHeader>
    <measData>
        <managedElement localDn="SubNetwork=CountryNN,MeContext=MEC-Gbg-1,ManagedElement=RNC-Gbg-1" userLabel="RNC Telecomville"/>
        <measInfo>
            <job jobId="1231"/>
            <granPeriod duration="PT900S" endTime="2000-03-01T14:14:30+02:00"/>
            <repPeriod duration="PT1800S"/>
            <measTypes>attTCHSeizures succTCHSeizures attImmediateAssignProcs succImmediateAssignProcs</measTypes>
            <measValue measObjLdn="RncFunction=RF-1,UtranCell=Gbg-997">
                <measResults>234 345 567 789</measResults>
            </measValue>
            <measValue measObjLdn="RncFunction=RF-1,UtranCell=Gbg-998">
                <measResults>890 901 123 234</measResults>
            </measValue>
            <measValue measObjLdn="RncFunction=RF-1,UtranCell=Gbg-999">
                <measResults>456 567 678 789</measResults>
                <suspect>true</suspect>
            </measValue>
        </measInfo>
    </measData>
    <fileFooter>
        <measCollec endTime="2000-03-01T14:15:00+02:00"/>
    </fileFooter>
</measCollecFile>

The Lambda function transforms the preceding file into the following JSON format (this code example only shows the first record of the transformed and transposed dataset):

{
    "fh_file_format_version": "32.435 V7.0",
    "fh_vendor_name": "Company NN",
    "fh_dn_prefix": "DC=a1.companyNN.com,SubNetwork=1,IRPAgent=1",
    "fh_fs_local_dn": "SubNetwork=CountryNN,MeContext=MEC-Gbg-1,ManagedElement=RNC-Gbg-1",
    "fh_fs_element_type": "RNC",
    "fh_mc_begin_time": "2000-03-01T14:00:00+02:00",
    "ff_mc_end_time": "2000-03-01T14:15:00+02:00",
    "md_me_local_dn": "SubNetwork=CountryNN,MeContext=MEC-Gbg-1,ManagedElement=RNC-Gbg-1",
    "md_me_user_label": "RNC Telecomville",
    "md_me_sw_version": "",
    "md_mi_meas_info_id": "",
    "md_mi_job_jobid": "1231",
    "md_mi_gp_duration": "PT900S",
    "md_mi_gp_end_time": "2000-03-01T14:14:30+02:00",
    "md_mi_rp_duration": "PT1800S",
    "md_mi_meas_obj_ldn": "RncFunction=RF-1,UtranCell=Gbg-997",
    "md_mi_meas_name": "attTCHSeizures",
    "md_mi_meas_value": 234,
    "md_mi_meas_p": null,
    "md_mi_meas_suspect": null
}

You can change the output field names in the get_record_header static method of the GPPXml class defined in the Lambda function. The fields md_mi_meas_name and md_mi_meas_value contain the measure name and measure value, respectively.
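The actual function is available in the GitHub repository; purely as a simplified illustration, the following sketch shows how xmltodict can be used to transpose a 3GPP PM file into one record per measurement value, similar to the JSON shown above. The field names are a reduced subset, and error handling and the CSV output path are omitted for brevity.

import json
import xmltodict

def transpose_pm_file(xml_bytes):
    """Flatten a 3GPP measCollecFile document into one record per counter value."""
    doc = xmltodict.parse(
        xml_bytes,
        force_list=("measData", "measInfo", "measValue"),
    )["measCollecFile"]

    header = doc["fileHeader"]
    records = []
    for meas_data in doc["measData"]:
        for meas_info in meas_data["measInfo"]:
            meas_names = meas_info["measTypes"].split()
            for meas_value in meas_info["measValue"]:
                meas_results = meas_value["measResults"].split()
                # Pair each counter name with its value for this measured object
                for name, value in zip(meas_names, meas_results):
                    records.append({
                        "fh_vendor_name": header["@vendorName"],
                        "md_me_user_label": meas_data["managedElement"]["@userLabel"],
                        "md_mi_gp_end_time": meas_info["granPeriod"]["@endTime"],
                        "md_mi_meas_obj_ldn": meas_value["@measObjLdn"],
                        "md_mi_meas_name": name,
                        "md_mi_meas_value": int(value),
                    })
    return records

# Example: print the flattened records of the 3GPP TS 32.435 sample file as JSON lines
with open("example_a1.xml", "rb") as f:
    for record in transpose_pm_file(f.read()):
        print(json.dumps(record))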

The transformed CSV and JSON files are saved in the raw_transform_csv and raw_transform_json prefixes, respectively, in the S3 bucket (only one format is created for each execution, depending on the value of the output_format environment variable). The following screenshot shows the S3 bucket overview on the Amazon S3 console.

For this use case, a Lambda function is triggered for the transformation task every time a new XML file arrives. Depending on the volume and size of the XML files, you can choose other compute options on AWS based on the right fit, such as in containers using AWS Batch, Amazon ECS, or Amazon EKS. For more information about configuring and running processes using Amazon ECS, see Building, deploying, and operating containerized applications with AWS Fargate.

Alternatively, you could ingest the raw XML file using a Kinesis Firehose stream with a Lambda function attached to it to transform the data to JSON and output directly to Amazon S3 in Parquet or ORC format using the record format conversion feature of Kinesis Firehose. You would need to predefine a table in the Data Catalog whose schema, serializer, and deserializer you use to convert the data.

Building a Data Catalog using AWS Glue

To create an AWS Glue table in the Data Catalog, first create an AWS Glue crawler. Complete the following steps:

  1. Open the AWS Glue console.
  2. In the navigation pane, choose Crawlers.
  3. Choose Add crawler.
  4. For Crawler name, enter a name for your crawler; for example, Wireless_PM_crawler.
  5. Choose Next.
  6. For Choose a data store, choose S3.
  7. For Crawl data in, select Specified path in my account.
  8. For Include path, enter the path to the input files in CSV format.
  9. Choose Next.
  10. In the Choose an IAM role section, select Choose an existing IAM role.
  11. For IAM role, enter the name of your role to grant read permission to access the S3 bucket.
  12. Choose Next. The following code is the AWSGlueServiceRole-S3 IAM policy for this post:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject"
                ],
                "Resource": [
                    "arn:aws:s3:::npm-blog/raw_transform_csv/Parquet*",
                    "arn:aws:s3:::npm-blog/raw_transform_csv/*"
                ]
            }
        ]
    }

  13. In the Choose the crawler’s output section, for Database, choose the database in which the Data Catalog table is created; for example, wireless_pm.
  14. Choose Next. In these next steps, you create an ETL job that converts from CSV to Parquet (a columnar file format) and writes the Parquet files to an S3 bucket; a hand-written equivalent of the script is sketched after these steps. The Parquet file format helps with the performance of downstream consumption of the data. To convert to Parquet, you can either use an AWS Glue generated script (which this post uses) or your own PySpark or Scala script.
  15. In the Configure the job properties section, for Name, enter wireless_pm_parquet_conversion.
  16. For IAM role, enter AWSGlueServiceRole-wirelesspm.
  17. For Type, choose Spark.
  18. For Glue version, choose Spark 2.4, Python 3 (Glue Version 1.0).
  19. For This job runs, select A proposed script generated by AWS Glue.
  20. For Script file name, enter wireless_pm_parquet_conversion.
  21. In the Choose a data target section, select Create tables in your data target.
  22. For Data store, choose Amazon S3.
  23. For Format, choose Parquet.
  24. For Target path, enter the path to your S3 bucket.
  25. Choose Next.

You can configure a trigger to start the job by creating a workflow in AWS Glue that runs the job after the crawler. When the job is complete, a new set of Parquet files is stored in the destination S3 bucket.
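This post uses the script that AWS Glue generates, but the hand-written equivalent mentioned earlier is short. The following is a rough sketch of the PySpark logic; the database name matches the one created earlier (wireless_pm), while the table name and output path are placeholders that depend on how your crawler named the table and on your bucket.

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read the CSV files through the Data Catalog table created by the crawler
pm_csv_dyf = glueContext.create_dynamic_frame.from_catalog(
    database="wireless_pm",
    table_name="pm_csv",                 # placeholder: the table your crawler created
    transformation_ctx="pm_csv_dyf")

# Write the same records back to S3 as Parquet for faster downstream queries
glueContext.write_dynamic_frame.from_options(
    frame=pm_csv_dyf,
    connection_type="s3",
    connection_options={"path": "s3://wireless-pm/parquet/"},   # placeholder output path
    format="parquet",
    transformation_ctx="pm_parquet_sink")

job.commit()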

You can analyze the processed data directly with Athena by selecting the database created in AWS Glue, and analyzing the data from the AWS Glue table.

The following screenshot shows the AWS Glue database and the two tables created.

The following screenshot shows an example Athena query, which you can run by selecting a few columns to validate the data from the Data Catalog table you created.

You can also use other AWS services for analytics such as Amazon Redshift or Amazon EMR for further processing, analysis, and KPI calculations from the data.

Visualizing data using Amazon QuickSight

Amazon QuickSight is a business analytics service you can use to build visualizations, perform ad hoc analysis, and get business insights from your data. For more information, see What Is Amazon QuickSight?

Amazon QuickSight provides out-of-the-box integration with Athena, which lets you run SQL queries on top of the metadata in your Data Catalog. For more information, see Creating a Data Set Using Amazon Athena Data.

The following screenshot shows an Amazon QuickSight analysis created from an Amazon QuickSight dataset. The dataset is based on the table that AWS Glue defined, and you query it via Athena.

You can create a visualization by loading the example dataset of PM counters. For example, the following screenshot shows the measurement values for the GPRS SuccessAttach and AbortedAttach and highlights a problem.

Summary

This post discussed the main pain points for wireless service providers and how AWS services can help you build a serverless solution that scales according to your network growth with no upfront costs.

This solution also helps you visualize and analyze data. Additionally, it provides insights that can help operations and network planning departments manage and evolve their networks according to current and future standards and services.

As always, AWS welcomes feedback. Please submit comments or questions in the comments section.

 


About the Authors

Angelo Sampietro is a senior partner solutions architect at Amazon Web Services. Angelo has a strong background in cloud computing, with 20 years of experience in the Telecommunications industry, working in the United States and Europe. With 5 AWS Certifications, including Big Data and Machine Learning Specialties, he currently works as Senior Partner Solutions Architect, helping Global System Integrators to be successful in the partnership with AWS. He loves to share new ideas with colleagues and friends and propose new solutions thinking out of the box.

Francesco Marelli is a senior solutions architect at Amazon Web Services. He has lived and worked in London for 10 years, after that he has worked in Italy, Switzerland and other countries in EMEA. He is specialized in the design and implementation of Analytics, Data Management and Big Data systems, mainly for Enterprise and FSI customers. Francesco also has a strong experience in systems integration and design and implementation of web applications. He loves sharing his professional knowledge, collecting vinyl records and playing bass.

A public data lake for analysis of COVID-19 data

Post Syndicated from AWS Data Lake Team original https://aws.amazon.com/blogs/big-data/a-public-data-lake-for-analysis-of-covid-19-data/

As the COVID-19 pandemic continues to threaten and take lives around the world, we must work together across organizations and scientific disciplines to fight this disease. Innumerable healthcare workers, medical researchers, scientists, and public health officials are already on the front lines caring for patients, searching for therapies, educating the public, and helping to set policy. At AWS, we believe that one way we can help is to provide these experts with the data and tools needed to better understand, track, plan for, and eventually contain and neutralize the virus that causes COVID-19.

Today, we are making a public AWS COVID-19 data lake available – a centralized repository of up-to-date and curated datasets on or related to the spread and characteristics of the novel coronavirus (SARS-CoV-2) and its associated illness, COVID-19. Globally, there are several efforts underway to gather this data, and we are working with partners to make this crucial data freely available and keep it up-to-date. Hosted on the AWS cloud, we have seeded our curated data lake with COVID-19 case tracking data from Johns Hopkins and The New York Times, hospital bed availability from Definitive Healthcare, and over 45,000 research articles about COVID-19 and related coronaviruses from the Allen Institute for AI. We will regularly add to this data lake as other reliable sources make their data publicly available.

The breakthroughs that can win the battle against this disease arrive faster when it’s easy for everyone to access and experiment with this vital information. The AWS COVID-19 data lake allows experimenters to quickly run analyses on the data in place without wasting time extracting and wrangling data from all the available data sources. They can use AWS or third-party tools to perform trend analysis, do keyword search, perform question/answer analysis, build and run machine learning models, or run custom analyses to meet their specific needs. Since every stakeholder in this battle brings their own perspective, users can choose to work with the public data lake, combine it with their own data, or subscribe to the source datasets directly through AWS Data Exchange.

We imagine local health authorities could build dashboards to track infections and collaborate to efficiently deploy vital resources like hospital beds and ventilators. Or epidemiologists could complement their own models and datasets to generate better forecasts of hotspots and trends.

For example, at Chan Zuckerberg Biohub, a nonprofit where leaders in science and technology collaborate to cure, prevent, or manage disease, scientists are using the AWS COVID-19 data lake for new epidemiological insights. “Our team of researchers is now analyzing trends in disease spread, its geography, and time evolution by leveraging datasets from the AWS COVID-19 data lake, combined with our own data, in order to better predict COVID epidemiology,” said Jim Karkanias, Vice President of Data Science and Information Technology at Chan Zuckerberg Biohub.

This post walks you through examples of how to use the AWS COVID-19 data lake for analysis. This data lake is comprised of data in a publicly readable Amazon S3 bucket (s3://covid19-lake). The post shows how to set up the definitions for that data in an AWS Glue Data Catalog to expose it to analytics engines. You can then query the AWS COVID-19 data lake with Amazon Athena, a serverless SQL query engine.

Prerequisites

This post assumes you have the following:

  • Access to an AWS account
  • Permissions to create an AWS CloudFormation stack
  • Permissions to create AWS Glue resources (catalog databases and tables)

Configuring access to the data using a CloudFormation template

To make the data from the AWS COVID-19 data lake available in the Data Catalog in your AWS account, create a CloudFormation stack using the following template. If you are signed in to your AWS account, the following link fills out most of the stack creation form for you. All you need to do is choose Create stack. For instructions on creating a CloudFormation stack, see Get Started in the AWS CloudFormation documentation.

This template creates a covid-19 database in your Data Catalog and tables that point to the public AWS COVID-19 data lake. You do not need to host the data in your account, and you can rely on AWS to refresh the data as datasets are updated through AWS Data Exchange.
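If you prefer an SDK over the console link, the following boto3 sketch creates the same stack; the template URL is a placeholder for the one behind the Create stack link in this post.

import boto3

cfn = boto3.client("cloudformation")

cfn.create_stack(
    StackName="covid-lake-stack",
    # Placeholder: use the template URL referenced by the "Create stack" link in this post
    TemplateURL="https://example-bucket.s3.amazonaws.com/CovidLakeStack.template.json",
)

# Wait until the Data Catalog database and tables have been created
cfn.get_waiter("stack_create_complete").wait(StackName="covid-lake-stack")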

Exploring the data through the Data Catalog in your AWS account

When the CloudFormation stack shows a status of CREATE_COMPLETE, access the Glue Data Catalog to see the tables that the template created. You should see the following tables:

  • Global Coronavirus (COVID-19) Data – Tracks confirmed COVID-19 cases in provinces, states, and countries across the world with a breakdown to the county level in the US.

 

Table Name | Description | Source | Provider
enigma_jhu | Confirmed COVID-19 cases | Johns Hopkins | Enigma

 

 

Table Name | Description | Source | Provider
nytimes_states | Data on COVID-19 cases at US state level | NY Times | Rearc
nytimes_counties | Data on COVID-19 cases at US county level | NY Times | Rearc

 

 

Table Name | Description | Source | Provider
covid_testing_states_daily | USA total test daily trend by state | COVID Tracking Project | Rearc
covid_testing_us_daily | USA total test daily trend | COVID Tracking Project | Rearc
covid_testing_us_total | USA total tests | COVID Tracking Project | Rearc

 

 

Table Name | Description | Source | Provider
hospital_beds | Hospital beds and their utilization in the US | Definitive Healthcare | Rearc

 

 

Table Name | Description | Source/Provider
alleninstitute_metadata | Metadata on papers pulled from the CORD-19 dataset. The sha column indicates the paper ID, which is the file name of the paper in the data lake. | Allen Institute for AI
alleninstitute_comprehend_medical | Results from Amazon Comprehend Medical run against the CORD-19 dataset. | Allen Institute for AI

 

  • Lookup tables to support visualizations.

 

Table Name | Description
country_codes | Lookup table for country codes
county_populations | Lookup table for the population for each county based on recent census data
us_state_abbreviations | Lookup table for US state abbreviations

In addition, you can see descriptions of the columns in these tables. For example, the following screenshot shows the metadata of the table containing COVID-19 cases from Johns Hopkins.

Querying data via Amazon Athena

This section demonstrates how to query these tables using Athena. Athena is a serverless interactive query service that makes it easy to analyze the data in the AWS COVID-19 data lake. Athena supports SQL, a common language that data analysts use for analyzing structured data. To query the data, complete the following steps:

  1. Sign in to the Athena console.

If this is the first time you are using Athena, you must specify a query result location on Amazon S3.

  1. From the drop-down menu, choose the covid-19 database.
  2. Enter your query.

The following query returns the growth of confirmed cases for the past 7 days joined side-by-side with hospital bed availability, broken down by US county:

SELECT 
  cases.fips, 
  admin2 as county, 
  province_state, 
  confirmed,
  growth_count, 
  sum(num_licensed_beds) as num_licensed_beds, 
  sum(num_staffed_beds) as num_staffed_beds, 
  sum(num_icu_beds) as num_icu_beds
FROM 
  "covid-19"."hospital_beds" beds, 
  ( SELECT 
      fips, 
      admin2, 
      province_state, 
      confirmed, 
      last_value(confirmed) over (partition by fips order by last_update) - first_value(confirmed) over (partition by fips order by last_update) as growth_count,
      first_value(last_update) over (partition by fips order by last_update desc) as most_recent,
      last_update
    FROM  
      "covid-19"."enigma_jhu" 
    WHERE 
      from_iso8601_timestamp(last_update) > now() - interval '7' day AND country_region = 'US') cases
WHERE 
  beds.fips = cases.fips AND last_update = most_recent
GROUP BY cases.fips, confirmed, growth_count, admin2, province_state
ORDER BY growth_count desc

The following screenshot shows the results of this query.

Athena also allows you to run these queries through REST APIs, for example, for building your own visualizations. Moreover, Athena is just one of the many engines that you can use on the data lake. For example, you can use Amazon Redshift Spectrum to join lake data with other datasets in your Redshift data warehouse, or use Amazon QuickSight to visualize your datasets.
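As a minimal sketch of that API-driven approach with boto3, the following runs a small query against the enigma_jhu table and prints the rows; the query result location is a placeholder for a bucket you own.

import time
import boto3

athena = boto3.client("athena")

query = 'SELECT fips, admin2, province_state, confirmed FROM "covid-19"."enigma_jhu" LIMIT 10'

execution_id = athena.start_query_execution(
    QueryString=query,
    QueryExecutionContext={"Database": "covid-19"},
    ResultConfiguration={"OutputLocation": "s3://your-athena-results-bucket/"},  # placeholder
)["QueryExecutionId"]

# Poll until the query reaches a terminal state
while True:
    state = athena.get_query_execution(QueryExecutionId=execution_id)["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(1)

# Print the result rows, for example to feed a custom visualization
for row in athena.get_query_results(QueryExecutionId=execution_id)["ResultSet"]["Rows"]:
    print([col.get("VarCharValue") for col in row["Data"]])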

We have also created a public Amazon QuickSight dashboard from the COVID-19 case tracking data, testing data, and hospital bed data. You can track daily updates with this dashboard. You can also drill down to see breakdowns by country, province, and county without having to write a line of SQL. The following is a recent screenshot of the dashboard.

CORD-19 research articles

The CORD-19 dataset is a collection of metadata and full-text of research articles about COVID-19, SARS-CoV-2, and related coronaviruses. You can index this data with Amazon Kendra for question/answer exploration, or enrich the data with Amazon Comprehend Medical. We have already done the latter and put it in the table called alleninstitute_comprehend_medical.

The alleninstitute_metadata table provides detailed fields for each paper, such as the title, authors, journal, and URL. The alleninstitute_comprehend_medical table contains key medical concepts such as medical condition, medication, dosage, strength, and frequency. With this metadata, you can quickly query over concepts, analyze or aggregate over authors and journals, and locate papers.

Aggregating over journals

Using IL-6 inhibitors is a possible therapy for COVID-19, and clinical trials are underway. To demonstrate how to use these tables, this post presents a use case in which you want to understand which journals discuss IL-6 the most by counting the papers they published. You can do this by running the following query:

SELECT m.journal,
       count(distinct(cm.paper_id)) as paper_count
FROM "covid-19".alleninstitute_metadata m
JOIN "covid-19".alleninstitute_comprehend_medical cm
    ON (contains(split(m.sha, '; '), cm.paper_id))
WHERE contains(generic_name, 'IL-6')
GROUP BY  m.journal
ORDER BY paper_count desc

The following screenshot shows an example of the results. The data provider updates this dataset over time, so your results may look different (here, we notice that the second highest count has no journal information).

Drilling down into papers

To see the URLs and the titles of the papers in one of these journals, you simply query both these tables again. For example, to drill into IL-6 related papers in the Crit Care journal, enter the following query:

SELECT distinct m.url, m.title
FROM "covid-19".alleninstitute_metadata m
JOIN "covid-19".alleninstitute_comprehend_medical cm
    ON (contains(split(m.sha, '; '), cm.paper_id))
WHERE contains(generic_name, 'IL-6')
      AND m.journal = 'Crit Care'

The following screenshot shows an example of the results.

These examples are a few of the innumerable analyses you can run on the public data lake. You incur no additional cost for accessing the AWS COVID-19 data lake beyond the standard charges for the AWS services that you use. For example, if you use Athena, you will incur the costs for running queries and the data storage in the S3 query result location, but incur no costs for accessing the data lake. In addition, if you want this data in raw form, you can subscribe to, download, and stay up-to-date through AWS Data Exchange. We encourage you to try using the public AWS COVID-19 data lake yourself.

Conclusion

Combining our efforts across organizations and scientific disciplines can help us win the fight against the COVID-19 pandemic. With the AWS COVID-19 data lake, anyone can experiment with and analyze curated data related to the disease, as well as share their own data and results. We believe that through an open and collaborative effort that combines data, technology, and science, we can inspire insights and foster breakthroughs necessary to contain, curtail, and ultimately cure COVID-19.

For daily updates on how AWS is addressing the crisis, see Amazon’s COVID-19 blog.

 


About the Authors

The AWS Data Lake Team members are Roy Ben-Alta, Jason Berkowitz, Chris Casey, Patrick Combes, Lucy Friedmann, Fred Lee, Megan Maxwell, Rourke McNamara, Herain Oberoi, Stephen Orban, Brian Ross, Nikki Rouda, Noah Schwartz, Noritaka Sekiyama, Mehul A. Shah, Ben Snively, and Ying Wang.

Load data incrementally and optimized Parquet writer with AWS Glue

Post Syndicated from Mohit Saxena original https://aws.amazon.com/blogs/big-data/load-data-incrementally-and-optimized-parquet-writer-with-aws-glue/

AWS Glue provides a serverless environment to prepare (extract and transform) and load large amounts of datasets from a variety of sources for analytics and data processing with Apache Spark ETL jobs. The first post of the series, Best practices to scale Apache Spark jobs and partition data with AWS Glue, discusses best practices to help developers of Apache Spark applications and Glue ETL jobs, big data architects, data engineers, and business analysts scale their data processing jobs running on AWS Glue automatically.

This post shows how to incrementally load data from data sources in an Amazon S3 data lake and databases using JDBC. It also shows how to scale AWS Glue ETL jobs by reading only newly added data using job bookmarks, and processing late-arriving data by resetting the job bookmark to the end of a prior job run. The post also reviews best practices using job bookmarks with complex AWS Glue ETL scripts and workloads.

Finally, the post shows how to use the custom AWS Glue Parquet writer optimized for performance by avoiding extra passes of the data and computing the schema at runtime. The AWS Glue Parquet writer also allows schema evolution in datasets with the addition or deletion of columns.

AWS Glue job bookmarks

AWS Glue’s Spark runtime has a mechanism to store state. This mechanism is used to track data processed by a particular run of an ETL job. The persisted state information is called a job bookmark.

The screenshot above shows a view of the AWS Glue console with multiple runs of the same ETL job at different points in time. Job bookmarks are used by AWS Glue jobs to process incremental data since the last job run. A job bookmark is composed of the states of various job elements, such as sources, transformations, and targets. For example, your AWS Glue job might read new partitions in an S3-backed table. AWS Glue tracks the partitions that the job has processed successfully to prevent duplicate processing and writing the same data to the target data store multiple times.

Job bookmark APIs

When using the AWS Glue console or the AWS Glue API to start a job, a job bookmark option is passed as a parameter.

There are three possible options:

  • Enable – This option causes the job to update the bookmark state after each successful run to keep track of processed data. Subsequent job runs on the same data source only process data newly added since the last checkpoint.
  • Disable – Ensures that job bookmarks are not used, which results in the job always processing the entire dataset. This is the default option.
  • Pause – Reads the state information and processes incremental data since the last checkpoint, but does not update it. You can use this option so that every subsequent run processes data from the same point in time.

In all cases, you are responsible for managing the output from previous job runs. For more information, see the first post in this series, Best practices to scale Apache Spark jobs and partition data with AWS Glue. For details about the parameters passed to a job, and specifically for a job bookmark, see Special Parameters Used by AWS Glue.

The following code example shows how to use job bookmarks in a Glue ETL job that reads from an AWS Glue table backed by an Amazon S3 location. The job receives new files from a Kinesis Data Firehose event stream in JSON format, transforms the data to rename two columns, and converts and writes it out to Amazon Redshift. transformation_ctx is the identifier for the job bookmark associated with this data source. For proper operation, you need job.init and job.commit to initialize and persist the bookmark state.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "firehose_s3_db",
                table_name = "firehose_s3_raw_table",
                transformation_ctx = "datasource0")
applymapping = ApplyMapping.apply(frame = datasource0, 
                mappings = [("col0", "string", "name", "string"), ("col1", "string", "number", "string")],
                transformation_ctx = "applymapping1")

glueContext.write_dynamic_frame.from_jdbc_conf(frame = applymapping, catalog_connection = "redshift", connection_options = {"dbtable": "name", "database": "kinesis_db"}, redshift_tmp_dir= "s3://redshift_tmp_dir_path")

job.commit()

When using the APIs or CLI to start a job run, you need to add the following arguments to enable the job bookmark:

Job Arguments :

--job-bookmark-option, job-bookmark-enable
--JOB_NAME, glue-job-incremental

For S3 input sources, AWS Glue job bookmarks check the last modified time of the objects to verify which objects need to be reprocessed. If new files arrive from Kinesis Data Firehose, or existing files have changed, since your last job run, the files are processed when the job runs again using a periodic Glue job trigger or S3 trigger notification.

If you intend to reprocess all the data using the same job, reset the job bookmark. To reset the job bookmark state, use the AWS Glue console, the ResetJobBookmark Action (Python: reset_job_bookmark) API operation, or the AWS CLI. For example, enter the following command using the AWS CLI:

aws glue reset-job-bookmark --job-name my-job-name

You can also use the ResetJobBookmark API to reset the bookmark to a specific point for scheduled job runs by passing in the job run ID. It resets the state of the job bookmark to the state committed at the end of that job run. This functionality is similar to time travel; for example, you can now reprocess input data from a time in the past and use a different set of transformations in your ETL script or downstream jobs orchestrated with AWS Glue workflows in the ETL pipeline. From the AWS Glue console, you can use the Rewind job bookmark option to reset the job bookmark state to the commit of a previous job run.
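For example, the following sketch resets a job's bookmark to the state committed at the end of a specific run, which you could do before reprocessing late-arriving data; the job name and run ID are placeholders.

import boto3

glue = boto3.client("glue")

# Rewind the bookmark to the state committed at the end of a previous run
glue.reset_job_bookmark(
    JobName="glue-job-incremental",        # placeholder job name
    RunId="jr_0123456789abcdef",           # placeholder run ID of the prior job run
)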

AWS Glue keeps track of bookmarks for each job. If you delete a job, you also delete its job bookmark. Job bookmarks are supported for popular S3-based storage formats, including JSON, CSV, Apache Avro, and XML, as well as for JDBC sources. Starting with AWS Glue version 1.0, columnar storage formats such as Apache Parquet and ORC are also supported.

Best practices 1: Development with job bookmarks

In some cases, you might enable AWS Glue job bookmarks but your AWS Glue job reprocesses data that it already processed in an earlier run. That could happen because of the following reasons:

  • Missing job commit – The job.commit() statement at the end of your AWS Glue ETL script updates the state of the job bookmark. If you don’t include it, the job reprocesses both the previously processed and new files. Make sure that the job commit statement is executed by all possible code paths in your user script leading to job completion.
  • Missing transformation context – Transformation context is an optional parameter in the GlueContext read and write methods; however, job bookmarks need it to function correctly. Confirm that you include the transformation context parameter when creating the DynamicFrame. See the following code example:
    sample_dynF=glueContext.create_dynamic_frame_from_catalog(database, 
                table_name,
                transformation_ctx="sample_dynF") 

  • JDBC sources – Job bookmarks require source tables to either have a primary key column[s] or a column[s] with incrementing values, which need to be specified in the source options, when you access relational databases using a JDBC connection. Job bookmarks can capture only newly added rows. This behavior does not apply to source tables stored on S3.
  • Last modified time – To identify which files stored on S3 to process, job bookmarks check the last modified time of the objects, not the file names. If your input objects changed since the last time the job ran, then they are reprocessed when the job runs again.

Best practices 2: Monitoring job bookmarks

There are three ways to inspect the behavior of job bookmarks for any job run:

  • File list stored in the temporary directory – All AWS Glue ETL jobs running Apache Spark and using DynamicFrames to read data output a manifest file containing a list of processed files per path. The manifest file is stored in the temporary location specified with the job. The path of the file is: <temporary location>/partitionlisting/<job name>/<run id>/<source transformation_ctx>.input-files.json. This file captures the list of files read for the corresponding data source regardless of any enabled job bookmarks.
  • Job metrics – You can use the AWS Glue job metrics to inspect the S3 read and write operations and track the number of bytes read by the job using bookmarks. You can also track the data a job reads across its multiple runs in the AWS Glue console. For more information, see Monitoring the Progress of Multiple Jobs.
  • Glue job logs – An AWS Glue job also emits logs in the Spark driver log stream related to processing and skipping of partitions in S3. The logs are stored in Amazon CloudWatch.

Skipping partitions

A job skips a partition when it is empty or when the creation timestamp of that particular partition in the AWS Glue Data Catalog is older than the timestamp of the last job run as captured by the job bookmark. The following example log message indicates a skipped partition:

19/05/21 14:49:22 WARN HadoopDataSource: Skipping Partition
{"year": "2019", "month": "03", "day": "26", "hour": "13"}
has no new files detected 
@ s3://input-s3-prefix/Year=2019/Month=03/Day=26/Hour=13/ 
or path does not exist

Processing partitions

When a job finds a new S3 partition created after the last job run or that has new files to process, it generates a log message. Log messages also indicate the percentage of the total number of files in the particular partition. The initial and final job bookmark filters of the current job run process these files. The following example illustrates the job bookmark filtering logic.

If the partition is new (created after the most recent job run based on the partition creation time), then the job processes all of the files in the partition. The partition creation time is 1559235148000, which is after the last job run. See the following example log message:

19/05/31 10:39:55 INFO PartitionFilesListerUsingBookmark:
Found new partition DynamicFramePartition([email protected],
s3://input-s3-prefix/Year=2018/Month=12/Day=05/Hour=13/,
1559235148000) 
with 47 files

An existing partition triggers the first bookmark filter. This filter selects files with modification timestamps since the last job run. In the following example log message, 15 out of 47 files in the partition are new and should be processed:

19/05/31 10:40:31 INFO PartitionFilesListerUsingBookmark:
After initial job bookmarks filter, 
processing 31.91% of 47 files 
in partition DynamicFramePartition([email protected],
s3://input-s3-prefix/Year=2018/Month=12/Day=05/Hour=13/,
1559235148000)

The final bookmark filter performs additional filtering to avoid race conditions related to S3’s eventual consistency. If a significantly large number of files arrive with the same modification time, this filter may exclude them from processing. In the following example log message, the filter processed all 15 files captured by the initial bookmark filter:

19/05/31 10:50:31 INFO PartitionFilesListerUsingBookmark:
After final job bookmarks filter, processing 100.00% of 15 files 
in partition DynamicFramePartition([email protected],
s3://input-s3-prefix/Year=2018/Month=12/Day=05/Hour=13/,
1559235148000)

Optimized Apache Parquet writer

AWS Glue offers an optimized Apache Parquet writer when using DynamicFrames to improve performance. Apache Parquet format is generally faster for reads than writes because of its columnar storage layout and a pre-computed schema that is written with the data into the files. AWS Glue’s Parquet writer offers fast write performance and flexibility to handle evolving datasets. Unlike the default Apache Spark Parquet writer, it does not require a pre-computed schema or schema that is inferred by performing an extra scan of the input dataset.

You can enable the AWS Glue Parquet writer by setting the format parameter of the write_dynamic_frame.from_options function to glueparquet. As data is streamed through an AWS Glue job for writing to S3, the optimized writer computes and merges the schema dynamically at runtime, which results in faster job runtimes. The AWS Glue Parquet writer also enables schema evolution by supporting the deletion and addition of new columns.

You can tune the AWS Glue Parquet writer further by setting the format_options parameters. See the following code example:

block_size = 128*1024*1024
page_size = 1024*1024
glueContext.write_dynamic_frame.from_options(frame = dyFrame,
    connection_type = "s3", connection_options = {"path": output_dir},
    format = "glueparquet",
    format_options = {"compression": "snappy",
                      "blockSize": block_size, "pageSize": page_size})

The default values for format_options are the following:

  • compression is “snappy”
  • blockSize is 128 MB
  • pageSize is 1 MB

The blockSize specifies the size of a row group in a Parquet file that is buffered in memory. The pageSize specifies the size of the smallest unit in a Parquet file that must be read fully to access a single record.

Conclusion

This post discussed how AWS Glue job bookmarks help incrementally process data collected from S3 and relational databases. You also learned how using job bookmarks can make backfilling historical data simple. Interacting with job bookmarks is easy; you can enable, disable, pause, and rewind them to a prior point in time. You can better tune your jobs and build checks to make sure all of the data is processed correctly by monitoring the progress and state of job bookmarks.

You can also use the AWS Glue Parquet writer to optimize the performance of writing Apache Parquet files in your data lake. The optimized writer enables schema evolution for Parquet files so you can manage changes to your data automatically.

We hope you try out these features to load and write your data in your Apache Spark applications on AWS Glue.

The third post in this series discusses how AWS Glue’s automatic code generation capability enables you to process and transform complex datasets with ease. The post also shows how to execute SQL queries on your datasets directly from your AWS Glue ETL script, and how to schedule and orchestrate data pipelines with AWS Glue workflows.

 


About the Authors

Mohit Saxena is a technical lead manager at AWS Glue. His passion is building scalable distributed systems for efficiently managing data on cloud. He also enjoys watching movies, and reading about the latest technology.

 

 

 

Bijay Bisht is a senior software development engineer at AWS.

 

 

Analyze your Amazon S3 spend using AWS Glue and Amazon Redshift

Post Syndicated from Shayon Sanyal original https://aws.amazon.com/blogs/big-data/analyze-your-amazon-s3-spend-using-aws-glue-and-amazon-redshift/

The AWS Cost & Usage Report (CUR) tracks your AWS usage and provides estimated charges associated with that usage. You can configure this report to present the data at hourly or daily intervals, and it is updated at least one time per day until it is finalized at the end of the billing period. The Cost & Usage Report is delivered automatically to an Amazon S3 bucket that you specify, and you can download it from there directly. You can also integrate the report into Amazon Redshift, query it with Amazon Athena, or upload it to Amazon QuickSight. For more information, see Query and Visualize AWS Cost and Usage Data Using Amazon Athena and Amazon QuickSight.

This post presents a solution that uses AWS Glue Data Catalog and Amazon Redshift to analyze S3 usage and spend by combining the AWS CUR, S3 inventory reports, and S3 server access logs.

Prerequisites

Before you begin, complete the following prerequisites:

  • You need an S3 bucket for your S3 inventory and server access log data files. For more information, see Create a Bucket and What is Amazon S3?
  • You must have the appropriate IAM permissions for Amazon Redshift to be able to access the S3 buckets – for this post, attach an IAM role with two non-restrictive managed policies (AmazonS3FullAccess and AWSGlueConsoleFullAccess), but restrict your access accordingly for your own scenarios.

Amazon S3 inventory

Amazon S3 inventory is one of the tools S3 provides to help manage your storage. You can use it to audit and report on the replication and encryption status of your objects for business, compliance, and regulatory needs. Amazon S3 inventory provides comma-separated values (CSV), Apache optimized row columnar (ORC), or Apache Parquet output files that list your objects and their corresponding metadata on a daily or weekly basis for a given S3 bucket.

Amazon S3 server access logs

Server access logging provides detailed records of the requests made to a bucket. Server access logs are useful for many applications, such as security and access audits. They can also help you learn about your customer base and understand your S3 bill.

AWS Glue

AWS Glue is a fully managed extract, transform, and load (ETL) service that makes it simple and cost-effective to categorize your data, clean it, enrich it, and move it reliably between various data stores. AWS Glue consists of a central metadata repository known as the Data Catalog, a crawler to populate the Data Catalog with tables, an ETL engine that automatically generates Python or Scala code, and a flexible scheduler that handles dependency resolution, job monitoring, and retries. AWS Glue is serverless, so there’s no infrastructure to set up or manage. This post uses AWS Glue to catalog S3 inventory data and server access logs, which makes it available for you to query with Amazon Redshift Spectrum.

Amazon Redshift

Amazon Redshift is a fully managed, petabyte-scale data warehouse service in the cloud. You can use Amazon Redshift to efficiently query and retrieve structured and semi-structured data from files in S3 without having to load the data into Amazon Redshift native tables. You can create Amazon Redshift external tables by defining the structure for files and registering them as tables in the AWS Glue Data Catalog.

Setting up S3 inventory reports for analysis

This post uses the Parquet file format for its inventory reports and delivers the files daily to S3 buckets. You can select both the delivery frequency and the output file format under Advanced settings, as shown in the screenshot below:

For more information about configuring your S3 inventory, see How Do I Configure Amazon S3 Inventory?
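You can also configure the inventory programmatically. The following is a hedged sketch using the put_bucket_inventory_configuration API in boto3; the bucket names and the inventory ID are placeholders, not values used elsewhere in this post.

import boto3

s3 = boto3.client('s3')

s3.put_bucket_inventory_configuration(
    Bucket='my-source-bucket',  # placeholder: the bucket to inventory
    Id='ParquetDailyInventory',
    InventoryConfiguration={
        'Id': 'ParquetDailyInventory',
        'IsEnabled': True,
        'IncludedObjectVersions': 'Current',
        'Schedule': {'Frequency': 'Daily'},
        'OptionalFields': ['Size', 'LastModifiedDate', 'StorageClass'],
        'Destination': {
            'S3BucketDestination': {
                'Bucket': 'arn:aws:s3:::my-inventory-destination-bucket',  # placeholder
                'Format': 'Parquet',  # CSV and ORC are also supported
                'Prefix': 'inventory'
            }
        }
    }
)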

The following diagram shows the data flow for this solution:

The following steps summarize the data flow shown in the diagram above:

  • S3 Inventory Reports are delivered to an S3 bucket that you configure.
  • The AWS Glue crawler then crawls this S3 bucket and populates the metadata in the AWS Glue Data Catalog.
  • The AWS Glue Data Catalog is then accessible through an external schema in Redshift.
  • The S3 Inventory Reports (available in the AWS Glue Data Catalog) and the Cost and Usage Reports (available in another S3 bucket) are now ready to be joined and queried for analysis.

The inventory reports are delivered to an S3 bucket. The following screenshot shows the S3 bucket structure for the S3 inventory reports:

There is a data folder in this bucket. This folder contains the Parquet data you want to analyze. The following screenshot shows the content of the folder.

Because these are daily files, there is one file per day.

Configuring an AWS Glue crawler

You can use an AWS Glue crawler to discover this dataset in your S3 bucket and create the table schemas in the Data Catalog. After you create these tables, you can query them directly from Amazon Redshift.

To configure your crawler to read S3 inventory files from your S3 bucket, complete the following steps:

  1. Choose a crawler name.
  2. Choose S3 as the data store and specify the S3 path up to the data.
  3. Choose an IAM role that can read the data from S3 (for this post, a role with the AmazonS3FullAccess and AWSGlueConsoleFullAccess policies attached).
  4. Set a frequency schedule for the crawler to run.
  5. Configure the crawler’s output by selecting a database and adding a prefix (if any).

This post uses the database s3spendanalysis.
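If you prefer to script the crawler instead of configuring it in the console, the following boto3 sketch creates and starts an equivalent crawler; the crawler name, role, and S3 path are placeholders for the values you chose in the steps above.

import boto3

glue = boto3.client('glue')

glue.create_crawler(
    Name='s3-inventory-crawler',    # placeholder
    Role='MyGlueCrawlerRole',       # placeholder IAM role with S3 read access
    DatabaseName='s3spendanalysis',
    Targets={'S3Targets': [{'Path': 's3://my-inventory-bucket/data/'}]},  # placeholder path
    Schedule='cron(0 1 * * ? *)'    # optional: run daily at 01:00 UTC
)

glue.start_crawler(Name='s3-inventory-crawler')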

The following screenshot shows the completed crawler configuration.

Run this crawler to add tables to your AWS Glue Data Catalog. After the crawler completes successfully, go to the Tables section of the AWS Glue console to verify the table details and table metadata, as shown in the following screenshot:

Creating an external schema

Before you can query the S3 inventory reports, you need to create an external schema (and subsequently, external tables) in Amazon Redshift. An Amazon Redshift external schema references an external database in an external data catalog. Because you are using an AWS Glue Data Catalog as your external catalog, after you create an external schema in Amazon Redshift, you can see all the external tables in your Data Catalog in Amazon Redshift. To create the external schema, enter the following code:

create external schema spectrum_schema from data catalog
database 's3spendanalysis'
iam_role 'arn:aws:iam::<AWS_IAM_ROLE>';

Querying the table

On the Amazon Redshift dashboard, under Query editor, you can see the data table. You can also query the svv_external_schemas system table to verify that your external schema has been created successfully. See the following screenshot.

You can now query the S3 inventory reports directly from Amazon Redshift without having to move the data into Amazon Redshift first. The following screenshot shows how to do this using the Query Editor in the Amazon Redshift console:

Setting up S3 server access logs for analysis

The following diagram shows the data flow for this solution.

The following steps summarize the data flow shown in the diagram above:

  • S3 Server Access Logs are delivered to an S3 bucket that you configure.
  • The server access logs can then be queried directly from Amazon Redshift (this post uses CREATE EXTERNAL TABLE with Redshift Spectrum for this purpose, as explained below).
  • The S3 Server Access Logs and the Cost and Usage Reports (available in another S3 bucket) are now ready to be joined and queried for analysis.

The S3 server access logs are delivered to an S3 bucket. For more information about setting up server access logging, see Amazon S3 Server Access Logging.
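As a hedged sketch, you can also enable server access logging with boto3; the bucket names and prefix below are placeholders, and the target bucket must already grant the S3 log delivery group permission to write to it.

import boto3

s3 = boto3.client('s3')

s3.put_bucket_logging(
    Bucket='my-source-bucket',  # placeholder: the bucket to log
    BucketLoggingStatus={
        'LoggingEnabled': {
            'TargetBucket': 'my-access-log-bucket',  # placeholder
            'TargetPrefix': 'accesslogs/'
        }
    }
)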

The following screenshot shows the S3 bucket structure for the server access logs.

The server access log files consist of a sequence of new-line delimited log records. Each log record represents one request and consists of space-delimited fields. The following code is an example log record:

b8ad5f5cfd3c09418536b47b157851fb7bea4a00486471093a7d765e35a4f8ef s3spendanalysisblog [23/Sep/2018:22:10:52 +0000] 72.21.196.65 arn:aws:iam::<AWS Account #>:user/shayons D5633DAD1063C5CA REST.GET.LIFECYCLE - "GET /s3spendanalysisblog?lifecycle= HTTP/1.1" 404 NoSuchLifecycleConfiguration 332 - 105 - "-" "S3Console/0.4, aws-internal/3 aws-sdk-java/1.11.408 Linux/4.9.119-0.1.ac.277.71.329.metal1.x86_64 OpenJDK_64-Bit_Server_VM/25.181-b13 java/1.8.0_181" -

Creating an external table

You can define the S3 server access logs as an external table. Because you already have an external schema, create an external table using the following code. This post uses RegEx SerDe to create a table that allows you to correctly parse all the fields present in the S3 server access logs. See the following code:

CREATE EXTERNAL TABLE spectrum_schema.s3accesslogs(
BucketOwner                   varchar(256), 
Bucket                        varchar(256), 
RequestDateTime               varchar(256), 
RemoteIP                      varchar(256), 
Requester                     varchar(256), 
RequestID                     varchar(256), 
Operation                     varchar(256), 
Key                           varchar(256), 
RequestURI_operation          varchar(256),
RequestURI_key                varchar(256),
RequestURI_httpProtoversion   varchar(256),
HTTPstatus                    varchar(256), 
ErrorCode                     varchar(256), 
BytesSent                     varchar(256), 
ObjectSize                    varchar(256), 
TotalTime                     varchar(256), 
TurnAroundTime                varchar(256), 
Referrer                      varchar(256), 
UserAgent                     varchar(256), 
VersionId                     varchar(256))
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
  'input.regex' = '([^ ]*) ([^ ]*) \\[(.*?)\\] ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) \"([^ ]*) ([^ ]*) ([^ ]*)\" (-|[0-9]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) (\"[^\"]*\") ([^ ]*)'
  )
STORED AS TEXTFILE
LOCATION
  's3://s3spendanalysisblog/accesslogs/';

Validating the data

You can validate the external table data in Amazon Redshift. The following screenshot shows how to do this using the Query Editor in the Amazon Redshift console:

You are now ready to analyze the data.

Analyzing the data using Amazon Redshift

In this post, you have a CUR file per day in your S3 bucket. The files themselves are organized in a monthly hierarchy. See the following screenshot.

Each day’s file consists of the following files for CUR data:

  • myCURReport-1.csv.gz – A zipped file of the data itself
  • myCURReport-Manifest.json – A JSON file that contains the metadata for the file
  • myCURReport-RedshiftCommands.sql – Amazon Redshift table creation scripts and a COPY command to create the CUR table from a Redshift manifest file
  • myCURReport-RedshiftManifest.json – The Amazon Redshift manifest file to create the CUR table

Using Amazon Redshift is one of the many ways to carry out this analysis. Amazon Redshift is a fast, fully managed cloud data warehouse that makes it simple and cost-effective to analyze all your data using standard SQL and your existing Business Intelligence (BI) tools. Amazon Redshift gives you fast querying capabilities over structured data using familiar SQL-based clients and BI tools using standard ODBC and JDBC connections. Queries are distributed and parallelized across multiple physical resources.

You are now ready to run SQL queries with the Amazon Redshift SQL Query Editor. This post also uses the psql client tool, a terminal-based front end for PostgreSQL, to query the data in the cluster.
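If you would rather query from Python than from psql, the following minimal sketch uses the psycopg2 driver, which speaks the same PostgreSQL wire protocol that Amazon Redshift accepts; the connection details are placeholders.

import psycopg2

# Placeholder connection details for your Amazon Redshift cluster.
conn = psycopg2.connect(
    host='my-cluster.abc123xyz.us-east-1.redshift.amazonaws.com',
    port=5439,
    dbname='dev',
    user='awsuser',
    password='my-password'
)

with conn.cursor() as cur:
    # For example, list the external schemas visible to the cluster.
    cur.execute("select * from svv_external_schemas;")
    for row in cur.fetchall():
        print(row)

conn.close()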

To query the data, complete the following steps:

  1. Create a custom schema to contain your tables for analysis. See the following code:
    create schema if not exists redshift_schema;

    You should create your table in a schema other than public to control user access to database objects.

  2. Create a CUR table for the latest month in Amazon Redshift using the CUR SQL file in S3. See the following code:
    create table redshift_schema.AWSBilling201910 (
    identity_LineItemId VARCHAR(256),
    identity_TimeInterval VARCHAR(100),
    bill_InvoiceId VARCHAR(100),
    bill_BillingEntity VARCHAR(10),
    bill_BillType VARCHAR(100),
    bill_PayerAccountId VARCHAR(100),
    bill_BillingPeriodStartDate TIMESTAMPTZ,
    bill_BillingPeriodEndDate TIMESTAMPTZ,
    lineItem_UsageAccountId VARCHAR(100),
    lineItem_LineItemType VARCHAR(100),
    lineItem_UsageStartDate TIMESTAMPTZ,
    lineItem_UsageEndDate TIMESTAMPTZ,
    lineItem_ProductCode VARCHAR(100),
    lineItem_UsageType VARCHAR(100),
    lineItem_Operation VARCHAR(100),
    lineItem_AvailabilityZone VARCHAR(100),
    lineItem_ResourceId VARCHAR(256),
    lineItem_UsageAmount DECIMAL(11,2),
    lineItem_NormalizationFactor VARCHAR(10),
    lineItem_NormalizedUsageAmount DECIMAL(11,2),
    lineItem_CurrencyCode VARCHAR(10),
    lineItem_UnblendedRate DECIMAL(11,2),
    lineItem_UnblendedCost DECIMAL(11,2),
    lineItem_BlendedRate DECIMAL(11,2),
    lineItem_BlendedCost DECIMAL(11,2),
    lineItem_LineItemDescription VARCHAR(100),
    lineItem_TaxType VARCHAR(100),
    lineItem_LegalEntity VARCHAR(100),
    product_ProductName VARCHAR(100),
    product_alarmType VARCHAR(100),
    product_automaticLabel VARCHAR(100),
    product_availability VARCHAR(100),
    product_availabilityZone VARCHAR(100),
    product_clockSpeed VARCHAR(100),
    product_currentGeneration VARCHAR(100),
    product_databaseEngine VARCHAR(100),
    product_dedicatedEbsThroughput VARCHAR(100),
    product_deploymentOption VARCHAR(100),
    product_durability VARCHAR(100),
    product_ecu VARCHAR(100),
    product_edition VARCHAR(100),
    product_engineCode VARCHAR(100),
    product_enhancedNetworkingSupported VARCHAR(100),
    product_eventType VARCHAR(100),
    product_feeCode VARCHAR(100),
    product_feeDescription VARCHAR(100),
    product_fromLocation VARCHAR(100),
    product_fromLocationType VARCHAR(100),
    product_gpu VARCHAR(100),
    product_gpuMemory VARCHAR(100),
    product_group VARCHAR(100),
    product_groupDescription VARCHAR(100),
    product_instanceFamily VARCHAR(100),
    product_instanceType VARCHAR(100),
    product_instanceTypeFamily VARCHAR(100),
    product_io VARCHAR(100),
    product_labelingTaskType VARCHAR(100),
    product_licenseModel VARCHAR(100),
    product_location VARCHAR(100),
    product_locationType VARCHAR(100),
    product_maxThroughputvolume VARCHAR(100),
    product_maxVolumeSize VARCHAR(100),
    product_memory VARCHAR(100),
    product_messageDeliveryFrequency VARCHAR(100),
    product_messageDeliveryOrder VARCHAR(100),
    product_minVolumeSize VARCHAR(100),
    product_networkPerformance VARCHAR(100),
    product_normalizationSizeFactor VARCHAR(100),
    product_operation VARCHAR(100),
    product_physicalCpu VARCHAR(100),
    product_physicalGpu VARCHAR(100),
    product_physicalProcessor VARCHAR(100),
    product_processorArchitecture VARCHAR(100),
    product_processorFeatures VARCHAR(100),
    product_productFamily VARCHAR(100),
    product_protocol VARCHAR(100),
    product_queueType VARCHAR(100),
    product_region VARCHAR(100),
    product_servicecode VARCHAR(100),
    product_servicename VARCHAR(100),
    product_sku VARCHAR(100),
    product_storage VARCHAR(100),
    product_storageClass VARCHAR(100),
    product_storageMedia VARCHAR(100),
    product_subscriptionType VARCHAR(100),
    product_toLocation VARCHAR(100),
    product_toLocationType VARCHAR(100),
    product_transferType VARCHAR(100),
    product_usageFamily VARCHAR(100),
    product_usagetype VARCHAR(100),
    product_vcpu VARCHAR(100),
    product_version VARCHAR(100),
    product_volumeType VARCHAR(100),
    product_workforceType VARCHAR(100),
    pricing_RateId VARCHAR(100),
    pricing_publicOnDemandCost DECIMAL(11,2),
    pricing_publicOnDemandRate DECIMAL(11,2),
    pricing_term VARCHAR(100),
    pricing_unit VARCHAR(100),
    reservation_AmortizedUpfrontCostForUsage DECIMAL(11,2),
    reservation_AmortizedUpfrontFeeForBillingPeriod DECIMAL(11,2),
    reservation_EffectiveCost DECIMAL(11,2),
    reservation_EndTime TIMESTAMPTZ,
    reservation_ModificationStatus VARCHAR(100),
    reservation_NormalizedUnitsPerReservation BIGINT,
    reservation_RecurringFeeForUsage DECIMAL(11,2),
    reservation_StartTime TIMESTAMPTZ,
    reservation_SubscriptionId VARCHAR(100),
    reservation_TotalReservedNormalizedUnits BIGINT,
    reservation_TotalReservedUnits BIGINT,
    reservation_UnitsPerReservation BIGINT,
    reservation_UnusedAmortizedUpfrontFeeForBillingPeriod DECIMAL(11,2),
    reservation_UnusedNormalizedUnitQuantity BIGINT,
    reservation_UnusedQuantity BIGINT,
    reservation_UnusedRecurringFee DECIMAL(11,2),
    reservation_UpfrontValue BIGINT
    );

  3. Load the data into Amazon Redshift for the latest month, using the provided CUR Manifest file. See the following code:
    copy redshift_schema.AWSBilling201910 from 's3://ss-cur//myCURReport/20191001-20191101/fd76beee-0709-42d5-bcb2-bb45f8ba1aae/myCURReport-RedshiftManifest.json'
    credentials 'arn:aws:iam::<AWS_IAM_ROLE>'
    GZIP CSV IGNOREHEADER 1 TIMEFORMAT 'auto' manifest;

  4. Validate the data loaded in the Amazon Redshift table. See the following code:
    select * from redshift_schema.AWSBilling201910
    where lineItem_ProductCode = 'AmazonS3'
    and lineItem_ResourceId = 's3spendanalysisblog' limit 10;

    The following screenshot shows that data has been loaded correctly in the Amazon Redshift table:

Managing database security

You can manage database security in Amazon Redshift by controlling which users have access to which database objects. To make sure your objects are secure, create two groups: FINANCE and ADMIN, with two users in FINANCE and one user in ADMIN. Complete the following steps:

  1. Create the groups where the user accounts are assigned. The following code creates two different user groups:
    create group finance;
    create group admin;

    To view all user groups, query the PG_GROUP system catalog table (you should see finance and admin here):

    select * from pg_group;

  2. Create three database users with different privileges and add them to the groups. See the following code:
    create user finance1 password 'finance1Pass'
    in group finance;
    
    create user finance2 password 'finance2Pass'
    in group finance;
    
    create user admin1 password 'admin1Pass'
    in group admin;

    Validate that the users have been successfully created. To view a list of users, query the PG_USER catalog table (select * from pg_user;).

  3. Grant SELECT privileges to the FINANCE group and ALL privileges to the ADMIN group for your table AWSBilling201910 in redshift_schema. See the following code:
    grant select on table redshift_schema.AWSBilling201910 to group finance; 
    grant all on table redshift_schema.AWSBilling201910 to group admin;

    You can verify that database security is enforced correctly. For example, the user finance1 tries to rename the table AWSBilling201910 in redshift_schema, but gets a permission denied error message due to restricted access. The following screenshot shows this scenario and the subsequent error message:

Example S3 inventory analysis

Amazon S3 charges are split per bucket. The following query identifies the data storage and transfer costs for each separate S3 bucket:

SELECT
  "lineitem_productcode",
  "lineitem_usagetype",
  "lineitem_resourceid",
  b."storage_class",
  SUM(CASE
    WHEN "lineitem_usagetype" like '%Byte%' THEN "lineitem_usageamount"/1024
    ELSE "lineitem_usageamount"
  END) as "Usage",
  CASE
    WHEN "lineitem_usagetype" like '%Byte%' THEN 'TBs'
    ELSE 'Requests'
  END as "Usage Units",
  sum("lineitem_blendedcost") as cost
from awsbilling201902 a
  join spectrum_schema.data b
    on a.lineItem_ResourceId = b.bucket
where "product_productname" = 'Amazon Simple Storage Service'
group by
  "lineitem_productcode",
  "lineitem_usagetype",
  "lineitem_resourceid",
  b."storage_class"
order by
  sum("lineitem_blendedcost") desc;

The following screenshot shows the results of executing the above query:

Costs are split by type of storage (for example, Glacier versus standard storage).

The following query identifies S3 data transfer costs (intra-region and inter-region) by S3 storage class (usage amount, unblended cost, blended cost):

SELECT
 lineitem_productcode
 ,product_fromlocation
 ,product_tolocation,
  b.storage_class
 ,sum(lineitem_usageamount) usageamount
 ,sum(lineitem_unblendedcost) unblendedcost
 ,sum(lineitem_blendedcost) blendedcost
FROM
awsbilling201902 a
  join spectrum_schema.data b
ON
    a.lineItem_ResourceId = b.bucket
WHERE
 a.lineitem_productcode = 'AmazonS3'
 AND a.product_productfamily = 'Data Transfer'
GROUP BY
 1,2,3,4
ORDER BY
 usageamount desc;

The following screenshot shows the result of executing the above query:

The following query identifies S3 fee, API request, and storage charges:

SELECT
 lineitem_productcode
 ,product_productfamily
 ,b.storage_class
 ,sum(lineitem_usageamount) usageamount
 ,sum(lineitem_unblendedcost) unblendedcost
 ,sum(lineitem_blendedcost) blendedcost
FROM
awsbilling201902 a
  join spectrum_schema.data b
ON
   a.lineItem_ResourceId = b.bucket
WHERE
 a.lineitem_productcode = 'AmazonS3'
  and a.product_productfamily <> 'Data Transfer'
GROUP BY
 1,2,3
ORDER BY
 usageamount desc;

The following screenshot shows the result of executing the above query:

Server access logs sample analysis queries

The S3 access logs let you split charges per operation type. The following query identifies the data storage and transfer costs for each separate HTTP operation:

SELECT
  "lineitem_productcode",
  "lineitem_usagetype",
  "lineitem_resourceid",
  b."operation",
  b."httpstatus",
  b."bytessent",
  SUM(CASE
      WHEN "lineitem_usagetype" like '%Byte%'
        THEN "lineitem_usageamount" / 1024
      ELSE "lineitem_usageamount"
      END) as "Usage",
  CASE
  WHEN "lineitem_usagetype" like '%Byte%'
    THEN 'TBs'
  ELSE 'Requests'
  END  as "Usage Units",
  sum("lineitem_blendedcost") as cost
from awsbilling201902 a
  join spectrum_schema.s3accesslogs b
    on a.lineItem_ResourceId = b.bucket
where "product_productname" = 'Amazon Simple Storage Service'
group by
  1, 2, 3, 4, 5, 6
order by
  sum("lineitem_blendedcost") desc;

The following screenshot shows the result of executing the above query:

The following query identifies S3 data transfer costs (intra-region and inter-region) by S3 operation and HTTP status (usage amount, unblended cost, blended cost):

SELECT
 lineitem_productcode
 ,product_fromlocation
 ,product_tolocation
 ,b.operation
 ,b.httpstatus
 ,sum(lineitem_usageamount) usageamount
 ,sum(lineitem_unblendedcost) unblendedcost
 ,sum(lineitem_blendedcost) blendedcost
FROM
awsbilling201902 a
  JOIN spectrum_schema.s3accesslogs b
ON
   a.lineItem_ResourceId = b.bucket
WHERE
 a.lineitem_productcode = 'AmazonS3'
 AND a.product_productfamily = 'Data Transfer'
GROUP BY
 1,2,3,4,5
ORDER BY
 usageamount desc;

The following screenshot shows the result of executing the above query:

The following query identifies S3 fee, API request, and storage charges:

SELECT
 lineitem_productcode
 ,product_productfamily
 ,b.operation
 ,b.httpstatus
 ,sum(lineitem_usageamount) usageamount
 ,sum(lineitem_unblendedcost) unblendedcost
 ,sum(lineitem_blendedcost) blendedcost
FROM
awsbilling201902 a
  JOIN spectrum_schema.s3accesslogs b
ON
   a.lineItem_ResourceId = b.bucket
WHERE
 a.lineitem_productcode = 'AmazonS3'
  and a.product_productfamily <> 'Data Transfer'
GROUP BY
 1,2,3,4
ORDER BY
 usageamount desc;

The following screenshot shows the result of executing the above query:

Overall data flow diagram

The following diagram shows the complete data flow for this solution.

Conclusion

AWS Glue provides an easy, cloud-native, secure, and efficient way to automatically discover the data stored in your S3 buckets. This post demonstrated how to use AWS Glue and Amazon Redshift to analyze your S3 spend using Cost and Usage Reports. You also learned best practices for managing database security in Amazon Redshift through users and groups. Using this framework, you can start analyzing your S3 bucket spend with a few clicks in a matter of minutes on the AWS Management Console!

If you have questions or suggestions, please leave your thoughts in the comments section below.

 


About the Author

 Shayon Sanyal is a Data Architect, Data Lake for Global Financial Services at AWS.

 

 

 

How FactSet automated exporting data from Amazon DynamoDB to Amazon S3 Parquet to build a data analytics platform

Post Syndicated from Arvind Godbole original https://aws.amazon.com/blogs/big-data/how-factset-automated-exporting-data-from-amazon-dynamodb-to-amazon-s3-parquet-to-build-a-data-analytics-platform/

This is a guest post by Arvind Godbole, Lead Software Engineer with FactSet and Tarik Makota, AWS Principal Solutions Architect. In their own words “FactSet creates flexible, open data and software solutions for tens of thousands of investment professionals around the world, which provides instant access to financial data and analytics that investors use to make crucial decisions. At FactSet, we are always working to improve the value that our products provide.”

One area that we’ve been looking into is the relevancy of search results for our clients. Given the wide variety of client use cases and the large number of searches per day, we needed a platform to store anonymized usage data and allow us to analyze that data to boost results using our custom scoring algorithm. Amazon EMR was the obvious choice to host the calculations, but the question arose on how to get our anonymized data into a form that Amazon EMR could use. We worked with AWS and chose to use Amazon DynamoDB to prepare the data for usage in Amazon EMR.

This post walks you through how FactSet takes data from a DynamoDB table and converts that data into Apache Parquet. We store the Parquet files in Amazon S3 to enable near real-time analysis with Amazon EMR. Along the way, we encountered challenges related to data type conversion, which we explain below and show how we overcame them.

Workflow overview

Our workflow contained the following steps:

  1. Anonymized log data is stored into DynamoDB tables. These entries have different fields, depending on how the logs were generated. Whenever we create items in the tables, we use DynamoDB Streams to write out a record. The stream records contain information from a single item in a DynamoDB table.
  2. An AWS Lambda function is hooked into the DynamoDB stream to capture the new items stored in a DynamoDB table. We built our Lambda function off of the lambda-streams-to-firehose project on GitHub to convert the DynamoDB stream image to JSON, which we stringify and push to Amazon Kinesis Data Firehose (a minimal sketch of such a handler appears after this list).
  3. Kinesis Data Firehose transforms the JSON data into Parquet using data contained within an AWS Glue Data Catalog table.
  4. Kinesis Data Firehose stores the Parquet files in S3.
  5. An AWS Glue crawler discovers the schema of DynamoDB items and stores the associated metadata into the Data Catalog.
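The following is a minimal, hedged sketch of the Lambda handler in step 2. It illustrates the approach described above rather than FactSet's actual code; the delivery stream name is a placeholder, and batching limits and error handling are omitted.

import json
import boto3
from boto3.dynamodb.types import TypeDeserializer

firehose = boto3.client('firehose')
deserializer = TypeDeserializer()

DELIVERY_STREAM = 'ddb-to-s3-parquet'  # placeholder delivery stream name


def lambda_handler(event, context):
    records = []
    for record in event.get('Records', []):
        image = record.get('dynamodb', {}).get('NewImage')
        if not image:
            continue  # skip deletes and records without a new image
        # Convert the DynamoDB-typed attributes into plain Python values,
        # then stringify the item as JSON for Kinesis Data Firehose.
        item = {k: deserializer.deserialize(v) for k, v in image.items()}
        records.append({'Data': (json.dumps(item, default=str) + '\n').encode('utf-8')})

    if records:
        # put_record_batch accepts up to 500 records; chunking is omitted here.
        firehose.put_record_batch(DeliveryStreamName=DELIVERY_STREAM, Records=records)

    return {'processed': len(records)}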

The following diagram illustrates this workflow.

AWS Glue provides tools to help with data preparation and analysis. A crawler can run on a DynamoDB table to take inventory of the table data and store that information in a Data Catalog. Other services can use the Data Catalog as an index to the location, schema, and types of the table data. There are other ways to add metadata into a Data Catalog, but the key idea is that you can update and modify the metadata easily. For more information, see Populating the AWS Glue Data Catalog.

Problem: Data type disparities

Using a variety of technologies to build a solution often requires mapping and converting data types between these technologies. The cloud is no exception. In our case, log items stored in DynamoDB contained attributes of type String Set. String Set values caused data conversion exceptions when Kinesis tried to transform the data to Parquet. After investigating the problem, we found the following:

  • As the crawler indexes the DynamoDB table, Set data types (StringSet, NumberSet) are stored in the Glue metadata catalog as set<string> and set<bigint>.
  • Kinesis Data Firehose uses that same catalog when it performs the conversion to Apache Parquet. The conversion requires valid Hive data types.
  • set<string> and set<bigint> are not valid Hive data types, so the conversion fails, and an exception is generated. The exception looks similar to the following code:
    [{
       "lastErrorCode": "DataFormatConversion.InvalidSchema",
       "lastErrorMessage": "The schema is invalid. Error parsing the schema: Error: type expected at the position 38 of 'array,used:bigint>>' but 'set' is found."
    }]

Solution: Construct data mapping

While working with the AWS team, we confirmed that the Kinesis Data Firehose converter needs valid Hive data types in the Data Catalog to succeed. When it comes to complex data types, Hive doesn’t support set<data_type>, but it does support the following:

  • ARRAY<data_type>
  • MAP<primitive_type, data_type>
  • STRUCT<col_name : data_type [COMMENT col_comment], ...>
  • UNIONTYPE<data_type, data_type, ...>

In our case, this meant that we must convert set<string> and set<bigint> into array<string> and array<bigint>. Our first step was to manually change the types directly in the Data Catalog. After we updated the Data Catalog to change all occurrences of set<data_type> to array<data_type>, the Kinesis transformation to Parquet completed successfully.

Our business case calls for a data store that can store items with different attributes in the same table and the addition of new attributes on-the-fly. We took advantage of DynamoDB’s schema-less nature and ability to scale up and down on-demand so we could focus on our functionality and not the management of the underlying infrastructure. For more information, see Should Your DynamoDB Table Be Normalized or Denormalized?

If our data had a static schema, a manual change would be good enough. Given our business case, a manual solution wasn’t going to scale. Every time we introduced new attributes to the DynamoDB table, we needed to run the crawler, which re-created the metadata and overwrote the change.

Serverless event architecture

To automate the data type updates to the Data Catalog, we used Amazon EventBridge and Lambda to implement the modifications to the data type mapping. EventBridge is a serverless event bus that connects applications using events. An event is a signal that a system’s state has changed, such as the status of a Data Catalog table.

The following diagram shows the previous workflow with the new architecture.

  1. The crawler stays as-is and crawls the DynamoDB table to obtain the metadata.
  2. The metadata obtained by the crawler is stored in the Data Catalog. Previous metadata is updated or removed, and changes (manual or automated) are overwritten.
  3. The event GlueTableChanged in EventBridge listens to any changes to the Data Catalog tables. After we receive the event that there was a change to the table, we trigger the Lambda function.
  4. The Lambda function uses the AWS SDK to update the Glue Data Catalog table through the glue.update_table() API, replacing occurrences of set<data_type> with array<data_type>.

To set up EventBridge, we set Event pattern to “Pre-defined pattern by service”, selected AWS as the service provider and Glue as the service, and chose “Glue Data Catalog Table State Change” as the event type. The following screenshot shows the EventBridge configuration that sends events to the Lambda function that updates the Data Catalog.
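If you want to create the same rule programmatically rather than through the console, a hedged boto3 sketch might look like the following; the rule name, Lambda ARN, and statement ID are placeholders.

import json
import boto3

events = boto3.client('events')
lambda_client = boto3.client('lambda')

RULE_NAME = 'glue-table-state-change'  # placeholder
LAMBDA_ARN = 'arn:aws:lambda:us-east-1:111111111111:function:fix-set-types'  # placeholder

# Match the same event that the console configuration above produces.
rule_arn = events.put_rule(
    Name=RULE_NAME,
    EventPattern=json.dumps({
        'source': ['aws.glue'],
        'detail-type': ['Glue Data Catalog Table State Change']
    })
)['RuleArn']

# Point the rule at the Lambda function and allow EventBridge to invoke it.
events.put_targets(Rule=RULE_NAME, Targets=[{'Id': '1', 'Arn': LAMBDA_ARN}])
lambda_client.add_permission(
    FunctionName=LAMBDA_ARN,
    StatementId='AllowEventBridgeInvoke',  # placeholder
    Action='lambda:InvokeFunction',
    Principal='events.amazonaws.com',
    SourceArn=rule_arn
)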

The following is the baseline Lambda code:

# This is NOT production-worthy code; modify it and implement error handling routines as appropriate
import json
import logging
import boto3

glue = boto3.client('glue')

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Define subsegments manually
def table_contains_set(databaseName, tableName):
    
    # returns Glue Catalog description for Table structure
    response = glue.get_table( DatabaseName=databaseName,Name=tableName)
    logger.info(response)  
    
    # loop thru all the Columns of the table 
    isModified = False
    for i in response['Table']['StorageDescriptor']['Columns']: 
        logger.info("## Column: " + str(i['Name']))
        # if Column datatype starts with set< then change it to array<
        if i['Type'].find("set<") != -1:
            i['Type'] = i['Type'].replace("set<", "array<")
            isModified = True
            logger.info(i['Type'])
    
    if isModified:
        # following 3 statements simply clean up the response JSON so that update_table API call works
        del response['Table']['DatabaseName']
        del response['Table']['CreateTime']
        del response['Table']['UpdateTime']
        glue.update_table(DatabaseName=databaseName,TableInput=response['Table'],SkipArchive=True)
        
    logger.info("============ ### =============") 
    logger.info(response)
    
    return True
    
def lambda_handler(event, context):
    logger.info('## EVENT')
    # logger.info(event)
    # This is Sample of the event payload that would be received
    # { 'version': '0', 
    #   'id': '2b402842-21f5-1d76-1a9a-c90076d1d7da', 
    #   'detail-type': 'Glue Data Catalog Table State Change', 
    #   'source': 'aws.glue', 
    #   'account': '1111111111', 
    #   'time': '2019-08-18T02:53:41Z', 
    #   'region': 'us-east-1', 
    #   'resources': ['arn:aws:glue:us-east-1:111111111:table/ddb-glue-fh/ddb_glu_fh_sample'], 
    #   'detail': {
    #           'databaseName': 'ddb-glue-fh', 
    #           'changedPartitions': [], 
    #           'typeOfChange': 'UpdateTable', 
    #           'tableName': 'ddb_glu_fh_sample'
    #    }
    # }
    
    # get the database and table name of the Glue table triggered the event
    databaseName = event['detail']['databaseName']
    tableName = event['detail']['tableName']
    logger.info("DB: " + databaseName + " | Table: " + tableName)
    
    table_contains_set(databaseName, tableName)
   
    # TODO implement and modify
    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }

The Lambda function is straightforward; this post provides a basic skeleton. You can use this as a template to implement your own functionality for your specific data.

Conclusion

Simple things such as data type conversion and mapping can create unexpected outcomes and challenges when data crosses service boundaries. One of the advantages of AWS is the wide variety of tools with which you can create robust and scalable solutions tailored to your needs. Using event-driven architecture, we solved our data type conversion errors and automated the process to eliminate the issue as we move forward.

 


About the Authors

Arvind Godbole is a Lead Software Engineer at FactSet Research Systems. He has experience in building high-performance, high-availability client facing products and services, ranging from real-time financial applications to search infrastructure. He is currently building an analytics platform to gain insights into client workflows. He holds a B.S. in Computer Engineering from the University of California, San Diego

 

 

 

Tarik Makota is a Principal Solutions Architect with the Amazon Web Services. He provides technical guidance, design advice and thought leadership to AWS’ customers across US Northeast. He holds an M.S. in Software Development and Management from Rochester Institute of Technology.

 

 

New for Amazon Redshift – Data Lake Export and Federated Query

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/new-for-amazon-redshift-data-lake-export-and-federated-queries/

A data warehouse is a database optimized to analyze relational data coming from transactional systems and line of business applications. Amazon Redshift is a fast, fully managed data warehouse that makes it simple and cost-effective to analyze data using standard SQL and existing Business Intelligence (BI) tools.

To get information from unstructured data that would not fit in a data warehouse, you can build a data lake. A data lake is a centralized repository that allows you to store all your structured and unstructured data at any scale. With a data lake built on Amazon Simple Storage Service (S3), you can easily run big data analytics and use machine learning to gain insights from your semi-structured (such as JSON, XML) and unstructured datasets.

Today, we are launching two new features to help you improve the way you manage your data warehouse and integrate with a data lake:

  • Data Lake Export to unload data from a Redshift cluster to S3 in Apache Parquet format, an efficient open columnar storage format optimized for analytics.
  • Federated Query to be able, from a Redshift cluster, to query across data stored in the cluster, in your S3 data lake, and in one or more Amazon Relational Database Service (RDS) for PostgreSQL and Amazon Aurora PostgreSQL databases.

This architectural diagram gives a quick summary of how these features work and how they can be used together with other AWS services.

Let’s explain the interactions you see in the diagram better, starting from how you can use these features, and the advantages they provide.

Using Redshift Data Lake Export

You can now unload the result of a Redshift query to your S3 data lake in Apache Parquet format. The Parquet format is up to 2x faster to unload and consumes up to 6x less storage in S3, compared to text formats. This enables you to save data transformation and enrichment you have done in Redshift into your S3 data lake in an open format.

You can then analyze the data in your data lake with Redshift Spectrum, a feature of Redshift that allows you to query data directly from files on S3. Or you can use different tools such as Amazon Athena, Amazon EMR, or Amazon SageMaker.

To try this new feature, I create a new cluster from the Redshift console, and follow this tutorial to load sample data that keeps track of sales of musical events across different venues. I want to correlate this data with social media comments on the events stored in my data lake. To understand their relevance, each event should have a way of comparing its relative sales to other events.

Let’s build a query in Redshift to export the data to S3. My data is stored across multiple tables. I need to create a query that gives me a single view of what is going on with sales. I want to join the content of the  sales and date tables, adding information on the gross sales for an event (total_price in the query), and the percentile in terms of all time gross sales compared to all events.

To export the result of the query to S3 in Parquet format, I use the following SQL command:

UNLOAD ('SELECT sales.*, date.*, total_price, percentile
           FROM sales, date,
                (SELECT eventid, total_price, ntile(1000) over(order by total_price desc) / 10.0 as percentile
                   FROM (SELECT eventid, sum(pricepaid) total_price
                           FROM sales
                       GROUP BY eventid)) as percentile_events
          WHERE sales.dateid = date.dateid
            AND percentile_events.eventid = sales.eventid')
TO 's3://MY-BUCKET/DataLake/Sales/'
FORMAT AS PARQUET
CREDENTIALS 'aws_iam_role=arn:aws:iam::123412341234:role/myRedshiftRole';

To give Redshift write access to my S3 bucket, I am using an AWS Identity and Access Management (IAM) role. I can see the result of the UNLOAD command using the AWS Command Line Interface (CLI). As expected, the output of the query is exported using the Parquet columnar data format:

$ aws s3 ls s3://MY-BUCKET/DataLake/Sales/
2019-11-25 14:26:56 1638550 0000_part_00.parquet
2019-11-25 14:26:56 1635489 0001_part_00.parquet
2019-11-25 14:26:56 1624418 0002_part_00.parquet
2019-11-25 14:26:56 1646179 0003_part_00.parquet

To optimize access to data, I can specify one or more partition columns so that unloaded data is automatically partitioned into folders in my S3 bucket. For example, I can unload sales data partitioned by year, month, and day. This enables my queries to take advantage of partition pruning and skip scanning irrelevant partitions, improving query performance and minimizing cost.

To use partitioning, I need to add to the previous SQL command the PARTITION BY option, followed by the columns I want to use to partition the data in different directories. In my case, I want to partition the output based on the year and the calendar date (caldate in the query) of the sales.

UNLOAD ('SELECT sales.*, date.*, total_price, percentile
           FROM sales, date,
                (SELECT eventid, total_price, ntile(1000) over(order by total_price desc) / 10.0 as percentile
                   FROM (SELECT eventid, sum(pricepaid) total_price
                           FROM sales
                       GROUP BY eventid)) as percentile_events
          WHERE sales.dateid = date.dateid
            AND percentile_events.eventid = sales.eventid')
TO 's3://MY-BUCKET/DataLake/SalesPartitioned/'
FORMAT AS PARQUET
PARTITION BY (year, caldate)
CREDENTIALS 'aws_iam_role=arn:aws:iam::123412341234:role/myRedshiftRole';

This time, the output of the query is stored in multiple partitions. For example, here’s the content of a folder for a specific year and date:

$ aws s3 ls s3://MY-BUCKET/DataLake/SalesPartitioned/year=2008/caldate=2008-07-20/
2019-11-25 14:36:17 11940 0000_part_00.parquet
2019-11-25 14:36:17 11052 0001_part_00.parquet
2019-11-25 14:36:17 11138 0002_part_00.parquet
2019-11-25 14:36:18 12582 0003_part_00.parquet

Optionally, I can use AWS Glue to set up a Crawler that (on demand or on a schedule) looks for data in my S3 bucket to update the Glue Data Catalog. When the Data Catalog is updated, I can easily query the data using Redshift Spectrum, Athena, or EMR.

The sales data is now ready to be processed together with the unstructured and semi-structured (JSON, XML, Parquet) data in my data lake. For example, I can now use Apache Spark with EMR, or any SageMaker built-in algorithm, to access the data and get new insights.
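For example, the following minimal PySpark sketch (which could run on EMR) reads the partitioned output; the bucket name matches the placeholder used in the UNLOAD commands above, and the aggregation is only illustrative.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sales-analysis").getOrCreate()

# Spark discovers the year=/caldate= folders as partition columns,
# so filtering on them prunes the partitions that get scanned.
sales = spark.read.parquet("s3://MY-BUCKET/DataLake/SalesPartitioned/")
one_day = sales.where("year = 2008 AND caldate = '2008-07-20'")
one_day.groupBy("eventid").sum("pricepaid").show()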

Using Redshift Federated Query
You can now also access data in RDS and Aurora PostgreSQL stores directly from your Redshift data warehouse. In this way, you can access data as soon as it is available. Straight from Redshift, you can now perform queries processing data in your data warehouse, transactional databases, and data lake, without requiring ETL jobs to transfer data to the data warehouse.

Redshift leverages its advanced optimization capabilities to push down and distribute a significant portion of the computation directly into the transactional databases, minimizing the amount of data moving over the network.

Using this syntax, you can add an external schema from an RDS or Aurora PostgreSQL database to a Redshift cluster:

CREATE EXTERNAL SCHEMA IF NOT EXISTS online_system
FROM POSTGRES
DATABASE 'online_sales_db' SCHEMA 'online_system'
URI 'my-hostname' port 5432
IAM_ROLE 'iam-role-arn'
SECRET_ARN 'ssm-secret-arn';

Schema and port are optional here. The schema defaults to public if left unspecified, and the default port for PostgreSQL databases is 5432. Redshift uses AWS Secrets Manager to manage the credentials to connect to the external databases.

With this command, all tables in the external schema are available and can be used by Redshift for any complex SQL query processing data in the cluster or, using Redshift Spectrum, in your S3 data lake.

Coming back to the sales data example I used before, I can now correlate the trends of my historical data of musical events with real-time sales. In this way, I can understand if an event is performing as expected or not, and calibrate my marketing activities without delays.

For example, after I define the online commerce database as the online_system external schema in my Redshift cluster, I can compare previous sales with what is in the online commerce system with this simple query:

SELECT eventid,
       sum(pricepaid) total_price,
       sum(online_pricepaid) online_total_price
  FROM sales, online_system.current_sales
 WHERE eventid = online_eventid
 GROUP BY eventid;

Redshift doesn’t import database or schema catalog in its entirety. When a query is run, it localizes the metadata for the Aurora and RDS tables (and views) that are part of the query. This localized metadata is then used for query compilation and plan generation.

Available Now
Amazon Redshift data lake export is a new tool to improve your data processing pipeline and is supported with Redshift release version 1.0.10480 or later. Refer to the AWS Region Table for Redshift availability, and check the version of your clusters.

The new federation capability in Amazon Redshift is released as a public preview and allows you to bring together data stored in Redshift, S3, and one or more RDS and Aurora PostgreSQL databases. When creating a cluster in the Amazon Redshift management console, you can pick one of three maintenance tracks: Current, Trailing, or Preview. Within the Preview track, choose preview_features to participate in the Federated Query public preview. For example:

These features simplify data processing and analytics, giving you more tools to react quickly, and a single point of view for your data. Let me know what you are going to use them for!

Danilo

Provisioning the Intuit Data Lake with Amazon EMR, Amazon SageMaker, and AWS Service Catalog

Post Syndicated from Michael Sambol original https://aws.amazon.com/blogs/big-data/provisioning-the-intuit-data-lake-with-amazon-emr-amazon-sagemaker-and-aws-service-catalog/

This post shares Intuit’s learnings and recommendations for running a data lake on AWS. The Intuit Data Lake is built and operated by numerous teams in Intuit Data Platform. Thanks to Tristan Baker (Chief Architect), Neil Lamka (Principal Product Manager), Achal Kumar (Development Manager), Nicholas Audo, and Jimmy Armitage for their feedback and support.

A data lake is a centralized repository for storing structured and unstructured data at any scale. At Intuit, creating such a pile of raw data is easy. However, more interesting challenges present themselves:

  1. How should AWS accounts be organized?
  2. What ingestion methods will be used? How will analysts find the data they need?
  3. Where should data be stored? How should access be managed?
  4. What security measures are needed to protect Intuit’s sensitive data?
  5. Which parts of this ecosystem can be automated?

This post outlines the approach taken by Intuit, though it is important to remember that there are many ways to build a data lake (for example, AWS Lake Formation).

We’ll cover the technologies and processes involved in creating the Intuit Data Lake at a high level, including the overall structure and the automation used in provisioning accounts and resources. Watch this space in the future for more detailed blog posts on specific aspects of the system, from the other teams and engineers who worked together to build the Intuit Data Lake.

Architecture

Account Structure

Data lakes typically follow a hub-and-spoke model, with the hub account containing shared services that control access to data sources. For the purposes of this post, we’ll refer to the hub account as Central Data Lake.

In this pattern, access to Central Data Lake is apportioned to spoke accounts called Processing Accounts. This model maintains separation between end users and allows for division of billing among distinct business units.

 

 

It is common to maintain two ecosystems: pre-production (Pre-Prod) and production (Prod). This allows data lake administrators to silo access to data by preventing connectivity between Pre-Prod and Prod.

To enable experimentation and testing, it may also be advisable to maintain separate VPC-based environments within Pre-Prod accounts, such as dev, qa, and e2e. Processing Account VPCs would then be connected to the corresponding VPC in Central Data Lake.

Note that at first, we connected accounts via VPC Peering. However, as we scaled we quickly approached the hard limit of 125 VPC peering connections, requiring us to migrate to AWS Transit Gateway. As of this writing, we connect multiple new Processing Accounts weekly.

 

 

Central Data Lake

There may be numerous services running in a hub account, but we’ll focus on the aspects that are most relevant to this blog: ingestion, sanitization, storage, and a data catalog.

 

 

Ingestion, Sanitization, and Storage

A key component to Central Data Lake is a uniform ingestion pattern for streaming data. One example is an Apache Kafka cluster running on Amazon EC2. (You can read about how Intuit engineers do this in another AWS blog.) As we deal with hundreds of data sources, we’ve enabled access to ingestion mechanisms via AWS PrivateLink.

Note: Amazon Managed Streaming for Apache Kafka (Amazon MSK) is an alternative for running Apache Kafka on Amazon EC2, but was not available at the start of Intuit’s migration.

In addition to stream processing, another method of ingestion is batch processing, such as jobs running on Amazon EMR. After data is ingested by one of these methods, it can be stored in Amazon S3 for further processing and analysis.

Intuit deals with a large volume of customer data, and each field is carefully considered and classified with a sensitivity level. All sensitive data that enters the lake is encrypted at the source. The ingestion systems retrieve the encrypted data and move it into the lake. Before it is written to S3, the data is sanitized by a proprietary RESTful service. Analysts and engineers operating within the data lake consume this masked data.

Data Catalog

A data catalog is a common way to give end users information about the data and where it lives. One example is a Hive Metastore backed by Amazon Aurora. Another alternative is the AWS Glue Data Catalog.

Processing Accounts

When Processing Accounts are delivered to end users, they include an identical set of resources. We’ll discuss the automation of Processing Accounts below, but the primary components are as follows:

 

 

                           Processing Account structure upon delivery to the customer

 

Data Storage Mechanisms

One reasonable question is whether all data should reside in Central Data Lake, or if it’s acceptable to distribute data across multiple accounts. A data lake might employ a combination of the two approaches, and classify data locations as primary or secondary.

The primary location for data is Central Data Lake, and it arrives there via the ingestion pipelines discussed previously. Processing Accounts can read from the primary source, either directly from the ingestion pipelines or from S3. Processing Accounts can contribute their transformed data back into Central Data Lake (primary), or store it in their own accounts (secondary). The proper storage location depends on the type of data, and who needs to consume it.

One rule worth enforcing is that no cross-account writes should be permitted. In other words, the IAM principal (in most cases, an IAM role assumed by EC2 via an instance profile) must be in the same account as the destination S3 bucket. This is because cross-account delegation is not supported—specifically, S3 bucket policies in Central Data Lake cannot grant Processing Account A access to objects written by a role in Processing Account B.

Another possibility is for EMR to assume different IAM roles via a custom credentials provider (see this AWS blog), but we chose not to go down this path at Intuit because it would have required many EMR jobs to be rewritten.

 

 

Data Access Patterns

The majority of end users are interested in the data that resides in S3. In Central Data Lake and some Processing Accounts, there may be a set of read-only S3 buckets: any account in the data lake ecosystem can read data from this type of bucket.

To facilitate management of S3 access for read-only buckets, we built a mechanism to control S3 bucket policies, administered entirely via code. Our deployment pipelines use account metadata to dynamically generate the correct S3 bucket policy based on the type of account (Pre-Prod or Prod). These policies are committed back into our code repository for auditability and ease of management.

We employ the same method for managing KMS key policies, as we use KMS with customer managed customer master keys (CMKs) for at-rest encryption in S3.

Here’s an example of a generated S3 bucket policy for a read-only bucket:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ProcessingAccountReadOnly",
            "Effect": "Allow",
            "Principal": {
                "AWS": [
                    "arn:aws:iam::111111111111:root",
                    "arn:aws:iam::222222222222:root",
                    "arn:aws:iam::333333333333:root",
                    "arn:aws:iam::444444444444:root",
                    "arn:aws:iam::555555555555:root",
                    ...
                    ...
                    ...
                    "arn:aws:iam::999999999999:root",
                ]
            },
            "Action": [
                "s3:ListBucket",
                "s3:GetObject"
            ],
            "Resource": [
                "arn:aws:s3:::intuit-data-lake-example/*",
                "arn:aws:s3:::intuit-data-lake-example"
            ]
        }
    ]
}

Note that we grant access at the account level, rather than using explicit IAM principal ARNs. Because the reads are cross-account, permissions are also required on the IAM principals in Processing Accounts. Maintaining these policies—with automation, at that level of granularity—is untenable at scale. Furthermore, using specific IAM principal ARNs would create an external dependency on foreign accounts. For example, if a Processing Account deletes an IAM role that is referenced in an S3 bucket policy in Central Data Lake, the bucket policy can no longer be saved, causing interruptions to deployment pipelines.
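As a hedged illustration (not Intuit's actual tooling), a deployment pipeline could render and apply such a policy from account metadata along these lines; the account IDs and bucket name are placeholders.

import json
import boto3

# Placeholder account metadata; in practice this comes from the pipeline's configuration.
processing_account_ids = ['111111111111', '222222222222', '333333333333']
bucket = 'intuit-data-lake-example'

policy = {
    'Version': '2012-10-17',
    'Statement': [{
        'Sid': 'ProcessingAccountReadOnly',
        'Effect': 'Allow',
        'Principal': {'AWS': [f'arn:aws:iam::{acct}:root' for acct in processing_account_ids]},
        'Action': ['s3:ListBucket', 's3:GetObject'],
        'Resource': [f'arn:aws:s3:::{bucket}', f'arn:aws:s3:::{bucket}/*']
    }]
}

s3 = boto3.client('s3')
s3.put_bucket_policy(Bucket=bucket, Policy=json.dumps(policy))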

Security

Security is mission critical for any data lake. We’ll mention a subset of the controls we use, but not dive deep.

Encryption

Encryption can be enforced both in transit and at rest, using multiple methods:

  1. Traffic within the lake should use the latest version of TLS (1.2 as of this writing)
  2. Data can be encrypted with application-level (client-side) encryption
  3. KMS keys can be used for at-rest encryption of S3, EBS, and RDS

Ingress and Egress

There’s nothing out of the ordinary in our approach to ingress and egress, but it’s worth mentioning the standard patterns we’ve found important:

Policies restricting ingress and egress are the primary points at which a data lake can guarantee quality (ingress) and prevent loss (egress).

Authorization

Access to the Intuit Data Lake is controlled via IAM roles, meaning no IAM users (with long-term credentials) are created. End users are granted access via an internal service that manages role-based, federated access to AWS accounts. Regular reviews are conducted to remove nonessential users.

Configuration Management

We use an internal fork of Cloud Custodian, which is a suite of preventative, detective, and responsive controls consisting of Amazon CloudWatch Events and AWS Config rules. Some of the violations it reports and (optionally) mitigates include:

  • Unauthorized CIDRs in inbound security group rules
  • Public S3 bucket policies and ACLs
  • IAM user console access
  • Unencrypted S3 buckets, EBS volumes, and RDS instances

Lastly, Amazon GuardDuty is enabled in all Intuit Data Lake accounts and is monitored by Intuit Security.

Automation

If there is one thing we’ve learned building the Intuit Data Lake, it is to automate everything.

There are four areas of automation we’ll discuss in this blog:

  1. Creation of Processing Accounts
  2. Processing Account Orchestration Pipeline
  3. Processing Account Terraform Pipeline
  4. EMR and SageMaker deployment via Service Catalog

Creation of Processing Accounts

The first step in creating a Processing Account is to make a request through an internal tool. This triggers automation that provisions an Intuit-stamped AWS account under the correct business unit.

 

Note: AWS Control Tower’s Account Factory was not available at the start of our journey, but it can be leveraged to provision new AWS accounts in a secure, best-practice, self-service way.

Account setup also includes VPC creation (with optional VPN), fully automated using Service Catalog. End users simply specify subnet sizes.

It’s worth noting that Intuit leverages Service Catalog for self-service deployment of other common patterns, including ingress security groups, VPC endpoints, and VPC peering.

Processing Account Orchestration Pipeline

After account creation and VPC provisioning, the Processing Account Orchestration Pipeline runs. This pipeline executes one-time tasks required for Processing Accounts. These tasks include:

  • Bootstrapping an IAM role for use in further configuration management
  • Creation of KMS keys for S3, EBS, and RDS encryption (see the sketch after this list)
  • Creation of variable files for the new account
  • Updating the master configuration file with account metadata
  • Generation of scripts to orchestrate the Terraform pipeline discussed below
  • Sharing Transit Gateways via Resource Access Manager
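
As a hedged sketch of the KMS step only, the pipeline could create a customer managed key per service along these lines; the alias names are assumptions:

import boto3

kms = boto3.client("kms")

# Create one customer managed key per service; alias names are illustrative only.
for service in ("s3", "ebs", "rds"):
    key = kms.create_key(
        Description=f"Processing Account at-rest encryption key for {service}")
    kms.create_alias(
        AliasName=f"alias/processing-account-{service}",
        TargetKeyId=key["KeyMetadata"]["KeyId"])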

Processing Account Terraform Pipeline

This pipeline manages the lifecycle of dynamic, frequently updated resources, including IAM roles, S3 buckets and bucket policies, KMS key policies, security groups, NACLs, and bastion hosts.

There is one pipeline for every Processing Account, and each pipeline deploys a series of layers into the account, using a set of parameterized deployment jobs. A layer is a logical grouping of Terraform modules and AWS resources, providing a way to shrink Terraform state files and reduce blast radius if redeployment of specific resources is required.

EMR and SageMaker Deployment via Service Catalog

AWS Service Catalog facilitates the provisioning of Amazon EMR and Amazon SageMaker, allowing end users to launch EMR clusters and SageMaker notebook instances that work out of the box, with embedded security.

Service Catalog allows data scientists and data engineers to launch EMR clusters in a self-service fashion with user-friendly parameters, and provides them with the following:

  • Bootstrap action to enable connectivity to services in Central Data Lake
  • EC2 instance profile to control S3, KMS, and other granular permissions
  • Security configuration that enables at-rest and in-transit encryption
  • Configuration classifications for optimal EMR performance
  • Encrypted AMI with monitoring and logging enabled
  • Custom Kerberos connection to LDAP

For SageMaker, we use Service Catalog to launch notebook instances with custom lifecycle configurations that set up connections or initialize the following: Hive Metastore, Kerberos, security, Splunk logging, and OpenDNS. You can read more about lifecycle configurations in this AWS blog. Launching a SageMaker notebook instance with best-practice configuration becomes a simple, self-service operation.
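
Conceptually, that self-service launch can also be driven through the Service Catalog API. The following is a sketch only; the product ID, provisioning artifact ID, and parameter keys are placeholders for whatever the real portfolio exposes:

import boto3

servicecatalog = boto3.client("servicecatalog")

# Launch a SageMaker notebook product from the data lake portfolio.
# Product ID, provisioning artifact ID, and parameter keys are hypothetical placeholders.
response = servicecatalog.provision_product(
    ProductId="prod-exampleid1234",                # the SageMaker notebook product
    ProvisioningArtifactId="pa-exampleid1234",     # the product version to launch
    ProvisionedProductName="analyst-notebook-01",
    ProvisioningParameters=[
        {"Key": "NotebookInstanceType", "Value": "ml.t3.medium"},
        {"Key": "SubnetId", "Value": "subnet-0123456789abcdef0"},
    ],
)
print(response["RecordDetail"]["Status"])  # e.g., CREATED or IN_PROGRESS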


Conclusion

This post illustrates the building blocks we used in creating the Intuit Data Lake. Our solution isn’t wholly unique, but is composed of common-sense approaches we’ve gleaned from dozens of engineers across Intuit, representing decades of experience. These practices have enabled us to push petabytes of data into the lake, and serve hundreds of Processing Accounts with varying needs. We are still building, but we hope our story helps you in your data lake journey.

The content and opinions in this post are those of the third-party author and AWS is not responsible for the content or accuracy of this post.

 


About the Authors

Michael Sambol is a senior consultant at AWS. He holds an MS in computer science from Georgia Tech. Michael enjoys working out, playing tennis, traveling, and watching Western movies.

Ben Covi is a staff software engineer at Intuit. At any given moment, he’s probably losing a game of Catan.

Best practices to scale Apache Spark jobs and partition data with AWS Glue

Post Syndicated from Mohit Saxena original https://aws.amazon.com/blogs/big-data/best-practices-to-scale-apache-spark-jobs-and-partition-data-with-aws-glue/

AWS Glue provides a serverless environment to prepare (extract and transform) and load large amounts of data from a variety of sources for analytics and data processing with Apache Spark ETL jobs. This series of posts discusses best practices to help developers of Apache Spark applications and AWS Glue ETL jobs, big data architects, data engineers, and business analysts automatically scale their data processing jobs running on AWS Glue.

The first post of this series discusses two key AWS Glue capabilities to manage the scaling of data processing jobs. The first allows you to horizontally scale out Apache Spark applications for large splittable datasets. The second allows you to vertically scale up memory-intensive Apache Spark applications with the help of new AWS Glue worker types. The post also shows how to use AWS Glue to scale Apache Spark applications with a large number of small files commonly ingested from streaming applications using Amazon Kinesis Data Firehose. Finally, the post shows how AWS Glue jobs can use the partitioning structure of large datasets in Amazon S3 to provide faster execution times for Apache Spark applications.

Understanding AWS Glue worker types

AWS Glue comes with three worker types to help customers select the configuration that meets their job latency and cost requirements. These workers, also known as Data Processing Units (DPUs), come in Standard, G.1X, and G.2X configurations.

Each worker type allocates memory and storage as follows:

  • Standard: 5 GB for Spark driver and executor memory, 512 MB for spark.yarn.executor.memoryOverhead, and 50 GB of attached EBS storage
  • G.1X: 10 GB for driver and executor memory, 2 GB memoryOverhead, and 64 GB of attached EBS storage
  • G.2X: 20 GB for driver and executor memory, 4 GB memoryOverhead, and 128 GB of attached EBS storage

The compute parallelism (Apache Spark tasks per DPU) available for horizontal scaling is the same regardless of the worker type. For example, both Standard and G.1X workers map to 1 DPU, each of which can run eight concurrent tasks. A G.2X worker maps to 2 DPUs, which can run 16 concurrent tasks. As a result, compute-intensive AWS Glue jobs that possess a high degree of data parallelism can benefit from horizontal scaling (more Standard or G.1X workers). AWS Glue jobs that need high memory or ample disk space to store intermediate shuffle output can benefit from vertical scaling (more G.1X or G.2X workers).

Horizontal scaling for splittable datasets

AWS Glue automatically supports file splitting when reading common native formats (such as CSV and JSON) and modern file formats (such as Parquet and ORC) from S3 using AWS Glue DynamicFrames. For more information about DynamicFrames, see Work with partitioned data in AWS Glue.

A file split is a portion of a file that a Spark task can read and process independently on an AWS Glue worker. By default, file splitting is enabled for line-delimited native formats, which allows Apache Spark jobs running on AWS Glue to parallelize computation across multiple executors. AWS Glue jobs that process large splittable datasets with medium (hundreds of megabytes) or large (several gigabytes) file sizes can benefit from horizontal scaling and run faster by adding more AWS Glue workers.

File splitting also benefits block-based compression formats such as bzip2. You can read each compression block on a file split boundary and process them independently. Unsplittable compression formats such as gzip do not benefit from file splitting. To horizontally scale jobs that read unsplittable files or compression formats, prepare the input datasets with multiple medium-sized files.

 

Each file split (the blue square in the figure) is read from S3, deserialized into an AWS Glue DynamicFrame partition, and then processed by an Apache Spark task (the gear icon in the figure). Deserialized partition sizes can be significantly larger than the on-disk 64 MB file split size, especially for highly compressed splittable file formats such as Parquet or large files using unsplittable compression formats such as gzip. Typically, a deserialized partition is not cached in memory, and only constructed when needed due to Apache Spark’s lazy evaluation of transformations, thus not causing any memory pressure on AWS Glue workers. For more information on lazy evaluation, see the RDD Programming Guide on the Apache Spark website.

However, explicitly caching a partition in memory or spilling it out to local disk in an AWS Glue ETL script or Apache Spark application can result in out-of-memory (OOM) or out-of-disk exceptions. AWS Glue can support such use cases by using larger AWS Glue worker types with vertically scaled-up DPU instances for AWS Glue ETL jobs.

Vertical scaling for Apache Spark jobs using larger worker types

A variety of AWS Glue ETL jobs, Apache Spark applications, and new machine learning (ML) Glue transformations supported with AWS Lake Formation have high memory and disk requirements. Running these workloads may put significant memory pressure on the execution engine. This memory pressure can result in job failures because of OOM or out-of-disk space exceptions. You may see exceptions from Yarn about memory and disk space.

Exceeding Yarn memory overhead

Apache Yarn is responsible for allocating cluster resources needed to run your Spark application. An application includes a Spark driver and multiple executor JVMs. In addition to the memory allocation required to run a job for each executor, Yarn also allocates an extra overhead memory to accommodate for JVM overhead, interned strings, and other metadata that the JVM needs. The configuration parameter spark.yarn.executor.memoryOverhead defaults to 10% of the total executor memory. Memory-intensive operations such as joining large tables or processing datasets with a skew in the distribution of specific column values may exceed the memory threshold, and result in the following error message:

18/06/13 16:54:29 ERROR YarnClusterScheduler: Lost executor 1 on ip-xxx:
Container killed by YARN for exceeding memory limits. 5.5 GB of 5.5 GB physical memory used.
Consider boosting spark.yarn.executor.memoryOverhead.

Disk space

Apache Spark uses local disk on Glue workers to spill data from memory that exceeds the heap space defined by the spark.memory.fraction configuration parameter. During the sort or shuffle stages of a job, Spark writes intermediate data to local disk before it can exchange that data between the different workers. Jobs may fail due to the following exception when no disk space remains:

java.io.IOException: No space left on device
UnsafeExternalSorter: Thread 20 spilling sort data of 141.0 MB to disk (90 times so far)

AWS Glue job metrics

Most commonly, this is a result of a significant skew in the dataset that the job is processing. You can also identify the skew by monitoring the execution timeline of different Apache Spark executors using AWS Glue job metrics. For more information, see Debugging Demanding Stages and Straggler Tasks.

The following AWS Glue job metrics graph shows the execution timeline and memory profile of different executors in an AWS Glue ETL job. One of the executors (the red line) is straggling due to processing of a large partition, and actively consumes memory for the majority of the job’s duration.

With AWS Glue’s Vertical Scaling feature, memory-intensive Apache Spark jobs can use AWS Glue workers with higher memory and larger disk space to help overcome these two common failures. Using AWS Glue job metrics, you can also debug OOM and determine the ideal worker type for your job by inspecting the memory usage of the driver and executors for a running job. For more information, see Debugging OOM Exceptions and Job Abnormalities.

In general, jobs that run memory-intensive operations can benefit from the G.1X worker type, and those that use AWS Glue’s ML transforms or similar ML workloads can benefit from the G.2X worker type.
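
The worker type is selected when the job is defined or updated. The following sketch shows one way to do this with the AWS SDK for Python; the job name, IAM role, and script location are placeholders:

import boto3

glue = boto3.client("glue")

# Create a memory-intensive ETL job on vertically scaled G.1X workers.
# The job name, IAM role, and script location below are placeholders.
glue.create_job(
    Name="example-memory-intensive-job",
    Role="arn:aws:iam::111111111111:role/ExampleGlueJobRole",
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://example-bucket/scripts/etl_job.py",
    },
    GlueVersion="1.0",
    WorkerType="G.1X",      # or "G.2X" for ML transforms and heavier memory needs
    NumberOfWorkers=10,
)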

Apache Spark UI for AWS Glue jobs

You can also use AWS Glue’s support for the Spark UI to inspect and scale your AWS Glue ETL jobs by visualizing the Directed Acyclic Graph (DAG) of Spark’s execution, monitoring demanding stages and large shuffles, and inspecting Spark SQL query plans. For more information, see Monitoring Jobs Using the Apache Spark Web UI.

The following Spark SQL query plan on the Spark UI shows the DAG for an ETL job that reads two tables from S3, performs an outer-join that results in a Spark shuffle, and writes the result to S3 in Parquet format.

As seen from the plan, the Spark shuffle and subsequent sort operation for the join transformation takes the majority of the job execution time. With AWS Glue vertical scaling, each AWS Glue worker co-locates more Spark tasks, thereby saving on the number of data exchanges over the network.

Scaling to handle large numbers of small files

An AWS Glue ETL job might read thousands or millions of files from S3. This is typical for Kinesis Data Firehose or streaming applications writing data into S3. The Apache Spark driver may run out of memory when attempting to read a large number of files. When this happens, you see the following error message:

# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill -9 %p"
# Executing /bin/sh -c "kill -9 12039"...

Apache Spark v2.2 can manage approximately 650,000 files on the standard AWS Glue worker type. To handle more files, AWS Glue provides the option to read input files in larger groups per Spark task for each AWS Glue worker. For more information, see Reading Input Files in Larger Groups.

You can reduce the excessive parallelism from the launch of one Apache Spark task to process each file by using AWS Glue file grouping. This method reduces the chances of an OOM exception on the Spark driver. To configure file grouping, you need to set groupFiles and groupSize parameters. The following code example uses AWS Glue DynamicFrame API in an ETL script with these parameters:

# Group small input files so each Spark task reads roughly 1 MB per DynamicFrame partition
dyf = glueContext.create_dynamic_frame_from_options("s3",
    {'paths': ["s3://input-s3-path/"],
     'recurse': True,
     'groupFiles': 'inPartition',  # group files within the same S3 partition
     'groupSize': '1048576'},      # target group size in bytes (1 MB)
    format="json")

You can set groupFiles to group files within a Hive-style S3 partition (inPartition) or across S3 partitions (acrossPartition). In most scenarios, grouping within a partition is sufficient to reduce the number of concurrent Spark tasks and the memory footprint of the Spark driver. In benchmarks, AWS Glue ETL jobs configured with the inPartition grouping option were approximately seven times faster than native Apache Spark v2.2 when processing 320,000 small JSON files distributed across 160 different S3 partitions. A large fraction of the time in Apache Spark is spent building an in-memory index while listing S3 files and scheduling a large number of short-running tasks to process each file. With AWS Glue grouping enabled, the benchmark AWS Glue ETL job could process more than 1 million files using the standard AWS Glue worker type.

groupSize is an optional field that allows you to configure the amount of data each Spark task reads and processes as a single AWS Glue DynamicFrame partition. Users can set groupSize if they know the distribution of file sizes before running the job. The groupSize parameter allows you to control the number of AWS Glue DynamicFrame partitions, which also translates into the number of output files. However, an overly small or overly large groupSize can result in excessive task parallelism or under-utilization of the cluster, respectively.

By default, AWS Glue automatically enables grouping without any manual configuration when the number of input files or task parallelism exceeds a threshold of 50,000. The default value of the groupFiles parameter is inPartition, so that each Spark task only reads files within the same S3 partition. AWS Glue computes the groupSize parameter automatically and configures it to reduce the excessive parallelism, and makes use of the cluster compute resources with sufficient Spark tasks running in parallel.

Partitioning data and pushdown predicates

Partitioning has emerged as an important technique for organizing datasets so that a variety of big data systems can query them efficiently. A hierarchical directory structure organizes the data, based on the distinct values of one or more columns. For example, you can partition your application logs in S3 by date, broken down by year, month, and day. Files corresponding to a single day’s worth of data receive a prefix such as the following:

s3://my_bucket/logs/year=2018/month=01/day=23/

Predicate pushdowns for partition columns

AWS Glue supports pushing down predicates, which define a filter criteria for partition columns populated for a table in the AWS Glue Data Catalog. Instead of reading all the data and filtering results at execution time, you can supply a SQL predicate in the form of a WHERE clause on the partition column. For example, assume the table is partitioned by the year column and run SELECT * FROM table WHERE year = 2019. year represents the partition column and 2019 represents the filter criteria.

AWS Glue lists and reads only the files from S3 partitions that satisfy the predicate and are necessary for processing.

To accomplish this, specify a predicate using the Spark SQL expression language as an additional parameter to the AWS Glue getCatalogSource method. This predicate can be any SQL expression or user-defined function that evaluates to a Boolean, as long as it uses only the partition columns for filtering.

This example demonstrates this functionality with a dataset of Github events partitioned by year, month, and day. The following code example reads only those S3 partitions related to events that occurred on weekends:

%spark

val partitionPredicate = "date_format(to_date(concat(year, '-', month, '-', day)), 'E') in ('Sat', 'Sun')"

val pushdownEvents = glueContext.getCatalogSource(
   database = "githubarchive_month",
   tableName = "data",
   pushDownPredicate = partitionPredicate).getDynamicFrame()

Here you can use the SparkSQL string concat function to construct a date string. The to_date function converts it to a date object, and the date_format function with the ‘E’ pattern converts the date to a three-character day of the week (for example, Mon or Tue). For more information about these functions, Spark SQL expressions, and user-defined functions in general, see the Spark SQL, DataFrames and Datasets Guide and list of functions on the Apache Spark website.
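
If you work in PySpark rather than Scala, a roughly equivalent read is sketched below. It assumes a GlueContext named glueContext (as in the earlier grouping example) and passes the same predicate through the push_down_predicate parameter:

# PySpark sketch: read only the weekend partitions from the Data Catalog table.
# Assumes a GlueContext named glueContext is already available.
partition_predicate = (
    "date_format(to_date(concat(year, '-', month, '-', day)), 'E') in ('Sat', 'Sun')"
)

pushdown_events = glueContext.create_dynamic_frame.from_catalog(
    database="githubarchive_month",
    table_name="data",
    push_down_predicate=partition_predicate)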

There is a significant performance boost for AWS Glue ETL jobs when pruning AWS Glue Data Catalog partitions. It reduces the time the Spark query engine needs to list files in S3 and to read and process data at runtime. You can achieve further improvement as you exclude additional partitions by using predicates with higher selectivity.

Partitioning data before and during writes to S3

By default, data is not partitioned when writing out the results from an AWS Glue DynamicFrame—all output files are written at the top level under the specified output path. AWS Glue enables partitioning of DynamicFrame results by passing the partitionKeys option when creating a sink. For example, the following code example writes out the dataset in Parquet format to S3 partitioned by the type column:

%spark

glueContext.getSinkWithFormat(
    connectionType = "s3",
    options = JsonOptions(Map("path" -> "$outpath", "partitionKeys" -> Seq("type"))),
    format = "parquet").writeDynamicFrame(projectedEvents)

In this example, $outpath is a placeholder for the base output path in S3. The partitionKeys parameter corresponds to the names of the columns used to partition the output in S3. When you execute the write operation, it removes the type column from the individual records and encodes it in the directory structure. To demonstrate this, you can list the output path using the following aws s3 ls command from the AWS CLI:

PRE type=CommitCommentEvent/
PRE type=CreateEvent/
PRE type=DeleteEvent/
PRE type=ForkEvent/
PRE type=GollumEvent/
PRE type=IssueCommentEvent/
PRE type=IssuesEvent/
PRE type=MemberEvent/
PRE type=PublicEvent/
PRE type=PullRequestEvent/
PRE type=PullRequestReviewCommentEvent/
PRE type=PushEvent/
PRE type=ReleaseEvent/
PRE type=WatchEvent/

For more information, see aws s3 ls in the AWS CLI Command Reference.
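
For Python-based jobs, a roughly equivalent partitioned write is sketched below; it assumes a DynamicFrame named projectedEvents, a GlueContext named glueContext, and an output_path variable standing in for $outpath:

# PySpark sketch: write the DynamicFrame to S3 in Parquet, partitioned by the type column.
output_path = "s3://example-output-bucket/github-events/"  # stands in for $outpath

glueContext.write_dynamic_frame.from_options(
    frame=projectedEvents,
    connection_type="s3",
    connection_options={
        "path": output_path,
        "partitionKeys": ["type"],
    },
    format="parquet")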

In general, you should select columns for partitionKeys that are of lower cardinality and are most commonly used to filter or group query results. For example, when analyzing AWS CloudTrail logs, it is common to look for events that happened between a range of dates. Therefore, partitioning the CloudTrail data by year, month, and day would improve query performance and reduce the amount of data that you need to scan to return the answer.

The benefit of output partitioning is two-fold. First, it improves execution time for end-user queries. Second, having an appropriate partitioning scheme helps avoid costly Spark shuffle operations in downstream AWS Glue ETL jobs when combining multiple jobs into a data pipeline. For more information, see Working with partitioned data in AWS Glue.

S3 or Hive-style partitions are different from Spark RDD or DynamicFrame partitions. Spark partitioning is related to how Spark or AWS Glue breaks up a large dataset into smaller and more manageable chunks to read and apply transformations in parallel. AWS Glue workers manage this type of partitioning in memory. You can control Spark partitions further by using the repartition or coalesce functions on DynamicFrames at any point during a job’s execution and before data is written to S3. You can set the number of partitions using the repartition function either by explicitly specifying the total number of partitions or by selecting the columns to partition the data.
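
As a hedged illustration (again assuming a DynamicFrame named projectedEvents and a GlueContext named glueContext), one way to control the Spark partition count before writing is to convert to a DataFrame, repartition, and convert back:

from awsglue.dynamicframe import DynamicFrame

# Adjust the number of Spark partitions before writing to S3.
# repartition() triggers a shuffle; coalesce() avoids a full shuffle when only
# reducing the partition count.
repartitioned = DynamicFrame.fromDF(
    projectedEvents.toDF().repartition(10),  # 10 partitions, chosen for illustration
    glueContext,
    "repartitioned_events")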

Repartitioning a dataset by using the repartition or coalesce functions often results in AWS Glue workers exchanging (shuffling) data, which can impact job runtime and increase memory pressure. In contrast, writing data to S3 with Hive-style partitioning does not require any data shuffle and only sorts it locally on each of the worker nodes. The number of output files in S3 without Hive-style partitioning roughly corresponds to the number of Spark partitions. In contrast, the number of output files in S3 with Hive-style partitioning can vary based on the distribution of partition keys on each AWS Glue worker.

Conclusion

This post showed how to scale your ETL jobs and Apache Spark applications on AWS Glue for both compute and memory-intensive jobs. AWS Glue enables faster job execution times and efficient memory management by using the parallelism of the dataset and different types of AWS Glue workers. It also helps you overcome the challenges of processing many small files by automatically adjusting the parallelism of the workload and cluster. AWS Glue ETL jobs use the AWS Glue Data Catalog and enable seamless partition pruning using predicate pushdowns. It also allows for efficient partitioning of datasets in S3 for faster queries by downstream Apache Spark applications and other analytics engines such as Amazon Athena and Amazon Redshift. We hope you try out these best practices for your Apache Spark applications on AWS Glue.

The second post in this series will show how to use AWS Glue features to batch process large historical datasets and incrementally process deltas in S3 data lakes. It also demonstrates how to use a custom AWS Glue Parquet writer for faster job execution.

 


About the Author

Mohit Saxena is a technical lead at AWS Glue. His passion is building scalable distributed systems for efficiently managing data on cloud. He also enjoys watching movies, and reading about the latest technology.