Writing to Apache Hudi tables using AWS Glue Custom Connector

Post Syndicated from Vishal Pathak original https://aws.amazon.com/blogs/big-data/writing-to-apache-hudi-tables-using-aws-glue-connector/

In today’s world, most organizations have to tackle the three V’s of big data: variety, volume, and velocity. In this blog post, we talk about dealing with the variety and volume aspects. The challenge of variety involves processing data from various SQL and NoSQL systems, including RDBMS sources such as Amazon Aurora, NoSQL sources such as Amazon DynamoDB, and third-party APIs.

AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning, and application development. To enable customers to process data from a variety of sources, the AWS Glue team has introduced AWS Glue Custom Connectors, a new capability in AWS Glue and AWS Glue Studio that makes it easy for you to transfer data from SaaS applications and custom data sources to your data lake in Amazon S3. With just a few clicks, you can search and select connectors from the AWS Marketplace and begin your data preparation workflow in minutes. This new feature is in addition to the existing AWS Glue Connections feature.

In this post, we simplify the process to create Hudi tables with AWS Glue Custom Connector. The jar wrapped by the first version of AWS Glue Custom Connector is based on Apache Hudi 0.5.3. Instructions on creating the JAR file are in the previous post of this series.

Whereas the first post focused on creating an end-to-end architecture for replicating the data in an RDBMS source to a lakehouse, this post focuses on the volume aspect of big data. In this post, we create a Hudi table with an initial load of over 200 million records and then update over 70 million of those records. The connector not only writes the data to Amazon Simple Storage Service (Amazon S3), but also creates the tables in the AWS Glue Data Catalog. If you’re creating a partitioned Hudi table, the connector also creates the partitions in the Data Catalog. We discuss the code for creating a partitioned Hudi table in the previous post in this series.

We use the Copy On Write storage type, which gives better read performance compared to Merge On Read. For more information about Hudi storage types, see Hudi Dataset Storage Types and Storage Types & Views.

Note that this post focuses on using the AWS Glue Custom Connector to write to Apache Hudi tables. Please implement other best practices such as encryption and network security while implementing the architecture for your workloads.

Creating the Apache Hudi connection using AWS Glue Custom Connector

To create your AWS Glue job with an AWS Glue Custom Connector, complete the following steps:

  1. Go to the AWS Glue Studio console, search for AWS Glue Connector for Apache Hudi, and choose the AWS Glue Connector for Apache Hudi link.
  2. Choose Continue to Subscribe.
  3. Review the terms and conditions and choose Accept Terms to continue.
  4. Make sure that the subscription is complete and that the Effective date is populated next to the product, and then choose Continue to Configuration.
  5. As of this writing, 0.5.3 is the latest version of the AWS Glue Connector for Apache Hudi. Make sure that 0.5.3 (Nov 19, 2020) is selected in the Software Version dropdown and Activate in AWS Glue Studio is selected in the Delivery Method dropdown. Choose Continue to Launch.
  6. Under Launch this software, choose Usage Instructions and then choose Activate the Glue connector for Apache Hudi in AWS Glue Studio.

You’re redirected to AWS Glue Studio.

  7. For Name, enter a name for your connection (for example, hudi-connection).
  8. For Description, enter a description.
  9. Choose Create connection and activate connector.

A message appears that the connection was successfully created, and the connection is now visible on the AWS Glue Studio console.

Configuring resources and permissions

For this post, we provide an AWS CloudFormation template to create the following resources:

  • Two AWS Glue jobs: hudi-init-load-job and hudi-upsert-job
  • An S3 bucket to store the Python scripts for these jobs
  • An S3 bucket to store the output files of these jobs
  • An AWS Lambda function to copy the scripts from the public S3 bucket to your account
  • AWS Identity and Access Management (IAM) roles and policies with appropriate permissions

Launch the following stack, providing your connection name, created in Step 9 of the previous section, for the HudiConnectionName parameter:

Select the I acknowledge that AWS CloudFormation might create IAM resources with custom names check box before choosing Create stack.

If you have AWS Lake Formation enabled in the Region in which you’re implementing this solution, make sure that you give HudiConnectorExecuteGlueHudiJobRole Create table permission in the default database. HudiConnectorExecuteGlueHudiJobRole is created by the CloudFormation stack that you created above.

HudiConnectorExecuteGlueHudiJobRole should also have Create Database permission. You can grant this permission in the Database creators section on the Admins and database creators tab.

Running the load job

You’re now ready to run the first of your two jobs. 

  1. On the AWS Glue console, select the job hudi-init-load-job.
  2. On the Action menu, choose Run job.

My job finished in less than 10 minutes. The job inserted over 204 million records into the Hudi table.

Although the rest of the code is standard Hudi PySpark code, I want to call out the last line of the code to show how easy it is to write to Hudi tables using AWS Glue:

glueContext.write_dynamic_frame.from_options(frame = DynamicFrame.fromDF(inputDf, glueContext, "inputDf"), connection_type = "marketplace.spark", connection_options = combinedConf)

In the preceding code, combinedConf is a Python dictionary that includes all your Apache Hudi configurations. You can download the HudiInitLoadNYTaxiData.py script to use.
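To make the shape of combinedConf more concrete, the following is a minimal sketch of how such a dictionary could be assembled. It is not the content of HudiInitLoadNYTaxiData.py: the function name build_hudi_conf, the record key column pk_col, the precombine column tpep_pickup_datetime, the bulk_insert operation, and the className and connectionName connection options are assumptions for illustration, so check them against the downloaded script and your own table before reusing them.

```python
def build_hudi_conf(output_bucket, connection_name,
                    table_name="ny_yellow_trip_data",
                    operation="bulk_insert"):
    """Assemble connection options for a non-partitioned Hudi table.

    All column names and option values here are illustrative assumptions.
    """
    common_conf = {
        # Options assumed to be read by the Glue connector itself.
        "className": "org.apache.hudi",
        "connectionName": connection_name,
        "path": f"s3://{output_bucket}/{table_name}",
        # Hudi write options.
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.recordkey.field": "pk_col",
        "hoodie.datasource.write.precombine.field": "tpep_pickup_datetime",
        # Sync the table definition to the AWS Glue Data Catalog.
        "hoodie.datasource.hive_sync.enable": "true",
        "hoodie.datasource.hive_sync.use_jdbc": "false",
        "hoodie.datasource.hive_sync.database": "default",
        "hoodie.datasource.hive_sync.table": table_name,
    }
    load_conf = {"hoodie.datasource.write.operation": operation}
    # Later dictionaries win on key collisions, so job-specific
    # settings can override the common ones.
    return {**common_conf, **load_conf}


combinedConf = build_hudi_conf("my-output-bucket", "hudi-connection")
```

Keeping the common options separate from the job-specific ones means an upsert job can reuse the same function with operation="upsert".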

Querying the data

The ny_yellow_trip_data table is now visible in the default database, and you can query it through Athena.

If you have Lake Formation enabled in this Region, the role or user querying the table should have Select permissions on the table.

You can now run the following query:

select count(*) cnt, vendorid from default.ny_yellow_trip_data group by vendorid

The following screenshot shows our output.

If you have Lake Formation enabled in this Region, make sure that you give Drop permission to HudiConnectorExecuteLambdaFnsRole so the CloudFormation template can drop the default.ny_yellow_trip_data table when you delete the stack.

Running the upsert job

You can now run your second job, hudi-upsert-job. This job reads the newly written data and updates the vendor IDs of all the records that have vendorid=1. The new vendor ID for these records (over 78 million) is set as 9. You can download the HudiUpsertNYTaxiData.py script to use.
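As a rough mental model of what the upsert does, the following pure-Python sketch replaces stored records with incoming records that share the same record key. It is a simplified illustration and not Hudi's implementation: the field names pk and ts are hypothetical stand-ins for the record key and precombine fields, and the sample rows are made up.

```python
def upsert(table, incoming, key="pk", precombine="ts"):
    """Simplified upsert: an incoming record replaces the stored record
    that has the same key. If the incoming batch contains duplicates of
    a key, the one with the highest precombine value is kept."""
    latest = {}
    for row in incoming:
        seen = latest.get(row[key])
        if seen is None or row[precombine] >= seen[precombine]:
            latest[row[key]] = row
    merged = {row[key]: row for row in table}
    merged.update(latest)
    return sorted(merged.values(), key=lambda r: r[key])


table = [
    {"pk": 1, "vendorid": 1, "ts": 100},
    {"pk": 2, "vendorid": 2, "ts": 100},
    {"pk": 3, "vendorid": 1, "ts": 100},
]
# Mirror the job: select the rows with vendorid 1 and set the vendor ID to 9.
updates = [{**row, "vendorid": 9} for row in table if row["vendorid"] == 1]
result = upsert(table, updates)
```

Only the rows that appear in the incoming batch change; the row with vendorid 2 is carried over untouched.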

This job also finished in under 10 minutes.

Querying the updated data

You can now query the updated Hudi table in Athena. The following screenshot shows that the vendor ID of over 78 million records has been changed to 9.

Additional considerations

The AWS Glue Connector for Apache Hudi has not been tested for AWS Glue streaming jobs. Additionally, there are some hardcoded Hudi options in the AWS Glue job scripts. These options are set for the sample table that we create for this post. Update the options based on your workload.

Conclusion

In this post, we created an Apache Hudi table with AWS Glue Custom Connector and AWS Glue 2.0 jobs. We read over 200 million records from a public S3 bucket and created an Apache Hudi table using it. We then updated over 70 million of these records. With the new AWS Glue Custom Connector feature, we can now directly write an AWS Glue DynamicFrame to an Apache Hudi table.

Note that you can also use AWS Glue jobs to write to Apache Hudi MoR tables. The post Creating a source to Lakehouse data replication pipe using Apache Hudi, AWS Glue, AWS DMS, and Amazon Redshift discusses the process in detail. Although that post uses JAR files as an external dependency, you can now use the AWS Glue Connector for Apache Hudi for the same operation. The post uses HudiJob.py to write to MoR tables and then uses HudiMoRCompactionJob.scala to compact the MoR tables. HudiMoRCompactionJob.scala is also implemented as an AWS Glue job, so you can use AWS Glue for the compaction job too.


About the Author

Vishal Pathak is a Data Lab Solutions Architect at AWS. Vishal works with customers on their use cases, architects solutions to solve their business problems, and helps them build scalable prototypes. Prior to his journey in AWS, Vishal helped customers implement BI, DW, and data lake projects in the US and Australia.

Building a cost efficient, petabyte-scale lake house with Amazon S3 lifecycle rules and Amazon Redshift Spectrum: Part 1

Post Syndicated from Cristian Gavazzeni original https://aws.amazon.com/blogs/big-data/part-1-building-a-cost-efficient-petabyte-scale-lake-house-with-amazon-s3-lifecycle-rules-and-amazon-redshift-spectrum/

The continuous growth of data volumes combined with requirements to implement long-term retention (typically due to specific industry regulations) puts pressure on the storage costs of data warehouse solutions, even for cloud native data warehouse services such as Amazon Redshift. The introduction of the new Amazon Redshift RA3 node types helped in decoupling compute from storage growth. Integration points provided by Amazon Redshift Spectrum, Amazon Simple Storage Service (Amazon S3) storage classes, and other Amazon S3 features allow you to comply with retention policies while keeping costs under control.

An enterprise customer in Italy asked the AWS team to recommend best practices on how to implement a data journey solution for sales data; the objective of part 1 of this series is to provide step-by-step instructions and best practices on how to build an end-to-end data lifecycle management system integrated with a data lake house implemented on Amazon S3 with Amazon Redshift. In part 2, we show some additional best practices to operate the solution: implementing a sustainable monthly ageing process, using Amazon Redshift local tables to troubleshoot common issues, and using Amazon S3 access logs to analyze data access patterns.

Amazon Redshift and Redshift Spectrum

At re:Invent 2019, AWS announced new Amazon Redshift RA3 nodes. Even though this introduced new levels of cost efficiency in the cloud data warehouse, we faced customer cases where the data volume to be kept is an order of magnitude higher because specific regulations require historical data to be kept for 10–12 years or more. In addition, this historical cold data must be accessed by other services and applications external to Amazon Redshift (such as Amazon SageMaker for AI and machine learning (ML) training jobs), and occasionally it needs to be queried jointly with Amazon Redshift hot data. In these situations, Redshift Spectrum is a great fit because, among other factors, you can use it in conjunction with Amazon S3 storage classes to further improve TCO.

Redshift Spectrum allows you to query data that resides in S3 buckets using the application code and logic already in place for data warehouse tables, including joins and unions between Amazon Redshift local tables and data on Amazon S3.

Redshift Spectrum uses a fleet of compute nodes managed by AWS that increases system scalability. To use it, we need to define at least an external schema and an external table (unless an external schema and external database are already defined in the AWS Glue Data Catalog). Data definition language (DDL) statements used to define an external table include a location attribute to address S3 buckets and prefixes containing the dataset, which could be in common file formats like ORC, Parquet, AVRO, CSV, JSON, or plain text. Compressed and columnar file formats like Apache Parquet are preferred because they provide less storage usage and better performance.

For a data catalog, we could use AWS Glue or an external Hive metastore. For this post, we use AWS Glue.

S3 Lifecycle rules

Amazon S3 storage classes include S3 Standard, S3 Standard-IA (S3-IA), S3 One Zone-IA, S3 Intelligent-Tiering, S3 Glacier, and S3 Glacier Deep Archive. For our use case, we need to keep data accessible for queries for 5 years and with high durability, so we consider only S3 Standard and S3-IA for this time frame, and S3 Glacier only for long term (5–12 years). Data access to S3 Glacier requires data retrieval in the range of minutes (if using expedited retrieval), which is incompatible with querying the data directly. You can adopt Glacier for very cold data if you implement a manual process to first restore the Glacier archive to a temporary S3 bucket, and then query this data through an external table.

S3 Glacier Select allows you to query on data directly in S3 Glacier, but it only supports uncompressed CSV files. Because the objective of this post is to propose a cost-efficient solution, we didn’t consider it. If for any reason you have constraints for storing in CSV file format (instead of compressed formats like Parquet), Glacier Select might also be a good fit.

Excluding retrieval costs, S3-IA storage is typically around 45% cheaper than S3 Standard, and S3 Glacier storage is around 68% cheaper than S3-IA. For updated pricing information, see Amazon S3 pricing.
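To see how these two percentages compound, the following sketch normalizes the S3 Standard storage price to 1.0 and derives the relative price of the other two classes. Under those figures, S3 Glacier storage comes out at roughly 18% of the S3 Standard price. The function name relative_cost and the 24-month layout in the example are hypothetical, and retrieval, request, and transition costs are ignored.

```python
# Relative monthly storage price per GB, normalized so S3 Standard = 1.0.
STANDARD = 1.0
S3_IA = STANDARD * (1 - 0.45)   # around 45% cheaper than S3 Standard
GLACIER = S3_IA * (1 - 0.68)    # around 68% cheaper than S3-IA


def relative_cost(months_standard, months_ia, months_glacier):
    """Storage cost of keeping 1 GB for the given number of months in
    each class, expressed in units of one S3 Standard GB-month."""
    return (months_standard * STANDARD
            + months_ia * S3_IA
            + months_glacier * GLACIER)


# Hypothetical comparison over 24 months: everything in S3 Standard
# versus 12 months Standard, 3 months S3-IA, and 9 months Glacier.
all_standard = relative_cost(24, 0, 0)
tiered = relative_cost(12, 3, 9)
savings = 1 - tiered / all_standard
```

Substitute current prices from the Amazon S3 pricing page before drawing conclusions for a real workload.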

We don’t use S3 Intelligent-Tiering because it bases storage transitions on the last access time, which resets every time we query the data. Instead, we use S3 Lifecycle rules based on creation time, prefix, or tag matching, which behave consistently regardless of data access patterns.

Simulated use case and retention policy

For our use case, we need to implement the data retention strategy for trip records outlined in the following table.

Corporate Rule | Dataset Start | Dataset End | Data Storage | Engine
Last 6 months in Amazon Redshift | December 2019 | May 2020 | Amazon Redshift local tables | Amazon Redshift
Months 6–11 in Amazon S3 | June 2019 | November 2019 | S3 Standard | Redshift Spectrum
Months 12–14 in S3-IA | March 2019 | May 2019 | S3-IA | Redshift Spectrum
After month 15 | January 2019 | February 2019 | Glacier | N/A
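The retention rules in the table can be sketched as a small function that maps the age of a monthly dataset to its storage tier. This assumes that May 2020 is the most recent month and has age 0, which matches the ageing values used later in this post (March 2019 is 14 and May 2019 is 12). The function names are hypothetical.

```python
from datetime import date

REFERENCE_MONTH = date(2020, 5, 1)  # assumed most recent month (age 0)


def age_in_months(data_month, reference_month=REFERENCE_MONTH):
    """Whole months elapsed between the dataset month and the reference."""
    return ((reference_month.year - data_month.year) * 12
            + (reference_month.month - data_month.month))


def storage_tier(age):
    """Map an age in months to the storage tier from the retention table."""
    if age < 6:
        return "Amazon Redshift local tables"
    if age < 12:
        return "S3 Standard"
    if age < 15:
        return "S3-IA"
    return "S3 Glacier"


tiers = {
    month.strftime("%Y-%m"): storage_tier(age_in_months(month))
    for month in (date(2019, 1, 1), date(2019, 3, 1),
                  date(2019, 6, 1), date(2019, 12, 1))
}
```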

For this post, we create a new table in a new Amazon Redshift cluster and load a public dataset. We use the New York City Taxi and Limousine Commission (TLC) Trip Record Data because it provides the required historical depth.

We use the Green Taxi Trip Records, based on monthly CSV files containing 20 columns with fields like vendor ID, pickup time, drop-off time, fare, and other information. 

Preparing the dataset and setting up the Amazon Redshift environment

As a first step, we create an AWS Identity and Access Management (IAM) role for Redshift Spectrum. This is required to allow Amazon Redshift to access Amazon S3 for querying and loading data, and also to access the AWS Glue Data Catalog whenever we create, modify, or delete an external table.

  1. Create a role named BlogSpectrumRole.
  2. Edit the following two JSON files, which contain the IAM policies for the bucket and prefixes used in this post, as needed, and attach the policies to the role you created:
    S3-Lakehouse-Policy.JSON

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": "s3:*",
                "Resource": [
                    "arn:aws:s3:::rs-lakehouse-blog-post",
                    "arn:aws:s3:::rs-lakehouse-blog-post/extract_longterm/*",
                    "arn:aws:s3:::rs-lakehouse-blog-post/extract_midterm/*",
                    "arn:aws:s3:::rs-lakehouse-blog-post/extract_shortterm/*",
                    "arn:aws:s3:::rs-lakehouse-blog-post/accesslogs/*"
                ]
            }
        ]
    }

    Glue-Lakehouse-Policy.JSON

    {
    	"Version": "2012-10-17",
    	"Statement": [
    		{
    			"Sid": "VisualEditor0",
    			"Effect": "Allow",
    			"Action": [
    				"glue:CreateDatabase",
    				"glue:DeleteDatabase",
    				"glue:GetDatabase",
    				"glue:GetDatabases",
    				"glue:UpdateDatabase",
    				"glue:CreateTable",
    				"glue:DeleteTable",
    				"glue:BatchDeleteTable",
    				"glue:UpdateTable",
    				"glue:GetTable",
    				"glue:GetTables",
    				"glue:BatchCreatePartition",
    				"glue:CreatePartition",
    				"glue:DeletePartition",
    				"glue:BatchDeletePartition",
    				"glue:UpdatePartition",
    				"glue:GetPartition",
    				"glue:GetPartitions",
    				"glue:BatchGetPartition"
    				],
    			"Resource": [
    				"*"
    			]
    		}
    	]
    }
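If you prefer to script the role creation, the following is a minimal sketch using boto3. It rests on a few assumptions: the role trusts the redshift.amazonaws.com service principal so that the cluster can assume it, the two policy files are saved locally under the names shown above, and the policies are added as inline policies with put_role_policy (you could create and attach managed policies instead). The helper names are hypothetical.

```python
import json


def redshift_trust_policy():
    """Trust policy that lets the Amazon Redshift service assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "redshift.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }
        ],
    }


def create_spectrum_role(role_name, policy_files):
    """Create the role and add each policy file as an inline policy.

    policy_files maps an inline policy name to a local JSON file path.
    Requires boto3 and AWS credentials allowed to manage IAM roles.
    """
    import boto3  # imported here so the helper above works without it

    iam = boto3.client("iam")
    iam.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(redshift_trust_policy()),
    )
    for policy_name, path in policy_files.items():
        with open(path) as policy_file:
            iam.put_role_policy(
                RoleName=role_name,
                PolicyName=policy_name,
                PolicyDocument=policy_file.read(),
            )
```

For example, create_spectrum_role("BlogSpectrumRole", {"S3-Lakehouse-Policy": "S3-Lakehouse-Policy.JSON", "Glue-Lakehouse-Policy": "Glue-Lakehouse-Policy.JSON"}) would mirror the two steps above.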

Now you create a single-node Amazon Redshift cluster based on a DC2.large instance type, attaching the newly created IAM role BlogSpectrumRole.

  1. On the Amazon Redshift console, choose Create cluster.
  2. Keep the default cluster identifier redshift-cluster-1.
  3. Choose the node type DC2.large.
  4. Set the configuration to single node.
  5. Keep the default dbname and ports.
  6. Set a primary user password.
  7. Choose Create cluster.
  8. After the cluster is configured, check the attached IAM role on the Properties tab for the cluster.
  9. Take note of the IAM role ARN, because you use it to create external tables.

Copying data into Amazon Redshift

To connect to Amazon Redshift, you can use a free client like SQL Workbench/J or use the AWS console embedded query editor with the previously created credentials.

  1. Create a table according to the dataset schema using the following DDL statement:
    	create table greentaxi(
    	vendor_id integer,
    	pickup timestamp,
    	dropoff timestamp,
    	storeandfwd char(2),
    	ratecodeid integer,
    	pulocid integer,
    	dolocid integer,
    	passenger_count integer,
    	trip_dist real,
    	fare_amount real,
    	extra real,
    	mta_tax real,
    	tip_amount real,
    	toll_amount real,
    	ehail_fee real,
    	improve_surch real,
    	total_amount real,
    	pay_type integer,
    	trip_type integer,
    	congest_surch real
    );

The most efficient method to load data into Amazon Redshift is using the COPY command, because it uses the distributed architecture (each slice can ingest one file at the same time).

  2. Load year 2020 data from January to June into the Amazon Redshift table with the following command (replace the IAM role value with the one you created earlier):
    copy greentaxi from 's3://nyc-tlc/trip data/green_tripdata_2020'
    	iam_role 'arn:aws:iam::123456789012:role/BlogSpectrumRole'
    	delimiter ','
    	CSV
    	dateformat 'auto'
    	region 'us-east-1'
    	ignoreheader as 1;

After you repeat the COPY command for the 2019 files (using the green_tripdata_2019 prefix), the greentaxi table includes all records from January 2019 to June 2020. You’re now ready to use Redshift Spectrum and S3 storage classes to save costs.

Extracting data from Amazon Redshift

You perform the next steps using the AWS Command Line Interface (AWS CLI). For download and installation instructions, see Installing, updating, and uninstalling the AWS CLI version 2.

Use the aws configure command to set the access key and secret access key of your IAM user and the AWS Region of your Amazon Redshift cluster (the same Region as your S3 buckets).

In this section, we evaluate two different use cases:

  • New customer – As a new customer, you don’t have much old data in Amazon Redshift, and want to extract only the oldest monthly data and apply a lifecycle policy based on creation date. Storage tiering only affects future data and is fully automated.
  • Old customer – In this use case, you have several years of accumulated data and need to move existing Amazon Redshift data to different storage classes. In addition, you want a fully automated solution but with the ability to override it and decide what data to transition between S3 storage classes and when. This requirement is due to many factors, like the GDPR rule “right to be forgotten.” You may need to edit historical data to remove specific customer records, which changes the file creation date. For this reason, you need S3 Lifecycle rules based on tagging instead of creation date.

New customer use case

The UNLOAD command uses the result of an embedded SQL query to extract data from Amazon Redshift to Amazon S3, producing different file formats such as CSV, text, and Parquet. To extract data from January 2019, complete the following steps:

  1. Create a destination bucket like the following:
    aws s3 mb s3://rs-lakehouse-blog-post

  2. Create a folder named archive in the destination bucket rs-lakehouse-blog-post:
    aws s3api put-object --bucket rs-lakehouse-blog-post --key archive/

  3. Use the following SQL code to implement the UNLOAD statement. Date literals in the SELECT statement require quoting, and so does the SELECT statement embedded in the UNLOAD command, so the inner single quotes are doubled:
    unload ('select * from greentaxi where pickup between ''2019-01-01 00:00:00'' and ''2019-01-31 23:59:59''')
    to 's3://rs-lakehouse-blog-post/archive/5to12years_taxi'
    iam_role 'arn:aws:iam::123456789012:role/BlogSpectrumRole'; 

    1. You can perform a check with the AWS CLI:
      aws s3 ls s3://rs-lakehouse-blog-post/archive/
      2020-10-12 14:49:51          0 
      2020-11-04 09:51:00   34792620 5to12years_taxi0000_part_00
      2020-11-04 09:51:00   34792738 5to12years_taxi0001_part_00

    The output shows that the UNLOAD statement generated two files of about 33 MB each. By default, UNLOAD generates at least one file for each slice in the Amazon Redshift cluster. My cluster is a single DC2 node with two slices. The default file format is text, which is not storage optimized.

    To simplify the process, you create a single file for each month so that you can later apply lifecycle rules to each file. In real-world scenarios, extracting data with a single file isn’t the best practice in terms of performance optimization. This is just to simplify the process for the purpose of this post.

    1. Create your files with the following code:
      unload ('select * from greentaxi where pickup between ''2019-01-01 00:00:00'' and ''2019-01-31 23:59:59''')
      to 's3://rs-lakehouse-blog-post/archive/green_tripdata_2019-01'
      iam_role 'arn:aws:iam::123456789012:role/BlogSpectrumRole' parquet parallel off;

    The output of each UNLOAD command is a single file (per month) in Parquet format, which takes about 80% less space than the previous unload. This is important to save costs related to both Amazon S3 and Glacier, and also for costs associated with Redshift Spectrum queries, which are billed by the amount of data scanned.

    1. You can check how efficient Parquet is compared to text format:
      aws s3 ls s3://rs-lakehouse-blog-post/archive/

      2020-08-12 17:17:42   14523090 green_tripdata_2019-01000.parquet 

    1. Clean up the previous text files:
      aws s3 rm s3://rs-lakehouse-blog-post/ --recursive \
      --exclude "*" --include "archive/5to12*"

      The next step is creating a lifecycle rule based on creation date to automate the migration to S3-IA after 12 months and to Glacier after 15 months. The proposed policy name is 12IA-15Glacier and it’s filtered on the prefix archive/.

    1. Create a JSON file named lifecycle.json containing the lifecycle policy definition:
      {
          "Rules": [
              {
                  "ID": "12IA-15Glacier",
                  "Filter": {
                      "Prefix": "archive/"
                  },
                  "Status": "Enabled",
                  "Transitions": [
                      {
                          "Days": 365,
                          "StorageClass": "STANDARD_IA"
                      },
                      {
                          "Days": 456,
                          "StorageClass": "GLACIER"
                      }
                  ]
              }
          ]
      }

    1. Run the following command to send the JSON file to Amazon S3:
      aws s3api put-bucket-lifecycle-configuration \
      --bucket rs-lakehouse-blog-post \
      --lifecycle-configuration file://lifecycle.json

    This lifecycle policy migrates all keys in the archive prefix from Amazon S3 to S3-IA after 12 months and from S3-IA to Glacier after 15 months. For example, if today were 2020-09-12, and you unload the 2020-03 data to Amazon S3, by 2021-09-12, this 2020-03 data is automatically migrated to S3-IA.
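The date arithmetic in this example can be checked with a short sketch. The Days value in a lifecycle transition is counted from the object creation date, so the earliest transition date is the creation date plus that many days. The function name is hypothetical, and the result is only an approximation of when the object becomes eligible, because Amazon S3 rounds to the next midnight UTC and runs transitions asynchronously.

```python
from datetime import date, timedelta


def earliest_transition(created, days):
    """Approximate date on which an object created on `created` becomes
    eligible for a lifecycle transition configured with `days`."""
    return created + timedelta(days=days)


# Data unloaded on 2020-09-12 with a 365-day transition to S3-IA.
ia_date = earliest_transition(date(2020, 9, 12), 365)
```

Note that the clock starts at the unload (object creation) date, not at the month the trip records belong to.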

    If using this basic use case, you can skip the partition steps in the section Defining the external schema and external tables.

Old customer use case

    In this use case, you extract data with different ageing in the same time frame. You extract all data from January 2019 to February 2019 and, because we assume that you aren’t using this data, archive it to S3 Glacier.

    Data from March 2019 to May 2019 is migrated as an external table on S3-IA, and data from June 2019 to November 2019 is migrated as an external table to S3 Standard. With this approach, you comply with customer long-term retention policies and regulations, and reduce TCO.

    You implement the retention strategy described in the Simulated use case and retention policy section.

    1. Create a destination bucket (if you also walked through the first use case, use a different bucket):
      aws s3 mb s3://rs-lakehouse-blog-post

    2. Create three folders named extract_longterm, extract_midterm, and extract_shortterm in the destination bucket rs-lakehouse-blog-post. The following code is the syntax for creating the extract_longterm folder:
      aws s3api put-object --bucket rs-lakehouse-blog-post --key extract_longterm/

    3. Extract the data:
      unload ('select * from greentaxi where pickup between ''2019-01-01 00:00:00'' and ''2019-01-31 23:59:59''')
      	to 's3://rs-lakehouse-blog-post/extract_longterm/green_tripdata_2019-01'
      iam_role 'arn:aws:iam::123456789012:role/BlogSpectrumRole' parquet parallel off;

    4. Repeat these steps for the February 2019 time frame.
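Because the same UNLOAD statement is repeated for every month with only the dates and target prefix changing, you might generate the statements instead of editing them by hand. The following sketch builds the statement text for a given month and uses calendar.monthrange to pick the correct last day, which avoids off-by-one mistakes for months with 28, 30, or 31 days. The function name and its parameters are hypothetical; the statement layout copies the examples in this post.

```python
import calendar


def unload_statement(year, month, prefix, role_arn,
                     bucket="rs-lakehouse-blog-post"):
    """Return the UNLOAD statement for one month of greentaxi data."""
    last_day = calendar.monthrange(year, month)[1]
    start = f"{year}-{month:02d}-01 00:00:00"
    end = f"{year}-{month:02d}-{last_day:02d} 23:59:59"
    # Single quotes inside the embedded SELECT are doubled.
    query = (f"select * from greentaxi where pickup "
             f"between ''{start}'' and ''{end}''")
    target = f"s3://{bucket}/{prefix}/green_tripdata_{year}-{month:02d}"
    return (f"unload ('{query}')\n"
            f"to '{target}'\n"
            f"iam_role '{role_arn}' parquet parallel off;")


february = unload_statement(
    2019, 2, "extract_longterm",
    "arn:aws:iam::123456789012:role/BlogSpectrumRole",
)
```

You can print the returned text and run it in your SQL client, or pass a prefix such as extract_midterm/03 for the layouts that use a per-month folder.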

Managing data ageing with Amazon S3 storage classes and lifecycle policies

    In this section, you manage your data with storage classes and lifecycle policies.

    1. Migrate your keys in Parquet format to S3 Glacier:
      aws s3api copy-object \
      --copy-source rs-lakehouse-blog-post/extract_longterm/green_tripdata_2019-01000.parquet \
      --storage-class GLACIER \
      --bucket rs-lakehouse-blog-post \
      --key extract_longterm/green_tripdata_2019-01000.parquet
      	 
      aws s3api copy-object \
      --copy-source rs-lakehouse-blog-post/extract_longterm/green_tripdata_2019-02000.parquet \
      --storage-class GLACIER \
      --bucket rs-lakehouse-blog-post \
      --key extract_longterm/green_tripdata_2019-02000.parquet

    2. Extract the data from March 2019 to May 2019 (months 12–14) and migrate it to S3-IA. The following code is for March:
      unload ('select * from greentaxi where pickup between ''2019-03-01 00:00:00'' and ''2019-03-31 23:59:59''')
      to 's3://rs-lakehouse-blog-post/extract_midterm/03/green_tripdata_2019-03'
      iam_role 'arn:aws:iam::123456789012:role/BlogSpectrumRole' parquet parallel off;

    3. Repeat the previous step for April and May.
    4. Migrate all three months to S3-IA using the same process as before. The following code is for March:
      aws s3api copy-object \
      --copy-source rs-lakehouse-blog-post/extract_midterm/03/green_tripdata_2019-03000.parquet \
      --storage-class STANDARD_IA \
      --bucket rs-lakehouse-blog-post \
      --key extract_midterm/03/green_tripdata_2019-03000.parquet

    5. Do the same for the other two months.
    6. Check the newly applied storage class with the following AWS CLI command:
      aws s3api head-object \
      --bucket rs-lakehouse-blog-post \
      --key extract_midterm/03/green_tripdata_2019-03000.parquet
       
      {
          "AcceptRanges": "bytes",
          "LastModified": "2020-10-12T13:47:32+00:00",
          "ContentLength": 14087514,
          "ETag": "\"15bf39e6b3f32b10ef589d75e0988ce6\"",
          "ContentType": "application/x-www-form-urlencoded; charset=utf-8",
          "Metadata": {},
          "StorageClass": "STANDARD_IA"
      }

    In the next step, you tag every monthly file with a key named ageing, whose value is the number of months elapsed from the origin date.

    1. Set March to 14, April to 13, and May to 12:
      aws s3api put-object-tagging \
      --bucket rs-lakehouse-blog-post \
      --key extract_midterm/03/green_tripdata_2019-03000.parquet \
      --tagging '{"TagSet": [{ "Key": "ageing", "Value": "14"} ]}'
      	 
      aws s3api put-object-tagging \
      --bucket rs-lakehouse-blog-post \
      --key extract_midterm/04/green_tripdata_2019-04000.parquet \
      --tagging '{"TagSet": [{ "Key": "ageing", "Value": "13"} ]}'
      	 
      aws s3api put-object-tagging \
      --bucket rs-lakehouse-blog-post \
      --key extract_midterm/05/green_tripdata_2019-05000.parquet \
      --tagging '{"TagSet": [{ "Key": "ageing", "Value": "12"} ]}'

    In this set of three objects, the oldest file has the tag ageing set to value 14, and the newest is set to 12. In the second post in this series, you discover how to manage the ageing tag as it increases month by month.
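The three tagging commands differ only in the month and the ageing value, so they can also be expressed as data. The following sketch builds the request parameters for the three objects and, optionally, sends them with the boto3 put_object_tagging call. The helper names are hypothetical, and the ageing values are copied from the commands above rather than computed.

```python
AGEING_BY_MONTH = {"03": 14, "04": 13, "05": 12}  # 2019 months and their age


def ageing_tagging(age_months):
    """Tagging payload with a single ageing tag (tag values are strings)."""
    return {"TagSet": [{"Key": "ageing", "Value": str(age_months)}]}


def tag_requests(bucket="rs-lakehouse-blog-post"):
    """Build one put_object_tagging request per monthly Parquet file."""
    return [
        {
            "Bucket": bucket,
            "Key": f"extract_midterm/{month}/green_tripdata_2019-{month}000.parquet",
            "Tagging": ageing_tagging(age),
        }
        for month, age in AGEING_BY_MONTH.items()
    ]


def apply_tags(requests):
    """Send the requests. Requires boto3 and credentials with
    permission to tag objects in the bucket."""
    import boto3

    s3 = boto3.client("s3")
    for request in requests:
        # Replaces the whole tag set of the object with the given one.
        s3.put_object_tagging(**request)
```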

    The next step is to create a lifecycle rule based on this specific tag in order to automate the migration to Glacier at month 15. The proposed policy name is 15IAtoGlacier, and its scope is limited to objects in the bucket that have the tag ageing set to 15.

    1. Create a JSON file named lifecycle.json containing the lifecycle policy definition:
      {
          "Rules": [
              {
                  "ID": "15IAtoGlacier",
                  "Filter": {
                      "Tag": {
                          "Key": "ageing",
                          "Value": "15"
                      }
                  },
                  "Status": "Enabled",
                  "Transitions": [
                      {
                          "Days": 1,
                          "StorageClass": "GLACIER"
                      }
                  ]
              }
          ]
      }

      
      

    2. Run the following command to send the JSON file to Amazon S3:
      aws s3api put-bucket-lifecycle-configuration \
      --bucket rs-lakehouse-blog-post \
      --lifecycle-configuration file://lifecycle.json 

    This lifecycle policy migrates all objects with the tag ageing set to 15 from S3-IA to Glacier.

    Though I described this process as automating the migration, I actually want to control the process from the application level using the self-managed tag mechanism. I use this approach because otherwise the transition is based on the file creation date, and the objective is to be able to delete, update, or create a new file whenever needed (for example, to delete parts of records in order to comply with the GDPR “right to be forgotten” rule).

    Now you extract all data from June 2019 to November 2019 (6 to 11 months old) and keep it in Amazon S3 with a lifecycle policy that automatically migrates it to S3-IA after ageing 12 months, using the same process as described earlier. These six new objects also inherit the rule created previously to migrate to Glacier after 15 months. Finally, you set the ageing tag as described before.

    Use the extract_shortterm prefix for these unload operations.

    1. Unload June 2019 with the following code:
      unload ('select * from greentaxi where pickup between ''2019-06-01 00:00:00'' and ''2019-06-30 23:59:59''')
      to 's3://rs-lakehouse-blog-post/extract_shortterm/06/green_tripdata_2019-06'
      iam_role 'arn:aws:iam::123456789012:role/BlogSpectrumRole' parquet parallel off;

    2. Use the same logic for the remaining months up to October.
    3. For November, see the following code:
      unload ('select * from greentaxi where pickup between ''2019-11-01 00:00:00'' and ''2019-11-30 23:59:59''')
      to 's3://rs-lakehouse-blog-post/extract_shortterm/11/green_tripdata_2019-11'
      iam_role 'arn:aws:iam::123456789012:role/BlogSpectrumRole' parquet parallel off;

      aws s3 ls --recursive s3://rs-lakehouse-blog-post/extract_shortterm/
      2020-10-12 14:52:11          0 extract_shortterm/
      2020-10-12 18:45:42          0 extract_shortterm/06/
      2020-10-12 18:46:49   10889436 extract_shortterm/06/green_tripdata_2019-06000.parquet
      2020-10-12 18:45:53          0 extract_shortterm/07/
      2020-10-12 18:47:03   10759747 extract_shortterm/07/green_tripdata_2019-07000.parquet
      2020-10-12 18:45:58          0 extract_shortterm/08/
      2020-10-12 18:47:24    9947793 extract_shortterm/08/green_tripdata_2019-08000.parquet
      2020-10-12 18:46:03          0 extract_shortterm/09/
      2020-10-12 18:47:45   10302432 extract_shortterm/09/green_tripdata_2019-09000.parquet
      2020-10-12 18:46:08          0 extract_shortterm/10/
      2020-10-12 18:48:00   10659857 extract_shortterm/10/green_tripdata_2019-10000.parquet
      2020-10-12 18:46:11          0 extract_shortterm/11/
      2020-10-12 18:48:14   10247201 extract_shortterm/11/green_tripdata_2019-11000.parquet

      aws s3 ls --recursive rs-lakehouse-blog-post/extract_longterm/

      2020-10-12 14:49:51          0 extract_longterm/
      2020-10-12 14:56:38   14403710 extract_longterm/green_tripdata_2019-01000.parquet
      2020-10-12 15:30:14   13454341 extract_longterm/green_tripdata_2019-02000.parquet

    4. Apply the tag ageing with range 11 to 6 (June 2019 to November 2019), using either the AWS CLI or console if you prefer.
    5. Create a new lifecycle rule named 12S3toS3IA, which transitions from Amazon S3 to S3-IA.
    6. With the AWS CLI, create a JSON file that includes both the previously defined rule 15IAtoGlacier and the new rule 12S3toS3IA. Both are required because put-bucket-lifecycle-configuration replaces the bucket's current configuration with the contents of the new policy definition file rather than adding to it. The following code is the new lifecycle.json:
      {
          "Rules": [
              {
                  "ID": "12S3toS3IA",
                  "Filter": {
                      "Tag": {
                          "Key": "ageing",
                          "Value": "12"
                      }
                  },
                  "Status": "Enabled",
                  "Transitions": [
                      {
                          "Days": 30,
                          "StorageClass": "STANDARD_IA"
                      }
                  ]
              },
              {
                  "ID": "15IAtoGlacier",
                  "Filter": {
                      "Tag": {
                          "Key": "ageing",
                          "Value": "15"
                      }
                  },
                  "Status": "Enabled",
                  "Transitions": [
                      {
                          "Days": 1,
                          "StorageClass": "GLACIER"
                      }
                  ]
              }
          ]
      }

      aws s3api put-bucket-lifecycle-configuration \
      --bucket rs-lakehouse-blog-post \
      --lifecycle-configuration file://lifecycle.json

    7. Check the applied policies with the following command:
      aws s3api get-bucket-lifecycle-configuration \
      --bucket rs-lakehouse-blog-post

    The output is a single JSON document that merges the 15IAtoGlacier and 12S3toS3IA rules.
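
    Because the whole configuration is replaced on every upload, it can help to generate lifecycle.json from one list of rules. The following is a minimal sketch under that assumption. The helper names tag_transition_rule and lifecycle_configuration are hypothetical and are not part of the AWS CLI or any SDK.

```python
import json
from typing import Dict, List


def tag_transition_rule(rule_id: str, tag_value: str, days: int, storage_class: str) -> Dict:
    """One lifecycle rule scoped to objects whose ageing tag equals tag_value."""
    return {
        "ID": rule_id,
        "Filter": {"Tag": {"Key": "ageing", "Value": tag_value}},
        "Status": "Enabled",
        "Transitions": [{"Days": days, "StorageClass": storage_class}],
    }


def lifecycle_configuration(rules: List[Dict]) -> Dict:
    """Wrap every rule the bucket should keep into a single configuration."""
    return {"Rules": list(rules)}


if __name__ == "__main__":
    config = lifecycle_configuration([
        tag_transition_rule("12S3toS3IA", "12", 30, "STANDARD_IA"),
        tag_transition_rule("15IAtoGlacier", "15", 1, "GLACIER"),
    ])
    # Save this output as lifecycle.json before running put-bucket-lifecycle-configuration.
    print(json.dumps(config, indent=4))
```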

    Defining the external schema and external tables

    Before deleting the records you extracted from Amazon Redshift with the UNLOAD command, we define the external schema and external tables to enable Redshift Spectrum queries for these Parquet files.

    1. Enter the following code to create your schema:
      create external schema taxispectrum
      	from data catalog
      	database 'blogdb'
      	iam_role 'arn:aws:iam::123456789012:role/BlogSpectrumRole'
      create external database if not exists;

    2. Create the external table taxi_archive in the taxispectrum external schema. If you’re walking through the new customer use case, replace the prefix extract_midterm with archive:
      create external table taxispectrum.taxi_archive(
      	vendor_id integer,
      	pickup timestamp,
      	dropoff timestamp,
      	storeandfwd char(2),
      	ratecodeid integer,
      	pulocid integer,
      	dolocid integer,
      	passenger_count integer,
      	trip_dist real,
      	fare_amount real,
      	extra real,
      	mta_tax real,
      	tip_amount real,
      	toll_amount real,
      	ehail_fee real,
      	improve_surch real,
      	total_amount real,
      	pay_type integer,
      	trip_type integer,
      	congest_surch real)
      	partitioned by (yearmonth char(7))
      	stored as parquet
      location 's3://rs-lakehouse-blog-post/extract_midterm/';

    3. Add the six files stored in Amazon S3 and three files stored in S3-IA as partitions (if you’re walking through the new customer use case, you can skip the following partitioning steps). The following code shows March and April:
      ALTER TABLE taxispectrum.taxi_archive
      ADD PARTITION (yearmonth='2019-03') 
      LOCATION 's3://rs-lakehouse-blog-post/extract_midterm/03/';
      ALTER TABLE taxispectrum.taxi_archive
      ADD PARTITION (yearmonth='2019-04') 
      LOCATION 's3://rs-lakehouse-blog-post/extract_midterm/04/';

    4. Continue this process up to November 2019, using extract_shortterm instead of extract_midterm for June onward.
    5. Check that the table isn’t empty with the following SQL statement:
      select count(*) from taxispectrum.taxi_archive;

    You get the number of entries in this external table.
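
    The partition statements for the nine monthly extracts differ only in the month and the prefix, so they can be generated rather than typed. The following is a minimal sketch, assuming the March to May files sit under extract_midterm and the June to November files under extract_shortterm, as laid out in this post. The function names are hypothetical.

```python
from typing import List

BUCKET = "rs-lakehouse-blog-post"
TABLE = "taxispectrum.taxi_archive"


def add_partition_sql(year: int, month: int, prefix: str) -> str:
    """Return one ALTER TABLE ... ADD PARTITION statement for a monthly extract."""
    return (
        f"ALTER TABLE {TABLE}\n"
        f"ADD PARTITION (yearmonth='{year}-{month:02d}')\n"
        f"LOCATION 's3://{BUCKET}/{prefix}/{month:02d}/';"
    )


def all_partition_sql(year: int = 2019) -> List[str]:
    """Statements for March to November: midterm up to May, shortterm afterwards."""
    statements = []
    for month in range(3, 12):
        prefix = "extract_midterm" if month <= 5 else "extract_shortterm"
        statements.append(add_partition_sql(year, month, prefix))
    return statements


if __name__ == "__main__":
    print("\n".join(all_partition_sql()))
```

    Each printed statement can be run in your SQL client in the same way as the March and April examples.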

    1. Optionally, check the partitions mapped to this table by querying the Amazon Redshift system view svv_external_partitions:
      select * from svv_external_partitions;

    2. Run a SELECT command that filters on the partition column in order to optimize the costs related to Redshift Spectrum scanning:
      select * from taxispectrum.taxi_archive where yearmonth='2019-11' and fare_amount > 20;

    Redshift Spectrum scans only the partitions matching yearmonth.

    The final step is cleaning all the records extracted from the Amazon Redshift local tables:

    delete from public.greentaxi where pickup between '2019-01-01 00:00:00' and '2019-11-30 23:59:59'

    Conclusion

    We demonstrated how to extract historical data from Amazon Redshift and implement an archive strategy with Redshift Spectrum and Amazon S3 storage classes. In addition, we showed how to optimize Redshift Spectrum scans with partitioning.

    In the next post in this series, we show how to operate this solution day by day, especially for the old customer use case, and share some best practices.


    About the Authors

    Cristian Gavazzeni is a senior solution architect at Amazon Web Services. He has more than 20 years of experience as a pre-sales consultant focusing on Data Management, Infrastructure and Security. During his spare time he likes eating Japanese food and travelling abroad with only fly and drive bookings.

     

     

    Francesco Marelli is a senior solutions architect at Amazon Web Services. He lived and worked in London for 10 years, and has since worked in Italy, Switzerland, and other countries in EMEA. He specializes in the design and implementation of Analytics, Data Management, and Big Data systems, mainly for Enterprise and FSI customers. Francesco also has strong experience in systems integration and in the design and implementation of web applications. He loves sharing his professional knowledge, collecting vinyl records, and playing bass.

    Run Apache Spark 3.0 workloads 1.7 times faster with Amazon EMR runtime for Apache Spark

    Post Syndicated from Al MS original https://aws.amazon.com/blogs/big-data/run-apache-spark-3-0-workloads-1-7-times-faster-with-amazon-emr-runtime-for-apache-spark/

    With Amazon EMR release 6.1.0, Amazon EMR runtime for Apache Spark is now available for Spark 3.0.0. EMR runtime for Apache Spark is a performance-optimized runtime for Apache Spark that is 100% API compatible with open-source Apache Spark.

    In our benchmark performance tests using TPC-DS benchmark queries at 3 TB scale, we found EMR runtime for Apache Spark 3.0 provides a 1.7 times performance improvement on average, and up to 8 times improved performance for individual queries over open-source Apache Spark 3.0.0. With Amazon EMR 6.1.0, you can now run your Apache Spark 3.0 applications faster and cheaper without requiring any changes to your applications.

    Results observed using TPC-DS benchmarks

    To evaluate the performance improvements, we used TPC-DS benchmark queries with 3 TB scale and ran them on a 6-node c4.8xlarge EMR cluster with data in Amazon Simple Storage Service (Amazon S3). We ran the tests with and without the EMR runtime for Apache Spark. The following two graphs compare the total aggregate runtime and geometric mean for all queries in the TPC-DS 3 TB query dataset between the Amazon EMR releases.

    The following table shows the total runtime in seconds.

    The following table shows the geometric mean of the runtime in seconds.

    In our tests, all queries ran successfully on EMR clusters that used the EMR runtime for Apache Spark. However, when using Spark 3.0 without the EMR runtime, 34 out of the 104 benchmark queries failed due to SPARK-32663. To work around these issues, we disabled the spark.shuffle.readHostLocalDisk configuration. However, even after this change, queries 14a and 14b continued to fail. Therefore, we chose to exclude these queries from our benchmark comparison.
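
    One common way to set a Spark property such as spark.shuffle.readHostLocalDisk is to pass it with --conf at submission time. The following is a minimal sketch that only assembles the argument list. The post doesn't say how the setting was applied in the benchmark, and the helper name and the application file benchmark.py are hypothetical.

```python
from typing import Dict, List


def spark_submit_args(app_path: str, conf: Dict[str, str]) -> List[str]:
    """Build a spark-submit argument list with one --conf flag per property."""
    args = ["spark-submit"]
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    args.append(app_path)
    return args


if __name__ == "__main__":
    # benchmark.py is a placeholder application name, not a file from the post.
    print(" ".join(spark_submit_args(
        "benchmark.py", {"spark.shuffle.readHostLocalDisk": "false"}
    )))
```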

    The per-query speedup on Amazon EMR 6.1 with and without EMR runtime is illustrated in the following chart. The horizontal axis shows each query in the TPC-DS 3 TB benchmark. The vertical axis shows the speedup of each query due to the EMR runtime. We found a 1.7 times performance improvement as measured by the geometric mean of the per-query speedups, with all queries showing a performance improvement with the EMR Runtime.
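
    To make the metric concrete: the speedup of a query is its runtime without the EMR runtime divided by its runtime with it, and the geometric mean of n speedups is the n-th root of their product. The following is a minimal sketch of that calculation. The runtimes in it are hypothetical and are not measurements from this benchmark.

```python
import math
from typing import Dict, Iterable


def per_query_speedup(baseline: Dict[str, float], optimized: Dict[str, float]) -> Dict[str, float]:
    """Speedup per query: baseline runtime divided by optimized runtime."""
    return {q: baseline[q] / optimized[q] for q in baseline if q in optimized}


def geometric_mean(values: Iterable[float]) -> float:
    """n-th root of the product, computed through logarithms."""
    values = list(values)
    return math.exp(sum(math.log(v) for v in values) / len(values))


if __name__ == "__main__":
    # Hypothetical runtimes in seconds for two queries.
    open_source = {"q1": 10.0, "q2": 40.0}
    emr_runtime = {"q1": 10.0, "q2": 10.0}
    speedups = per_query_speedup(open_source, emr_runtime)
    print(speedups, geometric_mean(speedups.values()))
```

    Unlike an arithmetic mean, the geometric mean keeps a single very large speedup from dominating the summary figure.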

    Conclusion

    You can run your Apache Spark 3.0 workloads faster and cheaper without making any changes to your applications by using Amazon EMR 6.1. To keep up to date, subscribe to the Big Data blog’s RSS feed to learn about more great Apache Spark optimizations, configuration best practices, and tuning advice.


    About the Authors

    Al MS is a product manager for Amazon EMR at Amazon Web Services.

     

     

     

     

    Peter Gvozdjak is a senior engineering manager for EMR at Amazon Web Services.

    Securing access to EMR clusters using AWS Systems Manager

    Post Syndicated from Sai Sriparasa original https://aws.amazon.com/blogs/big-data/securing-access-to-emr-clusters-using-aws-systems-manager/

    Organizations need to secure infrastructure when enabling access to engineers to build applications. Opening SSH inbound ports on instances to enable engineer access introduces the risk of a malicious entity running unauthorized commands. Using a Bastion host or jump server is a common approach used to allow engineer access to Amazon EMR cluster instances by enabling SSH inbound ports. In this post, we present a more secure way to access your EMR cluster launched in a private subnet that eliminates the need to open inbound ports or use a Bastion host.

    We strive to answer the following three questions in this post:

    1. Why use AWS Systems Manager Session Manager with Amazon EMR?
    2. Who can use Session Manager?
    3. How can Session Manager be configured on Amazon EMR?

    After answering these questions, we walk you through configuring Amazon EMR with Session Manager and creating an AWS Identity and Access Management (IAM) policy to enable Session Manager capabilities on Amazon EMR. We also walk you through the steps required to configure secure tunneling to access Hadoop application web interfaces such as YARN Resource Manager and Spark Job Server.

    Creating an IAM role

    AWS Systems Manager provides a unified user interface so you can view and manage your Amazon Elastic Compute Cloud (Amazon EC2) instances. Session Manager provides secure and auditable instance management. Systems Manager integration with IAM provides centralized access control to your EMR cluster. By default, Systems Manager doesn’t have permissions to perform actions on cluster instances. You must grant access by attaching an IAM role on the instance. Before you get started, create an IAM service role for cluster EC2 instances with the least privilege access policy.

    1. Create an IAM service role (Amazon EMR role for Amazon EC2) for cluster EC2 instances and attach the AWS managed Systems Manager core instance (AmazonSSMManagedInstanceCore) policy.

    1. Create an IAM policy with least privilege to allow the principal to initiate a Session Manager session on Amazon EMR cluster instances:
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": [
                      "ssm:DescribeInstanceProperties",
                      "ssm:DescribeSessions",
                      "ec2:describeInstances",
                      "ssm:GetConnectionStatus"
                  ],
                  "Resource": "*"
              },
              {
                  "Effect": "Allow",
                  "Action": [
                      "ssm:StartSession"
                  ],
                  "Resource": ["arn:aws:ec2:${Region}:${Account-Id}:instance/*"],
                  "Condition": {
                      "StringEquals": { "ssm:resourceTag/ClusterType": [ "QACluster" ] }
                  }
              }
          ]
      }

      
      

    1. Attach the least privilege policy to the IAM principal (role or user).

    How Amazon EMR works with AWS Systems Manager Agent

    You can install and configure AWS Systems Manager Agent (SSM Agent) on Amazon EMR cluster nodes using bootstrap actions. SSM Agent makes it possible for Session Manager to update, manage, and configure these resources. Session Manager is available at no additional cost to manage Amazon EC2 instances; for the cost of additional features, refer to the Systems Manager pricing page. The agent processes requests from the Session Manager service in the AWS Cloud, and then runs them as specified in the user request. You can achieve dynamic port forwarding by installing the Systems Manager plug-in on a local computer. IAM policies provide centralized access control on the EMR cluster.

    The following diagram illustrates a high-level integration of AWS Systems Manager interaction with an EMR cluster.

    Configuring SSM Agent on an EMR Cluster:

    To configure SSM Agent on your cluster, complete the following steps:

    1. While launching the EMR cluster, in the Bootstrap Actions section, choose Add bootstrap action.
    2. Choose “Custom action”.
    3. Add a bootstrap action to run the following script from Amazon Simple Storage Service (Amazon S3) to install and configure SSM Agent on Amazon EMR cluster instances.

    SSM Agent expects localhost entry in the hosts file to allow traffic redirection from a local computer to the EMR cluster instance when dynamic port forwarding is used.

    #!/bin/bash
    ## Name: SSM Agent Installer Script
    ## Description: Installs SSM Agent on EMR cluster EC2 instances and updates the hosts file
    ##
    sudo yum install -y https://s3.amazonaws.com/ec2-downloads-windows/SSMAgent/latest/linux_amd64/amazon-ssm-agent.rpm
    ## Record the agent status (systemctl on Amazon Linux 2, upstart "status" on Amazon Linux 1)
    (sudo systemctl status amazon-ssm-agent || sudo status amazon-ssm-agent) >> /tmp/ssm-status.log 2>&1
    ## Update hosts file
    printf '\n ########### localhost mapping check ########### \n' > /tmp/localhost.log
    v_ipaddr=$(hostname --ip-address)
    ## Look for an uncommented hosts entry that already contains this instance's IP address
    lhostmapping=$(grep -v '^#' /etc/hosts | grep -F -- "${v_ipaddr}")
    if [ -z "${lhostmapping}" ]; then
        printf '\n ########### IP address to localhost mapping NOT defined in hosts file. Adding it now ########### \n' >> /tmp/localhost.log
        ## Use tee so the append runs as root; with "sudo echo ... >> file" the redirect runs as the calling user
        echo "${v_ipaddr} localhost" | sudo tee -a /etc/hosts > /dev/null
    else
        printf '\n IP address to localhost mapping already defined in hosts file \n' >> /tmp/localhost.log
    fi
    printf '\n ########### IP address to localhost mapping check complete and below is the content ########### \n' >> /tmp/localhost.log
    cat /etc/hosts >> /tmp/localhost.log
    printf '\n ########### Exit script ########### \n' >> /tmp/localhost.log

    1. In the Security Options section, under Permissions, select Custom.
    2. For EMR role, choose the IAM role you created.

    1. After the cluster successfully launches, on the Session Manager console, choose Managed Instances.
    2. Select your cluster instance.
    3. On the Actions menu, choose Start Session.

    Dynamic port forwarding to access Hadoop applications web UIs

    To gain access to Hadoop application web UIs such as YARN Resource Manager and Spark Job Server on the Amazon EMR primary node, you create a secure tunnel between your computer and the Amazon EMR primary node using Session Manager. By doing so, you avoid needing to create and manage a SOCKS proxy and browser add-ons such as FoxyProxy.

    Before configuring port forwarding on your laptop, you must install the Session Manager plugin for the AWS CLI (version 1.1.26.0 or more recent).

    When the prerequisites are met, you use the StartPortForwardingSession feature to create secure tunneling onto EMR cluster instances.

    aws ssm start-session --target "Your Instance ID" --document-name AWS-StartPortForwardingSession --parameters "portNumber"=["8080"],"localPortNumber"=["8158"]

    The following code demonstrates port forwarding from your laptop local port [8158] to a remote port [8080] on an EMR instance to access the Hadoop Resource Manager web UI:

    aws ssm start-session --target i-05a3f37cfc08ed176 --document-name AWS-StartPortForwardingSession --parameters '{"portNumber":["8080"], "localPortNumber":["8158"]}'
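
    If you open tunnels to several instances or ports, you can assemble the same command programmatically. The following is a minimal sketch that builds the argument list for the AWS CLI call shown above. The function name is hypothetical, and the sketch assumes the AWS CLI and the plugin are already installed.

```python
import json
from typing import List


def port_forwarding_command(instance_id: str, remote_port: int, local_port: int) -> List[str]:
    """Argument list for an AWS-StartPortForwardingSession session."""
    parameters = {
        "portNumber": [str(remote_port)],
        "localPortNumber": [str(local_port)],
    }
    return [
        "aws", "ssm", "start-session",
        "--target", instance_id,
        "--document-name", "AWS-StartPortForwardingSession",
        "--parameters", json.dumps(parameters),
    ]


if __name__ == "__main__":
    # Same instance ID and ports as the example command above.
    print(port_forwarding_command("i-05a3f37cfc08ed176", 8080, 8158))
```

    Passing the returned list to subprocess.run starts the session without shell quoting issues around the JSON parameters.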

    Restricting IAM principal access based on Instance Tags

    In a multi-tenant Amazon EMR cluster environment, you can restrict access to Amazon EMR cluster instances based on specific Amazon EC2 tags. In the following example code, the IAM principal (IAM user or role) is allowed to start a session on any instance (Resource: arn:aws:ec2:*:*:instance/*) with the condition that the instance is a QACluster (ssm:resourceTag/ClusterType: QACluster).

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "ssm:DescribeInstanceProperties",
                    "ssm:DescribeSessions",
                    "ec2:describeInstances",
                    "ssm:GetConnectionStatus"
                ],
                "Resource": "*"
            },
            {
                "Effect": "Allow",
                "Action": [ "ssm:StartSession" ],
                "Resource": [ "arn:aws:ec2:${Region}:${Account-Id}:instance/*" ],
                "Condition": {
                    "StringEquals": {
                        "aws:username": "${aws:username}"
                    },
                    "StringLike": {
                        "ssm:resourceTag/ClusterType": [ "QACluster" ]
                    }
                }
            }
        ]
    }

    
    

    If the IAM principal initiates a session to an instance that isn’t tagged or that has any tag other than ClusterType: QACluster, the request fails with an error stating that the principal is not authorized to perform ssm:StartSession.
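
    When several teams share an account, a tag-scoped policy like this one is usually needed once per cluster type. The following is a minimal sketch that generates the policy document from a tag key and value. It follows the shape of the policies shown in this post and uses the StringEquals form of the tag condition. The function name and the Region and account values in the usage section are placeholders.

```python
import json
from typing import Dict


def session_manager_policy(region: str, account_id: str, tag_key: str, tag_value: str) -> Dict:
    """Policy allowing StartSession only on instances carrying the given tag."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "ssm:DescribeInstanceProperties",
                    "ssm:DescribeSessions",
                    "ec2:DescribeInstances",
                    "ssm:GetConnectionStatus",
                ],
                "Resource": "*",
            },
            {
                "Effect": "Allow",
                "Action": ["ssm:StartSession"],
                "Resource": [f"arn:aws:ec2:{region}:{account_id}:instance/*"],
                "Condition": {
                    "StringEquals": {f"ssm:resourceTag/{tag_key}": [tag_value]}
                },
            },
        ],
    }


if __name__ == "__main__":
    # Placeholder Region and account ID; replace them with your own values.
    policy = session_manager_policy("us-east-1", "123456789012", "ClusterType", "QACluster")
    print(json.dumps(policy, indent=4))
```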

    Restricting access to root-level commands on instance

    You can change the default user login behavior to restrict elevated permissions (root login) on a given user’s session. By default, sessions are launched using the credentials of a system-generated ssm-user. You can instead launch sessions using the credentials of an operating system account by tagging an IAM user or role with the tag key SSMSessionRunAs or by specifying an operating system user name. You enable this support by updating the Session Manager preferences.

    The following screenshots show a configuration for the IAM user appdev2, who is always allowed to start a session with ec2-user instead of the default ssm-user.

    Conclusion

    Amazon EMR with Session Manager can greatly improve your confidence in your security and audit posture by centralizing access control and mitigating the risk of managing access keys and inbound ports. It also reduces the overall cost, because you no longer need intermediate Bastion hosts.


    About the Authors

    Sai Sriparasa is a Sr. Big Data & Security Consultant with AWS Professional Services. He works with our customers to provide strategic and tactical big data solutions with an emphasis on automation, operations, governance & security on AWS. In his spare time, he follows sports and current affairs.

     

     

     

    Ravi Kadiri is a security data architect at AWS, focused on helping customers build secure data lake solutions using native AWS security services. He enjoys using his experience as a Big Data architect to provide guidance and technical expertise in the Big Data & Analytics space. His interests include staying fit, traveling, and spending time with friends & family.

    Building complex workflows with Amazon MWAA, AWS Step Functions, AWS Glue, and Amazon EMR

    Post Syndicated from Dipankar Ghosal original https://aws.amazon.com/blogs/big-data/building-complex-workflows-with-amazon-mwaa-aws-step-functions-aws-glue-and-amazon-emr/

    Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a fully managed service that makes it easy to run open-source versions of Apache Airflow on AWS and build workflows to run your extract, transform, and load (ETL) jobs and data pipelines.

    You can use AWS Step Functions as a serverless function orchestrator to build scalable big data pipelines. These pipelines can use services such as Amazon EMR to run Apache Spark and other open-source applications on AWS in a cost-effective manner, and AWS Glue as a serverless environment to prepare (extract and transform) and load large datasets from a variety of sources for analytics and data processing with Apache Spark ETL jobs.

    For production pipelines, a common use case is to read data originating from a variety of sources. This data requires transformation to extract business value and generate insights before sending to downstream applications, such as machine learning algorithms, analytics dashboards, and business reports.

    This post demonstrates how to use Amazon MWAA as a primary workflow management service to create and run complex workflows and extend the directed acyclic graph (DAG) to start and monitor a state machine created using Step Functions. In Airflow, a DAG is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies.

    Architectural overview

    The following diagram illustrates the architectural overview of the components involved in the orchestration of the workflow. This workflow uses Amazon EMR to preprocess data and starts a Step Functions state machine. The state machine transforms data using AWS Glue.

    The workflow includes the following core components:

    1. Airflow Scheduler triggers the DAG based on a schedule or manually.
    2. DAG uses PythonOperator to create an EMR cluster and waits for the cluster creation process to complete.
    3. DAG uses a custom operator EmrSubmitAndMonitorStepOperator to submit and monitor the Amazon EMR step.
    4. DAG uses PythonOperator to stop the EMR cluster when the preprocessing tasks are complete.
    5. DAG starts a Step Functions state machine and monitors it for completion using PythonOperator.

    You can build complex ETL pipelines with Step Functions separately and trigger them from an Airflow DAG.

    Prerequisites

    Before starting, create an Amazon MWAA environment. If this is your first time using Amazon MWAA, see Introducing Amazon Managed Workflows for Apache Airflow (MWAA).

    Take a note of the Amazon Simple Storage Service (Amazon S3) bucket that stores the DAGs. It’s located on the environment details page on the Amazon MWAA console.

    Also note the AWS Identity and Access Management (IAM) execution role. This role should be modified to allow MWAA to read and write from your S3 bucket, submit an Amazon EMR step, start a Step Functions state machine, and read from the AWS Systems Manager Parameter Store. The IAM role is available in the Permissions section of the environment details.

    The solution references Systems Manager parameters in an AWS CloudFormation template and scripts. For information on adding and removing IAM identity permissions, see Adding and removing IAM identity permissions. A sample IAM policy is also provided in the GitHub repository amazon-mwaa-complex-workflow-using-step-functions.

    For this post, we use the MovieLens dataset. We concurrently convert the MovieLens CSV files to Parquet format and save them to Amazon S3 as part of preprocessing.

    Setting up the state machine using Step Functions

    Our solution extends the ETL pipeline to run a Step Functions state machine from the Airflow DAG. Step Functions lets you build visual workflows that enable fast translation of business requirements into technical requirements. With Step Functions, you can set up dependency management and failure handling using a JSON-based template. A workflow is a series of steps, such as tasks, choices, parallel runs, and timeouts with the output of one step acting as input into the next. For more information about other use cases, see AWS Step Functions Use Cases.

    The following diagram shows the ETL process set up through a Step Functions state machine.

    In the workflow, the Process Data step runs an AWS Glue job, and the Get Job Status step periodically checks for the job completion. The AWS Glue job reads the input datasets and creates output data for the most popular movies and top-rated movies. After the job is complete, the Run Glue Crawler step runs an AWS Glue crawler to catalog the data. The workflow also allows you to monitor and respond to failures at any stage.

    Creating resources

    Create your resources by following the installation instructions provided in the amazon-mwaa-complex-workflow-using-step-functions README.md.

    Running the ETL workflow

    To run your ETL workflow, complete the following steps:

    1. On the Amazon MWAA console, choose Open Airflow UI.
    2. Locate the mwaa_movielens_demo DAG.
    3. Turn on the DAG.

    1. Select the mwaa_movielens_demo DAG and choose Graph View.

    This displays the overall ETL pipeline managed by Airflow.

    1. To view the DAG code, choose Code.

    The code for the custom operator can be found in the amazon-mwaa-complex-workflow-using-step-functions GitHub repo. 

    1. From the Airflow UI, select the mwaa_movielens_demo DAG and choose Trigger DAG.
    2. Leave the Optional Configuration JSON box blank.

    When the Airflow DAG runs, the first task calls the PythonOperator to create an EMR cluster using Boto3. Boto3 is the AWS SDK for Python. It enables Python developers to create, configure, and manage AWS services, such as Amazon Elastic Compute Cloud (Amazon EC2) and Amazon S3. Boto3 provides an object-oriented API, as well as low-level access to AWS services.

    The second task waits until the EMR cluster is ready and in the Waiting state. As soon as the cluster is ready, the data load task runs, followed by the data preprocessing tasks, which are started in parallel using EmrSubmitAndMonitorStepOperator. Concurrency in the current Airflow DAG is set to 3, which runs three tasks in parallel. You can change the concurrency of Amazon EMR to run multiple Amazon EMR steps in parallel.

    When the data preprocessing tasks are complete, the EMR cluster is stopped and the DAG starts the Step Functions state machine to initiate data transformation.

    The final task in the DAG monitors the completion of the Step Functions state machine.
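
    The start-and-monitor behavior of these last two tasks can be sketched as a plain Python callable of the kind a PythonOperator could run. This is not the DAG code from the GitHub repository. The function name, the polling interval, and the injected client and sleep arguments are assumptions made for illustration. The calls start_execution and describe_execution are the Boto3 Step Functions client methods.

```python
import time

# Execution states after which Step Functions reports no further progress.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "TIMED_OUT", "ABORTED"}


def run_state_machine(sfn_client, state_machine_arn, execution_input="{}",
                      poll_seconds=30, sleep=time.sleep):
    """Start an execution, poll until it finishes, and raise unless it succeeded."""
    execution_arn = sfn_client.start_execution(
        stateMachineArn=state_machine_arn,
        input=execution_input,
    )["executionArn"]
    while True:
        status = sfn_client.describe_execution(executionArn=execution_arn)["status"]
        if status in TERMINAL_STATES:
            break
        sleep(poll_seconds)
    if status != "SUCCEEDED":
        raise RuntimeError(f"Execution {execution_arn} ended with status {status}")
    return execution_arn
```

    In a real task, you would pass boto3.client("stepfunctions") as sfn_client. Raising on any status other than SUCCEEDED causes Airflow to mark the task as failed.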

    The DAG run should complete in approximately 10 minutes.

    Verifying the DAG run

    While the DAG is running, you can view the task logs.

    1. From Graph View, select any task and choose View Log.

    1. When the DAG starts the Step Functions state machine, verify the status on the Step Functions console.

    1. You can also monitor ETL process completion from the Airflow UI.

    1. On the Airflow UI, verify the completion from the log entries.

    Querying the data

    After the successful completion of the Airflow DAG, two tables are created in the AWS Glue Data Catalog. To query the data with Amazon Athena, complete the following steps:

    1. On the Athena console, choose Databases.
    2. Select the mwaa-movielens-demo-db database.

    You should see the two tables. If the tables aren’t listed, verify that the AWS Glue crawler run is complete and that the console is showing the correct Region.

    3. Run the following query:
      SELECT * FROM "mwaa-movielens-demo-db"."most_popular_movies" limit 10;

    The following screenshot shows the output.

    Cleaning up

    To clean up the resources created as part of our CloudFormation template, delete the mwaa-demo-foundations stack. You can either use the AWS CloudFormation console or the AWS Command Line Interface (AWS CLI).

    Conclusion

    In this post, we used Amazon MWAA to orchestrate an ETL pipeline on Amazon EMR and AWS Glue with Step Functions. We created an Airflow DAG to demonstrate how to run data processing jobs concurrently and extended the DAG to start a Step Functions state machine to build a complex ETL pipeline. A custom Airflow operator submitted and then monitored the Amazon EMR steps synchronously.

    If you have comments or feedback, please leave them in the comments section.


    About the Author

    Dipankar Ghosal is a Sr Data Architect at Amazon Web Services and is based out of Minneapolis, MN. He has a focus in analytics and enjoys helping customers solve their unique use cases. When he’s not working, he loves going hiking with his wife and daughter.

    Introducing Amazon EMR integration with Apache Ranger

    Post Syndicated from Varun Rao Bhamidimarri original https://aws.amazon.com/blogs/big-data/introducing-amazon-emr-integration-with-apache-ranger/

    Data security is an important pillar in data governance. It includes authentication, authorization, encryption, and audit.

    Amazon EMR enables you to set up and run clusters of Amazon Elastic Compute Cloud (Amazon EC2) instances with open-source big data applications like Apache Spark, Apache Hive, Apache Flink, and Presto. You may also want to set up multi-tenant EMR clusters where different users (or teams) can use a shared EMR cluster to run big data analytics workloads. In a multi-tenant cluster, it becomes important to set up mechanisms for authentication (determine who is invoking the application and authenticate the user), authorization (set up who has access to what data), and audit (maintain a log of who accessed what data).

    Apache Ranger is an open-source project that provides authorization and audit capabilities for Hadoop and related big data applications like Apache Hive, Apache HBase, and Apache Kafka.

    We’re happy to share that starting with Amazon EMR 5.32, we’re including plugins to integrate with Apache Ranger 2.0 that enable authorization and audit capabilities for Apache SparkSQL, Amazon Simple Storage Service (Amazon S3), and Apache Hive.

    You can set up a multi-tenant EMR cluster, use Kerberos for user authentication, use Apache Ranger 2.0 (managed separately outside the EMR cluster) for authorization, and configure fine-grained data access policies for databases, tables, columns, and S3 objects. In this post, we explain how you can set up Amazon EMR to use Apache Ranger for data access controls for Apache Spark and Apache Hive workloads on Amazon EMR. We show how you can set up multiple short-running and long-running EMR clusters with a single, centralized Apache Ranger server that maintains data access control policies.

    Managed Apache Ranger plugins for PrestoSQL and PrestoDB will soon follow.

    You should consider this solution if one or more of the following apply:

    • You have experience setting up and managing an Apache Ranger admin server (it needs to be self-managed)
    • You want to port existing Apache Ranger Hive policies over to Amazon EMR
    • You need to use the database-backed Hive Metastore and can’t use the AWS Glue Data Catalog due to limitations
    • You require authorization support for Apache Spark (SQL and storage and file access) and Amazon S3
    • You want to store Apache Ranger authorization audits in Amazon CloudWatch, avoiding the need to maintain an Apache Solr infrastructure

    With this native integration, you use the Amazon EMR security configuration to specify Apache Ranger details, without the need for custom bootstrap scripts. You can reuse existing Apache Hive Ranger policies, including support for row-level filters and column masking.

    The following image shows table and column-level access set up for Apache SparkSQL.

    Additionally, SSH users are blocked from getting AWS Identity and Access Management (IAM) permissions tied to the Amazon EMR instance profiles. This disables access to Amazon S3 using tools like the AWS Command Line Interface (AWS CLI).

    The following screenshot shows access to Amazon S3 blocked when using the AWS CLI.

    The following screenshots show how access to the same Amazon S3 location is set up and used through EMRFS (the default EMR file system implementation for reading and writing files from Amazon S3).

    Prerequisites

    Before getting started, you must have the following prerequisites:

    • Self-managed Apache Ranger server (2.x only) outside of an EMR cluster
    • TLS mutual authentication enabled between Apache Ranger server and Apache Ranger plugins running on the EMR cluster
    • Additional IAM roles:
      • IAM role for Apache Ranger – Defines privileges that trusted processes have when submitting Spark and Hive jobs
      • IAM role for other AWS services – Defines privileges that end-users have when accessing services that aren’t protected by Apache Ranger plugins
    • Updates to the Amazon EC2 EMR role:
    • New Apache Ranger service definitions installed for Apache Spark and Amazon S3
    • Apache Ranger server certificate and private key for plugins uploaded into Secrets Manager
    • A CloudWatch log group for Apache Ranger audits

    Architecture overview

    The following diagram illustrates the architecture for this solution.

    In the architecture, the Amazon EMR secret agent intercepts user requests and vends credentials based on user and resources. The Amazon EMR record server receives requests to access data from Spark, reads data from Amazon S3, and returns filtered data based on Apache Ranger policies.

    See Amazon EMR Components to learn more about Amazon EMR Secret Agent and Record Server.

    Setting up your resources

    In this section, we walk you through setting up your resources manually.

    If you want to use CloudFormation scripts to automate the setup, see the section Setting up your architecture with CloudFormation later in this post.

    Uploading SSL private keys and certificates to Secrets Manager

    Upload the private keys for the Apache Ranger plugins and the SSL certificate of the Apache Ranger server to Secrets Manager. When the EMR cluster starts up, it uses these files to configure the plugins. For reference, see the script create-tls-certs.sh.

    Setting up an Apache Ranger server

    You need to set up a two-way SSL-enabled Apache Ranger server. To set up the server manually, refer to the script install-ranger-admin-server.sh.

    Installing Apache Ranger service definitions

    In this section, we review installing the Apache Ranger service definitions for Apache Spark and Amazon S3.

    Apache Spark

    To add a new Apache Ranger service definition, see the following script:

    mkdir /tmp/emr-spark-plugin/
    cd /tmp/emr-spark-plugin/
    
    # Download the Service definition
    wget https://s3.amazonaws.com/elasticmapreduce/ranger/service-definitions/version-2.0/ranger-servicedef-amazon-emr-spark.json
    
    # Download Service implementation jar/class
    wget https://s3.amazonaws.com/elasticmapreduce/ranger/service-definitions/version-2.0/ranger-spark-plugin-2.x.jar
    
    # Copy Service implementation jar to Ranger server
    export RANGER_HOME=.. # Replace this with the Ranger Admin's home directory, e.g. /usr/lib/ranger/ranger-2.0.0-admin
    mkdir $RANGER_HOME/ews/webapp/WEB-INF/classes/ranger-plugins/amazon-emr-spark
    mv ranger-spark-plugin-2.x.jar $RANGER_HOME/ews/webapp/WEB-INF/classes/ranger-plugins/amazon-emr-spark
    
    # Add the service definition using the Ranger REST API
    curl -u <admin_user_login>:<password_for_ranger_admin_user> -X POST -d @ranger-servicedef-amazon-emr-spark.json \
    -H "Accept: application/json" \
    -H "Content-Type: application/json" \
    -k 'https://<RANGER SERVER ADDRESS>:6182/service/public/v2/api/servicedef'

    This script is included in the Apache Ranger server setup script, if you’re deploying resources with the CloudFormation template.

    The policy definition is similar to Apache Hive, except that the actions are limited to select only. The following screenshot shows the definition settings.

    To change permissions, for the user, choose select.

    Amazon S3 (via Amazon EMR File System)

    Similar to Apache Spark, we have a new Apache Ranger service definition for Amazon S3. See the following script:

    mkdir /tmp/emr-emrfs-s3-plugin/
    cd /tmp/emr-emrfs-s3-plugin/
    
    # Download the Service definition
    wget https://s3.amazonaws.com/elasticmapreduce/ranger/service-definitions/version-2.0/ranger-servicedef-amazon-emr-emrfs.json
    
    # Download Service implementation jar/class
    wget https://s3.amazonaws.com/elasticmapreduce/ranger/service-definitions/version-2.0/ranger-emr-emrfs-plugin-2.x.jar
    
    # Copy Service implementation jar to Ranger server
    export RANGER_HOME=.. # Replace this with the Ranger Admin's home directory, e.g. /usr/lib/ranger/ranger-2.0.0-admin
    mkdir $RANGER_HOME/ews/webapp/WEB-INF/classes/ranger-plugins/amazon-emr-emrfs
    # The file name must match the jar downloaded above
    mv ranger-emr-emrfs-plugin-2.x.jar $RANGER_HOME/ews/webapp/WEB-INF/classes/ranger-plugins/amazon-emr-emrfs
    
    # Add the service definition using the Ranger REST API
    curl -u <admin_user_login>:<password_for_ranger_admin_user> -X POST -d @ranger-servicedef-amazon-emr-emrfs.json \
    -H "Accept: application/json" \
    -H "Content-Type: application/json" \
    -k 'https://<RANGER SERVER ADDRESS>:6182/service/public/v2/api/servicedef'

    If you’re using the CloudFormation template, this script is included in the Apache Ranger server setup script.

    The following screenshot shows the policy details.

    You can enable standard Amazon S3 access permissions in this policy.

    Importing your existing Apache Hive policies

    You can import your existing Apache Hive policies into the Apache Ranger server tied to the EMR cluster. For more information, see User Guide for Import-Export.

    The following image shows how to use Apache Ranger’s export and import option.

     

    CloudWatch for Apache Ranger audits

    Apache Ranger audits are sent to CloudWatch. You should create a new CloudWatch log group and specify it in the security configuration. See the following code:

    aws logs create-log-group --log-group-name /aws/emr/rangeraudit/

    You can search audit information using CloudWatch Insights. The following screenshot shows a query.

    New Amazon EMR security configuration

    The new Amazon EMR security configuration requires the following inputs:

    • IP address of the Apache Ranger server
    • IAM roles for the Apache Ranger plugins running on the EMR cluster and for accessing other AWS services (see the GitHub repo)
    • Secrets Manager name with the Apache Ranger admin server certificate
    • Secrets Manager name with the private key used by the plugins
    • CloudWatch log group name

    The following code is an example of using the AWS CLI to create this security configuration:

    aws emr create-security-configuration --name MyEMRRangerSecurityConfig --security-configuration \
    '{
       "EncryptionConfiguration":{
          "EnableInTransitEncryption":false,
          "EnableAtRestEncryption":false
       },
       "AuthenticationConfiguration":{
          "KerberosConfiguration":{
             "Provider":"ClusterDedicatedKdc",
             "ClusterDedicatedKdcConfiguration":{
                "TicketLifetimeInHours":24
             }
          }
       },
       "AuthorizationConfiguration":{
          "RangerConfiguration":{
             "AdminServerURL":"https://<RANGER ADMIN SERVER IP>:8080",
             "RoleForRangerPluginsARN":"arn:aws:iam::<AWS ACCOUNT ID>:role/<RANGER PLUGIN DATA ACCESS ROLE NAME>",
             "RoleForOtherAWSServicesARN":"arn:aws:iam::<AWS ACCOUNT ID>:role/<USER ACCESS ROLE NAME>",
             "AdminServerSecretARN":"arn:aws:secretsmanager:us-east-1:<AWS ACCOUNT ID>:secret:<SECRET NAME THAT PROVIDES ADMIN SERVERS PUBLIC TLS CERTIFICATE>",
             "RangerPluginConfigurations":[
                {
                   "App":"Spark",
                   "ClientSecretARN":"arn:aws:secretsmanager:us-east-1:<AWS ACCOUNT ID>:secret:<SECRET NAME THAT PROVIDES SPARK PLUGIN PRIVATE TLS CERTIFICATE>",
                   "PolicyRepositoryName":"spark-policy-repository"
                },
                {
                   "App":"Hive",
                   "ClientSecretARN":"arn:aws:secretsmanager:us-east-1:<AWS ACCOUNT ID>:secret:<SECRET NAME THAT PROVIDES HIVE PLUGIN PRIVATE TLS CERTIFICATE>",
                   "PolicyRepositoryName":"hive-policy-repository"
                },
                {
                   "App":"EMRFS-S3",
                   "ClientSecretARN":"arn:aws:secretsmanager:us-east-1:<AWS ACCOUNT ID>:secret:<SECRET NAME THAT PROVIDES EMRFS S3 PLUGIN PRIVATE TLS CERTIFICATE>",
                   "PolicyRepositoryName":"emrfs-policy-repository"
                }
             ],
             "AuditConfiguration":{
                "Destinations":{
                   "AmazonCloudWatchLogs":{
                      "CloudWatchLogGroup":"arn:aws:logs:us-east-1:<AWS ACCOUNT ID>:log-group:<LOG GROUP NAME FOR AUDIT EVENTS>"
                   }
                }
             }
          }
       }
    }'
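If you prefer to generate the security configuration document rather than edit it by hand, the inputs listed above can be assembled programmatically. The following is a minimal sketch that only builds the JSON and doesn't call any AWS API. The function name `build_ranger_security_config`, the account ID, the IP address, and the role and secret names are all hypothetical placeholders; the keys mirror the example above.

```python
import json


def build_ranger_security_config(
    ranger_admin_url,
    plugin_role_arn,
    other_services_role_arn,
    admin_secret_arn,
    plugins,
    audit_log_group_arn,
):
    """Assemble the security configuration document as a Python dict.

    `plugins` is a list of (app, client_secret_arn, policy_repository_name)
    tuples, one per Apache Ranger plugin.
    """
    return {
        "EncryptionConfiguration": {
            "EnableInTransitEncryption": False,
            "EnableAtRestEncryption": False,
        },
        "AuthenticationConfiguration": {
            "KerberosConfiguration": {
                "Provider": "ClusterDedicatedKdc",
                "ClusterDedicatedKdcConfiguration": {"TicketLifetimeInHours": 24},
            }
        },
        "AuthorizationConfiguration": {
            "RangerConfiguration": {
                "AdminServerURL": ranger_admin_url,
                "RoleForRangerPluginsARN": plugin_role_arn,
                "RoleForOtherAWSServicesARN": other_services_role_arn,
                "AdminServerSecretARN": admin_secret_arn,
                "RangerPluginConfigurations": [
                    {
                        "App": app,
                        "ClientSecretARN": secret_arn,
                        "PolicyRepositoryName": repository,
                    }
                    for app, secret_arn, repository in plugins
                ],
                "AuditConfiguration": {
                    "Destinations": {
                        "AmazonCloudWatchLogs": {
                            "CloudWatchLogGroup": audit_log_group_arn
                        }
                    }
                },
            }
        },
    }


# Placeholder values for illustration only; substitute your own.
ACCOUNT = "111122223333"
SECRETS = f"arn:aws:secretsmanager:us-east-1:{ACCOUNT}:secret"
config = build_ranger_security_config(
    ranger_admin_url="https://10.0.0.10:8080",
    plugin_role_arn=f"arn:aws:iam::{ACCOUNT}:role/example-ranger-plugin-role",
    other_services_role_arn=f"arn:aws:iam::{ACCOUNT}:role/example-user-role",
    admin_secret_arn=f"{SECRETS}:example-ranger-admin-cert",
    plugins=[
        ("Spark", f"{SECRETS}:example-spark-plugin-key", "spark-policy-repository"),
        ("Hive", f"{SECRETS}:example-hive-plugin-key", "hive-policy-repository"),
        ("EMRFS-S3", f"{SECRETS}:example-emrfs-plugin-key", "emrfs-policy-repository"),
    ],
    audit_log_group_arn=f"arn:aws:logs:us-east-1:{ACCOUNT}:log-group:/aws/emr/rangeraudit/",
)
security_configuration_json = json.dumps(config, indent=2)
print(security_configuration_json)
```

The resulting string can be supplied as the value of the --security-configuration option shown in the AWS CLI example.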
    

    Install Amazon EMR cluster with Kerberos

    Start the cluster by choosing Amazon EMR version 5.32 and this newly created security configuration.

    Setting up your architecture with CloudFormation

    To help you get started, we added a new GitHub repo with setup instructions. The following diagram shows the logical architecture after the CloudFormation stack is fully deployed. Review the roadmap for future enhancements.

    To set up this architecture using CloudFormation, complete the following steps:

    1. Use the create-tls-certs.sh script to upload the SSL keys and certificates to Secrets Manager.
    2. Set up the VPC or Active Directory server by launching the following CloudFormation template.
    3. Verify DHCP options to make sure the domain name servers for the VPC are listed in the right order (LDAP/AD server first, followed by AmazonProvidedDNS).
    4. Set up the Apache Ranger server, Amazon Relational Database Service (Amazon RDS) instance, and EMR cluster by launching the following CloudFormation template.

    Limitations

    When using this solution, keep in mind the following limitations:

    • As of this writing, Amazon EMR 6.x isn’t supported (only Amazon EMR 5.32+ is supported)
    • Non-Kerberos clusters aren’t supported.
    • Jobs must be submitted through Apache Zeppelin, Hue, Livy, or SSH.
    • Only selected applications can be installed on the Apache Ranger-enabled EMR cluster, such as Hadoop, Tez and Ganglia. For a full list, see Supported Applications. The cluster creation request is rejected if you choose applications outside this supported list.
    • As of this writing, the SparkSQL plugin doesn’t support column masking and row-level filters.
    • The SparkSQL INSERT INTO and INSERT OVERWRITE overrides aren’t supported.
    • You can’t view audits on the Apache Ranger UI as they’re sent to CloudWatch.
    • The AWS Glue Data Catalog isn’t supported as the Apache Hive Metastore.

    Available now

    Native support for Apache Ranger 2.0 with Apache Hive, Apache Spark, and Amazon S3 is available today in the following AWS Regions:

    • US East (Ohio)
    • US East (N. Virginia)
    • US West (N. California)
    • US West (Oregon)
    • Africa (Cape Town)
    • Asia Pacific (Hong Kong)
    • Asia Pacific (Mumbai)
    • Asia Pacific (Seoul)
    • Asia Pacific (Singapore)
    • Asia Pacific (Sydney)
    • Canada (Central)
    • Europe (Frankfurt)
    • Europe (Ireland)
    • Europe (London)
    • Europe (Paris)
    • Europe (Milan)
    • Europe (Stockholm)
    • South America (São Paulo)
    • Middle East (Bahrain)

    For the latest Region availability, see Amazon EMR Management Guide.

    Conclusion

    Amazon EMR 5.32 includes plugins to integrate with Apache Ranger 2.0 that enable authorization and audit capabilities for Apache SparkSQL, Amazon S3, and Apache Hive. This post demonstrates how to set up Amazon EMR to use Apache Ranger for data access controls for Apache Spark and Apache Hive workloads on Amazon EMR. If you have any thoughts or questions, please leave them in the comments.


    About the Author

    Varun Rao Bhamidimarri is a Sr Manager on the AWS Analytics Specialist Solutions Architect team. His focus is helping customers with adoption of cloud-enabled analytics solutions to meet their business requirements. Outside of work, he loves spending time with his wife and two kids, staying healthy, and meditating, and he recently picked up gardening during the lockdown.

    Estimating scoring probabilities by preparing soccer matches data with AWS Glue DataBrew

    Post Syndicated from Arash Rowshan original https://aws.amazon.com/blogs/big-data/estimating-scoring-probabilities-by-preparing-soccer-matches-data-with-aws-glue-databrew/

    In soccer (or football outside of the US), players decide to take shots when they think they can score. But how do they make that determination vs. when to pass or dribble? In a fraction of a second, in motion, while chased from multiple directions by other professional athletes, they think about their distance from the goal, the speed they’re running, the ball movement, the number of defenders, the goalkeeper’s position, their own shot power, accuracy, angle, and more. For a moment, time stands still. A decision is made. To shoot or not to shoot.

    This post was inspired by AWS’s collaboration with Germany’s Bundesliga, which led to Match Fact xGoals. We use representative sample datasets and walk through using AWS Glue DataBrew to prepare data prior to training a model that can predict the probability of a player scoring at any given moment in the game.

    We start with two datasets that have the information about the events in the game and the players. We describe how a problem in this domain could be framed to benefit from data preparation and how that can lead us to making predictions.

    No prior knowledge is required to enjoy this post. However, basic familiarity with data preparation and machine learning (ML) is beneficial. By the end of this post, you should have a good sense regarding what DataBrew offers as well as how you can apply the approaches offered here to other use cases.

    Sample datasets

    Let’s assume we have collected a lot of data from video records of soccer matches. We extracted the following dataset by taking a snapshot of every shot taken in every match. For each shot, we also recorded if it resulted in a goal or if it did not. The dataset is fictionalized and not based on historic soccer games.

    The following table summarizes what each column contains.

    Column Name          Description
    event_id             Unique identifier for each record
    game_minute          Minute of the game, from 0 to approximately 90
    player_id            Unique identifier for each player
    player_name          Name of the player who took the shot
    defenders            Number of defenders between the player and the opponent’s goalkeeper
    player_position      [x, y] coordinate of the player taking the shot
    player_angle         Angle of the player taking the shot
    player_speed         Speed of the player at the moment of taking the shot, in km/h
    goalkeeper_position  [x, y] coordinate of the opponent’s goalkeeper
    situation            Open play, free kick, or penalty kick
    result               Goal or no goal

    We also use the FIFA 20 complete player dataset, a dataset on soccer players across the globe that attempts to estimate their qualities quantitatively. The following screenshot shows some of the columns this dataset contains.

    We’re interested in the following columns:

    • age
    • height
    • weight
    • overall
    • preferred_foot
    • weak_foot
    • pace
    • shooting
    • attacking_finishing
    • skill_fk_accuracy
    • movement_acceleration
    • movement_sprint_speed
    • power_shot_power
    • mentality_penalties 

    We talk more about how each of these columns can be relevant later in this post.

    As you can imagine, these datasets aren’t quite ready to train an ML model. We need to combine, clean, and extract values in numeric forms so we can start creating our training set. Normally that can take a lot of time and is somewhat tedious work. Data scientists prefer to focus on training their model, but they have to spend most of their time wrangling data to the shape that can be used for their ML flow.

    But fear not! Enter AWS Glue DataBrew, your friendly neighborhood visual data prep tool. As Business Wire puts it:

    “…AWS Glue DataBrew offers customers over 250 pre-built transformations to automate data preparation tasks that would otherwise require days or weeks writing hand-coded transformations.”

    Let’s get to work and see how we can prepare a dataset for training an ML model using DataBrew.

    Preparing the data with DataBrew

    Let’s start with the shots dataset. I first create a project on the DataBrew console and upload this dataset.

    When my session is ready, I should see all the columns and some analytics that give me useful insights about my dataset.

    The player_position column is recorded as [x,y] coordinates with x between 0–1000 and y between 0–500. The following image shows how a player is positioned based on this data.

    In CSV, this is recorded as a string and has two values. We want to extract the values and combine them into a single value that can be represented as a number. This can help our model draw better conclusions from this feature. We can start preparing our data by completing the following steps:

    1. Using the Remove values transform, I remove the opening and closing brackets from the entries of the player_position column.
    2. I split this column by a comma to generate two columns that hold the x and y coordinates.
    3. We also need to convert these newly generated columns from strings to numbers.

    Now we can generate a single value using the two columns that we generated. Imagine we want to break down the whole soccer field into 1×1 squares. If we think about the field similar to rows and columns, we can calculate the number for each square as such: x * 500 + y.
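To make the arithmetic concrete, the bracket removal, split, and numeric conversion steps, followed by the x * 500 + y formula, can be sketched in plain Python. This illustrates the calculation rather than how DataBrew implements it. The function names are hypothetical, and the sketch assumes the coordinates are whole numbers stored in the form "[x, y]".

```python
from typing import Tuple

FIELD_HEIGHT = 500  # multiplier from the formula x * 500 + y


def parse_position(raw: str) -> Tuple[int, int]:
    """Turn a string such as "[850, 250]" into the integers (850, 250)."""
    x_text, y_text = raw.strip().strip("[]").split(",")
    return int(x_text), int(y_text)


def square_number(raw: str) -> int:
    """Combine both coordinates into a single numeric feature."""
    x, y = parse_position(raw)
    return x * FIELD_HEIGHT + y


print(square_number("[850, 250]"))  # 425250
print(square_number("[0, 10]"))     # 10
```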

    The benefit of this approach is that squares with higher numbers tend to be closer to the opponent’s goal, and this single feature can help our model draw good correlations between a player’s location and event outcomes. Note that the squares in the following image aren’t drawn to scale.

    We can calculate the square numbers by using the Functions transforms Multiply and then Sum.

    1. First, I multiply the x coordinate by 500.
    2. Using the ADD function, I sum this generated column with the y coordinate.
    3. We apply the same transforms to the goalkeeper_position column to achieve a single number for that feature as well.

    Next, let’s take a look at the player_angle column.

    When positioned on the lower half of the field, a positive angle likely presents a better opportunity to score, and when on the top half, a negative angle faces the players more toward the goal. Another point to consider is the player’s strong or weak foot. The bottom half presents a wide angle for a left-footed player and the top half does so for a right-footed player. Angle in combination with strong foot can help our model draw good conclusions. We add the information on players’ strong or weak foot later in this post.

    We have three different situations recorded in our dataset.

    1. We apply one-hot-encoding to this column to refactor the situations as 1 (True) or 0 (False) values suitable for training our model.
    2. For the shots dataset, we change the result column to 0s and 1s using the custom flag transform.
    3. Because this is what we want to predict, I name the column y.
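The encoding steps above can be sketched in plain Python as follows. This is an illustration, not DataBrew's implementation. The raw strings ("open_play", "goal", and so on) and the generated column names are assumptions, because the post doesn't show the exact values stored in the dataset.

```python
# Assumed raw values of the situation column.
SITUATIONS = ["open_play", "free_kick", "penalty_kick"]


def one_hot_situation(situation: str) -> dict:
    """Return one 0/1 column per known situation."""
    return {f"situation_{name}": int(situation == name) for name in SITUATIONS}


def flag_result(result: str) -> int:
    """Map the result column to 1 for a goal and 0 otherwise."""
    return 1 if result == "goal" else 0


row = {"situation": "free_kick", "result": "goal"}
encoded = {**one_hot_situation(row["situation"]), "y": flag_result(row["result"])}
print(encoded)
# {'situation_open_play': 0, 'situation_free_kick': 1, 'situation_penalty_kick': 0, 'y': 1}
```

Exactly one of the three situation columns is set to 1 for each row, so the model sees the category without any implied ordering between the situations.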

    We can enrich the data further by joining this dataset with the player dataset to take each player’s qualities into consideration. You may have noticed that we generated a lot of extra columns while applying the previous transforms. We can address that in one step while we do the join and clean things up a bit.

    1. When applying the join, we can connect the players dataset.
    2. I use an inner join and choose player_id as my key.
    3. I also only select the columns that I’m interested in.

    You can select more or fewer columns depending on what features you want to feed into your model. For instance, I’m not selecting the player’s nationality, but you may want your model to take that into consideration. That’s really up to you, so feel free to play around with your options.

    4. I deselect the extra columns from the shots dataset and only select the following from the players dataset:
      1. age
      2. overall
      3. preferred_foot
      4. weak_foot
      5. shooting
      6. attacking_finishing
      7. skill_fk_accuracy
      8. movement_acceleration
      9. movement_sprint_speed
      10. power_shot_power
      11. mentality_penalties 
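For readers who want to see what the join step does to the rows, here is a minimal sketch in plain Python of an inner join on player_id that keeps only selected player columns. The function name `inner_join` and the sample rows are made up for illustration, and the usage example selects just two columns to stay short.

```python
# Player columns selected in the step above.
PLAYER_COLUMNS = [
    "age", "overall", "preferred_foot", "weak_foot", "shooting",
    "attacking_finishing", "skill_fk_accuracy", "movement_acceleration",
    "movement_sprint_speed", "power_shot_power", "mentality_penalties",
]


def inner_join(shots, players, key="player_id", player_columns=PLAYER_COLUMNS):
    """Attach the selected player columns to every shot with a matching player."""
    players_by_id = {player[key]: player for player in players}
    joined = []
    for shot in shots:
        player = players_by_id.get(shot[key])
        if player is None:
            continue  # inner join: drop shots without a matching player
        row = dict(shot)
        row.update({column: player[column] for column in player_columns})
        joined.append(row)
    return joined


# Fictional sample rows.
shots = [
    {"event_id": 1, "player_id": 10, "y": 1},
    {"event_id": 2, "player_id": 99, "y": 0},  # no matching player
]
players = [
    {"player_id": 10, "age": 27, "preferred_foot": "Left", "nationality": "Example"},
]
print(inner_join(shots, players, player_columns=["age", "preferred_foot"]))
# [{'event_id': 1, 'player_id': 10, 'y': 1, 'age': 27, 'preferred_foot': 'Left'}]
```

Because this is an inner join, the shot by player 99 is dropped, and the nationality column never reaches the output because it wasn't selected.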

    We’re almost done. We just need to apply a few transforms to the newly added player columns.

    1. I one-hot-encode the preferred_foot column.

    We can normalize some of the columns depending on the ML model that we want to run on this dataset afterwards. I want to train a basic logistic regression model.

    2. I use min-max normalization on most columns to scale values between 0–1.

    Depending on your model, it may make more sense to center around 0, use a custom range, or apply z-score for your normalization.

    3. I also apply mean normalization to the player angle.
    4. Now that I have the normalized columns, I can delete all their source columns.
    5. Lastly, I move the result column all the way to the end because this is my output column (what I intend to predict).
    6. Now we have a complete recipe and it’s time to run a job to apply the steps on the full dataset and generate an output file to use to train our model.
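The two normalization steps above can be written out as follows. This sketch uses the standard textbook definitions, with min-max scaling as (x - min) / (max - min) and mean normalization as (x - mean) / (max - min), and assumes DataBrew's transforms behave the same way. The function names and sample values are illustrative.

```python
def _value_range(values):
    """Return (min, max), rejecting columns where every value is the same."""
    lo, hi = min(values), max(values)
    if hi == lo:
        raise ValueError("cannot normalize a column with a single distinct value")
    return lo, hi


def min_max_normalize(values):
    """Scale values into the range 0-1."""
    lo, hi = _value_range(values)
    return [(v - lo) / (hi - lo) for v in values]


def mean_normalize(values):
    """Center values on the mean and scale by the value range."""
    lo, hi = _value_range(values)
    mean = sum(values) / len(values)
    return [(v - mean) / (hi - lo) for v in values]


speeds = [10.0, 20.0, 30.0]
print(min_max_normalize(speeds))  # [0.0, 0.5, 1.0]

angles = [-30.0, 0.0, 30.0]
print(mean_normalize(angles))  # [-0.5, 0.0, 0.5]
```

Mean normalization suits the player angle because it keeps the sign, so negative and positive angles stay on opposite sides of 0.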

    Training the model

    When the job is finished, I retrieve the output from my Amazon Simple Storage Service (Amazon S3) bucket. This rich dataset is now ready to be fed into a model. Logistic regression or Support Vector Machines (SVM) could be good candidates for our dataset. You could use Amazon SageMaker to train a model and generate a probability of scoring per event. The following screenshot shows a basic logistic regression model created using scikit-learn.

    The model reaches approximately 80% accuracy in predicting whether a shot results in a goal. You may get even higher accuracy using SVM. Feel free to try those or edit one of your data preparation steps and see how it affects your model accuracy.
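To show how a logistic regression model turns prepared features into a scoring probability, here is a small dependency-free sketch trained with batch gradient descent. It is a stand-in for illustration only, not the scikit-learn model from the screenshot. In practice, a library implementation such as scikit-learn's LogisticRegression would be the usual choice. The two features and the four training rows are made up.

```python
import math


def sigmoid(z):
    """Map any real number to a probability between 0 and 1."""
    return 1.0 / (1.0 + math.exp(-z))


def scoring_probability(features, weights, bias):
    """Predicted probability that a shot with these features is a goal."""
    return sigmoid(bias + sum(w * x for w, x in zip(weights, features)))


def train_logistic_regression(rows, labels, learning_rate=0.5, epochs=2000):
    """Fit weights and bias by batch gradient descent on the log loss."""
    weights = [0.0] * len(rows[0])
    bias = 0.0
    for _ in range(epochs):
        grad_w = [0.0] * len(weights)
        grad_b = 0.0
        for features, label in zip(rows, labels):
            error = scoring_probability(features, weights, bias) - label
            for i, x in enumerate(features):
                grad_w[i] += error * x
            grad_b += error
        weights = [w - learning_rate * g / len(rows) for w, g in zip(weights, grad_w)]
        bias -= learning_rate * grad_b / len(rows)
    return weights, bias


# Made-up normalized features: [closeness to goal, defender pressure].
rows = [[0.9, 0.1], [0.8, 0.2], [0.2, 0.9], [0.1, 0.8]]
labels = [1, 1, 0, 0]  # the y column: 1 = goal, 0 = no goal

weights, bias = train_logistic_regression(rows, labels)
p_close = scoring_probability([0.85, 0.15], weights, bias)
p_far = scoring_probability([0.15, 0.85], weights, bias)
print(p_close > 0.5, p_far < 0.5)  # True True
```

The model's output for each event is a probability rather than a hard goal or no-goal label, which is what makes it usable as a per-shot scoring estimate.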

    Conclusion

    In this post, we started with some raw fictionalized data of soccer shots and players. We framed the problem based on our domain knowledge and the data available. We used DataBrew to rapidly and visually connect the dots and forge the original datasets into an enriched form that could be used to train an ML model.

    I encourage you to apply the same methodology to a problem domain that interests you and see how DataBrew can speed up your workflow.


    About the Author

    Arash Rowshan is a Software Engineer at Amazon Web Services. He’s passionate about big data and applications of ML. Most recently he was part of the team that launched AWS Glue DataBrew. Any time he’s not at his desk, he’s likely playing soccer somewhere. You can follow him on Twitter @OSO_Arash.

     

     

    Orchestrating an AWS Glue DataBrew job and Amazon Athena query with AWS Step Functions

    Post Syndicated from Sakti Mishra original https://aws.amazon.com/blogs/big-data/orchestrating-an-aws-glue-databrew-job-and-amazon-athena-query-with-aws-step-functions/

    As the industry grows with more data volume, big data analytics is becoming a common requirement in data analytics and machine learning (ML) use cases. Also, as we start building complex data engineering or data analytics pipelines, we look for a simpler orchestration mechanism with graphical user interface-based ETL (extract, transform, load) tools.

    Recently, AWS announced the general availability of AWS Glue DataBrew, a new visual data preparation tool that helps you clean and normalize data without writing code. This reduces the time it takes to prepare data for analytics and ML by up to 80% compared to traditional approaches to data preparation.

    Regarding orchestration or workflow management, AWS provides AWS Step Functions, a serverless function orchestrator that makes it easy to build a workflow by integrating different AWS services like AWS Lambda, Amazon Simple Notification Service (Amazon SNS), AWS Glue, and more. With its built-in operational controls, Step Functions manages sequencing, error handling, retry logic, and states, removing a significant operational burden from your team.

    Today, we’re launching Step Functions support for DataBrew, which means you can now invoke DataBrew jobs in your Step Functions workflow to build an end-to-end ETL pipeline. Recently, Step Functions also started supporting Amazon Athena integration, which means that you can submit SQL queries to the Athena engine through a Step Functions state.

    In this post, we walk through a solution where we integrate a DataBrew job for data preparation, invoke a series of Athena queries for data refresh, and integrate Amazon QuickSight for business reporting. The whole solution is orchestrated through Step Functions and is invoked through Amazon EventBridge.

    Use case overview

    For our use case, we use two public datasets. The first dataset is a sales pipeline dataset, which contains a list of over 20,000 sales opportunity records for a fictitious business. Each record has fields that specify the following:

    • A date, potentially when an opportunity was identified
    • The salesperson’s name
    • A market segment to which the opportunity belongs
    • Forecasted monthly revenue

    The second dataset is an online marketing metrics dataset. This dataset contains records of marketing metrics, aggregated by day. The metrics describe user engagement across various channels, such as websites, mobile, and social media, plus other marketing metrics. The two datasets are unrelated, but for the purpose of this post, we assume that they’re related.

    For our use case, these sales and marketing CSV files are maintained by your organization’s Marketing team, which uploads the updated full CSV file to Amazon Simple Storage Service (Amazon S3) every month. The aggregated output data is created through a series of data preparation steps, and the business team uses the output data to create business intelligence (BI) reports.

    Architecture overview

    To automate the complete process, we use the following architecture, which integrates Step Functions for orchestration, DataBrew for data preparation, Athena for data analysis with standard SQL, and QuickSight for business reporting. In addition, we use Amazon SNS for sending notifications to users, and EventBridge is integrated to schedule running the Step Functions workflow.

    The workflow includes the following steps:

    • Step 1 – The Marketing team uploads the full CSV file to an S3 input bucket every month.
    • Step 2 – An EventBridge rule, scheduled to run every month, triggers the Step Functions state machine.
    • Steps 3 and 4 – We receive two separate datasets (sales data and marketing data), so Step Functions triggers two parallel DataBrew jobs, which create additional year, month, and day columns from the existing date field and use those three columns for partitioning. The jobs write the final output to our S3 output bucket.
    • Steps 5, 6, 7, 8 – After the output data is written, we can create external tables on top of it with Athena CREATE TABLE statements and then load partitions with MSCK REPAIR TABLE commands. After the AWS Glue Data Catalog tables are created for sales and marketing, we run an additional query through Athena, which merges these two tables by year and month to create another table with aggregated output.
    • Steps 9 and 10 – As the last step of the Step Functions workflow, we send a notification to end-users through Amazon SNS to notify them that the data refresh job completed successfully.
    • Steps 11, 12, 13 – After the aggregated table data is refreshed, business users can use QuickSight for BI reporting, which fetches data through Athena. Data analysts can also use Athena to analyze the complete refreshed dataset.

    Prerequisites

    Before beginning this tutorial, make sure you have the required permissions to create the resources required as part of the solution.

    Additionally, create the S3 input and output buckets with the required subfolders to capture the sales and marketing data, and upload the input data into their respective folders.

    Creating a DataBrew project

    To create a DataBrew project for the marketing data, complete the following steps:

    1. On the DataBrew console, choose Projects.
    2. Choose Create a project.
    3. For Project name, enter a name (for this post, marketing-data-etl).
    4. For Select a dataset, select New dataset.

    5. For Enter your source from S3, enter the S3 path of the marketing input CSV.
    6. Under Permissions, for Role name, choose an AWS Identity and Access Management (IAM) role that allows DataBrew to read from your Amazon S3 input location.

    You can choose a role if you already created one, or create a new one. Please read here for steps to create the IAM role.

    7. After the dataset is loaded, on the Functions menu, choose Date functions.
    8. Choose YEAR.
    9. Apply the year function on the date column to create a new column called year.
    10. Repeat these steps to create month and day columns.

    For our use case, we created a few new columns that we plan to use for partitioning, but you can integrate additional transformations as needed.

    11. After you have finished applying all your transformations, choose Publish on the recipe.
    12. Provide a description of the recipe version and choose Publish.
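
    To make the partitioning columns concrete, the following minimal Python sketch derives year, month, and day values from a date and builds the kind of key=value prefix that MSCK REPAIR TABLE can discover. It is an illustration only: the ISO date format, the unpadded month and day values, and the example bucket path are assumptions, not details taken from the datasets or from DataBrew's output.

```python
from datetime import date


def partition_columns(date_text: str) -> dict:
    """Derive year, month, and day values from an ISO date string (assumed format)."""
    parsed = date.fromisoformat(date_text)
    # Values are kept as unpadded strings; this is an assumption for illustration.
    return {"year": str(parsed.year), "month": str(parsed.month), "day": str(parsed.day)}


def partition_prefix(base: str, date_text: str) -> str:
    """Build a Hive-style key=value prefix under a (hypothetical) base location."""
    columns = partition_columns(date_text)
    parts = "/".join(f"{key}={value}" for key, value in columns.items())
    return f"{base.rstrip('/')}/{parts}/"


# Hypothetical bucket and path, used only to show the shape of the prefix.
print(partition_prefix("s3://example-bucket/transformed/marketing", "2020-11-05"))
```

    Each distinct combination of the three values becomes one partition, which is why the Athena tables created later declare year, month, and day as partition columns.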

    Creating a DataBrew job

    Now that our recipe is ready, we can create a job for it, which gets invoked through our Step Functions state machine.

    1. On the DataBrew console, choose Jobs.
    2. Choose Create a job.
    3. For Job name, enter a name (for example, marketing-data-etl).

    Your recipe is already linked to the job.

    4. Under Job output settings, for File type, choose your final storage format (for this post, we choose PARQUET).
    5. For S3 location, enter your final S3 output bucket path.
    6. For Compression, choose the compression type you want to apply (for this post, we choose Snappy).
    7. Under Additional configurations, for Custom partition by column values, choose year, month, and day.
    8. For File output storage, select Replace output files for each job run.

    We choose this option because our use case is to do a full refresh.

    9. Under Permissions, for Role name, choose your IAM role.
    10. Choose Create job.

    We choose Create job because we don’t want to run the job now; we plan to invoke it through Step Functions.

    11. When your marketing job is ready, repeat the same steps for your sales data, using the sales data output file location as needed.

    Creating a Step Functions state machine

    We’re now ready to create a Step Functions state machine for the complete flow.

    1. On the Step Functions console, choose Create state machine.
    2. For Define state machine, select Author with code snippets.
    3. For Type, choose Standard.

    In the Definition section, Step Functions provides a list of service actions that you can use to automatically generate a code snippet for your state machine’s state. The following screenshot shows that we have options for Athena and DataBrew, among others.

    4. For Generate code snippet, choose AWS Glue DataBrew: Start a job run.
    5. For Job name, choose Select job name from a list and choose your DataBrew job.

    The JSON snippet appears in the Preview pane.

    6. Select Wait for DataBrew job runs to complete.
    7. Choose Copy to clipboard.
    8. Integrate the code into the final state machine JSON code:
      {
         "Comment":"Monthly Refresh of Sales Marketing Data",
         "StartAt":"Refresh Sales Marketing Data",
         "States":{
            "Refresh Sales Marketing Data":{
               "Type":"Parallel",
               "Branches":[
                  {
                     "StartAt":"Sales DataBrew ETL Job",
                     "States":{
                        "Sales DataBrew ETL Job":{
                           "Type":"Task",
                           "Resource":"arn:aws:states:::databrew:startJobRun.sync",
                           "Parameters":{
                              "Name":"sales-data"
                           },
                           "Next":"Drop Old Sales Table"
                        },
                        "Drop Old Sales Table":{
                           "Type":"Task",
                           "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
                           "Parameters":{
                              "QueryString":"DROP TABLE IF EXISTS sales_data_output",
                              "WorkGroup":"primary",
                              "ResultConfiguration":{
                                 "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                              }
                           },
                           "Next":"Create Sales Table"
                        },
                        "Create Sales Table":{
                           "Type":"Task",
                           "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
                           "Parameters":{
                              "QueryString":"CREATE EXTERNAL TABLE `sales_data_output`(`date` string, `salesperson` string, `lead_name` string, `segment` string, `region` string, `target_close` string, `forecasted_monthly_revenue` int,   `opportunity_stage` string, `weighted_revenue` int, `closed_opportunity` boolean, `active_opportunity` boolean, `latest_status_entry` boolean) PARTITIONED BY (`year` string,`month` string, `day` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION  's3://<your-bucket-name>/sales-pipeline/transformed/sales/' TBLPROPERTIES ('classification'='parquet', 'compressionType'='none', 'typeOfData'='file')",
                              "WorkGroup":"primary",
                              "ResultConfiguration":{
                                 "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                              }
                           },
                           "Next":"Load Sales Table Partitions"
                        },
                        "Load Sales Table Partitions":{
                           "Type":"Task",
                           "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
                           "Parameters":{
                              "QueryString":"MSCK REPAIR TABLE sales_data_output",
                              "WorkGroup":"primary",
                              "ResultConfiguration":{
                                 "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                              }
                           },
                           "End":true
                        }
                     }
                  },
                  {
                     "StartAt":"Marketing DataBrew ETL Job",
                     "States":{
                        "Marketing DataBrew ETL Job":{
                           "Type":"Task",
                           "Resource":"arn:aws:states:::databrew:startJobRun.sync",
                           "Parameters":{
                              "Name":"marketing-data-etl"
                           },
                           "Next":"Drop Old Marketing Table"
                        },
                        "Drop Old Marketing Table":{
                           "Type":"Task",
                           "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
                           "Parameters":{
                              "QueryString":"DROP TABLE IF EXISTS marketing_data_output",
                              "WorkGroup":"primary",
                              "ResultConfiguration":{
                                 "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                              }
                           },
                           "Next":"Create Marketing Table"
                        },
                        "Create Marketing Table":{
                           "Type":"Task",
                           "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
                           "Parameters":{
                              "QueryString":"CREATE EXTERNAL TABLE `marketing_data_output`(`date` string, `new_visitors_seo` int, `new_visitors_cpc` int, `new_visitors_social_media` int, `return_visitors` int, `twitter_mentions` int,   `twitter_follower_adds` int, `twitter_followers_cumulative` int, `mailing_list_adds_` int,   `mailing_list_cumulative` int, `website_pageviews` int, `website_visits` int, `website_unique_visits` int,   `mobile_uniques` int, `tablet_uniques` int, `desktop_uniques` int, `free_sign_up` int, `paid_conversion` int, `events` string) PARTITIONED BY (`year` string, `month` string, `day` string) ROW FORMAT SERDE   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION  's3://<your-bucket-name>/sales-pipeline/transformed/marketing/' TBLPROPERTIES ('classification'='parquet', 'compressionType'='none', 'typeOfData'='file')",
                              "WorkGroup":"primary",
                              "ResultConfiguration":{
                                 "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                              }
                           },
                           "Next":"Load Marketing Table Partitions"
                        },
                        "Load Marketing Table Partitions":{
                           "Type":"Task",
                           "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
                           "Parameters":{
                              "QueryString":"MSCK REPAIR TABLE marketing_data_output",
                              "WorkGroup":"primary",
                              "ResultConfiguration":{
                                 "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                              }
                           },
                           "End":true
                        }
                     }
                  }
               ],
               "Next":"Drop Old Summarized Table"
            },
            "Drop Old Summarized Table":{
               "Type":"Task",
               "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
               "Parameters":{
                  "QueryString":"DROP TABLE IF EXISTS default.sales_marketing_revenue",
                  "WorkGroup":"primary",
                  "ResultConfiguration":{
                     "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                  }
               },
               "Next":"Create Summarized Output"
            },
            "Create Summarized Output":{
               "Type":"Task",
               "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
               "Parameters":{
                  "QueryString":"CREATE TABLE default.sales_marketing_revenue AS SELECT * FROM (SELECT marketing.year, marketing.month, total_paid_conversion, total_weighted_revenue FROM (SELECT year, month, sum(paid_conversion) as total_paid_conversion FROM default.marketing_data_output group by year, month) marketing INNER JOIN (SELECT year, month, sum(weighted_revenue) as total_weighted_revenue FROM default.sales_data_output group by year, month) sales on sales.year=marketing.year AND sales.month=marketing.month) ORDER BY year DESC, month DESC",
                  "WorkGroup":"primary",
                  "ResultConfiguration":{
                     "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                  }
               },
               "Next":"Notify Users"
            },
            "Notify Users":{
               "Type":"Task",
               "Resource":"arn:aws:states:::sns:publish",
               "Parameters":{
                  "Message":{
                     "Input":"Monthly sales marketing data refreshed successfully!"
                  },
                  "TopicArn":"arn:aws:sns:us-east-1:<account-id>:<sns-topic-name>"
               },
               "End":true
            }
         }
      }
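
    Because every transition in the definition is a string that must exactly match a state name, a small check can help before you paste the JSON into the console. The following Python sketch is one possible approach and not part of the original solution: the helper name dangling_transitions and the toy definition are hypothetical, and it only checks StartAt and Next within each scope (including Parallel branches), not the full Amazon States Language rules.

```python
import json


def dangling_transitions(definition: dict) -> list:
    """Return (state, target) pairs whose StartAt/Next names no state in the same scope."""
    problems = []
    states = definition.get("States", {})
    if definition.get("StartAt") not in states:
        problems.append(("StartAt", definition.get("StartAt")))
    for name, state in states.items():
        target = state.get("Next")
        if target is not None and target not in states:
            problems.append((name, target))
        # Each branch of a Parallel state is its own scope with its own StartAt and States.
        for branch in state.get("Branches", []):
            problems.extend(dangling_transitions(branch))
    return problems


# Toy definition (hypothetical) with a deliberate mismatch: "Notify User" vs "Notify Users".
example = json.loads("""
{
  "StartAt": "Refresh",
  "States": {
    "Refresh": {
      "Type": "Parallel",
      "Branches": [
        {"StartAt": "Job", "States": {"Job": {"Type": "Task", "End": true}}}
      ],
      "Next": "Notify User"
    },
    "Notify Users": {"Type": "Task", "End": true}
  }
}
""")

print(dangling_transitions(example))
```

    An empty list means every StartAt and Next in the definition points at a state that exists in its scope.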

    The following diagram is the visual representation of the state machine flow. With the Step Functions parallel task type, we created two parallel job runs for the sales and marketing data. When both flows are complete, they join to create an aggregated table in Athena and send an SNS notification to the end-users.

    Creating an EventBridge scheduling rule

    Now let’s integrate EventBridge to schedule the invocation of our Step Functions state machine on the first day of every month.

    1. On the EventBridge console, under Events, choose Rules.
    2. Choose Create a rule.
    3. For Name, enter a name (for example, trigger-step-function-rule).
    4. Under Define pattern, select Schedule.
    5. Select Cron expression.
    6. Enter 0 0 1 * ? * to specify that the job runs at midnight (UTC) on the first day of every month.

    7. In the Select targets section, for Target, choose Step Functions state machine.
    8. For State machine, choose your state machine.
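
    EventBridge cron schedules use six fields (minutes, hours, day of month, month, day of week, year) and are evaluated in UTC, so a monthly schedule such as cron(0 0 1 * ? *) fires at 00:00 UTC on the first day of each month. The following Python sketch shows when such a schedule would next fire after a given moment. The function name next_monthly_run is hypothetical and the code does not parse cron expressions; it only models this one schedule.

```python
from datetime import datetime, timezone


def next_monthly_run(now: datetime) -> datetime:
    """Return the next 00:00 UTC on the first day of a month, strictly after `now`.

    `now` must be timezone-aware; it is converted to UTC before the calculation.
    """
    now = now.astimezone(timezone.utc)
    if now.month == 12:
        year, month = now.year + 1, 1
    else:
        year, month = now.year, now.month + 1
    return datetime(year, month, 1, tzinfo=timezone.utc)


print(next_monthly_run(datetime(2020, 12, 15, 9, 30, tzinfo=timezone.utc)))
```

    If the Marketing team uploads files in a local time zone, keep the UTC offset in mind when deciding whether the upload lands before the scheduled run.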

    Now when the state machine is invoked, its run flow looks like the following screenshot, where blue represents the DataBrew jobs currently running.

    When the job is complete, all the steps should be green.

    You also receive the notification “Monthly sales marketing data refreshed successfully!”

    Running an Athena query

    Let’s validate the aggregated table output in Athena by running a simple SELECT query. The following screenshot shows the output.
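
    To reason about what the aggregated table should contain, you can reproduce the shape of the aggregation locally on a few toy rows. The following sketch uses Python's built-in sqlite3 module as a stand-in for Athena. The rows are invented for illustration and the tables carry only the columns the aggregation needs; SQLite's SQL dialect differs from Athena's, so treat this as a way to check the join logic rather than the exact query.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE marketing_data_output (year TEXT, month TEXT, day TEXT, paid_conversion INTEGER);
CREATE TABLE sales_data_output (year TEXT, month TEXT, day TEXT, weighted_revenue INTEGER);
-- Toy rows, invented for this sketch.
INSERT INTO marketing_data_output VALUES
  ('2020', '10', '1', 3), ('2020', '10', '2', 4), ('2020', '11', '1', 5);
INSERT INTO sales_data_output VALUES
  ('2020', '10', '1', 1000), ('2020', '11', '1', 2000), ('2020', '11', '2', 500);
""")

# Aggregate each table by year and month, then join the two monthly summaries.
rows = conn.execute("""
SELECT m.year, m.month, total_paid_conversion, total_weighted_revenue
FROM (SELECT year, month, SUM(paid_conversion) AS total_paid_conversion
      FROM marketing_data_output GROUP BY year, month) AS m
INNER JOIN (SELECT year, month, SUM(weighted_revenue) AS total_weighted_revenue
            FROM sales_data_output GROUP BY year, month) AS s
  ON m.year = s.year AND m.month = s.month
ORDER BY m.year DESC, m.month DESC
""").fetchall()

for row in rows:
    print(row)
```

    Note that the year and month partition columns are declared as strings in the table definitions, so ordering by them is lexicographic rather than numeric.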

    Creating reports in QuickSight

    Now let’s do our final step of the architecture, which is creating BI reports through QuickSight by connecting to the Athena aggregated table.

    1. On the QuickSight console, choose Athena as your data source.
    2. Select the database and table name you have in Athena.

    Now you can create a quick report to visualize your output, as shown in the following screenshot.

     

    If QuickSight is using SPICE storage, you need to refresh the dataset in QuickSight after you receive notification about the completion of data refresh. If the QuickSight report is running an Athena query for every request, you might see a “table not found” error when data refresh is in progress. We recommend leveraging SPICE storage to get better performance.

    Conclusion

    This post explains how to integrate a DataBrew job and Athena queries with Step Functions to implement a simple ETL pipeline that refreshes aggregated sales and marketing data for BI reporting.

    I hope this gives you a great starting point for using this solution with your datasets and applying business rules to build a complete serverless data analytics pipeline.


    About the Author

    Sakti Mishra is a Senior Data Lab Solution Architect at AWS, where he helps customers architect data analytics solutions, which gives them an accelerated path towards modernization initiatives. Outside of work, Sakti enjoys learning new technologies, watching movies, and visiting places.

    The best new features for data analysts in Amazon Redshift in 2020

    Post Syndicated from Helen Anderson original https://aws.amazon.com/blogs/big-data/the-best-new-features-for-data-analysts-in-amazon-redshift-in-2020/

    This is a guest post by Helen Anderson, data analyst and AWS Data Hero

    Every year, the Amazon Redshift team launches new and exciting features, and 2020 was no exception. New features to improve the data warehouse service and add interoperability with other AWS services were rolling out all year.

    I am part of a team that for the past 3 years has used Amazon Redshift to store source tables from systems around the organization and usage data from our software as a service (SaaS) product. Amazon Redshift is our one source of truth. We use it to prepare operational reports that support the business and for ad hoc queries when numbers are needed quickly.

    When AWS re:Invent comes around, I look forward to the new features, enhancements, and functionality that make things easier for analysts. If you haven’t tried Amazon Redshift in a while, or even if you’re a longtime user, these new capabilities are designed with analysts in mind to make it easier to analyze data at scale.

    Amazon Redshift ML

    The newly launched preview of Amazon Redshift ML lets data analysts use Amazon SageMaker over datasets in Amazon Redshift to solve business problems without the need for a data scientist to create custom models.

    As a data analyst myself, this is one of the most interesting announcements to come out in re:Invent 2020. Analysts generally use SQL to query data and present insights, but they don’t often do data science too. Now there is no need to wait for a data scientist or learn a new language to create predictive models.

    For information about what you need to get started with Amazon Redshift ML, see Create, train, and deploy machine learning models in Amazon Redshift using SQL with Amazon Redshift ML.

    Federated queries

    As analysts, we often have to join datasets that aren’t in the same format and sometimes aren’t ready for use in the same place. By using federated queries to access data in other databases or Amazon Simple Storage Service (Amazon S3), you don’t need to wait for a data engineer or ETL process to move data around.

    re:Invent 2019 featured some interesting talks from Amazon Redshift customers who were tackling this problem. Now that federated queries over operational databases like Amazon RDS for PostgreSQL and Amazon Aurora PostgreSQL are generally available and querying Amazon RDS for MySQL and Amazon Aurora MySQL is in preview, I’m excited to hear more.

    For a step-by-step example to help you get started, see Build a Simplified ETL and Live Data Query Solution Using Redshift Federated Query.

    SUPER data type

    Another problem we face as analysts is that the data we need isn’t always in rows and columns. The new SUPER data type makes JSON data easy to use natively in Amazon Redshift with PartiQL.

    PartiQL is an extension that helps analysts get up and running quickly with structured and semistructured data so you can unnest and query using JOINs and aggregates. This is really exciting for those who deal with data coming from applications that store data in JSON or unstructured formats.
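
    For readers new to the idea of unnesting, the following plain Python sketch shows the concept on a toy dataset: each element of a nested array becomes its own flat row, which can then be aggregated like ordinary tabular data. This is not PartiQL and not Amazon Redshift syntax; the unnest helper and the order records are hypothetical and exist only to illustrate the transformation.

```python
def unnest(records, array_field):
    """Yield one flat row per element of `array_field`, keeping the parent's other fields."""
    for record in records:
        parent = {key: value for key, value in record.items() if key != array_field}
        for element in record.get(array_field, []):
            yield {**parent, **element}


# Toy semistructured data, invented for this sketch.
orders = [
    {"order_id": 1, "items": [{"sku": "A", "qty": 2}, {"sku": "B", "qty": 1}]},
    {"order_id": 2, "items": [{"sku": "A", "qty": 5}]},
]

rows = list(unnest(orders, "items"))

# Once the data is flat, an aggregate such as total quantity per SKU is straightforward.
total_qty_by_sku = {}
for row in rows:
    total_qty_by_sku[row["sku"]] = total_qty_by_sku.get(row["sku"], 0) + row["qty"]

print(rows)
print(total_qty_by_sku)
```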

    For use cases and a quickstart, see Ingesting and querying semistructured data in Amazon Redshift (preview).

    Partner console integration

    The preview of the native console integration with partners announced at AWS re:Invent 2020 will also make data analysis quicker and easier. Although analysts might not be doing the ETL work themselves, this new release makes it easier to move data from platforms like Salesforce, Google Analytics, and Facebook Ads into Amazon Redshift.

    Matillion, Sisense, Segment, Etleap, and Fivetran are launch partners, with other partners coming soon. If you’re an Amazon Redshift partner and would like to integrate into the console, contact [email protected].

    RA3 nodes with managed storage

    Previously, when you added Amazon Redshift nodes to a cluster, both storage and compute were scaled up. This all changed with the 2019 announcement of RA3 nodes, which let you scale storage and compute independently.

    In 2020, the Amazon Redshift team introduced RA3.xlplus nodes, which offer even more compute sizing options to address a broader set of workload requirements.

    AQUA for Amazon Redshift

    As analysts, we want our queries to run quickly so we can spend more time empowering the users of our insights and less time watching data slowly return. AQUA, the Advanced Query Accelerator for Amazon Redshift, tackles this problem at an infrastructure level by bringing the stored data closer to the compute power.

    This hardware-accelerated cache enables Amazon Redshift to run up to 10 times faster as it scales out and processes data in parallel across many nodes. Each node accelerates compression, encryption, and data processing tasks like scans, aggregates, and filtering. Analysts should still try their best to write efficient code, but the power of AQUA will speed up the return of results considerably.

    AQUA is available on Amazon Redshift RA3 instances at no additional cost. To get started with AQUA, sign up for the preview.

    The following diagram shows Amazon Redshift architecture with an AQUA layer.

    Figure 1: Amazon Redshift architecture with AQUA layer

    Automated performance tuning

    For analysts who haven’t used sort and distribution keys, the learning curve can be steep. A table created with the wrong keys can mean results take much longer to return.

    Automatic table optimization tackles this problem by using machine learning to select the best keys and tune the physical design of tables. Letting Amazon Redshift determine how to improve cluster performance reduces manual effort.

    Summary

    These are just some of the Amazon Redshift announcements made in 2020 to help analysts get query results faster. Some of these features help you get access to the data you need, whether it’s in Amazon Redshift or somewhere else. Others are under-the-hood enhancements that make things run smoothly with less manual effort.

    For more information about these announcements and a complete list of new features, see What’s New in Amazon Redshift.


    About the Author

    Helen Anderson is a Data Analyst based in Wellington, New Zealand. She is well known in the data community for writing beginner-friendly blog posts, teaching, and mentoring those who are new to tech. As a woman in tech and a career switcher, Helen is particularly interested in inspiring those who are underrepresented in the industry.

    Building a real-time notification system with Amazon Kinesis Data Streams for Amazon DynamoDB and Amazon Kinesis Data Analytics for Apache Flink

    Post Syndicated from Saurabh Shrivastava original https://aws.amazon.com/blogs/big-data/building-a-real-time-notification-system-with-amazon-kinesis-data-streams-for-amazon-dynamodb-and-amazon-kinesis-data-analytics-for-apache-flink/

    Amazon DynamoDB helps you capture high-velocity data such as clickstream data to form customized user profiles and Internet of Things (IoT) data so that you can develop insights on sensor activity across various industries, including smart spaces, connected factories, smart packing, fitness monitoring, and more. It’s important to store these data points in a centralized data lake in real time, where they can be transformed, analyzed, and combined with diverse organizational datasets to derive meaningful insights and make predictions.

    A popular use case in the wind energy sector is to protect wind turbines from excessive wind speeds. According to National Wind Watch, every wind turbine has a range of wind speeds, typically 30–55 mph, in which it produces maximum capacity. When wind speed is greater than 70 mph, it’s important to start shutdown to protect the turbine from a high wind storm. Customers often store high-velocity IoT data in DynamoDB and use Amazon Kinesis streaming to extract data and store it in a centralized data lake built on Amazon Simple Storage Service (Amazon S3). To facilitate this ingestion pipeline, you can deploy AWS Lambda functions or write custom code to build a bridge between DynamoDB Streams and Kinesis streaming.

    Amazon Kinesis Data Streams for DynamoDB helps you publish item-level changes in any DynamoDB table to a Kinesis data stream of your choice. Additionally, you can take advantage of this feature for use cases that require longer data retention on the stream and fan out to multiple concurrent stream readers. You also can integrate with Amazon Kinesis Data Analytics or Amazon Kinesis Data Firehose to publish data to downstream destinations such as Amazon Elasticsearch Service, Amazon Redshift, or Amazon S3.

    In this post, you use Kinesis Data Analytics for Apache Flink (Data Analytics for Flink) and Amazon Simple Notification Service (Amazon SNS) to send a real-time notification when wind speed is greater than 60 mph so that the operator can take action to protect the turbine. You use Kinesis Data Streams for DynamoDB and take advantage of managed streaming delivery of DynamoDB data to other AWS services without having to use Lambda or write and maintain complex code. To process DynamoDB events from Kinesis, you have multiple options: Amazon Kinesis Client Library (KCL) applications, Lambda, and Data Analytics for Flink. In this post, we showcase Data Analytics for Flink, but this is just one of many available options.

    Architecture

    The following architecture diagram illustrates the wind turbine protection system.

    In this architecture, high-velocity wind speed data comes from the wind turbine and is stored in DynamoDB. To send an instant notification, you need to query the data in real time and send a notification when the wind speed is greater than the established maximum. To achieve this goal, you enable Kinesis Data Streams for DynamoDB, and then use Data Analytics for Flink to query real-time data in a 60-second tumbling window. This aggregated data is stored in another data stream, which triggers an email notification via Amazon SNS using Lambda when the wind speed is greater than 60 mph. You will build this entire data pipeline in a serverless manner.
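
    The tumbling-window logic described above can be sketched in a few lines of plain Python. This is not the Flink application used in this post: the function name, the use of an average as the per-window aggregate, and the sample readings are all assumptions made for illustration. Readings are grouped into fixed, non-overlapping 60-second windows, and a window is flagged when its average wind speed exceeds 60 mph.

```python
from collections import defaultdict

WINDOW_SECONDS = 60    # tumbling window size from the architecture description
THRESHOLD_MPH = 60.0   # notification threshold from the architecture description


def tumbling_window_alerts(readings, window=WINDOW_SECONDS, threshold=THRESHOLD_MPH):
    """Group (epoch_seconds, wind_speed_mph) readings into fixed windows.

    Returns a sorted list of (window_start, average_speed) for windows whose
    average speed exceeds the threshold. Using the average is an assumption.
    """
    buckets = defaultdict(list)
    for timestamp, speed in readings:
        # Every reading belongs to exactly one window; windows never overlap.
        buckets[timestamp - timestamp % window].append(speed)
    alerts = []
    for start in sorted(buckets):
        average = sum(buckets[start]) / len(buckets[start])
        if average > threshold:
            alerts.append((start, average))
    return alerts


# Invented sample readings: (seconds since start, wind speed in mph).
readings = [(0, 40.0), (30, 50.0), (60, 65.0), (90, 75.0), (120, 55.0)]
print(tumbling_window_alerts(readings))
```

    In the sample data, only the window that starts at second 60 is flagged, because its two readings average 70 mph.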

    Deploying the wind turbine data simulator

    To replicate a real-life scenario, you need a wind turbine data simulator. We use AWS Amplify in this post to deploy a user-friendly web application that can generate the required data and store it in DynamoDB. You need a GitHub account so that you can fork the Amplify app code and deploy it in your AWS account automatically.

    Complete the following steps to deploy the data simulator web application:

    1. Choose the following AWS Amplify link to launch the wind turbine data simulator web app.
    2. Choose Connect to GitHub and provide credentials, if required.
    3. In the Deploy App section, under Select service role, choose Create new role.
    4. Follow the instructions to create the role amplifyconsole-backend-role.
    5. When the role is created, choose it from the drop-down menu.
    6. Choose Save and deploy.

    On the next page, the dynamodb-streaming app is ready to deploy.

    7. Choose Continue.

    On the next page, you can see the app build and deployment progress, which might take as many as 10 minutes to complete.

    8. When the process is complete, choose the URL on the left to access the data generator user interface (UI).
    9. Make sure to save this URL because you will use it in later steps.

    You also get an email during the build process related to your SSH key. This email indicates that the build process created an SSH key on your behalf to connect to the Amplify application with GitHub.

    10. On the sign-in page, choose Create account.
    11. Provide a user name, password, and valid email to which the app can send you a one-time passcode to access the UI.
    12. After you sign in, choose Generate data to generate wind speed data.
    13. Choose the Refresh icon to show the data in the graph.

    You can generate a variety of data by changing the range of minimum and maximum speeds and the number of values.

    To see the data in DynamoDB, choose the DynamoDB icon, note the table name that starts with windspeed-, and navigate to the table in the DynamoDB console.

    Now that the wind speed data simulator is ready, let’s deploy the rest of the data pipeline.

    Deploying the automated data pipeline by using AWS CloudFormation

    You use AWS CloudFormation templates to create all the necessary resources for the data pipeline. This removes opportunities for manual error, increases efficiency, and ensures consistent configurations over time. You can view the template and code in the GitHub repository.

    1. Choose Launch with CloudFormation Console.
    2. Choose the US West (Oregon) Region (us-west-2).
    3. For pEmail, enter a valid email to which the analytics pipeline can send notifications.
    4. Choose Next.
    5. Acknowledge that the template may create AWS Identity and Access Management (IAM) resources.
    6. Choose Create stack.

    This CloudFormation template creates the following resources in your AWS account:

    • An IAM role to provide a trust relationship between Kinesis and DynamoDB to replicate data from DynamoDB to the data stream
    • Two data streams:
      • An input stream to replicate data from DynamoDB
      • An output stream to store aggregated data from the Data Analytics for Flink app
    • A Lambda function
    • An SNS topic to send email notifications about high wind speeds

    7. When the stack is ready, on the Outputs tab, note the values of both data streams.

    Check your email and confirm your subscription to receive notifications. Make sure to check your junk folder if you don’t see the email in your inbox.

    Now you can use Kinesis Data Streams for DynamoDB, which allows you to have your data in both DynamoDB and Kinesis without having to use Lambda or write custom code.

    Enabling Kinesis streaming for DynamoDB

    AWS recently launched Kinesis Data Streams for DynamoDB so that you can send data from DynamoDB to Kinesis Data Streams. You can use the AWS Command Line Interface (AWS CLI) or the AWS Management Console to enable this feature.

    To enable this feature from the console, complete the following steps:

    1. In the DynamoDB console, choose the table that you created earlier (it begins with the prefix windspeed-).
    2. On the Overview tab, choose Manage streaming to Kinesis.
    3. Choose your input stream.
    4. Choose Enable.
    5. Choose Close.

    Make sure that Stream enabled is set to Yes.

    Building the Data Analytics for Flink app for real-time data queries

    The CloudFormation stack deploys the new Data Analytics for Flink application in the configured AWS Region. When the stack is up and running, you should see the application in the Kinesis Data Analytics console for that Region. Choose Run to start the app.

    When your app is running, you should see the following application graph.

    Review the Properties section of the app, which shows you the input and output streams that the app is using.

    The next section walks through the important code snippets of the Flink Java application, which show how the application reads data from a data stream, aggregates the data, and outputs it to another data stream.

    Diving deep into the Flink Java application code

    In the following code, createSourceFromStaticConfig provides all the wind turbine speed readings from the input stream in string format, which we pass to the WindTurbineInputMap map function. This function parses the string into the Tuple3 data type (for example, Tuple3<>(turbineID, speed, 1)); tuples with a count of 0, which the function emits for REMOVE events, are filtered out. All remaining Tuple3 messages are grouped by turbineID to apply a one-minute tumbling window. The AverageReducer reduce function provides two things: the sum of all the speeds for the specific turbineID in the one-minute window, and a count of the messages for the specific turbineID in the one-minute window. The AverageMap map function takes the output of the AverageReducer reduce function and transforms it into Tuple2 (for example, Tuple2<>(turbineID, averageSpeed)). Finally, we keep only the turbineIDs with an average speed greater than 60 and map them to a JSON-formatted message, which we send to the output stream by using the createSinkFromStaticConfig sink function.

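    final StreamExecutionEnvironment env =
       StreamExecutionEnvironment.getExecutionEnvironment();

    // Read the raw wind speed records, as strings, from the input data stream
    DataStream<String> input = createSourceFromStaticConfig(env);

    input.map(new WindTurbineInputMap())   // parse each string into (turbineID, speed, 1)
       .filter(v -> v.f2 > 0)              // drop the placeholder tuples emitted for REMOVE events
       .keyBy(0)                           // group by turbineID
       .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
       .reduce(new AverageReducer())       // per turbine and window: (turbineID, sum of speeds, count)
       .map(new AverageMap())              // (turbineID, average speed)
       .filter(v -> v.f1 > 60)             // keep only averages greater than 60
       .map(v -> "{ \"turbineID\": \"" + v.f0 + "\", \"avgSpeed\": " + v.f1 + " }")
       .addSink(createSinkFromStaticConfig());

    env.execute("Wind Turbine Data Aggregator");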

    The following code demonstrates how the createSourceFromStaticConfig and createSinkFromStaticConfig functions read the input and output stream names from the properties of the Data Analytics for Flink application and establish the source and sink of the streams.

    private static DataStream<String> createSourceFromStaticConfig(
       StreamExecutionEnvironment env) throws IOException {
       Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
       Properties inputProperties = new Properties();
       inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, (String) applicationProperties.get("WindTurbineEnvironment").get("region"));
       inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
    
       return env.addSource(new FlinkKinesisConsumer<>((String) applicationProperties.get("WindTurbineEnvironment").get("inputStreamName"),
          new SimpleStringSchema(), inputProperties));
    }
    
    private static FlinkKinesisProducer<String> createSinkFromStaticConfig() throws IOException {
       Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
       Properties outputProperties = new Properties();
       outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, (String) applicationProperties.get("WindTurbineEnvironment").get("region"));
    
       FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new
          SimpleStringSchema(), outputProperties);
       sink.setDefaultStream((String) applicationProperties.get("WindTurbineEnvironment").get("outputStreamName"));
       sink.setDefaultPartition("0");
       return sink;
    }

    In the following code, the WindTurbineInputMap map function parses Tuple3 out of the string message. Additionally, the AverageMap map and AverageReducer reduce functions process messages to accumulate and transform data.

    public static class WindTurbineInputMap implements MapFunction<String, Tuple3<String, Integer, Integer>> {
       @Override
       public Tuple3<String, Integer, Integer> map(String value) throws Exception {
          String eventName = JsonPath.read(value, "$.eventName");
          // REMOVE events carry no new image, so emit a placeholder with count 0 that is filtered out downstream
          if (eventName.equals("REMOVE")) {
             return new Tuple3<>("", 0, 0);
          }
          String turbineID = JsonPath.read(value, "$.dynamodb.NewImage.deviceID.S");
          Integer speed = Integer.parseInt(JsonPath.read(value, "$.dynamodb.NewImage.value.N"));
          // The trailing 1 is the message count that the reducer adds up
          return new Tuple3<>(turbineID, speed, 1);
       }
    }

    public static class AverageMap implements MapFunction<Tuple3<String, Integer, Integer>, Tuple2<String, Integer>> {
       @Override
       public Tuple2<String, Integer> map(Tuple3<String, Integer, Integer> value) throws Exception {
          // Integer division: the average is truncated to a whole number
          return new Tuple2<>(value.f0, (value.f1 / value.f2));
       }
    }

    public static class AverageReducer implements ReduceFunction<Tuple3<String, Integer, Integer>> {
       @Override
       public Tuple3<String, Integer, Integer> reduce(Tuple3<String, Integer, Integer> value1, Tuple3<String, Integer, Integer> value2) {
          // Add the speeds and the counts of both operands (each incoming reading carries a count of 1)
          return new Tuple3<>(value1.f0, value1.f1 + value2.f1, value1.f2 + value2.f2);
       }
    }
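
    The arithmetic behind the reduce and map steps can be tried out without a Flink cluster. The following plain-Java sketch is only an illustration under stated assumptions: the AverageArithmeticSketch class and its methods are hypothetical, and an int[] {sum, count} pair stands in for the f1 and f2 fields of the Tuple3.

    ```java
    import java.util.Arrays;
    import java.util.List;

    // Illustration only: an int[] {sum, count} pair plays the role of the Tuple3 fields f1 and f2.
    final class AverageArithmeticSketch {
        // Reduce step: add up the speeds and the message counts.
        static int[] reduce(int[] a, int[] b) {
            return new int[] {a[0] + b[0], a[1] + b[1]};
        }

        // Map step: integer division, so any fractional part of the average is dropped.
        static int average(int[] sumAndCount) {
            return sumAndCount[0] / sumAndCount[1];
        }

        // Folds all the speeds of one window into a single average.
        static int windowAverage(List<Integer> speeds) {
            int[] acc = null;
            for (int speed : speeds) {
                int[] next = {speed, 1};   // each reading enters with a count of 1
                acc = (acc == null) ? next : reduce(acc, next);
            }
            if (acc == null) {
                throw new IllegalArgumentException("empty window");
            }
            return average(acc);
        }

        public static void main(String[] args) {
            System.out.println(windowAverage(Arrays.asList(70, 80, 90)));   // 240 / 3
            System.out.println(windowAverage(Arrays.asList(60, 61)));       // 121 / 2, truncated
        }
    }
    ```

    Because the division operates on integers, a window with readings of 60 and 61 yields 60 rather than 60.5, so it would not pass a filter that requires an average greater than 60.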

    Receiving email notifications of high wind speed

    The following screenshot shows an example of the notification email you will receive about high wind speeds.

    To test the feature, in this section you generate high wind speed data from the simulator, which is stored in DynamoDB, and get an email notification when the average wind speed is greater than 60 mph for a one-minute period. You’ll observe wind data flowing through the data stream and Data Analytics for Flink.

    To test this feature:

    1. Generate wind speed data in the simulator and confirm that it’s stored in DynamoDB.
    2. In the Kinesis Data Streams console, choose the input data stream, kds-ddb-blog-InputKinesisStream.
    3. On the Monitoring tab of the stream, you can observe the Get records – sum (Count) metrics, which show multiple records captured by the data stream automatically.
    4. In the Kinesis Data Analytics console, choose the Data Analytics for Flink application, kds-ddb-blog-windTurbineAggregator.
    5. On the Monitoring tab, you can see the Last Checkpoint metrics, which show multiple records captured by the Data Analytics for Flink app automatically.
    6. In the Kinesis Data Streams console, choose the output stream, kds-ddb-blog-OutputKinesisStream.
    7. On the Monitoring tab, you can see the Get records – sum (Count) metrics, which show multiple records output by the app.
    8. Finally, check your email for a notification.

    If you don’t see a notification, change the data simulator value range to a minimum of 50 mph and a maximum of 90 mph and wait a few minutes.

    Conclusion

    As you have learned in this post, you can build an end-to-end serverless analytics pipeline to get real-time insights from DynamoDB by using Kinesis Data Streams—all without writing any complex code. This allows your team to focus on solving business problems by getting useful insights immediately. IoT and application development have a variety of use cases for moving data quickly through an analytics pipeline, and you can make this happen by enabling Kinesis Data Streams for DynamoDB.

    If this blog post helps you or inspires you to solve a problem, we would love to hear about it! The code for this solution is available in the GitHub repository for you to use and extend. Contributions are always welcome!


    About the Authors

    Saurabh Shrivastava is a solutions architect leader and analytics/machine learning specialist working with global systems integrators. He works with AWS partners and customers to provide them with architectural guidance for building scalable architecture in hybrid and AWS environments. He enjoys spending time with his family outdoors and traveling to new destinations to discover new cultures.

    Sameer Goel is a solutions architect in Seattle who drives customers’ success by building prototypes on cutting-edge initiatives. Prior to joining AWS, Sameer graduated with a Master’s degree with a Data Science concentration from NEU Boston. He enjoys building and experimenting with creative projects and applications.

    Pratik Patel is a senior technical account manager and streaming analytics specialist. He works with AWS customers and provides ongoing support and technical guidance to help plan and build solutions by using best practices, and proactively helps keep customers’ AWS environments operationally healthy.

    Accessing and visualizing data from multiple data sources with Amazon Athena and Amazon QuickSight

    Post Syndicated from Saurabh Bhutyani original https://aws.amazon.com/blogs/big-data/accessing-and-visualizing-data-from-multiple-data-sources-with-amazon-athena-and-amazon-quicksight/

    Amazon Athena now supports federated query, a feature that allows you to query data in sources other than Amazon Simple Storage Service (Amazon S3). You can use federated queries in Athena to query the data in place or build pipelines that extract data from multiple data sources and store them in Amazon S3. With Athena Federated Query, you can run SQL queries across data stored in relational, non-relational, object, and custom data sources. Athena queries, including federated queries, can be run from the Athena console, a JDBC or ODBC connection, the Athena API, the Athena CLI, the AWS SDK, or AWS Tools for Windows PowerShell.

    The goal for this post is to discuss how we can use different connectors to run federated queries with complex joins across different data sources with Athena and visualize the data with Amazon QuickSight.

    Athena Federated Query

    Athena uses data source connectors that run on AWS Lambda to run federated queries. A data source connector is a piece of code that translates between your target data source and Athena. You can think of a connector as an extension of the Athena query engine. Prebuilt Athena data source connectors exist for data sources like Amazon CloudWatch Logs, Amazon DynamoDB, Amazon DocumentDB (with MongoDB compatibility), Amazon Elasticsearch Service (Amazon ES), Amazon ElastiCache for Redis, and JDBC-compliant relational data sources such as MySQL, PostgreSQL, and Amazon Redshift under the Apache 2.0 license. You can also use the Athena Query Federation SDK to write custom connectors. After you deploy a data source connector, it is associated with a catalog name that you can specify in your SQL queries. You can combine SQL statements from multiple catalogs and span multiple data sources with a single query.
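
    As a rough illustration of how catalog names appear in a query, the following Java sketch assembles a SQL string that joins a table from each of two catalogs. The catalog names dynamo and mysql are assumed to match the connectors configured for this post. The database, table, and column names are hypothetical placeholders, and the FederatedQuerySketch helper is not part of any Athena API.

    ```java
    // Illustration only: this helper is not part of any Athena API.
    final class FederatedQuerySketch {
        // Wraps an identifier in double quotes. Identifiers that already contain
        // a double quote are rejected to keep the sketch simple.
        static String quote(String identifier) {
            if (identifier.contains("\"")) {
                throw new IllegalArgumentException("unsupported identifier: " + identifier);
            }
            return "\"" + identifier + "\"";
        }

        // Builds a three-part table reference: catalog.database.table.
        static String qualify(String catalog, String database, String table) {
            return quote(catalog) + "." + quote(database) + "." + quote(table);
        }

        // Joins one table from each catalog in a single statement.
        static String joinQuery() {
            String orders = qualify("dynamo", "default", "orders");       // hypothetical table
            String customers = qualify("mysql", "sales", "customer");     // hypothetical table
            return "SELECT c.name, o.order_total"
                + " FROM " + customers + " c"
                + " JOIN " + orders + " o ON o.customer_id = c.customer_id";
        }

        public static void main(String[] args) {
            System.out.println(joinQuery());
        }
    }
    ```

    The resulting string could be submitted through any of the query interfaces listed earlier. The point of the sketch is only that each table reference starts with the catalog name of the connector that serves it.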

    When a query is submitted against a data source, Athena invokes the corresponding connector to identify parts of the tables that need to be read, manages parallelism, and pushes down filter predicates. Based on the user submitting the query, connectors can provide or restrict access to specific data elements. Connectors use Apache Arrow as the format for returning data requested in a query, which enables connectors to be implemented in languages such as C, C++, Java, Python, and Rust. Because connectors run in Lambda, you can use them to access data from any data source in the cloud or on premises that is accessible from Lambda.

    Prerequisites

    Before creating your development environment, you must have the following prerequisites:

    Configuring your data source connectors

    After you deploy your CloudFormation stack, follow the instructions in the post Extracting and joining data from multiple data sources with Athena Federated Query to configure various Athena data source connectors for HBase on Amazon EMR, DynamoDB, ElastiCache for Redis, and Amazon Aurora MySQL.

    You can run Athena federated queries in the AmazonAthenaPreviewFunctionality workgroup created as part of the CloudFormation stack or you could run them in the primary workgroup or other workgroups as long as you’re running with Athena engine version 2. As of this writing, Athena Federated Query is generally available in the Asia Pacific (Mumbai), Asia Pacific (Tokyo), Europe (Ireland), US East (N. Virginia), US East (Ohio), US West (N. California), and US West (Oregon) Regions. If you’re running in other Regions, use the AmazonAthenaPreviewFunctionality workgroup.

    For information about changing your workgroup to Athena engine version 2, see Changing Athena Engine Versions.

    Configuring QuickSight

    The next step is to configure QuickSight to use these connectors to query and visualize data.

    1. On the AWS Management Console, navigate to QuickSight.
    2. If you’re not signed up for QuickSight, you’re prompted with the option to sign up. Follow the steps to sign up to use QuickSight.
    3. After you log in to QuickSight, choose Manage QuickSight under your account.
    4. In the navigation pane, choose Security & permissions.
    5. Under QuickSight access to AWS services, choose Add or remove.

    A page appears for enabling QuickSight access to AWS services.

    6. Choose Athena.
    7. In the pop-up window, choose Next.
    8. On the S3 tab, select the necessary S3 buckets. For this post, I select the athena-federation-workshop-<account_id> bucket and another one that stores my Athena query results.
    9. For each bucket, also select Write permission for Athena Workgroup.
    10. On the Lambda tab, select the Lambda functions corresponding to the Athena federated connectors that Athena federated queries use. If you followed the post Extracting and joining data from multiple data sources with Athena Federated Query when configuring your Athena federated connectors, you can select dynamo, hbase, mysql, and redis.

    For information about registering a data source in Athena, see the appendix in this post.

    11. Choose Finish.
    12. Choose Update.
    13. On the QuickSight console, choose New analysis.
    14. Choose New dataset.
    15. For Datasets, choose Athena.
    16. For Data source name, enter Athena-federation.
    17. For Athena workgroup, choose primary.
    18. Choose Create data source.

    As stated earlier, you can use the AmazonAthenaPreviewFunctionality workgroup or another workgroup as long as you’re running Athena engine version 2 in a supported Region.

    19. For Catalog, choose the catalog that you created for your Athena federated connector.

    For information about creating and registering a data source in Athena, see the appendix in this post.

    20. For this post, I choose the dynamo catalog, which does a federation to the Athena DynamoDB connector.

    I can now see the database and tables listed in QuickSight.

    21. Choose Edit/Preview data to see the data.
    22. Choose Save & Visualize to start using this data for creating visualizations in QuickSight.
    23. To do a join with another Athena data source, choose Add data and select the catalog and table.
    24. Choose the join link between the two datasets and choose the appropriate join configuration.
    25. Choose Apply.

    You should be able to see the joined data.

    Running a query in QuickSight

    Now we use the custom SQL option in QuickSight to run a complex query with multiple Athena federated data sources.

    1. On the QuickSight console, choose New analysis.
    2. Choose New dataset.
    3. For Datasets, choose Athena.
    4. For Data source name, enter Athena-federation.
    5. For the workgroup, choose primary.
    6. Choose Create data source.
    7. Choose Use custom SQL.
    8. Enter the query for ProfitBySupplierNation.
    9. Choose Edit/Preview data.

    Under Query mode, you have the option to view your query in either SPICE or direct query. SPICE is the QuickSight Super-fast, Parallel, In-memory Calculation Engine. It’s engineered to rapidly perform advanced calculations and serve data. Using SPICE can save time and money because your analytical queries process faster, you don’t need to wait for a direct query to process, and you can reuse data stored in SPICE multiple times without incurring additional costs. You can also refresh data in SPICE on a recurring basis as needed or on demand. For more information about refresh options, see Refreshing Data.

    With direct query, QuickSight doesn’t use SPICE data and sends the query every time to Athena to get the data.

    10. Select SPICE.
    11. Choose Apply.
    12. Choose Save & visualize.
    13. On the Visualize page, under Fields list, choose nation and sum_profit.

    QuickSight automatically chooses the best visualization type based on the selected fields. You can change the visual type based on your requirements. The following screenshot shows a pie chart for Sum_profit grouped by Nation.

    You can add more datasets using Athena federated queries and create dashboards. The following screenshot is an example of a visual analysis over various datasets that were added as part of this post.

    When your analysis is ready, you can choose Share to create a dashboard and share it within your organization.

    Summary

    QuickSight is a powerful visualization tool, and with Athena federated queries, you can run analysis and build dashboards on various data sources like DynamoDB, HBase on Amazon EMR, and many more. You can also easily join relational, non-relational, and custom object stores in Athena queries and use them with QuickSight to create visualizations and dashboards.

    For more information about Athena Federated Query, see Using Amazon Athena Federated Query and Query any data source with Amazon Athena’s new federated query.


    Appendix

    To register a data source in Athena, complete the following steps:

    1. On the Athena console, choose Data sources.
    2. Choose Connect data source.
    3. Select Query a data source.
    4. For Choose a data source, select a data source (for this post, I select Redis).
    5. Choose Next.
    6. For Lambda function, choose your function.

    For this post, I use the redis Lambda function, which I configured as part of configuring the Athena federated connector in the post Extracting and joining data from multiple data sources with Athena Federated Query.

    7. For Catalog name, enter a name (for example, redis).

    The catalog name you specify here is the one that is displayed in QuickSight when selecting Lambda functions for access.

    8. Choose Connect.

    When the data source is registered, it’s available in the Data source drop-down list on the Athena console.


    About the Author

    Saurabh Bhutyani is a Senior Big Data Specialist Solutions Architect at Amazon Web Services. He is an early adopter of open-source big data technologies. At AWS, he works with customers to provide architectural guidance for running analytics solutions on Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation. In his free time, he likes to watch movies and spend time with his family.

    Multi-tenant processing pipelines with AWS DMS, AWS Step Functions, and Apache Hudi on Amazon EMR

    Post Syndicated from Francisco Oliveira original https://aws.amazon.com/blogs/big-data/multi-tenant-processing-pipelines-with-aws-dms-aws-step-functions-and-apache-hudi-on-amazon-emr/

    Large enterprises often provide software offerings to multiple customers by providing each customer a dedicated and isolated environment (a software offering composed of multiple single-tenant environments). Because the data is in various independent systems, large enterprises are looking for ways to simplify data processing pipelines. To address this, you can create data lakes to bring your data to a single place.

    Typically, a replication tool such as AWS Database Migration Service (AWS DMS) can replicate the data from your source systems to Amazon Simple Storage Service (Amazon S3). When the data is in Amazon S3, you process it based on your requirements. A typical requirement is to sync the data in Amazon S3 with the updates on the source systems. Although it’s easy to apply updates on a relational database management system (RDBMS) that backs an online source application, it’s tough to apply this change data capture (CDC) process on your data lakes. Apache Hudi is a good way to solve this problem. You can use Hudi on Amazon EMR to create Hudi tables (for more information, see Hudi in the Amazon EMR Release Guide).

    This post introduces a pipeline that loads data and its ongoing changes (change data capture) from multiple single-tenant tables from different databases to a single multi-tenant table in an Amazon S3-backed data lake, simplifying data processing activities by creating multi-tenant datasets.

    Architecture overview

    At a high level, this architecture consolidates multiple single-tenant environments into a single multi-tenant dataset so data processing pipelines can be centralized. For example, suppose that your software offering has two tenants, each with its own dedicated and isolated environment, and you want to maintain a single multi-tenant table that includes data of both tenants. Moreover, you want any ongoing replication (CDC) in the sources for tenant 1 and tenant 2 to be synchronized (compacted or reconciled) when an insert, delete, or update occurs in the source systems of the respective tenant.

    In the past, to support record-level updates or inserts (called upserts) and deletes on an Amazon S3-backed data lake, you relied on either having an Amazon Redshift cluster or an Apache Spark job that reconciled the updates, deletes, and inserts with existing historical data.

    The architecture for our solution uses Hudi to simplify incremental data processing and data pipeline development by providing record-level insert, update, upsert, and delete capabilities. For more information, see Apache Hudi on Amazon EMR.

    Moreover, the architecture for our solution uses the following AWS services:

    • AWS DMS – AWS DMS is a cloud service that makes it easy to migrate relational databases, data warehouses, NoSQL databases, and other types of data stores. For more information, see What is AWS Database Migration Service?
    • AWS Step Functions – AWS Step Functions is a web service that enables you to coordinate the components of distributed applications and microservices using visual workflows. For more information, see What Is AWS Step Functions?
    • Amazon EMR – Amazon EMR is a managed cluster platform that simplifies running big data frameworks, such as Apache Hadoop and Apache Spark, on AWS to process and analyze vast amounts of data. For more information, see Overview of Amazon EMR Architecture and Overview of Amazon EMR.
    • Amazon S3 – Data is stored in Amazon S3, an object storage service with scalable performance, ease-of-use features, and native encryption and access control capabilities. For more details on Amazon S3, see Amazon S3 as the Data Lake Storage Platform.

    Architecture deep dive

    The following diagram illustrates our architecture.

    This architecture relies on AWS Database Migration Service (AWS DMS) to transfer data from specific tables into an Amazon S3 location organized by tenant-id.

    While AWS DMS performs the migration and the ongoing replication, also known as change data capture (CDC), it applies a data transformation that adds a custom column named tenant-id and populates it with the tenant-id value defined in the AWS DMS migration task configuration. The AWS DMS data transformations allow you to modify a schema, table, or column or, in this case, add a column with the tenant-id so data transferred to Amazon S3 is grouped by tenant-id.

    AWS DMS is also configured to add an additional column with timestamp information. For a full load, each row of this timestamp column contains a timestamp for when the data was transferred from the source to the target by AWS DMS. For ongoing replication, each row of the timestamp column contains the timestamp for the commit of that row in the source database.

    We use an AWS Step Functions workflow to move the files AWS DMS wrote to Amazon S3 into an Amazon S3 location that is organized by table name and holds all the tenants’ data. Files in this location all have the new column tenant-id, and the respective tenant-id value is configured in the AWS DMS task configuration.

    Next, the Hudi DeltaStreamer utility runs on Amazon EMR to process the multi-tenant source data and create or update the Hudi dataset on Amazon S3.

    You can pass the Hudi DeltaStreamer utility a field in the data that holds each record’s timestamp. The utility uses this field to ensure records are processed in the proper chronological order. You can also provide the Hudi DeltaStreamer utility one or more SQL transforms, which the utility applies in sequence as records are read and before the datasets are persisted on Amazon S3 as a Hudi Parquet dataset. We highlight the SQL transform later in this post.

    Depending on your downstream consumption patterns, you might require a partitioned dataset. We discuss the process to choose a partition within Hudi DeltaStreamer later in this post. 

    For this post, we use the Hudi DeltaStreamer utility instead of the Hudi DataSource due to its operational simplicity. However, you can also use Hudi DataSource with this pattern.

    When to use and not use this architecture

    This architecture is ideal for the workloads that are processed in batches and can tolerate the latency associated with the time required to capture the changes in the sources, write those changes into objects in Amazon S3, and run the Step Functions workflow that aggregates the objects per tenant and creates the multi-tenant Hudi dataset.

    This architecture uses and applies to Hudi COPY_ON_WRITE tables. This architecture is not recommended for latency-sensitive applications and does not support MERGE_ON_READ tables. For more information, see the section Analyzing the properties provided to the command to run the Hudi DeltaStreamer utility.

    This architecture pattern is also recommended for workloads in which successive updates to the same record (identified by a primary key) are separated by at least a microsecond, so that a timestamp with microsecond precision can tell them apart. The reason behind this is that Hudi uses a field, usually a timestamp, to break ties between records with the same key. If the field used to break ties can capture each individual update, data stored in the Hudi dataset on Amazon S3 is exactly the same as in the source system.

    The precision of timestamp fields in your table depends on the database engine at the source. We strongly recommend that as you evaluate adopting this pattern, you also evaluate the support that AWS DMS provides to your current source engines, and understand the rate of updates in your source and the respective timestamp precision requirements that the source needs to support or currently supports. For example, AWS DMS writes any timestamp column values that are written to Amazon S3 as part of an ongoing replication with second precision if the data source is MySQL, and with microsecond precision if the data source is PostgreSQL. See the section Timestamp precision considerations for additional details.
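
    To see why the precision of the tie-breaking field matters, consider the following minimal sketch. It is not Hudi’s implementation: latest_per_key and truncate_to_seconds are hypothetical helpers, the sample rows are made up, and the rule that the first record seen wins a tie is an assumption chosen only to make the effect visible.

```python
from datetime import datetime


def latest_per_key(changes, ordering_field="timestamp"):
    """Keep, for each key, the change with the greatest ordering value.

    On a tie the change seen first is kept (an assumption for this sketch),
    so arrival order decides which update survives.
    """
    latest = {}
    for change in changes:
        key = (change["tenant_id"], change["line_id"])
        current = latest.get(key)
        if current is None or change[ordering_field] > current[ordering_field]:
            latest[key] = change
    return latest


def truncate_to_seconds(change):
    """Simulate a source whose timestamps only have second precision."""
    return {**change, "timestamp": change["timestamp"].replace(microsecond=0)}


# Two updates to the same record, 400 microseconds apart, arriving out of order.
changes = [
    {"tenant_id": "1502", "line_id": 7, "quantity": 9,
     "timestamp": datetime(2020, 9, 1, 12, 0, 0, 500)},
    {"tenant_id": "1502", "line_id": 7, "quantity": 5,
     "timestamp": datetime(2020, 9, 1, 12, 0, 0, 100)},
]

micro = latest_per_key(changes)
second = latest_per_key([truncate_to_seconds(c) for c in changes])
reordered = latest_per_key([truncate_to_seconds(c) for c in reversed(changes)])

print(micro[("1502", 7)]["quantity"])
print(second[("1502", 7)]["quantity"], reordered[("1502", 7)]["quantity"])
```

    With microsecond timestamps, the later update (quantity 9) is kept regardless of arrival order. Once the timestamps are truncated to seconds, the two updates tie, and the surviving value depends on which change happened to be read first.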

    This architecture assumes that all tables in every source database have the same schema and that any changes to a table’s schema are applied to each data source at the same time. Moreover, this architecture pattern assumes that any schema changes are backward compatible: you only append new fields and don’t delete any existing fields. Hudi supports schema evolutions that are backward compatible.

    If you’re expecting constant schema changes to the sources, it might be beneficial to consider performing full snapshots instead of ingesting and processing the CDC. If performing full snapshots isn’t practical and you are expecting constant schema changes that are not compatible with Hudi’s schema evolution support, you can use the Hudi DataWriter API with your Spark jobs and address schema changes within the code by casting and adding columns as required to keep backward compatibility.

    See the Schema evolution section for more details on the process for schema evolution with AWS DMS and Hudi.

    Although it’s out of scope to evaluate the consumption tools available downstream to the Hudi dataset, you can consume Hudi datasets stored on Amazon S3 from Apache Hive, Spark, and Presto on Amazon EMR. Moreover, you can consume Hudi datasets stored on Amazon S3 from Amazon Redshift Spectrum and Amazon Athena.

    Solution overview

    This solution uses an AWS CloudFormation template to create the necessary resources.

    You trigger the Step Functions workflow via the AWS Management Console. The workflow uses AWS Lambda for processing tasks that are part of the logic. Moreover, the workflow submits jobs to an EMR cluster configured with Hudi.

    To perform the database migration, the CloudFormation template deploys one AWS DMS replication instance and configures two AWS DMS replication tasks, one per tenant. The AWS DMS replication tasks connect to the source data stores, read the source data, apply any transformations, and load the data into the target data store.

    You access an Amazon SageMaker notebook to generate changes (updates) to the sources. Moreover, you connect into the Amazon EMR master node via AWS Systems Manager Session Manager to run Hive or Spark queries in the Hudi dataset backed by Amazon S3. Session Manager provides secure and auditable instance management without the need to open inbound ports, maintain bastion hosts, or manage SSH keys.

    The following diagram illustrates the solution architecture.

    The orchestration in this demo code currently supports processing at most 25 sources (tables within a database or distributed across multiple databases) per run. It doesn’t keep track of the tenant-id, database-name, and table-name triplets that are being processed or were already processed, so it doesn’t prevent concurrent runs for the same triplet. Preventing concurrent runs avoids duplication of work. Moreover, the orchestration in this demo code doesn’t prevent the Hudi DeltaStreamer job from running with the output of both an AWS DMS full load task and an AWS DMS CDC load task. For production environments, we recommend that you keep track of the existing tenant_id values in the multi-tenant Hudi dataset. This way, if an existing AWS DMS replication task is mistakenly restarted to perform a full load instead of continuing the ongoing replication, your solution can prevent any downstream impact to the datasets. We also recommend that you keep track of the schema changes in the source and guarantee that the Hudi DeltaStreamer utility only processes files with the same schema.
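
    One way to picture the tracking of triplets that a production setup would need is the following sketch. It is an in-memory illustration only: RunRegistry, try_claim, and release are hypothetical names, and a real deployment would need a durable, shared store (for example, a database table with conditional writes) instead of a Python set.

```python
import threading


class RunRegistry:
    """In-memory stand-in for a durable store of in-flight runs."""

    def __init__(self):
        self._lock = threading.Lock()
        self._in_flight = set()

    def try_claim(self, tenant_id, database_name, table_name):
        """Return True if the triplet was free and is now claimed."""
        triplet = (tenant_id, database_name, table_name)
        with self._lock:
            if triplet in self._in_flight:
                return False
            self._in_flight.add(triplet)
            return True

    def release(self, tenant_id, database_name, table_name):
        """Mark the triplet as no longer being processed."""
        with self._lock:
            self._in_flight.discard((tenant_id, database_name, table_name))


registry = RunRegistry()
first = registry.try_claim("1502", "salesdb", "sales_order_detail")
duplicate = registry.try_claim("1502", "salesdb", "sales_order_detail")
other_tenant = registry.try_claim("1501", "salesdb", "sales_order_detail")
registry.release("1502", "salesdb", "sales_order_detail")
after_release = registry.try_claim("1502", "salesdb", "sales_order_detail")

print(first, duplicate, other_tenant, after_release)
```

    A workflow built along these lines would claim each triplet before copying files and submitting the Hudi job, skip the triplets it could not claim, and release each claim once the run finishes or fails.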

    For details on considerations related to the Step Functions workflows, see Best Practices for Step Functions. For more information about considerations when running AWS DMS at scale, see Best practices for AWS Database Migration Service. Finally, for details on how to tune Hudi, see Performance and Tuning Guide.

    Next, we walk you through several key areas of the solution.

    Prerequisites

    Before getting started, you must create an S3 bucket, unzip and upload the blog artifacts to the S3 bucket, and store the database passwords in AWS Systems Manager Parameter Store.

    Creating and storing admin passwords in AWS Systems Manager Parameter Store

    This solution uses AWS Systems Manager Parameter Store to store the passwords used in the configuration scripts. With Parameter Store, you can create secure string parameters, which are parameters that have a plaintext parameter name and an encrypted parameter value. Parameter Store uses AWS Key Management Service (AWS KMS) to encrypt and decrypt the parameter values of secure string parameters. With Parameter Store, you improve your security posture by separating your data from your code and by controlling and auditing access at granular levels. There is no charge from Parameter Store to create a secure string parameter, but charges for using AWS KMS do apply. For information, see AWS Key Management Service pricing.

    Before deploying the CloudFormation templates, run the following AWS Command Line Interface (AWS CLI) commands. These commands create Parameter Store parameters to store the passwords for the RDS master user for each tenant.

    aws ssm put-parameter --name "/HudiStack/RDS/Tenant1Settings" --type SecureString --value "ch4ng1ng-s3cr3t" --region Your-AWS-Region
    
    aws ssm put-parameter --name "/HudiStack/RDS/Tenant2Settings" --type SecureString --value "ch4ng1ng-s3cr3t" --region Your-AWS-Region

    AWS DMS isn’t integrated with Parameter Store, so you still need to set the same password as in the CloudFormation template parameter DatabasePassword (see the following section).

    Creating an S3 bucket for the solution and uploading the solution artifacts to Amazon S3

    This solution uses Amazon S3 to store all artifacts used in the solution. Before deploying the CloudFormation templates, create an Amazon S3 bucket and download the artifacts required by the solution.

    Unzip the artifacts and upload all folders and files in the .zip file to the S3 bucket you just created.

    The following screenshot uses the root location hudistackbucket.

    Keep a record of the Amazon S3 root path because you add it as a parameter to the CloudFormation template later.

    Creating the CloudFormation stack

    To launch the entire solution, choose Launch Stack:

    The template requires the following parameters. You can accept the default values for any parameters not in the table. For the full list of parameters, see the CloudFormation template.

    • S3HudiArtifacts – The bucket name that holds the solution artifacts (Lambda function code, Amazon EMR step artifacts, Amazon SageMaker notebook, Hudi job configuration file template). You created this bucket in the previous step. For this post, we use hudistackbucket.
    • DatabasePassword – The database password. This value needs to be the same as the one configured via Parameter Store. The CloudFormation template uses this value to configure the AWS DMS endpoints.
    • emrLogUri – The Amazon S3 location to store Amazon EMR cluster logs. For example, s3://replace-with-your-bucket-name/emrlogs/.

    Testing database connectivity

    To test connectivity, complete the following steps:

    1. On the Amazon SageMaker Console, choose Notebook instances.
    2. Locate the notebook instance you created and choose Open Jupyter.
    3. In the new window, choose Runmev5.ipynb.

    This opens the notebook for this post. We use the notebook to generate changes and updates to the databases used in the post.

    4. Run all cells of the notebook until the section Triggering the AWS DMS full load tasks for tenant 1 and tenant 2.

    Analyzing the AWS DMS configuration

    In this section, we examine the data transformation configuration and other AWS DMS configurations.

    Data transformation configuration

    To support the conversion from single-tenant to multi-tenant pipelines, the CloudFormation template applied a data transformation to the AWS DMS replication task. Specifically, the data transformation adds a new column named tenant_id to the Amazon S3 AWS DMS target. Adding the tenant_id column helps downstream activities organize the datasets per tenant_id. For additional details on how to set up AWS DMS data transformation, see Transformation Rules and Actions. For reference, the following code is the data transformation we use for this post:

    {
        "rules": [
            {
                "rule-type": "selection",
                "rule-id": "1",
                "rule-name": "1",
                "object-locator": {
                    "schema-name": "salesdb",
                    "table-name": "sales_order_detail"
                },
                "rule-action": "include",
                "filters": []
            },
            {
                "rule-type": "transformation",
                "rule-id": "2",
                "rule-name": "2",
                "rule-action": "add-column",
                "rule-target": "column",
                "object-locator": {
                    "schema-name": "salesdb",
                    "table-name": "sales_order_detail"
                },
                "value": "tenant_id",
                "expression": "1502",
                "data-type": {
                    "type": "string",
                    "length": 50
                }
            }
        ]
    }
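
    Because the replication tasks differ only in the tenant ID, a mapping document like the one above can be generated per tenant rather than written by hand. The following is a minimal sketch under that assumption; build_table_mappings is a hypothetical helper and not part of the solution artifacts.

```python
import json


def build_table_mappings(tenant_id, schema_name="salesdb",
                         table_name="sales_order_detail"):
    """Return table mappings that select one table and add a tenant_id column."""
    locator = {"schema-name": schema_name, "table-name": table_name}
    return {
        "rules": [
            {
                "rule-type": "selection",
                "rule-id": "1",
                "rule-name": "1",
                "object-locator": dict(locator),
                "rule-action": "include",
                "filters": [],
            },
            {
                "rule-type": "transformation",
                "rule-id": "2",
                "rule-name": "2",
                "rule-action": "add-column",
                "rule-target": "column",
                "object-locator": dict(locator),
                "value": "tenant_id",
                # The expression is the constant written into every row.
                "expression": str(tenant_id),
                "data-type": {"type": "string", "length": 50},
            },
        ]
    }


# One mapping document per tenant, keyed by tenant ID.
mappings = {tenant: build_table_mappings(tenant) for tenant in ("1501", "1502")}
print(json.dumps(mappings["1502"], indent=4))
```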

    Other AWS DMS configurations

    When using Amazon S3 as a target, AWS DMS accepts several configuration settings that provide control on how the files are written to Amazon S3. Specifically, for this use case, AWS DMS uses Parquet as the value for the configuration property DataFormat. For additional details on the S3 settings used by AWS DMS, see S3Settings. For reference, we use the following code:

    DataFormat=parquet;TimestampColumnName=timestamp;

    Timestamp precision considerations

    By default, AWS DMS writes timestamp columns in a Parquet format with a microsecond precision, should the source engine support that precision. If the rate of updates you’re expecting is high, it’s recommended that you use a source that has support for microsecond precision, such as PostgreSQL.

    Moreover, if the rate of updates is high, you might want to use a data source with microsecond precision and the AR_H_TIMESTAMP internal header column, which captures the timestamp of when the changes were made instead of the timestamp indicating the time of the commit. See Replicating source table headers using expressions for more details, specifically the details on the AR_H_TIMESTAMP internal header column. When you set TimestampColumnName=timestamp as we mention earlier, the new timestamp column captures the time of the commit.

    If you need to use the AR_H_TIMESTAMP internal header column with a data source that supports microsecond precision such as PostgreSQL, we recommend using the Hudi DataSource writer job instead of the Hudi DeltaStreamer utility. The reason for this is that although the AR_H_TIMESTAMP internal header column (in a source that supports microsecond precision) has microsecond precision, the actual value written by AWS DMS on Amazon S3 has a nanosecond format (microsecond precision with the nanosecond dimension set to 0). By using the Hudi DataSource writer job, you can convert the AR_H_TIMESTAMP internal header column to a timestamp datatype in Spark with microsecond precision and use that new value as the PRECOMBINE_FIELD_OPT_KEY, the field Hudi uses to break ties between records with the same key. See Datasource Writer for more details.

    Triggering the AWS DMS full load tasks for Tenant 1 and Tenant 2

    In this step, we run a full load of data from both databases to Amazon S3 using AWS DMS. To accomplish this, perform the following steps:

    1. On the AWS DMS console, under Migration, choose Database migration tasks.
    2. Select the replication task for Tenant 1 (dmsreplicationtasksourcetenant1-xxxxxxxxxxxxxxx).
    3. From the Actions menu, choose Restart/Resume.
    4. Repeat these steps for the Tenant 2 replication task.

    You can monitor the progress of this task by choosing the task link.

    Triggering the Step Functions workflow

    Next, we start a Step Functions workflow that automates the end-to-end process of processing the files loaded by AWS DMS to Amazon S3 and creating a multi-tenant Amazon S3-backed table using Hudi.

    To trigger the Step Functions workflow, perform the following steps:

    1. On the Step Functions console, choose State machines.
    2. Choose the MultiTenantProcessing workflow.
    3. In the new window, choose Start execution.
    4. Edit the following JSON code and replace the values as needed. You can find the emrClusterId on the Outputs tab of the CloudFormation stack.
    {
        "hudiConfig": {
            "emrClusterId": "[REPLACE]",
            "targetBasePath": "s3://hudiblog-[REPLACE-WITH-YOUR-ACCOUNT-ID]/transformed/multitenant/huditables/sales_order_detail_mt",
            "targetTable": "sales_order_detail_mt",
            "sourceOrderingField": "timestamp",
            "blogArtifactBucket": "[REPLACE-WITH-BUCKETNAME-WITH-BLOG-ARTIFACTS]",
            "configScriptPath": "s3://[REPLACE-WITH-BUCKETNAME-WITH-BLOG-ARTIFACTS]/emr/copy_apache_hudi_deltastreamer_command_properties.sh",
            "propertiesFilename": "dfs-source.properties"
        },
        "copyConfig": {
            "srcBucketName": "hudiblog-[REPLACE-WITH-YOUR-ACCOUNT-ID]",
            "srcPrefix": "raw/singletenant/",
            "destBucketName": "hudiblog-[REPLACE-WITH-YOUR-ACCOUNT-ID]",
            "destPrefix": "raw/multitenant/salesdb/sales_order_detail_mt/"
        },
        "sourceConfig": {
            "databaseName": "salesdb",
            "tableName": "sales_order_detail"
        },
        "workflowConfig": {
            "ddbTableName": "WorkflowTimestampRegister",
            "ddbTimestampFieldName": "T"
        },
        "tenants": {
            "array": [
                {
                    "tenantId": "1502"
                },
                {
                    "tenantId": "1501"
                }
            ]
        }
    }
    5. Submit the edited JSON as the input to the workflow.

    If you scroll down, you should see an ExecutionSucceeded message in the last line of the event history (see the following screenshot).

    6. On the Amazon S3 console, search for the bucket name used in this post (hudiblog-[account-id]) and then for the prefix raw/multitenant/salesdb/sales_order_detail_mt/.

    You should see two files.

    7. Navigate to the prefix transformed/multitenant/huditables/sales_order_detail_mt/, which is the targetBasePath in the workflow input.

    You should see the Hudi table in Parquet format.
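
    The JSON input submitted to the workflow repeats the account ID and the artifact bucket in several places, so it can help to assemble it from a few parameters. The following sketch does that; build_workflow_input is a hypothetical helper, and the account ID and cluster ID in the example call are placeholders, not real values.

```python
import json


def build_workflow_input(account_id, artifact_bucket, emr_cluster_id, tenant_ids,
                         properties_filename="dfs-source.properties"):
    """Assemble the state machine input used in this post from a few parameters."""
    data_bucket = f"hudiblog-{account_id}"
    target_table = "sales_order_detail_mt"
    return {
        "hudiConfig": {
            "emrClusterId": emr_cluster_id,
            "targetBasePath": (
                f"s3://{data_bucket}/transformed/multitenant/huditables/{target_table}"
            ),
            "targetTable": target_table,
            "sourceOrderingField": "timestamp",
            "blogArtifactBucket": artifact_bucket,
            "configScriptPath": (
                f"s3://{artifact_bucket}/emr/"
                "copy_apache_hudi_deltastreamer_command_properties.sh"
            ),
            "propertiesFilename": properties_filename,
        },
        "copyConfig": {
            "srcBucketName": data_bucket,
            "srcPrefix": "raw/singletenant/",
            "destBucketName": data_bucket,
            "destPrefix": f"raw/multitenant/salesdb/{target_table}/",
        },
        "sourceConfig": {"databaseName": "salesdb", "tableName": "sales_order_detail"},
        "workflowConfig": {
            "ddbTableName": "WorkflowTimestampRegister",
            "ddbTimestampFieldName": "T",
        },
        "tenants": {"array": [{"tenantId": str(t)} for t in tenant_ids]},
    }


# Placeholder values; replace them with the ones from your own account.
workflow_input = build_workflow_input(
    account_id="111122223333",
    artifact_bucket="hudistackbucket",
    emr_cluster_id="j-EXAMPLE",
    tenant_ids=["1502", "1501"],
)
print(json.dumps(workflow_input, indent=2))
```

    The printed document can be pasted into the Start execution dialog. Passing a different properties_filename produces the input for a run that uses another configuration file.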

    Analyzing the properties provided to the command to run the Hudi DeltaStreamer utility

    If the MultiTenantProcessing workflow was successful, the files that AWS DMS loaded into Amazon S3 are now available in a multi-tenant table on Amazon S3. This table is now ready to process changes to the databases for each tenant.

    In this step, we go over the command the workflow triggers to create a table with Hudi.

    The Step Functions workflow for this post runs all the steps except the tasks in the Amazon SageMaker notebook that you trigger. The following section is just for your reference and discussion purposes.

    On the Amazon EMR console, choose the cluster created by the CloudFormation stack and choose the Steps view of the cluster to obtain the full command used by the workflow.

    The command has two sets of properties: one defined directly in the command, and another defined on a configuration file, dfs-source.properties, which is updated automatically by the Step Functions workflow.

    The following are some of the properties defined directly in the command:

    • --table-type – The Hudi table type, for this use case, COPY_ON_WRITE. COPY_ON_WRITE is preferred for this use case because the ingestion is done in batch mode, access to the changes in the data is not required in real time, and the downstream workloads are read-heavy. Moreover, with this storage type, you don’t need to handle compactions because updates create a new Parquet file with the impacted rows being updated. Given that the ingestion is done in batch mode, using the COPY_ON_WRITE table type efficiently keeps track of the latest record change, triggering a new write to update the Hudi dataset with the latest value of the record.
      • This post requires that you use the COPY_ON_WRITE table type.
      • For reference, if your requirement is to ingest write- or change-heavy workloads and make the changes available as fast as possible for downstream consumption, Hudi provides the MERGE_ON_READ table type. In this table type, data is stored using a combination of columnar (Parquet) and row-based (Avro) formats. Updates are logged to row-based delta files and are compacted as needed to create new versions of the columnar files. For more details on the two table types provided by Hudi, see Understanding Dataset Storage Types: Copy on Write vs. Merge on Read and Considerations and Limitations for Using Hudi on Amazon EMR.
    • --source-ordering-field – The field in the source dataset that the utility uses to order the records. For this use case, we configured AWS DMS to add a timestamp column to the data. The utility uses that column to order the records and break ties between records with the same key. This field needs to exist in the data source and can’t be the result of a transformation.
    • --source-class – AWS DMS writes to the Amazon S3 destination in Apache Parquet. Use org.apache.hudi.utilities.sources.ParquetDFSSource as the value for this property.
    • --target-base-path – The destination base path that the utility writes to.
    • --target-table – The name of the table the utility writes to.
    • --transformer-class – This property indicates the transformer classes that the utility applies to the input records. For this use case, we use the AWSDmsTransformer plus the SqlQueryBasedTransformer. The transformers are applied in the order they are identified in this property.
    • --payload-class – Set to org.apache.hudi.payload.AWSDmsAvroPayload.
    • --props – The path of the file with additional configurations. For example, file:///home/hadoop/dfs-source.properties.
      • The file /home/hadoop/dfs-source.properties has additional configurations passed to Hudi DeltaStreamer. You can view that file by logging in to your Amazon EMR master node and running cat /home/hadoop/dfs-source.properties.
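
    To make the list above concrete, the following sketch assembles those options into an argument list. It covers only the options discussed in this section and deliberately leaves out the spark-submit prefix, which you can read from the Steps view. build_deltastreamer_args is a hypothetical helper, the fully qualified transformer class names are an assumption based on the Hudi utilities package, and the account ID in the example is a placeholder.

```python
import shlex


def build_deltastreamer_args(target_base_path, target_table,
                             source_ordering_field="timestamp",
                             props="file:///home/hadoop/dfs-source.properties"):
    """Return the DeltaStreamer options discussed above as an argument list."""
    # Transformers run in the order listed: the AWS DMS transformer first,
    # then the SQL transform defined in the properties file.
    transformers = ",".join([
        "org.apache.hudi.utilities.transform.AWSDmsTransformer",
        "org.apache.hudi.utilities.transform.SqlQueryBasedTransformer",
    ])
    return [
        "--table-type", "COPY_ON_WRITE",
        "--source-ordering-field", source_ordering_field,
        "--source-class", "org.apache.hudi.utilities.sources.ParquetDFSSource",
        "--target-base-path", target_base_path,
        "--target-table", target_table,
        "--transformer-class", transformers,
        "--payload-class", "org.apache.hudi.payload.AWSDmsAvroPayload",
        "--props", props,
    ]


args = build_deltastreamer_args(
    target_base_path=(
        "s3://hudiblog-111122223333/transformed/multitenant/huditables/"
        "sales_order_detail_mt"
    ),
    target_table="sales_order_detail_mt",
)
# Pair each flag with its value for easy inspection.
options = dict(zip(args[0::2], args[1::2]))
print(shlex.join(args))
```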

    The following code is the configuration file. By setting the properties in the file, we configure Hudi to write the dataset in a partitioned way and to apply a SQL transform before persisting the dataset into the Amazon S3 location.

    ===Start of Configuration File ===
    hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
    hoodie.datasource.write.partitionpath.field=tenant_id,year,month
    hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a.line_id, a.line_number, a.order_id, a.product_id,  a.quantity, a.unit_price, a.discount, a.supply_cost, a.tax, string(year(to_date(a.order_date))) as year, string(month(to_date(a.order_date))) as month, a.Op, a.tenant_id FROM <SRC> a
    hoodie.datasource.write.recordkey.field=tenant_id,line_id
    hoodie.datasource.write.hive_style_partitioning=true
    # DFS Source
    hoodie.deltastreamer.source.dfs.root=s3://hudiblog-xxxxxxxxxxxxxx/raw/multitenant/salesdb/sales_order_detail_mt/xxxxxxxxxxxxxxxxxxxxx
    ===End of Configuration File ===

    Some configurations in this file include the following:

    • hoodie.datasource.write.operation=upsert – This property defines whether the write operation is an insert, upsert, or bulk_insert. It doesn’t appear in the file because the default value, upsert, is accepted for this post, where the dataset is small. If you have a large initial import, use bulk_insert to load new data into a table, and use upsert or insert on the next loads. For more details on the three modes, see Write Operations.
    • hoodie.datasource.write.hive_style_partitioning=true – This property generates Hive-style partitioning, with partitions of the form partition_key=partition_value. See the property hoodie.datasource.write.partitionpath.field for more details.
    • hoodie.datasource.write.partitionpath.field=tenant_id,year,month – This property identifies the fields that Hudi uses to extract the partition fields.
    • hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator – This property allows you to combine multiple fields and use the combination of fields as the record key and partition key.
    • hoodie.datasource.write.recordkey.field=tenant_id,line_id – This property indicates the fields in the dataset that Hudi uses to identify a record key. The source table has line_id as the primary key. Given that the Hudi dataset is multi-tenant, tenant_id is also part of the record key.
    • hoodie.deltastreamer.source.dfs.root=s3://hudiblog-your-account-id/raw/multitenant/salesdb/sales_order_detail_mt/xxxxxxxxx – This property indicates the Amazon S3 location with the source files that the Hudi DeltaStreamer utility consumes. This is the location that the MultiTenantProcessing state machine created and includes files from both tenants.
    • hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a.line_id, a.line_number, a.order_id, a.product_id, a.quantity, a.unit_price, a.discount, a.supply_cost, a.tax, string(year(to_date(a.order_date))) as year, string(month(to_date(a.order_date))) as month, a.Op, a.tenant_id FROM <SRC> a – This property indicates that the Hudi DeltaStreamer applies a SQL transform before writing the records as a Hudi dataset in Amazon S3. For this use case, we create the new fields year and month from the RDBMS table’s field order_date.
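
    The following sketch shows, in plain Python, how the properties above combine for a single row: the SQL transform derives year and month, the record key fields identify the row, and the partition fields decide where it lands. It only illustrates the configuration and does not call Hudi. The exact key string that ComplexKeyGenerator produces is an assumption here, as are the sample row and the ISO date format of order_date.

```python
from datetime import date

RECORD_KEY_FIELDS = ["tenant_id", "line_id"]
PARTITION_FIELDS = ["tenant_id", "year", "month"]


def apply_sql_transform(row):
    """Mimic the year and month columns derived from order_date."""
    order_date = date.fromisoformat(row["order_date"])
    out = dict(row)
    out["year"] = str(order_date.year)
    out["month"] = str(order_date.month)
    return out


def record_key(row):
    """Combine the key fields as field:value pairs (assumed key format)."""
    return ",".join(f"{field}:{row[field]}" for field in RECORD_KEY_FIELDS)


def partition_path(row, hive_style=True):
    """Build the partition path, optionally in Hive style (key=value)."""
    parts = [
        f"{field}={row[field]}" if hive_style else str(row[field])
        for field in PARTITION_FIELDS
    ]
    return "/".join(parts)


row = apply_sql_transform(
    {"tenant_id": "1502", "line_id": 42, "order_date": "2020-09-15"}
)
print(record_key(row))
print(partition_path(row))
print(partition_path(row, hive_style=False))
```

    Because tenant_id is part of both the record key and the partition path, two tenants can reuse the same line_id without colliding, and each tenant’s data sits under its own top-level prefix.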

    A change to the schema of the source RDBMS tables requires the respective update to the SQL transformations. As mentioned before, this use case requires that schema changes to a source schema occur in every table for every tenant.

    For additional details on Hudi, see Apply record level changes from relational databases to Amazon S3 data lake using Hudi on Amazon EMR and AWS DMS.

    Although the Hudi multi-tenant table is partitioned, you should only have one job (Hudi DeltaStreamer utility or Spark data source) writing to the Hudi dataset. If you’re expecting specific tenants to produce more changes than others, you can consider prioritizing some tenants over others or use dedicated tables for the most active tenants to avoid any impact to tenants that produce a smaller amount of changes.

    Schema evolution

    Hudi supports schema evolutions that are backward compatible—you only append new fields and don’t delete any existing fields.

    By default, Hudi handles schema changes of this type by appending new fields to the end of the table.

    New fields that you add have to either be nullable or have a default value. For example, as you add a new field to the source database, the records that generate a change have a value for that field in the Hudi dataset, but older records have a null value for that same field in the dataset.

    If you require schema changes that are not supported by Hudi, you need to use either a SQL transform or the Hudi DataSource API to handle those changes before writing to the Hudi dataset. For example, if you need to delete a field from the source, you need to use a SQL transform before writing the Hudi dataset to ensure the deleted column is populated by a constant or dummy value, or use the Hudi DataSource API to do so.
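
    The rules above (append only, no deletions, new fields nullable or with a default) can be checked before a new schema reaches the pipeline. The following is a minimal sketch of such a check. schema_problems is a hypothetical helper, and the dictionaries are a simplified stand-in for a real schema definition.

```python
def schema_problems(old_fields, new_fields):
    """Return reasons why new_fields is not an append-only evolution of old_fields."""
    problems = []
    old_names = [field["name"] for field in old_fields]
    new_names = [field["name"] for field in new_fields]

    removed = [name for name in old_names if name not in new_names]
    if removed:
        problems.append(f"removed fields: {', '.join(removed)}")
    elif new_names[:len(old_names)] != old_names:
        problems.append("new fields must be appended after the existing fields")

    for field in new_fields:
        if field["name"] in old_names:
            continue
        # A new field needs to be nullable or to carry a default value.
        if not field.get("nullable", False) and "default" not in field:
            problems.append(
                f"new field {field['name']} must be nullable or have a default"
            )
    return problems


old_schema = [{"name": "line_id"}, {"name": "quantity"}, {"name": "tenant_id"}]
appended = old_schema + [{"name": "critical", "nullable": True}]
dropped = [{"name": "line_id"}, {"name": "tenant_id"}]
required = old_schema + [{"name": "critical", "nullable": False}]

print(schema_problems(old_schema, appended))
print(schema_problems(old_schema, dropped))
print(schema_problems(old_schema, required))
```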

    Moreover, AWS DMS with an Amazon S3 target supports only the following DDL commands: Truncate Table, Drop Table, and Create Table. See Limitations to using Amazon S3 as a target for more details.

    This means that when you issue an Alter Table command, the AWS DMS replication tasks don’t capture those changes until you restart the task.

    As you implement this architecture pattern, it’s recommended that you automate the schema evolution process and apply the schema changes in a maintenance window when the source isn’t serving requests and the AWS DMS replication CDC tasks aren’t running.

    Simulating random updates to the source databases

    In this step, you perform some random updates to the data. Navigate back to the Runmev5.ipynb Jupyter notebook and run the cells under the section Simulate random updates for tenant 1 and tenant 2.

    Triggering the Step Functions workflow to process the ongoing replication

    In this step, you rerun the MultiTenantProcessing workflow to process the files generated during the ongoing replication.

    1. On the Step Functions console, choose State machines.
    2. Choose the MultiTenantProcessing workflow.
    3. In the new window, choose Start execution.
    4. Use the same JSON as the JSON used when you first ran the workflow.
    5. Submit the JSON as the input to the workflow.

    Querying the Hudi multi-tenant table with Spark

    To query your table with Spark, complete the following steps:

    1. On the Session Manager console, select the instance HudiBlog Spark EMR Cluster.
    2. Choose Start session.

    Session Manager creates a secure session to the Amazon EMR master node.

    3. Switch to the Hadoop user and then to the home directory of the Hadoop user.

    You’re now ready to run the commands described in the next sections.

    4. Run the following command in the command line:
      spark-shell  --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.hive.convertMetastoreParquet=false" --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar 

     

    5. When the spark-shell is available, enter the following code:
      import scala.collection.JavaConversions._
      import org.apache.spark.sql.SaveMode._
      import org.apache.hudi.DataSourceReadOptions._

      val tableName = "sales_order_detail_mt"

      // Base path of the Hudi table, without a trailing slash.
      val basePath = s"s3://hudiblog-[REPLACE-WITH-YOUR-ACCOUNT-ID]/transformed/multitenant/huditables/$tableName"

      // Three partition levels (tenant_id, year, month) plus the data files.
      spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*").createOrReplaceTempView(tableName)

      val sqlDF = spark.sql(s"SELECT * FROM $tableName")
      sqlDF.show(20, false)

    Updating the source table schema

    In this section, you change the schema of the source tables.

    1. On the AWS DMS management console, stop the AWS DMS replication tasks for Tenant 1 and Tenant 2.
    2. Make sure the Amazon SageMaker notebook isn’t writing to the database.
    3. Navigate back to the Jupyter notebook and run the cells under the section Updating the source table schema.
    4. On the AWS DMS console, resume (not restart) the AWS DMS replication tasks for both tenants.
    5. Navigate back to the Jupyter notebook and run the cells under the section Simulate random updates for tenant 1 after the schema change.

    Analyzing the changes to the configuration file for the Hudi DeltaStreamer utility

    Because there was a change to the schema of the sources and a new field needs to be propagated to the Hudi dataset, the property hoodie.deltastreamer.transformer.sql is updated with the following value:

    hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a.line_id, a.line_number, a.order_id, a.product_id,  a.quantity, a.unit_price, a.discount, a.supply_cost, a.tax, string(year(to_date(a.order_date))) as year, string(month(to_date(a.order_date))) as month, a.Op, a.tenant_id, a.critical FROM <SRC> a

    The field a.critical is added to the end, after a.tenant_id.
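
    Because new columns always go at the end of the select list, the property value can be generated from the original list plus the appended columns. The following sketch illustrates that idea; transformer_sql_property and SELECT_LIST are hypothetical names, and the solution itself ships a separate properties file instead of generating one.

```python
# Select list of the original SQL transform, in order.
SELECT_LIST = [
    "a.timestamp", "a.line_id", "a.line_number", "a.order_id", "a.product_id",
    "a.quantity", "a.unit_price", "a.discount", "a.supply_cost", "a.tax",
    "string(year(to_date(a.order_date))) as year",
    "string(month(to_date(a.order_date))) as month",
    "a.Op", "a.tenant_id",
]


def transformer_sql_property(appended_columns=()):
    """Build the property line, adding new source columns at the end."""
    columns = SELECT_LIST + [f"a.{name}" for name in appended_columns]
    return (
        "hoodie.deltastreamer.transformer.sql=SELECT "
        + ", ".join(columns)
        + " FROM <SRC> a"
    )


original = transformer_sql_property()
updated = transformer_sql_property(["critical"])
print(updated)
```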

    Triggering the Step Functions workflow to process the ongoing replication after the schema change

    In this step, you rerun the MultiTenantProcessing workflow to process the files produced by AWS DMS during the ongoing replication.

    1. On the Step Functions console, choose State machines.
    2. Choose the MultiTenantProcessing workflow.
    3. In the new window, choose Start execution.
    4. Update the JSON document used when you first ran the workflow by replacing the field propertiesFilename with the following value:

        "propertiesFilename": "dfs-source-new_schema.properties"

    5. Submit the edited JSON as the input to the workflow.

    Querying the Hudi multi-tenant table after the schema change

    We can now query the multi-tenant table again.

    Using Hive

    If you’re using Hive, when the workflow completes, go back to the terminal window opened by Session Manager and run the following hive-sync command:

    /usr/lib/hudi/bin/run_sync_tool.sh --table sales_order_detail_mt --partitioned-by tenant_id,year,month --database default --base-path s3://hudiblog-[REPLACE-WITH-YOUR-ACCOUNT-ID]/transformed/multitenant/huditables/sales_order_detail_mt --jdbc-url jdbc:hive2://localhost:10000 --user hive --partition-value-extractor org.apache.hudi.hive.MultiPartKeysValueExtractor --pass passnotenforced

    For this post, we run the hive-sync command manually. You can also add the flag --enable-hive-sync to the Hudi DeltaStreamer utility command shown in the section Analyzing the properties provided to the command to run the Hudi DeltaStreamer utility.

    After the hive-sync updates the table schema, the new column is visible from Hive. Start Hive in the command line and run the following query:

    select `timestamp`, quantity, critical,  order_id, tenant_id from sales_order_detail_mt where order_id=4668 and tenant_id=1502;

    Using Spark

    If you just want to use Spark to read the Hudi datasets on Amazon S3, after the schema change, you can use the same code in Querying the Hudi multi-tenant table with Spark and add the following line to the code:

    spark.conf.set("spark.sql.parquet.mergeSchema", "true")

    Cleaning up

    To avoid incurring future charges, stop the AWS DMS replication tasks, delete the contents in the S3 bucket for the solution, and delete the CloudFormation stack.

    Conclusion

    This post explained how to use AWS DMS, Step Functions, and Hudi on Amazon EMR to convert a single-tenant pipeline to a multi-tenant pipeline.

    With this solution, your software offerings that use dedicated sources for each tenant can offload some of the common tasks across all the tenants to a pipeline backed by Hudi datasets stored on Amazon S3. By using Hudi on Amazon EMR, you can easily apply inserts, updates, and deletes from the source databases to the datasets in Amazon S3. You can also support schema evolution, provided it is backward compatible.

    Follow these steps and deploy the CloudFormation templates in your account to further explore the solution. If you have any questions or feedback, please leave a comment.

    The author would like to thank Radhika Ravirala and Neil Mukerje for the dataset and the functions on the notebook.


    About the Author

    Francisco Oliveira is a senior big data solutions architect with AWS. He focuses on building big data solutions with open source technology and AWS. In his free time, he likes to try new sports, travel and explore national parks.

    Testing data quality at scale with PyDeequ

    Post Syndicated from Calvin Wang original https://aws.amazon.com/blogs/big-data/testing-data-quality-at-scale-with-pydeequ/

    You generally write unit tests for your code, but do you also test your data? Incoming data quality can make or break your application. Incorrect, missing, or malformed data can have a large impact on production systems. Examples of data quality issues include the following:

    • Missing values can lead to failures in production systems that require non-null values (NullPointerException)
    • Changes in the distribution of data can lead to unexpected outputs of machine learning (ML) models
    • Aggregations of incorrect data can lead to wrong business decisions

    In this post, we introduce PyDeequ, an open-source Python wrapper over Deequ (an open-source tool developed and used at Amazon). Deequ is written in Scala, whereas PyDeequ allows you to use its data quality and testing capabilities from Python and PySpark, the language of choice of many data scientists. PyDeequ democratizes and extends the power of Deequ by allowing you to use it alongside the many data science libraries that are available in that language. Furthermore, PyDeequ allows for fluid interface with Pandas DataFrames as opposed to restricting within Apache Spark DataFrames.

    Deequ allows you to calculate data quality metrics on your dataset, define and verify data quality constraints, and be informed about changes in the data distribution. Instead of implementing checks and verification algorithms on your own, you can focus on describing how your data should look. Deequ supports you by suggesting checks for you. Deequ is implemented on top of Apache Spark and is designed to scale with large datasets (billions of rows) that typically live in a data lake, distributed file system, or a data warehouse. PyDeequ gives you access to this capability, but also allows you to use it from the familiar environment of your Python Jupyter notebook.

    Deequ at Amazon

    Deequ is used internally at Amazon to verify the quality of many large production datasets. Dataset producers can add and edit data quality constraints. The system computes data quality metrics on a regular basis (with every new version of a dataset), verifies constraints defined by dataset producers, and publishes datasets to consumers in case of success. In error cases, dataset publication can be stopped, and producers are notified to take action. Data quality issues don’t propagate to consumer data pipelines, reducing their blast radius.

    Deequ is also used within Amazon SageMaker Model Monitor. Now with the availability of PyDeequ, you can use it from a broader set of environments, including Amazon SageMaker notebooks, AWS Glue, Amazon EMR, and more.

    Overview of PyDeequ

    Let’s look at PyDeequ’s main components, and how they relate to Deequ (shown in the following diagram):

    • Metrics computation – Deequ computes data quality metrics, that is, statistics such as completeness, maximum, or correlation. Deequ uses Spark to read from sources such as Amazon Simple Storage Service (Amazon S3) and compute metrics through an optimized set of aggregation queries. You have direct access to the raw metrics computed on the data.
    • Constraint verification – As a user, you focus on defining a set of data quality constraints to be verified. Deequ takes care of deriving the required set of metrics to be computed on the data. Deequ generates a data quality report, which contains the result of the constraint verification.
    • Constraint suggestion – You can choose to define your own custom data quality constraints or use the automated constraint suggestion methods that profile the data to infer useful constraints.
    • Python wrappers – You can call each Deequ function using Python syntax. The wrappers translate the commands to the underlying Deequ calls and return their response.

    Use case overview

    As a running example, we use a customer review dataset provided by Amazon on Amazon S3. We intentionally follow the example in the post Test data quality at scale with Deequ to show the similarity in functionality and implementation. We begin the way many data science projects do: with initial data exploration and assessment in a Jupyter notebook.

    If you’d like to follow along with a live Jupyter notebook, check out the notebook on our GitHub repo.

    During the data exploration phase, you want to easily answer some basic questions about the data:

    • Are the fields that are supposed to contain unique values really unique? Are there fields that are missing values?
    • How many distinct categories are there in the categorical fields?
    • Are there correlations between some key features?
    • If there are two supposedly similar datasets (such as different categories or different time periods), are they really similar?

    We also show you how to scale this approach to large-scale datasets, using the same code on an Amazon EMR cluster. This is how you’d likely run the same checks during ML training, and later as you move into a production setting.

    Starting a PySpark session in a SageMaker notebook

    To follow along with this post, open up a SageMaker notebook instance, clone the PyDeequ GitHub repo on the SageMaker notebook instance, and run the test_data_quality_at_scale.ipynb notebook from the tutorials directory of the PyDeequ repository.

    Let’s install our dependencies first in a terminal window:

    $ pip install pydeequ

    Next, in a cell of our SageMaker notebook, we need to create a PySpark session:

    from pyspark.sql import SparkSession
    import sagemaker_pyspark
    import pydeequ
    
    classpath = ":".join(sagemaker_pyspark.classpath_jars())
    
    spark = (SparkSession
        .builder
        .config("spark.driver.extraClassPath", classpath)
        .config("spark.jars.packages", pydeequ.deequ_maven_coord)
        .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
        .getOrCreate())

    Loading data

    Load the dataset containing reviews for the category Electronics into our Jupyter notebook:

    df = spark.read.parquet("s3a://amazon-reviews-pds/parquet/product_category=Electronics/")

    After you load the DataFrame, you can run df.printSchema() to view the schema of the dataset:

    root
     |-- marketplace: string (nullable = true)
     |-- customer_id: string (nullable = true)
     |-- review_id: string (nullable = true)
     |-- product_id: string (nullable = true)
     |-- product_parent: string (nullable = true)
     |-- product_title: string (nullable = true)
     |-- star_rating: integer (nullable = true)
     |-- helpful_votes: integer (nullable = true)
     |-- total_votes: integer (nullable = true)
     |-- vine: string (nullable = true)
     |-- verified_purchase: string (nullable = true)
     |-- review_headline: string (nullable = true)
     |-- review_body: string (nullable = true)
     |-- review_date: date (nullable = true)
     |-- year: integer (nullable = true)

    Data analysis

    Before we define checks on the data, we want to calculate some statistics on the dataset; we call them metrics. As with Deequ, PyDeequ supports a rich set of metrics. For more information, see Test data quality at scale with Deequ or the GitHub repo. In the following example, we use the AnalysisRunner to capture the metrics you’re interested in:

    from pydeequ.analyzers import *
    
    analysisResult = AnalysisRunner(spark) \
                        .onData(df) \
                        .addAnalyzer(Size()) \
                        .addAnalyzer(Completeness("review_id")) \
                        .addAnalyzer(ApproxCountDistinct("review_id")) \
                        .addAnalyzer(Mean("star_rating")) \
                        .addAnalyzer(Compliance("top star_rating", "star_rating >= 4.0")) \
                        .addAnalyzer(Correlation("total_votes", "star_rating")) \
                        .addAnalyzer(Correlation("total_votes", "helpful_votes")) \
                        .run()
                        
    analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
    analysisResult_df.show()

    The following table summarizes our findings.

    Name | Instance | Value
    ApproxCountDistinct | review_id | 3010972
    Completeness | review_id | 1
    Compliance | top star_rating | 0.74941
    Correlation | helpful_votes,total_votes | 0.99365
    Correlation | total_votes,star_rating | -0.03451
    Mean | star_rating | 4.03614
    Size | * | 3120938

    From this, we learn the following:

    • review_id has no missing values and approximately 3,010,972 unique values
    • 74.9% of reviews have a star_rating of 4 or higher
    • total_votes and star_rating are not correlated
    • helpful_votes and total_votes are strongly correlated
    • The average star_rating is 4.0
    • The dataset contains 3,120,938 reviews
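
    To make the meaning of these metrics concrete, the following plain-Python sketch computes Size, Completeness, Compliance, and Mean on a tiny, made-up sample. It only illustrates the definitions. It is not how Deequ computes them (Deequ runs aggregation queries on Spark), and the rows and helper functions here are hypothetical.

```python
from statistics import mean

# Tiny hypothetical sample standing in for the reviews DataFrame.
rows = [
    {"review_id": "R1", "star_rating": 5},
    {"review_id": "R2", "star_rating": 4},
    {"review_id": "R3", "star_rating": 2},
    {"review_id": "R4", "star_rating": 5},
]

def completeness(rows, column):
    """Fraction of rows where the column is not None."""
    return sum(1 for r in rows if r[column] is not None) / len(rows)

def compliance(rows, predicate):
    """Fraction of rows that satisfy the predicate."""
    return sum(1 for r in rows if predicate(r)) / len(rows)

metrics = {
    "Size": len(rows),
    "Completeness(review_id)": completeness(rows, "review_id"),
    "Compliance(top star_rating)": compliance(rows, lambda r: r["star_rating"] >= 4),
    "Mean(star_rating)": mean(r["star_rating"] for r in rows),
}
print(metrics)
```

    In this sample, three of the four rows have a star_rating of 4 or higher, so the compliance value is 0.75. This is the same kind of ratio as the 0.74941 reported for the full dataset.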

    Defining and running tests for data

    After analyzing and understanding the data, we want to verify that the properties we have derived also hold for new versions of the dataset. By defining assertions on the data distribution as part of a data pipeline, we can ensure that every processed dataset is of high quality, and that any application consuming the data can rely on it.

    For writing tests on data, we start with the VerificationSuite and add checks on attributes of the data. In this example, we test for the following properties of our data:

    • At least 3 million rows in total
    • review_id is never NULL
    • review_id is unique
    • star_rating has a minimum of 1.0 and maximum of 5.0
    • marketplace only contains US, UK, DE, JP, or FR
    • year does not contain negative values

    This is the code that reflects the previous statements. For information about all available checks, see the GitHub repo. You can run this directly in the PySpark session you created earlier:

    from pydeequ.checks import *
    from pydeequ.verification import *
    
    check = Check(spark, CheckLevel.Warning, "Amazon Electronics Review Check")
    
    checkResult = VerificationSuite(spark) \
        .onData(df) \
        .addCheck(
            check.hasSize(lambda x: x >= 3000000) \
            .hasMin("star_rating", lambda x: x == 1.0) \
            .hasMax("star_rating", lambda x: x == 5.0)  \
            .isComplete("review_id")  \
            .isUnique("review_id")  \
            .isComplete("marketplace")  \
            .isContainedIn("marketplace", ["US", "UK", "DE", "JP", "FR"]) \
            .isNonNegative("year")) \
        .run()
        
    checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
    checkResult_df.show()

    After calling run(), PyDeequ translates your test description into Deequ, which translates it into a series of Spark jobs that are run to compute metrics on the data. Afterwards, it invokes your assertion functions (for example, lambda x: x == 1.0 for the minimum star rating check) on these metrics to see if the constraints hold on the data. The following table summarizes our findings.

    Constraint | constraint_status | constraint_message
    SizeConstraint(Size(None)) | Success |
    MinimumConstraint(Minimum(star_rating,None)) | Success |
    MaximumConstraint(Maximum(star_rating,None)) | Success |
    CompletenessConstraint(Completeness(review_id,None)) | Success |
    UniquenessConstraint(Uniqueness(List(review_id))) | Failure | Value: 0.9926566948782706 does not meet the constraint requirement!
    CompletenessConstraint(Completeness(marketplace,None)) | Success |
    ComplianceConstraint(Compliance(marketplace contained in US,UK,DE,JP,FR,marketplace IS NULL OR marketplace IN ('US','UK','DE','JP','FR'),None)) | Success |
    ComplianceConstraint(Compliance(year is non-negative,COALESCE(year, 0.0) >= 0,None)) | Success |

    Interestingly, the review_id column isn’t unique, which resulted in a failure of the check on uniqueness. We can also look at all the metrics that Deequ computed for this check by running the following:

    checkResult_df = VerificationResult.successMetricsAsDataFrame(spark, checkResult)
    checkResult_df.show()

    The following table summarizes our findings.

    Name | Instance | Value
    Completeness | review_id | 1
    Completeness | marketplace | 1
    Compliance | marketplace contained in US,UK,DE,JP,FR | 1
    Compliance | year is non-negative | 1
    Maximum | star_rating | 5
    Minimum | star_rating | 1
    Size | * | 3120938
    Uniqueness | review_id | 0.99266
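
    The two-phase flow described earlier (compute a metric, then apply your assertion function to it) can be sketched in plain Python. This is a simplified illustration on hypothetical data, not PyDeequ code. The uniqueness helper follows Deequ’s notion of uniqueness as the fraction of values that occur exactly once, which is why duplicated review IDs push the value below 1.

```python
from collections import Counter

def uniqueness(values):
    """Fraction of values that occur exactly once."""
    counts = Counter(values)
    return sum(1 for v in values if counts[v] == 1) / len(values)

# Hypothetical sample: "R2" appears twice, so only 2 of 4 values are unique.
review_ids = ["R1", "R2", "R2", "R3"]
star_ratings = [1, 5, 3, 4]

# Phase 1: compute one metric per constraint.
metrics = {
    "Size": len(review_ids),
    "Minimum(star_rating)": min(star_ratings),
    "Maximum(star_rating)": max(star_ratings),
    "Uniqueness(review_id)": uniqueness(review_ids),
}

# Phase 2: apply the user-supplied assertion to each metric.
assertions = {
    "Size": lambda x: x >= 3,
    "Minimum(star_rating)": lambda x: x == 1.0,
    "Maximum(star_rating)": lambda x: x == 5.0,
    "Uniqueness(review_id)": lambda x: x == 1.0,
}

results = {
    name: "Success" if assertions[name](value) else "Failure"
    for name, value in metrics.items()
}
for name, status in results.items():
    print(name, metrics[name], status)
```

    As in the real run, every constraint succeeds except the uniqueness check, whose metric (0.5 in this sample) doesn’t satisfy the assertion.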

    Automated constraint suggestion

    If you own a large number of datasets or if your dataset has many columns, it may be challenging for you to manually define appropriate constraints. Deequ can automatically suggest useful constraints based on the data distribution. Deequ first runs a data profiling method and then applies a set of rules on the result. For more information about how to run a data profiling method, see the GitHub repo.

    import json

    from pydeequ.suggestions import *
    
    suggestionResult = ConstraintSuggestionRunner(spark) \
                 .onData(df) \
                 .addConstraintRule(DEFAULT()) \
                 .run()
    
    # Constraint Suggestions in JSON format
    print(json.dumps(suggestionResult, indent=2))

    The result contains a list of constraints with descriptions and Python code, so that you can directly apply it in your data quality checks. Call print(json.dumps(suggestionResult)) to inspect the suggested constraints; the following table shows a subset.

    Column | Constraint | Python code
    customer_id | customer_id is not null | .isComplete("customer_id")
    customer_id | customer_id has type Integral | .hasDataType("customer_id", ConstrainableDataTypes.Integral)
    customer_id | customer_id has no negative values | .isNonNegative("customer_id")
    helpful_votes | helpful_votes is not null | .isComplete("helpful_votes")
    helpful_votes | helpful_votes has no negative values | .isNonNegative("helpful_votes")
    marketplace | marketplace has value range “US”, “UK”, “DE”, “JP”, “FR” | .isContainedIn("marketplace", ["US", "UK", "DE", "JP", "FR"])
    product_title | product_title is not null | .isComplete("product_title")
    star_rating | star_rating is not null | .isComplete("star_rating")
    star_rating | star_rating has no negative values | .isNonNegative("star_rating")
    vine | vine has value range “N”, “Y” | .isContainedIn("vine", ["N", "Y"])
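
    The idea of profiling a column and then applying rules to the profile can be sketched in a few lines of plain Python. The three rules below are a deliberately simplified illustration and are not Deequ’s actual rule set. The function suggest_constraints and its max_categories threshold are hypothetical.

```python
def suggest_constraints(column, values, max_categories=5):
    """Return PyDeequ-style check snippets suggested by simple profiling rules."""
    suggestions = []
    non_null = [v for v in values if v is not None]

    # Rule 1: no missing values observed -> suggest a completeness check.
    if len(non_null) == len(values):
        suggestions.append(f'.isComplete("{column}")')

    # Rule 2: all observed values are numbers >= 0 -> suggest non-negativity.
    if non_null and all(
        isinstance(v, (int, float)) and not isinstance(v, bool) and v >= 0
        for v in non_null
    ):
        suggestions.append(f'.isNonNegative("{column}")')

    # Rule 3: a small set of distinct strings -> suggest a categorical range.
    if non_null and all(isinstance(v, str) for v in non_null):
        distinct = sorted(set(non_null))
        if len(distinct) <= max_categories:
            allowed = ", ".join(f'"{v}"' for v in distinct)
            suggestions.append(f'.isContainedIn("{column}", [{allowed}])')

    return suggestions

vine_suggestions = suggest_constraints("vine", ["N", "Y", "N"])
votes_suggestions = suggest_constraints("helpful_votes", [0, 3, 12])
print(vine_suggestions)
print(votes_suggestions)
```

    Suggestions derived this way describe only the data that was profiled, so it’s worth reviewing them before adding them to a check.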

    You can explore the other tutorials in the PyDeequ GitHub repo.

    Scaling to production

    So far, we’ve shown you how to use these capabilities in the context of data exploration using a Jupyter notebook running on a SageMaker notebook instance. As your project matures, you need to use the same capabilities on larger and larger datasets, and in a production environment. With PyDeequ, it’s easy to make that transition. The following diagram illustrates deployment options for local and production purposes on AWS.

    Amazon EMR and AWS Glue interface with PyDeequ through the PySpark drivers that PyDeequ utilizes as its main engine. PyDeequ can run as a PySpark application in both contexts when the Deequ JAR is added to the Spark context. You can run PyDeequ’s data validation toolkit after the Spark context and drivers are configured and your data is loaded into a DataFrame. We describe the Amazon EMR configuration options and use cases in this section (configurations 2 and 3 in the diagram).

    Data exploration from a SageMaker notebook via an EMR cluster

    As shown in configuration 2 in the diagram, you can connect to an EMR cluster from a SageMaker notebook to run PyDeequ. This enables you to explore much larger volumes of data than you can using a single notebook. Your Amazon EMR cluster must be running Spark v2.4.6, available with Amazon EMR version 5.31 or higher, in order to work with PyDeequ. After you have a running cluster that has those components and a SageMaker notebook, you configure a SparkSession object using the following template to connect to your cluster. For more information about connecting a SageMaker notebook to Amazon EMR or the necessary IAM permissions, see Submitting User Applications with spark-submit.

    In the SageMaker notebook, run the following JSON in a cell before you start your SparkSession to configure your EMR cluster:

    %%configure -f
    { "conf":{
              "spark.pyspark.python": "python3",
              "spark.pyspark.virtualenv.enabled": "true",
              "spark.pyspark.virtualenv.type":"native",
              "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv",
              "spark.jars.packages": "com.amazon.deequ:deequ:1.0.3",
              "spark.jars.excludes": "net.sourceforge.f2j:arpack_combined_all"
             }
    }

    Start your SparkSession object in a cell after the preceding configuration by running spark. Then install PyDeequ onto your EMR cluster using the SparkContext (named sc by default) with the following command:

    sc.install_pypi_package('pydeequ')

    Now you can start using PyDeequ from your notebook to run the same statements as before, but with much larger volumes of data.

    Running a transient EMR cluster

    Another way to leverage the power of an EMR cluster is to treat it as a transient cluster and run it in a headless configuration, as shown in configuration 3 in the diagram. We use spark-submit in an EMR add-step to run PyDeequ on Amazon EMR. For each of the following steps, make sure to replace the values in brackets accordingly.

    1. Create a bootstrap shell script and upload it to an S3 bucket. The following code is an example of pydeequ-emr-bootstrap.sh:
      #!/bin/bash
      
      sudo python3 -m pip install --no-deps pydeequ
      sudo python3 -m pip install pandas 

    2. Create an EMR cluster via the AWS Command Line Interface (AWS CLI):
      $ aws emr create-cluster \
      --name 'my-pydeequ-cluster' \
      --release-label emr-5.31.0 --applications Name=Spark Name=Hadoop Name=Hive Name=Livy Name=Pig Name=Hue \
      --use-default-roles \
      --instance-type m5.xlarge \
      --instance-count 2 \
      --bootstrap-actions \
          Path="s3://<S3_PATH_TO_BOOTSTRAP>/pydeequ-emr-bootstrap.sh",Name='install_pydeequ' \
      --visible-to-all-users \
      --enable-debugging \
      --ec2-attributes KeyName="<MY_SSH_KEY>",SubnetId="<MY_SUBNET>" \
      --auto-scaling-role EMR_AutoScaling_DefaultRole

    3. Create your PySpark PyDeequ run script and upload it to Amazon S3. The following code is our example of pydeequ-test.py:
      import sys
      import pydeequ
      from pydeequ.checks import *
      from pydeequ.verification import *
      from pyspark.sql import SparkSession, Row
      
      if __name__ == "__main__":
      
          with SparkSession.builder.appName("pydeequ").getOrCreate() as spark:
      
              df = spark.sparkContext.parallelize([
                  Row(a="foo", b=1, c=5),
                  Row(a="bar", b=2, c=6),
                  Row(a="baz", b=3, c=None)]).toDF()
      
              check = Check(spark, CheckLevel.Error, "Integrity checks")
      
              checkResult = VerificationSuite(spark) \
                  .onData(df) \
                  .addCheck(
                      check.hasSize(lambda x: x >= 3) \
                      .hasMin("b", lambda x: x == 0) \
                      .isComplete("c")  \
                      .isUnique("a")  \
                      .isContainedIn("a", ["foo", "bar", "baz"]) \
                      .isNonNegative("b")) \
                  .run()
      
              checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
              checkResult_df.repartition(1).write.csv("s3a://<PATH_TO_OUTPUT>/pydeequ-out.csv", sep='|')

    4. When your cluster is running and in the WAITING state, submit your Spark job to Amazon EMR via the AWS CLI:
      $ aws emr add-steps \
      --cluster-id <MY_CLUSTER_ID> \
      --steps Type=Spark,Name="pydeequ-spark-submit",Args=[--deploy-mode,cluster,--master,yarn,--conf,spark.yarn.submit.waitAppCompletion=true,--packages,com.amazon.deequ:deequ:1.0.3,--exclude-packages,net.sourceforge.f2j:arpack_combined_all,s3a://pydeequ-emr/setup/pydeequ-test.py],ActionOnFailure=CANCEL_AND_WAIT
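
    The --steps value is long and easy to mistype. One way to keep it readable is to assemble it from named parts, as in the following sketch. The helper build_spark_step is hypothetical. It only builds the string and doesn’t call AWS.

```python
def build_spark_step(name, script_uri, packages, exclude_packages,
                     action_on_failure="CANCEL_AND_WAIT"):
    """Assemble the value passed to `aws emr add-steps --steps` for a Spark step."""
    args = [
        "--deploy-mode", "cluster",
        "--master", "yarn",
        "--conf", "spark.yarn.submit.waitAppCompletion=true",
        "--packages", packages,
        "--exclude-packages", exclude_packages,
        script_uri,
    ]
    return (
        f'Type=Spark,Name="{name}",'
        f'Args=[{",".join(args)}],'
        f"ActionOnFailure={action_on_failure}"
    )

step = build_spark_step(
    name="pydeequ-spark-submit",
    script_uri="s3a://pydeequ-emr/setup/pydeequ-test.py",
    packages="com.amazon.deequ:deequ:1.0.3",
    exclude_packages="net.sourceforge.f2j:arpack_combined_all",
)
print(step)
```

    The printed string matches the --steps argument in the preceding command. Replace script_uri with the S3 location where you uploaded pydeequ-test.py.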

    Congratulations, you have now submitted a PyDeequ PySpark job to Amazon EMR. Give the job a few minutes to run, after which you can view your results at the S3 output path specified on the last line of pydeequ-test.py.

    Afterwards, remember to clean up your results and spin down the EMR cluster using the following command:

    $ aws emr terminate-clusters --cluster-ids <MY_CLUSTER_ID>

    Now you can use Amazon EMR to process large datasets in batch using PyDeequ to plug into your pipelines and provide scalable tests on your data.
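
    To plug the results into a pipeline, a downstream step can read the pipe-delimited output and fail when any constraint failed. The following is a minimal sketch using the Python standard library. The column order in COLUMNS is an assumption about the check results DataFrame that you should verify against your own output, and the sample rows are made up.

```python
import csv
import io

# Assumed column order of the check results; verify against your own output.
COLUMNS = ["check", "check_level", "check_status",
           "constraint", "constraint_status", "constraint_message"]

def failed_constraints(pipe_delimited_text):
    """Return (constraint, message) pairs for rows whose status is not Success."""
    reader = csv.reader(io.StringIO(pipe_delimited_text), delimiter="|")
    rows = [dict(zip(COLUMNS, fields)) for fields in reader if fields]
    return [(r["constraint"], r["constraint_message"])
            for r in rows if r["constraint_status"] != "Success"]

# Hypothetical file contents, shaped like the output of pydeequ-test.py.
sample = (
    "Integrity checks|Error|Error|SizeConstraint(Size(None))|Success|\n"
    "Integrity checks|Error|Error|MinimumConstraint(Minimum(b,None))|Failure|"
    "Value: 1.0 does not meet the constraint requirement!\n"
)

failures = failed_constraints(sample)
print(failures)
```

    A pipeline step could raise an error or send a notification whenever the returned list is not empty.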

    More examples on GitHub

    You can find examples of more advanced features on the Deequ GitHub page:

    • Deequ provides more than data quality checks with fixed thresholds. Learn how to use anomaly detection on data quality metrics to apply tests on metrics that change over time.
    • Deequ offers support for storing and loading metrics. Learn how to use the MetricsRepository for this use case.
    • If your dataset grows over time or is partitioned, you can use Deequ’s incremental metrics computation. For each partition, Deequ stores a state for each computed metric. To compute metrics for the union of partitions, Deequ can use these states to efficiently derive overall metrics without reloading the data.
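
    The idea behind incremental computation is that some metrics can be rebuilt from small, mergeable states instead of from the raw rows. The following plain-Python sketch shows this for a mean, whose state is a running sum and a count. The MeanState class and the partition values are hypothetical. Deequ’s own state classes differ.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class MeanState:
    """Mergeable state for a mean: enough to recompute it without the raw rows."""
    total: float
    count: int

    @staticmethod
    def from_values(values):
        return MeanState(total=float(sum(values)), count=len(values))

    def merge(self, other):
        return MeanState(self.total + other.total, self.count + other.count)

    def metric(self):
        return self.total / self.count

# Hypothetical star ratings for two partitions of a dataset.
state_a = MeanState.from_values([5, 4, 3])
state_b = MeanState.from_values([2, 5])

# Merging the stored states gives the mean over both partitions
# without reading either partition again.
overall = state_a.merge(state_b)
print(overall.metric())
```

    When a new partition arrives, only that partition needs to be scanned. Its state is then merged with the states already stored.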

    Conclusion

    This post showed you how to use PyDeequ for calculating data quality metrics, verifying data quality constraints, and profiling data to automate the configuration of data quality checks. PyDeequ is available via pip install and on GitHub now for you to build your own data quality management pipeline.

    Learn more about the inner workings of Deequ in the VLDB 2018 paper Automating large-scale data quality verification.

    Stay tuned for another post demonstrating production workflows on AWS Glue.


    About the Authors

    Calvin Wang is a Data Scientist at AWS AI/ML. He holds a B.S. in Computer Science from UC Santa Barbara and loves using machine learning to build cool stuff.

    Chris Ghyzel is a Data Engineer for AWS Professional Services. Currently, he is working with customers to integrate machine learning solutions on AWS into their production pipelines.

    Veronika Megler, PhD, is Principal Data Scientist for Amazon.com Consumer Packaging. Until recently she was the Principal Data Scientist for AWS Professional Services. She enjoys adapting innovative big data, AI, and ML technologies to help companies solve new problems, and to solve old problems more efficiently and effectively. Her work has lately been focused more heavily on economic impacts of ML models and exploring causality.

    Running queries securely from the same VPC where an Amazon Redshift cluster is running

    Post Syndicated from Seetha Sarma original https://aws.amazon.com/blogs/big-data/running-queries-securely-from-the-same-vpc-where-an-amazon-redshift-cluster-is-running/

    Customers who don’t need to set up a VPN or a private connection to AWS often use public endpoints to access AWS. Although this is acceptable for testing out the services, most production workloads need a secure connection to their VPC on AWS. If you’re running your production data warehouse on Amazon Redshift, you can run your queries in Amazon Redshift query editor or use Amazon WorkSpaces from Amazon Virtual Private Cloud (Amazon VPC) to connect to Amazon Redshift securely and analyze and graph a data summary in your favorite business intelligence (BI) or data visualization desktop tool.

    With Amazon Redshift, you can query petabytes of structured and semi-structured data across your data warehouse, operational database, and your data lake using standard SQL. Amazon WorkSpaces is a managed, secure Desktop-as-a-Service (DaaS) solution deployed within an Amazon VPC. In this post, we show how you can run SQL queries on Amazon Redshift securely without VPN using Amazon Redshift query editor and Amazon WorkSpaces. First, we discuss how to run queries that return large datasets from the Amazon Redshift query editor using the UNLOAD command. Next, we discuss how to set up Amazon WorkSpaces and use it to securely run queries on Amazon Redshift. We cover the detailed steps for setting up Amazon WorkSpaces and show different scenarios to test Amazon Redshift queries with Amazon WorkSpaces.

    The following diagram illustrates these architectures.

    Using the Amazon Redshift query editor with UNLOAD command

    Amazon Redshift has a query editor on its console that is typically used to run short queries and explore the data in the Amazon Redshift database. You may have a production scenario with queries that return large result sets. For instance, you may want to unload CSV data for use by a downstream process. In this case, you can run your query in the query editor and use the UNLOAD command to send the output directly to Amazon Simple Storage Service (Amazon S3) and get notified when the data is uploaded.

    1. Create separate S3 buckets for each user. You can use the default configuration for the bucket.
    2. On the Amazon Redshift console, choose Editor.
    3. In the Connect to database section, enter the database connection details.
    4. Choose Connect to database.
    5. Use the following format for the queries run from the query editor using the IAM role for Amazon Redshift, so the results get uploaded to Amazon S3:
      UNLOAD
      ('select id, name
      from <your_schema>.<your_table>')
      TO 's3://<username>/<yy-mm-dd-hh24-mi-ss>/'
      FORMAT AS CSV
      IAM_ROLE 'arn:aws:iam::<myaccount>:role/MyAmazon_RedshiftUnloadRole';

    This query creates multiple files in the designated user’s S3 bucket under the date/time prefix. The user can preview or download the individual files on the Amazon S3 console.
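
    If you generate these statements from a script, the date/time prefix and the quoting can be handled in one place. The following sketch only builds the SQL text. The function build_unload_sql, the bucket name, the table name, and the account ID are placeholders. The single quotes inside the query are doubled because UNLOAD takes the query as a quoted string.

```python
from datetime import datetime

def build_unload_sql(select_sql, user_bucket, iam_role_arn, now):
    """Build an UNLOAD statement that writes CSV under a date/time prefix."""
    # UNLOAD takes the query as a quoted string, so embedded single quotes
    # are doubled.
    quoted_query = select_sql.replace("'", "''")
    prefix = now.strftime("%y-%m-%d-%H-%M-%S")
    return (
        f"UNLOAD ('{quoted_query}')\n"
        f"TO 's3://{user_bucket}/{prefix}/'\n"
        "FORMAT AS CSV\n"
        f"IAM_ROLE '{iam_role_arn}';"
    )

sql = build_unload_sql(
    select_sql="select id, name from my_schema.my_table where name = 'a'",
    user_bucket="example-user-bucket",
    iam_role_arn="arn:aws:iam::123456789012:role/MyAmazon_RedshiftUnloadRole",
    now=datetime(2020, 12, 1, 9, 30, 0),
)
print(sql)
```

    Passing the timestamp in as an argument keeps the function deterministic. In practice, you would pass datetime.now().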

    A large unload may take some time. You can configure Amazon Simple Notification Service (Amazon SNS) to send a notification when the results are uploaded to Amazon S3.

    1. On the Amazon SNS console, choose Topics.
    2. Choose Create topic.
    3. Create an SNS topic with a meaningful description text, like Your query results are uploaded to S3.

    In the next steps, you edit the access policy of the SNS topic to give permission for Amazon S3 to publish to it.

    4. Change the Principal from "AWS": "*" to "Service": "s3.amazonaws.com".
    5. Scroll down to “Action” and delete everything except “SNS:Publish”. Make sure to delete the extra comma.
    6. Scroll down to “Condition” and modify the text "StringEquals": { "AWS:SourceOwner": <Your account id>} to "ArnLike": { "aws:SourceArn": "arn:aws:s3:*:*:<user-bucket-name>" }.
    7. In the navigation pane, choose Subscriptions.
    8. Choose Create subscription.
    9. Subscribe to the SNS notification with the user’s email address as the endpoint.
    10. Make sure the user confirms the subscription from their email.
    11. On the Amazon S3 console, choose the Properties tab of the user’s S3 bucket.
    12. Under Event Notifications, choose Create event notification.
    13. Select All object create events.
    14. For Destination, select SNS topic.
    15. For Specify SNS topic, select Choose from your SNS topics.
    16. For SNS topic, choose the topic you created.
    17. Save your settings.
    18. To test the notification, on the Amazon Redshift console, open the query editor.
    19. Edit the UNLOAD query and change the date/time prefix in the S3 path to the current date and time.
    20. Run the query and check if the user gets the email notification.
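
    After the access policy edits described above, the topic policy has roughly the following shape. This sketch builds it as a Python dictionary so you can compare it with what you see on the console. The Version value, the topic ARN, and the bucket name are assumptions for illustration. Only the Principal, Action, and Condition values come from the steps above.

```python
import json

def s3_publish_policy(topic_arn, bucket_name):
    """Build an SNS access policy that lets one S3 bucket publish to the topic."""
    return {
        "Version": "2008-10-17",  # assumed; keep whatever your topic policy already uses
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "s3.amazonaws.com"},
                "Action": "SNS:Publish",
                "Resource": topic_arn,
                "Condition": {
                    "ArnLike": {"aws:SourceArn": f"arn:aws:s3:*:*:{bucket_name}"}
                },
            }
        ],
    }

policy = s3_publish_policy(
    topic_arn="arn:aws:sns:us-east-1:123456789012:query-results",  # placeholder
    bucket_name="example-user-bucket",  # placeholder
)
print(json.dumps(policy, indent=2))
```

    Restricting aws:SourceArn to the user’s bucket means that only events from that bucket can publish to the topic.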

    Using Amazon WorkSpaces to run Amazon Redshift queries

    In this section, we cover setting up Amazon WorkSpaces, including Amazon VPC prerequisites, creating an Amazon VPC endpoint for Amazon S3, launching Amazon WorkSpaces in the same VPC where an Amazon Redshift cluster is running, setting up an Amazon WorkSpaces client, installing PSQL or a SQL client, and connecting to the client.

    When setup is complete, we show different scenarios to test with Amazon WorkSpaces, such as testing a SQL command from the Amazon WorkSpaces client, testing the SCREEN program to run SQL in the background, and testing PSQL with Amazon S3 and getting a notification through Amazon SNS.

    Prerequisites

    By default, AWS Identity and Access Management (IAM) users and roles can’t perform tasks using the AWS Management Console and they don’t have permission to create or modify Amazon VPC resources. Make sure you have administrator privileges, or that an administrator creates IAM policies that grant sufficient permissions to edit the route table, edit the VPC security group, and enable a DNS hostname for the VPC.

    When you have the correct permissions, complete the following prerequisite steps:

    1. On the Amazon Redshift console, in config, check the cluster subnet groups to make sure the Amazon Redshift cluster is created in an Amazon VPC with at least two subnets that are in separate Availability Zones.
    2. On the Amazon VPC console, edit the route table and make sure to associate these two subnets.
    3. Make sure the Amazon VPC security group has a self-referencing inbound rule for all traffic (not just all TCP). The self-referencing rule restricts the source to the same security group in the VPC, so it’s not open to all networks. Consider limiting the rule to just the protocol and port that Amazon Redshift and Amazon WorkSpaces need to communicate.
    4. Edit the DNS hostname of the Amazon VPC and enable it.

    Creating an Amazon VPC endpoint for Amazon S3 for software downloads

    In this step, you create your Amazon VPC endpoint for Amazon S3. This gives you Amazon S3 access to download PSQL from the Amazon repository. Alternatively, you could set up a NAT Gateway and download PSQL or other SQL clients from the internet.

    1. On the Amazon VPC console, choose Endpoints.
    2. Choose Create endpoint.
    3. Search for Service Name: S3
    4. Select the S3 service gateway
    5. Select the Amazon VPC where the Amazon Redshift cluster is running
    6. Select the route table
    7. Enter the following custom policy for the endpoint to access the Amazon Linux AMI
      {
          "Version": "2008-10-17",
          "Statement": [
              {
                  "Sid": "Amazon Linux AMI Repository Access",
                  "Effect": "Allow",
                  "Principal": "*",
                  "Action": "s3:GetObject",
                  "Resource": [
                      "arn:aws:s3:::*.amazonaws.com",
                      "arn:aws:s3:::*.amazonaws.com/*"
                  ]
              }
          ]
      }

    8. Create the endpoint

    Launching Amazon WorkSpaces in the VPC where the Amazon Redshift cluster runs

    You’re now ready to launch Amazon WorkSpaces.

    1. On the Amazon WorkSpaces console, choose Launch WorkSpaces.

    1. For Directory types, select Simple AD.

    Directory Service Solutions helps you store information and manage access to resources. For this post, we choose Simple AD.

    1. For Directory size, select Small.
    2. Enter your directory details.

    1. For VPC, choose the VPC where the Amazon Redshift cluster is running.
    2. For Subnets, choose the two subnets you created.

    It may take a few minutes to provision the directory. You see the status show as Active when it’s ready.

    1. When the directory is provisioned, choose the directory and subnets you created.
    2. Choose Next Step.

    1. Create and identify your users.

    1. Use the default settings for the compute bundle.
    2. For Running Mode, select AlwaysOn.

    Alternatively, select AutoStop and adjust the time in order to run long-running queries.

    1. Review and launch WorkSpaces.

    It may take up to 20 minutes to become available.

    Setting up the Amazon WorkSpaces client

    In this section, you configure your Amazon WorkSpaces client.

    1. Use the link from your email to download the Amazon WorkSpaces client.
    2. Register it using the registration code from the email.
    3. Log in with the username from the email and your newly created password.
    4. In the Amazon WorkSpaces client, open the terminal.

    1. Run the following command to capture the IP address:
    hostname -I | awk '{print $2}'

    The following screenshot shows your output.

    1. On the Amazon Redshift console, choose Clusters.
    2. Choose your cluster.
    3. Save the endpoint information to use later.
    4. Choose the Properties tab.

    1. In the Network and security section, note the VPC security group.
    2. On the Amazon VPC console, under Security, choose Security Groups.
    3. Select the security group that the Amazon Redshift cluster uses.

    1. Add an inbound rule with the type Redshift and a source of the IP address you captured, followed by /32.

    1. On the Amazon WorkSpaces client, use the Amazon Redshift hostname from the endpoint you saved earlier and verify the VPC setup with the following code:
       nslookup <Amazon Redshift hostname>

    If you see an IP address within your subnet range, the private endpoint setup for Amazon Redshift was successful.
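
    The /32 source and the subnet check in the steps above can be sketched with Python's standard ipaddress module. This is an illustration only: the function names and sample addresses are hypothetical, the rule dictionary merely mirrors the shape of an EC2 ingress permission, and port 5439 is assumed because it is the default Amazon Redshift port.

```python
import ipaddress


def host_cidr(ip):
    """Return the /32 CIDR that matches exactly one IPv4 address."""
    return str(ipaddress.ip_network(f"{ip}/32"))


def redshift_inbound_rule(ip, port=5439):
    """Build an ingress rule for a single host (5439 is the default Redshift port)."""
    return {
        "IpProtocol": "tcp",
        "FromPort": port,
        "ToPort": port,
        "IpRanges": [{"CidrIp": host_cidr(ip)}],
    }


def in_subnet(resolved_ip, subnet_cidr):
    """Check whether the address returned by nslookup falls inside a subnet range."""
    return ipaddress.ip_address(resolved_ip) in ipaddress.ip_network(subnet_cidr)


# Hypothetical addresses, for illustration only.
rule = redshift_inbound_rule("10.0.1.25")
private_ok = in_subnet("10.0.1.80", "10.0.1.0/24")
```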

    Testing a SQL command from PSQL or a SQL client in the Amazon WorkSpaces client

    To test a SQL command, complete the following steps:

    1. From the terminal in the Amazon WorkSpaces client, run the following command to install PostgreSQL:
      sudo yum install postgresql-server

    Alternatively, set up a NAT gateway and download a SQL client such as SQL Workbench on the Amazon WorkSpaces client:

    sudo wget https://www.sql-workbench.eu/Workbench-Build125-with-optional-libs.zip

    Then unzip the content of the downloaded file and save it to a directory:

    unzip Workbench-Build125-with-optional-libs.zip -d ~/Workbench
    1. Use the Amazon Redshift hostname, port, and database names of the Amazon Redshift cluster endpoint you copied earlier and try connecting to the database:
      psql -h <Amazon Redshift hostname> -p <port> -d <database> -U <username> -W

    1. Enter your password when prompted.

    1. Run a SQL command and check the results.

    Testing the SCREEN program to run SQL in the background

    You can use the SCREEN program to run the SQL command in the background and resume to see the results.

    1. From the terminal in the Amazon WorkSpaces client, install the SCREEN program:
      sudo yum install screen

    1. Run the program:
      screen

    1. Connect to PSQL:
      psql -h <Amazon Redshift hostname> -p <port> -d <database> -U <username> -W

    1. Enter the password when prompted.
    2. Run the SQL command
    3. Press Ctrl+A, then D, to detach from the screen.

    The SQL command is now running in the background. You can check by running ps -ef | grep psql.

    1. To go back to that screen, run the following command:
      screen -r

    1. To quit SCREEN, enter the following command:
      ctrl A \

    Testing PSQL with Amazon S3 and Amazon SNS

    Similar to the UNLOAD command we used from the Amazon Redshift query editor in the beginning of this post, you can run PSQL from the Amazon WorkSpaces client, send the output to an S3 bucket, and get an Amazon SNS notification for an object create event.

    1. From the terminal in the Amazon WorkSpaces client, run aws configure to set up AWS credentials with write access to the S3 bucket.
    2. Run the following command to write a single output file to Amazon S3 and send an email notification:
      psql -h <Amazon Redshift hostname> -p <port> -d <database> -U <username> -W -c 'select column1, column2 from myschema.mytable' | aws s3 cp - s3://<username>/<yy-mm-dd-hh24-mi-ss>/<output file name>
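
    The <yy-mm-dd-hh24-mi-ss> placeholder in the command above stands for a timestamp. As a small sketch of how that prefix could be generated (the function name and sample user name are hypothetical):

```python
from datetime import datetime


def unload_prefix(username, now):
    """Return s3://<username>/<yy-mm-dd-hh24-mi-ss>/ for the given time."""
    stamp = now.strftime("%y-%m-%d-%H-%M-%S")  # two-digit year, 24-hour clock
    return f"s3://{username}/{stamp}/"


# Fixed timestamp so the result is reproducible; use datetime.now() in practice.
prefix = unload_prefix("alice", datetime(2021, 1, 5, 14, 3, 9))
```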

    Conclusion

    This post explained how to securely query Amazon Redshift databases running in an Amazon VPC using the UNLOAD command with the Amazon Redshift query editor and using Amazon WorkSpaces running in the same VPC. Try out this solution to securely access Amazon Redshift databases without a VPN and run long-running queries. If you have any questions or comments, please share your thoughts in the comments section.


    About the Authors

    Seetha Sarma is a Senior Database Specialist Solutions Architect with Amazon Web Services. Seetha provides guidance to customers on using AWS services for distributed data processing. In her spare time she likes to go on long walks and enjoy nature.

    Moiz Mian is a Solutions Architect for AWS Strategic Accounts. He focuses on enabling customers to build innovative, scalable, and secure solutions for the cloud. In his free time, he enjoys building out Smart Home systems and driving at race tracks.

    Building a serverless data quality and analysis framework with Deequ and AWS Glue

    Post Syndicated from Vara Bonthu original https://aws.amazon.com/blogs/big-data/building-a-serverless-data-quality-and-analysis-framework-with-deequ-and-aws-glue/

    With ever-increasing amounts of data at their disposal, large organizations struggle to cope with not only the volume but also the quality of the data they manage. Indeed, alongside volume and velocity, veracity is an equally critical issue in data analysis, often seen as a precondition to analyzing data and guaranteeing its value. High-quality data is commonly described as fit for purpose and a fair representation of the real-world constructs it depicts. Ensuring data sources meet these requirements is an arduous task that is best addressed through an automated approach and adequate tooling.

    Challenges when running data quality at scale can include choosing the right data quality tools, managing the rules and constraints to apply on top of the data, and taking on the large upfront cost of setting up infrastructure in production.

    Deequ, an open-source data quality library developed internally at Amazon, addresses these requirements by defining unit tests for data that it can then scale to datasets with billions of records. It provides multiple features, like automatic constraint suggestions and verification, metrics computation, and data profiling. For more information about how Deequ is used at Amazon, see Test data quality at scale with Deequ.

    You need to follow several steps to implement Deequ in production, including building the infrastructure, writing custom AWS Glue jobs, profiling the data, and generating rules before applying them. In this post, we introduce an open-source Data Quality and Analysis Framework (DQAF) that simplifies this process and its orchestration. Built on top of Deequ, this framework makes it easy to create the data quality jobs that you need, manage the associated constraints through a web UI, and run them on the data as you ingest it into your data lake.

    Architecture

    As illustrated in the following architecture diagram, the DQAF exclusively uses serverless AWS technology. It takes a database and tables in the AWS Glue Data Catalog as inputs to AWS Glue jobs, and outputs various data quality metrics into Amazon Simple Storage Service (Amazon S3). Additionally, it saves time by automatically generating constraints on previously unseen data. The resulting suggestions are stored in Amazon DynamoDB tables and can be reviewed and amended at any point by data owners in the AWS Amplify managed UI. Amplify makes it easy to create, configure, and implement scalable web applications on AWS. The orchestration of these operations is carried out by an AWS Step Functions workflow. The code, artifacts, and an installation guide are available in the GitHub repository.

    In this post, we walk through a deployment of the DQAF using some sample data. We assume you have a database in the AWS Glue Data Catalog hosting one or more tables in the same Region where you deploy the framework. We use a legislators database with two tables (persons_json and organizations_json) referencing data about United States legislators. For more information about this database, see Code Example: Joining and Relationalizing Data.

    Deploying the solution

    Click on the button below to launch an AWS CloudFormation stack that deploys the solution in your AWS account in the last Region that was used:

    The process takes 10–15 minutes to complete. You can verify that the framework was successfully deployed by checking that the CloudFormation stacks show the status CREATE_COMPLETE.

    Testing the data quality and analysis framework

    The next step is to understand (profile) your test data and set up data quality constraints. Constraints can be defined as a set of rules to validate whether incoming data meets specific requirements along various dimensions (such as completeness, consistency, or contextual accuracy). Creating these rules can be a painful process if you have lots of tables with multiple columns, but DQAF makes it easy by sampling your data and suggesting the constraints automatically.

    On the Step Functions console, locate the data-quality-sm state machine, which represents the entry point to data quality in the framework. When you provide a valid input, it starts a series of AWS Glue jobs running Deequ. This step function can be called on demand, on a schedule, or based on an event. You run the state machine by entering a value in JSON format.
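
    As a rough sketch of the on-demand option, the state machine input can be built and submitted from Python. The glueDatabase and glueTables keys come from the input used later in this post; the function names and the ARN argument are hypothetical, and the call assumes boto3 and credentials are available.

```python
import json


def build_input(database, tables):
    """Serialize the state machine input: one database and a comma-separated table list."""
    return json.dumps({"glueDatabase": database, "glueTables": ", ".join(tables)})


def start_data_quality_run(state_machine_arn, database, tables):
    """Start an execution of the data quality state machine (needs boto3)."""
    import boto3  # imported lazily so build_input works without boto3

    sfn = boto3.client("stepfunctions")
    return sfn.start_execution(
        stateMachineArn=state_machine_arn,
        input=build_input(database, tables),
    )


payload = build_input("legislators", ["persons_json", "organizations_json"])
```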

    First pass and automatic suggestion of constraints

    After the step function is triggered, it calls the AWS Glue controller job, which is responsible for determining the data quality checks to perform. Because the submitted tables were never checked before, a first step is to generate data quality constraints on attributes of the data. In Deequ, this is done through an automatic suggestion of constraints, a process where data is profiled and a set of heuristic rules is applied to suggest constraints. It’s particularly useful when dealing with large multi-column datasets. In the framework, this operation is performed by the AWS Glue suggestions job, which logs the constraints into the DataQualitySuggestions DynamoDB table and outputs preliminary quality check results based on those suggestions into Amazon S3 in Parquet file format.

    AWS Glue suggestions job

    The Deequ suggestions job generates constraints based on three major dimensions:

    • Completeness – Measures the presence of null values, for example isComplete("gender") or isComplete("name")
    • Consistency – Consistency of data types and value ranges, for example isUnique("id") or isContainedIn("gender", Array("female", "male"))
    • Statistics – Univariate dimensions in the data, for example hasMax("Salary", "90000") or hasSize(_ >= 10)
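
    To make these dimensions concrete, the following plain-Python sketch mimics what such checks evaluate on a single column. It is not the Deequ API: the function names are illustrative stand-ins, and skipping nulls in the containment check is an assumption.

```python
def is_complete(values):
    """Completeness: no value in the column is missing."""
    return all(v is not None for v in values)


def is_unique(values):
    """Consistency: no value appears more than once."""
    return len(values) == len(set(values))


def is_contained_in(values, allowed):
    """Consistency: every non-null value belongs to the allowed set (assumption: nulls are skipped)."""
    return all(v in allowed for v in values if v is not None)


def has_size(values, assertion):
    """Statistics: the number of records satisfies a user-supplied assertion."""
    return assertion(len(values))


# Hypothetical sample columns.
ids = [1, 2, 3, 4]
genders = ["female", "male", None, "female"]
```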

    The following table lists the available constraints that can be manually added in addition to the automatically suggested ones.

    Constraint              Argument                Semantics
    Dimension: Completeness
    isComplete              column                  Check that there are no missing values in a column
    hasCompleteness         column, udf             Custom validation of missing values in a column
    Dimension: Consistency
    isUnique                column                  Check that there are no duplicates in a column
    hasUniqueness           column, udf             Custom validation of the unique value ratio in a column
    hasDistinctness         column, udf             Custom validation of the unique row ratio in a column
    isInRange               column, value range     Validation of the fraction of values that are in a valid range
    hasConsistentType       column                  Validation of the largest fraction of values that have the same type
    isNonNegative           column                  Validation whether all the values in a numeric column are non-negative
    isLessThan              column pair             Validation whether all the values in the first column are always less than the second column
    satisfies               predicate               Validation whether all the rows match predicate
    satisfiesIf             predicate pair          Validation whether all the rows matching first predicate also match the second predicate
    hasPredictability       column, column(s), udf  User-defined validation of the predictability of a column
    Statistics (can be used to verify dimension consistency)
    hasSize                 udf                     Custom validation of the number of records
    hasTypeConsistency      column, udf             Custom validation of the maximum fraction of the values of the same datatype
    hasCountDistinct        column                  Custom validation of the number of distinct non-null values in a column
    hasApproxCountDistinct  column, udf             Custom validation of the approximate number of distinct non-null values
    hasMin                  column, udf             Custom validation of the column’s minimum value
    hasMax                  column, udf             Custom validation of the column’s maximum value
    hasMean                 column, udf             Custom validation of the column’s mean value
    hasStandardDeviation    column, udf             Custom validation of the column’s standard deviation value
    hasApproxQuantile       column, quantile, udf   Custom validation of a particular quantile of a column (approximate)
    hasEntropy              column, udf             Custom validation of the column’s entropy
    hasMutualInformation    column pair, udf        Custom validation of the column pair’s mutual information
    hasHistogramValues      column, udf             Custom validation of the column’s histogram
    hasCorrelation          column pair, udf        Custom validation of the column pair’s correlation

    The following screenshot shows the DynamoDB table output with suggested constraints generated by the AWS Glue job.

    AWS Glue data profiler job

    Deequ also supports single-column profiling of data, and its implementation scales to large datasets with billions of rows. As a result, we get a profile for each column in the data, which allows us to inspect the completeness of the column, the approximate number of distinct values, and the inferred datatype.

    The controller triggers an AWS Glue data profiler job in parallel to the suggestions job. This profiler Deequ process runs three passes over the data and avoids any shuffles in order to easily scale to large datasets. Results are stored as Parquet files in the S3 data quality bucket.
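
    As an illustration of what a single-column profile contains, here is a minimal plain-Python sketch. It is not how Deequ computes profiles (Deequ runs on Spark and approximates the distinct count); the function names and type labels are assumptions, and the input is assumed to be string values as read from a text file.

```python
def _parses(cast, value):
    """Return True if cast(value) succeeds."""
    try:
        cast(value)
        return True
    except ValueError:
        return False


def infer_type(values):
    """Infer a coarse type label from the non-null string values of a column."""
    present = [v for v in values if v is not None]
    if present and all(_parses(int, v) for v in present):
        return "Integral"
    if present and all(_parses(float, v) for v in present):
        return "Fractional"
    return "String"


def profile_column(values):
    """Return completeness, exact distinct count, and inferred type for one column."""
    present = [v for v in values if v is not None]
    return {
        "completeness": len(present) / len(values) if values else 0.0,
        "distinct": len(set(present)),
        "data_type": infer_type(values),
    }


profile = profile_column(["1", "2", None, "2"])
```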

    When the controller job is complete, the second step in the data quality state machine is to crawl the Amazon S3 output data into a data_quality_db database in the AWS Glue Data Catalog, which is then immediately available to be queried in Amazon Athena. The following screenshot shows the list of tables created by this AWS Glue framework and a sample output from the data profiler results.

    Reviewing and verifying data quality constraints

    As good as Deequ is at suggesting data quality rules, the data stewards should first review the constraints before applying them in production. Because it may be cumbersome to edit large tables in DynamoDB directly, we have created a web app that enables you to add or amend the constraints. The changes are updated in the relevant DynamoDB tables in the background.

    Accessing the web front end

    To access the user interface, on the AWS Amplify console, choose the deequ-constraints app. Choosing the URL (listed as https://<env>.<appsync_app_id>.amplifyapp.com) opens the data quality constraints front end. After you complete the registration process with Amazon Cognito (create an account) and sign in, you see a UI similar to the following screenshot.

    It lists data quality constraint suggestions produced by the AWS Glue job in the previous step. Data owners can add or remove and enable or disable these constraints at any point via the UI. Suggestions are not enabled by default. This makes sure all constraints are human reviewed before they are processed. Choosing the check box enables a constraint.

    Data analyzer (metric computations)

    Alongside profiling, Deequ can also generate column-level statistics called data analyzer metrics (such as completeness, maximum, and correlation). They can help uncover data quality problems, for example by highlighting the share of null values in a primary key or the correlation between two columns.

    The following table lists the metrics that you can apply to any column.

    Metric                Semantics
    Dimension: Completeness
    Completeness          Fraction of non-missing values in a column
    Dimension: Consistency
    Size                  Number of records
    Compliance            Ratio of columns matching predicate
    Uniqueness            Unique value ratio in a column
    Distinctness          Unique row ratio in a column
    ValueRange            Value range verification for a column
    DataType              Data type inference for a column
    Predictability        Predictability of values in a column
    Statistics (can be used to verify dimension consistency)
    Minimum               Minimal value in a column
    Maximum               Maximal value in a column
    Mean                  Mean value in a column
    StandardDeviation     Standard deviation of the value distribution in a column
    CountDistinct         Number of distinct values in a column
    ApproxCountDistinct   Approximate number of distinct values in a column
    ApproxQuantile        Approximate quantile of the value in a column
    Correlation           Correlation between two columns
    Entropy               Entropy of the value distribution in a column
    Histogram             Histogram of an optionally binned column
    MutualInformation     Mutual information between two columns
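
    A few of these metrics are easy to confuse, so the following plain-Python sketch shows one common reading of them on a small column without nulls. It is not the Deequ implementation; the function names are illustrative, and the use of the natural logarithm for entropy is an assumption.

```python
import math
from collections import Counter


def distinctness(values):
    """Fraction of distinct values: number of different values / number of rows."""
    return len(set(values)) / len(values)


def uniqueness(values):
    """Fraction of values that occur exactly once."""
    counts = Counter(values)
    return sum(1 for c in counts.values() if c == 1) / len(values)


def entropy(values):
    """Shannon entropy of the value distribution (natural log, an assumption)."""
    n = len(values)
    return -sum((c / n) * math.log(c / n) for c in Counter(values).values())


column = ["a", "b", "b", "c"]  # hypothetical sample without nulls
```

    Here "b" appears twice, so three of the four rows hold distinct values but only two values ("a" and "c") are unique.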

    In the web UI, you can add these metrics on the Analyzers tab. In the following screenshot, we add an ApproxCountDistinct metric on an id column. Choosing Create analyzer inserts the record into the DataQualityAnalyzer table in DynamoDB and enables the constraint.

    AWS Glue verification job

    We’re now ready to put our rules into production and can use Athena to look at the results. You can start running the step function with the same JSON as input:

    {
      "glueDatabase": "legislators",
      "glueTables": "persons_json, organizations_json"
    }
    

    This time the AWS Glue verification job is triggered by the controller. This job performs two actions: it verifies the suggestion constraints and performs metric computations. You can immediately query the results in Athena under the constraints_verification_results table.

    The following screenshot shows the verification output.

    The following screenshot shows the metric computation results.

    Summary

    Dealing with large, real-world datasets requires a scalable and automated approach to data quality. Deequ is the tool of choice at Amazon when it comes to measuring the quality of large production datasets. It’s used to compute data quality metrics, suggest and verify constraints, and profile data.

    This post introduced an open-source, serverless Data Quality and Analysis Framework that aims to simplify the process of deploying Deequ in production by setting up the necessary infrastructure and making it easy to manage data quality constraints. It enables data owners to generate automated data quality suggestions on previously unseen data that can then be reviewed and amended in a UI. These constraints serve as inputs to various AWS Glue jobs in order to produce data quality results queryable via Athena. Try this framework on your data and leave suggestions on how to improve it on our open-source GitHub repo.


    About the Authors

    Vara Bonthu is a Senior BigData/DevOps Architect for ProServe working with the Global Accounts team. He is passionate about big data and Kubernetes. He helps customers all over the world design, build, and migrate end-to-end data analytics and container-based solutions. In his spare time, he develops applications to help educate his 7-year-old autistic son.

     

    Abdel Jaidi is a Data & Machine Learning Engineer for AWS Professional Services. He works with customers on Data & Analytics projects, helping them shorten their time to value and accelerate business outcomes. In his spare time, he enjoys participating in triathlons and walking dogs in parks in and around London.

    7 most common data preparation transformations in AWS Glue DataBrew

    Post Syndicated from Shilpa Mohan original https://aws.amazon.com/blogs/big-data/7-most-common-data-preparation-transformations-in-aws-glue-databrew/

    For all analytics and ML modeling use cases, data analysts and data scientists spend the bulk of their time manually running data preparation tasks to get clean, formatted data that meets their needs. We ran a survey among data scientists and data analysts to understand the most frequently used transformations in their data preparation workflow. AWS Glue DataBrew provides more than 250 built-in transformations, which make most of these tasks 80% faster. This post covers use case-based walkthroughs of how to achieve the top seven of those transformations in AWS Glue DataBrew.

    #1 Handling/Imputing missing values

    Missing data is common in real-world datasets and can have a significant impact on the analytics or ML models using the data. Missing values in datasets can skew or bias the data and result in invalid conclusions. Handling missing values is one of the most frequently used data preparation steps.

    In a DataBrew project, you can get a quick view of missing values in your sample data under Data quality in the Schema view and the Column statistics.

    For any data column, you can choose to either remove the rows with missing values or fill them with an empty string, null, the last valid value, the most frequent value, or a custom value. For numerical data columns, you can also fill missing values with numerical aggregates such as the average, mode, sum, or median of the values.
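
    To illustrate how some of these fill strategies differ, here is a plain-Python sketch on a single column where None marks a missing value. It only mirrors the behavior described above; the function names are hypothetical and this is not DataBrew code.

```python
from collections import Counter


def fill_custom(values, custom):
    """Replace every missing value with a custom value."""
    return [custom if v is None else v for v in values]


def fill_most_frequent(values):
    """Replace every missing value with the most frequent non-missing value."""
    present = [v for v in values if v is not None]
    mode = Counter(present).most_common(1)[0][0]
    return [mode if v is None else v for v in values]


def fill_last_valid(values):
    """Replace every missing value with the last non-missing value seen before it."""
    filled, last = [], None
    for v in values:
        if v is not None:
            last = v
        filled.append(last)  # a leading missing value stays None
    return filled
```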

    Here’s an example of how we filled missing values in column “sub-product” with the custom value “None” for the Consumer complaints dataset.

    #2 Combining datasets

    Data analysis is often performed on a single dataset; however, the key information required to arrive at useful insights might be spread across multiple datasets. Join is a commonly used data preparation step that brings together information from multiple datasets. In many large data scenarios, the information in a single dataset is split or partitioned into multiple files; Union is the data preparation step that consolidates all parts of the dataset. DataBrew supports Union and Join to combine data from multiple datasets. You can union multiple files into one at the beginning of a project or as a recipe step, or join a dataset based on one or more join keys.

    Multiple files as an input dataset
    You can union multiple files together as a single input for any DataBrew project. Here is an example of how we union four files for the NYC Parking tickets dataset.

    You can select all files in a folder as an input for your project to union all the data in the files. DataBrew supports a parameterized input path to customize the files you would like to combine.

    Union as a transformation

    In a project, you can add the union as a recipe step to combine multiple files. You will need to pre-create all the required datasets in DataBrew to perform this as a recipe step. Union is available as a transformation in the project toolbar.

    You can select multiple datasets with preview for the Union transform. You can then specify whether to map columns by name or by position in the dataset, and also order the datasets to control the order of rows in the data after the union.
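
    The choice between mapping by name and mapping by position matters when the files list their columns in a different order. The following plain-Python sketch (not DataBrew code; the function names, column names, and sample rows are hypothetical) shows the difference on rows represented as dictionaries.

```python
def union_by_name(first, second):
    """Stack rows, matching columns by name; output follows the first dataset's columns."""
    columns = list(first[0].keys())
    return [{c: row.get(c) for c in columns} for row in first + second]


def union_by_position(first, second):
    """Stack rows, matching the n-th column of the second dataset to the n-th of the first."""
    columns = list(first[0].keys())
    remapped = [dict(zip(columns, row.values())) for row in second]
    return first + remapped


# Same columns, different order in the second file.
file_a = [{"plate": "X1", "state": "NY"}]
file_b = [{"state": "NJ", "plate": "Y2"}]
```

    With these inputs, mapping by name keeps "NJ" under state, while mapping by position places it under plate.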

    Detailed column mapping allows you to customize the column mapping, the resulting column name, and the column type.

    You can preview the resulting dataset before applying the transformation, which is then added to your recipe.

    Joining datasets

    The Join transform allows you to bring in data from a secondary dataset. This example joins the UN General Assembly Votes – Resolutions data with the UN General Assembly Votes – States data using “resolution session” as the join key.

    You can choose from the different join types; the visual representation helps you identify the right type of join for your scenario. You can add one or more join keys.

    The columns list allows you to search the entire list of columns from both datasets and choose which columns you want to retain as part of the Join operation.

    You can also preview your joined table before you complete the transform.
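
    As a rough illustration of how the join type changes the result, here is a plain-Python sketch of an inner and a left join on one key. It is not DataBrew code; the function name, the "session" key, and the sample rows are hypothetical.

```python
def join(left, right, key, how="inner"):
    """Join two lists of row dictionaries on a single key ("inner" or "left")."""
    index = {}
    for row in right:
        index.setdefault(row[key], []).append(row)

    joined = []
    for lrow in left:
        matches = index.get(lrow[key], [])
        for rrow in matches:
            joined.append({**rrow, **lrow})  # left values win on name clashes
        if not matches and how == "left":
            joined.append(dict(lrow))  # keep unmatched left rows as they are
    return joined


resolutions = [{"session": 1, "resolution": "R/1"}, {"session": 2, "resolution": "R/2"}]
states = [{"session": 1, "state": "US"}]
```

    An inner join keeps only session 1, which exists in both datasets; a left join also keeps session 2 from the primary dataset.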

    #3 Creating columns

    Often the data available in a dataset isn’t represented as the values you need for downstream data analysis or data modeling. As part of data prep, data analysts and data scientists create custom data columns in a format that suits their data analysis needs.

    You can create columns with extracted values or flagged values from an existing column. DataBrew also provides a collection of functions that help you create new columns. It covers math, aggregate, text, date, window, web, and other functions.

    For example, in a Netflix Movies and TV Shows dataset, you can create a column that tells you how many years the title has been available on Netflix by using a DATEDIFF function on the “date_added” column.

    Create a column using a function

    You can select DATEDIFF from the date functions on the toolbar. You can calculate the number of years by taking the difference between the “date_added” value and the current date, and set the output to be calculated in years.

    As with all transformations, you can preview the transform before applying it.
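
    The date difference in years can be sketched in plain Python as follows. This counts whole elapsed years, which is an assumption; DataBrew’s DATEDIFF may round differently. The function name and sample dates are hypothetical.

```python
from datetime import date


def years_between(start, end):
    """Return the number of whole years elapsed from start to end."""
    years = end.year - start.year
    # Subtract one if the anniversary has not yet been reached in the end year.
    if (end.month, end.day) < (start.month, start.day):
        years -= 1
    return years


date_added = date(2019, 9, 1)  # hypothetical "date_added" value
years_available = years_between(date_added, date(2021, 9, 1))
```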

    Creating a Flag column

    From the same Netflix Movies and TV Shows dataset, you can create a flag column from the create column option in the toolbar.

    You can flag titles that starred Kate Hudson by flagging the custom value “Kate Hudson” in the “Cast” column. You have different options for the values of the flag column; in this case, we go with Yes and No.
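As a rough illustration of what a flag column contains, the following Python sketch marks each row Yes or No depending on whether a column contains a given value. The `add_flag_column` helper, the column names, and the sample rows are hypothetical.

```python
def add_flag_column(rows, source_column, search_value, flag_column, yes="Yes", no="No"):
    """Return copies of rows with a flag column set when source_column contains search_value."""
    flagged = []
    for row in rows:
        # Treat a missing or empty value as "does not contain".
        contains = search_value in (row.get(source_column) or "")
        flagged.append({**row, flag_column: yes if contains else no})
    return flagged


# Hypothetical sample rows.
titles = [
    {"title": "Example Movie A", "cast": "Kate Hudson, Example Actor"},
    {"title": "Example Movie B", "cast": "Another Actor"},
    {"title": "Example Movie C", "cast": None},
]
flagged = add_flag_column(titles, "cast", "Kate Hudson", "stars_kate_hudson")
```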

    #4 Filtering data

    You can filter values in a dataset either as a transformation step or as a filter on the data in your grid view.

    If you select “Apply as a step”, the filter is added to your recipe as a step.

    You can choose “Filter values” to filter the values shown in the project’s grid view. All applied filters are shown in the toolbar. You can choose to apply all the filters as a step from the toolbar at any time.

    You can also apply transformations conditionally, so that they affect only the filtered values.

    For example, in the Netflix Movies and TV Shows dataset, we can replace the word “deformed” with “physically different”, but only for Kids-related titles.

    We can filter the “listed_on” column by “Kid’s TV” and “Children & Family Movies”. Now on the “description” column we can replace values from the clean menu on the toolbar. On the transformation panel, under “Apply transform to” you can choose to apply the transformations only to the filtered rows. This will replace the term “deformed” with “physically different” only for Kids titles in the entire dataset.
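The idea of applying a transform only to filtered rows can be sketched as follows. This is an illustration rather than DataBrew's implementation; it assumes the “listed_on” column holds a comma-separated list of categories, writes the category with a plain apostrophe, and uses hypothetical sample rows.

```python
KIDS_CATEGORIES = {"Kid's TV", "Children & Family Movies"}


def replace_for_kids_titles(rows, old, new):
    """Replace `old` with `new` in the description, but only for Kids-related rows."""
    updated = []
    for row in rows:
        row = dict(row)  # copy so the input rows are left untouched
        categories = {c.strip() for c in row["listed_on"].split(",")}
        if categories & KIDS_CATEGORIES:
            row["description"] = row["description"].replace(old, new)
        updated.append(row)
    return updated


# Hypothetical sample rows.
rows = [
    {"title": "A", "listed_on": "Kid's TV, Comedies",
     "description": "A deformed hero saves the day."},
    {"title": "B", "listed_on": "Horror Movies",
     "description": "A deformed creature stalks a town."},
]
result = replace_for_kids_titles(rows, "deformed", "physically different")
```

Rows that do not match the filter pass through unchanged, which mirrors choosing to apply the transform only to the filtered rows.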

    #5 Aggregating data

    You can aggregate data in DataBrew by using the Group by transformation. For the example dataset of New York City Airbnb Open Data, we can create an aggregated minimum and maximum price by neighborhood.

    You can select the Group By transformation from the toolbar. We first select “Neighborhood” as the column to group by. We can then add two aggregated columns based on the “Price” column, for the “min” and “max” of its values.

    By default, the aggregated columns are added as new columns in the existing dataset. Alternatively, you can choose to create a table with only the columns specified above.
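A minimal Python sketch of the same aggregation, covering both output options described above. The sample listings and the output column names `price_min` and `price_max` are hypothetical.

```python
from collections import defaultdict

# Hypothetical sample listings.
listings = [
    {"neighborhood": "Harlem", "price": 150},
    {"neighborhood": "Harlem", "price": 60},
    {"neighborhood": "Midtown", "price": 225},
    {"neighborhood": "Midtown", "price": 300},
    {"neighborhood": "Midtown", "price": 99},
]


def min_max_by(rows, key="neighborhood", value="price"):
    """Group rows by `key` and compute the min and max of `value` per group."""
    grouped = defaultdict(list)
    for row in rows:
        grouped[row[key]].append(row[value])
    return {
        group: {f"{value}_min": min(values), f"{value}_max": max(values)}
        for group, values in grouped.items()
    }


# Option 1: a table with only the group column and the aggregates.
summary = min_max_by(listings)
# Option 2: the aggregates added as new columns on every original row.
with_aggregates = [{**row, **summary[row["neighborhood"]]} for row in listings]
```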

    #6 Handling Categorical values

    For most ML modeling algorithms, categorical values like Gender, Product category, or Education level need to be converted to numerical formats. DataBrew supports Categorical mapping and One-Hot Encoding.

    Categorical or label mapping

    Ordinal categorical values are ordered or hierarchical, like Education level or T-shirt sizes. For example, Large is greater than Small, so Small can be labeled as 1 and Large as 2 in numerical format. The easiest way to handle such variables is by using Categorical mapping in DataBrew. It can be accessed from the toolbar under Mapping.

    In the example New York City Airbnb Open Data dataset, the “room_type” column has values that are ordered. On the Categorical mapping form, you can choose “Map all values” under mapping options. You can then select the “Map values to numeric values” check box or enter custom values as required. Choose Apply, and you have a new column with numerical values mapped to the original column.
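Conceptually, categorical mapping is a lookup from each ordered label to a number. The sketch below assumes three “room_type” values and an ordering chosen purely for illustration; neither the values nor their order is stated above, and the output column name is hypothetical.

```python
# Assumed ordering for illustration only, from lowest to highest.
ROOM_TYPE_ORDER = ["Shared room", "Private room", "Entire home/apt"]
ROOM_TYPE_TO_NUMBER = {label: rank for rank, label in enumerate(ROOM_TYPE_ORDER, start=1)}


def map_categories(rows, column, mapping, output_column):
    """Add output_column holding the numeric code for each row's category (None if unmapped)."""
    return [{**row, output_column: mapping.get(row[column])} for row in rows]


# Hypothetical sample rows.
listings = [
    {"id": 1, "room_type": "Entire home/apt"},
    {"id": 2, "room_type": "Shared room"},
    {"id": 3, "room_type": "Hotel room"},  # not in the mapping
]
mapped = map_categories(listings, "room_type", ROOM_TYPE_TO_NUMBER, "room_type_mapped")
```

Values that are missing from the mapping are left as `None` here so that they are easy to spot and handle separately.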

    One-Hot Encoding 

    For all non-ordinal categorical values like gender or product category, One-hot encoding is the most common way to convert them to numerical format. It can be accessed in a DataBrew project in the toolbar under Encoding.

    For the column “neighborhood_group” in the New York City Airbnb Open Data, all you have to do is open the one-hot encoding form and click apply. All the required columns with the encoded numerical values are generated instantly. This would usually take a Data Scientist hours to perform manually.
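To show what one-hot encoding produces, here is a small Python sketch that adds one 0/1 column per distinct category. The generated column names and the sample rows are hypothetical; DataBrew's naming of the generated columns may differ.

```python
def one_hot_encode(rows, column):
    """Add one 0/1 column per distinct value found in `column`."""
    categories = sorted({row[column] for row in rows})
    encoded = []
    for row in rows:
        new_row = dict(row)
        for category in categories:
            new_row[f"{column}_{category}"] = 1 if row[column] == category else 0
        encoded.append(new_row)
    return encoded


# Hypothetical sample rows.
listings = [
    {"id": 1, "neighborhood_group": "Brooklyn"},
    {"id": 2, "neighborhood_group": "Manhattan"},
    {"id": 3, "neighborhood_group": "Brooklyn"},
]
encoded = one_hot_encode(listings, "neighborhood_group")
```

Each row ends up with exactly one encoded column set to 1, so no ordering between categories is implied.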

    #7 Handling Numerical values

    Columns with numerical values are often not ready to be processed by an ML algorithm as they are. A dataset may have columns with numerical values on very different scales; for example, capacity might range from 0 to 100, while price could range from 10 to 10,000. In such cases, the columns need to be rescaled to a common scale, such as 0 to 1. Along with scaling issues, the data may also contain outliers that need to be handled. Hence scaling, normalizing, and standardizing are common transformations performed on numerical values in datasets used for ML models.

    You can handle a column with numerical values like “Price” in New York City Airbnb Open Data using any of the transformations under Scale in the toolbar.

    DataBrew provides multiple techniques to rescale your data, such as Min-max normalization, Scaling between specified values, Mean normalization or standardization, and Z-score normalization or standardization. In this use case, because price has an outlier, you can select Z-score normalization or standardization to best scale the values for your ML model.
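Two of the techniques named above can be written out in a few lines of Python. This is a sketch of the standard formulas, not DataBrew's code; in particular, using the population standard deviation for the Z-score is an assumption, and the sample prices are hypothetical.

```python
from statistics import mean, pstdev


def min_max_normalize(values):
    """Rescale values linearly so the minimum maps to 0 and the maximum to 1."""
    low, high = min(values), max(values)
    if high == low:
        return [0.0 for _ in values]  # all values identical; avoid dividing by zero
    return [(v - low) / (high - low) for v in values]


def z_score(values):
    """Center values on the mean and divide by the population standard deviation."""
    mu = mean(values)
    sigma = pstdev(values)
    if sigma == 0:
        return [0.0 for _ in values]
    return [(v - mu) / sigma for v in values]


# Hypothetical prices with one large outlier.
prices = [50, 75, 100, 125, 10000]
scaled = min_max_normalize(prices)
standardized = z_score(prices)
```

With min-max normalization, a single outlier defines the top of the range and compresses every other value toward 0, which is why the choice of technique matters when a column contains outliers.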

    These are some of the most frequently used data preparation transformations, demonstrated in AWS Glue DataBrew. With more than 250 built-in transformations, you can find one that meets your data preparation use case and reduce the time and effort that goes into cleaning data.

    Jump in and try out AWS Glue DataBrew today.

    Datasets used in this blog:

    Consumer complaints : https://www.consumerfinance.gov/data-research/consumer-complaints/

    NYC Parking tickets : https://www.kaggle.com/new-york-city/nyc-parking-tickets

    UN General Assembly Votes – Resolutions: Sample dataset available in AWS Glue DataBrew

    UN General Assembly Votes – States: Sample dataset available in AWS Glue DataBrew

    Netflix Movies and TV Shows : https://www.kaggle.com/shivamb/netflix-shows

    New York City Airbnb Open Data – https://www.kaggle.com/dgomonov/new-york-city-airbnb-open-data


    About the Authors

    Shilpa Mohan is a Sr. UX designer at AWS and leads the design of AWS Glue DataBrew. With over 13 years of experience across multiple enterprise domains, she is currently crafting products for Database, Analytics and AI services for AWS. Shilpa is a passionate creator who spends her time creating anything from content and photographs to crafts.

    Romi Boimer is a Sr. Software Development Engineer at AWS and a technical lead for AWS Glue DataBrew. She designs and builds solutions that enable customers to efficiently prepare and manage their data. Romi has a passion for aerial arts; in her spare time she enjoys fighting gravity and hanging from fabric.

    Scheduling SQL queries on your Amazon Redshift data warehouse

    Post Syndicated from Sain Das original https://aws.amazon.com/blogs/big-data/scheduling-sql-queries-on-your-amazon-redshift-data-warehouse/

    Amazon Redshift is the most popular cloud data warehouse today, with tens of thousands of customers collectively processing over 2 exabytes of data on Amazon Redshift daily. Amazon Redshift is fully managed, scalable, secure, and integrates seamlessly with your data lake. In this post, we discuss how to set up and use the new query scheduling feature on Amazon Redshift.

    Amazon Redshift users often need to run SQL queries or routine maintenance tasks on a regular schedule. You can now schedule statements directly from the Amazon Redshift console or by using the AWS Command Line Interface (AWS CLI) without having to use scripting and a scheduler like cron. You can schedule and run the SQL statement using Amazon EventBridge and the Amazon Redshift Data API. The results are available for 24 hours after running the SQL statement. This helps you in a variety of scenarios, such as when you need to do the following:

    • Run SQL queries during non-business hours
    • Load data using COPY statements every night
    • Unload data using UNLOAD nightly or at regular intervals throughout the day
    • Delete older data from tables as per regulatory or established data retention policies
    • Refresh materialized views manually at a regular frequency
    • Back up system tables every night

    EventBridge is a serverless event bus service that makes it easy to connect your applications with data from a variety of sources. EventBridge delivers a stream of real-time data from your own applications, software as a service (SaaS) applications, and AWS services and routes that data to targets including Amazon Redshift clusters. Amazon Redshift integrates with EventBridge to allow you to schedule your SQL statements on recurring schedules and enables you to build event-driven applications. Besides scheduling SQL, you can also invoke the Amazon Redshift Data API in response to any other EventBridge event.

    When creating a schedule using the Amazon Redshift console, you create an EventBridge rule with the specified schedule and attach a target (with the Amazon Redshift cluster information, login details, and the SQL command to run) to the rule. This rule then runs as per the schedule using EventBridge.

    In this post, we describe how you can schedule SQL statements on Amazon Redshift using EventBridge, and also go over the required security privileges and the steps to schedule a SQL statement using both the AWS Management Console and the AWS CLI.

    Prerequisites

    To set this up, we need to make sure that the AWS Identity and Access Management (IAM) user (which we use to create the schedule and access the schedule history), the IAM role, and the AWS secret (which stores the database user name and password) are configured correctly.

    1. Make sure that the IAM user who is going to create the schedule has the AmazonEventBridgeFullAccess IAM policy attached to it. The IAM user should also have appropriate access to Amazon Redshift as per their role. We use the placeholder {USER_NAME} to refer to this IAM user in this post.
    2. Store the database credentials to be used for running the scheduled SQL statement securely using AWS Secrets Manager. If you already have a secret created, you can use that. If not, create a new secret to store the Amazon Redshift database user name and password.
    3. Create an IAM role for the Amazon Redshift service with the “Redshift Customizable” option and attach the AmazonRedshiftDataFullAccess AWS managed policy to it.
      1. Ensure that the role has the following trust relationships added to it:
        {
              "Sid": "S1",
              "Effect": "Allow",
              "Principal": {
                "Service": "events.amazonaws.com"
              },
              "Action": "sts:AssumeRole"
        }

      2. Add the following AssumeRole permissions so the user can see the schedule history list. Replace the {ACCOUNT_ID} with your AWS account ID and {USER_NAME} with the IAM user you set up:
        {
              "Sid": "S2",
              "Effect": "Allow",
              "Principal": {
                "AWS": "arn:aws:iam::{ACCOUNT_ID}:user/{USER_NAME}"
              },
              "Action": "sts:AssumeRole"
        }

      We use the placeholder {ROLE_NAME} to refer to this role in this post.

    4. In addition to the preceding AssumeRole permissions, the IAM user has to be granted AssumeRole permissions on the IAM role you created in order to view the schedule history:
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Sid": "S3",
                  "Effect": "Allow",
                  "Action": "sts:AssumeRole",
                  "Resource": "arn:aws:iam::{ACCOUNT_ID}:role/{ROLE_NAME}"
              }
          ]
      }
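The two trust-relationship statements shown above (Sid S1 and S2) are fragments that belong in the `Statement` array of the role's trust policy. As a minimal sketch, the following Python builds a policy document containing just those two statements; the account ID and user name are placeholder values, and any statements the role already has in its trust policy would need to be kept alongside them.

```python
import json


def build_trust_policy(account_id, user_name):
    """Assemble a trust policy document from the S1 and S2 statements."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "S1",
                "Effect": "Allow",
                "Principal": {"Service": "events.amazonaws.com"},
                "Action": "sts:AssumeRole",
            },
            {
                "Sid": "S2",
                "Effect": "Allow",
                "Principal": {"AWS": f"arn:aws:iam::{account_id}:user/{user_name}"},
                "Action": "sts:AssumeRole",
            },
        ],
    }


# Placeholder values for illustration only.
policy = build_trust_policy("123456789012", "example-user")
policy_json = json.dumps(policy, indent=2)
```

The resulting JSON can be pasted into the trust relationship editor for the role after merging it with the existing statements.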

    Now that we have completed the security setup required to schedule SQL statements and view their history, let’s schedule a SQL statement to run at a regular frequency. We can do this using the console or the AWS CLI. We discuss both approaches in this post.

    Scheduling the SQL statement using the console

    Let’s take the example of a fairly common use case where data from a table has to be extracted daily (or at another regular frequency) into Amazon Simple Storage Service (Amazon S3) for analysis by data scientists or data analysts. You can accomplish this by scheduling an UNLOAD command to run daily to export data from the table to the data lake on Amazon S3. See the following code:

    unload ('select * from edw.trxns')
    to 's3://mybucket/'
    iam_role 'arn:aws:iam::{ACCOUNT_ID}:role/{ROLE_NAME_2}'
    PARQUET
    PARTITION BY (trxn_dt);

    {ROLE_NAME_2} in the preceding code is not the same as {ROLE_NAME}. {ROLE_NAME_2} should be an IAM role that has permissions to run the UNLOAD command successfully.

    We can schedule this UNLOAD statement to run every day at 4:00 AM UTC using the following steps:

    1. Sign in to the console. Make sure the IAM user has been granted the necessary permissions.
    2. On the Amazon Redshift console, open the query editor.
    3. Choose Schedule.

    4. In the Scheduler permissions section, for IAM role, choose the role you created earlier.

    If you don’t have IAM read permissions, you may not see the IAM role in the drop-down menu. In that case, you can enter the Amazon Resource Name (ARN) of the IAM role that you created.

    5. For Authentication, select AWS Secrets Manager.
    6. For Secret, choose the secret you configured earlier, which contains the database user name and password.

    You can also use Temporary credentials for authentication as explained in the AWS documentation.

    7. For Database name, enter a name.

    8. In the Query Information section, for Scheduled query name, enter a name for the query.
    9. For SQL query, enter the SQL code.

    You also have the ability to upload a SQL query from a file.

    10. In the Scheduling options section, for Schedule query by, select Run frequency.
    11. For Repeat by, choose Day.
    12. For Repeat every, enter 1.
    13. For Repeat at time (UTC), enter 04:00.

    14. In the Monitoring section, you can choose to enable notifications via email or text using Amazon Simple Notification Service (Amazon SNS).

    If you decide to enable notifications, make sure that the Publish action has been granted to events.amazonaws.com on the SNS topic. You can do this by adding the following snippet to the access policy of the SNS topic. Replace ACCOUNT_ID and SNS_TOPIC_NAME with appropriate values. If you choose to create a new SNS topic during the schedule creation process, then the following access is granted automatically.

    {
          "Sid": "Allow_Publish_Events",
          "Effect": "Allow",
          "Principal": {
            "Service": "events.amazonaws.com"
          },
          "Action": "sns:Publish",
          "Resource": "arn:aws:sns:us-east-1:{ACCOUNT_ID}:{SNS_TOPIC_NAME}"
    }

    Scheduling the SQL statement using the AWS CLI

    You can also schedule SQL statements via the AWS CLI using EventBridge and the Amazon Redshift Data API. For more information about the Amazon Redshift Data API, see Using the Amazon Redshift Data API to interact with Amazon Redshift clusters.

    In the following example, we set up a schedule to refresh a materialized view (called mv_cust_trans_hist) on Amazon Redshift daily at 2:00 AM UTC.

    1. Create an event rule. The following command creates a rule named scheduled-refresh-mv-cust-trans-hist and schedules it to run daily at 2:00 AM UTC. The schedule expression can be provided using cron or rate expressions.
      aws events put-rule \
      --name scheduled-refresh-mv-cust-trans-hist \
      --schedule-expression "cron(0 2 * * ? *)"

    2. Create a JSON file (for example, data.json) that contains the Amazon Redshift Data API parameters:
      1. For Arn, enter the ARN of your Amazon Redshift cluster.
      2. For the RoleArn and the SecretManagerArn fields, use the ARNs for the role and secret you created earlier.
      3. Enter the database name and the SQL statement to be scheduled for the Database and Sql fields.
      4. You can enter any name for the StatementName field.
      5. If you want an event to be sent after the SQL statement has been run, you can set the WithEvent parameter to true. You can then use that event to trigger other SQL statements if needed.
        {
          "Rule": "scheduled-refresh-mv-cust-trans-hist",
          "EventBusName": "default",
          "Targets": [
            {
              "Id": "scheduled-refresh-mv-cust-trans-hist",
              "Arn": "arn:aws:redshift:us-east-1:{ACCOUNT_ID}:cluster:{REDSHIFT_CLUSTER_IDENTIFIER}",
              "RoleArn": "arn:aws:iam::{ACCOUNT_ID}:role/{ROLE_NAME}",
              "RedshiftDataParameters": {
                "SecretManagerArn": "arn:aws:secretsmanager:us-east-1:{ACCOUNT_ID}:secret:{SECRET_NAME-xxxxxx}",
                "Database": "dev",
                "Sql": "REFRESH MATERIALIZED VIEW mv_cust_trans_hist;",
                "StatementName": "refresh-mv-cust-trans-hist",
                "WithEvent": true
              }
            }
          ]
        }

    3. Create an event target using the JSON file created in the previous step:
      aws events put-targets --cli-input-json file://data.json
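If you prefer to generate these inputs rather than edit them by hand, the following Python sketch builds a daily EventBridge cron expression and the JSON input for `put-targets`. The helper names and the placeholder ARNs are hypothetical; the field names mirror the JSON shown in the steps above. In EventBridge cron expressions, the fields are minutes, hours, day-of-month, month, day-of-week, and year, so a daily run at 2:00 AM UTC is `cron(0 2 * * ? *)`.

```python
import json


def daily_cron_utc(hour, minute=0):
    """Return an EventBridge cron expression that fires once a day at hour:minute UTC."""
    if not (0 <= hour <= 23 and 0 <= minute <= 59):
        raise ValueError("hour must be 0-23 and minute 0-59")
    # Field order: minutes hours day-of-month month day-of-week year.
    # One of day-of-month or day-of-week must be '?'.
    return f"cron({minute} {hour} * * ? *)"


def build_put_targets_input(rule_name, cluster_arn, role_arn, secret_arn,
                            database, sql, statement_name, with_event=True):
    """Build the document passed to `aws events put-targets --cli-input-json`."""
    return {
        "Rule": rule_name,
        "EventBusName": "default",
        "Targets": [
            {
                "Id": rule_name,
                "Arn": cluster_arn,
                "RoleArn": role_arn,
                "RedshiftDataParameters": {
                    "SecretManagerArn": secret_arn,
                    "Database": database,
                    "Sql": sql,
                    "StatementName": statement_name,
                    "WithEvent": with_event,
                },
            }
        ],
    }


schedule_expression = daily_cron_utc(2)
# Placeholder ARNs for illustration only.
targets_input = build_put_targets_input(
    rule_name="scheduled-refresh-mv-cust-trans-hist",
    cluster_arn="arn:aws:redshift:us-east-1:123456789012:cluster:example-cluster",
    role_arn="arn:aws:iam::123456789012:role/example-role",
    secret_arn="arn:aws:secretsmanager:us-east-1:123456789012:secret:example-secret",
    database="dev",
    sql="REFRESH MATERIALIZED VIEW mv_cust_trans_hist;",
    statement_name="refresh-mv-cust-trans-hist",
)
targets_json = json.dumps(targets_input, indent=2)
```

Writing `targets_json` to data.json gives you a file you can pass to the `put-targets` command shown above.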

    Additional commands

    We have now set up the schedule to refresh the materialized view using the AWS CLI. Let’s quickly go over a few more CLI commands that are useful while scheduling tasks:

    • To list all targets for a particular rule, use:
      aws events list-targets-by-rule --rule <rule-name> 

    • To list all rules, use:
      aws events list-rules

    • To remove a target from a specific rule, use:
      aws events remove-targets --rule <rule-name> --ids <target-id>

    • To delete a rule, use:
      aws events delete-rule --name <rule-name>

    • To view the schedule history for a particular scheduled SQL statement, use:
      aws redshift-data list-statements --status ALL --statement-name <statement-name>

    Retrieving the SQL status and results

    After the schedule is set up, scheduled queries might be listed in the following places:

    • On the Schedules tab of the details page of your cluster.
    • On the scheduled queries list that you can reach from the navigation pane. To see the list, in the navigation pane, choose Queries and Schedule query list.
    • On the Scheduled queries tab of the query editor (see the following screenshot).

    1. Choose the schedule name to see more details about the scheduled query, including details about any previous runs of the schedule.
    2. In the Schedule history section, you can see the ID (which can be used to retrieve SQL statement results), start time, end time, status, and elapsed time for each previous run of the scheduled SQL statement.

    To retrieve the SQL results, you need to use the AWS CLI (or AWS SDK) after assuming the role you created earlier.

    1. To assume this role, run the following command on the command line using the IAM user you configured.
      aws sts assume-role --role-arn "arn:aws:iam::{ACCOUNT_ID}:role/{ROLE_NAME}" --role-session-name AWSCLI-Session

    The command returns a result set similar to the following code:

    {
        "Credentials": {
            "AccessKeyId": "XXXXXXXXXXXX",
            "SecretAccessKey": "XXXXXXXXXXXXXXXXX",
            "SessionToken": "xxxxxxxxxxxxxxxxxxxxx”
            "Expiration": "2020-10-09T17:13:23+00:00"
        },
        "AssumedRoleUser": {
            "AssumedRoleId": "XXXXXXXXXXXXXXXXX:AWSCLI-Session",
            "Arn": "arn:aws:sts::{ACCOUNT_ID}:assumed-role/{ROLE_NAME}/AWSCLI-Session"
        }
    }

    2. Create three environment variables to assume the IAM role by running the following commands. Use the access key, secret access key, and the session token from the preceding results.
      export AWS_ACCESS_KEY_ID=RoleAccessKeyID
      export AWS_SECRET_ACCESS_KEY=RoleSecretKey
      export AWS_SESSION_TOKEN=RoleSessionToken

    3. Run the following command to retrieve the results of the SQL statement. Replace the ID with the ID from the schedule history on the console.
      aws redshift-data get-statement-result --id xxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx --region us-east-1 
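Copying the three credential values by hand is error-prone, so you might generate the export commands from the `assume-role` output instead. The following Python sketch assumes you have captured that JSON output as a string; the `export_commands` helper and the sample credential values are hypothetical.

```python
import json
import shlex

# Maps each environment variable to the matching key in the Credentials object.
ENV_TO_CREDENTIAL_KEY = {
    "AWS_ACCESS_KEY_ID": "AccessKeyId",
    "AWS_SECRET_ACCESS_KEY": "SecretAccessKey",
    "AWS_SESSION_TOKEN": "SessionToken",
}


def export_commands(assume_role_json):
    """Turn the JSON printed by `aws sts assume-role` into shell export commands."""
    credentials = json.loads(assume_role_json)["Credentials"]
    return [
        f"export {env_name}={shlex.quote(credentials[key])}"
        for env_name, key in ENV_TO_CREDENTIAL_KEY.items()
    ]


# Hypothetical sample output with placeholder values.
sample_output = json.dumps({
    "Credentials": {
        "AccessKeyId": "EXAMPLEACCESSKEY",
        "SecretAccessKey": "EXAMPLESECRETKEY",
        "SessionToken": "EXAMPLESESSIONTOKEN",
        "Expiration": "2020-10-09T17:13:23+00:00",
    }
})
commands = export_commands(sample_output)
```

The temporary credentials stop working at the time given in the `Expiration` field, after which you need to assume the role again.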

    Conclusion

    The ability to schedule SQL statements using the Amazon Redshift Data API and EventBridge simplifies running routine tasks that previously required scripting. You can configure schedules and manage them either via the console or the AWS CLI. You can also see the previous runs of any scheduled SQL statements directly from the console and choose to be notified when they run.


    About the Authors

    Sain Das is an Analytics Specialist Solutions Architect at AWS and helps customers build scalable cloud solutions that help turn data into actionable insights.

    Vijetha Parampally Vijayakumar is a full stack software development engineer with Amazon Redshift. She specializes in building applications for big data, databases, and analytics.

    André Dias is a Systems Development Engineer working in the Amazon Redshift team. André is passionate about learning and building new AWS services and has worked on the Redshift Data API. He specializes in building highly available and cost-effective infrastructure using AWS.

    Dream11’s journey to building their Data Highway on AWS

    Post Syndicated from Pradip Thoke original https://aws.amazon.com/blogs/big-data/dream11s-journey-to-building-their-data-highway-on-aws/

    This is a guest post co-authored by Pradip Thoke of Dream11. In their own words, “Dream11, the flagship brand of Dream Sports, is India’s biggest fantasy sports platform, with more than 100 million users. We have infused the latest technologies of analytics, machine learning, social networks, and media technologies to enhance our users’ experience. Dream11 is the epitome of the Indian sports technology revolution.”

    Since inception, Dream11 has been a data-driven sports technology brand. The systems that power Dream11, including their transactional data warehouse, run on AWS. As Dream11 hosts fantasy sports contests that are joined by millions of Indian sports fans, they have large volumes of transactional data that is organized in a well-defined Amazon Redshift data warehouse. Previously, they used third-party services to collect, analyze, and build models over user interaction data combined with transactional data. Although this approach was convenient, it presented certain critical issues:

    • The approach wasn’t conducive to 360-degree user analytics. Dream11’s user interactions data wasn’t present on the cloud, where the rest of Dream11’s infrastructure and data were present (AWS, in this case). To get a complete picture of a user’s experience and journey, the user’s interaction data (client events) needs to be analyzed alongside their transactional data (server events). This is known as 360-degree user analytics.
    • It wasn’t possible to get accurate user journey funnel reports. Currently, there are limitations with every tool available on the market with respect to identifying and mapping a given user’s actions across multiple platforms (on the web, iOS, or Android), as well as multiple related apps. This use case is specifically important if your company is a parent to other companies.
    • The statistics on user behavior that Dream11 was getting weren’t as accurate as they wanted. Some of the popular services they were using for web & mobile analytics use the technique of sampling to be able to deal with high volumes of data. Although this is a well-regarded technique to deal with high volumes of data and provides reasonable accuracy in multiple cases, Dream11 wanted statistics to be as accurate as possible.
    • The analytics wasn’t real-time. Dream11 experiences intense use by their users just before and during the real-life sports matches, so real-time and near-real-time analytics is very critical for them. This need wasn’t sufficiently met by the plethora of services they were using.
    • Their approach was leading to high cost for custom analytics for Dream11’s user interactions data, consisting of hundreds of event types. Serverless query engines typically charge by the amount of data scanned and so it can get very expensive if events data isn’t organized properly in separate tables in a data lake to enable selective access.

    All these concerns and needs led Dream11 to conclude that they needed their own centralized 360-degree analytics platform. Therefore, they embarked on the Data Highway project on AWS.

    This project has additional advantages. It is increasingly becoming important to store and process data securely. Having everything in-house can help Dream11 with data security and data privacy objectives. The platform enables 360-degree customer analytics, which further allows Dream11 to do intelligent user segmentation in-house and share only those segments (without exposing underlying transactional or interactions data) with third-party messaging service providers. 

    Design goals

    Dream11 had the following design goals for the project:

    • The system should be easy to maintain and should be able to handle a very high volume of data, consisting of billions of events and terabytes of data daily.
    • The cost should be low and should be pay-as-you-go.
    • Dream11’s web and mobile developers regularly create new types of events to capture new types of interactions. Whenever they add new types of events, they should be immediately available in the system for analytics, and their statistics should immediately reflect in relevant dashboards and reports without any human intervention.
    • Certain types of statistics (such as concurrency) should be available in real-time and near-real time—within 5 minutes or less.
    • Dream11 should be able to use custom logic to calculate key statistics. The analytics should be accurate—no more sampling.
    • The data for various events should be neatly organized in separate tables and analytics-friendly file formats.
    • Although Dream11 will have a common data lake, they shouldn’t be constrained to use a single analytics engine for all types of analytics. Different types of analytics engines excel for different types of queries.
    • The Product Management team should have access to views they commonly use in their decision-making process, such as funnels and user flow diagrams.
    • The system should be extensible by adding lanes in the system. Lanes allow you to reuse your basic setup without mixing events data for different business units. It also potentially allows you to study user behavior across different apps.
    • The system should be able to build 360-degree user profiles.
    • The system should provide alerting on important changes to key business metrics.
    • Last but not least, the system should be secure and reliable, with a six-nines availability guarantee.

    Data Highway architecture

    In less than 3 months, Dream11’s data team built a system that met all the aforementioned goals. The following diagram shows the high-level architecture.

    For this project, they used the following components:

    The rest of this post explains the various design choices and trade-offs made by Dream11’s data engineers.

    Event ingestion, segregation, and organization

    Dream11 has several hundred event types. These events have common attributes and specific attributes. The following diagram shows the logical structure of these events.

    When the front end receives an event, it saves fields up to common attributes into a message and posts it to Kafka_AllEvents_CommonAttributes. This Kafka topic is the source for the following systems:

    • Apache HBase on Amazon EMR – Provides real-time concurrency analytics
    • Apache Druid – Provides near real-time dimensional analytics
    • Amazon Redshift – Provides session analytics

    The front end also saves events, as they are, into Kafka_AllEvents_AllAttributes. These events are then picked up by Apache NiFi, which forwards them to their respective topics. Apache NiFi supports data routing, transformation, and system mediation logic using powerful and scalable directed graphs. Data is transformed and published to Kafka by using a combination of RouteOnAttribute and JoltTransformJSON processors (to parse JSON). Apache NiFi basically reads event names and posts to the Kafka topic with matching names. If Kafka doesn’t have a topic with that name, it creates a new topic with that name. You can configure your Kafka brokers to auto-create a topic when a message is received for a non-existent topic.
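The routing rule described above, one topic per event name with topics created on first use, can be illustrated with a small in-memory Python sketch. This is not Apache NiFi or Kafka code; the `InMemoryBroker` class, the `event_name` field, and the sample events are hypothetical stand-ins.

```python
class InMemoryBroker:
    """Stand-in for a message broker with topic auto-creation enabled."""

    def __init__(self):
        self.topics = {}

    def publish(self, topic, message):
        # setdefault creates the topic the first time a message arrives for it.
        self.topics.setdefault(topic, []).append(message)


def route_event(broker, event):
    """Publish the event to the topic named after its event name."""
    topic = event["event_name"]
    broker.publish(topic, event)
    return topic


# Hypothetical sample events.
events = [
    {"event_name": "TeamCreated", "user_id": 1},
    {"event_name": "ContestJoined", "user_id": 1},
    {"event_name": "TeamCreated", "user_id": 2},
]
broker = InMemoryBroker()
for event in events:
    route_event(broker, event)
```

Because the topic name is derived from the event itself, a new event type needs no routing change: its topic appears the first time such an event arrives.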

    The following diagram illustrates the Amazon S3 sink connector per Kafka topic.

    The following diagram summarizes the overall design of the system for event ingestion, segregation, and organization.

    Storage, cataloging, ETL, and scheduling

    In this section, we discuss how Dream11 updates their AWS Glue Data Catalog, performs extract, transform, and load (ETL) jobs with Amazon EMR Presto, and uses Apache Airflow for schedule management.

    Updating the AWS Glue Data Catalog with metadata for the target table

    The AWS Glue Data Catalog provides a unified metadata repository across a variety of data sources and data formats. It provides out-of-the-box integration with Amazon S3, Amazon Relational Database Service (Amazon RDS), Amazon Redshift, Amazon Redshift Spectrum, Athena, Amazon EMR, and any application compatible with the Apache Hive metastore. You can create your table definitions one time and query across engines. For more information, see FAQ: Upgrading to the AWS Glue Data Catalog.

    Because this Data Catalog is accessible from multiple services that were going to be used for the Data Highway project, Dream11 decided to use it to register all the table definitions.

    Registering tables with the AWS Glue Data Catalog is easy. You can use an AWS Glue crawler, which can infer the schema from files in Amazon S3 and register a table in the Data Catalog. It works quite well, but Dream11 needed additional actions, such as automatically configuring Kafka Amazon S3 sink connectors. Therefore, they developed two Python-based crawlers.

    The first Python-based crawler runs every 2 hours and looks up Kafka topics. If it finds a new topic, it configures a Kafka Amazon S3 sink connector to dump its data to Amazon S3 every 30 minutes in JSON Gzip format. It also registers a table with the AWS Glue Data Catalog so that users can query the JSON data directly, if needed.

    The second Python-based crawler runs once a day and registers a corresponding table for each new table created that day to hold flattened data (Parquet, Snappy). It infers schemas and registers tables with the Data Catalog using its Table API. It adds customization needed by the Dream11 team to the metadata. It then creates Amazon EMR Presto ETL jobs to convert JSON, Gzip data to Parquet, Snappy, and registers them with Apache Airflow to run every 24 hours.

    ETL with Amazon EMR Presto

    Dream11 has a multi-node, long-running, multi-purpose EMR cluster. They decided to run scheduled ETL jobs on it for the Data Highway project.

    ETL for an event table involves a simple SELECT FROM -> INSERT INTO command to convert JSON (Gzip) to Parquet (Snappy). Converted data takes up to 70% less space and results in a 10 times improvement in Athena query performance. ETL happens once a day. Tables are partitioned by day.
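    The shape of such a conversion statement can be sketched as follows. This is an illustrative helper, not Dream11’s job code. The schema and table names, the column list, and the dt partition column are hypothetical, and the identifiers are assumed to come from trusted configuration rather than user input.

```python
from typing import Sequence


def build_conversion_sql(
    source_table: str,
    target_table: str,
    columns: Sequence[str],
    partition_column: str,
    day: str,
) -> str:
    """Build an INSERT INTO ... SELECT statement that copies one day's partition.

    The partition column is selected last, which is where the Presto Hive
    connector expects partition columns when inserting into a partitioned table.
    """
    column_list = ", ".join(columns)
    return (
        f"INSERT INTO {target_table} "
        f"SELECT {column_list}, {partition_column} "
        f"FROM {source_table} "
        f"WHERE {partition_column} = '{day}'"
    )


sql = build_conversion_sql(
    "events_json.team_joined",      # hypothetical JSON (Gzip) source table
    "events_parquet.team_joined",   # hypothetical Parquet (Snappy) target table
    ["user_id", "app_version"],
    "dt",
    "2020-11-01",
)
```

    Restricting each run to a single day keeps the job aligned with the daily partitioning described above.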

    Data received on the Kafka_AllEvents_CommonAttributes topic is loaded to Amazon Redshift. ETL involves SELECT FROM -> INSERT INTO to convert JSON (Gzip) to CSV, followed by Amazon Redshift COPY.

    Apache Airflow for schedule management

    Apache Airflow is an open-source tool for authoring and orchestrating big data workflows. With Apache Airflow, data engineers define directed acyclic graphs (DAGs). DAGs describe how to run a workflow and are written in Python. Each workflow is designed as a DAG that groups tasks that can run independently. The DAG keeps track of the relationships and dependencies between tasks.
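    To make the idea of dependency tracking concrete, the following sketch orders a small set of tasks with the Python standard library (graphlib, available in Python 3.9 and later). It does not use the Apache Airflow API, and the task names are hypothetical stand-ins for the ETL steps described in this post.

```python
from graphlib import TopologicalSorter

# Each key is a task; its value is the set of tasks that must finish first.
dependencies = {
    "convert_json_to_parquet": {"wait_for_json_partition"},
    "convert_json_to_csv": {"wait_for_json_partition"},
    "copy_csv_to_redshift": {"convert_json_to_csv"},
}

# static_order() yields every task after all of its prerequisites.
run_order = list(TopologicalSorter(dependencies).static_order())
```

    The two conversion tasks do not depend on each other, so a scheduler is free to run them in parallel once the upstream task has finished.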

    Dream11 uses Apache Airflow to schedule Python scripts and a few hundred ETL jobs on Amazon EMR Presto. These jobs convert JSON (Gzip) data for a few hundred events to Parquet (Snappy) format, and convert JSON data containing common attributes for all events to CSV before loading it to Amazon Redshift. For more information, see Orchestrate big data workflows with Apache Airflow, Genie, and Amazon EMR: Part 1.

    The following diagram shows the workflow to connect Apache Airflow to Amazon EMR.

    The following diagram summarizes the overall design of the system for storage, cataloging, ETL, and scheduling.

    Real-time and near-real-time analytics

    In this section, we discuss the real-time and near-real-time analytics performed on Dream11’s data.

    Concurrency analytics with Apache Druid

    Apache Druid is an OLAP-style data store. It computes facts and metrics against various dimensions while data is being loaded. This avoids the need to compute results when a query is run.

    Dream11’s web and mobile events are loaded from the Kafka_AllEvents_CommonAttributes topic to Apache Druid with the help of the Apache Druid Kafka indexing service. Dream11 has a dashboard with different granularity levels and dimensions such as app version, org, and other dimensions present in the common event attributes list.

    Finding active users with Amazon EMR HBase

    Dream11 also needs to identify individual active users at any given time or during a given window. This is required by other downstream teams such as the Data Science team and Digital User Engagement team.

    With the help of a Java consumer, they push all events from the Kafka_AllEvents_CommonAttributes topic to HBase on an EMR cluster with only the required user dimensions. They can query the data in HBase with SQL syntax supported by the Apache Phoenix interface.

    Session analytics with Amazon Redshift

    Dream11 maintains their transactional data warehouse on a multi-node Amazon Redshift cluster. Amazon Redshift allows them to run complex SQL queries efficiently. Amazon Redshift would have been a natural choice for event analytics for hundreds of event types. However, in Dream11’s case, events data grows very rapidly and this would be a lot of data in Amazon Redshift. Also, this data loses its value rapidly as time passes compared with transactional data. Therefore, they decided to do only session analytics in Amazon Redshift to benefit from its complex SQL query capabilities and to do analytics for individual events with the help of Athena (which we discuss in the next section).

    Data received on Kafka_AllEvents_CommonAttributes is loaded into Amazon S3 every 30 minutes by the associated Kafka sink connector. This data is in JSON format with Gzip compression. Every 24 hours, a job runs on Amazon EMR Presto that flattens this data into CSV format. The data is loaded into Amazon Redshift with the COPY command. The data gets loaded first into a staging table. Data in the staging table is aggregated to get sessions data. Amazon Redshift already has transactional data from other tables that, combined now with the session data, allows Dream11 to perform 360-degree user analytics. They can now easily segment users based on their interactions data and transactions data. They can then run campaigns for those users with the help of messaging platforms.

    Event analytics with Athena

    Dream11 uses Athena to analyze the data in Amazon S3. Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run. It made perfect sense to organize data for hundreds of event tables in Amazon S3 and analyze them with Athena on demand.

    With Athena, you’re charged based on the amount of data scanned by each query. You can get significant cost savings and performance gains by compressing, partitioning, or converting your data to a columnar format, because each of those operations reduces the amount of data that Athena needs to scan to run a query. For more information, see Top 10 Performance Tuning Tips for Amazon Athena.
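    The effect of data size on query cost can be shown with simple arithmetic. In the following sketch, the price per terabyte scanned is an assumed parameter (check current Athena pricing for your Region), and the 70% reduction reuses the space savings quoted earlier in this post as a stand-in for the reduction in data scanned.

```python
TB = 1024 ** 4  # bytes in one terabyte (binary)


def athena_scan_cost(bytes_scanned: float, price_per_tb_usd: float) -> float:
    """Estimate the cost of a query from the amount of data it scans."""
    return bytes_scanned / TB * price_per_tb_usd


ASSUMED_PRICE_PER_TB_USD = 5.0  # assumption for illustration only

json_bytes = 1 * TB                # a query that scans 1 TB of JSON (Gzip)
parquet_bytes = json_bytes * 0.3   # the same data at 70% less space

cost_json = athena_scan_cost(json_bytes, ASSUMED_PRICE_PER_TB_USD)
cost_parquet = athena_scan_cost(parquet_bytes, ASSUMED_PRICE_PER_TB_USD)
```

    In practice the savings with a columnar format can be larger than the size reduction alone suggests, because a query that selects a few columns does not need to read the rest.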

    As discussed before, Dream11 has registered hundreds of tables for events data in JSON format, and a similar number of tables for events data in Parquet format, with the AWS Glue Data Catalog. They observed a performance gain of 10 times on conversion of data format to Parquet, and an 80% savings in space. Data in Amazon S3 can be queried directly through the Athena UI with SQL queries. The other option they use is connecting to Athena using a JDBC driver from Looker and their custom Java UI for the Data Aware project.

    Athena helps Dream11 produce funnel analytics and user path analytics reports and visualizations.

     The following diagram summarizes the overall design of the system for real-time and near-real-time analytics and visualization.

    Conclusion

    This architecture has enabled Dream11 to achieve all the design goals they set out with. Results of analytics for real-time requirements are available at millisecond-level latency, and the system costs 40% less than the previous system. Analytics is performed with all the data without sampling, so results are accurate and reliable. All the data and analytics engines are within Dream11’s AWS account, improving data security and privacy.

    As of this writing, the system handles 14 TB of data per day and it has served 80 million requests per minute at peak during Dream11 IPL 2020.

    Doing all their analytics in-house on AWS has not just improved speed, accuracy, and data security, it has also enabled newer possibilities. Now Dream11 has a 360-degree view of their users. They can study their users’ progress across multiple platforms: web, Android, and iOS. This new system is enabling novel applications of machine learning, digital user engagement, and social media technologies at Dream11.


    About the Authors

    Pradip Thoke is an AVP Data Engineering at Dream11 and leads their Data Engineering team. The team involved in this implementation includes Vikas Gite, Salman Dhariwala, Naincy Suman, Lavanya Pulijala, Ruturaj Bhokre, Dhanraj Gaikwad, Vishal Verma, Hitesh Bansal, Sandesh Shingare, Renu Yadav, Yash Anand, Akshay Rochwani, Alokh P, Sunaim and Nandeesh Bijoor.

     

    Girish Patil is a Principal Architect AI, Big Data, India Scale Apps for Amazon.

    Building high-quality benchmark tests for Amazon Redshift using Apache JMeter

    Post Syndicated from Asser Moustafa original https://aws.amazon.com/blogs/big-data/building-high-quality-benchmark-tests-for-amazon-redshift-using-apache-jmeter/

    In the introductory post of this series, we discussed benchmarking benefits and best practices common across different open-source benchmarking tools. As a reminder of why benchmarking is important, Amazon Redshift allows you to scale storage and compute independently, and for you to choose an appropriately balanced compute layer, you need to profile the compute requirements of various production workloads. Existing Amazon Redshift customers also desire an approach to scale up with eyes wide open, and benchmarking different Amazon Redshift cluster configurations against various production workloads can help you appropriately accommodate workload expansion. In addition, you may also use benchmark tests to proactively monitor a production cluster’s performance in real time.

    For prospective Amazon Redshift customers, benchmarking Amazon Redshift is often one of the main components of evaluation and a key source of insight into the price-to-performance ratio of different Amazon Redshift configurations.

    Open-source tools, with their cost-efficiency and vendor neutrality, are often the preferred choice for profiling your production workloads and benchmark tests. However, best practices for using these tools are scarce, possibly resulting in flawed compute profiles, flawed benchmark results, customer frustration, or bloated timelines.

    As mentioned, this series is divided into multiple installments, with the first installment discussing general best practices for benchmarking, and the subsequent installments discussing the strengths and challenges with different open-source tools such as SQLWorkbench, psql, and Apache JMeter. In this post, we discuss benchmarking Amazon Redshift with the Apache JMeter open-source tool.

    One final point before we get started: there is a lot that could be said about benchmarking—more than can be accommodated in a single post. Analytics Specialists Solutions Architects such as myself frequently and happily engage with current and prospective customers to help you evaluate your benchmarking strategy and approach at no charge. I highly recommend you take advantage of that benefit by reaching out to your AWS account SA.

    Apache JMeter

    Apache JMeter is an open-source load testing application written in Java that you can use to load test web applications, backend server applications, databases, and more. You can run it on Windows and a number of different Linux/UNIX systems; for this post we run it in a Windows environment. To install Apache JMeter on a Windows EC2 machine, complete the following steps:

    1. Launch a Windows EC2 instance using a Windows Server AMI (such as Microsoft Windows Server 2019 Base).
    2. Connect via RDP to the Windows EC2 Instance (RDP for macOS can be downloaded from Apple’s App Store).
    3. Download and unzip the Apache JMeter .zip file from the Apache JMeter download page.
    4. Download the Redshift JDBC driver and add the driver .jar file to JMeter’s /lib folder. (When setting up the JDBC connection in the JMeter GUI, use com.amazon.redshift.jdbc.Driver as the driver class name.)
    5. Download the Apache Plugins Manager .jar file to JMeter’s /lib/ext folder. The Apache Plugins Manager enables additional crucial functionality in Apache JMeter for benchmark testing (such as Ultimate Thread Group).
    6. Increase the JVM heap size for Apache JMeter by changing the corresponding JVM parameters in the jmeter.bat file located in the Apache JMeter /bin folder. For example, see the following code:
      edit C:\Dev\apache-jmeter-5.1.1\bin\jmeter.bat
      rem set HEAP=-Xms1g -Xmx1g -XX:MaxMetaspaceSize=256m
      set HEAP=-Xms5g -Xmx5g -XX:MaxMetaspaceSize=1g

    7. Choose the jmeter.bat file (double-click) to start Apache JMeter.

    Apache JMeter supports both GUI and CLI modes, and although you may find the Apache JMeter GUI straightforward with a relatively small learning curve, it’s highly recommended that you use the Apache JMeter GUI primarily for defining benchmark tests, and perhaps running small-to-medium-sized benchmark tests. For large load tests, it’s highly recommended that you use the Apache JMeter CLI to minimize the risk of the Apache JMeter GUI exhausting its host’s compute resources, causing it to enter a non-responsive state or fail with an out-of-memory error. Using the CLI for large load tests also helps minimize any impact on the benchmark results.

    In the following example, I demonstrate creating a straightforward load test using both the Apache JMeter GUI and CLI. The load test aims to measure query throughput while simulating 50 concurrent users with the following personas:

    • 20 users submit only small queries, which are of low complexity and typically have a runtime of 0–30 seconds in the current system, such as business intelligence analyst queries
    • 20 users submit only medium queries, which are of moderate complexity and typically have a runtime of 31–300 seconds in the current system, such as data engineer queries
    • 10 users submit only large queries, which are very complex and typically have a runtime over 5 minutes in the current system, such as data scientist queries

    The load test is configured to run for 15 minutes, which is a pretty short test duration, so you can increase that setting to 30 minutes or more. We rely on JMeter’s query throughput calculation, but we can also manually compute query throughput from the runtime metadata that is gathered if we so desire.
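    If you want to compute query throughput manually, one option is to derive it from a JMeter results file saved in CSV format. The following is a minimal sketch under stated assumptions: the file contains the timeStamp, elapsed, and label columns, timeStamp records each sample’s start time in milliseconds (the behavior when sampleresult.timestamp.start=true), and throughput is measured from the start of the first sample to the end of the last sample for each label. The sample rows are made up for illustration.

```python
import csv
import io
from collections import defaultdict
from typing import Dict


def throughput_per_label(jtl_csv_text: str) -> Dict[str, float]:
    """Return queries per second for each sampler label in a CSV results file."""
    starts = defaultdict(list)
    ends = defaultdict(list)
    for row in csv.DictReader(io.StringIO(jtl_csv_text)):
        start_ms = int(row["timeStamp"])
        starts[row["label"]].append(start_ms)
        ends[row["label"]].append(start_ms + int(row["elapsed"]))

    rates = {}
    for label in starts:
        # Window runs from the first sample's start to the last sample's end.
        window_s = (max(ends[label]) - min(starts[label])) / 1000
        rates[label] = len(starts[label]) / window_s if window_s > 0 else 0.0
    return rates


# Made-up sample data: two small queries and one medium query.
sample = """timeStamp,elapsed,label,success
1000,2000,small,true
2000,3000,small,true
1000,9000,medium,true
"""
rates = throughput_per_label(sample)
```

    Grouping by label gives a separate figure for the small, medium, and large personas, provided each JDBC Request is named accordingly.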

    For this post, I skip over discussing the possible Amazon Redshift cluster tweaks that you could use to squeeze every drop of performance out of Amazon Redshift, and instead rely on the strength of its default state to be optimized to achieve excellent query throughput on diverse workloads.

    Apache JMeter has a number of building blocks, such as thread groups, that can be used to define a wide variety of benchmark tests, and each building block can have a number of community implementations (for example, Arrivals Thread Group or Ultimate Thread Group).

    The following diagram provides a basic illustration of the various Apache JMeter building blocks to be leveraged in this load test, how they interact with each other, and the typical order in which they are created; in some cases, I mention the specific implementation of the building block to be used in parentheses (such as Ultimate Thread Group).

    The following table delves deeper into the purpose that each building block serves in our load test.

    Apache JMeter Component | Purpose
    Test Plan | Represents an atomic test case (simulate 50 users concurrently querying an Amazon Redshift cluster with twice the baseline node count)
    JDBC Connection Configuration | Represents all the JDBC information needed to connect to the Amazon Redshift cluster (such as JDBC URL, username, and password)
    User Defined Variables | A collection of key-value pairs that can be used as parameters throughout the test plan and make it easier to maintain or change the test behavior
    Listener | Captures and displays or writes test output such as SQL result sets
    Thread Group | A simulated group of users that perform the test function (submit a SQL query)
    JDBC Request | The action to be taken by the simulated users (SQL query text)

    Apache JMeter (GUI)

    The following screenshot is the resulting load test.

    The following screenshot provides a close up of the building block tree.

    In the following sections, we examine each building block in greater detail.

    Test Plan

    The test plan serves as the parent container for our entire benchmark test, and we can change its name in the visual tree that appears in the Apache JMeter GUI by editing the Name field.

    I take advantage of the User Defined Variables section to set my own custom variables that hold values needed by all components in the test case, such as the JDBC URL, test duration, and number of users submitting small, medium, and large queries. The baseDir variable is actually a variable that is intended to be embedded in other variables, rather than directly referenced by other test components. I left all other settings at their default on this page.

    JDBC Connection Configuration

    We use the JDBC Connection Configuration building block to create a database connection pool that is used by the simulated users to submit queries to Amazon Redshift. The value specified in Variable Name for created pool is the identifier that is used to reference this connection pool in other JMeter building blocks. In this example, I named it RedshiftJDBCConfig.

    By setting the Max Number of Connections to 0, the connection pool can grow as large as it needs to. That may not be the desired behavior for all test scenarios, so be sure to set it as you see fit.

    In the Init SQL statements section, I provide an example of how to use SQL to disable the result set cache in Amazon Redshift for every connection created, or perform other similar initialization code.

    Towards the end, I input the database JDBC URL (which is actually a variable reference to a variable defined in the test plan), JDBC driver class name, and database username and password. I left all other fields at their default on this page.

    User Defined Variables

    You can add a User Defined Variables building block in several places, and it’s best to use this capability to limit the scope of each variable. For this post, we use an instance of the User Defined Variables building block to hold the output file names of each listener in this test plan (if you look closely, you can see the values of these variables reference the baseDir variable, which was defined in our test plan). You can also notice three other instances of the User Defined Variables building block for the small, medium, and large thread groups—again so that the scope of variables is kept appropriately narrow.

    Listeners

    Listeners control where test output is written and how it’s processed. There are many different kinds of listeners that, for example, allow you to capture your test output as a tree, table, or graph. Other listeners can summarize and aggregate test metadata (such as the number of test samples submitted during the test). I choose to add several listeners in this test plan just for demonstration, but I have found the listeners Aggregate Report and View Results in Table to be most helpful to me. The following screenshot shows the View Results in Table output.

    The following screenshot shows the Aggregate Report output.

    You can also save output from listeners after a test run to a different file through the JMeter menu.

    Thread group: Ultimate Thread Group

    A thread group can be thought of as a group of simulated users, which is why for this post, I create three separate thread groups: one to represent each of three previously mentioned user personas being simulated (small, medium, and large). Each thread group is named accordingly.

    We use the Thread Schedule section to control how many users should be created and at what time interval. In this test, I chose to have all 20 small users created at start time without any delays. This is achieved by a one-row entry in the Thread Schedule and setting the Start Threads Count thread group property to 20 users (or the matching variable, as we do in the following screenshot).

    Alternatively, I could stagger user creation by creating multiple rows and setting the Initial Delay sec field to control each row’s startup delay. With the row entries in the following screenshot, an additional five users are created every 5 seconds.
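    The effect of a staggered schedule can be modeled with a few lines of code. The following sketch is a simplification and not part of JMeter: each schedule row is reduced to a (start threads count, initial delay in seconds) pair, every row’s users are treated as starting instantly once its delay has passed, and the ramp-up, hold, and shutdown settings are ignored.

```python
from typing import List, Tuple


def users_started(schedule: List[Tuple[int, int]], second: int) -> int:
    """Count the users whose schedule row has started by the given second."""
    return sum(count for count, delay in schedule if delay <= second)


# Four rows of five users each, staggered by 5 seconds.
schedule = [(5, 0), (5, 5), (5, 10), (5, 15)]

at_start = users_started(schedule, 0)
after_7s = users_started(schedule, 7)
after_15s = users_started(schedule, 15)
```

    With these rows, all 20 users are running after 15 seconds, compared with the single-row schedule in which they all start at once.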

    Thread group: User Defined Variables

    An additional User Defined Variables instance is added to each of the three thread groups to hold the variables in their individual scope, or that would preferably be configurable at an individual thread group level. For this post, I make the JDBC Connection Configuration a variable so that it’s customizable for each individual thread group (JDBC_Variable_Name_In_Pool). This allows me to, for example, rapidly switch between two different test clusters.

    JDBC Request

    The JDBC Request can be thought of as the benchmark query or SQL test query to be submitted non-stop by each simulated user in this thread group. To configure this JDBC Request, I specified the appropriate JDBC Connection Configuration and some very simple test SQL. I could have also used Apache JMeter’s ability to parameterize queries so that they vary from one iteration to another using a predetermined set of parameter values. For example, for the SQL statement select * from customer where cust_id=<some value>, Apache JMeter could be configured to set the value in the filter clause to a randomly chosen value from a pre-compiled list of filter values for each sample submission. I left all other settings at their default.
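    The idea behind parameterized queries can be sketched outside JMeter as follows. This is not JMeter configuration. It only illustrates picking a random value from a pre-compiled list for each sample submission and passing it as a bind parameter. The list of customer IDs is made up, and the random generator is seeded only to make the example repeatable.

```python
import random
from typing import List, Tuple

# Hypothetical pre-compiled list of filter values.
FILTER_VALUES = [1001, 1002, 1003, 1004, 1005]


def next_query(filter_values: List[int], rng: random.Random) -> Tuple[str, tuple]:
    """Return the SQL text with a placeholder and the randomly chosen parameter."""
    cust_id = rng.choice(filter_values)
    return "select * from customer where cust_id = ?", (cust_id,)


rng = random.Random(42)  # seeded for repeatability in this example
queries = [next_query(FILTER_VALUES, rng) for _ in range(3)]
```

    Varying the filter value between samples makes it less likely that repeated identical statements are answered from a cache rather than being run.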

    Apache JMeter (CLI)

    The Apache JMeter GUI saves test plans in .jmx files that can be used to run the same test plan in Apache JMeter’s console mode. The following CLI command demonstrates how you can use the LoadTestExample.jmx file that was created in the previous steps using the GUI to run the same load test:

    > <jmeter_install_dir>\bin\jmeter -n -t LoadTestExample.jmx -e -l test.out

    The sample output is from a 30-second run of LoadTestExample.jmx.

    After the test has completed, several output files are created, such as a JMeter application log, query output files from the listeners (if any), and test statistics from listeners (if any). For this post, the statistical metrics captured for the test run are located in a JSON file inside the report-output directory. See the following screenshot.

    The \report-output\statistics.json file captures a lot of useful metrics, such as the total samples (like SQL queries) submitted during the test duration, achieved query throughput, and number of small, medium, and large queries and their individual throughput. The following screenshot shows a sampling of the data from statistics.json.
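    If you prefer to read these metrics programmatically, a short script can extract them. The following sketch assumes that statistics.json is a JSON object keyed by sampler label (plus a Total entry) and that each entry has sampleCount and throughput fields. Verify the field names against the file your JMeter version produces. The inline sample values are made up.

```python
import json
from typing import Dict


def throughput_by_label(statistics_json_text: str) -> Dict[str, float]:
    """Map each label in statistics.json to its reported throughput."""
    stats = json.loads(statistics_json_text)
    return {label: entry["throughput"] for label, entry in stats.items()}


# Made-up sample mimicking the assumed structure of statistics.json.
sample = json.dumps(
    {
        "Total": {"transaction": "Total", "sampleCount": 30, "throughput": 1.0},
        "small": {"transaction": "small", "sampleCount": 24, "throughput": 0.8},
        "large": {"transaction": "large", "sampleCount": 6, "throughput": 0.2},
    }
)
throughput = throughput_by_label(sample)
```

    To use it on a real run, read the file contents from the report-output directory and pass the text to throughput_by_label.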

    Conclusion

    In this series of posts, we discussed several recommended best practices for conducting high-quality benchmark tests. Some of the best practices represented core principles that span all the open-source tools discussed (such as consistency in testing methodology). In this particular post, we reviewed the strengths and appropriateness of Apache JMeter for conducting benchmark tests. I hope this series has been helpful, and strongly encourage current and prospective customers to reach out to me or other AWS colleagues if you wish to delve deeper.


    About the Author

    Asser Moustafa is an Analytics Specialist Solutions Architect at AWS based out of Dallas, Texas. He advises customers in the Americas on their Amazon Redshift and data lake architectures and migrations, starting from the POC stage to actual production deployment and maintenance.

    Setting up automated data quality workflows and alerts using AWS Glue DataBrew and AWS Lambda

    Post Syndicated from Romi Boimer original https://aws.amazon.com/blogs/big-data/setting-up-automated-data-quality-workflows-and-alerts-using-aws-glue-databrew-and-aws-lambda/

    Proper data management is critical to successful, data-driven decision-making. An increasingly large number of customers are adopting data lakes to realize deeper insights from big data. As part of this, you need clean and trusted data in order to gain insights that lead to improvements in your business. As the saying goes, garbage in is garbage out—the analysis is only as good as the data that drives it.

    Organizations today have continuously incoming data that may develop slight changes in schema, quality, or profile over a period of time. To ensure data is always of high quality, we need to consistently profile new data, evaluate that it meets our business rules, alert for problems in the data, and fix any issues. In this post, we leverage AWS Glue DataBrew, a visual data preparation tool that makes it easy to profile and prepare data for analytics and machine learning (ML). We demonstrate how to use DataBrew to publish data quality statistics and build a solution around it to automate data quality alerts.

    Overview of solution

    In this post, we walk through a solution that sets up a recurring profile job to determine data quality metrics and, using your defined business rules, report on the validity of the data. The following diagram illustrates the architecture.

    The steps in this solution are as follows:

    1. Periodically send raw data to Amazon Simple Storage Service (Amazon S3) for storage.
    2. Read the raw data in Amazon S3 and generate a scheduled DataBrew profile job to determine data quality.
    3. Write the DataBrew profile job output to Amazon S3.
    4. Trigger an Amazon EventBridge event after job completion.
    5. Invoke an AWS Lambda function based on the event, which reads the profile output from Amazon S3 and determines whether the output meets data quality business rules.
    6. Publish the results to an Amazon Simple Notification Service (Amazon SNS) topic.
    7. Subscribe email addresses to the SNS topic to inform members of your organization.
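    The rule-checking logic in step 5 can be sketched as a small, pure function. This is an illustration rather than the code deployed by the stack. The shape of column_stats, the statistic names used as keys, and the business rules are all hypothetical. In a real Lambda function, the profile would first be read from Amazon S3 and the resulting message published to Amazon SNS (for example, with the boto3 get_object and publish calls).

```python
from typing import Callable, Dict, List, Tuple

# (column, statistic, check, description); all values here are hypothetical.
Rule = Tuple[str, str, Callable[[float], bool], str]


def evaluate_rules(column_stats: Dict[str, dict], rules: List[Rule]) -> List[str]:
    """Return one message per business rule that the profile statistics violate."""
    failures = []
    for column, stat, check, description in rules:
        value = column_stats.get(column, {}).get(stat)
        # A missing statistic counts as a failure so gaps are not silently ignored.
        if value is None or not check(value):
            failures.append(f"{column}.{stat}={value!r} violates rule: {description}")
    return failures


# Simplified profile statistics keyed by column name (made-up values).
column_stats = {
    "passenger_count": {"MissingCount": 0, "Max": 9},
    "tip_amount": {"MissingCount": 12, "Min": 0.0},
}
rules = [
    ("passenger_count", "MissingCount", lambda v: v == 0, "no missing passenger counts"),
    ("passenger_count", "Max", lambda v: v <= 6, "at most 6 passengers per trip"),
    ("tip_amount", "MissingCount", lambda v: v == 0, "no missing tip amounts"),
]

failures = evaluate_rules(column_stats, rules)
message = "PASSED" if not failures else "FAILED:\n" + "\n".join(failures)
```

    Keeping the checks as data makes it straightforward to add or change business rules without touching the evaluation logic.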

    Prerequisites

    For this walkthrough, you should have the following prerequisites:

    Deploying the solution

    For a quick start of this solution, you can deploy the provided AWS CloudFormation stack. This creates all the required resources in your account (us-east-1 Region). Follow the rest of this post for a deeper dive into the resources.

    1. Choose Launch Stack:

    2. In Parameters, for Email, enter an email address that can receive notifications.
    3. Scroll to the end of the form and select I acknowledge that AWS CloudFormation might create IAM resources.
    4. Choose Create stack.

    It takes a few minutes for the stack creation to complete; you can follow progress on the Events tab.

    5. Check your email inbox and choose Confirm subscription in the email from AWS Notifications.

    The default behavior of the deployed stack runs the profile on Sundays. You can start a one-time run from the DataBrew console to try out the end-to-end solution.

    Setting up your source data in Amazon S3

    In this post, we use an open dataset of New York City Taxi trip record data from The Registry of Open Data on AWS. This dataset represents a collection of CSV files defining trips taken by taxis and for-hire vehicles in New York City. Each record contains the pick-up and drop-off IDs and timestamps, distance, passenger count, tip amount, fare amount, and total amount. For the purpose of illustration, we use a static dataset; in a real-world use case, we would use a dataset that is refreshed at a defined interval.

    You can download the sample dataset (us-east-1 Region) and follow the instructions for this solution, or use your own data that gets dumped into your data lake on a recurring basis. We recommend creating all your resources in the same account and Region. If you use the sample dataset, choose us-east-1.

    Creating a DataBrew profile job

    To get insights into the quality of our data, we run a DataBrew profile job on a recurring basis. This profile provides us with a statistical summary of our dataset, including value distributions, sparseness, cardinality, and type determination.

    Connecting a DataBrew dataset

    To connect your dataset, complete the following steps:

    1. On the DataBrew console, in the navigation pane, choose Datasets.
    2. Choose Connect new dataset.
    3. Enter a name for the dataset.
    4. For Enter your source from S3, enter the S3 path of your data source. In our case, this is s3://nyc-tlc/misc/.
    5. Select your dataset (for this post, we choose the medallions trips dataset FOIL_medallion_trips_june17.csv).

    6. Scroll to the end of the form and choose Create dataset.

    Creating the profile job

    You’re now ready to create your profile job.

    1. In the navigation pane, choose Datasets.
    2. On the Datasets page, select the dataset that you created in the previous step. The row in the table should be highlighted.
    3. Choose Run data profile.
    4. Select Create profile job.
    5. For Job output settings, enter an S3 path as destination for the profile results. Make sure to note down the S3 bucket and key, because you use it later in this tutorial.
    6. For Permissions, choose a role that has access to your input and output S3 paths. For details on required permissions, see DataBrew permission documentation.
    7. On the Associate schedule drop-down menu, choose Create new schedule.
    8. For Schedule name, enter a name for the schedule.
    9. For Run frequency, choose a frequency based on the time and rate at which your data is refreshed.
    10. Choose Add.

    11. Choose Create and run job.

    The job run on sample data typically takes 2 minutes to complete.

    Exploring the data profile

    Now that we’ve run our profile job, we can expose insightful characteristics about our dataset. We can also review the results of the profile through the visualizations of the DataBrew console or by reading the raw JSON results in our S3 bucket.

    The profile analyzes the data at both dataset-level and column-level granularity. Looking at our column analytics for String columns, we have the following statistics:

    • MissingCount – The number of missing values in the dataset
    • UniqueCount – The number of unique values in the dataset
    • Datatype – The data type of the column
    • CommonValues – The top 100 most common strings and their occurrences
    • Min – The length of the shortest String value
    • Max – The length of the longest String value
    • Mean – The average length of the values
    • Median – The middle value in terms of character count
    • Mode – The most common String value length
    • StandardDeviation – The standard deviation for the lengths of the String values
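    To see how the length-based statistics relate to the underlying values, the following sketch computes a few of them for a small, made-up String column using the Python standard library. It is only an illustration of the definitions above, not DataBrew’s implementation. In particular, the use of the sample standard deviation is an assumption.

```python
import statistics

# Made-up values for a String column.
values = ["CMT", "VTS", "DDS", "CMT", "Yellow"]
lengths = [len(v) for v in values]  # character count of each value

string_profile = {
    "Min": min(lengths),                    # length of the shortest value
    "Max": max(lengths),                    # length of the longest value
    "Mean": statistics.mean(lengths),       # average length
    "Median": statistics.median(lengths),   # middle length
    "Mode": statistics.mode(lengths),       # most common length
    # Sample standard deviation of the lengths (assumption, see lead-in).
    "StandardDeviation": statistics.stdev(lengths),
}
```

    Note that for String columns these figures describe the lengths of the values rather than the values themselves.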

    For numerical columns, we have the following:

    • Min – The minimum value
    • FifthPercentile – The value that represents 5th percentile (5% of values fall below this and 95% fall above)
    • Q1 – The value that represents 25th percentile (25% of values fall below this and 75% fall above)
    • Median – The value that represents 50th percentile (50% of values fall below this and 50% fall above)
    • Q3 – The value that represents 75th percentile (75% of values fall below this and 25% fall above)
    • NinetyFifthPercentile – The value that represents 95th percentile (95% of values fall below this and 5% fall above)
    • Max – The highest value
    • Range – The difference between the highest and lowest values
    • InterquartileRange – The range between the 25th percentile and 75th percentile values
    • StandardDeviation – The standard deviation of the values (measures the variation of values)
    • Kurtosis – The kurtosis of the values (measures the heaviness of the tails in the distribution)
    • Skewness – The skewness of the values (measures symmetry in the distribution)
    • Sum – The sum of the values
    • Mean – The average of the values
    • Variance – The variance of the values (measures divergence from the mean)
    • CommonValues – A list of the most common values in the column and their occurrence count
    • MinimumValues – A list of the 5 minimum values in the list and their occurrence count
    • MaximumValues – A list of the 5 maximum values in the list and their occurrence count
    • MissingCount – The number of missing values
    • UniqueCount – The number of unique values
    • ZerosCount – The number of zeros
    • Datatype – The datatype of the column
    • Min – The minimum value
    • Max – The maximum value
    • Median – The middle value
    • Mean – The average value
    • Mode – The most common value 
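
    To make a few of the numerical definitions above concrete, the following minimal sketch computes them for a small made-up list with the Python standard library. It only illustrates the definitions; the function name numeric_summary and the "inclusive" quantile method are assumptions, and DataBrew may compute percentiles differently.

```python
import statistics


def numeric_summary(values):
    """Compute a few of the statistics listed above for a list of numbers."""
    ordered = sorted(values)
    # quantiles(n=4) returns the three cut points: Q1, the median, and Q3.
    q1, median, q3 = statistics.quantiles(ordered, n=4, method="inclusive")
    return {
        "Min": ordered[0],
        "Max": ordered[-1],
        "Range": ordered[-1] - ordered[0],
        "Q1": q1,
        "Median": median,
        "Q3": q3,
        "InterquartileRange": q3 - q1,
        "Mean": statistics.mean(ordered),
        "Sum": sum(ordered),
        "ZerosCount": sum(1 for value in ordered if value == 0),
    }


# Made-up sample values, for illustration only.
summary = numeric_summary([0, 2, 4, 4, 6, 8, 10, 0, 12])
for name, value in summary.items():
    print(name, value)
```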

    Finally, at a dataset level, we have an overview of the profile as well as cross-column analytics:

    • DatasetName – The name of the dataset the profile was run on
    • Size – The size of the data source in KB
    • Source – The source of the dataset (for example, Amazon S3)
    • Location – The location of the data source
    • CreatedBy – The ARN of the user that created the profile job
    • SampleSize – The number of rows used in the profile
    • MissingCount – The total number of missing cells
    • DuplicateRowCount – The number of duplicate rows in the dataset
    • StringColumnsCount – The number of columns that are of String type
    • NumberColumnsCount – The number of columns that are of numeric type
    • BooleanColumnsCount – The number of columns that are of Boolean type
    • MissingWarningCount – The number of warnings on columns due to missing values
    • DuplicateWarningCount – The number of warnings on columns due to duplicate values
    • JobStarted – A timestamp indicating when the job started
    • JobEnded – A timestamp indicating when the job ended
    • Correlations – The statistical relationship between columns

    By default, the DataBrew profile is run on a 20,000-row First-N sample of your dataset. If you want to increase the limit and run the profile on your entire dataset, send a request to [email protected].

    Creating an SNS topic and subscription

    Amazon SNS allows us to deliver messages regarding the quality of our data reliably and at scale. For this post, we create an SNS topic and subscription. The topic provides us with a central communication channel that we can broadcast to when the job completes, and the subscription is then used to receive the messages published to our topic. For our solution, we use an email protocol in the subscription in order to send the profile results to the stakeholders in our organization.

    Creating the SNS topic

    To create your topic, complete the following steps:

    1. On the Amazon SNS console, in the navigation pane, choose Topics.
    2. Choose Create topic.
    3. For Type, select Standard.
    4. For Name, enter a name for the topic.
    5. Choose Create topic.
    6. Take note of the ARN in the topic details to use later.

    Creating the SNS subscription

    To create your subscription, complete the following steps:

    1. In the navigation pane, choose Subscriptions.
    2. Choose Create subscription.
    3. For Topic ARN, choose the topic that you created in the previous step.
    4. For Protocol, choose Email.
    5. For Endpoint, enter an email address that can receive notifications.
    6. Choose Create subscription.
    7. Check your email inbox and choose Confirm subscription in the email from AWS Notifications.
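
    If you prefer to script the topic and subscription instead of using the console, the same two resources can be created with boto3. This is a minimal sketch rather than part of the original walkthrough: the helper name and the example topic name and email address are placeholders, and the email subscription still stays pending until the recipient confirms it.

```python
def create_topic_with_email_subscription(sns_client, topic_name, email_address):
    """Create a standard SNS topic and subscribe an email endpoint to it.

    sns_client is expected to behave like boto3.client('sns').
    """
    topic_arn = sns_client.create_topic(Name=topic_name)["TopicArn"]
    subscription = sns_client.subscribe(
        TopicArn=topic_arn,
        Protocol="email",
        Endpoint=email_address,
        # Return an ARN even while the subscription awaits confirmation.
        ReturnSubscriptionArn=True,
    )
    return topic_arn, subscription["SubscriptionArn"]


# Example usage (needs AWS credentials; names are placeholders):
# import boto3
# topic_arn, subscription_arn = create_topic_with_email_subscription(
#     boto3.client("sns"), "databrew-profile-topic", "team@example.com")
```

    Passing the client in as an argument keeps the helper easy to exercise with a stub before pointing it at a real account.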

    Creating a Lambda function for business rule validation

    The profile has provided us with an understanding of the characteristics of our data. Now we can create business rules that ensure we’re consistently monitoring the quality of our data.

    For our sample taxi dataset, we will validate the following:

    • Making sure the pu_loc_id and do_loc_id columns meet a completeness rate of 90%.
    • If more than 10% of the data in those columns is missing, we’ll notify our team that the data needs to be reviewed.
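
    The rule above boils down to simple arithmetic on the profile output. The following sketch applies it to a made-up profile fragment; it assumes the profile JSON exposes sampleSize and a per-column missingValuesCount, which are the field names the sample Lambda function in this post reads. The helper name and the numbers are hypothetical.

```python
def columns_failing_completeness(profile, column_names, max_missing_fraction=0.1):
    """Return the monitored columns whose share of missing values exceeds the limit."""
    sample_size = profile["sampleSize"]
    if sample_size <= 0:
        raise ValueError("sampleSize must be positive")

    failing = []
    for column in profile["columns"]:
        if column["name"] in column_names:
            missing_fraction = column["missingValuesCount"] / sample_size
            if missing_fraction > max_missing_fraction:
                failing.append(column["name"])
    return failing


# Made-up profile fragment, for illustration only.
example_profile = {
    "sampleSize": 20000,
    "columns": [
        {"name": "pu_loc_id", "missingValuesCount": 500},   # 2.5% missing
        {"name": "do_loc_id", "missingValuesCount": 3000},  # 15% missing
        {"name": "fare", "missingValuesCount": 9000},       # not monitored
    ],
}
failing_columns = columns_failing_completeness(
    example_profile, {"pu_loc_id", "do_loc_id"}
)
print(failing_columns)
```

    In this example only do_loc_id misses the 90% completeness target, so it is the only column reported.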

    Creating the Lambda function

    To create your function, complete the following steps:

    1. On the Lambda console, in the navigation pane, choose Functions.
    2. Choose Create function.
    3. For Function name, enter a name for the function.
    4. For Runtime, choose the language you want to write the function in. If you want to use the code sample provided in this tutorial, choose Python 3.8.
    5. Choose Create function.

    Adding a destination to the Lambda function

    You now add a destination to your function.

    1. On the Designer page, choose Add destination.
    2. For Condition, select On success.
    3. For Destination type, choose SNS topic.
    4. For Destination, choose the SNS topic from the previous step.
    5. Choose Save.

    Authoring the Lambda function

    For the function code, enter the following sample code or author your own function that parses the DataBrew profile job JSON and verifies it meets your organization’s business rules.

    If you use the sample code, make sure to fill in the values of the required parameters to match your configuration:

    • topicArn – The resource identifier for the SNS topic. You find this on the Amazon SNS console’s topic details page (for example, topicArn = 'arn:aws:sns:us-east-1:012345678901:databrew-profile-topic').
    • profileOutputBucket – The S3 bucket the profile job is set to output to. You can find this on the DataBrew console’s job details page (for example, profileOutputBucket = 'taxi-data').
    • profileOutputPrefix – The S3 key prefix the profile job is set to output to. You can find this on the DataBrew console’s job details page (for example, profileOutputPrefix = 'profile-out/'). If you’re writing directly to the root of an S3 bucket, keep this as an empty String (profileOutputPrefix = '').
      import json
      import boto3
      
      sns = boto3.client('sns')
      s3 = boto3.client('s3')
      s3Resource = boto3.resource('s3')
      
      # === required parameters ===
      topicArn = 'arn:aws:sns:<YOUR REGION>:<YOUR ACCOUNT ID>:<YOUR TOPIC NAME>'
      profileOutputBucket = '<YOUR S3 BUCKET NAME>'
      profileOutputPrefix = '<YOUR S3 KEY>'
      
      def verify_completeness_rule(bucket, key):
          # completeness threshold set to 10%
          threshold = 0.1
          
          # parse the DataBrew profile
          profileObject = s3.get_object(Bucket = bucket, Key = key)
          profileContent = json.loads(profileObject['Body'].read().decode('utf-8'))
          
          # verify the completeness rule is met on the pu_loc_id and do_loc_id columns
          for column in profileContent['columns']:
              if (column['name'] == 'pu_loc_id' or column['name'] == 'do_loc_id'):
                  if ((column['missingValuesCount'] / profileContent['sampleSize']) > threshold):
                      # failed the completeness check
                      return False
      
          # passed the completeness check
          return True
      
      def lambda_handler(event, context):
          jobRunState = event['detail']['state']
          jobName = event['detail']['jobName'] 
          jobRunId = event['detail']['jobRunId'] 
          profileOutputKey = ''
      
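          if (jobRunState == 'SUCCEEDED'):
              # the profile output key contains the job run ID without its first three characters
              profileOutputPostfix = jobRunId[3:] + '.json'
      
              # look under the configured prefix for the output object of this job run
              bucket = s3Resource.Bucket(profileOutputBucket)
              for obj in bucket.objects.filter(Prefix = profileOutputPrefix):
                  if (profileOutputPostfix in obj.key):
                      profileOutputKey = obj.key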
              
              if (verify_completeness_rule(profileOutputBucket, profileOutputKey)):
                  message = 'Nice! Your profile job ' + jobName + ' met business rules. Head to https://console.aws.amazon.com/databrew/ to view your profile.' 
                  subject = 'Profile job ' + jobName + ' met business rules' 
              else:
                  message = 'Uh oh! Your profile job ' + jobName + ' did not meet business rules. Head to https://console.aws.amazon.com/databrew to clean your data.'
                  subject = 'Profile job ' + jobName + ' did not meet business rules'
          
          else:
              # State is FAILED, STOPPED, or TIMEOUT - intervention required
              message = 'Uh oh! Your profile job ' + jobName + ' is in state ' + jobRunState + '. Check the job details at https://console.aws.amazon.com/databrew#job-details?job=' + jobName
              subject = 'Profile job ' + jobName + ' in state ' + jobRunState
              
          response = sns.publish(
              TargetArn = topicArn,
              Message = message,
              Subject = subject
          )
      
          return {
              'statusCode': 200,
              'body': json.dumps(response)
          }
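
    One gap worth noting in the sample: if no object under the prefix matches the job run, profileOutputKey stays empty and the later S3 read fails. A hedged way to handle that is to isolate the lookup in a small helper that returns None when nothing matches, so the handler can report the problem instead. The helper name, the example keys, and the run ID below are hypothetical, and matching on the end of the key (rather than anywhere in it, as the sample does) is an assumption.

```python
def find_profile_key(object_keys, job_run_id):
    """Return the first key that ends with the run-specific suffix, or None.

    Follows the sample's convention: drop the first three characters of the
    job run ID and append '.json'.
    """
    suffix = job_run_id[3:] + ".json"
    for key in object_keys:
        if key.endswith(suffix):
            return key
    return None


# Hypothetical keys and run ID, for illustration only.
keys = [
    "profile-out/taxi_aaaa1111.json",
    "profile-out/taxi_bbbb2222.json",
]
matched_key = find_profile_key(keys, "db_bbbb2222")
missing_key = find_profile_key(keys, "db_cccc3333")
print(matched_key, missing_key)
```

    In the handler, the keys could come from the same bucket.objects.filter(Prefix = profileOutputPrefix) listing, and a None result could be turned into its own notification message.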

    Updating the Lambda function’s permissions

    In this final step of configuring your Lambda function, you update your function’s permissions.

    1. In the Lambda function editor, choose the Permissions tab.
    2. For Execution role, choose the role name to navigate to the AWS Identity and Access Management (IAM) console.
    3. In the Role summary, choose Add inline policy.
    4. For Service, choose S3.
    5. For Actions, under List, choose ListBucket.
    6. For Actions, under Read, choose GetObject.
    7. In the Resources section, for bucket, choose Add ARN.
    8. Enter the bucket name you used for your output data in the create profile job step.
    9. In the modal, choose Add.
    10. For object, choose Add ARN.
    11. For bucket name, enter the bucket name you used for your output data in the create profile job step and append the key (for example, taxi-data/profile-out).
    12. For object name, choose Any. This provides read access to all objects in the chosen path.
    13. In the modal, choose Add.
    14. Choose Review policy.
    15. On the Review policy page, enter a name.
    16. Choose Create policy. 
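
    For reference, the console steps above should produce a policy roughly equivalent to the JSON document built by the following sketch. The helper name is hypothetical, the bucket and prefix reuse the taxi-data/profile-out example from this post, and the ARNs assume the standard aws partition.

```python
import json


def build_profile_read_policy(bucket, prefix):
    """Build an IAM policy document allowing list and read access to the profile output."""
    prefix = prefix.strip("/")
    # Object-level access is limited to the output prefix when one is given.
    object_path = f"{prefix}/*" if prefix else "*"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "s3:ListBucket",
                "Resource": f"arn:aws:s3:::{bucket}",
            },
            {
                "Effect": "Allow",
                "Action": "s3:GetObject",
                "Resource": f"arn:aws:s3:::{bucket}/{object_path}",
            },
        ],
    }


policy = build_profile_read_policy("taxi-data", "profile-out")
print(json.dumps(policy, indent=2))
```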

    We return to the Lambda function to add a trigger later, so keep the Lambda service page open in a tab as you continue to the next step, adding an EventBridge rule.

    Creating an EventBridge rule for job run completion

    EventBridge is a serverless event bus service that we can configure to connect applications. For this post, we configure an EventBridge rule to route DataBrew job completion events to our Lambda function. When our profile job is complete, the event triggers the function to process the results.

    Creating the EventBridge rule

    To create our rule in EventBridge, complete the following steps:

    1. On the EventBridge console, in the navigation pane, choose Rules.
    2. Choose Create rule.
    3. Enter a name and description for the rule.
    4. In the Define pattern section, select Event pattern.
    5. For Event matching pattern, select Pre-defined pattern by service.
    6. For Service provider, choose AWS.
    7. For Service name, choose AWS Glue DataBrew.
    8. For Event type, choose DataBrew Job State Change.
    9. For Target, choose Lambda function.
    10. For Function, choose the name of the Lambda function you created in the previous step.
    11. Choose Create.
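
    The rule created above can also be expressed in code. The sketch below is an approximation of what the console sets up, not an export of it: the detail-type matches the event type chosen in the steps, while the aws.databrew source value, the helper name, and the target ID are assumptions you should verify against a real event.

```python
import json

# Event pattern for DataBrew job state changes (source value is an assumption).
DATABREW_JOB_STATE_CHANGE_PATTERN = {
    "source": ["aws.databrew"],
    "detail-type": ["DataBrew Job State Change"],
}


def route_job_events_to_lambda(events_client, rule_name, function_arn):
    """Create an EventBridge rule for the pattern and add a Lambda function as its target.

    events_client is expected to behave like boto3.client('events').
    """
    rule = events_client.put_rule(
        Name=rule_name,
        EventPattern=json.dumps(DATABREW_JOB_STATE_CHANGE_PATTERN),
        State="ENABLED",
    )
    events_client.put_targets(
        Rule=rule_name,
        Targets=[{"Id": "profile-validation-lambda", "Arn": function_arn}],
    )
    return rule["RuleArn"]


# Example usage (needs AWS credentials; names are placeholders):
# import boto3
# route_job_events_to_lambda(boto3.client("events"), "databrew-job-rule", "<YOUR FUNCTION ARN>")
```

    When scripting this, the Lambda function also needs a resource-based permission that lets events.amazonaws.com invoke it; the console adds that permission for you when you configure the trigger.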

    Adding the EventBridge rule as the Lambda function trigger

    To add your rule as the function trigger, complete the following steps:

    1. Navigate back to your Lambda function configuration page from the previous step.
    2. In the Designer, choose Add trigger.
    3. For Trigger configuration, choose EventBridge (CloudWatch Events).
    4. For Rule, choose the EventBridge rule you created in the previous step.
    5. Choose Add.

    Testing your system

    That’s it! We’ve completed all the steps required for this solution to run periodically. To give it an end-to-end test, we can run our profile job once and wait for the resulting email to get our results.

    1. On the DataBrew console, in the navigation pane, choose Jobs.
    2. On the Profile jobs tab, select the job that you created. The row in the table should be highlighted.
    3. Choose Run job.
    4. In the Run job modal, choose Run job.

    A few minutes after the job is complete, you should receive an email notifying you of the results of your business rule validation logic.
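
    The same test can be started from a script. The following is a minimal sketch that starts a run and polls until it reaches one of the end states the Lambda function handles; the helper name, polling interval, and poll limit are arbitrary choices for illustration.

```python
import time

END_STATES = ("SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT")


def run_profile_job_and_wait(databrew_client, job_name, poll_seconds=30, max_polls=40):
    """Start a DataBrew job run and poll its state until it ends or polling gives up.

    databrew_client is expected to behave like boto3.client('databrew').
    """
    run_id = databrew_client.start_job_run(Name=job_name)["RunId"]
    state = None
    for _ in range(max_polls):
        state = databrew_client.describe_job_run(Name=job_name, RunId=run_id)["State"]
        if state in END_STATES:
            break
        time.sleep(poll_seconds)
    return run_id, state


# Example usage (needs AWS credentials; the job name is a placeholder):
# import boto3
# run_id, state = run_profile_job_and_wait(boto3.client("databrew"), "<YOUR PROFILE JOB NAME>")
```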


    Cleaning up

    To avoid incurring future charges, delete the resources created during this walkthrough.

    Conclusion

    In this post, we walked through how to use DataBrew alongside Amazon S3, Lambda, EventBridge, and Amazon SNS to automatically send data quality alerts. We encourage you to extend this solution by customizing the business rule validation to meet your unique business needs.


    About the Authors

    Romi Boimer is a Sr. Software Development Engineer at AWS and a technical lead for AWS Glue DataBrew. She designs and builds solutions that enable customers to efficiently prepare and manage their data. Romi has a passion for aerial arts; in her spare time she enjoys fighting gravity and hanging from fabric.

    Shilpa Mohan is a Sr. UX designer at AWS and leads the design of AWS Glue DataBrew. With over 13 years of experience across multiple enterprise domains, she is currently crafting products for Database, Analytics and AI services for AWS. Shilpa is a passionate creator who spends her time creating anything from content and photographs to crafts.