Tag Archives: AWS Glue

Developing, testing, and deploying custom connectors for your data stores with AWS Glue

Post Syndicated from Bo Li original https://aws.amazon.com/blogs/big-data/developing-testing-and-deploying-custom-connectors-for-your-data-stores-with-aws-glue/

AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning, and application development. AWS Glue already integrates with various popular data stores such as Amazon Redshift, Amazon RDS, MongoDB, and Amazon S3. Organizations continue to evolve and use a variety of data stores that best fit their applications and data requirements. We recently announced general availability of AWS Glue custom connectors, which make it easy to discover and integrate with a variety of additional data sources, such as SaaS applications and your custom data sources. With just a few clicks, you can search and select connectors from AWS Marketplace and begin your data preparation workflow in minutes. We are also releasing a new framework to develop, validate, and deploy your own custom connectors (bring your own connectors, or BYOC).

In this blog post, we go over three key aspects of AWS Glue custom connectors. First, we introduce the two mechanisms for plugging in a custom connector: subscribing to one from AWS Marketplace or bringing your own connector into Glue Spark jobs. Second, we describe the three interfaces based on Apache Spark DataSource, Amazon Athena Federated Query, and JDBC that you can use to develop a custom connector with the released Glue Spark runtime. Finally, we get deeper into the development process and describe how the Glue Spark runtime interfaces simplify data integration by offering powerful features that are built in for Glue custom connectors. These features include job bookmarks for incremental loads of your data, at-source data filtering with SQL queries, partitioned execution for data parallelism, data type mapping, advanced Spark and built-in AWS Glue data transformations, integration with AWS Secrets Manager to securely store authentication credentials, and the AWS Glue Data Catalog for storing connections and table metadata. Glue custom connectors are also supported with AWS Glue Studio, which enables visual authoring of your data integration jobs.

This post introduces two mechanisms to use custom connectors with AWS Glue Spark runtime and AWS Glue Studio console. First, we go over the user experience for seamless discovery and subscription to custom connectors developed by AWS Glue partners that are hosted on AWS Marketplace. Next, we go deeper into the five simple steps to develop and test your own connectors with AWS Glue Spark runtime, and deploy them into your production Apache Spark applications for ETL and analytics workloads that run on AWS Glue.

AWS Glue custom connectors: AWS Marketplace and BYOC

You can use an AWS Glue connector available on AWS Marketplace or bring your own connector (BYOC) and plug it into AWS Glue Spark runtime. This is in addition to the native connectors available with AWS Glue.

Connectors available on AWS Marketplace

As we make AWS Glue custom connectors generally available today, we have an ecosystem of Glue connectors listed on AWS Marketplace available from different AWS Glue partners, big data architects, and third-party developers. The following posts go into more detail on using some of these connectors for different use cases with AWS Glue:

BYOC connector example

Customers and application developers also need a method to develop connectors for custom data stores. The next section describes the end-to-end process to develop and test a custom connector using the AWS Glue Spark runtime library and interfaces locally.

After testing and validating, you can package and deploy the custom connector using the BYOC workflow in AWS Glue Studio. For instructions on deploying and using the Snowflake connector with AWS Glue jobs as a BYOC custom connector, see Performing data transformations using Snowflake and AWS Glue.

AWS Glue Spark runtime connector interfaces

AWS Glue Spark runtime offers three interfaces to plug in custom connectors built for existing frameworks: the Spark DataSource API, Amazon Athena Data Source API, or Java JDBC API. The following code snippets show how you can plug in these connectors into AWS Glue Spark runtime without any changes.

For connectors subscribed from AWS Marketplace, use the following code:

Datasource = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.spark|athena|jdbc", connection_options = {"dbTable":"Account","connectionName":"my-marketplace-connection"}, transformation_ctx = "DataSource0")

For custom connectors developed and deployed with AWS Glue, use the following code:

Datasource = glueContext.create_dynamic_frame.from_options(connection_type = "custom.spark|athena|jdbc", connection_options = {"dbTable":"Account","connectionName":"my-custom-connection"}, transformation_ctx = "DataSource0")

The following table summarizes the interfaces you need to implement for connectivity with AWS Glue Spark runtime using the Spark DataSource API.

Interfaces | Description
DataSourceV2 | The base interface for Spark DataSource v2 API.
ReadSupport | A mix-in interface for DataSourceV2 for the connector to provide data reading ability and scan the data from the data source.
DataSourceReader | A data source reader that is created by ReadSupport to scan the data from this data source. It also supports reading actual schema and generating a list of InputPartition for parallel reads from Spark executors.
InputPartition | Each InputPartition is responsible for creating a data reader to read data into one RDD partition. InputPartitions are serialized and sent to executors, then the reader is created on executors to do the actual reading.
InputPartitionReader | Responsible for reading data into an RDD partition.

The following table summarizes the interfaces you need to implement for connectivity with AWS Glue Spark runtime using the Athena Data Source API.

Interfaces | Description
MetadataHandler
  doGetSplits | Splits up the reads required to scan the requested batch of partitions.
  doListSchemaNames | Gets the list of schemas (databases) that this source contains.
  doGetTable | Gets a definition (such as field names, types, and descriptions) of a table.
  doListTables | Gets the list of tables that this source contains.
  getPartitions | Gets the partitions that must be read from the request table.
RecordHandler
  doReadRecords | Reads the row data associated with the provided split.

The following diagram shows the class structure for the three interfaces and their execution on Spark drivers to read metadata and Spark executors to read data from the underlying datasource. The classes shown in pink are the ones that need to be implemented as part of the connector. Classes shown in green are already implemented as part of the Glue Spark runtime.

Steps to develop a custom connector

In the following sections, we describe how to develop, test, and validate an AWS Glue custom connector. We also show how to deploy the connectors to AWS Glue jobs using the AWS Glue Studio console.

Implementing the solution includes the following five high-level steps:

  1. Download and install AWS Glue Spark runtime, and review sample connectors.
  2. Develop using the required connector interface.
  3. Build, test, and validate your connector locally.
  4. Package and deploy the connector on AWS Glue.
  5. Use AWS Glue Studio to author a Spark application with the connector.

Downloading and installing AWS Glue Spark runtime and reviewing sample connectors

The first step to developing a connector is to install the Glue Spark runtime from Maven and refer to AWS Glue sample connectors on AWS Glue GitHub repository.

Developing and testing using the required connector interface

As discussed earlier, you can develop AWS Glue custom connectors with one of the following interfaces:

  • Spark DataSource
  • Athena Federated Query
  • JDBC

In this section, we walk you through each interface.

Spark DataSource interface

We use a simple example to illustrate the development of an AWS Glue custom connector with the Spark DataSource interface. You can also find intermediate and complex examples for developing connectors with more functionality for different data sources.

This solution implements a DataSourceReader that returns predefined data as InputPartitions stored in-memory with a given schema. The following interfaces need to be implemented for DataSourceReader. The DataSourceReader implementation runs on the Spark driver and plans the execution of Spark executors reading the data in InputPartitions:

class Reader implements DataSourceReader {
        public StructType readSchema() { ... }
        
        public List<InputPartition<InternalRow>> planInputPartitions() { ... }
}

The InputPartitions are read in parallel by different Spark executors using the InputPartitionReader implementation, which returns the records in Spark’s InternalRow format. The InputPartitionReader is essentially implemented to return an iterator of the records scanned from the underlying data store. Refer to the following code:

class SimpleInputPartitionReader implements InputPartitionReader<InternalRow> {
    public boolean next() { ... }

    public InternalRow get() { ... }

    public void close() throws IOException { ... }
}

The second connector example shows how to use an Amazon S3 client to read the data in CSV format from an S3 bucket and path supplied as reader options. The third connector example shows how to use a JDBC driver to read data from a MySQL source. It also shows how to push down a SQL query to filter records at source and authenticate with the user name and password supplied as reader options.

You can plug connectors based on the Spark DataSource API into the AWS Glue Spark runtime as follows. You need to supply the connection_type as custom.spark and an AWS Glue Data Catalog connection containing the reader options, such as user name and password. The AWS Glue Spark runtime automatically converts the data source into a Glue DynamicFrame. The following code is an example of plugging in the Elasticsearch Spark connector:

Datasource = glueContext.create_dynamic_frame.from_options(connection_type = "custom.spark", connection_options = {"path": "test", "es.nodes": "https://search-glue-etl-job-xxx.us-east-1.es.amazonaws.com","es.net.http.auth.user": "user","es.net.http.auth.pass": "pwd","es.port": "443","es.nodes.wan.only": "true" ,"connectionName":"my-custom-es-connection"}, transformation_ctx = "DataSource0")

AWS Glue Studio provides a visual ETL console that can also auto-generate the preceding code to construct a DynamicFrame for a deployed Spark connector (as described later in this post).

Athena Federated Query interface

AWS Glue Spark runtime also supports connectors developed with the Athena connector interface for federated queries. Similar to the Spark DataSource API, it requires implementation of two key handler interfaces: MetadataHandler and RecordHandler.

The MetadataHandler implementation runs on the Spark driver and contains the functions required to compute the schema, tables, and table partitions, and plan the actual scan by splitting the reads of individual partitions into different splits. See the following code:

public class MyMetadataHandler extends MetadataHandler{
       ListSchemasResponse doListSchemaNames(BlockAllocator allocator, ListSchemasRequest request) { … }

       ListTablesResponse doListTables(BlockAllocator allocator, ListTablesRequest request) { … }

       GetTableResponse doGetTable(BlockAllocator allocator, GetTableRequest request) { … }

       void getPartitions(BlockWriter blockWriter, GetTableLayoutRequest request, QueryStatusChecker queryStatusChecker) { … }

       GetSplitsResponse doGetSplits(BlockAllocator allocator, GetSplitsRequest request) { … }
}

The RecordHandler implements the reader to scan the data from the underlying data store associated with the split contained in the ReadRecordsRequest structure.

AWS Glue custom connectors use the Athena RecordHandler interface, but they do not need the BlockSpiller implementation or AWS Lambda to read the data from the underlying data store. Instead, the implementation runs inline within each Spark executor to return the records as Apache Arrow column vector batches. Refer to the following code:

public class MyRecordHandler extends RecordHandler{

void readWithConstraint(ConstraintEvaluator constraints, BlockSpiller spiller, ReadRecordsRequest recordsRequest, QueryStatusChecker queryStatusChecker){…}
}

AWS Glue Spark runtime can convert records returned by plugging in an Athena connector to an AWS Glue DynamicFrame as follows:

Datasource = glueContext.create_dynamic_frame.from_options(connection_type = "custom.athena", connection_options = {"tableName":"table","schemaName":"schema","connectionName":"my-custom-athena-connection"}, transformation_ctx = "DataSource0")

JDBC interface

AWS Glue Spark runtime also allows you to plug in any connector compliant with the JDBC interface. It allows you to pass in any connection option available with the JDBC connector as follows:

Datasource = glueContext.create_dynamic_frame.from_options(connection_type = "custom.jdbc", connection_options = {"dbTable":"Account","connectionName":"my-custom-jdbc-connection"}, transformation_ctx = "DataSource0")

Advanced ETL and analytics with AWS Glue

AWS Glue Spark runtime also provides different features supported out of the box with custom connectors to enable advanced extract, transform, and load operations.

AWS Glue Studio for visual authoring of ETL jobs

Data type mapping

You can typecast the columns while reading them from the underlying data store itself. For example, a dataTypeMapping of {"INTEGER":"STRING"} casts all integer columns to string while parsing the records and constructing the DynamicFrame. This also helps you cast columns to the types of your choice. Refer to the following code:

DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "custom.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"}, "connectionName":"test-connection-snowflake-jdbc"}, transformation_ctx = "DataSource0")

Partitioning for parallel reads

AWS Glue allows you to read data in parallel from the data store by partitioning it on a column, by specifying the partitionColumn, lowerBound, upperBound, and numPartitions. This allows you to use data parallelism and the multiple Spark executors allocated for the Spark application. Refer to the following code, which reads data from Snowflake using four Spark executors in parallel. Data is partitioned across executors uniformly along the id column in the range [0, 200]:

DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "custom.jdbc", connection_options = {"upperBound":"200","numPartitions":"4","partitionColumn":"id","lowerBound":"0","connectionName":"my-connection-snowflake"}, transformation_ctx = "DataSource0")

Glue Data Catalog connections

You can encapsulate all your connection properties with Glue Data Catalog connections and supply the connection name as follows. Integration with Glue Data Catalog connections allows you to use the same connection properties across multiple calls in a single Spark application or across different applications. See the following code:

DataSource = glueContext.create_dynamic_frame.from_options(connection_type = "custom.jdbc", connection_options = {"connectionName":"my-connection-snowflake"}, transformation_ctx = "DataSource0")

Secrets Manager for credentials

The Data Catalog connection can also contain a secretId corresponding to a secret stored in AWS Secrets Manager that can be used to securely gather authentication and credentials information at runtime. For more details on using a secretId on the AWS Glue Studio console, see Adding connectors to AWS Glue Studio. secretId can also be specified within the ETL script as follows.

DataSource = glueContext.create_dynamic_frame.from_options(connection_type = "custom.jdbc", connection_options = {"connectionName":"my-connection-snowflake", "secretId":"my-secret-id"}, transformation_ctx = "DataSource0")

The secretId can be used to store credentials for different authentication mechanisms that your connector supports, such as user name and password, access keys, and OAuth.
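
For example, you could create such a secret programmatically with the AWS SDK for Python (Boto3). The following is a minimal sketch, assuming a hypothetical secret name my-secret-id and user name/password keys; the key names your connector actually expects are connector-specific:

import json
import boto3

# Minimal sketch: store user name/password credentials as a JSON secret that a
# Glue Data Catalog connection can reference through its secretId property.
# The secret name, key names, and values are placeholders.
secretsmanager = boto3.client("secretsmanager")
secretsmanager.create_secret(
    Name="my-secret-id",
    SecretString=json.dumps({"username": "my-user", "password": "my-password"}))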

SQL queries at source: Filtering with row predicates and column projections

AWS Glue Spark runtime allows you to push down SQL queries to filter data at the source with row predicates and column projections. This allows you to load filtered data faster from data stores that support pushdowns. An example SQL query pushed down to a JDBC data source is SELECT id, name, department FROM department WHERE id < 200. Refer to the following code:

DataSource = glueContext.create_dynamic_frame.from_options(connection_type = "custom.jdbc", connection_options = {"query":"SELECT id, name, department FROM department WHERE id < 200","connectionName":"my-connection-snowflake"}, transformation_ctx = "DataSource0")

Job bookmarks

AWS Glue job bookmarks allow for incremental loading of data from JDBC sources. They keep track of the last processed record from the data store and process new data records in subsequent AWS Glue job runs. Job bookmarks use the primary key as the default bookmark key if it increases or decreases sequentially. Refer to the following code, which uses a transformation_ctx with job bookmarks enabled for the job:

DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "custom.jdbc", connection_options = {"connectionName":"test-connection-snowflake-jdbc"}, transformation_ctx = "DataSource0")

AWS Glue transformations

AWS Glue offers more than 35 commonly used data transformation operations with DynamicFrames and Spark DataFrames. These transforms allow you to get insights from your data and prepare it for further analytics using hundreds of available Spark SQL functions. These transformations include popular functions for schema manipulation, projecting columns, and performing joins across different data sources; transforming data with map, split, and explode; flattening nested datasets with relationalize and unnest; grouping and aggregating records; and running arbitrary SQL on datasets.
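
As a minimal sketch, you could chain a few of these transforms as follows. This assumes DataSource0 is a DynamicFrame created by one of the preceding examples, spark is the GlueContext Spark session, and the column names are illustrative:

from awsglue.transforms import ApplyMapping
from awsglue.dynamicframe import DynamicFrame

# Project and rename columns on the DynamicFrame read earlier (illustrative columns).
Mapped = ApplyMapping.apply(
    frame=DataSource0,
    mappings=[("id", "int", "id", "int"),
              ("name", "string", "name", "string"),
              ("department", "string", "department", "string")])

# Run arbitrary Spark SQL by converting to a DataFrame and back to a DynamicFrame.
Mapped.toDF().createOrReplaceTempView("department")
filtered = spark.sql("SELECT id, name FROM department WHERE id < 200")
Transform0 = DynamicFrame.fromDF(filtered, glueContext, "Transform0")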

VPC support for networking

AWS Glue jobs allow you to securely connect to your data stores within a private VPC subnet. You can also enable a NAT (Network Address Translation) gateway within a VPC to access both VPC resources and the public internet.

Building, testing, and validating your connector locally

After developing the connector for your favorite data store with the interface of your choice, you can follow the instructions to build the connector using Maven by running mvn install. This builds the connector and installs the resulting JAR into your local Maven repository. You can now include this JAR in the classpath of your IDE or of the AWS Glue Spark runtime downloaded from Maven.

After you build and import the JAR, you can test it locally by plugging it into the AWS Glue Spark runtime and writing a validation test. We provide sample validation tests in the AWS Glue GitHub repository. These cover several scenarios for both local testing and validation on the AWS Glue job system. The following table lists the validation tests, the functionality they test, and the associated interfaces.

Test Name | Description | JDBC | Spark | Athena
DataSourceTest | Tests connector connectivity and reading functionality. | x | x | x
ReadWriteTest | Tests reading and writing end-to-end workflow. | x | x | x
CatalogConnectionTest | Tests catalog connection integration. | x | x | x
DataSchemaTest | Tests data schema from reading with the connector. | x | x | x
SecretsManagerTest | Tests Secrets Manager integration. | x | x |
DataSinkTest | Tests connector connectivity and writing functionality. | x | x |
ColumnPartitioningTest | Tests connector column partitioning functionality. | x | |
FilterPredicateTest | Tests connector filter predicate functionality. | x | |
JDBCUrlTest | Tests connector extra parameters for JDBC URL functionality. | x | |
DbtableQueryTest | Tests connector dbTable and query option functionality. | x | |
DataTypeMappingTest | Tests connector custom data type mapping functionality. | x | |

Functionality such as AWS Glue job bookmarks that allow incremental loads can be tested on the AWS Glue job system only. We also provide a Python script to run all tests together as a suite on the AWS Glue job system.

Packaging and deploying the connector on AWS Glue

We now discuss how you can package your connector and deploy it on AWS Glue using the BYOC workflow:

  1. Package the custom connector as a JAR and upload the JAR file to an Amazon S3 bucket in your account.
  2. Follow the flow to create a custom connector referencing the JAR in Amazon S3 from AWS Glue Studio.
  3. Instantiate a connection for that connector and create an AWS Glue job using it.

For step-by-step instructions on the BYOC workflow, see Creating custom connectors.

Alternatively, we also provide the scripts and instructions for you to share the connector publicly on AWS Marketplace, either for a price or for free. For instructions on subscribing to the connector, see Subscribing to AWS Marketplace connectors.

Using AWS Glue Studio to author a Spark application

After you create a connection for a BYOC or AWS Marketplace AWS Glue connector, you can follow the instructions to visually author a Spark ETL application with AWS Glue Studio; see Job Authoring with custom connectors. The following are screenshots from AWS Glue Studio:

Connectors on AWS Marketplace

Visually author Glue jobs using connectors with AWS Glue Studio

Step 1 – Select a connector

Step 2 – Visually author the job using the associated connection

Conclusion

You can use two different mechanisms to plug custom connectors into the AWS Glue Spark runtime and the AWS Glue Studio console. In this post, we discussed the user experience for seamless discovery and subscription to custom connectors, walked you through developing and testing your own connectors with the AWS Glue Spark runtime, and showed how to deploy them into your production Apache Spark applications for ETL and analytics workloads that run on AWS Glue.

Build a custom connector yourself or try one on AWS Marketplace with AWS Glue Studio.

If you would like to partner or add a new Glue connector to AWS Marketplace, please reach out to us at [email protected]



About the Authors

Bo Li is a software engineer in AWS Glue and is devoted to designing and building end-to-end solutions to address customers’ data analytics and processing needs with cloud-based, data-intensive technologies.

 

 

 

Yubo Xu is a Software Development Engineer on the AWS Glue team. His main focus is to improve the stability and efficiency of the Spark runtime for AWS Glue and the ease of connecting to various data sources. Outside of work, he enjoys reading books and hiking the trails in the Bay Area with his dog, Luffy, a one-year-old Shiba Inu.

 

 

Xiaoxi Liu is a software engineer on the AWS Glue team. Her passion is building scalable distributed systems for efficiently managing big data in the cloud, and her concentrations are distributed systems, big data, and cloud computing.

 

 

Mohit Saxena is a Software Development Manager at AWS Glue. His team works on Glue’s Spark runtime to enable new customer use cases for efficiently managing data lakes on AWS and to optimize Apache Spark for performance and reliability.

Performing data transformations using Snowflake and AWS Glue

Post Syndicated from Srinivas Kesanapally original https://aws.amazon.com/blogs/big-data/performing-data-transformations-using-snowflake-and-aws-glue/

In the connected world, data is generated from many different sources in a wide variety of data formats. Enterprises are looking for tools to ingest data from these evolving data sources, as well as to programmatically customize the ingested data to meet their data warehousing needs. You also need solutions that help you quickly meet your business needs without provisioning any hardware or software resources, keeping costs low with the pay-as-you-use model.

AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning (ML), and application development. AWS Glue provides all the capabilities needed for data integration, so you can start analyzing your data in minutes instead of weeks or months.

To further support a wide variety of use cases, AWS Glue launched a new capability at AWS re:Invent 2020 to support custom third-party connectors that help users easily orchestrate data integration workflows visually using AWS Glue Studio in minutes with just a few clicks. AWS Glue custom connectors help users search and select connectors from AWS Marketplace or bring their own connectors. Using this new feature, users can easily connect to Snowflake with a few clicks using their own Snowflake connector and start orchestrating the data pipeline in minutes.

In this post, we go over how to unify the datasets in your Amazon Simple Storage Service (Amazon S3) data lake with data in Snowflake, and how to read and transform the data using AWS Glue. Though not addressed in this post, you can also read data from Amazon S3, perform transformations on it using AWS Glue, and persist it into Snowflake by customizing the generated AWS Glue script.
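
For reference, a minimal sketch of that reverse flow inside a Glue job script might look like the following. The S3 path, table name, and connection name (the JDBC connection created later in this post) are placeholders, and the exact write options depend on the connector you deploy:

# Read CSV data from Amazon S3 into a DynamicFrame (placeholder path).
S3Source = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": ["s3://my-bucket/input/"]},
    format="csv",
    format_options={"withHeader": True},
    transformation_ctx="S3Source")

# Write the DynamicFrame to Snowflake through the custom JDBC connection.
glueContext.write_dynamic_frame.from_options(
    frame=S3Source,
    connection_type="custom.jdbc",
    connection_options={"dbTable": "employee",
                        "connectionName": "snowflake-glue-jdbc-connection"})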

Solution overview

The following architecture diagram shows how AWS Glue connects to Snowflake for data preparation.

You upload the Snowflake JDBC connector JAR file into your S3 bucket and define an AWS Identity and Access Management (IAM) role that has permissions to read from this bucket, write to a destination S3 bucket, and run AWS Glue jobs. Then, you define your credentials to connect to Snowflake either in AWS Secrets Manager or define it on the AWS Glue Studio console, and create a job that can load the JAR file from your S3 bucket and connect to Snowflake to get the data and save it to the defined S3 bucket location. With the same JDBC connection, you also can read data from your S3 bucket and write to Snowflake.

Creating a custom connector

To implement this solution, you first create a custom connector.

  1. On the AWS Glue Studio console, under Connectors, choose Create custom connector.

  1. For Connector S3 URL, enter the S3 location where you uploaded the Snowflake JDBC connector JAR file.
  2. For Name, enter a name (for this post, we enter snowflake-jdbc-connector).
  3. For Connector type, choose JDBC.
  4. For Class name, enter the Snowflake JDBC driver class name, net.snowflake.client.jdbc.SnowflakeDriver.
  5. For JDBC URL base, enter the following URL (provide your own account): jdbc:snowflake://<snowflake account info>/?user=${Username}&password=${Password}&warehouse=${warehouse}.
  6. For URL parameter delimiter, enter &.
  7. Choose Create connector.

Creating a connection

To create a JDBC connection to Snowflake, complete the following steps:

  1. On the Connectors page, select the connector.
  2. Choose Create connection.

  1. For Name, enter a name, such as snowflake-glue-jdbc-connection.
  2. For Description, enter a meaningful description to identify the connection.
  3. For JDBC URL format, choose default.

You have an option to enter a user name and password or use Secrets Manager to store your encrypted credentials.

  1. For this post, for Data source credentials, select Use a secret.
  2. For Secret, choose your secret.
  3. For Additional URL parameters, provide the following parameters needed to run a SQL statement in Snowflake:
    1. warehouse – Virtual Snowflake warehouse to use to run the query. Replace {warehouse} with a valid value.
    2. db – The Snowflake database name.
    3. schema – The Snowflake database schema.
  4. Verify that the JDBC URL is well formed.
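
With the additional parameters appended, a well-formed URL might look like the following; the account host, warehouse, database, and schema values are hypothetical:

jdbc:snowflake://myaccount.snowflakecomputing.com/?user=${Username}&password=${Password}&warehouse=${warehouse}&db=my_db&schema=public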

Creating a job

You’re now ready to define the job using this connection.

  1. On the Connectors page, select your connection.
  2. Choose Create job.

  1. For Name, enter a name (for this post, we enter untitled job).
  2. For Description, enter a meaningful description for the job.
  3. For IAM Role, choose a role that has access to the target S3 location the job writes to, to the source location it loads the Snowflake JDBC JAR file from, and that can run the AWS Glue job (use the AWS Glue service role).
  4. Use the default options for Type, Glue version, Language, Worker type, Number of workers, Number of retries, and Job timeout.
  5. For Job bookmark, choose Disable.

  1. Save the job.
  2. On the Visual tab, go to the Data Source properties-connector tab to specify the table or query to read from Snowflake.
  3. Choose Save.

  1. In the Visual tab, choose the + icon to create a new S3 node for the destination.
  2. On the Node properties tab, make sure to set the node as a Target node.

  1. On the Data target properties tab, define the S3 bucket location where AWS Glue writes the results.

  1. Add an Apply Mapping transformation to map the Snowflake column names to the destination column names.

  1. Save your settings.
  2. On the Script tab, look at the script generated by AWS Glue for verification.
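
The generated script typically resembles the following sketch; the connection name, table, mappings, and output path shown here are hypothetical, and Parquet is used only for illustration:

import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import ApplyMapping

glueContext = GlueContext(SparkContext.getOrCreate())

# Read from Snowflake through the custom JDBC connection created earlier.
DataSource0 = glueContext.create_dynamic_frame.from_options(
    connection_type="custom.jdbc",
    connection_options={"dbTable": "employee",
                        "connectionName": "snowflake-glue-jdbc-connection"},
    transformation_ctx="DataSource0")

# Map Snowflake column names to the destination column names (hypothetical columns).
Transform0 = ApplyMapping.apply(
    frame=DataSource0,
    mappings=[("ID", "int", "id", "int"),
              ("NAME", "string", "name", "string")],
    transformation_ctx="Transform0")

# Write the results to the S3 target location.
DataSink0 = glueContext.write_dynamic_frame.from_options(
    frame=Transform0,
    connection_type="s3",
    connection_options={"path": "s3://my-bucket/output/"},
    format="parquet",
    transformation_ctx="DataSink0")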

  1. Run the job and validate that the table data is successfully stored in the specified S3 bucket location.

In the following screenshot, I upload three records from my employee table in Snowflake into my S3 bucket.

The following screenshot shows that my S3 bucket has the data from Snowflake.

Conclusion

In this post, you went over how the AWS Glue console integration with Snowflake simplifies the process of connecting to Snowflake and applying transformations without writing a single line of code. You also learned how to define Snowflake connection parameters in AWS Glue, connect to Snowflake from AWS Glue, read from Snowflake using AWS Glue, and apply transformations to meet your business needs.


About the Author

Srinivas Kesanapally is a principal partner solutions architect at AWS with over 25 years of experience working with database and analytics products from traditional and modern database vendors. He has helped many large technology companies design data analytics solutions and has led engineering teams involved in modernizing data analytics platforms.

Building AWS Glue Spark ETL jobs by bringing your own JDBC drivers for Amazon RDS

Post Syndicated from Srikanth Sopirala original https://aws.amazon.com/blogs/big-data/building-aws-glue-spark-etl-jobs-by-bringing-your-own-jdbc-drivers-for-amazon-rds/

AWS Glue is a fully managed extract, transform, and load (ETL) service that makes it easy to prepare and load your data for analytics. AWS Glue has native connectors to connect to supported data sources either on AWS or elsewhere using JDBC drivers. Additionally, AWS Glue now enables you to bring your own JDBC drivers (BYOD) to your Glue Spark ETL jobs. This feature enables you to connect to data sources with custom drivers that aren’t natively supported in AWS Glue, such as MySQL 8 and Oracle 18. You can also use multiple JDBC driver versions in the same AWS Glue job, enabling you to migrate data between source and target databases with different versions. For more information, see Connection Types and Options for ETL in AWS Glue.

This post shows how to build AWS Glue ETL Spark jobs and set up connections with custom drivers for Oracle 18 and MySQL 8 databases.

Solution overview

We discuss three different use cases in this post, using AWS Glue, Amazon RDS for MySQL, and Amazon RDS for Oracle.

In the following architecture, we connect to Oracle 18 using an external ojdbc7.jar driver from AWS Glue ETL, extract the data, transform it, and load the transformed data to Oracle 18.

In the second scenario, we connect to MySQL 8 using an external mysql-connector-java-8.0.19.jar driver from AWS Glue ETL, extract the data, transform it, and load the transformed data to MySQL 8.

In the third scenario, we set up a connection where we connect to Oracle 18 and MySQL 8 using external drivers from AWS Glue ETL, extract the data, transform it, and load the transformed data to Oracle 18.

Prerequisites

Before getting started, you must complete the following prerequisites:

  1. Create an AWS Identity and Access Management (IAM) user with sufficient permissions to interact with the AWS Management Console. Your IAM permissions must also include access to create IAM roles and policies created by the AWS CloudFormation template provided in this post.
  2. Create an IAM policy for AWS Glue.
  3. Before setting up the AWS Glue job, you need to download drivers for Oracle and MySQL, which we discuss in the next section.

Downloading drivers for Oracle and MySQL

To download the required drivers for Oracle and MySQL, complete the following steps:

  1. Download the MySQL JDBC connector.
  2. Select the operating system as platform independent and download the .tar.gz or .zip file (for example, mysql-connector-java-8.0.19.tar.gz or mysql-connector-java-8.0.19.zip) and extract it.
  3. Pick MySQL connector .jar file (such as mysql-connector-java-8.0.19.jar) and upload it into your Amazon Simple Storage Service (Amazon S3) bucket.
  4. Make a note of that path because you use it later in the AWS Glue job to point to the JDBC driver.
  5. Similarly, download the Oracle JDBC connector (ojdbc7.jar).

This post is tested with the mysql-connector-java-8.0.19.jar and ojdbc7.jar drivers, but based on your database types, you can download and use the appropriate versions of JDBC drivers supported by the database.

  1. Upload the Oracle JDBC 7 driver (ojdbc7.jar) to your S3 bucket.
  2. Make a note of that path, because you use it in the AWS Glue job to establish the JDBC connection with the database.
  3. Make sure to upload the three scripts (OracleBYOD.py, MySQLBYOD.py, and CrossDB_BYOD.py) in an S3 bucket.
  4. Save the following code as OracleBYOD.py in your S3 bucket.
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext, SparkConf
    from awsglue.context import GlueContext
    from awsglue.job import Job
    import time
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType
    
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    
    connection_oracle18_options_source_emp = {
        "url": "jdbc:oracle:thin://@<Oracle RDS Endpoint>:1521:orcl",
        "dbtable": "employee",
        "user": "byod",
        "password": "Awsuser123",
        "customJdbcDriverS3Path": "s3://<Bucket>/<Folder>/ojdbc7.jar",
        "customJdbcDriverClassName": "oracle.jdbc.OracleDriver"}
    
    connection_oracle18_options_source_dept = {
        "url": "jdbc:oracle:thin://@<Oracle RDS Endpoint>:1521:orcl",
        "dbtable": "dept",
        "user": "byod",
        "password": "Awsuser123",
        "customJdbcDriverS3Path": "s3://<Bucket>/<Folder>/ojdbc7.jar",
        "customJdbcDriverClassName": "oracle.jdbc.OracleDriver"}
        
    connection_oracle18_options_target_emp_dept = {
        "url": "jdbc:oracle:thin://@<Oracle RDS Endpoint>:1521:orcl",
        "dbtable": "emp_dept",
        "user": "byod",
        "password": "Awsuser123",
        "customJdbcDriverS3Path": "s3://<Bucket>/<Folder>/ojdbc7.jar",
        "customJdbcDriverClassName": "oracle.jdbc.OracleDriver"}
        
    # Read DynamicFrame from Oracle 
    df_emp = glueContext.create_dynamic_frame.from_options(connection_type="oracle", connection_options=connection_oracle18_options_source_emp)
    df_emp = ApplyMapping.apply(frame = df_emp, mappings = [("employee_id", "integer", "employee_id", "integer"), ("first_name", "string", "first_name", "string"), ("last_name", "string", "last_name", "string"), ("email", "string", "email", "string"), ("phone_number", "string", "phone_number", "string"), ("hire_date", "string", "hire_date", "string"), ("job_id", "string", "job_id", "string"), ("salary", "long", "salary", "long"), ("commission_pct", "long", "commission_pct", "long"), ("manager_id", "long", "manager_id", "long"), ("department_id", "integer", "department_id", "integer")])
    df_emp = df_emp.drop_fields(['phone_number','hire_date','job_id','salary','commission_pct','manager_id'])
    df_dept = glueContext.create_dynamic_frame.from_options(connection_type="oracle", connection_options=connection_oracle18_options_source_dept)
    df_dept = ApplyMapping.apply(frame = df_dept, mappings = [("department_id", "integer", "dept_id", "integer"), ("dept_name", "string", "dept_name", "string")])
    
    
    df_emp.printSchema()
    df_dept.printSchema()
    
    df_emp_dept = Join.apply(df_emp, df_dept, 'department_id', 'dept_id')
    df_emp_dept = df_emp_dept.drop_fields(['department_id','dept_id'])
    df_emp_dept = DropNullFields.apply(frame = df_emp_dept)
    
    df_emp_dept.printSchema()
    
    # Write data to Oracle 
    glueContext.write_from_options(frame_or_dfc=df_emp_dept, connection_type="oracle", connection_options=connection_oracle18_options_target_emp_dept)

  1. Save the following code as MySQLBYOD.py in your S3 bucket.
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext, SparkConf
    from awsglue.context import GlueContext
    from awsglue.job import Job
    import time
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType
    
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    
    connection_mysql8_options_source_emp = {
        "url": "jdbc:mysql://<MySQL RDS Endpoint>:3306/byod",
        "dbtable": "employee",
        "user": "MySQLadmin",
        "password": "MYSQLadmin123",
        "customJdbcDriverS3Path": "s3://<Bucket>/<Folder>/mysql-connector-java-8.0.19.jar",
        "customJdbcDriverClassName": "com.mysql.cj.jdbc.Driver"}
        
    connection_mysql8_options_source_dept = {
        "url": "jdbc:mysql://<MySQL RDS Endpoint>:3306/byod",
        "dbtable": "dept",
        "user": "MySQLadmin",
        "password": "MYSQLadmin123",
        "customJdbcDriverS3Path": "s3://<Bucket>/<Folder>/mysql-connector-java-8.0.19.jar",
        "customJdbcDriverClassName": "com.mysql.cj.jdbc.Driver"}
    
    connection_mysql8_options_target_emp_dept = {
        "url": "jdbc:mysql://<MySQL RDS Endpoint>:3306/byod",
        "dbtable": "emp_dept",
        "user": "MySQLadmin",
        "password": "MYSQLadmin123",
        "customJdbcDriverS3Path": "s3://<Bucket>/<Folder>/mysql-connector-java-8.0.19.jar",
        "customJdbcDriverClassName": "com.mysql.cj.jdbc.Driver"}
        
    
    # Read from JDBC databases with custom driver
    df_emp = glueContext.create_dynamic_frame.from_options(connection_type="mysql", connection_options=connection_mysql8_options_source_emp)
    df_emp = ApplyMapping.apply(frame = df_emp, mappings = [("employee_id", "integer", "employee_id", "integer"), ("first_name", "string", "first_name", "string"), ("last_name", "string", "last_name", "string"), ("email", "string", "email", "string"), ("phone_number", "string", "phone_number", "string"), ("hire_date", "string", "hire_date", "string"), ("job_id", "string", "job_id", "string"), ("salary", "long", "salary", "long"), ("commission_pct", "long", "commission_pct", "long"), ("manager_id", "long", "manager_id", "long"), ("department_id", "integer", "department_id", "integer")])
    
    #print "Applied mapping to the Glue DynamicFrame"
    df_emp = df_emp.drop_fields(['phone_number','hire_date','job_id','salary','commission_pct','manager_id'])
    df_dept = glueContext.create_dynamic_frame.from_options(connection_type="mysql", connection_options=connection_mysql8_options_source_dept)
    df_dept = ApplyMapping.apply(frame = df_dept, mappings = [("department_id", "integer", "dept_id", "integer"), ("dept_name", "string", "dept_name", "string")])
    
    df_emp.printSchema()
    df_dept.printSchema()
    
    df_emp_dept = Join.apply(df_emp, df_dept, 'department_id', 'dept_id')
    df_emp_dept = df_emp_dept.drop_fields(['department_id','dept_id'])
    df_emp_dept = DropNullFields.apply(frame = df_emp_dept)
    
    df_emp_dept.printSchema()
    
    glueContext.write_from_options(frame_or_dfc=df_emp_dept, connection_type="mysql", connection_options=connection_mysql8_options_target_emp_dept)
    

  1. Save the following code as CrossDB_BYOD.py in your S3 bucket.
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext, SparkConf
    from awsglue.context import GlueContext
    from awsglue.job import Job
    import time
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType
    
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    
    connection_mysql8_options_source_emp = {
        "url": "jdbc:mysql://<MySQL RDS Endpoint>:3306/byod",
        "dbtable": "employee",
        "user": "MySQLadmin",
        "password": "MYSQLadmin123",
        "customJdbcDriverS3Path": "s3://<Bucket>/<Folder>/mysql-connector-java-8.0.19.jar",
        "customJdbcDriverClassName": "com.mysql.cj.jdbc.Driver"}
    
    connection_oracle18_options_source_dept = {
        "url": "jdbc:oracle:thin://@<Oracle RDS Endpoint>:1521:orcl",
        "dbtable": "dept",
        "user": "byod",
        "password": "Awsuser123",
        "customJdbcDriverS3Path": "s3://<Bucket>/<Folder>/ojdbc7.jar",
        "customJdbcDriverClassName": "oracle.jdbc.OracleDriver"}
    
    connection_oracle18_options_target_emp_dept = {
        "url": "jdbc:oracle:thin://@<Oracle RDS Endpoint>:1521:orcl",
        "dbtable": "emp_dept",
        "user": "byod",
        "password": "Awsuser123",
        "customJdbcDriverS3Path": "s3://<Bucket>/<Folder>/ojdbc7.jar",
        "customJdbcDriverClassName": "oracle.jdbc.OracleDriver"}
    
    # Read DynamicFrames from MySQL (employee) and Oracle (dept)
    df_emp = glueContext.create_dynamic_frame.from_options(connection_type="mysql", connection_options=connection_mysql8_options_source_emp)
    df_emp = ApplyMapping.apply(frame = df_emp, mappings = [("employee_id", "integer", "employee_id", "integer"), ("first_name", "string", "first_name", "string"), ("last_name", "string", "last_name", "string"), ("email", "string", "email", "string"), ("phone_number", "string", "phone_number", "string"), ("hire_date", "string", "hire_date", "string"), ("job_id", "string", "job_id", "string"), ("salary", "long", "salary", "long"), ("commission_pct", "long", "commission_pct", "long"), ("manager_id", "long", "manager_id", "long"), ("department_id", "integer", "department_id", "integer")])
    df_emp = df_emp.drop_fields(['phone_number','hire_date','job_id','salary','commission_pct','manager_id'])
    df_dept = glueContext.create_dynamic_frame.from_options(connection_type="oracle", connection_options=connection_oracle18_options_source_dept)
    df_dept = ApplyMapping.apply(frame = df_dept, mappings = [("department_id", "integer", "dept_id", "integer"), ("dept_name", "string", "dept_name", "string")])
    
    
    
    df_emp.printSchema()
    df_dept.printSchema()
    
    df_emp_dept = Join.apply(df_emp, df_dept, 'department_id', 'dept_id')
    df_emp_dept = df_emp_dept.drop_fields(['department_id','dept_id'])
    df_emp_dept = DropNullFields.apply(frame = df_emp_dept)
    
    df_emp_dept.printSchema()
    
    # Write data to Oracle
    glueContext.write_from_options(frame_or_dfc=df_emp_dept, connection_type="oracle", connection_options=connection_oracle18_options_target_emp_dept)
    
    

Provisioning resources with AWS CloudFormation

The generic workflow of setting up a connection with your own custom JDBC drivers involves various steps. It’s a manual configuration that is error prone and adds overhead when repeating the steps between environments and accounts. With AWS CloudFormation, you can provision your application resources in a safe, repeatable manner, allowing you to build and rebuild your infrastructure and applications without having to perform manual actions or write custom scripts. The declarative code in the file captures the intended state of the resources to create, and allows you to automate the creation of AWS resources.

We provide this CloudFormation template for you to use. Review and customize it to suit your needs. Some of the resources deployed by this stack incur costs as long as they remain in use, like Amazon RDS for Oracle and Amazon RDS for MySQL.

This CloudFormation template creates the following resources:

  • A VPC
  • Two subnets
  • A route table
  • An internet gateway
  • A MySQL 8 database
  • An Oracle 18 database

To provision your resources, complete the following steps:

  1. Sign in to the console.
  2. Choose the us-east-1 Region in which to create the stack.
  3. Choose Next.
  4. Choose Launch Stack:

This step automatically launches AWS CloudFormation in your AWS account with a template. It prompts you to sign in as needed. You can view the CloudFormation template from within the console as required.

  1. For Stack name, enter a name.
  2. Change the other parameters as needed or keep the following default values:
    1. Oracle user name: oraadmin
    2. Oracle password: oraadmin123
    3. MySQL user name: MySQLadmin
    4. MySQL password: MYSQLadmin123

  1. Choose Next.
  2. Choose Next.
  3. Review the details and choose Create.

This stack creation can take up to 20 minutes.

After the stack creation is complete, go to the Outputs tab on the AWS CloudFormation console and note the following values (you use these in later steps):

  • MySQLJDBCConnectionString
  • OracleJDBCConnectionString

Configuring an AWS Glue ETL job using your own drivers

Before creating an AWS Glue ETL, run the SQL script (database_scripts.sql) on both the databases (Oracle and MySQL) to create tables and insert data. For more information about connecting to the RDS DB instance, see How can I troubleshoot connectivity to an Amazon RDS DB instance that uses a public or private subnet of a VPC?

To set up AWS Glue connections, complete the following steps:

  1. On the AWS Glue console, under Databases, choose Connections.
  2. Choose Add Connection.
  3. For Connection Name, enter a name for your connection.
  4. For Connection Type, choose JDBC.
  5. For JDBC URL, enter a URL, such as jdbc:oracle:thin://@<hostname>:1521/ORCL for Oracle or jdbc:mysql://<hostname>:3306/mysql for MySQL.
  6. Enter the user name and password for the database.
  7. Select the VPC in which you created the RDS instance (Oracle and MySQL).
  8. Choose the subnet within your VPC. Refer to the CloudFormation stack Outputs tab for the subnet name.
  9. Choose the security group of the database. Refer to the CloudFormation stack Outputs tab for security group name.
  10. Choose Next.
  11. Check the connection details and choose Finish.

Make sure to add a connection for both databases (Oracle and MySQL).

Creating endpoints and a security group

Before testing the connection, make sure you create an AWS Glue endpoint and S3 endpoint in the VPC in which databases are created. Complete the following steps for both Oracle and MySQL instances:

  1. To create your AWS Glue endpoint, on the Amazon VPC console, choose Endpoints.
  2. Choose Create endpoint.
  3. For Service Names, choose AWS Glue.
  4. Choose com.amazonaws.<region>.glue (for example, com.amazonaws.us-west-2.glue).
  5. Choose the VPC of the RDS for Oracle or RDS for MySQL instance.
  6. Choose the security group of the RDS instances.
  7. Choose Create endpoint.

To create your S3 endpoint, you use Amazon Virtual Private Cloud (Amazon VPC).

  1. On the Amazon VPC console, choose Endpoints.
  2. Choose Create endpoint.
  3. For Service Names, choose Amazon S3.
  4. Choose com.amazonaws.<region>.s3 (for example, com.amazonaws.us-west-2.s3).
  5. Choose the VPC of the RDS for Oracle or RDS for MySQL instance.
  6. Choose the route table ID.
  7. Choose Create endpoint.

The RDS for Oracle or RDS for MySQL security group must include itself as a source in its inbound rules.

  1. On the Security Groups page, choose Edit Inbound Rules.
  2. Choose Add rule.
  3. For Type, choose All Traffic.
  4. For Source, choose the same security group (for example, glue-byod-stack1….)
  5. Choose Save Rules.

If both databases are in the same VPC and subnet, you don’t need to create a connection for the MySQL and Oracle databases separately. The reason for setting up an AWS Glue connection to the databases is to establish a private connection between the RDS instances in the VPC and AWS Glue via the S3 endpoint, AWS Glue endpoint, and Amazon RDS security group. It’s not required to test the JDBC connection because that connection is established by the AWS Glue job when you run it. If you test the connection with MySQL 8, it fails because the AWS Glue connection doesn’t support the MySQL 8.0 driver at the time of writing this post; therefore, you need to bring your own driver.

Setting up AWS Glue ETL jobs

You’re now ready to set up your ETL job in AWS Glue. Complete the following steps for both connections:

  1. Edit the following parameters in the scripts (OracleBYOD.py, MySQLBYOD.py, and CrossDB_BYOD.py) and upload them in Amazon S3:
    1. url
    2. user
    3. password
    4. customJdbcDriverS3Path for sources and target tables

You can find the database endpoints (url) on the CloudFormation stack Outputs tab; the other parameters are mentioned earlier in this post. If you use another driver, make sure to change customJdbcDriverClassName to the corresponding class in the driver.

Alternatively, you can pass these values as AWS Glue job parameters and retrieve them in the script using getResolvedOptions.
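
For example, a minimal sketch of that approach follows; the parameter names are hypothetical and would be passed to the job as --db_url, --db_user, --db_password, and --driver_path:

import sys
from awsglue.utils import getResolvedOptions

# Resolve job parameters instead of hard-coding connection details in the script.
args = getResolvedOptions(sys.argv, ["db_url", "db_user", "db_password", "driver_path"])

connection_mysql8_options_source_emp = {
    "url": args["db_url"],
    "dbtable": "employee",
    "user": args["db_user"],
    "password": args["db_password"],
    "customJdbcDriverS3Path": args["driver_path"],
    "customJdbcDriverClassName": "com.mysql.cj.jdbc.Driver"}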

  1. On the AWS Glue console, under ETL, choose Jobs.
  2. Choose Add Job.
  3. For Job Name, enter a name.
  4. For IAM role, choose the IAM role you created as a prerequisite.
  5. For Type, choose Spark.
  6. For Glue Version, choose Python (latest version).
  7. For This job runs, choose An existing script that you provide.
  8. Choose the Amazon S3 path where the script (OracleBYOD.py, MySQLBYOD.py, or CrossDB_BYOD.py) is stored.
  9. Under Advanced properties, enable Job bookmark.

Job bookmarks help AWS Glue maintain state information and prevent the reprocessing of old data.

  1. Keep the remaining settings as their defaults and choose Next.
  2. For Connections, choose the Amazon RDS for Oracle connection for OracleBYOD.py, Amazon RDS for MySQL connection for MySQLBYOD.py, or Amazon RDS for Oracle and Amazon RDS for MySQL connection for CrossDB_BYOD.py.
  3. Choose Save job and edit scripts.
  4. Choose Run Job.
  5. When the job is complete, validate the data loaded in the target table.

Cleaning up

After you finish, don’t forget to delete the CloudFormation stack, because some of the AWS resources deployed by the stack in this post incur a cost as long as you continue to use them.

You can delete the CloudFormation stack to delete all AWS resources created by the stack.

  1. On the AWS CloudFormation console, on the Stacks page, select the stack to delete. The stack must be currently running.
  2. In the stack details pane, choose Delete.
  3. Choose Delete stack when prompted.

Summary

In this post, we showed you how to build AWS Glue Spark ETL jobs and set up connections with custom drivers for Oracle 18 and MySQL 8 databases using AWS CloudFormation. You can use this solution with your custom drivers for databases not supported natively by AWS Glue.

If you have any questions or suggestions, please leave a comment.


About the Authors

Srikanth Sopirala is a Sr. Analytics Specialist Solutions Architect at AWS. He is a seasoned leader with over 20 years of experience, who is passionate about helping customers build scalable data and analytics solutions to gain timely insights and make critical business decisions. In his spare time, he enjoys reading, spending time with his family and road biking.

 

 

Naresh Gautam is a Sr. Analytics Specialist Solutions Architect at AWS. His role is helping customers architect highly available, high-performance, and cost-effective data analytics solutions to empower customers with data-driven decision-making. In his free time, he enjoys meditation and cooking.

 

 

Building fast ETL using SingleStore and AWS Glue

Post Syndicated from Saurabh Shanbhag original https://aws.amazon.com/blogs/big-data/building-fast-etl-using-singlestore-and-aws-glue/

Disparate data systems have become a norm in many companies. The reasons for this vary: different teams in the organization select the data system best suited to their primary function, the responsibility for choosing these data systems may have been decentralized across different departments, a merged company may still use separate data systems from the formerly individual companies, and many more. Data integration combines data from these disparate data sources and helps users throughout the organization fully leverage the inherent value in the data to gain meaningful and valuable insights. AWS Glue is a fully managed serverless data integration service that makes it easy to extract, transform, and load (ETL) data from various data sources for analytics and data processing with Apache Spark ETL jobs. The AWS Glue Spark runtime supports connectivity to popular data sources such as Amazon Simple Storage Service (Amazon S3), Amazon Relational Database Service (Amazon RDS), Amazon DynamoDB, Amazon Redshift, and Apache Kafka.

We recognize the existence of disparate data systems that best fit your application needs. AWS Glue custom connectors in AWS Glue Studio extend AWS Glue support for data sources beyond the native connection types. Now you can discover and subscribe to AWS Glue ETL connectors from AWS Marketplace for data sources that best fit your needs. AWS Partner SingleStore provides a relational SQL database that can handle both transactional and analytical workloads in a single system. New applications that need to combine transactional and analytical (HTAP, or hybrid transactional/analytical processing) requirements can take advantage of SingleStore DB. SingleStore provides a SingleStore connector for AWS Glue based on the Apache Spark DataSource, available through AWS Marketplace. The fully managed, scale-out Apache Spark environment for ETL jobs provided by AWS Glue matches well with SingleStore’s distributed SQL design.

This post shows how you can use an AWS Glue custom connector from AWS Marketplace, based on the Apache Spark DataSource API, in AWS Glue Studio to create ETL jobs in minutes using an easy-to-use graphical interface.

The following architecture diagram shows SingleStore connecting with AWS Glue for an ETL job.

Now you can easily subscribe to the SingleStore connector on AWS Marketplace and create a connection to your SingleStore cluster. VPC networking and integration with AWS Secrets Manager for authentication credentials are supported for the connection.

Walkthrough overview

In this post, we demonstrate how to connect a SingleStore cluster in an AWS Glue ETL job as the source, transform the data, and store it back on a SingleStore cluster and in Apache Parquet format on Amazon S3. We use the TPC-H Benchmark dataset that is available as a sample dataset in SingleStore.

To successfully create the ETL job using a custom ETL connector from AWS Marketplace, you complete the following steps:

  1. Store authentication credentials in Secrets Manager.
  2. Create an AWS Identity and Access Management (IAM) role for the AWS Glue ETL job.
  3. Configure the SingleStore connector and connection.
  4. Create an ETL job using the SingleStore connection in AWS Glue Studio.

Storing authentication credentials in Secrets Manager

AWS Glue provides integration with Secrets Manager to securely store connection authentication credentials. Follow these steps to create these credentials (a scripted alternative is sketched after the steps):

  1. On the Secrets Manager console, choose Store a new secret.
  2. For Select a secret type, select Other type of secrets.
  3. For Secret key/value, set one row for each of the following parameters:
    1. ddlEndpoint
    2. database
    3. user
    4. password
  4. Choose Next.


  1. For Secret name, enter aws-glue-singlestore-connection-info.
  2. Choose Next.
  3. Keep the Disable automatic rotation check box selected.
  4. Choose Next.
  5. Choose Store.
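
If you prefer to create the secret programmatically, the following is a minimal boto3 sketch of the same step. The key names match the parameters listed above; the endpoint, database, user, and password values are placeholders for your own SingleStore cluster details.

import json

import boto3

# Store the SingleStore connection details as a JSON secret.
# Replace the placeholder values with your own cluster details.
secretsmanager = boto3.client("secretsmanager")
secretsmanager.create_secret(
    Name="aws-glue-singlestore-connection-info",
    SecretString=json.dumps({
        "ddlEndpoint": "<your-singlestore-ddl-endpoint>",
        "database": "<your-database>",
        "user": "<your-user>",
        "password": "<your-password>",
    }),
)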

Creating an IAM role for the AWS Glue ETL job

In this section, you create a role with an attached policy to allow read-only access to credentials that are stored in Secrets Manager for the AWS Glue ETL job.

  1. On the IAM console, choose Policies.
  2. Choose Create policy.
  3. On the JSON tab, enter the following JSON snippet, providing your Region and account ID:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": [
                    "secretsmanager:GetSecretValue",
                    "secretsmanager:DescribeSecret"
                ],
                "Resource": "arn:aws:secretsmanager:<REGION>:<ACCOUNT_ID>:secret:aws-glue-*"
            }
        ]
    }

  4. Choose Review Policy.
  5. Give your policy a name, for example, GlueAccessSecretValue.
  6. In the navigation pane, choose Roles.
  7. Choose Create role.
  8. For Select type of trusted entity, choose AWS service.
  9. Choose Glue.

  1. Choose Next: Permissions.
  2. Search for the AWS managed policies AWSGlueServiceRole and AmazonEC2ContainerRegistryReadOnly, and select them.
  3. Search for the GlueAccessSecretValue policy you created earlier, and select it.
  4. For Role name, enter a name, for example, GlueCustomETLConnectionRole.
  5. Confirm the three policies are selected.


Configuring your SingleStore connector and connection

To connect to SingleStore, complete the following steps:

  1. On the AWS Glue console, choose AWS Glue Studio.
  2. Choose Connectors.
  3. Choose Go to AWS Marketplace.


  1. Subscribe to the SingleStore connector for AWS Glue from AWS Marketplace.
  2. Activate the connector from AWS Glue Studio.
  3. In the navigation pane, under Connectors, choose Create connection.
  4. For Name, enter a name, such as SingleStore_connection.
  5. For AWS Secret, choose the secret aws-glue-singlestore-connection-info you created earlier.


  1. Choose Create connection.

Creating an ETL job using the SingleStore connection in AWS Glue Studio

After you define the SingleStore connection, you can start authoring the job using this connection.

  1. On the AWS Glue Studio console, choose Connectors.
  2. Select your connector and choose Create job.


An untitled job is created with the connection as the source node.

  1. On the Job details page, for Name, enter SingleStore_tpch_transform_job.
  2. For Description, enter Glue job to transform tpch data from SingleStore DB.
  3. For IAM Role, choose GlueCustomETLConnectionRole.
  4. Keep the other properties at their default.


  1. On the Visual page, on the Data source properties – Connector tab, expand Connection options.
  2. For Key, enter dbtable.
  3. For Value, enter lineitem.


Because AWS Glue Studio is using information stored in the connection to access the data source instead of retrieving metadata information from a Data Catalog table, you must provide the schema metadata for the data source. Use the schema editor to update the source schema. For instructions on how to use the schema editor, see Editing the schema in a custom transform node.
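
For reference, the source node configured above corresponds roughly to the following PySpark sketch in the generated script. This is a sketch only; it assumes a GlueContext named glueContext, as in a standard AWS Glue job script, and uses the dbtable and connectionName values set earlier.

# Rough script-level equivalent of the Glue Studio source node configured above
lineitem_dyf = glueContext.create_dynamic_frame.from_options(
    connection_type="marketplace.spark",
    connection_options={
        "dbtable": "lineitem",
        "connectionName": "SingleStore_connection",
    },
)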


  1. Choose the + icon.
  2. For Node type, choose DropFields.
  3. On the Transform tab, select the fields to drop.


  1. Choose the + icon.
  2. For Node type, choose Custom Transform.
  3. On the Transform tab, add to the custom script.

For this post, we calculate two additional columns, disc_price and price. Then we use glueContext.write_dynamic_frame to write the updated data back to SingleStore using the SingleStore_connection connection we created. See the following code:

def MyTransform(glueContext, dfc) -> DynamicFrameCollection:
    from pyspark.sql.functions import col
    # These classes are available in the Glue job environment; the import is shown for clarity
    from awsglue.dynamicframe import DynamicFrame, DynamicFrameCollection

    # Convert the incoming DynamicFrame to a Spark DataFrame
    df = dfc.select(list(dfc.keys())[0]).toDF()

    # Calculate the discounted price and the final price including tax
    df1 = df.withColumn("disc_price", (col("l_extendedprice") * (1 - col("l_discount"))).cast("decimal(10,2)"))
    df2 = df1.withColumn("price", (col("disc_price") * (1 + col("l_tax"))).cast("decimal(10,2)"))
    dyf = DynamicFrame.fromDF(df2, glueContext, "updated_lineitem")

    # Write the updated data back to SingleStore through the Marketplace connector
    glueContext.write_dynamic_frame.from_options(frame=dyf,
        connection_type="marketplace.spark",
        connection_options={"dbtable": "updated_lineitem", "connectionName": "SingleStore_connection"})

    return DynamicFrameCollection({"CustomTransform0": dyf}, glueContext)


  1. On the Output schema tab, add the additional columns price and disc_price created in the custom script.


  1. Keep the default for the SelectFromCollection node.
  2. Choose the + icon.
  3. For Node type, choose Data Target – S3.
  4. On the Data target properties – S3 tab, for Format, choose Parquet.
  5. For S3 Target Location, enter s3://aws-glue-assets-{Your Account ID as a 12-digit number}-{AWS region}/output/.


  1. Choose Save.
  2. Choose Run.

In the following screenshot, a new table updated_lineitem is created with the two additional columns disc_price and price.


Conclusion

In this post, you learned how to subscribe to the SingleStore connector for AWS Glue on AWS Marketplace, activate the connector from AWS Glue Studio, and create an ETL job in AWS Glue Studio that uses the SingleStore connector as both source and target with a custom transform. You can use AWS Glue Studio to speed up the ETL job creation process, use connectors from AWS Marketplace or bring your own custom connectors, and allow different personas to transform data without any previous coding experience.


About the Author

Saurabh Shanbhag is a Partner Solutions Architect at AWS and has over 12 years of experience in working with data integration and analytics products. He focuses on enabling partners to build and enhance joint solutions on AWS.

Migrating data from Google BigQuery to Amazon S3 using AWS Glue custom connectors

Post Syndicated from Saurabh Bhutyani original https://aws.amazon.com/blogs/big-data/migrating-data-from-google-bigquery-to-amazon-s3-using-aws-glue-custom-connectors/

In today’s connected world, it’s common to have data sitting in various data sources in a variety of formats. Even though data is a critical component of decision making, for many organizations this data is spread across multiple public clouds. Organizations are looking for tools that make it easy to ingest data from these myriad data sources and be able to customize the data ingestion to meet their needs.

AWS Glue is a fully managed extract, transform, and load (ETL) service that makes it easy to prepare and load your data for analytics. AWS Glue provides all the capabilities needed for data integration, so analysis can be done in minutes instead of weeks or months. AWS Glue custom connectors are a new capability in AWS Glue and AWS Glue Studio that makes it easy for you to transfer data from SaaS applications and custom data sources to your data lake in Amazon S3. With just a few clicks, you can search and select connectors from AWS Marketplace and begin your data preparation workflow in minutes. You can also build custom connectors and share them across teams, and integrate open-source Spark connectors and Athena federated query connectors into your data preparation workflows. The AWS Glue Connector for Google BigQuery allows you to migrate data cross-cloud from Google BigQuery to Amazon Simple Storage Service (Amazon S3). AWS Glue Studio is a new graphical interface that makes it easy to create, run, and monitor ETL jobs in AWS Glue. You can visually compose data transformation workflows and seamlessly run them on the Apache Spark-based serverless ETL engine in AWS Glue.

In this post, we focus on using AWS Glue Studio to query BigQuery tables and save the data into Amazon Simple Storage Service (Amazon S3) in Parquet format, and then query it using Amazon Athena. To query BigQuery tables in AWS Glue, we use the new AWS Glue Connector for Google BigQuery from AWS Marketplace.

Solution overview

The following architecture diagram shows how AWS Glue connects to Google BigQuery for data ingestion.


Prerequisites

Before getting started, make sure you have the following:

  • An account in Google Cloud, specifically a service account that has permissions to Google BigQuery
  • An AWS Identity and Access Management (IAM) user with an access key and secret key to configure the AWS Command Line Interface (AWS CLI)
    • The IAM user also needs permissions to create an IAM role and policies

Configuring your Google account

We create a secret in AWS Secrets Manager to store the Google service account file contents as a base64-encoded string.

  1. Download the service account credentials JSON file from Google Cloud.

For base64 encoding, you can use an online utility or system commands. For Linux and Mac, you can use base64 <<service_account_json_file>> to print the file contents as a base64-encoded string.
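
If you prefer to script this step, the following Python sketch produces the same base64-encoded string from the downloaded file. The file name is a placeholder for your service account credentials file.

import base64

# Read the downloaded Google service account JSON file and print its
# contents as a base64-encoded string to use as the secret value.
with open("service_account.json", "rb") as f:
    encoded = base64.b64encode(f.read()).decode("utf-8")

print(encoded)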

  1. On the Secrets Manager console, choose Store a new secret.
  2. For Secret type, select Other type of secret.
  3. Enter your key as credentials and the value as the base64-encoded string.
  4. Leave the rest of the options at their default.
  5. Choose Next.


  1. For Secret name, enter bigquery_credentials.
  2. Follow through the rest of the steps to store the secret.

For more information, see Tutorial: Creating and retrieving a secret.

Creating an IAM role for AWS Glue

The next step is to create an IAM role with the necessary permissions for the AWS Glue job. Attach the required AWS managed policies for AWS Glue (such as AWSGlueServiceRole) to the role.

Then create and attach a policy that allows reading the secret from Secrets Manager and grants write access to the S3 bucket.

The following sample policy is used for the AWS Glue job in this post. Always make sure to scope down the policies before using them in a production environment. Provide the secret ARN for the bigquery_credentials secret you created earlier and the S3 bucket for saving data from BigQuery:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "GetDescribeSecret",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetResourcePolicy",
                "secretsmanager:GetSecretValue",
                "secretsmanager:DescribeSecret",
                "secretsmanager:ListSecretVersionIds"
            ],
            "Resource": "arn:aws:secretsmanager::<<account_id>>:secret:<<your_secret_id>>"
        },
        {
            "Sid": "S3Policy",
            "Effect": "Allow",
            "Action": [
                "s3:GetBucketLocation",
                "s3:ListBucket",
                "s3:GetBucketAcl",
                "s3:GetObject",
                "s3:PutObject",
                "s3:DeleteObject"
            ],
            "Resource": [
                "arn:aws:s3:::<<your_s3_bucket>>",
                "arn:aws:s3:::<<your_s3_bucket>>/*"
            ]
        }
    ]
}

Subscribing to the Glue Connector for BigQuery

To subscribe to the connector, complete the following steps:

  1. Navigate to the AWS Glue Connector for Google BigQuery on AWS Marketplace.
  2. Choose Continue to Subscribe.


  1. Review the terms and conditions, pricing, and other details.
  2. Choose Continue to Configuration.
  3. For Delivery Method, choose your delivery method.
  4. For Software Version, choose your software version.
  5. Choose Continue to Launch.


  1. Under Usage instructions, choose Activate the Glue connector in AWS Glue Studio.


You’re redirected to AWS Glue Studio.

  1. For Name, enter a name for your connection (for example, bigquery).


  1. Optionally, choose a VPC, subnet, and security group.
  2. For AWS Secret, choose bigquery_credentials.
  3. Choose Create connection.

A message appears that the connection was successfully created, and the connection is now visible on the AWS Glue Studio console.

Creating the ETL job in AWS Glue Studio

  1. On the AWS Glue Studio console, choose Jobs.
  2. For Source, choose BigQuery.
  3. For Target, choose S3.
  4. Choose Create.


  1. Choose ApplyMapping and delete it.
  2. Choose BigQuery.
  3. For Connection, choose bigquery.
  4. Expand Connection options.
  5. Choose Add new option.


  1. Add the following keys and values:
    1. Key: parentProject, Value: <<google_project_name>>
    2. Key: table, Value: bigquery-public-data.covid19_open_data.covid19_open_data


  1. Choose the S3 bucket node.
  2. Choose the format and compression type.
  3. Specify the S3 target location.


  1. Choose Job details.
  2. For Name, enter BigQuery_S3.
  3. For IAM Role, choose the role you created.
  4. For Type, choose Spark.
  5. For Glue version, choose Glue 2.0 – Supports Spark 2.4, Scala 2, Python3.
  6. Leave the rest of the options at their defaults.
  7. Choose Save.


  1. To run the job, choose the Run Job button.


  1. Once the job run succeeds, check the S3 bucket for data.


In this job, we use the connector to read data from the BigQuery public dataset for COVID-19. For more information, see Apache Spark SQL connector for Google BigQuery (Beta) on GitHub.

The code reads the covid19 table into an AWS Glue DynamicFrame and writes the data to Amazon S3.
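
At the script level, the generated job is roughly equivalent to the following sketch. The connection option keys mirror the values configured earlier in this walkthrough; the Google project name, connection name, and output path are placeholders, and a GlueContext named glueContext is assumed.

# Read from BigQuery through the Marketplace connector, then write Parquet files to Amazon S3
covid_dyf = glueContext.create_dynamic_frame.from_options(
    connection_type="marketplace.spark",
    connection_options={
        "parentProject": "<your-google-project>",
        "table": "bigquery-public-data.covid19_open_data.covid19_open_data",
        "connectionName": "bigquery",
    },
)

glueContext.write_dynamic_frame.from_options(
    frame=covid_dyf,
    connection_type="s3",
    connection_options={"path": "s3://<your-bucket>/output/"},
    format="parquet",
)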

Querying the data

You can now use an AWS Glue crawler to crawl the data in the S3 bucket. The crawler creates a table named covid. You can then go to Athena and query this data. The following screenshot shows our query results.


Pricing considerations

There might be egress charges for migrating data out of Google BigQuery into Amazon S3. Review and calculate the cost for moving data into Amazon S3.

AWS Glue 2.0 charges $0.44 per DPU-hour, billed per second, with a 1-minute minimum for Spark ETL jobs. An Apache Spark job run in AWS Glue requires a minimum of 2 DPUs. By default, AWS Glue allocates 10 DPUs to each Apache Spark job. For example, a job that runs for 10 minutes with the default 10 DPUs costs about 10 DPUs x (10/60) hour x $0.44 per DPU-hour, or roughly $0.73. Modify the number of workers based on your job requirements. For more information, see AWS Glue pricing.

Conclusion

In this post, we learned how to easily use AWS Glue ETL to connect to BigQuery tables and migrate the data into Amazon S3, and then query the data immediately with Athena. With AWS Glue, you can significantly reduce the cost, complexity, and time spent creating ETL jobs. AWS Glue is serverless, so there is no infrastructure to set up or manage. You pay only for the resources consumed while your jobs are running.

For more information about AWS Glue ETL jobs, see Simplify data pipelines with AWS Glue automatic code generation and workflows and Making ETL easier with AWS Glue Studio.


About the Author

Saurabh Bhutyani is a Senior Big Data Specialist Solutions Architect at Amazon Web Services. He is an early adopter of open-source big data technologies. At AWS, he works with customers to provide architectural guidance for running analytics solutions on Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation. In his free time, he likes to watch movies and spend time with his family.

Building AWS Glue Spark ETL jobs using Amazon DocumentDB (with MongoDB compatibility) and MongoDB

Post Syndicated from Naresh Gautam original https://aws.amazon.com/blogs/big-data/building-aws-glue-spark-etl-jobs-using-amazon-documentdb-with-mongodb-compatibility-and-mongodb/

AWS Glue is a fully managed extract, transform, and load (ETL) service that makes it easy to prepare and load your data for analytics. AWS Glue has native connectors to connect to supported data sources on AWS or elsewhere using JDBC drivers. Additionally, AWS Glue now supports reading and writing to Amazon DocumentDB (with MongoDB compatibility) and MongoDB collections using AWS Glue Spark ETL jobs. This feature enables you to connect and read, transform, and load (write) data from and to Amazon DocumentDB and MongoDB collections into services such as Amazon Simple Storage Service (Amazon S3) and Amazon Redshift for downstream analytics. For more information, see Connection Types and Options for ETL in AWS Glue.

This post shows how to build AWS Glue ETL Spark jobs and set up connections with Amazon DocumentDB or MongoDB to read and load data using ConnectionType. The following diagram illustrates the three components of the solution architecture:


Prerequisites

Before getting started, you must complete the following prerequisites:

  1. Create an AWS Identity and Access Management (IAM) user with sufficient permissions to interact with the AWS Management Console. Your IAM permissions must also include access to create IAM roles and policies created by the AWS CloudFormation template provided in this post.
  2. Create an IAM policy for AWS Glue.
  3. Save the following code as DocumentDB-Glue-ETL.py in your S3 bucket.
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext, SparkConf
    from awsglue.context import GlueContext
    from awsglue.job import Job
    import time
    
    ## @params: [JOB_NAME]
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    
    output_path = "s3://<bucket>/<folder>/" + str(time.time()) + "/"
    documentdb_uri = "mongodb://<host name>:27017"
    documentdb_write_uri = "mongodb://<host name>:27017"
    
    read_docdb_options = {
        "uri": documentdb_uri,
        "database": "test",
        "collection": "profiles",
        "username": "<username>",
        "password": "<password>",
        "ssl": "true",
        "ssl.domain_match": "false",
        "partitioner": "MongoSamplePartitioner",
        "partitionerOptions.partitionSizeMB": "10",
        "partitionerOptions.partitionKey": "_id"
    }
    
    write_documentdb_options = {
        "uri": documentdb_write_uri,
        "database": "test",
        "collection": "collection1",
        "username": "<username>",
        "password": "<password>",
        "ssl": "true",
        "ssl.domain_match": "false",
        "partitioner": "MongoSamplePartitioner",
        "partitionerOptions.partitionSizeMB": "10",
        "partitionerOptions.partitionKey": "_id"
    }
    
    # Get DynamicFrame from DocumentDB
    dynamic_frame2 = glueContext.create_dynamic_frame.from_options(connection_type="documentdb",
                                                                   connection_options=read_docdb_options)
    
    # Write DynamicFrame to DocumentDB
    glueContext.write_dynamic_frame.from_options(dynamic_frame2, connection_type="documentdb",
                                                 connection_options=write_documentdb_options)
    
    job.commit()

  1. Save the following code as MongoDB-Glue-ETL.py in your S3 bucket.
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext, SparkConf
    from awsglue.context import GlueContext
    from awsglue.job import Job
    import time
    
    ## @params: [JOB_NAME]
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    
    output_path = "s3://<bucket>/<folder>/" + str(time.time()) + "/"
    mongo_uri = "mongodb://<host name or IP>:27017"
    write_uri = "mongodb://<host name or IP>:27017"
    
    read_mongo_options = {
        "uri": mongo_uri,
        "database": "test",
        "collection": "profiles",
        "username": "<username>",
        "password": "<password>",
        "partitioner": "MongoSamplePartitioner",
        "partitionerOptions.partitionSizeMB": "10",
        "partitionerOptions.partitionKey": "_id"}
    
    write_mongo_options = {
        "uri": write_uri,
        "database": "test",
        "collection": "collection1",
        "username": "<username>",
        "password": "<password>"
    }
    
    
    # Get DynamicFrame from MongoDB
    dynamic_frame = glueContext.create_dynamic_frame.from_options(connection_type="mongodb",
                                                                  connection_options=read_mongo_options)
    # Write DynamicFrame to MongoDB 
    glueContext.write_dynamic_frame.from_options(dynamic_frame, connection_type="mongodb", connection_options=write_mongo_options)
    
    job.commit()

Provisioning resources with AWS CloudFormation

For this post, we provide CloudFormation templates for you to review and customize to your needs. Some of the resources deployed by this stack incur costs as long as they remain in use, such as Amazon DocumentDB and Amazon EC2.

For instructions on launching your stacks, see Launching an Amazon DocumentDB AWS CloudFormation Stack and MongoDB on the AWS Cloud: Quick Start Reference Deployment.

The Amazon DocumentDB stack creation can take up to 15 minutes, and the MongoDB stack creation can take up to 60 minutes.

When stack creation is complete, go to the Outputs tab for the stack on the AWS CloudFormation console and note down the following values (you use these in later steps):

  • DocumentDB CloudFormation – ClusterEndpoint and ClusterPort
  • MongoDB CloudFormation – PrimaryReplicaNodeIp

Preparing your collection

When the CloudFormation stack is complete, use an EC2 instance to connect to your Amazon DocumentDB cluster. For instructions, see Install the mongo shell, Connect to your Amazon DocumentDB cluster, and Insert and query data.

For instructions on accessing Amazon DocumentDB from Amazon EC2 in the same VPC, see Connect Using Amazon EC2.

For more information about MongoDB, see Connect to MongoDB nodes and Testing MongoDB.

Before creating your AWS Glue ETL job, use the mongo shell to insert a few entries into a collection titled profiles. See the following code:

s0:PRIMARY> use test
s0:PRIMARY> db.profiles.insertMany([
            { "_id" : 1, "name" : "Matt", "status": "active", "level": 12, "score":202},
            { "_id" : 2, "name" : "Frank", "status": "inactive", "level": 2, "score":9},
            { "_id" : 3, "name" : "Karen", "status": "active", "level": 7, "score":87},
            { "_id" : 4, "name" : "Katie", "status": "active", "level": 3, "score":27}
            ])

You’re now ready to configure AWS Glue ETL jobs using Amazon DocumentDB and MongoDB ConnectionType.

Setting up AWS Glue connections

You set up two separate connections for Amazon DocumentDB and MongoDB when the databases are in two different VPCs (or if you deployed the databases using the provided CloudFormation template). Complete the following steps for both connections. We first walk you through the Amazon DocumentDB connection.

  1. On the AWS Glue console, under Databases, choose Connections.
  2. Choose Add connection.
  3. For Connection name, enter a name for your connection.
  4. If you have SSL enabled on your Amazon DocumentDB cluster (which is what the CloudFormation template in this post used), select Require SSL connection.
  5. For Connection Type, choose Amazon DocumentDB or MongoDB.
  6. Choose Next.


  1. For Amazon DocumentDB URL, enter a URL using the output from the CloudFormation stack, such as mongodb://host:port/databasename (use the default port, 27017).
  2. For Username and Password, enter the credentials you entered as parameters when creating the CloudFormation stack.
  3. For VPC, choose the VPC in which you created databases (Amazon DocumentDB and MongoDB).
  4. For Subnet, choose the subnet within your VPC.
  5. For Security groups, select your security group.
  6. Choose Next.


  1. Review the connection details and choose Finish.


Similarly, add the connection for MongoDB with the following changes to the steps:

  • If you used the CloudFormation template in this post, don’t select Require SSL connection for MongoDB
  • For Connection Type, choose MongoDB
  • For MongoDB URL, enter a URL using the output from the CloudFormation stack, such as mongodb://host:port/databasename (use the default port, 27017)

Creating an AWS Glue endpoint, S3 endpoint, and security group

Before testing the connections, make sure you create an AWS Glue endpoint and S3 endpoint in the VPC in which the databases are created. Complete the following steps for both Amazon DocumentDB and MongoDB instances separately:

  1. To create your AWS Glue endpoint, on the Amazon VPC console, choose Endpoints.
  2. Choose Create endpoint.
  3. For Service Name, choose AWS Glue.
  4. Search for and select com.amazonaws.<region>.glue (for example, com.amazonaws.us-west-2.glue). Enter the appropriate Region where the database instance was created.
  5. For VPC, choose the VPC of the Amazon DocumentDB cluster.


  1. For Security group, select the security groups of the Amazon DocumentDB cluster.
  2. Choose Create endpoint.


  1. To create your S3 endpoint, on the Amazon VPC console, choose Endpoints.
  2. Choose Create endpoint.
  3. For Service Name, choose Amazon S3.
  4. Search for and select com.amazonaws.<region>.s3 (for example, com.amazonaws.us-west-2.s3). Enter the appropriate Region.
  5. For VPC, choose the VPC of the Amazon DocumentDB cluster.
  6. For Configure route tables, select the route table ID of the associated subnet of the database.


  1. Choose Create endpoint.


Similarly, add an AWS Glue endpoint and S3 endpoint for MongoDB with the following changes:

  • Choose the VPC of the MongoDB instance

The security group must include itself as a source in its inbound rules (a scripted sketch follows these steps). Complete the following steps for both the Amazon DocumentDB and MongoDB instances separately:

  1. On the Security Groups page, choose Edit Inbound Rules.
  2. Choose Add rule.
  3. For Type, choose All traffic.
  4. For Source, choose the same security group.
  5. Choose Save rules.
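
The following boto3 sketch adds the same self-referencing inbound rule, in case you manage security groups with code. The security group ID is a placeholder.

import boto3

ec2 = boto3.client("ec2")
security_group_id = "sg-0123456789abcdef0"  # placeholder: your database security group

# Allow all traffic from the security group to itself, matching the console steps above
ec2.authorize_security_group_ingress(
    GroupId=security_group_id,
    IpPermissions=[{
        "IpProtocol": "-1",
        "UserIdGroupPairs": [{"GroupId": security_group_id}],
    }],
)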


The objective of setting up a connection is to establish private connections between the Amazon DocumentDB and MongoDB instances in the VPC and AWS Glue via the S3 endpoint, AWS Glue endpoint, and security group. It’s not required to test the connection because that connection is established by the AWS Glue job when you run it. At the time of writing, testing an AWS Glue connection is not supported for Amazon DocumentDB connections.

Code for building the AWS Glue ETL job

The following sample code sets up a read connection with Amazon DocumentDB for your AWS Glue ETL job (PySpark):

read_docdb_options = {
    "uri": documentdb_uri,
    "database": "test",
    "collection": "profiles",
    "username": "<username>",
    "password": "<password>",
    "ssl": "true",
    "ssl.domain_match": "false",
    "partitioner": "MongoSamplePartitioner",
    "partitionerOptions.partitionSizeMB": "10",
    "partitionerOptions.partitionKey": "_id"
}

The following sample code sets up a write connection with Amazon DocumentDB for your AWS Glue ETL job (PySpark):

write_documentdb_options = {
    "uri": documentdb_write_uri,
    "database": "test",
    "collection": "collection1",
    "username": "<username>",
    "password": "<password>",
    "ssl": "true",
    "ssl.domain_match": "false",
    "partitioner": "MongoSamplePartitioner",
    "partitionerOptions.partitionSizeMB": "10",
    "partitionerOptions.partitionKey": "_id"
}

The following sample code creates an AWS Glue DynamicFrame by using the read and write connections for your AWS Glue ETL job (PySpark):

# Get DynamicFrame from DocumentDB
dynamic_frame2 = glueContext.create_dynamic_frame.from_options(connection_type="documentdb",
                                                               connection_options=read_docdb_options)

# Write DynamicFrame to DocumentDB
glueContext.write_dynamic_frame.from_options(dynamic_frame2, connection_type="documentdb",
                                             connection_options=write_documentdb_options)

Setting up AWS Glue ETL jobs

You’re now ready to set up your ETL job in AWS Glue. Complete the following steps for both Amazon DocumentDB and MongoDB instances separately:

  1. On the AWS Glue console, under ETL, choose Jobs.
  2. Choose Add job.
  3. For Job Name, enter a name.
  4. For IAM role, choose the IAM role you created as a prerequisite.
  5. For Type, choose Spark.
  6. For Glue Version, choose Python (latest version).
  7. For This job runs, choose An existing script that you provide.
  8. Choose the Amazon S3 path where the script (DocumentDB-Glue-ETL.py) is stored.
  9. Under Advanced properties, enable Job bookmark.

Job bookmarks help AWS Glue maintain state information and prevent the reprocessing of old data.

  1. Keep the remaining settings at their defaults and choose Next.
  2. For Connections, choose the Amazon DocumentDB connection you created.
  3. Choose Save job and edit scripts.
  4. Edit the following parameters:
    1. documentdb_uri or mongo_uri
    2. documentdb_write_uri or write_uri
    3. username
    4. password
    5. output_path
  5. Choose Run job.

When the job is finished, validate the data loaded in the collection.

Similarly, add the job for MongoDB with the following changes:

  • Choose the Amazon S3 path where the script (MongoDB-Glue-ETL.py) is stored
  • For Connections, choose the Amazon MongoDB connection you created
  • Change the parameters applicable to MongoDB (mongo_uri and write_uri)
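
If you start these jobs programmatically instead of from the console, you can enable the same job bookmark behavior you configured under Advanced properties by passing the --job-bookmark-option argument. The following is a minimal boto3 sketch; the job name is a placeholder.

import boto3

glue = boto3.client("glue")

# Start the job with job bookmarks enabled, matching the console setting above
glue.start_job_run(
    JobName="documentdb-glue-etl-job",  # placeholder: your job name
    Arguments={"--job-bookmark-option": "job-bookmark-enable"},
)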

Cleaning up

After you finish, don’t forget to delete the CloudFormation stack, because some of the AWS resources deployed by the stack in this post incur a cost as long as you continue to use them.

You can delete the CloudFormation stack to delete all AWS resources created by the stack.

  1. On the AWS CloudFormation console, on the Stacks page, select the stack to delete. The stack must be currently running.
  2. On the stack details page, choose Delete.
  3. Choose Delete stack when prompted.

Additionally, delete the AWS Glue endpoint, S3 endpoint, AWS Glue connections, and AWS Glue ETL jobs.

Summary

In this post, we showed you how to build AWS Glue ETL Spark jobs and set up connections using ConnectionType for Amazon DocumentDB and MongoDB databases using AWS CloudFormation. You can use this solution to read data from Amazon DocumentDB or MongoDB, and transform it and write to Amazon DocumentDB or MongoDB or other targets like Amazon S3 (using Amazon Athena to query), Amazon Redshift, Amazon DynamoDB, Amazon Elasticsearch Service (Amazon ES), and more.

If you have any questions or suggestions, please leave a comment.


About the Authors

Naresh Gautam is a Sr. Analytics Specialist Solutions Architect at AWS. His role is helping customers architect highly available, high-performance, and cost-effective data analytics solutions to empower customers with data-driven decision-making. In his free time, he enjoys meditation and cooking.

 

 

Srikanth Sopirala is a Sr. Analytics Specialist Solutions Architect at AWS. He is a seasoned leader with over 20 years of experience, who is passionate about helping customers build scalable data and analytics solutions to gain timely insights and make critical business decisions. In his spare time, he enjoys reading, spending time with his family and road biking.

Writing to Apache Hudi tables using AWS Glue Custom Connector

Post Syndicated from Vishal Pathak original https://aws.amazon.com/blogs/big-data/writing-to-apache-hudi-tables-using-aws-glue-connector/

In today’s world, most organizations have to tackle the three V’s of big data: variety, volume, and velocity. In this blog post, we talk about dealing with the variety and volume aspects of big data. The challenge of dealing with variety involves processing data from various SQL and NoSQL systems. This variety can include data from RDBMS sources such as Amazon Aurora, NoSQL sources such as Amazon DynamoDB, or third-party APIs.

AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning, and application development. To enable customers to process data from a variety of sources, the AWS Glue team has introduced AWS Glue Custom Connectors, a new capability in AWS Glue and AWS Glue Studio that makes it easy for you to transfer data from SaaS applications and custom data sources to your data lake in Amazon S3. With just a few clicks, you can search and select connectors from AWS Marketplace and begin your data preparation workflow in minutes. This new feature is over and above the AWS Glue Connections feature in the AWS Glue service.

In this post, we simplify the process to create Hudi tables with the AWS Glue Custom Connector. The JAR wrapped by the first version of the AWS Glue Custom Connector is based on Apache Hudi 0.5.3. Instructions on creating the JAR file are in the previous post of this series.

Whereas the first post focused on creating an end-to-end architecture for replicating the data in an RDBMS source to a Lakehouse, this post focuses on the volume aspect of big data. In this post, we create a Hudi table with an initial load of over 200 million records and then update 70 million of those records. The connector not only writes the data to Amazon Simple Storage Service (Amazon S3), but also creates the tables in the AWS Glue Data Catalog. If you’re creating a partitioned Hudi table, the connector also creates the partitions in the Data Catalog. We discuss the code for creating a partitioned Hudi table in the previous post in this series.

We use the Copy On Write storage type, which gives better read performance compared to Merge On Read. For more information about Hudi storage types, see Hudi Dataset Storage Types and Storage Types & Views.

Note that this post focuses on using the AWS Glue Custom Connector to write to Apache Hudi tables. Please implement other best practices such as encryption and network security while implementing the architecture for your workloads.

Creating the Apache Hudi connection using AWS Glue Custom Connector

To create your AWS Glue job with an AWS Glue Custom Connector, complete the following steps:

  1. Go to the AWS Glue Studio console, search for AWS Glue Connector for Apache Hudi, and choose the AWS Glue Connector for Apache Hudi link.
  2. Choose Continue to Subscribe.
  3. Review the Terms and Conditions and choose the Accept Terms button to continue.
  4. Make sure that the subscription is complete and you see the Effective date populated next to the product, and then choose the Continue to Configuration button.
  5. As of this writing, 0.5.3 is the latest version of the AWS Glue Connector for Apache Hudi. Make sure that 0.5.3 (Nov 19, 2020) is selected in the Software Version dropdown and Activate in AWS Glue Studio is selected in the Delivery Method dropdown, and then choose the Continue to Launch button.
  6. Under Launch this software, choose Usage Instructions and then choose Activate the Glue connector for Apache Hudi in AWS Glue Studio.

You’re redirected to AWS Glue Studio.

  1. For Name, enter a name for your connection (for example, hudi-connection).
  2. For Description, enter a description.
  3. Choose Create connection and activate connector.

A message appears that the connection was successfully created, and the connection is now visible on the AWS Glue Studio console.


Configuring resources and permissions

For this post, we provide an AWS CloudFormation template to create the following resources:

  • Two AWS Glue jobs: hudi-init-load-job and hudi-upsert-job
  • An S3 bucket to store the Python scripts for these jobs
  • An S3 bucket to store the output files of these jobs
  • An AWS Lambda function to copy the scripts from the public S3 bucket to your account
  • AWS Identity and Access Management (IAM) roles and policies with appropriate permissions

Launch the following stack, providing your connection name, created in Step 9 of the previous section, for the HudiConnectionName parameter:


Select the I acknowledge that AWS CloudFormation might create IAM resources with custom names check box before choosing the Create stack button.

If you have AWS Lake Formation enabled in the Region in which you’re implementing this solution, make sure that you give HudiConnectorExecuteGlueHudiJobRole Create table permission in the default database. HudiConnectorExecuteGlueHudiJobRole is created by the CloudFormation stack that you created above.


HudiConnectorExecuteGlueHudiJobRole should also have the Create database permission. You can grant this permission in the Database creators section on the Admins and database creators tab.


Running the load job

You’re now ready to run the first of your two jobs. 

  1. On the AWS Glue console, select the job hudi-init-load-job.
  2. On the Action menu, choose Run job.

My job finished in less than 10 minutes. The job inserted over 204 million records into the Hudi table.


Although the rest of the code is standard Hudi PySpark code, I want to call out the last line to show how easy it is to write to Hudi tables using AWS Glue:

glueContext.write_dynamic_frame.from_options(
    frame = DynamicFrame.fromDF(inputDf, glueContext, "inputDf"),
    connection_type = "marketplace.spark",
    connection_options = combinedConf)

In the preceding code, combinedConf is a Python dictionary that includes all your Apache Hudi configurations. You can download the HudiInitLoadNYTaxiData.py script to use.
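
As a rough illustration, combinedConf might look like the following sketch. The option keys shown are common Apache Hudi write and Hive sync settings; the record key, precombine field, and path values here are assumptions for illustration, so refer to the downloadable script for the configuration actually used in this post.

# Illustrative sketch of a combinedConf dictionary for a Copy On Write Hudi table.
# Placeholder values are assumptions; see HudiInitLoadNYTaxiData.py for the real configuration.
combinedConf = {
    "className": "org.apache.hudi",
    "hoodie.table.name": "ny_yellow_trip_data",
    "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
    "hoodie.datasource.write.operation": "bulk_insert",
    "hoodie.datasource.write.recordkey.field": "<record key column>",
    "hoodie.datasource.write.precombine.field": "<precombine column>",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.database": "default",
    "hoodie.datasource.hive_sync.table": "ny_yellow_trip_data",
    "path": "s3://<your-output-bucket>/ny_yellow_trip_data/"
}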

Querying the data

The ny_yellow_trip_data table is now visible in the default database, and you can query it through Athena.

If you have Lake Formation enabled in this Region, the role or user querying the table should have Select permissions on the table.

You can now run the following query:

select count(*) cnt, vendorid from default.ny_yellow_trip_data group by vendorid

The following screenshot shows our output.


If you have Lake Formation enabled in this Region, make sure that you give Drop permission to HudiConnectorExecuteLambdaFnsRole so the CloudFormation template can drop the default.ny_yellow_trip_data table when you delete the stack.


Running the upsert job

You can now run your second job, hudi-upsert-job. This job reads the newly written data and updates the vendor IDs of all the records that have vendorid=1. The new vendor ID for these records (over 78 million) is set as 9. You can download the HudiUpsertNYTaxiData.py script to use.

This job also finished in under 10 minutes.


Querying the updated data

You can now query the updated Hudi table in Athena. The following screenshot shows that the vendor ID of over 78 million records has been changed to 9.


Additional considerations

The AWS Glue Connector for Apache Hudi has not been tested for AWS Glue streaming jobs. Additionally, there are some hardcoded Hudi options in the AWS Glue job scripts. These options are set for the sample table that we create for this post. Update the options based on your workload.

Conclusion

In this post, we created an Apache Hudi table with AWS Glue Custom Connector and AWS Glue 2.0 jobs. We read over 200 million records from a public S3 bucket and created an Apache Hudi table using it. We then updated over 70 million of these records. With the new AWS Glue Custom Connector feature, we can now directly write an AWS Glue DynamicFrame to an Apache Hudi table.

Note that you can also use AWS Glue jobs to write to Apache Hudi MoR tables. Creating a source to Lakehouse data replication pipe using Apache Hudi, AWS Glue, AWS DMS, and Amazon Redshift talks about the process in detail. While it uses JARs as an external dependency, you can now use the AWS Glue Connector for Apache Hudi for the same operation. That post uses HudiJob.py to write to MoR tables and then uses HudiMoRCompactionJob.scala to compact the MoR tables. Note that HudiMoRCompactionJob.scala has also been implemented using AWS Glue jobs, so you can use AWS Glue for the compaction job too.


About the Author

Vishal Pathak is a Data Lab Solutions Architect at AWS. Vishal works with customers on their use cases, architects solutions to solve their business problems, and helps them build scalable prototypes. Prior to his journey at AWS, Vishal helped customers implement BI, data warehouse, and data lake projects in the US and Australia.

Validate, evolve, and control schemas in Amazon MSK and Amazon Kinesis Data Streams with AWS Glue Schema Registry

Post Syndicated from Brian Likosar original https://aws.amazon.com/blogs/big-data/validate-evolve-and-control-schemas-in-amazon-msk-and-amazon-kinesis-data-streams-with-aws-glue-schema-registry/

Data streaming technologies like Apache Kafka and Amazon Kinesis Data Streams capture and distribute data generated by thousands or millions of applications, websites, or machines. These technologies serve as a highly available transport layer that decouples the data-producing applications from data processors. However, the sheer number of applications producing, processing, routing, and consuming data can make it hard to coordinate and evolve data schemas, like adding or removing a data field, without introducing data quality issues and downstream application failures. Developers often build complex tools, write custom code, or rely on documentation, change management, and Wikis to protect against schema changes. This is quite error prone because it relies too heavily on human oversight. A common solution with data streaming technologies is a schema registry that provides for validation of schema changes to allow for safe evolution as business needs adjust over time.

AWS Glue Schema Registry, a serverless feature of AWS Glue, enables you to validate and reliably evolve streaming data against Apache Avro schemas at no additional charge. Through Apache-licensed serializers and deserializers, the Glue Schema Registry integrates with Java applications developed for Apache Kafka, Amazon Managed Streaming for Apache Kafka (Amazon MSK), Kinesis Data Streams, Apache Flink, Amazon Kinesis Data Analytics for Apache Flink, and AWS Lambda.

This post explains the benefits of using the Glue Schema Registry and provides examples of how to use it with both Apache Kafka and Kinesis Data Streams.

With the Glue Schema Registry, you can eliminate defensive coding and cross-team coordination, improve data quality, reduce downstream application failures, and use a registry that is integrated across multiple AWS services. Each schema can be versioned within the guardrails of a compatibility mode, providing developers the flexibility to reliably evolve schemas. Additionally, the Glue Schema Registry can serialize data into a compressed format, helping you save on data transfer and storage costs.

Although there are many ways to leverage the Glue Schema Registry (including using the API to build your own integrations), in this post, we show two use cases. The Schema Registry is a free feature that can significantly improve data quality and developer productivity. If you use Avro schemas, you should be using the Schema Registry to supplement your solutions built on Apache Kafka (including Amazon MSK) or Kinesis Data Streams. The following diagram illustrates this architecture.

AWS Glue Schema Registry features

Glue Schema Registry has the following features:

  • Schema discovery – When a producer registers a schema change, metadata can be applied as a key-value pair to provide searchable information for administrators or developers. This metadata can indicate the original source of the data (source=MSK_west), the team name to contact (owner=DataEngineering), or AWS tags (environment=Production). You could potentially encrypt a field in your data on the producing client and use metadata to specify to potential consumer clients which public key fingerprint to use for decryption.
  • Schema compatibility – The versioning of each schema is governed by a compatibility mode. If a new version of a schema is requested to be registered that breaks the specified compatibility mode, the request fails and an exception is thrown. Compatibility checks enable developers building downstream applications to have a bounded set of scenarios to build applications against, which helps to prepare for the changes without issue. Commonly used modes are FORWARD, BACKWARD, and FULL. For more information about mode definitions, see Schema Versioning and Compatibility.
  • Schema validation – Glue Schema Registry serializers work to validate that the schema used during data production is compatible. If it isn’t, the data producer receives an exception from the serializer. This ensures that potentially breaking changes are found earlier in development cycles, and can also help prevent unintentional schema changes due to human error.
  • Auto-registration of schemas – If configured to do so, the producer of data can auto-register schema changes as they flow in the data stream. This is especially useful for use cases where the source of the data is change data capture from a database.
  • IAM support – Thanks to integrated AWS Identity and Access Management (IAM) support, only authorized producers can change certain schemas. Furthermore, only those consumers authorized to read the schema can do so. Schema changes are typically performed deliberately and with care, so it’s important to use IAM to control who performs these changes. Additionally, access control to schemas is important in situations where you might have sensitive information included in the schema definition itself. In the examples that follow, IAM roles are inferred via the AWS SDK for Java, so they are inherited from the Amazon Elastic Compute Cloud (Amazon EC2) instance’s role that the application runs in. IAM roles can also be applied to any other AWS service that could contain this code, such as containers or Lambda functions.
  • Integrations and other support – The provided serializers and deserializers are currently for Java clients using Apache Avro for data serialization. The GitHub repo also contains support for Apache Kafka Streams, Apache Kafka Connect, and Apache Flink—all licensed using the Apache License 2.0. We’re already working on additional language and data serialization support, but we need your feedback on what you’d like to see next.
  • Secondary deserializer – If you have already registered schemas in another schema registry, there’s an option for specifying a secondary deserializer when performing schema lookups. This allows for migrations from other schema registries without having to start anew. If the schema ID being used isn’t known to the Glue Schema Registry, it’s looked for in the secondary deserializer.
  • Compression – Using the Avro format already reduces message size due to its compact, binary format. Using a schema registry can further reduce data payload by no longer needing to send and receive schemas with each message. Glue Schema Registry libraries also provide an option for zlib compression, which can reduce data requirements even further by compressing the payload of the message. This varies by use case, but compression can reduce the size of the message significantly.

Example schema

For this post, we use the following schema to begin each of our use cases:

{
 "namespace": "Customer.avro",
 "type": "record",
 "name": "Customer",
 "fields": [
 {"name": "first_name", "type": "string"},
 {"name": "last_name", "type": "string"}
 ]
}
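
Note that the producer example later in this post also populates a full_name field. If you follow along with that code, first add the field to Customer.avsc with a default value (the serializer registers the new version because auto-registration is enabled in the producer configuration); adding an optional field with a default keeps the change compatible under the BACKWARD, FORWARD, and FULL modes. A possible evolved version, shown for illustration only, looks like the following:

{
 "namespace": "Customer.avro",
 "type": "record",
 "name": "Customer",
 "fields": [
 {"name": "first_name", "type": "string"},
 {"name": "last_name", "type": "string"},
 {"name": "full_name", "type": ["null", "string"], "default": null}
 ]
}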

Using AWS Glue Schema Registry with Amazon MSK and Apache Kafka

You can use the following Apache Kafka producer code to produce Apache Avro formatted messages to a topic with the preceding schema:

package com.amazon.gsrkafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import com.amazonaws.services.schemaregistry.serializers.avro.AWSKafkaAvroSerializer;
import com.amazonaws.services.schemaregistry.serializers.avro.AWSAvroSerializer;
import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import java.util.Properties;
import java.io.IOException;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.File;

public class gsrkafka {
private static final Properties properties = new Properties();
private static final String topic = "test";
public static void main(final String[] args) throws IOException {
// Set the default synchronous HTTP client to UrlConnectionHttpClient
System.setProperty("software.amazon.awssdk.http.service.impl", "software.amazon.awssdk.http.urlconnection.UrlConnectionSdkHttpService");
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, AWSKafkaAvroSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AWSKafkaAvroSerializer.class.getName());
properties.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2");
properties.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "liko-schema-registry");
properties.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "customer");
properties.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL);
properties.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
Schema schema_customer = new Parser().parse(new File("Customer.avsc"));
GenericRecord customer = new GenericData.Record(schema_customer);

try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(properties)) {
final ProducerRecord<String, GenericRecord> record = new ProducerRecord<String, GenericRecord>(topic, customer);
customer.put("first_name", "Ada");
customer.put("last_name", "Lovelace");
customer.put("full_name", "Ada Lovelace");
producer.send(record);
System.out.println("Sent message");
Thread.sleep(1000L);

customer.put("first_name", "Sue");
customer.put("last_name", "Black");
customer.put("full_name", "Sue Black");
producer.send(record);
System.out.println("Sent message");
Thread.sleep(1000L);

customer.put("first_name", "Anita");
customer.put("last_name", "Borg");
customer.put("full_name", "Anita Borg");
producer.send(record);
System.out.println("Sent message");
Thread.sleep(1000L);

customer.put("first_name", "Grace");
customer.put("last_name", "Hopper");
customer.put("full_name", "Grace Hopper");
producer.send(record);
System.out.println("Sent message");
Thread.sleep(1000L);

customer.put("first_name", "Neha");
customer.put("last_name", "Narkhede");
customer.put("full_name", "Neha Narkhede");
producer.send(record);
System.out.println("Sent message");
Thread.sleep(1000L);
producer.flush();
System.out.println("Successfully produced 5 messages to a topic called " + topic);
} catch (final InterruptedException | SerializationException e) {
e.printStackTrace();
}
}
}

Use the following Apache Kafka consumer code to look up the schema information while consuming from a topic to learn the schema details:

package com.amazon.gsrkafka;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer;
import com.amazonaws.services.schemaregistry.deserializers.avro.AWSAvroDeserializer;
import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import java.util.Collections;
import java.util.Properties;
import java.io.IOException;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.File;


public class gsrkafka {
private static final Properties properties = new Properties();
private static final String topic = "test";
public static void main(final String[] args) throws IOException {
// Set the default synchronous HTTP client to UrlConnectionHttpClient
System.setProperty("software.amazon.awssdk.http.service.impl", "software.amazon.awssdk.http.urlconnection.UrlConnectionSdkHttpService");
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "gsr-client");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, AWSKafkaAvroDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AWSKafkaAvroDeserializer.class.getName());
properties.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2");
properties.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "liko-schema-registry");
properties.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());

try (final KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(properties)) {
consumer.subscribe(Collections.singletonList(topic));
while (true) {
final ConsumerRecords<String, GenericRecord> records = consumer.poll(1000);
for (final ConsumerRecord<String, GenericRecord> record : records) {
final GenericRecord value = record.value();
System.out.println("Received message: value = " + value);
}
}
} catch (final SerializationException e) {
e.printStackTrace();
}
}
}

Using AWS Glue Schema Registry with Kinesis Data Streams

You can use the following Kinesis Producer Library (KPL) code to publish messages in Apache Avro format to a Kinesis data stream with the preceding schema:

private static final String SCHEMA_DEFINITION = "{\"namespace\": \"Customer.avro\",\n"
+ " \"type\": \"record\",\n"
+ " \"name\": \"Customer\",\n"
+ " \"fields\": [\n"
+ " {\"name\": \"first_name\", \"type\": \"string\"},\n"
+ " {\"name\": \"last_name\", \"type\": \"string\"}\n"
+ " ]\n"
+ "}";

KinesisProducerConfiguration config = new KinesisProducerConfiguration();
config.setRegion("us-west-1");

//[Optional] configuration for Schema Registry.

GlueSchemaRegistryConfiguration schemaRegistryConfig = 
new GlueSchemaRegistryConfiguration("us-west-1");

schemaRegistryConfig.setCompression(true);

config.setGlueSchemaRegistryConfiguration(schemaRegistryConfig);

// Optional configuration ends.

final KinesisProducer producer = 
new KinesisProducer(config);

final ByteBuffer data = getDataToSend();

com.amazonaws.services.schemaregistry.common.Schema gsrSchema = 
new Schema(SCHEMA_DEFINITION, DataFormat.AVRO.toString(), "demoSchema");

ListenableFuture<UserRecordResult> f = producer.addUserRecord(
config.getStreamName(), TIMESTAMP, Utils.randomExplicitHashKey(), data, gsrSchema);

private static ByteBuffer getDataToSend() {
org.apache.avro.Schema avroSchema = 
new org.apache.avro.Schema.Parser().parse(SCHEMA_DEFINITION);

GenericRecord user = new GenericData.Record(avroSchema);
// Populate only the fields defined in SCHEMA_DEFINITION
user.put("first_name", "Emily");
user.put("last_name", "Jones");

ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
Encoder encoder = EncoderFactory.get().directBinaryEncoder(outBytes, null);
new GenericDatumWriter<>(avroSchema).write(user, encoder);
encoder.flush();
return ByteBuffer.wrap(outBytes.toByteArray());
}

On the consumer side, you can use the Kinesis Client Library (KCL) (v2.3 or later) to look up schema information while retrieving messages from a Kinesis data stream:

GlueSchemaRegistryConfiguration schemaRegistryConfig = 
new GlueSchemaRegistryConfiguration(this.region.toString());

 GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer = 
new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), schemaRegistryConfig);

 RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient));
 retrievalConfig.glueSchemaRegistryDeserializer(glueSchemaRegistryDeserializer);
 
Scheduler scheduler = new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig(),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
retrievalConfig
);

 public void processRecords(ProcessRecordsInput processRecordsInput) {
MDC.put(SHARD_ID_MDC_KEY, shardId);
try {
log.info("Processing {} record(s)", 
processRecordsInput.records().size());
processRecordsInput.records()
.forEach(
r -> 
log.info("Processed record pk: {} -- Seq: {} : data {} with schema: {}", 
r.partitionKey(), r.sequenceNumber(), recordToAvroObj(r).toString(), r.getSchema()));
} catch (Throwable t) {
log.error("Caught throwable while processing records. Aborting.");
Runtime.getRuntime().halt(1);
} finally {
MDC.remove(SHARD_ID_MDC_KEY);
}
 }
 
 private GenericRecord recordToAvroObj(KinesisClientRecord r) {
byte[] data = new byte[r.data().remaining()];
r.data().get(data, 0, data.length);
org.apache.avro.Schema schema = new org.apache.avro.Schema.Parser().parse(r.schema().getSchemaDefinition());
DatumReader datumReader = new GenericDatumReader<>(schema);

BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(data, 0, data.length, null);
return (GenericRecord) datumReader.read(null, binaryDecoder);
 }

Example of schema evolution

As a producer, let’s say you want to add an additional field to our schema:

{
 "namespace": "Customer.avro",
 "type": "record",
 "name": "Customer",
 "fields": [
 {"name": "first_name", "type": "string"},
 {"name": "last_name", "type": "string"},
 {"name": "full_name", "type": ["string", “null”], “default”: null}
]
}

Regardless of whether you’re following the Apache Kafka or Kinesis Data Streams example, you can use the previously provided producer code to publish new messages using this new schema version with the full_name field. This is simply a concatenation of first_name and last_name.

This schema change added an optional field (full_name), which is indicated by the type field having an option of null in addition to string with a default of null. In adding this optional field, we’ve created a schema evolution. This qualifies as a FORWARD compatible change because the producer has modified the schema and the consumer can read without updating its version of the schema. It’s a good practice to provide a default for a given field. This allows for its eventual removal if necessary. If it’s removed by the producer, the consumer uses the default that it knew for that field from before the removal.

This change is also a BACKWARD compatible change, because if the consumer changes the schema it expects to receive, it can use that default to fill in the value for the field it isn’t receiving. By being both FORWARD and BACKWARD compatible, it is therefore a FULL compatible change. The Glue Schema Registry serializers default to BACKWARD compatible, so we have to add a line declaring it as FULL.
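If you prefer to create the schema in the registry up front instead of relying on auto-registration from the serializer, you can declare FULL compatibility at creation time. The following is a minimal sketch using the AWS SDK for Python (Boto3); the Region, registry name, and schema name mirror the earlier examples and are assumptions.

import boto3

# Sketch: register the Customer schema with FULL compatibility so that both
# forward- and backward-compatible changes (such as adding the optional
# full_name field) are accepted. Registry and schema names are assumptions.
glue = boto3.client("glue", region_name="us-east-2")

schema_definition = """
{
  "namespace": "Customer.avro",
  "type": "record",
  "name": "Customer",
  "fields": [
    {"name": "first_name", "type": "string"},
    {"name": "last_name", "type": "string"}
  ]
}
"""

response = glue.create_schema(
    RegistryId={"RegistryName": "liko-schema-registry"},
    SchemaName="customer",
    DataFormat="AVRO",
    Compatibility="FULL",
    SchemaDefinition=schema_definition,
)
print(response["SchemaArn"], response["SchemaStatus"])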

In looking at the full option set, you may find FORWARD_ALL, BACKWARD_ALL, and FULL_ALL. These typically only come into play when you want to change data types for a field whose name you don’t change. The most common observed compatibility mode is BACKWARD, which is why it’s the default.

As a consumer application, however, you don’t want to have to recompile your application to handle the addition of a new field. Whether to reference the customer by full name is a choice you make in your app; you aren’t forced to consume the new field and use it. When you consume the new messages you’ve just produced, your application doesn’t crash or have problems, because it’s still using the prior version of the schema, and that schema change is compatible with your application. To experience this in action, run the consumer code in one window and don’t interrupt it. When you run the producer application again, this time with messages following the new schema, you still see output without issue, thanks to the Glue Schema Registry.

Conclusion

In this post, we discussed the benefits of using the Glue Schema Registry to register, validate, and evolve schemas for data streams as business needs change. We also provided examples of how to use Glue Schema Registry with Apache Kafka and Kinesis Data Streams.

For more information and to get started, see AWS Glue Schema Registry.


About the Authors

Brian Likosar is a Senior Streaming Specialist Solutions Architect at Amazon Web Services. Brian loves helping customers capture value from real-time streaming architectures, because he knows life doesn’t happen in batch. He’s a big fan of open-source collaboration, theme parks, and live music.

 

 

Larry Heathcote is a Senior Product Marketing Manager at Amazon Web Services for data streaming and analytics. Larry is passionate about seeing the results of data-driven insights on business outcomes. He enjoys walking his Samoyed Sasha in the mornings so she can look for squirrels to bark at.

 

 

Automating Recommendation Engine Training with Amazon Personalize and AWS Glue

Post Syndicated from Alexander Spivak original https://aws.amazon.com/blogs/architecture/automating-recommendation-engine-training-with-amazon-personalize-and-aws-glue/

Customers from startups to enterprises observe increased revenue when personalizing customer interactions. Still, many companies are not yet leveraging the power of personalization, or are relying solely on rule-based strategies. Those strategies are effort-intensive to maintain and often ineffective. Common reasons for not launching machine learning (ML)-based personalization projects include the complexity of aggregating and preparing the datasets, gaps in data science expertise, and a lack of trust in the quality of ML recommendations.

This blog post demonstrates an approach for product recommendations to mitigate those concerns using historical datasets. To get started with your personalization journey, you don’t need ML expertise or a data lake. The following serverless end-to-end architecture involves aggregating and transforming the required data, as well as automatically training an ML-based recommendation engine.

I will outline the architectural production-ready setup for personalized product recommendations based on historical datasets. This is of interest to data analysts who look for ways to bring an existing recommendation engine to production, as well as solutions architects new to ML.

Solution Overview

The two core elements to create a proof-of-concept for ML-based product recommendations are:

  1. the recommendation engine and,
  2. the data set to train the recommendation engine.

Let’s start with the recommendation engine first, and work backwards to the corresponding data needs.

Product recommendation engine

To create the product recommendation engine, we use Amazon Personalize. Amazon Personalize differentiates three types of input data:

  1. user events called interactions (user events like views, signups or likes),
  2. item metadata (description of your items: category, genre or availability), and
  3. user metadata (age, gender, or loyalty membership).

An interactions dataset is typically the minimal requirement to build a recommendation system. Providing user and item metadata datasets improves recommendation accuracy, and enables cold starts, item discovery and dynamic recommendation filtering.

Most companies already have existing historical datasets to populate all three input types. In the case of retail companies, the product order history is a good fit for interactions. In the case of the media and entertainment industry, the customer’s consumption history maps to the interaction dataset. The product and media catalogs map to the items dataset and the customer profiles to the user dataset.

Amazon Personalize: from datasets to a recommendation API

The Amazon Personalize Deep Dive Series provides a great introduction into the service and explores the topics of training, inference and operations. There are also multiple blog posts available explaining how to create a recommendation engine with Amazon Personalize and how to select the right metadata for the engine training. Additionally, the Amazon Personalize samples repository in GitHub showcases a variety of topics: from getting started with Amazon Personalize, up to performing a POC in a Box using existing datasets, and, finally, automating the recommendation engine with MLOps. In this post, we focus on getting the data from the historical data sources into the structure required by Amazon Personalize.

Creating the dataset

While manual data exports are a quick way to get started with one-time datasets for experiments, we use AWS Glue to automate this process. The automated approach with AWS Glue speeds up the proof of concept (POC) phase and simplifies the process to production by:

  • easily reproducing dataset exports from various data sources, which you can use to iterate on different feature sets for recommendation engine training
  • adding additional data sources and using those to enrich existing datasets
  • efficiently performing transformation logic like column renaming and fuzzy matching out of the box with code generation support.

AWS Glue is a serverless data integration service that is scalable and simple to use. It provides all of the capabilities needed for data integration and supports a wide variety of data sources: Amazon S3 buckets, JDBC connectors, MongoDB databases, Kafka, and Amazon Redshift, the AWS data warehouse. You can even make use of data sources living outside of your AWS environment, e.g. on-premises data centers or other services outside of your VPC. This enables you to perform a data-driven POC even when the data is not yet in AWS.

Modern application environments usually combine multiple heterogeneous database systems, like operational relational and NoSQL databases, in addition to, the BI-powering data warehouses. With AWS Glue, we orchestrate the ETL (extract, transform, and load) jobs to fetch the relevant data from the corresponding data sources. We then bring it into a format that Amazon Personalize understands: CSV files with pre-defined column names hosted in an Amazon S3 bucket.

Each dataset consists of one or multiple CSV files, which can be uniquely identified by an Amazon S3 prefix. Additionally, each dataset must have an associated schema describing the structure of your data. Depending on the dataset type, there are required and pre-defined fields:

  • USER_ID (string) and one additional metadata field for the users dataset
  • ITEM_ID (string) and one additional metadata field for the items dataset
  • USER_ID (string), ITEM_ID (string), TIMESTAMP (long; as Epoch time) for the interactions dataset
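As an illustration of the job logic, the following is a minimal PySpark sketch of an AWS Glue ETL job that maps a hypothetical order-history table from the Data Catalog into the interactions format; the database, table, column, and bucket names are placeholders.

import sys

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.transforms import ApplyMapping
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

# Sketch of a Glue ETL job that shapes an order-history table into the
# Amazon Personalize interactions format (USER_ID, ITEM_ID, TIMESTAMP).
# Database, table, column, and bucket names are placeholders.
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glue_context = GlueContext(SparkContext.getOrCreate())
job = Job(glue_context)
job.init(args["JOB_NAME"], args)

orders = glue_context.create_dynamic_frame.from_catalog(
    database="retail_db", table_name="order_history"
)

interactions = ApplyMapping.apply(
    frame=orders,
    mappings=[
        ("customer_id", "string", "USER_ID", "string"),
        ("product_id", "string", "ITEM_ID", "string"),
        ("order_epoch", "long", "TIMESTAMP", "long"),
    ],
)

glue_context.write_dynamic_frame.from_options(
    frame=interactions,
    connection_type="s3",
    connection_options={"path": "s3://my-personalize-datasets/interactions/"},
    format="csv",
)

job.commit()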

The following graph presents a high-level architecture for a retail customer, who has a heterogeneous data store landscape.

Using AWS Glue to export datasets from heterogeneous data sources to Amazon S3

To understand how AWS Glue connects to the variety of data sources and allows transforming the data into the required format, we need to drill down into the AWS Glue concepts and components.

One of the key components of AWS Glue is the AWS Glue Data Catalog: a persistent metadata store containing table definitions, connection information, as well as, the ETL job definitions.
The tables are metadata definitions representing the structure of the data in the defined data sources. They do not contain any data entries from the sources but solely the structure definition. You can create a table either manually or automatically by using AWS Glue Crawlers.

AWS Glue Crawlers scan the data in the data sources, extract the schema information from it, and store the metadata as tables in the AWS Glue Data Catalog. This is the preferred approach for defining tables. The crawlers use AWS Glue Connections to connect to the data sources. Each connection contains the properties that are required to connect to a particular data store. The connections will be also used later by the ETL jobs to fetch the data from the data sources.

AWS Glue Crawlers also help to overcome a challenge frequently appearing in microservice environments. Microservice architectures are frequently operated by fully independent and autonomous teams. This means that keeping track of changes to the data source format becomes a challenge. Based on a schedule, the crawlers can be triggered to update the metadata for the relevant data sources in the AWS Glue Data Catalog automatically. To detect cases when a schema change would break the ETL job logic, you can combine the CloudWatch Events emitted by AWS Glue on updating the Data Catalog tables with an AWS Lambda function or a notification sent via Amazon Simple Notification Service (Amazon SNS).

The AWS Glue ETL jobs use the defined connections and the table information from the Data Catalog to extract the data from the described sources, apply the user-defined transformation logic and write the results into a data sink. AWS Glue can automatically generate code for ETL jobs to help perform a variety of useful data transformation tasks. AWS Glue Studio makes the ETL development even simpler by providing an easy-to-use graphical interface that accelerates the development and allows designing jobs without writing any code. If required, the generated code can be fully customized.

AWS Glue supports Apache Spark jobs, written either in Python or in Scala, and Python Shell jobs. Apache Spark jobs are optimized to run in a highly scalable, distributed way dealing with any amount of data and are a perfect fit for data transformation jobs. The Python Shell jobs provide access to the raw Python execution environment, which is less scalable but provides a cost-optimized option for orchestrating AWS SDK calls.

The following diagram visualizes the interaction between the components described.

The basic concepts of populating your Data Catalog and processing ETL dataflow in AWS Glue

For each Amazon Personalize dataset type, we create a separate ETL job. Since those jobs are independent, they can also run in parallel. After all jobs have successfully finished, we can start the recommendation engine training. AWS Glue Workflows simplify data pipelines by letting you visualize and monitor complex ETL activities involving multiple crawlers, jobs, and triggers as a single entity.

The following graph visualizes a typical dataset export workflow for training a recommendation engine, which consists of:

  • a workflow trigger being either manual or scheduled
  • a Python Shell job to remove the results of the previous export workflow from S3
  • a trigger firing when the removal job is finished and initiating the parallel execution of the dataset ETL jobs
  • the three Apache Spark ETL jobs, one per dataset type
  • a trigger firing when all three ETL jobs are finished and initiating the training notification job
  • a Python Shell job to initiate a new dataset import or a full training cycle in Amazon Personalize (e.g. by triggering the MLOps pipeline using the AWS SDK), as sketched in the example after this list
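The two Python Shell jobs in this workflow are thin wrappers around AWS SDK calls. The following Boto3 sketch shows what they might contain; the bucket, prefix, and state machine ARN are placeholders.

import boto3

# Sketch of the two Python Shell jobs described above. Bucket, prefix, and
# state machine ARN are placeholders.
s3 = boto3.client("s3")
sfn = boto3.client("stepfunctions")

EXPORT_BUCKET = "my-personalize-datasets"
EXPORT_PREFIX = "interactions/"
TRAINING_STATE_MACHINE_ARN = (
    "arn:aws:states:us-east-1:123456789012:stateMachine:personalize-training"
)


def remove_previous_export():
    """Delete the objects produced by the previous export workflow run."""
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=EXPORT_BUCKET, Prefix=EXPORT_PREFIX):
        keys = [{"Key": obj["Key"]} for obj in page.get("Contents", [])]
        if keys:
            s3.delete_objects(Bucket=EXPORT_BUCKET, Delete={"Objects": keys})


def start_training_cycle():
    """Kick off the MLOps training pipeline once all ETL jobs have finished."""
    sfn.start_execution(stateMachineArn=TRAINING_STATE_MACHINE_ARN)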

 

AWS Glue workflow for extracting the three datasets and triggering the training workflow of the recommendation engine

Combining the data export and the recommendation engine

In the previous sections, we discussed how to create an ML-based recommendation engine and how to create the datasets for the training of the engine. In this section, we combine both parts of the solution leveraging an adjusted version of the MLOps pipeline solution available on GitHub to speed up the iterations on new solution versions by avoiding manual steps. Moreover, automation means new items can be put faster into production.

The MLOps pipeline uses a JSON file hosted in an S3 bucket to describe the training parameters for Amazon Personalize. The creation of a new parameter file version triggers a new training workflow orchestrated in a serverless manner using AWS Step Functions and AWS Lambda.

To integrate the Glue data export workflow described in the previous section, we also enable the Glue workflow to trigger the training pipeline. Additionally, we modify the pipeline to read the parameter file as its first step. The resulting architecture enables an automated end-to-end setup, from dataset export to recommendation engine creation.

End-to-end architecture combining the data export with AWS Glue, the MLOps training workflow and Amazon Personalize

The architecture for the end-to-end data export and recommendation engine creation solution is completely serverless. This makes it highly scalable, reliable, easy to maintain, and cost-efficient. You pay only for what you consume. For example, in the case of the data export, you pay only for the duration of the AWS Glue crawler executions and ETL jobs, which only need to run when you iterate on a new dataset.

The solution is also flexible in terms of the connected data sources. The architecture works equally well with a single data source: you can start with one data store and enrich the datasets on demand with additional sources in future iterations.

Testing the quality of the solution

A common approach to validate the quality of the solution is the A/B testing technique, which is widely used to measure the efficacy of generated recommendations. Based on the testing results, you can iterate on the recommendation engine by optimizing the underlying datasets and models. The high degree of automation increases the speed of iterations and the resiliency of the end-to-end process.

Conclusion

In this post, I presented a typical serverless architecture for a fully automated, end-to-end ML-based recommendation engine leveraging available historical datasets. As you begin to experiment with ML-based personalization, you will unlock value currently hidden in the data. This helps mitigate potential concerns, such as a lack of trust in machine learning, so you can put the resulting engine into production.

Start your personalization journey today with the Amazon Personalize code samples and bring the engine to production with the architecture outlined in this blog. As a next step, you can add real-time event recording to update the generated recommendations automatically based on the event data.

 

Building complex workflows with Amazon MWAA, AWS Step Functions, AWS Glue, and Amazon EMR

Post Syndicated from Dipankar Ghosal original https://aws.amazon.com/blogs/big-data/building-complex-workflows-with-amazon-mwaa-aws-step-functions-aws-glue-and-amazon-emr/

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a fully managed service that makes it easy to run open-source versions of Apache Airflow on AWS and build workflows to run your extract, transform, and load (ETL) jobs and data pipelines.

You can use AWS Step Functions as a serverless function orchestrator to build scalable big data pipelines, using services such as Amazon EMR to run Apache Spark and other open-source applications on AWS in a cost-effective manner, and AWS Glue for a serverless environment to prepare (extract and transform) and load large amounts of data from a variety of sources for analytics and data processing with Apache Spark ETL jobs.

For production pipelines, a common use case is to read data originating from a variety of sources. This data requires transformation to extract business value and generate insights before sending to downstream applications, such as machine learning algorithms, analytics dashboards, and business reports.

This post demonstrates how to use Amazon MWAA as a primary workflow management service to create and run complex workflows and extend the directed acyclic graph (DAG) to start and monitor a state machine created using Step Functions. In Airflow, a DAG is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies.

Architectural overview

The following diagram illustrates the architectural overview of the components involved in the orchestration of the workflow. This workflow uses Amazon EMR to preprocess data and starts a Step Functions state machine. The state machine transforms data using AWS Glue.

The workflow includes the following core components:

  1. Airflow Scheduler triggers the DAG based on a schedule or manually.
  2. DAG uses PythonOperator to create an EMR cluster and waits for the cluster creation process to complete.
  3. DAG uses a custom operator EmrSubmitAndMonitorStepOperator to submit and monitor the Amazon EMR step.
  4. DAG uses PythonOperator to stop the EMR cluster when the preprocessing tasks are complete.
  5. DAG starts a Step Functions state machine and monitors it for completion using PythonOperator.

You can build complex ETL pipelines with Step Functions separately and trigger them from an Airflow DAG.
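The complete DAG is available in the GitHub repository referenced later in this post. The following is a trimmed, hypothetical sketch of its skeleton, using Airflow 1.10-style imports and placeholder cluster settings and ARNs, to show how the PythonOperator tasks fit together.

from datetime import timedelta

import boto3
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

# Placeholder state machine ARN
STATE_MACHINE_ARN = "arn:aws:states:us-east-1:123456789012:stateMachine:movielens-etl"


def create_emr_cluster(**context):
    # Create a small EMR cluster for the preprocessing steps (settings are placeholders)
    emr = boto3.client("emr")
    response = emr.run_job_flow(
        Name="mwaa-preprocessing",
        ReleaseLabel="emr-5.32.0",
        Instances={
            "MasterInstanceType": "m5.xlarge",
            "SlaveInstanceType": "m5.xlarge",
            "InstanceCount": 3,
            "KeepJobFlowAliveWhenNoSteps": True,
        },
        JobFlowRole="EMR_EC2_DefaultRole",
        ServiceRole="EMR_DefaultRole",
    )
    return response["JobFlowId"]


def start_state_machine(**context):
    # Hand off to Step Functions for the AWS Glue transformation stage
    boto3.client("stepfunctions").start_execution(stateMachineArn=STATE_MACHINE_ARN)


with DAG(
    dag_id="mwaa_movielens_demo_sketch",
    start_date=days_ago(1),
    schedule_interval=None,
    dagrun_timeout=timedelta(hours=2),
) as dag:
    create_cluster = PythonOperator(
        task_id="create_emr_cluster",
        python_callable=create_emr_cluster,
        provide_context=True,
    )
    trigger_state_machine = PythonOperator(
        task_id="start_step_functions_state_machine",
        python_callable=start_state_machine,
        provide_context=True,
    )
    # The EmrSubmitAndMonitorStepOperator preprocessing tasks from the
    # repository would sit between these two tasks in the full DAG.
    create_cluster >> trigger_state_machine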

Prerequisites

Before starting, create an Amazon MWAA environment. If this is your first time using Amazon MWAA, see Introducing Amazon Managed Workflows for Apache Airflow (MWAA).

Take a note of the Amazon Simple Storage Service (Amazon S3) bucket that stores the DAGs. It’s located on the environment details page on the Amazon MWAA console.

Also note the AWS Identity and Access Management (IAM) execution role. This role should be modified to allow MWAA to read and write from your S3 bucket, submit an Amazon EMR step, start a Step Functions state machine, and read from the AWS Systems Manager Parameter Store. The IAM role is available in the Permissions section of the environment details.

The solution references Systems Manager parameters in an AWS CloudFormation template and scripts. For information on adding and removing IAM identity permissions, see Adding and removing IAM identity permissions. A sample IAM policy is also provided in the GitHub repository amazon-mwaa-complex-workflow-using-step-functions.

For this post, we use the MovieLens dataset. We concurrently convert the MovieLens CSV files to Parquet format and save them to Amazon S3 as part of preprocessing.

Setting up the state machine using Step Functions

Our solution extends the ETL pipeline to run a Step Functions state machine from the Airflow DAG. Step Functions lets you build visual workflows that enable fast translation of business requirements into technical requirements. With Step Functions, you can set up dependency management and failure handling using a JSON-based template. A workflow is a series of steps, such as tasks, choices, parallel runs, and timeouts with the output of one step acting as input into the next. For more information about other use cases, see AWS Step Functions Use Cases.

The following diagram shows the ETL process set up through a Step Functions state machine.

In the workflow, the Process Data step runs an AWS Glue job, and the Get Job Status step periodically checks for the job completion. The AWS Glue job reads the input datasets and creates output data for the most popular movies and top-rated movies. After the job is complete, the Run Glue Crawler step runs an AWS Glue crawler to catalog the data. The workflow also allows you to monitor and respond to failures at any stage.
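The state machine handles this orchestration natively, but the same control flow can be sketched with Boto3 to make the sequence concrete; the job and crawler names below are placeholders.

import time

import boto3

# Illustrative sketch of the control flow the state machine implements:
# start a Glue job, poll until it finishes, then run a crawler to catalog
# the output. Job and crawler names are placeholders.
glue = boto3.client("glue")

run_id = glue.start_job_run(JobName="movielens-transform")["JobRunId"]

while True:
    state = glue.get_job_run(JobName="movielens-transform", RunId=run_id)["JobRun"]["JobRunState"]
    if state in ("SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT"):
        break
    time.sleep(30)  # the Get Job Status state waits between checks

if state == "SUCCEEDED":
    glue.start_crawler(Name="movielens-output-crawler")
else:
    raise RuntimeError(f"Glue job ended in state {state}")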

Creating resources

Create your resources by following the installation instructions provided in the amazon-mwaa-complex-workflow-using-step-functions README.md.

Running the ETL workflow

To run your ETL workflow, complete the following steps:

  1. On the Amazon MWAA console, choose Open Airflow UI.
  2. Locate the mwaa_movielens_demo DAG.
  3. Turn on the DAG.

  1. Select the mwaa_movielens_demo DAG and choose Graph View.

This displays the overall ETL pipeline managed by Airflow.

  1. To view the DAG code, choose Code.

The code for the custom operator can be found in the amazon-mwaa-complex-workflow-using-step-functions GitHub repo. 

  1. From the Airflow UI, select the mwaa_movielens_demo DAG and choose Trigger DAG.
  2. Leave the Optional Configuration JSON box blank.

When the Airflow DAG runs, the first task calls the PythonOperator to create an EMR cluster using Boto3. Boto3 is the AWS SDK for Python. It enables Python developers to create, configure, and manage AWS services, such as Amazon Elastic Compute Cloud (Amazon EC2) and Amazon S3. Boto3 provides an object-oriented API, as well as low-level access to AWS services.

The second task waits until the EMR cluster is ready and in the Waiting state. As soon as the cluster is ready, the data load task runs, followed by the data preprocessing tasks, which are started in parallel using EmrSubmitAndMonitorStepOperator. Concurrency in the current Airflow DAG is set to 3, which runs three tasks in parallel. You can change the concurrency of Amazon EMR to run multiple Amazon EMR steps in parallel.

When the data preprocessing tasks are complete, the EMR cluster is stopped and the DAG starts the Step Functions state machine to initiate data transformation.

The final task in the DAG monitors the completion of the Step Functions state machine.

The DAG run should complete in approximately 10 minutes.

Verifying the DAG run

While the DAG is running, you can view the task logs.

  1. From Graph View, select any task and choose View Log.

  1. When the DAG starts the Step Functions state machine, verify the status on the Step Functions console.

  1. You can also monitor ETL process completion from the Airflow UI.

  1. On the Airflow UI, verify the completion from the log entries.

Querying the data

After the successful completion of the Airflow DAG, two tables are created in the AWS Glue Data Catalog. To query the data with Amazon Athena, complete the following steps:

  1. On the Athena console, choose Databases.
  2. Select the mwaa-movielens-demo-db database.

You should see the two tables. If the tables aren’t listed, verify that the AWS Glue crawler run is complete and that the console is showing the correct Region.

  1. Run the following query:
    SELECT * FROM "mwaa-movielens-demo-db"."most_popular_movies" limit 10;

The following screenshot shows the output.

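If you prefer to run the same validation query programmatically instead of in the Athena console, a Boto3 sketch might look like the following; the query results output location is a placeholder.

import time

import boto3

# Sketch: run the validation query through the Athena API. The results
# output location is a placeholder bucket.
athena = boto3.client("athena")

query_id = athena.start_query_execution(
    QueryString='SELECT * FROM "mwaa-movielens-demo-db"."most_popular_movies" limit 10;',
    ResultConfiguration={"OutputLocation": "s3://my-athena-query-results/"},
)["QueryExecutionId"]

while True:
    state = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(2)

if state == "SUCCEEDED":
    rows = athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]
    print(f"Fetched {len(rows) - 1} rows")  # the first row holds the column headers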

Cleaning up

To clean up the resources created as part of our CloudFormation template, delete the mwaa-demo-foundations stack. You can either use the AWS CloudFormation console or the AWS Command Line Interface (AWS CLI).

Conclusion

In this post, we used Amazon MWAA to orchestrate an ETL pipeline on Amazon EMR and AWS Glue with Step Functions. We created an Airflow DAG to demonstrate how to run data processing jobs concurrently and extended the DAG to start a Step Functions state machine to build a complex ETL pipeline. A custom Airflow operator submitted and then monitored the Amazon EMR steps synchronously.

If you have comments or feedback, please leave them in the comments section.


About the Author

Dipankar GhosalDipankar Ghosal is a Sr Data Architect at Amazon Web Services and is based out of Minneapolis, MN. He has a focus in analytics and enjoys helping customers solve their unique use cases. When he’s not working, he loves going hiking with his wife and daughter.

Estimating scoring probabilities by preparing soccer matches data with AWS Glue DataBrew

Post Syndicated from Arash Rowshan original https://aws.amazon.com/blogs/big-data/estimating-scoring-probabilities-by-preparing-soccer-matches-data-with-aws-glue-databrew/

In soccer (or football outside of the US), players decide to take shots when they think they can score. But how do they make that determination vs. when to pass or dribble? In a fraction of a second, in motion, while chased from multiple directions by other professional athletes, they think about their distance from the goal, the speed they’re running, the ball movement, the number of defenders, the goalkeeper’s position, their own shot power, accuracy, angle, and more. For a moment, time stands still. A decision is made. To shoot or not to shoot.

This post was inspired by AWS’s collaboration with Germany’s Bundesliga, which led to Match Fact xGoals. We use representative sample datasets and walk through using AWS Glue DataBrew to prepare data prior to training a model that can predict the probability of a player scoring at any given moment in the game.

We start with two datasets that have the information about the events in the game and the players. We describe how a problem in this domain could be framed to benefit from data preparation and how that can lead us to making predictions.

No prior knowledge is required to enjoy this post. However, basic familiarity with data preparation and machine learning (ML) is beneficial. By the end of this post, you should have a good sense regarding what DataBrew offers as well as how you can apply the approaches offered here to other use cases.

Sample datasets

Let’s assume we have collected a lot of data from video records of soccer matches. We extracted the following dataset by taking a snapshot of every shot taken in every match. For each shot, we also recorded if it resulted in a goal or if it did not. The dataset is fictionalized and not based on historic soccer games.

The following table summarizes what each column contains.

Column Name          Description
event_id             Unique identifier for each record
game_minute          Minute of the game, between 0 and approximately 90
player_id            Unique identifier for each player
player_name          Name of the player who took the shot
defenders            Number of defenders between the player and the opponent’s goalkeeper
player_position      [x, y] coordinate of the player taking the shot
player_angle         Angle of the player taking the shot
player_speed         Speed of the player at the moment of taking the shot, in km/h
goalkeeper_position  [x, y] coordinate of the opponent’s goalkeeper
situation            Open play, free kick, or penalty kick
result               Goal or no goal

We also use the FIFA 20 complete player dataset, a dataset on soccer players across the globe that attempts to estimate their qualities quantitatively. The following screenshot shows some of the columns this dataset contains.

We’re interested in the following columns:

  • age
  • height
  • weight
  • overall
  • preferred_foot
  • weak_foot
  • pace
  • shooting
  • attacking_finishing
  • skill_fk_accuracy
  • movement_acceleration
  • movement_sprint_speed
  • power_shot_power
  • mentality_penalties 

We talk more about how each of these columns can be relevant later in this post.

As you can imagine, these datasets aren’t quite ready to train an ML model. We need to combine, clean, and extract values in numeric forms so we can start creating our training set. Normally that can take a lot of time and is somewhat tedious work. Data scientists prefer to focus on training their model, but they have to spend most of their time wrangling data to the shape that can be used for their ML flow.

But fear not! Enter AWS Glue DataBrew, your friendly neighborhood visual data prep tool. As businesswire puts it:

“…AWS Glue DataBrew offers customers over 250 pre-built transformations to automate data preparation tasks that would otherwise require days or weeks writing hand-coded transformations.”

Let’s get to work and see how we can prepare a dataset for training an ML model using DataBrew.

Preparing the data with DataBrew

Let’s start with the shots dataset. I first create a project on the DataBrew console and upload this dataset.

When my session is ready, I should see all the columns and some analytics that give me useful insights about my dataset.

The player_position column is recorded as [x,y] coordinates with x between 0–1000 and y between 0–500. The following image shows how a player is positioned based on this data.

In CSV, this is recorded as a string and has two values. We want to extract the values and combine them into a single value that can be represented as a number. This can help our model draw better conclusions from this feature. We can start preparing our data by completing the following steps:

  1. Using the Remove values transform, I remove the opening and closing brackets from the entries of the player_position column.

  1. I split this column by a comma to generate two columns that hold the x and y coordinates.

  1. We also need to convert these newly generated columns from strings to numbers.

Now we can generate a single value using the two columns that we generated. Imagine we want to break down the whole soccer field into 1×1 squares. If we think about the field similar to rows and columns, we can calculate the number for each square as such: x * 500 + y.

The benefit of this approach is that squares with higher numbers tend to be closer to the opponent’s goal, and this single feature can help our model draw good correlations between a player’s location and event outcomes. Note that the squares in the following image aren’t drawn to scale.
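Expressed as code, the mapping from coordinates to a square number is just the arithmetic that the transforms below perform; this short Python snippet is purely illustrative.

def square_number(x: int, y: int) -> int:
    """Map an [x, y] position (x in 0-1000, y in 0-500) to a single square index."""
    return x * 500 + y

# A player at [750, 250] lands in square 375250, much closer to the opponent's
# goal than a player at [100, 250], who lands in square 50250.
print(square_number(750, 250), square_number(100, 250))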

We can calculate the square numbers by using the Functions transforms Multiply and then Sum.

  1. First I multiply the x coordinate by 500.

  1. Using the ADD function, I sum this generated column with the y coordinate.

  1. We apply the same transforms to the goalkeeper_position column to achieve a single number for that feature as well.

Next, let’s take a look at the player_angle column.

When a player is positioned in the lower half of the field, a positive angle likely presents a better opportunity to score; in the top half, a negative angle faces the player more toward the goal. Another point to consider is the player’s strong or weak foot. The bottom half presents a wide angle for a left-footed player, and the top half does so for a right-footed player. The angle in combination with the strong foot can help our model draw good conclusions. We add the information on players’ strong or weak foot later in this post.

We have three different situations recorded in our dataset.

  1. We apply one-hot-encoding to this column to refactor the situations as 1 (True) or 0 (False) values suitable for training our model.

  1. For the shots dataset, we change the results column to 0s and 1s using the custom flag transform.
  2. Because this is what we want to predict, I name the column y.

We can enrich the data further by joining this dataset with the player dataset to take each player’s qualities into consideration. You may have noticed that we generated a lot of extra columns while applying the previous transforms. We can address that in one step while we do the join and clean things up a bit.

  1. When applying the join, we can connect the players dataset.

  1. I use an inner join and choose player_id as my key.
  2. I also only select the columns that I’m interested in.

You can select more or fewer columns depending on what features you want to feed into your model. For instance, I’m not selecting the player’s nationality, but you may want your model to take that into consideration. That’s really up to you, so feel free to play around with your options.

  1. I deselect the extra columns from the shots dataset and only select the following from the players dataset:
    1. age
    2. overall
    3. preferred_foot
    4. weak_foot
    5. shooting
    6. attacking_finishing
    7. skill_fk_accuracy
    8. movement_acceleration
    9. movement_sprint_speed
    10. power_shot_power
    11. mentality_penalties 

We’re almost done. We just need to apply a few transforms to the newly added player columns.

  1. I one-hot-encode preferred foot.

We can normalize some of the columns depending on the ML model that we want to run on this dataset afterwards. I want to train a basic logistic regression model.

  1. I use min-max normalization on most columns to scale values between 0–1.

Depending on your model, it may make more sense to center around 0, use a custom range, or apply z-score for your normalization.

  1. I also apply mean normalization to the player angle.

  1. Now that I have the normalized columns, I can delete all their source columns.

  1. Lastly, I move the result column all the way to the end because this is my output column (what I intend to predict).

  1. Now we have a complete recipe and it’s time to run a job to apply the steps on the full dataset and generate an output file to use to train our model.

Training the model

When the job is finished, I retrieve the output from my Amazon Simple Storage Service (Amazon S3) bucket. This rich dataset is now ready to be fed into a model. Logistic regression or Support Vector Machines (SVM) could be good candidates for our dataset. You could use Amazon SageMaker to train a model and generate a probability of scoring per event. The following screenshot shows a basic logistic regression model created using scikit-learn.
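A minimal sketch of such a model, assuming the DataBrew output has been downloaded locally as a CSV file named databrew-job-output.csv with the prepared feature columns and the y label, might look like the following; the 80/20 split is also an assumption.

import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

# Sketch: train a logistic regression model on the DataBrew job output.
# The file name and the 80/20 split are assumptions.
df = pd.read_csv("databrew-job-output.csv")

X = df.drop(columns=["y"])  # features prepared in DataBrew
y = df["y"]                 # assumed: 1 = goal, 0 = no goal

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

model = LogisticRegression(max_iter=1000)
model.fit(X_train, y_train)

print("Accuracy:", accuracy_score(y_test, model.predict(X_test)))
# predict_proba returns the estimated scoring probability for each shot
print(model.predict_proba(X_test)[:5, 1])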

The model correctly predicts scoring opportunities approximately 80% of the time. You may get even higher accuracy using SVM. Feel free to try that, or edit one of your data preparation steps and see how it affects your model accuracy.

Conclusion

In this post, we started with some raw fictionalized data of soccer shots and players. We framed the problem based on our domain knowledge and the data available. We used DataBrew to rapidly and visually connect the dots and forge the original datasets into an enriched form that could be used to train an ML model.

I encourage you to apply the same methodology to a problem domain that interests you and see how DataBrew can speed up your workflow.


About the Author

Arash RowshanArash Rowshan is a Software Engineer at Amazon Web Services. He’s passionate about big data and applications of ML. Most recently he was part of the team that launched AWS Glue DataBrew. Any time he’s not at his desk, he’s likely playing soccer somewhere. You can follow him on Twitter @OSO_Arash.

 

 

Orchestrating an AWS Glue DataBrew job and Amazon Athena query with AWS Step Functions

Post Syndicated from Sakti Mishra original https://aws.amazon.com/blogs/big-data/orchestrating-an-aws-glue-databrew-job-and-amazon-athena-query-with-aws-step-functions/

As the industry grows with more data volume, big data analytics is becoming a common requirement in data analytics and machine learning (ML) use cases. Also, as we start building complex data engineering or data analytics pipelines, we look for a simpler orchestration mechanism with graphical user interface-based ETL (extract, transform, load) tools.

Recently, AWS announced the general availability of AWS Glue DataBrew, a new visual data preparation tool that helps you clean and normalize data without writing code. This reduces the time it takes to prepare data for analytics and ML by up to 80% compared to traditional approaches to data preparation.

Regarding orchestration or workflow management, AWS provides AWS Step Functions, a serverless function orchestrator that makes it easy to build a workflow by integrating different AWS services like AWS Lambda, Amazon Simple Notification Service (Amazon SNS), AWS Glue, and more. With its built-in operational controls, Step Functions manages sequencing, error handling, retry logic, and states, removing a significant operational burden from your team.

Today, we’re launching Step Functions support for DataBrew, which means you can now invoke DataBrew jobs in your Step Functions workflow to build an end-to-end ETL pipeline. Recently, Step Functions also started supporting Amazon Athena integration, which means that you can submit SQL queries to the Athena engine through a Step Functions state.

In this post, we walk through a solution where we integrate a DataBrew job for data preparation, invoke a series of Athena queries for data refresh, and integrate Amazon QuickSight for business reporting. The whole solution is orchestrated through Step Functions and is invoked through Amazon EventBridge.

Use case overview

For our use case, we use two public datasets. The first dataset is a sales pipeline dataset, which contains a list of over 20,000 sales opportunity records for a fictitious business. Each record has fields that specify the following:

  • A date, potentially when an opportunity was identified
  • The salesperson’s name
  • A market segment to which the opportunity belongs
  • Forecasted monthly revenue

The second dataset is an online marketing metrics dataset. This dataset contains records of marketing metrics, aggregated by day. The metrics describe user engagement across various channels, such as websites, mobile, and social media, plus other marketing metrics. The two datasets are unrelated, but for the purpose of this post, we assume that they’re related.

For our use case, these sales and marketing CSV files are maintained by your organization’s Marketing team, which uploads the updated full CSV file to Amazon Simple Storage Service (Amazon S3) every month. The aggregated output data is created through a series of data preparation steps, and the business team uses the output data to create business intelligence (BI) reports.

Architecture overview

To automate the complete process, we use the following architecture, which integrates Step Functions for orchestration, DataBrew for data preparation, Athena for data analysis with standard SQL, and QuickSight for business reporting. In addition, we use Amazon SNS for sending notifications to users, and EventBridge is integrated to schedule running the Step Functions workflow.

The workflow includes the following steps:

  • Step 1 – The Marketing team uploads the full CSV file to an S3 input bucket every month.
  • Step 2 – An EventBridge rule, scheduled to run every month, triggers the Step Functions state machine (see the scheduling sketch after this list).
  • Steps 3 and 4 – We receive two separate datasets (sales data and marketing data), so Step Functions triggers two parallel DataBrew jobs, which create additional year, month, and day columns from the existing date field and uses those three columns for partitioning. The jobs write the final output to our S3 output bucket.
  • Steps 5, 6, 7, 8 – After the output data is written, we can create external tables on top of it with Athena CREATE TABLE statements and then load partitions with MSCK REPAIR commands. After the AWS Glue Data Catalog tables are created for sales and marketing, we run an additional query through Athena, which merges these two tables by year and month to create another table with aggregated output.
  • Steps 9 and 10 – As the last step of the Step Functions workflow, we send a notification to end-users through Amazon SNS to notify them that the data refresh job completed successfully.
  • Steps 11, 12, 13 – After the aggregated table data is refreshed, business users can use QuickSight for BI reporting, which fetches data through Athena. Data analysts can also use Athena to analyze the complete refreshed dataset.

Prerequisites

Before beginning this tutorial, make sure you have the required permissions to create the resources required as part of the solution.

Additionally, create the S3 input and output buckets with the required subfolders to capture the sales and marketing data, and upload the input data into their respective folders.

Creating a DataBrew project

To create a DataBrew project for the marketing data, complete the following steps:

  1. On the DataBrew console, choose Projects.
  2. Choose Create a project.
  3. For Project name, enter a name (for this post, marketing-data-etl).
  4. For Select a dataset, select New dataset.

  1. For Enter your source from S3, enter the S3 path of the marketing input CSV.

  1. Under Permissions, for Role name, choose an AWS Identity and Access Management (IAM) role that allows DataBrew to read from your Amazon S3 input location.

You can choose a role if you already created one, or create a new one. Please read here for steps to create the IAM role.

  1. After the dataset is loaded, on the Functions menu, choose Date functions.
  2. Choose YEAR.

  1. Apply the year function on the date column to create a new column called year.

  1. Repeat these steps to create month and day columns.

For our use case, we created a few new columns that we plan to use for partitioning, but you can integrate additional transformations as needed.

  1. After you have finished applying all your transformations, choose Publish on the recipe.
  2. Provide a description of the recipe version and choose Publish.

Creating a DataBrew job

Now that our recipe is ready, we can create a job for it, which gets invoked through our Step Functions state machine.

  1. On the DataBrew console, choose Jobs.
  2. Choose Create a job.
  3. For Job name, enter a name (for example, marketing-data-etl).

Your recipe is already linked to the job.

  1. Under Job output settings, for File type, choose your final storage format (for this post, we choose PARQUET).
  2. For S3 location, enter your final S3 output bucket path.
  3. For Compression, choose the compression type you want to apply (for this post, we choose Snappy).
  4. Under Additional configurations, for Custom partition by column values, choose year, month, and day.
  5. For File output storage, select Replace output files for each job run.

We choose this option because our use case is to do a full refresh.

  1. Under Permissions, for Role name, choose your IAM role.
  2. Choose Create job.

We choose this because we don’t want to run it now; we plan to invoke it through Step Functions.

  1. When your marketing job is ready, repeat the same steps for your sales data, using the sales data output file location as needed.

Creating a Step Functions state machine

We’re now ready to create a Step Functions state machine for the complete flow.

  1. On the Step Functions console, choose Create state machine.
  2. For Define state machine, select Author with code snippets.
  3. For Type, choose Standard.

In the Definition section, Step Functions provides a list of service actions that you can use to automatically generate a code snippet for your state machine’s state. The following screenshot shows that we have options for Athena and DataBrew, among others.

  1. For Generate code snippet, choose AWS Glue DataBrew: Start a job run.

  1. For Job name, choose Select job name from a list and choose your DataBrew job.

The JSON snippet appears in the Preview pane.

  1. Select Wait for DataBrew job runs to complete.
  2. Choose Copy to clipboard.

  1. Integrate the code into the final state machine JSON code:
    {
       "Comment":"Monthly Refresh of Sales Marketing Data",
       "StartAt":"Refresh Sales Marketing Data",
       "States":{
          "Refresh Sales Marketing Data":{
             "Type":"Parallel",
             "Branches":[
                {
                   "StartAt":"Sales DataBrew ETL Job",
                   "States":{
                      "Sales DataBrew ETL Job":{
                         "Type":"Task",
                         "Resource":"arn:aws:states:::databrew:startJobRun.sync",
                         "Parameters":{
                            "Name":"sales-data"
                         },
                         "Next":"Drop Old Sales Table"
                      },
                      "Drop Old Sales Table":{
                         "Type":"Task",
                         "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
                         "Parameters":{
                            "QueryString":"DROP TABLE IF EXISTS sales_data_output",
                            "WorkGroup":"primary",
                            "ResultConfiguration":{
                               "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                            }
                         },
                         "Next":"Create Sales Table"
                      },
                      "Create Sales Table":{
                         "Type":"Task",
                         "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
                         "Parameters":{
                            "QueryString":"CREATE EXTERNAL TABLE `sales_data_output`(`date` string, `salesperson` string, `lead_name` string, `segment` string, `region` string, `target_close` string, `forecasted_monthly_revenue` int,   `opportunity_stage` string, `weighted_revenue` int, `closed_opportunity` boolean, `active_opportunity` boolean, `latest_status_entry` boolean) PARTITIONED BY (`year` string,`month` string, `day` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION  's3://<your-bucket-name>/sales-pipeline/transformed/sales/' TBLPROPERTIES ('classification'='parquet', 'compressionType'='none', 'typeOfData'='file')",
                            "WorkGroup":"primary",
                            "ResultConfiguration":{
                               "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                            }
                         },
                         "Next":"Load Sales Table Partitions"
                      },
                      "Load Sales Table Partitions":{
                         "Type":"Task",
                         "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
                         "Parameters":{
                            "QueryString":"MSCK REPAIR TABLE sales_data_output",
                            "WorkGroup":"primary",
                            "ResultConfiguration":{
                               "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                            }
                         },
                         "End":true
                      }
                   }
                },
                {
                   "StartAt":"Marketing DataBrew ETL Job",
                   "States":{
                      "Marketing DataBrew ETL Job":{
                         "Type":"Task",
                         "Resource":"arn:aws:states:::databrew:startJobRun.sync",
                         "Parameters":{
                            "Name":"marketing-data-etl"
                         },
                         "Next":"Drop Old Marketing Table"
                      },
                      "Drop Old Marketing Table":{
                         "Type":"Task",
                         "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
                         "Parameters":{
                            "QueryString":"DROP TABLE IF EXISTS marketing_data_output",
                            "WorkGroup":"primary",
                            "ResultConfiguration":{
                               "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                            }
                         },
                         "Next":"Create Marketing Table"
                      },
                      "Create Marketing Table":{
                         "Type":"Task",
                         "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
                         "Parameters":{
                            "QueryString":"CREATE EXTERNAL TABLE `marketing_data_output`(`date` string, `new_visitors_seo` int, `new_visitors_cpc` int, `new_visitors_social_media` int, `return_visitors` int, `twitter_mentions` int,   `twitter_follower_adds` int, `twitter_followers_cumulative` int, `mailing_list_adds_` int,   `mailing_list_cumulative` int, `website_pageviews` int, `website_visits` int, `website_unique_visits` int,   `mobile_uniques` int, `tablet_uniques` int, `desktop_uniques` int, `free_sign_up` int, `paid_conversion` int, `events` string) PARTITIONED BY (`year` string, `month` string, `day` string) ROW FORMAT SERDE   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION  's3://<your-bucket-name>/sales-pipeline/transformed/marketing/' TBLPROPERTIES ('classification'='parquet', 'compressionType'='none', 'typeOfData'='file')",
                            "WorkGroup":"primary",
                            "ResultConfiguration":{
                               "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                            }
                         },
                         "Next":"Load Marketing Table Partitions"
                      },
                      "Load Marketing Table Partitions":{
                         "Type":"Task",
                         "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
                         "Parameters":{
                            "QueryString":"MSCK REPAIR TABLE marketing_data_output",
                            "WorkGroup":"primary",
                            "ResultConfiguration":{
                               "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                            }
                         },
                         "End":true
                      }
                   }
                }
             ],
             "Next":"Drop Old Summarized Table"
          },
          "Drop Old Summarized Table":{
             "Type":"Task",
             "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
             "Parameters":{
                "QueryString":"DROP TABLE default.sales_marketing_revenue",
                "WorkGroup":"primary",
                "ResultConfiguration":{
                   "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                }
             },
             "Next":"Create Summarized Output"
          },
          "Create Summarized Output":{
             "Type":"Task",
             "Resource":"arn:aws:states:::athena:startQueryExecution.sync",
             "Parameters":{
                "QueryString":"CREATE TABLE default.sales_marketing_revenue AS SELECT * FROM (SELECT sales.year, sales.month, total_paid_conversion, total_weighted_revenue FROM (SELECT year, month, sum(paid_conversion) as total_paid_conversion FROM default.marketing_data_output group by year, month) sales INNER JOIN (SELECT year, month, sum(weighted_revenue) as total_weighted_revenue FROM default.sales_data_output group by year, month) marketing on sales.year=marketing.year AND sales.month=marketing.month) ORDER BY year DESC, month DESC",
                "WorkGroup":"primary",
                "ResultConfiguration":{
                   "OutputLocation":"s3://<your-aws-athena-query-results-bucket-path>/"
                }
             },
             "Next":"Notify Users"
          },
          "Notify Users":{
             "Type":"Task",
             "Resource":"arn:aws:states:::sns:publish",
             "Parameters":{
                "Message":{
                   "Input":"Monthly sales marketing data refreshed successfully!"
                },
                "TopicArn":"arn:aws:sns:us-east-1:<account-id>:<sns-topic-name>"
             },
             "End":true
          }
       }
    }
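
If you prefer to create the state machine programmatically rather than pasting the definition into the console, a minimal sketch with the AWS SDK for Python (Boto3) could look like the following. The file name, state machine name, and role ARN are placeholders; the execution role must allow the DataBrew, Athena, and SNS actions used in the definition.

import boto3

sfn = boto3.client("stepfunctions")

# Load the state machine definition shown above (assumed to be saved locally)
with open("refresh-sales-marketing.json") as f:
    definition = f.read()

response = sfn.create_state_machine(
    name="MonthlySalesMarketingRefresh",  # example name
    definition=definition,
    roleArn="arn:aws:iam::<account-id>:role/<step-functions-execution-role>",  # placeholder
)
print(response["stateMachineArn"])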

The following diagram is the visual representation of the state machine flow. With the Step Functions parallel task type, we created two parallel job runs for the sales and marketing data. When both flows are complete, they join to create an aggregated table in Athena and send an SNS notification to the end-users.

Creating an EventBridge scheduling rule

Now let’s integrate EventBridge to schedule the invocation of our Step Functions state machine on the first day of every month.

  1. On the EventBridge console, under Events, choose Rules.
  2. Choose Create a rule.
  3. For Name, enter a name (for example, trigger-step-function-rule).
  4. Under Define pattern, select Schedule.
  5. Select Cron expression.
  6. Enter the cron expression 0 0 1 * ? * to specify that the job runs at midnight (UTC) on the first day of every month (the sketch after these steps shows the same schedule with the AWS SDK).

  1. In the Select targets section, for Target, choose Step Functions state machine.
  2. For State machine, choose your state machine.
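
The same schedule and target can also be set up with Boto3; the following is a minimal sketch in which the ARNs are placeholders and the invoke role must allow states:StartExecution.

import boto3

events = boto3.client("events")

# Schedule: midnight UTC on the first day of every month
events.put_rule(
    Name="trigger-step-function-rule",
    ScheduleExpression="cron(0 0 1 * ? *)",
    State="ENABLED",
)

# Point the rule at the state machine
events.put_targets(
    Rule="trigger-step-function-rule",
    Targets=[{
        "Id": "monthly-refresh-target",
        "Arn": "arn:aws:states:us-east-1:<account-id>:stateMachine:<state-machine-name>",  # placeholder
        "RoleArn": "arn:aws:iam::<account-id>:role/<eventbridge-invoke-role>",  # placeholder
    }],
)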

Now when the state machine is invoked, its run flow looks like the following screenshot, where blue represents the DataBrew jobs currently running.

When the job is complete, all the steps should be green.

You also receive the notification “Monthly sales marketing data refreshed successfully!”

Running an Athena query

Let’s validate the aggregated table output in Athena by running a simple SELECT query. The following screenshot shows the output.
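
If you want to run the same validation from code rather than the Athena console, a minimal Boto3 sketch could look like the following; the results bucket path is a placeholder, and the query targets the aggregated table created by the state machine.

import time
import boto3

athena = boto3.client("athena")

# Run a simple validation query against the aggregated table
query = athena.start_query_execution(
    QueryString="SELECT * FROM default.sales_marketing_revenue ORDER BY year DESC, month DESC LIMIT 10",
    WorkGroup="primary",
    ResultConfiguration={"OutputLocation": "s3://<your-aws-athena-query-results-bucket-path>/"},
)
query_id = query["QueryExecutionId"]

# Wait for the query to finish, then print the rows
while athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"] in ("QUEUED", "RUNNING"):
    time.sleep(1)

for row in athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]:
    print([col.get("VarCharValue") for col in row["Data"]])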

Creating reports in QuickSight

Now let’s do our final step of the architecture, which is creating BI reports through QuickSight by connecting to the Athena aggregated table.

  1. On the QuickSight console, choose Athena as your data source.

  1. Select the database and table name you have in Athena.

Now you can create a quick report to visualize your output, as shown in the following screenshot.

If QuickSight is using SPICE storage, you need to refresh the dataset in QuickSight after you receive notification about the completion of data refresh. If the QuickSight report is running an Athena query for every request, you might see a “table not found” error when data refresh is in progress. We recommend leveraging SPICE storage to get better performance.
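
One way to automate that SPICE refresh, for example from an AWS Lambda function subscribed to the SNS topic, is the QuickSight CreateIngestion API. The following is a minimal sketch with placeholder account and dataset IDs.

import uuid
import boto3

quicksight = boto3.client("quicksight")

# Kick off a SPICE refresh for the dataset backing the report
quicksight.create_ingestion(
    AwsAccountId="<account-id>",          # placeholder
    DataSetId="<quicksight-dataset-id>",  # placeholder
    IngestionId=str(uuid.uuid4()),
)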

Conclusion

This post explains how to integrate a DataBrew job and Athena queries with Step Functions to implement a simple ETL pipeline that refreshes aggregated sales and marketing data for BI reporting.

I hope this gives you a great starting point for using this solution with your datasets and applying business rules to build a complete serverless data analytics pipeline.


About the Author

Sakti Mishra

Sakti Mishra is a Senior Data Lab Solution Architect at AWS, where he helps customers architect data analytics solutions, which gives them an accelerated path towards modernization initiatives. Outside of work, Sakti enjoys learning new technologies, watching movies, and visiting places.

Testing data quality at scale with PyDeequ

Post Syndicated from Calvin Wang original https://aws.amazon.com/blogs/big-data/testing-data-quality-at-scale-with-pydeequ/

You generally write unit tests for your code, but do you also test your data? Incoming data quality can make or break your application. Incorrect, missing, or malformed data can have a large impact on production systems. Examples of data quality issues include the following:

  • Missing values can lead to failures in production systems that require non-null values (NullPointerException)
  • Changes in the distribution of data can lead to unexpected outputs of machine learning (ML) models
  • Aggregations of incorrect data can lead to wrong business decisions

In this post, we introduce PyDeequ, an open-source Python wrapper over Deequ (an open-source tool developed and used at Amazon). Deequ is written in Scala, whereas PyDeequ allows you to use its data quality and testing capabilities from Python and PySpark, the language of choice of many data scientists. PyDeequ democratizes and extends the power of Deequ by allowing you to use it alongside the many data science libraries that are available in that language. Furthermore, PyDeequ allows for a fluid interface with pandas DataFrames as opposed to being restricted to Apache Spark DataFrames.

Deequ allows you to calculate data quality metrics on your dataset, define and verify data quality constraints, and be informed about changes in the data distribution. Instead of implementing checks and verification algorithms on your own, you can focus on describing how your data should look. Deequ supports you by suggesting checks for you. Deequ is implemented on top of Apache Spark and is designed to scale with large datasets (billions of rows) that typically live in a data lake, distributed file system, or a data warehouse. PyDeequ gives you access to this capability, but also allows you to use it from the familiar environment of your Python Jupyter notebook.

Deequ at Amazon

Deequ is used internally at Amazon to verify the quality of many large production datasets. Dataset producers can add and edit data quality constraints. The system computes data quality metrics on a regular basis (with every new version of a dataset), verifies constraints defined by dataset producers, and publishes datasets to consumers in case of success. In error cases, dataset publication can be stopped, and producers are notified to take action. Data quality issues don’t propagate to consumer data pipelines, reducing their blast radius.

Deequ is also used within Amazon SageMaker Model Monitor. Now with the availability of PyDeequ, you can use it from a broader set of environments: Amazon SageMaker notebooks, AWS Glue, Amazon EMR, and more.

Overview of PyDeequ

Let’s look at PyDeequ’s main components, and how they relate to Deequ (shown in the following diagram):

  • Metrics computation – Deequ computes data quality metrics, that is, statistics such as completeness, maximum, or correlation. Deequ uses Spark to read from sources such as Amazon Simple Storage Service (Amazon S3) and compute metrics through an optimized set of aggregation queries. You have direct access to the raw metrics computed on the data.
  • Constraint verification – As a user, you focus on defining a set of data quality constraints to be verified. Deequ takes care of deriving the required set of metrics to be computed on the data. Deequ generates a data quality report, which contains the result of the constraint verification.
  • Constraint suggestion – You can choose to define your own custom data quality constraints or use the automated constraint suggestion methods that profile the data to infer useful constraints.
  • Python wrappers – You can call each Deequ function using Python syntax. The wrappers translate the commands to the underlying Deequ calls and return their response.

Use case overview

As a running example, we use a customer review dataset provided by Amazon on Amazon S3. We intentionally follow the example in the post Test data quality at scale with Deequ to show the similarity in functionality and implementation. We begin the way many data science projects do: with initial data exploration and assessment in a Jupyter notebook.

If you’d like to follow along with a live Jupyter notebook, check out the notebook on our GitHub repo.

During the data exploration phase, you want to easily answer some basic questions about the data:

  • Are the fields that are supposed to contain unique values really unique? Are there fields that are missing values?
  • How many distinct categories are there in the categorical fields?
  • Are there correlations between some key features?
  • If there are two supposedly similar datasets (such as different categories or different time periods), are they really similar?

We also show you how to scale this approach to large-scale datasets, using the same code on an Amazon EMR cluster. This is how you’d likely do your ML training, and later as you move into a production setting.

Starting a PySpark session in a SageMaker notebook

To follow along with this post, open up a SageMaker notebook instance, clone the PyDeequ GitHub repository onto the notebook instance, and run the test_data_quality_at_scale.ipynb notebook from the tutorials directory of the PyDeequ repository.

Let’s install our dependencies first in a terminal window:

$ pip install pydeequ

Next, in a cell of our SageMaker notebook, we need to create a PySpark session:

from pyspark.sql import SparkSession

import sagemaker_pyspark
import pydeequ

classpath = ":".join(sagemaker_pyspark.classpath_jars())

spark = (SparkSession
    .builder
    .config("spark.driver.extraClassPath", classpath)
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate())

Loading data

Load the dataset containing reviews for the category Electronics into our Jupyter notebook:

df = spark.read.parquet("s3a://amazon-reviews-pds/parquet/product_category=Electronics/")

After you load the DataFrame, you can run df.printSchema() to view the schema of the dataset:

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)
 |-- year: integer (nullable = true)

Data analysis

Before we define checks on the data, we want to calculate some statistics on the dataset; we call them metrics. As with Deequ, PyDeequ supports a rich set of metrics. For more information, see Test data quality at scale with Deequ or the GitHub repo. In the following example, we use the AnalysisRunner to capture the metrics you’re interested in:

from pydeequ.analyzers import *

analysisResult = AnalysisRunner(spark) \
                    .onData(df) \
                    .addAnalyzer(Size()) \
                    .addAnalyzer(Completeness("review_id")) \
                    .addAnalyzer(ApproxCountDistinct("review_id")) \
                    .addAnalyzer(Mean("star_rating")) \
                    .addAnalyzer(Compliance("top star_rating", "star_rating >= 4.0")) \
                    .addAnalyzer(Correlation("total_votes", "star_rating")) \
                    .addAnalyzer(Correlation("total_votes", "helpful_votes")) \
                    .run()
                    
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()
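
If you prefer to continue exploring these metrics with pandas-based tooling, you can convert the resulting Spark DataFrame, for example:

# Bring the (small) metrics result set back to the driver as a pandas DataFrame
analysisResult_pd = analysisResult_df.toPandas()
print(analysisResult_pd.head())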

The following table summarizes our findings.

Name | Instance | Value
ApproxCountDistinct | review_id | 3010972
Completeness | review_id | 1
Compliance | top star_rating | 0.74941
Correlation | helpful_votes,total_votes | 0.99365
Correlation | total_votes,star_rating | -0.03451
Mean | star_rating | 4.03614
Size | * | 3120938

From this, we learn the following:

  • review_id has no missing values and approximately 3,010,972 unique values
  • Approximately 75% of reviews have a star_rating of 4 or higher
  • total_votes and star_rating are not correlated
  • helpful_votes and total_votes are strongly correlated
  • The average star_rating is 4.0
  • The dataset contains 3,120,938 reviews

Defining and running tests for data

After analyzing and understanding the data, we want to verify that the properties we have derived also hold for new versions of the dataset. By defining assertions on the data distribution as part of a data pipeline, we can ensure that every processed dataset is of high quality, and that any application consuming the data can rely on it.

For writing tests on data, we start with the VerificationSuite and add checks on attributes of the data. In this example, we test for the following properties of our data:

  • At least 3 million rows in total
  • review_id is never NULL
  • review_id is unique
  • star_rating has a minimum of 1.0 and maximum of 5.0
  • marketplace only contains US, UK, DE, JP, or FR
  • year does not contain negative values

This is the code that reflects the previous statements. For information about all available checks, see the GitHub repo. You can run this directly in the Spark shell as previously explained:

from pydeequ.checks import *
from pydeequ.verification import *

check = Check(spark, CheckLevel.Warning, "Amazon Electronics Review Check")

checkResult = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(
        check.hasSize(lambda x: x >= 3000000) \
        .hasMin("star_rating", lambda x: x == 1.0) \
        .hasMax("star_rating", lambda x: x == 5.0)  \
        .isComplete("review_id")  \
        .isUnique("review_id")  \
        .isComplete("marketplace")  \
        .isContainedIn("marketplace", ["US", "UK", "DE", "JP", "FR"]) \
        .isNonNegative("year")) \
    .run()
    
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show()

After calling run(), PyDeequ translates your test description into Deequ, which translates it into a series of Spark jobs that are run to compute metrics on the data. Afterwards, it invokes your assertion functions (for example, lambda x: x == 1.0 for the minimum star rating check) on these metrics to see if the constraints hold on the data. The following table summarizes our findings.

Constraint | constraint_status | constraint_message
SizeConstraint(Size(None)) | Success
MinimumConstraint(Minimum(star_rating,None)) | Success
MaximumConstraint(Maximum(star_rating,None)) | Success
CompletenessConstraint(Completeness(review_id,None)) | Success
UniquenessConstraint(Uniqueness(List(review_id))) | Failure | Value: 0.9926566948782706 does not meet the constraint requirement!
CompletenessConstraint(Completeness(marketplace,None)) | Success
ComplianceConstraint(Compliance(marketplace contained in US,UK,DE,JP,FR,marketplace IS NULL OR marketplace IN ('US','UK','DE','JP','FR'),None)) | Success
ComplianceConstraint(Compliance(year is non-negative,COALESCE(year, 0.0) >= 0,None)) | Success

Interestingly, the review_id column isn’t unique, which resulted in a failure of the check on uniqueness. We can also look at all the metrics that Deequ computed for this check by running the following:

checkResult_df = VerificationResult.successMetricsAsDataFrame(spark, checkResult)
checkResult_df.show()

The following table summarizes our findings.

Name | Instance | Value
Completeness | review_id | 1
Completeness | marketplace | 1
Compliance | marketplace contained in US,UK,DE,JP,FR | 1
Compliance | year is non-negative | 1
Maximum | star_rating | 5
Minimum | star_rating | 1
Size | * | 3120938
Uniqueness | review_id | 0.99266
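
To dig into that uniqueness failure, you can list the duplicated review_id values with plain PySpark; a minimal sketch:

from pyspark.sql import functions as F

# Inspect which review_id values appear more than once, to understand the uniqueness failure
duplicate_ids = (df.groupBy("review_id")
                   .count()
                   .filter(F.col("count") > 1)
                   .orderBy(F.col("count").desc()))
duplicate_ids.show(10, truncate=False)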

Automated constraint suggestion

If you own a large number of datasets or if your dataset has many columns, it may be challenging for you to manually define appropriate constraints. Deequ can automatically suggest useful constraints based on the data distribution. Deequ first runs a data profiling method and then applies a set of rules on the result. For more information about how to run a data profiling method, see the GitHub repo.

import json

from pydeequ.suggestions import *

suggestionResult = ConstraintSuggestionRunner(spark) \
             .onData(df) \
             .addConstraintRule(DEFAULT()) \
             .run()

# Constraint Suggestions in JSON format
print(json.dumps(suggestionResult, indent=2))

The result contains a list of constraints with descriptions and Python code, so that you can directly apply it in your data quality checks. Call print(json.dumps(suggestionResult, indent=2)) to inspect the suggested constraints; the following table shows a subset.

Column | Constraint | Python code
customer_id | customer_id is not null | .isComplete("customer_id")
customer_id | customer_id has type Integral | .hasDataType("customer_id", ConstrainableDataTypes.Integral)
customer_id | customer_id has no negative values | .isNonNegative("customer_id")
helpful_votes | helpful_votes is not null | .isComplete("helpful_votes")
helpful_votes | helpful_votes has no negative values | .isNonNegative("helpful_votes")
marketplace | marketplace has value range "US", "UK", "DE", "JP", "FR" | .isContainedIn("marketplace", ["US", "UK", "DE", "JP", "FR"])
product_title | product_title is not null | .isComplete("product_title")
star_rating | star_rating is not null | .isComplete("star_rating")
star_rating | star_rating has no negative values | .isNonNegative("star_rating")
vine | vine has value range "N", "Y" | .isContainedIn("vine", ["N", "Y"])
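
Because the result is plain JSON, you can also filter or post-process the suggestions in code before adopting them. The following is a minimal sketch, assuming the field names shown in the PyDeequ tutorials (constraint_suggestions, column_name, description, code_for_constraint); they may differ slightly between versions.

# Walk through the suggested constraints programmatically
for suggestion in suggestionResult["constraint_suggestions"]:
    print(f"Column: {suggestion['column_name']}")
    print(f"Constraint: {suggestion['description']}")
    print(f"Code: {suggestion['code_for_constraint']}")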

You can explore the other tutorials in the PyDeequ GitHub repo.

Scaling to production

So far, we’ve shown you how to use these capabilities in the context of data exploration using a Jupyter notebook running on a SageMaker notebook instance. As your project matures, you need to use the same capabilities on larger and larger datasets, and in a production environment. With PyDeequ, it’s easy to make that transition. The following diagram illustrates deployment options for local and production purposes on AWS.

Amazon EMR and AWS Glue interface with PyDeequ through the PySpark drivers that PyDeequ utilizes as its main engine. PyDeequ can run as a PySpark application in both contexts when the Deequ JAR is added to the Spark context. You can run PyDeequ's data validation toolkit after the Spark context and drivers are configured and your data is loaded into a DataFrame. We describe the Amazon EMR configuration options and use cases in this section (configurations 2 and 3 in the diagram).

Data exploration from a SageMaker notebook via an EMR cluster

As shown in configuration 2 in the diagram, you can connect to an EMR cluster from a SageMaker notebook to run PyDeequ. This enables you to explore much larger volumes of data than you can using a single notebook. Your Amazon EMR cluster must be running Spark v2.4.6, available with Amazon EMR version 5.31 or higher, in order to work with PyDeequ. After you have a running cluster that has those components and a SageMaker notebook, you configure a SparkSession object using the following template to connect to your cluster. For more information about connecting a SageMaker notebook to Amazon EMR or the necessary IAM permissions, see Submitting User Applications with spark-submit.

In the SageMaker notebook, run the following JSON in a cell before you start your SparkSession to configure your EMR cluster:

%%configure -f
{ "conf":{
          "spark.pyspark.python": "python3",
          "spark.pyspark.virtualenv.enabled": "true",
          "spark.pyspark.virtualenv.type":"native",
          "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv",
          "spark.jars.packages": "com.amazon.deequ:deequ:1.0.3",
          "spark.jars.excludes": "net.sourceforge.f2j:arpack_combined_all"
         }
}

Start your SparkSession object in a cell after the preceding configuration by running spark. Then install PyDeequ onto your EMR cluster using the SparkContext (default named sc) with the following command:

sc.install_pypi_package('pydeequ')

Now you can start using PyDeequ from your notebook to run the same statements as before, but with much larger volumes of data.

Running a transient EMR cluster

Another way to leverage the power of an EMR cluster is to treat it as a transient cluster and run it in a headless configuration, as shown in configuration 3 in the diagram. We use spark-submit in an EMR add-step to run PyDeequ on Amazon EMR. For each of the following steps, make sure to replace the values in brackets accordingly.

  1. Create a bootstrap shell script and upload it to an S3 bucket. The following code is an example of pydeequ-emr-bootstrap.sh:
    #!/bin/bash
    
    sudo python3 -m pip install --no-deps pydeequ
    sudo python3 -m pip install pandas 

  1. Create an EMR cluster via the AWS Command Line Interface (AWS CLI):
    $ aws emr create-cluster \
    --name 'my-pydeequ-cluster' \
    --release-label emr-5.31.0 --applications Name=Spark Name=Hadoop Name=Hive Name=Livy Name=Pig Name=Hue \
    --use-default-roles \
    --instance-type m5.xlarge \
    --instance-count 2 \
    --bootstrap-actions \
        Path="s3://<S3_PATH_TO_BOOTSTRAP>/pydeequ-emr-bootstrap.sh",Name='install_pydeequ' \
    --visible-to-all-users \
    --enable-debugging \
    --ec2-attributes KeyName="<MY_SSH_KEY>",SubnetId="<MY_SUBNET>" \
    --auto-scaling-role EMR_AutoScaling_DefaultRole

  1. Create your PySpark PyDeequ run script and upload into Amazon S3. The following code is our example of pydeequ-test.py:
    import sys
    import pydeequ
    from pydeequ.checks import *
    from pydeequ.verification import *
    from pyspark.sql import SparkSession, Row
    
    if __name__ == "__main__":
    
        with SparkSession.builder.appName("pydeequ").getOrCreate() as spark:
    
            df = spark.sparkContext.parallelize([
                Row(a="foo", b=1, c=5),
                Row(a="bar", b=2, c=6),
                Row(a="baz", b=3, c=None)]).toDF()
    
            check = Check(spark, CheckLevel.Error, "Integrity checks")
    
            checkResult = VerificationSuite(spark) \
                .onData(df) \
                .addCheck(
                    check.hasSize(lambda x: x >= 3) \
                    .hasMin("b", lambda x: x == 0) \
                    .isComplete("c")  \
                    .isUnique("a")  \
                    .isContainedIn("a", ["foo", "bar", "baz"]) \
                    .isNonNegative("b")) \
                .run()
    
            checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
            checkResult_df.repartition(1).write.csv("s3a://<PATH_TO_OUTPUT>/pydeequ-out.csv", sep='|')

  1. When your cluster is running and in the WAITING stage, submit your Spark job to Amazon EMR via the AWS CLI:
    $ aws emr add-steps \
    --cluster-id <MY_CLUSTER_ID> \
    --steps Type=Spark,Name="pydeequ-spark-submit",Args=[--deploy-mode,cluster,--master,yarn,--conf,spark.yarn.submit.waitAppCompletion=true,--packages,com.amazon.deequ:deequ:1.0.3,--exclude-packages,net.sourceforge.f2j:arpack_combined_all,s3a://pydeequ-emr/setup/pydeequ-test.py],ActionOnFailure=CANCEL_AND_WAIT

Congratulations, you have now submitted a PyDeequ PySpark job to Amazon EMR. Give the job a few minutes to run, after which you can view your results at the S3 output path specified on the last line of pydeequ-test.py.

Afterwards, remember to clean up your results and spin down the EMR cluster using the following command:

$ aws emr terminate-clusters --cluster-ids <MY_CLUSTER_ID>

Now you can use Amazon EMR to process large datasets in batch using PyDeequ to plug into your pipelines and provide scalable tests on your data.

More examples on GitHub

You can find examples of more advanced features on the Deequ GitHub page:

  • Deequ provides more than data quality checks with fixed thresholds. Learn how to use anomaly detection on data quality metrics to apply tests on metrics that change over time.
  • Deequ offers support for storing and loading metrics. Learn how to use the MetricsRepository for this use case; a minimal PyDeequ sketch follows this list.
  • If your dataset grows over time or is partitioned, you can use Deequ's incremental metrics computation. For each partition, Deequ stores a state for each computed metric. To compute metrics for the union of partitions, Deequ can use these states to efficiently derive overall metrics without reloading the data.
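
The following is a minimal sketch of persisting analyzer metrics with a file-based MetricsRepository, based on the repository example in the PyDeequ README; exact method names may differ slightly between versions, and spark and df are the session and DataFrame from the earlier examples.

from pydeequ.analyzers import AnalysisRunner, Completeness, Size
from pydeequ.repository import FileSystemMetricsRepository, ResultKey

# Store metrics in a JSON file so later runs can be compared over time
metrics_file = FileSystemMetricsRepository.helper_metrics_file(spark, 'metrics.json')
repository = FileSystemMetricsRepository(spark, metrics_file)
result_key = ResultKey(spark, ResultKey.current_milli_time(), {'dataset': 'electronics_reviews'})

AnalysisRunner(spark) \
    .onData(df) \
    .addAnalyzer(Size()) \
    .addAnalyzer(Completeness("review_id")) \
    .useRepository(repository) \
    .saveOrAppendResult(result_key) \
    .run()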

Conclusion

This post showed you how to use PyDeequ for calculating data quality metrics, verifying data quality metrics, and profiling data to automate the configuration of data quality checks. PyDeequ is available via pip install and on GitHub now for you to build your own data quality management pipeline.

Learn more about the inner workings of Deequ in the VLDB 2018 paper Automating large-scale data quality verification.

Stay tuned for another post demonstrating production workflows on AWS Glue.


About the Authors

Calvin Wang is a Data Scientist at AWS AI/ML. He holds a B.S. in Computer Science from UC Santa Barbara and loves using machine learning to build cool stuff.

 

 

Chris Ghyzel is a Data Engineer for AWS Professional Services. Currently, he is working with customers to integrate machine learning solutions on AWS into their production pipelines.

 

 

 

Veronika Megler, PhD, is Principal Data Scientist for Amazon.com Consumer Packaging. Until recently she was the Principal Data Scientist for AWS Professional Services. She enjoys adapting innovative big data, AI, and ML technologies to help companies solve new problems, and to solve old problems more efficiently and effectively. Her work has lately been focused more heavily on economic impacts of ML models and exploring causality.

Building a serverless data quality and analysis framework with Deequ and AWS Glue

Post Syndicated from Vara Bonthu original https://aws.amazon.com/blogs/big-data/building-a-serverless-data-quality-and-analysis-framework-with-deequ-and-aws-glue/

With ever-increasing amounts of data at their disposal, large organizations struggle to cope with not only the volume but also the quality of the data they manage. Indeed, alongside volume and velocity, veracity is an equally critical issue in data analysis, often seen as a precondition to analyzing data and guaranteeing its value. High-quality data is commonly described as fit for purpose and a fair representation of the real-world constructs it depicts. Ensuring data sources meet these requirements is an arduous task that is best addressed through an automated approach and adequate tooling.

Challenges when running data quality at scale can include choosing the right data quality tools, managing the rules and constraints to apply on top of the data, and taking on the large upfront cost of setting up infrastructure in production.

Deequ, an open-source data quality library developed internally at Amazon, addresses these requirements by defining unit tests for data that it can then scale to datasets with billions of records. It provides multiple features, like automatic constraint suggestions and verification, metrics computation, and data profiling. For more information about how Deequ is used at Amazon, see Test data quality at scale with Deequ.

You need to follow several steps to implement Deequ in production, including building the infrastructure, writing custom AWS Glue jobs, profiling the data, and generating rules before applying them. In this post, we introduce an open-source Data Quality and Analysis Framework (DQAF) that simplifies this process and its orchestration. Built on top of Deequ, this framework makes it easy to create the data quality jobs that you need, manage the associated constraints through a web UI, and run them on the data as you ingest it into your data lake.

Architecture

As illustrated in the following architecture diagram, the DQAF exclusively uses serverless AWS technology. It takes a database and tables in the AWS Glue Data Catalog as inputs to AWS Glue jobs, and outputs various data quality metrics into Amazon Simple Storage Service (Amazon S3). Additionally, it saves time by automatically generating constraints on previously unseen data. The resulting suggestions are stored in Amazon DynamoDB tables and can be reviewed and amended at any point by data owners in the AWS Amplify managed UI. Amplify makes it easy to create, configure, and implement scalable web applications on AWS. The orchestration of these operations is carried out by an AWS Step Functions workflow. The code, artifacts, and an installation guide are available in the GitHub repository.

In this post, we walk through a deployment of the DQAF using some sample data. We assume you have a database in the AWS Glue Data Catalog hosting one or more tables in the same Region where you deploy the framework. We use a legislators database with two tables (persons_json and organizations_json) referencing data about United States legislators. For more information about this database, see Code Example: Joining and Relationalizing Data.

Deploying the solution

Launch the AWS CloudFormation stack (see the installation guide in the GitHub repository) to deploy the solution in your AWS account, in the Region you last used:

The process takes 10–15 minutes to complete. You can verify that the framework was successfully deployed by checking that the CloudFormation stacks show the status CREATE_COMPLETE.

Testing the data quality and analysis framework

The next step is to understand (profile) your test data and set up data quality constraints. Constraints can be defined as a set of rules to validate whether incoming data meets specific requirements along various dimensions (such as completeness, consistency, or contextual accuracy). Creating these rules can be a painful process if you have lots of tables with multiple columns, but DQAF makes it easy by sampling your data and suggesting the constraints automatically.

On the Step Functions console, locate the data-quality-sm state machine, which represents the entry point to data quality in the framework. When you provide a valid input, it starts a series of AWS Glue jobs running Deequ. This step function can be called on demand, on a schedule, or based on an event. You run the state machine by entering a value in JSON format.

First pass and automatic suggestion of constraints

After the step function is triggered, it calls the AWS Glue controller job, which is responsible for determining the data quality checks to perform. Because the submitted tables were never checked before, a first step is to generate data quality constraints on attributes of the data. In Deequ, this is done through an automatic suggestion of constraints, a process where data is profiled and a set of heuristic rules is applied to suggest constraints. It’s particularly useful when dealing with large multi-column datasets. In the framework, this operation is performed by the AWS Glue suggestions job, which logs the constraints into the DataQualitySuggestions DynamoDB table and outputs preliminary quality check results based on those suggestions into Amazon S3 in Parquet file format.

AWS Glue suggestions job

The Deequ suggestions job generates constraints based on three major dimensions:

  • Completeness – Measures the presence of null values, for example isComplete("gender") or isComplete("name")
  • Consistency – Consistency of data types and value ranges, for example .isUnique("id") or isContainedIn("gender", Array("female", "male"))
  • Statistics – Univariate dimensions in the data, for example .hasMax("Salary", "90000") or .hasSize(_>=10)

The following table lists the available constraints that can be manually added in addition to the automatically suggested ones.

Constraints | Argument | Semantics

Dimension Completeness
isComplete | column | Check that there are no missing values in a column
hasCompleteness | column, udf | Custom validation of missing values in a column

Dimension Consistency
isUnique | column | Check that there are no duplicates in a column
hasUniqueness | column, udf | Custom validation of the unique value ratio in a column
hasDistinctness | column, udf | Custom validation of the unique row ratio in a column
isInRange | column, value range | Validation of the fraction of values that are in a valid range
hasConsistentType | column | Validation of the largest fraction of values that have the same type
isNonNegative | column | Validation whether all the values in a numeric column are non-negative
isLessThan | column pair | Validation whether all the values in the first column are always less than the second column
satisfies | predicate | Validation whether all the rows match predicate
satisfiesIf | predicate pair | Validation whether all the rows matching first predicate also match the second predicate
hasPredictability | column, column(s), udf | User-defined validation of the predictability of a column

Statistics (can be used to verify dimension consistency)
hasSize | udf | Custom validation of the number of records
hasTypeConsistency | column, udf | Custom validation of the maximum fraction of the values of the same data type
hasCountDistinct | column | Custom validation of the number of distinct non-null values in a column
hasApproxCountDistinct | column, udf | Custom validation of the approximate number of distinct non-null values
hasMin | column, udf | Custom validation of the column's minimum value
hasMax | column, udf | Custom validation of the column's maximum value
hasMean | column, udf | Custom validation of the column's mean value
hasStandardDeviation | column, udf | Custom validation of the column's standard deviation value
hasApproxQuantile | column, quantile, udf | Custom validation of a particular quantile of a column (approximate)
hasEntropy | column, udf | Custom validation of the column's entropy
hasMutualInformation | column pair, udf | Custom validation of the column pair's mutual information
hasHistogramValues | column, udf | Custom validation of the column's histogram
hasCorrelation | column pair, udf | Custom validation of the column pair's correlation

The following screenshot shows the DynamoDB table output with suggested constraints generated by the AWS Glue job.
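
You can also pull these suggestions programmatically, for example with Boto3. The following minimal sketch assumes the DataQualitySuggestions table name used by the framework; check the deployed stack for the exact table name, which may carry a prefix or suffix.

import boto3

dynamodb = boto3.resource("dynamodb")
suggestions_table = dynamodb.Table("DataQualitySuggestions")

# Print every suggested constraint stored by the suggestions job
for item in suggestions_table.scan()["Items"]:
    print(item)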

AWS Glue data profiler job

Deequ also supports single-column profiling of data, and its implementation scales to large datasets with billions of rows. As a result, we get a profile for each column in the data, which allows us to inspect the completeness of the column, the approximate number of distinct values, and the inferred datatype.

The controller triggers an AWS Glue data profiler job in parallel to the suggestions job. This profiler Deequ process runs three passes over the data and avoids any shuffles in order to easily scale to large datasets. Results are stored as Parquet files in the S3 data quality bucket.

When the controller job is complete, the second step in the data quality state machine is to crawl the Amazon S3 output data into a data_quality_db database in the AWS Glue Data Catalog, which is then immediately available to be queried in Amazon Athena. The following screenshot shows the list of tables created by this AWS Glue framework and a sample output from the data profiler results.

Reviewing and verifying data quality constraints

As good as Deequ is at suggesting data quality rules, the data stewards should first review the constraints before applying them in production. Because it may be cumbersome to edit large tables in DynamoDB directly, we have created a web app that enables you to add or amend the constraints. The changes are updated in the relevant DynamoDB tables in the background.

Accessing the web front end

To access the user interface, on the AWS Amplify console, choose the deequ-constraints app. Choosing the URL (listed as https://<env>.<appsync_app_id>.amplifyapp.com) opens the data quality constraints front end. After you complete the registration process with Amazon Cognito (create an account) and sign in, you see a UI similar to the following screenshot.

It lists data quality constraint suggestions produced by the AWS Glue job in the previous step. Data owners can add or remove and enable or disable these constraints at any point via the UI. Suggestions are not enabled by default. This makes sure all constraints are human reviewed before they are processed. Choosing the check box enables a constraint.

Data analyzer (metric computations)

Alongside profiling, Deequ can also generate column-level statistics called data analyzer metrics (such as completeness, maximum, and correlation). They can help uncover data quality problems, for example by highlighting the share of null values in a primary key or the correlation between two columns.

The following table lists the metrics that you can apply to any column.

Metric | Semantics

Dimension Completeness
Completeness | Fraction of non-missing values in a column

Dimension Consistency
Size | Number of records
Compliance | Ratio of rows matching a predicate
Uniqueness | Unique value ratio in a column
Distinctness | Unique row ratio in a column
ValueRange | Value range verification for a column
DataType | Data type inference for a column
Predictability | Predictability of values in a column

Statistics (can be used to verify dimension consistency)
Minimum | Minimal value in a column
Maximum | Maximal value in a column
Mean | Mean value in a column
StandardDeviation | Standard deviation of the value distribution in a column
CountDistinct | Number of distinct values in a column
ApproxCountDistinct | Approximate number of distinct values in a column
ApproxQuantile | Approximate quantile of the values in a column
Correlation | Correlation between two columns
Entropy | Entropy of the value distribution in a column
Histogram | Histogram of an optionally binned column
MutualInformation | Mutual information between two columns

In the web UI, you can add these metrics on the Analyzers tab. In the following screenshot, we add an ApproxCountDistinct metric on an id column. Choosing Create analyzer inserts the record into the DataQualityAnalyzer table in DynamoDB and enables the constraint.

AWS Glue verification job

We're now ready to put our rules into production and can use Athena to look at the results. You can start running the step function with the same JSON as input:

{
  "glueDatabase": "legislators",
  "glueTables": "persons_json, organizations_json"
}
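
The same run can also be started programmatically, which is convenient for scheduling or event-driven triggers. The following is a minimal Boto3 sketch with a placeholder ARN; the state machine name comes from the framework.

import json
import boto3

sfn = boto3.client("stepfunctions")

# Kick off the data quality state machine with the same input
sfn.start_execution(
    stateMachineArn="arn:aws:states:<region>:<account-id>:stateMachine:data-quality-sm",  # placeholder
    input=json.dumps({
        "glueDatabase": "legislators",
        "glueTables": "persons_json, organizations_json",
    }),
)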

This time, the AWS Glue verification job is triggered by the controller. This job performs two actions: it verifies the suggested constraints and performs the metric computations. You can immediately query the results in Athena under the constraints_verification_results table.

The following screenshot shows the verification output.

The following screenshot shows the metric computation results.

Summary

Dealing with large, real-world datasets requires a scalable and automated approach to data quality. Deequ is the tool of choice at Amazon when it comes to measuring the quality of large production datasets. It’s used to compute data quality metrics, suggest and verify constraints, and profile data.

This post introduced an open-source, serverless Data Quality and Analysis Framework that aims to simplify the process of deploying Deequ in production by setting up the necessary infrastructure and making it easy to manage data quality constraints. It enables data owners to generate automated data quality suggestions on previously unseen data that can then be reviewed and amended in a UI. These constraints serve as inputs to various AWS Glue jobs in order to produce data quality results queryable via Athena. Try this framework on your data and leave suggestions on how to improve it on our open-source GitHub repo.


About the Authors

Vara Bonthu is a Senior BigData/DevOps Architect for ProServe working with the Global Accounts team. He is passionate about big data and Kubernetes. He helps customers all over the world design, build, and migrate end-to-end data analytics and container-based solutions. In his spare time, he develops applications to help educate his 7-year-old autistic son.

 

Abdel Jaidi is a Data & Machine Learning Engineer for AWS Professional Services. He works with customers on Data & Analytics projects, helping them shorten their time to value and accelerate business outcomes. In his spare time, he enjoys participating in triathlons and walking dogs in parks in and around London.

7 most common data preparation transformations in AWS Glue DataBrew

Post Syndicated from Shilpa Mohan original https://aws.amazon.com/blogs/big-data/7-most-common-data-preparation-transformations-in-aws-glue-databrew/

For all analytics and ML modeling use cases, data analysts and data scientists spend a bulk of their time running data preparation tasks manually to get clean and formatted data that meets their needs. We ran a survey among data scientists and data analysts to understand the most frequently used transformations in their data preparation workflow. AWS Glue DataBrew provides more than 250 built-in transformations, which make most of these tasks 80% faster. This blog covers use case-based walkthroughs of how to achieve the top 7 of those transformations in AWS Glue DataBrew.

#1 Handling/Imputing missing values

Missing data is predominant in all datasets and can have a significant impact on the analytics or ML models using the data. Missing values in datasets can skew or bias the data and result in invalid conclusions. Handling missing values is one of the most frequently used data preparation steps.

In a DataBrew project, you can get a quick view of missing values in your sample data under Data quality in the Schema view and in the Column statistics.

For any data column, you can choose to either remove the rows with missing values or fill them with an empty string, null, the last valid value, the most frequent value, or a custom value. For numerical columns, you can also fill missing values with aggregates such as the average, mode, sum, or median of the column.

Here's an example of how we filled missing values in the column "sub-product" with the custom value "None" for the Consumer complaints dataset.

#2 Combining datasets

Data analysis is often performed on a single dataset; however, the key information required to arrive at useful insights might be spread across multiple datasets. Joins are a commonly used data preparation step to bring information from different datasets together. In many large data scenarios, the information in a single dataset is split or partitioned into multiple files; Union is the data preparation step used in this case to consolidate all parts of the dataset. DataBrew supports Union and Join to combine data from multiple datasets. You can union multiple files into one at the beginning of a project or as a recipe step, or join a dataset based on one or more join keys.

Multiple files as an input dataset

You can union multiple files together as a single input for any DataBrew project. Here is an example of how we union four files for the NYC Parking tickets dataset.

You can select all files in a folder as an input for your project to union all the data in the files. DataBrew supports parameterized input paths to customize the files you would like to combine.

Union as a transformation

In a project, you can add the union as a recipe step to combine multiple files. You will need to pre-create all the required datasets in DataBrew to perform this as a recipe step. Union is available as a transformation in the project toolbar.

You can select multiple datasets, with preview, for the Union transform. You can then specify whether to map columns by name or position in the dataset, and also order the datasets to control the order of rows in the data after the union.

Detailed column mapping allows you to customize the column mappings, resulting column names, and column types.

You can preview the resulting dataset before applying the transformation, which is then added to your recipe.

Joining datasets

The Join transform allows you to bring in data from a secondary dataset. The example covers joining of UN General Assembly Votes – Resolutions data with UN General Assembly Votes – States data using “resolution session” as the join key.

You can choose from the different Join types, the visual representation helps you identify the right type of join for your scenario. You can add one or more join keys.

The columns list allows you to search the entire list of columns from both the datasets and choose which columns you want to retain as part of the Join operation.

You can also preview your joined table before you complete the transform.

#3 Creating columns

Often, the data available in a dataset might not be represented as the values you need for downstream data analysis or data modeling. As part of data prep, data analysts and data scientists create these custom data columns in a format that suits their data analysis needs.

You can create columns with extracted values or flagged values from an existing column. DataBrew also provides a collection of functions that help you create new columns. It covers math, aggregate, text, date, window, web, and other functions.

For example, in the Netflix Movies and TV Shows dataset, you can create a column that tells you how many years a title has been available on Netflix by using the DATEDIFF function on the "date_added" column.

Create a column using a function

You can select DATEDIFF from the date functions in the toolbar. You can calculate the number of years by calculating the difference between the "date_added" value and the current date. You can set the output to be calculated in years.

As with all transformations, you can preview the transform before applying it.

Creating a Flag column

From the same Netflix Movies and TV Shows dataset, you can create a flag column from the create column option in the toolbar.

You can flag titles that starred Kate Hudson by flagging the custom value "Kate Hudson" in the "Cast" column. You have different options for the values of the flag column; in this case, we go with Yes and No.

#4 Filtering data

You can filter values in a dataset as a transformation, or simply filter the data in your grid view.

If you select “Apply as a step”, the filter is added to your recipe as a step.

You can use "Filter values" to filter the data in the project view. All applied filters are shown in the toolbar. You can choose to apply all the filters as a step from the toolbar at any time.

You can also apply transformations conditionally, only to the filtered values.

For example, in the Netflix Movies and TV Shows dataset, we can replace the word “deformed” with “physically different” but only for Kids related titles.

We can filter the "listed_in" column by "Kid's TV" and "Children & Family Movies". Now, on the "description" column, we can replace values from the Clean menu on the toolbar. On the transformation panel, under "Apply transform to", you can choose to apply the transformation only to the filtered rows. This replaces the term "deformed" with "physically different" only for Kids titles in the entire dataset.

#5 Aggregating data

You can aggregate data in DataBrew by using the Group by transformation. For the example dataset of New York City Airbnb Open Data, we can create an aggregated minimum and maximum price by neighborhood.

You can select the Group By transformation from the toolbar. We first select the column to group by, "Neighborhood". We can then add two aggregated columns based on the column "Price", for the "min" and "max" of the values in the column.

By default, the aggregated columns are added as new columns in the existing dataset. You can instead choose to create a table with only the columns specified above.

#6 Handling Categorical values

For most ML modeling algorithms, categorical values like Gender, Product category, or Education level need to be converted to numerical formats. DataBrew supports Categorical mapping and One-Hot Encoding.

Categorical or label mapping

Ordinal categorical values are ordered or hierarchical, like Education level or T-shirt sizes; for example, Large is greater than Small, so Small can be labeled as 1 and Large as 2 in numerical format. The easiest way to handle such variables is by using Categorical mapping in DataBrew. It can be accessed from the toolbar under Mapping.

For the example dataset of New York City Airbnb Open Data, the "room_type" column has values that are ordered. On the Categorical mapping form, you can choose "Map all values" under mapping options. You can then select the checkbox "Map values to numeric values" or enter custom values as required. Apply, and you have a new column with numerical values mapped to the original column.

One-Hot Encoding 

For all non-ordinal categorical values like gender or product category, One-hot encoding is the most common way to convert them to numerical format. It can be accessed in a DataBrew project in the toolbar under Encoding.

For the “neighborhood_group” column in the New York City Airbnb Open Data, all you have to do is open the one-hot encoding form and choose Apply. All the required columns with the encoded numerical values are generated instantly. This would usually take a data scientist hours to perform manually.
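
Under the hood, one-hot encoding creates one binary column per distinct category. The following minimal pandas sketch shows the equivalent operation; the column name and sample values are assumptions based on the public dataset.

    import pandas as pd

    # Minimal sketch of one-hot encoding; column name and values are
    # assumptions for illustration.
    df = pd.DataFrame({"neighborhood_group": ["Manhattan", "Brooklyn", "Queens", "Brooklyn"]})

    encoded = pd.get_dummies(df, columns=["neighborhood_group"], prefix="ng")
    print(encoded)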

#7 Handling Numerical values

Columns with numerical values are often not streamlined enough to be processed by an ML algorithm. A dataset may have columns with numerical values that are on very different scales; for example, capacity might range from 0 to 100 while price ranges from 10 to 10,000. In such cases, the columns need to be rescaled to a common scale, such as 0 to 1. Along with scaling issues, the data may also contain outliers that need to be handled, so scaling, normalizing, and standardizing are common transformations performed on numerical values in datasets used for ML models.

You can handle a column with numerical values like “Price” in New York City Airbnb Open Data using any of the transformations under Scale in the toolbar.

DataBrew provides multiple techniques to rescale your data: Min-max normalization, Scaling between specified values, Mean normalization or standardization, and Z-score normalization or standardization. In this use case, because price has outliers, you can select Z-score normalization or standardization to best scale the values for your ML model.
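
Z-score standardization rescales a column by subtracting its mean and dividing by its standard deviation, which keeps outliers from dominating the scale. The following minimal pandas sketch shows the calculation; the sample values are made up for illustration.

    import pandas as pd

    # Minimal sketch of Z-score standardization on a price-like column with an
    # outlier; the values are made up for illustration.
    df = pd.DataFrame({"price": [40, 80, 120, 10000]})

    df["price_zscore"] = (df["price"] - df["price"].mean()) / df["price"].std()
    print(df)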

These are some of the most frequently used data preparation transformations demonstrated in AWS Glue DataBrew. With more than 250 built-in transformations, you can find one that meets your data preparation use case and reduce the time and effort that goes into cleaning data.

Jump in and try out AWS Glue DataBrew today.

Datasets used in this blog:

Consumer complaints : https://www.consumerfinance.gov/data-research/consumer-complaints/

NYC Parking tickets : https://www.kaggle.com/new-york-city/nyc-parking-tickets

UN General Assembly Votes – Resolutions: Sample dataset available in AWS Glue DataBrew

UN General Assembly Votes – States: Sample dataset available in AWS Glue DataBrew

Netflix Movies and TV Shows : https://www.kaggle.com/shivamb/netflix-shows

New York City Airbnb Open Data : https://www.kaggle.com/dgomonov/new-york-city-airbnb-open-data


About the Authors

Shilpa Mohan is a Sr. UX designer at AWS and leads the design of AWS Glue DataBrew. With over 13 years of experience across multiple enterprise domains, she is currently crafting products for Database, Analytics and AI services for AWS. Shilpa is a passionate creator; she spends her time creating anything from content and photographs to crafts.

 

 

Romi Boimer is a Sr. Software Development Engineer at AWS and a technical lead for AWS Glue DataBrew. She designs and builds solutions that enable customers to efficiently prepare and manage their data. Romi has a passion for aerial arts; in her spare time she enjoys fighting gravity and hanging from fabric.

Dream11’s journey to building their Data Highway on AWS

Post Syndicated from Pradip Thoke original https://aws.amazon.com/blogs/big-data/dream11s-journey-to-building-their-data-highway-on-aws/

This is a guest post co-authored by Pradip Thoke of Dream11. In their own words, “Dream11, the flagship brand of Dream Sports, is India’s biggest fantasy sports platform, with more than 100 million users. We have infused the latest technologies of analytics, machine learning, social networks, and media technologies to enhance our users’ experience. Dream11 is the epitome of the Indian sports technology revolution.”

Since inception, Dream11 has been a data-driven sports technology brand. The systems that power Dream11, including their transactional data warehouse, run on AWS. As Dream11 hosts fantasy sports contests that are joined by millions of Indian sports fans, they have large volumes of transactional data that is organized in a well-defined Amazon Redshift data warehouse. Previously, they were using third-party services to collect, analyze, and build models over user interaction data combined with transactional data. Although this approach was convenient, it presented certain critical issues:

  • The approach wasn’t conducive to 360-degree user analytics. Dream11’s user interactions data wasn’t present on the cloud, where the rest of Dream11’s infrastructure and data were present (AWS, in this case). To get a complete picture of a user’s experience and journey, the user’s interaction data (client events) needs to be analyzed alongside their transactional data (server events). This is known as 360-degree user analytics.
  • It wasn’t possible to get accurate user journey funnel reports. Currently, there are limitations with every tool available on the market with respect to identifying and mapping a given user’s actions across multiple platforms (on the web, iOS, or Android), as well as multiple related apps. This use case is specifically important if your company is a parent to other companies.
  • The statistics on user behavior that Dream11 was getting weren’t as accurate as they wanted. Some of the popular services they were using for web & mobile analytics use the technique of sampling to be able to deal with high volumes of data. Although this is a well-regarded technique to deal with high volumes of data and provides reasonable accuracy in multiple cases, Dream11 wanted statistics to be as accurate as possible.
  • The analytics wasn’t real-time. Dream11 experiences intense use by their users just before and during the real-life sports matches, so real-time and near-real-time analytics is very critical for them. This need wasn’t sufficiently met by the plethora of services they were using.
  • Their approach was leading to high cost for custom analytics for Dream11’s user interactions data, consisting of hundreds of event types. Serverless query engines typically charge by the amount of data scanned and so it can get very expensive if events data isn’t organized properly in separate tables in a data lake to enable selective access.

All these concerns and needs led Dream11 to conclude that they needed their own centralized 360-degree analytics platform. Therefore, they embarked on the Data Highway project on AWS.

This project has additional advantages. It is increasingly becoming important to store and process data securely. Having everything in-house can help Dream11 with data security and data privacy objectives. The platform enables 360-degree customer analytics, which further allows Dream11 to do intelligent user segmentation in-house and share only those segments (without exposing underlying transactional or interactions data) with third-party messaging service providers. 

Design goals

Dream11 had the following design goals for the project:

  • The system should be easy to maintain and should be able to handle a very high volume of data, consisting of billions of events and terabytes of data daily.
  • The cost should be low and should be pay-as-you-go.
  • Dream11’s web and mobile developers regularly create new types of events to capture new types of interactions. Whenever they add new types of events, they should be immediately available in the system for analytics, and their statistics should immediately reflect in relevant dashboards and reports without any human intervention.
  • Certain types of statistics (such as concurrency) should be available in real-time and near-real time—within 5 minutes or less.
  • Dream11 should be able to use custom logic to calculate key statistics. The analytics should be accurate—no more sampling.
  • The data for various events should be neatly organized in separate tables and analytics-friendly file formats.
  • Although Dream11 will have a common data lake, they shouldn’t be constrained to use a single analytics engine for all types of analytics. Different types of analytics engines excel for different types of queries.
  • The Product Management team should have access to views they commonly use in their decision-making process, such as funnels and user flow diagrams.
  • The system should be extensible by adding lanes in the system. Lanes allow you to reuse your basic setup without mixing events data for different business units. It also potentially allows you to study user behavior across different apps.
  • The system should be able to build 360-degree user profiles.
  • The system should provide alerting on important changes to key business metrics.
  • Last but not least, the system should be secure and reliable, with a six-nines availability guarantee.

Data Highway architecture

In less than 3 months, Dream11’s data team built a system that met all the aforementioned goals. The following diagram shows the high-level architecture.

For this project, they used the following components:

The rest of this post explains the various design choices and trade-offs made by Dream11’s data engineers.

Event ingestion, segregation, and organization

Dream11 has several hundred event types. These events have common attributes and specific attributes. The following diagram shows the logical structure of these events.

When the front end receives an event, it saves the event’s common attributes into a message and posts it to Kafka_AllEvents_CommonAttributes. This Kafka topic is the source for the following systems:

  • Apache HBase on Amazon EMR – Provides real-time concurrency analytics
  • Apache Druid – Provides near real-time dimensional analytics
  • Amazon Redshift – Provides session analytics

The front end also saves events, as they are, into Kafka_AllEvents_AllAttributes. These events are further picked up by Apache NiFi, which forwards them to their respective topics. Apache NiFi supports data routing, transformation, and system mediation logic using powerful and scalable directed graphs. Data is transformed and published to Kafka by using a combination of RouteOnAttribute and JoltTransformJSON processors (to parse JSON). Apache NiFi essentially reads the event name and posts the event to the Kafka topic with the matching name. If Kafka doesn’t have a topic with that name, it creates a new topic with that name. You can configure your Kafka brokers to auto-create a topic when a message is received for a non-existent topic.
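
The following is a hedged Python sketch of the same routing idea outside NiFi, producing each event to a topic named after the event with kafka-python; the broker address and the event shape are assumptions, and topic auto-creation relies on the broker setting auto.create.topics.enable=true.

    import json

    from kafka import KafkaProducer  # pip install kafka-python

    # Sketch only: broker address and event shape are assumptions.
    producer = KafkaProducer(
        bootstrap_servers=["broker1:9092"],
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )

    def route_event(event: dict) -> None:
        # Route on the event name, mirroring the RouteOnAttribute logic in NiFi.
        # If the topic doesn't exist, the broker auto-creates it when
        # auto.create.topics.enable=true is set.
        producer.send(event["event_name"], event)

    route_event({"event_name": "contest_joined", "user_id": 123})
    producer.flush()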

The following diagram illustrates the Amazon S3 sink connector per Kafka topic.

The following diagram summarizes the overall design of the system for event ingestion, segregation, and organization.

 

Storage, cataloging, ETL, and scheduling

In this section, we discuss how Dream11 updates their AWS Glue Data Catalog, performs extract, transform, and load (ETL) jobs with Amazon EMR Presto, and uses Apache Airflow for schedule management.

Updating the AWS Glue Data Catalog with metadata for the target table

The AWS Glue Data Catalog provides a unified metadata repository across a variety of data sources and data formats. It provides out-of-the-box integration with Amazon S3, Amazon Relational Database Service (Amazon RDS), Amazon Redshift, Amazon Redshift Spectrum, Athena, Amazon EMR, and any application compatible with the Apache Hive metastore. You can create your table definitions one time and query across engines. For more information, see FAQ: Upgrading to the AWS Glue Data Catalog.

Because this Data Catalog is accessible from multiple services that were going to be used for the Data Highway project, Dream11 decided to use it to register all the table definitions.

Registering tables with the AWS Glue Data Catalog is easy. You can use an AWS Glue crawler, which can infer schema from files in Amazon S3 and register a table in the Data Catalog. It works quite well, but Dream11 needed additional actions, such as automatically configuring Kafka Amazon S3 sink connectors. Therefore, they developed two Python-based crawlers.

The first Python-based crawler runs every 2 hours and looks up Kafka topics. If it finds a new topic, it configures a Kafka Amazon S3 connector sink to dump its data to Amazon S3 every 30 minutes in JSON Gzip format. It also registers a table with the AWS Glue Data Catalog so that users can query the JSON data directly, if needed.

The second Python-based crawler runs once a day and registers a corresponding table for each new table created that day to hold flattened data (Parquet, Snappy). It infers schemas and registers tables with the Data Catalog using its Table API. It adds customization needed by the Dream11 team to the metadata. It then creates Amazon EMR Presto ETL jobs to convert JSON, Gzip data to Parquet, Snappy, and registers them with Apache Airflow to run every 24 hours.

ETL with Amazon EMR Presto

Dream11 has a multi-node, long-running, multi-purpose EMR cluster. They decided to run scheduled ETL jobs on it for the Data Highway project.

ETL for an event table involves a simple SELECT FROM -> INSERT INTO command to convert JSON (Gzip) to Parquet (Snappy). Converted data takes up to 70% less space and results in a 10 times improvement in Athena query performance. ETL happens once a day, and tables are partitioned by day.
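
As an illustration, the daily conversion for one event table might look like the following sketch; the schema, table, and column names are hypothetical, and the statement would be submitted to the EMR Presto cluster (for example, with the Presto CLI on the master node).

    # Hypothetical example of the daily SELECT FROM -> INSERT INTO conversion
    # for one event; real schema, table, and column names will differ.
    run_date = "2020-10-01"

    etl_sql = f"""
    INSERT INTO events_parquet.contest_joined
    SELECT user_id, app_version, event_ts, dt
    FROM events_json.contest_joined
    WHERE dt = DATE '{run_date}'
    """

    # For example, run on the EMR master node with:
    #   presto-cli --catalog hive --execute "<etl_sql>"
    print(etl_sql)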

Data received on the Kafka_AllEvents_CommonAttributes topic is loaded into Amazon Redshift. The ETL involves a SELECT FROM -> INSERT INTO statement to convert JSON (Gzip) to CSV, followed by an Amazon Redshift COPY.

Apache Airflow for schedule management

Apache Airflow is an open-source tool for authoring and orchestrating big data workflows. With Apache Airflow, data engineers define directed acyclic graphs (DAGs). DAGs describe how to run a workflow and are written in Python. Workflows are designed as a DAG that groups tasks that run independently. The DAG keeps track of the relationships and dependencies between tasks.

Dream11 uses Apache Airflow to schedule Python scripts and a few hundred ETL jobs on Amazon EMR Presto that convert JSON (Gzip) data for a few hundred event types to Parquet (Snappy) format, and to convert JSON data containing common attributes for all events to CSV before loading it to Amazon Redshift. For more information, see Orchestrate big data workflows with Apache Airflow, Genie, and Amazon EMR: Part 1.
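
The following is a minimal Airflow DAG sketch for one such daily conversion job; the import path assumes Airflow 2.x, and the host name, SQL file path, and schedule are placeholders.

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.bash import BashOperator  # Airflow 2.x import path

    # Sketch only: host name, file path, and schedule are placeholders.
    default_args = {"owner": "data-highway", "retries": 1, "retry_delay": timedelta(minutes=5)}

    with DAG(
        dag_id="contest_joined_json_to_parquet",
        default_args=default_args,
        start_date=datetime(2020, 10, 1),
        schedule_interval="@daily",
        catchup=False,
    ) as dag:
        BashOperator(
            task_id="run_presto_etl",
            bash_command=(
                "ssh hadoop@emr-master.example.internal "
                "'presto-cli --catalog hive -f /home/hadoop/etl/contest_joined.sql'"
            ),
        )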

The following diagram shows the workflow to connect Apache Airflow to Amazon EMR.

The following diagram summarizes the overall design of the system for storage, cataloging, ETL, and scheduling.

Real-time and near-real-time analytics

In this section, we discuss the real-time and near-real-time analytics performed on Dream11’s data.

Concurrency analytics with Apache Druid

Apache Druid is an OLAP-style data store. It computes facts and metrics against various dimensions while data is being loaded. This avoids the need to compute results when a query is run.

Dream11’s web and mobile events are loaded from the Kafka_AllEvents_CommonAttributes topic to Apache Druid with the help of the Apache Druid Kafka indexing service. Dream11 has a dashboard with different granularity levels and dimensions such as app version, org, and other dimensions present in the common event attributes list.

Finding active users with Amazon EMR HBase

Dream11 also needs to identify individual active users at any given time or during a given window. This is required by other downstream teams such as the Data Science team and Digital User Engagement team.

With the help of a Java consumer, they push all events from the Kafka_AllEvents_CommonAttributes topic to HBase on an EMR cluster, with just the required user dimensions. They can query the data in HBase with SQL syntax supported by the Apache Phoenix interface.

Session analytics with Amazon Redshift

Dream11 maintains their transactional data warehouse on a multi-node Amazon Redshift cluster. Amazon Redshift allows them to run complex SQL queries efficiently. Amazon Redshift would have been a natural choice for event analytics for hundreds of event types. However, in Dream11’s case, events data grows very rapidly, and this would be a lot of data in Amazon Redshift. Also, this data loses its value rapidly as time passes (relatively speaking) compared with transactional data. Therefore, they decided to do only session analytics in Amazon Redshift to benefit from its complex SQL query capabilities, and to do analytics for individual events with the help of Athena (which we discuss in the next section).

Data received on Kafka_AllEvents_CommonAttributes is loaded into Amazon S3 every 30 minutes by the associated kafka connector sink. This data is in JSON format with Gzip compression. Every 24 hours, a job runs on Amazon EMR Presto that flattens this data into CSV format. The data is loaded into Amazon Redshift with the COPY command. The data gets loaded first into a staging table. Data in the staging table is aggregated to get sessions data. Amazon Redshift already has transactional data from other tables that, combined now with the session data, allows Dream11 to perform 360-degree user analytics. They can now easily segment users based on their interactions data and transactions data. They can then run campaigns for those users with the help of messaging platforms. 
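
The load into the staging table is a standard Amazon Redshift COPY. The following is a hedged sketch using psycopg2; the cluster endpoint, credentials, table, bucket, and IAM role are placeholders.

    import psycopg2

    # Sketch only: endpoint, credentials, table, bucket, and role are placeholders.
    copy_sql = """
    COPY staging.common_events
    FROM 's3://data-highway-sessions/csv/dt=2020-10-01/'
    IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftCopyRole'
    CSV GZIP IGNOREHEADER 1;
    """

    conn = psycopg2.connect(
        host="redshift-cluster.example.us-east-1.redshift.amazonaws.com",
        port=5439, dbname="analytics", user="etl_user", password="...",
    )
    with conn, conn.cursor() as cur:
        cur.execute(copy_sql)  # COPY runs in a transaction and commits on exit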

Event analytics with Athena

Dream11 uses Athena to analyze the data in Amazon S3. Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run. It made perfect sense to organize data for over hundreds of event tables in Amazon S3 and analyze them with Athena on demand.

With Athena, you’re charged based on the amount of data scanned by each query. You can get significant cost savings and performance gains by compressing, partitioning, or converting your data to a columnar format, because each of those operations reduces the amount of data that Athena needs to scan to run a query. For more information, see Top 10 Performance Tuning Tips for Amazon Athena.

As discussed before, Dream11 has registered hundreds of tables for events data in JSON format, and a similar number of tables for events data in Parquet format, with the AWS Glue Data Catalog. They observed a performance gain of 10 times on conversion of the data format to Parquet, and an 80% savings in space. Data in Amazon S3 can be queried directly through the Athena UI with SQL queries. The other option they use is connecting to Athena using a JDBC driver from Looker and their custom Java UI for the Data Aware project.
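
The following is a minimal boto3 sketch of submitting such a query programmatically; the database, table, columns, and result location are assumptions.

    import boto3

    # Sketch only: database, table, columns, and output location are assumptions.
    athena = boto3.client("athena", region_name="us-east-1")

    response = athena.start_query_execution(
        QueryString="""
            SELECT app_version, COUNT(*) AS events
            FROM contest_joined
            WHERE dt = DATE '2020-10-01'
            GROUP BY app_version
        """,
        QueryExecutionContext={"Database": "events_parquet"},
        ResultConfiguration={"OutputLocation": "s3://data-highway-athena-results/"},
    )
    print(response["QueryExecutionId"])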

Athena helps Dream11 produce funnel analytics and user path analytics reports and visualizations.

 The following diagram summarizes the overall design of the system for real-time and near-real-time analytics and visualization.

 

Conclusion

This architecture has enabled Dream11 to achieve all the design goals they set out with. Results of analytics for real-time requirements are available under millisecond latency, and the system costs 40% less than the previous system. Analytics is performed with all the data without sampling, so results are accurate and reliable. All the data and analytics engines are within Dream11’s AWS account, improving data security and privacy.

As of this writing, the system handles 14 TB of data per day and it has served 80 million requests per minute at peak during Dream11 IPL 2020.

Doing all their analytics in-house on AWS has not just improved speed, accuracy, and data security; it has also enabled newer possibilities. Now Dream11 has a 360-degree view of their users. They can study their users’ progress across multiple platforms – web, Android, and iOS. This new system is enabling novel applications of machine learning, digital user engagement, and social media technologies at Dream11.


About the Authors

Pradip Thoke is an AVP of Data Engineering at Dream11 and leads their Data Engineering team. The team involved in this implementation includes Vikas Gite, Salman Dhariwala, Naincy Suman, Lavanya Pulijala, Ruturaj Bhokre, Dhanraj Gaikwad, Vishal Verma, Hitesh Bansal, Sandesh Shingare, Renu Yadav, Yash Anand, Akshay Rochwani, Alokh P, Sunaim and Nandeesh Bijoor.

 

Girish Patil is a Principal Architect AI, Big Data, India Scale Apps for Amazon.

Setting up automated data quality workflows and alerts using AWS Glue DataBrew and AWS Lambda

Post Syndicated from Romi Boimer original https://aws.amazon.com/blogs/big-data/setting-up-automated-data-quality-workflows-and-alerts-using-aws-glue-databrew-and-aws-lambda/

Proper data management is critical to successful, data-driven decision-making. An increasingly large number of customers are adopting data lakes to realize deeper insights from big data. As part of this, you need clean and trusted data in order to gain insights that lead to improvements in your business. As the saying goes, garbage in is garbage out—the analysis is only as good as the data that drives it.

Organizations today have continuously incoming data that may develop slight changes in schema, quality, or profile over a period of time. To ensure data is always of high quality, we need to consistently profile new data, evaluate that it meets our business rules, alert for problems in the data, and fix any issues. In this post, we leverage AWS Glue DataBrew, a visual data preparation tool that makes it easy to profile and prepare data for analytics and machine learning (ML). We demonstrate how to use DataBrew to publish data quality statistics and build a solution around it to automate data quality alerts.

Overview of solution

In this post, we walk through a solution that sets up a recurring profile job to determine data quality metrics and, using your defined business rules, report on the validity of the data. The following diagram illustrates the architecture.

The steps in this solution are as follows:

  1. Periodically send raw data to Amazon Simple Storage Service (Amazon S3) for storage.
  2. Read the raw data in Amazon S3 and generate a scheduled DataBrew profile job to determine data quality.
  3. Write the DataBrew profile job output to Amazon S3.
  4. Trigger an Amazon EventBridge event after job completion.
  5. Invoke an AWS Lambda function based on the event, which reads the profile output from Amazon S3 and determines whether the output meets data quality business rules.
  6. Publish the results to an Amazon Simple Notification Service (Amazon SNS) topic.
  7. Subscribe email addresses to the SNS topic to inform members of your organization.

Prerequisites

For this walkthrough, you should have the following prerequisites:

Deploying the solution

For a quick start of this solution, you can deploy the provided AWS CloudFormation stack. This creates all the required resources in your account (us-east-1 Region). Follow the rest of this post for a deeper dive into the resources.

  1. Choose Launch Stack:

  1. In Parameters, for Email, enter an email address that can receive notifications.
  2. Scroll to the end of the form and select I acknowledge that AWS CloudFormation might create IAM resources.
  3. Choose Create stack.

It takes a few minutes for the stack creation to complete; you can follow progress on the Events tab.

  1. Check your email inbox and choose Confirm subscription in the email from AWS Notifications.

The default behavior of the deployed stack runs the profile on Sundays. You can start a one-time run from the DataBrew console to try out the end-to-end solution.

Setting up your source data in Amazon S3

In this post, we use an open dataset of New York City Taxi trip record data from The Registry of Open Data on AWS. This dataset represents a collection of CSV files defining trips taken by taxis and for-hire vehicles in New York City. Each record contains the pick-up and drop-off IDs and timestamps, distance, passenger count, tip amount, fare amount, and total amount. For the purpose of illustration, we use a static dataset; in a real-world use case, we would use a dataset that is refreshed at a defined interval.

You can download the sample dataset (us-east-1 Region) and follow the instructions for this solution, or use your own data that gets dumped into your data lake on a recurring basis. We recommend creating all your resources in the same account and Region. If you use the sample dataset, choose us-east-1.

Creating a DataBrew profile job

To get insights into the quality of our data, we run a DataBrew profile job on a recurring basis. This profile provides us with a statistical summary of our dataset, including value distributions, sparseness, cardinality, and type determination.

Connecting a DataBrew dataset

To connect your dataset, complete the following steps:

  1. On the DataBrew console, in the navigation pane, choose Datasets.
  2. Choose Connect new dataset.
  3. Enter a name for the dataset.
  4. For Enter your source from S3, enter the S3 path of your data source. In our case, this is s3://nyc-tlc/misc/.
  5. Select your dataset (for this post, we choose the medallions trips dataset FOIL_medallion_trips_june17.csv).

  1. Scroll to the end of the form and choose Create dataset.

Creating the profile job

You’re now ready to create your profile job.

  1. In the navigation pane, choose Datasets.
  2. On the Datasets page, select the dataset that you created in the previous step. The row in the table should be highlighted.
  3. Choose Run data profile.
  4. Select Create profile job.
  5. For Job output settings, enter an S3 path as destination for the profile results. Make sure to note down the S3 bucket and key, because you use it later in this tutorial.
  6. For Permissions, choose a role that has access to your input and output S3 paths. For details on required permissions, see DataBrew permission documentation.
  7. On the Associate schedule drop-down menu, choose Create new schedule.
  8. For Schedule name, enter a name for the schedule.
  9. For Run frequency, choose a frequency based on the time and rate at which your data is refreshed.
  10. Choose Add.

  1. Choose Create and run job.

The job run on sample data typically takes 2 minutes to complete.

Exploring the data profile

Now that we’ve run our profile job, we can expose insightful characteristics about our dataset. We can also review the results of the profile through the visualizations of the DataBrew console or by reading the raw JSON results in our S3 bucket.

The profile analyzes both at a dataset level and column level granularity. Looking at our column analytics for String columns, we have the following statistics:

  • MissingCount – The number of missing values in the dataset
  • UniqueCount – The number of unique values in the dataset
  • Datatype – The data type of the column
  • CommonValues – The top 100 most common strings and their occurrences
  • Min – The length of the shortest String value
  • Max – The length of the longest String value
  • Mean – The average length of the values
  • Median – The middle value in terms of character count
  • Mode – The most common String value length
  • StandardDeviation – The standard deviation for the lengths of the String values

For numerical columns, we have the following:

  • Min – The minimum value
  • FifthPercentile – The value that represents 5th percentile (5% of values fall below this and 95% fall above)
  • Q1 – The value that represents 25th percentile (25% of values fall below this and 75% fall above)
  • Median – The value that represents 50th percentile (50% of values fall below this and 50% fall above)
  • Q3 – The value that represents 75th percentile (75% of values fall below this and 25% fall above)
  • NinetyFifthPercentile – The value that represents 95th percentile (95% of values fall below this and 5% fall above)
  • Max – The highest value
  • Range – The difference between the highest and lowest values
  • InterquartileRange – The range between the 25th percentile and 75th percentile values
  • StandardDeviation – The standard deviation of the values (measures the variation of values)
  • Kurtosis – The kurtosis of the values (measures the heaviness of the tails in the distribution)
  • Skewness – The skewness of the values (measures symmetry in the distribution)
  • Sum – The sum of the values
  • Mean – The average of the values
  • Variance – The variance of the values (measures divergence from the mean)
  • CommonValues – A list of the most common values in the column and their occurrence count
  • MinimumValues – A list of the 5 minimum values in the list and their occurrence count
  • MaximumValues – A list of the 5 maximum values in the list and their occurrence count
  • MissingCount – The number of missing values
  • UniqueCount – The number of unique values
  • ZerosCount – The number of zeros
  • Datatype – The datatype of the column
  • Min – The minimum value
  • Max – The maximum value
  • Median – The middle value
  • Mean – The average value
  • Mode – The most common value 

Finally, at a dataset level, we have an overview of the profile as well as cross-column analytics:

  • DatasetName – The name of the dataset the profile was run on
  • Size – The size of the data source in KB
  • Source – The source of the dataset (for example, Amazon S3)
  • Location – The location of the data source
  • CreatedBy – The ARN of the user that created the profile job
  • SampleSize – The number of rows used in the profile
  • MissingCount – The total number of missing cells
  • DuplicateRowCount – The number of duplicate rows in the dataset
  • StringColumnsCount – The number of columns that are of String type
  • NumberColumnsCount – The number of columns that are of numeric type
  • BooleanColumnsCount – The number of columns that are of Boolean type
  • MissingWarningCount – The number of warnings on columns due to missing values
  • DuplicateWarningCount – The number of warnings on columns due to duplicate values
  • JobStarted – A timestamp indicating when the job started
  • JobEnded – A timestamp indicating when the job ended
  • Correlations – The statistical relationship between columns

By default, the DataBrew profile is run on a 20,000-row First-N sample of your dataset. If you want to increase the limit and run the profile on your entire dataset, send a request to [email protected].

Creating an SNS topic and subscription

Amazon SNS allows us to deliver messages regarding the quality of our data reliably and at scale. For this post, we create an SNS topic and subscription. The topic provides us with a central communication channel that we can broadcast to when the job completes, and the subscription is then used to receive the messages published to our topic. For our solution, we use an email protocol in the subscription in order to send the profile results to the stakeholders in our organization.

Creating the SNS topic

To create your topic, complete the following steps:

  1. On the Amazon SNS console, in the navigation pane, choose Topics.
  2. Choose Create topic.
  3. For Type, select Standard.
  4. For Name, enter a name for the topic.

  1. Choose Create topic.
  2. Take note of the ARN in the topic details to use later.

Creating the SNS subscription

To create your subscription, complete the following steps:

  1. In the navigation pane, choose Subscriptions.
  2. Choose Create subscription.
  3. For Topic ARN, choose the topic that you created in the previous step.
  4. For Protocol, choose Email.
  5. For Endpoint, enter an email address that can receive notifications.

  1. Choose Create subscription.
  2. Check your email inbox and choose Confirm subscription in the email from AWS Notifications.

Creating a Lambda function for business rule validation

The profile has provided us with an understanding of the characteristics of our data. Now we can create business rules that ensure we’re consistently monitoring the quality our data.

For our sample taxi dataset, we will validate the following:

  • Making sure the pu_loc_id and do_loc_id columns meet a completeness rate of 90%.
  • If more than 10% of the data in those columns is missing, we’ll notify our team that the data needs to be reviewed.

Creating the Lambda function

To create your function, complete the following steps:

  1. On the Lambda console, in the navigation pane, choose Functions.
  2. Choose Create function.
  3. For Function name, enter a name for the function.
  4. For Runtime, choose the language you want to write the function in. If you want to use the code sample provided in this tutorial, choose Python 3.8.

  1. Choose Create function.

Adding a destination to the Lambda function

You now add a destination to your function.

  1. On the Designer page, choose Add destination.
  2. For Condition, select On success.
  3. For Destination type, choose SNS topic.
  4. For Destination, choose the SNS topic from the previous step.

  1. Choose Save.

Authoring the Lambda function

For the function code, enter the following sample code or author your own function that parses the DataBrew profile job JSON and verifies it meets your organization’s business rules.

If you use the sample code, make sure to fill in the values of the required parameters to match your configuration:

  • topicArn – The resource identifier for the SNS topic. You find this on the Amazon SNS console’s topic details page (for example, topicArn = 'arn:aws:sns:us-east-1:012345678901:databrew-profile-topic').
  • profileOutputBucket – The S3 bucket the profile job is set to output to. You can find this on the DataBrew console’s job details page (for example, profileOutputBucket = 'taxi-data').
  • profileOutputPrefix – The S3 key prefix the profile job is set to output to, matching the parameter name used in the code below. You can find this on the DataBrew console’s job details page (for example, profileOutputPrefix = 'profile-out/'). If you’re writing directly to the root of an S3 bucket, keep this as an empty String (profileOutputPrefix = '').
    import json
    import boto3
    
    sns = boto3.client('sns')
    s3 = boto3.client('s3')
    s3Resource = boto3.resource('s3')
    
    # === required parameters ===
    topicArn = 'arn:aws:sns:<YOUR REGION>:<YOUR ACCOUNT ID>:<YOUR TOPIC NAME>'
    profileOutputBucket = '<YOUR S3 BUCKET NAME>'
    profileOutputPrefix = '<YOUR S3 KEY>'
    
    def verify_completeness_rule(bucket, key):
        # completeness threshold set to 10%
        threshold = 0.1
        
        # parse the DataBrew profile
        profileObject = s3.get_object(Bucket = bucket, Key = key)
        profileContent = json.loads(profileObject['Body'].read().decode('utf-8'))
        
        # verify the completeness rule is met on the pu_loc_id and do_loc_id columns
        for column in profileContent['columns']:
            if (column['name'] == 'pu_loc_id' or column['name'] == 'do_loc_id'):
                if ((column['missingValuesCount'] / profileContent['sampleSize']) > threshold):
                    # failed the completeness check
                    return False
    
        # passed the completeness check
        return True
    
    def lambda_handler(event, context):
        jobRunState = event['detail']['state']
        jobName = event['detail']['jobName'] 
        jobRunId = event['detail']['jobRunId'] 
        profileOutputKey = ''
    
        if (jobRunState == 'SUCCEEDED'):
            profileOutputPostfix = jobRunId[3:] + '.json'
    
            bucket = s3Resource.Bucket(profileOutputBucket)
            for object in bucket.objects.filter(Prefix = profileOutputPrefix):
                if (profileOutputPostfix in object.key):
                    profileOutputKey = object.key
            
            if (verify_completeness_rule(profileOutputBucket, profileOutputKey)):
                message = 'Nice! Your profile job ' + jobName + ' met business rules. Head to https://console.aws.amazon.com/databrew/ to view your profile.' 
                subject = 'Profile job ' + jobName + ' met business rules' 
            else:
                message = 'Uh oh! Your profile job ' + jobName + ' did not meet business rules. Head to https://console.aws.amazon.com/databrew to clean your data.'
                subject = 'Profile job ' + jobName + ' did not meet business rules'
        
        else:
            # State is FAILED, STOPPED, or TIMEOUT - intervention required
            message = 'Uh oh! Your profile job ' + jobName + ' is in state ' + jobRunState + '. Check the job details at https://console.aws.amazon.com/databrew#job-details?job=' + jobName
            subject = 'Profile job ' + jobName + ' in state ' + jobRunState
            
        response = sns.publish (
            TargetArn = topicArn,
            Message = message,
            Subject = subject
        )
    
        return {
            'statusCode': 200,
            'body': json.dumps(response)
        }

Updating the Lambda function’s permissions

In this final step of configuring your Lambda function, you update your function’s permissions.

  1. In the Lambda function editor, choose the Permissions tab.
  2. For Execution role, choose the role name to navigate to the AWS Identity and Access Management (IAM) console.
  3. In the Role summary, choose Add inline policy.
  4. For Service, choose S3.
  5. For Actions, under List, choose ListBucket.
  6. For Actions, under Read, choose GetObject.
  7. In the Resources section, for bucket, choose Add ARN.
  8. Enter the bucket name you used for your output data in the create profile job step.
  9. In the modal, choose Add.
  10. For object, choose Add ARN.
  11. For bucket name, enter the bucket name you used for your output data in the create profile job step and append the key (for example, taxi-data/profile-out).
  12. For object name, choose Any. This provides read access to all objects in the chosen path.
  13. In the modal, choose Add.
  14. Choose Review policy.
  15. On the Review policy page, enter a name.
  16. Choose Create policy. 

We return to the Lambda function to add a trigger later, so keep the Lambda service page open in a tab as you continue to the next step, adding an EventBridge rule.

Creating an EventBridge rule for job run completion

EventBridge is a serverless event bus service that we can configure to connect applications. For this post, we configure an EventBridge rule to route DataBrew job completion events to our Lambda function. When our profile job is complete, the event triggers the function to process the results.

Creating the EventBridge rule

To create our rule in EventBridge, complete the following steps:

  1. On the EventBridge console, in the navigation pane, choose Rules.
  2. Choose Create rule.
  3. Enter a name and description for the rule.
  4. In the Define pattern section, select Event pattern.
  5. For Event matching pattern, select Pre-defined pattern by service.
  6. For Service provider, choose AWS.
  7. For Service name, choose AWS Glue DataBrew.
  8. For Event type, choose DataBrew Job State Change.
  9. For Target, choose Lambda function.
  10. For Function, choose the name of the Lambda function you created in the previous step.

  1. Choose Create.
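
If you prefer to create the rule programmatically, the following boto3 sketch mirrors the console choices above; the event pattern values and the Lambda function ARN are assumptions based on those choices, and the invoke permission is granted when you add the rule as a trigger in the next step.

    import json

    import boto3

    # Sketch only: the pattern values and target ARN mirror the console choices
    # above and are assumptions; adjust them for your account.
    events = boto3.client("events", region_name="us-east-1")

    events.put_rule(
        Name="databrew-profile-job-state-change",
        EventPattern=json.dumps({
            "source": ["aws.databrew"],
            "detail-type": ["DataBrew Job State Change"],
        }),
        State="ENABLED",
    )

    events.put_targets(
        Rule="databrew-profile-job-state-change",
        Targets=[{
            "Id": "databrew-profile-validator",
            "Arn": "arn:aws:lambda:us-east-1:123456789012:function:databrew-profile-validator",
        }],
    )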

Adding the EventBridge rule as the Lambda function trigger

To add your rule as the function trigger, complete the following steps:

  1. Navigate back to your Lambda function configuration page from the previous step.
  2. In the Designer, choose Add trigger.
  3. For Trigger configuration, choose EventBridge (CloudWatch Events).
  4. For Rule, choose the EventBridge rule you created in the previous step.

  1. Choose Add.

Testing your system

That’s it! We’ve completed all the steps required for this solution to run periodically. To give it an end-to-end test, we can run our profile job once and wait for the resulting email to get our results.

  1. On the DataBrew console, in the navigation pane, choose Jobs.
  2. On the Profile jobs tab, select the job that you created. The row in the table should be highlighted.
  3. Choose Run job.
  4. In the Run job modal, choose Run job.

A few minutes after the job is complete, you should receive an email notifying you of the results of your business rule validation logic.

Cleaning up

To avoid incurring future charges, delete the resources created during this walkthrough.

Conclusion

In this post, we walked through how to use DataBrew alongside Amazon S3, Lambda, EventBridge, and Amazon SNS to automatically send data quality alerts. We encourage you to extend this solution by customizing the business rule validation to meet your unique business needs.


About the Authors

Romi Boimer is a Sr. Software Development Engineer at AWS and a technical lead for AWS Glue DataBrew. She designs and builds solutions that enable customers to efficiently prepare and manage their data. Romi has a passion for aerial arts; in her spare time she enjoys fighting gravity and hanging from fabric.

 

 

Shilpa Mohan is a Sr. UX designer at AWS and leads the design of AWS Glue DataBrew. With over 13 years of experience across multiple enterprise domains, she is currently crafting products for Database, Analytics and AI services for AWS. Shilpa is a passionate creator; she spends her time creating anything from content and photographs to crafts.

Ingesting Jira data into Amazon S3

Post Syndicated from Vishwa Gupta original https://aws.amazon.com/blogs/big-data/ingesting-jira-data-into-amazon-s3/

Consolidating data from a work management tool like Jira and integrating this data with other data sources like ServiceNow, GitHub, Jenkins, and Time Entry Systems enables end-to-end visibility of different aspects of the software development lifecycle and helps keep your projects on schedule and within budget.

Amazon Simple Storage Service (Amazon S3) is an object storage service that offers industry-leading scalability, performance, security, and data availability. Many of our customers choose to build their data lakes on Amazon S3. They find the flexible, pay-as-you-go, cloud model ideal when dealing with vast amounts of heterogeneous data.

This post discusses some of the use cases for ingesting Jira data into an Amazon S3 data lake, the ingestion data flow, and a conceptional approach to ingesting data. We also provide the relevant Python code.

Use cases

Business use cases for Jira data ingestion range from proactive project monitoring and detection and resolution of project effort and cost variances to identifying non-compliance with SDLC processes. In this section, we provide an inclusive but not exhaustive list of use cases and their benefits.

Cognitive project monitoring

Cognitive project monitoring use cases include the following:

  • Automated analytics – You can prevent and reduce project schedule and budget variances by proactively monitoring metrics by combining data from Jira, GitHub, Jenkins, and Time Entry Systems.
  • Automated status reporting – You can use Amazon SageMaker machine learning (ML) models to derive prescriptive metrics by looking at data across various sources. This could reduce a project manager’s time spent stitching data and generating reports, and provide a holistic view of project-tracking metrics.

Automated project compliance and governance

You can analyze user behavior to detect potentially suspicious patterns by building a baseline of user activity. You create this based on primary data from HR (such as role, location, and work hours) and IT infrastructure (such as an assigned asset’s IP address).

Possible business outcomes include the following:

  • Proactively identify user IDs and passwords being shared with other users
  • Detect insider threats, such as abnormal login times, unauthorized access to Jira, and incorrect access permissions in Jira, GitHub, Jenkins, or Time Entry Systems
  • Identify compromised accounts based on frequent logins from unassigned assets or unusual successive authentications
  • Identify theft of corporate IPs based on unusual printing volume, printing project-related documents, and emailing organization-related documents and code to external accounts

Accelerated migration from Jira to another project management product

You can also use AWS Glue and its Data Catalog metadata to map between two products. This could accelerate your data migration.

Overview of solution

One of the most common approaches to ingest data from Jira into AWS is to create a Python module, which is used in AWS Glue or AWS Lambda. The following diagram shows the high-level approach for an end-to-end solution. In this solution, an AWS Glue development endpoint and its associated SageMaker Jupyter notebook instance are used to create the Jira Python module, providing a notebook experience with interactive testing and debugging. The scope of this post is limited to the following steps:

  • Setting up access for Jira
  • Using the Python model with AWS Lambda or AWS Glue
  • Incrementally pulling changed data from Jira with JQL (Jira Query Language)
  • Ingesting data to the AWS serverless data lake

Ingesting data from Jira into Amazon S3

The Jira server exposes data using REST APIs and open authorization (OAuth) authentication methods. It uses a three-legged OAuth approach (also called the OAuth dance) to acquire access to the resources served by the APIs. For more information about the following steps, see OAuth for REST APIs.

Generating an RSA public/private key pair

Consumer key and consumer secret details are required for interacting with API endpoints. You store the details inside encrypted SSM parameters.

To use macOS or Linux, run the following OpenSSL commands in the terminal (anywhere in the file system):

openssl genrsa -out jira_privatekey.pem 1024
openssl req -newkey rsa:1024 -x509 -key jira_privatekey.pem -out jira_publickey.cer -days 365
openssl pkcs8 -topk8 -nocrypt -in jira_privatekey.pem -out jira_privatekey.pcks8
openssl x509 -pubkey -noout -in jira_publickey.cer > jira_publickey.pem

To use Windows, download OpenSSL and run it using the path to the bin folder. Create a new environment variable named OPENSSL_CONF with the value "path_to"\openssl.cnf. Run the command as admin:

"path_to_openssl"\bin\openssl genrsa -out jira_privatekey.pem 1024
"path_to_openssl"\bin\openssl req -newkey rsa:1024 -x509 -key jira_privatekey.pem -out jira_publickey.cer -days 365
"path_to_openssl"\bin\openssl pkcs8 -topk8 -nocrypt -in jira_privatekey.pem -out jira_privatekey.pcks8
"path_to_openssl"\bin\openssl x509 -pubkey -noout -in jira_publickey.cer > jira_publickey.pem

Configuring a REST API-based consumer in Jira

For full instructions on configuring your REST API-based consumer, see Step 2: Configure your client application as an OAuth consumer in OAuth for REST APIs. Be sure to complete the following steps:

  1. In the Link applications section, select Create incoming link.
  2. For Public key, enter the public key you created earlier.

Performing the OAuth dance

In this step, you go through the process of getting the access token from the resource so the consumer can access the resource.

  1. Create the following parameters in the AWS Systems Manager Parameter Store:
    1. jira_access_private_key – Stores the private key in AWS Systems Manager as a parameter.
    2. jira_access_urls – Stores URLs to access Jira. These URLs are constructed based on display URLs defined in JIRA by adding the additional tags:
      • request_token_url – https://jiratoawss3.atlassian.net/plugins/servlet/oauth/request-token
      • access_token_url – https://jiratoawss3.atlassian.net/plugins/servlet/oauth/access-token
      • authorize_url – https://jiratoawss3.atlassian.net/plugins/servlet/oauth/authorize
      • data_url – https://jiratoawss3.atlassian.net/rest/api/2/search
    3. jira_access_secrets – Stores secrets to access Jira. Initially, only two values are present in this SSM parameter; it’s updated later with access_token. You need the following two parameters to start:
      1. consumer_key
      2. consumer_secret
  2. Download the notebook file and upload it to the SageMaker notebook instance of the AWS Glue development endpoint:
    1. Below are the steps to set up an AWS Glue development endpoint:
      1. On the AWS Glue console, choose Dev endpoints, then choose Add endpoint.
      2. Specify an endpoint name, such as demo-endpoint.
      3. Choose an IAM role with permissions similar to the IAM role that you use to run AWS Glue ETL jobs. For more information, see Create an IAM Role for AWS Glue. Choose Next.
      4. In Networking, leave Skip networking information selected, and choose Next.
      5. In SSH Public Key, enter a public key generated by an SSH key generator program, such as ssh-keygen (do not use an Amazon EC2 key pair). The generated public key is imported into your development endpoint. Save the corresponding private key to later connect to the development endpoint using SSH. Choose Next. For more information, see ssh-keygen in Wikipedia.
    2. When the status of the AWS Glue development endpoint shows Ready, follow the steps to set up a SageMaker notebook within your development endpoint.
    3. Open the notebook and upload the downloaded notebook file.

You now run the following cells.

  1. Install and import dependent modules with the following code:
    !pip install tlslite
    !pip install oauth2
    
    import urllib
    import oauth2 as oauth
    from tlslite.utils import keyfactory
    import json
    import sys
    import os
    import base64
    import boto3
    from boto3.dynamodb.conditions import Key, Attr
    import datetime
    import logging
    import pprint
    import time
    from pytz import timezone
    
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    

  2. Create an SSM client to connect parameters defined in Systems Manager (update the Region if it’s different than us-east-1):
    ssm = boto3.client("ssm", region_name='us-east-1')

  3. Define the signature class to sign the Jira REST API requests:
    class SignatureMethod_RSA_SHA1(oauth.SignatureMethod):
        name = 'RSA-SHA1'
    
        def signing_base(self, request, consumer, token):
            if not hasattr(request, 'normalized_url') or request.normalized_url is None:
                raise ValueError("Base URL for request is not set.")
    
            sig = (
                oauth.escape(request.method),
                oauth.escape(request.normalized_url),
                oauth.escape(request.get_normalized_parameters()),
            )
    
            key = '%s&' % oauth.escape(consumer.secret)
            if token:
                key += oauth.escape(token.secret)
            raw = '&'.join(sig)
            return key, raw
    
        def sign(self, request, consumer, token):
    
            key, raw = self.signing_base(request, consumer, token)
    
            # SSM support to fetch private key
            ssm_param = ssm.get_parameter(Name='jira_access_private_key', WithDecryption=True)
            jira_private_key_str = ssm_param['Parameter']['Value']
    
            privateKeyString = jira_private_key_str.strip()
    
            privatekey = keyfactory.parsePrivateKey(privateKeyString)
    
            # Used encode() to convert to bytes
            signature = privatekey.hashAndSign(raw.encode())
            return base64.b64encode(signature) 
    

  4. Get the consumer_key and consumer_secret from the SSM parameter that you defined earlier:
    jira_secrets = json.loads(ssm.get_parameter(Name='jira_access_secrets_1', WithDecryption=True)['Parameter']['Value'])
    jira_secrets
    
    consumer_key = jira_secrets["consumer_key"]
    consumer_key
    
    consumer_secret = jira_secrets["consumer_secret"]
    consumer_secret

  5. Define the URLs for request_token_url, access_token_url, and authorize_url:
    request_token_url = 'input_here'
    access_token_url = 'input_here'
    authorize_url = 'input_here'
    

    These URLs are defined while setting up the REST API endpoint in Jira. They include the following components:

    • request_token_url – https://jiratoawss3.atlassian.net/plugins/servlet/oauth/request-token
    • access_token_url – https://jiratoawss3.atlassian.net/plugins/servlet/oauth/access-token
    • authorize_url – https://jiratoawss3.atlassian.net/plugins/servlet/oauth/authorize
  6. Generate your request token:
    # Create Consumer using consumer_key and consumer_secret
    consumer = oauth.Consumer(consumer_key, consumer_secret)
    
    # Use Consumer to create oauth client
    client = oauth.Client(consumer)
    
    # Add Signature Method to the client
    client.set_signature_method(SignatureMethod_RSA_SHA1())
    
    # Get response from request token URL using the client
    resp, content = client.request(request_token_url, "POST")
    
    # Convert the content received from previous step into a Dictionary
    request_token = dict(urllib.parse.parse_qsl(content))
    
    # request token has two components oauth_token and oauth_token_secret
    request_token
    

You only need to do this one time every five years (the default setting in Jira).

The following is an example request token value:

b'oauth_token=oFUFV5cqOuoWycnaCXYrkcioHuRw2TbV&oauth_token_secret=CzhMoEsozCV3xFZ179YQoLzRu4DYQHlR'

The following are example values after the URL parse and converted to dict:

  {b'oauth_token': b'oFUFV5cqOuoWycnaCXYrkcioHuRw2TbV',
   b'oauth_token_secret': b'CzhMoEsozCV3xFZ179YQoLzRu4DYQHlR'}
  1. Manually approve the request token by opening the following URL in a browser:
    authorize_url + '?oauth_token=' + request_token[b'oauth_token'].decode()

An example value of the final authorization URL is:

  • https://jiratoawss3.atlassian.net/plugins/servlet/oauth/authorize?oauth_token=wYLlIxmcsnZTHgTy2ZpUmBakqzmqSbww.

When you go to the URL in your output, you see the following screenshot.

  1. Use an approved request token to generate an access token:
    # Create an oauth token using components of request token
    token = oauth.Token(request_token[b'oauth_token'], request_token[b'oauth_token_secret'])
    
    # Use Consumer and token to create oauth client
    client = oauth.Client(consumer, token)
    
    # Add Signature Method to the client
    client.set_signature_method(SignatureMethod_RSA_SHA1())
    
    # Get response from access token URL using the client
    access_token_resp, access_token_content = client.request(access_token_url, "POST")
    
    access_token_content

An example access token is:

b'oauth_token=Ym3UDrs1iYnLUZ1t0TkT1PinfJNN3RLj&oauth_token_secret=FYQfGjLLhbCJg3DXZFaKsE6wsURVfebN&oauth_expires_in=157680000&oauth_session_handle=BulouCOypjssDS3GzeY7Ldi30h0ERWDo&oauth_authorization_expires_in=160272000'

The following are example access token values after URL parse and converted to dict:

  {b'oauth_token': b'Ym3UDrs1iYnLUZ1t0TkT1PinfJNN3RLj',
   b'oauth_token_secret': b'FYQfGjLLhbCJg3DXZFaKsE6wsURVfebN',
   b'oauth_expires_in': b'157680000',
   b'oauth_session_handle': b'BulouCOypjssDS3GzeY7Ldi30h0ERWDo',
   b'oauth_authorization_expires_in': b'160272000'}

oauth_authorization_expires_in states when the token expires (in seconds), which is generally 5 years.

  9. Update the access_token key in the SSM parameter jira_access_secrets with the value of access_token_content (a minimal sketch of this step follows).

This access token is valid for 5 years (the oauth_expires_in key of access_token_content states when the token expires, in seconds). Rotating the access token depends on your organization’s security policy and is out of scope for this post.
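
The following is a minimal boto3 sketch of this update, assuming the jira_access_secrets parameter is a SecureString that holds a JSON document with an access_token key (adjust the parameter name and structure to match your setup):

import json

import boto3

ssm = boto3.client("ssm")

# Read the existing JSON document stored in the jira_access_secrets parameter.
param = ssm.get_parameter(Name="jira_access_secrets", WithDecryption=True)
secrets = json.loads(param["Parameter"]["Value"])

# Update only the access_token key with the raw response from the
# access token request, then write the document back.
secrets["access_token"] = access_token_content.decode()

ssm.put_parameter(
    Name="jira_access_secrets",
    Value=json.dumps(secrets),
    Type="SecureString",
    Overwrite=True,
)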

Using the access token, querying data from Jira, and storing data in Amazon S3

The following are the important points in this step:

  • The Jira REST API returns 50 records at a time, but gives a total record count, which is used to paginate through the result set.
  • The Jira REST API endpoint needs to be updated with JQL filters. JQL allows you to only pick changed records.
  • Data returned from the REST API endpoint is serialized to Amazon S3. The Python code batches the records from Jira pages and commits after every four pages (which is configurable) have been fetched from Jira.
  • JQL is appended to the data_url (defined earlier). When pulling data from Jira, it’s good practice to do a one-time bulk load and get incremental loads by maintaining the last data pull date in an Amazon DynamoDB table. The following screenshot shows an example of tracking dates in DynamoDB by project. Because it’s an hourly batch, last_ingest_date is rounded up to the hour.
  • The key attributes used for constructing the JQL and pulling data from Jira are as follows (see the sketch after this list):
    • project – Loop through the projects from DynamoDB and pull data for one project at a time.
    • updated – Last update date for Jira story or task. Data pull from Jira is based on this date.
      • For bulk loads, updated is less than or equal to the batch run date, rounded up to the hour.
      • For incremental loads, updated is greater than or equal to last_ingest_date from DynamoDB, and less than or equal to the batch run date, rounded up to the hour.
      • To use date in JQL, we need to parse the date before we use it.
    • startAt – Jira generally paginates the results every 50 records. This attribute is used to loop through the complete data. For example, if a project has 500 records and page size is 50 records, this attribute is incremented by page size in every iteration, and it takes 10 iterations to get complete data.
    • maxResults – This is the page size set up in Jira (maximum number of records Jira returns in every API call).
  • Use the provided notebook to perform the OAuth dance. The sample code pulls data from Jira based on the approach you specified earlier. The purpose of this code is to accelerate implementing data ingestion from Jira.
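
The following is a minimal sketch of how the JQL and pagination described above fit together, assuming client is an OAuth client built with the consumer and the access token (the same pattern as the earlier snippets) and data_url is the Jira search REST endpoint defined earlier; the project, dates, and page size are illustrative values only:

import json
import urllib.parse

# Illustrative values; in this solution they come from the DynamoDB
# tracking table and the batch run time, rounded up to the hour.
project = "DEMO"
last_ingest_date = "2021-01-01 00:00"
batch_run_date = "2021-01-02 13:00"
max_results = 50

# Incremental-load JQL: only stories and tasks updated since the last pull.
jql = (
    f'project = "{project}" '
    f'AND updated >= "{last_ingest_date}" '
    f'AND updated <= "{batch_run_date}" '
    "ORDER BY updated ASC"
)

start_at = 0
issues = []
while True:
    url = (
        f"{data_url}?jql={urllib.parse.quote(jql)}"
        f"&startAt={start_at}&maxResults={max_results}"
    )
    resp, content = client.request(url, "GET")
    page = json.loads(content)

    issues.extend(page["issues"])

    # "total" is the overall record count returned by Jira; stop once
    # every page has been fetched.
    start_at += max_results
    if start_at >= page["total"]:
        break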

Cleaning up

To avoid incurring future charges, delete the resources set up as part of this post:

  • AWS Glue Development Endpoint
  • DynamoDB table
  • Systems Manager parameters
  • S3 bucket

Next steps

To extend the usability scope of Jira data in S3 buckets, you can crawl the location to create AWS Glue Data Catalog database tables. Registering the locations with AWS Lake Formation helps simplify permission management and allows you to implement fine-grained access control. You can also use Amazon Athena, Amazon Redshift, Amazon SageMaker, and Amazon QuickSight for data analysis, ML, and reporting services.

Conclusion

This post aims to simplify and accelerate the steps to ingest Jira data into Amazon S3. The solution includes Jira configuration, performing the three-legged OAuth dance, JQL-based attributes for data selection, and Python-based data extraction into Amazon S3.

If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, start a new thread on the AWS Glue forum.


About the Authors

Vishwa Gupta is a Data and ML Engineer with AWS Professional Services Intelligence Practice. He helps customers implement big data and analytics platform and solutions. Outside of work, he enjoys spending time with family, traveling, and playing badminton.

Sreeram Thoom is a Data Architect at Amazon Web Services.

Transform data and create dashboards simply using AWS Glue DataBrew and Amazon QuickSight

Post Syndicated from Prithiviraj Jothikumar original https://aws.amazon.com/blogs/big-data/transform-data-and-create-dashboards-simply-using-aws-glue-databrew-and-amazon-quicksight/

Before you can create visuals and dashboards that convey useful information, you need to transform and prepare the underlying data. The range and complexity of data transformation steps required depends on the visuals you would like in your dashboard. Often, the data transformation process is time-consuming and highly iterative, especially when you are working with large datasets.

In this post, we show you how to apply data transformations on COVID-related tweets (with sentiment scores) using AWS Glue DataBrew. AWS Glue DataBrew’s visual and interactive interface allows us to apply data transformations without any coding. Some examples of transformations we apply are changing date formats, transforming text strings, and performing table pivots. We then use Amazon QuickSight to visualize the transformed data in a dashboard.

See the AWS architecture diagram below for an overview of the complete and serverless data pipeline.


There are seven main steps in the data pipeline:

  1. Upload CSV file with tweets to S3
  2. Trigger AWS Lambda to partition the dataset if the number of rows exceeds a threshold, and store the output in S3. The threshold is used to prevent a memory exception when computing VADER sentiment scores.
  3. Use Amazon SQS and AWS Lambda to compute VADER sentiment scores for tweets and store the VADER output as parquet files in S3
  4. Crawl the VADER output and produce a Data Catalog using AWS Glue, so that the data can be queried easily using Amazon Athena
  5. Use the Data Catalog connector in AWS Glue DataBrew to create a recipe and perform transformations
  6. Set up a recipe job to perform the transformation and send the output to an S3 location; repeat step 4 to produce a Data Catalog of the transformed data
  7. Query and submit the data to Amazon QuickSight to create visuals for the dashboard

Each section below corresponds to one (or more) of the seven steps described above.

Steps 1 to 3: Setting up and deploying the AWS Lambda pipeline

To test the solution, we can use the AWS CloudFormation template found here. The template automatically creates the following resources for you: the S3 bucket to store the CSV and parquet files, the Lambda function to partition the files, the SQS queue, and the VADER Lambda function.

Deploying the CloudFormation Template

  1. Create an S3 bucket with the name of your choice and note the bucket name. Please refer to this guide for detailed steps on how to create an S3 bucket.
  2. Upload all four individual zip files to the S3 bucket created in step 1. To do so, download the zip file from here, unzip it, and place each of the four individual zip files into the bucket.
  3. Open the AWS Management Console in the AWS Region you want to deploy the solution to, and on the Services menu, choose CloudFormation.
  4. Choose Create Stack, choose Upload a template to Amazon S3, and then choose the file databrew-cloudformation.yaml included in the solution you downloaded earlier (if you haven’t downloaded it yet, use the link provided in this step).
  5. Set Stack name to databrew-stack. Specify the Amazon S3 bucket that contains the compressed version of AWS Lambda function code and layers uploaded in step 2.
  6. For Options, you can specify tags for your stack and an optional IAM role to be used by AWS CloudFormation to create resources. If the role isn’t specified, a new role is created. You can also perform additional configuration for rollback settings and notification options.
  7. The review section shows a recap of the information. Be sure to select the three AWS CloudFormation acknowledgements to allow AWS CloudFormation to create resources with custom names on your behalf. Also, create a change set, because the AWS CloudFormation template includes the AWS::Serverless-2016-10-31 transform.

  8. Click Execute.
  9. The Outputs for the stack lists all the resources created.

Downloading the dataset

  1. Download the dataset from here.

Testing the solution

  1. Navigate to the S3 console.
  2. Click on the bucket name (created by CloudFormation template and listed in the Outputs section of the stack).
  3. Create a folder named input in the S3 bucket.
  4. Upload the downloaded dataset to the folder created in step 3.
  5. The above triggers a Lambda function that chunks the file into smaller files, and then another Lambda function, triggered by an SQS event, performs the following preprocessing steps: removing https links, computing VADER sentiment scores for the four categories (compound, positive, negative, and neutral), and saving the output as parquet files in the S3 path called processedv4 (see the sketch following this list).
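
For reference, the following is a minimal sketch of the per-tweet preprocessing the VADER Lambda function performs, using the open-source vaderSentiment package; the tweet text shown is only an example:

import re

from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

analyzer = SentimentIntensityAnalyzer()

def score_tweet(text):
    # Remove https links before scoring, as described above.
    cleaned = re.sub(r"https\S+", "", text)
    # polarity_scores returns the four categories used in this post:
    # compound, pos, neg, and neu.
    return analyzer.polarity_scores(cleaned)

scores = score_tweet("Stay safe everyone! https://example.com #covid19")
# scores is a dict with the keys 'neg', 'neu', 'pos', and 'compound'.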

Step 4: Setup AWS Glue Data Catalog

An AWS Glue Data Catalog allows us to easily import data into AWS Glue DataBrew. Follow these steps to create an AWS Glue crawler that crawls the raw data with VADER output in partitioned parquet files in S3 and determines the schema:

  1. Choose a crawler name.
  2. Use the default options for Crawler source type.
  3. Provide the S3 location of the parquet files.
  4. Choose/Create an IAM role that has read/write permissions to S3 and the AWSGlueServiceRole policy attached.
  5. Set the frequency as Run on demand.
  6. Choose a database name for the crawler’s output.
  7. Leave all remaining options as default.

Once the crawler has been created, select and run the new crawler. Upon completion, the table schema is generated and visible within Tables under the Databases section in the AWS Glue console.

Step 5: Creating a DataBrew project

To get started with AWS Glue DataBrew, complete the following steps:

  1. On the DataBrew console, choose Projects.
  2. Choose Create project.
  3. For Project Name, enter covid19-tweets.
  4. For Attached recipe, choose Create new recipe.
  5. For Recipe name, enter covid19-tweets-recipe.
  6. For Select a dataset, select New dataset.
  7. For Dataset name, enter covid19-tweets-data.
  8. Import the AWS Glue table <YOUR-GLUE-TABLE-NAME> from the AWS Glue database <YOUR-GLUE-DATABASE-NAME>.
  9. Create a new AWS Identity and Access Management (IAM) policy and IAM role by following the steps on the AWS Glue DataBrew console, which provides DataBrew the necessary permissions to access Amazon S3, Amazon Athena and AWS Glue.
  10. Select Create Project.

Step 6: Data transformations, creating an AWS Glue DataBrew recipe and recipe job

Exploratory data analysis (EDA) is an essential component of the data transformation workflow. EDA allows us to gain an intuitive understanding of the dataset by summarizing its main characteristics such as the distribution of data across columns, the corresponding data types and summary statistics. For more information on EDA using AWS Glue DataBrew, see the Exploratory Data Analysis section of the post Data preprocessing for machine learning on Amazon EMR made easy with AWS Glue DataBrew.

The set of data transformation steps listed below are based on the EDA and the attributes we would like to visualize in the Amazon QuickSight dashboard.

  1. Delete the following columns: user_description, user_created, user_verified, is_retweet, user_followers, user_friends, user_favourites, and user_name, because these columns aren’t needed for the Amazon QuickSight dashboard.

Based on the schema view of the dataset, we can observe that the date column is represented as a string. AWS Glue DataBrew provides various date-time format transformations to convert date types.

  1. From the dropdown, select Date functions, then DATEFORMAT.
    1. Create a new column based on the values in the source column.
    2. For source column, select date.
    3. For date format, select mm/dd/yy*HH:MM.
  2. Delete the date column and rename the date_DATEFORMAT column to date.


A visual representation of the positive, negative, neutral, and compound scores generated by the VADER AWS Lambda function is helpful to understand the sentiment of the tweets over time. Individual scores can vary widely between rows, leading to large fluctuations on the line graph. We can use the rolling window transformation over observations in a column to create a smoother, less fluctuating line graph (a pandas sketch of this computation follows the steps below). To apply the rolling window function, create a less granular version of the date column that is truncated to remove the hours and minutes.

  1. From the dropdown, select Date functions, then DATEFORMAT.
    1. Create a new column based on the values in the source column
    2. For source column, select date
    3. For date format, select mm/dd/yy to transform the observations at a date-level
    4. For destination column, enter date_level.
  2. From the dropdown, select Window functions, then ROLLING AVERAGE.
    1. For source column, select compound
    2. Select 3 rows before and after. This is the size of the rolling window that slides over the column and impacts the smoothness of the average score from one row to another; larger window sizes produce smoother rolling average scores, and vice versa.
    3. Order the observations by the date column because its granularity is at the hour and minute level
    4. Group the observations by the date_level column, which is only represented in date without the time
    5. For the destination column name, enter rolling_compound.

  3. Repeat the previous step for each of the source columns pos, neu, and neg to create rolling_pos, rolling_neu, and rolling_neg.
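
DataBrew performs these steps without any code; purely to illustrate the computation, the following is a minimal pandas sketch under assumed column names (date, date_level, compound) and an assumed local parquet copy of the data:

import pandas as pd

# Assumed local copy of the VADER output with hour/minute-level date,
# date-only date_level, and compound score columns.
df = pd.read_parquet("vader_output.parquet")
df = df.sort_values("date")

# Rolling average of the compound score over 3 rows before and after the
# current row (a centered window of 7), computed within each date_level group.
df["rolling_compound"] = (
    df.groupby("date_level")["compound"]
      .transform(lambda s: s.rolling(window=7, center=True, min_periods=1).mean())
)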

To support downstream visuals in Amazon QuickSight, apply the unpivot transformation to convert the selected columns into row values.

  1. Select Pivot transformation
    1. For pivot type, select Unpivot: Columns to rows
    2. In the unpivot columns dropdown, select the columns rolling_compound, rolling_pos, rolling_neu and rolling_neg
    3. For column name, enter Rolling_category and for column value enter VADER_score
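
For reference, the equivalent of this unpivot in pandas is a melt; the following minimal sketch reuses the illustrative frame from the previous sketch and assumes the rolling columns for pos, neu, and neg have been added the same way:

import pandas as pd

# Unpivot the four rolling-score columns into Rolling_category /
# VADER_score rows, mirroring the DataBrew step above.
long_df = df.melt(
    id_vars=["date", "date_level"],
    value_vars=["rolling_compound", "rolling_pos", "rolling_neu", "rolling_neg"],
    var_name="Rolling_category",
    value_name="VADER_score",
)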

The hashtags column contains an array of hashtags. To visualize the hashtags as a word cloud in Amazon QuickSight, create a new row for each hashtag from the array by applying the unnest transformation.

  1. Select unnest
    1. For source column, select hashtags
    2. For the destination column name, enter unnest_hashtags

Normalize the data in the unnest_hashtags column

  1. Format the data in unnest_hashtags column to lowercase
  2. Remove all special characters by selecting the clean transformation

The data preparation phase is now complete, and the set of 20 transformations, consisting of date formatting, rolling window functions, and column normalization, is combined into a recipe.

The transformations above were applied on a sample of the first 500 rows of the dataset. An AWS Glue DataBrew recipe job provides the ability to scale the set of transformation steps from a sample of the data to the entire dataset.

To create a recipe job, complete the following steps:

  1. On the AWS Glue DataBrew console, choose Jobs.
  2. Choose Create recipe job.
  3. For Job name, enter a name.
  4. Create a new folder in Amazon S3 for the recipe job output, select the file type as GLUEPARQUET and compression as Snappy.

If your dataset is updated on a regular basis, AWS Glue DataBrew provides an option to schedule jobs.

To query the newly transformed data from Amazon S3 in Amazon QuickSight, create another crawler and table in AWS Glue similar to the steps provided earlier (refer to the section Step 4: Setup AWS Glue Data Catalog).

Use Amazon QuickSight to visualize transformed data

Before proceeding, make sure your Amazon QuickSight account has IAM permissions to access Amazon Athena and Amazon S3. Add a new dataset by choosing Datasets in the left panel and then choosing New dataset. Select Athena from the list of data sources and enter a data source name. When prompted, select the database and the table created from the AWS Glue DataBrew transformation output, choose Use custom SQL, change the new custom SQL name to TableOrderedData, and paste the following SQL query before choosing Edit/Preview data:

SELECT * FROM "<db>"."<table_name>" t ORDER BY t.date;

Replace <db> with your database and <table_name> with your table name. Leave the quotes in place.

Click Apply to make some further modifications.

  1. For the date column, change the data type from String to Date and provide the format of the date as it’s presented in the column (i.e., MM/dd/yy HH:mm). Similarly, change the date_level column to Date.
  2. Choose Save and visualize to move on to the next step, which is to analyze the data. In the Analyses section, select Horizontal bar chart under Visual types, then choose vader_score and rolling_category. Under Field wells, change vader_score (Sum) to vader_score (Average) as the Aggregate.
  3. For this visual, select Actions and select Filter same-sheet visuals under Quick create.
  4. Create another panel by selecting Add to create a line graph of the rolling scores. Select Line chart under Visual types and select the following in the presented order: vader_score, date, and rolling_category. Under Field wells, change vader_score (Sum) to vader_score (Average) as the Aggregate. Likewise, change date (Day) to date (Hour) as the Aggregate. Similar to the previous panel, create a quick action item.
  5. Further panels can also be created, such as a word cloud of hashtags or a pivot table consisting of the different VADER categories with score and date. The following image is based on selecting the rolling_pos bar in the first quadrant, which filters and cascades the rolling_pos filter to the rest of the panels.
  6. The first quadrant depicts the overall average of each category across the entire dataset. The second quadrant depicts the rolling average aggregated by the hour. The third quadrant is a table representation of the rolling average aggregated by the day. The word cloud panel is built from the hashtags column.

Summary

In this post, we showed you how to interactively analyze and visualize a dataset using AWS Lambda, AWS Glue DataBrew, and Amazon QuickSight. We began with a COVID-19 tweets dataset and computed their sentiment scores using the VADER algorithm. We then cleaned and prepared the tweets and their sentiment scores in AWS Glue DataBrew, and finally visualized key characteristics of the dataset in Amazon QuickSight.

We encourage you to extend this pipeline to your data analytics + visualization use case – there are many more pre-built transformations in AWS Glue DataBrew and pre-built visuals in Amazon QuickSight to explore.

Happy Building!


About the Authors

Prithiviraj Jothikumar, PhD, is a Data Scientist with AWS Professional Services, where he helps customers build solutions using machine learning. He enjoys watching movies and sports and spending time to meditate.

Kartik Kannapur is a Data Scientist with AWS Professional Services. He holds a Master’s degree in Applied Mathematics and Statistics from Stony Brook University and focuses on using machine learning to solve customer business problems.

Bala Krishnamoorthy is a Data Scientist with AWS Professional Services, where he helps customers solve problems and run machine learning workloads on AWS. He has worked with customers across diverse industries, including software, finance, and healthcare. In his free time, he enjoys spending time outdoors, running with his dog, beating his family and friends at board games and keeping up with the stock market.

Paritosh Walvekar is a Cloud Application Architect with AWS Professional Services, where he helps customer build cloud native applications. He has a Master’s degree in Computer Science from University at Buffalo. In his free time, he enjoys watching movies and is learning to play the piano.

Building an ad-to-order conversion engine with Amazon Kinesis, AWS Glue, and Amazon QuickSight

Post Syndicated from Gandhi Raketla original https://aws.amazon.com/blogs/big-data/building-an-ad-to-order-conversion-engine-with-aws-glue-amazon-kinesis-data-streams-and-amazon-quicksight/

Businesses in ecommerce have the challenge of measuring their ad-to-order conversion ratio for ads or promotional campaigns displayed on a webpage. Tracking the number of users who clicked on a particular promotional ad and the number of users who actually added items to their cart or placed an order helps measure the ad’s effectiveness. Prioritizing promotional ads that have higher conversion rates enables you to make effective use of the limited space on your ecommerce websites and applications.

This post demonstrates how to sessionize and aggregate clickstream and order data, compute the conversion ratio in real time, and generate data visualizations. We use Amazon Kinesis Data Streams to ingest and send data to Amazon Simple Storage Service (Amazon S3), and AWS Glue, Amazon Athena, and Amazon QuickSight to catalog, analyze, and visualize the data, respectively.

Solution overview

To measure ad-to-order conversion, you need two important pieces of data: user clicks and orders. Clickstream data is captured as users navigate through the site: each time a user clicks on the webpage, the click and its associated metadata are recorded. Depending on the user base and the number of active users at any moment, clickstream data can be a large amount of data generated per second. Typically, every ecommerce system has a centralized order management system that captures orders created from different channels, like a web portal or mobile app. To compute an ad-to-order conversion rate, you join clickstream data and order data over time: (total number of orders / total number of clicks) * 100.
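
To make the formula concrete, the following minimal Python sketch joins per-promotion click and order counts and computes the ratio; the counts are illustrative values only:

# Aggregated counts per promotional ad (illustrative values only).
clicks_by_promotion = {"P701": 50, "P601": 80}
orders_by_promotion = {"P701": 5, "P601": 2}

for promotion, clicks in clicks_by_promotion.items():
    orders = orders_by_promotion.get(promotion, 0)
    conversion_ratio = (orders / clicks) * 100 if clicks else 0.0
    print(f"{promotion}: {conversion_ratio:.1f}% ad-to-order conversion")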

The following diagram illustrates the architecture of our solution.

The solution has six main categories.

  • Data generators – Clickstream and order data is generated with the help of an AWS Lambda function. The function is triggered by a scheduled Amazon CloudWatch Events event every minute and generates random clicks for ingestion into a Kinesis data stream. Similarly, another function triggered by a CloudWatch event generates random orders for ingestion into a second data stream. In a production environment, this data comes from clickstream generators and a centralized order management system.
  • Data ingestion – Kinesis data streams ingest clickstream and order data as they are generated.
  • Data sessionization – Data sessionization helps group related data. For clickstream data, we can group clicks on an ad by different users or time periods. For order data, we can group orders by different ads. We use Amazon Kinesis Data Analytics for SQL to analyze streaming data in real time with standard SQL. Sessionized clickstream and order data is ingested into another in-application stream.
  • Data processing and storage – The sessionization stream from Kinesis Data Analytics for SQL is ingested into an Amazon Kinesis Data Firehose delivery stream, which delivers the data to a pre-configured S3 bucket.
  • Data Catalog – You use AWS Glue to crawl the clickstream and orders data in their respective S3 buckets, as well as build metadata definitions and tables in Athena. AWS Glue crawlers run every hour to update table definitions, and Athena views are built to compute the ad-to-order conversion.
  • Data visualization – You use QuickSight to generate visualizations.

Prerequisites

Before getting started, you must provision your resources with AWS CloudFormation. 

  1. Choose Launch Stack.
  2. Choose Next.
  3. For Stack name, enter a name for the stack.
  4. For Bucket Name for Clicks, enter the name of the S3 bucket that holds clickstream data (for this post, click-stream).
  5. For Bucket Name for Orders, enter the name of the S3 bucket that holds order data (order-stream).
  6. Enter any tags you wish to assign to the stack.
  7. Choose Next.
  8. Verify that the stack has been created successfully.

If you have never used QuickSight in this account before, sign up for QuickSight before moving on to the next step. Keep in mind that admin access to the Enterprise Edition QuickSight instance is needed to complete setup. 

Generating and ingesting clickstream data

On the Lambda console, view your function ingest-clickstream for ingesting clickstream data. The clickstream data attributes include UserId, Device, Event, EventType, and Timestamp. The event contains promotional ad information on the webpage clicked by the user. This function generates random clickstream records and ingests them into the data stream ClickStream. The following screenshot shows your function details on the console.

A CloudWatch Events rule invokes this function every minute. The following screenshot shows sample data that was ingested into the data stream. The Event column represents the portion of the webpage the user clicked; every click on the webpage has a unique ID and type assigned (for example, P601 has the event type Promotion, C301 has the event type Checkout).

Generating and ingesting order data

On the AWS Lambda console, view your function ingest-order for ingesting order data. This function ingests random orders.

Each order has order lines, which contain the attributes ItemId, Promotion, UnitPrice, and Quantity (see the following screenshot). The Promotion attribute indicates the ad the user clicked before adding the item to their shopping cart. This function generates random orders and ingests them into OrderStream. The Promotion attribute is used to join clickstream data and order data.

Sessionizing the data

To sessionize the data, complete the following steps:

  1. On the Kinesis Data Analytics console, select <Stack Name>-ClickStreamApplication.
  2. Choose Run.
  3. Repeat the same step for <Stack Name>-OrderAnalysisApp.
  4. When the status changes to Running, choose the application name.
  5. Under Real time analytics, choose Go to SQL results.
  6. Choose the Real-time analytics tab.

The application groups clicks in 1-minute intervals. Let’s take the ad P701 as an example. If this ad is clicked by multiple users, this SQL function adds all the clicks by different users in the last minute. If five users clicked on P701 in the last minute, the function outputs a ClickCount of 5. A stagger window is used because it’s well-suited for analyzing groups of data that arrive at inconsistent times.
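
The aggregation itself is done by the in-application SQL; as a rough Python illustration of the grouping, the following sketch buckets click events into one-minute windows per ad (the event IDs and timestamps are made up for illustration):

from collections import Counter
from datetime import datetime

# Illustrative click events as (event_id, timestamp) pairs.
clicks = [
    ("P701", "2021-01-02 13:00:05"),
    ("P701", "2021-01-02 13:00:42"),
    ("P601", "2021-01-02 13:00:50"),
    ("P701", "2021-01-02 13:01:10"),
]

click_counts = Counter()
for event, ts in clicks:
    # Truncate the timestamp to the minute to form the window key.
    minute = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S").strftime("%Y-%m-%d %H:%M")
    click_counts[(event, minute)] += 1

# ClickCount per ad per one-minute window.
for (event, minute), count in sorted(click_counts.items()):
    print(event, minute, count)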

  1. On the Kinesis Data Analytics console, choose OrderAnalysisApp.
  2. Choose Go to SQL results.
    This application groups orders by Promotion, as shown in the following screenshot.

Processing and storing the data

In the data processing and storage stage, aggregated clickstream and order data is delivered to a Kinesis Data Firehose delivery stream. Kinesis Data Firehose delivers clickstream aggregated records and orders to the click-stream and order-stream buckets, respectively. The data is partitioned by year, month, and day. The following screenshot shows the delivery streams on the console.

Analyzing the data

To analyze your data, complete the following steps:

  1. Verify that the S3 buckets were created for clickstream and orders.

The data in the bucket is partitioned by year, month, date, and hour.

  1. On the AWS Glue console, view the clickstream and orders crawlers.

These two crawlers crawl the click-stream and order-stream buckets every 15 minutes and create tables.

  1. To run the crawlers on demand, choose Run crawler.

When the crawler is finished, the Tables added column displays 1.

  1. In the navigation pane, choose Tables.
  2. Verify that the crawlers created the tables.
  3. On the Athena console, choose Saved queries.

You can see three queries have been created.

  1. Select view_clicks_aggregate to load it in the query editor.
  2. Select ad_to_order_conversion and choose Run Query.

If the Amazon S3 bucket name has -, the crawler replaces - with _ while creating the table.

  1. Replace - with _ in the table name when creating the view.
  2. Repeat the same process for view_orders_aggregate and view_conversion_ratio.

Make sure you run view_clicks_aggregate and view_orders_aggregate before running view_conversion_ratio.

  1. Choose view_conversion_ratio and choose Preview.

Orders and clicks for each promotion and the corresponding conversion ratio are displayed.

Visualizing the data

To visualize your data, you first load it into QuickSight. You can then create visualizations. In this section, we also configure a scheduled data refresh.

Loading the data

To visualize your data, you must first load your data into QuickSight.

  1. On the QuickSight console, from the Admin drop-down menu, choose Manage QuickSight.
  2. In the navigation pane, choose Security & Permissions.
  3. Choose Add or remove.
  4. Select Amazon Athena.
  5. Select Amazon S3 to edit QuickSight access to your S3 buckets.
  6. Choose the Details link next to Amazon S3.
  7. Choose Select S3 buckets.
  8. Select the bucket names you provided for clicks and orders.
  9. Choose Finish.
  10. Choose Update.
  11. Choose the QuickSight icon on the top left of the admin panel to proceed back to the home screen.
  12. In the navigation pane, choose Datasets.
  13. Choose New dataset.
  14. Choose Athena.
  15. For Data source name, enter Ad-To-Order-Conversion.
  16. Choose Validate Connection.
  17. After your connection is validated, choose Create data source.
  18. For Database, choose ad-to-order-conversion.
  19. For Tables, select view_conversion_ratio.
  20. Choose Select.
  21. Choose Visualize.

Creating visualizations

In this section, we create two visualizations of our data. We first make a horizontal bar chart.

  1. From the Add menu, choose Add Calculated Field.
  2. Enter Clicks_to_Orders.
  3. Enter the formula sum(orders)/sum(clicks).
  4. Choose Save.
  5. Choose the options menu next to Clicks_to_Orders.
  6. For Show as, choose Percent.
  7. For Visual type, choose Horizontal bar chart.
  8. Drag promotion to Y-axis.
  9. Drag clicks_to_orders to Value.
  10.  Drag date to Group/Color.

The following screenshot shows our visualization.

We now make our second visualization, a vertical bar chart.

  1. Choose the + icon next to Sheet1.
  2. For Visual types, choose Vertical bar chart.
  3. Drag promotions to Y-axis.
  4. Drag clicks and orders to Value.

This graph displays clicks and orders for each promotion.

  1. Choose Insights on the left panel to see a summary of your insights.

Refreshing the data

We can also set up a scheduled refresh for our data.

  1. Choose Manage Data.
  2. Choose view_conversion_ratio.
  3. Choose Schedule refresh.
  4. Choose Create.
  5. For Repeats, choose Hourly.
  6. Choose Create.

You see a confirmation message that you configured a refresh one time per hour.

Conclusion

In this post, we showed you how to use AWS analytics and storage services to address business challenges that require handling large volumes of data. Kinesis Data Streams and Kinesis Data Analytics let you ingest large volumes of data and sessionize the data. We also showed you how to analyze and visualize the clickstream and order data using AWS Glue, Athena, and QuickSight.


About the Authors

Gandhi Raketla is a Senior Solutions Architect for AWS. He works with AWS customers and partners on cloud adoption, architecting solutions that help customers foster agility and innovation.

Nick Sack is a DevOps Consultant for AWS Professional Services. He is passionate about working with customers and building automated solutions to help customers on their cloud journeys. When not working, Nick enjoys hiking, playing soccer, reading, and learning about technology.