Tag Archives: Amazon EMR

Amazon EMR 2020 year in review

Post Syndicated from Abhishek Sinha original https://aws.amazon.com/blogs/big-data/amazon-emr-2020-year-in-review/

Tens of thousands of customers use Amazon EMR to run big data analytics applications on Apache Spark, Apache Hive, Apache HBase, Apache Flink, Apache Hudi, and Presto at scale. Amazon EMR automates the provisioning and scaling of these frameworks, and delivers high performance at low cost with optimized runtimes and support for a wide range of Amazon Elastic Compute Cloud (Amazon EC2) instance types and Amazon Elastic Kubernetes Service (Amazon EKS) clusters. Amazon EMR makes it easy for data engineers and data scientists to develop, visualize, and debug data science applications with Amazon EMR Studio (preview) and Amazon EMR Notebooks.

You can hear customers describe how they use Amazon EMR in the following 2020 AWS re:Invent sessions:

You can also find more information in the following posts:

Throughout 2020, we worked to deliver better Amazon EMR performance at a lower price, and to make Amazon EMR easier to manage and use for big data analytics within your Lake House Architecture. This post summarizes the key improvements during the year and provides links to additional information.

Differentiated engine performance

Amazon EMR simplifies building and operating big data environments and applications. You can launch an EMR cluster in minutes. You don’t need to worry about infrastructure provisioning, cluster setup, configuration, or tuning. Amazon EMR takes care of these tasks, allowing you to focus your teams on developing differentiated big data applications. In addition to eliminating the need for you to build and manage your own infrastructure to run big data applications, Amazon EMR gives you better performance than simply using open-source distributions, and provides 100% API compatibility. This means you can run your workloads faster without changing any code.

Amazon EMR runtime for Apache Spark is a performance-optimized runtime environment for Spark that is active by default. We first introduced the EMR runtime for Apache Spark in Amazon EMR release 5.28.0 in November 2019, and used queries based on the TPC-DS benchmark to measure the performance improvement over open-source Spark 2.4. Those results showed considerable improvement: the geometric mean in query execution time was 2.4 times faster and the total query runtime was 3.2 times faster. As discussed in Turbocharging Query Execution on Amazon EMR at AWS re:Invent 2020, we’ve continued to improve the runtime, and our latest results show that Amazon EMR 5.30 is three times faster than without the runtime, which means you can run petabyte-scale analysis at less than half the cost of traditional on-premises solutions. For more information, see How Drop used the EMR runtime for Apache Spark to halve costs and get results 5.4 times faster.

We’ve also improved Hive and PrestoDB performance. In April 2020, we announced support for Hive Low Latency Analytical Processing (LLAP) as a YARN service starting with Amazon EMR 6.0. Our tests show that Apache Hive is two times faster with Hive LLAP on Amazon EMR 6.0. You can choose to use Hive LLAP or dynamically allocated containers. In May 2020, we introduced the Amazon EMR runtime for PrestoDB in Amazon EMR 5.30. Our most recent tests based on TPC-DS benchmark queries compare Amazon EMR 5.31, which uses the runtime, to Amazon EMR 5.29, which does not. The geometric mean in query execution time is 2.6 times faster with Amazon EMR 5.31 and the runtime for PrestoDB.

Simpler incremental data processing

Apache Hudi (Hadoop Upserts, Deletes, and Incrementals) is an open-source data management framework used for simplifying incremental data processing and data pipeline development. You can use it to perform record-level inserts, updates, and deletes in Amazon Simple Storage Service (Amazon S3) data lakes, thereby simplifying building change data capture (CDC) pipelines. With this capability, you can comply with data privacy regulations and simplify data ingestion pipelines that deal with late-arriving or updated records from sources like streaming inputs and CDC from transactional systems. Apache Hudi integrates with open-source big data analytics frameworks like Apache Spark, Apache Hive, and Presto, and allows you to maintain data in Amazon S3 or HDFS in open formats like Apache Parquet and Apache Avro.

We first supported Apache Hudi starting with Amazon EMR release 5.28 in November 2019. In June 2020, Apache Hudi graduated from incubator with release 0.6.0, which we support with Amazon EMR releases 5.31.0, 6.2.0, and higher. The Amazon EMR team collaborated with the Apache Hudi community to create a new bootstrap operation, which allows you to use Hudi with your existing Parquet datasets without needing to rewrite the dataset. This bootstrap operation accelerates the process of creating a new Apache Hudi dataset from existing datasets—in our tests using a 1 TB Parquet dataset on Amazon S3, the bootstrap performed five times faster than bulk insert.
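
To show what the bootstrap operation looks like in practice, the following PySpark sketch bootstraps an existing Parquet dataset into a new Hudi table. It assumes spark is an existing SparkSession on a cluster with Hudi installed and uses the Hudi 0.6.0+ option names documented by the Apache Hudi project; the S3 paths, table name, record key, and partition column are hypothetical.

# Minimal metadata-only bootstrap sketch (Hudi 0.6.0+); paths and field names are hypothetical.
source_path = "s3://my-bucket/existing-parquet-table/"
target_path = "s3://my-bucket/hudi/bootstrapped-table/"

# The bootstrap operation reads the files under source_path itself,
# so we only pass an empty DataFrame that carries the source schema.
schema_only_df = spark.read.format("parquet").load(source_path).limit(0)

(schema_only_df.write.format("hudi")
    .option("hoodie.table.name", "bootstrapped_table")
    .option("hoodie.datasource.write.operation", "bootstrap")
    .option("hoodie.datasource.write.recordkey.field", "record_id")
    .option("hoodie.datasource.write.partitionpath.field", "dt")
    .option("hoodie.bootstrap.base.path", source_path)
    .option("hoodie.bootstrap.keygen.class", "org.apache.hudi.keygen.SimpleKeyGenerator")
    .mode("overwrite")
    .save(target_path))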

Also in June 2020, starting with Amazon EMR release 5.30.0, we added support for the HoodieDeltaStreamer utility, which provides an easy way to ingest data from many sources, including AWS Data Migration Services (AWS DMS). With this integration, you can now ingest data from upstream relational databases to your S3 data lakes in a seamless, efficient, and continuous manner. For more information, see Apply record level changes from relational databases to Amazon S3 data lake using Apache Hudi on Amazon EMR and AWS Database Migration Service.

Amazon Athena and Amazon Redshift Spectrum added support for querying Apache Hudi datasets in S3-based data lakes—Athena announced support in July 2020 and Redshift Spectrum in September 2020. Now, you can query the latest snapshot of Apache Hudi Copy-on-Write (CoW) datasets from both Athena and Redshift Spectrum, even while you continue to use Apache Hudi support in Amazon EMR to make changes to the dataset.

Differentiated instance performance

In addition to providing better software performance with Amazon EMR runtimes, we offer more instance options than any other cloud provider, allowing you to choose the instance that gives you the best performance and cost for your workload. You choose what types of EC2 instances to provision in your cluster (standard, high memory, high CPU, high I/O) based on your application’s requirements, and fully customize your cluster to suit your requirements.

In December 2020, we announced that Amazon EMR now supports M6g, C6g, and R6g instances with versions 6.1.0, 5.31.0 and later, which enables you to use instances powered by AWS Graviton2 processors. Graviton2 processors are custom designed by AWS using 64-bit Arm Neoverse cores to deliver the best price performance for cloud workloads running in Amazon EC2. Although your performance benefit will vary based on the unique characteristics of your workloads, our tests based on the TPC-DS 3 TB benchmark showed that the EMR runtime for Apache Spark provides up to 15% improved performance and up to 30% lower costs on Graviton2 instances relative to equivalent previous generation instances.
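
As a hedged example of launching a small EMR cluster on Graviton2 instances with boto3, the following sketch creates an EMR 6.1 cluster on m6g and r6g instances; the cluster name, instance types and counts, and IAM roles are illustrative placeholders.

import boto3

emr = boto3.client("emr")

# Launch an EMR 6.1 cluster on Graviton2 (m6g/r6g) instances.
response = emr.run_job_flow(
    Name="graviton2-spark-cluster",
    ReleaseLabel="emr-6.1.0",
    Applications=[{"Name": "Spark"}],
    Instances={
        "InstanceGroups": [
            {"Name": "Primary", "InstanceRole": "MASTER",
             "InstanceType": "m6g.xlarge", "InstanceCount": 1},
            {"Name": "Core", "InstanceRole": "CORE",
             "InstanceType": "r6g.2xlarge", "InstanceCount": 3},
        ],
        "KeepJobFlowAliveWhenNoSteps": True,
    },
    JobFlowRole="EMR_EC2_DefaultRole",
    ServiceRole="EMR_DefaultRole",
)
print(response["JobFlowId"])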

Easier cluster optimization

We’ve also made it easier to optimize your EMR clusters. In July 2020, we introduced Amazon EMR Managed Scaling, a new feature that automatically resizes your EMR clusters for best performance at the lowest possible cost. EMR Managed Scaling eliminates the need to predict workload patterns in advance or write custom automatic scaling rules that depend on an in-depth understanding of the application framework (for example, Apache Spark or Apache Hive). Instead, you specify the minimum and maximum compute resource limits for your clusters, and Amazon EMR constantly monitors key metrics based on the workload and optimizes the cluster size for best resource utilization. Amazon EMR can scale the cluster up during peaks and scale it down gracefully during idle periods, reducing your costs by 20–60% and optimizing cluster capacity for best performance.

EMR Managed Scaling is supported for Apache Spark, Apache Hive, and YARN-based workloads on Amazon EMR versions 5.30.1 and above. EMR Managed Scaling supports EMR instance fleets, enabling you to seamlessly scale Spot Instances, On-Demand Instances, and instances that are part of a Savings Plan, all within the same cluster. You can take advantage of Managed Scaling and instance fleets to provision the cluster capacity that has the lowest chance of getting interrupted, for the lowest cost.
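
As a minimal sketch, the following boto3 call attaches a managed scaling policy to an existing cluster; the cluster ID and capacity limits are placeholders.

import boto3

emr = boto3.client("emr")

# Let EMR Managed Scaling resize the cluster between the configured limits.
emr.put_managed_scaling_policy(
    ClusterId="j-XXXXXXXXXXXXX",
    ManagedScalingPolicy={
        "ComputeLimits": {
            "UnitType": "InstanceFleetUnits",   # use "Instances" or "VCPU" for instance groups
            "MinimumCapacityUnits": 2,
            "MaximumCapacityUnits": 40,
            "MaximumOnDemandCapacityUnits": 10,
            "MaximumCoreCapacityUnits": 4,
        }
    },
)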

In October 2020, we announced Amazon EMR support for the capacity-optimized allocation strategy for provisioning EC2 Spot Instances. The capacity-optimized allocation strategy automatically makes the most efficient use of available spare capacity while still taking advantage of the steep discounts offered by Spot Instances. You can now specify up to 15 instance types in your EMR task instance fleet configuration. This provides Amazon EMR with more options in choosing the optimal pools to launch Spot Instances from in order to decrease chances of Spot interruptions, and increases the ability to relaunch capacity using other instance types in case Spot Instances are interrupted when Amazon EC2 needs the capacity back.
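
To illustrate, the following is a hedged sketch of a task instance fleet configuration that diversifies across several instance types and uses the capacity-optimized Spot allocation strategy; the instance types, weights, and target capacity are illustrative.

# Task instance fleet that diversifies Spot capacity across four instance types.
task_fleet = {
    "Name": "Task fleet",
    "InstanceFleetType": "TASK",
    "TargetSpotCapacity": 64,
    "InstanceTypeConfigs": [
        {"InstanceType": "m5.4xlarge", "WeightedCapacity": 16},
        {"InstanceType": "m5a.4xlarge", "WeightedCapacity": 16},
        {"InstanceType": "r5.4xlarge", "WeightedCapacity": 16},
        {"InstanceType": "c5.4xlarge", "WeightedCapacity": 16},
    ],
    "LaunchSpecifications": {
        "SpotSpecification": {
            "TimeoutDurationMinutes": 10,
            "TimeoutAction": "SWITCH_TO_ON_DEMAND",
            "AllocationStrategy": "capacity-optimized",
        }
    },
}

# Pass the fleet alongside your primary and core fleets, for example:
# emr.run_job_flow(..., Instances={"InstanceFleets": [primary_fleet, core_fleet, task_fleet]})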

For more information, see How Nielsen built a multi-petabyte data platform using Amazon EMR and Contextual targeting and ad tech migration best practices.

Workload consolidation

Previously, you had to choose between using fully managed Amazon EMR on Amazon EC2 or self-managing Apache Spark on Amazon EKS. When you use Amazon EMR on Amazon EC2, you can choose from a wide range of EC2 instance types to meet price and performance requirements, but you can’t run multiple versions of Apache Spark or other applications on a cluster, and you can’t use unused capacity for non-Amazon EMR applications. When you self-manage Apache Spark on Amazon EKS, you have to do the heavy lifting of installing, managing, and optimizing Apache Spark to run on Kubernetes, and you don’t get the benefit of optimized runtimes in Amazon EMR.

You no longer have to choose. In December 2020, we announced the general availability of Amazon EMR on Amazon EKS, a new deployment option for Amazon EMR that allows you to run fully managed open-source big data frameworks on Amazon EKS. If you already use Amazon EMR, you can now consolidate Amazon EMR-based applications with other Kubernetes-based applications on the same Amazon EKS cluster to improve resource utilization and simplify infrastructure management using common Amazon EKS tools. If you currently self-manage big data frameworks on Amazon EKS, you can now use Amazon EMR to automate provisioning and management, and take advantage of the optimized Amazon EMR runtimes to deliver better performance at lower cost.

Amazon EMR on EKS enables your team to collaborate more efficiently. You can run applications on a common pool of resources without having to provision infrastructure, and co-locate multiple Amazon EMR versions on a single Amazon EKS cluster to rapidly test and verify new Amazon EMR versions and the included open-source frameworks. You can improve developer productivity with faster cluster startup times because Amazon EMR application containers on existing Amazon EKS cluster instances start within 15 seconds, whereas creating new clusters of EC2 instances can take several minutes. You can use Amazon Managed Workflows for Apache Airflow (Amazon MWAA) to programmatically author, schedule, and monitor workflows, and use EMR Studio (preview) to develop, visualize, and debug applications. We discuss Amazon MWAA and EMR Studio more in the next section.
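
To give a flavor of job submission on Amazon EMR on EKS, the following boto3 sketch starts a Spark job on a virtual cluster; the virtual cluster ID, execution role ARN, and script location are placeholders.

import boto3

emr_containers = boto3.client("emr-containers")

# Submit a Spark job to a virtual cluster backed by an existing EKS cluster.
response = emr_containers.start_job_run(
    name="spark-pi",
    virtualClusterId="abcdef12345example",
    executionRoleArn="arn:aws:iam::123456789012:role/EMRContainersJobRole",
    releaseLabel="emr-6.2.0-latest",
    jobDriver={
        "sparkSubmitJobDriver": {
            "entryPoint": "s3://my-bucket/scripts/pi.py",
            "sparkSubmitParameters": "--conf spark.executor.instances=2 --conf spark.executor.memory=2G",
        }
    },
)
print(response["id"])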

For more information, see Run Spark on Kubernetes with Amazon EMR on Amazon EKS and Amazon EMR on EKS Development Guide.

Higher developer productivity

Of course, your goal with Amazon EMR is not only to achieve the best price performance for your big data analytics workloads, but also to deliver new insights that help you run your business.

In November 2020, we announced Amazon MWAA, a fully managed service that makes it easy to run open-source versions of Apache Airflow on AWS, and to build workflows to run your extract, transform, and load (ETL) jobs and data pipelines. Airflow workflows retrieve input from sources like Amazon S3 using Athena queries, perform transformations on EMR clusters, and can use the resulting data to train machine learning (ML) models on Amazon SageMaker. Workflows in Airflow are authored as Directed Acyclic Graphs (DAGs) using the Python programming language.

At AWS re:Invent 2020, we introduced the preview of EMR Studio, a new notebook-first integrated development environment (IDE) experience with Amazon EMR. EMR Studio makes it easy for data scientists to develop, visualize, and debug applications written in R, Python, Scala, and PySpark. It provides fully managed Jupyter notebooks and tools like Spark UI and YARN Timeline Service to simplify debugging. You can install custom Python libraries or Jupyter kernels required for your applications directly to your EMR clusters, and can connect to code repositories such as AWS CodeCommit, GitHub, and Bitbucket to collaborate with peers. EMR Studio uses AWS Single Sign-On (AWS SSO), enabling you to log in directly with your corporate credentials without signing in to the AWS Management Console.

EMR Studio kernels and applications run on EMR clusters, so you get the benefit of distributed data processing using the performance-optimized EMR runtime for Apache Spark. You can create cluster templates in AWS Service Catalog to simplify running jobs for your data scientists and data engineers, and can take advantage of EMR clusters running on Amazon EC2, Amazon EKS, or both. For example, you might reuse existing EC2 instances in your shared Kubernetes cluster to enable fast startup time for development work and ad hoc analysis, and use EMR clusters on Amazon EC2 to ensure the best performance for frequently run, long-running workloads.

To learn more, see Introducing a new notebook-first IDE experience with Amazon EMR and Amazon EMR Studio.

Unified governance

At AWS, we recommend you use a Lake House Architecture to modernize your data and analytics infrastructure in the cloud. A Lake House Architecture acknowledges the idea that taking a one-size-fits-all approach to analytics eventually leads to compromises. It’s not simply about integrating a data lake with a data warehouse, but rather about integrating a data lake, data warehouse, and purpose-built analytics services, and enabling unified governance and easy data movement. For more information about this approach, see Harness the power of your data with AWS Analytics by Rahul Pathak, and his AWS re:Invent 2020 analytics leadership session.

As shown in the following diagram, Amazon EMR is one element in a Lake House Architecture on AWS, along with Amazon S3, Amazon Redshift, and more.

One of the most important pieces of a modern analytics architecture is the ability for you to authorize, manage, and audit access to data. AWS gives you the fine-grained access control and governance you need to manage access to data across a data lake and purpose-built data stores and analytics services from a single point of control.

In October 2020, we announced the general availability of Amazon EMR integration with AWS Lake Formation. By integrating Amazon EMR with AWS Lake Formation, you can enhance data access control on multi-tenant EMR clusters by managing Amazon S3 data access at the level of databases, tables, and columns. This feature also enables SAML-based single sign-on to EMR Notebooks and Apache Zeppelin, and simplifies the authentication for organizations using Active Directory Federation Services (ADFS). With this integration, you have a single place to manage data access for Amazon EMR, along with the other AWS analytics services shown in the preceding diagram. At AWS re:Invent 2020, we announced the preview of row-level security for Lake Formation, which makes it even easier to control access for all the people and applications that need to share data.

In January 2021, we introduced Amazon EMR integration with Apache Ranger. Apache Ranger is an open-source project that provides authorization and audit capabilities for Hadoop and related big data applications like Apache Hive, Apache HBase, and Apache Kafka. Starting with Amazon EMR 5.32, we’re including plugins to integrate with Apache Ranger 2.0 that enable authorization and audit capabilities for Apache SparkSQL, Amazon S3, and Apache Hive. You can set up a multi-tenant EMR cluster, use Kerberos for user authentication, use Apache Ranger 2.0 (managed separately outside the EMR cluster) for authorization, and configure fine-grained data access policies for databases, tables, columns, and S3 objects.

With this native integration, you use the Amazon EMR security configuration to specify Apache Ranger details, without the need for custom bootstrap scripts. You can reuse existing Apache Hive Ranger policies, including support for row-level filters and column masking.

To learn more, see Integrate Amazon EMR with AWS Lake Formation and Integrate Amazon EMR with Apache Ranger.

Jumpstart your migration to Amazon EMR

Building a modern data platform using the Lake House Architecture enables you to collect data of all types, store it in a central, secure repository, and analyze it with purpose-built tools like Amazon EMR. Migrating your big data and ML to AWS and Amazon EMR offers many advantages over on-premises deployments. These include separation of compute and storage, increased agility, resilient and persistent storage, and managed services that provide up-to-date, familiar environments to develop and operate big data applications. We can help you design, deploy, and architect your analytics application workloads in AWS and help you migrate your big data and applications.

The AWS Well-Architected Framework helps you understand the pros and cons of decisions you make while building systems on AWS. By using the framework, you learn architectural best practices for designing and operating reliable, secure, efficient, and cost-effective systems in the cloud, and ways to consistently measure your architectures against best practices and identify areas for improvement. In May 2020, we announced the Analytics Lens for the AWS Well-Architected Framework, which offers comprehensive guidance to make sure that your analytics applications are designed in accordance with AWS best practices. We believe that having well-architected systems greatly increases the likelihood of business success.

To move to Amazon EMR, you can download the Amazon EMR migration guide to follow step-by-step instructions, get guidance on key design decisions, and learn best practices. You can also request an Amazon EMR Migration Workshop, a virtual workshop to jumpstart your Apache Hadoop/Spark migration to Amazon EMR. You can also learn how AWS partners have helped customers migrate to Amazon EMR in Mactores’s Seagate case study, Cloudwick’s on-premises to AWS Cloud migration to drive cost efficiency, and DNM’s global analytics platform for the cinema industry.


About the Authors

Abhishek Sinha is a Principal Product Manager at Amazon Web Services.

Al MS is a product manager for Amazon EMR at Amazon Web Services.

BJ Haberkorn is principal product marketing manager for analytics at Amazon Web Services. BJ has worked previously on voice technology including Amazon Alexa, real time communications systems, and processor design. He holds BS and MS degrees in electrical engineering from the University of Virginia.

Amazon MSK backup for Archival, Replay, or Analytics

Post Syndicated from Rohit Yadav original https://aws.amazon.com/blogs/architecture/amazon-msk-backup-for-archival-replay-or-analytics/

Amazon MSK is a fully managed service that helps you build and run applications that use Apache Kafka to process streaming data. Apache Kafka is an open-source platform for building real-time streaming data pipelines and applications. With Amazon MSK, you can use native Apache Kafka APIs to populate data lakes. You can also stream changes to and from databases, and power machine learning and analytics applications.

Amazon MSK simplifies the setup, scaling, and management of clusters running Apache Kafka. Amazon MSK manages the provisioning, configuration, and maintenance of resources for highly available Kafka clusters. It is fully compatible with Apache Kafka and supports familiar community-built tools such as MirrorMaker 2.0, Kafka Connect, and Kafka Streams.

Introduction

In the past few years, the volume of data that companies must ingest has increased significantly. Information comes from various sources, like transactional databases, system logs, SaaS platforms, and mobile and IoT devices. Businesses want to act as soon as the data arrives. This has resulted in increased adoption of scalable real-time streaming solutions. These solutions scale horizontally to provide the throughput needed to process data in real time, with milliseconds of latency. Customers have adopted Amazon MSK as a top choice of streaming platform. Amazon MSK gives you the flexibility to retain topic data for a longer term (the default is 7 days), which supports replay, analytics, and machine learning use cases. When IT and business systems are producing and processing terabytes of data per hour, it can become expensive to store, manage, and retrieve data. This has led legacy data archival processes to move toward cheaper, reliable, long-term storage solutions like Amazon Simple Storage Service (Amazon S3).

Following are some of the benefits of archiving Amazon MSK topic data to Amazon S3:

  1. Reduced Cost – You only need to retain data in the cluster based on your Recovery Point Objective (RPO). Any historical data can be archived in Amazon S3 and replayed if necessary.
  2. Integration with Enterprise Data Lake – Because your data is available in Amazon S3, you can integrate with other data analytics services like Amazon EMR, AWS Glue, and Amazon Athena to run data aggregation and analytics. For example, you can build reports to visualize month-over-month changes.
  3. Optimize Machine Learning Workloads – Machine learning applications can train new models and improve predictions using historical streams of data available in Amazon S3. This also enables better integration with AWS machine learning services.
  4. Compliance – Long-term data archival for regulatory and security compliance.
  5. Backloading data to other systems – Ability to rebuild data into other application environments such as pre-prod, testing, and more.

There are many benefits to using Amazon S3 as long-term storage for Amazon MSK topics. Let’s dive deeper into the recommended architecture for this pattern. We will present an architecture to back up Amazon MSK topics to Amazon S3 in real time. In addition, we’ll demonstrate some of the use cases previously mentioned.

Architecture

The diagram following illustrates the architecture for building a real-time archival pipeline to archive Amazon MSK topics to S3. This architecture uses an AWS Lambda function to process records from your Amazon MSK cluster when the cluster is configured as an event source. As a consumer, you don’t need to worry about infrastructure management or scaling with Lambda. You only pay for what you consume, so you don’t pay for over-provisioned infrastructure.

To create an event source mapping, you can add your Amazon MSK cluster in a Lambda function trigger. The Lambda service internally polls for new records or messages from the event source, and then synchronously invokes the target Lambda function. Lambda reads the messages in batches from one or more partitions and provides these to your function as an event payload. The function then processes records, and sends the payload to an Amazon Kinesis Data Firehose delivery stream. We use Kinesis Data Firehose delivery stream because it can natively batch, compress, transform, and encrypt your events before loading to S3.
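
To make the consumer concrete, the following is a minimal Lambda handler sketch that decodes records from the Amazon MSK event payload and forwards them to a Kinesis Data Firehose delivery stream; the delivery stream name is a placeholder.

import base64
import boto3

firehose = boto3.client("firehose")
DELIVERY_STREAM = "msk-archive-stream"  # placeholder delivery stream name


def lambda_handler(event, context):
    """Forward records from an Amazon MSK event source mapping to Kinesis Data Firehose."""
    records = []
    # The MSK event payload groups records under "topic-partition" keys.
    for topic_partition, messages in event["records"].items():
        for message in messages:
            payload = base64.b64decode(message["value"])  # Kafka message values arrive base64 encoded
            records.append({"Data": payload + b"\n"})     # newline-delimit for easier downstream parsing
    # PutRecordBatch accepts up to 500 records per call, so send in chunks.
    for i in range(0, len(records), 500):
        firehose.put_record_batch(
            DeliveryStreamName=DELIVERY_STREAM,
            Records=records[i:i + 500],
        )
    return {"forwarded": len(records)}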

In this architecture, Kinesis Data Firehose delivers the records received from Lambda as Gzip files to Amazon S3. These files are partitioned in Hive-style format by Kinesis Data Firehose:

data/year=yyyy/month=MM/day=dd/hour=HH

Figure 1. Archival Architecture

Let’s review some of the possible solutions that can be built on this archived data.

Integration with Enterprise Data Lake

The architecture diagram following shows how you can integrate the archived data in Amazon S3 with your Enterprise Data Lake. Because the data files are prefixed in Hive-style format, you can catalog them as partitioned tables in the AWS Glue Data Catalog. With partitioning in place, you can perform optimizations like partition pruning, which enables predicate pushdown for improved performance of your analytics queries. You can also use AWS data analytics services like Amazon EMR and AWS Glue for batch analytics. Amazon Athena can be used to run serverless, interactive SQL queries for data exploration and visualization.
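
For example, once the archive is cataloged in AWS Glue, you could query it from Athena with a few lines of boto3; the database, table, partition columns, and result location below are hypothetical.

import boto3

athena = boto3.client("athena")

# Count archived events per day for one month, relying on the year/month/day
# partitions written by Kinesis Data Firehose.
query = """
SELECT day, count(*) AS events
FROM msk_archive.clickstream
WHERE year = '2021' AND month = '01'
GROUP BY day
ORDER BY day
"""

response = athena.start_query_execution(
    QueryString=query,
    QueryExecutionContext={"Database": "msk_archive"},
    ResultConfiguration={"OutputLocation": "s3://my-athena-results/"},
)
print(response["QueryExecutionId"])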

Data currently gets stored in JSON files. Following are some of the services/tools that can be integrated with your archive for reporting, analytics, visualization, and machine learning requirements.

Figure 2. Analytics Architecture

Cloning data into other application environments

There are use cases where you may want to use this archived data to clone other application environments.

These clusters could be used for testing or debugging purposes. You could also decide to use only a subset of your data from the archive. For example, say you want to debug an issue that occurred beyond the configured retention period, but you don't want to replicate all the data to your testing environment. With archived data in S3, you can build downstream jobs to filter the data to be loaded into a new Amazon MSK cluster. The following diagram highlights this pattern:

Figure 3. Replay Architecture
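
A minimal sketch of the replay step might look like the following, assuming the kafka-python client library; the bucket, object key, broker endpoint, topic, and filter condition are all hypothetical.

import gzip
import json

import boto3
from kafka import KafkaProducer  # assumes the kafka-python client library

s3 = boto3.client("s3")
BUCKET = "msk-archive-bucket"  # placeholder archive bucket
KEY = "data/year=2021/month=01/day=15/hour=10/archive-batch.gz"  # placeholder object key

producer = KafkaProducer(
    bootstrap_servers="b-1.test-cluster.example.kafka.us-east-1.amazonaws.com:9094",
    security_protocol="SSL",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# Download one archived object, keep only the events of interest, and replay
# them into a topic on the test cluster.
body = s3.get_object(Bucket=BUCKET, Key=KEY)["Body"].read()
for line in gzip.decompress(body).splitlines():
    event = json.loads(line)
    if event.get("order_status") == "FAILED":  # placeholder filter condition
        producer.send("replay-topic", value=event)

producer.flush()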

Ready for a Test Drive

To help you get started, we would like to introduce an AWS Solution: AWS Streaming Data Solution for Amazon MSK (scroll down and see the Option 3 tab). A single-click AWS CloudFormation template provisions the required resources and gets your real-time archival pipeline for Amazon MSK up and running quickly. This solution shortens your development time by removing or reducing the need for you to:

  • Model and provision resources using AWS CloudFormation
  • Set up Amazon CloudWatch alarms, dashboards, and logging
  • Manually implement streaming data best practices in AWS

This solution is data and logic agnostic, enabling you to start with boilerplate code and start customizing quickly. After deployment, use this solution’s monitoring capabilities to transition easily to production.

Conclusion

In this post, we explained an architecture for building a scalable, highly available, real-time archival pipeline from Amazon MSK topics to long-term storage in Amazon S3. The architecture was built using Amazon MSK, AWS Lambda, Amazon Kinesis Data Firehose, and Amazon S3. It also illustrates how you can integrate your Amazon MSK streaming data in S3 with your Enterprise Data Lake.

Orchestrating analytics jobs on Amazon EMR Notebooks using Amazon MWAA

Post Syndicated from Fei Lang original https://aws.amazon.com/blogs/big-data/orchestrating-analytics-jobs-on-amazon-emr-notebooks-using-amazon-mwaa/

In a previous post, we introduced the Amazon EMR notebook APIs, which allow you to programmatically run a notebook on both Amazon EMR Notebooks and Amazon EMR Studio (preview) without accessing the AWS web console. With the APIs, you can schedule EMR notebook runs with cron scripts, chain multiple EMR notebooks, and use orchestration services such as AWS Step Functions triggered by Amazon CloudWatch Events.

In this post, we show how to use Amazon Managed Workflows for Apache Airflow (Amazon MWAA) to orchestrate analytics jobs on EMR Notebooks. We start by walking you through the process of using AWS CloudFormation to set up an Amazon MWAA environment, which allows you to programmatically author, schedule, and monitor different sorts of workflows on Amazon EMR. We then use this environment to run an EMR notebook example that performs data analysis with Hive.

The data source for the example in this post is from the public Amazon Customer Reviews Dataset. We use the Parquet formatted dataset as the input dataset for our EMR notebook.

Apache Airflow and Amazon MWAA

Apache Airflow is an open-source platform for authoring, scheduling, and monitoring workflows. With Apache Airflow, we can define directed acyclic graphs (DAGs). DAGs describe how to run a workflow and are written in Python. For additional details on Apache Airflow, see Concepts. Many organizations build, manage, and maintain Apache Airflow on AWS using services such as Amazon Elastic Compute Cloud (Amazon EC2) or Amazon Elastic Kubernetes Service (Amazon EKS). Amazon MWAA is a fully managed service that makes it easy to run open-source versions of Apache Airflow on AWS, and to build workflows to run your extract, transform, and load (ETL) jobs and data pipelines.

Prerequisites

Before getting started, you must have the following prerequisites:

  • An AWS account that provides access to AWS services.
  • AWS Command Line Interface (AWS CLI) version 1.18.128 or later installed on your workstation.
  • An Amazon Simple Storage Service (Amazon S3) bucket that meets the following Amazon MWAA requirements:
    • The bucket must be in the same AWS Region where you create the MWAA environment.
    • The bucket name must start with airflow- and should be globally unique.
    • Bucket versioning is enabled.
    • A folder named dags must be created in the same bucket to store DAGs and associated support files.
  • An AWS Identity and Access Management (IAM) user with an access key and secret access key to configure the AWS CLI.
    • The IAM user has permissions to create an IAM role and policies, launch an EMR cluster, create an Amazon MWAA environment, and create stacks in AWS CloudFormation.
  • A possible limit increase for your account. (Usually a limit increase isn’t necessary. See AWS service quotas if you encounter a limit error while building the solution.)
  • An EMR notebook created through the Amazon EMR console, using the notebook file find_best_sellers.ipynb. See Creating a Notebook for instructions on creating an EMR notebook. Record the ID of the EMR notebook (for example, <e-*************************>); you will use this later in this post.

Architecture overview

At a high level, this solution uses Amazon MWAA with Amazon EMR to build pipelines for ETL workflow orchestration. The following diagram illustrates the solution architecture.


We use the following services and configurations in this solution:

  • Amazon S3
  • VPC network configurations
  • VPC endpoints

Amazon S3

Amazon MWAA uses an S3 bucket to store DAGs and associated support files. You must create an S3 bucket before you can create the environment, with requirements as mentioned in the Prerequisites section. To use a bucket with an Amazon MWAA environment, you must create the bucket in the same Region where you create the environment. Refer to Create an Amazon S3 bucket for Amazon MWAA for further details.

VPC network configurations

Amazon MWAA requires a VPC network that meets the following requirements:

  • Includes two private subnets that are in two different Availability Zones within the same Region
  • Includes public subnets that are configured to route the private subnet data to the internet (via NAT gateways)

For more information, see Create the VPC network using an AWS CloudFormation template.

The Airflow UI in the Amazon MWAA environment is accessible over the internet by users granted access in the IAM policy. Amazon MWAA attaches an Application Load Balancer with an HTTPS endpoint for your web server as part of the Amazon MWAA managed service. For more information, see How it works.

VPC endpoints

VPC endpoints are highly available VPC components that enable private connections between your VPC and supported AWS services. Traffic between your VPC and the other services remains in your AWS network. For our example, we use the following VPC endpoints to ensure extra security, availability, and Amazon S3 data transfer performance:

  • An Amazon S3 gateway VPC endpoint to establish a private connection between the Amazon MWAA VPC and Amazon S3
  • An EMR interface VPC endpoint to securely route traffic directly to Amazon EMR from Amazon MWAA, instead of connecting over the internet

Setting up an Amazon MWAA environment

To make it easier to get started, we created a CloudFormation template that automatically configures and deploys the Amazon MWAA environment. The template takes care of the following tasks for you:

  • Create an Amazon MWAA execution IAM role.
  • Set up the VPC network for the Amazon MWAA environment, deploying the following resources:
    • A VPC with a pair of public and private subnets spread across two Availability Zones.
    • An internet gateway, with a default route on the public subnets.
    • A pair of NAT gateways (one in each Availability Zone), and default routes for them in the private subnets.
    • Amazon S3 gateway VPC endpoints and EMR interface VPC endpoints in the private subnets in two Availability Zones.
    • A security group to be used by the Amazon MWAA environment that only allows local inbound traffic and all outbound traffic.
  • Create an Amazon MWAA environment. For this post, we select mw1.small for the environment class and choose maximum worker count as 1. For monitoring, we choose to publish environment performance to CloudWatch Metrics. For Airflow logging configuration, we choose to send only the task logs and use log level INFO.

If you want to manually create, configure, and deploy the Amazon MWAA environment without using AWS CloudFormation, see Get started with Amazon Managed Workflows for Apache Airflow (MWAA).

Launching the CloudFormation template

To launch your stack and provision your resources, complete the following steps:

  1. Choose Launch Stack:

This automatically launches AWS CloudFormation in your AWS account with a template. It prompts you to sign in as needed. You can view the template on the AWS CloudFormation console as required. The Amazon MWAA environment is created in the same Region as you launched the CloudFormation stack. Make sure that you create the stack in your intended Region.

The CloudFormation stack requires a few parameters, as shown in the following screenshot.

The following list describes the parameters (the default value, if any, appears in parentheses).

  • Stack name – A meaningful name for the stack. We use MWAAEmrNBDemo for this example; replace it with your own value.
  • AirflowBucketName – Name of the S3 bucket to store DAGs and support files. The S3 bucket must be in the same Region where you create the environment, and its name must start with airflow-. Enter the S3 bucket created as a prerequisite. We use the S3 bucket airflow-emr-demo-us-west-2 for this post; you must replace it with your own value.
  • EnvironmentName – An Amazon MWAA environment name that is prefixed to resource names. All the resources created by this template are named after the value saved for this field. We name our environment mwaa-emr-blog-demo for this post; replace it with your own value. (Default: mwaa-)
  • PrivateSubnet1CIDR – The IP range (CIDR notation) for the private subnet in the first Availability Zone. (Default: 10.192.20.0/24)
  • PrivateSubnet2CIDR – The IP range (CIDR notation) for the private subnet in the second Availability Zone. (Default: 10.192.21.0/24)
  • PublicSubnet1CIDR – The IP range (CIDR notation) for the public subnet in the first Availability Zone. (Default: 10.192.10.0/24)
  • PublicSubnet2CIDR – The IP range (CIDR notation) for the public subnet in the second Availability Zone. (Default: 10.192.11.0/24)
  • VpcCIDR – The IP range (CIDR notation) for the VPC being created. (Default: 10.192.0.0/16)

The default values for the IP range (CIDR notation) fields refer to the AWS CloudFormation VPC stack specifications. You can make changes based on the requirements of your own network settings.

  2. Enter the parameter values from the preceding table.
  3. Review the details in the Capabilities section and select the check box acknowledging that AWS CloudFormation might create IAM resources with custom names.
  4. Choose Create Stack.

Stack creation takes a few minutes. After the CloudFormation stack is complete, on the Resources tab, you can find the resources being created in this CloudFormation stack. Now, we’re ready to run our example.

Orchestrating Hive analytics jobs on EMR Notebooks using Apache Airflow

The following diagram illustrates the workflow. As a user, you first create the DAG file that describes how to run the analytics jobs and upload it to the dags folder in the S3 bucket you specified. The DAG can be triggered in the Apache Airflow UI to orchestrate the job workflow, which includes creating an EMR cluster, waiting for the cluster to be ready, running Hive analytics jobs on EMR Notebooks, uploading the results to Amazon S3, and cleaning up the cluster after the job is complete.


Input notebook file

Let’s take a look at the following input notebook file find_best_sellers.ipynb, which we use for our example.


find_best_sellers.ipynb is a Python script that does analysis on the public Amazon Customer Reviews Dataset. It generates the top 20 best sellers in a given list of categories over a given period of time and saves the results to the given S3 output location. For demonstration purposes only, we rank sellers simply by the sum of review star ratings from verified purchases.

The explanations of the default parameters in the first cell and each code block are included in the notebook itself.

In the last line of the first cell, we set OUTPUT_LOCATION = "s3://airflow-emr-demo-us-west-2/query_output/" as the default value for the input parameter. Replace it with your own value for the output location. You can also supply a different value for this parameter in the Airflow Variables later.

DAG file

The DAG file test_dag.py is used to orchestrate our job flow via Apache Airflow. It performs the following tasks:

  1. Create an EMR cluster with one m5.xlarge primary and two m5.xlarge core nodes on release version 6.2.0 with Spark, Hive, Livy and JupyterEnterpriseGateway installed as applications.
  2. Wait until the cluster is up and ready.
  3. Run the notebook find_best_sellers.ipynb on the EMR cluster created in Step 1.
  4. Wait until the notebook run is complete.
  5. Clean up the EMR cluster.

Here is the full source code of the DAG:

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from time import sleep
from datetime import datetime
import boto3, time
from builtins import range
from pprint import pprint
from airflow.operators.sensors import BaseSensorOperator
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
from airflow.contrib.sensors.emr_job_flow_sensor import EmrJobFlowSensor
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.contrib.hooks.emr_hook import EmrHook
from airflow.contrib.sensors.emr_base_sensor import EmrBaseSensor
from airflow.models import Variable
from airflow.utils import apply_defaults
from airflow.utils.dates import days_ago

# Available categories:
#
# Apparel,Automotive,Baby,Beauty,Books,Camera,Digital_Ebook_Purchase,Digital_Music_Purchase,
# Digital_Software,Digital_Video_Download,Digital_Video_Games,Electronics,Furniture,Gift_Card,
# Grocery,Health_&_Personal_Care,Home,Home_Entertainment,Home_Improvement,Jewelry,Kitchen,
# Lawn_and_Garden,Luggage,Major_Appliances,Mobile_Apps,Mobile_Electronics,Music,Musical_Instruments,
# Office_Products,Outdoors,PC,Personal_Care_Appliances,Pet_Products,Shoes,Software,Sports,Tools,
# Toys,Video,Video_DVD,Video_Games,Watches,Wireless

# =============== VARIABLES ===============
NOTEBOOK_ID = Variable.get('NOTEBOOK_ID')
NOTEBOOK_FILE_NAME = Variable.get('NOTEBOOK_FILE_NAME')
CATEGORIES_CSV = Variable.get('CATEGORIES_CSV')
REGION = Variable.get('REGION')
SUBNET_ID = Variable.get('SUBNET_ID')
EMR_LOG_URI = Variable.get('EMR_LOG_URI')
OUTPUT_LOCATION = Variable.get('OUTPUT_LOCATION')
FROM_DATE = Variable.get('FROM_DATE')
TO_DATE = Variable.get('TO_DATE')
# =========================================

JOB_FLOW_OVERRIDES = {
    'Name': 'Test-Cluster',
    'ReleaseLabel': 'emr-6.2.0',
    'Applications': [{'Name':'Spark'}, {'Name':'Hive'}, {'Name':'Livy'}, {'Name':'JupyterEnterpriseGateway'}],
    'Configurations': [
          {
            "Classification": "hive-site",
            "Properties": {
                "hive.execution.engine": "spark"
            }
        }
    ],
    'Instances': {
        'Ec2SubnetId': SUBNET_ID,
        'InstanceGroups': [
            {
                'Name': 'Master node',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'MASTER',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 1,
            },
            {
                'Name': 'Core node',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'CORE',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 2,
            }
        ],
        'KeepJobFlowAliveWhenNoSteps': True,
        'TerminationProtected': False,
    },
    'JobFlowRole': 'EMR_EC2_DefaultRole',
    'ServiceRole': 'EMR_DefaultRole',
    'LogUri': EMR_LOG_URI
}


class CustomEmrJobFlowSensor(EmrJobFlowSensor):
    NON_TERMINAL_STATES = ['STARTING', 'BOOTSTRAPPING', 'TERMINATING']

class NotebookExecutionSensor(EmrBaseSensor):
    NON_TERMINAL_STATES = ['START_PENDING', 'STARTING', 'RUNNING', 'FINISHING', 'STOP_PENDING', 'STOPPING']
    FAILED_STATE = ['FAILING', 'FAILED']
    template_fields = ['notebook_execution_id']
    template_ext = ()
    @apply_defaults
    def __init__(self, notebook_execution_id, *args, **kwargs):
        super(NotebookExecutionSensor, self).__init__(*args, **kwargs)
        self.notebook_execution_id = notebook_execution_id
    def get_emr_response(self):
        emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()
        self.log.info('Poking notebook execution %s', self.notebook_execution_id)
        return emr.describe_notebook_execution(NotebookExecutionId=self.notebook_execution_id)
    @staticmethod
    def state_from_response(response):
        return response['NotebookExecution']['Status']
    @staticmethod
    def failure_message_from_response(response):
        state_change_reason = response['NotebookExecution']['LastStateChangeReason']
        if state_change_reason:
            return 'Execution failed with reason: ' + state_change_reason
        return None

def start_execution(**context):
    ti = context['task_instance']
    cluster_id = ti.xcom_pull(key='return_value', task_ids='create_cluster_task')
    print("Starting an execution using cluster: " + cluster_id)
    # generate a JSON key-pair of <String : String Array>, e.g. 
    # "\"CATEGORIES\": [\"Apparel\", \"Automotive\", \"Baby\", \"Books\"]"
    categories_escaped_quotes = ""
    for category in CATEGORIES_CSV.split(','):
        categories_escaped_quotes = categories_escaped_quotes + "\"" + category + "\","
    categories_escaped_quotes = categories_escaped_quotes[:-1]
    categories_parameter = "\"CATEGORIES\" : [" + categories_escaped_quotes + "]"

    output_location_parameter = "\"OUTPUT_LOCATION\": \"" + OUTPUT_LOCATION + "\""
    from_date_parameter = "\"FROM_DATE\": \"" + FROM_DATE + "\""
    to_date_parameter = "\"TO_DATE\": \"" + TO_DATE + "\""
    parameters = f"{{ {categories_parameter}, {output_location_parameter}, {from_date_parameter}, {to_date_parameter} }}"
    emr = boto3.client('emr', region_name=REGION)
    start_resp = emr.start_notebook_execution(
        EditorId=NOTEBOOK_ID,
        RelativePath=NOTEBOOK_FILE_NAME,
        ExecutionEngine={'Id': cluster_id, 'Type': 'EMR'},
        NotebookParams=parameters,
        ServiceRole='EMR_Notebooks_DefaultRole'
    )
    execution_id = start_resp['NotebookExecutionId']
    print("Started an execution: " + execution_id)
    return execution_id



with DAG('test_dag', description='test dag', schedule_interval='0 * * * *', start_date=datetime(2020,3,30), catchup=False) as dag:
    create_cluster = EmrCreateJobFlowOperator(
        task_id='create_cluster_task',
        job_flow_overrides=JOB_FLOW_OVERRIDES,
        aws_conn_id='aws_default',
        emr_conn_id='emr_default',
    )
    cluster_sensor = CustomEmrJobFlowSensor(
        task_id='check_cluster_task',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster_task', key='return_value') }}",
        aws_conn_id='aws_default',
    )
    start_execution = PythonOperator(
        task_id='start_execution_task', 
        python_callable=start_execution,
        provide_context=True
    )
    execution_sensor = NotebookExecutionSensor(
        task_id='check_execution_task',
        notebook_execution_id="{{ task_instance.xcom_pull(task_ids='start_execution_task', key='return_value') }}",
        aws_conn_id='aws_default',
    )

    cluster_remover = EmrTerminateJobFlowOperator(
        task_id='terminate_cluster',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster_task', key='return_value') }}",
        aws_conn_id='aws_default',
    )
    
    create_cluster >> cluster_sensor >> start_execution >> execution_sensor >> cluster_remover

The very last line of the DAG code explains how the tasks are linked in the orchestration workflow. It’s overloading the right shift >> operator to create a dependency, meaning that the task on the left should be run first, and the output passed to the task on the right.

Instead of hard-coding the variables in the DAG code, we choose to supply these variables by importing a JSON file in the Airflow UI before actually running the DAG. This way, we can also update the variables without having to update the DAG code, which would require updating the DAG file in Amazon S3. We walk you through how to do so in later steps. The VARIABLES section of the DAG is repeated here:

# =============== VARIABLES ===============
NOTEBOOK_ID = Variable.get('NOTEBOOK_ID')
NOTEBOOK_FILE_NAME = Variable.get('NOTEBOOK_FILE_NAME')
CATEGORIES_CSV = Variable.get('CATEGORIES_CSV')
REGION = Variable.get('REGION')
SUBNET_ID = Variable.get('SUBNET_ID')
EMR_LOG_URI = Variable.get('EMR_LOG_URI')
OUTPUT_LOCATION = Variable.get('OUTPUT_LOCATION')
FROM_DATE = Variable.get('FROM_DATE')
TO_DATE = Variable.get('TO_DATE')

We create a JSON formatted file named variables.json for our example. See the following code:

{
    "REGION": "us-west-2",
    "SUBNET_ID": "<subnet-********>",
    "EMR_LOG_URI": "s3://<S3 path for EMR logs>/",
    "NOTEBOOK_ID": "<e-*************************>",
    "NOTEBOOK_FILE_NAME": "find_best_sellers.ipynb",
    "CATEGORIES_CSV": "Apparel,Automotive,Baby,Beauty,Books",
    "FROM_DATE": "2015-08-25",
    "TO_DATE": "2015-08-31",
    "OUTPUT_LOCATION": "s3://<S3 path for query output>/"
}

To use this JSON code, you need to replace all the variable values (subnet and S3 paths) with the actual values.

Accessing Apache Airflow UI and running the workflow

To run the workflow, complete the following steps:

  1. On the Amazon MWAA console, find the new environment mwaa-emr-blog-demo we created earlier with the CloudFormation template.


  2. Choose Open Airflow UI.
  3. Log in as an authenticated user.


Next, we import the JSON file for the variables into Airflow UI.

As we mentioned earlier, we want to supply the variable values for our DAG definition later upon triggering the DAG in Airflow UI instead of hard-coding the values.

  4. On the Admin menu, choose Variables.
  5. Choose Browse.
  6. Choose variables.json.
  7. Choose Import Variables.

For more information about importing variables, see Variables.

  8. Run the following command in the same directory as the file test_dag.py to upload the DAG file to the dags folder under the S3 bucket specified for the Airflow environment. Replace <your_airflow_bucket_name> with the S3 bucket name that you created as a prerequisite:
    aws s3 cp test_dag.py s3://<your_airflow_bucket_name>/dags/

test_dag.py should automatically appear in the Airflow UI.

  9. Trigger the DAG by turning it to On.


  10. Choose test_dag to go to the detail page for the DAG.

On the Graph View tab, we can see the whole workflow of our pipeline and each individual task as defined in our DAG code.


  11. Optionally, to trigger the DAG manually, choose Trigger DAG and add the following JSON formatted configuration before activating the DAG.


You now get an email when a failure happens on any of the tasks. You can also configure email notifications for retries.

  12. On the Amazon EMR console, find the EMR cluster created by the create_cluster_task definition.


  13. On the Airflow UI, you can switch tabs to check the status of the workflow tasks.

After a few minutes, we can see on the Tree View tab that the workflow is complete and all the tasks are successful.


On the Gantt tab, we can see the time distribution of all the tasks of our workflow.


As specified in our DAG definition, the EMR cluster is stopped when the workflow is complete.

Because we use the cron expression 0 * * * * as the schedule interval for our workflow, it runs every hour as long as the DAG is switched On. Switch it to Off if you don't want it to run again.

  14. On the Amazon S3 console, view the result of our notebook job in the S3 folder.


For example, the following screenshot is the output for the Books category that we provided as a value in the CATEGORIES parameter. As we can see, Go Set a Watchman: A Novel is the best Books seller from the week of 8-25-2015 to 8-31-2015.


Cleaning up

To avoid ongoing charges, delete the CloudFormation stack and any files in Amazon S3 that were created by running the examples in this post.

Conclusion

This post showed how to use the Amazon EMR Notebooks API with orchestration services such as Amazon MWAA to build ETL pipelines. It demonstrated how to set up a secured Amazon MWAA environment using a CloudFormation template and run a sample workflow with Apache Airflow.

If you want to learn how to run Amazon EMR applications such as PySpark with Amazon MWAA, see Running Spark Jobs on Amazon EMR with Apache Airflow.


About the Authors

Fei Lang is a senior big data architect at Amazon Web Services. She is passionate about building the right big data solution for customers. In her spare time, she enjoys the scenery of the Pacific Northwest, going for a swim, and spending time with her family.

Ray Liu is a software development engineer at AWS. Besides work, he enjoys traveling and spending time with family.

Run Apache Spark 3.0 workloads 1.7 times faster with Amazon EMR runtime for Apache Spark

Post Syndicated from Al MS original https://aws.amazon.com/blogs/big-data/run-apache-spark-3-0-workloads-1-7-times-faster-with-amazon-emr-runtime-for-apache-spark/

With Amazon EMR release 6.1.0, Amazon EMR runtime for Apache Spark is now available for Spark 3.0.0. EMR runtime for Apache Spark is a performance-optimized runtime for Apache Spark that is 100% API compatible with open-source Apache Spark.

In our benchmark performance tests using TPC-DS benchmark queries at 3 TB scale, we found EMR runtime for Apache Spark 3.0 provides a 1.7 times performance improvement on average, and up to 8 times improved performance for individual queries over open-source Apache Spark 3.0.0. With Amazon EMR 6.1.0, you can now run your Apache Spark 3.0 applications faster and cheaper without requiring any changes to your applications.

Results observed using TPC-DS benchmarks

To evaluate the performance improvements, we used TPC-DS benchmark queries with 3 TB scale and ran them on a 6-node c4.8xlarge EMR cluster with data in Amazon Simple Storage Service (Amazon S3). We ran the tests with and without the EMR runtime for Apache Spark. The following two graphs compare the total aggregate runtime and geometric mean for all queries in the TPC-DS 3 TB query dataset between the two configurations.

The following table shows the total runtime in seconds.


The following table shows the geometric mean of the runtime in seconds.


In our tests, all queries ran successfully on EMR clusters that used the EMR runtime for Apache Spark. However, when using Spark 3.0 without the EMR runtime, 34 out of the 104 benchmark queries failed due to SPARK-32663. To work around these issues, we disabled the spark.shuffle.readHostLocalDisk configuration. However, even after this change, queries 14a and 14b continued to fail, so we chose to exclude them from our benchmark comparison.
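
For reference, one way to apply this workaround at cluster launch is through a spark-defaults configuration classification. The following boto3 sketch is illustrative; the cluster name, instance settings, and roles are placeholders.

import boto3

emr = boto3.client("emr")

# Launch an EMR 6.1 cluster with spark.shuffle.readHostLocalDisk disabled.
emr.run_job_flow(
    Name="spark-3-benchmark",
    ReleaseLabel="emr-6.1.0",
    Applications=[{"Name": "Spark"}],
    Configurations=[
        {
            "Classification": "spark-defaults",
            "Properties": {"spark.shuffle.readHostLocalDisk": "false"},
        }
    ],
    Instances={
        "InstanceGroups": [
            {"Name": "Primary", "InstanceRole": "MASTER",
             "InstanceType": "c4.8xlarge", "InstanceCount": 1},
            {"Name": "Core", "InstanceRole": "CORE",
             "InstanceType": "c4.8xlarge", "InstanceCount": 5},
        ],
        "KeepJobFlowAliveWhenNoSteps": True,
    },
    JobFlowRole="EMR_EC2_DefaultRole",
    ServiceRole="EMR_DefaultRole",
)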

The per-query speedup on Amazon EMR 6.1 with and without EMR runtime is illustrated in the following chart. The horizontal axis shows each query in the TPC-DS 3 TB benchmark. The vertical axis shows the speedup of each query due to the EMR runtime. We found a 1.7 times performance improvement as measured by the geometric mean of the per-query speedups, with all queries showing a performance improvement with the EMR Runtime.


Conclusion

You can run your Apache Spark 3.0 workloads faster and cheaper without making any changes to your applications by using Amazon EMR 6.1. To keep up to date, subscribe to the Big Data blog’s RSS feed to learn about more great Apache Spark optimizations, configuration best practices, and tuning advice.


About the Authors

Al MS is a product manager for Amazon EMR at Amazon Web Services.

Peter Gvozdjak is a senior engineering manager for EMR at Amazon Web Services.

Building complex workflows with Amazon MWAA, AWS Step Functions, AWS Glue, and Amazon EMR

Post Syndicated from Dipankar Ghosal original https://aws.amazon.com/blogs/big-data/building-complex-workflows-with-amazon-mwaa-aws-step-functions-aws-glue-and-amazon-emr/

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a fully managed service that makes it easy to run open-source versions of Apache Airflow on AWS and build workflows to run your extract, transform, and load (ETL) jobs and data pipelines.

You can use AWS Step Functions as a serverless function orchestrator to build scalable big data pipelines using services such as Amazon EMR to run Apache Spark and other open-source applications on AWS in a cost-effective manner. You can also use AWS Glue, a serverless data preparation environment, to prepare (extract and transform) and load large amounts of data from a variety of sources for analytics and data processing with Apache Spark ETL jobs.

For production pipelines, a common use case is to read data originating from a variety of sources. This data requires transformation to extract business value and generate insights before sending to downstream applications, such as machine learning algorithms, analytics dashboards, and business reports.

This post demonstrates how to use Amazon MWAA as a primary workflow management service to create and run complex workflows and extend the directed acyclic graph (DAG) to start and monitor a state machine created using Step Functions. In Airflow, a DAG is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies.

Architectural overview

The following diagram illustrates the architectural overview of the components involved in the orchestration of the workflow. This workflow uses Amazon EMR to preprocess data and starts a Step Functions state machine. The state machine transforms data using AWS Glue.


The workflow includes the following core components:

  1. Airflow Scheduler triggers the DAG based on a schedule or manually.
  2. DAG uses PythonOperator to create an EMR cluster and waits for the cluster creation process to complete.
  3. DAG uses a custom operator EmrSubmitAndMonitorStepOperator to submit and monitor the Amazon EMR step.
  4. DAG uses PythonOperator to stop the EMR cluster when the preprocessing tasks are complete.
  5. DAG starts a Step Functions state machine and monitors it for completion using PythonOperator.

You can build complex ETL pipelines with Step Functions separately and trigger them from an Airflow DAG.
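As a rough illustration of how these components fit together, the following sketch shows the shape of such a DAG using PythonOperator tasks. The DAG ID and task callables are hypothetical placeholders, not the code from the repository, which also uses the custom EmrSubmitAndMonitorStepOperator for the Amazon EMR steps.

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def create_emr_cluster(**kwargs):
    # Placeholder: create the EMR cluster with Boto3.
    pass

def wait_for_cluster(**kwargs):
    # Placeholder: poll the cluster until it reaches the Waiting state.
    pass

def start_state_machine(**kwargs):
    # Placeholder: start and monitor the Step Functions state machine.
    pass

with DAG(dag_id="mwaa_movielens_demo_sketch",
         start_date=datetime(2021, 1, 1),
         schedule_interval=None) as dag:

    create_cluster = PythonOperator(task_id="create_emr_cluster",
                                    python_callable=create_emr_cluster)
    cluster_ready = PythonOperator(task_id="wait_for_cluster",
                                   python_callable=wait_for_cluster)
    run_state_machine = PythonOperator(task_id="start_step_functions",
                                       python_callable=start_state_machine)

    # Preprocess on Amazon EMR first, then hand off to Step Functions.
    create_cluster >> cluster_ready >> run_state_machine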

Prerequisites

Before starting, create an Amazon MWAA environment. If this is your first time using Amazon MWAA, see Introducing Amazon Managed Workflows for Apache Airflow (MWAA).

Take a note of the Amazon Simple Storage Service (Amazon S3) bucket that stores the DAGs. It’s located on the environment details page on the Amazon MWAA console.


Also note the AWS Identity and Access Management (IAM) execution role. This role should be modified to allow MWAA to read and write from your S3 bucket, submit an Amazon EMR step, start a Step Functions state machine, and read from the AWS Systems Manager Parameter Store. The IAM role is available in the Permissions section of the environment details.


The solution references Systems Manager parameters in an AWS CloudFormation template and scripts. For information on adding and removing IAM identity permissions, see Adding and removing IAM identity permissions. A sample IAM policy is also provided in the GitHub repository amazon-mwaa-complex-workflow-using-step-functions.

For this post, we use the MovieLens dataset. We concurrently convert the MovieLens CSV files to Parquet format and save them to Amazon S3 as part of preprocessing.

Setting up the state machine using Step Functions

Our solution extends the ETL pipeline to run a Step Functions state machine from the Airflow DAG. Step Functions lets you build visual workflows that enable fast translation of business requirements into technical requirements. With Step Functions, you can set up dependency management and failure handling using a JSON-based template. A workflow is a series of steps, such as tasks, choices, parallel runs, and timeouts with the output of one step acting as input into the next. For more information about other use cases, see AWS Step Functions Use Cases.

The following diagram shows the ETL process set up through a Step Functions state machine.


In the workflow, the Process Data step runs an AWS Glue job, and the Get Job Status step periodically checks for the job completion. The AWS Glue job reads the input datasets and creates output data for the most popular movies and top-rated movies. After the job is complete, the Run Glue Crawler step runs an AWS Glue crawler to catalog the data. The workflow also allows you to monitor and respond to failures at any stage.

Creating resources

Create your resources by following the installation instructions provided in the amazon-mwaa-complex-workflow-using-step-functions README.md.

Running the ETL workflow

To run your ETL workflow, complete the following steps:

  1. On the Amazon MWAA console, choose Open Airflow UI.
  2. Locate the mwaa_movielens_demo DAG.
  3. Turn on the DAG.
  4. Select the mwaa_movielens_demo DAG and choose Graph View.

This displays the overall ETL pipeline managed by Airflow.

  5. To view the DAG code, choose Code.

The code for the custom operator can be found in the amazon-mwaa-complex-workflow-using-step-functions GitHub repo.

  6. From the Airflow UI, select the mwaa_movielens_demo DAG and choose Trigger DAG.
  7. Leave the Optional Configuration JSON box blank.

When the Airflow DAG runs, the first task calls the PythonOperator to create an EMR cluster using Boto3, the AWS SDK for Python. Boto3 enables Python developers to create, configure, and manage AWS services, such as Amazon Elastic Compute Cloud (Amazon EC2) and Amazon S3, and provides an object-oriented API as well as low-level access to AWS services.
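
For example, the cluster creation call might look like the following sketch. The instance types, counts, cluster name, and role names are illustrative assumptions; the actual DAG reads its configuration from Systems Manager Parameter Store.

import boto3

emr = boto3.client("emr", region_name="us-east-1")

# Illustrative cluster settings only; replace them with the values that
# apply to your environment.
response = emr.run_job_flow(
    Name="mwaa-demo-cluster",
    ReleaseLabel="emr-5.31.0",
    Applications=[{"Name": "Spark"}],
    Instances={
        "MasterInstanceType": "m5.xlarge",
        "SlaveInstanceType": "m5.xlarge",
        "InstanceCount": 3,
        "KeepJobFlowAliveWhenNoSteps": True,
    },
    JobFlowRole="EMR_EC2_DefaultRole",
    ServiceRole="EMR_DefaultRole",
    VisibleToAllUsers=True,
)
print(response["JobFlowId"])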

The second task waits until the EMR cluster is ready and in the Waiting state. As soon as the cluster is ready, the data load task runs, followed by the data preprocessing tasks, which are started in parallel using EmrSubmitAndMonitorStepOperator. Concurrency in the current Airflow DAG is set to 3, which runs three tasks in parallel. You can change the concurrency of Amazon EMR to run multiple Amazon EMR steps in parallel.

When the data preprocessing tasks are complete, the EMR cluster is stopped and the DAG starts the Step Functions state machine to initiate data transformation.

The final task in the DAG monitors the completion of the Step Functions state machine.
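
Conceptually, the start-and-monitor logic looks like the following sketch; the state machine ARN and input payload are placeholders, and the real DAG wraps this pattern inside a PythonOperator callable.

import json
import time

import boto3

sfn = boto3.client("stepfunctions")

# Placeholder ARN and input; the DAG resolves the real values at runtime.
execution = sfn.start_execution(
    stateMachineArn="arn:aws:states:us-east-1:123456789012:stateMachine:mwaa-demo",
    input=json.dumps({"glue_job_name": "mwaa-movielens-transform"}),
)

# Poll until the state machine leaves the RUNNING status.
while True:
    status = sfn.describe_execution(executionArn=execution["executionArn"])["status"]
    if status != "RUNNING":
        break
    time.sleep(30)

print(status)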

The DAG run should complete in approximately 10 minutes.

Verifying the DAG run

While the DAG is running, you can view the task logs.

  1. From Graph View, select any task and choose View Log.
  2. When the DAG starts the Step Functions state machine, verify the status on the Step Functions console.
  3. You can also monitor ETL process completion from the Airflow UI.
  4. On the Airflow UI, verify the completion from the log entries.

Querying the data

After the successful completion of the Airflow DAG, two tables are created in the AWS Glue Data Catalog. To query the data with Amazon Athena, complete the following steps:

  1. On the Athena console, choose Databases.
  2. Select the mwaa-movielens-demo-db database.

You should see the two tables. If the tables aren’t listed, verify that the AWS Glue crawler run is complete and that the console is showing the correct Region.

  3. Run the following query:
    SELECT * FROM "mwaa-movielens-demo-db"."most_popular_movies" limit 10;

The following screenshot shows the output.

Cleaning up

To clean up the resources created as part of our CloudFormation template, delete the mwaa-demo-foundations stack. You can either use the AWS CloudFormation console or the AWS Command Line Interface (AWS CLI).
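
If you prefer to script the cleanup, the following sketch deletes the stack with Boto3 and waits for the deletion to finish; it assumes your credentials and Region are already configured.

import boto3

cfn = boto3.client("cloudformation")

# Delete the demo stack and block until the deletion is complete.
cfn.delete_stack(StackName="mwaa-demo-foundations")
cfn.get_waiter("stack_delete_complete").wait(StackName="mwaa-demo-foundations")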

Conclusion

In this post, we used Amazon MWAA to orchestrate an ETL pipeline on Amazon EMR and AWS Glue with Step Functions. We created an Airflow DAG to demonstrate how to run data processing jobs concurrently and extended the DAG to start a Step Functions state machine to build a complex ETL pipeline. A custom Airflow operator submitted and then monitored the Amazon EMR steps synchronously.

If you have comments or feedback, please leave them in the comments section.


About the Author

Dipankar Ghosal is a Sr Data Architect at Amazon Web Services and is based out of Minneapolis, MN. He focuses on analytics and enjoys helping customers solve their unique use cases. When he's not working, he loves going hiking with his wife and daughter.

Introducing Amazon EMR integration with Apache Ranger

Post Syndicated from Varun Rao Bhamidimarri original https://aws.amazon.com/blogs/big-data/introducing-amazon-emr-integration-with-apache-ranger/

Data security is an important pillar in data governance. It includes authentication, authorization, encryption, and audit.

Amazon EMR enables you to set up and run clusters of Amazon Elastic Compute Cloud (Amazon EC2) instances with open-source big data applications like Apache Spark, Apache Hive, Apache Flink, and Presto. You may also want to set up multi-tenant EMR clusters where different users (or teams) can use a shared EMR cluster to run big data analytics workloads. In a multi-tenant cluster, it becomes important to set up mechanisms for authentication (determine who is invoking the application and authenticate the user), authorization (set up who has access to what data), and audit (maintain a log of who accessed what data).

Apache Ranger is an open-source project that provides authorization and audit capabilities for Hadoop and related big data applications like Apache Hive, Apache HBase, and Apache Kafka.

We’re happy to share that starting with Amazon EMR 5.32, we’re including plugins to integrate with Apache Ranger 2.0 that enable authorization and audit capabilities for Apache SparkSQL, Amazon Simple Storage Service (Amazon S3), and Apache Hive.

You can set up a multi-tenant EMR cluster, use Kerberos for user authentication, use Apache Ranger 2.0 (managed separately outside the EMR cluster) for authorization, and configure fine-grained data access policies for databases, tables, columns, and S3 objects. In this post, we explain how you can set up Amazon EMR to use Apache Ranger for data access controls for Apache Spark and Apache Hive workloads on Amazon EMR. We show how you can set up multiple short-running and long-running EMR clusters with a single, centralized Apache Ranger server that maintains data access control policies.

Managed Apache Ranger plugins for PrestoSQL and PrestoDB will soon follow.

You should consider this solution if one or all of these apply:

  • Have experience setting up and managing an Apache Ranger admin server (it needs to be self-managed)
  • Want to port existing Apache Ranger Hive policies over to Amazon EMR
  • Need to use the database-backed Hive Metastore and can't use the AWS Glue Data Catalog due to limitations
  • Require authorization support for Apache Spark (SQL and storage and file access) and Amazon S3
  • Store Apache Ranger authorization audits in Amazon CloudWatch, avoiding the need to maintain an Apache Solr infrastructure

With this native integration, you use the Amazon EMR security configuration to specify Apache Ranger details, without the need for custom bootstrap scripts. You can reuse existing Apache Hive Ranger policies, including support for row-level filters and column masking.


The following image shows table and column-level access set up for Apache SparkSQL.

Additionally, SSH users are blocked from getting AWS Identity and Access Management (IAM) permissions tied to the Amazon EMR instance profiles. This disables access to Amazon S3 using tools like the AWS Command Line Interface (AWS CLI).

The following screenshot shows access to Amazon S3 blocked when using the AWS CLI.

The following screenshots show how access to the same Amazon S3 location is set up and used through EMRFS (the default Amazon EMR file system implementation for reading and writing files from Amazon S3).

Prerequisites

Before getting started, you must have the following prerequisites:

  • Self-managed Apache Ranger server (2.x only) outside of an EMR cluster
  • TLS mutual authentication enabled between Apache Ranger server and Apache Ranger plugins running on the EMR cluster
  • Additional IAM roles:
    • IAM role for Apache Ranger – Defines privileges that trusted processes have when submitting Spark and Hive jobs
    • IAM role for other AWS services – Defines privileges that end-users have when accessing services that aren't protected by Apache Ranger plugins
  • Updates to the Amazon EC2 EMR role
  • New Apache Ranger service definitions installed for Apache Spark and Amazon S3
  • Apache Ranger server certificate and private key for plugins uploaded into Secrets Manager
  • A CloudWatch log group for Apache Ranger audits

Architecture overview

The following diagram illustrates the architecture for this solution.


In the architecture, the Amazon EMR secret agent intercepts user requests and vends credentials based on user and resources. The Amazon EMR record server receives requests to access data from Spark, reads data from Amazon S3, and returns filtered data based on Apache Ranger policies.

See Amazon EMR Components to learn more about Amazon EMR Secret Agent and Record Server.

Setting up your resources

In this section, we walk you through setting up your resources manually.

If you want to use CloudFormation scripts to automate the setup, see the section Setting up your architecture with CloudFormation later in this post.

Uploading SSL private keys and certificates to Secrets Manager

Upload the private keys for the Apache Ranger plugins and the SSL certificate of the Apache Ranger server to AWS Secrets Manager. When the EMR cluster starts up, it uses these files to configure the plugins. For reference, see the script create-tls-certs.sh.
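
If you want to script this step yourself instead of using create-tls-certs.sh, the following sketch shows the idea with Boto3; the secret names and local file paths are hypothetical placeholders.

import boto3

sm = boto3.client("secretsmanager")

# Hypothetical file names and secret names; use the values that match your
# Apache Ranger server and plugin certificates.
with open("ranger-admin-server.crt") as cert_file:
    sm.create_secret(Name="emr/ranger-admin-server-cert",
                     SecretString=cert_file.read())

with open("ranger-plugin-private-key.pem") as key_file:
    sm.create_secret(Name="emr/ranger-plugin-private-key",
                     SecretString=key_file.read())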

Setting up an Apache Ranger server

You need to set up a two-way SSL-enabled Apache Ranger server. To set up the server manually, refer to the script install-ranger-admin-server.sh.

Installing Apache Ranger service definitions

In this section, we review installing the Apache Ranger service definitions for Apache Spark and Amazon S3.

Apache Spark

To add a new Apache Ranger service definition, see the following script:

mkdir /tmp/emr-spark-plugin/
cd /tmp/emr-spark-plugin/

# Download the Service definition
wget https://s3.amazonaws.com/elasticmapreduce/ranger/service-definitions/version-2.0/ranger-servicedef-amazon-emr-spark.json

# Download Service implementation jar/class
wget https://s3.amazonaws.com/elasticmapreduce/ranger/service-definitions/version-2.0/ranger-spark-plugin-2.x.jar

# Copy Service implementation jar to Ranger server
export RANGER_HOME=.. # Replace with the Ranger Admin home directory, e.g., /usr/lib/ranger/ranger-2.0.0-admin
mkdir $RANGER_HOME/ews/webapp/WEB-INF/classes/ranger-plugins/amazon-emr-spark
mv ranger-spark-plugin-2.x.jar $RANGER_HOME/ews/webapp/WEB-INF/classes/ranger-plugins/amazon-emr-spark

# Add the service definition using the Ranger REST API
curl -u <admin_user_login>:<password_for_ranger_admin_user> -X POST -d @ranger-servicedef-amazon-emr-spark.json \
-H "Accept: application/json" \
-H "Content-Type: application/json" \
-k 'https://*<RANGER SERVER ADDRESS>*:6182/service/public/v2/api/servicedef'

This script is included in the Apache Ranger server setup script, if you’re deploying resources with the CloudFormation template.

The policy definition is similar to Apache Hive, except that the actions are limited to select only. The following screenshot shows the definition settings.


To change permissions for the user, choose select.

Amazon S3 (via Amazon EMR File System)

Similar to Apache Spark, we have a new Apache Ranger service definition for Amazon S3. See the following script:

mkdir /tmp/emr-emrfs-s3-plugin/
cd /tmp/emr-emrfs-s3-plugin/

# Download the Service definition
wget https://s3.amazonaws.com/elasticmapreduce/ranger/service-definitions/version-2.0/ranger-servicedef-amazon-emr-emrfs.json

# Download Service implementation jar/class
wget https://s3.amazonaws.com/elasticmapreduce/ranger/service-definitions/version-2.0/ranger-emr-emrfs-plugin-2.x.jar

# Copy Service implementation jar to Ranger server
export RANGER_HOME=.. # Replace with the Ranger Admin home directory, e.g., /usr/lib/ranger/ranger-2.0.0-admin
mkdir $RANGER_HOME/ews/webapp/WEB-INF/classes/ranger-plugins/amazon-emr-emrfs
mv ranger-emr-emrfs-plugin-2.x.jar $RANGER_HOME/ews/webapp/WEB-INF/classes/ranger-plugins/amazon-emr-emrfs

# Add the service definition using the Ranger REST API
curl -u <admin_user_login>:<password_for_ranger_admin_user> -X POST -d @ranger-servicedef-amazon-emr-emrfs.json \
-H "Accept: application/json" \
-H "Content-Type: application/json" \
-k 'https://*<RANGER SERVER ADDRESS>*:6182/service/public/v2/api/servicedef'

If you’re using the CloudFormation template, this script is included in the Apache Ranger server setup script.

The following screenshot shows the policy details.


You can enable standard Amazon S3 access permissions in this policy.


Importing your existing Apache Hive policies

You can import your existing Apache Hive policies into the Apache Ranger server tied to the EMR cluster. For more information, see User Guide for Import-Export.

The following image shows how to use Apache Ranger’s export and import option.


CloudWatch for Apache Ranger audits

Apache Ranger audits are sent to CloudWatch. You should create a new CloudWatch log group and specify it in the security configuration. See the following code:

aws logs create-log-group --log-group-name /aws/emr/rangeraudit/

You can search audit information using CloudWatch Logs Insights. The following screenshot shows a query.
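
You can also run such a query programmatically. The following sketch uses Boto3 to query the last hour of audit events, with a hypothetical filter pattern on the user name.

import time

import boto3

logs = boto3.client("logs")

# Query the last hour of Ranger audit events; the filter on "analyst1" is a
# hypothetical example of narrowing results to a single user.
query = logs.start_query(
    logGroupName="/aws/emr/rangeraudit/",
    startTime=int(time.time()) - 3600,
    endTime=int(time.time()),
    queryString=("fields @timestamp, @message "
                 "| filter @message like /analyst1/ "
                 "| sort @timestamp desc | limit 20"),
)

# Results can take a few seconds to become available.
time.sleep(5)
results = logs.get_query_results(queryId=query["queryId"])
print(results["status"], len(results["results"]))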

New Amazon EMR security configuration

The new Amazon EMR security configuration requires the following inputs:

  • IP address of the Apache Ranger server
  • IAM role for the Apache Ranger service (see the GitHub repo) running on the EMR cluster and accessing other AWS services (see the GitHub repo)
  • Secrets Manager name with the Apache Ranger admin server certificate
  • Secrets Manager name with the private key used by the plugins
  • CloudWatch log group name

The following code is an example of using the AWS CLI to create this security configuration:

aws emr create-security-configuration --name MyEMRRangerSecurityConfig --security-configuration
'{
   "EncryptionConfiguration":{
      "EnableInTransitEncryption":false,
      "EnableAtRestEncryption":false
   },
   "AuthenticationConfiguration":{
      "KerberosConfiguration":{
         "Provider":"ClusterDedicatedKdc",
         "ClusterDedicatedKdcConfiguration":{
            "TicketLifetimeInHours":24
         }
      }
   },
   "AuthorizationConfiguration":{
      "RangerConfiguration":{
         "AdminServerURL":"https://<RANGER ADMIN SERVER IP>:8080",
         "RoleForRangerPluginsARN":"arn:aws:iam::<AWS ACCOUNT ID>:role/<RANGER PLUGIN DATA ACCESS ROLE NAME>",
         "RoleForOtherAWSServicesARN":"arn:aws:iam::<AWS ACCOUNT ID>:role/<USER ACCESS ROLE NAME>",
         "AdminServerSecretARN":"arn:aws:secretsmanager:us-east-1:<AWS ACCOUNT ID>:secret:<SECRET NAME THAT PROVIDES ADMIN SERVERS PUBLIC TLS CERTICATE>",
         "RangerPluginConfigurations":[
            {
               "App":"Spark",
               "ClientSecretARN":"arn:aws:secretsmanager:us-east-1:<AWS ACCOUNT ID>:secret:<SECRET NAME THAT PROVIDES SPARK PLUGIN PRIVATE TLS CERTICATE>",
               "PolicyRepositoryName":"spark-policy-repository"
            },
            {
               "App":"Hive",
               "ClientSecretARN":"arn:aws:secretsmanager:us-east-1:<AWS ACCOUNT ID>:secret:<SECRET NAME THAT PROVIDES HIVE PLUGIN PRIVATE TLS CERTICATE>",
               "PolicyRepositoryName":"hive-policy-repository"
            },
            {
               "App":"EMRFS-S3",
               "ClientSecretARN":"arn:aws:secretsmanager:us-east-1:<AWS ACCOUNT ID>:secret:<SECRET NAME THAT PROVIDES EMRFS S3 PLUGIN PRIVATE TLS CERTICATE>",
               "PolicyRepositoryName":"emrfs-policy-repository"
            }
         ],
         "AuditConfiguration":{
            "Destinations":{
               "AmazonCloudWatchLogs":{
                  "CloudWatchLogGroup":"arn:aws:logs:us-east-1:<AWS ACCOUNT ID>:log-group:<LOG GROUP NAME FOR AUDIT EVENTS>"
               }
            }
         }
      }
   }
}'

Installing an Amazon EMR cluster with Kerberos

Start the cluster by choosing Amazon EMR version 5.32 and this newly created security configuration.
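
If you script cluster creation, the important additions are the SecurityConfiguration name and the KerberosAttributes block. The following Boto3 sketch uses illustrative instance settings and a placeholder KDC password.

import boto3

emr = boto3.client("emr", region_name="us-east-1")

# Illustrative settings; the security configuration name matches the one
# created earlier, and the KDC admin password is a placeholder.
emr.run_job_flow(
    Name="emr-ranger-demo",
    ReleaseLabel="emr-5.32.0",
    Applications=[{"Name": "Hadoop"}, {"Name": "Hive"}, {"Name": "Spark"}],
    Instances={
        "MasterInstanceType": "m5.xlarge",
        "SlaveInstanceType": "m5.xlarge",
        "InstanceCount": 3,
        "KeepJobFlowAliveWhenNoSteps": True,
    },
    JobFlowRole="EMR_EC2_DefaultRole",
    ServiceRole="EMR_DefaultRole",
    SecurityConfiguration="MyEMRRangerSecurityConfig",
    KerberosAttributes={
        "Realm": "EC2.INTERNAL",
        "KdcAdminPassword": "<KDC_ADMIN_PASSWORD>",
    },
    VisibleToAllUsers=True,
)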

Setting up your architecture with CloudFormation

To help you get started, we added a new GitHub repo with setup instructions. The following diagram shows the logical architecture after the CloudFormation stack is fully deployed. Review the roadmap for future enhancements.


To set up this architecture using CloudFormation, complete the following steps:

  1. Use the create-tls-certs.sh script to upload the SSL key and certifications to Secrets Manager.
  2. Set up the VPC or Active Directory server by launching the following CloudFormation template.
  3. Verify DHCP options to make sure the domain name servers for the VPC are listed in the right order (LDAP/AD server first, followed by AmazonProvidedDNS).
  4. Set up the Apache Ranger server, Amazon Relational Database Service (Amazon RDS) instance, and EMR cluster by launching the following CloudFormation template.

Limitations

When using this solution, keep in mind the following limitations:

  • As of this writing, Amazon EMR 6.x isn't supported (only Amazon EMR 5.32+ is supported)
  • Non-Kerberos clusters aren't supported
  • Jobs must be submitted through Apache Zeppelin, Hue, Livy, or SSH
  • Only selected applications can be installed on the Apache Ranger-enabled EMR cluster, such as Hadoop, Tez, and Ganglia. For a full list, see Supported Applications. The cluster creation request is rejected if you choose applications outside this supported list
  • As of this writing, the SparkSQL plugin doesn't support column masking and row-level filters
  • The SparkSQL INSERT INTO and INSERT OVERWRITE overrides aren't supported
  • You can't view audits on the Apache Ranger UI because they're sent to CloudWatch
  • The AWS Glue Data Catalog isn't supported as the Apache Hive Metastore

Available now

Native support for Apache Ranger 2.0 with Apache Hive, Apache Spark, and Amazon S3 is available today in the following AWS Regions:

  • US East (Ohio)
  • US East (N. Virginia)
  • US West (N. California)
  • US West (Oregon)
  • Africa (Cape Town)
  • Asia Pacific (Hong Kong)
  • Asia Pacific (Mumbai)
  • Asia Pacific (Seoul)
  • Asia Pacific (Singapore)
  • Asia Pacific (Sydney)
  • Canada (Central)
  • Europe (Frankfurt)
  • Europe (Ireland)
  • Europe (London)
  • Europe (Paris)
  • Europe (Milan)
  • Europe (Stockholm)
  • South America (São Paulo)
  • Middle East (Bahrain)

For the latest Region availability, see Amazon EMR Management Guide.

Conclusion

Amazon EMR 5.32 includes plugins to integrate with Apache Ranger 2.0 that enable authorization and audit capabilities for Apache SparkSQL, Amazon S3, and Apache Hive. This post demonstrates how to set up Amazon EMR to use Apache Ranger for data access controls for Apache Spark and Apache Hive workloads on Amazon EMR. If you have any thoughts or questions, please leave them in the comments.


About the Author

Varun Rao Bhamidimarri is a Sr Manager on the AWS Analytics Specialist Solutions Architect team. His focus is helping customers with the adoption of cloud-enabled analytics solutions to meet their business requirements. Outside of work, he loves spending time with his wife and two kids, staying healthy, and meditating. He recently picked up gardening during the lockdown.

Testing data quality at scale with PyDeequ

Post Syndicated from Calvin Wang original https://aws.amazon.com/blogs/big-data/testing-data-quality-at-scale-with-pydeequ/

You generally write unit tests for your code, but do you also test your data? Incoming data quality can make or break your application. Incorrect, missing, or malformed data can have a large impact on production systems. Examples of data quality issues include the following:

  • Missing values can lead to failures in production systems that require non-null values (NullPointerException)
  • Changes in the distribution of data can lead to unexpected outputs of machine learning (ML) models
  • Aggregations of incorrect data can lead to wrong business decisions

In this post, we introduce PyDeequ, an open-source Python wrapper over Deequ (an open-source tool developed and used at Amazon). Deequ is written in Scala, whereas PyDeequ allows you to use its data quality and testing capabilities from Python and PySpark, the language of choice of many data scientists. PyDeequ democratizes and extends the power of Deequ by allowing you to use it alongside the many data science libraries that are available in that language. Furthermore, PyDeequ allows for a fluid interface with pandas DataFrames rather than restricting you to Apache Spark DataFrames.

Deequ allows you to calculate data quality metrics on your dataset, define and verify data quality constraints, and be informed about changes in the data distribution. Instead of implementing checks and verification algorithms on your own, you can focus on describing how your data should look. Deequ supports you by suggesting checks for you. Deequ is implemented on top of Apache Spark and is designed to scale with large datasets (billions of rows) that typically live in a data lake, distributed file system, or a data warehouse. PyDeequ gives you access to this capability, but also allows you to use it from the familiar environment of your Python Jupyter notebook.

Deequ at Amazon

Deequ is used internally at Amazon to verify the quality of many large production datasets. Dataset producers can add and edit data quality constraints. The system computes data quality metrics on a regular basis (with every new version of a dataset), verifies constraints defined by dataset producers, and publishes datasets to consumers in case of success. In error cases, dataset publication can be stopped, and producers are notified to take action. Data quality issues don’t propagate to consumer data pipelines, reducing their blast radius.

Deequ is also used within Amazon SageMaker Model Monitor. Now with the availability of PyDeequ, you can use it from a broader set of environments: Amazon SageMaker notebooks, AWS Glue, Amazon EMR, and more.

Overview of PyDeequ

Let’s look at PyDeequ’s main components, and how they relate to Deequ (shown in the following diagram):

  • Metrics computation – Deequ computes data quality metrics, that is, statistics such as completeness, maximum, or correlation. Deequ uses Spark to read from sources such as Amazon Simple Storage Service (Amazon S3) and compute metrics through an optimized set of aggregation queries. You have direct access to the raw metrics computed on the data.
  • Constraint verification – As a user, you focus on defining a set of data quality constraints to be verified. Deequ takes care of deriving the required set of metrics to be computed on the data. Deequ generates a data quality report, which contains the result of the constraint verification.
  • Constraint suggestion – You can choose to define your own custom data quality constraints or use the automated constraint suggestion methods that profile the data to infer useful constraints.
  • Python wrappers – You can call each Deequ function using Python syntax. The wrappers translate the commands to the underlying Deequ calls and return their response.


Use case overview

As a running example, we use a customer review dataset provided by Amazon on Amazon S3. We intentionally follow the example in the post Test data quality at scale with Deequ to show the similarity in functionality and implementation. We begin the way many data science projects do: with initial data exploration and assessment in a Jupyter notebook.

If you’d like to follow along with a live Jupyter notebook, check out the notebook on our GitHub repo.

During the data exploration phase, you want to easily answer some basic questions about the data:

  • Are the fields that are supposed to contain unique values really unique? Are there fields that are missing values?
  • How many distinct categories are there in the categorical fields?
  • Are there correlations between some key features?
  • If there are two supposedly similar datasets (such as different categories or different time periods), are they really similar?

We also show you how to scale this approach to large-scale datasets, using the same code on an Amazon EMR cluster. This is how you’d likely do your ML training, and later as you move into a production setting.

Starting a PySpark session in a SageMaker notebook

To follow along with this post, open up a SageMaker notebook instance, clone the PyDeequ GitHub repository onto the notebook instance, and run the test_data_quality_at_scale.ipynb notebook from the tutorials directory of the PyDeequ repository.

Let’s install our dependencies first in a terminal window:

$ pip install pydeequ

Next, in a cell of our SageMaker notebook, we need to create a PySpark session:

from pyspark.sql import SparkSession

import sagemaker_pyspark
import pydeequ

classpath = ":".join(sagemaker_pyspark.classpath_jars())

spark = (SparkSession
    .builder
    .config("spark.driver.extraClassPath", classpath)
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate())

Loading data

Load the dataset containing reviews for the category Electronics into our Jupyter notebook:

df = spark.read.parquet("s3a://amazon-reviews-pds/parquet/product_category=Electronics/")

After you load the DataFrame, you can run df.printSchema() to view the schema of the dataset:

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)
 |-- year: integer (nullable = true)

Data analysis

Before we define checks on the data, we want to calculate some statistics on the dataset; we call them metrics. As with Deequ, PyDeequ supports a rich set of metrics. For more information, see Test data quality at scale with Deequ or the GitHub repo. In the following example, we use the AnalysisRunner to capture the metrics you’re interested in:

from pydeequ.analyzers import *

analysisResult = AnalysisRunner(spark) \
                    .onData(df) \
                    .addAnalyzer(Size()) \
                    .addAnalyzer(Completeness("review_id")) \
                    .addAnalyzer(ApproxCountDistinct("review_id")) \
                    .addAnalyzer(Mean("star_rating")) \
                    .addAnalyzer(Compliance("top star_rating", "star_rating >= 4.0")) \
                    .addAnalyzer(Correlation("total_votes", "star_rating")) \
                    .addAnalyzer(Correlation("total_votes", "helpful_votes")) \
                    .run()
                    
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()

The following table summarizes our findings.

Name Instance Value
ApproxCountDistinct review_id 3010972
Completeness review_id 1
Compliance top star_rating 0.74941
Correlation helpful_votes,total_votes 0.99365
Correlation total_votes,star_rating -0.03451
Mean star_rating 4.03614
Size * 3120938

From this, we learn the following:

  • review_id has no missing values and approximately 3,010,972 unique values
  • Approximately 74.9% of reviews have a star_rating of 4 or higher
  • total_votes and star_rating are not correlated
  • helpful_votes and total_votes are strongly correlated
  • The average star_rating is 4.0
  • The dataset contains 3,120,938 reviews

Defining and running tests for data

After analyzing and understanding the data, we want to verify that the properties we have derived also hold for new versions of the dataset. By defining assertions on the data distribution as part of a data pipeline, we can ensure that every processed dataset is of high quality, and that any application consuming the data can rely on it.

For writing tests on data, we start with the VerificationSuite and add checks on attributes of the data. In this example, we test for the following properties of our data:

  • At least 3 million rows in total
  • review_id is never NULL
  • review_id is unique
  • star_rating has a minimum of 1.0 and maximum of 5.0
  • marketplace only contains US, UK, DE, JP, or FR
  • year does not contain negative values

This is the code that reflects the previous statements. For information about all available checks, see the GitHub repo. You can run this directly in the Spark shell as previously explained:

from pydeequ.checks import *
from pydeequ.verification import *

check = Check(spark, CheckLevel.Warning, "Amazon Electronics Review Check")

checkResult = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(
        check.hasSize(lambda x: x >= 3000000) \
        .hasMin("star_rating", lambda x: x == 1.0) \
        .hasMax("star_rating", lambda x: x == 5.0)  \
        .isComplete("review_id")  \
        .isUnique("review_id")  \
        .isComplete("marketplace")  \
        .isContainedIn("marketplace", ["US", "UK", "DE", "JP", "FR"]) \
        .isNonNegative("year")) \
    .run()
    
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show()

After calling run(), PyDeequ translates your test description into Deequ, which translates it into a series of Spark jobs that are run to compute metrics on the data. Afterwards, it invokes your assertion functions (for example, lambda x: x == 1.0 for the minimum star rating check) on these metrics to see if the constraints hold on the data. The following table summarizes our findings.

Constraint constraint_status constraint_message
SizeConstraint(Size(None)) Success
MinimumConstraint(Minimum(star_rating,None)) Success
MaximumConstraint(Maximum(star_rating,None)) Success
CompletenessConstraint(Completeness(review_id,None)) Success
UniquenessConstraint(Uniqueness(List(review_id))) Failure Value: 0.9926566948782706 does not meet the constraint requirement!
CompletenessConstraint(Completeness(marketplace,None)) Success
ComplianceConstraint(Compliance(marketplace contained in US,UK,DE,JP,FR,marketplace IS NULL OR marketplace IN (‘US’,’UK’,’DE’,’JP’,’FR’),None)) Success
ComplianceConstraint(Compliance(year is non-negative,COALESCE(year, 0.0) >= 0,None)) Success

Interestingly, the review_id column isn’t unique, which resulted in a failure of the check on uniqueness. We can also look at all the metrics that Deequ computed for this check by running the following:

checkResult_df = VerificationResult.successMetricsAsDataFrame(spark, checkResult)
checkResult_df.show()

The following table summarizes our findings.

Name Instance Value
Completeness review_id 1
Completeness marketplace 1
Compliance marketplace contained in US,UK,DE,JP,FR 1
Compliance year is non-negative 1
Maximum star_rating 5
Minimum star_rating 1
Size * 3120938
Uniqueness review_id 0.99266

Automated constraint suggestion

If you own a large number of datasets or if your dataset has many columns, it may be challenging for you to manually define appropriate constraints. Deequ can automatically suggest useful constraints based on the data distribution. Deequ first runs a data profiling method and then applies a set of rules on the result. For more information about how to run a data profiling method, see the GitHub repo.

import json

from pydeequ.suggestions import *

suggestionResult = ConstraintSuggestionRunner(spark) \
             .onData(df) \
             .addConstraintRule(DEFAULT()) \
             .run()

# Constraint Suggestions in JSON format
print(json.dumps(suggestionResult, indent=2))

The result contains a list of constraints with descriptions and Python code, so that you can directly apply it in your data quality checks. Call print(json.dumps(suggestionResult, indent=2)) to inspect the suggested constraints; the following table shows a subset.

Column Constraint Python code
customer_id customer_id is not null .isComplete("customer_id")
customer_id customer_id has type Integral .hasDataType("customer_id", ConstrainableDataTypes.Integral)
customer_id customer_id has no negative values .isNonNegative("customer_id")
helpful_votes helpful_votes is not null .isComplete("helpful_votes")
helpful_votes helpful_votes has no negative values .isNonNegative("helpful_votes")
marketplace marketplace has value range “US”, “UK”, “DE”, “JP”, “FR” .isContainedIn("marketplace", ["US", "UK", "DE", "JP", "FR"])
product_title product_title is not null .isComplete("product_title")
star_rating star_rating is not null .isComplete("star_rating")
star_rating star_rating has no negative values .isNonNegative("star_rating")
vine vine has value range “N”, “Y” .isContainedIn("vine", ["N", "Y"])

You can explore the other tutorials in the PyDeequ GitHub repo.

Scaling to production

So far, we’ve shown you how to use these capabilities in the context of data exploration using a Jupyter notebook running on a SageMaker notebook instance. As your project matures, you need to use the same capabilities on larger and larger datasets, and in a production environment. With PyDeequ, it’s easy to make that transition. The following diagram illustrates deployment options for local and production purposes on AWS.


Amazon EMR and AWS Glue interface with PyDeequ through the PySpark drivers that PyDeequ utilizes as its main engine. PyDeequ can run as a PySpark application in both contexts when the Deequ JAR is added to the Spark context. You can run PyDeequ's data validation toolkit after the Spark context and drivers are configured and your data is loaded into a DataFrame. We describe the Amazon EMR configuration options and use cases in this section (configurations 2 and 3 in the diagram).

Data exploration from a SageMaker notebook via an EMR cluster

As shown in configuration 2 in the diagram, you can connect to an EMR cluster from a SageMaker notebook to run PyDeequ. This enables you to explore much larger volumes of data than you can using a single notebook. Your Amazon EMR cluster must be running Spark v2.4.6, available with Amazon EMR version 5.31 or higher, in order to work with PyDeequ. After you have a running cluster that has those components and a SageMaker notebook, you configure a SparkSession object using the following template to connect to your cluster. For more information about connecting a SageMaker notebook to Amazon EMR or the necessary IAM permissions, see Submitting User Applications with spark-submit.

In the SageMaker notebook, run the following JSON in a cell before you start your SparkSession to configure your EMR cluster:

%%configure -f
{ "conf":{
          "spark.pyspark.python": "python3",
          "spark.pyspark.virtualenv.enabled": "true",
          "spark.pyspark.virtualenv.type":"native",
          "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv",
          "spark.jars.packages": "com.amazon.deequ:deequ:1.0.3",
          "spark.jars.excludes": "net.sourceforge.f2j:arpack_combined_all"
         }
}

Start your SparkSession object in a cell after the preceding configuration by running spark. Then install PyDeequ onto your EMR cluster using the SparkContext (default named sc) with the following command:

sc.install_pypi_package('pydeequ')

Now you can start using PyDeequ from your notebook to run the same statements as before, but with much larger volumes of data.

Running a transient EMR cluster

Another way to leverage the power of an EMR cluster is to treat it as a transient cluster and run it in a headless configuration, as shown in configuration 3 in the diagram. We use spark-submit in an EMR add-step to run PyDeequ on Amazon EMR. For each of the following steps, make sure to replace the values in brackets accordingly.

  1. Create a bootstrap shell script and upload it to an S3 bucket. The following code is an example of pydeequ-emr-bootstrap.sh:
    #!/bin/bash
    
    sudo python3 -m pip install --no-deps pydeequ
    sudo python3 -m pip install pandas 

  2. Create an EMR cluster via the AWS Command Line Interface (AWS CLI):
    $ aws emr create-cluster \
    --name 'my-pydeequ-cluster' \
    --release-label emr-5.31.0 --applications Name=Spark Name=Hadoop Name=Hive Name=Livy Name=Pig Name=Hue \
    --use-default-roles \
    --instance-type m5.xlarge \
    --instance-count 2 \
    --bootstrap-actions \
        Path="s3://<S3_PATH_TO_BOOTSTRAP>/pydeequ-emr-bootstrap.sh",Name='install_pydeequ' \
    --visible-to-all-users \
    --enable-debugging \
    --ec2-attributes KeyName="<MY_SSH_KEY>",SubnetId="<MY_SUBNET>" \
    --auto-scaling-role EMR_AutoScaling_DefaultRole

  3. Create your PySpark PyDeequ run script and upload it to Amazon S3. The following code is our example of pydeequ-test.py:
    import sys
    import pydeequ
    from pydeequ.checks import *
    from pydeequ.verification import *
    from pyspark.sql import SparkSession, Row
    
    if __name__ == "__main__":
    
        with SparkSession.builder.appName("pydeequ").getOrCreate() as spark:
    
            df = spark.sparkContext.parallelize([
                Row(a="foo", b=1, c=5),
                Row(a="bar", b=2, c=6),
                Row(a="baz", b=3, c=None)]).toDF()
    
            check = Check(spark, CheckLevel.Error, "Integrity checks")
    
            checkResult = VerificationSuite(spark) \
                .onData(df) \
                .addCheck(
                    check.hasSize(lambda x: x >= 3) \
                    .hasMin("b", lambda x: x == 0) \
                    .isComplete("c")  \
                    .isUnique("a")  \
                    .isContainedIn("a", ["foo", "bar", "baz"]) \
                    .isNonNegative("b")) \
                .run()
    
            checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
            checkResult_df.repartition(1).write.csv("s3a://<PATH_TO_OUTPUT>/pydeequ-out.csv", sep='|')

  4. When your cluster is running and in the WAITING state, submit your Spark job to Amazon EMR via the AWS CLI:
    $ aws emr add-steps \
    --cluster-id <MY_CLUSTER_ID> \
    --steps Type=Spark,Name="pydeequ-spark-submit",Args=[--deploy-mode,cluster,--master,yarn,--conf,spark.yarn.submit.waitAppCompletion=true,--packages,com.amazon.deequ:deequ:1.0.3,--exclude-packages,net.sourceforge.f2j:arpack_combined_all,s3a://pydeequ-emr/setup/pydeequ-test.py],ActionOnFailure=CANCEL_AND_WAIT

Congratulations, you have now submitted a PyDeequ PySpark job to Amazon EMR. Give the job a few minutes to run, after which you can view your results at the S3 output path specified on the last line of pydeequ-test.py.

Afterwards, remember to clean up your results and spin down the EMR cluster using the following command:

$ aws emr terminate-clusters --cluster-ids <MY_CLUSTER_ID>

Now you can use Amazon EMR to process large datasets in batch using PyDeequ to plug into your pipelines and provide scalable tests on your data.

More examples on GitHub

You can find examples of more advanced features on the Deequ GitHub page:

  • Deequ provides more than data quality checks with fixed thresholds. Learn how to use anomaly detection on data quality metrics to apply tests on metrics that change over time.
  • Deequ offers support for storing and loading metrics. Learn how to use the MetricsRepository for this use case (a rough PyDeequ sketch follows this list).
  • If your dataset grows over time or is partitioned, you can use Deequ's incremental metrics computation. For each partition, Deequ stores a state for each computed metric. To compute metrics for the union of partitions, Deequ can use these states to efficiently derive overall metrics without reloading the data.
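
The following is a rough PyDeequ sketch of the metrics repository pattern, based on the examples in the PyDeequ repository; check the repo for the exact API, and note that the file path and tags here are placeholders. It assumes the spark session and df DataFrame created earlier in this post.

from pydeequ.analyzers import *
from pydeequ.repository import FileSystemMetricsRepository, ResultKey

# Append metrics from this run to a JSON file so later runs can be compared.
metrics_file = FileSystemMetricsRepository.helper_metrics_file(spark, "metrics.json")
repository = FileSystemMetricsRepository(spark, metrics_file)
result_key = ResultKey(spark, ResultKey.current_milli_time(), {"dataset": "electronics"})

AnalysisRunner(spark) \
    .onData(df) \
    .addAnalyzer(Size()) \
    .addAnalyzer(Completeness("review_id")) \
    .useRepository(repository) \
    .saveOrAppendResult(result_key) \
    .run()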

Conclusion

This post showed you how to use PyDeequ for calculating data quality metrics, verifying data quality metrics, and profiling data to automate the configuration of data quality checks. PyDeequ is available via pip install and on GitHub now for you to build your own data quality management pipeline.

Learn more about the inner workings of Deequ in the VLDB 2018 paper Automating large-scale data quality verification.

Stay tuned for another post demonstrating production workflows on AWS Glue.


About the Authors

Calvin Wang is a Data Scientist at AWS AI/ML. He holds a B.S. in Computer Science from UC Santa Barbara and loves using machine learning to build cool stuff.


Chris Ghyzel is a Data Engineer for AWS Professional Services. Currently, he is working with customers to integrate machine learning solutions on AWS into their production pipelines.


Veronika Megler, PhD, is Principal Data Scientist for Amazon.com Consumer Packaging. Until recently she was the Principal Data Scientist for AWS Professional Services. She enjoys adapting innovative big data, AI, and ML technologies to help companies solve new problems, and to solve old problems more efficiently and effectively. Her work has lately been focused more heavily on economic impacts of ML models and exploring causality.

Dream11’s journey to building their Data Highway on AWS

Post Syndicated from Pradip Thoke original https://aws.amazon.com/blogs/big-data/dream11s-journey-to-building-their-data-highway-on-aws/

This is a guest post co-authored by Pradip Thoke of Dream11. In their own words, “Dream11, the flagship brand of Dream Sports, is India’s biggest fantasy sports platform, with more than 100 million users. We have infused the latest technologies of analytics, machine learning, social networks, and media technologies to enhance our users’ experience. Dream11 is the epitome of the Indian sports technology revolution.”


Since inception, Dream11 has been a data-driven sports technology brand. The systems that power Dream11, including their transactional data warehouse, run on AWS. As Dream11 hosts fantasy sports contests that are joined by millions of Indian sports fans, they have large volumes of transactional data that is organized in a well-defined Amazon Redshift data warehouse. Previously, they were using third-party services to collect, analyze, and build models over user interaction data combined with transactional data. Although this approach was convenient, it presented certain critical issues:

  • The approach wasn’t conducive to 360-degree user analytics. Dream11’s user interactions data wasn’t present on the cloud, where the rest of Dream11’s infrastructure and data were present (AWS, in this case). To get a complete picture of a user’s experience and journey, the user’s interaction data (client events) needs to be analyzed alongside their transactional data (server events). This is known as 360-degree user analytics.
  • It wasn’t possible to get accurate user journey funnel reports. Currently, there are limitations with every tool available on the market with respect to identifying and mapping a given user’s actions across multiple platforms (on the web, iOS, or Android), as well as multiple related apps. This use case is specifically important if your company is a parent to other companies.
  • The statistics on user behavior that Dream11 was getting weren’t as accurate as they wanted. Some of the popular services they were using for web & mobile analytics use the technique of sampling to be able to deal with high volumes of data. Although this is a well-regarded technique to deal with high volumes of data and provides reasonable accuracy in multiple cases, Dream11 wanted statistics to be as accurate as possible.
  • The analytics wasn’t real-time. Dream11 experiences intense use by their users just before and during the real-life sports matches, so real-time and near-real-time analytics is very critical for them. This need wasn’t sufficiently met by the plethora of services they were using.
  • Their approach was leading to high cost for custom analytics for Dream11’s user interactions data, consisting of hundreds of event types. Serverless query engines typically charge by the amount of data scanned and so it can get very expensive if events data isn’t organized properly in separate tables in a data lake to enable selective access.

All these concerns and needs led Dream11 to conclude that they needed their own centralized 360-degree analytics platform. Therefore, they embarked on the Data Highway project on AWS.

This project has additional advantages. It is increasingly becoming important to store and process data securely. Having everything in-house can help Dream11 with data security and data privacy objectives. The platform enables 360-degree customer analytics, which further allows Dream11 to do intelligent user segmentation in-house and share only those segments (without exposing underlying transactional or interactions data) with third-party messaging service providers. 

Design goals

Dream11 had the following design goals for the project:

  • The system should be easy to maintain and should be able to handle a very high volume of data, consisting of billions of events and terabytes of data daily.
  • The cost should be low and should be pay-as-you-go.
  • Dream11’s web and mobile developers regularly create new types of events to capture new types of interactions. Whenever they add new types of events, they should be immediately available in the system for analytics, and their statistics should immediately reflect in relevant dashboards and reports without any human intervention.
  • Certain types of statistics (such as concurrency) should be available in real-time and near-real time—within 5 minutes or less.
  • Dream11 should be able to use custom logic to calculate key statistics. The analytics should be accurate—no more sampling.
  • The data for various events should be neatly organized in separate tables and analytics-friendly file formats.
  • Although Dream11 will have a common data lake, they shouldn’t be constrained to use a single analytics engine for all types of analytics. Different types of analytics engines excel for different types of queries.
  • The Product Management team should have access to views they commonly use in their decision-making process, such as funnels and user flow diagrams.
  • The system should be extensible by adding lanes in the system. Lanes allow you to reuse your basic setup without mixing events data for different business units. It also potentially allows you to study user behavior across different apps.
  • The system should be able to build 360-degree user profiles.
  • The system should provide alerting on important changes to key business metrics.
  • Last but not least, the system should be secure and reliable with six nines of availability guarantee.

Data Highway architecture

In less than 3 months, Dream11’s data team built a system that met all the aforementioned goals. The following diagram shows the high-level architecture.


For this project, they used the following components:

The rest of this post explains the various design choices and trade-offs made by Dream11's data engineers.

Event ingestion, segregation, and organization

Dream11 has several hundred event types. These events have common attributes and specific attributes. The following diagram shows the logical structure of these events.


When the front end receives an event, it saves fields up to common attributes into a message and posts it to Kafka_AllEvents_CommonAttributes. This Kafka topic is the source for the following systems:

  • Apache HBase on Amazon EMR – Provides real-time concurrency analytics
  • Apache Druid – Provides near real-time dimensional analytics
  • Amazon Redshift – Provides session analytics

The front end also saves events, as they are, into Kafka_AllEvents_AllAttributes. These events are then picked up by Apache NiFi, which forwards them to their respective topics. Apache NiFi supports data routing, transformation, and system mediation logic using powerful and scalable directed graphs. Data is transformed and published to Kafka by using a combination of RouteOnAttribute and JoltTransformJSON processors (to parse JSON). Apache NiFi reads event names and posts to the Kafka topic with a matching name. If Kafka doesn't have a topic with that name, it creates a new topic with that name. You can configure your Kafka brokers to auto-create a topic when a message is received for a non-existent topic.

The following diagram illustrates the Amazon S3 sink connector per Kafka topic.


The following diagram summarizes the overall design of the system for event ingestion, segregation, and organization.


Storage, cataloging, ETL, and scheduling

In this section, we discuss how Dream11 updates their AWS Glue Data Catalog, performs extract, transform, and load (ETL) jobs with Amazon EMR Presto, and uses Apache Airflow for schedule management.

Updating the AWS Glue Data Catalog with metadata for the target table

The AWS Glue Data Catalog provides a unified metadata repository across a variety of data sources and data formats. It provides out-of-the-box integration with Amazon S3, Amazon Relational Database Service (Amazon RDS), Amazon Redshift, Amazon Redshift Spectrum, Athena, Amazon EMR, and any application compatible with the Apache Hive metastore. You can create your table definitions one time and query across engines. For more information, see FAQ: Upgrading to the AWS Glue Data Catalog.

Because this Data Catalog is accessible from multiple services that were going to be used for the Data Highway project, Dream11 decided to use it to register all the table definitions.

Registering tables with the AWS Glue Data Catalog is easy. You can use an AWS Glue crawler, which can infer schema from files in Amazon S3 and register a table in the Data Catalog. It works quite well, but Dream11 needed additional actions, such as automatically configuring Kafka Amazon S3 sink connectors. Therefore, they developed two Python-based crawlers.

The first Python-based crawler runs every 2 hours and looks up Kafka topics. If it finds a new topic, it configures a Kafka Amazon S3 sink connector to dump its data to Amazon S3 every 30 minutes in JSON Gzip format. It also registers a table with the Glue Data Catalog so that users can query the JSON data directly, if needed.

The second Python-based crawler runs once a day and registers a corresponding table for each new table created that day to hold flattened data (Parquet, Snappy). It infers schemas and registers tables with the Data Catalog using its Table API. It adds customization needed by the Dream11 team to the metadata. It then creates Amazon EMR Presto ETL jobs to convert JSON, Gzip data to Parquet, Snappy, and registers them with Apache Airflow to run every 24 hours.
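
As a hedged illustration of what the second crawler does for one event type, the following sketch registers a Parquet table through the Data Catalog Table API; the database, table, columns, and S3 location are hypothetical placeholders, and the real crawler infers the columns from the flattened event data.

import boto3

glue = boto3.client("glue")

# Hypothetical table definition for a single event type.
glue.create_table(
    DatabaseName="data_highway",
    TableInput={
        "Name": "app_launch_event_parquet",
        "TableType": "EXTERNAL_TABLE",
        "PartitionKeys": [{"Name": "day", "Type": "string"}],
        "StorageDescriptor": {
            "Columns": [
                {"Name": "user_id", "Type": "string"},
                {"Name": "event_timestamp", "Type": "timestamp"},
            ],
            "Location": "s3://example-data-highway-bucket/events/app_launch/",
            "InputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
            "OutputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
            "SerdeInfo": {
                "SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
            },
        },
    },
)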

ETL with Amazon EMR Presto

Dream11 has a multi-node, long-running, multi-purpose EMR cluster. They decided to run the scheduled ETL jobs for the Data Highway project on it.

ETL for an event table involves a simple SELECT FROM -> INSERT INTO command to convert JSON (Gzip) to Parquet (Snappy). The converted data takes up to 70% less space and yields a 10 times improvement in Athena query performance. ETL happens once a day, and tables are partitioned by day.
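Conceptually, each daily job issues a statement like the following against the Presto coordinator on the EMR cluster. This is a minimal sketch using the PyHive client; the coordinator host, schema, table names, and partition filter are illustrative assumptions (8889 is the default Presto port on Amazon EMR):

from pyhive import presto

# Illustrative coordinator host, schema, and table names.
conn = presto.connect(
    host="emr-presto-coordinator.internal",
    port=8889,
    catalog="hive",
    schema="data_highway",
)
cursor = conn.cursor()

cursor.execute("""
    INSERT INTO registration_completed_parquet
    SELECT user_id, app_version, event_timestamp, event_date
    FROM registration_completed_json
    WHERE event_date = '2020-10-15'
""")
cursor.fetchall()  # block until the INSERT finishes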

Data received on the Kafka_AllEvents_CommonAttributes topic is loaded to Amazon Redshift. The ETL involves a SELECT FROM -> INSERT INTO command to convert JSON (Gzip) to CSV, followed by an Amazon Redshift COPY.

Apache Airflow for schedule management

Apache Airflow is an open-source tool for authoring and orchestrating big data workflows. With Apache Airflow, data engineers define directed acyclic graphs (DAGs). DAGs describe how to run a workflow and are written in Python. Workflows are designed as a DAG that groups tasks that run independently. The DAG keeps track of the relationships and dependencies between tasks.

Dream11 uses Apache Airflow to schedule Python scripts and a few hundred ETL jobs on Amazon EMR Presto that convert JSON (Gzip) data for a few hundred events to Parquet (Snappy) format, and to convert the JSON data containing common attributes for all events to CSV before loading it to Amazon Redshift. For more information, see Orchestrate big data workflows with Apache Airflow, Genie, and Amazon EMR: Part 1.
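The following is a minimal sketch of how one such daily conversion could be scheduled with Apache Airflow, submitting an EMR step through Boto3 from a PythonOperator; the DAG ID, cluster ID, and step definition are illustrative assumptions rather than Dream11's production DAGs:

from datetime import datetime, timedelta

import boto3
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# Illustrative cluster ID and ETL script location.
EMR_CLUSTER_ID = "j-XXXXXXXXXXXXX"


def submit_presto_etl():
    emr = boto3.client("emr")
    emr.add_job_flow_steps(
        JobFlowId=EMR_CLUSTER_ID,
        Steps=[{
            "Name": "json-to-parquet-registration_completed",
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {
                "Jar": "command-runner.jar",
                "Args": ["bash", "-c",
                         "presto-cli -f /home/hadoop/etl/registration_completed.sql"],
            },
        }],
    )


with DAG(
    dag_id="data_highway_json_to_parquet",
    start_date=datetime(2020, 10, 1),
    schedule_interval=timedelta(hours=24),
    catchup=False,
) as dag:
    PythonOperator(task_id="submit_presto_etl", python_callable=submit_presto_etl)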

The following diagram shows the workflow to connect Apache Airflow to Amazon EMR.

The following diagram summarizes the overall design of the system for storage, cataloging, ETL, and scheduling.

Real-time and near-real-time analytics

In this section, we discuss the real-time and near-real-time analytics performed on Dream11’s data.

Concurrency analytics with Apache Druid

Apache Druid is an OLAP-style data store. It computes facts and metrics against various dimensions while data is being loaded. This avoids the need to compute results when a query is run.

Dream11’s web and mobile events are loaded from the Kafka_AllEvents_CommonAttributes topic to Apache Druid with the help of the Apache Druid Kafka indexing service. Dream11 has a dashboard with different granularity levels and dimensions such as app version, org, and other dimensions present in the common event attributes list.

Finding active users with Amazon EMR HBase

Dream11 also needs to identify individual active users at any given time or during a given window. This is required by other downstream teams such as the Data Science team and Digital User Engagement team.

With the help of a Java consumer, they push all events from the Kafka_AllEvents_CommonAttributes topic to HBase on an EMR cluster, keeping just the required user dimensions. They can query the data in HBase with SQL syntax through the Apache Phoenix interface.
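The following sketch shows how a downstream consumer might query those user dimensions through the Phoenix Query Server with the phoenixdb client; the endpoint, table, and column names are illustrative assumptions (8765 is the default Phoenix Query Server port):

from datetime import datetime, timedelta

import phoenixdb

# Illustrative Phoenix Query Server endpoint on the EMR primary node.
conn = phoenixdb.connect("http://emr-primary-node.internal:8765/", autocommit=True)
cursor = conn.cursor()

# Illustrative table and columns holding per-user event dimensions.
cursor.execute(
    """
    SELECT user_id, app_version, last_event_time
    FROM active_users
    WHERE last_event_time > ?
    """,
    (datetime.utcnow() - timedelta(minutes=15),),
)
for user_id, app_version, last_event_time in cursor.fetchall():
    print(user_id, app_version, last_event_time)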

Session analytics with Amazon Redshift

Dream11 maintains their transactional data warehouse on a multi-node Amazon Redshift cluster. Amazon Redshift allows them to run complex SQL queries efficiently, and would have been a natural choice for event analytics across hundreds of event types. However, in Dream11's case, event data grows very rapidly, which would mean a lot of data in Amazon Redshift, and this data also loses its value quickly over time compared with transactional data. Therefore, they decided to do only session analytics in Amazon Redshift, to benefit from its complex SQL query capabilities, and to analyze individual events with Athena (which we discuss in the next section).

Data received on Kafka_AllEvents_CommonAttributes is loaded into Amazon S3 every 30 minutes by the associated Kafka S3 sink connector. This data is in JSON format with Gzip compression. Every 24 hours, a job runs on Amazon EMR Presto that flattens this data into CSV format. The data is loaded into Amazon Redshift with the COPY command. The data is first loaded into a staging table, and data in the staging table is aggregated to get session data. Amazon Redshift already holds transactional data in other tables, which, combined with the session data, allows Dream11 to perform 360-degree user analytics. They can now easily segment users based on their interaction and transaction data, and then run campaigns for those users with the help of messaging platforms.
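The load-and-aggregate step has the following shape in SQL. This is a minimal sketch issued through the psycopg2 driver; the connection details, table names, S3 path, and IAM role are illustrative assumptions:

import psycopg2

# Illustrative connection details.
conn = psycopg2.connect(
    host="redshift-cluster.xxxxxxxx.ap-south-1.redshift.amazonaws.com",
    port=5439, dbname="analytics", user="etl_user", password="...",
)
cur = conn.cursor()

# Load the flattened CSV for the day into a staging table.
cur.execute("""
    COPY stg_common_events
    FROM 's3://data-highway-events/csv/common_attributes/2020-10-15/'
    IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftCopyRole'
    FORMAT AS CSV;
""")

# Aggregate the staged events into the sessions table.
cur.execute("""
    INSERT INTO user_sessions (user_id, session_id, session_start, session_end, event_count)
    SELECT user_id, session_id, MIN(event_time), MAX(event_time), COUNT(*)
    FROM stg_common_events
    GROUP BY user_id, session_id;
""")

conn.commit()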

Event analytics with Athena

Dream11 uses Athena to analyze the data in Amazon S3. Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run. It made perfect sense to organize data for hundreds of event tables in Amazon S3 and analyze them with Athena on demand.

With Athena, you’re charged based on the amount of data scanned by each query. You can get significant cost savings and performance gains by compressing, partitioning, or converting your data to a columnar format, because each of those operations reduces the amount of data that Athena needs to scan to run a query. For more information, see Top 10 Performance Tuning Tips for Amazon Athena.

As discussed before, Dream11 has registered hundreds of tables for event data in JSON format, and a similar number of tables for event data in Parquet format, with the AWS Glue Data Catalog. They observed a performance gain of 10 times after converting the data format to Parquet, and an 80% savings in space. Data in Amazon S3 can be queried directly through the Athena UI with SQL queries. The other option they use is connecting to Athena using a JDBC driver from Looker and from their custom Java UI for the Data Aware project.
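A typical on-demand query against one of the Parquet event tables can also be issued programmatically. The following sketch uses Boto3; the database, table, partition filter, and query result location are illustrative assumptions:

import time

import boto3

athena = boto3.client("athena")

# Illustrative database, table, and result location.
response = athena.start_query_execution(
    QueryString="""
        SELECT app_version, COUNT(*) AS events
        FROM registration_completed_parquet
        WHERE event_date = '2020-10-15'
        GROUP BY app_version
    """,
    QueryExecutionContext={"Database": "data_highway"},
    ResultConfiguration={"OutputLocation": "s3://athena-query-results-bucket/"},
)
query_execution_id = response["QueryExecutionId"]

# Poll until the query completes, then fetch the result set.
while True:
    status = athena.get_query_execution(QueryExecutionId=query_execution_id)
    state = status["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(1)

if state == "SUCCEEDED":
    rows = athena.get_query_results(QueryExecutionId=query_execution_id)["ResultSet"]["Rows"]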

Athena helps Dream11 produce funnel analytics and user path analytics reports and visualizations.

The following diagram summarizes the overall design of the system for real-time and near-real-time analytics and visualization.

Conclusion

This architecture has enabled Dream11 to achieve all the design goals they set out with. Results of analytics for real-time requirements are available at millisecond latency, and the system costs 40% less than the previous system. Analytics is performed on all the data without sampling, so results are accurate and reliable. All the data and analytics engines are within Dream11's AWS account, improving data security and privacy.

As of this writing, the system handles 14 TB of data per day and it has served 80 million requests per minute at peak during Dream11 IPL 2020.

Doing all their analytics in-house on AWS has not just improved speed, accuracy, and data security; it has also enabled new possibilities. Now Dream11 has a 360-degree view of their users. They can study their users' progress across multiple platforms: web, Android, and iOS. This new system is enabling novel applications of machine learning, digital user engagement, and social media technologies at Dream11.


About the Authors

Pradip Thoke is an AVP of Data Engineering at Dream11 and leads their Data Engineering team. The team involved in this implementation includes Vikas Gite, Salman Dhariwala, Naincy Suman, Lavanya Pulijala, Ruturaj Bhokre, Dhanraj Gaikwad, Vishal Verma, Hitesh Bansal, Sandesh Shingare, Renu Yadav, Yash Anand, Akshay Rochwani, Alokh P, Sunaim and Nandeesh Bijoor.

Girish Patil is a Principal Architect AI, Big Data, India Scale Apps for Amazon.

Amazon EMR Studio (Preview): A new notebook-first IDE experience with Amazon EMR

Post Syndicated from Fei Lang original https://aws.amazon.com/blogs/big-data/amazon-emr-studio-preview-a-new-notebook-first-ide-experience-with-amazon-emr/

We’re happy to announce Amazon EMR Studio (Preview), an integrated development environment (IDE) that makes it easy for data scientists and data engineers to develop, visualize, and debug applications written in R, Python, Scala, and PySpark. EMR Studio provides fully managed Jupyter notebooks and tools like Spark UI and YARN Timeline Service to simplify debugging. EMR Studio uses AWS Single Sign-On (AWS SSO), and allows you to log in directly with your corporate credentials without signing in to the AWS Management Console.

With EMR Studio, you can run notebook code on Amazon EMR running on Amazon Elastic Compute Cloud (Amazon EC2) or Amazon EMR on Amazon Elastic Kubernetes Service (Amazon EKS), and debug your applications. For more information about Amazon EMR on Amazon EKS, see What is Amazon EMR on EKS.

EMR Studio kernels and applications run on EMR clusters, so you get the benefit of distributed data processing with the performance-optimized Apache Spark runtime that Amazon EMR provides. You can also install custom kernels and libraries, collaborate with peers using code repositories such as GitHub and Bitbucket, or run parameterized notebooks as part of scheduled workflows using orchestration services like Apache Airflow or Amazon Managed Workflows for Apache Airflow (Amazon MWAA). Administrators can set up EMR clusters that can be used by EMR Studio users, or create predefined AWS CloudFormation templates for Amazon EMR and allow you to simply choose a template for creating your own cluster.

In this post, we discuss the benefits that EMR Studio offers and we introduce to you some of its capabilities. To learn more about creating and using EMR Studios, see Use Amazon EMR Studio.

Benefits of using EMR Studio

EMR Studio offers the following benefits:

  • Set up a unified experience to develop and diagnose EMR Spark applications – Administrators can set up EMR Studio to allow you to log in using your corporate credentials without having to sign in to the AWS console. You get a single unified environment to interactively explore, process, and visualize data using notebooks, build and schedule pipelines, and debug applications without having to log in to EMR clusters.
  • Use fully managed Jupyter notebooks – With EMR Studio, you can develop analytics and data science applications in R, Python, Scala, and PySpark with fully managed Jupyter notebooks. You can take advantage of distributed processing using the performance-optimized Amazon EMR runtime for Apache Spark with Jupyter kernels and applications running on EMR clusters. You can attach notebooks to an existing cluster that uses Amazon EC2 instances, or to an EMR on EKS virtual cluster. You can also start your own clusters using templates pre-configured by administrators.
  • Collaborate with others using code repositories – From the EMR Studio notebooks environment, you can connect to code repositories such as AWS CodeCommit, GitHub, and Bitbucket to collaborate with peers.
  • Run custom Python libraries and kernels – From EMR Studio, you can install custom Python libraries or Jupyter kernels required for your applications directly to the EMR clusters.
  • Automate workflows using pipelines – EMR Studio makes it easy to move from prototyping to production. You can create EMR Studio notebooks that can be programmatically invoked with parameters, and use APIs to run the parameterized notebooks. You can also use orchestration tools such as Apache Airflow or Amazon MWAA to run notebooks in automated workflows.
  • Simplified debugging – With EMR Studio, you can debug jobs and access logs without logging in to the cluster. EMR Studio provides native application interfaces such as Spark UI and YARN Timeline. When a notebook is run in EMR Studio, the application logs are uploaded to Amazon Simple Storage Service (Amazon S3). As a result, you can access logs and diagnose applications even after your EMR cluster is terminated. You can quickly locate the job to debug by filtering based on the cluster or time when the application was run.

In the following section, we demonstrate some of the capabilities of Amazon EMR Studio using a sample notebook. For our sample notebook, we use the open-source, real-time COVID-19 US daily case reports provided by Johns Hopkins University CSSE from the following GitHub repo.

Notebook-first IDE experience with AWS SSO integration

EMR Studio makes it simple to interact with applications on an EMR cluster. After an administrator sets up EMR Studio and provides the access URL (which looks like https://es-*************************.emrstudio.us-east-1.amazonaws.com), you can log in to EMR Studio with your corporate credentials.

After you log in to EMR Studio, you get started by creating a Workspace. A Workspace is a collection of one or more notebooks for a project. The Workspaces and the notebooks that you create in EMR Studio are automatically saved in an Amazon S3 location.

Now, we create a Workspace by completing the following steps:

  1. On the EMR Studio Dashboard page, choose Create Workspace.
  2. On the Create a Workspace page, enter a Workspace name and a Description.

Naming the Workspace helps identify your project. Your workspace is automatically saved, and you can find it later on the Workspaces page. For this post, we name our Workspace EMR-Studio-WS-Demo1.

  3. On the Subnet drop-down menu, choose a subnet for your Workspace.

Each subnet belongs to the same Amazon Virtual Private Cloud (Amazon VPC) as your EMR Studio. Your administrator may have set up one or more subnets to use for your EMR clusters. You should choose a subnet that matches the subnet where you use EMR clusters. If you’re not sure about which subnet to use, contact your administrator.

  4. For S3 location, choose the Amazon S3 location where EMR Studio backs up all notebook files in the Workspace.

This location is where your Workspace and all the notebooks in the Workspace are automatically saved.

  5. In the Advanced configuration section, you can attach an EMR cluster to your Workspace.

For this post, we skip this step. EMR Studio allows you to create Workspaces and notebooks without attaching to an EMR cluster. You can attach an EMR cluster later when you’re ready to run your notebooks.

  6. Choose Create Workspace.

Fully managed environment for managing and running Jupyter-based notebooks

EMR Studio provides a fully managed environment to help organize and manage Workspaces. Workspaces are the primary building blocks of EMR Studio, and they preserve the state of your notebooks. You can create different Workspaces for each project. From within a Workspace, you can create notebooks, link your Workspace to a code repository, and attach your Workspace to an EMR cluster to run notebooks. Your Workspaces, and the notebooks and settings they contain, are automatically saved in the Amazon S3 location that you specify.

If you created the Workspace EMR-Studio-WS-Demo1 by following the preceding steps, it appears on the Workspaces page with the name EMR-Studio-WS-Demo1, along with its status (Ready), creation time, and last modified timestamp.

The following table describes each possible Workspace status.

Status Meaning
Starting The Workspace is being prepared, but is not yet ready to use.
Ready You can open the Workspace to use the notebook editor. When a Workspace has a Ready status, you can open or delete it.
Attaching The Workspace is being attached to a cluster.
Attached The Workspace is attached to an EMR cluster. If a Workspace is not attached to an EMR cluster, you need to attach it to an EMR cluster before you can run any notebook code in the Workspace.
Idle The Workspace is stopped and currently idle. When you launch an idle Workspace, the Workspace status changes from Idle to Starting to Ready.
Stopping The Workspace is being stopped.
Deleting When you delete a Workspace, it’s marked for deletion. EMR Studio automatically deletes Workspaces marked for deletion. After a Workspace is deleted, it no longer shows in the list of Workspaces.

You can choose the Workspace that you created (EMR-Studio-WS-Demo1) to open it. This opens a new web browser tab with the JupyterLab interface. The icon-denoted tabs on the left sidebar allow you to access tool panels such as the file browser or JupyterLab command palette. To learn more about the EMR Studio Workspace interface, see Understand the Workspace User Interface.

EMR Studio automatically creates an empty notebook with the same name as the Workspace. For this post, because we named our Workspace EMR-Studio-WS-Demo1, the notebook EMR-Studio-WS-Demo1.ipynb is created automatically. In the following screenshot, no cluster or kernel is specified in the top right corner, because we didn't choose to attach any cluster while creating the Workspace. You can write code in your new notebook, but before you run your code, you need to attach it to an EMR cluster and specify a kernel. To attach your Workspace to a cluster, choose the EMR clusters icon on the left panel.

Linking Git-based code repositories with your Workspace

You can collaborate with your peers by sharing notebooks as code via code repositories. EMR Studio supports the following Git-based services:

  • AWS CodeCommit
  • GitHub
  • Bitbucket

This capability provides the following benefits:

  • Version control – Record code changes in a version control system so you can review the history of your changes and selectively revert them.
  • Collaboration – Share code with team members working in different Workspaces through remote Git-based repositories. Workspaces can clone or merge code from remote repositories and push changes back to those repositories.
  • Code reuse – Many Jupyter notebooks that demonstrate data analysis or machine learning techniques are available in publicly hosted repositories, such as GitHub. You can associate your Workspace with a GitHub repository to reuse the Jupyter notebooks contained in a repository.

To link Git repositories to your Workspace, you can link an existing repository or create a new one. When you link an existing repository, you choose from a list of Git repositories associated with the AWS account in which your EMR Studio was created.

We add a new repository by completing the following steps:

  1. Choose the Git icon.
  2. For Repository name¸ enter a name (for example, emr-notebook).
  3. For Git repository URL, enter the URL for the Git repo (for this post, we use the sample notebook at https://github.com/emrnotebooks/notebook_execution).
  4. For Git credentials, select your credentials. Because we’re using a public repo, we select Use a public repository without credentials.
  5. Choose Add repository.

After it’s added, we can see the repo on the Git repositories drop-down menu.

  6. Choose the repo to link to the Workspace.

You can link up to three Git repositories with an EMR Studio Workspace. For more information, see Link Git-Based Repositories to an EMR Studio Workspace.

  7. Choose the File browser icon to locate the Git repo we just linked.

Attaching and detaching Workspaces to and from EMR clusters

EMR Studio kernels and applications run on EMR clusters, so you get the benefit of distributed data processing using the performance-optimized EMR runtime for Apache Spark. You can attach your Workspace to an EMR cluster and get distributed data processing using Spark or custom kernels. You can use primary node capacity to run non-distributed applications.

In addition to using Amazon EMR clusters running on Amazon EC2, you can attach a Workspace to an Amazon EMR on EKS virtual cluster to run notebook code. For more information about how to use an Amazon EMR on EKS cluster in EMR Studio, see Use an Amazon EMR on EKS Cluster to Run Notebook Code.

Before you can run your notebooks, you must attach your Workspace to an EMR cluster. For more information about clusters, see Create and Use Clusters with EMR Studio.

To run the Git repo notebooks that we linked in the previous step, complete the following steps:

  1. Choose the EMR clusters icon on the left panel.
  2. Attach the Workspace to an existing EMR cluster running on Amazon EC2 instances.
  3. Open the notebook demo_pyspark.ipynb from the Git repo emr-notebook that we linked to the Workspace.

In the upper right corner of the Workspace UI, we can see the ID of the EMR cluster being attached to our Workspace, as well as the kernel selected to run the notebook.

  4. Record the value of the cluster ID (for example, <j-*************>).

We use this value later to locate the EMR cluster for application debugging purposes.

You can also detach the Workspace from the cluster in the Workspace UI and re-attach it to another cluster. For more information, see Detach a Cluster from Your Workspace.

Being able to easily attach and detach to and from any EMR cluster allows you to move any workload from prototyping into production. For example, you can start your prototype development by attaching your workspace to a development EMR cluster and working with test datasets. When you’re ready to run your notebook with larger production datasets, you can detach your workspace from the development EMR cluster and attach it to a larger production EMR cluster.

Installing and loading custom libraries and kernels

You can install notebook-scoped libraries with a PySpark kernel in EMR Studio. The libraries installed are isolated to your notebook session and don’t interfere with libraries installed via EMR bootstrap actions, or libraries installed by other EMR Studio notebook sessions that may be running on the same EMR cluster. After you install libraries for your Workspace, they’re available for other notebooks in the Workspace in the same session.

Our sample notebook demo_pyspark.ipynb is a Python script. It uses real-time COVID-19 US daily case reports as input data. The following parameters are defined in the first cell:

  • DATE – The given date used when the notebook job is started.
  • TOP_K – The top k US states with confirmed COVID-19 cases. We use this to plot Graph a.
  • US_STATES – The names of the specific US states being checked for the fatality rates of COVID-19 patients. We use this to plot Graph b.

The parameters can be any of the Python data types.

Running this notebook plots two graphs:

  • Graph a – Visualizes the top k US states with the most COVID-19 cases on a given date
  • Graph b – Visualizes the fatality rates among specific US states on a given date

In our notebook, we install notebook-scoped libraries by running the following code from within a notebook cell:

sc.install_pypi_package("pandas==0.25.1")
sc.install_pypi_package("requests==2.24.0")
sc.install_pypi_package("numpy==1.19.1")
sc.install_pypi_package("kiwisolver==1.2.0")
sc.install_pypi_package("matplotlib==3.3.0")

We use these libraries in the subsequent cells for the further data analysis and visualization steps in the notebook.

The following set of parameters is used to run the notebook:

{"DATE": "10-15-2020",
 "TOP_K": 6,
"US_STATES": ["Wisconsin", "Texas", "Nevada"]}

Running all the notebook cells generates two graphs. Graph a shows the top six US states with confirmed COVID-19 cases on October 15, 2020.

Graph b shows the fatality rates of COVID-19 patients in Texas, Wisconsin, and Nevada on October 15, 2020.

EMR Studio also allows you to install Jupyter notebook kernels and Python libraries on a cluster primary node, which makes your custom environment available to any EMR Studio Workspace attached to the cluster. To install the sas_kernel kernel on a cluster primary node, run the following code within a notebook cell:

!/emr/notebook-env/bin/pip install sas_kernel

The following screenshot shows your output.

For more information about how to install kernels and use libraries, see Installing and Using Kernels and Libraries.

Diagnosing applications and jobs with EMR Studio

In EMR Studio, you can quickly debug jobs and access logs for both active and stopped clusters without logging in to the cluster or setting up a web proxy through an SSH connection. You can use native application interfaces such as Spark UI and YARN Timeline Service directly from EMR Studio. EMR Studio also allows you to quickly locate the cluster or job to debug by using filters such as cluster state, creation time, and cluster ID. For more information, see Diagnose Applications and Jobs with EMR Studio.

Now, we show you how to open a native application interface to debug the notebook job that already finished.

  1. On the EMR Studio page, choose Clusters.

A list appears with all the EMR clusters launched under the same AWS account. You can filter the list by cluster state, cluster ID, or creation time range by entering values in the provided fields.

  2. Choose the cluster ID of the EMR cluster that we attached to the Workspace EMR-Studio-WS-Demo1 for running notebook demo_pyspark.ipynb.
  3. For Spark job debugging, on the Launch application UIs menu, choose Spark History Server.

The following screenshot shows you the Spark job debugging UI.

We can traverse the details for our notebook application by checking actual logs from the Spark History Server, as in the following screenshot.

  4. For YARN application debugging, on the Launch application UIs menu, choose YARN Timeline Server.

The following screenshot shows the YARN debugging UI.

Orchestrating analytics notebook jobs to build ETL production pipelines

EMR Studio makes it easy for you to move any analytics workload from prototyping to production. With EMR Studio, you can run parameterized notebooks as part of scheduled workflows using orchestration services like AWS Step Functions and Apache Airflow or Amazon MWAA.

In this section, we show a simple example of how to orchestrate running notebook workflows using Apache Airflow.

We have a fully tested notebook under an EMR Studio Workspace, and want to schedule a workflow that runs the notebook on an on-demand EMR cluster every 10 minutes.

Record the value of the Workspace ID (for example, e-*****************************) and the notebook file path relative to the home directory within the Workspace (for example, demo.ipynb or my_folder/demo.ipynb).

The workflow that we create takes care of the following tasks:

  1. Create an EMR cluster.
  2. Wait until the cluster is ready.
  3. Start running a notebook defined by the Workspace ID, notebook file path, and the cluster created.
  4. Wait until the notebook is complete.

The following screenshot is the tree view of this example DAG. The DAG definition is available on the GitHub repo. Make sure you replace any placeholder values with the actual ones before using.

When we open the Gantt chart of one of the successful notebook runs, we can see the timeline of our workflow. The time spent creating the cluster and creating a notebook execution is negligible compared to the time spent waiting for the cluster to be ready and waiting for the notebook to finish, which meets the expectation of our SLA.

This example is just a starting point. Try it out and extend it with more sophisticated workflows that suit your needs.

Summary

In this post, we highlighted some of the capabilities of EMR Studio, such as the ability to log in via AWS SSO, access fully managed Jupyter notebooks, link Git-based code repositories, change clusters, load custom Python libraries and kernels, diagnose clusters and jobs using native application UIs, and orchestrate notebook jobs using Apache Airflow or Amazon MWAA.

There is no additional charge for using EMR Studio during the public preview; you pay only for the EMR clusters and other AWS services that you use, such as AWS Service Catalog. For more information, see the EMR Studio FAQs.

EMR Studio is available on Amazon EMR release version 6.2 and later, in the US East (N. Virginia), US West (Oregon), and EU (Ireland) Regions for public preview. For the latest Region availability for the public preview, see Considerations.

If you have questions or suggestions, feel free to leave a comment.


About the  Authors

Fei Lang is a Senior Big Data Architect at Amazon Web Services. She is passionate about building the right big data solution for customers. In her spare time, she enjoys the scenery of the Pacific Northwest, going for a swim, and spending time with her family.

Shuang Li is a Senior Product Manager for Amazon EMR at AWS. She holds a doctoral degree in Computer Science and Engineering from Ohio State University.

Ray Liu is a Software Development Engineer at AWS. Besides work, he enjoys traveling and spending time with family.

Kendra Ellis is a Programmer Writer at AWS.

New – Amazon EMR on Amazon Elastic Kubernetes Service (EKS)

Post Syndicated from Channy Yun original https://aws.amazon.com/blogs/aws/new-amazon-emr-on-amazon-elastic-kubernetes-service-eks/

Tens of thousands of customers use Amazon EMR to run big data analytics applications on frameworks such as Apache Spark, Hive, HBase, Flink, Hudi, and Presto at scale. EMR automates the provisioning and scaling of these frameworks and optimizes performance with a wide range of EC2 instance types to meet price and performance requirements. Customers are now consolidating compute pools across organizations using Kubernetes. Some customers who manage Apache Spark on Amazon Elastic Kubernetes Service (EKS) themselves want to use EMR to eliminate the heavy lifting of installing and managing their frameworks and integrations with AWS services. In addition, they want to take advantage of the faster runtimes and development and debugging tools that EMR provides.

Today, we are announcing the general availability of Amazon EMR on Amazon EKS, a new deployment option in EMR that allows customers to automate the provisioning and management of open-source big data frameworks on EKS. With EMR on EKS, customers can now run Spark applications alongside other types of applications on the same EKS cluster to improve resource utilization and simplify infrastructure management.

Customers can deploy EMR applications on the same EKS cluster as other types of applications, which allows them to share resources and standardize on a single solution for operating and managing all their applications. Customers get all the same EMR capabilities on EKS that they use on EC2 today, such as access to the latest frameworks, performance optimized runtimes, EMR Notebooks for application development, and Spark user interface for debugging.

Amazon EMR automatically packages the application into a container with the big data framework and provides pre-built connectors for integrating with other AWS services. EMR then deploys the application on the EKS cluster and manages logging and monitoring. With EMR on EKS, you can get 3x faster performance using the performance-optimized Spark runtime included with EMR compared to standard Apache Spark on EKS.

Amazon EMR on EKS – Getting Started
If you already have an EKS cluster where you run Spark jobs, you simply register your existing EKS cluster with EMR using the AWS Management Console, AWS Command Line Interface (AWS CLI), or APIs, and then deploy your Spark application.

For example, here is a simple CLI command to register your EKS cluster.

$ aws emr-containers create-virtual-cluster \
          --name <virtual_cluster_name> \
          --container-provider '{
             "id": "<eks_cluster_name>",
             "type": "EKS",
             "info": {
                 "eksInfo": {
                     "namespace": "<namespace_name>"
                 }
             } 
         }'

In the EMR Management console, you can see it in the list of virtual clusters.

When an Amazon EKS cluster is registered, EMR deploys workloads to Kubernetes nodes and pods to manage application execution and auto scaling, and sets up managed endpoints so that you can connect notebooks and SQL clients. EMR builds and deploys a performance-optimized runtime for the open-source frameworks used in analytics applications.

You can simply start your Spark jobs.

$ aws emr-containers start-job-run \
          --name <job_name> \
          --virtual-cluster-id <cluster_id> \
          --execution-role-arn <IAM_role_arn> \
          --release-label <emr_release_label> \
          --job-driver '{
            "sparkSubmitJobDriver": {
              "entryPoint": "<entry_point_location>",
              "entryPointArguments": ["<arguments_list>"],
              "sparkSubmitParameters": "<spark_parameters>"
            }
        }'

To monitor and debug jobs, you can inspect the logs uploaded to Amazon CloudWatch and the Amazon Simple Storage Service (Amazon S3) location configured as part of monitoringConfiguration. You can also use the one-click experience from the console to launch the Spark History Server.
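The same job can be submitted with Boto3, including a monitoringConfiguration that sends logs to CloudWatch and Amazon S3. The following is a minimal sketch; the virtual cluster ID, role ARN, script location, and log destinations are illustrative placeholders:

import boto3

emr_containers = boto3.client("emr-containers")

# Illustrative virtual cluster ID, role, release label, and log destinations.
emr_containers.start_job_run(
    name="sample-spark-job",
    virtualClusterId="<virtual_cluster_id>",
    executionRoleArn="arn:aws:iam::123456789012:role/EMRContainersJobRole",
    releaseLabel="emr-6.2.0-latest",
    jobDriver={
        "sparkSubmitJobDriver": {
            "entryPoint": "s3://my-bucket/scripts/pi.py",
            "sparkSubmitParameters": "--conf spark.executor.instances=2",
        }
    },
    configurationOverrides={
        "monitoringConfiguration": {
            "persistentAppUI": "ENABLED",
            "cloudWatchMonitoringConfiguration": {
                "logGroupName": "/emr-on-eks/sample-spark-job",
                "logStreamNamePrefix": "pi",
            },
            "s3MonitoringConfiguration": {"logUri": "s3://my-bucket/emr-on-eks-logs/"},
        }
    },
)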

Integration with Amazon EMR Studio

Now you can submit analytics applications using AWS SDKs and AWS CLI, Amazon EMR Studio notebooks, and workflow orchestration services like Apache Airflow. We have developed a new Airflow Operator for Amazon EMR on EKS. You can use this connector with self-managed Airflow or by adding it to the Plugin Location with Amazon Managed Workflows for Apache Airflow.

You can also use the newly previewed Amazon EMR Studio to perform data analysis and data engineering tasks in a web-based integrated development environment (IDE). Amazon EMR Studio lets you submit notebook code to EMR clusters deployed on EKS using the Studio interface. After setting up one or more managed endpoints to which Studio users can attach a Workspace, EMR Studio can communicate with your virtual cluster.

During the EMR Studio preview, there is no additional cost when you create managed endpoints for virtual clusters. To learn more, visit the guide document.

Now Available
Amazon EMR on Amazon EKS is available in the US East (N. Virginia), US West (Oregon), and Europe (Ireland) Regions. You can also run EMR workloads on AWS Fargate for Amazon EKS, a serverless option that removes the need to provision and manage infrastructure for pods.

To learn more, visit the documentation. Please send feedback to the AWS forum for Amazon EMR or through your usual AWS support contacts.

Learn all the details about Amazon EMR on Amazon EKS and get started today.

Channy;

How the Allen Institute uses Amazon EMR and AWS Step Functions to process extremely wide transcriptomic datasets

Post Syndicated from Gautham Acharya original https://aws.amazon.com/blogs/big-data/how-the-allen-institute-uses-amazon-emr-and-aws-step-functions-to-process-extremely-wide-transcriptomic-datasets/

This is a guest post by Gautham Acharya, Software Engineer III at the Allen Institute for Brain Science, in partnership with AWS Data Lab Solutions Architect Ranjit Rajan, and AWS Sr. Enterprise Account Executive Arif Khan.

The human brain is one of the most complex structures in the universe. Billions of neurons and trillions of connections come together to form a labyrinthine network of activity. Understanding the mechanisms that guide our minds is one of the most challenging problems in modern scientific research.

The Allen Institute for Brain Science is dedicated to solving large-scale, fundamental problems in neuroscience. Our mission is to accelerate the rate at which the world understands the inner workings of the human brain and to uncover the essence of what makes us human.

Processing extremely wide datasets

As a part of “big science,” one of our core principles, we seek to tackle scientific challenges at scales no one else has attempted before. One of these challenges is processing large-scale transcriptomic datasets. Transcriptomics is the study of RNA. In particular, we’re interested in the genes that are expressed in individual neurons. The human brain contains almost 100 billion neurons—how do they differ from each other, and what genes do they express? After a series of complex analyses using cutting-edge techniques such as Smart-Seq and 10x Genomics Chromium Sequencing, we produce extremely large matrices of numeric values.

Such matrices are called feature matrices. Each column represents a feature of a cell, which in this case is a gene. A genome contains over 50,000 genes, so a single matrix can have over 50,000 columns! We expect the number of rows in our matrices to increase over time, reaching tens of millions, if not more. These matrices can reach 500 GB or more in size. Over the next few years, we want to be able to ingest tens or hundreds of such matrices.

Our goal is to provide low-latency visualizations on such matrices, allowing researchers to aggregate, slice, and dissect our data in real time. To do this, we run a series of precomputations that store expensive calculations in a database for future retrieval.

We wanted to create a flexible, scalable pipeline to run computations on these matrices and store the results for visualizations.

The pipeline

We wanted to build a pipeline that takes these large matrices as inputs, runs various Spark jobs, and stores the outputs in an Apache HBase cluster. We wanted to create something flexible so that we could easily add additional Spark transformations.

We decided on AWS Step Functions as our workflow-orchestration tool of choice. Step Functions allows us to create a state machine that orchestrates the dataflow from payload submission to database loading.

After close collaboration with the engineers at the AWS Data Lab, we came up with the following pipeline architecture.

At a high level, our pipeline has the following workflow:

  1. Trigger a state machine from an upload event to an Amazon Simple Storage Service (Amazon S3) bucket.
  2. Copy and unzip the input ZIP file containing a feature matrix into an Amazon S3 working directory.
  3. Run Spark jobs on Amazon EMR to transform input feature matrices into various pre-computed datasets. Store all intermediate results in a working directory on Amazon S3 and output the results of the Spark jobs as HFiles.
  4. Bulk load the results of our Spark jobs into Apache HBase.

The preceding architecture diagram is deceptively simple. We found a number of challenges during our initial implementation, which we discuss in the following sections.

Lack of transaction support and rollbacks across tables in Apache HBase

The results of our Spark jobs are a number of precomputed views of our original input dataset. Each view is stored as a separate table in Apache HBase. A major drawback of Apache HBase is the lack of a native transactional system. HBase only provides row-level atomicity. Our worst-case scenario is writing partial data—cases where some views are updated, but not others, showing different results for different visualizations and resulting in scientifically incorrect data!

We worked around this by rolling our own blue/green system on top of Apache HBase. We suffix each set of tables related to a dataset with a universally unique identifier (UUID). We use Amazon DynamoDB to track the UUID associated with each individual dataset. When an update to a dataset is being written, the UUID is not switched in DynamoDB until we verify that all the new tables have been successfully written to Apache HBase. We have an API on top of HBase to facilitate reads. This API checks DynamoDB for the dataset UUID before querying HBase, so user traffic is never redirected toward a new view until we confirm a successful write. Our API involves an AWS Lambda function using HappyBase to connect to our HBase cluster, wrapped in an Amazon API Gateway layer to provide a REST interface. The following diagram illustrates this architecture.

The read path has the following steps:

  • R1 – API Gateway invokes a Lambda function to fetch data from a dataset
  • R2 – The Lambda function requests and receives the dataset UUID from DynamoDB
  • R3 – Lambda queries the Apache HBase cluster with the UUID

The write path has the following steps:

  • W1 – The state machine bulk loads new dataset tables to the Apache HBase cluster suffixed with the new UUID
  • W2 – After validation, the state machine updates DynamoDB so user traffic is directed towards those changes
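The following sketch shows the UUID bookkeeping with Boto3 and DynamoDB; the table and attribute names are illustrative, not the Allen Institute's actual schema:

import boto3

dynamodb = boto3.resource("dynamodb")
# Illustrative table tracking the active table-set UUID for each dataset.
dataset_versions = dynamodb.Table("dataset_versions")


def get_active_hbase_table(dataset_id, view_name):
    """Return the HBase table name currently serving reads for a dataset view."""
    item = dataset_versions.get_item(Key={"dataset_id": dataset_id})["Item"]
    return f"{dataset_id}_{view_name}_{item['active_uuid']}"


def promote_new_version(dataset_id, new_uuid):
    """Flip reads to the new table set only after all HBase writes are verified."""
    dataset_versions.update_item(
        Key={"dataset_id": dataset_id},
        UpdateExpression="SET active_uuid = :u",
        ExpressionAttributeValues={":u": new_uuid},
    )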

Stalled Spark jobs on extremely wide datasets

Although Apache Spark is a fantastic engine for running distributed compute operations, it doesn’t do too well when scaling to extremely wide datasets. We routinely operate on data that surpasses 50,000 columns, which often causes issues such as a stalled JavaToPython step in our PySpark job. Although we have more investigating to do to figure out why our Spark jobs hang on these wide datasets, we found a simple workaround in the short term—batching!

A number of our jobs involve computing simple columnar aggregations on our data. This means that each calculation on a column is completely independent of all the other columns. This lends itself quite well to batching our compute. We can break our input columns into chunks and run our compute on each chunk.

The following code chunks Apache Spark aggregation functions into groups of columns:

# Module aliases assumed from the surrounding project (illustrative):
# import src.spark_transforms.pyspark_jobs.pyspark_utilities as pyspark_utilities
# from pyspark.sql import functions as pyspark_functions
# from pyspark.sql import types as pyspark_datatype

def get_aggregation_for_matrix_and_metadata(matrix, metadata, group_by_arg, agg_func, cols_per_write):
    '''
    Performs an aggregation on the joined matrix, aggregating the desired column by the given function.
    agg_func must be a valid Pandas UDF function. Runs in batches so we don't overload the Task Scheduler
    with 50,000 columns at once.
    '''
    # Chunk the data
    for col_group in pyspark_utilities.chunks(matrix.columns, cols_per_write):

        # Add the row key to the column group
        col_group.append(matrix.columns[0])

        selected_matrix = matrix.select(pyspark_utilities.escape_column_list(col_group))

        # Create the argument list for the group by, then process
        cast_as_udf = pyspark_functions.pandas_udf(
            agg_func,
            pyspark_datatype.FloatType(),
            pyspark_functions.PandasUDFType.GROUPED_AGG)

        udf_input = [cast_as_udf(selected_matrix[column_name]).alias(column_name)
                     for column_name in selected_matrix.columns
                     if column_name != group_by_arg]

        # Aggregate the selected chunk of columns
        yield selected_matrix.groupby(group_by_arg).agg(*udf_input)

We then write the results of each batch to an HFile, which is then later bulk loaded into HBase.

Because the post-aggregation DataFrame was very small, we found a significant performance increase in coalescing the DataFrame post-aggregation and checkpointing the results before writing the HFiles. This forces Spark to compute the aggregation before writing the HFiles. HFiles need to be sorted by row key, so it’s easier to pass a smaller DataFrame to our HFile converter.

Using Apache Spark to write DataFrames as HFiles

Apache Spark supports writing DataFrames in multiple formats, including as HFiles. However, the documentation for doing so leaves a lot to be desired. To write out our Spark DataFrames as HFiles, we had to take the following steps:

  1. Convert a DataFrame into a HFile-compatible format, assuming that the first column is the HBase rowkey—(row_key, column_family, col, value).
  2. Create a JAR file containing a converter to convert input Python Objects into Java key-value byte classes. This step took a lot of trial and error—we couldn’t find clear documentation on how the Python object was serialized and passed into the Java function.
  3. Call the saveAsNewAPIHadoopFile function, passing in the relevant information: the ZooKeeper Quorum IP, port, and cluster DNS of our Apache HBase on the Amazon EMR cluster; the HBase table name; the class name of our Java converter function; and more.

The following code writes HFiles:

import src.spark_transforms.pyspark_jobs.pyspark_utilities as pyspark_utilities
import src.spark_transforms.pyspark_jobs.output_handler.emr_constants as constants


def csv_to_key_value(row, sorted_cols, column_family):
   '''
   This method is an RDD mapping function that will map each
   row in an RDD to an hfile-formatted tuple for hfile creation
   (rowkey, (rowkey, columnFamily, columnQualifier, value))
   '''
   result = []
   for index, col in enumerate(sorted_cols[constants.ROW_KEY_INDEX + 1:], 1):
       row_key = str(row[constants.ROW_KEY_INDEX])
       value = row[index]

       if value is None:
           raise ValueError(f'Null value found at {row_key}, {col}')

       # We store sparse representations, dropping all zeroes.
       if value != 0:
           result.append((row_key, (row_key, column_family, col, value)))

   return tuple(result)


def get_sorted_df_by_cols(df):
   '''
   Sorts the matrix by column. Retains the row key as the initial column.
   '''
   cols = [df.columns[0]] + sorted(df.columns[1:])
   escaped_cols = pyspark_utilities.escape_column_list(cols)
   return df.select(escaped_cols)


def flat_map_to_hfile_format(df, column_family):
   '''
   Flat maps the matrix DataFrame into an RDD formatted for conversion into HFiles.
   '''
   sorted_df = get_sorted_df_by_cols(df)
   columns = sorted_df.columns
   return sorted_df.rdd.flatMap(lambda row: csv_to_key_value(row, columns, column_family)).sortByKey(True)


def write_hfiles(df, output_path, zookeeper_quorum_ip, table_name, column_family):
   '''
   This method will sort and map the PySpark DataFrame and
   then write HFiles to the output directory using the supplied
   HBase configuration.
   '''
   # sort columns other than the row key (first column)

   rdd = flat_map_to_hfile_format(df, column_family)

   conf = {
           constants.HBASE_ZOOKEEPER_QUORUM: zookeeper_quorum_ip,
           constants.HBASE_ZOOKEEPER_CLIENTPORT: constants.ZOOKEEPER_CLIENTPORT,
           constants.ZOOKEEPER_ZNODE_PARENT: constants.ZOOKEEPER_PARENT,
           constants.HBASE_TABLE_NAME: table_name
           }

   rdd.saveAsNewAPIHadoopFile(output_path,
                              constants.OUTPUT_FORMAT_CLASS,
                              keyClass=constants.KEY_CLASS,
                              valueClass=constants.VALUE_CLASS,
                              keyConverter=constants.KEY_CONVERTER,
                              valueConverter=constants.VALUE_CONVERTER,
                              conf=conf)

The following code describes all the constants configuration:

HBASE_ZOOKEEPER_QUORUM="hbase.zookeeper.quorum"
HBASE_ZOOKEEPER_CLIENTPORT="hbase.zookeeper.property.clientPort"
ZOOKEEPER_ZNODE_PARENT="zookeeper.znode.parent"
HBASE_TABLE_NAME="hbase.mapreduce.hfileoutputformat.table.name"

OUTPUT_FORMAT_CLASS='org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2'
KEY_CLASS='org.apache.hadoop.hbase.io.ImmutableBytesWritable'
VALUE_CLASS='org.apache.hadoop.hbase.KeyValue'
KEY_CONVERTER="org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
VALUE_CONVERTER="KeyValueConverter"

ZOOKEEPER_CLIENTPORT='2181'
ZOOKEEPER_PARENT='/hbase'

ROW_KEY_INDEX = 0

The following code is a Java class to serialize input PySpark RDDs:

import org.apache.spark.api.python.Converter;
import org.apache.hadoop.hbase.KeyValue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

/**
* This class is used to convert a tuple
* supplied by a spark job created in Python
* to the corresponding hbase keyValue type
* which is needed for hfile creation.
*
*/
@SuppressWarnings("rawtypes")
public class KeyValueConverter implements Converter {

  private static final long serialVersionUID = 1L;

  /**
   * this method will take a tuple object supplied
   * by Python spark job and convert and
   * return the corresponding hbase KeyValue object.
   */
  public Object convert(Object obj) {
     KeyValue cell;
     List<?> list = new ArrayList<>();
     if (obj.getClass().isArray()) {
          list = Arrays.asList((Object[])obj);
      } else if (obj instanceof Collection) {
          list = new ArrayList<>((Collection<?>)obj);
      }

     cell = new KeyValue(
           list.get(0).toString().getBytes(),
           list.get(1).toString().getBytes(),
           list.get(2).toString().getBytes(),
           list.get(3).toString().getBytes());

     return cell;
  }
}

Looking ahead

Our computation pipeline was a success, and you can see the resulting visualizations on https://transcriptomics.brain-map.org/.

We’ve been thrilled with AWS’s reliable and feature-rich ecosystem. We used Amazon EMR, Step Functions, and Amazon S3 to build a robust, large-scale data processing pipeline.

Since writing this post, we’ve done much more, including a cross-database transaction system, wide-matrix transposes in Spark, and more. Big Data problems in neuroscience never end, and we’re excited to share more with you in the future!


About the Authors

Gautham Acharya is a Software Engineer at the Allen Institute for Brain Science. He works on the backend data platform team responsible for integrating multimodal neuroscience data into a single cohesive system.

Ranjit Rajan is a Data Lab Solutions Architect with AWS. Ranjit works with AWS customers to help them design and build data and analytics applications in the cloud.

Arif Khan is a Senior Account Executive with Amazon Web Services. He works with nonprofit research customers to help shape and deliver on a strategy that focuses on customer success, building mind share and driving broad use of Amazon’s utility computing services to support their mission.

Amazon EMR now provides up to 30% lower cost and up to 15% improved performance for Spark workloads on Graviton2-based instances

Post Syndicated from Peter Gvozdjak original https://aws.amazon.com/blogs/big-data/amazon-emr-now-provides-up-to-30-lower-cost-and-up-to-15-improved-performance-for-spark-workloads-on-graviton2-based-instances/

Amazon EMR now supports M6g, C6g and R6g instances with Amazon EMR versions 6.1.0, 5.31.0 and later. These instances are powered by AWS Graviton2 processors that are custom designed by AWS using 64-bit Arm Neoverse cores to deliver the best price performance for cloud workloads running in Amazon Elastic Compute Cloud (Amazon EC2). On Graviton2 instances, Amazon EMR runtime for Apache Spark provides up to 15% improved performance at up to 30% lower costs relative to equivalent previous generation instances. In our TPC-DS 3 TB benchmark tests, we found that queries run up to 32 times faster using Amazon EMR runtime for Apache Spark. For more information, see Amazon EMR introduces EMR runtime for Apache Spark.

You can use Apache Spark for a wide array of analytics use cases ranging from large-scale transformations to streaming, data science, and machine learning. Amazon EMR provides the latest, stable, open-source innovations, performant storage with Amazon S3, and the unique cost savings capabilities of Spot Instances and Managed Scaling.

Amazon EMR runtime for Apache Spark is a performance-optimized runtime environment for Apache Spark, available and turned on by default on Amazon EMR release 5.28.0 and later. Amazon EMR runtime for Apache Spark provides 100% API compatibility with open-source Apache Spark. This means that your Apache Spark workloads run faster and incur lower compute costs when run on Amazon EMR without requiring any changes to your application.

In this post, we discuss the results that we observed by running Spark workloads on Graviton2 instances.

AWS Graviton2 and Amazon EMR runtime performance improvements

To measure improvements, we ran TPC-DS 3 TB benchmark queries on Amazon EMR 5.30.1 using Amazon EMR runtime for Apache Spark (compatible with Apache Spark version 2.4) with 5-10 node clusters of M6g instances with data in Amazon Simple Storage Service (Amazon S3), and compared it to an equivalent configuration using M5 instances. We measured performance improvements using the total query execution time and the geometric mean of query execution time across the 104 TPC-DS 3 TB benchmark queries.

The results showed improved performance on M6g instance EMR clusters compared to equivalent M5 instance EMR clusters: between 11.61–15.61% improvement in total query runtime for different sizes within the instance family, and between 10.52–12.91% improvement in geometric mean. To measure the cost improvement, we added up the Amazon EMR and Amazon EC2 cost per instance per hour and multiplied it by the total query runtime. We observed between 21.58–30.58% lower instance hour cost on M6g instance EMR clusters compared to equivalent M5 instance EMR clusters when executing the 104 TPC-DS benchmark queries.

The following table shows results from running TPC-DS 3 TB benchmark queries using Amazon EMR 5.30.1 over equivalent M5 and M6g instance EMR clusters.

Instance Size 16 XL 12 XL 8 XL 4 XL 2 XL
Number of core instances in EMR cluster 5 5 5 5 10
Total query runtime on M5 (seconds) 6157 6167 6857 10593 10676
Total query runtime on M6g (seconds) 5196 5389 6061 9313 9240
Total query execution time improvement with M6g 15.61% 12.63% 11.61% 12.08% 13.45%
Geometric mean query execution time on M5 (sec) 33 34 35 47 47
Geometric mean query execution time on M6g (sec) 29 30 32 41 42
Geometric mean query execution time improvement with M6g 12.73% 10.79% 10.52% 12.91% 11.24%
EC2 M5 instance price ($ per hour) $3.072 $2.304 $1.536 $0.768 $0.384
EMR M5 instance price ($ per hour) $0.27 $0.27 $0.27 $0.192 $0.096
(EC2 + EMR) M5 instance price ($ per hour) $3.342 $2.574 $1.806 $0.960 $0.480
Cost of running on M5 ($ per instance) $5.72 $4.41 $3.44 $2.82 $1.42
EC2 M6g instance price ($ per hour) $2.464 $1.848 $1.232 $0.616 $0.308
EMR M6g price ($ per hour per instance) $0.616 $0.462 $0.308 $0.15 $0.08
(EC2 + EMR) M6g instance price ($ per hour) $3.080 $2.310 $1.540 $0.770 $0.385
Cost of running on M6g ($ per instance) $4.45 $3.46 $2.59 $1.99 $0.99
Total cost reduction with M6g including performance improvement -22.22% -21.58% -24.63% -29.48% -30.58%

The following graph shows per query improvements observed on M6g 2XL instances with EMR Runtime for Spark on Amazon EMR version 5.30.1 compared to equivalent M5 2XL instances for the 104 queries in the TPC-DS 3 TB benchmark. Performance of 100 of 104 TPC-DS queries improved with M6g 2XL, and performance for 4 queries regressed (q41, q20, q42, and q52 – with the maximum regression of -20.99%). If you are evaluating migrating from M5 instances to M6g instances for EMR Spark workloads, we recommend that you test your workloads to check if any of your queries encounter slower performance.

 

R6g instances showed a similar performance improvement while running Apache Spark workloads compared to equivalent R5 instances. Our test results showed between 14.27-21.50% improvement in total query runtime for 5 different instance sizes within the instance family, and between 12.48-18.95% improvement in geometric mean. On cost comparison, we observed 23.26%-31.66% reduced instance hour cost on R6g instance EMR clusters compared to R5 EMR instance clusters to execute the 104 TPC-DS benchmark queries. We observed 4 benchmark queries (q6, q21, q41 and q26 – with a maximum regression of -18.28%) taking longer to execute on R6g instance clusters compared to R5 instance clusters.

With C6g instances, we observed improved performance compared to C5 instances for Spark workloads on 2XL, 4XL, 12XL, and 16XL instance sizes. Query execution performance regressed on 8XL instances by -0.38%. We observed between 16.84-24.15% lower instance hour costs on C6g instance EMR clusters compared to equivalent C5 instance EMR clusters when executing the 104 TPC-DS benchmark queries. Performance of 73 of 104 TPC-DS queries improved with C6g 4XL, and performance for 31 queries regressed (with a maximum regression of -31.38% for q78).

Summary

By using Amazon EMR with M6g, C6g and R6g instances powered by Graviton2 processors, we observed improved performance and reduced cost of running 104 TPC-DS benchmark queries. To keep up to date, subscribe to the Big Data blog’s RSS feed to learn about more Apache Spark optimizations, configuration best practices, and tuning advice.


About the Authors

Peter Gvozdjak is a senior engineering manager for EMR at Amazon Web Services.

Al MS is a product manager for Amazon EMR at Amazon Web Services.

Data preprocessing for machine learning on Amazon EMR made easy with AWS Glue DataBrew

Post Syndicated from Kartik Kannapur original https://aws.amazon.com/blogs/big-data/data-preprocessing-for-machine-learning-on-amazon-emr-made-easy-with-aws-glue-databrew/

The machine learning (ML) lifecycle consists of several key phases: data collection, data preparation, feature engineering, model training, model evaluation, and model deployment. The data preparation and feature engineering phases ensure an ML model is given high-quality data that is relevant to the model’s purpose. Because most raw datasets require multiple cleaning steps (such as addressing missing values and imbalanced data) and numerous data transformations to produce useful features ready for model training, these phases are often considered the most time-consuming in the ML lifecycle. Additionally, producing well-prepared training datasets has typically required extensive knowledge of multiple data analysis libraries and frameworks. This has presented a barrier to entry for new ML practitioners and reduced iteration speed for more experienced practitioners.

In this post, we show you how to address this challenge with the newly released AWS Glue DataBrew. DataBrew is a visual data preparation service, with over 250 pre-built transformations to automate data preparation tasks, without the need to write any code. We show you how to use DataBrew to analyze, prepare, and extract features from a dataset for ML, and subsequently train an ML model using PySpark on Amazon EMR. Amazon EMR is a managed cluster platform that provides the ability to process and analyze large amounts of data using frameworks such as Apache Spark and Apache Hadoop.

For more details about DataBrew, Amazon EMR, and each phase of the ML lifecycle, see the following:

Solution overview

The following diagram illustrates the architecture of our solution.

Loading the dataset to Amazon S3

We use the Census Income dataset from the UCI Machine Learning Repository to train an ML model that predicts whether a person’s income is above $50,000 a year. This multivariate dataset contains 48,842 observations and 14 attributes, such as age, nature of employment, educational background, and marital status.

For this post, we download the Adult dataset. The data folder contains five files, of which adult.data and adult.test are the train and test datasets, and adult.names contains the column names and description. Because the raw dataset doesn’t contain the column names, we add them to the first row of the train and test datasets and save the files with the extension .csv:

Column Names
age,workclass,fnlwgt,education,education-num,marital-status,occupation,relationship,race,sex,capital-gain,capital-loss,hours-per-week,native-country,target

Create a new bucket in Amazon Simple Storage Service (Amazon S3) and upload the train and test data files under a new folder titled raw-data.
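If you prefer to script this step, the following is a minimal Python sketch that prepends the column names to the raw files and uploads them to the raw-data folder with boto3. The bucket name and local file paths are placeholders that you need to replace.

import boto3

header = ("age,workclass,fnlwgt,education,education-num,marital-status,"
          "occupation,relationship,race,sex,capital-gain,capital-loss,"
          "hours-per-week,native-country,target")

# Prepend the column names to each raw file and save it with a .csv extension
for src, dst in [("adult.data", "adult.data.csv"), ("adult.test", "adult.test.csv")]:
    with open(src) as f_in, open(dst, "w") as f_out:
        f_out.write(header + "\n")
        f_out.write(f_in.read())

# Upload the train and test files to the raw-data folder of the S3 bucket
s3 = boto3.client("s3")
for local_file in ("adult.data.csv", "adult.test.csv"):
    s3.upload_file(local_file, "<YOUR-S3-BUCKET-NAME>", "raw-data/" + local_file)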

Data preparation and feature engineering using DataBrew

In this section, we use DataBrew to explore a sample of the dataset uploaded to Amazon S3 and prepare the dataset to train an ML model.

Creating a DataBrew project

To get started with DataBrew, complete the following steps:

  1. On the DataBrew console, choose Projects.
  2. Choose Create a project.
  3. For Name, enter census-income.
  4. For Attached recipe, choose Create a new recipe.
  5. For Recipe name, enter census-income-recipe.
  6. For Select a dataset, select New dataset.
  7. For Dataset name, enter adult.data.

  1. Import the train dataset adult.data.csv from Amazon S3.
  2. Create a new AWS Identity and Access Management (IAM) policy and IAM role by following the steps on the DataBrew console, which provides DataBrew the necessary permissions to access the source data in Amazon S3.
  3. In the Sampling section, for Type, choose Random rows.
  4. Select 1,000.

Exploratory data analysis

The first step in the data preparation phase is to perform exploratory data analysis (EDA). EDA allows us to gain an intuitive understanding of the dataset by summarizing its main characteristics. Example outputs from EDA include identifying data types across columns, plotting the distribution of data points, and creating visuals that describe the relationship between columns. This process informs the data transformations and feature engineering steps that you need to apply prior to building an ML model.

After you create the project, DataBrew provides three different views of the dataset:

  • Grid view – Presents the 15 columns and first 1,000 rows sampled from the dataset and the distribution of data across each column
  • Schema view – In addition to information in the grid view, presents information about the data types (such as double, integer, or string) and the data quality that indicates the presence of missing or invalid values
  • Data profile view – Supported by a data profile job, generates summary statistics such as quartiles, standard deviation, variance, most frequently occurring values, and the correlation between columns

The following screenshot shows our view of the dataset.

Each view presents a unique piece of information that helps us gain a better understanding of the dataset. For instance, in the grid view, we can observe the distribution of data across the 15 columns and spot erroneous data points, such as those with ? in the workclass, occupation, or native-country columns.

In the schema view, we can observe six columns with continuous data, nine columns with categorical or binary data, and no missing or invalid observations in the sample of our dataset. The columns with continuous data also contain the corresponding minimum, maximum, mean, median, and mode values represented as a box plot.

In the data profile view, after running a data profile job, we can observe the summary statistics from the first 20,000 rows, such as the five-number summary, measures of central tendency, variance, skewness, kurtosis, correlations, and the most frequently occurring values in each column. For instance, we can combine the information from the grid view and the data profile view to replace erroneous data points such as ? by the most frequently occurring value in that column as a form of data cleaning. To run a data profile job on more than 20,000 rows, request a limit increase at [email protected]

As part of the EDA phase, we can look at the distribution of data in the target column, which represents whether a person’s income is above $50,000 per year. The ratio of people whose income is greater than $50,000 per year to those whose income is less than or equal to $50,000 per year is approximately 1:3, indicating that the target classes are unevenly distributed but not severely imbalanced.

Building a set of data transformation steps and publishing a recipe

Now that we have an intuitive understanding of the dataset, let’s build out the data transformation steps. Based on our EDA, we replace the ? observation with the most frequently occurring value in each column.

  1. Choose the Replace value or pattern transformation.
  2. Replace ? with Private in the workclass column.
  3. Replace ? with United-States in the native-country column.

The occupation column also contains observations with ?, but the data points are spread across categories without a clear frequently occurring category. Therefore, we can categorically encode the observations in the occupation column, including those with the ? observation, thereby treating ? as a separate category. The occupation column in the adult.data training dataset contains 15 categories, of which Protective-serv, Priv-house-serv, and Armed-Forces occur infrequently. To avoid excessive granularity in ML modeling, we can group these three categories into a single category named Other.

During ML model evaluation and prediction, we can also map categories that the model hasn’t encountered during model training to the Other category.

With that as the background, let’s apply the categorical mapping transformation to only the top 12 distinct values.

  1. Select Map top 12 values.
  2. Select Map values to numeric values.

This selects the top 12 categories and combines the other categories into a single category named Other. We now have a new column named occupation_mapped.

  1. Delete the occupation column to avoid redundancy.
  2. Similarly, apply the categorical mapping transformation to the top five values in the workclass column and the top one value in the native-country column. Remember to select Map values to numeric values.

This groups the remaining categories into a single category named Other.

  1. Delete the columns workclass and native-country.

The other four columns with categorical data—marital-status, relationship, race, and sex—have few categories with most of them occurring frequently. Let’s apply the categorical mapping transformation to these columns as well.

  1. Apply categorical mapping, with the following differences:
    1. Select Map all values.
    2. Select Map values to numeric values.
  2. Delete the original columns to avoid redundancy.
  3. Delete the fnlwgt column, because it represents the sampling weight and isn’t related to the target variable.
  4. Delete the education column, because it has already been categorically mapped to education-num.
  5. Map the target column to numeric values, where income less than or equal to $50,000 per year is mapped to class 0 and income greater than $50,000 per year is mapped to class 1.
  6. Rename the destination column to label in order to align with our downstream PySpark model training code.

  1. Delete the original target column.

The data preparation phase is now complete, and the set of 20 transformations, consisting of data cleaning and categorical mapping steps, is combined into a recipe.

Because the data preparation and ML model training phases are highly iterative, we can save the set of data transformation steps applied by publishing the recipe. This provides version control, and allows us to maintain the data transformation steps and experiment with multiple versions of the recipe in order to determine the version with the best ML model performance. For more information about DataBrew recipes, see Creating and using AWS Glue DataBrew recipes.
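If you want to version recipes as part of an automated workflow, you can also publish them programmatically. The following is a small boto3 sketch, assuming the recipe is named census-income-recipe as in the steps above; the description text is illustrative.

import boto3

databrew = boto3.client("databrew")

# Publish the current working version of the recipe so it receives a version number
databrew.publish_recipe(Name="census-income-recipe",
                        Description="Census income cleaning and categorical mapping steps")

# List the published versions to choose one to experiment with
for recipe in databrew.list_recipe_versions(Name="census-income-recipe")["Recipes"]:
    print(recipe["Name"], recipe["RecipeVersion"])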

Creating and running a DataBrew recipe job

The exploratory data analysis phase helped us gain an intuitive understanding of the dataset, from which we built a recipe to prepare and transform our data for ML modeling. We have been working with a random sample of 1,000 rows from the adult.data training dataset, and we need to apply the same set of data transformation steps to the over 32,000 rows in the adult.data dataset. A DataBrew recipe job provides the ability to scale the transformation steps from a sample of data to the entire dataset. To create our recipe job, complete the following steps:

  1. On the DataBrew console, choose Jobs.
  2. Choose Create recipe job.
  3. For Job name, enter a name.
  4. Create a new folder in Amazon S3 (s3://<YOUR-S3-BUCKET-NAME>/transformed-data/) for the recipe job to save the transformed dataset.

The recipe job should take under 2 minutes to complete.
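You can also create and run the recipe job programmatically, which is useful when the job is part of a larger pipeline. The following boto3 sketch assumes the project name used earlier; the job name, IAM role ARN, and bucket are placeholders.

import time
import boto3

databrew = boto3.client("databrew")

# Create a recipe job attached to the census-income project
# (job name, role ARN, and bucket are placeholders)
databrew.create_recipe_job(
    Name="census-income-recipe-job",
    ProjectName="census-income",
    RoleArn="arn:aws:iam::<ACCOUNT-ID>:role/<YOUR-DATABREW-ROLE>",
    Outputs=[{"Location": {"Bucket": "<YOUR-S3-BUCKET-NAME>", "Key": "transformed-data/"}}],
)

# Start the job run and poll until it reaches a terminal state
run_id = databrew.start_job_run(Name="census-income-recipe-job")["RunId"]
while True:
    state = databrew.describe_job_run(Name="census-income-recipe-job", RunId=run_id)["State"]
    print("Job run state:", state)
    if state in ("SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT"):
        break
    time.sleep(15)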

Training an ML model on the transformed dataset using PySpark

With the data transformation job complete, we can use the transformed dataset to train a binary classification model to predict whether a person’s income is above $50,000 per year.

  1. Create an Amazon EMR notebook.
  2. When the notebook’s status is Ready, open the notebook in a JupyterLab or Jupyter Notebook environment.
  3. Choose the PySpark kernel.

For this post, we use Spark version 2.4.6.

  1. Load the transformed dataset into a PySpark DataFrame within the notebook:
    train_dataset = spark.read.csv(path='s3://<YOUR-S3-BUCKET-NAME>/transformed-data/<YOUR-RECIPE-JOB-NAME>_<TIMESTAMP>/<YOUR-RECIPE-JOB-NAME>_<TIMESTAMP>_part00000.csv', header=True, inferSchema=True)
    print('The transformed train dataset has {n_rows} rows and {n_cols} columns'.format(n_rows=train_dataset.count(), n_cols=len(train_dataset.columns)))
    The transformed train dataset has 32561 rows and 13 columns

  2. Inspect the schema of the transformed dataset:
    train_dataset.printSchema()
    root 
    |-- age: integer (nullable = true) 
    |-- workclass_mapped: double (nullable = true) 
    |-- education-num: double (nullable = true) 
    |-- marital_status_mapped: double (nullable = true) 
    |-- occupation_mapped: double (nullable = true) 
    |-- relationship_mapped: double (nullable = true) 
    |-- race_mapped: double (nullable = true) 
    |-- sex_mapped: double (nullable = true) 
    |-- capital-gain: double (nullable = true) 
    |-- capital-loss: double (nullable = true) 
    |-- hours-per-week: double (nullable = true) 
    |-- native_country_mapped: double (nullable = true) 
    |-- label: double (nullable = true)

Of the 13 columns in the dataset, we use the first 12 columns as features for the model and the label column as the final target value for prediction.

  1. Use the VectorAssembler method within PySpark to combine the 12 columns into a single feature vector column, which makes it convenient to train the ML model:
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import VectorAssembler
    stages = []
    arr_features = train_dataset.columns[:-1]
    # Transform input features into a vector using VectorAssembler
    features_vector_assembler = VectorAssembler(inputCols=arr_features, outputCol='features')
    stages.append(features_vector_assembler)
    # Run the train dataset through the pipeline
    pipeline = Pipeline(stages=stages)
    train_dataset_pipeline = pipeline.fit(train_dataset).transform(train_dataset)
    # Select the feature vector and label column
    train_dataset_pipeline = train_dataset_pipeline.select('features', 'label')

  2. To estimate the model performance on the unseen test dataset (adult.test), split the transformed train dataset (train_dataset_pipeline) into 70% for model training and 30% for model validation:
    df_train, df_val = train_dataset_pipeline.randomSplit([0.7, 0.3], seed=42)
    print('The train dataset has {n_rows} rows and {n_cols} columns'.format(n_rows=df_train.count(), n_cols=len(df_train.columns)))
    print('The validation dataset has {n_rows} rows and {n_cols} columns'.format(n_rows=df_val.count(), n_cols=len(df_val.columns)))
    The train dataset has 22841 rows and 2 columns
    The validation dataset has 9720 rows and 2 columns

  3. Train a Random Forest classifier on the training dataset df_train and evaluate its performance on the validation dataset df_val using the area under the ROC curve (AUC), which is a measure of model performance for binary classifiers at different classification thresholds:
    from pyspark.ml.classification import RandomForestClassifier
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    # Train a Random Forest classifier
    rf_classifier = RandomForestClassifier(featuresCol = 'features', labelCol = 'label')
    model = rf_classifier.fit(df_train)
    # Model predictions on the validation dataset
    preds = model.transform(df_val)
    # Evaluate model performance
    evaluator = BinaryClassificationEvaluator()
    auc = evaluator.evaluate(preds, {evaluator.metricName: "areaUnderROC"})
    print('Validation AUC: {}'.format(auc))
    Validation AUC: 0.8909629419656796

A validation AUC of 0.89 indicates strong model performance for the classifier. Because the data transformation and model training phases are highly iterative in nature, in order to improve the model performance, we can experiment with different data transformation steps, additional features, and other classification models. After we achieve a satisfactory model performance, we can evaluate the model predictions on the unseen test dataset, adult.test.
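As one example of this iteration, we can run a small hyperparameter search with cross-validation. The following PySpark sketch reuses rf_classifier, BinaryClassificationEvaluator, df_train, and df_val from the previous steps; the grid values are illustrative, not tuned recommendations.

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Search a small grid of Random Forest hyperparameters with 3-fold cross-validation
param_grid = (ParamGridBuilder()
              .addGrid(rf_classifier.numTrees, [20, 50, 100])
              .addGrid(rf_classifier.maxDepth, [5, 10])
              .build())

cv = CrossValidator(estimator=rf_classifier,
                    estimatorParamMaps=param_grid,
                    evaluator=BinaryClassificationEvaluator(metricName='areaUnderROC'),
                    numFolds=3)

# Fit on the training split and evaluate the best model on the validation split
cv_model = cv.fit(df_train)
val_auc = BinaryClassificationEvaluator().evaluate(cv_model.bestModel.transform(df_val))
print('Best model validation AUC: {}'.format(val_auc))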

Evaluating the ML model on the test dataset

In the data transformation and ML model training sections, we have developed a reusable pipeline that we can use to evaluate the model predictions on the unseen test dataset.

  1. Create a new DataBrew project and load the raw test dataset (adult.test.csv) from Amazon S3, as we did in the data preparation section.
  2. Import the recipe we created earlier with the 20 data transformation steps to apply them on the adult.test dataset.



We can observe that all the columns have been transformed successfully, apart from the label column, which contains null values. This is because the adult.test dataset contains messy data in the target column, namely an extra punctuation mark at the end of the classes <=50k and >50k. To correct this, we can remove the last step of the recipe.

  1. Delete the column target.
  2. Edit the prior step in creating a categorical map to account for the extra punctuation mark.
  3. Delete the original target column to avoid redundancy.
  4. Create and run the recipe job to transform and store the over 16,000 rows in the adult.test dataset under s3://<YOUR-S3-BUCKET-NAME>/transformed-data/.

This job should take approximately 1 minute to complete.

When the train and test datasets don’t have any variation in the types of categories, we can create and run a recipe job directly from the DataBrew console, without having to create a separate project.

  1. When the data transformation job on the adult.test dataset is complete, load the transformed dataset into a PySpark dataframe to evaluate the performance of the binary classification model:
    # Load the transformed test dataset
    test_dataset = spark.read.csv(path='s3://<YOUR-S3-BUCKET-NAME>/transformed-data/<YOUR-RECIPE-JOB-NAME>_<TIMESTAMP>/<YOUR-RECIPE-JOB-NAME>_<TIMESTAMP>_part00000.csv', header=True, inferSchema=True)
    
    print('The transformed test dataset has {n_rows} rows and {n_cols} columns'.format(n_rows=test_dataset.count(), n_cols=len(test_dataset.columns)))

    The transformed test dataset has 16281 rows and 13 columns

    # Run the test dataset through the same feature vector pipeline
    test_dataset_pipeline = pipeline.fit(test_dataset).transform(test_dataset)

    # Select the feature vector and label column
    test_dataset_pipeline = test_dataset_pipeline.select('features', 'label')

    # Model predictions on the test dataset
    preds_test = model.transform(test_dataset_pipeline)

    # Evaluate model performance
    evaluator = BinaryClassificationEvaluator()
    auc = evaluator.evaluate(preds_test, {evaluator.metricName: "areaUnderROC"})
    print('Test AUC: {}'.format(auc))
    Test AUC: 0.8947235975486465

The model performance with an AUC of 0.89 on the unseen test dataset is about the same as the model performance on the validation set, which demonstrates strong model performance on the unseen test dataset as well.

Summary

In this post, we showed you how to use DataBrew and Amazon EMR to streamline and speed up the data preparation and feature engineering stages of the ML lifecycle. We explored a binary classification problem, but the wide selection of DataBrew pre-built transformations and PySpark ML libraries make this approach extendable to numerous ML use cases.

Get started today! Explore your use case with the services mentioned in this post and many others on the AWS Management Console.


About the Authors

Kartik Kannapur is a Data Scientist with AWS Professional Services. He holds a Master’s degree in Applied Mathematics and Statistics from Stony Brook University and focuses on using machine learning to solve customer business problems.


Prithiviraj Jothikumar, PhD, is a Data Scientist with AWS Professional Services, where he helps customers build solutions using machine learning. He enjoys watching movies and sports and spending time to meditate.


Bala Krishnamoorthy is a Data Scientist with AWS Professional Services, where he helps customers solve problems and run machine learning workloads on AWS. He has worked with customers across diverse industries, including software, finance, and healthcare. In his free time, he enjoys spending time outdoors, running with his dog, beating his family and friends at board games and keeping up with the stock market.

Accessing and visualizing external tables in an Apache Hive metastore with Amazon Athena and Amazon QuickSight

Post Syndicated from James Sun original https://aws.amazon.com/blogs/big-data/accessing-and-visualizing-external-tables-in-an-apache-hive-metastore-with-amazon-athena-and-amazon-quicksight/

Many organizations have an Apache Hive metastore that stores the schemas for their data lake. Because Amazon Athena is serverless, it makes it easy for anyone with SQL skills to quickly analyze large-scale datasets. You may also want to reliably query the rich datasets in the lake, with their schemas hosted in an external Hive metastore. In response to customers’ requests, AWS announced Athena’s support for Hive metastores in early 2020. This extends Athena’s ability to query external data stores, with the combined benefits of better performance and lower cost.

In this post, we provide an AWS CloudFormation template that configures a remote Hive metastore based on Amazon Relational Database Service (Amazon RDS) for MySQL, with Amazon EMR located in a private subnet to perform ETL tasks. We then demonstrate how you can use a Spark step to pull COVID-19 datasets from a public repository and transform the data into a performant Parquet columnar storage format. We also walk through the steps to query the data with Athena and visualize it with Amazon QuickSight. QuickSight is a fully managed data visualization service; it lets you easily create and publish interactive dashboards by analyzing data from various data sources, including Athena.

Solution walkthrough

The following diagram shows the architecture for our solution.


As shown in the preceding architecture, we have an Availability Zone within a VPC in an AWS Region. The Availability Zone hosts subnets that are either public or private. A multi-master EMR cluster that has Hive software components installed is launched in a private subnet with egress internet traffic through a NAT gateway to download data from public sites for analysis. The multi-master feature also ensures that the primary nodes are highly available. The Hive metastore is backed by a remote RDS for MySQL instance located in the same private subnet.

We also have an Amazon Simple Storage Service (Amazon S3)-based data lake. A Spark step in Amazon EMR retrieves the data in CSV format, saves it in the provisioned S3 bucket, and transforms the data into Parquet format. The Spark step creates an external Hive table referencing the Parquet data and is ready for Athena to query.

With Athena, you can create a data source connector based on an AWS Lambda function to access the Hive metastore hosted on the EMR cluster by using the Apache Thrift interface.

The connector is registered in Athena as a catalog; when you reference the catalog in a SQL statement, Athena invokes the Lambda function. The function shuts down if the connector is inactive for 15 minutes. For queries that run longer than 15 minutes, we recommend letting the query complete before retrieving the query results from the Amazon S3 location you can specify.

In Athena, you can compose a SQL statement against the Hive tables with predicates to further limit the size of the query result for faster visualization by QuickSight.
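After the data source connector is registered (we create the demo_hive_metastore catalog later in this post), you can also submit such queries programmatically. The following boto3 sketch is illustrative; the query results bucket is a placeholder, and the query itself is a simplified example against the covid_confirmed_cases table.

import time
import boto3

athena = boto3.client("athena", region_name="us-east-1")

# Submit a query against the Hive metastore catalog registered in Athena
query_id = athena.start_query_execution(
    QueryString="SELECT * FROM covid_confirmed_cases WHERE country = 'US' LIMIT 10",
    QueryExecutionContext={"Catalog": "demo_hive_metastore", "Database": "default"},
    ResultConfiguration={"OutputLocation": "s3://<YOUR-QUERY-RESULT-BUCKET>/athena-results/"},
)["QueryExecutionId"]

# Wait for the query to reach a terminal state before reading the results
while True:
    state = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(5)

for row in athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"][:5]:
    print(row)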

Deploying the resources with AWS CloudFormation

To demonstrate our solution, we provide a CloudFormation template that you can download to easily deploy the necessary AWS resources. The template creates the following resources to simulate the environment:

  • VPC and subnets – A VPC with one public and one private subnet. A NAT gateway is also created to allow outbound internet access from the EMR cluster to download public COVID-19 datasets from the Johns Hopkins GitHub repo.
  • EMR cluster – A multi-master EMR cluster with Hive, running on three primary nodes (m5.xlarge) and two core nodes (m5.xlarge), is launched to support the thrift connection required by the Athena Lambda connectors.
  • Amazon RDS for MySQL database – An RDS for MySQL primary instance is launched in the same subnet as the EMR cluster. The RDS instance serves as the Hive metastore backend data store.
  • S3 bucket – An S3 bucket stores the files written in Parquet format by Amazon EMR, which Athena accesses later.
  • AWS IAM users – Two AWS Identity and Access Management (IAM) users belonging to different user groups. The first user, the data engineer, has permissions to access the Lambda-based Athena data source connector. The other user, the salesperson, does not have permissions to access the connector.

To get started, you need to have an AWS account. If you don’t have one, go to aws.amazon.com to sign up for one. Then complete the following steps:

  1. Sign in to the AWS Management Console as an IAM power user, preferably an admin user.
  2. Choose Launch Stack to launch the CloudFormation template:

This template has been tested in the US East (N. Virginia) Region.

  1. Choose Next.

You’re prompted to enter a few launch parameters.

  1. For Stack name, enter a name for the stack (for example, athena-hms).
  2. For Hive Metastore Host Number, choose the Amazon EMR primary node to connect to (or use the default value).
  3. Continue to choose Next and leave other parameters at their default.
  4. On the review page, select the three check boxes to confirm that AWS CloudFormation might create resources.
  5. Choose Create stack.

The stack takes 15–20 minutes to complete.

  1. On the Outputs tab of the stack details, save the key-value pairs to use later.

When the EMR cluster is provisioned, it uses a bootstrap action to install the necessary Python libraries. It runs an Amazon EMR Spark step (a PySpark script) that downloads three COVID-19 datasets for confirmed, recovered, and death cases from the Johns Hopkins GitHub repo in CSV format and stores them in the csv subfolder of the S3 bucket created by the CloudFormation stack. Lastly, the final transformed data is converted to Parquet format and external Hive tables are created referencing the Parquet data located in the parquet subfolder of the S3 bucket.

The following is the source code for the bootstrap and Spark step actions, for your reference:

To validate the data, on the Amazon S3 console, choose the bucket name from the CloudFormation template outputs. You should see a covid_data folder in the bucket. The folder contains the two subfolders, csv and parquet, which store the raw CSV and transformed Parquet data, respectively.

Querying the data in Athena

The CloudFormation template creates two users belonging to two different AWS IAM groups. The de_user_us-east-1_athena-hms user is a data engineer with permissions to access the Lambda function to communicate with the Hive metastore using the Athena data source connector. They belong to the group athena-hms-DataEngineerGroup-xxxxxxx. The sales_user_us-east-1_athena-hms user is a salesperson with no access to the connector. They belong to the group athena-hms-SalesGroup-xxxxx. 

To query the data, first retrieve your secret values in AWS Secrets Manager:

  1. On the Secrets Manager console, choose Secrets.
  2. Choose DataEngineerUserCreds.

  1. Choose Retrieve secret value.

 

  1. Save the username and password.

 

  1. Repeat these steps for SalesUserCreds. 

With the data in place, we can now use Athena to query it.

Accessing data as the data engineer

To query the data as the de_user_us-east-1_athena-hms user, complete the following steps:

  1. Sign in to the Athena console as the de_user_us-east-1_athena-hms user with the credentials retrieved from Secrets Manager.

After logging in, we need to create a data source for Hive metastore.

  1. On the navigation bar, choose Data sources.
  2. Choose Connect data source.

  1. For Choose where your data is located, select Query data in Amazon S3.
  2. For Choose a metadata catalog, select Apache Hive metastore.
  3. Choose Next.

  1. For Lambda function, choose the function the CloudFormation template created with the key HiveMetastoreFunctionName.
  2. For Catalog name, enter a name for the Athena catalog (for example, demo_hive_metastore).
  3. Choose Connect.

You should now have a catalog named demo_hive_metastore with the catalog type Hive metastore.

  1. On the navigation bar, choose Query editor.
  2. Enter the following SQL statement:
    SELECT *
    FROM demo_hive_metastore.default.covid_confirmed_cases
    WHERE country = 'US'
            OR country = 'Italy'
            OR country = 'India'
            OR country = 'Brazil'
            OR country = 'Spain'
            OR country = 'United Kingdom'

This SQL statement selects all the columns in the covid_confirmed_cases table with predicates to only include a few countries of interest. We use a table name with the pattern <catalog name>.<hive database name>.<hive table name in the database>, which for this post translates to demo_hive_metastore.default.covid_confirmed_cases.

  1. Choose Run query.

The following screenshot shows your query results.

Make sure you completely sign out of the console before moving on to the next steps.

Accessing data as the salesperson

Sign in to the console as sales_user_us-east-1_athena-hms user. Because the salesperson user doesn’t have the appropriate IAM policies to access the Hive metastore connection, you can’t see the tables.

 

Permissions policies

In addition to the managed AmazonAthenaFullAccess policy, the data engineer’s IAM group has the <stack-name>-DataBucketReadAccessPolicy-xxxxx and <stack-name>-HiveMetastoreDataSourceAccessPolicy-xxxxx policies created by the CloudFormation template attached. Therefore, the data engineer can view the tables, but the salesperson can’t.

These policies are available on the IAM console, on the Permissions tab for the group <stack-name>-DataEngineerGroup-xxxxx.

The following sample JSON code is the <stack-name>-DataBucketReadWriteAccessPolicy-xxxxx policy to allow access to the provisioned S3 bucket:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "s3:Get*",
                "s3:List*",
                "s3:Put*"
            ],
            "Resource": [
                "arn:aws:s3:::<provisioned bucket name>",
                "arn:aws:s3:::<provisioned bucket name>/",
                "arn:aws:s3:::<provisioned bucket name>/*"
            ],
            "Effect": "Allow"
        }
    ]
}

The following sample JSON code is the <stack-name>-HiveMetastoreDataSourceAccessPolicy-xxxxx policy to allow access to the Lambda Hive metastore function:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "lambda:GetFunction",
                "lambda:GetLayerVersion",
                "lambda:InvokeFunction"
            ],
            "Resource": [
                "arn:aws:lambda:us-east-1:<account id>:function:athena-hms-HiveMetastoreFunction"
            ],
            "Effect": "Allow"
        },
        {
            "Action": [
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:PutObject",
                "s3:ListMultipartUploadParts",
                "s3:AbortMultipartUpload"
            ],
            "Resource": [
               "arn:aws:s3:::<provisioned bucket name>/hms_spill"
            ],
            "Effect": "Allow"
        },
        {
            "Action": [
                "lambda:ListFunctions"
            ],
            "Resource": "*",
            "Effect": "Allow"
        }
    ]
}

Next, we walk through using QuickSight to visualize the results. Make sure you completely sign out of the console as the salesperson user before proceeding to the next steps. 

Signing up for QuickSight

You can skip this section if you have already signed up for QuickSight previously.

  1. Sign in to the console as the IAM power user who deployed the CloudFormation template or any user with enough IAM privileges to set up QuickSight.
  2. On the QuickSight console, choose Sign up for QuickSight.
  3. Select either the Standard or Enterprise edition for QuickSight.
  4. Choose Continue.

  1. For QuickSight account name, enter your AWS account ID.
  2. For Notification email address, enter your email.
  3. Select Amazon S3.

  1. Select the provisioned S3 bucket to grant QuickSight permission to access.
  2. Choose Finish.

Your QuickSight account should now be set up.

Attaching the Hive metastore access policy

Before you can use QuickSight, you have to attach the Hive metastore access policy to the QuickSight service role.

  1. On the IAM console, search for the service role aws-quicksight-service-role-v0.
  2. Choose the role.

  1. Search for the <stack-name>-HiveMetastoreDataSourceAccessPolicy-xxxxx policy.
  2. Select the policy and attach it to the QuickSight service role.
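If you prefer to attach the policy programmatically, the following boto3 sketch shows the equivalent call; the policy ARN is a placeholder that you can copy from the IAM console or the CloudFormation stack resources.

import boto3

iam = boto3.client("iam")

# Attach the Hive metastore data source access policy to the QuickSight service role
# (replace the policy ARN with the one created by your CloudFormation stack)
iam.attach_role_policy(
    RoleName="aws-quicksight-service-role-v0",
    PolicyArn="arn:aws:iam::<ACCOUNT-ID>:policy/<stack-name>-HiveMetastoreDataSourceAccessPolicy-xxxxx",
)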

Creating your data source and performing data conversions

Before we can create visualizations, we need to set up our data source.

  1. Download the SQL script covid-19.sql.
  2. On the QuickSight console, choose Datasets in the navigation pane.

  1. Choose New dataset.

  1. Choose Athena.

  1. In the pop-up window, for Data source name, enter demo_hive_metastore.
  2. Choose Validate.
  3. When the connection is validated, choose Create data source.

  1. In the next window, choose Use custom SQL.

  1. Enter the content of the covid-19.sql script in the query window.
  2. Choose Confirm query.

  1. Leave Import to SPICE for quicker analytics selected.
  2. Choose Visualize.

Now we perform a few data type conversions before visualizing the data.

  1. Choose the Edit icon next to Data set on the menu bar.

  1. Choose the … icon.
  2. Choose Edit.

  1. Expand date and choose Change data type.
  2. Choose Date.

  1. Enter yyyy-MM-dd to convert the date format.
  2. Choose Update.

Now we create a coordinate using the latitude and longitude values.

  1. Expand lat and choose Add to coordinates.

  1. Leave Create new geospatial coordinates selected.
  2. Choose Add.

  1. In the pop-up window, for Name your coordinates, enter coordinate.
  2. For Field to use for longitude, choose lon.
  3. Choose Create coordinates.

  1. Choose Save and visualize on the menu bar.

Creating visualizations in QuickSight

Now we can visualize our data. For this post, we create a map visualization.

  1. For Visual types, choose the map visual.

  1. For Geospatial, drag coordinates.
  2. For Size, drag confirmed.
  3. For Color, drag country.

This world map shows the accumulated confirmed cases for selected countries over time; you need to use a filter to look at confirmed cases on a specific date.

  1. In the navigation pane, choose Filter.
  2. Choose the + icon.
  3. For Filter type, choose Time range.
  4. Choose start and end dates, such as 2020-09-10 00:00 and 2020-09-11 00:00, respectively.
  5. Choose Apply.

This plots the confirmed cases on September 10, 2020, for these countries.

Similarly, you can choose other visual types, such as a line chart, and generate the mortality rate for selected countries over time.

Using highly available primary nodes of the Amazon EMR cluster

The EMR cluster has a multi-master configuration with three primary nodes running to meet high availability requirements. At any time, the Lambda function communicates with one of these three EMR primary nodes. In the rare event that this node goes down, you can quickly re-establish the Athena data source connector to the external Hive metastore by failing over to another active primary node.

To perform this failover, complete the following steps:

  1. On the AWS CloudFormation console, choose Stacks.
  2. Choose athena-hms.
  3. Choose Update.
  4. Choose Use current template.
  5. Choose Next.
  6. For Hive Metastore Host Number, choose a host other than the current one you’re using.

  1. Choose Next.
  2. Acknowledge that AWS CloudFormation might create IAM resources.
  3. Choose Update stack.

In less than a minute, you should be able to access the Hive metastore and continue to query on the Athena console.
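You can also perform the same failover programmatically by updating the stack with the current template and a new host number. The following boto3 sketch assumes the template exposes the host selection as a parameter named HiveMetastoreHostNumber; check your template for the exact parameter key before using it.

import boto3

cfn = boto3.client("cloudformation")

# Reuse the current template and parameter values, changing only the Hive metastore host
stack = cfn.describe_stacks(StackName="athena-hms")["Stacks"][0]
parameters = []
for param in stack["Parameters"]:
    if param["ParameterKey"] == "HiveMetastoreHostNumber":  # hypothetical key; verify in the template
        parameters.append({"ParameterKey": param["ParameterKey"], "ParameterValue": "2"})
    else:
        parameters.append({"ParameterKey": param["ParameterKey"], "UsePreviousValue": True})

cfn.update_stack(
    StackName="athena-hms",
    UsePreviousTemplate=True,
    Parameters=parameters,
    Capabilities=["CAPABILITY_IAM", "CAPABILITY_NAMED_IAM"],
)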

Cleaning up

You may want to clean up the demo environment when you’re done. To do so, on the AWS CloudFormation console, select the template and choose Delete.

This action also deletes the S3 bucket and any data in it. If you want to retain the data for future use, you should make a copy of the bucket before you delete it.

Summary

In this post, we walked through a solution using Athena to query external Hive tables with public COVID-19 datasets hosted in an S3 bucket and visualizing the data with QuickSight. We provided a CloudFormation template to automate the deployment of necessary AWS services for the demo. We encourage you to use these managed and scalable services for your specific use cases in production.


About the Authors

James Sun is a Senior Solutions Architect with Amazon Web Services. James has over 15 years of experience in information technology. Prior to AWS, he held several senior technical positions at MapR, HP, NetApp, Yahoo, and EMC. He holds a PhD from Stanford University.


Chinmayi Narasimhadevara is a Solutions Architect with Amazon Web Services. Chinmayi has over 15 years of experience in information technology and has worked with organizations ranging from large enterprises to mid-sized startups. She helps AWS customers leverage the correct mix of AWS services to achieve success for their business goals.


Gagan Brahmi is a Solutions Architect focused on Big Data & Analytics at Amazon Web Services. Gagan has over 15 years of experience in information technology. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS.

Orchestrating analytics jobs by running Amazon EMR Notebooks programmatically

Post Syndicated from Fei Lang original https://aws.amazon.com/blogs/big-data/orchestrating-analytics-jobs-by-running-amazon-emr-notebooks-programmatically/

Amazon EMR is a big data service offered by AWS to run Apache Spark and other open-source applications on AWS in a cost-effective manner. Amazon EMR Notebooks is a managed environment based on Jupyter Notebook that allows data scientists, analysts, and developers to prepare and visualize data, collaborate with peers, build applications, and perform interactive analysis using EMR clusters.

EMR notebook APIs are available on Amazon EMR release version 5.18.0 or later and can be used to run EMR notebooks via a script or command line. The ability to start, stop, list, and describe EMR notebook runs without the Amazon EMR console enables you to programmatically control running an EMR notebook. Using a parameterized notebook cell allows you to pass different parameter values to a notebook without having to create a copy of the notebook for each new set of parameter values. With this feature, you can schedule running EMR notebooks with cron scripts, chain multiple EMR notebooks, and use orchestration services such as AWS Step Functions or Apache Airflow to build pipelines. This also enables you to run EMR notebooks in a non-interactive manner, for example to run ETL workloads in production.

In this post, we show how to orchestrate analytics jobs by running EMR Notebooks programmatically with the following two use cases:

  • Scheduling an EMR notebook to run via crontab and the AWS CLI
  • Chaining EMR notebooks with Step Functions triggered by CloudWatch Events

For our data source, we use the open-source, real-time COVID-19 US daily case reports provided by Johns Hopkins University CSSE in the following GitHub repo.

Prerequisites

Before getting started, you must have the following prerequisites:

Record the notebook ID (for example, <e-*************************>); you use this later in our examples. Organize the notebook files in the Jupyter UI as follows:

  • /demo_pyspark.ipynb
  • /experiment/trailing_N_day.ipynb

See Creating a Notebook for more information on how to create an EMR notebook.

Use case 1: Scheduling an EMR notebook to run via crontab and the AWS CLI

We use demo_pyspark.ipynb as the input notebook file, as mentioned in the prerequisites. In this use case, we use the AWS CLI to call the EMR Notebooks Execution API to run a notebook using some parameters that we pass in. We then download the notebook output and visualize it using the local Jupyter server.

First, we use the AWS CLI to run an example notebook using the EMR Notebooks Execution API.

demo_pyspark.ipynb is a Python script. The following parameters are defined in the first cell:

  • DATE – The date used when the notebook job is started.
  • TOP_K – The top k US states with confirmed COVID-19 cases. We use this to plot Graph a.
  • US_STATES – The names of the specific US states being checked for the fatality rates of COVID-19 patients. We use this to plot Graph b.

Running this notebook plots two graphs:

  • Graph a – Visualizes the top k US states with most COVID-19 cases on a given date
  • Graph b – Visualizes the fatality rates among specific US states on a given date

The parameters in the first cell can be passed to the EMR Notebooks StartNotebookExecution API, which you can call via the AWS CLI or SDK. The following code is an example of the EMR notebook first cell, containing parameters with corresponding values in JSON format. It means the notebook uses the date 10-13-2020. For Graph a, we visualize the top five US states with confirmed COVID-19 cases on October 13, 2020. For Graph b, we visualize the fatality rates of COVID-19 patients in Alabama, California, and Arizona on October 13, 2020. See the following code:

{"DATE": "10-13-2020",
 "TOP_K": 5,
"US_STATES": ["Alabama", "California", "Arizona"]}

For this example, the parameters can be any of the Python data types.

Run the notebook using the following new set of parameters:

{"DATE": "10-15-2020",
 "TOP_K": 6,
"US_STATES": ["Wisconsin", "Texas", "Nevada"]}

Running an EMR notebook with the AWS CLI

Run the following command (replace <e-*************************> with the ID of the EMR notebook and <j-*************> with the EMR cluster ID as mentioned in the prerequisites):

% aws emr --region us-west-2 start-notebook-execution \
--editor-id <e-*************************> \
--notebook-params '{"DATE":"10-15-2020", "TOP_K": 6, "US_STATES": ["Wisconsin", "Texas", "Nevada"]}' \
--relative-path demo_pyspark.ipynb \
--notebook-execution-name demo \
--execution-engine '{"Id" : "<j-*************>"}' \
--service-role EMR_Notebooks_DefaultRole

The start-notebook-execution command returns an output similar to the following JSON document:

{
 "NotebookExecutionId": "ex-*****************************"
}

Record the value of NotebookExecutionId; you use it in the next step.

Running the describe-notebook-execution command

Run the following command (replace <ex-*****************************> with the value of NotebookExecutionId from the previous step):

% aws emr --region us-west-2 describe-notebook-execution \
--notebook-execution-id <ex-*****************************>

The describe-notebook-execution command returns an output similar to the following JSON document:

{
  "NotebookExecution": {
    "NotebookExecutionId": "ex-*****************************",
    "EditorId": "e-*************************",
    "ExecutionEngine": {
      "Id": "<j-*************>",
      "Type": "EMR",
      "MasterInstanceSecurityGroupId": "sg-********"
    },
    "NotebookExecutionName": "demo",
    "NotebookParams": "{\"DATE\":\"10-15-2020\", \"TOP_K\": 6, \"US_STATES\": [\"Wisconsin\", \"Texas\", \"Nevada\"]}",
    "Status": "FINISHED",
    "StartTime": "2020-10-18T19:46:01.125000-07:00",
    "EndTime": "2020-10-18T19:47:24.014000-07:00",
    "Arn": "arn:aws:elasticmapreduce:us-west-2:123456789012:notebook-execution/ex-*****************************",
    "OutputNotebookURI": "s3://<notebook_bucket_location>/e-*************************/executions/ex-*****************************/demo_pyspark.ipynb",
    "LastStateChangeReason": "Execution is finished for cluster j-*************.",
    "NotebookInstanceSecurityGroupId": "sg-********",
    "Tags": []
  }
}

You can pass different parameter values to the same notebook without having to create a copy of the notebook for each new set of parameter values or log in to the Jupyter Notebooks UI via the Amazon EMR console.
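The same APIs are available in the AWS SDKs. The following boto3 sketch is the Python equivalent of the CLI calls above, including a simple polling loop; the notebook and cluster IDs are placeholders.

import json
import time
import boto3

emr = boto3.client("emr", region_name="us-west-2")

# Start the notebook run with a new set of parameters (IDs are placeholders)
execution_id = emr.start_notebook_execution(
    EditorId="<e-*************************>",
    RelativePath="demo_pyspark.ipynb",
    NotebookExecutionName="demo",
    NotebookParams=json.dumps({"DATE": "10-15-2020", "TOP_K": 6,
                               "US_STATES": ["Wisconsin", "Texas", "Nevada"]}),
    ExecutionEngine={"Id": "<j-*************>"},
    ServiceRole="EMR_Notebooks_DefaultRole",
)["NotebookExecutionId"]

# Poll until the run reaches a terminal state
while True:
    status = emr.describe_notebook_execution(
        NotebookExecutionId=execution_id)["NotebookExecution"]["Status"]
    print("Execution status:", status)
    if status in ("FINISHED", "FAILED", "STOPPED"):
        break
    time.sleep(15)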

Downloading the output file and visualizing the output with a local Jupyter server

EMR notebooks use Papermill to run the notebook. When it runs, a new notebook file is created with input parameters so as not to overwrite the existing file. The notebook is then started, and the output notebook can be found in s3://<Notebook bucket location>/<editor id>/executions/<Execution id>/<input file name>.

We run the following s3 cp command to download the EMR notebook output file to a local directory (replace <notebook_bucket_location> with the S3 location specified for the notebook during creation, <e-*************************> with the EMR Notebook ID, and <ex-*****************************> with the value of NotebookExecutionId from the previous step):

% aws s3 cp s3://<notebook_bucket_location>/<e-*************************>/executions/<ex-*****************************>/demo_pyspark.ipynb .

In the same directory where we downloaded the EMR notebook output file, run the following command to start a local Jupyter server:

% jupyter lab

The URL http://localhost:8888/lab automatically opens in your web browser, as shown in the following screenshot.

Choose demo_pyspark.ipynb to view the output file. In the output, it plots two graphs. Graph a shows the top six US states with confirmed COVID-19 cases on a given date.

Graph b shows the fatality rates of COVID-19 patients in Texas, Wisconsin, and Nevada on a given date.

Scheduling to run a notebook daily using crontab

We have completed running the EMR notebook using the AWS CLI. Now, we demonstrate how to schedule running a notebook daily using crontab. We use the same notebook input file with the same parameters as the previous example. On a daily basis, it generates Graph a with the top six US states with confirmed COVID-19 cases, and Graph b with the fatality rates of COVID-19 patients in Texas, Wisconsin, and Nevada.

We start by creating a bash script named run_notebook_daily.sh. The script starts an EMR notebook, waits for the notebook to either finish running or fail, and copies the output file to the local directory ~/daily_reports/.

The following code is the content of run_notebook_daily.sh (replace <e-*************************> with the ID of EMR Notebook and <j-*************> with the EMR cluster ID):

# Generate a report for day before yesterday
day_before_yesterday=`date -v-2d +'%m-%d-%Y'`

# Start an execution
execution_id=`aws emr start-notebook-execution \
--editor-id <e-*****************************> \
--notebook-params '{"DATE":"'"$day_before_yesterday"'", "TOP_K": 6, "US_STATES": ["Wisconsin", "Texas", "Nevada"]}' \
--relative-path demo_pyspark.ipynb \
--notebook-execution-name demo \
--execution-engine '{"Id" : "<j-*************>"}' \
--service-role EMR_Notebooks_DefaultRole | jq -r .'NotebookExecutionId'`

echo "Started an execution for the date $day_before_yesterday. Execution id: $execution_id"

# Poll for execution to finish
while
    execution_status=`aws emr describe-notebook-execution --notebook-execution-id $execution_id | jq -r .'NotebookExecution.Status'`
    echo "Execution Status: $execution_status"
    
    if [ $execution_status == "FINISHED" ] || [ $execution_status == "FAILED" ]; then
        # Copy the output file to local directory
        output_file=`aws emr describe-notebook-execution --notebook-execution-id $execution_id | jq -r .'NotebookExecution.OutputNotebookURI'`
        mkdir -p daily_reports
        aws s3 cp "$output_file" daily_reports/
       break
    fi
    sleep 15s
do true; done

Next, we add this script to a crontab to run our EMR notebook job daily at 9:00 AM:

% crontab -e
0 9 * * * bash /folder/path/run_notebook_daily.sh >/tmp/stdout.log 2>/tmp/stderr.log

This is a simple example of how to schedule running an EMR notebook with a crontab.

Use case 2: Chaining EMR notebooks with Step Functions triggered by CloudWatch Events

We use demo_pyspark.ipynb and trailing_N_day.ipynb as the input notebook files for this use case. We also provide a CloudFormation template as a general guide. Please review and customize it as needed. Be aware that some of the resources deployed by this stack incur costs when they remain in use.

The following diagram illustrates the resources that the CloudFormation template creates.

The template first creates a step function to run a chain of EMR notebooks, which takes care of the following tasks:

  • Runs notebook demo_pyspark.ipynb with given parameters and waits until it’s complete. It plots a graph of the top k US states with most COVID-19 cases yesterday.
  • Runs notebook input trailing_N_day.ipynb using the output from the first task. It takes the US state with the most confirmed COVID-19 cases nationally yesterday as the input, and plots a 30-day confirmed COVID-19 case number graph, showing the case growth trend of that state until yesterday.

The template also creates a CloudWatch event that periodically triggers the step function according to the given schedule expression.
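Besides the scheduled CloudWatch event, you can trigger the chain on demand, which is useful for testing. The following boto3 sketch starts the state machine directly; the state machine ARN is a placeholder that you can find in the CloudFormation stack resources.

import boto3

sfn = boto3.client("stepfunctions", region_name="us-west-2")

# Start the notebook-chaining state machine on demand
# (the state machine ARN is a placeholder from the CloudFormation stack resources)
execution = sfn.start_execution(
    stateMachineArn="arn:aws:states:us-west-2:<ACCOUNT-ID>:stateMachine:<STATE-MACHINE-NAME>"
)
print(sfn.describe_execution(executionArn=execution["executionArn"])["status"])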

Launching the CloudFormation template

To launch your stack and provision your resources, complete the following steps:

  1. Choose Launch Stack:

This automatically launches AWS CloudFormation in your AWS account with a template. It may prompt you to sign in as needed. You can view the template on the AWS CloudFormation console as required. Make sure that you create the stack in your intended Region.

The CloudFormation stack requires a few parameters, as shown in the following screenshot.

The following table describes the parameters:

Parameter | Description | Default Value
Stack name | Enter a meaningful name for the stack, for example, emrRunnableNotebookDemo. | None
ClusterId | The unique ID of the EMR cluster that runs the notebook (j-*************). | None
NotebookARelativePath | The path and file name of the notebook input file A (demo_pyspark.ipynb), relative to the path specified for the EMR notebook. For more information, see Notebook execution CLI command samples. | demo_pyspark.ipynb
NotebookBRelativePath | The path and file name of the notebook input file B (trailing_N_day.ipynb), relative to the path specified for the EMR notebook. | experiment/trailing_N_day.ipynb
NotebookId | The unique ID of the EMR notebook to use for running the notebook (e-*****************************). | None
ScheduleExpression | How the notebook is scheduled to run. For more information, see Schedule Expressions for Rules. | rate(1 day)
StorageLocation | The Amazon S3 path where the EMR notebook is stored (s3://aws-emr-resources-************-us-west-2/notebooks/e-*************************). | None
TopK | The value of one of the parameters used to run notebook A. In this example, it checks the top k US states with confirmed COVID-19 cases and plots a graph for it. | 20

 

  1. Enter the parameter values from the preceding table.
  2. Review the Capabilities section and select the check boxes confirming AWS CloudFormation might create IAM resources with custom names.
  3. Choose Create Stack.

Stack creation only takes a few minutes. When the stack is complete, on the Resources tab, you can find the resources created as shown in the following screenshot.

Checking the notebook output files

When a step function is complete, you can find the execution IDs in the step function output.

We run the following command to view the output files (replace <notebook_bucket_location> with the Amazon S3 location specified for the notebook during creation and <e-*************************> with the EMR notebook ID):

% aws s3 ls --recursive s3://<notebook_bucket_location>/<e-*************************>/executions/

The aws s3 ls --recursive command returns an output similar to the following:

2020-10-16 16:39:02     267780 notebooks/e-*************************/executions/ex-*****************************/demo_pyspark.ipynb
2020-10-16 16:44:14     267780 notebooks/e-*************************/executions/ex-*****************************/trailing_N_day.ipynb.ipynb
2020-10-16 17:00:37      18600 notebooks/e-*************************/executions/ex-*****************************/demo_pyspark.ipynb
2020-10-16 16:49:08     267781 notebooks/e-*************************/executions/ex-*****************************/trailing_N_day.ipynb.ipynb
2020-10-16 16:59:01     267780 notebooks/e-*************************/executions/ex-*****************************/demo_pyspark.ipynb
2020-10-16 16:54:06     267780 notebooks/e-*************************/executions/ex-*****************************/trailing_N_day.ipynb.ipynb

Downloading and visualizing the results

Follow the same steps in the first use case to download and visualize the results.

The following screenshot is the graph plotted in the output file of notebook input file A (demo_pyspark.ipynb). It shows the top 20 US states with confirmed COVID-19 cases yesterday.

The output of input file B (trailing_N_day.ipynb) plots the graph as shown in the following screenshot. It takes the US state with the most confirmed COVID-19 cases nationally yesterday as the input and plots a 30-day confirmed COVID-19 case number graph, showing the case growth trend of that state until yesterday.

This example step function is the orchestration for running two notebook input files: the second notebook uses the result from the first. It also monitors the first notebook until it is complete, and populates the Amazon S3 file location in the outputs. You can achieve more sophisticated orchestration by adding more states in the step function.

Cleaning up

To avoid ongoing charges, delete the CloudFormation stack, the EMR cluster, and any files in Amazon S3 that were created by running the examples in this post.

Conclusion

This post showed how you can schedule running an EMR notebook using crontab and the AWS CLI, and how to chain EMR notebooks with Step Functions triggered by CloudWatch events. The EMR Notebooks Execution API enables the parameterization for EMR notebooks. With this feature, you can also use orchestration services such as Apache Airflow to build ETL pipelines.


About the Authors

Fei Lang is a senior big data architect at Amazon Web Services. She is passionate about building the right big data solution for customers. In her spare time, she enjoys the scenery of the Pacific Northwest, going for a swim, and spending time with her family.


Ray Liu is a software development engineer at AWS. Besides work, he enjoys traveling and spending time with family.


Palaniappan Nagarajan is a Software Development Engineer at Amazon EMR working mainly on EMR Notebooks. In his spare time, he likes to hike, try out different cuisines, and scan the night sky with his telescope.


Shuang Li is a senior product manager for Amazon EMR at AWS. She holds a doctoral degree in Computer Science and Engineering from Ohio State University.

How the ZS COVID-19 Intelligence Engine helps Pharma & Med device manufacturers understand local healthcare needs & gaps at scale

Post Syndicated from Saunak Chandra original https://aws.amazon.com/blogs/big-data/how-the-zs-covid-19-intelligence-engine-helps-pharma-med-device-manufacturers-understand-local-healthcare-needs-gaps-at-scale/

This post is co-written by Parijat Sharma (Principal, Strategy & Transformation), Wenhao Xia (Manager, Data Science), Vineeth Sandadi (Manager, Business Consulting), Arianna Tousi (Strategy, Insights and Planning Consultant), and Gopi Vikranth (Associate Principal) from ZS Associates, Inc. In their own words, “We’re passionately committed to helping our clients and their customers thrive, working side by side to drive customer value and results”.

The COVID-19 trajectory across the US continues to remain unstable and heterogeneous. Although certain cities and counties were able to tame the adverse effects of the pandemic by applying stricter controls on social life, newer hotspots are emerging in different locations sporadically.

Organizations in healthcare, pharma, and biotech are looking to adapt to a rapidly evolving and diverse local market landscape, and restart parts of their operations that are significantly impacted, such as patient support functions, sales, and key account management. Real-time insights into the rapidly evolving COVID-19 situation and its impact on all key stakeholders in the healthcare supply chain, including patients, physicians, and health systems, is a key asset in helping companies adapt based on local market dynamics and remain resilient to future disruptions. However, several life-science companies don’t have these insights because they lack the infrastructure to integrate and manage the relevant datasets at scale and the analytical capabilities to mine the data for the relevant insights.

ZS stepped into this critical situation and built a data lake on AWS to address these challenges. A primary characteristic of this data lake is that it’s largely open source, which gave ZS a head start in meeting the product launch SLA on AWS. This post describes how ZS developed the data lake and brought its proprietary machine learning (ML) models to run on AWS, providing intelligent insights on COVID-19.

What is the ZS COVID-19 Intelligence Engine?

The ZS COVID-19 Intelligence Engine was designed as a customizable capability that does the following:

  • Integrates diverse public and proprietary healthcare datasets in a scalable data warehouse that stores data in a secure and compliant manner
  • Provides advanced descriptive and predictive analytical modules to forecast COVID-19 evolution and its impact on key stakeholders and the treatment journey
  • Packages insights into intuitive preconfigured reports and dashboards for dissemination across an organization

AWS Cloud data and analytics infrastructure

In this section, we dive into the infrastructure components of the ZS COVID-19 Intelligence Engine. The objective was to quickly set up a data lake with an accompanying ingestion mechanism to allow rapid ingestion of public datasets, third-party data, and datasets from AWS Data Exchange.

The overall data processing solution is based on ZS’s REVO™ data management product, which uses Apache Spark on Amazon EMR. The Spark engine processes and transforms raw data into structured data that is ready for interactive analysis. The raw data arrives in compressed, text-delimited format, ranging from 100 MB to 15 GB per file. After the data is cleansed and rules are applied, the processed data is staged in Amazon Simple Storage Service (Amazon S3) buckets in Apache Parquet format. This data is selectively loaded into an Amazon Redshift cluster for fast interactive querying and repetitive analysis on subsets of data.
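
The following is a minimal PySpark sketch of this kind of staging step, not ZS’s REVO™ implementation; the column names, delimiter, and bucket paths are hypothetical.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date

spark = SparkSession.builder.appName("stage-raw-to-parquet").getOrCreate()

# Read compressed, pipe-delimited raw files (hypothetical layout).
raw = (spark.read
       .option("header", "true")
       .option("delimiter", "|")
       .csv("s3://<raw-bucket>/claims/2020/"))

# Example cleansing rules: drop rows without a claim ID and normalize the date.
cleansed = (raw
            .filter(col("claim_id").isNotNull())
            .withColumn("service_date", to_date(col("service_date"), "yyyy-MM-dd")))

# Stage the curated output as Parquet for selective loading into Amazon Redshift.
cleansed.write.mode("overwrite").partitionBy("service_date").parquet(
    "s3://<curated-bucket>/claims_parquet/")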

The Intelligence Engine also uses a powerful Amazon Elastic Compute Cloud (Amazon EC2) instance to run ML workloads that predict future COVID-19 caseloads at the county level. The prediction models run daily on a compute-optimized EC2 C5.24xlarge On-Demand Instance, allowing rapid turnaround of prediction results and keeping overall costs down by using On-Demand Instances.

ZS uses Amazon Redshift as the data warehouse in this architecture. Amazon Redshift is easy to launch and maintain and can quickly run analytical queries on large normalized datasets using standard ANSI SQL. After the raw data gets processed using ZS’s REVO™, the curated data is loaded into Amazon Redshift to run interactive analytical queries. The queries generate insights specific to local geography, county, and healthcare systems, and run on Amazon Redshift tables consisting of anonymized patient data. The Amazon Redshift cluster uses On-Demand Instances and is sized to accommodate 25 TB of data at the time of this product launch. Typical interactive queries include joining data across large tables, up to 1.5 billion rows in the main table.
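
As an illustration, the following sketch submits one such aggregation through the Amazon Redshift Data API with boto3; the cluster identifier, database, table, and column names are hypothetical.

import boto3

client = boto3.client("redshift-data")

# Hypothetical county-level aggregation over anonymized patient claims.
sql = """
    SELECT county_fips, COUNT(DISTINCT patient_id) AS patients
    FROM claims_curated
    WHERE service_date >= '2020-03-01'
    GROUP BY county_fips
    ORDER BY patients DESC
    LIMIT 20;
"""

response = client.execute_statement(
    ClusterIdentifier="zs-covid-analytics",  # hypothetical cluster name
    Database="dev",
    DbUser="awsuser",
    Sql=sql,
)
# Poll describe_statement / get_statement_result with this ID to fetch rows.
print(response["Id"])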

The following diagram illustrates this architecture:

The ZS COVID-19 data lake has several benefits and applicable use cases:

  • Streamlined data procurement processes – Eliminates the need for multiple ZS teams to procure, ingest, and process the same datasets separately
  • Optimized common usage across clients and business questions – ZS uses this capability to publish common derivations of data that can then be utilized across different ZS teams and use cases to create a single version of truth
  • Cross-functional processes and requirements – Some analytics use cases require cross-functional data and are significantly hampered by the ability of a user to access various data sources in one place—a data lake enables this by design
  • Connected healthcare data – Due to developing common standards and integrating with MDM and ontologies, data from the public domain can be compliantly integrated with pharma manufacturer-specific data sources to enable seamless analytics on the data lake

Comprehensive healthcare data lake

At its core, the Intelligence Engine builds a scalable and integrated repository of diverse public and proprietary data sources. These datasets range in variety, volume, and velocity:

  • COVID-19 incidence – There are several COVID-specific datasets that the public has become accustomed to viewing over the past several months, such as Johns Hopkins incidence tracking and IHME predictive data, which describes how the disease has been progressing over time and even into the future. This data tends to be at either the state or county level and is often refreshed daily. The data lake solution contains the entire history for these datasets, which, taken together, spans into the hundreds of gigabytes in size. In addition to these sources, ZS’ proprietary predictive models add an additional element of accuracy and are customized with ZS-specific insights.
  • Government policies – Government policy data, which is mostly being used from AWS Data Exchange on behalf of the New York Times, explains the current state of government mandates and recommendations for varying degrees of lockdown or reopening as it pertains to the pandemic. This data is much smaller in volume, well under 1 GB total.
  • Insurance claims at patient level – Thanks to the partnership with Symphony Health, ZS has had the opportunity to analyze and expose patient claims data that can be attributed to the specific hospital account or healthcare provider for which that claim took place. The insurance claims data is the largest volume of data—close to 15 TB—contributing to the ZS COVID-19 Intelligence Engine. ZS’ data engineering team has wrangled these large datasets with the help of Amazon EMR for aggregating and processing metrics, which are then stored in Amazon Redshift in a transformed version that is much smaller and can be more easily understood than the original raw datasets.
  • HCP to site of care affiliations – Thanks to the partnership with Definitive Healthcare, ZS is in the process of integrating best-in-class physician-hospital and clinic affiliations from Definitive Healthcare with patient claims from Symphony to help assess available healthcare capacity and evolving approaches to care delivery and the type of care being delivered by disease area.
  • Other Intelligence Engine data sources
    • State testing rates
    • Mobility
    • Demographics and social determinants of health
    • Provider access and affinity for pharma commercial engagement (from ZS affinity/access monitor)
    • Automated data ingestors for a variety of pharma manufacturer-specific data sources including specialty pharmacy and hub transactions, sales force activity, customer digital engagement, and more

Predictive models for COVID-19 projections and healthcare demand-supply gaps at a local level

To drive decision-making at a local level, ZS required more granular projections of COVID-19 disease spread than what’s publicly available at a state or national level. Therefore, as part of the Intelligence Engine, the ZS data science team aimed to develop an ensemble model of COVID-19 projections at the county level to identify emerging local healthcare gaps along different phases of the treatment process.

Developing a locally predictive model has many challenges, and ZS believes that no single model can capture all the virtually infinite drivers and factors contributing to disease spread within a specific geographic area. Therefore, the ZS data science team behind the COVID-19 projections implemented multiple projection models, each with its own set of input data sources, assumptions, and parameters. This increases the accuracy of the projections while retaining the stability and interpretability of the overall model. These models include the following (a brief curve-fitting sketch follows the list):

  • Statistical curve fitting model – A disease progression curve using a Generalized Gaussian Cumulative Distribution Function, optimized to minimize prediction error of COVID-19 cases and deaths
  • SEIR model – Traditional epidemiological disease progression model (pathway of Susceptible – Exposed – Infectious – Recovered) combined with traditional ML on model parameters
  • Agent-based simulation – County-level simulation of individual interactions between people within the county
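
As an illustration of the first model type, the following sketch fits a scaled generalized Gaussian CDF to a synthetic cumulative case series; the data, parameter names, and starting values are hypothetical and not ZS’s actual implementation.

import numpy as np
from scipy.optimize import curve_fit
from scipy.stats import gennorm

def gg_cdf(t, total_cases, beta, peak_day, width):
    # Cumulative cases modeled as a scaled generalized Gaussian (generalized
    # normal) cumulative distribution function.
    return total_cases * gennorm.cdf(t, beta, loc=peak_day, scale=width)

# Synthetic cumulative case counts for one county (illustration only).
rng = np.random.default_rng(42)
days = np.arange(60, dtype=float)
observed = gg_cdf(days, 5000, 2.0, 35, 10) + rng.normal(0, 50, days.size)

# Fit the curve parameters by minimizing prediction error against observed counts.
params, _ = curve_fit(gg_cdf, days, observed, p0=[4000, 2.0, 30, 8], maxfev=10000)

# Project the next 30 days using the fitted parameters.
projection = gg_cdf(np.arange(60, 90, dtype=float), *params)
print(projection[:5])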

Obtaining a more granular view of future virus spread at a local level is critical in order to provide support for challenges in specific sites of care. Accurately projecting cases at the county level can be difficult for many reasons. Counties with low current case counts give the model little historical data to learn from (both in time since first infection and in magnitude of cases). Additionally, forecasts can be sensitive to many variables, and the current second wave of COVID-19 infections adds additional complications to tracking the spread of the virus.

To combat some of these difficulties, ZS implemented a two-phased approach to generate county-level projections. Counties with a long enough history of virus spread are projected independently using the three disease progression models we outlined, whereas counties with limited history are projected using a combination of state-level projections and social determinants of health factors that are predictive of disease spread (for example, age distribution in a certain county).

As the world around us continues to evolve and the COVID-19 situation with it, the ZS data science team is also working to adapt the model alongside the current situation. Currently, model adaptability and its self-learning ability are continuing to improve to better adapt to the onset of the second wave of the virus. Additional parameters and re-optimizations are happening daily as the situation develops.

The following image shows the input data sources, modeling techniques, and outputs from the ZS COVID-19 projection models.

Analyzing and predicting local non-COVID-19 treatment gaps and their drivers

Several flexible analytical tools can be used to evaluate barriers along the disease treatment journey for non-COVID-19 diseases at the local geography level, their evolution over time with COVID-19, and their underlying drivers. These tools summarize local changes in and the underlying drivers of the following:

  • New patient diagnosis
  • Changes in treatment approaches and drugs used
  • Patient affordability and access to medications
  • Persistency and compliance to treatment
  • Healthcare demand (patients needing care) and supply (provider capacity to offer care)

The following image shows output from the Intelligence Engine illustrating local variations in healthcare gaps.

Intuitive visualization capabilities

The solution has two intuitive visualization capabilities:

  • COVID-19 monitor – A public access dashboard with insights on historical and predicted future trajectories of COVID-19 incidence and hospital capacity. These insights are available at the state level and allow you to drill down into individual counties. The individual county-level view allows you to not only understand the severity of COVID-19 in that area, but also better understand how that county compares to other counties within the same state and observe what policies their local governments have set for the shutdown and reopening process.
  • Treatment finder – A second public access dashboard with near-real-time insights into individual hospital and physician group availability to treat patients for prominent non-COVID-19 diseases. This dashboard allows you to select a specific non-COVID-19 disease and identify the estimated number of COVID-19-infected people in a geography with that disease, mortality rates, and the individual providers that are accepting patients with a specific disease and health insurance.

The following image shows the Intelligence Engine screen with COVID-19 insights for a selected county.

The following image shows the Intelligence Engine screen that allows patients to find hospitals and physician offices that are open and accepting patients.

Conclusion

At its core, the ZS Intelligence Engine is a real-time planning tool. The rich set of AWS services and technologies makes it possible to ingest data from various third-party sources, both public and proprietary. The AWS services used to build the architecture can run on open technologies. For example, building the data lake would not have been possible without Amazon EMR and Amazon EC2. ZS had already been using Apache Spark-based EMR clusters (the service behind the REVO™ tool) prior to COVID-19 hitting us. ZS can run its ML models cost-effectively by using EC2 On-Demand Instances. Finally, using Amazon Redshift as the data warehouse solution allows ZS to provide COVID-19 analytical insights efficiently and cost-effectively.

Since the project went live, ZS has delivered this product to at least six customers in the pharma, biotech, and medical device spaces. They are using this product in a variety of ways, including but not limited to:

  • Refining forecasts that relate the COVID-19 trajectory to estimated demand for their products
  • Assessing the level of openness of healthcare facilities to understand where patients across therapy areas are being treated
  • Determining which patients and communities to support, because COVID-19 impacts attitudes and concerns regarding immunity and drug use, and greater unemployment means more reimbursement support requirements
  • Readying the education and engagement field force for a mix of in-person and virtual interactions
  • Preparing the supply chain to ensure continuity of care

To try out the analysis yourself, see ZS’s COVID-19 Intelligence Engine.


About the Authors

Saunak is a Sr. Solutions Architect with AWS, helping customers and partners build data warehouses and scalable data platforms on AWS.

Parijat is the current lead of strategy and transformation at ZS. He focuses on mid-size to small clients that are ready for a transformational process to commercialize new products and portfolios, purchase or sell assets, or expand into new markets.

Wenhao has over 10 years of experience in various data science and advanced analytics fields. During his time at ZS, he has helped both to build and popularize data science capabilities across many organizations.

Vineeth works with pharmaceutical and biotech manufacturers on a broad spectrum of commercial issues, including commercial analytics, organized provider strategy, and resource planning and deployment.

Arianna is a Strategy, Insights and Planning Consultant in ZS’ High Tech practice. Arianna has extensive experience in working with clients across industries on go-to-market strategy and commercial effectiveness issues.

Gopi Vikranth is an Associate Principal in ZS’ High Tech Practice. He has extensive experience in helping clients across the retail, high tech, hospitality, pharmaceutical, and insurance sectors leverage big data and analytics to drive top-line growth.

Optimizing Amazon EMR for resilience and cost with capacity-optimized Spot Instances

Post Syndicated from Ran Sheinberg original https://aws.amazon.com/blogs/big-data/optimizing-amazon-emr-for-resilience-and-cost-with-capacity-optimized-spot-instances/

Amazon EMR now supports the capacity-optimized allocation strategy for Amazon Elastic Compute Cloud (Amazon EC2) Spot Instances, which launches Spot Instances from the most-available Spot Instance capacity pools by analyzing capacity metrics in real time. You can now specify up to 15 instance types in your EMR task instance fleet configuration. This gives Amazon EMR more options when choosing the optimal pools to launch Spot Instances from, which decreases the chance of Spot interruptions and increases the ability to relaunch capacity using other instance types if Spot Instances are interrupted when Amazon EC2 needs the capacity back.

Background

Amazon EMR is the industry-leading cloud big data platform for processing vast amounts of data using open-source tools such as Apache Spark, Apache Hive, Apache HBase, Apache Flink, Apache Hudi, and Presto. With Amazon EMR, you can run petabyte-scale analysis at less than half of the cost of traditional on-premises solutions, and over three times as fast on Amazon EMR runtime for Apache Spark compared to running without the runtime. If you have existing on-premises deployments of open-source tools such as Apache Spark and Apache Hive, you can also run Amazon EMR clusters on AWS Outposts.

Spot Instances are spare Amazon EC2 compute capacity in the AWS Cloud available to you at savings of up to 90% compared to On-Demand Instance prices. The only difference between On-Demand Instances and Spot Instances is that Amazon EC2 can interrupt Spot Instances with 2 minutes of notification when Amazon EC2 needs the capacity back. Using Spot Instances in Amazon EMR is a common pattern that allows AWS customers to achieve significant cost savings.

The capacity-optimized allocation strategy in the Amazon EC2 fleet (also available for Amazon EC2 Auto Scaling and Spot Fleet) provisions Spot Instances from the most-available Spot Instance pools by analyzing capacity metrics. By offering the possibility of fewer interruptions, the capacity-optimized strategy can lower the overall cost of your workload. For more information about how AWS customers are benefiting from decreased Spot interruptions with the capacity-optimized allocation strategy, see Capacity-Optimized Spot Instance Allocation in Action at Mobileye and Skyscanner.

Amazon EMR uses the Amazon EC2 RunInstances API to provision compute capacity. We are enhancing the way Amazon EMR provisions EC2 instances to provide more flexibility and increased cluster resilience using EC2 Fleet (CreateFleet) in Instant mode, as a drop-in replacement for RunInstances.

Optimizing capacity for greater resilience

With this launch, you can configure Amazon EMR to use allocation strategies.

The capacity-optimized allocation strategy uses real-time capacity data to allocate instances from the Spot Instance pools with the optimal capacity for the number of instances that are launching. This allocation strategy is appropriate for workloads that have a higher cost of interruption. Examples include long-running jobs and multi-tenant persistent clusters running Apache Spark, Apache Hive, and Presto. This allocation strategy lets you specify up to 15 EC2 instance types on task instance fleets to diversify your Spot requests and get steep discounts. Previously, instance fleets allowed a maximum of five instance types. You can now diversify your Spot requests across these 15 pools within each Availability Zone and prioritize deploying into a deeper capacity pool to lower the chance of interruptions. With more instance type diversification, Amazon EMR has more capacity pools to allocate capacity from, and chooses the Spot Instances which are least likely to be interrupted.

For example, if you’re initially using EC2 memory-optimized r5.4xlarge instances (with 16 vCPUs and 128 GB of RAM) for your EMR task nodes, you can configure the EMR task instance fleet with different instance types. First, explore different-sized instance types such as r5.2xlarge and r5.8xlarge. Second, add previous generation r4.4xlarge and other R4 instance sizes. After you’ve added different sizes within the same family, as well as previous generation instance types, you can add extra instance types with similar hardware characteristics and vCPU-to-memory ratio, such as the r5a instance family with AMD processors, the r5d instance family with locally attached NVMe storage, and more.
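
One way to sanity-check that the additional types share a similar vCPU-to-memory ratio is to query the EC2 DescribeInstanceTypes API; the candidate list in this sketch is an assumption based on the example above.

import boto3

ec2 = boto3.client("ec2")

# Candidate memory-optimized types to consider for the task fleet, assuming you
# start from r5.4xlarge and want similar vCPU-to-memory ratios.
candidates = ["r5.2xlarge", "r5.4xlarge", "r5.8xlarge",
              "r4.2xlarge", "r4.4xlarge", "r4.8xlarge",
              "r5a.4xlarge", "r5d.4xlarge"]

resp = ec2.describe_instance_types(InstanceTypes=candidates)
for it in sorted(resp["InstanceTypes"], key=lambda i: i["VCpuInfo"]["DefaultVCpus"]):
    vcpus = it["VCpuInfo"]["DefaultVCpus"]
    mem_gib = it["MemoryInfo"]["SizeInMiB"] / 1024
    print(f'{it["InstanceType"]}: {vcpus} vCPUs, {mem_gib:.0f} GiB, ratio 1:{mem_gib / vcpus:.0f}')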

The allocation strategy is also taken into account in case the cluster scales out after the initial provisioning phase—for example, if you manually resize the core or task fleets, or if you’re using managed scaling to automatically increase or decrease the number of instances or units in your cluster based on workload.

For more information about Spot Instance configuration if you’re using Amazon EMR to run Apache Spark workloads, see Best practices for running Apache Spark applications using Amazon EC2 Spot Instances with Amazon EMR. This blog emphasizes best practices that will help you build your Spark workloads with Amazon EMR to achieve deep cost savings.

Amazon EMR has made significant enhancements to improve elasticity and resilience, including graceful decommissioning of Spot Instances running Apache Spark and Apache Hadoop applications. Amazon EMR has customizations to open-source Apache Spark that make it more resilient to node loss—integrating with YARN’s decommissioning mechanism, extending Apache Spark’s blacklisting mechanism and actions on decommissioned nodes.

For example, when Spot Instances are interrupted in a running EMR cluster, stage failures don’t count towards the total number of failures that trigger a total job failure. For more information, see Spark enhancements for elasticity and resilience on Amazon EMR.

New configuration options and IAM policy requirements

To leverage the allocation strategies in your EMR clusters, you need to use a new AllocationStrategy parameter in your cluster configurations. Amazon EMR added support for an On-Demand allocation strategy: you can specify multiple On-Demand Instance types in your core or task instance fleets, and specify an allocation strategy of “lowest-price” to have Amazon EMR provision On-Demand Instances that have the lowest costs. This allows you to also be flexible with your selection of On-Demand instance types.

The following is an example snippet from an Amazon EMR JSON configuration file with the new capabilities:

{
    "Name": "Taskfleet",
    "InstanceFleetType": "TASK",
    "TargetSpotCapacity": 1,
    "TargetOnDemandCapacity": 1,
    "LaunchSpecifications": {
        "OnDemandSpecification": {
            "AllocationStrategy": "lowest-price"
        },
        "SpotSpecification": {
            "AllocationStrategy": "capacity-optimized",
            "TimeoutDurationMinutes": 120,
            "TimeoutAction": "TERMINATE_CLUSTER"
        }
    }
}

For more information about the API options, see InstanceFleetProvisioningSpecifications.

The following IAM policy shows the additional service role permissions required to create a cluster that uses the instance fleet allocation strategy option. If your clusters use the default role EMR_DefaultRole (which has the default managed policy AmazonElasticMapReduceRole attached), the managed policy has already been updated to include these new role permissions. If your clusters use a different role or policy, make sure you add these new permissions to your policy. See the following IAM policy:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "ec2:DeleteLaunchTemplate",
                "ec2:CreateLaunchTemplate",
                "ec2:DescribeLaunchTemplates",
                "ec2:CreateFleet"
            ],
            "Resource": "*"
        }
    ]
}

Launching an EMR cluster with capacity-optimized Spot Instances and a diversified task fleet

In this section, we look at how to create an EMR cluster that includes allocation strategy configurations and a diversified task fleet. The reason for specifying more instance types is to allow Amazon EMR to launch instances from the most optimal capacity pools and to replenish the cluster’s target capacity if Spot Instances are interrupted. Moreover, by using the capacity-optimized allocation strategy, Spot Instances are launched from the most available capacity pools, effectively decreasing the chances of Spot interruptions.

The following AWS Command Line Interface (AWS CLI) command launches an EMR cluster in the default AWS Region configured in your AWS CLI configuration, with the master and core nodes running on On-Demand Instances, and a task fleet running Spot Instances.

Amazon EMR uses a wide selection of instance types in the task instance fleet and uses "AllocationStrategy": "CAPACITY_OPTIMIZED" to launch instances from the most available Spot capacity pools and decrease the chances of workload interruptions. By providing a WeightedCapacity for each instance type that is equal to the number of vCPU (or YARN vCores), you can specify a TargetSpotCapacity that defines the number of vCPUs (YARN vCores) in your task fleet and be flexible around the instance sizes, effectively providing more capacity pools to choose from. You should specify a subnet ID per Availability Zone in the AWS Region. While each EMR cluster runs in a single Availability Zone, specifying multiple Availability Zones allows you to architect your workload with increased fault-tolerance.
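
If you build this configuration programmatically, the following minimal Python sketch assembles a task fleet definition with weights tied to vCPU counts; the vCPU numbers and the capacity target are assumptions that mirror the CLI example that follows, and a structure like this can be passed to the EMR RunJobFlow API (boto3 run_job_flow, under Instances.InstanceFleets) if you prefer the SDK over the CLI.

# Weighted capacities equal to vCPU (YARN vCore) counts for each instance type.
vcpus_by_type = {
    "r5.xlarge": 4,   "r4.xlarge": 4,
    "r5.2xlarge": 8,  "r4.2xlarge": 8,
    "r5.4xlarge": 16, "r4.4xlarge": 16, "r5a.4xlarge": 16, "r5d.4xlarge": 16,
    "r5.8xlarge": 32, "r4.8xlarge": 32,
    "r5.16xlarge": 64, "r4.16xlarge": 64,
}

task_fleet = {
    "Name": "Taskfleet",
    "InstanceFleetType": "TASK",
    "TargetSpotCapacity": 64,  # interpreted as vCPUs because of the weights below
    "LaunchSpecifications": {
        "SpotSpecification": {
            "AllocationStrategy": "capacity-optimized",
            "TimeoutDurationMinutes": 60,
            "TimeoutAction": "TERMINATE_CLUSTER",
        }
    },
    "InstanceTypeConfigs": [
        {"InstanceType": t, "WeightedCapacity": w} for t, w in vcpus_by_type.items()
    ],
}
print(task_fleet)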

See the following AWS CLI command for an example of launching an Amazon EMR cluster that adheres to the recommendations in this blog post (uses Allocation Strategies and a diversified set of instance types in the task fleet):

aws emr create-cluster \
--use-default-roles --release-label emr-5.30.1 \
--ec2-attributes SubnetIds=['subnet-1234567890abcdefg','subnet-1234567890abcdefg','subnet-1234567890abcdefg'] \
--name 'EMRCluster-TaskFleet' \
--instance-fleets \
InstanceFleetType=MASTER,TargetOnDemandCapacity=1,InstanceTypeConfigs=['{WeightedCapacity=1,InstanceType=m5.xlarge}'] \
InstanceFleetType=CORE,TargetOnDemandCapacity=4,LaunchSpecifications={OnDemandSpecification='{AllocationStrategy=LOWEST_PRICE}'},InstanceTypeConfigs=['{WeightedCapacity=4,InstanceType=r4.xlarge},{WeightedCapacity=4,InstanceType=r5.xlarge}'] \
InstanceFleetType=TASK,TargetSpotCapacity=64,LaunchSpecifications={SpotSpecification='{TimeoutDurationMinutes=60,AllocationStrategy=CAPACITY_OPTIMIZED,TimeoutAction=TERMINATE_CLUSTER}'},InstanceTypeConfigs=['{WeightedCapacity=4,InstanceType=r5.xlarge},{WeightedCapacity=4,InstanceType=r4.xlarge},{WeightedCapacity=8,InstanceType=r5.2xlarge},{WeightedCapacity=8,InstanceType=r4.2xlarge},{WeightedCapacity=16,InstanceType=r5.4xlarge},{WeightedCapacity=16,InstanceType=r4.4xlarge},{WeightedCapacity=32,InstanceType=r5.8xlarge},{WeightedCapacity=32,InstanceType=r4.8xlarge},{WeightedCapacity=64,InstanceType=r5.16xlarge},{WeightedCapacity=64,InstanceType=r4.16xlarge},{WeightedCapacity=16,InstanceType=r5d.4xlarge},{WeightedCapacity=16,InstanceType=r5a.4xlarge}']

The following screenshot shows the result on the Amazon EMR console.

Conclusion

With this new functionality in Amazon EMR, you can increase the resilience of your organization’s data-processing workloads and optimize your costs by using Spot Instances. The capacity-optimized allocation strategy works to decrease the possibility of Spot interruptions in your cluster and allows you to specify up to 15 different instance types for your task fleet, enabling Amazon EMR to find the most available Spot capacity pools for your workload. With the Amazon EMR enhancements for increased resilience and the capacity-optimized allocation strategy for Spot Instances, you can achieve deep cost savings without compromising on availability.


About the authors

Ran Sheinberg is a principal solutions architect in the EC2 Spot team with Amazon Web Services. He works with AWS customers on cost-optimizing their compute spend by utilizing Spot Instances across different types of workloads: stateless web applications, queue workers, containerized workloads, analytics, HPC, and others.

Apply record level changes from relational databases to Amazon S3 data lake using Apache Hudi on Amazon EMR and AWS Database Migration Service

Post Syndicated from Ninad Phatak original https://aws.amazon.com/blogs/big-data/apply-record-level-changes-from-relational-databases-to-amazon-s3-data-lake-using-apache-hudi-on-amazon-emr-and-aws-database-migration-service/

Data lakes give organizations the ability to harness data from multiple sources in less time. Users across different roles are now empowered to collaborate and analyze data in different ways, leading to better, faster decision-making. Amazon Simple Storage Service (Amazon S3) is the highly performant object storage service for structured and unstructured data and the storage service of choice to build a data lake.

However, many use cases like performing change data capture (CDC) from an upstream relational database to an Amazon S3-based data lake require handling data at a record level. Performing an operation like inserting, updating, and deleting individual records from a dataset requires the processing engine to read all the objects (files), make the changes, and rewrite the entire dataset as new files. Furthermore, making the data available in the data lake in near-real time often leads to the data being fragmented over many small files, resulting in poor query performance. Apache Hudi is an open-source data management framework that enables you to manage data at the record level in Amazon S3 data lakes, thereby simplifying building CDC pipelines and making it efficient to do streaming data ingestion. Datasets managed by Hudi are stored in Amazon S3 using open storage formats, and integrations with Presto, Apache Hive, Apache Spark, and the AWS Glue Data Catalog give you near real-time access to updated data using familiar tools. Hudi is supported in Amazon EMR and is automatically installed when you choose Spark, Hive, or Presto when deploying your EMR cluster.

In this post, we show you how to build a CDC pipeline that captures the data from an Amazon Relational Database Service (Amazon RDS) for MySQL database using AWS Database Migration Service (AWS DMS) and applies those changes to a dataset in Amazon S3 using Apache Hudi on Amazon EMR. Apache Hudi includes the utility HoodieDeltaStreamer, which provides an easy way to ingest data from many sources, such as a distributed file system or Kafka. It manages checkpointing, rollback, and recovery so you don’t need to keep track of what data has been read and processed from the source, which makes it easy to consume change data. It also allows for lightweight SQL-based transformations on the data as it is being ingested. For more information, see Writing Hudi Tables. Support for AWS DMS with HoodieDeltaStreamer is provided with Apache Hudi version 0.5.2 and is available on Amazon EMR 5.30.x and 6.1.0.

Architecture overview

The following diagram illustrates the architecture we deploy to build our CDC pipeline.

In this architecture, we have a MySQL instance on Amazon RDS. AWS DMS pulls full and incremental data (using the CDC feature of AWS DMS) into an S3 bucket in Parquet format. HoodieDeltaStreamer on an EMR cluster is used to process the full and incremental data to create a Hudi dataset. As the data in the MySQL database gets updated, the AWS DMS task picks up the changes and takes them to the raw S3 bucket. The HoodieDeltaStreamer job can be run on the EMR cluster at a certain frequency or in a continuous mode to apply these changes to the Hudi dataset in the Amazon S3 data lake. You can query this data with tools such as SparkSQL, Presto, and Apache Hive running on the EMR cluster, and with Amazon Athena.
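
For example, once the Hudi table is registered in the AWS Glue Data Catalog (as shown later in this post), a query could be submitted to Athena with boto3 along these lines; the query output location is an assumption you replace with your own bucket.

import boto3

athena = boto3.client("athena")

# Query the Hudi table registered in the AWS Glue Data Catalog.
response = athena.start_query_execution(
    QueryString="SELECT tran_id, store_city, total FROM retail_transactions LIMIT 10",
    QueryExecutionContext={"Database": "hudiblogdb"},
    ResultConfiguration={"OutputLocation": "s3://<s3-bucket-name>/athena-results/"},
)
print(response["QueryExecutionId"])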

Deploying the solution resources

We use AWS CloudFormation to deploy these components in your AWS account. Choose an AWS Region for deployment where all the services used in this solution are available.

You need to meet the following prerequisites before deploying the CloudFormation template:

  • Have a VPC with at least two public subnets in your account.
  • Have an S3 bucket where you want to collect logs from the EMR cluster. This bucket should be in the same AWS Region where you launch the CloudFormation stack.
  • Have an AWS Identity and Access Management (IAM) role dms-vpc-role. For instructions on creating one, see Security in AWS Database Migration Service.
  • If you’re deploying the stack in an account using the AWS Lake Formation permission model, validate the following settings:
    • The IAM user used to deploy the stack is added as a data lake administrator under Lake Formation or the IAM user used to deploy the stack has IAM privileges to create databases in the AWS Glue Data Catalog.
    • The Data Catalog settings under Lake Formation are configured to use only IAM access control for new databases and new tables in new databases. This makes sure that all access to the newly created databases and tables in the Data Catalog are controlled solely using IAM permissions.
    • IAMAllowedPrincipals is granted database creator privilege on the Lake Formation Database creators page.

If this privilege is not in place, grant it by choosing Grant and selecting the Create database permission.

These Lake Formation settings are required so that all permissions to the Data Catalog objects are controlled using IAM only.

Launching the CloudFormation stack

To launch the CloudFormation stack, complete the following steps:

  1. Choose Launch Stack:
  2. Provide the mandatory parameters in the Parameters section, including an S3 bucket to store the Amazon EMR logs and a CIDR IP range from where you want to access Amazon RDS for MySQL.
  3. Follow through the CloudFormation stack creation wizard, leaving rest of the default values unchanged.
  4. On the final page, select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  5. Choose Create stack.
  6. When the stack creation is complete, record the details of the S3 bucket, EMR cluster, and Amazon RDS for MySQL details on the Outputs tab of the CloudFormation stack.

The CloudFormation template uses m5.xlarge and m5.2xlarge instances for the EMR cluster. If these instance types aren’t available in the Region or Availability Zone you have selected for deployment, the creation of the CloudFormation stack fails. If that happens, choose a Region or subnet where the instance type is available. For more information about working around this issue, see Instance Type Not Supported.

CloudFormation also creates and configures the AWS DMS endpoints and tasks with requisite connection attributes such as dataFormat, timestampColumnName, and parquetTimestampInMillisecond. For more information, see Extra connection attributes when using Amazon S3 as a target for AWS DMS.

The database instance deployed as part of the CloudFormation stack has already been created with the settings needed for AWS DMS to work in CDC mode on the database. These are:

  • binlog_format=ROW
  • binlog_checksum=NONE

Also, automatic backups are enabled on the RDS DB instance. This is a required attribute for AWS DMS to do CDC. For more information, see Using a MySQL-compatible database as a source for AWS DMS.

Running the end-to-end data flow

Now that the CloudFormation stack is deployed, we can run our data flow to get the full and incremental data from MySQL into a Hudi dataset in our data lake.

  1. As a best practice, retain your binlogs for at least 24 hours. Log in to your Amazon RDS for MySQL database using your SQL client and run the following command:
    call mysql.rds_set_configuration('binlog retention hours', 24)

  2. Create a table in the dev database:
    create table dev.retail_transactions(
    tran_id INT,
    tran_date DATE,
    store_id INT,
    store_city varchar(50),
    store_state char(2),
    item_code varchar(50),
    quantity INT,
    total FLOAT);

  3. When the table is created, insert some dummy data into the database:
    insert into dev.retail_transactions values(1,'2019-03-17',1,'CHICAGO','IL','XXXXXX',5,106.25);
    insert into dev.retail_transactions values(2,'2019-03-16',2,'NEW YORK','NY','XXXXXX',6,116.25);
    insert into dev.retail_transactions values(3,'2019-03-15',3,'SPRINGFIELD','IL','XXXXXX',7,126.25);
    insert into dev.retail_transactions values(4,'2019-03-17',4,'SAN FRANCISCO','CA','XXXXXX',8,136.25);
    insert into dev.retail_transactions values(5,'2019-03-11',1,'CHICAGO','IL','XXXXXX',9,146.25);
    insert into dev.retail_transactions values(6,'2019-03-18',1,'CHICAGO','IL','XXXXXX',10,156.25);
    insert into dev.retail_transactions values(7,'2019-03-14',2,'NEW YORK','NY','XXXXXX',11,166.25);
    insert into dev.retail_transactions values(8,'2019-03-11',1,'CHICAGO','IL','XXXXXX',12,176.25);
    insert into dev.retail_transactions values(9,'2019-03-10',4,'SAN FRANCISCO','CA','XXXXXX',13,186.25);
    insert into dev.retail_transactions values(10,'2019-03-13',1,'CHICAGO','IL','XXXXXX',14,196.25);
    insert into dev.retail_transactions values(11,'2019-03-14',5,'CHICAGO','IL','XXXXXX',15,106.25);
    insert into dev.retail_transactions values(12,'2019-03-15',6,'CHICAGO','IL','XXXXXX',16,116.25);
    insert into dev.retail_transactions values(13,'2019-03-16',7,'CHICAGO','IL','XXXXXX',17,126.25);
    insert into dev.retail_transactions values(14,'2019-03-16',7,'CHICAGO','IL','XXXXXX',17,126.25);
    

    We now use AWS DMS to start pushing this data to Amazon S3.

  4. On the AWS DMS console, run the task hudiblogload.

This task does a full load of the table to Amazon S3 and then starts writing incremental data.

If you’re prompted to test the AWS DMS endpoints while starting the AWS DMS task for the first time, you should do so. It’s generally a good practice to test the source and target endpoints before starting an AWS DMS task for the first time.

In a few minutes, the status of the task changes to Load complete, replication ongoing, which means that the full load is complete and the ongoing replication has started. You can go to the S3 bucket created by the stack and you should see a .parquet file under the dmsdata/dev/retail_transactions folder in your S3 bucket.
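
If you prefer to verify this from code instead of the Amazon S3 console, a quick boto3 sketch like the following lists what AWS DMS has written; replace the bucket placeholder with the bucket created by the CloudFormation stack.

import boto3

s3 = boto3.client("s3")

bucket = "<s3-bucket-name>"  # bucket created by the CloudFormation stack
resp = s3.list_objects_v2(Bucket=bucket, Prefix="dmsdata/dev/retail_transactions/")
for obj in resp.get("Contents", []):
    print(obj["Key"], obj["Size"])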

  1. On the Hardware tab of your EMR cluster, choose the master instance group and note the EC2 instance ID for the master instance.
  2. On the Systems Manager console, choose Session Manager.
  3. Choose Start Session to start a session with the master node of your cluster.

If you face challenges connecting to the master instance of the EMR cluster, see Troubleshooting Session Manager.

  1. Switch the user to Hadoop by running the following command:
    sudo su hadoop

In a real-life use case, the AWS DMS task starts writing incremental files to the same Amazon S3 location when the full load is complete. The way to distinguish full load vs. incremental load files is that the full load files have a name starting with LOAD, whereas CDC filenames have datetimestamps, as you see in a later step. From a processing perspective, we want to process the full load into the Hudi dataset and then start incremental data processing. To do this, we move the full load files to a different S3 folder under the same S3 bucket and process those before we start processing incremental files.

  1. Run the following command on the master node of the EMR cluster (replace <s3-bucket-name> with your actual bucket name):
    aws s3 mv s3://<s3-bucket-name>/dmsdata/dev/retail_transactions/ s3://<s3-bucket-name>/dmsdata/data-full/dev/retail_transactions/  --exclude "*" --include "LOAD*.parquet" --recursive

With the full table dump available in the data-full folder, we now use the HoodieDeltaStreamer utility on the EMR cluster to populate the Hudi dataset on Amazon S3.

  1. Run the following command to populate the Hudi dataset to the hudi folder in the same S3 bucket (replace <s3-bucket-name> with the name of the S3 bucket created by the CloudFormation stack):
    spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer  \
      --packages org.apache.hudi:hudi-utilities-bundle_2.11:0.5.2-incubating,org.apache.spark:spark-avro_2.11:2.4.5 \
      --master yarn --deploy-mode cluster \
    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
    --conf spark.sql.hive.convertMetastoreParquet=false \
    /usr/lib/hudi/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
      --table-type COPY_ON_WRITE \
      --source-ordering-field dms_received_ts \
      --props s3://<s3-bucket-name>/properties/dfs-source-retail-transactions-full.properties \
      --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
      --target-base-path s3://<s3-bucket-name>/hudi/retail_transactions --target-table hudiblogdb.retail_transactions \
      --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
        --payload-class org.apache.hudi.payload.AWSDmsAvroPayload \
    --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
      --enable-hive-sync
    

The preceding command runs a Spark job that runs the HoodieDeltaStreamer utility. For more information about the parameters used in this command, see Writing Hudi Tables.

When the Spark job is complete, you can navigate to the AWS Glue console and find a table called retail_transactions created under the hudiblogdb database. The input format for the table is org.apache.hudi.hadoop.HoodieParquetInputFormat.

Next, we query the data and look at the data in the retail_transactions table in the catalog.

  1. In the Systems Manager session established earlier, run the following command (make sure that you have completed all the prerequisites for the post, including adding IAMAllowedPrincipals as a database creator in Lake Formation):
    spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.hive.convertMetastoreParquet=false" \
    --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.2-incubating,org.apache.spark:spark-avro_2.11:2.4.5 \
    --jars /usr/lib/hudi/hudi-spark-bundle_2.11-0.5.2-incubating.jar,/usr/lib/spark/external/lib/spark-avro.jar
    

  2. Run the following query on the retail_transactions table:
    spark.sql("Select * from hudiblogdb.retail_transactions order by tran_id").show()

You should see the same data in the table as the MySQL database with a few columns added by the HoodieDeltaStreamer process.

We now run some DML statements on our MySQL database and take these changes through to the Hudi dataset.

  1. Run the following DML statements on the MySQL database:
    insert into dev.retail_transactions values(15,'2019-03-16',7,'CHICAGO','IL','XXXXXX',17,126.25);
    update dev.retail_transactions set store_city='SPRINGFIELD' where tran_id=12;
    delete from dev.retail_transactions where tran_id=2;

In a few minutes, you see a new .parquet file created under dmsdata/dev/retail_transactions folder in the S3 bucket.

  1. Run the following command on the EMR cluster to get the incremental data to the Hudi dataset (replace <s3-bucket-name> with the name of the S3 bucket created by the CloudFormation template):
    spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer  \
      --packages org.apache.hudi:hudi-utilities-bundle_2.11:0.5.2-incubating,org.apache.spark:spark-avro_2.11:2.4.5 \
      --master yarn --deploy-mode cluster \
    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
    --conf spark.sql.hive.convertMetastoreParquet=false \
    /usr/lib/hudi/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
      --table-type COPY_ON_WRITE \
      --source-ordering-field dms_received_ts \
      --props s3://<s3-bucket-name>/properties/dfs-source-retail-transactions-incremental.properties \
      --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
      --target-base-path s3://<s3-bucket-name>/hudi/retail_transactions --target-table hudiblogdb.retail_transactions \
      --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
        --payload-class org.apache.hudi.payload.AWSDmsAvroPayload \
    --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
      --enable-hive-sync \
    --checkpoint 0

The key difference between this command and the previous one is in the properties file that was used as an argument to the --props and --checkpoint parameters. For the earlier command that performed the full load, we used dfs-source-retail-transactions-full.properties; for the incremental one, we used dfs-source-retail-transactions-incremental.properties. The differences between these two property files are:

  • The location of source data changes between full and incremental data in Amazon S3.
  • The SQL transformer query included a hard-coded Op field for the full load task, because an AWS DMS first-time full load doesn’t include the Op field for Parquet datasets. The Op field can have values of I, U, and D—for Insert, Update and Delete indicators.

We cover the details of the --checkpoint parameter in the Considerations when deploying to production section later in this post.

  1. When the job is complete, run the same query in spark-shell.

You should see these updates applied to the Hudi dataset.

You can use the Hudi CLI to administer Hudi datasets to view information about commits, the filesystem, statistics, and more.

  1. To do this, in the Systems Manager session, run the following command:
    /usr/lib/hudi/cli/bin/hudi-cli.sh

  2. Inside the Hudi-cli, run the following command (replace the <s3-bucket-name> with the S3 bucket created by the Cloud Formation stack):
    connect --path s3://<s3-bucket-name>/hudi/retail_transactions

  3. To inspect commits on your Hudi dataset, run the following command:
    commits show

You can also query incremental data from the Hudi dataset. This is particularly useful when you want to take incremental data for downstream processing, such as aggregations. Hudi provides multiple ways of pulling data incrementally, which are documented in the Hudi documentation. An example of how to use this feature is available in the Hudi Quick Start Guide.

Considerations when deploying to production

The preceding setup showed an example of how to build a CDC pipeline from your relational database to your Amazon S3-based data lake. However, if you want to use this solution for production, you should consider the following:

  • To ensure high availability, you can set up the AWS DMS instance in a Multi-AZ configuration.
  • The CloudFormation stack deployed the required properties files needed by the deltastreamer utility into the S3 bucket at s3://<s3-bucket-name>/properties/. You may need to customize these based on your requirements. For more information, see Configurations. There are a few parameters that may need your attention:
    • deltastreamer.transformer.sql – This property exposes an extremely powerful feature of the deltastreamer utility: it enables you to transform data on the fly as it’s being ingested and persisted in the Hudi dataset. In this post, we have shown a basic transformation that casts the tran_date column to a string, but you can apply any transformation as part of this query.
    • parquet.small.file.limit – This field is in bytes and is a critical storage configuration that specifies how Hudi handles small files on Amazon S3. Small files can occur due to the number of records being processed in each insert per partition. Setting this value allows Hudi to continue to treat inserts in a particular partition as updates to existing files, causing files up to the size of this small.file.limit to be rewritten and keep growing in size.
    • parquet.max.file.size – This is the max file size of a single Parquet in your Hudi dataset, after which a new file is created to store more data. For Amazon S3 storage and data querying needs, we can keep this around 256 MB–1 GB (256x1024x1024 = 268435456).
    • [Insert|Upsert|bulkinsert].shuffle.parallelism – In this post, we dealt with a small dataset of only a few records. However, in real-life situations, you might want to bring in hundreds of millions of records in the first load, and incremental CDC can potentially be in the millions per day. This is a very important parameter to set when you want predictable control over the number of files in each of your Hudi dataset partitions. It is also needed to ensure you don’t hit an Apache Spark limit of 2 GB for data shuffle blocks when processing large amounts of data. For example, if you plan to load 200 GB of data in the first load and want to keep file sizes of approximately 256 MB, set the shuffle parallelism parameter for this dataset to 800 (200×1024/256); see the small sizing helper after this list. For more information, see Tuning Guide.
  • In the incremental load deltastreamer command, we used an additional parameter: --checkpoint 0. When deltastreamer writes a Hudi dataset, it persists checkpoint information in the .commit files under the .hoodie folder. It uses this information in subsequent runs and only reads that data from Amazon S3, which is created after this checkpoint time. In a production scenario, after you start the AWS DMS task, the task keeps writing incremental data to the target S3 folder as soon as the full load is complete. In the steps that we followed, we ran a command on the EMR cluster to manually move the full load files to another folder and process the data from there. When we did that, the timestamp associated with the S3 objects changes to the most current timestamp. If we run the incremental load without the checkpoint argument, deltastreamer doesn’t pick up any incremental data written to Amazon S3 before we manually moved the full load files. To make sure that all incremental data is processed by deltastreamer the first time, set the checkpoint to 0, which makes it process all incremental data in the folder. However, only use this parameter for the first incremental load and let deltastreamer use its own checkpointing methodology from that point onwards.
  • For this post, we ran the spark-submit command manually. However, in production, you can run it as a step on the EMR cluster.
  • You can either schedule the incremental data load command to run at a regular interval using a scheduling or orchestration tool, or run it in a continuous fashion at a certain frequency by passing additional parameters to the spark-submit command: --min-sync-interval-seconds XX --continuous, where XX is the number of seconds between each run of the data pull. For example, if you want to run the processing every 5 minutes, replace XX with 300.
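
The following is a tiny helper for the shuffle parallelism sizing arithmetic mentioned in the list above; the 256 MB target file size is the assumption from that guidance.

# Rough sizing helper: shuffle parallelism for a given input volume and target file size.
def shuffle_parallelism(input_gb, target_file_mb=256):
    return int(round(input_gb * 1024 / target_file_mb))

print(shuffle_parallelism(200))   # 800, matching the 200 GB example above
print(shuffle_parallelism(500))   # 2000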

Cleaning up

When you are done exploring the solution, complete the following steps to clean up the resources deployed by CloudFormation:

  1. Empty the S3 bucket created by the CloudFormation stack.
  2. Delete any Amazon EMR log files generated under s3://<EMR-Logs-S3-Bucket> /HudiBlogEMRLogs/.
  3. Stop the AWS DMS task Hudiblogload.
  4. Delete the CloudFormation stack.
  5. Delete any Amazon RDS for MySQL database snapshots retained after the CloudFormation template is deleted.

Conclusion

More and more data lakes are being built on Amazon S3, and these data lakes often need to be hydrated with change data from transactional systems. Handling deletes and upserts of data into the data lake using traditional methods involves a lot of heavy lifting. In this post, we saw how to easily build a solution with AWS DMS and HoodieDeltaStreamer on Amazon EMR. We also looked at how to perform lightweight record-level transformations when integrating data into the data lake, and how to use this data for downstream processes like aggregations. We also discussed the important settings and command line options that were used and how you could modify them to suit your requirements.


About the Authors

Ninad Phatak is a Senior Analytics Specialist Solutions Architect with Amazon Internet Services Private Limited. He specializes in data engineering and data warehousing technologies and helps customers architect their analytics use cases and platforms on AWS.

Raghu Dubey is a Senior Analytics Specialist Solutions Architect with Amazon Internet Services Private Limited. He specializes in big data analytics, data warehousing, and BI, and helps customers build scalable data analytics platforms.

Automating EMR workloads using AWS Step Functions

Post Syndicated from Afsar Jahangir original https://aws.amazon.com/blogs/big-data/automating-emr-workloads-using-aws-step-functions/

Amazon EMR allows you to process vast amounts of data quickly and cost-effectively at scale. Using open-source tools such as Apache Spark, Apache Hive, and Presto, and coupled with the scalable storage of Amazon Simple Storage Service (Amazon S3), Amazon EMR gives analytical teams the engines and elasticity to run petabyte-scale analysis for a fraction of the cost of traditional on-premises clusters. Developers and analysts can use Jupyter-based Amazon EMR notebooks for iterative development, collaboration, and access to data stored across AWS data products.

What happens if you have Amazon EMR code that needs to run automatically on a regular basis? Maybe the job should only run in response to certain events, like new data arriving in Amazon S3. Or maybe you want to run a job every Friday afternoon at 2:00 PM. What if there is a multi-step process?

To run Amazon EMR workloads on a schedule, you can automate everything with AWS Step Functions. This post walks through how to use Step Functions state machines and the callback pattern to automate EMR jobs. You can download the code examples from the GitHub repo.

Prerequisites

To follow along with this walkthrough, you must have the following:

Solution overview

For this use case, I want to run two applications on my EMR cluster. The start of the second application depends on the successful completion and output of the first. At a high level, I want to launch an EMR cluster automatically, run the code, and remove the cluster. Specifically, when the first program successfully completes, I want to run the second program.

At the conclusion of the second application, in some cases I may want to run both programs multiple times (with different dataset sizes, perhaps). I need a way to decide whether to run the process again with the same cluster. Whether the steps succeed or fail, at the conclusion, I always want to delete the CloudFormation stack that contains my EMR cluster to reduce cost. The following diagram illustrates this high-level overview of the pipeline operation.

Workflow details

I run two programs, and I need the first program to complete before running the second one. I optionally want to repeat those two programs with different datasets to get the final state of the data. To orchestrate the jobs, I can run through the same steps multiple times with the same active EMR cluster.

To facilitate automating the pipeline, I use an inner state machine to check the cluster status and submit EMR job steps. I then wrap that inner state machine in an outer state machine. The outer state machine starts the cluster and submits information to the inner state machine. It waits for all steps to complete, then deletes the EMR cluster.

The following flow chart illustrates the steps and checkpoints that make up the pipeline.

Deploying the pipeline state machines

To simplify pipeline deployment, I use AWS SAM, an open-source framework for building serverless applications. AWS SAM provides a single deployment configuration, extensions to CloudFormation templates, built-in best practices, and local debugging and testing. You can use AWS SAM with a suite of AWS tools for building serverless applications. For more information, see What Is the AWS Serverless Application Model (AWS SAM)?

Initiating the application

Navigate to the path where you want to download the files and initiate the AWS SAM application. I want to run the code from my local machine and have created the following location:

~/Documents/projects/blog/steppipeline/automation-ml-step-data-pipeline

From this directory, I initialize the application using sam init. This connects to the repository and downloads the files for creation of the ML pipeline. See the following code:

sam init -l https://github.com/aws-samples/automation-ml-step-data-pipeline.git

Creating the S3 bucket and moving dependencies

For this post, I orchestrate an existing process from the post Anomaly Detection Using PySpark, Hive, and Hue on Amazon EMR, which runs on Amazon EMR. The pipeline reads code artifacts from Amazon S3, where the EMR cluster has read permission. There are two programs: kmeansandey.py and kmeanswsssey.py.

First, create the bucket from the command line using the aws s3 mb command and upload the code. Your bucket name must be globally unique:

aws s3 mb s3://<your bucket name>

Move the artifacts to your bucket, replacing <your bucket name> with your bucket name:

aws s3 cp sample_ml_code/kmeansandey.py s3://<your bucket name>/testcode/kmeansandey.py
aws s3 cp sample_ml_code/kmeanswsssey.py s3://<your bucket name>/testcode/kmeanswsssey.py
aws s3 cp emr/bootstrapactions.sh s3://<your bucket name>/emr-bootstrap-scripts/bootstrapactions.sh
aws s3 cp emr/emr-cluster-config.json s3://<your bucket name>/emr-cluster-config.json
aws s3 cp emr/emr-cluster-sample.yaml s3://<your bucket name>/emr-cluster-sample.yaml

Deploying the application

Deploy the build artifacts to the AWS Cloud using the following code:

sam deploy --guided

AWS SAM prompts you for the parameters that you need to build and deploy the application. I have provided some default values where possible.

The final output of your deployment process should indicate that all stacks were built:

Successfully created/updated stack - step-pipeline in us-east-1

After deployment, you receive an email to confirm your subscription. Choose the confirmation link in the email to receive pipeline notifications.

Submitting a workload to your Step Functions state machine

To create a cluster and submit EMR jobs, the outer state machine needs a JSON payload. This contains the location of the programs in Amazon S3, the Amazon EMR CloudFormation template, and the parameter files used to launch the EMR cluster.

Creating an Amazon EC2 key pair

To use the same sample programs and EMR cluster template that you used to test your pipeline, you need an Amazon EC2 key pair for SSH credentials. When you create a cluster, you can specify the Amazon Elastic Compute Cloud (Amazon EC2) key pair to use for SSH connections to all cluster instances. The name of the key pair for this cluster is referenced in the emr-cluster-config.json file. See the following code:

<snip>
  {
    "ParameterKey": "Keyname",
    "ParameterValue": "emrcluster-launch"
  },
</snip>

To use the example as-is with the parameters unchanged, create an Amazon EC2 key pair on the AWS Management Console or AWS Command Line Interface (AWS CLI).

  1. On the Amazon EC2 console, under Network & Security, choose Key Pairs.
  2. On the Key Pairs page, choose Create Key Pair.
  3. For Key pair name, enter emrcluster-launch.
  4. Choose Create.
  5. When the console prompts you to save the private key file, save it in a safe place.

This is the only chance for you to save the private key file.
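If you prefer to script this step instead of using the console, a minimal boto3 sketch like the following creates the key pair and saves the private key locally. The file name and permissions shown are assumptions; adjust them for your environment.

import os
import boto3

ec2 = boto3.client("ec2")

# Create the key pair referenced by emr-cluster-config.json.
key_pair = ec2.create_key_pair(KeyName="emrcluster-launch")

# Save the private key material; this is your only chance to capture it.
with open("emrcluster-launch.pem", "w") as pem_file:
    pem_file.write(key_pair["KeyMaterial"])
os.chmod("emrcluster-launch.pem", 0o400)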

Inputting JSON for launching the pipeline

The simplest way to run the pipeline is to use the Start execution feature on the Step Functions console. The console gives you everything you need to start the state machine and submit a payload. In the example test_input.json, update the bucket values, security group, and subnet with the information for your account:

{  
    "ModelName": "PipelineTest_01",  
    "ModelProgram": "s3://<your bucket name>/testcode/kmeansandey.py",  
    "PreProcessingProgram": "s3://<your bucket name>/testcode/kmeanswsssey.py",  
    "EMRCloudFormation": "https://s3.amazonaws.com/<your bucket name>/emr-cluster-sample.yaml",  
    "EMRParameters": "https://s3.amazonaws.com/<your bucket name>/emr-cluster-config.json",  
    "JobInput": "s3://aws-bigdata-blog/artifacts/anomaly-detection-using-pyspark/sensorinputsmall/",  
    "SecurityGroup": "<your security group>",  
    "SubNet": "<your subnet>",  
    "ClusterSize": "4",  
    "ProcessingMode": ["TRAINING"]
}

The payload includes the following information:

  • ModelName – A short descriptive name for the transient EMR cluster created during this process. The name appears on the Amazon EMR console so the cluster is easy to identify.
  • ModelProgram – The Amazon S3 URL location of the program that runs when the model initiates on the EMR cluster (step 3).
  • PreProcessingProgram – The Amazon S3 URL location of the program that runs when preprocessing initiates on the EMR cluster (step 2).
  • EMRCloudFormation – The Amazon S3 HTTPS location of the CloudFormation template for launching the transient EMR cluster.
  • EMRParameters – The Amazon S3 HTTPS location of the parameter file supporting the Amazon EMR CloudFormation template.
  • JobInput – The Amazon S3 URL location of the input data for the preprocessing program.
  • SecurityGroup – The security group with ingress and egress rules for the launched EMR cluster.
  • SubNet – The subnet identifier where you place your EMR cluster.
  • ClusterSize – The number of EMR cluster nodes used to run the job; change it based on your compute needs. I use 4 nodes for the sample program.
  • ProcessingMode – This is an array of values. The pipeline runs steps 2 and 3 for each value in the array. The value is passed into the program unchanged and can be used to internally control how the program runs. For this use case, it runs a single time on the small dataset.

Opening the Step Functions Executions page

On the Step Functions console, choose MLStateMachine. This is the outer state machine. On the detail page for the outer state machine, choose Start execution.

Entering your payload

On the New execution page, enter the JSON for your pipeline based on the example test_input.json. Choose Start execution.
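If you prefer to start the pipeline programmatically rather than through the console, a boto3 call like the sketch below submits the same payload. The state machine ARN and file path are placeholders; substitute the MLStateMachine ARN created by your AWS SAM deployment.

import json
import boto3

sfn = boto3.client("stepfunctions")

# Load the payload you prepared from the example test_input.json (placeholder path).
with open("test_input.json") as payload_file:
    payload = json.load(payload_file)

# Replace the ARN below with the outer state machine (MLStateMachine) ARN from your deployment.
response = sfn.start_execution(
    stateMachineArn="arn:aws:states:us-east-1:123456789012:stateMachine:MLStateMachine",
    name="PipelineTest_01-run",
    input=json.dumps(payload),
)
print(response["executionArn"])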

Reviewing the workflow as it runs

You can see the pipeline running in the visual workflow and review the event history on the detail page. The following diagram shows the state machine definition used:

Diving into the pipeline's details

There are four processes that run in the outer state machine pipeline:

  1. Step 1 launches an EMR cluster using the CloudFormation template. The AWS Lambda function downloads the template and parameter file from the specified Amazon S3 location and initiates the stack build.
  2. When the EMR cluster is ready, step 2 initiates the first set of code against the newly created EMR cluster, passing the remaining parameters to the inner state machine. It adds the stack ID, EMR cluster ID, and status to the payload. These values are obtained from the output of the CloudFormation stack. See the following code:
    "ModelName": "PipelineTest_01",  
    "PreProcessingProgram": "s3://<your bucket name>/testcode/kmeanswsssey.py",  
    "JobInput": "s3://aws-bigdata-blog/artifacts/anomaly-detection-using-pyspark/sensorinputsmall/",  
    "ClusterSize": "4",  
    "ProcessingMode": ["TRAINING"],
    "StackId": "arn:aws:cloudformation:us-east-1:575444587305:stack/PipelineTest01-auto-emr-02142020041612/bc5fd7a0-4ee0-11ea-a395-0e4c53e0aefd",
    "Status": "CREATE_COMPLETE",
    "ClusterId": "j-MF6LWBLJZ88K"

The code contains the following information:

  • ModelName is used in the EMR cluster name to make it easier to identify in the console and AWS CLI output.
  • PreProcessingProgram in our use case points to the first code step (kmeanswsssey.py). The program location is passed through the outer state machine and submitted to the inner state machine and Amazon EMR.
  • JobInput, ClusterSize, ClusterId, StackId, and ProcessingMode are passthrough values that the program needs to run.

The step initiates the Lambda function awsblog-testproject-inner-sm-AddStepLambda-x45123xxxxxx, which starts the inner state machine asynchronously. The inner state machine checks the cluster, adds a step, waits until the step is complete, and then exits back to the outer state machine.

  3. Next, the outer state machine runs the model program code (step 3) by submitting it to the Lambda function awsblog-testproject-inner-sm-AddStepLambda-x45123xxxxxx, which engages the inner state machine for the second set of code (kmeansandey.py). The process is the same as step 2, but the code comes from a different file, and the output from the preprocessing step becomes the input for this step. See the following code:
    "ModelProgram": "s3://<your bucket name>/testcode/kmeansandey.py",

When the inner state machine is complete, the outer state machine moves to a step that removes the first value from the ProcessingMode array. For this use case, there is only one value (TRAINING), which is removed, leaving the array empty. The next step looks for remaining values; if there are none, it marks all steps as complete and moves to Delete EMR cluster.

  4. The final step in the outer state machine is to remove the EMR cluster. The Delete EMR cluster step passes the CloudFormation stack ID into lambda/delete_cfn_stack.py, initiating the deletion of the stack and cleaning up all the resources.
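For illustration, the following minimal sketch shows what a cleanup handler like lambda/delete_cfn_stack.py might look like. It is an assumption based on the behavior described above, not the repository's actual code, and it expects the stack ID under the StackId key in the event payload.

import boto3

cfn = boto3.client("cloudformation")

def lambda_handler(event, context):
    # Sketch only: delete the transient EMR cluster's CloudFormation stack.
    # Assumes the outer state machine passes the stack ID as event["StackId"].
    stack_id = event["StackId"]
    cfn.delete_stack(StackName=stack_id)
    return {"StackId": stack_id, "Status": "DELETE_IN_PROGRESS"}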

The output of the test programs is stored in Amazon S3 in two folders under the pipeline artifacts. The preprocessing folder contains data that is used to drive the output in the model folder. The following screenshot shows the folders in Amazon S3.

Conclusion

The Step Functions workflow in this post is a repeatable two-step pipeline. It starts an EMR cluster, runs a program that outputs data, and initiates a second program that depends on the previous job finishing. It then deletes all resources automatically.

You can adapt the workflow to respond to Amazon S3 events, a message received in a queue, a file checked into a code repository, or a schedule. Any event that can invoke Lambda can initiate the pipeline. For more information, see Invoking AWS Lambda functions.

You can download the example code from the GitHub repo and adapt it for your use case. Let me know in the comments what you built for your environment.


About the Authors

Mohammed “Afsar” Jahangir Ali has been a Senior Big Data Consultant with Amazon since January 2018. He is a data enthusiast who helps customers shape their data lakes and analytics journeys on AWS. In his spare time, he enjoys taking pictures, listening to music, and spending time with family.

Wendy Neu has worked as a Data Architect with Amazon since January 2015. Prior to joining Amazon, she worked as a consultant in Cincinnati, OH helping customers integrate and manage their data from different unrelated data sources.