Tag Archives: Amazon EMR

Run Apache Spark 3.0 workloads 1.7 times faster with Amazon EMR runtime for Apache Spark

Post Syndicated from Al MS original https://aws.amazon.com/blogs/big-data/run-apache-spark-3-0-workloads-1-7-times-faster-with-amazon-emr-runtime-for-apache-spark/

With Amazon EMR release 6.1.0, Amazon EMR runtime for Apache Spark is now available for Spark 3.0.0. EMR runtime for Apache Spark is a performance-optimized runtime for Apache Spark that is 100% API compatible with open-source Apache Spark.

In our benchmark performance tests using TPC-DS benchmark queries at 3 TB scale, we found EMR runtime for Apache Spark 3.0 provides a 1.7 times performance improvement on average, and up to 8 times improved performance for individual queries over open-source Apache Spark 3.0.0. With Amazon EMR 6.1.0, you can now run your Apache Spark 3.0 applications faster and cheaper without requiring any changes to your applications.

Results observed using TPC-DS benchmarks

To evaluate the performance improvements, we used TPC-DS benchmark queries with 3 TB scale and ran them on a 6-node c4.8xlarge EMR cluster with data in Amazon Simple Storage Service (Amazon S3). We ran the tests with and without the EMR runtime for Apache Spark. The following two graphs compare the total aggregate runtime and geometric mean for all queries in the TPC-DS 3 TB query dataset between the Amazon EMR releases.

The following table shows the total runtime in seconds.

The following table shows the geometric mean of the runtime in seconds.

In our tests, all queries ran successfully on EMR clusters that used the EMR runtime for Apache Spark. However, when using Spark 3.0 without the EMR runtime, 34 out of the 104 benchmark queries failed due to SPARK-32663. To work around these issues, we disabled the spark.shuffle.readHostLocalDisk configuration. However, even after this change, queries 14a and 14b continued to fail. Therefore, we chose to exclude these queries from our benchmark comparison.
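
As a rough illustration of that workaround, the setting can be turned off per job with a --conf flag. The following sketch only builds the spark-submit argument list; the script name tpcds_query.py is a hypothetical placeholder and not part of the benchmark setup.

```python
from typing import Dict, List


def spark_submit_args(app: str, conf: Dict[str, str]) -> List[str]:
    """Build a spark-submit command line with one --conf flag per setting."""
    args = ["spark-submit"]
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    args.append(app)
    return args


# Disable host-local shuffle reads, the workaround described above.
# "tpcds_query.py" is a hypothetical application script.
cmd = spark_submit_args(
    "tpcds_query.py",
    {"spark.shuffle.readHostLocalDisk": "false"},
)
print(" ".join(cmd))
```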

The per-query speedup on Amazon EMR 6.1 with and without EMR runtime is illustrated in the following chart. The horizontal axis shows each query in the TPC-DS 3 TB benchmark. The vertical axis shows the speedup of each query due to the EMR runtime. We found a 1.7 times performance improvement as measured by the geometric mean of the per-query speedups, with all queries showing a performance improvement with the EMR runtime.
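
To make the metric concrete, the following minimal sketch computes a geometric mean of per-query speedups. The query names and runtimes are made-up values for illustration only, not benchmark results.

```python
import math
from typing import Dict


def geometric_mean_speedup(baseline: Dict[str, float], optimized: Dict[str, float]) -> float:
    """Geometric mean of per-query speedups (baseline runtime / optimized runtime)."""
    speedups = [baseline[q] / optimized[q] for q in baseline]
    # Averaging the logarithms and exponentiating yields the geometric mean.
    return math.exp(sum(math.log(s) for s in speedups) / len(speedups))


# Hypothetical runtimes in seconds, for illustration only.
open_source = {"q1": 40.0, "q2": 90.0, "q3": 16.0}
emr_runtime = {"q1": 20.0, "q2": 90.0, "q3": 2.0}

print(round(geometric_mean_speedup(open_source, emr_runtime), 2))
```

Unlike a ratio of total runtimes, the geometric mean weights every query equally, so a single long-running query doesn't dominate the summary.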

Conclusion

You can run your Apache Spark 3.0 workloads faster and cheaper without making any changes to your applications by using Amazon EMR 6.1. To keep up to date, subscribe to the Big Data blog’s RSS feed to learn about more great Apache Spark optimizations, configuration best practices, and tuning advice.


About the Authors

Al MS is a product manager for Amazon EMR at Amazon Web Services.

Peter Gvozdjak is a senior engineering manager for EMR at Amazon Web Services.

Building complex workflows with Amazon MWAA, AWS Step Functions, AWS Glue, and Amazon EMR

Post Syndicated from Dipankar Ghosal original https://aws.amazon.com/blogs/big-data/building-complex-workflows-with-amazon-mwaa-aws-step-functions-aws-glue-and-amazon-emr/

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a fully managed service that makes it easy to run open-source versions of Apache Airflow on AWS and build workflows to run your extract, transform, and load (ETL) jobs and data pipelines.

You can use AWS Step Functions as a serverless function orchestrator to build scalable big data pipelines using services such as Amazon EMR to run Apache Spark and other open-source applications on AWS in a cost-effective manner. You can also use AWS Glue as a serverless environment to prepare (extract and transform) and load large datasets from a variety of sources for analytics and data processing with Apache Spark ETL jobs.

For production pipelines, a common use case is to read data originating from a variety of sources. This data requires transformation to extract business value and generate insights before sending to downstream applications, such as machine learning algorithms, analytics dashboards, and business reports.

This post demonstrates how to use Amazon MWAA as a primary workflow management service to create and run complex workflows and extend the directed acyclic graph (DAG) to start and monitor a state machine created using Step Functions. In Airflow, a DAG is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies.

Architectural overview

The following diagram illustrates the architectural overview of the components involved in the orchestration of the workflow. This workflow uses Amazon EMR to preprocess data and starts a Step Functions state machine. The state machine transforms data using AWS Glue.

The workflow includes the following core components:

  1. Airflow Scheduler triggers the DAG based on a schedule or manually.
  2. DAG uses PythonOperator to create an EMR cluster and waits for the cluster creation process to complete.
  3. DAG uses a custom operator EmrSubmitAndMonitorStepOperator to submit and monitor the Amazon EMR step.
  4. DAG uses PythonOperator to stop the EMR cluster when the preprocessing tasks are complete.
  5. DAG starts a Step Functions state machine and monitors it for completion using PythonOperator.

You can build complex ETL pipelines with Step Functions separately and trigger them from an Airflow DAG.

Prerequisites

Before starting, create an Amazon MWAA environment. If this is your first time using Amazon MWAA, see Introducing Amazon Managed Workflows for Apache Airflow (MWAA).

Take a note of the Amazon Simple Storage Service (Amazon S3) bucket that stores the DAGs. It’s located on the environment details page on the Amazon MWAA console.

Also note the AWS Identity and Access Management (IAM) execution role. This role should be modified to allow MWAA to read and write from your S3 bucket, submit an Amazon EMR step, start a Step Functions state machine, and read from the AWS Systems Manager Parameter Store. The IAM role is available in the Permissions section of the environment details.

The solution references Systems Manager parameters in an AWS CloudFormation template and scripts. For information on adding and removing IAM identity permissions, see Adding and removing IAM identity permissions. A sample IAM policy is also provided in the GitHub repository amazon-mwaa-complex-workflow-using-step-functions.

For this post, we use the MovieLens dataset. We concurrently convert the MovieLens CSV files to Parquet format and save them to Amazon S3 as part of preprocessing.

Setting up the state machine using Step Functions

Our solution extends the ETL pipeline to run a Step Functions state machine from the Airflow DAG. Step Functions lets you build visual workflows that enable fast translation of business requirements into technical requirements. With Step Functions, you can set up dependency management and failure handling using a JSON-based template. A workflow is a series of steps, such as tasks, choices, parallel runs, and timeouts with the output of one step acting as input into the next. For more information about other use cases, see AWS Step Functions Use Cases.

The following diagram shows the ETL process set up through a Step Functions state machine.

In the workflow, the Process Data step runs an AWS Glue job, and the Get Job Status step periodically checks for the job completion. The AWS Glue job reads the input datasets and creates output data for the most popular movies and top-rated movies. After the job is complete, the Run Glue Crawler step runs an AWS Glue crawler to catalog the data. The workflow also allows you to monitor and respond to failures at any stage.
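
The following is a simplified sketch of what such a JSON-based definition could look like, built as a Python dictionary. It is not the state machine from the GitHub repository: the job and crawler names are hypothetical, and the sketch uses the Step Functions .sync integration for AWS Glue in place of the separate Get Job Status polling step.

```python
import json

# Hypothetical names; the real ones are created by the solution's templates.
GLUE_JOB_NAME = "movielens-transform-job"
GLUE_CRAWLER_NAME = "movielens-crawler"

# Any error in a task is routed to a single failure state.
CATCH_ALL = [{"ErrorEquals": ["States.ALL"], "Next": "Handle Failure"}]

state_machine = {
    "Comment": "Sketch of an AWS Glue ETL workflow",
    "StartAt": "Process Data",
    "States": {
        "Process Data": {
            "Type": "Task",
            # The .sync suffix makes Step Functions wait for the job to finish.
            "Resource": "arn:aws:states:::glue:startJobRun.sync",
            "Parameters": {"JobName": GLUE_JOB_NAME},
            "Catch": CATCH_ALL,
            "Next": "Run Glue Crawler",
        },
        "Run Glue Crawler": {
            "Type": "Task",
            "Resource": "arn:aws:states:::aws-sdk:glue:startCrawler",
            "Parameters": {"Name": GLUE_CRAWLER_NAME},
            "Catch": CATCH_ALL,
            "End": True,
        },
        "Handle Failure": {
            "Type": "Fail",
            "Error": "EtlFailed",
            "Cause": "An AWS Glue step failed",
        },
    },
}

definition = json.dumps(state_machine, indent=2)
print(definition)
```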

Creating resources

Create your resources by following the installation instructions provided in the amazon-mwaa-complex-workflow-using-step-functions README.md.

Running the ETL workflow

To run your ETL workflow, complete the following steps:

  1. On the Amazon MWAA console, choose Open Airflow UI.
  2. Locate the mwaa_movielens_demo DAG.
  3. Turn on the DAG.

  4. Select the mwaa_movielens_demo DAG and choose Graph View.

This displays the overall ETL pipeline managed by Airflow.

  5. To view the DAG code, choose Code.

The code for the custom operator can be found in the amazon-mwaa-complex-workflow-using-step-functions GitHub repo. 

  6. From the Airflow UI, select the mwaa_movielens_demo DAG and choose Trigger DAG.
  7. Leave the Optional Configuration JSON box blank.

When the Airflow DAG runs, the first task calls the PythonOperator to create an EMR cluster using Boto3. Boto3 is the AWS SDK for Python. It enables Python developers to create, configure, and manage AWS services, such as Amazon Elastic Compute Cloud (Amazon EC2) and Amazon S3. Boto3 provides an object-oriented API as well as low-level access to AWS services.
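
A callable for such a task might look like the following sketch. It isn't the code from the demo DAG: the release label, instance types, and role names are assumptions, and the EMR client is passed in so the function can be exercised without AWS access.

```python
from typing import Any


def create_emr_cluster(
    emr_client: Any,
    name: str,
    log_uri: str,
    release_label: str = "emr-6.1.0",  # assumed release, adjust as needed
    step_concurrency: int = 3,
) -> str:
    """Start an EMR cluster with Spark installed and return its cluster ID."""
    response = emr_client.run_job_flow(
        Name=name,
        LogUri=log_uri,
        ReleaseLabel=release_label,
        Applications=[{"Name": "Spark"}],
        Instances={
            # Instance types and counts are illustrative assumptions.
            "InstanceGroups": [
                {"Name": "Primary", "InstanceRole": "MASTER",
                 "InstanceType": "m5.xlarge", "InstanceCount": 1},
                {"Name": "Core", "InstanceRole": "CORE",
                 "InstanceType": "m5.xlarge", "InstanceCount": 2},
            ],
            # Keep the cluster alive so later tasks can submit steps.
            "KeepJobFlowAliveWhenNoSteps": True,
        },
        # Allow several EMR steps to run at the same time.
        StepConcurrencyLevel=step_concurrency,
        JobFlowRole="EMR_EC2_DefaultRole",  # assumed default role names
        ServiceRole="EMR_DefaultRole",
    )
    return response["JobFlowId"]
```

In a DAG, you would typically pass boto3.client("emr") as the client and wrap the function in a PythonOperator.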

The second task waits until the EMR cluster is ready and in the Waiting state. As soon as the cluster is ready, the data load task runs, followed by the data preprocessing tasks, which are started in parallel using EmrSubmitAndMonitorStepOperator. Concurrency in the current Airflow DAG is set to 3, which runs three tasks in parallel. You can change the concurrency of Amazon EMR to run multiple Amazon EMR steps in parallel.
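
The submit-and-monitor pattern can be sketched with two Boto3 EMR calls, add_job_flow_steps and describe_step. This is an illustration of the idea rather than the code of EmrSubmitAndMonitorStepOperator; the function name, polling interval, and failure handling are assumptions.

```python
import time
from typing import Any, Callable, List

# Step states that mean the step will not complete successfully.
FAILED_STATES = {"CANCEL_PENDING", "CANCELLED", "FAILED", "INTERRUPTED"}


def submit_and_monitor_step(
    emr_client: Any,
    cluster_id: str,
    name: str,
    args: List[str],
    poll_seconds: float = 15.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Submit one EMR step and block until it completes; return the step ID."""
    step = {
        "Name": name,
        "ActionOnFailure": "CONTINUE",
        # command-runner.jar runs the given command on the primary node.
        "HadoopJarStep": {"Jar": "command-runner.jar", "Args": args},
    }
    response = emr_client.add_job_flow_steps(JobFlowId=cluster_id, Steps=[step])
    step_id = response["StepIds"][0]

    while True:
        described = emr_client.describe_step(ClusterId=cluster_id, StepId=step_id)
        state = described["Step"]["Status"]["State"]
        if state == "COMPLETED":
            return step_id
        if state in FAILED_STATES:
            raise RuntimeError(f"Step {step_id} ended in state {state}")
        sleep(poll_seconds)
```

Because each call blocks until its step finishes, running several such tasks in parallel relies on both the DAG concurrency and the cluster's step concurrency level.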

When the data preprocessing tasks are complete, the EMR cluster is stopped and the DAG starts the Step Functions state machine to initiate data transformation.

The final task in the DAG monitors the completion of the Step Functions state machine.
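
Starting and monitoring a state machine from Python can be sketched with the Boto3 Step Functions calls start_execution and describe_execution. The function name and polling interval below are assumptions, and the client is injected so the sketch can run without AWS access.

```python
import json
import time
from typing import Any, Callable, Dict


def run_state_machine(
    sfn_client: Any,
    state_machine_arn: str,
    payload: Dict[str, Any],
    poll_seconds: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Start an execution and wait for it to succeed; return the execution ARN."""
    started = sfn_client.start_execution(
        stateMachineArn=state_machine_arn,
        input=json.dumps(payload),  # execution input must be a JSON string
    )
    execution_arn = started["executionArn"]

    while True:
        status = sfn_client.describe_execution(executionArn=execution_arn)["status"]
        if status == "SUCCEEDED":
            return execution_arn
        if status != "RUNNING":
            # FAILED, TIMED_OUT, or ABORTED
            raise RuntimeError(f"Execution {execution_arn} ended with status {status}")
        sleep(poll_seconds)
```

With real credentials, the client would be boto3.client("stepfunctions"). Raising an exception on a failed execution lets Airflow mark the task as failed.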

The DAG run should complete in approximately 10 minutes.

Verifying the DAG run

While the DAG is running, you can view the task logs.

  1. From Graph View, select any task and choose View Log.

  2. When the DAG starts the Step Functions state machine, verify the status on the Step Functions console.

  3. You can also monitor ETL process completion from the Airflow UI.

  4. On the Airflow UI, verify the completion from the log entries.

Querying the data

After the successful completion of the Airflow DAG, two tables are created in the AWS Glue Data Catalog. To query the data with Amazon Athena, complete the following steps:

  1. On the Athena console, choose Databases.
  2. Select the mwaa-movielens-demo-db database.

You should see the two tables. If the tables aren’t listed, verify that the AWS Glue crawler run is complete and that the console is showing the correct Region.

  3. Run the following query:
    SELECT * FROM "mwaa-movielens-demo-db"."most_popular_movies" LIMIT 10;

The following screenshot shows the output.
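
If you prefer to submit the same query programmatically, a minimal sketch using the Boto3 Athena call start_query_execution could look like the following. The function name is an assumption, and the output location is an S3 path you would have to supply yourself.

```python
from typing import Any

QUERY = 'SELECT * FROM "mwaa-movielens-demo-db"."most_popular_movies" LIMIT 10;'


def start_athena_query(athena_client: Any, sql: str, database: str, output_location: str) -> str:
    """Submit a query to Athena and return the query execution ID."""
    response = athena_client.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": database},
        # Athena writes result files to this S3 location.
        ResultConfiguration={"OutputLocation": output_location},
    )
    return response["QueryExecutionId"]
```

The returned ID can then be used to check the query status and fetch results once the query has finished.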

Cleaning up

To clean up the resources created as part of our CloudFormation template, delete the mwaa-demo-foundations stack. You can either use the AWS CloudFormation console or the AWS Command Line Interface (AWS CLI).

Conclusion

In this post, we used Amazon MWAA to orchestrate an ETL pipeline on Amazon EMR and AWS Glue with Step Functions. We created an Airflow DAG to demonstrate how to run data processing jobs concurrently and extended the DAG to start a Step Functions state machine to build a complex ETL pipeline. A custom Airflow operator submitted and then monitored the Amazon EMR steps synchronously.

If you have comments or feedback, please leave them in the comments section.


About the Author

Dipankar Ghosal is a Sr Data Architect at Amazon Web Services and is based out of Minneapolis, MN. He has a focus in analytics and enjoys helping customers solve their unique use cases. When he’s not working, he loves going hiking with his wife and daughter.

Introducing Amazon EMR integration with Apache Ranger

Post Syndicated from Varun Rao Bhamidimarri original https://aws.amazon.com/blogs/big-data/introducing-amazon-emr-integration-with-apache-ranger/

Data security is an important pillar in data governance. It includes authentication, authorization, encryption, and audit.

Amazon EMR enables you to set up and run clusters of Amazon Elastic Compute Cloud (Amazon EC2) instances with open-source big data applications like Apache Spark, Apache Hive, Apache Flink, and Presto. You may also want to set up multi-tenant EMR clusters where different users (or teams) can use a shared EMR cluster to run big data analytics workloads. In a multi-tenant cluster, it becomes important to set up mechanisms for authentication (determine who is invoking the application and authenticate the user), authorization (set up who has access to what data), and audit (maintain a log of who accessed what data).

Apache Ranger is an open-source project that provides authorization and audit capabilities for Hadoop and related big data applications like Apache Hive, Apache HBase, and Apache Kafka.

We’re happy to share that starting with Amazon EMR 5.32, we’re including plugins to integrate with Apache Ranger 2.0 that enable authorization and audit capabilities for Apache SparkSQL, Amazon Simple Storage Service (Amazon S3), and Apache Hive.

You can set up a multi-tenant EMR cluster, use Kerberos for user authentication, use Apache Ranger 2.0 (managed separately outside the EMR cluster) for authorization, and configure fine-grained data access policies for databases, tables, columns, and S3 objects. In this post, we explain how you can set up Amazon EMR to use Apache Ranger for data access controls for Apache Spark and Apache Hive workloads on Amazon EMR. We show how you can set up multiple short-running and long-running EMR clusters with a single, centralized Apache Ranger server that maintains data access control policies.

Managed Apache Ranger plugins for PrestoSQL and PrestoDB will soon follow.

You should consider this solution if one or more of the following apply:

  • You have experience setting up and managing an Apache Ranger admin server (it needs to be self-managed)
  • You want to port existing Apache Ranger Hive policies over to Amazon EMR
  • You need to use the database-backed Hive Metastore and can’t use the AWS Glue Data Catalog due to limitations
  • You require authorization support for Apache Spark (SQL, storage, and file access) and Amazon S3
  • You want to store Apache Ranger authorization audits in Amazon CloudWatch, avoiding the need to maintain an Apache Solr infrastructure

With this native integration, you use the Amazon EMR security configuration to specify Apache Ranger details, without the need for custom bootstrap scripts. You can reuse existing Apache Hive Ranger policies, including support for row-level filters and column masking.

The following image shows table and column-level access set up for Apache SparkSQL.

Additionally, SSH users are blocked from getting AWS Identity and Access Management (IAM) permissions tied to the Amazon EMR instance profiles. This disables access to Amazon S3 using tools like the AWS Command Line Interface (AWS CLI).

The following screenshot shows access to Amazon S3 blocked when using the AWS CLI.

The following screenshots show how access to the same Amazon S3 location is set up and used through EMRFS (the default EMR file system implementation for reading and writing files from Amazon S3).

Prerequisites

Before getting started, you must have the following prerequisites:

  • Self-managed Apache Ranger server (2.x only) outside of an EMR cluster
  • TLS mutual authentication enabled between Apache Ranger server and Apache Ranger plugins running on the EMR cluster
  • Additional IAM roles:
    • IAM role for Apache Ranger – Defines privileges that trusted processes have when submitting Spark and Hive jobs
    • IAM role for other AWS services – Defines privileges that end users have when accessing services that aren’t protected by Apache Ranger plugins
  • Updates to the Amazon EC2 EMR role:
  • New Apache Ranger service definitions installed for Apache Spark and Amazon S3
  • Apache Ranger server certificate and private key for plugins uploaded into Secrets Manager
  • A CloudWatch log group for Apache Ranger audits

Architecture overview

The following diagram illustrates the architecture for this solution.

In the architecture, the Amazon EMR secret agent intercepts user requests and vends credentials based on user and resources. The Amazon EMR record server receives requests to access data from Spark, reads data from Amazon S3, and returns filtered data based on Apache Ranger policies.

See Amazon EMR Components to learn more about Amazon EMR Secret Agent and Record Server.

Setting up your resources

In this section, we walk you through setting up your resources manually.

If you want to use CloudFormation scripts to automate the setup, see the section Setting up your architecture with CloudFormation later in this post.

Uploading SSL private keys and certificates to Secrets Manager

Upload the private keys for the Apache Ranger plugins and the SSL certificate of the Apache Ranger server to Secrets Manager. When the EMR cluster starts up, it uses these files to configure the plugins. For reference, see the script create-tls-certs.sh.
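
As an alternative to a shell script, the upload itself can be sketched with the Boto3 Secrets Manager call create_secret. The function and secret names are hypothetical, and this sketch doesn't generate keys or certificates.

```python
from typing import Any


def upload_pem_secret(secrets_client: Any, secret_name: str, pem_text: str) -> str:
    """Store PEM-encoded key or certificate text as a secret and return its ARN."""
    response = secrets_client.create_secret(
        Name=secret_name,
        SecretString=pem_text,
    )
    return response["ARN"]
```

You would read each PEM file into a string, call the function once per secret with boto3.client("secretsmanager"), and record the returned ARNs for the security configuration.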

Setting up an Apache Ranger server

You need to set up a two-way SSL-enabled Apache Ranger server. To set up the server manually, refer to the script install-ranger-admin-server.sh.

Installing Apache Ranger service definitions

In this section, we review installing the Apache Ranger service definitions for Apache Spark and Amazon S3.

Apache Spark

To add a new Apache Ranger service definition, see the following script:

mkdir /tmp/emr-spark-plugin/
cd /tmp/emr-spark-plugin/

# Download the Service definition
wget https://s3.amazonaws.com/elasticmapreduce/ranger/service-definitions/version-2.0/ranger-servicedef-amazon-emr-spark.json

# Download Service implementation jar/class
wget https://s3.amazonaws.com/elasticmapreduce/ranger/service-definitions/version-2.0/ranger-spark-plugin-2.x.jar

# Copy Service implementation jar to Ranger server
export RANGER_HOME=.. # Replace with the Ranger Admin home directory, e.g. /usr/lib/ranger/ranger-2.0.0-admin
mkdir -p $RANGER_HOME/ews/webapp/WEB-INF/classes/ranger-plugins/amazon-emr-spark
mv ranger-spark-plugin-2.x.jar $RANGER_HOME/ews/webapp/WEB-INF/classes/ranger-plugins/amazon-emr-spark

# Add the service definition using the Ranger REST API
curl -u <admin_user_login>:<password_for_ranger_admin_user> -X POST -d @ranger-servicedef-amazon-emr-spark.json \
-H "Accept: application/json" \
-H "Content-Type: application/json" \
-k 'https://<RANGER SERVER ADDRESS>:6182/service/public/v2/api/servicedef'

This script is included in the Apache Ranger server setup script, if you’re deploying resources with the CloudFormation template.

The policy definition is similar to Apache Hive, except that the actions are limited to select only. The following screenshot shows the definition settings.

To change permissions for the user, choose select.

Amazon S3 (via Amazon EMR File System)

Similar to Apache Spark, we have a new Apache Ranger service definition for Amazon S3. See the following script:

mkdir /tmp/emr-emrfs-s3-plugin/
cd /tmp/emr-emrfs-s3-plugin/

# Download the Service definition
wget https://s3.amazonaws.com/elasticmapreduce/ranger/service-definitions/version-2.0/ranger-servicedef-amazon-emr-emrfs.json

# Download Service implementation jar/class
wget https://s3.amazonaws.com/elasticmapreduce/ranger/service-definitions/version-2.0/ranger-emr-emrfs-plugin-2.x.jar

# Copy Service implementation jar to Ranger server
export RANGER_HOME=.. # Replace with the Ranger Admin home directory, e.g. /usr/lib/ranger/ranger-2.0.0-admin
mkdir -p $RANGER_HOME/ews/webapp/WEB-INF/classes/ranger-plugins/amazon-emr-emrfs
# Move the jar downloaded above (the file name must match the wget target)
mv ranger-emr-emrfs-plugin-2.x.jar $RANGER_HOME/ews/webapp/WEB-INF/classes/ranger-plugins/amazon-emr-emrfs

# Add the service definition using the Ranger REST API
curl -u <admin_user_login>:<password_for_ranger_admin_user> -X POST -d @ranger-servicedef-amazon-emr-emrfs.json \
-H "Accept: application/json" \
-H "Content-Type: application/json" \
-k 'https://<RANGER SERVER ADDRESS>:6182/service/public/v2/api/servicedef'

If you’re using the CloudFormation template, this script is included in the Apache Ranger server setup script.

The following screenshot shows the policy details.

You can enable standard Amazon S3 access permissions in this policy.

Importing your existing Apache Hive policies

You can import your existing Apache Hive policies into the Apache Ranger server tied to the EMR cluster. For more information, see User Guide for Import-Export.

The following image shows how to use Apache Ranger’s export and import option.

CloudWatch for Apache Ranger audits

Apache Ranger audits are sent to CloudWatch. You should create a new CloudWatch log group and specify it in the security configuration. See the following code:

aws logs create-log-group --log-group-name /aws/emr/rangeraudit/

You can search audit information using CloudWatch Logs Insights. The following screenshot shows a query.

New Amazon EMR security configuration

The new Amazon EMR security configuration requires the following inputs:

  • IP address of the Apache Ranger server
  • IAM roles for the Apache Ranger plugins running on the EMR cluster and for accessing other AWS services (see the GitHub repo)
  • Secrets Manager name with the Apache Ranger admin server certificate
  • Secrets Manager name with the private key used by the plugins
  • CloudWatch log group name

The following code is an example of using the AWS CLI to create this security configuration:

aws emr create-security-configuration --name MyEMRRangerSecurityConfig --security-configuration \
'{
   "EncryptionConfiguration":{
      "EnableInTransitEncryption":false,
      "EnableAtRestEncryption":false
   },
   "AuthenticationConfiguration":{
      "KerberosConfiguration":{
         "Provider":"ClusterDedicatedKdc",
         "ClusterDedicatedKdcConfiguration":{
            "TicketLifetimeInHours":24
         }
      }
   },
   "AuthorizationConfiguration":{
      "RangerConfiguration":{
         "AdminServerURL":"https://<RANGER ADMIN SERVER IP>:8080",
         "RoleForRangerPluginsARN":"arn:aws:iam::<AWS ACCOUNT ID>:role/<RANGER PLUGIN DATA ACCESS ROLE NAME>",
         "RoleForOtherAWSServicesARN":"arn:aws:iam::<AWS ACCOUNT ID>:role/<USER ACCESS ROLE NAME>",
         "AdminServerSecretARN":"arn:aws:secretsmanager:us-east-1:<AWS ACCOUNT ID>:secret:<SECRET NAME THAT PROVIDES ADMIN SERVERS PUBLIC TLS CERTIFICATE>",
         "RangerPluginConfigurations":[
            {
               "App":"Spark",
               "ClientSecretARN":"arn:aws:secretsmanager:us-east-1:<AWS ACCOUNT ID>:secret:<SECRET NAME THAT PROVIDES SPARK PLUGIN PRIVATE TLS CERTIFICATE>",
               "PolicyRepositoryName":"spark-policy-repository"
            },
            {
               "App":"Hive",
               "ClientSecretARN":"arn:aws:secretsmanager:us-east-1:<AWS ACCOUNT ID>:secret:<SECRET NAME THAT PROVIDES HIVE PLUGIN PRIVATE TLS CERTIFICATE>",
               "PolicyRepositoryName":"hive-policy-repository"
            },
            {
               "App":"EMRFS-S3",
               "ClientSecretARN":"arn:aws:secretsmanager:us-east-1:<AWS ACCOUNT ID>:secret:<SECRET NAME THAT PROVIDES EMRFS S3 PLUGIN PRIVATE TLS CERTIFICATE>",
               "PolicyRepositoryName":"emrfs-policy-repository"
            }
         ],
         "AuditConfiguration":{
            "Destinations":{
               "AmazonCloudWatchLogs":{
                  "CloudWatchLogGroup":"arn:aws:logs:us-east-1:<AWS ACCOUNT ID>:log-group:<LOG GROUP NAME FOR AUDIT EVENTS>"
               }
            }
         }
      }
   }
}'
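
If you create security configurations from Python instead of the AWS CLI, the same JSON document can be assembled as a dictionary and passed to the Boto3 EMR call create_security_configuration. The following sketch mirrors the structure of the preceding example; the function names are assumptions and all ARNs are supplied by the caller.

```python
import json
from typing import Any, Dict

# Policy repository names taken from the CLI example above.
POLICY_REPOSITORIES = {
    "Spark": "spark-policy-repository",
    "Hive": "hive-policy-repository",
    "EMRFS-S3": "emrfs-policy-repository",
}


def ranger_security_config(
    admin_url: str,
    plugin_role_arn: str,
    user_role_arn: str,
    admin_secret_arn: str,
    plugin_secret_arns: Dict[str, str],
    audit_log_group_arn: str,
) -> Dict[str, Any]:
    """Build the security configuration document for a Ranger-enabled cluster."""
    plugins = [
        {
            "App": app,
            "ClientSecretARN": plugin_secret_arns[app],
            "PolicyRepositoryName": repository,
        }
        for app, repository in POLICY_REPOSITORIES.items()
    ]
    return {
        "EncryptionConfiguration": {
            "EnableInTransitEncryption": False,
            "EnableAtRestEncryption": False,
        },
        "AuthenticationConfiguration": {
            "KerberosConfiguration": {
                "Provider": "ClusterDedicatedKdc",
                "ClusterDedicatedKdcConfiguration": {"TicketLifetimeInHours": 24},
            }
        },
        "AuthorizationConfiguration": {
            "RangerConfiguration": {
                "AdminServerURL": admin_url,
                "RoleForRangerPluginsARN": plugin_role_arn,
                "RoleForOtherAWSServicesARN": user_role_arn,
                "AdminServerSecretARN": admin_secret_arn,
                "RangerPluginConfigurations": plugins,
                "AuditConfiguration": {
                    "Destinations": {
                        "AmazonCloudWatchLogs": {"CloudWatchLogGroup": audit_log_group_arn}
                    }
                },
            }
        },
    }


def create_security_configuration(emr_client: Any, name: str, config: Dict[str, Any]) -> str:
    """Register the configuration with Amazon EMR and return its name."""
    response = emr_client.create_security_configuration(
        Name=name,
        SecurityConfiguration=json.dumps(config),  # the API expects a JSON string
    )
    return response["Name"]
```

Building the document in code makes it easier to reuse one template across several clusters that share a central Apache Ranger server.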

Installing an Amazon EMR cluster with Kerberos

Start the cluster by choosing Amazon EMR version 5.32 and this newly created security configuration.

Setting up your architecture with CloudFormation

To help you get started, we added a new GitHub repo with setup instructions. The following diagram shows the logical architecture after the CloudFormation stack is fully deployed. Review the roadmap for future enhancements.

To set up this architecture using CloudFormation, complete the following steps:

  1. Use the create-tls-certs.sh script to upload the SSL keys and certificates to Secrets Manager.
  2. Set up the VPC or Active Directory server by launching the following CloudFormation template.
  3. Verify DHCP options to make sure the domain name servers for the VPC are listed in the right order (LDAP/AD server first, followed by AmazonProvidedDNS).
  4. Set up the Apache Ranger server, Amazon Relational Database Service (Amazon RDS) instance, and EMR cluster by launching the following CloudFormation template.

Limitations

When using this solution, keep in mind the following limitations:

  • As of this writing, Amazon EMR 6.x isn’t supported (only Amazon EMR 5.32+ is supported).
  • Non-Kerberos clusters aren’t supported.
  • Jobs must be submitted through Apache Zeppelin, Hue, Livy, or SSH.
  • Only selected applications can be installed on the Apache Ranger-enabled EMR cluster, such as Hadoop, Tez, and Ganglia. For a full list, see Supported Applications. The cluster creation request is rejected if you choose applications outside this supported list.
  • As of this writing, the SparkSQL plugin doesn’t support column masking and row-level filters.
  • The SparkSQL INSERT INTO and INSERT OVERWRITE overrides aren’t supported.
  • You can’t view audits on the Apache Ranger UI as they’re sent to CloudWatch.
  • The AWS Glue Data Catalog isn’t supported as the Apache Hive Metastore.

Available now

Native support for Apache Ranger 2.0 with Apache Hive, Apache Spark, and Amazon S3 is available today in the following AWS Regions:

  • US East (Ohio)
  • US East (N. Virginia)
  • US West (N. California)
  • US West (Oregon)
  • Africa (Cape Town)
  • Asia Pacific (Hong Kong)
  • Asia Pacific (Mumbai)
  • Asia Pacific (Seoul)
  • Asia Pacific (Singapore)
  • Asia Pacific (Sydney)
  • Canada (Central)
  • Europe (Frankfurt)
  • Europe (Ireland)
  • Europe (London)
  • Europe (Paris)
  • Europe (Milan)
  • Europe (Stockholm)
  • South America (São Paulo)
  • Middle East (Bahrain)

For the latest Region availability, see the Amazon EMR Management Guide.

Conclusion

Amazon EMR 5.32 includes plugins to integrate with Apache Ranger 2.0 that enable authorization and audit capabilities for Apache SparkSQL, Amazon S3, and Apache Hive. This post demonstrates how to set up Amazon EMR to use Apache Ranger for data access controls for Apache Spark and Apache Hive workloads on Amazon EMR. If you have any thoughts or questions, please leave them in the comments.


About the Author

Varun Rao Bhamidimarri is a Sr Manager on the AWS Analytics Specialist Solutions Architect team. His focus is helping customers adopt cloud-enabled analytics solutions to meet their business requirements. Outside of work, he loves spending time with his wife and two kids, staying healthy, meditating, and gardening, which he recently picked up during the lockdown.

Testing data quality at scale with PyDeequ

Post Syndicated from Calvin Wang original https://aws.amazon.com/blogs/big-data/testing-data-quality-at-scale-with-pydeequ/

You generally write unit tests for your code, but do you also test your data? Incoming data quality can make or break your application. Incorrect, missing, or malformed data can have a large impact on production systems. Examples of data quality issues include the following:

  • Missing values can lead to failures in production systems that require non-null values (NullPointerException)
  • Changes in the distribution of data can lead to unexpected outputs of machine learning (ML) models
  • Aggregations of incorrect data can lead to wrong business decisions

In this post, we introduce PyDeequ, an open-source Python wrapper over Deequ (an open-source tool developed and used at Amazon). Deequ is written in Scala, whereas PyDeequ allows you to use its data quality and testing capabilities from Python and PySpark, the language of choice of many data scientists. PyDeequ democratizes and extends the power of Deequ by allowing you to use it alongside the many data science libraries that are available in that language. Furthermore, PyDeequ provides a fluid interface with pandas DataFrames rather than restricting you to Apache Spark DataFrames.

Deequ allows you to calculate data quality metrics on your dataset, define and verify data quality constraints, and be informed about changes in the data distribution. Instead of implementing checks and verification algorithms on your own, you can focus on describing how your data should look. Deequ supports you by suggesting checks for you. Deequ is implemented on top of Apache Spark and is designed to scale with large datasets (billions of rows) that typically live in a data lake, distributed file system, or a data warehouse. PyDeequ gives you access to this capability, but also allows you to use it from the familiar environment of your Python Jupyter notebook.

Deequ at Amazon

Deequ is used internally at Amazon to verify the quality of many large production datasets. Dataset producers can add and edit data quality constraints. The system computes data quality metrics on a regular basis (with every new version of a dataset), verifies constraints defined by dataset producers, and publishes datasets to consumers in case of success. In error cases, dataset publication can be stopped, and producers are notified to take action. Data quality issues don’t propagate to consumer data pipelines, reducing their blast radius.

Deequ is also used within Amazon SageMaker Model Monitor. Now with the availability of PyDeequ, you can use it from a broader set of environments: Amazon SageMaker notebooks, AWS Glue, Amazon EMR, and more.

Overview of PyDeequ

Let’s look at PyDeequ’s main components, and how they relate to Deequ (shown in the following diagram):

  • Metrics computation – Deequ computes data quality metrics, that is, statistics such as completeness, maximum, or correlation. Deequ uses Spark to read from sources such as Amazon Simple Storage Service (Amazon S3) and compute metrics through an optimized set of aggregation queries. You have direct access to the raw metrics computed on the data.
  • Constraint verification – As a user, you focus on defining a set of data quality constraints to be verified. Deequ takes care of deriving the required set of metrics to be computed on the data. Deequ generates a data quality report, which contains the result of the constraint verification.
  • Constraint suggestion – You can choose to define your own custom data quality constraints or use the automated constraint suggestion methods that profile the data to infer useful constraints.
  • Python wrappers – You can call each Deequ function using Python syntax. The wrappers translate the commands to the underlying Deequ calls and return their response.

Use case overview

As a running example, we use a customer review dataset provided by Amazon on Amazon S3. We intentionally follow the example in the post Test data quality at scale with Deequ to show the similarity in functionality and implementation. We begin the way many data science projects do: with initial data exploration and assessment in a Jupyter notebook.

If you’d like to follow along with a live Jupyter notebook, check out the notebook on our GitHub repo.

During the data exploration phase, you want to easily answer some basic questions about the data:

  • Are the fields that are supposed to contain unique values really unique? Are there fields that are missing values?
  • How many distinct categories are there in the categorical fields?
  • Are there correlations between some key features?
  • If there are two supposedly similar datasets (such as different categories or different time periods), are they really similar?

We also show you how to scale this approach to large-scale datasets, using the same code on an Amazon EMR cluster. This is likely how you’d run your ML training, and how you’d run these checks later as you move into a production setting.

Starting a PySpark session in a SageMaker notebook

To follow along with this post, open up a SageMaker notebook instance, clone the PyDeequ GitHub repo on the SageMaker notebook instance, and run the test_data_quality_at_scale.ipynb notebook from the tutorials directory of the PyDeequ repository.

Let’s install our dependencies first in a terminal window:

$ pip install pydeequ

Next, in a cell of our SageMaker notebook, we need to create a PySpark session:

import sagemaker_pyspark
import pydeequ
from pyspark.sql import SparkSession

classpath = ":".join(sagemaker_pyspark.classpath_jars())

spark = (SparkSession
    .builder
    .config("spark.driver.extraClassPath", classpath)
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate())

Loading data

Load the dataset containing reviews for the category Electronics into our Jupyter notebook:

df = spark.read.parquet("s3a://amazon-reviews-pds/parquet/product_category=Electronics/")

After you load the DataFrame, you can run df.printSchema() to view the schema of the dataset:

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)
 |-- year: integer (nullable = true)

Data analysis

Before we define checks on the data, we want to calculate some statistics on the dataset; we call them metrics. As with Deequ, PyDeequ supports a rich set of metrics. For more information, see Test data quality at scale with Deequ or the GitHub repo. In the following example, we use the AnalysisRunner to capture the metrics you’re interested in:

from pydeequ.analyzers import *

analysisResult = AnalysisRunner(spark) \
                    .onData(df) \
                    .addAnalyzer(Size()) \
                    .addAnalyzer(Completeness("review_id")) \
                    .addAnalyzer(ApproxCountDistinct("review_id")) \
                    .addAnalyzer(Mean("star_rating")) \
                    .addAnalyzer(Compliance("top star_rating", "star_rating >= 4.0")) \
                    .addAnalyzer(Correlation("total_votes", "star_rating")) \
                    .addAnalyzer(Correlation("total_votes", "helpful_votes")) \
                    .run()
                    
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()

The following table summarizes our findings.

Name                | Instance                  | Value
ApproxCountDistinct | review_id                 | 3010972
Completeness        | review_id                 | 1
Compliance          | top star_rating           | 0.74941
Correlation         | helpful_votes,total_votes | 0.99365
Correlation         | total_votes,star_rating   | -0.03451
Mean                | star_rating               | 4.03614
Size                | *                         | 3120938

From this, we learn the following:

  • review_id has no missing values and approximately 3,010,972 unique values
  • 74.9% of reviews have a star_rating of 4 or higher
  • total_votes and star_rating are not correlated
  • helpful_votes and total_votes are strongly correlated
  • The average star_rating is 4.0
  • The dataset contains 3,120,938 reviews
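
To make the meaning of these metrics concrete, the following is a minimal plain-Python sketch of how Completeness, Compliance, and Mean can be defined on a toy dataset. The rows are made up, and the functions only illustrate the definitions; they are not how Deequ computes metrics on Spark. Read this way, a Compliance value of 0.74941 means that about 74.9% of rows satisfy the predicate.

```python
# Toy rows standing in for the review dataset (values are made up).
rows = [
    {"review_id": "R1", "star_rating": 5},
    {"review_id": "R2", "star_rating": 4},
    {"review_id": "R3", "star_rating": 2},
    {"review_id": None, "star_rating": 5},
]


def completeness(rows, column):
    """Fraction of rows where the column is not None."""
    return sum(r[column] is not None for r in rows) / len(rows)


def compliance(rows, predicate):
    """Fraction of rows that satisfy the predicate."""
    return sum(1 for r in rows if predicate(r)) / len(rows)


def mean(rows, column):
    """Arithmetic mean over the non-null values of the column."""
    values = [r[column] for r in rows if r[column] is not None]
    return sum(values) / len(values)


print(completeness(rows, "review_id"))                     # 0.75
print(compliance(rows, lambda r: r["star_rating"] >= 4))   # 0.75
print(mean(rows, "star_rating"))                           # 4.0
```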

Defining and running tests for data

After analyzing and understanding the data, we want to verify that the properties we have derived also hold for new versions of the dataset. By defining assertions on the data distribution as part of a data pipeline, we can ensure that every processed dataset is of high quality, and that any application consuming the data can rely on it.

For writing tests on data, we start with the VerificationSuite and add checks on attributes of the data. In this example, we test for the following properties of our data:

  • At least 3 million rows in total
  • review_id is never NULL
  • review_id is unique
  • star_rating has a minimum of 1.0 and maximum of 5.0
  • marketplace only contains US, UK, DE, JP, or FR
  • year does not contain negative values

The following code reflects the preceding statements. For information about all available checks, see the GitHub repo. You can run this directly in the notebook, using the Spark session created earlier:

from pydeequ.checks import *
from pydeequ.verification import *

check = Check(spark, CheckLevel.Warning, "Amazon Electronics Review Check")

checkResult = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(
        check.hasSize(lambda x: x >= 3000000) \
        .hasMin("star_rating", lambda x: x == 1.0) \
        .hasMax("star_rating", lambda x: x == 5.0)  \
        .isComplete("review_id")  \
        .isUnique("review_id")  \
        .isComplete("marketplace")  \
        .isContainedIn("marketplace", ["US", "UK", "DE", "JP", "FR"]) \
        .isNonNegative("year")) \
    .run()
    
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show()

After calling run(), PyDeequ translates your test description into Deequ, which translates it into a series of Spark jobs that are run to compute metrics on the data. Afterwards, it invokes your assertion functions (for example, lambda x: x == 1.0 for the minimum star rating check) on these metrics to see if the constraints hold on the data. The following table summarizes our findings.

Constraint | constraint_status | constraint_message
SizeConstraint(Size(None)) | Success |
MinimumConstraint(Minimum(star_rating,None)) | Success |
MaximumConstraint(Maximum(star_rating,None)) | Success |
CompletenessConstraint(Completeness(review_id,None)) | Success |
UniquenessConstraint(Uniqueness(List(review_id))) | Failure | Value: 0.9926566948782706 does not meet the constraint requirement!
CompletenessConstraint(Completeness(marketplace,None)) | Success |
ComplianceConstraint(Compliance(marketplace contained in US,UK,DE,JP,FR,marketplace IS NULL OR marketplace IN ('US','UK','DE','JP','FR'),None)) | Success |
ComplianceConstraint(Compliance(year is non-negative,COALESCE(year, 0.0) >= 0,None)) | Success |

Interestingly, the review_id column isn’t unique, which resulted in a failure of the check on uniqueness. We can also look at all the metrics that Deequ computed for this check by running the following:

checkResult_df = VerificationResult.successMetricsAsDataFrame(spark, checkResult)
checkResult_df.show()

The following table summarizes our findings.

Name         | Instance                                | Value
Completeness | review_id                               | 1
Completeness | marketplace                             | 1
Compliance   | marketplace contained in US,UK,DE,JP,FR | 1
Compliance   | year is non-negative                    | 1
Maximum      | star_rating                             | 5
Minimum      | star_rating                             | 1
Size         | *                                       | 3120938
Uniqueness   | review_id                               | 0.99266
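
The two-phase flow described in this section (first compute metrics, then apply assertion functions to them) can be sketched in plain Python. This is only an illustration under stated assumptions: the verify helper and the sample values are hypothetical, and uniqueness is assumed here to mean the fraction of rows whose value occurs exactly once.

```python
from collections import Counter


def uniqueness(values):
    """Fraction of rows whose value occurs exactly once (assumed definition)."""
    counts = Counter(values)
    return sum(1 for v in values if counts[v] == 1) / len(values)


def verify(metrics, constraints):
    """Apply each assertion to its metric and report Success or Failure."""
    report = {}
    for name, assertion in constraints.items():
        report[name] = "Success" if assertion(metrics[name]) else "Failure"
    return report


# Made-up sample with one duplicated review_id.
review_ids = ["R1", "R2", "R2", "R3"]
star_ratings = [1.0, 5.0, 4.0, 3.0]

# Phase 1: compute the metrics that the constraints need.
metrics = {
    "Size": len(review_ids),
    "Minimum(star_rating)": min(star_ratings),
    "Uniqueness(review_id)": uniqueness(review_ids),
}

# Phase 2: evaluate user-supplied assertion functions on those metrics.
constraints = {
    "Size": lambda x: x >= 3,
    "Minimum(star_rating)": lambda x: x == 1.0,
    "Uniqueness(review_id)": lambda x: x == 1.0,
}

report = verify(metrics, constraints)
print(report)
```

As in the results above, a single duplicated key is enough to push the uniqueness metric below 1.0 and fail the check, while the other constraints still pass.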

Automated constraint suggestion

If you own a large number of datasets or if your dataset has many columns, it may be challenging for you to manually define appropriate constraints. Deequ can automatically suggest useful constraints based on the data distribution. Deequ first runs a data profiling method and then applies a set of rules on the result. For more information about how to run a data profiling method, see the GitHub repo.

import json

from pydeequ.suggestions import *

suggestionResult = ConstraintSuggestionRunner(spark) \
             .onData(df) \
             .addConstraintRule(DEFAULT()) \
             .run()

# Constraint Suggestions in JSON format
print(json.dumps(suggestionResult, indent=2))

The result contains a list of constraints with descriptions and Python code, so that you can directly apply it in your data quality checks. Call print(json.dumps(suggestionResult)) to inspect the suggested constraints; the following table shows a subset.

Column | Constraint | Python code
customer_id | customer_id is not null | .isComplete("customer_id")
customer_id | customer_id has type Integral | .hasDataType("customer_id", ConstrainableDataTypes.Integral)
customer_id | customer_id has no negative values | .isNonNegative("customer_id")
helpful_votes | helpful_votes is not null | .isComplete("helpful_votes")
helpful_votes | helpful_votes has no negative values | .isNonNegative("helpful_votes")
marketplace | marketplace has value range “US”, “UK”, “DE”, “JP”, “FR” | .isContainedIn("marketplace", ["US", "UK", "DE", "JP", "FR"])
product_title | product_title is not null | .isComplete("product_title")
star_rating | star_rating is not null | .isComplete("star_rating")
star_rating | star_rating has no negative values | .isNonNegative("star_rating")
vine | vine has value range “N”, “Y” | .isContainedIn("vine", ["N", "Y"])
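
Rather than reading the raw JSON, you may prefer to reduce the suggestion result to the three columns shown in the table. The following sketch assumes the result is a dictionary with a constraint_suggestions list whose entries carry column_name, description, and code_for_constraint keys, as in the PyDeequ tutorials; treat these key names as an assumption and check them against your own output. The summarize_suggestions helper and the sample data are hypothetical.

```python
def summarize_suggestions(result):
    """Return (column, description, code) tuples from a suggestion result dict."""
    rows = []
    for suggestion in result.get("constraint_suggestions", []):
        rows.append((
            suggestion["column_name"],
            suggestion["description"],
            suggestion["code_for_constraint"],
        ))
    return rows


# Hand-written sample in the assumed shape; real output has more fields and rows.
sample_result = {
    "constraint_suggestions": [
        {
            "column_name": "vine",
            "description": "'vine' has value range 'N', 'Y'",
            "code_for_constraint": '.isContainedIn("vine", ["N", "Y"])',
        },
        {
            "column_name": "star_rating",
            "description": "'star_rating' is not null",
            "code_for_constraint": '.isComplete("star_rating")',
        },
    ]
}

for column, description, code in summarize_suggestions(sample_result):
    print(f"{column}: {description} -> {code}")
```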

You can explore the other tutorials in the PyDeequ GitHub repo.

Scaling to production

So far, we’ve shown you how to use these capabilities in the context of data exploration using a Jupyter notebook running on a SageMaker notebook instance. As your project matures, you need to use the same capabilities on larger and larger datasets, and in a production environment. With PyDeequ, it’s easy to make that transition. The following diagram illustrates deployment options for local and production purposes on AWS.

Amazon EMR and AWS Glue interface with PyDeequ through the PySpark drivers that PyDeequ uses as its main engine. PyDeequ can run as a PySpark application in both contexts when the Deequ JAR is added to the Spark context. You can run PyDeequ’s data validation toolkit after the Spark context and drivers are configured and your data is loaded into a DataFrame. We describe the Amazon EMR configuration options and use cases in this section (configurations 2 and 3 in the diagram).

Data exploration from a SageMaker notebook via an EMR cluster

As shown in configuration 2 in the diagram, you can connect to an EMR cluster from a SageMaker notebook to run PyDeequ. This enables you to explore much larger volumes of data than you can using a single notebook. Your Amazon EMR cluster must be running Spark v2.4.6, available with Amazon EMR version 5.31 or higher, in order to work with PyDeequ. After you have a running cluster that has those components and a SageMaker notebook, you configure a SparkSession object using the following template to connect to your cluster. For more information about connecting a SageMaker notebook to Amazon EMR or the necessary IAM permissions, see Submitting User Applications with spark-submit.

In the SageMaker notebook, run the following JSON in a cell before you start your SparkSession to configure your EMR cluster:

%%configure -f
{ "conf":{
          "spark.pyspark.python": "python3",
          "spark.pyspark.virtualenv.enabled": "true",
          "spark.pyspark.virtualenv.type":"native",
          "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv",
          "spark.jars.packages": "com.amazon.deequ:deequ:1.0.3",
          "spark.jars.excludes": "net.sourceforge.f2j:arpack_combined_all"
         }
}

Start your SparkSession object in a cell after the preceding configuration by running spark. Then install PyDeequ onto your EMR cluster using the SparkContext (named sc by default) with the following command:

sc.install_pypi_package('pydeequ')

Now you can start using PyDeequ from your notebook to run the same statements as before, but with much larger volumes of data.

Running a transient EMR cluster

Another way to leverage the power of an EMR cluster is to treat it as a transient cluster and run it in a headless configuration, as shown in configuration 3 in the diagram. We use spark-submit in an EMR add-step to run PyDeequ on Amazon EMR. For each of the following steps, make sure to replace the values in brackets accordingly.

  1. Create a bootstrap shell script and upload it to an S3 bucket. The following code is an example of pydeequ-emr-bootstrap.sh:
    #!/bin/bash
    
    sudo python3 -m pip install --no-deps pydeequ
    sudo python3 -m pip install pandas 

  2. Create an EMR cluster via the AWS Command Line Interface (AWS CLI):
    $ aws emr create-cluster \
    --name 'my-pydeequ-cluster' \
    --release-label emr-5.31.0 \
    --applications Name=Spark Name=Hadoop Name=Hive Name=Livy Name=Pig Name=Hue \
    --use-default-roles \
    --instance-type m5.xlarge \
    --instance-count 2 \
    --bootstrap-actions \
        Path="s3://<S3_PATH_TO_BOOTSTRAP>/pydeequ-emr-bootstrap.sh",Name='install_pydeequ' \
    --visible-to-all-users \
    --enable-debugging \
    --ec2-attributes KeyName="<MY_SSH_KEY>",SubnetId="<MY_SUBNET>" \
    --auto-scaling-role EMR_AutoScaling_DefaultRole

  3. Create your PySpark PyDeequ run script and upload it to Amazon S3. The following code is our example of pydeequ-test.py:
    import sys
    import pydeequ
    from pydeequ.checks import *
    from pydeequ.verification import *
    from pyspark.sql import SparkSession, Row
    
    if __name__ == "__main__":
    
        with SparkSession.builder.appName("pydeequ").getOrCreate() as spark:
    
            df = spark.sparkContext.parallelize([
                Row(a="foo", b=1, c=5),
                Row(a="bar", b=2, c=6),
                Row(a="baz", b=3, c=None)]).toDF()
    
            check = Check(spark, CheckLevel.Error, "Integrity checks")
    
            checkResult = VerificationSuite(spark) \
                .onData(df) \
                .addCheck(
                    check.hasSize(lambda x: x >= 3) \
                    .hasMin("b", lambda x: x == 0) \
                    .isComplete("c")  \
                    .isUnique("a")  \
                    .isContainedIn("a", ["foo", "bar", "baz"]) \
                    .isNonNegative("b")) \
                .run()
    
            checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
            checkResult_df.repartition(1).write.csv("s3a://<PATH_TO_OUTPUT>/pydeequ-out.csv", sep='|')

  4. When your cluster is running and in the WAITING stage, submit your Spark job to Amazon EMR via the AWS CLI:
    $ aws emr add-steps \
    --cluster-id <MY_CLUSTER_ID> \
    --steps Type=Spark,Name="pydeequ-spark-submit",Args=[--deploy-mode,cluster,--master,yarn,--conf,spark.yarn.submit.waitAppCompletion=true,--packages,com.amazon.deequ:deequ:1.0.3,--exclude-packages,net.sourceforge.f2j:arpack_combined_all,s3a://pydeequ-emr/setup/pydeequ-test.py],ActionOnFailure=CANCEL_AND_WAIT
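
The --steps value in the last command is long and easy to mistype. As a convenience, the following sketch assembles the same string from its parts so that you can paste it into the AWS CLI call. The build_spark_step helper is hypothetical and only performs string formatting; it doesn’t call AWS.

```python
def build_spark_step(name, script_uri, packages, exclude_packages,
                     on_failure="CANCEL_AND_WAIT"):
    """Assemble the value for `aws emr add-steps --steps` as a single string."""
    # spark-submit arguments, in the same order as in the command above.
    args = [
        "--deploy-mode", "cluster",
        "--master", "yarn",
        "--conf", "spark.yarn.submit.waitAppCompletion=true",
        "--packages", packages,
        "--exclude-packages", exclude_packages,
        script_uri,
    ]
    return (
        f'Type=Spark,Name="{name}",'
        f'Args=[{",".join(args)}],'
        f"ActionOnFailure={on_failure}"
    )


step = build_spark_step(
    name="pydeequ-spark-submit",
    script_uri="s3a://pydeequ-emr/setup/pydeequ-test.py",
    packages="com.amazon.deequ:deequ:1.0.3",
    exclude_packages="net.sourceforge.f2j:arpack_combined_all",
)
print(step)
```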

Congratulations, you have now submitted a PyDeequ PySpark job to Amazon EMR. Give the job a few minutes to run, after which you can view your results at the S3 output path specified on the last line of pydeequ-test.py.

Afterwards, remember to clean up your results and spin down the EMR cluster using the following command:

$ aws emr terminate-clusters --cluster-ids <MY_CLUSTER_ID>

Now you can use Amazon EMR to process large datasets in batch using PyDeequ to plug into your pipelines and provide scalable tests on your data.

More examples on GitHub

You can find examples of more advanced features on the Deequ GitHub page:

  • Deequ provides more than data quality checks with fixed thresholds. Learn how to use anomaly detection on data quality metrics to apply tests on metrics that change over time.
  • Deequ offers support for storing and loading metrics. Learn how to use the MetricsRepository for this use case.
  • If your dataset grows over time or is partitioned, you can use Deequ’s incremental metrics computation. For each partition, Deequ stores a state for each computed metric. To compute metrics for the union of partitions, Deequ can use these states to efficiently derive overall metrics without reloading the data.
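
The idea behind incremental metrics computation can be illustrated with a small plain-Python sketch: for a mean, it is enough to keep a (sum, count) state per partition, and states can be merged without touching the underlying rows again. The MeanState class and the sample values are hypothetical and are not Deequ’s actual state classes.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class MeanState:
    """Mergeable state for a mean: running total and row count."""
    total: float
    count: int

    def merge(self, other):
        # Combining two partitions only needs their states, not their rows.
        return MeanState(self.total + other.total, self.count + other.count)

    def metric(self):
        return self.total / self.count


def state_for(values):
    """Compute the state for one partition with a single pass over its rows."""
    return MeanState(sum(values), len(values))


day1 = state_for([5, 4, 3])   # star ratings in the first partition (made up)
day2 = state_for([4, 4])      # star ratings in a later partition (made up)

overall = day1.merge(day2)
print(overall.metric())  # 4.0
```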

Conclusion

This post showed you how to use PyDeequ for calculating data quality metrics, verifying data quality constraints, and profiling data to automate the configuration of data quality checks. PyDeequ is available via pip install and on GitHub now for you to build your own data quality management pipeline.

Learn more about the inner workings of Deequ in the VLDB 2018 paper Automating large-scale data quality verification.

Stay tuned for another post demonstrating production workflows on AWS Glue.


About the Authors

Calvin Wang is a Data Scientist at AWS AI/ML. He holds a B.S. in Computer Science from UC Santa Barbara and loves using machine learning to build cool stuff.

Chris Ghyzel is a Data Engineer for AWS Professional Services. Currently, he is working with customers to integrate machine learning solutions on AWS into their production pipelines.

Veronika Megler, PhD, is Principal Data Scientist for Amazon.com Consumer Packaging. Until recently she was the Principal Data Scientist for AWS Professional Services. She enjoys adapting innovative big data, AI, and ML technologies to help companies solve new problems, and to solve old problems more efficiently and effectively. Her work has lately been focused more heavily on economic impacts of ML models and exploring causality.

Dream11’s journey to building their Data Highway on AWS

Post Syndicated from Pradip Thoke original https://aws.amazon.com/blogs/big-data/dream11s-journey-to-building-their-data-highway-on-aws/

This is a guest post co-authored by Pradip Thoke of Dream11. In their own words, “Dream11, the flagship brand of Dream Sports, is India’s biggest fantasy sports platform, with more than 100 million users. We have infused the latest technologies of analytics, machine learning, social networks, and media technologies to enhance our users’ experience. Dream11 is the epitome of the Indian sports technology revolution.”

Since inception, Dream11 has been a data-driven sports technology brand. The systems that power Dream11, including their transactional data warehouse, run on AWS. As Dream11 hosts fantasy sports contests that are joined by millions of Indian sports fans, they have large volumes of transactional data that is organized in a well-defined Amazon Redshift data warehouse. Previously, they were using third-party services to collect, analyze, and build models over user interaction data combined with transactional data. Although this approach was convenient, it presented certain critical issues:

  • The approach wasn’t conducive to 360-degree user analytics. Dream11’s user interactions data wasn’t present on the cloud, where the rest of Dream11’s infrastructure and data were present (AWS, in this case). To get a complete picture of a user’s experience and journey, the user’s interaction data (client events) needs to be analyzed alongside their transactional data (server events). This is known as 360-degree user analytics.
  • It wasn’t possible to get accurate user journey funnel reports. Currently, there are limitations with every tool available on the market with respect to identifying and mapping a given user’s actions across multiple platforms (on the web, iOS, or Android), as well as multiple related apps. This use case is specifically important if your company is a parent to other companies.
  • The statistics on user behavior that Dream11 was getting weren’t as accurate as they wanted. Some of the popular services they were using for web and mobile analytics use sampling to deal with high volumes of data. Although this is a well-regarded technique and provides reasonable accuracy in many cases, Dream11 wanted statistics to be as accurate as possible.
  • The analytics wasn’t real-time. Dream11 experiences intense use by their users just before and during the real-life sports matches, so real-time and near-real-time analytics is critical for them. This need wasn’t sufficiently met by the plethora of services they were using.
  • Their approach was leading to high cost for custom analytics for Dream11’s user interactions data, consisting of hundreds of event types. Serverless query engines typically charge by the amount of data scanned and so it can get very expensive if events data isn’t organized properly in separate tables in a data lake to enable selective access.

All these concerns and needs led Dream11 to conclude that they needed their own centralized 360-degree analytics platform. Therefore, they embarked on the Data Highway project on AWS.

This project has additional advantages. It is increasingly becoming important to store and process data securely. Having everything in-house can help Dream11 with data security and data privacy objectives. The platform enables 360-degree customer analytics, which further allows Dream11 to do intelligent user segmentation in-house and share only those segments (without exposing underlying transactional or interactions data) with third-party messaging service providers. 

Design goals

Dream11 had the following design goals for the project:

  • The system should be easy to maintain and should be able to handle a very high volume of data, consisting of billions of events and terabytes of data daily.
  • The cost should be low and should be pay-as-you-go.
  • Dream11’s web and mobile developers regularly create new types of events to capture new types of interactions. Whenever they add new types of events, they should be immediately available in the system for analytics, and their statistics should immediately reflect in relevant dashboards and reports without any human intervention.
  • Certain types of statistics (such as concurrency) should be available in real-time and near-real time—within 5 minutes or less.
  • Dream11 should be able to use custom logic to calculate key statistics. The analytics should be accurate—no more sampling.
  • The data for various events should be neatly organized in separate tables and analytics-friendly file formats.
  • Although Dream11 will have a common data lake, they shouldn’t be constrained to use a single analytics engine for all types of analytics. Different types of analytics engines excel for different types of queries.
  • The Product Management team should have access to views they commonly use in their decision-making process, such as funnels and user flow diagrams.
  • The system should be extensible by adding lanes in the system. Lanes allow you to reuse your basic setup without mixing events data for different business units. It also potentially allows you to study user behavior across different apps.
  • The system should be able to build 360-degree user profiles.
  • The system should provide alerting on important changes to key business metrics.
  • Last but not least, the system should be secure and reliable, with an availability guarantee of 6 nines.

Data Highway architecture

In less than 3 months, Dream11’s data team built a system that met all the aforementioned goals. The following diagram shows the high-level architecture.

For this project, they used the following components:

The rest of this post explains the various design choices and trade-offs made by Dream11’s data engineers.

Event ingestion, segregation, and organization

Dream11 has several hundred event types. These events have common attributes and specific attributes. The following diagram shows the logical structure of these events.

When the front end receives an event, it saves fields up to common attributes into a message and posts it to Kafka_AllEvents_CommonAttributes. This Kafka topic is the source for the following systems:

  • Apache HBase on Amazon EMR – Provides real-time concurrency analytics
  • Apache Druid – Provides near real-time dimensional analytics
  • Amazon Redshift – Provides session analytics

The front end also saves events, as they are, into Kafka_AllEvents_AllAttributes. These events are then picked up by Apache NiFi, which forwards them to their respective topics. Apache NiFi supports data routing, transformation, and system mediation logic using powerful and scalable directed graphs. Data is transformed and published to Kafka by using a combination of RouteOnAttribute and JoltTransformJSON processors (to parse JSON). In essence, Apache NiFi reads each event’s name and posts the event to the Kafka topic with the matching name. If Kafka doesn’t have a topic with that name, a new topic is created, because you can configure your Kafka brokers to auto-create a topic when a message is received for a non-existent topic.
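
The routing rule described here (one topic per event name, created on first use) can be sketched in a few lines of plain Python. This is only a simulation of the logic: InMemoryBroker stands in for Kafka with topic auto-creation enabled, and the event_name field and sample events are hypothetical. It doesn’t use NiFi or a Kafka client.

```python
class InMemoryBroker:
    """Stand-in for Kafka with topic auto-creation enabled (not a real client)."""

    def __init__(self):
        self.topics = {}

    def publish(self, topic, message):
        # Create the topic on first use, then append the message to it.
        self.topics.setdefault(topic, []).append(message)


def route_event(broker, event):
    """Post the event to the topic named after the event and return that name."""
    topic = event["event_name"]  # hypothetical field holding the event name
    broker.publish(topic, event)
    return topic


broker = InMemoryBroker()
for event in [
    {"event_name": "app_open", "user_id": 1},
    {"event_name": "team_created", "user_id": 1},
    {"event_name": "app_open", "user_id": 2},
]:
    route_event(broker, event)

print(sorted(broker.topics))            # ['app_open', 'team_created']
print(len(broker.topics["app_open"]))   # 2
```

Because every event type ends up in its own topic, each downstream sink and table only ever sees one event schema.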

The following diagram illustrates the Amazon S3 sink connector per Kafka topic.

The following diagram summarizes the overall design of the system for event ingestion, segregation, and organization.

Storage, cataloging, ETL, and scheduling

In this section, we discuss how Dream11 updates their AWS Glue Data Catalog, performs extract, transform, and load (ETL) jobs with Amazon EMR Presto, and uses Apache Airflow for schedule management.

Updating the AWS Glue Data Catalog with metadata for the target table

The AWS Glue Data Catalog provides a unified metadata repository across a variety of data sources and data formats. It provides out-of-the-box integration with Amazon S3, Amazon Relational Database Service (Amazon RDS), Amazon Redshift, Amazon Redshift Spectrum, Athena, Amazon EMR, and any application compatible with the Apache Hive metastore. You can create your table definitions one time and query across engines. For more information, see FAQ: Upgrading to the AWS Glue Data Catalog.

Because this Data Catalog is accessible from multiple services that were going to be used for the Data Highway project, Dream11 decided to use it to register all the table definitions.

Registering tables with the AWS Glue Data Catalog is easy. You can use an AWS Glue crawler, which can infer a schema from files in Amazon S3 and register a table in the Data Catalog. It works quite well, but Dream11 needed additional actions, such as automatically configuring Kafka Amazon S3 sink connectors. Therefore, they developed two Python-based crawlers.

The first Python-based crawler runs every 2 hours and looks up Kafka topics. If it finds a new topic, it configures a Kafka Amazon S3 sink connector to dump its data to Amazon S3 every 30 minutes in JSON Gzip format. It also registers a table with the AWS Glue Data Catalog so that users can query the JSON data directly, if needed.

The second Python-based crawler runs once a day and registers a corresponding table for each new table created that day to hold flattened data (Parquet, Snappy). It infers schemas and registers tables with the Data Catalog using its Table API. It adds the customization needed by the Dream11 team to the metadata. It then creates Amazon EMR Presto ETL jobs to convert JSON (Gzip) data to Parquet (Snappy), and registers them with Apache Airflow to run every 24 hours.

ETL with Amazon EMR Presto

Dream11 has a multi-node, long-running, multi-purpose EMR cluster. They decided to run scheduled ETL jobs on it for the Data Highway project.

ETL for an event table involves a simple SELECT FROM -> INSERT INTO command to convert JSON (Gzip) to Parquet (Snappy). Converted data takes up to 70% less space and results in a 10 times improvement in Athena query performance. ETL happens once a day. Tables are partitioned by day.

Data received on the Kafka_AllEvents_CommonAttributes topic is loaded to Amazon Redshift. ETL involves SELECT FROM -> INSERT INTO to convert JSON (Gzip) to CSV, followed by Amazon Redshift COPY.
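
As a rough illustration of the SELECT FROM -> INSERT INTO pattern, the following sketch generates the statement for one daily partition. The table names and the dt partition column are hypothetical, the statement assumes the target table is already defined as Parquet (Snappy) in the Data Catalog, and the build_etl_sql helper is not Dream11’s actual code.

```python
from datetime import date


def build_etl_sql(source_table, target_table, day):
    """Build an INSERT INTO ... SELECT statement for one daily partition."""
    # Validate the day string (YYYY-MM-DD) before embedding it in SQL.
    day = date.fromisoformat(day).isoformat()
    return (
        f"INSERT INTO {target_table} "
        f"SELECT * FROM {source_table} "
        f"WHERE dt = '{day}'"
    )


sql = build_etl_sql("events_json.app_open", "events_parquet.app_open", "2020-10-01")
print(sql)
```

Generating one such statement per event table is what makes it practical to schedule hundreds of near-identical ETL jobs.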

Apache Airflow for schedule management

Apache Airflow is an open-source tool for authoring and orchestrating big data workflows. With Apache Airflow, data engineers define directed acyclic graphs (DAGs). DAGs describe how to run a workflow and are written in Python. Workflows are designed as a DAG that groups tasks that run independently. The DAG keeps track of the relationships and dependencies between tasks.
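
To show what tracking dependencies in a DAG means in practice, the following sketch uses the Python standard library (graphlib, Python 3.9 or later) to derive a valid run order from task dependencies. It deliberately doesn’t use the Airflow API, and the task names are hypothetical.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks it depends on (task names are hypothetical).
dependencies = {
    "convert_json_to_parquet": {"land_json_in_s3"},
    "convert_json_to_csv": {"land_json_in_s3"},
    "copy_csv_to_redshift": {"convert_json_to_csv"},
}

# static_order() yields every task after all of its dependencies.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

The two conversion tasks don’t depend on each other, so a scheduler is free to run them in parallel; only the COPY task has to wait for the CSV conversion.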

Dream11 uses Apache Airflow to schedule Python scripts and a few hundred ETL jobs on Amazon EMR Presto. These jobs convert JSON (Gzip) data for a few hundred events to Parquet (Snappy) format, and convert JSON data containing common attributes for all events to CSV before loading to Amazon Redshift. For more information, see Orchestrate big data workflows with Apache Airflow, Genie, and Amazon EMR: Part 1.

The following diagram shows the workflow to connect Apache Airflow to Amazon EMR.

The following diagram summarizes the overall design of the system for storage, cataloging, ETL, and scheduling.

Real-time and near-real-time analytics

In this section, we discuss the real-time and near-real-time analytics performed on Dream11’s data.

Concurrency analytics with Apache Druid

Apache Druid is an OLAP-style data store. It computes facts and metrics against various dimensions while data is being loaded. This avoids the need to compute results when a query is run.

Dream11’s web and mobile events are loaded from the Kafka_AllEvents_CommonAttributes topic to Apache Druid with the help of the Apache Druid Kafka indexing service. Dream11 has a dashboard with different granularity levels and dimensions such as app version, org, and other dimensions present in the common event attributes list.
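
Computing metrics against dimensions at load time can be pictured as a rollup: instead of storing every raw event, the store keeps one pre-aggregated count per time bucket and dimension value. The following plain-Python sketch illustrates that idea only; it isn’t Druid’s implementation, and the ts and app_version fields are hypothetical.

```python
from collections import Counter


def rollup(events, granularity_seconds=60):
    """Count events per (time bucket, app_version) while ingesting them."""
    counts = Counter()
    for event in events:
        # Truncate the epoch timestamp to the start of its bucket.
        bucket = event["ts"] - event["ts"] % granularity_seconds
        counts[(bucket, event["app_version"])] += 1
    return counts


events = [
    {"ts": 5, "app_version": "4.1"},
    {"ts": 42, "app_version": "4.1"},
    {"ts": 59, "app_version": "4.2"},
    {"ts": 61, "app_version": "4.1"},
]
summary = rollup(events)
print(summary[(0, "4.1")])   # 2
```

A dashboard query for a given minute and app version then becomes a lookup rather than a scan over raw events.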

Finding active users with Amazon EMR HBase

Dream11 also needs to identify individual active users at any given time or during a given window. This is required by other downstream teams such as the Data Science team and Digital User Engagement team.

With the help of a Java consumer, they push all events from the Kafka_AllEvents_CommonAttributes topic to HBase on an EMR cluster with just the required user dimensions. They can query the data in HBase with SQL syntax supported by the Apache Phoenix interface.

Session analytics with Amazon Redshift

Dream11 maintains their transactional data warehouse on a multi-node Amazon Redshift cluster. Amazon Redshift allows them to run complex SQL queries efficiently. Amazon Redshift would have been a natural choice for event analytics for hundreds of event types. However, in Dream11’s case, events data grows very rapidly, and storing all of it would mean a very large volume of data in Amazon Redshift. Also, compared with transactional data, this data loses its value rapidly as time passes. Therefore, they decided to do only session analytics in Amazon Redshift to benefit from its complex SQL query capabilities and to do analytics for individual events with the help of Athena (which we discuss in the next section).

Data received on Kafka_AllEvents_CommonAttributes is loaded into Amazon S3 every 30 minutes by the associated Kafka sink connector. This data is in JSON format with Gzip compression. Every 24 hours, a job runs on Amazon EMR Presto that flattens this data into CSV format. The data is then loaded into Amazon Redshift with the COPY command, first into a staging table. Data in the staging table is aggregated to get sessions data. Amazon Redshift already has transactional data from other tables that, combined with the session data, allows Dream11 to perform 360-degree user analytics. They can now easily segment users based on their interaction data and transaction data. They can then run campaigns for those users with the help of messaging platforms.
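The flattening step can be illustrated with a short Python sketch. Dream11 runs this step as a Presto job on Amazon EMR; the standard-library version below is only a stand-in to show the shape of the transformation, and the attribute names (`user`, `session`, `event`) are hypothetical.

```python
import csv
import gzip
import io
import json

def flatten(record, parent="", sep="_"):
    """Flatten nested dicts into a single-level dict with joined keys."""
    flat = {}
    for key, value in record.items():
        name = f"{parent}{sep}{key}" if parent else key
        if isinstance(value, dict):
            flat.update(flatten(value, name, sep))
        else:
            flat[name] = value
    return flat

def gzip_json_lines_to_csv(gz_bytes, columns):
    """Convert gzip-compressed JSON lines into CSV text with a fixed column order."""
    out = io.StringIO()
    writer = csv.DictWriter(out, fieldnames=columns,
                            extrasaction="ignore", lineterminator="\n")
    writer.writeheader()
    with gzip.open(io.BytesIO(gz_bytes), mode="rt", encoding="utf-8") as lines:
        for line in lines:
            if line.strip():  # skip blank lines
                writer.writerow(flatten(json.loads(line)))
    return out.getvalue()

# Hypothetical event; attribute names are illustrative only.
raw = json.dumps({"user": {"id": 7}, "session": {"id": "s1"}, "event": "open"}) + "\n"
csv_text = gzip_json_lines_to_csv(gzip.compress(raw.encode("utf-8")),
                                  ["user_id", "session_id", "event"])
```

A fixed column list keeps the CSV layout stable across files, which matters when the output feeds a COPY into a staging table with a fixed schema.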

Event analytics with Athena

Dream11 uses Athena to analyze the data in Amazon S3. Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run. It made perfect sense to organize data for hundreds of event tables in Amazon S3 and analyze them with Athena on demand.

With Athena, you’re charged based on the amount of data scanned by each query. You can get significant cost savings and performance gains by compressing, partitioning, or converting your data to a columnar format, because each of those operations reduces the amount of data that Athena needs to scan to run a query. For more information, see Top 10 Performance Tuning Tips for Amazon Athena.

As discussed before, Dream11 has registered hundreds of tables for events data in JSON format, and a similar number of tables for events data in Parquet format, with the AWS Glue Data Catalog. After converting the data to Parquet, they observed a 10-times performance gain and an 80% savings in space. Data in Amazon S3 can be queried directly through the Athena UI with SQL queries. The other option they use is connecting to Athena using a JDBC driver from Looker and their custom Java UI for the Data Aware project.
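Because Athena charges by data scanned, the space savings translate directly into query cost. The following Python sketch works through the arithmetic. The price per TB and the 10 TB table size are assumptions made for illustration (check current Athena pricing for your Region); only the 80% space savings comes from this post.

```python
def scan_cost(bytes_scanned, price_per_tb):
    """Return the query cost for a given number of scanned bytes."""
    return bytes_scanned / 1024**4 * price_per_tb

PRICE_PER_TB = 5.0           # assumed price in USD, not taken from this post
json_bytes = 10 * 1024**4    # hypothetical: a full scan of 10 TB of Gzip JSON
parquet_bytes = json_bytes * (1 - 0.80)  # 80% space savings reported above

json_cost = scan_cost(json_bytes, PRICE_PER_TB)
parquet_cost = scan_cost(parquet_bytes, PRICE_PER_TB)
```

Under these assumptions a full scan drops from about 50 USD to about 10 USD. In practice the gap can be larger, because a columnar format lets a query read only the columns it references, and partitioning lets it skip whole prefixes.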

Athena helps Dream11 produce funnel analytics and user path analytics reports and visualizations.


 The following diagram summarizes the overall design of the system for real-time and near-real-time analytics and visualization.


 

Conclusion

This architecture has enabled Dream11 to achieve all the design goals they set out with. Results of analytics for real-time requirements are available under millisecond latency, and the system costs 40% less than the previous system. Analytics is performed with all the data without sampling, so results are accurate and reliable. All the data and analytics engines are within Dream11’s AWS account, improving data security and privacy.

As of this writing, the system handles 14 TB of data per day and it has served 80 million requests per minute at peak during Dream11 IPL 2020.

Doing all their analytics in-house on AWS has not just improved speed, accuracy, and data security, it has also enabled newer possibilities. Now Dream11 has a 360-degree view of their users. They can study their users’ progress across multiple platforms – web, Android, and iOS. This new system is enabling novel applications of machine learning, digital user engagement, and social media technologies at Dream11.


About the Authors

Pradip Thoke is AVP of Data Engineering at Dream11 and leads their Data Engineering team. The team involved in this implementation includes Vikas Gite, Salman Dhariwala, Naincy Suman, Lavanya Pulijala, Ruturaj Bhokre, Dhanraj Gaikwad, Vishal Verma, Hitesh Bansal, Sandesh Shingare, Renu Yadav, Yash Anand, Akshay Rochwani, Alokh P, Sunaim, and Nandeesh Bijoor.

 

Girish Patil is a Principal Architect AI, Big Data, India Scale Apps for Amazon.

Amazon EMR Studio (Preview): A new notebook-first IDE experience with Amazon EMR

Post Syndicated from Fei Lang original https://aws.amazon.com/blogs/big-data/amazon-emr-studio-preview-a-new-notebook-first-ide-experience-with-amazon-emr/

We’re happy to announce Amazon EMR Studio (Preview), an integrated development environment (IDE) that makes it easy for data scientists and data engineers to develop, visualize, and debug applications written in R, Python, Scala, and PySpark. EMR Studio provides fully managed Jupyter notebooks and tools like Spark UI and YARN Timeline Service to simplify debugging. EMR Studio uses AWS Single Sign-On (AWS SSO), and allows you to log in directly with your corporate credentials without signing in to the AWS Management Console.

With EMR Studio, you can run notebook code on Amazon EMR running on Amazon Elastic Compute Cloud (Amazon EC2) or Amazon EMR on Amazon Elastic Kubernetes Service (Amazon EKS), and debug your applications. For more information about Amazon EMR on Amazon EKS, see What is Amazon EMR on EKS.

EMR Studio kernels and applications run on EMR clusters, so you get the benefit of distributed data processing with the performance-optimized Apache Spark runtime that Amazon EMR provides. You can also install custom kernels and libraries, collaborate with peers using code repositories such as GitHub and Bitbucket, or run parameterized notebooks as part of scheduled workflows using orchestration services like Apache Airflow or Amazon Managed Workflows for Apache Airflow (Amazon MWAA). Administrators can set up EMR clusters that can be used by EMR Studio users, or create predefined AWS CloudFormation templates for Amazon EMR and allow you to simply choose a template for creating your own cluster.

In this post, we discuss the benefits that EMR Studio offers and we introduce to you some of its capabilities. To learn more about creating and using EMR Studios, see Use Amazon EMR Studio.

Benefits of using EMR Studio

EMR Studio offers the following benefits:

  • Set up a unified experience to develop and diagnose EMR Spark applications – Administrators can set up EMR Studio to allow you to log in using your corporate credentials without having to sign in to the AWS console. You get a single unified environment to interactively explore, process, and visualize data using notebooks, build and schedule pipelines, and debug applications without having to log in to EMR clusters.
  • Use fully managed Jupyter notebooks – With EMR Studio, you can develop analytics and data science applications in R, Python, Scala, and PySpark with fully managed Jupyter notebooks. You can take advantage of distributed processing using the performance-optimized Amazon EMR runtime for Apache Spark with Jupyter kernels and applications running on EMR clusters. You can attach notebooks to an existing cluster that uses Amazon EC2 instances, or to an EMR on EKS virtual cluster. You can also start your own clusters using templates pre-configured by administrators.
  • Collaborate with others using code repositories – From the EMR Studio notebooks environment, you can connect to code repositories such as AWS CodeCommit, GitHub, and Bitbucket to collaborate with peers.
  • Run custom Python libraries and kernels – From EMR Studio, you can install custom Python libraries or Jupyter kernels required for your applications directly to the EMR clusters.
  • Automate workflows using pipelines – EMR Studio makes it easy to move from prototyping to production. You can create EMR Studio notebooks that can be programmatically invoked with parameters, and use APIs to run the parameterized notebooks. You can also use orchestration tools such as Apache Airflow or Amazon MWAA to run notebooks in automated workflows.
  • Simplified debugging – With EMR Studio, you can debug jobs and access logs without logging in to the cluster. EMR Studio provides native application interfaces such as Spark UI and YARN Timeline. When a notebook is run in EMR Studio, the application logs are uploaded to Amazon Simple Storage Service (Amazon S3). As a result, you can access logs and diagnose applications even after your EMR cluster is terminated. You can quickly locate the job to debug by filtering based on the cluster or time when the application was run.

In the following section, we demonstrate some of the capabilities of Amazon EMR Studio using a sample notebook. For our sample notebook, we use the open-source, real-time COVID-19 US daily case reports provided by Johns Hopkins University CSSE from the following GitHub repo.

Notebook-first IDE experience with AWS SSO integration

EMR Studio makes it simple to interact with applications on an EMR cluster. After an administrator sets up EMR Studio and provides the access URL (which looks like https://es-*************************.emrstudio.us-east-1.amazonaws.com), you can log in to EMR Studio with your corporate credentials.

After you log in to EMR Studio, you get started by creating a Workspace. A Workspace is a collection of one or more notebooks for a project. The Workspaces and the notebooks that you create in EMR Studio are automatically saved in an Amazon S3 location.

Now, we create a Workspace by completing the following steps:

  1. On the EMR Studio Dashboard page, choose Create Workspace.
  2. On the Create a Workspace page, enter a Workspace name and a Description.

Naming the Workspace helps identify your project. Your workspace is automatically saved, and you can find it later on the Workspaces page. For this post, we name our Workspace EMR-Studio-WS-Demo1.

  3. On the Subnet drop-down menu, choose a subnet for your Workspace.

Each subnet belongs to the same Amazon Virtual Private Cloud (Amazon VPC) as your EMR Studio. Your administrator may have set up one or more subnets to use for your EMR clusters. You should choose a subnet that matches the subnet where you use EMR clusters. If you’re not sure about which subnet to use, contact your administrator.

  4. For S3 location, choose the Amazon S3 location where EMR Studio backs up all notebook files in the Workspace.

This location is where your Workspace and all the notebooks in the Workspace are automatically saved.

  5. In the Advanced configuration section, you can attach an EMR cluster to your Workspace.

For this post, we skip this step. EMR Studio allows you to create Workspaces and notebooks without attaching to an EMR cluster. You can attach an EMR cluster later when you’re ready to run your notebooks.

  6. Choose Create Workspace.

Fully managed environment for managing and running Jupyter-based notebooks

EMR Studio provides a fully managed environment to help organize and manage Workspaces. Workspaces are the primary building blocks of EMR Studio, and they preserve the state of your notebooks. You can create different Workspaces for each project. From within a Workspace, you can create notebooks, link your Workspace to a code repository, and attach your Workspace to an EMR cluster to run notebooks. Your Workspaces and the notebooks and settings they contain are automatically saved in the Amazon S3 location that you specify.

If you created the workspace EMR-Studio-WS-Demo1 by following the preceding steps, it appears on the Workspaces page with the name EMR-Studio-WS-Demo1 along with status Ready, creation time, and last modified timestamp.

The following table describes each possible Workspace status.

Status | Meaning
Starting | The Workspace is being prepared, but is not yet ready to use.
Ready | You can open the Workspace to use the notebook editor. When a Workspace has a Ready status, you can open or delete it.
Attaching | The Workspace is being attached to a cluster.
Attached | The Workspace is attached to an EMR cluster. If a Workspace is not attached to an EMR cluster, you need to attach it to an EMR cluster before you can run any notebook code in the Workspace.
Idle | The Workspace is stopped and currently idle. When you launch an idle Workspace, the Workspace status changes from Idle to Starting to Ready.
Stopping | The Workspace is being stopped.
Deleting | When you delete a Workspace, it’s marked for deletion. EMR Studio automatically deletes Workspaces marked for deletion. After a Workspace is deleted, it no longer shows in the list of Workspaces.

You can choose the Workspace that you created (EMR-Studio-WS-Demo1) to open it. This opens a new web browser tab with the JupyterLab interface. The icon-denoted tabs on the left sidebar allow you to access tool panels such as the file browser or JupyterLab command palette. To learn more about the EMR Studio Workspace interface, see Understand the Workspace User Interface.

EMR Studio automatically creates an empty notebook with the same name as the Workspace. For the Workspace that we created for this post, it automatically creates EMR-Studio-WS-Demo1.ipynb. In the following screenshot, no cluster or kernel is specified in the top right corner, because we didn’t attach a cluster while creating the Workspace. You can write code in your new notebook, but before you run your code, you need to attach the Workspace to an EMR cluster and specify a kernel. To attach your Workspace to a cluster, choose the EMR clusters icon on the left panel.

Linking Git-based code repositories with your Workspace

You can collaborate with your peers by sharing notebooks as code via code repositories. EMR Studio supports Git-based services such as AWS CodeCommit, GitHub, and Bitbucket.

This capability provides the following benefits:

  • Version control – Record code changes in a version control system so you can review the history of your changes and selectively revert them.
  • Collaboration – Share code with team members working in different Workspaces through remote Git-based repositories. Workspaces can clone or merge code from remote repositories and push changes back to those repositories.
  • Code reuse – Many Jupyter notebooks that demonstrate data analysis or machine learning techniques are available in publicly hosted repositories, such as GitHub. You can associate your Workspace with a GitHub repository to reuse the Jupyter notebooks contained in a repository.

To link Git repositories to your Workspace, you can link an existing repository or create a new one. When you link an existing repository, you choose from a list of Git repositories associated with the AWS account in which your EMR Studio was created.

We add a new repository by completing the following steps:

  1. Choose the Git icon.
  2. For Repository name, enter a name (for example, emr-notebook).
  3. For Git repository URL, enter the URL for the Git repo (for this post, we use the sample notebook at https://github.com/emrnotebooks/notebook_execution).
  4. For Git credentials, select your credentials. Because we’re using a public repo, we select Use a public repository without credentials.
  5. Choose Add repository.

After it’s added, we can see the repo on the Git repositories drop-down menu.

  6. Choose the repo to link to the Workspace.

You can link up to three Git repositories with an EMR Studio Workspace. For more information, see Link Git-Based Repositories to an EMR Studio Workspace.

  7. Choose the File browser icon to locate the Git repo we just linked.

Attaching and detaching Workspaces to and from EMR clusters

EMR Studio kernels and applications run on EMR clusters, so you get the benefit of distributed data processing using the performance-optimized EMR runtime for Apache Spark. You can attach your Workspace to an EMR cluster and get distributed data processing using Spark or custom kernels. You can use primary node capacity to run non-distributed applications.

In addition to using Amazon EMR clusters running on Amazon EC2, you can attach a Workspace to an Amazon EMR on EKS virtual cluster to run notebook code. For more information about how to use an Amazon EMR on EKS cluster in EMR Studio, see Use an Amazon EMR on EKS Cluster to Run Notebook Code.

Before you can run your notebooks, you must attach your Workspace to an EMR cluster. For more information about clusters, see Create and Use Clusters with EMR Studio.

To run the Git repo notebooks that we linked in the previous step, complete the following steps:

  1. Choose the EMR clusters icon.
  2. Attach the Workspace to an existing EMR cluster running on Amazon EC2 instances.
  3. Open the notebook demo_pyspark.ipynb from the Git repo emr-notebook that we linked to the Workspace.

In the upper right corner of the Workspace UI, we can see the ID of the EMR cluster being attached to our Workspace, as well as the kernel selected to run the notebook.

  4. Record the value of the cluster ID (for example, <j-*************>).

We use this value later to locate the EMR cluster for application debugging purposes.

You can also detach the Workspace from the cluster in the Workspace UI and re-attach it to another cluster. For more information, see Detach a Cluster from Your Workspace.

Being able to easily attach and detach to and from any EMR cluster allows you to move any workload from prototyping into production. For example, you can start your prototype development by attaching your workspace to a development EMR cluster and working with test datasets. When you’re ready to run your notebook with larger production datasets, you can detach your workspace from the development EMR cluster and attach it to a larger production EMR cluster.

Installing and loading custom libraries and kernels

You can install notebook-scoped libraries with a PySpark kernel in EMR Studio. The libraries installed are isolated to your notebook session and don’t interfere with libraries installed via EMR bootstrap actions, or libraries installed by other EMR Studio notebook sessions that may be running on the same EMR cluster. After you install libraries for your Workspace, they’re available for other notebooks in the Workspace in the same session.

Our sample notebook demo_pyspark.ipynb is a Python script. It uses real-time COVID-19 US daily case reports as input data. The following parameters are defined in the first cell:

  • DATE – The given date used when the notebook job is started.
  • TOP_K – The top k US states with confirmed COVID-19 cases. We use this to plot Graph a.
  • US_STATES – The names of the specific US states being checked for the fatality rates of COVID-19 patients. We use this to plot Graph b.

The parameters can be any of the Python data types.
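As a rough illustration of how such parameters might be handled, the following Python sketch merges JSON overrides into defaults and applies basic validation. The function `resolve_params` is hypothetical and is not part of the sample notebook or of EMR Studio; the default values are the ones used for this post's example run, and the MM-DD-YYYY date format is inferred from that example value.

```python
import json
from datetime import datetime

# Defaults as they might appear in the notebook's first (parameters) cell.
DEFAULTS = {"DATE": "10-15-2020", "TOP_K": 6,
            "US_STATES": ["Wisconsin", "Texas", "Nevada"]}

def resolve_params(overrides_json, defaults=DEFAULTS):
    """Merge JSON overrides into the defaults and validate basic types."""
    params = {**defaults, **json.loads(overrides_json)}
    # Raises ValueError if DATE is not in the assumed MM-DD-YYYY format.
    datetime.strptime(params["DATE"], "%m-%d-%Y")
    if not isinstance(params["TOP_K"], int) or params["TOP_K"] < 1:
        raise ValueError("TOP_K must be a positive integer")
    if not all(isinstance(s, str) for s in params["US_STATES"]):
        raise ValueError("US_STATES must be a list of state names")
    return params

params = resolve_params('{"TOP_K": 3}')
```

Validating early means a scheduled run with a malformed parameter fails before any Spark work starts.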

Running this notebook plots two graphs:

  • Graph a – Visualizes the top k US states with the most COVID-19 cases on a given date
  • Graph b – Visualizes the fatality rates among specific US states on a given date
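The two computations behind these graphs can be sketched in plain Python. This is not the notebook's code, which runs on PySpark; the row layout, the placeholder values, and the definition of fatality rate as deaths divided by confirmed cases are all assumptions made for illustration.

```python
import heapq

def top_k_states(rows, k):
    """Return the k rows with the highest confirmed case counts."""
    return heapq.nlargest(k, rows, key=lambda row: row["confirmed"])

def fatality_rate(row):
    """Deaths as a fraction of confirmed cases; 0.0 when there are no cases."""
    return row["deaths"] / row["confirmed"] if row["confirmed"] else 0.0

# Placeholder rows, not real case data.
rows = [
    {"state": "State A", "confirmed": 1000, "deaths": 20},
    {"state": "State B", "confirmed": 4000, "deaths": 40},
    {"state": "State C", "confirmed": 2500, "deaths": 50},
]
top2 = [row["state"] for row in top_k_states(rows, 2)]
rates = {row["state"]: fatality_rate(row) for row in rows}
```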

In our notebook, we install notebook-scoped libraries by running the following code from within a notebook cell:

sc.install_pypi_package("pandas==0.25.1")
sc.install_pypi_package("requests==2.24.0")
sc.install_pypi_package("numpy==1.19.1")
sc.install_pypi_package("kiwisolver==1.2.0")
sc.install_pypi_package("matplotlib==3.3.0")

We use these libraries in the subsequent cells for the further data analysis and visualization steps in the notebook.

The following set of parameters is used to run the notebook:

{"DATE": "10-15-2020",
 "TOP_K": 6,
"US_STATES": ["Wisconsin", "Texas", "Nevada"]}

Running all the notebook cells generates two graphs. Graph a shows the top six US states with confirmed COVID-19 cases on October 15, 2020.

Graph b shows the fatality rates of COVID-19 patients in Texas, Wisconsin, and Nevada on October 15, 2020.

EMR Studio also allows you to install Jupyter notebook kernels and Python libraries on a cluster primary node, which makes your custom environment available to any EMR Studio Workspace attached to the cluster. To install the sas_kernel kernel on a cluster primary node, run the following code within a notebook cell:

!/emr/notebook-env/bin/pip install sas_kernel

The following screenshot shows your output.

For more information about how to install kernels and use libraries, see Installing and Using Kernels and Libraries.

Diagnosing applications and jobs with EMR Studio

In EMR Studio, you can quickly debug jobs and access logs for both active and stopped clusters without logging in to the cluster or setting up a web proxy through an SSH connection. You can use native application interfaces such as Spark UI and YARN Timeline Service directly from EMR Studio. EMR Studio also allows you to quickly locate the cluster or job to debug by using filters such as cluster state, creation time, and cluster ID. For more information, see Diagnose Applications and Jobs with EMR Studio.

Now, we show you how to open a native application interface to debug the notebook job that already finished.

  1. On the EMR Studio page, choose Clusters.

A list appears with all the EMR clusters launched under the same AWS account. You can filter the list by cluster state, cluster ID, or creation time range by entering values in the provided fields.

  2. Choose the cluster ID of the EMR cluster that we attached to the Workspace EMR-Studio-WS-Demo1 for running notebook demo_pyspark.ipynb.
  3. For Spark job debugging, on the Launch application UIs menu, choose Spark History Server.

The following screenshot shows you the Spark job debugging UI.

We can traverse the details for our notebook application by checking actual logs from the Spark History Server, as in the following screenshot.

  4. For YARN application debugging, on the Launch application UIs menu, choose YARN Timeline Server.

The following screenshot shows the YARN debugging UI.

Orchestrating analytics notebook jobs to build ETL production pipelines

EMR Studio makes it easy for you to move any analytics workload from prototyping to production. With EMR Studio, you can run parameterized notebooks as part of scheduled workflows using orchestration services like AWS Step Functions and Apache Airflow or Amazon MWAA.

In this section, we show a simple example of how to orchestrate running notebook workflows using Apache Airflow.

We have a fully tested notebook under an EMR Studio Workspace, and want to schedule a workflow that runs the notebook on an on-demand EMR cluster every 10 minutes.

Record the value of the Workspace ID (for example, e-*****************************) and the notebook file path relative to the home directory within the Workspace (for example, demo.ipynb or my_folder/demo.ipynb).

The workflow that we create takes care of the following tasks:

  1. Create an EMR cluster.
  2. Wait until the cluster is ready.
  3. Start running a notebook defined by the Workspace ID, notebook file path, and the cluster created.
  4. Wait until the notebook is complete.
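Steps 3 and 4 of this list can be sketched in Python as follows. This is a minimal sketch, not the DAG from the GitHub repo: the function `run_notebook` and its parameters are hypothetical, and `emr` is assumed to be a client created with `boto3.client("emr")`, whose `start_notebook_execution` and `describe_notebook_execution` calls are used here. Steps 1 and 2 could similarly use `run_job_flow` and the client's `cluster_running` waiter.

```python
import time

# Notebook execution statuses after which polling can stop.
TERMINAL_STATES = {"FINISHED", "FAILED", "STOPPED"}

def run_notebook(emr, workspace_id, notebook_path, cluster_id, service_role,
                 poll_seconds=30, sleep=time.sleep):
    """Start a notebook execution on an existing cluster and wait for a terminal status."""
    response = emr.start_notebook_execution(
        EditorId=workspace_id,        # the Workspace ID recorded earlier
        RelativePath=notebook_path,   # path relative to the Workspace home directory
        ExecutionEngine={"Id": cluster_id, "Type": "EMR"},
        ServiceRole=service_role,     # role name is an assumption of this sketch
    )
    execution_id = response["NotebookExecutionId"]
    while True:
        detail = emr.describe_notebook_execution(NotebookExecutionId=execution_id)
        status = detail["NotebookExecution"]["Status"]
        if status in TERMINAL_STATES:
            return execution_id, status
        sleep(poll_seconds)
```

Passing the client and the sleep function as arguments keeps the polling logic easy to exercise with a stub before pointing it at a real account. In an Airflow DAG, the start call and the wait loop would typically be split into an operator and a sensor so that each shows up as its own task.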

The following screenshot is the tree view of this example DAG. The DAG definition is available on the GitHub repo. Make sure you replace any placeholder values with the actual ones before using.

When we open the Gantt chart of one of the successful notebook runs, we can see the timeline of our workflow. The time spent creating the cluster and creating a notebook execution is negligible compared to the time spent waiting for the cluster to be ready and waiting for the notebook to finish, which meets the expectation of our SLA.

This example is just a starting point. Try it out and extend it with more sophisticated workflows that suit your needs.

Summary

In this post, we highlighted some of the capabilities of EMR Studio, such as the ability to log in via AWS SSO, access fully managed Jupyter notebooks, link Git-based code repositories, change clusters, load custom Python libraries and kernels, diagnose clusters and jobs using native application UIs, and orchestrate notebook jobs using Apache Airflow or Amazon MWAA.

There is no additional charge for using EMR Studio in public preview, and you only pay for the use of the EMR cluster or other AWS services such as AWS Service Catalog. For more information, see the EMR Studio FAQs.

EMR Studio is available on Amazon EMR release version 6.2 and later, in the US East (N. Virginia), US West (Oregon), and EU (Ireland) Regions for public preview. For the latest Region availability for the public preview, see Considerations.

If you have questions or suggestions, feel free to leave a comment.


About the Authors

Fei Lang is a Senior Big Data Architect at Amazon Web Services. She is passionate about building the right big data solution for customers. In her spare time, she enjoys the scenery of the Pacific Northwest, going for a swim, and spending time with her family.

 

 

 

Shuang Li is a Senior Product Manager for Amazon EMR at AWS. She holds a doctoral degree in Computer Science and Engineering from Ohio State University.

 

 

Ray Liu is a Software Development Engineer at AWS. Besides work, he enjoys traveling and spending time with family.

 

 

 

Kendra Ellis is a Programmer Writer at AWS.

 

 

 

 

New – Amazon EMR on Amazon Elastic Kubernetes Service (EKS)

Post Syndicated from Channy Yun original https://aws.amazon.com/blogs/aws/new-amazon-emr-on-amazon-elastic-kubernetes-service-eks/

Tens of thousands of customers use Amazon EMR to run big data analytics applications on frameworks such as Apache Spark, Hive, HBase, Flink, Hudi, and Presto at scale. EMR automates the provisioning and scaling of these frameworks and optimizes performance with a wide range of EC2 instance types to meet price and performance requirements. Customers are now consolidating compute pools across organizations using Kubernetes. Some customers who manage Apache Spark on Amazon Elastic Kubernetes Service (EKS) themselves want to use EMR to eliminate the heavy lifting of installing and managing their frameworks and integrations with AWS services. In addition, they want to take advantage of the faster runtimes and development and debugging tools that EMR provides.

Today, we are announcing the general availability of Amazon EMR on Amazon EKS, a new deployment option in EMR that allows customers to automate the provisioning and management of open-source big data frameworks on EKS. With EMR on EKS, customers can now run Spark applications alongside other types of applications on the same EKS cluster to improve resource utilization and simplify infrastructure management.

Customers can deploy EMR applications on the same EKS cluster as other types of applications, which allows them to share resources and standardize on a single solution for operating and managing all their applications. Customers get all the same EMR capabilities on EKS that they use on EC2 today, such as access to the latest frameworks, performance optimized runtimes, EMR Notebooks for application development, and Spark user interface for debugging.

Amazon EMR automatically packages the application into a container with the big data framework and provides pre-built connectors for integrating with other AWS services. EMR then deploys the application on the EKS cluster and manages logging and monitoring. With EMR on EKS, you can get 3x faster performance using the performance-optimized Spark runtime included with EMR compared to standard Apache Spark on EKS.

Amazon EMR on EKS – Getting Started
If you already have an EKS cluster where you run Spark jobs, you simply register your existing EKS cluster with EMR using the AWS Management Console, AWS Command Line Interface (CLI), or APIs to deploy your Spark application.

For example, here is a simple CLI command to register your EKS cluster.

$ aws emr-containers create-virtual-cluster \
          --name <virtual_cluster_name> \
          --container-provider '{
             "id": "<eks_cluster_name>",
             "type": "EKS",
             "info": {
                 "eksInfo": {
                     "namespace": "<namespace_name>"
                 }
             }
          }'

In the EMR Management console, you can see it in the list of virtual clusters.
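The same registration can be sketched in Python. The helper names below are hypothetical, and `emr_containers` is assumed to be a client created with `boto3.client("emr-containers")`; its `create_virtual_cluster` call takes the same container provider structure as the CLI.

```python
def container_provider(eks_cluster_name, namespace):
    """Build the containerProvider structure for an EKS-backed virtual cluster."""
    return {
        "id": eks_cluster_name,
        "type": "EKS",
        "info": {"eksInfo": {"namespace": namespace}},
    }

def register_virtual_cluster(emr_containers, name, eks_cluster_name, namespace):
    """Register an EKS namespace with EMR and return the new virtual cluster ID."""
    response = emr_containers.create_virtual_cluster(
        name=name,
        containerProvider=container_provider(eks_cluster_name, namespace),
    )
    return response["id"]
```

A call such as `register_virtual_cluster(boto3.client("emr-containers"), "<virtual_cluster_name>", "<eks_cluster_name>", "<namespace_name>")` would then return the ID to pass to later job submissions.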

When Amazon EKS clusters are registered, EMR deploys workloads to Kubernetes nodes and pods, manages application execution and auto scaling, and sets up managed endpoints so that you can connect notebooks and SQL clients. EMR builds and deploys a performance-optimized runtime for the open source frameworks used in analytics applications.

You can simply start your Spark jobs.

$ aws emr-containers start-job-run \
          --name <job_name> \
          --virtual-cluster-id <cluster_id> \
          --execution-role-arn <IAM_role_arn> \
          --release-label <emr_release_label> \
          --job-driver '{
            "sparkSubmitJobDriver": {
              "entryPoint": "<entry_point_location>",
              "entryPointArguments": ["<arguments_list>"],
              "sparkSubmitParameters": "<spark_parameters>"
            }
          }'
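A job submission like this one can also be sketched in Python. The helper names are hypothetical, and `emr_containers` is assumed to be a client created with `boto3.client("emr-containers")`, whose `start_job_run` call is used here.

```python
def spark_job_driver(entry_point, arguments=(), spark_params=None):
    """Build the jobDriver structure for a Spark submit job."""
    driver = {"entryPoint": entry_point, "entryPointArguments": list(arguments)}
    if spark_params:
        # Spark submit options are passed as a single string.
        driver["sparkSubmitParameters"] = spark_params
    return {"sparkSubmitJobDriver": driver}

def start_spark_job(emr_containers, name, virtual_cluster_id, role_arn,
                    release_label, driver):
    """Submit a Spark job to a virtual cluster and return the job run ID."""
    response = emr_containers.start_job_run(
        name=name,
        virtualClusterId=virtual_cluster_id,
        executionRoleArn=role_arn,
        releaseLabel=release_label,
        jobDriver=driver,
    )
    return response["id"]
```

Building the driver structure in a separate function keeps the JSON shape in one place, which helps avoid quoting mistakes that are easy to make in an inline CLI argument.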

To monitor and debug jobs, you can inspect logs uploaded to the Amazon CloudWatch and Amazon Simple Storage Service (S3) locations configured as part of monitoringConfiguration. You can also use the one-click experience from the console to launch the Spark History Server.
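As an illustration of where monitoringConfiguration fits, the following Python sketch builds a configuration overrides structure that sends logs to both destinations. The helper name and the example log group, stream prefix, and bucket are hypothetical; the key names follow the EMR on EKS job run API as we understand it, so verify them against the current documentation.

```python
import json

def monitoring_overrides(log_group, stream_prefix, s3_log_uri):
    """Build configurationOverrides that send job logs to CloudWatch and Amazon S3."""
    return {
        "monitoringConfiguration": {
            "cloudWatchMonitoringConfiguration": {
                "logGroupName": log_group,
                "logStreamNamePrefix": stream_prefix,
            },
            "s3MonitoringConfiguration": {"logUri": s3_log_uri},
        }
    }

# Hypothetical names; replace with your own log group and bucket.
overrides = monitoring_overrides("/emr-on-eks/demo", "spark", "s3://example-bucket/logs/")
overrides_json = json.dumps(overrides)  # usable as the CLI --configuration-overrides value
```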

Integration with Amazon EMR Studio

Now you can submit analytics applications using AWS SDKs and AWS CLI, Amazon EMR Studio notebooks, and workflow orchestration services like Apache Airflow. We have developed a new Airflow Operator for Amazon EMR on EKS. You can use this connector with self-managed Airflow or by adding it to the Plugin Location with Amazon Managed Workflows for Apache Airflow.

You can also use the newly previewed Amazon EMR Studio to perform data analysis and data engineering tasks in a web-based integrated development environment (IDE). Amazon EMR Studio lets you submit notebook code to EMR clusters deployed on EKS using the Studio interface. After setting up one or more managed endpoints to which Studio users can attach a Workspace, EMR Studio can communicate with your virtual cluster.

For EMR Studio preview, there is no additional cost when you create managed endpoints for virtual clusters. To learn more, visit the guide document.

Now Available
Amazon EMR on Amazon EKS is available in US East (N. Virginia), US West (Oregon), and Europe (Ireland) Regions. You can run EMR workloads on AWS Fargate for EKS as a serverless option, which removes the need to provision and manage infrastructure for pods.

To learn more, visit the documentation. Please send feedback to the AWS forum for Amazon EMR or through your usual AWS support contacts.

Learn all the details about Amazon EMR on Amazon EKS and get started today.

Channy;

How the Allen Institute uses Amazon EMR and AWS Step Functions to process extremely wide transcriptomic datasets

Post Syndicated from Gautham Acharya original https://aws.amazon.com/blogs/big-data/how-the-allen-institute-uses-amazon-emr-and-aws-step-functions-to-process-extremely-wide-transcriptomic-datasets/

This is a guest post by Gautham Acharya, Software Engineer III at the Allen Institute for Brain Science, in partnership with AWS Data Lab Solutions Architect Ranjit Rajan, and AWS Sr. Enterprise Account Executive Arif Khan.

The human brain is one of the most complex structures in the universe. Billions of neurons and trillions of connections come together to form a labyrinthine network of activity. Understanding the mechanisms that guide our minds is one of the most challenging problems in modern scientific research.

The Allen Institute for Brain Science is dedicated to solving large-scale, fundamental problems in neuroscience. Our mission is to accelerate the rate at which the world understands the inner workings of the human brain and to uncover the essence of what makes us human.

Processing extremely wide datasets

As a part of “big science,” one of our core principles, we seek to tackle scientific challenges at scales no one else has attempted before. One of these challenges is processing large-scale transcriptomic datasets. Transcriptomics is the study of RNA. In particular, we’re interested in the genes that are expressed in individual neurons. The human brain contains almost 100 billion neurons. How do they differ from each other, and what genes do they express? After a series of complex analyses using cutting-edge techniques such as Smart-Seq and 10x Genomics Chromium Sequencing, we produce extremely large matrices of numeric values.

Such matrices are called feature matrices. Each column represents a feature of a cell; in this case, the features are genes. A genome has over 50,000 genes, so a single matrix can have over 50,000 columns! We expect the number of rows in our matrices to increase over time, reaching tens of millions, if not more. These matrices can reach 500 GB or more in size. Over the next few years, we want to be able to ingest tens or hundreds of such matrices.

Our goal is to provide low-latency visualizations on such matrices, allowing researchers to aggregate, slice, and dissect our data in real time. To do this, we run a series of precomputations that store expensive calculations in a database for future retrieval.

We wanted to create a flexible, scalable pipeline to run computations on these matrices and store the results for visualizations.

The pipeline

We wanted to build a pipeline that takes these large matrices as inputs, runs various Spark jobs, and stores the outputs in an Apache HBase cluster. We wanted to create something flexible so that we could easily add additional Spark transformations.

We decided on AWS Step Functions as our workflow-orchestration tool of choice. Step Functions allows us to create a state machine that orchestrates the dataflow from payload submission to database loading.

After close collaboration with the engineers at the AWS Data Lab, we came up with the following pipeline architecture.

At a high level, our pipeline has the following workflow:

  1. Trigger a state machine from an upload event to an Amazon Simple Storage Service (Amazon S3) bucket.
  2. Copy and unzip the input ZIP file containing a feature matrix into an Amazon S3 working directory.
  3. Run Spark jobs on Amazon EMR to transform input feature matrices into various pre-computed datasets. Store all intermediate results in a working directory on Amazon S3 and output the results of the Spark jobs as HFiles.
  4. Bulk load the results of our Spark jobs into Apache HBase.

The preceding architecture diagram is deceptively simple. We found a number of challenges during our initial implementation, which we discuss in the following sections.

Lack of transaction support and rollbacks across tables in Apache HBase

The results of our Spark jobs are a number of precomputed views of our original input dataset. Each view is stored as a separate table in Apache HBase. A major drawback of Apache HBase is the lack of a native transactional system. HBase only provides row-level atomicity. Our worst-case scenario is writing partial data—cases where some views are updated, but not others, showing different results for different visualizations and resulting in scientifically incorrect data!

We worked around this by rolling our own blue/green system on top of Apache HBase. We suffix each set of tables related to a dataset with a universally unique identifier (UUID). We use Amazon DynamoDB to track the UUID associated with each individual dataset. When an update to a dataset is being written, the UUID is not switched in DynamoDB until we verify that all the new tables have been successfully written to Apache HBase. We have an API on top of HBase to facilitate reads. This API checks DynamoDB for the dataset UUID before querying HBase, so user traffic is never redirected toward a new view until we confirm a successful write. Our API involves an AWS Lambda function using HappyBase to connect to our HBase cluster, wrapped in an Amazon API Gateway layer to provide a REST interface. The following diagram illustrates this architecture.

The read path has the following steps:

  • R1 – API Gateway invokes a Lambda function to fetch data from a dataset
  • R2 – The Lambda function requests and receives the dataset UUID from DynamoDB
  • R3 – Lambda queries the Apache HBase cluster with the UUID

The write path has the following steps:

  • W1 – The state machine bulk loads new dataset tables to the Apache HBase cluster suffixed with the new UUID
  • W2 – After validation, the state machine updates DynamoDB so user traffic is directed towards those changes
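
The read and write paths above can be sketched in plain Python. This is a minimal illustration rather than our production code: two in-memory dictionaries stand in for DynamoDB and Apache HBase, and the names `publish_dataset`, `read_view`, and the `<view>_<uuid>` table-naming scheme are hypothetical.

```python
import uuid

# In-memory stand-ins (assumptions for illustration only):
# `pointer_store` plays the role of DynamoDB, `table_store` the role of HBase.
pointer_store = {}   # dataset name -> active UUID
table_store = {}     # "<view>_<uuid>" -> table contents


def publish_dataset(dataset, views):
    """Write every view under a fresh UUID, then flip the pointer (W1, W2)."""
    new_id = uuid.uuid4().hex
    for view_name, rows in views.items():
        table_store[f"{view_name}_{new_id}"] = rows
    # Validate that all tables landed before directing traffic to them.
    if not all(f"{name}_{new_id}" in table_store for name in views):
        raise RuntimeError("incomplete write; pointer left unchanged")
    pointer_store[dataset] = new_id
    return new_id


def read_view(dataset, view_name):
    """Look up the active UUID first, then read the matching table (R2, R3)."""
    active_id = pointer_store[dataset]
    return table_store[f"{view_name}_{active_id}"]


first = publish_dataset("example", {"medians": {"cell_1": 0.5}})
second = publish_dataset("example", {"medians": {"cell_1": 0.7}})
current = read_view("example", "medians")
```

Because readers always resolve the UUID before touching a table, a failed or partial write never becomes visible: the pointer still names the previous, complete set of tables.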

Stalled Spark jobs on extremely wide datasets

Although Apache Spark is a fantastic engine for running distributed compute operations, it doesn’t do too well when scaling to extremely wide datasets. We routinely operate on data that surpasses 50,000 columns, which often causes issues such as a stalled JavaToPython step in our PySpark job. Although we have more investigating to do to figure out why our Spark jobs hang on these wide datasets, we found a simple workaround in the short term—batching!

A number of our jobs involve computing simple columnar aggregations on our data. This means that each calculation on a column is completely independent of all the other columns. This lends itself quite well to batching our compute. We can break our input columns into chunks and run our compute on each chunk.

The following code chunks Apache Spark aggregation functions into groups of columns:

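import pyspark.sql.functions as pyspark_functions
import pyspark.sql.types as pyspark_datatype

# Project-local helpers (chunks, escape_column_list) from our codebase
import src.spark_transforms.pyspark_jobs.pyspark_utilities as pyspark_utilities


def get_aggregation_for_matrix_and_metadata(matrix, metadata, group_by_arg, agg_func, cols_per_write):
   '''
   Performs an aggregation on the joined matrix, aggregating the desired column by the given function.
   agg_func must be a valid Pandas UDF function. Runs in batches so we don't overload the Task Scheduler with 50,000
   columns at once. Yields one aggregated DataFrame per batch of columns.
   '''
   row_key = matrix.columns[0]

   # Wrap the aggregation function once; the same UDF is reused for every batch
   cast_as_udf = pyspark_functions.pandas_udf(
                   agg_func,
                   pyspark_datatype.FloatType(),
                   pyspark_functions.PandasUDFType.GROUPED_AGG)

   # Chunk the data columns, leaving out the row key so it is not selected twice
   for col_group in pyspark_utilities.chunks(matrix.columns[1:], cols_per_write):

       # Add the row key to the column group
       col_group = [row_key] + list(col_group)

       selected_matrix = matrix.select(pyspark_utilities.escape_column_list(col_group))

       # Assumption: the snippet as published used an undefined name `joined`; here it is
       # taken to be the selected columns joined to the metadata on the row key
       joined = selected_matrix.join(metadata, on=row_key)

       # create argument list for group by and then process
       udf_input = [cast_as_udf(joined[column_name]).alias(column_name)
                    for column_name in selected_matrix.columns
                    if column_name not in (row_key, group_by_arg)]

       yield joined.groupby(group_by_arg).agg(*udf_input)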
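
The `pyspark_utilities.chunks` helper called in the code above isn't shown in this post. A minimal sketch of such a helper, assuming it yields successive fixed-size lists of column names (the name matches the call above, but the body and the sample column names are illustrative guesses):

```python
def chunks(items, size):
    """Yield successive lists of at most `size` items from `items`."""
    if size < 1:
        raise ValueError("size must be a positive integer")
    for start in range(0, len(items), size):
        # Return a fresh list so callers can modify a chunk safely.
        yield list(items[start:start + size])


columns = ["row_key", "gene_a", "gene_b", "gene_c", "gene_d"]
batches = list(chunks(columns[1:], 3))
# batches == [["gene_a", "gene_b", "gene_c"], ["gene_d"]]
```

Because each column is aggregated independently, the batch size only affects how much work Spark schedules at once, not the results.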

We then write the results of each batch to an HFile, which is then later bulk loaded into HBase.

Because the post-aggregation DataFrame was very small, we found a significant performance increase in coalescing the DataFrame post-aggregation and checkpointing the results before writing the HFiles. This forces Spark to compute the aggregation before writing the HFiles. HFiles need to be sorted by row key, so it’s easier to pass a smaller DataFrame to our HFile converter.

Using Apache Spark to write DataFrames as HFiles

Apache Spark supports writing DataFrames in multiple formats, including as HFiles. However, the documentation for doing so leaves a lot to be desired. To write out our Spark DataFrames as HFiles, we had to take the following steps:

  1. Convert a DataFrame into an HFile-compatible format, assuming that the first column is the HBase rowkey: (row_key, column_family, col, value).
  2. Create a JAR file containing a converter to convert input Python Objects into Java key-value byte classes. This step took a lot of trial and error—we couldn’t find clear documentation on how the Python object was serialized and passed into the Java function.
  3. Call the saveAsNewAPIHadoopFile function, passing in the relevant information: the ZooKeeper Quorum IP, port, and cluster DNS of our Apache HBase on the Amazon EMR cluster; the HBase table name; the class name of our Java converter function; and more.

The following code writes HFiles:

import src.spark_transforms.pyspark_jobs.pyspark_utilities as pyspark_utilities
import src.spark_transforms.pyspark_jobs.output_handler.emr_constants as constants


def csv_to_key_value(row, sorted_cols, column_family):
   '''
   This method is an RDD mapping function that will map each
   row in an RDD to an hfile-formatted tuple for hfile creation
   (rowkey, (rowkey, columnFamily, columnQualifier, value))
   '''
   result = []
   for index, col in enumerate(sorted_cols[constants.ROW_KEY_INDEX + 1:], 1):
       row_key = str(row[constants.ROW_KEY_INDEX])
       value = row[index]

       if value is None:
           raise ValueError(f'Null value found at {row_key}, {col}')

       # We store sparse representations, dropping all zeroes.
       if value != 0:
           result.append((row_key, (row_key, column_family, col, value)))

   return tuple(result)


def get_sorted_df_by_cols(df):
   '''
   Sorts the matrix by column. Retains the row key as the initial column.
   '''
   cols = [df.columns[0]] + sorted(df.columns[1:])
   escaped_cols = pyspark_utilities.escape_column_list(cols)
   return df.select(escaped_cols)


def flat_map_to_hfile_format(df, column_family):
   '''
   Flat maps the matrix DataFrame into an RDD formatted for conversion into HFiles.
   '''
   sorted_df = get_sorted_df_by_cols(df)
   columns = sorted_df.columns
   return sorted_df.rdd.flatMap(lambda row: csv_to_key_value(row, columns, column_family)).sortByKey(True)


def write_hfiles(df, output_path, zookeeper_quorum_ip, table_name, column_family):
   '''
   This method will sort and map the PySpark DataFrame (whose first
   column is the row key) and then write to hfiles in the output
   directory using the supplied hbase configuration.
   '''
   # Sort columns other than the row key (first column) and flat map each row to key-value tuples

   rdd = flat_map_to_hfile_format(df, column_family)

   conf = {
           constants.HBASE_ZOOKEEPER_QUORUM: zookeeper_quorum_ip,
           constants.HBASE_ZOOKEEPER_CLIENTPORT: constants.ZOOKEEPER_CLIENTPORT,
           constants.ZOOKEEPER_ZNODE_PARENT: constants.ZOOKEEPER_PARENT,
           constants.HBASE_TABLE_NAME: table_name
           }

   rdd.saveAsNewAPIHadoopFile(output_path,
                              constants.OUTPUT_FORMAT_CLASS,
                              keyClass=constants.KEY_CLASS,
                              valueClass=constants.VALUE_CLASS,
                              keyConverter=constants.KEY_CONVERTER,
                              valueConverter=constants.VALUE_CONVERTER,
                              conf=conf)
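
To make the tuple layout concrete, here is a standalone sketch that mirrors the mapping logic of csv_to_key_value on a plain Python list instead of a Spark Row. The function name `row_to_key_values`, the sample column names and values, and the column family "cf" are made up for illustration.

```python
def row_to_key_values(row, columns, column_family):
    """Map one row to (rowkey, (rowkey, family, qualifier, value)) tuples."""
    row_key = str(row[0])
    result = []
    for index, col in enumerate(columns[1:], 1):
        value = row[index]
        if value is None:
            raise ValueError(f"Null value found at {row_key}, {col}")
        # Sparse representation: zero entries are not written.
        if value != 0:
            result.append((row_key, (row_key, column_family, col, value)))
    return result


cells = row_to_key_values(
    ["cell_1", 0.0, 2.5, 1.0],
    ["cell_id", "gene_a", "gene_b", "gene_c"],
    "cf",
)
# cells == [("cell_1", ("cell_1", "cf", "gene_b", 2.5)),
#           ("cell_1", ("cell_1", "cf", "gene_c", 1.0))]
```

The zero for gene_a is dropped, so a mostly empty row produces only a handful of cells.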

The following code defines the constants referenced in the preceding code:

HBASE_ZOOKEEPER_QUORUM="hbase.zookeeper.quorum"
HBASE_ZOOKEEPER_CLIENTPORT="hbase.zookeeper.property.clientPort"
ZOOKEEPER_ZNODE_PARENT="zookeeper.znode.parent"
HBASE_TABLE_NAME="hbase.mapreduce.hfileoutputformat.table.name"

OUTPUT_FORMAT_CLASS='org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2'
KEY_CLASS='org.apache.hadoop.hbase.io.ImmutableBytesWritable'
VALUE_CLASS='org.apache.hadoop.hbase.KeyValue'
KEY_CONVERTER="org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
VALUE_CONVERTER="KeyValueConverter"

ZOOKEEPER_CLIENTPORT='2181'
ZOOKEEPER_PARENT='/hbase'

ROW_KEY_INDEX = 0

The following code is a Java class to serialize input PySpark RDDs:

import org.apache.spark.api.python.Converter;
import org.apache.hadoop.hbase.KeyValue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

/**
* This class is used to convert a tuple
* supplied by a spark job created in Python
* to the corresponding hbase keyValue type
* which is needed for hfile creation.
*
*/
@SuppressWarnings("rawtypes")
public class KeyValueConverter implements Converter {

  private static final long serialVersionUID = 1L;

  /**
   * this method will take a tuple object supplied
   * by Python spark job and convert and
   * return the corresponding hbase KeyValue object.
   */
  public Object convert(Object obj) {
     KeyValue cell;
     List<?> list = new ArrayList<>();
     if (obj.getClass().isArray()) {
          list = Arrays.asList((Object[])obj);
      } else if (obj instanceof Collection) {
          list = new ArrayList<>((Collection<?>)obj);
      }

     cell = new KeyValue(
           list.get(0).toString().getBytes(),
           list.get(1).toString().getBytes(),
           list.get(2).toString().getBytes(),
           list.get(3).toString().getBytes());

     return cell;
  }
}

Looking ahead

Our computation pipeline was a success, and you can see the resulting visualizations on https://transcriptomics.brain-map.org/.

We’ve been thrilled with AWS’s reliable and feature-rich ecosystem. We used Amazon EMR, Step Functions, and Amazon S3 to build a robust, large-scale data processing pipeline.

Since writing this post, we’ve done much more, including a cross-database transaction system, wide-matrix transposes in Spark, and more. Big Data problems in neuroscience never end, and we’re excited to share more with you in the future!


About the Authors

Gautham Acharya is a Software Engineer at the Allen Institute for Brain Science. He works on the backend data platform team responsible for integrating multimodal neuroscience data into a single cohesive system.

Ranjit Rajan is a Data Lab Solutions Architect with AWS. Ranjit works with AWS customers to help them design and build data and analytics applications in the cloud.

Arif Khan is a Senior Account Executive with Amazon Web Services. He works with nonprofit research customers to help shape and deliver on a strategy that focuses on customer success, building mind share and driving broad use of Amazon’s utility computing services to support their mission.

Amazon EMR now provides up to 30% lower cost and up to 15% improved performance for Spark workloads on Graviton2-based instances

Post Syndicated from Peter Gvozdjak original https://aws.amazon.com/blogs/big-data/amazon-emr-now-provides-up-to-30-lower-cost-and-up-to-15-improved-performance-for-spark-workloads-on-graviton2-based-instances/

Amazon EMR now supports M6g, C6g and R6g instances with Amazon EMR versions 6.1.0, 5.31.0 and later. These instances are powered by AWS Graviton2 processors that are custom designed by AWS using 64-bit Arm Neoverse cores to deliver the best price performance for cloud workloads running in Amazon Elastic Compute Cloud (Amazon EC2). On Graviton2 instances, Amazon EMR runtime for Apache Spark provides up to 15% improved performance at up to 30% lower costs relative to equivalent previous generation instances. In our TPC-DS 3 TB benchmark tests, we found that queries run up to 32 times faster using Amazon EMR runtime for Apache Spark. For more information, see Amazon EMR introduces EMR runtime for Apache Spark.

You can use Apache Spark for a wide array of analytics use cases ranging from large-scale transformations to streaming, data science, and machine learning. Amazon EMR provides the latest, stable, open-source innovations, performant storage with Amazon S3, and the unique cost savings capabilities of Spot Instances and Managed Scaling.

Amazon EMR runtime for Apache Spark is a performance-optimized runtime environment for Apache Spark, available and turned on by default on Amazon EMR release 5.28.0 and later. Amazon EMR runtime for Apache Spark provides 100% API compatibility with open-source Apache Spark. This means that your Apache Spark workloads run faster and incur lower compute costs when run on Amazon EMR without requiring any changes to your application.

In this post, we discuss the results that we observed by running Spark workloads on Graviton2 instances.

AWS Graviton2 and Amazon EMR runtime performance improvements

To measure improvements, we ran TPC-DS 3 TB benchmark queries on Amazon EMR 5.30.1 using Amazon EMR runtime for Apache Spark (compatible with Apache Spark version 2.4) with 5-10 node clusters of M6g instances with data in Amazon Simple Storage Service (Amazon S3) and compared it to equivalent configuration using M5 instances. We measured performance improvements using total query execution time and geometric mean of query execution time across the 104 TPC-DS 3 TB benchmark queries.
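
The two summary metrics can be sketched as follows. This is an illustrative calculation, not the benchmark harness; the function names and the three sample runtimes are hypothetical.

```python
import math


def total_runtime(runtimes):
    """Sum of per-query runtimes in seconds."""
    return sum(runtimes)


def geometric_mean(runtimes):
    """n-th root of the product, computed via logarithms to avoid overflow."""
    return math.exp(sum(math.log(t) for t in runtimes) / len(runtimes))


def improvement_pct(baseline, candidate):
    """Percentage by which `candidate` is lower than `baseline`."""
    return (baseline - candidate) / baseline * 100


sample = [10, 40, 160]  # made-up per-query runtimes in seconds
print(total_runtime(sample))            # 210
print(round(geometric_mean(sample), 6)) # 40.0
```

The total is dominated by the longest-running queries, whereas the geometric mean weights each query's relative change equally, which is why we report both.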

The results showed that M6g instance EMR clusters outperformed equivalent M5 instance EMR clusters, with an 11.61–15.61% improvement in total query runtime across the instance sizes in the family and a 10.52–12.91% improvement in geometric mean. To measure cost improvement, we added up the Amazon EMR and Amazon EC2 cost per instance per hour and multiplied it by the total query runtime. We observed a 21.58–30.58% reduction in instance hour cost on M6g instance EMR clusters compared to equivalent M5 instance EMR clusters to execute the 104 TPC-DS benchmark queries.

The following table shows results from running TPC-DS 3 TB benchmark queries using Amazon EMR 5.30.1 over equivalent M5 and M6g instance EMR clusters.

| Instance Size | 16 XL | 12 XL | 8 XL | 4 XL | 2 XL |
| --- | --- | --- | --- | --- | --- |
| Number of core instances in EMR cluster | 5 | 5 | 5 | 5 | 10 |
| Total query runtime on M5 (seconds) | 6157 | 6167 | 6857 | 10593 | 10676 |
| Total query runtime on M6g (seconds) | 5196 | 5389 | 6061 | 9313 | 9240 |
| Total query execution time improvement with M6g | 15.61% | 12.63% | 11.61% | 12.08% | 13.45% |
| Geometric mean query execution time on M5 (seconds) | 33 | 34 | 35 | 47 | 47 |
| Geometric mean query execution time on M6g (seconds) | 29 | 30 | 32 | 41 | 42 |
| Geometric mean query execution time improvement with M6g | 12.73% | 10.79% | 10.52% | 12.91% | 11.24% |
| EC2 M5 instance price ($ per hour) | $3.072 | $2.304 | $1.536 | $0.768 | $0.384 |
| EMR M5 instance price ($ per hour) | $0.27 | $0.27 | $0.27 | $0.192 | $0.096 |
| (EC2 + EMR) M5 instance price ($ per hour) | $3.342 | $2.574 | $1.806 | $0.960 | $0.480 |
| Cost of running on M5 ($ per instance) | $5.72 | $4.41 | $3.44 | $2.82 | $1.42 |
| EC2 M6g instance price ($ per hour) | $2.464 | $1.848 | $1.232 | $0.616 | $0.308 |
| EMR M6g instance price ($ per hour) | $0.616 | $0.462 | $0.308 | $0.15 | $0.08 |
| (EC2 + EMR) M6g instance price ($ per hour) | $3.080 | $2.310 | $1.540 | $0.770 | $0.385 |
| Cost of running on M6g ($ per instance) | $4.45 | $3.46 | $2.59 | $1.99 | $0.99 |
| Total cost reduction with M6g including performance improvement | -22.22% | -21.58% | -24.63% | -29.48% | -30.58% |
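
As a worked example of the cost rows, the following sketch recomputes the 16 XL column from the prices and runtimes in the table. The function name `run_cost` is hypothetical; the formula is the one described earlier (combined hourly price multiplied by total query runtime).

```python
SECONDS_PER_HOUR = 3600


def run_cost(ec2_price, emr_price, runtime_seconds):
    """Cost per instance: combined hourly price times runtime in hours."""
    return (ec2_price + emr_price) * runtime_seconds / SECONDS_PER_HOUR


# 16 XL figures from the table above.
m5_cost = run_cost(3.072, 0.27, 6157)
m6g_cost = run_cost(2.464, 0.616, 5196)
reduction_pct = (m5_cost - m6g_cost) / m5_cost * 100

print(f"M5: ${m5_cost:.2f}, M6g: ${m6g_cost:.2f}, reduction: {reduction_pct:.2f}%")
# M5: $5.72, M6g: $4.45, reduction: 22.22%
```

The reduction combines two effects: a lower hourly price and a shorter runtime.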

The following graph shows per query improvements observed on M6g 2XL instances with EMR Runtime for Spark on Amazon EMR version 5.30.1 compared to equivalent M5 2XL instances for the 104 queries in the TPC-DS 3 TB benchmark. Performance of 100 of 104 TPC-DS queries improved with M6g 2XL, and performance for 4 queries regressed (q41, q20, q42, and q52 – with the maximum regression of -20.99%). If you are evaluating migrating from M5 instances to M6g instances for EMR Spark workloads, we recommend that you test your workloads to check if any of your queries encounter slower performance.

R6g instances showed a similar performance improvement while running Apache Spark workloads compared to equivalent R5 instances. Our test results showed a 14.27–21.50% improvement in total query runtime for 5 different instance sizes within the instance family, and a 12.48–18.95% improvement in geometric mean. On cost comparison, we observed a 23.26–31.66% reduction in instance hour cost on R6g instance EMR clusters compared to R5 instance EMR clusters to execute the 104 TPC-DS benchmark queries. We observed 4 benchmark queries (q6, q21, q41 and q26 – with a maximum regression of -18.28%) taking longer to execute on R6g instance clusters compared to R5 instance clusters.

With C6g instances, we observed improved performance compared to C5 instances for Spark workloads on 2XL, 4XL, 12XL and 16XL instance sizes. Query execution performance regressed on 8XL instances by -0.38%. We observed 16.84–24.15% lower instance hour costs on C6g instance EMR clusters compared to equivalent C5 instance EMR clusters to execute the 104 TPC-DS benchmark queries. Performance of 73 of 104 TPC-DS queries improved with C6g 4XL, and performance for 31 queries regressed (with a maximum regression of -31.38% for q78).

Summary

By using Amazon EMR with M6g, C6g and R6g instances powered by Graviton2 processors, we observed improved performance and reduced cost of running 104 TPC-DS benchmark queries. To keep up to date, subscribe to the Big Data blog’s RSS feed to learn about more Apache Spark optimizations, configuration best practices, and tuning advice.


About the Authors

Peter Gvozdjak is a senior engineering manager for EMR at Amazon Web Services.

Al MS is a product manager for Amazon EMR at Amazon Web Services.

Data preprocessing for machine learning on Amazon EMR made easy with AWS Glue DataBrew

Post Syndicated from Kartik Kannapur original https://aws.amazon.com/blogs/big-data/data-preprocessing-for-machine-learning-on-amazon-emr-made-easy-with-aws-glue-databrew/

The machine learning (ML) lifecycle consists of several key phases: data collection, data preparation, feature engineering, model training, model evaluation, and model deployment. The data preparation and feature engineering phases ensure an ML model is given high-quality data that is relevant to the model’s purpose. Because most raw datasets require multiple cleaning steps (such as addressing missing values and imbalanced data) and numerous data transformations to produce useful features ready for model training, these phases are often considered the most time-consuming in the ML lifecycle. Additionally, producing well-prepared training datasets has typically required extensive knowledge of multiple data analysis libraries and frameworks. This has presented a barrier to entry for new ML practitioners and reduced iteration speed for more experienced practitioners.

In this post, we show you how to address this challenge with the newly released AWS Glue DataBrew. DataBrew is a visual data preparation service, with over 250 pre-built transformations to automate data preparation tasks, without the need to write any code. We show you how to use DataBrew to analyze, prepare, and extract features from a dataset for ML, and subsequently train an ML model using PySpark on Amazon EMR. Amazon EMR is a managed cluster platform that provides the ability to process and analyze large amounts of data using frameworks such as Apache Spark and Apache Hadoop.

Solution overview

The following diagram illustrates the architecture of our solution.

Loading the dataset to Amazon S3

We use the Census Income dataset from the UCI Machine Learning Repository to train an ML model that predicts whether a person’s income is above $50,000 a year. This multivariate dataset contains 48,842 observations and 14 attributes, such as age, nature of employment, educational background, and marital status.

For this post, we download the Adult dataset. The data folder contains five files, of which adult.data and adult.test are the train and test datasets, and adult.names contains the column names and description. Because the raw dataset doesn’t contain the column names, we add them to the first row of the train and test datasets and save the files with the extension .csv:

Column Names
age,workclass,fnlwgt,education,education-num,marital-status,occupation,relationship,race,sex,capital-gain,capital-loss,hours-per-week,native-country,target
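
Adding the header row can be scripted. The following is a minimal sketch, assuming the raw file is comma-separated text with no header row; the function name `add_header` is hypothetical. Inspect the raw files first, because any non-data lines they contain are copied through unchanged.

```python
from pathlib import Path

COLUMN_NAMES = (
    "age,workclass,fnlwgt,education,education-num,marital-status,occupation,"
    "relationship,race,sex,capital-gain,capital-loss,hours-per-week,"
    "native-country,target"
)


def add_header(raw_path, header=COLUMN_NAMES):
    """Write `<raw_path>.csv` containing the header row followed by the raw data."""
    raw_path = Path(raw_path)
    csv_path = raw_path.with_name(raw_path.name + ".csv")
    csv_path.write_text(header + "\n" + raw_path.read_text())
    return csv_path
```

For example, `add_header("adult.data")` writes adult.data.csv next to the original file and leaves the original untouched.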

Create a new bucket in Amazon Simple Storage Service (Amazon S3) and upload the train and test data files under a new folder titled raw-data.

Data preparation and feature engineering using DataBrew

In this section, we use DataBrew to explore a sample of the dataset uploaded to Amazon S3 and prepare the dataset to train an ML model.

Creating a DataBrew project

To get started with DataBrew, complete the following steps:

  1. On the DataBrew console, choose Projects.
  2. Choose Create a project.
  3. For Name, enter census-income.
  4. For Attached recipe, choose Create a new recipe.
  5. For Recipe name, enter census-income-recipe.
  6. For Select a dataset, select New dataset.
  7. For Dataset name, enter adult.data.
  8. Import the train dataset adult.data.csv from Amazon S3.
  9. Create a new AWS Identity and Access Management (IAM) policy and IAM role by following the steps on the DataBrew console, which provides DataBrew the necessary permissions to access the source data in Amazon S3.
  10. In the Sampling section, for Type, choose Random rows.
  11. Select 1,000.

Exploratory data analysis

The first step in the data preparation phase is to perform exploratory data analysis (EDA). EDA allows us to gain an intuitive understanding of the dataset by summarizing its main characteristics. Example outputs from EDA include identifying data types across columns, plotting the distribution of data points, and creating visuals that describe the relationship between columns. This process informs the data transformations and feature engineering steps that you need to apply prior to building an ML model.

After you create the project, DataBrew provides three different views of the dataset:

  • Grid view – Presents the 15 columns and first 1,000 rows sampled from the dataset and the distribution of data across each column
  • Schema view – In addition to information in the grid view, presents information about the data types (such as double, integer, or string) and the data quality that indicates the presence of missing or invalid values
  • Data profile view – Supported by a data profile job, generates summary statistics such as quartiles, standard deviation, variance, most frequently occurring values, and the correlation between columns

The following screenshot shows our view of the dataset.

Each view presents a unique piece of information that helps us gain a better understanding of the dataset. For instance, in the grid view, we can observe the distribution of data across the 15 columns and spot erroneous data points, such as those with ? in the workclass, occupation, or native-country columns.

In the schema view, we can observe six columns with continuous data, nine columns with categorical or binary data, and no missing or invalid observations in the sample of our dataset. The columns with continuous data also contain the corresponding minimum, maximum, mean, median, and mode values represented as a box plot.

In the data profile view, after running a data profile job, we can observe the summary statistics from the first 20,000 rows, such as the five-number summary, measures of central tendency, variance, skewness, kurtosis, correlations, and the most frequently occurring values in each column. For instance, we can combine the information from the grid view and the data profile view to replace erroneous data points such as ? with the most frequently occurring value in that column as a form of data cleaning. To run a data profile job on more than 20,000 rows, request a limit increase.

As part of the EDA phase, we can look at the distribution of data in the target column, which represents whether a person’s income is above $50,000 per year. The ratio of people whose income is greater than $50,000 per year to those whose income is less than or equal to $50,000 per year is roughly 1:3, indicating that the target classes are not severely imbalanced.

Building a set of data transformation steps and publishing a recipe

Now that we have an intuitive understanding of the dataset, let’s build out the data transformation steps. Based on our EDA, we replace the ? observation with the most frequently occurring value in each column.

  1. Choose the Replace value or pattern transformation.
  2. Replace ? with Private in the workclass column.
  3. Replace ? with United-States in the native-country column.
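
To illustrate what this cleaning step does to a column, here is a plain-Python sketch. It is not how DataBrew implements the transformation; the function name `replace_with_mode` and the sample values are hypothetical.

```python
from collections import Counter


def replace_with_mode(values, placeholder="?"):
    """Replace `placeholder` entries with the most frequent other value."""
    counts = Counter(v for v in values if v != placeholder)
    if not counts:
        # Nothing to impute from; return the column unchanged.
        return list(values)
    mode = counts.most_common(1)[0][0]
    return [mode if v == placeholder else v for v in values]


workclass = ["Private", "?", "State-gov", "Private", "?"]
cleaned = replace_with_mode(workclass)
# cleaned == ["Private", "Private", "State-gov", "Private", "Private"]
```

The placeholder itself is excluded when counting, so a column dominated by ? is still filled with a real category.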

The occupation column also contains observations with ?, but the data points are spread across categories without a clear frequently occurring category. Therefore, we can categorically encode the observations in the occupation column, including those with ? observation, thereby treating ? as a separate category. The occupation column in the adult.data training dataset contains 15 categories, of which Protective-serv, Priv-house-serv, and Armed-Forces occur infrequently. To avoid excessive granularity in ML modeling, we can group these three categories into a single category named Other.

During ML model evaluation and prediction, we can also map categories that the model hasn’t encountered during model training to the Other category.

With that as the background, let’s apply the categorical mapping transformation to only the top 12 distinct values.

  1. Select Map top 12 values.
  2. Select Map values to numeric values.

This selects the top 12 categories and combines the other categories into a single category named Other. We now have a new column named occupation_mapped.
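
The effect of a top-N categorical mapping can be sketched in plain Python. This is an illustration only: the function name `map_top_values` is hypothetical, and assigning codes by frequency rank is an assumption, not a description of the numeric values DataBrew produces.

```python
from collections import Counter


def map_top_values(values, top_n):
    """Map the `top_n` most frequent values to 1..top_n; all others share one code."""
    ranked = [value for value, _ in Counter(values).most_common(top_n)]
    codes = {value: code for code, value in enumerate(ranked, start=1)}
    other_code = len(codes) + 1  # single code for the combined "Other" category
    mapped = [codes.get(v, other_code) for v in values]
    return mapped, codes, other_code


occupation = ["Sales", "Sales", "Sales", "Tech-support", "Tech-support", "Armed-Forces"]
mapped, codes, other_code = map_top_values(occupation, top_n=2)
# mapped == [1, 1, 1, 2, 2, 3]
```

Any value outside the stored `codes` dictionary, including one first seen at prediction time, falls through to the same `other_code`.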

  1. Delete the occupation column to avoid redundancy.
  2. Similarly, apply the categorical mapping transformation to the top five values in the workclass column and the top one value in the native-country column. Remember to select Map values to numeric values.

This groups the remaining categories into a single category named Other.

  1. Delete the columns workclass and native-country.

The other four columns with categorical data—marital-status, relationship, race, and sex—have few categories with most of them occurring frequently. Let’s apply the categorical mapping transformation to these columns as well.

  1. Apply categorical mapping, with the following differences:
    1. Select Map all values.
    2. Select Map values to numeric values.
  2. Delete the original columns to avoid redundancy.
  3. Delete the fnlwgt column, because it represents the sampling weight and isn’t related to the target.
  4. Delete the education column, because it has already been categorically mapped to education-num.
  5. Map the target column to numeric values, where income less than or equal to $50,000 per year is mapped to class 0 and income greater than $50,000 per year is mapped to class 1.
  6. Rename the destination column to label in order to align with our downstream PySpark model training code.
  7. Delete the original target column.

The data preparation phase is now complete, and the set of 20 transformations that consist of data cleaning and categorical mapping is combined into a recipe.

Because the data preparation and ML model training phases are highly iterative, we can save the set of data transformation steps applied by publishing the recipe. This provides version control, and allows us to maintain the data transformation steps and experiment with multiple versions of the recipe in order to determine the version with the best ML model performance. For more information about DataBrew recipes, see Creating and using AWS Glue DataBrew recipes.

Creating and running a DataBrew recipe job

The exploratory data analysis phase helped us gain an intuitive understanding of the dataset, from which we built a recipe to prepare and transform our data for ML modeling. We have been working with a random sample of 1,000 rows from the adult.data training dataset, and we need to apply the same set of data transformation steps to all of the more than 32,000 rows in the adult.data dataset. A DataBrew recipe job provides the ability to scale the transformation steps from a sample of data to the entire dataset. To create our recipe job, complete the following steps:

  1. On the DataBrew console, choose Jobs.
  2. Choose Create recipe job.
  3. For Job name, enter a name.
  4. Create a new folder in Amazon S3 (s3://<YOUR-S3-BUCKET-NAME>/transformed-data/) for the recipe job to save the transformed dataset.

The recipe job should take under 2 minutes to complete.

Training an ML model on the transformed dataset using PySpark

With the data transformation job complete, we can use the transformed dataset to train a binary classification model to predict whether a person’s income is above $50,000 per year.

  1. Create an Amazon EMR notebook.
  2. When the notebook’s status is Ready, open the notebook in a JupyterLab or Jupyter Notebook environment.
  3. Choose the PySpark kernel.

For this post, we use Spark version 2.4.6.

  1. Load the transformed dataset into a PySpark DataFrame within the notebook:
    train_dataset = spark.read.csv(path='s3://<YOUR-S3-BUCKET-NAME>/transformed-data/<YOUR-RECIPE-JOB-NAME>_<TIMESTAMP>/<YOUR-RECIPE-JOB-NAME>_<TIMESTAMP>_part00000.csv', header=True, inferSchema=True)
    print('The transformed train dataset has {n_rows} rows and {n_cols} columns'.format(n_rows=train_dataset.count(), n_cols=len(train_dataset.columns)))
    The transformed train dataset has 32561 rows and 13 columns

  2. Inspect the schema of the transformed dataset:
    train_dataset.printSchema()
    root 
    |-- age: integer (nullable = true) 
    |-- workclass_mapped: double (nullable = true) 
    |-- education-num: double (nullable = true) 
    |-- marital_status_mapped: double (nullable = true) 
    |-- occupation_mapped: double (nullable = true) 
    |-- relationship_mapped: double (nullable = true) 
    |-- race_mapped: double (nullable = true) 
    |-- sex_mapped: double (nullable = true) 
    |-- capital-gain: double (nullable = true) 
    |-- capital-loss: double (nullable = true) 
    |-- hours-per-week: double (nullable = true) 
    |-- native_country_mapped: double (nullable = true) 
    |-- label: double (nullable = true)

Of the 13 columns in the dataset, we use the first 12 columns as features for the model and the label column as the final target value for prediction.

  1. Use the VectorAssembler method within PySpark to combine the 12 columns into a single feature vector column, which makes it convenient to train the ML model:
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import VectorAssembler
    stages = []
    arr_features = train_dataset.columns[:-1]
    # Transform input features into a vector using VectorAssembler
    features_vector_assembler = VectorAssembler(inputCols=arr_features, outputCol='features')
    stages.append(features_vector_assembler)
    # Run the train dataset through the pipeline
    pipeline = Pipeline(stages=stages)
    train_dataset_pipeline = pipeline.fit(train_dataset).transform(train_dataset)
    # Select the feature vector and label column
    train_dataset_pipeline = train_dataset_pipeline.select('features', 'label')

  2. To estimate how the model will perform on the unseen test dataset, split the transformed train dataset (train_dataset_pipeline) into 70% for model training and 30% for model validation:
    df_train, df_val = train_dataset_pipeline.randomSplit([0.7, 0.3], seed=42)
    print('The train dataset has {n_rows} rows and {n_cols} columns'.format(n_rows=df_train.count(), n_cols=len(df_train.columns)))
    print('The validation dataset has {n_rows} rows and {n_cols} columns'.format(n_rows=df_val.count(), n_cols=len(df_val.columns)))
    The train dataset has 22841 rows and 2 columns
    The validation dataset has 9720 rows and 2 columns

  3. Train a Random Forest classifier on the training dataset df_train and evaluate its performance on the validation dataset df_val using the area under the ROC curve (AUC), which is a measure of model performance for binary classifiers at different classification thresholds:
    from pyspark.ml.classification import RandomForestClassifier
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    # Train a Random Forest classifier
    rf_classifier = RandomForestClassifier(featuresCol = 'features', labelCol = 'label')
    model = rf_classifier.fit(df_train)
    # Model predictions on the validation dataset
    preds = model.transform(df_val)
    # Evaluate model performance
    evaluator = BinaryClassificationEvaluator()
    auc = evaluator.evaluate(preds, {evaluator.metricName: "areaUnderROC"})
    print('Validation AUC: {}'.format(auc))
    Validation AUC: 0.8909629419656796

A validation AUC of 0.89 indicates strong model performance for the classifier. Because the data transformation and model training phases are highly iterative in nature, in order to improve the model performance, we can experiment with different data transformation steps, additional features, and other classification models. After we achieve a satisfactory model performance, we can evaluate the model predictions on the unseen test dataset, adult.test.
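To make the AUC metric more concrete, the following is a minimal pure-Python sketch (not the PySpark implementation) that computes the area under the ROC curve as the fraction of positive/negative pairs that the model ranks correctly, counting ties as half. The function name roc_auc and the sample labels and scores are illustrative assumptions, not values from the adult dataset.

```python
def roc_auc(labels, scores):
    """Fraction of (positive, negative) pairs ranked correctly; ties count as 0.5."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("AUC needs at least one example of each class")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Hypothetical labels and predicted probabilities for class 1
labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]
print(roc_auc(labels, scores))  # 3 of the 4 pairs are ranked correctly -> 0.75
```

A value of 0.5 corresponds to a random ranking and 1.0 to a perfect ranking, so 0.89 sits well toward the upper end of that range.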

Evaluating the ML model on the test dataset

In the data transformation and ML model training sections, we have developed a reusable pipeline that we can use to evaluate the model predictions on the unseen test dataset.

  1. Create a new DataBrew project and load the raw test dataset (adult.test.csv) from Amazon S3, as we did in the data preparation section.
  2. Import the recipe we created earlier with the 20 data transformation steps to apply them on the adult.test dataset.



We can observe that all the columns have been transformed successfully, apart from the label column, which contains null values. This is because the adult.test dataset contains messy data in the target column, namely an extra punctuation mark at the end of the classes <=50k and >50k. To correct this, we can remove the last step of the recipe.

  1. Delete the column target.
  2. Edit the prior step in creating a categorical map to account for the extra punctuation mark.
  3. Delete the original target column to avoid redundancy.
  4. Create and run the recipe job to transform and store the over 16,000 rows in the adult.test dataset under s3://<YOUR-S3-BUCKET-NAME>/transformed-data/.

This job should take approximately 1 minute to complete.
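The adjusted categorical mapping can be sketched in plain Python. This only illustrates the logic and is not what DataBrew runs internally; the function name map_income_label and the exact raw strings (including the trailing period) are assumptions based on the description of the adult.test target column.

```python
def map_income_label(raw_value):
    """Map a raw income class to 0 (<=50K) or 1 (>50K), tolerating a trailing period."""
    # Strip surrounding whitespace and the extra punctuation mark, ignore letter case
    cleaned = raw_value.strip().rstrip(".").upper()
    mapping = {"<=50K": 0, ">50K": 1}
    if cleaned not in mapping:
        raise ValueError("Unexpected income class: {!r}".format(raw_value))
    return mapping[cleaned]


# Hypothetical raw values as they might appear in the train and test files
print([map_income_label(v) for v in ["<=50K", ">50K", " <=50K.", " >50K."]])
```

Raising an error on unexpected values, rather than returning a null, surfaces messy target data early instead of letting null labels reach model evaluation.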

When the train and test datasets don’t have any variation in the types of categories, we can create and run a recipe job directly from the DataBrew console, without having to create a separate project.

  1. When the data transformation job on the adult.test dataset is complete, load the transformed dataset into a PySpark dataframe to evaluate the performance of the binary classification model:
    # Load the transformed test dataset
    test_dataset = spark.read.csv(path='s3://<YOUR-S3-BUCKET-NAME>/transformed-data/<YOUR-RECIPE-JOB-NAME>_<TIMESTAMP>/<YOUR-RECIPE-JOB-NAME>_<TIMESTAMP>_part00000.csv', header=True, inferSchema=True)
    
    print('The transformed test dataset has {n_rows} rows and {n_cols} columns'.format(n_rows=test_dataset.count(), n_cols=len(test_dataset.columns)))

    The transformed test dataset has 16281 rows and 13 columns

    # Run the test dataset through the same feature vector pipeline
    test_dataset_pipeline = pipeline.fit(test_dataset).transform(test_dataset)

    # Select the feature vector and label column
    test_dataset_pipeline = test_dataset_pipeline.select('features', 'label')

    # Model predictions on the test dataset
    preds_test = model.transform(test_dataset_pipeline)

    # Evaluate model performance
    evaluator = BinaryClassificationEvaluator()
    auc = evaluator.evaluate(preds_test, {evaluator.metricName: "areaUnderROC"})
    print('Test AUC: {}'.format(auc))
    Test AUC: 0.8947235975486465

The test AUC of 0.89 is about the same as the validation AUC, which indicates that the model performs equally well on the unseen test dataset.

Summary

In this post, we showed you how to use DataBrew and Amazon EMR to streamline and speed up the data preparation and feature engineering stages of the ML lifecycle. We explored a binary classification problem, but the wide selection of DataBrew pre-built transformations and PySpark ML libraries make this approach extendable to numerous ML use cases.

Get started today! Explore your use case with the services mentioned in this post and many others on the AWS Management Console.


About the Authors

Kartik Kannapur is a Data Scientist with AWS Professional Services. He holds a Master’s degree in Applied Mathematics and Statistics from Stony Brook University and focuses on using machine learning to solve customer business problems.

Prithiviraj Jothikumar, PhD, is a Data Scientist with AWS Professional Services, where he helps customers build solutions using machine learning. He enjoys watching movies and sports and spending time to meditate.

Bala Krishnamoorthy is a Data Scientist with AWS Professional Services, where he helps customers solve problems and run machine learning workloads on AWS. He has worked with customers across diverse industries, including software, finance, and healthcare. In his free time, he enjoys spending time outdoors, running with his dog, beating his family and friends at board games and keeping up with the stock market.

Accessing and visualizing external tables in an Apache Hive metastore with Amazon Athena and Amazon QuickSight

Post Syndicated from James Sun original https://aws.amazon.com/blogs/big-data/accessing-and-visualizing-external-tables-in-an-apache-hive-metastore-with-amazon-athena-and-amazon-quicksight/

Many organizations have an Apache Hive metastore that stores the schemas for their data lake. Amazon Athena is serverless and makes it easy for anyone with SQL skills to quickly analyze large-scale datasets. You may also want to reliably query the rich datasets in the lake, with their schemas hosted in an external Hive metastore. In response to customers’ requests, AWS announced Athena’s support for Hive metastore in early 2020. This extends the ability in Athena to query external data stores, with a combined benefit of better performance and lower cost.

In this post, we provide an AWS CloudFormation template that configures a remote Hive metastore based on Amazon Relational Database Service (Amazon RDS) and MySQL with Amazon EMR located in a private subnet to perform ETL tasks. We then demonstrate how you can use a Spark step to pull COVID-19 datasets from a public repository and transform the data into a performant Parquet columnar storage format. We also walk through the steps to query the data with Athena and visualize it with Amazon QuickSight. QuickSight is a fully managed data visualization service; it lets you easily create and publish interactive dashboards by analyzing data from various data sources, including Athena.

Solution walkthrough

The following diagram shows the architecture for our solution.


As shown in the preceding architecture, we have an Availability Zone within a VPC in an AWS Region. The Availability Zone hosts subnets that are either public or private. A multi-master EMR cluster that has Hive software components installed is launched in a private subnet with egress internet traffic through a NAT gateway to download data from public sites for analysis. The multi-master feature also ensures that the primary nodes are highly available. The Hive metastore is backed by a remote RDS for MySQL instance located in the same private subnet.

We also have an Amazon Simple Storage Service (Amazon S3)-based data lake. A Spark step in Amazon EMR retrieves the data in CSV format, saves it in the provisioned S3 bucket, and transforms the data into Parquet format. The Spark step then creates an external Hive table referencing the Parquet data, which is ready for Athena to query.

With Athena, you can create a data source connector based on an AWS Lambda function to access the Hive metastore hosted on the EMR cluster by using the Apache Thrift interface.

The connector is called a catalog; when you reference it in a SQL statement in Athena, it invokes the Lambda function. The function exits if the connector is not active for 15 minutes. For queries that run longer than 15 minutes, it’s recommended to let the query complete before retrieving the query results in an Amazon S3 location you can specify.

In Athena, you can compose a SQL statement against the Hive tables with predicates to further limit the size of the query result for faster visualization by QuickSight.

Deploying the resources with AWS CloudFormation

To demonstrate our solution, we provide a CloudFormation template that you can download to easily deploy the necessary AWS resources. The template creates the following resources to simulate the environment:

  • VPC and subnets – A VPC with one public and one private subnet. A NAT gateway is also created to allow outbound internet access from the EMR cluster to download public COVID-19 datasets from the Johns Hopkins GitHub repo.
  • EMR cluster – A multi-master EMR cluster with Hive, running on three primary nodes (m5.xlarge) and two core nodes (m5.xlarge), is launched to support the thrift connection required by the Athena Lambda connectors.
  • Amazon RDS for MySQL database – An RDS for MySQL primary instance is launched in the same subnet as the EMR cluster. The RDS instance serves as the Hive metastore backend data store.
  • S3 bucket – An S3 bucket stores files in Parquet format by Amazon EMR and is accessed later by Athena.
  • AWS IAM users – Two AWS Identity and Access Management (IAM) users belonging to different user groups. The first user, the data engineer, has permissions to access the Lambda-based Athena data source connector. The other user, the salesperson, does not have permissions to access the connector.

To get started, you need to have an AWS account. If you don’t have one, go to aws.amazon.com to sign up for one. Then complete the following steps:

  1. Sign in to the AWS Management Console as an IAM power user, preferably an admin user.
  2. Choose Launch Stack to launch the CloudFormation template:

This template has been tested in the US East (N. Virginia) Region.

  1. Choose Next.

You’re prompted to enter a few launch parameters.

  1. For Stack name, enter a name for the stack (for example, athena-hms).
  2. For Hive Metastore Host Number, choose the Amazon EMR primary node to connect to (or use the default value).
  3. Continue to choose Next and leave other parameters at their default.
  4. On the review page, select the three check boxes to confirm that AWS CloudFormation might create resources.
  5. Choose Create stack.

The stack takes 15–20 minutes to complete.

  1. On the Outputs tab of the stack details, save the key-value pairs to use later.

When the EMR cluster is provisioned, it uses a bootstrap action to install the necessary Python libraries. It runs an Amazon EMR Spark step (a PySpark script) that downloads three COVID-19 datasets for confirmed, recovered, and death cases from the Johns Hopkins GitHub repo in CSV format and stores them in the csv subfolder of the S3 bucket created by the CloudFormation stack. Lastly, the final transformed data is converted to Parquet format and external Hive tables are created referencing the Parquet data located in the parquet subfolder of the S3 bucket.

The following are the source codes for the bootstrap and Spark step actions for your reference:

To validate the data, on the Amazon S3 console, choose the bucket name from the CloudFormation template outputs. You should see a covid_data folder in the bucket. The folder contains the two subfolders, csv and parquet, which store the raw CSV and transformed Parquet data, respectively.

Querying the data in Athena

The CloudFormation template creates two users belonging to two different AWS IAM groups. The de_user_us-east-1_athena-hms user is a data engineer with permissions to access the Lambda function to communicate with the Hive metastore using the Athena data source connector. They belong to the group athena-hms-DataEngineerGroup-xxxxxxx. The sales_user_us-east-1_athena-hms user is a salesperson with no access to the connector. They belong to the group athena-hms-SalesGroup-xxxxx. 

To query the data, first retrieve your secret values in AWS Secrets Manager:

  1. On the Secrets Manager console, choose Secrets.
  2. Choose DataEngineerUserCreds.

  1. Choose Retrieve secret value.

  1. Save the username and password.

  1. Repeat these steps for SalesUserCreds. 

With the data in place, we can now use Athena to query it.

Accessing data as the data engineer

To query the data as the de_user_us-east-1_athena-hms user, complete the following steps:

  1. Sign in to the Athena console as the de_user_us-east-1_athena-hms user with the credentials retrieved from Secrets Manager.

After logging in, we need to create a data source for Hive metastore.

  1. On the navigation bar, choose Data sources.
  2. Choose Connect data source.

  1. For Choose where your data is located, select Query data in Amazon S3.
  2. For Choose a metadata catalog, select Apache Hive metastore.
  3. Choose Next.

  1. For Lambda function, choose the function the CloudFormation template created with the key HiveMetastoreFunctionName.
  2. For Catalog name, enter a name for the Athena catalog (for example, demo_hive_metastore).
  3. Choose Connect.

You should now have a catalog named demo_hive_metastore with the catalog type Hive metastore.

  1. On the navigation bar, choose Query editor.
  2. Enter the following SQL statement:
    SELECT *
    FROM demo_hive_metastore.default.covid_confirmed_cases
    WHERE country = 'US'
            OR country = 'Italy'
            OR country = 'India'
            OR country = 'Brazil'
            OR country = 'Spain'
            OR country = 'United Kingdom'

This SQL statement selects all the columns in the covid_confirmed_cases table with predicates to only include a few countries of interest. We use a table name with the pattern <catalog name>.<hive database name>.<hive table name in the database>, which for this post translates to demo_hive_metastore.default.covid_confirmed_cases.
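If you generate such statements from code, the three-part naming pattern can be sketched as follows. The helper names qualified_table and build_country_query are hypothetical, and the sketch uses an IN list, which is equivalent to the chain of OR predicates in the statement above.

```python
def qualified_table(catalog, database, table):
    """Return <catalog name>.<hive database name>.<hive table name>."""
    return "{}.{}.{}".format(catalog, database, table)


def build_country_query(catalog, database, table, countries):
    """Build a SELECT that keeps only the listed countries."""
    # Double any single quote so a country name cannot break out of the SQL literal
    quoted = ", ".join("'{}'".format(c.replace("'", "''")) for c in countries)
    return "SELECT *\nFROM {}\nWHERE country IN ({})".format(
        qualified_table(catalog, database, table), quoted
    )


query = build_country_query(
    "demo_hive_metastore",
    "default",
    "covid_confirmed_cases",
    ["US", "Italy", "India", "Brazil", "Spain", "United Kingdom"],
)
print(query)
```

The catalog name must match the one entered when connecting the data source; if you chose a name other than demo_hive_metastore, pass that name instead.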

  1. Choose Run query.

The following screenshot shows your query results.

Make sure you completely sign out of the console before moving on to the next steps.

Accessing data as the salesperson

Sign in to the console as the sales_user_us-east-1_athena-hms user. Because the salesperson user doesn’t have the appropriate IAM policies to access the Hive metastore connection, you can’t see the tables.

Permissions policies

The data engineer has additional policies attached to their IAM group in addition to the managed AmazonAthenaFullAccess policy: the <stack-name>-DataBucketReadAccessPolicy-xxxxx and <stack-name>-HiveMetastoreDataSourceAccessPolicy-xxxxx policies created by the CloudFormation template. Therefore, the data engineer can view the tables, but the salesperson can’t.

These policies are available on the IAM console, on the Permissions tab for the group <stack-name>-DataEngineerGroup-xxxxx.

The following sample JSON code is the <stack-name>-DataBucketReadWriteAccessPolicy-xxxxx policy to allow access to the provisioned S3 bucket:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "s3:Get*",
                "s3:List*",
                "s3:Put*"
            ],
            "Resource": [
                "arn:aws:s3:::<provisioned bucket name>",
                "arn:aws:s3:::<provisioned bucket name>/",
                "arn:aws:s3:::<provisioned bucket name>/*"
            ],
            "Effect": "Allow"
        }
    ]
}
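To illustrate how the wildcard actions and the three resource ARNs in this policy combine, here is a deliberately simplified Python sketch. It is not how IAM evaluates policies: it only considers Allow statements and ignores explicit denies, conditions, and every other policy attached to the user. The bucket name example-bucket and the helper is_allowed are assumptions for illustration.

```python
from fnmatch import fnmatchcase

# Same shape as the policy above, with a hypothetical bucket name
policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": ["s3:Get*", "s3:List*", "s3:Put*"],
            "Resource": [
                "arn:aws:s3:::example-bucket",
                "arn:aws:s3:::example-bucket/",
                "arn:aws:s3:::example-bucket/*",
            ],
            "Effect": "Allow",
        }
    ],
}


def is_allowed(policy, action, resource):
    """Simplified check: does any Allow statement match the action and the resource?"""
    for statement in policy["Statement"]:
        if statement["Effect"] != "Allow":
            continue
        actions = statement["Action"]
        resources = statement["Resource"]
        # Both fields may be a single string or a list of strings
        actions = [actions] if isinstance(actions, str) else actions
        resources = [resources] if isinstance(resources, str) else resources
        # Action names are compared without regard to case
        action_ok = any(fnmatchcase(action.lower(), a.lower()) for a in actions)
        resource_ok = any(fnmatchcase(resource, r) for r in resources)
        if action_ok and resource_ok:
            return True
    return False


obj = "arn:aws:s3:::example-bucket/covid_data/parquet/part-0.parquet"
print(is_allowed(policy, "s3:GetObject", obj))
print(is_allowed(policy, "s3:DeleteObject", obj))
```

Under these assumptions, reads, listings, and writes on the bucket and its objects match the statement, while actions outside the three wildcard families, such as deleting an object, do not.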

The following sample JSON code is the <stack-name>-HiveMetastoreDataSourceAccessPolicy-xxxxx policy to allow access to the Lambda Hive metastore function:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "lambda:GetFunction",
                "lambda:GetLayerVersion",
                "lambda:InvokeFunction"
            ],
            "Resource": [
                "arn:aws:lambda:us-east-1:<account id>:function:athena-hms-HiveMetastoreFunction"
            ],
            "Effect": "Allow"
        },
        {
            "Action": [
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:PutObject",
                "s3:ListMultipartUploadParts",
                "s3:AbortMultipartUpload"
            ],
            "Resource": [
               "arn:aws:s3:::<provisioned bucket name>/hms_spill"
            ],
            "Effect": "Allow"
        },
        {
            "Action": [
                "lambda:ListFunctions"
            ],
            "Resource": "*",
            "Effect": "Allow"
        }
    ]
}

Next, we walk through using QuickSight to visualize the results. Make sure you completely sign out of the console as the salesperson user before proceeding to the next steps. 

Signing up for QuickSight

You can skip this section if you have already signed up for QuickSight.

  1. Sign in to the console as the IAM power user who deployed the CloudFormation template or any user with enough IAM privileges to set up QuickSight.
  2. On the QuickSight console, choose Sign up for QuickSight.
  3. Select either the Standard or Enterprise edition for QuickSight.
  4. Choose Continue.

  1. For QuickSight account name, enter your AWS account ID.
  2. For Notification email address, enter your email.
  3. Select Amazon S3.

  1. Select the provisioned S3 bucket to grant QuickSight permission to access.
  2. Choose Finish.

Your QuickSight account should now be set up.

Attaching the Hive metastore access policy

Before you can use QuickSight, you have to attach the Hive metastore access policy to the QuickSight service role.

  1. On the IAM console, search for the service role aws-quicksight-service-role-v0.
  2. Choose the role.

  1. Search for the <stack-name>-HiveMetastoreDataSourceAccessPolicy-xxxxx policy.
  2. Select the policy and attach it to the QuickSight service role.

Creating your data source and performing data conversions

Before we can create visualizations, we need to set up our data source.

  1. Download the SQL script covid-19.sql.
  2. On the QuickSight console, choose Datasets in the navigation pane.

  1. Choose New dataset.

  1. Choose Athena.

  1. In the pop-up window, for Data source name, enter demo_hive_metastore.
  2. Choose Validate.
  3. When the connection is validated, choose Create data source.

  1. In the next window, choose Use custom SQL.

  1. Enter the content of the covid-19.sql script in the query window.
  2. Choose Confirm query.

  1. Leave Import to SPICE for quicker analytics selected.
  2. Choose Visualize.

Now we perform a few data type conversions before visualizing the data.

  1. Choose the Edit icon next to Data set on the menu bar.

  1. Choose the … icon.
  2. Choose Edit.

  1. Expand date and choose Change data type.
  2. Choose Date.

  1. Enter yyyy-MM-dd to convert the date format.
  2. Choose Update.

Now we create a coordinate using the latitude and longitude values.

  1. Expand lat and choose Add to coordinates.

  1. Leave Create new geospatial coordinates selected.
  2. Choose Add.

  1. In the pop-up window, for Name your coordinates, enter coordinate.
  2. For Field to use for longitude, choose lon.
  3. Choose Create coordinates.

  1. Choose Save and visualize on the menu bar.

Creating visualizations in QuickSight

Now we can visualize our data. For this post, we create a map visualization.

  1. For Visual types, choose the map

  1. For Geospatial, drag coordinate.
  2. For Size, drag confirmed.
  3. For Color, drag country.

This world map shows the accumulated confirmed cases for selected countries over time; you need to use a filter to look at confirmed cases on a specific date.

  1. In the navigation pane, choose Filter.
  2. Choose the + icon.
  3. For Filter type, choose Time range.
  4. Choose start and end dates, such as 2020-09-10 00:00 and 2020-09-11 00:00, respectively.
  5. Choose Apply.

This plots the confirmed cases on September 10, 2020, for these countries.

Similarly, you can choose other visual types, such as a line chart, and generate the mortality rate for selected countries over time.

Using highly available primary nodes of the Amazon EMR cluster

The EMR cluster has a multi-master configuration with three primary nodes running to meet high availability requirements. At any time, the Lambda function communicates with one of these three EMR primary nodes. In the rare event that this node goes down, you can quickly re-establish the Athena data source connector to the external Hive metastore by failing over to another active primary node.

To perform this failover, complete the following steps:

  1. On the AWS CloudFormation console, choose Stacks.
  2. Choose athena-hms.
  3. Choose Update.
  4. Choose Use current template.
  5. Choose Next.
  6. For Hive Metastore Host Number, choose a host other than the current one you’re using.

  1. Choose Next.
  2. Acknowledge that AWS CloudFormation might create IAM resources.
  3. Choose Update stack.

In less than a minute, you should be able to access the Hive metastore and continue to query on the Athena console.

Cleaning up

You may want to clean up the demo environment when you’re done. To do so, on the AWS CloudFormation console, select the stack and choose Delete.

This action also deletes the S3 bucket and any data in it. If you want to retain the data for future use, you should make a copy of the bucket before you delete it.

Summary

In this post, we walked through a solution using Athena to query external Hive tables with public COVID-19 datasets hosted in an S3 bucket and visualizing the data with QuickSight. We provided a CloudFormation template to automate the deployment of necessary AWS services for the demo. We encourage you to use these managed and scalable services for your specific use cases in production.


About the Authors

James Sun is a Senior Solutions Architect with Amazon Web Services. James has over 15 years of experience in information technology. Prior to AWS, he held several senior technical positions at MapR, HP, NetApp, Yahoo, and EMC. He holds a PhD from Stanford University.

Chinmayi Narasimhadevara is a Solutions Architect with Amazon Web Services. Chinmayi has over 15 years of experience in information technology and has worked with organizations ranging from large enterprises to mid-sized startups. She helps AWS customers leverage the correct mix of AWS services to achieve success for their business goals.

Gagan Brahmi is a Solutions Architect focused on Big Data & Analytics at Amazon Web Services. Gagan has over 15 years of experience in information technology. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS.

Orchestrating analytics jobs by running Amazon EMR Notebooks programmatically

Post Syndicated from Fei Lang original https://aws.amazon.com/blogs/big-data/orchestrating-analytics-jobs-by-running-amazon-emr-notebooks-programmatically/

Amazon EMR is a big data service offered by AWS to run Apache Spark and other open-source applications on AWS in a cost-effective manner. Amazon EMR Notebooks is a managed environment based on Jupyter Notebook that allows data scientists, analysts, and developers to prepare and visualize data, collaborate with peers, build applications, and perform interactive analysis using EMR clusters.

EMR notebook APIs are available on Amazon EMR release version 5.18.0 or later and can be used to run EMR notebooks via a script or command line. The ability to start, stop, list, and describe EMR notebook runs without the Amazon EMR console enables you to programmatically control running an EMR notebook. Using a parameterized notebook cell allows you to pass different parameter values to a notebook without having to create a copy of the notebook for each new set of parameter values. With this feature, you can schedule running EMR notebooks with cron scripts, chain multiple EMR notebooks, and use orchestration services such as AWS Step Functions or Apache Airflow to build pipelines. This lets you use EMR notebooks in a non-interactive manner, for example to run ETL workloads in production.

In this post, we show how to orchestrate analytics jobs by running EMR Notebooks programmatically with the following two use cases:

For our data source, we use the open-source, real-time COVID-19 US daily case reports provided by Johns Hopkins University CSSE in the following GitHub repo.

Prerequisites

Before getting started, you must have the following prerequisites:

Record the notebook ID (for example, <e-*************************>); you use this later in our examples. Organize the notebook files in the Jupyter UI as follows:

  • /demo_pyspark.ipynb
  • /experiment/trailing_N_day.ipynb

See Creating a Notebook for more information on how to create an EMR notebook.

Use case 1: Scheduling an EMR notebook to run via crontab and the AWS CLI

We use demo_pyspark.ipynb as the input notebook file, as mentioned in the prerequisites. In this use case, we use the AWS CLI to call the EMR Notebooks Execution API to run a notebook using some parameters that we pass in. We then download the notebook output and visualize it using the local Jupyter server.

First, we use the AWS CLI to run an example notebook using the EMR Notebooks Execution API.

demo_pyspark.ipynb is a Python script. The following parameters are defined in the first cell:

  • DATE – The date used when the notebook job is started.
  • TOP_K – The top k US states with confirmed COVID-19 cases. We use this to plot Graph a.
  • US_STATES – The names of the specific US states being checked for the fatality rates of COVID-19 patients. We use this to plot Graph b.

Running this notebook plots two graphs:

  • Graph a – Visualizes the top k US states with most COVID-19 cases on a given date
  • Graph b – Visualizes the fatality rates among specific US states on a given date

The parameters in the first cell can be passed to the EMR Notebooks StartNotebookExecution API, which you can call via the AWS CLI or SDK. The following code is an example of the EMR notebook first cell, containing parameters with corresponding values in JSON format. It means the notebook uses the date 10-13-2020. For Graph a, we visualize the top five US states with confirmed COVID-19 cases on October 13, 2020. For Graph b, we visualize the fatality rates of COVID-19 patients in Alabama, California, and Arizona on October 13, 2020. See the following code:

{"DATE": "10-13-2020",
 "TOP_K": 5,
"US_STATES": ["Alabama", "California", "Arizona"]}

For this example, the parameters can be any of the Python Data Types.
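For scheduled runs, you typically want to generate this JSON document rather than type it by hand. The following is a minimal sketch, assuming the MM-DD-YYYY date format used in the example above; the function name build_notebook_params is hypothetical.

```python
import json
from datetime import date


def build_notebook_params(run_date, top_k, us_states):
    """Serialize the first-cell parameters to a JSON string."""
    params = {
        "DATE": run_date.strftime("%m-%d-%Y"),  # MM-DD-YYYY, as in the example above
        "TOP_K": top_k,
        "US_STATES": list(us_states),
    }
    return json.dumps(params)


params_json = build_notebook_params(
    date(2020, 10, 13), 5, ["Alabama", "California", "Arizona"]
)
print(params_json)
```

The resulting string is what you would pass as the notebook parameters when starting a run; in a cron-driven script, date.today() could replace the fixed date.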

Run the notebook using the following new set of parameters:

{"DATE": "10-15-2020",
 "TOP_K": 6,
"US_STATES": ["Wisconsin", "Texas", "Nevada"]}

Running an EMR notebook with the AWS CLI

Run the following command (replace <e-*************************> with the ID of the EMR notebook and <j-*************> with the EMR cluster ID as mentioned in the prerequisites):

% aws emr --region us-west-2 start-notebook-execution \
--editor-id <e-*************************> \
--notebook-params '{"DATE":"10-15-2020", "TOP_K": 6, "US_STATES": ["Wisconsin", "Texas", "Nevada"]}' \
--relative-path demo_pyspark.ipynb \
--notebook-execution-name demo \
--execution-engine '{"Id" : "<j-*************>"}' \
--service-role EMR_Notebooks_DefaultRole

The start-notebook-execution command returns an output similar to the following JSON document:

{
 "NotebookExecutionId": "ex-*****************************"
}

Record the value of NotebookExecutionId; you use it in the next step.

Running the describe-notebook-execution command

Run the following command (replace <ex-*****************************> with the value of NotebookExecutionId from the previous step):

% aws emr --region us-west-2 describe-notebook-execution \
--notebook-execution-id <ex-*****************************>

The describe-notebook-execution command returns an output similar to the following JSON document:

{
  "NotebookExecution": {
    "NotebookExecutionId": "ex-*****************************",
    "EditorId": "e-*************************",
    "ExecutionEngine": {
      "Id": "<j-*************>",
      "Type": "EMR",
      "MasterInstanceSecurityGroupId": "sg-********"
    },
    "NotebookExecutionName": "demo",
    "NotebookParams": "{\"DATE\":\"10-15-2020\", \"TOP_K\": 6, \"US_STATES\": [\"Wisconsin\", \"Texas\", \"Nevada\"]}",
    "Status": "FINISHED",
    "StartTime": "2020-10-18T19:46:01.125000-07:00",
    "EndTime": "2020-10-18T19:47:24.014000-07:00",
    "Arn": "arn:aws:elasticmapreduce:us-west-2:123456789012:notebook-execution/ex-*****************************",
    "OutputNotebookURI": "s3://<notebook_bucket_location>/e-*************************/executions/ex-*****************************/demo_pyspark.ipynb",
    "LastStateChangeReason": "Execution is finished for cluster j-*************.",
    "NotebookInstanceSecurityGroupId": "sg-********",
    "Tags": []
  }
}

You can pass different parameter values to the same notebook without having to create a copy of the notebook for each new set of parameter values or log in to the Jupyter Notebooks UI via the Amazon EMR console.

Downloading the output file and visualizing the output with a local Jupyter server

EMR notebooks use Papermill to run the notebook. When it runs, a new notebook file is created with input parameters so as not to overwrite the existing file. The notebook is then started, and the output notebook can be found in s3://<Notebook bucket location>/<editor id>/executions/<Execution id>/<input file name>.
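As a small illustration of that path layout, the following sketch assembles the output location from its parts. The function name and the example bucket and IDs are hypothetical; in practice, the OutputNotebookURI field returned by describe-notebook-execution is the authoritative value.

```python
def output_notebook_uri(bucket_location, editor_id, execution_id, input_file_name):
    """Assemble the output path described above from its components."""
    base = bucket_location.rstrip("/")
    if not base.startswith("s3://"):
        base = "s3://" + base
    return f"{base}/{editor_id}/executions/{execution_id}/{input_file_name}"


# Hypothetical bucket location and IDs, for illustration only.
print(output_notebook_uri(
    "s3://my-bucket/notebooks", "e-EXAMPLE", "ex-EXAMPLE", "demo_pyspark.ipynb"
))
```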

We run the following s3 cp command to download the EMR notebook output file to a local directory (replace <notebook_bucket_location> with the S3 location specified for the notebook during creation, <e-*************************> with the EMR Notebook ID, and <ex-*****************************> with the value of NotebookExecutionId from the previous step):

% aws s3 cp s3://<notebook_bucket_location>/<e-*************************>/executions/<ex-*****************************>/demo_pyspark.ipynb .

In the same directory where we downloaded the EMR notebook output file, run the following command to start a local Jupyter server:

% jupyter lab

The URL http://localhost:8888/lab automatically opens in your web browser, as shown in the following screenshot.

Choose demo_pyspark.ipynb to view the output file. The output contains two graphs. Graph a shows the top six US states with confirmed COVID-19 cases on a given date.

Graph b shows the fatality rates of COVID-19 patients in Texas, Wisconsin, and Nevada on a given date.

Scheduling to run a notebook daily using crontab

We have completed running the EMR notebook using the AWS CLI. Now, we demonstrate how to schedule running a notebook daily using crontab. We use the same notebook input file with the same parameters as the previous example. On a daily basis, it generates Graph a with the top six US states with confirmed COVID-19 cases, and Graph b with the fatality rates of COVID-19 patients in Texas, Wisconsin, and Nevada.

We start by creating a bash script named run_notebook_daily.sh. The script starts an EMR notebook, waits for the notebook to either finish running or fail, and copies the output file to the local directory ~/daily_reports/.

The following code is the content of run_notebook_daily.sh (replace <e-*************************> with the ID of EMR Notebook and <j-*************> with the EMR cluster ID):

#!/bin/bash
# Generate a report for the day before yesterday.
# Note: `date -v-2d` is BSD/macOS syntax; with GNU date, use: date -d '2 days ago' +'%m-%d-%Y'
day_before_yesterday=$(date -v-2d +'%m-%d-%Y')

# Start an execution and capture its ID
execution_id=$(aws emr start-notebook-execution \
--editor-id <e-*************************> \
--notebook-params '{"DATE":"'"$day_before_yesterday"'", "TOP_K": 6, "US_STATES": ["Wisconsin", "Texas", "Nevada"]}' \
--relative-path demo_pyspark.ipynb \
--notebook-execution-name demo \
--execution-engine '{"Id" : "<j-*************>"}' \
--service-role EMR_Notebooks_DefaultRole | jq -r '.NotebookExecutionId')

echo "Started an execution for the date $day_before_yesterday. Execution id: $execution_id"

# Poll until the execution finishes or fails
while true; do
    execution_status=$(aws emr describe-notebook-execution --notebook-execution-id "$execution_id" | jq -r '.NotebookExecution.Status')
    echo "Execution Status: $execution_status"

    if [ "$execution_status" = "FINISHED" ] || [ "$execution_status" = "FAILED" ]; then
        # Copy the output file to the local directory ~/daily_reports/
        output_file=$(aws emr describe-notebook-execution --notebook-execution-id "$execution_id" | jq -r '.NotebookExecution.OutputNotebookURI')
        mkdir -p ~/daily_reports
        aws s3 cp "$output_file" ~/daily_reports/
        break
    fi
    sleep 15
done
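The same start-then-poll pattern can be sketched in Python with the boto3 EMR client's describe_notebook_execution call. This is a hedged sketch, not a replacement for the script: the function names are hypothetical, the client is passed in so the logic can be exercised without AWS access, and STOPPED is treated as an additional terminal state (the bash script checks only FINISHED and FAILED).

```python
import time
from datetime import date, timedelta

# Assumption: these statuses end the polling loop. The bash script checks
# FINISHED and FAILED; STOPPED is added so a stopped run also ends the loop.
TERMINAL_STATES = {"FINISHED", "FAILED", "STOPPED"}


def report_date(today=None, days_back=2):
    """Return the date `days_back` days before `today` as MM-DD-YYYY."""
    today = today or date.today()
    return (today - timedelta(days=days_back)).strftime("%m-%d-%Y")


def wait_for_execution(emr_client, execution_id, poll_seconds=15, max_polls=240):
    """Poll describe_notebook_execution until a terminal state is reached."""
    for _ in range(max_polls):
        execution = emr_client.describe_notebook_execution(
            NotebookExecutionId=execution_id
        )["NotebookExecution"]
        if execution["Status"] in TERMINAL_STATES:
            return execution
        time.sleep(poll_seconds)
    raise TimeoutError(f"{execution_id} did not finish after {max_polls} polls")


print(report_date(date(2020, 10, 17)))  # 10-15-2020
```

Bounding the number of polls avoids a cron job that waits forever if an execution never reaches a terminal state.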

Next, we add this script to a crontab to run our EMR notebook job daily at 9:00 AM:

% crontab -e
0 9 * * * bash /folder/path/run_notebook_daily.sh >/tmp/stdout.log 2>/tmp/stderr.log

This is a simple example of how to schedule running an EMR notebook with a crontab.

Use case 2: Chaining EMR notebooks with Step Functions triggered by CloudWatch Events

We use demo_pyspark.ipynb and trailing_N_day.ipynb as the input notebook files for this use case. We also provide a CloudFormation template as a general guide. Please review and customize it as needed. Be aware that some of the resources deployed by this stack incur costs when they remain in use.

The following diagram illustrates the resources that the CloudFormation template creates.

The template first creates a step function to run a chain of EMR notebooks, which takes care of the following tasks:

  • Runs notebook demo_pyspark.ipynb with given parameters and waits until it’s complete. It plots a graph of the top k US states with most COVID-19 cases yesterday.
  • Runs notebook trailing_N_day.ipynb using the output from the first task. It takes the US state with the most confirmed COVID-19 cases nationally yesterday as the input, and plots a 30-day confirmed COVID-19 case number graph, showing the case growth trend of that state until yesterday.

The template also creates a CloudWatch event that periodically triggers the step function according to the given schedule expression.

Launching the CloudFormation template

To launch your stack and provision your resources, complete the following steps:

  1. Choose Launch Stack:

This automatically launches AWS CloudFormation in your AWS account with a template. It may prompt you to sign in as needed. You can view the template on the AWS CloudFormation console as required. Make sure that you create the stack in your intended Region.

The CloudFormation stack requires a few parameters, as shown in the following screenshot.

The following table describes the parameters:

Parameter | Description | Default Value
Stack name | Enter a meaningful name for the stack, for example, emrRunnableNotebookDemo. | None
ClusterId | The unique ID of the EMR cluster that runs the notebook (j-*************). | None
NotebookARelativePath | The path and file name of the notebook input file A (demo_pyspark.ipynb), relative to the path specified for the EMR notebook. For more information, see Notebook execution CLI command samples. | demo_pyspark.ipynb
NotebookBRelativePath | The path and file name of the notebook input file B (trailing_N_day.ipynb), relative to the path specified for the EMR notebook. | experiment/trailing_N_day.ipynb
NotebookId | The unique ID of the EMR notebook to use for running the notebook (e-*****************************). | None
ScheduleExpression | How the notebook is scheduled to run. For more information, see Schedule Expressions for Rules. | rate(1 day)
StorageLocation | The Amazon S3 path where the EMR notebook is stored (s3://aws-emr-resources-************-us-west-2/notebooks/e-*************************). | None
TopK | The value of one of the parameters used to run notebook A. In this example, it checks the top k US states with confirmed COVID-19 cases and plots a graph for it. | 20

 

  2. Enter the parameter values from the preceding table.
  3. Review the Capabilities section and select the check boxes confirming AWS CloudFormation might create IAM resources with custom names.
  4. Choose Create Stack.

Stack creation only takes a few minutes. When the stack is complete, on the Resources tab, you can find the resources created as shown in the following screenshot.

Checking the notebook output files

When a step function is complete, you can find the execution IDs in the step function output.

We run the following command to view the output files (replace <notebook_bucket_location> with the Amazon S3 location specified for the notebook during creation and <e-*************************> with the EMR notebook ID):

% aws s3 ls --recursive s3://<notebook_bucket_location>/<e-*************************>/executions/

The aws s3 ls --recursive command returns an output similar to the following:

2020-10-16 16:39:02     267780 notebooks/e-*************************/executions/ex-*****************************/demo_pyspark.ipynb
2020-10-16 16:44:14     267780 notebooks/e-*************************/executions/ex-*****************************/trailing_N_day.ipynb.ipynb
2020-10-16 17:00:37      18600 notebooks/e-*************************/executions/ex-*****************************/demo_pyspark.ipynb
2020-10-16 16:49:08     267781 notebooks/e-*************************/executions/ex-*****************************/trailing_N_day.ipynb.ipynb
2020-10-16 16:59:01     267780 notebooks/e-*************************/executions/ex-*****************************/demo_pyspark.ipynb
2020-10-16 16:54:06     267780 notebooks/e-*************************/executions/ex-*****************************/trailing_N_day.ipynb.ipynb

Downloading and visualizing the results

Follow the same steps as in the first use case to download and visualize the results.

The following screenshot shows the graph plotted in the output file of notebook input file A (demo_pyspark.ipynb). It shows the top 20 US states with confirmed COVID-19 cases yesterday.

The output of input file B (trailing_N_day.ipynb) plots the graph as shown in the following screenshot. It takes the US state with the most confirmed COVID-19 cases nationally yesterday as the input and plots a 30-day confirmed COVID-19 case number graph, showing the case growth trend of that state until yesterday.

This example step function is the orchestration for running two notebook input files: the second notebook uses the result from the first. It also monitors the first notebook until it is complete, and populates the Amazon S3 file location in the outputs. You can achieve more sophisticated orchestration by adding more states in the step function.

Cleaning up

To avoid ongoing charges, delete the CloudFormation stack, the EMR cluster, and any files in Amazon S3 that were created by running the examples in this post.

Conclusion

This post showed how you can schedule running an EMR notebook using crontab and the AWS CLI, and how to chain EMR notebooks with Step Functions triggered by CloudWatch events. The EMR Notebooks Execution API enables the parameterization for EMR notebooks. With this feature, you can also use orchestration services such as Apache Airflow to build ETL pipelines.


About the Authors

Fei Lang is a senior big data architect at Amazon Web Services. She is passionate about building the right big data solution for customers. In her spare time, she enjoys the scenery of the Pacific Northwest, going for a swim, and spending time with her family.

 

 

 

Ray Liu is a software development engineer at AWS. Besides work, he enjoys traveling and spending time with family.

 

 

 

Palaniappan Nagarajan is a Software Development Engineer at Amazon EMR working mainly on EMR Notebooks. In his spare time, he likes to hike, try out different cuisines, and scan the night sky with his telescope.

 

 

Shuang Li is a senior product manager for Amazon EMR at AWS. She holds a doctoral degree in Computer Science and Engineering from Ohio State University.

How the ZS COVID-19 Intelligence Engine helps Pharma & Med device manufacturers understand local healthcare needs & gaps at scale

Post Syndicated from Saunak Chandra original https://aws.amazon.com/blogs/big-data/how-the-zs-covid-19-intelligence-engine-helps-pharma-med-device-manufacturers-understand-local-healthcare-needs-gaps-at-scale/

This post is co-written by Parijat Sharma: Principal, Strategy & Transformation, Wenhao Xia: Manager, Data Science, Vineeth Sandadi: Manager, Business Consulting from ZS Associates, Inc, Arianna Tousi: Strategy, Insights and Planning Consultant from ZS, Gopi Vikranth: Associate Principal from ZS. In their own words, “We’re passionately committed to helping our clients and their customers thrive, working side by side to drive customer value and results”.

The COVID-19 trajectory across the US continues to remain unstable and heterogeneous. Although certain cities and counties were able to tame the adverse effects of the pandemic by applying stricter controls on social life, newer hotspots are emerging in different locations sporadically.

Organizations in healthcare, pharma, and biotech are looking to adapt to a rapidly evolving and diverse local market landscape, and restart parts of their operations that are significantly impacted, such as patient support functions, sales, and key account management. Real-time insights into the rapidly evolving COVID-19 situation and its impact on all key stakeholders in the healthcare supply chain, including patients, physicians, and health systems, are a key asset in helping companies adapt based on local market dynamics and remain resilient to future disruptions. However, several life-science companies don’t have these insights because they lack the infrastructure to integrate and manage the relevant datasets at scale and the analytical capabilities to mine the data for the relevant insights.

ZS stepped into this critical situation and built a data lake on AWS to address these challenges. The primary characteristic of this data lake is that it’s largely open source, which gave ZS a head start in meeting the product launch SLA using AWS. This post describes how ZS developed the data lake and brought their proprietary machine learning (ML) models to run on AWS, providing intelligent insight on COVID-19.

What is the ZS COVID-19 Intelligence Engine?

The ZS COVID-19 Intelligence Engine was designed as a customizable capability that does the following:

  • Integrates diverse public and proprietary healthcare datasets in a scalable data warehouse that stores data in a secure and compliant manner
  • Provides advanced descriptive and predictive analytical modules to forecast COVID-19 evolution and its impact on key stakeholders and the treatment journey
  • Packages insights into intuitive preconfigured reports and dashboards for dissemination across an organization

AWS Cloud data and analytics infrastructure

In this section, we dive into the infrastructure components of the ZS COVID-19 Intelligence Engine. The objective was to quickly set up a data lake with an accompanying ingestion mechanism to allow rapid ingestion of public datasets, third-party data, and datasets from AWS Data Exchange.

The overall data processing solution is based on ZS’s REVO™ data management product, which uses Apache Spark on Amazon EMR. The Spark engine processes and transforms raw data into structured data that is ready for interactive analysis. The raw data comes in compressed, delimited text format, in files ranging from 100 MB to 15 GB. After the data is cleansed and rules are applied, the processed data is staged in Amazon Simple Storage Service (Amazon S3) buckets in Apache Parquet format. This data is selectively loaded into an Amazon Redshift cluster for fast interactive querying and repetitive analysis on subsets of data.

The Intelligence Engine also uses a powerful Amazon Elastic Compute Cloud (Amazon EC2) instance to run ML workloads, which predict future COVID-19 caseloads at the county level. The prediction models run daily on a compute-optimized EC2 C5.24xlarge On-Demand Instance, which allows rapid turnaround of prediction results and keeps overall cost down through the use of On-Demand Instances.

ZS uses Amazon Redshift as the data warehouse in this architecture. Amazon Redshift is easy to launch and maintain and can quickly run analytical queries on large normalized datasets using standard ANSI SQL. After the raw data gets processed using ZS’s REVO™, the curated data is loaded into Amazon Redshift to run interactive analytical queries. The queries generate insights specific to local geography, county, and healthcare systems, and run on Amazon Redshift tables consisting of anonymized patient data. The Amazon Redshift cluster uses On-Demand Instances and is sized to accommodate 25 TB of data at the time of this product launch. Typical interactive queries include joining data across large tables, up to 1.5 billion rows in the main table.

The following diagram illustrates this architecture:

The ZS COVID-19 data lake has several benefits and applicable use cases:

  • Streamlined data procurement processes – Eliminates the need for multiple ZS teams to procure, ingest, and process the same datasets separately
  • Optimized common usage across clients and business questions – ZS uses this capability to publish common derivations of data that can then be utilized across different ZS teams and use cases to create a single version of truth
  • Cross-functional processes and requirements – Some analytics use cases require cross-functional data and are significantly hampered when a user can’t access the various data sources in one place; a data lake enables this by design
  • Connected healthcare data – Due to developing common standards and integrating with MDM and ontologies, data from the public domain can be compliantly integrated with pharma manufacturer-specific data sources to enable seamless analytics on the data lake

Comprehensive healthcare data lake

At its core, the Intelligence Engine builds a scalable and integrated repository of diverse public and proprietary data sources. These datasets range in variety, volume, and velocity:

  • COVID-19 incidence – There are several COVID-specific datasets that the public has become accustomed to viewing over the past several months, such as Johns Hopkins incidence tracking and IHME predictive data, which describe how the disease has been progressing over time and even into the future. This data tends to be at either the state or county level and is often refreshed daily. The data lake solution contains the entire history for these datasets, which, taken together, span hundreds of gigabytes. In addition to these sources, ZS’s proprietary predictive models add a further element of accuracy and are customized with ZS-specific insights.
  • Government policies – Government policy data, which is mostly being used from AWS Data Exchange on behalf of the New York Times, explains the current state of government mandates and recommendations for varying degrees of lockdown or reopening as it pertains to the pandemic. This data is much smaller in volume, well under 1 GB total.
  • Insurance claims at patient level – Thanks to the partnership with Symphony Health, ZS has had the opportunity to analyze and expose patient claims data that can be attributed to the specific hospital account or healthcare provider for which that claim took place. The insurance claims data is the largest volume of data (close to 15 TB) contributing to the ZS COVID-19 Intelligence Engine. ZS’s data engineering team has wrangled these large datasets with the help of Amazon EMR for aggregating and processing metrics, which are then stored in Amazon Redshift in a transformed version that is much smaller and can be more easily understood than the original raw datasets.
  • HCP to site of care affiliations – Thanks to the partnership with Definitive Healthcare, ZS is in the process of integrating best-in-class physician-hospital and clinic affiliations from Definitive Healthcare with patient claims from Symphony to help assess available healthcare capacity, evolving approaches to care delivery, and the type of care being delivered by disease area.
  • Other Intelligence engine data sources
    • State testing rates
    • Mobility
    • Demographics and social determinants of health
    • Provider access and affinity for pharma commercial engagement (from ZS affinity/access monitor)
    • Automated data ingestors for a variety of pharma manufacturer-specific data sources including specialty pharmacy and hub transactions, sales force activity, customer digital engagement, and more

Predictive models for COVID-19 projections and healthcare demand-supply gaps at a local level

To drive decision-making at a local level, ZS required more granular projections of COVID-19 disease spread than what’s publicly available at a state or national level. Therefore, as part of the Intelligence Engine, the ZS data science team aimed to develop an ensemble model of COVID-19 projections at the county level to identify emerging local healthcare gaps along different phases of the treatment process.

Developing a locally predictive model has many challenges, and ZS believes that no single model can capture all the virtually infinite drivers and factors contributing to disease spread within a specific geographic area. Therefore, the ZS data science team behind the COVID-19 projections has implemented multiple projection models, each with its own set of input data sources, assumptions, and parameters. Combining them increases the accuracy of the projection while retaining a level of stability and interpretability of the model. These models include:

  • Statistical curve fitting model – A disease progression curve using a Generalized Gaussian Cumulative Distribution Function, optimized to minimize prediction error of COVID-19 cases and deaths
  • SEIR model – Traditional epidemiological disease progression model (pathway of Susceptible – Exposed – Infectious – Recovered) combined with traditional ML on model parameters
  • Agent-based simulation – County-level simulation of individual interactions between people within the county
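To make the SEIR idea concrete, the following is a minimal textbook-style sketch that steps the four compartments forward one day at a time. It is not ZS's model: the function name, the county size, and every parameter value are hypothetical and chosen only for illustration, and the ML-based tuning of model parameters mentioned above is not shown.

```python
def simulate_seir(population, exposed0, infectious0, beta, sigma, gamma, days):
    """Integrate the SEIR equations with one Euler step per day.

    beta  - transmission rate per day
    sigma - rate at which exposed people become infectious (1 / incubation days)
    gamma - recovery rate (1 / infectious days)
    """
    s = float(population - exposed0 - infectious0)
    e, i, r = float(exposed0), float(infectious0), 0.0
    history = [(s, e, i, r)]
    for _ in range(days):
        new_exposed = beta * s * i / population
        new_infectious = sigma * e
        new_recovered = gamma * i
        s -= new_exposed
        e += new_exposed - new_infectious
        i += new_infectious - new_recovered
        r += new_recovered
        history.append((s, e, i, r))
    return history


# Hypothetical county of 100,000 people; all parameter values are illustrative.
trajectory = simulate_seir(
    population=100_000, exposed0=20, infectious0=10,
    beta=0.4, sigma=1 / 5, gamma=1 / 10, days=120,
)
peak_day, peak = max(enumerate(trajectory), key=lambda item: item[1][2])
print(f"Peak infectious: {peak[2]:.0f} on day {peak_day}")
```

In a setup like the one described, parameters such as beta would be fitted to observed county data rather than fixed by hand.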

Obtaining a more granular view of future virus spread at a local level is critical in order to provide support for challenges in specific sites of care. Accurately projecting cases at the county level can be difficult for many reasons. In counties with low current case counts, the model has little historical data to learn from (both in time since first infection and in magnitude of cases). Additionally, forecasts can be sensitive to many variables, and the current second wave of COVID-19 infections further complicates tracking the spread of the virus.

To combat some of these difficulties, ZS implemented a two-phased approach to generate county-level projections. Counties with a long enough history of virus spread are projected independently using the three disease progression models we outlined, whereas counties with limited history are projected using a combination of state-level projections and social determinants of health factors that are predictive of disease spread (for example, age distribution in a certain county).

As the world around us continues to evolve and the COVID-19 situation with it, the ZS data science team is also working to adapt the model alongside the current situation. Currently, model adaptability and its self-learning ability are continuing to improve to better adapt to the onset of the second wave of the virus. Additional parameters and re-optimizations are happening daily as the situation develops.

The following image shows the input data sources, modeling techniques, and outputs of the ZS COVID-19 projection models:

Analyzing and predicting local non-COVID-19 treatment gaps and their drivers

Several flexible analytical tools can be used to evaluate barriers along the disease treatment journey for non-COVID-19 diseases at the local geography level, their evolution over time with COVID-19, and their underlying drivers. These tools summarize local changes in and the underlying drivers of the following:

  • New patient diagnosis
  • Changes in treatment approaches and drugs used
  • Patient affordability and access to medications
  • Persistency and compliance to treatment
  • Healthcare demand (patients needing care) and supply (provider capacity to offer care)

The following image shows output from the Intelligence Engine illustrating local variations in healthcare gaps:

Intuitive visualization capabilities

The solution has two intuitive visualization capabilities:

  • COVID-19 monitor – A public access dashboard with insights on historical and future predictions of trajectories of COVID-19 incidences and hospital capacity. These insights are available at the state level and allow you to drill down into individual counties. The individual county-level view allows you to not only understand the severity of COVID-19 in that area, but also better understand how that county compares to other counties within the same state and observe what policies their local governments have set for the shutdown and reopening process.
  • Treatment finder – A second public access dashboard with near-real-time insights into individual hospital and physician group availability to treat patients for prominent non-COVID-19 diseases. This dashboard allows you to select a specific non-COVID-19 disease and identify the estimated number of COVID-19-infected people in your geography with the disease, mortality rates, and the individual providers that are accepting patients with a specific disease and health insurance.

The following image shows the Intelligence Engine screen with COVID-19 insights for a selected county:

The following image shows the Intelligence Engine screen that allows patients to find hospitals and physician offices that are open and accepting patients:

Conclusion

At its core, the ZS Intelligence Engine is a real-time planning tool. The rich set of AWS services and technologies makes it possible to ingest data from various third-party sources, public and proprietary alike. The AWS services used to build the architecture can run on open technologies. For example, building the data lake would not have been possible without Amazon EMR and Amazon EC2. ZS had already been using Apache Spark on Amazon EMR (the service behind the REVO™ tool) before COVID-19 hit. ZS can run its ML models cost-effectively by using EC2 On-Demand Instances. Finally, using Amazon Redshift as a data warehouse solution allows ZS to provide COVID-19 analytical insights efficiently and cost-effectively.

Since the project went live, ZS has provided this product to at least six customers in the pharma, biotech, and medical device spaces. They are using this product in a variety of ways, including but not limited to:

  • Refining forecasts of the COVID-19 trajectory to estimate demand for their products
  • Assessing the level of openness of healthcare facilities to understand where patients across therapy areas are being treated
  • Determining which patients and communities to support, because COVID-19 impacts attitudes and concerns regarding immunity and drug use, and greater unemployment means more reimbursement support requirements
  • Readying the education and engagement field force for a mix of in-person and virtual interactions
  • Preparing the supply chain to ensure continuity of care

To try out the analysis yourself, see ZS’s COVID-19 Intelligence Engine.


About the Authors

Saunak is a Sr. Solutions Architect with AWS, helping customers and partners build data warehouses and scalable data platforms on AWS.

 

 

Parijat is the current lead of strategy and transformation at ZS. He focuses on mid to small clients that are ready for a transformational process to commercialize new products/portfolio, purchase/sell assets or expand into new markets.

 

 

Wenhao has over 10 years of experience in various data science and advanced analytics fields. During his time at ZS, he has helped both to build and popularize data science capabilities across many organizations.

 

 

Vineeth works with pharmaceutical and biotech manufacturers on a broad spectrum of commercial issues, including commercial analytics, organized provider strategy, and resource planning and deployment.

 

 

Arianna is a Strategy, Insights and Planning Consultant in ZS’s High Tech practice. Arianna has extensive experience in working with clients across industries on go-to-market strategy and commercial effectiveness issues.

 

 

Gopi Vikranth is an Associate Principal in ZS’ High Tech Practice. He has extensive experience in helping clients across Retail, HiTech, Hospitality, Pharmaceutical & Insurance sectors leverage BigData & Analytics to drive Topline growth.

 

Optimizing Amazon EMR for resilience and cost with capacity-optimized Spot Instances

Post Syndicated from Ran Sheinberg original https://aws.amazon.com/blogs/big-data/optimizing-amazon-emr-for-resilience-and-cost-with-capacity-optimized-spot-instances/

Amazon EMR now supports the capacity-optimized allocation strategy for Amazon Elastic Compute Cloud (Amazon EC2) Spot Instances for launching Spot Instances from the most available Spot Instance capacity pools by analyzing capacity metrics in real time. You can now specify up to 15 instance types in your EMR task instance fleet configuration. This provides Amazon EMR with more options in choosing the optimal pools to launch Spot Instances from in order to decrease chances of Spot interruptions, and increases the ability to relaunch capacity using other instance types in case Spot Instances are interrupted when Amazon EC2 needs the capacity back.

Background

Amazon EMR is the industry-leading cloud big data platform for processing vast amounts of data using open-source tools such as Apache Spark, Apache Hive, Apache HBase, Apache Flink, Apache Hudi, and Presto. With Amazon EMR, you can run petabyte-scale analysis at less than half of the cost of traditional on-premises solutions, and over three times as fast on Amazon EMR runtime for Apache Spark compared to running without the runtime. If you have existing on-premises deployments of open-source tools such as Apache Spark and Apache Hive, you can also run Amazon EMR clusters on AWS Outposts.

Spot Instances are spare Amazon EC2 compute capacity in the AWS Cloud available to you at savings of up to 90% compared to On-Demand Instance prices. The only difference between On-Demand Instances and Spot Instances is that Amazon EC2 can interrupt Spot Instances with 2 minutes of notification when Amazon EC2 needs the capacity back. Using Spot Instances in Amazon EMR is a common pattern that allows AWS customers to achieve significant cost savings.

The capacity-optimized allocation strategy in the Amazon EC2 fleet (also available for Amazon EC2 Auto Scaling and Spot Fleet) provisions Spot Instances from the most-available Spot Instance pools by analyzing capacity metrics. By offering the possibility of fewer interruptions, the capacity-optimized strategy can lower the overall cost of your workload. For more information about how AWS customers are benefiting from decreased Spot interruptions with the capacity-optimized allocation strategy, see Capacity-Optimized Spot Instance Allocation in Action at Mobileye and Skyscanner.

Amazon EMR uses the Amazon EC2 RunInstances API to provision compute capacity. We are enhancing the way Amazon EMR provisions EC2 instances to provide more flexibility and increased cluster resilience using EC2 Fleet (CreateFleet) in Instant mode, as a drop-in replacement for RunInstances.

Optimizing capacity for greater resilience

With this launch, you can configure Amazon EMR to use allocation strategies.

The capacity-optimized allocation strategy uses real-time capacity data to allocate instances from the Spot Instance pools with the optimal capacity for the number of instances that are launching. This allocation strategy is appropriate for workloads that have a higher cost of interruption. Examples include long-running jobs and multi-tenant persistent clusters running Apache Spark, Apache Hive, and Presto. This allocation strategy lets you specify up to 15 EC2 instance types on task instance fleets to diversify your Spot requests and get steep discounts. Previously, instance fleets allowed a maximum of five instance types. You can now diversify your Spot requests across these 15 pools within each Availability Zone and prioritize deploying into a deeper capacity pool to lower the chance of interruptions. With more instance type diversification, Amazon EMR has more capacity pools to allocate capacity from, and chooses the Spot Instances which are least likely to be interrupted.

For example, if you’re initially using EC2 memory-optimized r5.4xlarge instances (with 16 vCPUs and 128 GB of RAM) for your EMR task nodes, you can configure the EMR task instance fleet with different instance types. First, explore different-sized instance types such as r5.2xlarge and r5.8xlarge. Second, add previous generation r4.4xlarge and other R4 instance sizes. After you’ve added different sizes within the same family, as well as previous generation instance types, you can add extra instance types with similar hardware characteristics and vCPU to memory ratio, such as the r5a instance family with AMD processors, r5d instance family with locally attached NVMe storage, and more.

The allocation strategy is also taken into account in case the cluster scales out after the initial provisioning phase—for example, if you manually resize the core or task fleets, or if you’re using managed scaling to automatically increase or decrease the number of instances or units in your cluster based on workload.

For more information about Spot Instance configuration if you’re using Amazon EMR to run Apache Spark workloads, see Best practices for running Apache Spark applications using Amazon EC2 Spot Instances with Amazon EMR. This blog emphasizes best practices that will help you build your Spark workloads with Amazon EMR to achieve deep cost savings.

Amazon EMR has made significant enhancements to improve elasticity and resilience, including graceful decommissioning of Spot Instances running Apache Spark and Apache Hadoop applications. Amazon EMR has customizations to open-source Apache Spark that make it more resilient to node loss—integrating with YARN’s decommissioning mechanism, extending Apache Spark’s blacklisting mechanism and actions on decommissioned nodes.

For example, when Spot Instances are interrupted in a running EMR cluster, stage failures don’t count towards the total number of failures that trigger a total job failure. For more information, see Spark enhancements for elasticity and resilience on Amazon EMR.

New configuration options and IAM policy requirements

To leverage the allocation strategies in your EMR clusters, you need to use a new AllocationStrategy parameter in your cluster configurations. Amazon EMR added support for an On-Demand allocation strategy: you can specify multiple On-Demand Instance types in your core or task instance fleets, and specify an allocation strategy of “lowest-price” to have Amazon EMR provision On-Demand Instances that have the lowest costs. This allows you to also be flexible with your selection of On-Demand instance types.

The following is an example snippet from an Amazon EMR JSON configuration file with the new capabilities:

{
    "Name": "Taskfleet",
    "InstanceFleetType": "TASK",
    "TargetSpotCapacity": 1,
    "TargetOnDemandCapacity": 1,
    "LaunchSpecifications": {
        "OnDemandSpecification": {
            "AllocationStrategy": "lowest-price"
        },
        "SpotSpecification": {
            "AllocationStrategy": "capacity-optimized",
            "TimeoutDurationMinutes": 120,
            "TimeoutAction": "TERMINATE_CLUSTER"
        }
    }
}

For more information about the API options, see InstanceFleetProvisioningSpecifications.
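As a rough illustration, the same kind of fleet definition can be assembled and sanity-checked in Python before it is handed to the API or CLI. This is only a sketch: the helper name build_task_fleet, the chosen instance types, and the check against the 15-type limit mentioned earlier are illustrative assumptions, and nothing here calls AWS.

```python
import json

# Allocation strategy strings as they appear in the JSON snippet above.
SPOT_STRATEGY = "capacity-optimized"
ON_DEMAND_STRATEGY = "lowest-price"
MAX_TASK_FLEET_TYPES = 15  # task fleet limit quoted earlier in this post


def build_task_fleet(weights, spot_capacity, on_demand_capacity=1):
    """Return a task instance fleet definition as a plain dict (hypothetical helper).

    weights maps an instance type name to its WeightedCapacity.
    """
    if not weights:
        raise ValueError("at least one instance type is required")
    if len(weights) > MAX_TASK_FLEET_TYPES:
        raise ValueError("a task fleet accepts at most 15 instance types")
    return {
        "Name": "Taskfleet",
        "InstanceFleetType": "TASK",
        "TargetSpotCapacity": spot_capacity,
        "TargetOnDemandCapacity": on_demand_capacity,
        "InstanceTypeConfigs": [
            {"InstanceType": name, "WeightedCapacity": weight}
            for name, weight in weights.items()
        ],
        "LaunchSpecifications": {
            "OnDemandSpecification": {"AllocationStrategy": ON_DEMAND_STRATEGY},
            "SpotSpecification": {
                "AllocationStrategy": SPOT_STRATEGY,
                "TimeoutDurationMinutes": 120,
                "TimeoutAction": "TERMINATE_CLUSTER",
            },
        },
    }


fleet = build_task_fleet({"r5.4xlarge": 16, "r4.4xlarge": 16}, spot_capacity=32)
print(json.dumps(fleet, indent=4))
```

Building the definition in code makes it easy to reject an over-long instance type list early instead of waiting for the service to return an error.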

The following IAM policy shows the additional service role permissions required to create a cluster that uses the instance fleet allocation strategy option. If your clusters are using the default role EMR_DefaultRole (which has the default managed policy attached AmazonElasticMapReduceRole), the managed policy is already updated to include these new role permissions. If your clusters are using a different role or policy, make sure you add these new permissions to your policy. See the following IAM policy:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "ec2:DeleteLaunchTemplate",
                "ec2:CreateLaunchTemplate",
                "ec2:DescribeLaunchTemplates",
                "ec2:CreateFleet"
            ],
            "Resource": "*"
        }
    ]
}
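If you maintain a custom service role policy, a small script can report which of these actions it still lacks. The following is a minimal sketch, assuming the policy is available as a JSON string; the function name missing_actions and the sample policy are hypothetical, and the check matches exact action names only (wildcards, conditions, and resource restrictions are ignored).

```python
import json

# Actions listed in the policy shown in this post.
REQUIRED_ACTIONS = {
    "ec2:DeleteLaunchTemplate",
    "ec2:CreateLaunchTemplate",
    "ec2:DescribeLaunchTemplates",
    "ec2:CreateFleet",
}


def missing_actions(policy_json):
    """Return the required actions that no Allow statement grants by exact name."""
    policy = json.loads(policy_json)
    statements = policy.get("Statement", [])
    if isinstance(statements, dict):  # a single statement may be a bare object
        statements = [statements]
    allowed = set()
    for statement in statements:
        if statement.get("Effect") != "Allow":
            continue
        actions = statement.get("Action", [])
        if isinstance(actions, str):  # a single action may be a bare string
            actions = [actions]
        allowed.update(actions)
    return REQUIRED_ACTIONS - allowed


# Hypothetical custom policy that has not been updated yet.
custom_policy = """
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["ec2:RunInstances", "ec2:CreateFleet"],
            "Resource": "*"
        }
    ]
}
"""
print(sorted(missing_actions(custom_policy)))
```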

Launching an EMR cluster with capacity-optimized Spot Instances and a diversified task fleet

In this section, we look at how to create an EMR cluster that includes allocation strategy configurations and a diversified task fleet. Specifying more instance types allows Amazon EMR to launch instances from the most optimal capacity pools and to replenish the cluster’s target capacity if Spot Instances are interrupted. Moreover, with the capacity-optimized allocation strategy, Spot Instances are launched from the most available capacity pools, which decreases the chances of Spot interruptions.

The following AWS Command Line Interface (AWS CLI) command launches an EMR cluster in the default AWS Region configured in your AWS CLI configuration, with the master and core nodes running on On-Demand Instances, and a task fleet running Spot Instances.

Amazon EMR uses a wide selection of instance types in the task instance fleet and uses "AllocationStrategy": "capacity-optimized" to launch instances from the most available Spot capacity pools and decrease the chances of workload interruptions. By providing a WeightedCapacity for each instance type that is equal to the number of vCPUs (or YARN vCores), you can specify a TargetSpotCapacity that defines the number of vCPUs (YARN vCores) in your task fleet and be flexible around the instance sizes, effectively providing more capacity pools to choose from. You should specify a subnet ID per Availability Zone in the AWS Region. While each EMR cluster runs in a single Availability Zone, specifying multiple Availability Zones allows you to architect your workload with increased fault tolerance.
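To make the weighting arithmetic concrete, the following sketch computes how many instances of one type would be needed to reach a target capacity expressed in vCPUs. It assumes, for simplicity, that the whole target is met from a single instance type, whereas Amazon EMR may mix types; the function name instances_needed is hypothetical and the weights are the vCPU counts of the R5 sizes.

```python
import math

# WeightedCapacity per instance type, set equal to its vCPU count.
WEIGHTS = {
    "r5.xlarge": 4,
    "r5.2xlarge": 8,
    "r5.4xlarge": 16,
    "r5.8xlarge": 32,
    "r5.16xlarge": 64,
}


def instances_needed(target_capacity, weight):
    """Number of instances of one type needed to reach the target capacity."""
    return math.ceil(target_capacity / weight)


TARGET_SPOT_CAPACITY = 64  # total vCPUs (YARN vCores) wanted in the task fleet

for instance_type, weight in WEIGHTS.items():
    count = instances_needed(TARGET_SPOT_CAPACITY, weight)
    print(f"{instance_type}: {count} instance(s)")
```

With a target of 64, the fleet could be satisfied by sixteen r5.xlarge instances, a single r5.16xlarge, or anything in between, which is what gives Amazon EMR more capacity pools to choose from.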

See the following AWS CLI command for an example of launching an Amazon EMR cluster that adheres to the recommendations in this blog post (uses Allocation Strategies and a diversified set of instance types in the task fleet):

aws emr create-cluster \
--use-default-roles --release-label emr-5.30.1 \
--ec2-attributes SubnetIds=['subnet-1234567890abcdefg','subnet-1234567890abcdefg','subnet-1234567890abcdefg'] \
--name 'EMRCluster-TaskFleet' \
--instance-fleets \
InstanceFleetType=MASTER,TargetOnDemandCapacity=1,InstanceTypeConfigs=['{WeightedCapacity=1,InstanceType=m5.xlarge}'] \
InstanceFleetType=CORE,TargetOnDemandCapacity=4,LaunchSpecifications={OnDemandSpecification='{AllocationStrategy=lowest-price}'},InstanceTypeConfigs=['{WeightedCapacity=4,InstanceType=r4.xlarge},{WeightedCapacity=4,InstanceType=r5.xlarge}'] \
InstanceFleetType=TASK,TargetSpotCapacity=64,LaunchSpecifications={SpotSpecification='{TimeoutDurationMinutes=60,AllocationStrategy=capacity-optimized,TimeoutAction=TERMINATE_CLUSTER}'},InstanceTypeConfigs=['{WeightedCapacity=4,InstanceType=r5.xlarge},{WeightedCapacity=4,InstanceType=r4.xlarge},{WeightedCapacity=8,InstanceType=r5.2xlarge},{WeightedCapacity=8,InstanceType=r4.2xlarge},{WeightedCapacity=16,InstanceType=r5.4xlarge},{WeightedCapacity=16,InstanceType=r4.4xlarge},{WeightedCapacity=32,InstanceType=r5.8xlarge},{WeightedCapacity=32,InstanceType=r4.8xlarge},{WeightedCapacity=64,InstanceType=r5.16xlarge},{WeightedCapacity=64,InstanceType=r4.16xlarge},{WeightedCapacity=16,InstanceType=r5d.4xlarge},{WeightedCapacity=16,InstanceType=r5a.4xlarge}']

The following screenshot shows the result on the Amazon EMR console.

Conclusion

With this new functionality in Amazon EMR, you can increase the resilience of your organization’s data-processing workloads and optimize your costs by using Spot Instances. The capacity-optimized allocation strategy works to decrease the possibility of Spot interruptions in your cluster and allows you to specify up to 15 different instance types for your task fleet, enabling Amazon EMR to find the most available Spot capacity pools for your workload. With the Amazon EMR enhancements for increased resilience and the capacity-optimized allocation strategy for Spot Instances, you can achieve deep cost savings without compromising on availability.


About the authors

Ran Sheinberg is a principal solutions architect in the EC2 Spot team with Amazon Web Services. He works with AWS customers on cost optimizing their compute spend by utilizing Spot Instances across different types of workloads: stateless web applications, queue workers, containerized workloads, analytics, HPC and others.

Apply record level changes from relational databases to Amazon S3 data lake using Apache Hudi on Amazon EMR and AWS Database Migration Service

Post Syndicated from Ninad Phatak original https://aws.amazon.com/blogs/big-data/apply-record-level-changes-from-relational-databases-to-amazon-s3-data-lake-using-apache-hudi-on-amazon-emr-and-aws-database-migration-service/

Data lakes give organizations the ability to harness data from multiple sources in less time. Users across different roles are now empowered to collaborate and analyze data in different ways, leading to better, faster decision-making. Amazon Simple Storage Service (Amazon S3) is the highly performant object storage service for structured and unstructured data and the storage service of choice to build a data lake.

However, many use cases like performing change data capture (CDC) from an upstream relational database to an Amazon S3-based data lake require handling data at a record level. Performing an operation like inserting, updating, and deleting individual records from a dataset requires the processing engine to read all the objects (files), make the changes, and rewrite the entire dataset as new files. Furthermore, making the data available in the data lake in near-real time often leads to the data being fragmented over many small files, resulting in poor query performance. Apache Hudi is an open-source data management framework that enables you to manage data at the record level in Amazon S3 data lakes, thereby simplifying building CDC pipelines and making it efficient to do streaming data ingestion. Datasets managed by Hudi are stored in Amazon S3 using open storage formats, and integrations with Presto, Apache Hive, Apache Spark, and the AWS Glue Data Catalog give you near real-time access to updated data using familiar tools. Hudi is supported in Amazon EMR and is automatically installed when you choose Spark, Hive, or Presto when deploying your EMR cluster.

In this post, we show you how to build a CDC pipeline that captures the data from an Amazon Relational Database Service (Amazon RDS) for MySQL database using AWS Database Migration Service (AWS DMS) and applies those changes to a dataset in Amazon S3 using Apache Hudi on Amazon EMR. Apache Hudi includes the utility HoodieDeltaStreamer, which provides an easy way to ingest data from many sources, such as a distributed file system or Kafka. It manages checkpointing, rollback, and recovery so you don’t need to keep track of what data has been read and processed from the source, which makes it easy to consume change data. It also allows for lightweight SQL-based transformations on the data as it is being ingested. For more information, see Writing Hudi Tables. Support for AWS DMS with HoodieDeltaStreamer is provided with Apache Hudi version 0.5.2 and is available on Amazon EMR 5.30.x and 6.1.0.

Architecture overview

The following diagram illustrates the architecture we deploy to build our CDC pipeline.

In this architecture, we have a MySQL instance on Amazon RDS. AWS DMS pulls full and incremental data (using the CDC feature of AWS DMS) into an S3 bucket in Parquet format. HoodieDeltaStreamer on an EMR cluster is used to process the full and incremental data to create a Hudi dataset. As the data in the MySQL database gets updated, the AWS DMS task picks up the changes and writes them to the raw S3 bucket. The HoodieDeltaStreamer job can be run on the EMR cluster at a certain frequency or in a continuous mode to apply these changes to the Hudi dataset in the Amazon S3 data lake. You can query this data with tools such as SparkSQL, Presto, Apache Hive running on the EMR cluster, and Amazon Athena.

Deploying the solution resources

We use AWS CloudFormation to deploy these components in your AWS account. Choose an AWS Region for deployment where the following services are available:

You need to meet the following prerequisites before deploying the CloudFormation template:

  • Have a VPC with at least two public subnets in your account.
  • Have an S3 bucket where you want to collect logs from the EMR cluster. This should be in the same AWS Region where you spin up the CloudFormation stack.
  • Have an AWS Identity and Access Management (IAM) role dms-vpc-role. For instructions on creating one, see Security in AWS Database Migration Service.
  • If you’re deploying the stack in an account using the AWS Lake Formation permission model, validate the following settings:
    • The IAM user used to deploy the stack is added as a data lake administrator under Lake Formation or the IAM user used to deploy the stack has IAM privileges to create databases in the AWS Glue Data Catalog.
    • The Data Catalog settings under Lake Formation are configured to use only IAM access control for new databases and new tables in new databases. This makes sure that all access to the newly created databases and tables in the Data Catalog is controlled solely using IAM permissions.
    • IAMAllowedPrincipals is granted database creator privilege on the Lake Formation Database creators page.

If this privilege is not in place, grant it by choosing Grant and selecting the Create database permission.

These Lake Formation settings are required so that all permissions to the Data Catalog objects are controlled using IAM only.

Launching the CloudFormation stack

To launch the CloudFormation stack, complete the following steps:

  1. Choose Launch Stack:
  2. Provide the mandatory parameters in the Parameters section, including an S3 bucket to store the Amazon EMR logs and a CIDR IP range from where you want to access Amazon RDS for MySQL.
  3. Follow through the CloudFormation stack creation wizard, leaving the rest of the default values unchanged.
  4. On the final page, select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  5. Choose Create stack.
  6. When the stack creation is complete, record the details of the S3 bucket, EMR cluster, and Amazon RDS for MySQL database from the Outputs tab of the CloudFormation stack.

The CloudFormation template uses m5.xlarge and m5.2xlarge instances for the EMR cluster. If these instance types aren’t available in the Region or Availability Zone you have selected for deployment, the creation of the CloudFormation stack fails. If that happens, choose a Region or subnet where the instance type is available. For more information about working around this issue, see Instance Type Not Supported.

CloudFormation also creates and configures the AWS DMS endpoints and tasks with requisite connection attributes such as dataFormat, timestampColumnName, and parquetTimestampInMillisecond. For more information, see Extra connection attributes when using Amazon S3 as a target for AWS DMS.

The database instance deployed as part of the CloudFormation stack has already been created with the settings needed for AWS DMS to work in CDC mode on the database. These are:

  • binlog_format=ROW
  • binlog_checksum=NONE

Also, automatic backups are enabled on the RDS DB instance. This is a required attribute for AWS DMS to do CDC. For more information, see Using a MySQL-compatible database as a source for AWS DMS.

Running the end-to-end data flow

Now that the CloudFormation stack is deployed, we can run our data flow to get the full and incremental data from MySQL into a Hudi dataset in our data lake.

  1. As a best practice, retain your binlogs for at least 24 hours. Log in to your Amazon RDS for MySQL database using your SQL client and run the following command:
    call mysql.rds_set_configuration('binlog retention hours', 24);

  2. Create a table in the dev database:
    create table dev.retail_transactions(
    tran_id INT,
    tran_date DATE,
    store_id INT,
    store_city varchar(50),
    store_state char(2),
    item_code varchar(50),
    quantity INT,
    total FLOAT);

  3. When the table is created, insert some dummy data into the database:
    insert into dev.retail_transactions values(1,'2019-03-17',1,'CHICAGO','IL','XXXXXX',5,106.25);
    insert into dev.retail_transactions values(2,'2019-03-16',2,'NEW YORK','NY','XXXXXX',6,116.25);
    insert into dev.retail_transactions values(3,'2019-03-15',3,'SPRINGFIELD','IL','XXXXXX',7,126.25);
    insert into dev.retail_transactions values(4,'2019-03-17',4,'SAN FRANCISCO','CA','XXXXXX',8,136.25);
    insert into dev.retail_transactions values(5,'2019-03-11',1,'CHICAGO','IL','XXXXXX',9,146.25);
    insert into dev.retail_transactions values(6,'2019-03-18',1,'CHICAGO','IL','XXXXXX',10,156.25);
    insert into dev.retail_transactions values(7,'2019-03-14',2,'NEW YORK','NY','XXXXXX',11,166.25);
    insert into dev.retail_transactions values(8,'2019-03-11',1,'CHICAGO','IL','XXXXXX',12,176.25);
    insert into dev.retail_transactions values(9,'2019-03-10',4,'SAN FRANCISCO','CA','XXXXXX',13,186.25);
    insert into dev.retail_transactions values(10,'2019-03-13',1,'CHICAGO','IL','XXXXXX',14,196.25);
    insert into dev.retail_transactions values(11,'2019-03-14',5,'CHICAGO','IL','XXXXXX',15,106.25);
    insert into dev.retail_transactions values(12,'2019-03-15',6,'CHICAGO','IL','XXXXXX',16,116.25);
    insert into dev.retail_transactions values(13,'2019-03-16',7,'CHICAGO','IL','XXXXXX',17,126.25);
    insert into dev.retail_transactions values(14,'2019-03-16',7,'CHICAGO','IL','XXXXXX',17,126.25);
    

    We now use AWS DMS to start pushing this data to Amazon S3.

  4. On the AWS DMS console, run the task hudiblogload.

This task does a full load of the table to Amazon S3 and then starts writing incremental data.

If you’re prompted to test the AWS DMS endpoints while starting the AWS DMS task for the first time, you should do so. It’s generally a good practice to test the source and target endpoints before starting an AWS DMS task for the first time.

In a few minutes, the status of the task changes to Load complete, replication ongoing, which means that the full load is complete and the ongoing replication has started. You can go to the S3 bucket created by the stack and you should see a .parquet file under the dmsdata/dev/retail_transactions folder in your S3 bucket.

  1. On the Hardware tab of your EMR cluster, choose the master instance group and note the EC2 instance ID for the master instance.
  2. On the Systems Manager console, choose Session Manager.
  3. Choose Start Session to start a session with the master node of your cluster.

If you face challenges connecting to the master instance of the EMR cluster, see Troubleshooting Session Manager.

  1. Switch to the hadoop user by running the following command:
    sudo su hadoop

In a real-life use case, the AWS DMS task starts writing incremental files to the same Amazon S3 location when the full load is complete. The way to distinguish full load vs. incremental load files is that the full load files have a name starting with LOAD, whereas CDC filenames have datetimestamps, as you see in a later step. From a processing perspective, we want to process the full load into the Hudi dataset and then start incremental data processing. To do this, we move the full load files to a different S3 folder under the same S3 bucket and process those before we start processing incremental files.

  1. Run the following command on the master node of the EMR cluster (replace <s3-bucket-name> with your actual bucket name):
    aws s3 mv s3://<s3-bucket-name>/dmsdata/dev/retail_transactions/ s3://<s3-bucket-name>/dmsdata/data-full/dev/retail_transactions/  --exclude "*" --include "LOAD*.parquet" --recursive
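The --exclude "*" --include "LOAD*.parquet" pair in the preceding command first excludes everything and then re-includes only the full load files, because later filters take precedence over earlier ones. A minimal Python sketch of that selection logic follows; the function name selected_for_move and the sample CDC file name are hypothetical and used only for illustration.

```python
from fnmatch import fnmatchcase

# Filters in the same order as on the command line; later filters win.
FILTERS = [("exclude", "*"), ("include", "LOAD*.parquet")]


def selected_for_move(file_name, filters):
    """Return True if the file would be moved under the ordered filters."""
    include = True  # everything is included unless a filter says otherwise
    for kind, pattern in filters:
        if fnmatchcase(file_name, pattern):
            include = kind == "include"
    return include


# One full load file and one CDC-style file with a timestamp name (assumed format).
for name in ["LOAD00000001.parquet", "20200715-101530123.parquet"]:
    action = "move" if selected_for_move(name, FILTERS) else "leave in place"
    print(f"{name}: {action}")
```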

With the full table dump available in the data-full folder, we now use the HoodieDeltaStreamer utility on the EMR cluster to populate the Hudi dataset on Amazon S3.

  1. Run the following command to populate the Hudi dataset to the hudi folder in the same S3 bucket (replace <s3-bucket-name> with the name of the S3 bucket created by the CloudFormation stack):
    spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
      --packages org.apache.hudi:hudi-utilities-bundle_2.11:0.5.2-incubating,org.apache.spark:spark-avro_2.11:2.4.5 \
      --master yarn --deploy-mode cluster \
      --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
      --conf spark.sql.hive.convertMetastoreParquet=false \
      /usr/lib/hudi/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
      --table-type COPY_ON_WRITE \
      --source-ordering-field dms_received_ts \
      --props s3://<s3-bucket-name>/properties/dfs-source-retail-transactions-full.properties \
      --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
      --target-base-path s3://<s3-bucket-name>/hudi/retail_transactions --target-table hudiblogdb.retail_transactions \
      --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
      --payload-class org.apache.hudi.payload.AWSDmsAvroPayload \
      --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
      --enable-hive-sync
    

The preceding command runs a Spark job that runs the HoodieDeltaStreamer utility. For more information about the parameters used in this command, see Writing Hudi Tables.

When the Spark job is complete, you can navigate to the AWS Glue console and find a table called retail_transactions created under the hudiblogdb database. The input format for the table is org.apache.hudi.hadoop.HoodieParquetInputFormat.

Next, we query the data and look at the data in the retail_transactions table in the catalog.

  1. In the Systems Manager session established earlier, run the following command (make sure that you have completed all the prerequisites for the post, including adding IAMAllowedPrincipals as a database creator in Lake Formation):
    spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.hive.convertMetastoreParquet=false" \
    --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.2-incubating,org.apache.spark:spark-avro_2.11:2.4.5 \
    --jars /usr/lib/hudi/hudi-spark-bundle_2.11-0.5.2-incubating.jar,/usr/lib/spark/external/lib/spark-avro.jar
    

  2. Run the following query on the retail_transactions table:
    spark.sql("Select * from hudiblogdb.retail_transactions order by tran_id").show()

You should see the same data in the table as the MySQL database with a few columns added by the HoodieDeltaStreamer process.

We now run some DML statements on our MySQL database and take these changes through to the Hudi dataset.

  1. Run the following DML statements on the MySQL database:
    insert into dev.retail_transactions values(15,'2019-03-16',7,'CHICAGO','IL','XXXXXX',17,126.25);
    update dev.retail_transactions set store_city='SPRINGFIELD' where tran_id=12;
    delete from dev.retail_transactions where tran_id=2;

In a few minutes, you see a new .parquet file created under the dmsdata/dev/retail_transactions folder in the S3 bucket.

  1. Run the following command on the EMR cluster to get the incremental data to the Hudi dataset (replace <s3-bucket-name> with the name of the S3 bucket created by the CloudFormation template):
    spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
      --packages org.apache.hudi:hudi-utilities-bundle_2.11:0.5.2-incubating,org.apache.spark:spark-avro_2.11:2.4.5 \
      --master yarn --deploy-mode cluster \
      --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
      --conf spark.sql.hive.convertMetastoreParquet=false \
      /usr/lib/hudi/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
      --table-type COPY_ON_WRITE \
      --source-ordering-field dms_received_ts \
      --props s3://<s3-bucket-name>/properties/dfs-source-retail-transactions-incremental.properties \
      --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
      --target-base-path s3://<s3-bucket-name>/hudi/retail_transactions --target-table hudiblogdb.retail_transactions \
      --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
      --payload-class org.apache.hudi.payload.AWSDmsAvroPayload \
      --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
      --enable-hive-sync \
      --checkpoint 0

The key differences between this command and the previous one are the properties file passed to the --props parameter and the addition of the --checkpoint parameter. For the earlier command that performed the full load, we used dfs-source-retail-transactions-full.properties; for the incremental one, we used dfs-source-retail-transactions-incremental.properties. The differences between these two property files are:

  • The location of source data changes between full and incremental data in Amazon S3.
  • The SQL transformer query included a hard-coded Op field for the full load task, because an AWS DMS first-time full load doesn’t include the Op field for Parquet datasets. The Op field can have values of I, U, and D—for Insert, Update and Delete indicators.

We cover the details of the --checkpoint parameter in the Considerations when deploying to production section later in this post.

  1. When the job is complete, run the same query in spark-shell.

You should see these updates applied to the Hudi dataset.
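Conceptually, the incremental run applies each change record according to its Op value: I and U records upsert a row and D records remove it. The following Python sketch models that outcome on a small in-memory subset of the table using the three DML statements above. It is an illustration of the expected result only, not how Hudi or the AWSDmsAvroPayload class is implemented, and the function name apply_changes is hypothetical.

```python
def apply_changes(table, changes):
    """Apply CDC records, keyed by tran_id, to an in-memory copy of the table."""
    result = dict(table)
    for change in changes:
        key = change["tran_id"]
        if change["Op"] == "D":
            result.pop(key, None)  # delete the record if it exists
        else:
            # "I" and "U" are both treated as upserts on the record key
            result[key] = {k: v for k, v in change.items() if k != "Op"}
    return result


# Subset of the rows loaded earlier (only two columns kept for brevity).
table = {
    2: {"tran_id": 2, "store_city": "NEW YORK"},
    12: {"tran_id": 12, "store_city": "CHICAGO"},
}

# The insert, update, and delete from the DML statements above.
changes = [
    {"Op": "I", "tran_id": 15, "store_city": "CHICAGO"},
    {"Op": "U", "tran_id": 12, "store_city": "SPRINGFIELD"},
    {"Op": "D", "tran_id": 2},
]

updated = apply_changes(table, changes)
for tran_id in sorted(updated):
    print(tran_id, updated[tran_id]["store_city"])
```

After the changes are applied, tran_id 2 is gone, tran_id 12 shows SPRINGFIELD, and tran_id 15 is present, which is what the spark-shell query should reflect for the full table.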

You can use the Hudi CLI to administer Hudi datasets to view information about commits, the filesystem, statistics, and more.

  1. To do this, in the Systems Manager session, run the following command:
    /usr/lib/hudi/cli/bin/hudi-cli.sh

  2. Inside the Hudi CLI, run the following command (replace <s3-bucket-name> with the S3 bucket created by the CloudFormation stack):
    connect --path s3://<s3-bucket-name>/hudi/retail_transactions

  3. To inspect commits on your Hudi dataset, run the following command:
    commits show

You can also query incremental data from the Hudi dataset. This is particularly useful when you want to take incremental data for downstream processing like aggregations. Hudi provides multiple ways of pulling data incrementally, which are described in the Hudi documentation. An example of how to use this feature is available in the Hudi Quick Start Guide.

Considerations when deploying to production

The preceding setup showed an example of how to build a CDC pipeline from your relational database to your Amazon S3-based data lake. However, if you want to use this solution for production, you should consider the following:

  • To ensure high availability, you can set up the AWS DMS instance in a Multi-AZ configuration.
  • The CloudFormation stack deployed the required properties files needed by the deltastreamer utility into the S3 bucket at s3://<s3-bucket-name>/properties/. You may need to customize these based on your requirements. For more information, see Configurations. There are a few parameters that may need your attention:
    • deltastreamer.transformer.sql – This property exposes an extremely powerful feature of the deltastreamer utility: it enables you to transform data on the fly as it’s being ingested and persisted in the Hudi dataset. In this post, we have shown a basic transformation that casts the tran_date column to a string, but you can apply any transformation as part of this query.
    • parquet.small.file.limit – This value is specified in bytes and is a critical storage configuration that determines how Hudi handles small files on Amazon S3. Small files can occur because of the number of records processed in each insert per partition. Setting this value allows Hudi to continue to treat inserts in a particular partition as updates to the existing files, so files up to the size of this limit are rewritten and keep growing in size.
    • parquet.max.file.size – This is the maximum size of a single Parquet file in your Hudi dataset, after which a new file is created to store more data. For Amazon S3 storage and data querying needs, we can keep this around 256 MB–1 GB (256 MB is 256 x 1024 x 1024 = 268435456 bytes).
    • [Insert|Upsert|bulkinsert].shuffle.parallelism – In this post, we dealt with a small dataset of only a few records. However, in real-life situations, you might want to bring in hundreds of millions of records in the first load, and the incremental CDC can then run to millions of records per day. These parameters are very important when you want predictable control over the number of files in each of your Hudi dataset partitions. Setting them is also needed to ensure you don’t hit the Apache Spark limit of 2 GB for data shuffle blocks when processing large amounts of data. For example, if you plan to load 200 GB of data in the first load and want to keep file sizes at approximately 256 MB, set the shuffle parallelism parameters for this dataset to 800 (200×1024/256). For more information, see Tuning Guide.
  • In the incremental load deltastreamer command, we used an additional parameter: --checkpoint 0. When deltastreamer writes a Hudi dataset, it persists checkpoint information in the .commit files under the .hoodie folder. In subsequent runs, it uses this information to read only the data in Amazon S3 that was created after the checkpoint time. In a production scenario, after you start the AWS DMS task, the task keeps writing incremental data to the target S3 folder as soon as the full load is complete. In the steps that we followed, we ran a command on the EMR cluster to manually move the full load files to another folder and processed the data from there. When we did that, the timestamp associated with the moved S3 objects changed to the time of the move. If we run the incremental load without the checkpoint argument, deltastreamer doesn’t pick up any incremental data written to Amazon S3 before we manually moved the full load files. To make sure that all incremental data is processed by deltastreamer the first time, set the checkpoint to 0, which makes it process all incremental data in the folder. However, only use this parameter for the first incremental load and let deltastreamer use its own checkpointing methodology from that point onwards.
  • For this post, we ran the spark-submit command manually. However, in production, you can run it as a step on the EMR cluster.
  • You can either schedule the incremental data load command to run at a regular interval using a scheduling or orchestration tool, or run it continuously at a certain frequency by passing the additional parameters --min-sync-interval-seconds XX --continuous to the spark-submit command, where XX is the number of seconds between each run of the data pull. For example, if you want to run the processing every 5 minutes, replace XX with 300.
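The sizing arithmetic in the considerations above can be reproduced with a few lines of Python. This is a minimal sketch; the function names are hypothetical and the formula is simply the total data volume divided by the target file size, as in the 200 GB example.

```python
import math


def shuffle_parallelism(total_data_gb, target_file_mb):
    """Approximate parallelism so each output file is about target_file_mb."""
    return math.ceil(total_data_gb * 1024 / target_file_mb)


def mb_to_bytes(size_mb):
    """Convert a size in MB to bytes, the unit used by the file size settings."""
    return size_mb * 1024 * 1024


print(shuffle_parallelism(200, 256))  # 800, matching the example in the text
print(mb_to_bytes(256))               # 268435456
```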

Cleaning up

When you are done exploring the solution, complete the following steps to clean up the resources deployed by CloudFormation:

  1. Empty the S3 bucket created by the CloudFormation stack.
  2. Delete any Amazon EMR log files generated under s3://<EMR-Logs-S3-Bucket>/HudiBlogEMRLogs/.
  3. Stop the AWS DMS task hudiblogload.
  4. Delete the CloudFormation stack.
  5. Delete any Amazon RDS for MySQL database snapshots retained after the CloudFormation stack is deleted.

Conclusion

More and more data lakes are being built on Amazon S3, and these data lakes often need to be hydrated with change data from transactional systems. Handling deletes and upserts of data into the data lake using traditional methods involves a lot of heavy lifting. In this post, we saw how to easily build a solution with AWS DMS and HoodieDeltaStreamer on Amazon EMR. We also looked at how to perform lightweight record-level transformations when integrating data into the data lake, and how to use this data for downstream processes like aggregations. We also discussed the important settings and command line options that were used and how you could modify them to suit your requirements.


About the Authors

Ninad Phatak is a Senior Analytics Specialist Solutions Architect with Amazon Internet Services Private Limited. He specializes in data engineering and data warehousing technologies and helps customers architect their analytics use cases and platforms on AWS.

Raghu Dubey is a Senior Analytics Specialist Solutions Architect with Amazon Internet Services Private Limited. He specializes in big data analytics, data warehousing, and BI, and helps customers build scalable data analytics platforms.

Automating EMR workloads using AWS Step Functions

Post Syndicated from Afsar Jahangir original https://aws.amazon.com/blogs/big-data/automating-emr-workloads-using-aws-step-functions/

Amazon EMR allows you to process vast amounts of data quickly and cost-effectively at scale. Using open-source tools such as Apache Spark, Apache Hive, and Presto, and coupled with the scalable storage of Amazon Simple Storage Service (Amazon S3), Amazon EMR gives analytical teams the engines and elasticity to run petabyte-scale analysis for a fraction of the cost of traditional on-premises clusters. Developers and analysts can use Jupyter-based Amazon EMR notebooks for iterative development, collaboration, and access to data stored across AWS data products.

What happens if you have Amazon EMR code that needs to run automatically on a regular basis? Maybe the job only runs for certain events, like new data arriving in Amazon S3. Or maybe you want to run a job every Friday afternoon at 2:00 PM. What if there is a multi-step process?

To run Amazon EMR workloads on a schedule, you can automate everything with AWS Step Functions. This post walks through how to use Step Functions state machines and the callback pattern to automate EMR jobs. You can download the code examples from the GitHub repo.

Prerequisites

To follow along with this walkthrough, you must have the following:

Solution overview

For this use case, I want to run two applications on my EMR cluster. The start of the second application depends on the successful completion and output of the first. At a high level, I want to launch an EMR cluster automatically, run the code, and remove the cluster. Specifically, when the first program successfully completes, I want to run the second program.

At the conclusion of the second application, in some cases I may want to run both programs multiple times (with different dataset sizes, perhaps). I need a way to decide to run the process again with the same cluster. Whether the steps succeed or fail, at the conclusion, I always want to delete the CloudFormation stack that contains my EMR cluster to reduce cost. The following diagram illustrates this high-level overview of the pipeline operation.

Workflow details

I run two programs, and I need the first program to complete before running the second one. I optionally want to repeat those two programs with different datasets to get the final state of the data. To orchestrate the jobs, I can run through the same steps multiple times with the same active EMR cluster.

To facilitate automating the pipeline, I use an inner state machine to check the cluster status and submit EMR job steps. I then wrap that inner state machine in an outer state machine. The outer state machine starts the cluster and submits information to the inner state machine. It waits for all steps to complete, then deletes the EMR cluster.

The following flow chart illustrates the steps and checkpoints that make up the pipeline.

Deploying the pipeline state machines

To simplify pipeline deployment, I use AWS SAM, an open-source framework for building serverless applications. AWS SAM provides a single deployment configuration, extensions to CloudFormation templates, built-in best practices, and local debugging and testing. You can use AWS SAM with a suite of AWS tools for building serverless applications. For more information, see What Is the AWS Serverless Application Model (AWS SAM)?

Initiating the application

Navigate to the path where you want to download the files and initialize the AWS SAM application. I want to run the code from my local machine and have created the following location:

~/Documents/projects/blog/steppipeline/automation-ml-step-data-pipeline

From this directory, I initialize the application using sam init. This connects to the repository and downloads the files for creation of the ML pipeline. See the following code:

sam init -l https://github.com/aws-samples/automation-ml-step-data-pipeline.git

Creating the S3 bucket and moving dependencies

For this post, I orchestrate an existing process from the post Anomaly Detection Using PySpark, Hive, and Hue on Amazon EMR, which runs on Amazon EMR. The pipeline reads code artifacts from Amazon S3, where the EMR cluster has read permission. There are two programs: kmeansandey.py and kmeanswsssey.py.

First, create the bucket from the command line using the aws s3 mb command and upload the code. Your bucket name must be globally unique:

aws s3 mb s3://<your bucket name>

Move the artifacts to your bucket, replacing <your bucket name> with your bucket name:

aws s3 cp sample_ml_code/kmeansandey.py s3://<your bucket name>/testcode/kmeansandey.py
aws s3 cp sample_ml_code/kmeanswsssey.py s3://<your bucket name>/testcode/kmeanswsssey.py
aws s3 cp emr/bootstrapactions.sh s3://<your bucket name>/emr-bootstrap-scripts/bootstrapactions.sh
aws s3 cp emr/emr-cluster-config.json s3://<your bucket name>/emr-cluster-config.json
aws s3 cp emr/emr-cluster-sample.yaml s3://<your bucket name>/emr-cluster-sample.yaml

Deploying the application

Deploy the build artifacts to the AWS Cloud using the following code:

sam deploy --guided

AWS SAM prompts you for the parameters that you need to build and deploy the application. I have provided some default values where possible.

The final output of your deployment process should indicate that all stacks were built:

Successfully created/updated stack - step-pipeline in us-east-1

After deployment, you receive an email to confirm your subscription. Choose the confirmation link in the email to receive pipeline notifications.

Submitting a workload to your Step Functions state machine

To create a cluster and submit EMR jobs, the outer state machine needs a JSON payload. This contains the location of the programs in Amazon S3, the Amazon EMR CloudFormation template, and the parameter files used to launch the EMR cluster.

Creating an Amazon EC2 key pair

To use the same sample programs and EMR cluster template that you used to test your pipeline, you need an Amazon EC2 key pair for SSH credentials. When you create a cluster, you can specify the Amazon Elastic Compute Cloud (Amazon EC2) key pair to use for SSH connections to all cluster instances. The name of the key pair for this cluster is referenced in the emr-cluster-config.json file. See the following code:

<snip>
  {
    "ParameterKey": "Keyname",
    "ParameterValue": "emrcluster-launch"
  },
</snip>

To use the example as-is with the parameters unchanged, create an Amazon EC2 key pair on the AWS Management Console or AWS Command Line Interface (AWS CLI).

  1. On the Amazon EC2 console, under Network & Security, choose Key Pairs.
  2. On the Key Pairs page, choose Create Key Pair.
  3. For Key pair name, enter emrcluster-launch.
  4. Choose Create.
  5. When the console prompts you to save the private key file, save it in a safe place.

This is the only chance for you to save the private key file.
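If you prefer to script this step instead of using the console, the following is a minimal Python sketch. It assumes an EC2 client such as the one returned by boto3.client("ec2"), whose create_key_pair call returns the private key in the KeyMaterial field. The function name and the file path are hypothetical and chosen only for illustration.

```python
import os


def create_key_pair(ec2_client, key_name, pem_path):
    """Create an EC2 key pair and save the private key with owner-only permissions.

    ec2_client is expected to behave like boto3.client("ec2").
    """
    response = ec2_client.create_key_pair(KeyName=key_name)
    # O_EXCL refuses to overwrite an existing file; 0o600 keeps the key private.
    fd = os.open(pem_path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
    with os.fdopen(fd, "w") as pem_file:
        pem_file.write(response["KeyMaterial"])
    return pem_path


# Example call (requires boto3 and AWS credentials):
#   import boto3
#   create_key_pair(boto3.client("ec2"), "emrcluster-launch", "emrcluster-launch.pem")
```

As with the console flow, the private key is only returned once, at creation time, so the sketch writes it to disk immediately.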

Inputting JSON for launching the pipeline

The simplest way for you to run the pipeline is to use the Start execution feature on the Step Functions console. The console gives you full functionality to initiate the function and submit a payload. In the example test_input.json, update the bucket values, security group, and subnet with the information for your account:

{  
    "ModelName": "PipelineTest_01",  
    "ModelProgram": "s3://<your bucket name>/testcode/kmeansandey.py",  
    "PreProcessingProgram": "s3://<your bucket name>/testcode/kmeanswsssey.py",  
    "EMRCloudFormation": "https://s3.amazonaws.com/<your bucket name>/emr-cluster-sample.yaml",  
    "EMRParameters": "https://s3.amazonaws.com/<your bucket name>/emr-cluster-config.json",  
    "JobInput": "s3://aws-bigdata-blog/artifacts/anomaly-detection-using-pyspark/sensorinputsmall/",  
    "SecurityGroup": "<your security group>",  
    "SubNet": "<your subnet>",  
    "ClusterSize": "4",  
    "ProcessingMode": ["TRAINING"]
}

The payload includes the following information:

  • ModelName – A short descriptive identifier used to identify the transient EMR cluster created during this process. This name shows on the Amazon EMR console for easy identification.
  • ModelProgram – The Amazon S3 URL location of the program that runs when the model initiates on the EMR cluster (step 3).
  • PreProcessingProgram – The Amazon S3 URL location of the program that runs when preprocessing initiates on the EMR cluster (step 2).
  • EMRCloudFormation – The S3 bucket HTTPS location of the CloudFormation template for launching the transient EMR cluster.
  • EMRParameters – The Amazon S3 HTTPS location of the parameter file supporting the Amazon EMR CloudFormation template.
  • JobInput – The Amazon S3 URL location of the input data for the preprocessing program.
  • SecurityGroup – The security group with ingress and egress rules for the launched EMR cluster.
  • SubNet – The subnet identifier where you place your EMR cluster.
  • ClusterSize – Denotes the number of EMR cluster nodes to run the job and can be changed based on the compute need. I use 4 nodes as the input value for the sample program.
  • ProcessingMode – This is an array of values. The pipeline runs steps 2 and 3 for each value in the array. The value is passed into the program unchanged and can be used to internally control how the program runs. For this use case, it runs a single time on the small dataset.
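Before submitting the payload, it can help to check it against the fields described above. The following is a minimal Python sketch of such a check. The helper names (build_payload, validate_payload) and the specific rules are my own assumptions for illustration and are not part of the pipeline code in the GitHub repo.

```python
import json

# Keys listed in the payload description above.
REQUIRED_KEYS = (
    "ModelName", "ModelProgram", "PreProcessingProgram",
    "EMRCloudFormation", "EMRParameters", "JobInput",
    "SecurityGroup", "SubNet", "ClusterSize", "ProcessingMode",
)


def build_payload(bucket, security_group, subnet):
    """Fill in the account-specific values of the sample test_input.json."""
    return {
        "ModelName": "PipelineTest_01",
        "ModelProgram": f"s3://{bucket}/testcode/kmeansandey.py",
        "PreProcessingProgram": f"s3://{bucket}/testcode/kmeanswsssey.py",
        "EMRCloudFormation": f"https://s3.amazonaws.com/{bucket}/emr-cluster-sample.yaml",
        "EMRParameters": f"https://s3.amazonaws.com/{bucket}/emr-cluster-config.json",
        "JobInput": "s3://aws-bigdata-blog/artifacts/anomaly-detection-using-pyspark/sensorinputsmall/",
        "SecurityGroup": security_group,
        "SubNet": subnet,
        "ClusterSize": "4",
        "ProcessingMode": ["TRAINING"],
    }


def validate_payload(payload):
    """Return a list of problems found in the payload (empty if none)."""
    problems = [f"missing key: {key}" for key in REQUIRED_KEYS if key not in payload]
    for key, value in payload.items():
        # Catch placeholders such as <your bucket name> that were not replaced.
        if isinstance(value, str) and "<your" in value:
            problems.append(f"placeholder not replaced: {key}")
    modes = payload.get("ProcessingMode")
    if "ProcessingMode" in payload and (not isinstance(modes, list) or not modes):
        problems.append("ProcessingMode should be a non-empty array")
    return problems


payload = build_payload("my-bucket", "sg-0123456789abcdef0", "subnet-0123456789abcdef0")
print(json.dumps(payload, indent=4))
```

The printed JSON can be pasted into the Start execution dialog after validate_payload returns an empty list.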

Opening the Step Functions Executions page

On the Step Functions console, choose MLStateMachine. This is the outer state machine. On the detail page for the outer state machine, choose Start execution.

Entering your payload

On the New execution page, enter the JSON for your pipeline based on the example test_input.json. Choose Start execution.
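The same submission can also be scripted. The following is a minimal Python sketch, assuming a Step Functions client such as boto3.client("stepfunctions"), whose start_execution call takes the state machine ARN and the input as a JSON string. The function name start_pipeline is hypothetical, and the ARN shown in the comment is a placeholder that you replace with the ARN of MLStateMachine in your account.

```python
import json


def start_pipeline(sfn_client, state_machine_arn, payload, execution_name=None):
    """Start one execution of the outer state machine and return its ARN.

    sfn_client is expected to behave like boto3.client("stepfunctions").
    """
    kwargs = {
        "stateMachineArn": state_machine_arn,
        "input": json.dumps(payload),  # the input must be a JSON string
    }
    if execution_name:
        kwargs["name"] = execution_name
    response = sfn_client.start_execution(**kwargs)
    return response["executionArn"]


# Example call (requires boto3 and AWS credentials):
#   import boto3
#   with open("test_input.json") as f:
#       payload = json.load(f)
#   start_pipeline(boto3.client("stepfunctions"), "<your MLStateMachine ARN>", payload)
```

Passing the client in as an argument keeps the function easy to exercise with a stub before pointing it at a real account.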

Reviewing the workflow as it runs

You can see the pipeline running in the visual workflow and review the event history on the detail page. The following diagram shows the state machine definition used:

Diving into the pipeline details

There are four processes that run in the outer state machine pipeline:

  1. Step 1 launches an EMR cluster using the CloudFormation template. The AWS Lambda function downloads the template and parameter file from the specified Amazon S3 location and initiates the stack build.
  2. When the EMR cluster is ready, step 2 initiates the first set of code against the newly created EMR cluster, passing in the remaining parameters to the inner state machine. It adds the stack ID, EMR cluster ID, and status to the payload. These values are obtained from the output of the CloudFormation stack. See the following code:
    "ModelName": "PipelineTest_01",  
    "PreProcessingProgram": "s3://<your bucket name>/testcode/kmeanswsssey.py",  
    "JobInput": "s3://aws-bigdata-blog/artifacts/anomaly-detection-using-pyspark/sensorinputsmall/",  
    "ClusterSize": "4",  
    "ProcessingMode": ["TRAINING"],
    "StackId": "arn:aws:cloudformation:us-east-1:575444587305:stack/PipelineTest01-auto-emr-02142020041612/bc5fd7a0-4ee0-11ea-a395-0e4c53e0aefd",
    "Status": "CREATE_COMPLETE",
    "ClusterId": "j-MF6LWBLJZ88K"

The code contains the following information:

  • ModelName is used in the EMR cluster name to make it easier to identify in the console and AWS CLI output.
  • PreProcessingProgram in our use case points to the first code step (kmeanswsssey.py). The code is passed through the first state machine and submitted to the second state machine and Amazon EMR.
  • JobInput, ClusterSize, ClusterId, StackId, and ProcessingMode are passthrough values that the program needs to run.
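To make the handoff concrete, the following Python sketch shows one way the payload for the inner state machine could be assembled from the original input and the CloudFormation stack details. The function name and the shape of the stack_info dictionary are assumptions for illustration; the Lambda functions in the GitHub repo may structure this differently.

```python
# Values copied unchanged from the outer payload.
PASSTHROUGH_KEYS = (
    "ModelName", "PreProcessingProgram", "JobInput", "ClusterSize", "ProcessingMode",
)


def build_inner_payload(outer_payload, stack_info):
    """Combine passthrough values with the stack ID, status, and cluster ID."""
    inner = {key: outer_payload[key] for key in PASSTHROUGH_KEYS}
    inner["StackId"] = stack_info["StackId"]
    inner["Status"] = stack_info["Status"]
    inner["ClusterId"] = stack_info["ClusterId"]
    return inner


outer = {
    "ModelName": "PipelineTest_01",
    "ModelProgram": "s3://<your bucket name>/testcode/kmeansandey.py",
    "PreProcessingProgram": "s3://<your bucket name>/testcode/kmeanswsssey.py",
    "JobInput": "s3://aws-bigdata-blog/artifacts/anomaly-detection-using-pyspark/sensorinputsmall/",
    "ClusterSize": "4",
    "ProcessingMode": ["TRAINING"],
}
# Hypothetical stack details; real values come from the CloudFormation stack output.
stack = {"StackId": "<stack ARN>", "Status": "CREATE_COMPLETE", "ClusterId": "<cluster ID>"}
inner = build_inner_payload(outer, stack)
```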

The step initiates the Lambda function awsblog-testproject-inner-sm-AddStepLambda-x45123xxxxxx, which engages the inner state machine asynchronously to run a high-level process of checking the cluster, adding a step, checking to see if the step is complete, and exiting back to the outer state machine when complete.

  3. Next, the outer state machine runs the model program code (step 3) by submitting it to the Lambda function awsblog-testproject-inner-sm-AddStepLambda-x45123xxxxxx to engage the inner state machine for the second set of code (kmeansandey.py). The process is the same as step 2, but the code it runs is from a different file, and the output from the preprocessing step becomes the input for this step. See the following code:
    "ModelProgram": "s3://<your bucket name>/testcode/kmeansandey.py",

When the inner state machine is complete, it moves to a step that removes the first value from the ProcessingMode array. For this use case, there is only one value (TRAINING), which is removed, leaving the array empty. The next step in the state machine looks for remaining values; if there are none, it marks all steps as complete and moves to Delete EMR cluster.

  4. The final step in the outer state machine is to remove the EMR cluster. The Delete EMR cluster step passes the CloudFormation stack ID into lambda/delete_cfn_stack.py, initiating the deletion of the stack and cleaning up all the resources.
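The control flow of the outer state machine described in these steps can be summarized in a short Python simulation. This is only a sketch of the logic, not the state machine definition itself: the three callables (launch_cluster, run_step, delete_stack) are hypothetical stand-ins for the Lambda functions and the inner state machine.

```python
def run_pipeline(payload, launch_cluster, run_step, delete_stack):
    """Simulate the outer state machine: launch, loop over modes, always delete."""
    stack = launch_cluster(payload)             # step 1: build the CloudFormation stack
    modes = list(payload["ProcessingMode"])
    try:
        while modes:                            # repeat steps 2 and 3 per array value
            mode = modes[0]
            run_step(payload["PreProcessingProgram"], mode, stack)   # step 2
            run_step(payload["ModelProgram"], mode, stack)           # step 3
            modes.pop(0)                        # remove the first value from the array
    finally:
        # Whether the steps succeed or fail, the stack is deleted to reduce cost.
        delete_stack(stack["StackId"])
    return modes


calls = []
remaining = run_pipeline(
    {
        "ProcessingMode": ["TRAINING"],
        "PreProcessingProgram": "kmeanswsssey.py",
        "ModelProgram": "kmeansandey.py",
    },
    launch_cluster=lambda payload: {"StackId": "stack-1", "ClusterId": "cluster-1"},
    run_step=lambda program, mode, stack: calls.append((program, mode)),
    delete_stack=lambda stack_id: calls.append(("delete", stack_id)),
)
print(calls)
```

With a single TRAINING value, the loop runs once and the array is empty when the stack is deleted, which mirrors the behavior described for this use case.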

The output of the test programs is stored in Amazon S3 in two folders under the pipeline artifacts. The preprocessing folder contains data that is used to drive the output in the model folder. The following screenshot shows the folders in Amazon S3.

Conclusion

The Step Functions workflow in this post is a repeatable two-step pipeline. It starts an EMR cluster, runs a program that outputs data, and initiates a second program that depends on the previous job finishing. It then deletes all resources automatically.

You can adapt the workflow to respond to Amazon S3 events, a message received in a queue, a file checked into a code repository, or a schedule. Any event that can invoke Lambda can initiate the pipeline. For more information, see Invoking AWS Lambda functions.
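As an illustration of the Amazon S3 case, the following Python sketch shows a Lambda handler that starts the outer state machine when a new object arrives. It rests on several assumptions that are not part of the original pipeline: the environment variable names STATE_MACHINE_ARN and BASE_PAYLOAD are hypothetical, and pointing JobInput at the folder of the new object is just one possible design.

```python
import json
import os
from urllib.parse import unquote_plus


def build_execution_input(event, base_payload):
    """Copy the base payload and point JobInput at the folder of the new object."""
    record = event["Records"][0]
    bucket = record["s3"]["bucket"]["name"]
    key = unquote_plus(record["s3"]["object"]["key"])  # keys arrive URL-encoded
    prefix = (key.rsplit("/", 1)[0] + "/") if "/" in key else ""
    payload = dict(base_payload)
    payload["JobInput"] = f"s3://{bucket}/{prefix}"
    return payload


def lambda_handler(event, context, sfn_client=None):
    """Start one pipeline execution per S3 event notification."""
    if sfn_client is None:
        import boto3  # available in the Lambda Python runtime
        sfn_client = boto3.client("stepfunctions")
    base_payload = json.loads(os.environ["BASE_PAYLOAD"])
    payload = build_execution_input(event, base_payload)
    response = sfn_client.start_execution(
        stateMachineArn=os.environ["STATE_MACHINE_ARN"],
        input=json.dumps(payload),
    )
    return {"executionArn": response["executionArn"]}
```

The optional sfn_client argument exists only so the handler can be exercised locally with a stub; Lambda itself calls the handler with the event and context.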

You can download the example code from the GitHub repo and adapt it for your use case. Let me know in the comments what you built for your environment.


About the Authors

Mohammed “Afsar” Jahangir Ali has been a Senior Big Data Consultant with Amazon since January 2018. He is a data enthusiast helping customers shape their data lakes and analytic journeys on AWS. In his spare time, he enjoys taking pictures, listening to music, and spending time with family.

Wendy Neu has worked as a Data Architect with Amazon since January 2015. Prior to joining Amazon, she worked as a consultant in Cincinnati, OH helping customers integrate and manage their data from different unrelated data sources.

Implementing LDAP authentication for Hive on a multi-tenant Amazon EMR cluster

Post Syndicated from Kiran Erra original https://aws.amazon.com/blogs/big-data/implementing-ldap-authentication-for-hive-on-a-multi-tenant-amazon-emr-cluster/

As Amazon EMR continues its widespread adoption, it’s important to enforce separation of duties using role-based access when submitting your Hive jobs on EMR clusters in multi-tenant environments. In this post, we walk through the steps to set up authentication for Hive using Lightweight Directory Access Protocol (LDAP) and Microsoft Active Directory Domain Controller.

Solution overview

In a multi-tenant environment, it’s critical to enforce role-based access when submitting Hive jobs to an EMR cluster. The default way of submitting a Hive job to an EMR cluster is the Add Step functionality. Although you can add Hive steps to an existing cluster, such a setup doesn’t enforce role-based access, because Amazon EMR steps are always submitted using the default Hive user. This post outlines the process by which you can enforce EMRFS role mappings when an Active Directory user submits a Hive job after authenticating via LDAP and Microsoft Active Directory Domain Controller. The following diagram illustrates the provisioned infrastructure from AWS CloudFormation.

The following AWS services are used as part of the recommended solution:

  • AWS Secrets Manager – AWS Secrets Manager helps you protect secrets needed to access your applications, services, and IT resources. The service enables you to easily rotate, manage, and retrieve database credentials, API keys, and other secrets throughout their lifecycle.
  • Amazon EMR – Amazon EMR makes it easy to process large amounts of data efficiently. Amazon EMR uses Hadoop processing combined with several AWS products to do tasks such as web indexing, data mining, log file analysis, machine learning, scientific simulation, and data warehousing.
  • Amazon EC2 – Amazon Elastic Compute Cloud (Amazon EC2) provides secure, resizable compute capacity in the cloud. It’s designed to make web-scale cloud computing easier for developers.

In our solution (as we discuss it in this post), the corporate user base is maintained in the Microsoft Active Directory Domain Controller. The EMR cluster is integrated with AD using a bootstrap action so that you can securely submit Hive jobs using Beeline by establishing an LDAP connection from an edge node (represented by an EC2 instance). The user credentials are stored in Secrets Manager and fetched from there when establishing the Beeline connection.

Prerequisites

Before getting started, you must have the following prerequisites:

  • Microsoft Active Directory Domain Controller needs to be installed and set up. For a quick setup of Microsoft Active Directory Domain Controller and VPC, see the step Launch and configure an Active Directory domain controller in the Deploying each component individually section of the post Implement perimeter security in Amazon EMR using Apache Knox.
  • A valid AWS account with access to AWS services.
  • An Amazon VPC with a public subnet.
  • An AWS Identity and Access Management (IAM) policy for Secrets Manager permissions.

Implementing the solution

We provide the CloudFormation template in this post as a general guide. Please review and customize it as needed. You should also be aware that some of the resources deployed by this stack incur costs when they remain in use. The CloudFormation template has the following steps:

  1. Start an EMR cluster with the configuration from the parameters.
  2. Integrate the EMR cluster with AD using a bootstrap action.
  3. Create and launch an EC2 instance to test the integration.
  4. Add an inbound rule to the Amazon EMR primary additional security group to allow access on port 10000 from the newly launched EC2 instance.

This section describes how to use the CloudFormation template to launch an EMR cluster with the following parameters:

| Parameter | Default Value | Description |
|---|---|---|
| ClusterName | emr-ldap4hive | The name of the cluster. |
| CoreInstanceType | m4.xlarge | The instance type of the nodes. |
| CoreNodeCount | 2 | The number of nodes in the cluster. |
| CreateLogBucket | FALSE | A Boolean flag to see if we need to set up a bucket for logs. |
| KeyPair | | Key pair used to log in to the EC2 instance for validation. |
| MasterInstanceType | m4.xlarge | The instance type of the nodes. |
| ReleaseLabel | emr-6.0.0 | Amazon EMR version. This template is tested with emr-6.0.0 or emr-5.29.0. |
| RemoteAccessCIDR | | The CIDR range to access Amazon EMR. This is usually the same as the IP address of the local machine. |
| VPCID | | VPC ID used in Amazon EMR configuration. Make sure you select a public VPC. |
| SubnetId | | Subnet ID used in Amazon EMR configuration. Make sure you select the subnet that belongs to the VPC selected. |
| ldapurl | | The LDAP URL of the AD domain controller, in the format ldap://<Private IP of AD domain controller>:389. Please refer to the first item in the Prerequisites section. |
| passwd4awsadmin | [email protected] | The AD admin password. Must be at least eight characters containing letters, numbers, and symbols. |
| EC2 AMI | ami-0ac80df6eff0e70b5 | The AMI used to create the EC2 instance for validation. |
| My IP | | The IP address of the local machine. |

The following screenshot shows the Specify stack details page when launching your template.

A bootstrap script ldap-bootstrap.sh is invoked during the cluster creation to perform the following actions:

  • Fetch the login credentials for the Active Directory domain admin from Secrets Manager
  • Perform the realm join using the credentials fetched
  • Enable password-based authentication to the cluster

To deploy the template into your account, choose Launch Stack:

The following screenshot shows the EMR cluster that the CloudFormation stack created.

Validating the solution

To validate the solution, SSH to the Ubuntu EC2 instance using the EC2 key pair, as shown in the following screenshot. Refer to the Outputs tab from your AWS CloudFormation stack.

For this post, we used the Ubuntu Server 18.04 LTS (HVM), SSD Volume Type – ami-07ebfd5b3428b6f4d (64-bit x86) / ami-0400a1104d5b9caa1 (64-bit Arm) AMI.

You should see the Python Hive beeline script in /home/ubuntu:

Run demo-hive-beeline.py as shown in the following screenshot. This Python script fetches the AD credentials from Secrets Manager, establishes a beeline connection for Hive on Amazon EMR, submits Hive commands to create an external table for the NYC taxi dataset located in your Amazon Simple Storage Service (Amazon S3) bucket, and runs a sample select statement on the table.

The script has the following parameters:

  • -r or --region_name – AWS Region
  • -s or --secret-id – Secret ARN
  • -h or --host-name – Amazon EMR public DNS address
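The following Python sketch shows how a script with these parameters could fit together. It is not the demo-hive-beeline.py script itself. It assumes a Secrets Manager client such as boto3.client("secretsmanager"), a secret stored as JSON with hypothetical username and password keys, and HiveServer2 listening on port 10000 as configured earlier in this post.

```python
import argparse
import json


def parse_args(argv=None):
    """Parse the three options; -h is reused for the host, so built-in help is disabled."""
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument("-r", "--region_name", required=True, help="AWS Region")
    parser.add_argument("-s", "--secret-id", required=True, help="Secret ARN")
    parser.add_argument("-h", "--host-name", required=True, help="Amazon EMR public DNS address")
    parser.add_argument("--help", action="help", help="Show this message and exit")
    return parser.parse_args(argv)


def fetch_credentials(secrets_client, secret_id):
    """Return (username, password) from a JSON secret (key names are assumptions)."""
    response = secrets_client.get_secret_value(SecretId=secret_id)
    secret = json.loads(response["SecretString"])
    return secret["username"], secret["password"]


def build_beeline_command(host_name, username, password, statement):
    """Build the argument list for one Beeline invocation over JDBC."""
    return [
        "beeline",
        "-u", f"jdbc:hive2://{host_name}:10000/default",
        "-n", username,
        "-p", password,
        "-e", statement,
    ]


# Example wiring (requires boto3, AWS credentials, and Beeline on the PATH):
#   import boto3, subprocess
#   args = parse_args()
#   client = boto3.client("secretsmanager", region_name=args.region_name)
#   user, password = fetch_credentials(client, args.secret_id)
#   subprocess.run(build_beeline_command(args.host_name, user, password, "SHOW TABLES;"), check=True)
```

Because argparse reserves -h for help by default, the sketch passes add_help=False and exposes help under --help only.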

Cleaning up

Delete the CloudFormation stack to clean up all the resources created in this post. Also, stop the EC2 Ubuntu instance that you created in the verification step. If you used the nested stack, AWS CloudFormation deletes all resources in one operation. If you deployed the templates individually, delete them in the reverse order of creation, deleting the VPC stack last.

Conclusion

In this post, we went through the setup and validation of LDAP authentication for Hive using an EMR cluster. This decouples the authentication mechanism from Hive and Amazon EMR and leverages the system of record using LDAP and Active Directory Domain Controller.


About the authors

Kiran Erra is a data architect with AWS. He works with AWS customers to provide guidance and technical assistance about Big Data, AI/ML and Security projects, helping them improve the value of their solutions when using AWS.

Rajarao Vijjapu is a security data architect with AWS. He works with AWS customers and partners to provide guidance and technical assistance about Big Data, Analytics, AI/ML and Security projects, helping them improve the value of their solutions when using AWS.

Amazon EMR supports Apache Hive ACID transactions

Post Syndicated from Suthan Phillips original https://aws.amazon.com/blogs/big-data/amazon-emr-supports-apache-hive-acid-transactions/

Apache Hive is an open-source data warehouse package that runs on top of an Apache Hadoop cluster. You can use Hive for batch processing and large-scale data analysis. Hive uses Hive Query Language (HiveQL), which is similar to SQL.

ACID (atomicity, consistency, isolation, and durability) properties make sure that the transactions in a database are atomic, consistent, isolated, and durable.

Amazon EMR 6.1.0 adds support for Hive ACID transactions so it complies with the ACID properties of a database. With this feature, you can run INSERT, UPDATE, DELETE, and MERGE operations in Hive managed tables with data in Amazon Simple Storage Service (Amazon S3). This is a key feature for use cases like streaming ingestion, data restatement, bulk updates using MERGE, and slowly changing dimensions.

This post demonstrates how to enable Hive ACID transactions in Amazon EMR, how to create a Hive transactional table, how it can achieve atomic and isolated operations, and the concepts, best practices, and limitations of using Hive ACID in Amazon EMR.

Enabling Hive ACID in Amazon EMR

To enable Hive ACID as the default for all Hive managed tables in an EMR 6.1.0 cluster, use the following hive-site configuration:

[
   {
      "classification": "hive-site",
      "properties": {
         "hive.support.concurrency": "true",
         "hive.exec.dynamic.partition.mode": "nonstrict",
         "hive.txn.manager": "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"
      }
   }
]

For the complete list of configuration parameters related to Hive ACID and descriptions of the preceding parameters, see Hive Transactions.

Hive ACID use case

In this section, we explain the Hive ACID transactions with a straightforward use case in Amazon EMR.

Enter the following Hive command on the master node of an EMR cluster (6.1.0 release) and replace <s3-bucket-name> with the bucket name in your account:

hive --hivevar location=<s3-bucket-name> -f s3://aws-bigdata-blog/artifacts/hive-acid-blog/hive_acid_example.hql 

After Hive ACID is enabled on an Amazon EMR cluster, you can run the CREATE TABLE DDLs for Hive transaction tables.

To define a Hive table as transactional, set the table property transactional=true.

The following CREATE TABLE DDL is used in the script that creates a Hive transaction table acid_tbl:

CREATE TABLE acid_tbl (key INT, value STRING, action STRING)
PARTITIONED BY (trans_date DATE)
CLUSTERED BY (key) INTO 3 BUCKETS
STORED AS ORC
LOCATION 's3://${hivevar:location}/acid_tbl' 
TBLPROPERTIES ('transactional'='true');

This script generates three partitions in the provided Amazon S3 path. See the following screenshot.

The first partition, trans_date=2020-08-01, has the data generated as a result of sample INSERT, UPDATE, DELETE, and MERGE statements. We use the second and third partitions when explaining minor and major compactions later in this post.

ACID is achieved in Apache Hive using three types of files: base, delta, and delete_delta. Edits are written in delta and delete_delta files.

The base file is created by an INSERT OVERWRITE TABLE query or as the result of a major compaction over a partition, where all the files are consolidated into a single base_<write id> file. The Hive transaction manager allocates a write ID for every write. This helps achieve isolation of Hive write queries and enables them to run in parallel.

The INSERT operation creates a new delta_<write id>_<write id> directory.

The DELETE operation creates a new delete_delta_<write id>_<write id> directory.

To support deletes, a unique row__id is added to each row on writes. When a DELETE statement runs, the corresponding row__id is added to the delete_delta_<write id>_<write id> directory, and the rows listed there are ignored on reads. See the following screenshot.

The UPDATE operation creates a new delta_<write id>_<write id> directory and a delete_delta_<write id>_<write id> directory.

The following screenshot shows the second partition in Amazon S3, trans_date=2020-08-02.

A Hive transaction provides snapshot isolation for reads. When an application or query reads the transaction table, it opens all the files of a partition/bucket and returns the records from the last transaction committed.
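The interplay of delta and delete_delta directories can be illustrated with a small in-memory model in Python. This is a deliberately simplified sketch, not how Hive is implemented: row IDs are modeled as (write ID, row index) tuples, an UPDATE is modeled as a delete plus an insert under one write ID, and the class and method names are invented for this example.

```python
class AcidPartition:
    """Toy model of one partition of a Hive transactional table."""

    def __init__(self):
        self.dirs = {}            # directory name -> contents
        self._last_write_id = 0

    def _next_write_id(self):
        self._last_write_id += 1
        return self._last_write_id

    @staticmethod
    def _name(prefix, write_id):
        return f"{prefix}_{write_id:07d}_{write_id:07d}"

    def insert(self, rows, write_id=None):
        """INSERT: write the rows to a new delta directory and return their row IDs."""
        if write_id is None:
            write_id = self._next_write_id()
        row_ids = [(write_id, index) for index in range(len(rows))]
        self.dirs[self._name("delta", write_id)] = dict(zip(row_ids, rows))
        return row_ids

    def delete(self, row_ids, write_id=None):
        """DELETE: record the row IDs in a new delete_delta directory."""
        if write_id is None:
            write_id = self._next_write_id()
        self.dirs[self._name("delete_delta", write_id)] = set(row_ids)

    def update(self, row_id, new_row):
        """UPDATE: a delete of the old row plus an insert of the new one."""
        write_id = self._next_write_id()
        self.delete([row_id], write_id)
        return self.insert([new_row], write_id)[0]

    def read(self):
        """Open every directory and skip rows listed in any delete_delta."""
        deleted = set()
        for name, content in self.dirs.items():
            if name.startswith("delete_delta_"):
                deleted |= content
        rows = []
        for name, content in self.dirs.items():
            if name.startswith("delta_"):
                rows.extend(row for rid, row in content.items() if rid not in deleted)
        return rows


partition = AcidPartition()
ids = partition.insert(["a", "b"])
partition.delete([ids[0]])
partition.update(ids[1], "B")
print(sorted(partition.dirs), partition.read())
```

Every read has to consult all delete_delta directories, which is why the number of small directories matters for read performance.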

Hive compactions

With the previously mentioned logic for Hive writes on a transactional table, many small delta and delete_delta files are created, which could adversely impact read performance over time because each read over a particular partition has to open all the files (including delete_delta) to eliminate the deleted rows.

This creates the need for compaction logic for Hive transactions. In the following sections, we use the same use case to explain minor and major compactions in Hive.

Minor compaction

A minor compaction merges all the delta and delete_delta files within a partition or bucket into a single delta_<start write id>_<end write id> file and a single delete_delta_<start write id>_<end write id> file.

We can trigger the minor compaction manually for the second partition (trans_date=2020-08-02) in Amazon S3 with the following code:

ALTER TABLE acid_tbl PARTITION (trans_date='2020-08-02') COMPACT 'minor';

If you check the same second partition in Amazon S3, after a minor compaction, it looks like the following screenshot.

You can see all the delta and delete_delta files from write ID 0000005–0000009 merged to single delta and delete_delta files, respectively.

Major compaction

A major compaction merges the base, delta, and delete_delta files within a partition or bucket to a single base_<latest write id>. Here the deleted data gets cleaned.
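The effect of both compaction types on directory names can be sketched in Python. The sketch only models the naming; it does not merge any data. It assumes that a minor compaction labels both merged directories with the overall write ID range of the compacted files, and the function names are invented for this example.

```python
import re

# Matches base_<id>, delta_<start>_<end>, and delete_delta_<start>_<end>.
DIR_PATTERN = re.compile(r"^(base|delta|delete_delta)_(\d+)(?:_(\d+))?$")


def parse_dir(name):
    """Return (kind, start write ID, end write ID) for a directory name."""
    match = DIR_PATTERN.match(name)
    if match is None:
        raise ValueError(f"unrecognized directory name: {name}")
    kind, start, end = match.group(1), int(match.group(2)), match.group(3)
    return kind, start, (int(end) if end is not None else start)


def minor_compaction(dirs):
    """Merge delta and delete_delta directories; leave any base untouched."""
    parsed = [parse_dir(name) for name in dirs]
    result = [name for name, item in zip(dirs, parsed) if item[0] == "base"]
    edits = [item for item in parsed if item[0] != "base"]
    if edits:
        low = min(item[1] for item in edits)
        high = max(item[2] for item in edits)
        for kind in ("delta", "delete_delta"):
            if any(item[0] == kind for item in edits):
                result.append(f"{kind}_{low:07d}_{high:07d}")
    return sorted(result)


def major_compaction(dirs):
    """Merge everything into a single base named after the latest write ID."""
    parsed = [parse_dir(name) for name in dirs]
    if not parsed:
        return []
    return [f"base_{max(item[2] for item in parsed):07d}"]


example = (
    ["base_0000004"]
    + [f"delta_{i:07d}_{i:07d}" for i in range(5, 10)]
    + ["delete_delta_0000006_0000006", "delete_delta_0000008_0000008"]
)
print(minor_compaction(example))
print(major_compaction(example))
```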

A major compaction is automatically triggered in the third partition (trans_date='2020-08-03') because the default Amazon EMR compaction threshold is met, as described in the next section. See the following screenshot.

To check the progress of compactions, enter the following command:

hive> show compactions;

The following screenshot shows the output.

Compaction in Amazon EMR

Compaction is enabled by default in Amazon EMR 6.1.0. The following property determines the number of concurrent compaction tasks:

  • hive.compactor.worker.threads – Number of worker threads to run in the instance. The default is 1 or vCores/8, whichever is greater.

Automatic compaction is triggered in Amazon EMR 6.1.0 based on the following configuration parameters:

  • hive.compactor.check.interval – Time period in seconds to check if any partition requires compaction. The default is 300 seconds.
  • hive.compactor.delta.num.threshold – Triggers minor compaction when the total number of delta files is greater than this value. The default is 10.
  • hive.compactor.delta.pct.threshold – Triggers major compaction when the total size of delta files is greater than this fraction of the base file size. The default is 0.1, or 10%.
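The two thresholds can be read as a simple decision rule, shown in the following Python sketch. This is a simplified illustration of the documented thresholds only, not the actual logic of the Hive compactor. The function name is invented, checking the size ratio before the file count is an assumption, and the sketch applies the ratio check only when a base file exists.

```python
def choose_compaction(num_delta_files, delta_bytes, base_bytes,
                      num_threshold=10, pct_threshold=0.1):
    """Return "major", "minor", or None for one partition.

    num_threshold mirrors hive.compactor.delta.num.threshold and
    pct_threshold mirrors hive.compactor.delta.pct.threshold.
    """
    # Major: the deltas are large relative to the base file.
    if base_bytes > 0 and delta_bytes / base_bytes > pct_threshold:
        return "major"
    # Minor: there are too many delta files.
    if num_delta_files > num_threshold:
        return "minor"
    return None


print(choose_compaction(num_delta_files=3, delta_bytes=200, base_bytes=1000))
print(choose_compaction(num_delta_files=11, delta_bytes=50, base_bytes=1000))
print(choose_compaction(num_delta_files=3, delta_bytes=50, base_bytes=1000))
```

In the first call the delta files are 20% of the base size, so the rule selects a major compaction. In the second there are 11 small delta files, so it selects a minor compaction. In the third neither threshold is exceeded.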

Best practices

The following are some best practices when using this feature:

  • Use an external Hive metastore for Hive ACID tables – Our customers use EMR clusters for compute purposes and Amazon S3 as storage for cost-optimization. With this architecture, you can stop the EMR cluster when the Hive jobs are complete. However, if you use a local Hive metastore, the metadata is lost upon stopping the cluster, and the corresponding data in Amazon S3 becomes unusable. To persist the metastore, we strongly recommend using an external Hive metastore like an Amazon RDS for MySQL instance or Amazon Aurora. Also, if you need multiple EMR clusters running ACID transactions (read or write) on the same Hive table, you need to use an external Hive metastore.
  • Use ORC format – Use ORC format to get full ACID support for INSERT, UPDATE, DELETE, and MERGE statements.
  • Partition your data – This technique helps improve performance for large datasets.
  • Enable an EMRFS consistent view if using Amazon S3 as storage – Because you have frequent movement of files in Amazon S3, we recommend using an EMRFS consistent view to mitigate the issues related to the eventual consistency nature of Amazon S3.
  • Use Hive authorization – Because Hive transactional tables are Hive managed tables, to prevent users from deleting data in Amazon S3, we suggest implementing Hive authorization with required privileges for each user.

Limitations

Keep in mind the following limitations of this feature:

  • The AWS Glue Data Catalog doesn’t support Hive ACID transactions.
  • Hive external tables don’t support Hive ACID transactions.
  • Bucketing is optional in Hive 3, but in Amazon EMR 6.1.0 (as of this writing), if the table is partitioned, it needs to be bucketed. You can mitigate this issue in Amazon EMR 6.1.0 using the following bootstrap action:
    --bootstrap-actions '[{"Path":"s3://aws-bigdata-blog/artifacts/hive-acid-blog/make_bucketing_optional_for_hive_acid_EMR_6_1.sh","Name":"Set bucketing as optional for Hive ACID"}]'

Conclusion

This post introduced the Hive ACID feature in EMR 6.1.0 clusters, explained how it works and its concepts with a straightforward use case, described the default behavior of Hive ACID on Amazon EMR, and offered some best practices. Stay tuned for additional updates on new features and further improvements in Apache Hive on Amazon EMR.


About the Authors

Suthan Phillips is a big data architect at AWS. He works with customers to provide them architectural guidance and helps them achieve performance enhancements for complex applications on Amazon EMR. In his spare time, he enjoys hiking and exploring the Pacific Northwest.

Chao Gao is a Software Development Engineer at Amazon EMR. He mainly works on the Apache Hive project at EMR and has in-depth knowledge of distributed databases and database internals. In his spare time, he enjoys taking road trips, visiting national parks, and traveling around the world.

Build a self-service environment for each line of business using Amazon EMR and AWS Service Catalog

Post Syndicated from Tanzir Musabbir original https://aws.amazon.com/blogs/big-data/build-a-self-service-environment-for-each-line-of-business-using-amazon-emr-and-aws-service-catalog/

Enterprises often want to centralize governance and compliance requirements, and provide a common set of policies on how Amazon EMR instances should be set up. You can use AWS Service Catalog to centrally manage commonly deployed Amazon EMR cluster configurations, and this helps you achieve consistent governance and meet your compliance requirements, while at the same time enabling your end users to quickly deploy only the approved EMR cluster configurations on a self-service basis.

In this post, we demonstrate how enterprise administrators can use AWS Service Catalog to create and manage catalogs that data engineers and data scientists use to quickly discover and deploy clusters in a self-service environment. With AWS Service Catalog, you can control which EMR release versions are available, how clusters are configured, and who has permission to access them by individual, group, department, or cost center.

The following are a few key AWS Service Catalog concepts:

  • An AWS Service Catalog product is a blueprint for building the AWS resources that you want available for deployment. You create your products by importing AWS CloudFormation templates.
  • A portfolio is a collection of products. With AWS Service Catalog, you can create a customized portfolio for each type of user in your organization and selectively grant access to the appropriate portfolio.
  • A provisioned product is a collection of resources that result from instantiating an AWS CloudFormation template.

Use cases

You can use AWS Service Catalog to provide Amazon EMR as a self-serve Extract, Transform, Load (ETL) platform at scale while hiding all the security and network configurations from end users.

As an administrator in AWS Service Catalog, you can create one or more Service Catalog products that define different configurations for EMR clusters. In those products, you can define the security and network configurations, auto-scaling rules, instance configurations, and purchase options for the EMR cluster, or you can preconfigure EMR to run different EMR step jobs. As a user in Service Catalog, you can browse the EMR templates exposed as Service Catalog products and provision the product that meets your requirements. By following this approach, you can make your EMR usage self-serviceable, reduce the EMR learning curve for your users, and ensure adherence to security standards and best practices.

The following image illustrates how the interactions look between Amazon EMR administrators and end-users when using AWS Service Catalog to provision EMR clusters.

The use cases in this post have three AWS Identity and Access Management (IAM) users with different access permissions:

  • emr-admin: This user is the administrator and has access to all the resources. This user creates EMR clusters for their end-users based on their requirements.
  • emr-data-engineer: The data engineer uses Spark and Hive most of the time. They run different ETL scripts on Hive and Spark to process, transform, and enrich their datasets.
  • emr-data-analyst: This user is very familiar with SQL and mostly uses Hue to submit queries to Hive.

You can solve several Amazon EMR operational use cases using AWS Service Catalog. The following sections discuss three different use cases. Later in this post, you walk through each of the use cases with a solution.

Use case 1: Ensuring least privilege and appropriate access

The administrator wants to enforce a few organizational standards. First, no EMR cluster should use the default EMR_EC2_ROLE. Instead, the administrator wants a role that has limited access to Amazon Simple Storage Service (Amazon S3) and wants that role assigned automatically every time an EMR cluster is launched. Second, end-users sometimes forget to add appropriate tags to their resources, which often makes it hard for the administrator to identify those resources and allocate costs appropriately. So, the administrator wants a mechanism that assigns tags to EMR clusters automatically when they launch.

Use case 2: Providing Amazon EMR as a self-serve ETL platform with Spark and Hive

Data engineers use Spark and Hive applications, and they prefer to have a platform where they just submit their jobs without spending time creating the cluster. They also want to try out different Amazon EMR versions to see how their jobs run on different Spark or Hive versions. They don’t want to spend time learning AWS or Amazon EMR. Additionally, the administrator doesn’t want to give full Amazon EMR access to all users.

Use case 3: Automatically scaling the Hive cluster for analysts

Data analysts have strong SQL backgrounds, so they typically use Hue to submit their Hive queries. They run queries against a large dataset, so they want to have an EMR cluster that can scale when needed. They also don’t have access to the Amazon EMR console and don’t know how to configure automatic scaling for Amazon EMR.

Solution overview

Using AWS Service Catalog, you can offer self-service to your Amazon EMR users, enforce best practices and compliance, and speed up the adoption process.

At a high level, the solution includes the following steps:

  1. Configuring the AWS environment to run this solution.
  2. Creating a CloudFormation template.
  3. Setting up AWS Service Catalog products and portfolios.
  4. Managing access to AWS Service Catalog and provisioning products.
  5. Demonstrating the self-service Amazon EMR platform for users.
  6. Enforcing best practices and compliance through AWS Service Catalog.
  7. Executing ETL workloads on Amazon EMR using AWS Service Catalog.
  8. Optionally, setting up AWS Service Catalog and launching Amazon EMR products through the AWS Command Line Interface (AWS CLI).

The following section looks at the CloudFormation template, which you use to set up the AWS environment to run this solution.

Setting up the AWS environment

To set up this solution, you need to create a few AWS resources. The CloudFormation template provided in this post creates all the required AWS resources. This template requires you to pass the following parameters during the launch:

  • A password for your test users.
  • An Amazon Elastic Compute Cloud (Amazon EC2) key pair.
  • The latest AMI ID for the EC2 helper instance. This instance configures the environment and sets up the required files and templates for this solution.

This template is designed only to show how you can use Amazon EMR with AWS Service Catalog. This setup isn’t intended for production use without modification.

To launch the CloudFormation stack, choose Launch Stack:

Launching this stack creates several AWS resources. The following resources shown in the AWS CloudFormation output are the ones you need in the next step:

Key | Description
ConsoleLoginURL | URL you use to switch between multiple users
EMRSCBlogBucket | Name of the S3 bucket to store blog-related files
UserPassword | Password to use for all the test users
DataAdminUsername | IAM user name for the administrator user
DataEngineerUsername | IAM user name for the data engineer user
DataAnalystUsername | IAM user name for the data analyst user
HiveScriptURL | Amazon S3 path for the Hive script
HiveETLInputParameter | Path for the Hive input parameter
HiveETLOutputParameter | Path for the Hive output parameter
SparkScriptURL | Amazon S3 path for the Spark script
SparkETLInputParameter | Path for the Spark input parameter
SparkETLOutputParameter | Path for the Spark output parameter

When the CloudFormation stack creation is complete, record the outputs listed on the Outputs tab on the AWS CloudFormation console. See the following screenshot.

(Optional) Configuring the AWS CLI

The AWS CLI is a unified tool to manage your AWS services. In the optional step, you use the AWS CLI to create AWS Service Catalog products and portfolios. Installation of AWS CLI isn’t required for this solution. For instructions on configuring the AWS CLI in your environment, see Configuring the AWS CLI.

Provisioning EMR clusters through AWS Service Catalog

You can create AWS Service Catalog products from the existing CloudFormation template and use those products to provision a variety of EMR clusters. You can create an EMR cluster and consume the cluster’s services without having access to the cluster, which improves the Amazon EMR adoption process.

The following CloudFormation template creates an EMR cluster. This template takes two parameters:

  • Cluster size – You select how many core nodes you want to have in the EMR cluster
  • Compute type – Based on the compute type you choose, the template selects the respective EC2 instance type
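The template itself is not reproduced here. As a rough illustration of how such parameters can drive instance selection, the following Python sketch assembles a minimal CloudFormation template as a dictionary and prints it as JSON. The parameter names ClusterName, ClusterSize, and ComputeRequirements and the value CPU are the ones used with the AWS CLI later in this post. The other compute types, the instance types, the role placeholders, and the application list are assumptions, and a real template would also need network and security settings.

```python
import json

# Hypothetical mapping from compute type to EC2 instance type. Only the
# value "CPU" appears in this post; the other keys and all instance types
# are assumptions made for illustration.
INSTANCE_TYPES = {"CPU": "c5.xlarge", "Memory": "r5.xlarge", "Generic": "m5.xlarge"}


def build_template(release_label="emr-5.29.0"):
    """Return a minimal CloudFormation template (as a dict) for an EMR cluster."""
    # Resolved by CloudFormation at deploy time from the chosen compute type.
    instance_type = {
        "Fn::FindInMap": ["ComputeMap", {"Ref": "ComputeRequirements"}, "InstanceType"]
    }
    return {
        "AWSTemplateFormatVersion": "2010-09-09",
        "Parameters": {
            "ClusterName": {"Type": "String"},
            "ClusterSize": {"Type": "Number", "Default": 2, "MinValue": 1},
            "ComputeRequirements": {
                "Type": "String",
                "AllowedValues": sorted(INSTANCE_TYPES),
                "Default": "Generic",
            },
        },
        "Mappings": {
            "ComputeMap": {k: {"InstanceType": v} for k, v in INSTANCE_TYPES.items()}
        },
        "Resources": {
            "EMRCluster": {
                "Type": "AWS::EMR::Cluster",
                "Properties": {
                    "Name": {"Ref": "ClusterName"},
                    "ReleaseLabel": release_label,
                    "Applications": [{"Name": "Spark"}, {"Name": "Hive"}],
                    # Placeholders: the administrator supplies least-privilege roles.
                    "JobFlowRole": "<ec2-instance-profile>",
                    "ServiceRole": "<emr-service-role>",
                    "Instances": {
                        "MasterInstanceGroup": {
                            "InstanceCount": 1,
                            "InstanceType": instance_type,
                        },
                        "CoreInstanceGroup": {
                            "InstanceCount": {"Ref": "ClusterSize"},
                            "InstanceType": instance_type,
                        },
                    },
                },
            }
        },
    }


template = build_template()
print(json.dumps(template, indent=2))
```

Keeping the instance types in a mapping means end users only ever see a short list of compute types, while the administrator decides which instance type each one resolves to.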

As an account administrator, you can define the internal configuration for the EMR cluster. End users are not required to know the security groups, subnet ID, key pair, and other details. They also don’t need to access the EMR cluster or spend time setting it up. As an administrator, you define a template for the cluster, enforce compliance requirements, versions, applications, and automatic scaling rules through the CloudFormation template, and expose this template as a product through AWS Service Catalog.

The following section walks you through the solution for each use case.

Use cases walkthrough

The CloudFormation template already configured AWS Service Catalog portfolios and products. You can review these on the AWS Service Catalog console.

  1. Use the ConsoleLoginURL from the AWS CloudFormation console Outputs tab and sign in as an emr-admin user.
  2. On the AWS Service Catalog console, you can see two portfolios for engineers and analysts. In each of those portfolios, you can see two products.

The Data Analysts Stack contains products for the analyst and is assigned to the user emr-data-analyst. The Data Engineering Stack contains products for engineers and is assigned to the emr-data-engineer user. Upon logging in, they can see their respective products and portfolios.

Use case 1: Ensuring least privilege and appropriate access

The cluster administrator creates the least privilege IAM role for their users and associates that role through the Service Catalog product. Similarly, the administrator also assigns appropriate tags for each product. When data engineers or analysts launch an EMR cluster using any of their assigned products, the cluster has the least privilege access and resources are tagged automatically. To confirm this access is in place, complete the following steps:

  1. Sign in to the AWS Management Console as either emr-data-engineer user or emr-data-analyst user.

Your console looks slightly different because end-users don’t manage the products. They only use the products to launch clusters or run jobs on a cluster.

  1. Choose Default EMR and provision this product by choosing Launch Product.
  2. For the name of the provisioned product, enter SampleEMR.

The next screen shows a list of allowed parameters your administrator thinks you may need.

  1. Leave all parameters as default.
  2. For the cluster name, enter Sample EMR.
  3. Review all the information and launch the product.

It takes a few minutes to spin up the cluster. When the cluster is ready, the status changes to Succeeded. The provisioned product page also shows a list of outputs that your product owner wants you to see. For example, using output values, your product owner can share the Master DNS Address, Resource Manager URL, and Hue URL, as shown in the following figure.

To verify that the launched EMR cluster has the expected IAM role and tags, sign in as the emr-admin user and go to the Amazon EMR console to review the service role for EC2 instances and the tags.
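The same check can be scripted. The following sketch inspects the JSON returned by aws emr describe-cluster and reports whether the cluster runs with the default EC2 instance profile (EMR_EC2_DefaultRole, the name Amazon EMR gives its default role) or is missing tags. The role name and tag keys in the sample are hypothetical, because this post does not list the actual values.

```python
import json

DEFAULT_INSTANCE_PROFILE = "EMR_EC2_DefaultRole"


def check_cluster(describe_cluster_json, required_tag_keys):
    """Return a list of problems found in `aws emr describe-cluster` output."""
    cluster = json.loads(describe_cluster_json)["Cluster"]
    problems = []

    profile = cluster.get("Ec2InstanceAttributes", {}).get("IamInstanceProfile", "")
    # The profile may be reported as a name or an ARN; compare the last segment.
    if not profile or profile.split("/")[-1] == DEFAULT_INSTANCE_PROFILE:
        problems.append("cluster uses the default or no EC2 instance profile")

    present = {tag["Key"] for tag in cluster.get("Tags", [])}
    for key in sorted(set(required_tag_keys) - present):
        problems.append(f"missing tag: {key}")
    return problems


# Trimmed, hypothetical describe-cluster output.
sample = json.dumps({
    "Cluster": {
        "Ec2InstanceAttributes": {"IamInstanceProfile": "Blog-EMR-LimitedRole"},
        "Tags": [{"Key": "Department", "Value": "Engineering"}],
    }
})
print(check_cluster(sample, ["Department", "CostCenter"]))
# prints ['missing tag: CostCenter']
```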

Use case 2: Providing Amazon EMR as a self-serve ETL platform with Spark and Hive

For this use case, data engineers have two different ETL scripts:

  • A Spark script that reads Amazon reviews stored in Amazon S3 and converts them into Parquet before writing back to Amazon S3
  • A Hive script that reads Amazon reviews data from Amazon S3 and finds out the top toys based on customer ratings.

The administrator creates a product to self-serve these users; the product defines the job type and the job parameters. End users select the job type and pass the script, input, and output locations.

  1. Sign in as emr-data-engineer.
  2. Select the EMR ETL Engine product.
  3. Choose Launch.

The next page shows whether the product has multiple versions. Because the engineer wants to try out two different Amazon EMR versions, the administrator provided both options as product versions. You can launch the EMR cluster with the required version by selecting your preferred product version.

  1. Enter the name of the product.
  2. For this post, select EMR 5.29.0.
  3. Choose Next.

  1. For JobType, choose Spark.
  2. For JobArtifacts, enter the following value (you can get these values from the AWS CloudFormation output):
s3://blog-emr-sc-<account-id>/scripts/spark_converter.py s3://amazon-reviews-pds/tsv/amazon_reviews_us_Toys_v1_00.tsv.gz s3://blog-emr-sc-<account-id>/spark/
  1. Choose Next.

Based on your configuration, an EMR cluster launches. When the cluster is ready, the Spark job runs.

  1. In a different browser, sign in as emr-admin using the ConsoleLoginURL (from the AWS CloudFormation output).

You can see the cluster status, job status, and output path from the Amazon EMR console.

Now, go to the Amazon S3 console to check the output path:

The Parquet files are written inside the Spark folder.

  1. To test the Hive job, go back to the first browser where you already signed in as emr-data-engineer.
  2. Choose Provisioned products list.
  3. Choose the product options menu (right-click) and choose Update provisioned product.

  1. On the next page, you can select a different version or the same version.
  2. In the Parameters section, choose Hive.
  3. In the JobArtifacts field, enter the following Hive parameters:
s3://blog-emr-sc-<account-id>/scripts/hive_converter.sql -d INPUT=s3://amazon-reviews-pds/tsv/ -d OUTPUT=s3://blog-emr-sc-<account-id>/hive/
  1. Choose Update.

If you select the same version, AWS Service Catalog compares the old provisioned product with the updated product and only runs the portion that you changed. For this post, I chose the same Amazon EMR version and only updated the job type and parameters. You can see that the same EMR cluster is still there, but on the Steps tab, a new step is executed for Hive.
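This post doesn't show how the product turns JobType and JobArtifacts into an EMR step. One plausible translation, sketched below under that assumption, splits the JobArtifacts string on whitespace and wraps it in a command-runner.jar step, which is the usual way to run spark-submit or a Hive script as a step. The step name and failure action are illustrative.

```python
def build_step(job_type, job_artifacts):
    """Translate a JobType and a JobArtifacts string into an EMR step definition."""
    parts = job_artifacts.split()
    if not parts:
        raise ValueError("JobArtifacts must start with a script location")
    if job_type == "Spark":
        # Script, input, and output locations are passed straight to spark-submit.
        args = ["spark-submit", "--deploy-mode", "cluster"] + parts
    elif job_type == "Hive":
        # The first token is the script; the remaining -d KEY=VALUE pairs pass through.
        args = ["hive-script", "--run-hive-script", "--args", "-f"] + parts
    else:
        raise ValueError(f"unsupported job type: {job_type}")
    return {
        "Name": f"{job_type} ETL step",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {"Jar": "command-runner.jar", "Args": args},
    }


hive_step = build_step(
    "Hive",
    "s3://blog-emr-sc-123456789012/scripts/hive_converter.sql "
    "-d INPUT=s3://amazon-reviews-pds/tsv/ "
    "-d OUTPUT=s3://blog-emr-sc-123456789012/hive/",
)
print(hive_step["HadoopJarStep"]["Args"])
```

The account ID 123456789012 is a placeholder. Because only the step definition changes between the Spark and Hive runs, an update that keeps the same Amazon EMR version can add a step to the existing cluster instead of replacing it.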

  1. On the Amazon S3 console using the second browser, verify that a new folder hive is created with data that represents top toys based on Amazon reviews.

To recap, you saw how to use AWS Service Catalog to provide a product to run your ETL jobs. Your data engineers can focus on their ETL scripts and your platform can self-serve them to run their ETL jobs on the EMR cluster.

Use case 3: Automatically scaling the Hive cluster for data analysts

To automatically scale the Hive cluster for data analysts, complete the following steps:

  1. Using the console login URL from the AWS CloudFormation output, sign in as emr-data-analyst and go to the AWS Service Catalog console.

You can see a different set of products for this user.

For this use case, your data analysts want an automatically scaling EMR cluster with the Hive application. The administrator set up the Auto-scaling EMR product with preconfigured rules.

  1. Choose Auto-scaling EMR.
  2. Enter a provisioned product name.
  3. Select Hive Auto-scaling.
  4. Choose Next.
  5. In the Parameters section, leave the options at their default and enter a cluster name.
  6. Launch the product.

The product owner also provided a client URL (for example, Hue URL) through the product output so business analysts can connect to it.

  1. Sign in as emr-admin and validate that this new cluster is configured with the expected automatic scaling rules.
  2. On the Amazon EMR console, choose the cluster.

You can see the configuration on the Hardware tab.
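The preconfigured rules are not listed in this post. For orientation, the following sketch builds an automatic scaling policy in the shape that Amazon EMR expects for an instance group: capacity constraints plus rules that pair a scaling action with a CloudWatch alarm. The metric, thresholds, cooldown, and capacity limits are assumptions chosen for illustration, not the values the product uses.

```python
def scaling_rule(name, operator, threshold, adjustment, cooldown=300):
    """Build one rule that changes capacity when YARN memory crosses a threshold."""
    return {
        "Name": name,
        "Action": {
            "SimpleScalingPolicyConfiguration": {
                "AdjustmentType": "CHANGE_IN_CAPACITY",
                "ScalingAdjustment": adjustment,  # positive adds nodes, negative removes
                "CoolDown": cooldown,
            }
        },
        "Trigger": {
            "CloudWatchAlarmDefinition": {
                "ComparisonOperator": operator,
                "EvaluationPeriods": 1,
                "MetricName": "YARNMemoryAvailablePercentage",
                "Namespace": "AWS/ElasticMapReduce",
                "Period": 300,
                "Statistic": "AVERAGE",
                "Threshold": threshold,
                "Unit": "PERCENT",
                "Dimensions": [{"Key": "JobFlowId", "Value": "${emr.clusterId}"}],
            }
        },
    }


def build_policy(min_nodes, max_nodes):
    """Return a scaling policy with one scale-out and one scale-in rule."""
    if not 0 < min_nodes <= max_nodes:
        raise ValueError("expected 0 < min_nodes <= max_nodes")
    return {
        "Constraints": {"MinCapacity": min_nodes, "MaxCapacity": max_nodes},
        "Rules": [
            # Hypothetical thresholds: add a node below 15% free YARN memory,
            # remove one above 75%.
            scaling_rule("ScaleOut", "LESS_THAN", 15, 1),
            scaling_rule("ScaleIn", "GREATER_THAN", 75, -1),
        ],
    }


policy = build_policy(2, 10)
print([rule["Name"] for rule in policy["Rules"]])
```

Because the policy ships inside the product, analysts get a cluster that scales without ever opening the Amazon EMR console.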

In this use case, you learned how to use AWS Service Catalog to provide business analyst users a preconfigured, automatically scaled EMR cluster.

(Optional) Setting up AWS Service Catalog for Amazon EMR using AWS CLI

In the previous section, I demonstrated the solution using the AWS Service Catalog console. In this section, I show you how to work with AWS Service Catalog through the AWS CLI. You can create AWS Service Catalog products and portfolios, assign IAM principals, and launch products.

  1. Create a portfolio named CLI - Stack for the user emr-admin. See the following command:
aws --region us-east-1 servicecatalog create-portfolio --display-name "CLI - Stack" --provider-name "@emr-admin" --description "Sample stack for pre-defined EMR clusters"

You receive a JSON output.

  1. Record the portfolio ID port-xxxxxxxx from the output to use later.

The emr-admin user is the provider for this portfolio. The user is created with power user access, so this user can see the full AWS Service Catalog console and can manage products and portfolios.

You can associate this portfolio with multiple users. Users who are associated with a portfolio can use it, browse its products, and provision new products. For this use case, you associate the portfolio with emr-admin and with your AWS CLI user (the user that you used to configure the AWS CLI). Make sure to replace the portfolio ID and AWS account ID in the following commands with your own values.

  1. Enter the following code:
aws --region us-east-1 servicecatalog associate-principal-with-portfolio --portfolio-id port-xxxxxxxxxx --principal-type IAM --principal-arn arn:aws:iam::xxxxx:user/emr-admin

aws --region us-east-1 servicecatalog associate-principal-with-portfolio --portfolio-id port-xxxxxxxxxx --principal-type IAM --principal-arn arn:aws:iam::xxxxx:user/<aws-cli-user-name>
  1. To verify the portfolio-to-user association, enter the following command with the portfolio ID:
aws --region us-east-1 servicecatalog list-principals-for-portfolio --portfolio-id port-xxxxxxxxx

The command lists the principals associated with the portfolio, as shown in the following figure:

The CloudFormation template already copied the Amazon EMR template into your S3 bucket at the path s3://blog-emr-sc-<account-id>/products.

  1. To create the product CLI - Sample EMR using that template from Amazon S3, enter the following command:
aws --region us-east-1 servicecatalog create-product --name "CLI - Sample EMR" --owner "@emr-admin" --description "Sample EMR cluster with default" --product-type CLOUD_FORMATION_TEMPLATE --provisioning-artifact-parameters '{"Name": "Initial revision", "Description": "", "Info":{"LoadTemplateFromURL":"https://s3.amazonaws.com/blog-emr-sc-<account-id>/products/sample-cluster.template"},"Type":"CLOUD_FORMATION_TEMPLATE"}'

  1. Record the product ID (prod-xxxxxx) and the provisioning artifact ID (pa-xxxxx, referred to below as the provision ID) from the JSON output.
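If you script these steps, you can pull both IDs out of the create-product response instead of copying them by hand. In that response, the product ID is under ProductViewDetail.ProductViewSummary.ProductId, and the provision ID is the provisioning artifact ID under ProvisioningArtifactDetail.Id. The sample below is a trimmed, hypothetical response with placeholder IDs.

```python
import json


def product_ids(create_product_json):
    """Return (product_id, provisioning_artifact_id) from create-product output."""
    output = json.loads(create_product_json)
    product_id = output["ProductViewDetail"]["ProductViewSummary"]["ProductId"]
    artifact_id = output["ProvisioningArtifactDetail"]["Id"]
    return product_id, artifact_id


# Trimmed example of the JSON that `aws servicecatalog create-product` prints.
sample_output = """
{
  "ProductViewDetail": {
    "ProductViewSummary": {"ProductId": "prod-xxxxxx", "Name": "CLI - Sample EMR"}
  },
  "ProvisioningArtifactDetail": {"Id": "pa-xxxxx", "Name": "Initial revision"}
}
"""
print(product_ids(sample_output))
```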

You now have a product and a portfolio. A portfolio can have one or more products, and each product can have multiple versions.

  1. To assign the CLI - Sample EMR product to the portfolio you created in Step 1, enter the following command:
aws --region us-east-1 servicecatalog associate-product-with-portfolio --product-id prod-xxxxxx --portfolio-id port-xxxxxx

A launch constraint specifies the IAM role that AWS Service Catalog assumes when an end-user launches a product. With a launch constraint, you can control end-user access to your AWS resources and limit usage.

The CloudFormation template already created the role Blog-SCLaunchRole; create a launch constraint using that IAM role. Use the portfolio and product IDs that you collected from the previous step and your AWS account ID.

  1. To create the launch constraint, enter the following command:
aws --region us-east-1 servicecatalog create-constraint --type LAUNCH --portfolio-id port-xxxxxx --product-id prod-xxxxxx --parameters '{"RoleArn" : "arn:aws:iam::<account-id>:role/Blog-SCLaunchRole"}'

  1. Record the launch constraint ID to use later.
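The --parameters value for a launch constraint is a JSON document passed as a single shell argument, so quoting mistakes are easy to make. A small helper such as the following sketch can generate it. The role name comes from this post, and the account ID in the example is a placeholder.

```python
import json


def launch_constraint_parameters(account_id, role_name="Blog-SCLaunchRole"):
    """Return the JSON string for create-constraint --type LAUNCH --parameters."""
    if not (account_id.isdigit() and len(account_id) == 12):
        raise ValueError("an AWS account ID is a 12-digit number")
    return json.dumps({"RoleArn": f"arn:aws:iam::{account_id}:role/{role_name}"})


print(launch_constraint_parameters("123456789012"))
# prints {"RoleArn": "arn:aws:iam::123456789012:role/Blog-SCLaunchRole"}
```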

You now have an AWS Service Catalog product that you can use to provision an EMR cluster. The CloudFormation template that you used to create the CLI - Sample EMR product takes three parameters (ClusterName, ComputeRequirements, ClusterSize).

  1. To pass those three parameters as key-value pairs, enter the following command (use the product ID and provision ID that you recorded earlier):
aws --region us-east-1 servicecatalog provision-product --product-id prod-xxxxxx --provisioning-artifact-id pa-xxxxx --provisioned-product-name cli-emr --provisioning-parameters Key=ClusterName,Value=cli-emr-cluster Key=ComputeRequirements,Value=CPU Key=ClusterSize,Value=2
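When the parameter values come from a script, the Key=...,Value=... tokens shown above can be generated from a dictionary. This sketch covers only simple values. It rejects commas and spaces rather than guessing at the quoting that the shorthand syntax would need for them.

```python
def provisioning_parameters(values):
    """Format a dict as Key=...,Value=... tokens for provision-product."""
    tokens = []
    for key, value in values.items():
        value = str(value)
        # Commas and spaces have special meaning in the CLI shorthand syntax.
        if any(ch in value for ch in ", "):
            raise ValueError(f"value for {key} needs manual quoting: {value!r}")
        tokens.append(f"Key={key},Value={value}")
    return tokens


tokens = provisioning_parameters(
    {"ClusterName": "cli-emr-cluster", "ComputeRequirements": "CPU", "ClusterSize": 2}
)
print(" ".join(tokens))
# prints Key=ClusterName,Value=cli-emr-cluster Key=ComputeRequirements,Value=CPU Key=ClusterSize,Value=2
```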

  1. Check the provisioned product’s status by using the provisioned product ID:
aws --region us-east-1 servicecatalog describe-provisioned-product --id pp-xxxxx
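The response reports the state in ProvisionedProductDetail.Status. The following sketch groups the status values that AWS Service Catalog documents for provisioned products into three coarse states that a polling script could act on. The grouping and its labels are a simplification introduced here, not part of the service.

```python
import json

IN_PROGRESS = {"UNDER_CHANGE", "PLAN_IN_PROGRESS"}
NEEDS_ATTENTION = {"TAINTED", "ERROR"}


def provisioning_state(describe_json):
    """Classify `describe-provisioned-product` output as a coarse state."""
    status = json.loads(describe_json)["ProvisionedProductDetail"]["Status"]
    if status == "AVAILABLE":
        return "ready"
    if status in IN_PROGRESS:
        return "in progress"
    if status in NEEDS_ATTENTION:
        return "needs attention"
    return "unknown"


# Trimmed, hypothetical response while the EMR cluster is still launching.
sample_status = '{"ProvisionedProductDetail": {"Id": "pp-xxxxx", "Status": "UNDER_CHANGE"}}'
print(provisioning_state(sample_status))
# prints in progress
```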

To recap, in this section you learned how to use the AWS CLI to configure AWS Service Catalog products and portfolios, and how to provision an EMR cluster through an AWS Service Catalog product.

Cleaning up

To clean up the resources you created, complete the following steps:

  1. Terminate the product that you provisioned in the previous step:
aws --region us-east-1 servicecatalog terminate-provisioned-product --provisioned-product-id pp-xxxxx
  1. Disassociate the product CLI - Sample EMR from the portfolio CLI - Stack:
aws --region us-east-1 servicecatalog disassociate-product-from-portfolio --product-id prod-xxxxx --portfolio-id port-xxxxx
  1. Disassociate IAM principals from the portfolio CLI - Stack:
aws --region us-east-1 servicecatalog disassociate-principal-from-portfolio --portfolio-id port-xxxxx --principal-arn arn:aws:iam::xxxxxx:user/emr-admin

aws --region us-east-1 servicecatalog disassociate-principal-from-portfolio --portfolio-id port-xxxxx --principal-arn arn:aws:iam::xxxxxx:user/<aws-cli-user-name> 
  1. Delete the launch constraint created in the previous step:
aws --region us-east-1 servicecatalog delete-constraint --id cons-xxxxx
  1. Delete the product CLI - Sample EMR:
aws --region us-east-1 servicecatalog delete-product --id prod-xxxxx
  1. Delete the portfolio CLI - Stack:
aws --region us-east-1 servicecatalog delete-portfolio --id port-xxxxx
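The order of these commands matters, because a portfolio can't be deleted while products or principals are still attached to it. As a sketch, the following helper returns the same commands as argument lists in the order used above, so a script can run them one by one. The dictionary keys are names chosen for this example.

```python
def cleanup_commands(ids, principal_arns, region="us-east-1"):
    """Return the cleanup commands from this section as argument lists, in order."""
    base = ["aws", "--region", region, "servicecatalog"]
    portfolio, product = ids["portfolio"], ids["product"]
    commands = [
        base + ["terminate-provisioned-product",
                "--provisioned-product-id", ids["provisioned_product"]],
        base + ["disassociate-product-from-portfolio",
                "--product-id", product, "--portfolio-id", portfolio],
    ]
    for arn in principal_arns:
        commands.append(base + ["disassociate-principal-from-portfolio",
                                "--portfolio-id", portfolio, "--principal-arn", arn])
    commands += [
        base + ["delete-constraint", "--id", ids["constraint"]],
        base + ["delete-product", "--id", product],
        base + ["delete-portfolio", "--id", portfolio],
    ]
    return commands


commands = cleanup_commands(
    {"provisioned_product": "pp-xxxxx", "product": "prod-xxxxx",
     "portfolio": "port-xxxxx", "constraint": "cons-xxxxx"},
    ["arn:aws:iam::123456789012:user/emr-admin"],  # placeholder account ID
)
for command in commands:
    print(" ".join(command))
```

Each list can be passed to subprocess.run(command, check=True), which stops the cleanup at the first command that fails.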

Cleaning up additional resources

You must also clean up the resources you created with the CloudFormation template.

  1. On the AWS Service Catalog console, choose Provisioned products list.
  2. Terminate each product that you provisioned for these use cases.
  3. Check each of the users and their provisioned products to make sure they’re terminated.
  4. On the Amazon S3 console, empty the bucket blog-emr-sc-<account-id>.
  5. If you are using the AWS CLI, delete the objects in the blog-emr-sc-<account-id> bucket with the following command (make sure you’re running this command on the correct bucket):
aws s3 rm s3://blog-emr-sc-<account-id> --recursive
  1. If you ran the optional AWS CLI section, make sure you follow the cleanup process mentioned in that section.
  2. On the AWS CloudFormation console or AWS CLI, delete the stack named Blog-EMR-Service-Catalog.

Next steps

To enhance this solution, you can explore the following options:

  • In this post, I enforced resource tagging through AWS CloudFormation. You can also use the AWS Service Catalog TagOptions library to provide a consistent taxonomy and tagging of AWS Service Catalog resources. During a product launch (provisioning), AWS Service Catalog aggregates the associated portfolio and product TagOptions and applies them to the provisioned product.
  • This solution demonstrates the usage of launch constraints and how you can provide limited access to your AWS resources to your users. You can also use template constraints to manage parameters. Template constraints make sure that end-users only have options that you allow them when launching products. This can help you maintain your organization’s compliance requirements.
  • You can integrate AWS Budgets with AWS Service Catalog. By associating AWS Budgets with your products and portfolios, you can track your usage and service costs. You can set a custom budget for each of the portfolios and trigger alerts when your costs exceed your threshold.
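As an example of the template constraint option, the following sketch generates the rules document for a constraint that limits a template parameter to an approved list of values. The document follows the Rules and Assertions structure that AWS Service Catalog template constraints use. The rule name and the decision to allow only the CPU compute type are assumptions made for illustration.

```python
import json


def template_constraint(parameter, allowed_values, description):
    """Return the JSON rules document for create-constraint --type TEMPLATE."""
    rule = {
        "Assertions": [
            {
                # The assertion passes only if the parameter value is in the list.
                "Assert": {"Fn::Contains": [list(allowed_values), {"Ref": parameter}]},
                "AssertDescription": description,
            }
        ]
    }
    return json.dumps({"Rules": {f"Restrict{parameter}": rule}})


rules = template_constraint(
    "ComputeRequirements", ["CPU"], "Only CPU-optimized clusters are allowed"
)
print(rules)
```

You would pass the resulting string as the --parameters value of aws servicecatalog create-constraint, together with --type TEMPLATE and the portfolio and product IDs.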

Summary

In this post, I showed you how to simplify your Amazon EMR provisioning process using AWS Service Catalog, how to make Amazon EMR a self-service platform for your end-users, and how to enforce best practices and compliance on your EMR clusters. You also walked through three different use cases and implemented solutions with AWS Service Catalog. Give this solution a try and share your experience with us!

 


About the Author

Tanzir Musabbir is a Data & Analytics Architect with AWS. At AWS, he works with our customers to provide them architectural guidance for running analytics solutions on Amazon EMR, Amazon Athena & AWS Glue. Tanzir is a big Real Madrid fan and he loves to travel in his free time.