Tag Archives: Amazon EMR

Visualize data using Apache Spark running on Amazon EMR with Amazon QuickSight

Post Syndicated from Tom McMeekin original https://aws.amazon.com/blogs/big-data/visualize-data-using-apache-spark-running-on-amazon-emr-with-amazon-quicksight/

Organizations often need to process large volumes of data before serving to business stakeholders. In this blog, we will learn how to leverage Amazon EMR to process data using Apache Spark, the go-to platform for in-memory analytics of large data volume, and connect business intelligence (BI) tool Amazon QuickSight to serve data to end-users.

QuickSight is a fast, cloud-powered BI service that makes it easy to build visualizations, perform ad hoc analysis, and quickly get business insights from your data. With our cloud-based service, you can easily connect to your data, perform advanced analysis, and create stunning visualizations and rich dashboards that can be accessed from any browser or mobile device.

QuickSight supports connectors for big data analytics using Spark. With the SparkSQL connector in QuickSight, you can easily create interactive visualizations over large datasets using Amazon EMR. Amazon EMR provides a simple and cost-effective way to run highly distributed processing frameworks such as Spark.

In this post, we use the public data set, New York City Taxi and Limousine Commission (TLC) Trip Record Data, which contains data of trips taken by taxis and for-hire vehicles in New York City. We use an optimized Parquet version of the CSV public dataset available from the Registry of Open Data on AWS.

This post also explores how to use AWS Glue to create the Data Catalog by crawling the NYC taxi data in an Amazon Simple Storage Service (Amazon S3) bucket, making it immediately query able for analyzing. AWS Glue is a fully managed extract, transform, and load (ETL) service that makes it easy to prepare and load your data for analytics. To learn more about how to use AWS Glue to transform a dataset from CSV to Parquet, see Harmonize, Query, and Visualize Data from Various Providers using AWS Glue, Amazon Athena, and Amazon QuickSight.

Prerequisites

The following steps assume that you have a VPC with public and private subnets, with NAT configured for private subnets and an available S3 bucket for Amazon EMR logging.

If you create the EMR cluster in a private subnet, you can use AWS Systems Manager Session Manager, a bastion host, or a VPN connection to access the EMR cluster. For this post, we use Session Manager to access our EMR cluster.

In QuickSight Enterprise edition, you can create connections to your VPCs from your QuickSight account. Each connection creates an elastic network interface in your VPC for QuickSight to send traffic to instances in your VPC. For more information, see Connecting to a VPC with Amazon QuickSight. If you haven’t already signed up for QuickSight, you can sign up before getting started. QuickSight offers a free trial so you can try out this solution at no cost.

The following AWS CloudFormation template offers single-click deployment.

We use the US East (N. Virginia) Region as the default; we highly recommended you launch the stack to optimize querying the public dataset in Amazon S3. You can change to any Region that supports Amazon EMR, AWS Glue, and QuickSight, but it may impact the time it takes to query the data.

If deploying into production, we recommend you secure communication by using the Configure SSL with a QuickSight supported authority step after deployment of the CloudFormation template to enable SSL.

Solution overview

We walk you through the following steps:

  1. Deploy and configure Amazon EMR with a CloudFormation template.
  2. Run AWS Glue crawlers to crawl and populate the Hive-compatible metastore.
  3. Test JDBC connectivity using Beeline.
  4. Visualize the data with QuickSight.

The CloudFormation template provided in the prerequisites, provides a configured Amazon EMR cluster for you to start querying your data with Spark. After deploying the CloudFormation stack, you can skip the first step and start running the AWS Glue crawlers.

Deploy and Configure Amazon EMR

Those looking to dive deep to understand what the CloudFormation template is deploying can use the following steps to manually deploy Amazon EMR running Spark and connect it to QuickSight:

  1. Create an EMR cluster with 5.30.0 or later release.
  2. Connect to the cluster using Session Manager.
  3. Install and configure OpenLDAP.
  4. Create a user in LDAP.
  5. Start the Thrift server.
  6. Configure SSL using a QuickSight supported authority.

Create an EMR cluster

For this post, we use an EMR cluster with 5.30.0 or later release.

  1. On the Amazon EMR console, choose Create cluster.
  2. For Cluster name, enter a name (for example, visualisedatablog).
  3. For Release, choose your release version.
  4. For Applications, select Spark.
  5. Select Use AWS Glue Data Catalog for table metadata.
  6. For Instance type¸ choose your instance.
  7. For EC2 key pair, choose Proceed without an EC2 key pair.
  8. Choose Create cluster.

Make sure you enabled run as support for Session Manager.

Connect to the EMR cluster using Session Manager

Session Manager is a fully managed AWS Systems Manager capability that lets you manage your Amazon Elastic Compute Cloud (Amazon EC2) instances, on-premises instances, and virtual machines through an interactive, one-click, browser-based shell or through the AWS Command Line Interface (AWS CLI). Session Manager provides secure and auditable instance management without the need to open inbound ports, maintain bastion hosts, or manage SSH keys.

By default, sessions are launched using the credentials of a system-generated ssm-user account that is created on a managed instance. For Amazon EMR, you can instead launch sessions using the Hadoop user account. Session Manager provides two methods for specifying the Hadoop user operating system account to use. For more information, see Enable run as support for Linux and macOS instances. For those configuring Systems Manager for the first time, review Why is my EC2 instance not appearing under Managed Instances in the Systems Manager console? for helpful tips on getting started with adding managed instances.

After you log in to the primary node of your cluster, run the following commands to install and configure OpenLDAP.

Install and configure OpenLDAP

To install and configure OpenLDAP, complete the following steps (alternatively, you can download the script used in the CloudFormation script and run it):

  1. Run the following commands:
# Install LDAP Server
sudo yum -y install openldap compat-openldap openldap-clients openldap-servers openldap-servers-sql openldap-devel
# Restart LDAP 
sudo service slapd restart

For more about configuring OpenLDAP, see the OpenLDAP documentation.

  1. Run the following command to set a new password for the root account and store the resulting hash:
slappasswd

This command outputs a hash that looks like the following sample:

{SSHA}DmD616c3yZyKndsccebZK/vmWiaQde83
  1. Copy the hash output to a text editor to use in subsequent steps.

Next, we prepare the commands to set the password for the LDAP root.

  1. Run the following code (replace the hash with the one you generated in the previous step, and make sure carriage returns are preserved):
cat > /tmp/config.ldif <<EOF
dn: olcDatabase={2}hdb,cn=config
changetype: modify
replace: olcSuffix
olcSuffix: dc=example,dc=com

dn: olcDatabase={2}hdb,cn=config
changetype: modify
replace: olcRootDN
olcRootDN: cn=dev,dc=example,dc=com

dn: olcDatabase={2}hdb,cn=config
changetype: modify
replace: olcRootPW
olcRootPW: <<REPLACE_WITH_PASSWORD_HASH>>
EOF
  1. Run the following command to run the preceding commands against LDAP:
sudo ldapmodify -Y EXTERNAL -H ldapi:/// -f /tmp/config.ldif
  1. Copy the sample database configuration file to /var/lib/ldap and add relevant schemas:
sudo cp /usr/share/openldap-servers/DB_CONFIG.example /var/lib/ldap/DB_CONFIG

sudo ldapadd -Y EXTERNAL -H ldapi:/// -f /etc/openldap/schema/cosine.ldif
sudo ldapadd -Y EXTERNAL -H ldapi:/// -f /etc/openldap/schema/nis.ldif 
sudo ldapadd -Y EXTERNAL -H ldapi:/// -f /etc/openldap/schema/inetorgperson.ldif

Create a user in LDAP

Next, create a user account with a password in the LDAP directory with the following commands. When prompted for a password, use the LDAP root password that you created in the previous step (for this post, we use sparky as the username). Make sure carriage returns are preserved when copying and entering the code.

cat > /tmp/accounts.ldif <<EOF 
dn: dc=example,dc=com
objectclass: domain
objectclass: top
dc: example

dn: ou=dev,dc=example,dc=com
objectclass: organizationalUnit
ou: dev
description: Container for developer entries

dn: uid=$username,ou=dev,dc=example,dc=com
uid: $username
objectClass: inetOrgPerson
userPassword: <<REPLACE_WITH_STRONG_PASSWORD>>
sn: sparky
cn: dev
EOF

Run the following command to run the preceding commands against LDAP (you must enter the root LDAP password specified in the previous section):

sudo ldapadd -x -w <<LDAP_ROOT_PASSWORD>> -D "cn=dev,dc=example,dc=com" -f /tmp/accounts.ldif

We have now configured OpenLDAP on the EMR cluster running Spark and created the user sparky that we use to connect to QuickSight.

Start the Thrift server

Start the Thrift server by running the following command. By default, the Thrift server runs on port 10001. Amazon EMR by default places limits on executor sizes to avoid having the executor consume too much memory and interfere with the operating system and other processes running on the instance. To optimize the use of R family instances with the flexibility of also using the smallest supported instance types, we use –executor-memory=18GB –executor-cores=4 for our Thrift server configuration. See the following code:

sudo /usr/lib/spark/sbin/start-thriftserver.sh --master yarn –executor-memory=18GB –executor-cores=4

Now that we have configured the EMR cluster to accept connections, let’s get our public dataset ready.

Configure SSL using a QuickSight supported authority

If deploying into production, we recommend using a secure communication between QuickSight and Spark. QuickSight doesn’t accept certificates that are self-signed or issued from a non-public CA. For more information, see Amazon QuickSight SSL and CA Certificates. To secure the Thrift connection, you can enable the SSL encryption and restart the hive-server2 and Thrift service on the primary EMR instance.

After you have your certificate, you can enable SSL.

In your preferred editor, open and edit /etc/hive/conf/hive-site.xml:

    <property>
        <name>hive.server2.thrift.bind.host</name>
        <value>HOSTNAME</value>
    </property>
    <property>
        <name>hive.server2.use.SSL</name>
        <value>true</value>
    </property>
    <property>
        <name>hive.server2.keystore.path</name>
        <value>PATH_TO_KEYSTORE/KEYSTORE/KEYSTORE.jks</value>
    </property>
    <property>
        <name>hive.server2.keystore.password</name>
        <value>KEYSTORE_PASSWORD</value>
    </property>

Restart the Thrift server by running the following command:

sudo /usr/lib/spark/sbin/stop-thriftserver.sh --master yarn && sudo /usr/lib/spark/sbin/start-thriftserver.sh --master yarn

Run AWS Glue crawlers

Now let’s use AWS Glue crawlers to detect the schema. If you used the CloudFormation template, you already have a crawler ready to start via the AWS Glue console. When the crawler is complete, you should have a table listed in the database.

If you’re configuring the crawler manually on the AWS Glue console, the following screenshot summarizes the crawler configuration.

After the crawler has run, you can go to the Tables page to view the taxi_ny_pub table with the table properties and schema. The following screenshot shows the table details page; here you can find the partitions and various versions of the schema.

The Data Catalog is shared between Amazon EMR, Amazon Athena, and Amazon Redshift Spectrum. You can use Athena to preview the data that is stored in this table.

Test JDBC connectivity using Beeline

Now that the EMR cluster is deployed and the data is copied, we can quickly test the JDBC connectivity on the EMR cluster using Beeline. Beeline is an open-source JDBC client, based on the SQLLine CLI, used to connect to your cluster via the command line.

Log in to your EMR cluster using Session Manager. You can use Beeline to connect to the Thrift server and test the connection:

/usr/lib/spark/bin/beeline -u 'jdbc:hive2://<REPLACE_MASTER_PUBLIC_DNS>:10001/default' -n <<USERNAME>> -p <<PASSWORD>> -e "show databases;" 

The preceding command connects to the Spark cluster and shows you the list of databases, as in the following example code:

Connected to: Spark SQL (version 2.3.0) 
Driver: Hive JDBC (version 1.2.1-spark2-amzn-0) 
Transaction isolation: TRANSACTION_REPEATABLE_READ 
+---------------+--+ 
| databaseName | 
+---------------+--+ 
| default | 
| nyc_taxi |
| sampledb | 
+---------------+--+ 
3 rows selected (0.171 seconds) 
Beeline version 1.2.1-spark2-amzn-0 by Apache Hive 
Closing: 0: 
jdbc:hive2://<REPLACE_MASTER_PUBLIC_DNS>:10001/default

Visualize data with QuickSight

Now let’s connect Amazon EMR to QuickSight and do a quick visualization of this data.

  1. On the QuickSight console, on the Datasets page, choose New dataset.
  2. Choose Spark as your connector.

  1. For Data source name, enter a name (for example, SPARKY).
  2. For Database server, enter your public primary DNS.

To allow QuickSight to connect to your EMR cluster, you must create a security group containing an inbound rule authorizing access from the appropriate IP address range for the QuickSight servers in that Region. For further details on how to create appropriate security group rules, see Authorizing Connections from Amazon QuickSight to Amazon EC2 Instances.

For this post, we use security groups to control network connectivity.

  1. For Port, add TCP port 10001 as an inbound rule to allow for inbound connectivity from QuickSight to Amazon EMR.

If deploying into production, we recommend using a secure communication between QuickSight and Spark, which we covered in a previous step.

QuickSight Enterprise edition provides full integration with Amazon Virtual Private Cloud (Amazon VPC), which enables you to secure and isolate traffic between resources. For more information, see Connecting to a VPC with Amazon QuickSight. This allows you to deploy your EMR cluster in a private VPC Subnet.

  1. Enter a username and password.
  2. If you configured SSL, select Enable SSL.
  3. Choose Create data source.

The Spark cluster reads the Data Catalog and provides information about the schema and the tables in the schema. You can also choose the table created by the AWS Glue crawler and load the data into SPICE for faster analytics. SPICE is the in-memory calculation engine in QuickSight that provides blazing fast performance at scale. SPICE automatically replicates data for high availability, allowing thousands of users to simultaneously perform fast, interactive analysis, while shielding your underlying data infrastructure, which saves you time and resources. QuickSight supports uploading 250 million rows (and 500 GB) per SPICE dataset. If you have larger datasets than this, you can use the direct query option. In this post, we use SPICE.

Also make sure that you defined the correct permissions to access the S3 bucket for the EMR cluster. For instructions, see Reading and Writing Data to Amazon S3 Using EMRFS.

Let’s create a custom SQL query to perform some aggregations prior to loading into SPICE (see the following screenshot).

  1. Enter the following code:
SELECT 
SUM (cast (fare_amount as double)) as TotalFare 
,AVG(cast (fare_amount as double)) as AvgFare 
,AVG (cast (trip_distance as double)) as AvgTripDistance 
,AVG(passenger_count) as AvgPassengerCount 
,year 
,month
FROM nyc_taxi.taxi_ny_pub
WHERE year BETWEEN 2011 AND 2016
GROUP BY year, month;

The database and table names may vary in your deployment.

  1. For this post, select Import to SPICE for quicker analytics.

Alternatively, because the NYC taxi dataset is larger than 250 million rows, you can choose to directly query your data.

  1. To create a visualization, select the fields in the left panel.

For this post, we review the Average Fare Amount and Passenger Count between 2013–2019, using ML Insights to automatically generate natural language narratives when analyzing the 229.12 GB dataset.

Summary

In less than an hour, we created an EMR cluster, enabled OpenLDAP, and started the Thrift server. We also used AWS Glue to crawl a public dataset and visualize the data. Now you have what you need to get started creating powerful dashboards and reports using QuickSight on your Amazon S3 data using Apache Spark. Feel free to reach out if you have any questions or suggestions.

To learn more about these capabilities and start using them in your dashboards, check out the QuickSight User Guide.

If you have questions and suggestions, you can post them on the QuickSight forum.

Go to the QuickSight website to get started now for free.


About the Author

Tom McMeekin is an Enterprise Solutions Architect with a career in technology spanning over 20 years. Tom has worked across a number of industry verticals including Telecommunications, Manufacturing, Infrastructure and Development, Utilities, Energy, and Retail. Throughout his career, he has focused on solving complex business problems through innovative technologies that deliver the right business outcomes for his customers.

 

ERGO Breaks New Frontiers for Insurance with AI Factory on AWS

Post Syndicated from Piotr Klesta original https://aws.amazon.com/blogs/architecture/ergo-breaks-new-frontiers-for-insurance-with-ai-factory-on-aws/

This post is co-authored with Piotr Klesta, Robert Meisner and Lukasz Luszczynski of ERGO

Artificial intelligence (AI) and related technologies are already finding applications in our homes, cars, industries, and offices. The insurance business is no exception to this. When AI is implemented correctly, it adds a major competitive advantage. It enhances the decision-making process, improves efficiency in operations, and provides hassle-free customer assistance.

At ERGO Group, we realized early on that innovation using AI required more flexibility in data integration than most of our legacy data architectures allowed. Our internal governance, data privacy processes, and IT security requirements posed additional challenges towards integration. We had to resolve these issues in order to use AI at the enterprise level, and allow for sensitive data to be used in a cloud environment.

We aimed for a central system that introduces ‘intelligence’ into other core application systems, and thus into ERGO’s business processes. This platform would support the process of development, training, and testing of complex AI models, in addition to creating more operational efficiency. The goal of the platform is to take the undifferentiated heavy lifting away from our data teams so that they focus on what they do best – harness data insights.

Building ERGO AI Factory to power AI use cases

Our quest for this central system led to the creation of AI Factory built on AWS Cloud. ERGO AI Factory is a compliant platform for running production-ready AI use cases. It also provides a flexible model development and testing environment. Let’s look at some of the capabilities and services we offer to our advanced analytics teams.

Figure 1: AI Factory imperatives

Figure 1: AI Factory imperatives

  • Compliance: Enforcing security measures (for example, authentication, encryption, and least privilege) was one of our top priorities for the platform. We worked closely with the security teams to meet strict domain and geo-specific compliance requirements.
  • Data governance: Data lineage and deep metadata extraction are important because they support proper data governance and auditability. They also allow our users to navigate a complex data landscape. Our data ingestion frameworks include a mixture of third party and AWS services to capture and catalog both technical and business metadata.
  • Data storage and access: AI Factory stores data in Amazon Simple Storage Service (S3) in a secure and compliant manner. Access rights are only granted to individuals working on the corresponding projects. Roles are defined in Active Directory.
  • Automated data pipelines: We sought to provide a flexible and robust data integration solution. An ETL pipeline using Apache Spark, Apache Airflow, and Kubernetes pods is central to our data ingestion. We use this for AI model development and subsequent data preparation for operationalization and model integration.
  • Monitoring and security: AI Factory relies on open-source cloud monitoring solutions like Grafana to detect security threats and anomalies. It does this by collecting service and application logs, tracking metrics, and generating alarms.
  • Feedback loop: We store model inputs/outputs and use BI tools, such as Amazon QuickSight, to track the behavior and performance of productive AI models. It’s important to share such information with our business partners so we can build their trust and confidence with AI.
  • Developer-friendly environment: Creating AI models is possible in a notebook-style or integrated development environment. Because our data teams use a variety of machine learning (ML) frameworks and libraries, we keep our platform extensible and our framework agnostic. We support Python/R, Apache Spark, PyTorch and TensorFlow, and more. All this is bolstered by CI/CD processes that accelerate delivery and reduce errors.
  • Business process integration: AI Factory offers services to integrate ML models into existing business processes. We focus on standardizing processes and close collaboration with business and technical stakeholders. Our overarching goal is to operationalize the AI model in the shortest possible timeframe, while preserving high quality and security standards.

AI Factory architecture

So far, we have looked at the functional building blocks of the AI Factory. Let’s take an architectural view of the platform using a five-step workflow:

Figure 2: AI Factory high-level architecture

Figure 2: AI Factory high-level architecture

  1. Data ingestion environment: We use this environment to ingest data from the prominent on-premises ERGO data sources. We can schedule the batch or Delta transfer data to various cloud destinations using multiple Kubernetes-hosted microservices. Once ingested, data is persisted and cataloged as ERGO’s data lake on Amazon S3. It is prepared for processing by the upstream environments.
  2. Model development environment: This environment is used primarily by data scientists and data engineers. We use Amazon EMR and Amazon SageMaker extensively for data preparation, data wrangling, experimentation with predictive models, and development through rapid iterations.
  3. Model operationalization environment: Trained models with satisfactory KPIs are promoted from the model development to the operationalization environment. This is where we integrate AI models in business processes. The team focuses on launching and optimizing the operation of services and algorithms.
    • Integration with ERGO business processes is achieved using Kubernetes-hosted ‘Model Service.’ This allows us to infuse AI models provided by data scientists in existing business processes.
    • An essential part of model operationalization is to continuously monitor the quality of the deployed ML models using the ‘feedback loop service.’
  4. Model insights environment: This environment is used for displaying information about platform performance, processes, and analytical data. Data scientists use its services to check for unexpected bias or performance drifts that the model could exhibit. Feedback coming from the business through the “feedback loop service’ allows them to identify problems fast and retrain the model.
  5. Shared services: Though shown as the fifth step of the workflow, the shared services environment supports almost every step in the process. It provides common, shared components between different parts of the platform managing CI/CD and orchestration processes within the AI factory. Additional services like platform logging and monitoring, authentication, and metadata management are also delivered from the shared services environment.

A binding theme across the various subplatforms is that all provisioning and deployment activities are automated using Infrastructure as Code (IaC) practices. This reduces the potential for human error, provides architectural flexibility, and greatly speeds up software development and our infrastructure-related operations.

All components of the AI factory are run in the AWS Cloud and can be scaled and adapted as needed. The connection between model development and operationalization happens at well-defined interfaces to prevent unnecessary coupling of components.

Lessons learned

Security first

  • Align with security early and often
  • Understand all the regulatory obligations and document them as critical, non-functional requirements

Modular approach

  • Combine modern data science technology and professional IT with a cross-functional, agile way of working
  • Apply loosely coupled services with an API-first approach

Data governance

  • Tracking technical metadata is important but not sufficient, you need business attributes too
  • Determine data ownership in operational systems to map upstream data governance workflows
  • Establish solutions to data masking as the data moves across sub-platforms
  • Define access rights and permissions boundaries among various personas

FinOps strategy

  • Carefully track platform cost
  • Assign owners responsible for monitoring and cost improvements
  • Provide regular feedback to platform stakeholders on usage patterns and associated expenses

Working with our AWS team

  • Establish cadence for architecture review and new feature updates
  • Plan cloud training and enablement

The future for the AI factory

The creation of the AI Factory was an essential building block of ERGO’s strategy. Now we are ready to embrace the next chapter in our advanced analytics journey.

We plan to focus on important use cases that will deliver the highest business value. We want to make the AI Factory available to ERGO’s international subsidiaries. We are also enhancing and scaling its capabilities. We are creating an ‘analytical content hub’ based on automated text extraction, improving speech to text, and developing translation processes for all unstructured and semistructured data using AWS AI services.

Customize and Package Dependencies With Your Apache Spark Applications on Amazon EMR on Amazon EKS

Post Syndicated from Channy Yun original https://aws.amazon.com/blogs/aws/customize-and-package-dependencies-with-your-apache-spark-applications-on-amazon-emr-on-amazon-eks/

Last AWS re:Invent, we announced the general availability of Amazon EMR on Amazon Elastic Kubernetes Service (Amazon EKS), a new deployment option for Amazon EMR that allows customers to automate the provisioning and management of Apache Spark on Amazon EKS.

With Amazon EMR on EKS, customers can deploy EMR applications on the same Amazon EKS cluster as other types of applications, which allows them to share resources and standardize on a single solution for operating and managing all their applications. Customers running Apache Spark on Kubernetes can migrate to EMR on EKS and take advantage of the performance-optimized runtime, integration with Amazon EMR Studio for interactive jobs, integration with Apache Airflow and AWS Step Functions for running pipelines, and Spark UI for debugging.

When customers submit jobs, EMR automatically packages the application into a container with the big data framework and provides prebuilt connectors for integrating with other AWS services. EMR then deploys the application on the EKS cluster and manages running the jobs, logging, and monitoring. If you currently run Apache Spark workloads and use Amazon EKS for other Kubernetes-based applications, you can use EMR on EKS to consolidate these on the same Amazon EKS cluster to improve resource utilization and simplify infrastructure management.

Developers who run containerized, big data analytical workloads told us they just want to point to an image and run it. Currently, EMR on EKS dynamically adds externally stored application dependencies during job submission.

Today, I am happy to announce customizable image support for Amazon EMR on EKS that allows customers to modify the Docker runtime image that runs their analytics application using Apache Spark on your EKS cluster.

With customizable images, you can create a container that contains both your application and its dependencies, based on the performance-optimized EMR Spark runtime, using your own continuous integration (CI) pipeline. This reduces the time to build the image and helps predicting container launches for a local development or test.

Now, data engineers and platform teams can create a base image, add their corporate standard libraries, and then store it in Amazon Elastic Container Registry (Amazon ECR). Data scientists can customize the image to include their application specific dependencies. The resulting immutable image can be vulnerability scanned, deployed to test and production environments. Developers can now simply point to the customized image and run it on EMR on EKS.

Customizable Runtime Images – Getting Started
To get started with customizable images, use the AWS Command Line Interface (AWS CLI) to perform these steps:

  1. Register your EKS cluster with Amazon EMR.
  2. Download the EMR-provided base images from Amazon ECR and modify the image with your application and libraries.
  3. Publish your customized image to a Docker registry such as Amazon ECR and then submit your job while referencing your image.

You can download one of the following base images. These images contain the Spark runtime that can be used to run batch workloads using the EMR Jobs API. Here is the latest full image list available.

Release Label Spark Hadoop Versions Base Image Tag
emr-5.32.0-latest Spark 2.4.7 + Hadoop 2.10.1 emr-5.32.0-20210129
emr-5.33-latest Spark 2.4.7-amzn-1 + Hadoop 2.10.1-amzn-1 emr-5.33.0-20210323
emr-6.2.0-latest Spark 3.0.1 + Hadoop 3.2.1 emr-6.2.0-20210129
emr-6.3-latest Spark 3.1.1-amzn-0 + Hadoop 3.2.1-amzn-3 emr-6.3.0:latest

These base images are located in an Amazon ECR repository in each AWS Region with an image URI that combines the ECR registry account, AWS Region code, and base image tag in the case of US East (N. Virginia) Region.

755674844232.dkr.ecr.us-east-1.amazonaws.com/spark/emr-5.32.0-20210129

Now, sign in to the Amazon ECR repository and pull the image into your local workspace. If you want to pull an image from a different AWS Region to reduce network latency, choose the different ECR repository that corresponds most closely to where you are pulling the image from US West (Oregon) Region.

$ aws ecr get-login-password --region us-west-2 | docker login --username AWS --password-stdin 895885662937.dkr.ecr.us-west-2.amazonaws.com
$ docker pull 895885662937.dkr.ecr.us-west-2.amazonaws.com/spark/emr-5.32.0-20210129

Create a Dockerfile on your local workspace with the EMR-provided base image and add commands to customize the image. If the application requires custom Java SDK, Python, or R libraries, you can add them to the image directly, just as with other containerized applications.

The following example Docker commands are for a use case in which you want to install useful Python libraries such as Natural Language Processing (NLP) using Spark and Pandas.

FROM 895885662937.dkr.ecr.us-west-2.amazonaws.com/spark/emr-5.32.0-20210129
USER root
### Add customizations here ####
RUN pip3 install pyspark pandas spark-nlp // Install Python NLP Libraries
USER hadoop:hadoop

In another use case, as I mentioned, you can install a different version of Java (for example, Java 11):

FROM 895885662937.dkr.ecr.us-west-2.amazonaws.com/spark/emr-5.32.0-20210129
USER root
### Add customizations here ####
RUN yum install -y java-11-amazon-corretto // Install Java 11 and set home
ENV JAVA_HOME /usr/lib/jvm/java-11-amazon-corretto.x86_64
USER hadoop:hadoop

If you’re changing Java version to 11, then you also need to change Java Virtual Machine (JVM) options for Spark. Provide the following options in applicationConfiguration when you submit jobs. You need these options because Java 11 does not support some Java 8 JVM parameters.

"applicationConfiguration": [ 
  {
    "classification": "spark-defaults",
    "properties": {
        "spark.driver.defaultJavaOptions" : "
		    -XX:OnOutOfMemoryError='kill -9 %p' -XX:MaxHeapFreeRatio=70",
        "spark.executor.defaultJavaOptions" : "
		    -verbose:gc -Xlog:gc*::time -XX:+PrintGCDetails -XX:+PrintGCDateStamps 
			-XX:OnOutOfMemoryError='kill -9 %p' -XX:MaxHeapFreeRatio=70 
			-XX:+IgnoreUnrecognizedVMOptions"
    }
  }
]

To use custom images with EMR on EKS, publish your customized image and submit a Spark workload in Amazon EMR on EKS using the available Spark parameters.

You can submit batch workloads using your customized Spark image. To submit batch workloads using the StartJobRun API or CLI, use the spark.kubernetes.container.image parameter.

$ aws emr-containers start-job-run \
    --virtual-cluster-id <enter-virtual-cluster-id> \
    --name sample-job-name \
    --execution-role-arn <enter-execution-role-arn> \
    --release-label <base-release-label> \ # Base EMR Release Label for the custom image
    --job-driver '{
        "sparkSubmitJobDriver": {
        "entryPoint": "local:///usr/lib/spark/examples/jars/spark-examples.jar",
        "entryPointArguments": ["1000"],
        "sparkSubmitParameters": [ "--class org.apache.spark.examples.SparkPi --conf spark.kubernetes.container.image=123456789012.dkr.ecr.us-west-2.amazonaws.com/emr5.32_custom"
		  ]
      }
  }'

Use the kubectl command to confirm the job is running your custom image.

$ kubectl get pod -n <namespace> | grep "driver" | awk '{print $1}'
Example output: k8dfb78cb-a2cc-4101-8837-f28befbadc92-1618856977200-driver

Get the image for the main container in the Driver pod (Uses jq).

$ kubectl get pod/<driver-pod-name> -n <namespace> -o json | jq '.spec.containers
| .[] | select(.name=="spark-kubernetes-driver") | .image '
Example output: 123456789012.dkr.ecr.us-west-2.amazonaws.com/emr5.32_custom

To view jobs in the Amazon EMR console, under EMR on EKS, choose Virtual clusters. From the list of virtual clusters, select the virtual cluster for which you want to view logs. On the Job runs table, select View logs to view the details of a job run.

Automating Your CI Process and Workflows
You can now customize an EMR-provided base image to include an application to simplify application development and management. With custom images, you can add the dependencies using your existing CI process, which allows you to create a single immutable image that contains the Spark application and all of its dependencies.

You can apply your existing development processes, such as vulnerability scans against your Amazon EMR image. You can also validate for correct file structure and runtime versions using the EMR validation tool, which can be run locally or integrated into your CI workflow.

The APIs for Amazon EMR on EKS are integrated with orchestration services like AWS Step Functions and AWS Managed Workflows for Apache Airflow (MWAA), allowing you to include EMR custom images in your automated workflows.

Now Available
You can now set up customizable images in all AWS Regions where Amazon EMR on EKS is available. There is no additional charge for custom images. To learn more, see the Amazon EMR on EKS Development Guide and a demo video how to build your own images for running Spark jobs on Amazon EMR on EKS.

You can send feedback to the AWS forum for Amazon EMR or through your usual AWS support contacts.

Channy

Architecting Persona-centric Data Platform with On-premises Data Sources

Post Syndicated from Raghavarao Sodabathina original https://aws.amazon.com/blogs/architecture/architecting-persona-centric-data-platform-with-on-premises-data-sources/

Many organizations are moving their data from silos and aggregating it in one location. Collecting this data in a data lake enables you to perform analytics and machine learning on that data. You can store your data in purpose-built data stores, like a data warehouse, to get quick results for complex queries on structured data.

In this post, we show how to architect a persona-centric data platform with on-premises data sources by using AWS purpose-built analytics services and Apache NiFi. We will also discuss Lake House architecture on AWS, which is the next evolution from data warehouse and data lake-based solutions.

Data movement services

AWS provides a wide variety of services to bring data into a data lake:

You may want to bring on-premises data into the AWS Cloud to take advantage of AWS purpose-built analytics services, derive insights, and make timely business decisions. Apache NiFi is an open source tool that enables you to move and process data using a graphical user interface.

For this use case and solution architecture, we use Apache NiFi to ingest data into Amazon S3 and AWS purpose-built analytics services, based on user personas.

Building persona-centric data platform on AWS

When you are building a persona-centric data platform for analytics and machine learning, you must first identify your user personas. Who will be using your platform? Then choose the appropriate purpose-built analytics services. Envision a data platform analytics architecture as a stack of seven layers:

  1. User personas: Identify your user personas for data engineering, analytics, and machine learning
  2. Data ingestion layer: Bring the data into your data platform and data lineage lifecycle view, while ingesting data into your storage layer
  3. Storage layer: Store your structured and unstructured data
  4. Cataloging layer: Store your business and technical metadata about datasets from the storage layer
  5. Processing layer: Create data processing pipelines
  6. Consumption layer: Enable your user personas for purpose-built analytics
  7. Security and Governance: Protect your data across the layers

Reference architecture

The following diagram illustrates how to architect a persona-centric data platform with on-premises data sources by using AWS purpose-built analytics services and Apache NiFi.

Figure 1. Example architecture for persona-centric data platform with on-premises data sources

Figure 1. Example architecture for persona-centric data platform with on-premises data sources

Architecture flow:

    1. Identify user personas: You must first identify user personas to derive insights from your data platform. Let’s start with identifying your users:
      • Enterprise data service users who would like to consume data from your data lake into their respective applications.
      • Business users who would like to like create business intelligence dashboards by using your data lake datasets.
      • IT users who would like to query data from your data lake by using traditional SQL queries.
      • Data scientists who would like to run machine learning algorithms to derive recommendations.
      • Enterprise data warehouse users who would like to run complex SQL queries on your data warehouse datasets.
    2. Data ingestion layer: Apache NiFi scans the on-premises data stores and ingest the data into your data lake (Amazon S3). Apache NiFi can also transform the data in transit. It supports both Extract, Transform, Load (ETL) and Extract, Load, Transform (ELT) data transformations. Apache NiFi also supports data lineage lifecycle while ingesting data into Amazon S3.
    3. Storage layer: For your data lake storage, we recommend using Amazon S3 to build a data lake. It has unmatched 11 nines of durability and 99.99% availability. You can also create raw, transformed, and enriched storage layers depending upon your use case.
    4. Cataloging layer: AWS Lake Formation provides the central catalog to store and manage metadata for all datasets hosted in the data lake by AWS Glue Data Catalog. AWS services such as AWS Glue, Amazon EMR, and Amazon Athena natively integrate with Lake Formation. They automate discovering and registering dataset metadata into the Lake Formation catalog.
    5. Processing layer: Amazon EMR processes your raw data and places them into a new S3 bucket. Use AWS Glue DataBrew and AWS Glue to process the data as needed.
    6. Consumption layer or persona-centric analytics: Once data is transformed:
      • AWS Lambda and Amazon API Gateway will allow you to develop data services for enterprise data service users
      • You can develop user-friendly dashboards for your business users using Amazon QuickSight
      • Use Amazon Athena to query transformed data for your IT users
      • Your data scientists can utilize AWS Glue DataBrew to clean and normalize the data and Amazon SageMaker for machine learning models
      • Your enterprise data warehouse users can use Amazon Redshift to derive business intelligence
    7. Security and governance layer: AWS IAM provides users, groups, and role-level identity, in addition to the ability to configure coarse-grained access control for resources managed by AWS services in all layers. AWS Lake Formation provides fine-grained access controls and you can grant/revoke permissions at the database- or table- or column-level access.

Lake House architecture on AWS

The vast majority of data lakes are built on Amazon S3. At the same time, customers are leveraging purpose-built analytics stores that are optimized for specific use cases. Customers want the freedom to move data between their centralized data lakes and the surrounding purpose-built analytics stores. And they want to get insights with speed and agility in a seamless, secure, and compliant manner. We call this modern approach to analytics the Lake House architecture.

Figure 2. Lake House architecture on AWS

Figure 2. Lake House architecture on AWS

Refer to the whitepaper Derive Insights from AWS Lake house for various design patterns to derive persona-centric analytics by using the AWS Lake House approach. Check out the blog post Build a Lake House Architecture on AWS  for a Lake House reference architecture on AWS.

Conclusion

In this post, we show you how to build a persona-centric data platform on AWS with a seven-layered approach. This uses Apache NiFi as a data ingestion tool and AWS purpose-built analytics services for persona-centric analytics and machine learning. We have also shown how to build persona-centric analytics by using the AWS Lake House approach.

With the information in this post, you can now build your own data platform on AWS to gain faster and deeper insights from your data. AWS provides you the broadest and deepest portfolio of purpose-built analytics and machine learning services to support your business needs.

Read more and get started on building a data platform on AWS:

Improve query performance using AWS Glue partition indexes

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/improve-query-performance-using-aws-glue-partition-indexes/

While creating data lakes on the cloud, the data catalog is crucial to centralize metadata and make the data visible, searchable, and queryable for users. With the recent exponential growth of data volume, it becomes much more important to optimize data layout and maintain the metadata on cloud storage to keep the value of data lakes.

Partitioning has emerged as an important technique for optimizing data layout so that the data can be queried efficiently by a variety of analytic engines. Data is organized in a hierarchical directory structure based on the distinct values of one or more columns. Over time, hundreds of thousands of partitions get added to a table, resulting in slow queries. To speed up query processing of highly partitioned tables cataloged in AWS Glue Data Catalog, you can take advantage of AWS Glue partition indexes.

Partition indexes are available for queries in Amazon EMRAmazon Redshift Spectrum, and AWS Glue extract, transform, and load (ETL) jobs (Spark DataFrame). When partition indexes are enabled on the heavily partitioned AWS Glue Data Catalog tables, all these query engines are accelerated. You can add partition indexes to both new tables and existing tables. This post demonstrates how to utilize partition indexes, and discusses the benefit you can get with partition indexes when working with highly partitioned data.

Partition indexes

AWS Glue partition indexes are an important configuration to reduce overall data transfers and processing, and reduce query processing time. In the AWS Glue Data Catalog, the GetPartitions API is used to fetch the partitions in the table. The API returns partitions that match the expression provided in the request. If no partition indexes are present on the table, all the partitions of the table are loaded, and then filtered using the query expression provided by the user in the GetPartitions request. The query takes more time to run as the number of partitions increase on a table with no indexes. With an index, the GetPartitions request tries to fetch a subset of the partitions instead of loading all the partitions in the table.

The following are key benefits of partition indexes:

  • Increased query performance
  • Increased concurrency as a result of fewer GetPartitions API calls
  • Cost savings:
    • Analytic engine cost (query performance is related to the charges in Amazon EMR and AWS Glue ETL)
    • AWS Glue Data Catalog API request cost

Setting up resources with AWS CloudFormation

This post provides an AWS CloudFormation template for a quick setup. You can review and customize it to suit your needs. Some of the resources that this stack deploys incur costs when in use.

The CloudFormation template generates the following resources:

If you’re using AWS Lake Formation permissions, you need to ensure that the IAM user or role running AWS CloudFormation has the required permissions (to create a database on the Data Catalog).

The tables use sample data located in an Amazon Simple Storage Service (Amazon S3) public bucket. Initially, no partition indexes are configured in these AWS Glue Data Catalog tables.

To create your resources, complete the following steps:

  1. Sign in to the CloudFormation console.
  2. Choose Launch Stack:
  3. Choose Next.
  4. For DatabaseName, leave as the default.
  5. Choose Next.
  6. On the next page, choose Next.
  7. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  8. Choose Create.

Stack creation can take up to 5 minutes. When the stack is completed, you have two Data Catalog tables: table_with_index and table_without_index. Both tables point to the same S3 bucket, and the data is highly partitioned based on yearmonthday, and hour columns for more than 42 years (1980-2021). In total, there are 367,920 partitions, and each partition has one JSON file, data.json. In the following sections, you see how the partition indexes work with these sample tables.

Setting up a partition index on the AWS Glue console

You can create partition indexes at any time. If you want to create a new table with partition indexes, you can make the CreateTable API call with a list of PartitionIndex objects. If you want to add a partition index to an existing table, make the CreatePartitionIndex API call. You can also perform these actions on the AWS Glue console. You can create up to three partition indexes on a table.

Let’s configure a new partition index for the table table_with_index we created with the CloudFormation template.

  1. On the AWS Glue console, choose Tables.
  2. Choose the table table_with_index.
  3. Choose Partitions and indices.
  4. Choose Add new index.
  5. For Index name, enter year-month-day-hour.
  6. For Selected keys from schema, select year, month, day, and hour.
  7. Choose Add index.

The Status column of the newly created partition index shows the status as Creating. Wait for the partition index to be Active. The process takes about 1 hour because more number of partitions longer it takes for index creation and we have 367,920 partitions on this table.

Now the partition index is ready for the table table_with_index. You can use this index from various analytic engines when you query against the table. You see default behavior in the table table_without_index because no partition indexes are configured for this table.

You can follow (or skip) any of the following sections based on your interest.

Making a GetPartitions API call with an expression

Before we use the partition index from various query engines, let’s try making the GetPartitions API call using AWS Command Line Interface (AWS CLI) to see the difference. The AWS CLI get-partitions command makes multiple GetPartitions API calls if needed. In this section, we simply use the time command to compare the duration for each table, and use the debug logging to compare the number of API calls for each table.

  1. Run the get-partitions command against the table table_without_index with the expression year='2021' and month='04' and day='01':
    $ time aws glue get-partitions --database-name partition_index --table-name table_without_index --expression "year='2021' and month='04' and day='01'"
    ...
    real    3m57.438s
    user    0m2.872s
    sys    0m0.248s
    

The command took about 4 minutes. Note that you used only three partition columns out of four.

  1. Run the same command with debug logging to get the number of the GetPartitionsAPI calls:
    $ aws glue get-partitions --database-name partition_index --table-name table_without_index --expression "year='2021' and month='04' and day='01'" --debug 2>get-partitions-without-index.log
    $ cat get-partitions-without-index.log | grep x-amz-target:AWSGlue.GetPartitions | wc -l
         737

There were 737 GetPartitions API calls when the partition indexes aren’t used.

  1. Next, run the get-partitions command against table_with_index with the same expression:
    $ time aws glue get-partitions --database-name partition_index --table-name table_with_index --expression "year='2020' and month='07' and day='01' and hour='09'"
    ...
    real    0m2.697s
    user    0m0.442s
    sys    0m0.163s

The command took just 2.7 seconds. You can see how quickly the required partitions were returned.

  1. Run the same command with debug logging to get the number of the GetPartitionsAPI calls:
    $ aws glue get-partitions --database-name partition_index --table-name table_with_index --expression "year='2021' and month='04' and day='01'" --debug 2>get-partitions-with-index.log
    $ cat get-partitions-with-index.log | grep x-amz-target:AWSGlue.GetPartitions | wc -l
           4
    

There were only four GetPartitions API calls when the partition indexes are used.

Querying a table using Apache Spark on Amazon EMR

In this section, we explore querying a table using Apache Spark on Amazon EMR.

  1. Launch a new EMR cluster with Apache Spark.

For instructions, see Setting Up Amazon EMR. You need to specify the AWS Glue Data Catalog as the metastore. In this example, we use the default EMR cluster (release: emr-6.2.0, three m5.xlarge nodes).

  1. Connect to the EMR node using SSH.
  2. Run the spark-sql command on the EMR node to start an interactive shell for Spark SQL:
    $ spark-sql

  3. Run the following SQL against partition_index.table_without_index:
    spark-sql> SELECT count(*), sum(value) FROM partition_index.table_without_index WHERE year='2021' AND month='04' AND day='01';
    24    13840.894731640636
    Time taken: 35.518 seconds, Fetched 1 row(s)

The query took 35 seconds. Even though you aggregated records only in the specific partition, the query took so long because there are many partitions and the GetPartitions API call takes time.

Now let’s run the same query against table_with_index to see how much benefit the partition index introduces.

  1. Run the following SQL against partition_index.table_with_index:
    spark-sql> SELECT count(*), sum(value) FROM partition_index.table_with_index WHERE year='2021' AND month='04' AND day='01';
    24    13840.894731640636
    Time taken: 2.247 seconds, Fetched 1 row(s)

The query took just 2 seconds. The reason for the difference in query duration is because the number of GetPartitions calls is smaller because of the partition index.

The following chart shows the granular metrics for query planning time without and with the partition index. The query planning time with the index is far less than that without the index.

For more information about comparing metrics in Apache Spark, see Appendix 2 at the end of this post.

Querying a table using Redshift Spectrum

To query with Redshift Spectrum, complete the following steps:

  1. Launch a new Redshift cluster.

You need to configure an IAM role for the cluster to utilize Redshift Spectrum and the Amazon Redshift query editor. Choose dc2.large, 1 node in this example. You need to launch the cluster in the us-east-1 Region because you need to place your cluster in the same Region as the bucket location.

  1. Connect with the Redshift query editor. For instructions, see Querying a database using the query editor.
  2. Create an external schema for the partition_index database to use it in Redshift Spectrum: (replace <your IAM role ARN> with your IAM role ARN).
    create external schema spectrum from data catalog 
    database 'partition_index' 
    iam_role '<your IAM role ARN>'
    create external database if not exists;

  3. Run the following SQL against spectrum_schema.table_without_index:
    SELECT count(*), sum(value) FROM spectrum.table_without_index WHERE year='2021' AND month='04' AND day='01'

The following screenshot shows our output.

The query took more than 3 minutes.

  1. Run the following SQL against spectrum_schema.table_with_index:
    SELECT count(*), sum(value) FROM spectrum.table_with_index WHERE year='2021' AND month='04' AND day='01'

The following screenshot shows our output.

The query for the table using indexes took just 8 seconds, which is much faster than the table without indexes.

Querying a table using AWS Glue ETL

Let’s launch an AWS Glue development endpoint and an Amazon SageMaker notebook.

  1. Open the AWS Glue console, choose Dev endpoints.
  2. Choose Add endpoint.
  3. For Development endpoint name, enter partition-index.
  4. For IAM role, choose your IAM role.

For more information about roles, see Managing Access Permissions for AWS Glue Resources.

  1. For Worker type under Security configuration, script libraries, and job parameters (optional), choose 1X.
  2. For Number of workers, enter 4.
  3. For Dependent jar path, enter s3://crawler-public/json/serde/json-serde.jar.
  4. Select Use Glue data catalog as the Hive metastore under Catalog options (optional).
  5. Choose Next.
  6. For Networking, leave as is (by default, Skip networking configuration is selected), and choose Next.
  7. For Add an SSH public key (Optional), leave it blank, and choose Next.
  8. Choose Finish.
  9. Wait for the development endpoint partition-index to show as READY.

The endpoint may take up to 10 minutes to be ready.

  1. Select the development endpoint partition-index, and choose Create SageMaker notebook on the Actions
  2. For Notebook name, enter partition-index.
  3. Select Create an IAM role.
  4. For IAM role, enter partition-index.
  5. Choose Create notebook.
  6. Wait for the notebook aws-glue-partition-index to show the status as Ready.

The notebook may take up to 3 minutes to be ready.

  1. Select the notebook aws-glue-partition-index, and choose Open notebook.
  2. Choose Sparkmagic (PySpark)on the New
  3. Enter the following code snippet against table_without_index, and run the cell:
    %%time
    %%sql
    SELECT count(*), sum(value) FROM partition_index.table_without_index WHERE year='2021' AND month='04' AND day='01'

The following screenshot shows our output.

The query took 3 minutes.

  1. Enter the following code snippet against partition_index.table_with_index, and run the cell:
    %%time
    %%sql
    SELECT count(*), sum(value) FROM partition_index.table_with_index WHERE year='2021' AND month='04' AND day='01'

The following screenshot shows our output.

The cell took just 7 seconds. The query for the table using indexes is faster than the table without indexes.

Cleaning up

Now to the final step, cleaning up the resources:

  1. Delete the CloudFormation stack. 
  2. Delete the EMR cluster.
  3. Delete the Amazon Redshift cluster.
  4. Delete the AWS Glue development endpoint and SageMaker notebook.

Conclusion

In this post, we explained how to use partition indexes and how they accelerate queries in various query engines. If you have several millions of partitions, the performance benefit is significantly more. You can learn about partition indexes more deeply in Working with Partition Indexes.


Appendix 1: Setting up a partition index using AWS CLI

If you prefer using the AWS CLI, run the following create-partition-index command to set up a partition index:

$ aws glue create-partition-index --database-name partition_index --table-name table_with_index --partition-index Keys=year,month,day,hour,IndexName=year-month-day-hour

To get the status of the partition index, run the following get-partition-indexes command:

$ aws glue get-partition-indexes --database-name partition_index --table-name table_with_index
{
    "PartitionIndexDescriptorList": [
        {
            "IndexName": "year-month-day-hour",
            "Keys": [
                {
                    "Name": "year",
                    "Type": "string"
                },
                {
                    "Name": "month",
                    "Type": "string"
                },
                {
                    "Name": "day",
                    "Type": "string"
                },
                {
                    "Name": "hour",
                    "Type": "string"
                }
            ],
            "IndexStatus": "CREATING"
        }
    ]
}

Appendix 2: Comparing breakdown metrics in Apache Spark

If you’re interested in comparing the breakdown metrics for query planning time, you can register a SQL listener with the following Scala code snippet:

spark.listenerManager.register(new org.apache.spark.sql.util.QueryExecutionListener {
  override def onSuccess(funcName: String, qe: org.apache.spark.sql.execution.QueryExecution, durationNs: Long): Unit = {
    val metricMap = qe.tracker.phases.mapValues { ps => ps.endTimeMs - ps.startTimeMs }
    println(metricMap.toSeq)
  }
  override def onFailure(funcName: String, qe: org.apache.spark.sql.execution.QueryExecution, exception: Exception): Unit = {}
})

If you use spark-shell, you can register the listener as follows:

$ spark-shell
...
scala> spark.listenerManager.register(new org.apache.spark.sql.util.QueryExecutionListener {
     |   override def onSuccess(funcName: String, qe: org.apache.spark.sql.execution.QueryExecution, durationNs: Long): Unit = {
     |     val metricMap = qe.tracker.phases.mapValues { ps => ps.endTimeMs - ps.startTimeMs }
     |     println(metricMap.toSeq)
     |   }
     |   override def onFailure(funcName: String, qe: org.apache.spark.sql.execution.QueryExecution, exception: Exception): Unit = {}
     | })

Then run the same query without using the index to get the breakdown metrics:

scala> spark.sql("SELECT count(*), sum(value) FROM partition_index.table_without_index WHERE year='2021' AND month='04' AND day='01'").show()
Vector((planning,208), (optimization,29002), (analysis,4))
+--------+------------------+
|count(1)|        sum(value)|
+--------+------------------+
|      24|13840.894731640632|
+--------+------------------+

In this example, we use the same setup for the EMR cluster (release: emr-6.2.0, three m5.xlarge nodes). The console has additional line:

Vector((planning,208), (optimization,29002), (analysis,4)) 

Apache Spark’s query planning mechanism has three phases: analysis, optimization, and physical planning (shown as just planning). This line means that the query planning took 4 milliseconds in analysis, 29,002 milliseconds in optimization, and 208 milliseconds in physical planning.

Let’s try running the same query using the index:

scala> spark.sql("SELECT count(*), sum(value) FROM partition_index.table_with_index WHERE year='2021' AND month='04' AND day='01'").show()
Vector((planning,7), (optimization,608), (analysis,2))                          
+--------+------------------+
|count(1)|        sum(value)|
+--------+------------------+
|      24|13840.894731640634|
+--------+------------------+

The query planning took 2 milliseconds in analysis, 608 milliseconds in optimization, and 7 milliseconds in physical planning.


About the Authors

Noritaka Sekiyama is a Senior Big Data Architect at AWS Glue and AWS Lake Formation. He is passionate about big data technology and open source software, and enjoys building and experimenting in the analytics area.

 

 

 

Sachet Saurabh is a Senior Software Development Engineer at AWS Glue and AWS Lake Formation. He is passionate about building fault tolerant and reliable distributed systems at scale.

 

 

 

Vikas Malik is a Software Development Manager at AWS Glue. He enjoys building solutions that solve business problems at scale. In his free time, he likes playing and gardening with his kids and exploring local areas with family.

 

 

 

 

Amazon EMR 2020 year in review

Post Syndicated from Abhishek Sinha original https://aws.amazon.com/blogs/big-data/amazon-emr-2020-year-in-review/

Tens of thousands of customers use Amazon EMR to run big data analytics applications on Apache Spark, Apache Hive, Apache HBase, Apache Flink, Apache Hudi, and Presto at scale. Amazon EMR automates the provisioning and scaling of these frameworks, and delivers high performance at low cost with optimized runtimes and support for a wide range of Amazon Elastic Compute Cloud (Amazon EC2) instance types and Amazon Elastic Kubernetes Service (Amazon EKS) clusters. Amazon EMR makes it easy for data engineers and data scientists to develop, visualize, and debug data science applications with Amazon EMR Studio (preview) and Amazon EMR Notebooks.

You can hear customers describe how they use Amazon EMR in the following 2020 AWS re:Invent sessions:

You can also find more information in the following posts:

Throughout 2020, we worked to deliver better Amazon EMR performance at a lower price, and to make Amazon EMR easier to manage and use for big data analytics within your Lake House Architecture. This post summarizes the key improvements during the year and provides links to additional information.

Differentiated engine performance

Amazon EMR simplifies building and operating big data environments and applications. You can launch an EMR cluster in minutes. You don’t need to worry about infrastructure provisioning, cluster setup, configuration, or tuning. Amazon EMR takes care of these tasks, allowing you to focus your teams on developing differentiated big data applications. In addition to eliminating the need for you to build and manage your own infrastructure to run big data applications, Amazon EMR gives you better performance than simply using open-source distributions, and provides 100% API compatibility. This means you can run your workloads faster without changing any code.

Amazon EMR runtime for Apache Spark is a performance-optimized runtime environment for Spark that is active by default. We first introduced the EMR runtime for Apache Spark in Amazon EMR release 5.28.0 in November 2019, and used queries based on the TPC-DS benchmark to measure the performance improvement over open-source Spark 2.4. Those results showed considerable improvement: the geometric mean in query execution time was 2.4 times faster and the total query runtime was 3.2 times faster. As discussed in Turbocharging Query Execution on Amazon EMR at AWS re:Invent 2020, we’ve continued to improve the runtime, and our latest results show that Amazon EMR 5.30 is three times faster than without the runtime, which means you can run petabyte-scale analysis at less than half the cost of traditional on-premises solutions. For more information, see How Drop used the EMR runtime for Apache Spark to halve costs and get results 5.4 times faster.

We’ve also improved Hive and PrestoDB performance. In April 2020, we announced support for Hive Low Latency Analytical Processing (LLAP) as a YARN service starting with Amazon EMR 6.0. Our tests show that Apache Hive is two times faster with Hive LLAP on Amazon EMR 6.0. You can choose to use Hive LLAP or dynamically allocated containers. In May 2020, we introduced the Amazon EMR runtime for PrestoDB in Amazon EMR 5.30. Our most recent tests based on TPC-DS benchmark queries compare Amazon EMR 5.31, which uses the runtime, to Amazon EMR 5.29, which does not. The geometric mean in query execution time is 2.6 times faster with Amazon EMR 5.31 and the runtime for PrestoDB.

Simpler incremental data processing

Apache Hudi (Hadoop Upserts, Deletes, and Incrementals) is an open-source data management framework used for simplifying incremental data processing and data pipeline development. You can use it to perform record-level inserts, updates, and deletes in Amazon Simple Storage Service (Amazon S3) data lakes, thereby simplifying building change data capture (CDC) pipelines. With this capability, you can comply with data privacy regulations and simplify data ingestion pipelines that deal with late-arriving or updated records from sources like streaming inputs and CDC from transactional systems. Apache Hudi integrates with open-source big data analytics frameworks like Apache Spark, Apache Hive, and Presto, and allows you to maintain data in Amazon S3 or HDFS in open formats like Apache Parquet and Apache Avro.

We first supported Apache Hudi starting with Amazon EMR release 5.28 in November 2019. In June 2020, Apache Hudi graduated from incubator with release 0.6.0, which we support with Amazon EMR releases 5.31.0, 6.2.0, and higher. The Amazon EMR team collaborated with the Apache Hudi community to create a new bootstrap operation, which allows you to use Hudi with your existing Parquet datasets without needing to rewrite the dataset. This bootstrap operation accelerates the process of creating a new Apache Hudi dataset from existing datasets—in our tests using a 1 TB Parquet dataset on Amazon S3, the bootstrap performed five times faster than bulk insert.

Also in June 2020, starting with Amazon EMR release 5.30.0, we added support for the HoodieDeltaStreamer utility, which provides an easy way to ingest data from many sources, including AWS Data Migration Services (AWS DMS). With this integration, you can now ingest data from upstream relational databases to your S3 data lakes in a seamless, efficient, and continuous manner. For more information, see Apply record level changes from relational databases to Amazon S3 data lake using Apache Hudi on Amazon EMR and AWS Database Migration Service.

Amazon Athena and Amazon Redshift Spectrum added support for querying Apache Hudi datasets in S3-based data lakes—Athena announcing in July 2020 and Redshift Spectrum announcing in September. Now, you can query the latest snapshot of Apache Hudi Copy-on-Write (CoW) datasets from both Athena and Redshift Spectrum, even while you continue to use Apache Hudi support in Amazon EMR to make changes to the dataset.

Differentiated instance performance

In addition to providing better software performance with Amazon EMR runtimes, we offer more instance options than any other cloud provider, allowing you to choose the instance that gives you the best performance and cost for your workload. You choose what types of EC2 instances to provision in your cluster (standard, high memory, high CPU, high I/O) based on your application’s requirements, and fully customize your cluster to suit your requirements.

In December 2020, we announced that Amazon EMR now supports M6g, C6g, and R6g instances with versions 6.1.0, 5.31.0 and later, which enables you to use instances powered by AWS Graviton2 processors. Graviton2 processors are custom designed by AWS using 64-bit Arm Neoverse cores to deliver the best price performance for cloud workloads running in Amazon EC2. Although your performance benefit will vary based on the unique characteristics of your workloads, our tests based on the TPC-DS 3 TB benchmark showed that the EMR runtime for Apache Spark provides up to 15% improved performance and up to 30% lower costs on Graviton2 instances relative to equivalent previous generation instances.

Easier cluster optimization

We’ve also made it easier to optimize your EMR clusters. In July 2020, we introduced Amazon EMR Managed Scaling, a new feature that automatically resizes your EMR clusters for best performance at the lowest possible cost. EMR Managed Scaling eliminates the need to predict workload patterns in advance or write custom automatic scaling rules that depend on an in-depth understanding of the application framework (for example, Apache Spark or Apache Hive). Instead, you specify the minimum and maximum compute resource limits for your clusters, and Amazon EMR constantly monitors key metrics based on the workload and optimizes the cluster size for best resource utilization. Amazon EMR can scale the cluster up during peaks and scale it down gracefully during idle periods, reducing your costs by 20–60% and optimizing cluster capacity for best performance.

EMR Managed Scaling is supported for Apache Spark, Apache Hive, and YARN-based workloads on Amazon EMR versions 5.30.1 and above. EMR Managed Scaling supports EMR instance fleets, enabling you to seamlessly scale Spot Instances, On-Demand Instances, and instances that are part of a Savings Plan, all within the same cluster. You can take advantage of Managed Scaling and instance fleets to provision the cluster capacity that has the lowest chance of getting interrupted, for the lowest cost.

In October 2020, we announced Amazon EMR support for the capacity-optimized allocation strategy for provisioning EC2 Spot Instances. The capacity-optimized allocation strategy automatically makes the most efficient use of available spare capacity while still taking advantage of the steep discounts offered by Spot Instances. You can now specify up to 15 instance types in your EMR task instance fleet configuration. This provides Amazon EMR with more options in choosing the optimal pools to launch Spot Instances from in order to decrease chances of Spot interruptions, and increases the ability to relaunch capacity using other instance types in case Spot Instances are interrupted when Amazon EC2 needs the capacity back.

For more information, see How Nielsen built a multi-petabyte data platform using Amazon EMR and Contextual targeting and ad tech migration best practices.

Workload consolidation

Previously, you had to choose between using fully managed Amazon EMR on Amazon EC2 or self-managing Apache Spark on Amazon EKS. When you use Amazon EMR on Amazon EC2, you can choose from a wide range of EC2 instance types to meet price and performance requirements, but you can’t run multiple versions of Apache Spark or other applications on a cluster, and you can’t use unused capacity for non-Amazon EMR applications. When you self-manage Apache Spark on Amazon EKS, you have to do the heavy lifting of installing, managing, and optimizing Apache Spark to run on Kubernetes, and you don’t get the benefit of optimized runtimes in Amazon EMR.

You no longer have to choose. In December 2020, we announced the general availability of Amazon EMR on Amazon EKS, a new deployment option for Amazon EMR that allows you to run fully managed open-source big data frameworks on Amazon EKS. If you already use Amazon EMR, you can now consolidate Amazon EMR-based applications with other Kubernetes-based applications on the same Amazon EKS cluster to improve resource utilization and simplify infrastructure management using common Amazon EKS tools. If you currently self-manage big data frameworks on Amazon EKS, you can now use Amazon EMR to automate provisioning and management, and take advantage of the optimized Amazon EMR runtimes to deliver better performance at lower cost.

Amazon EMR on EKS enables your team to collaborate more efficiently. You can run applications on a common pool of resources without having to provision infrastructure, and co-locate multiple Amazon EMR versions on a single Amazon EKS cluster to rapidly test and verify new Amazon EMR versions and the included open-source frameworks. You can improve developer productivity with faster cluster startup times because Amazon EMR application containers on existing Amazon EKS cluster instances start within 15 seconds, whereas creating new clusters of EC2 instances can take several minutes. You can use Amazon Managed Workflows for Apache Airflow (Amazon MWAA) to programmatically author, schedule, and monitor workflows, and use EMR Studio (preview) to develop, visualize, and debug applications. We discuss Amazon MWAA and EMR Studio more in the next section.

For more information, see Run Spark on Kubernetes with Amazon EMR on Amazon EKS and Amazon EMR on EKS Development Guide.

Higher developer productivity

Of course, your goal with Amazon EMR is not only to achieve the best price performance for your big data analytics workloads, but also to deliver new insights that help you run your business.

In November 2020, we announced Amazon MWAA, a fully managed service that makes it easy to run open-source versions of Apache Airflow on AWS, and to build workflows to run your extract, transform, and load (ETL) jobs and data pipelines. Airflow workflows retrieve input from sources like Amazon S3 using Athena queries, perform transformations on EMR clusters, and can use the resulting data to train machine learning (ML) models on Amazon SageMaker. Workflows in Airflow are authored as Directed Acyclic Graphs (DAGs) using the Python programming language.

At AWS re:Invent 2020, we introduced the preview of EMR Studio, a new notebook-first integrated development environment (IDE) experience with Amazon EMR. EMR Studio makes it easy for data scientists to develop, visualize, and debug applications written in R, Python, Scala, and PySpark. It provides fully managed Jupyter notebooks and tools like Spark UI and YARN Timeline Service to simplify debugging. You can install custom Python libraries or Jupyter kernels required for your applications directly to your EMR clusters, and can connect to code repositories such as AWS CodeCommit, GitHub, and Bitbucket to collaborate with peers. EMR Studio uses AWS Single Sign-On (AWS SSO), enabling you to log in directly with your corporate credentials without signing in to the AWS Management Console.

EMR Studio kernels and applications run on EMR clusters, so you get the benefit of distributed data processing using the performance-optimized EMR runtime for Apache Spark. You can create cluster templates in AWS Service Catalog to simplify running jobs for your data scientists and data engineers, and can take advantage of EMR clusters running on Amazon EC2, Amazon EKS, or both. For example, you might reuse existing EC2 instances in your shared Kubernetes cluster to enable fast startup time for development work and ad hoc analysis, and use EMR clusters on Amazon EC2 to ensure the best performance for frequently run, long-running workloads.

To learn more, see Introducing a new notebook-first IDE experience with Amazon EMR and Amazon EMR Studio.

Unified governance

At AWS, we recommend you use a Lake House Architecture to modernize your data and analytics infrastructure in the cloud. A Lake House Architecture acknowledges the idea that taking a one-size-fits-all approach to analytics eventually leads to compromises. It’s not simply about integrating a data lake with a data warehouse, but rather about integrating a data lake, data warehouse, and purpose-built analytics services, and enabling unified governance and easy data movement. For more information about this approach, see Harness the power of your data with AWS Analytics by Rahul Pathak, and his AWS re:Invent 2020 analytics leadership session.

As shown in the following diagram, Amazon EMR is one element in a Lake House Architecture on AWS, along with Amazon S3, Amazon Redshift, and more.

One of the most important pieces of a modern analytics architecture is the ability for you to authorize, manage, and audit access to data. AWS gives you the fine-grained access control and governance you need to manage access to data across a data lake and purpose-built data stores and analytics services from a single point of control.

In October 2020, we announced the general availability of Amazon EMR integration with AWS Lake Formation. By integrating Amazon EMR with AWS Lake Formation, you can enhance data access control on multi-tenant EMR clusters by managing Amazon S3 data access at the level of databases, tables, and columns. This feature also enables SAML-based single sign-on to EMR Notebooks and Apache Zeppelin, and simplifies the authentication for organizations using Active Directory Federation Services (ADFS). With this integration, you have a single place to manage data access for Amazon EMR, along with the other AWS analytics services shown in the preceding diagram. At AWS re:Invent 2020, we announced the preview of row-level security for Lake Formation, which makes it even easier to control access for all the people and applications that need to share data.

In January 2021, we introduced Amazon EMR integration with Apache Ranger. Apache Ranger is an open-source project that provides authorization and audit capabilities for Hadoop and related big data applications like Apache Hive, Apache HBase, and Apache Kafka. Starting with Amazon EMR 5.32, we’re including plugins to integrate with Apache Ranger 2.0 that enable authorization and audit capabilities for Apache SparkSQL, Amazon S3, and Apache Hive. You can set up a multi-tenant EMR cluster, use Kerberos for user authentication, use Apache Ranger 2.0 (managed separately outside the EMR cluster) for authorization, and configure fine-grained data access policies for databases, tables, columns, and S3 objects.

With this native integration, you use the Amazon EMR security configuration to specify Apache Ranger details, without the need for custom bootstrap scripts. You can reuse existing Apache Hive Ranger policies, including support for row-level filters and column masking.

To learn more, see Integrate Amazon EMR with AWS Lake Formation and Integrate Amazon EMR with Apache Ranger.

Jumpstart your migration to Amazon EMR

Building a modern data platform using the Lake House Architecture enables you to collect data of all types, store it in a central, secure repository, and analyze it with purpose-built tools like Amazon EMR. Migrating your big data and ML to AWS and Amazon EMR offers many advantages over on-premises deployments. These include separation of compute and storage, increased agility, resilient and persistent storage, and managed services that provide up-to-date, familiar environments to develop and operate big data applications. We can help you design, deploy, and architect your analytics application workloads in AWS and help you migrate your big data and applications.

The AWS Well-Architected Framework helps you understand the pros and cons of decisions you make while building systems on AWS. By using the framework, you learn architectural best practices for designing and operating reliable, secure, efficient, and cost-effective systems in the cloud, and ways to consistently measure your architectures against best practices and identify areas for improvement. In May 2020, we announced the Analytics Lens for the AWS Well-Architected Framework, which offers comprehensive guidance to make sure that your analytics applications are designed in accordance with AWS best practices. We believe that having well-architected systems greatly increases the likelihood of business success.

To move to Amazon EMR, you can download the Amazon EMR migration guide to follow step-by-step instructions, get guidance on key design decisions, and learn best practices. You can also request an Amazon EMR Migration Workshop, a virtual workshop to jumpstart your Apache Hadoop/Spark migration to Amazon EMR. You can also learn how AWS partners have helped customers migrate to Amazon EMR in Mactores’s Seagate case study, Cloudwick’s on-premises to AWS Cloud migration to drive cost efficiency, and DNM’s global analytics platform for the cinema industry.


About the Authors

Abhishek Sinha is a Principal Product Manager at Amazon Web Services.

 

 

 

 

AI MSAl MS is a product manager for Amazon EMR at Amazon Web Services.

 

 

 

 

BJ Haberkorn is principal product marketing manager for analytics at Amazon Web Services. BJ has worked previously on voice technology including Amazon Alexa, real time communications systems, and processor design. He holds BS and MS degrees in electrical engineering from the University of Virginia.

Amazon MSK backup for Archival, Replay, or Analytics

Post Syndicated from Rohit Yadav original https://aws.amazon.com/blogs/architecture/amazon-msk-backup-for-archival-replay-or-analytics/

Amazon MSK is a fully managed service that helps you build and run applications that use Apache Kafka to process streaming data. Apache Kafka is an open-source platform for building real-time streaming data pipelines and applications. With Amazon MSK, you can use native Apache Kafka APIs to populate data lakes. You can also stream changes to and from databases, and power machine learning and analytics applications.

Amazon MSK simplifies the setup, scaling, and management of clusters running Apache Kafka. MSK manages the provisioning, configuration, and maintenance of resources for a highly available Kafka clusters. It is fully compatible with Apache Kafka and supports familiar community-build tools such as MirrorMaker 2.0, Kafka Connect and Kafka streams.

Introduction

In the past few years, the volume of data that companies must ingest has increased significantly. Information comes from various sources, like transactional databases, system logs, SaaS platforms, mobile, and IoT devices. Businesses want to act as soon as the data arrives. This has resulted in increased adoption of scalable real-time streaming solutions. These solutions scale horizontally to provide the needed throughput to process data in real time, with milliseconds of latency. Customers have adopted Amazon MSK as a top choice of streaming platforms. Amazon MSK gives you the flexibility to retain topic data for longer term (default 7 days). This supports replay, analytics, and machine learning based use cases. When IT and business systems are producing and processing terabytes of data per hour, it can become expensive to store, manage, and retrieve data. This has led to legacy data archival processes moving towards cheaper, reliable, and long-term storage solutions like Amazon Simple Storage Service (S3).

Following are some of the benefits of archiving Amazon MSK topic data to Amazon S3:

  1. Reduced Cost – You only must retain the data in the cluster based on your Recovery Point Objective (RPO). Any historical data can be archived in Amazon S3 and replayed if necessary.
  2. Integration with Enterprise Data Lake – Since your data is available in S3, you can now integrate with other data analytics services like Amazon EMR, AWS Glue, Amazon Athena, to run data aggregation and analytics. For example, you can build reports to visualize month over month changes.
  3. Optimize Machine Learning Workloads – Machine learning applications will be able to train new models and improve predictions using historical streams of data available in Amazon S3. This also enables better integration with Amazon Machine Learning services.
  4. Compliance – Long-term data archival for regulatory and security compliance.
  5. Backloading data to other systems – Ability to rebuild data into other application environments such as pre-prod, testing, and more.

There are many benefits to using Amazon S3 as long-term storage for Amazon MSK topics. Let’s dive deeper into the recommended architecture for this pattern. We will present an architecture to back up Amazon MSK topics to Amazon S3 in real time. In addition, we’ll demonstrate some of the use cases previously mentioned.

Architecture

The diagram following illustrates the architecture for building a real-time archival pipeline to archive Amazon MSK topics to S3. This architecture uses an AWS Lambda function to process records from your Amazon MSK cluster when the cluster is configured as an event source. As a consumer, you don’t need to worry about infrastructure management or scaling with Lambda. You only pay for what you consume, so you don’t pay for over-provisioned infrastructure.

To create an event source mapping, you can add your Amazon MSK cluster in a Lambda function trigger. The Lambda service internally polls for new records or messages from the event source, and then synchronously invokes the target Lambda function. Lambda reads the messages in batches from one or more partitions and provides these to your function as an event payload. The function then processes records, and sends the payload to an Amazon Kinesis Data Firehose delivery stream. We use Kinesis Data Firehose delivery stream because it can natively batch, compress, transform, and encrypt your events before loading to S3.

In this architecture, Kinesis Data Firehose delivers the records received from Lambda in Gzip file to Amazon S3. These files are partitioned in hive style format by Kinesis Data Firehose:

data/year = yyyy/month = MM/day = dd/hour = HH

Figure 1. Archival Architecture

Figure 1. Archival Architecture

Let’s review some of the possible solutions that can be built on this archived data.

Integration with Enterprise Data Lake

The architecture diagram following shows how you can integrate the archived data in Amazon S3 with your Enterprise Data Lake. Since the data files are prefixed in hive style format, you can partition and store the Data Catalog in AWS Glue. With partitioning in place, you can perform optimizations like partition pruning, which enables predicate pushdown for improved performance of your analytics queries. You can also use AWS Data Analytics services like Amazon EMR and AWS Glue for batch analytics. Amazon Athena can be used to run serverless SQL-like interactive queries on visualization and data.

Data currently gets stored in JSON files. Following are some of the services/tools that can be integrated with your archive for reporting, analytics, visualization, and machine learning requirements.

Figure 2. Analytics Architecture

Figure 2. Analytics Architecture

Cloning data into other application environments

There are use cases where you would want to use this data to clone other application environments using this archive.

These clusters could be used for testing or debugging purposes. You could decide to use only a subset of your data from the archive. Let’s say you want to debug an issue beyond the configured retention period, but not replicate all the data to your testing environment. With archived data in S3, you can build downstream jobs to filter data that can be loaded into a new Amazon MSK cluster. The following diagram highlights this pattern:

Figure 3. Replay Architecture

Figure 3. Replay Architecture

Ready for a Test Drive

To help you get started, we would like to introduce an AWS Solution: AWS Streaming Data Solution for Amazon MSK (scroll down and see Option 3 tab). There is a single-click AWS CloudFormation template, which can assist you in quickly provisioning resources. This will get your real-time archival pipeline for Amazon MSK up and running quickly. This solution shortens your development time by removing or reducing the need for you to:

  • Model and provision resources using AWS CloudFormation
  • Set up Amazon CloudWatch alarms, dashboards, and logging
  • Manually implement streaming data best practices in AWS

This solution is data and logic agnostic, enabling you to start with boilerplate code and start customizing quickly. After deployment, use this solution’s monitoring capabilities to transition easily to production.

Conclusion

In this post, we explained the architecture to build a scalable, highly available real-time archival of Amazon MSK topics to long term storage in Amazon S3. The architecture was built using Amazon MSK, AWS Lambda, Amazon Kinesis Data Firehose, and Amazon S3. The architecture also illustrates how you can integrate your Amazon MSK streaming data in S3 with your Enterprise Data Lake.

Orchestrating analytics jobs on Amazon EMR Notebooks using Amazon MWAA

Post Syndicated from Fei Lang original https://aws.amazon.com/blogs/big-data/orchestrating-analytics-jobs-on-amazon-emr-notebooks-using-amazon-mwaa/

In a previous post, we introduced the Amazon EMR notebook APIs, which allow you to programmatically run a notebook on both Amazon EMR Notebooks and Amazon EMR Studio (preview) without accessing the AWS web console. With the APIs, you can schedule running EMR notebooks with cron scripts, chain multiple EMR notebooks, and use orchestration services such as AWS Step Functions triggered by AWS CloudWatch Events.

In this post, we show how to use Amazon Managed Workflows for Apache Airflow (Amazon MWAA) to orchestrate analytics jobs on EMR Notebooks. We will start by walking you through the process of using AWS CloudFormation to set up an Amazon MWAA environment, which allows you to programmatically author, schedule, and monitor different sorts of workflows on Amazon EMR. We will then use this environment to run an EMR notebook example which does data analysis with Hive.

The data source for the example in this post is from the public Amazon Customer Reviews Dataset. We use the Parquet formatted dataset as the input dataset for our EMR notebook.

Apache Airflow and Amazon MWAA

Apache Airflow is an open-source platform for authoring, scheduling, and monitoring workflows. With Apache Airflow, we can define direct acyclic graphs (DAGs). DAGs describe how to run a workflow and are written in Python. For additional details on Apache Airflow, see Concepts. Many organizations build, manage, and maintain Apache Airflow on AWS using services such as Amazon Elastic Compute Cloud (Amazon EC2) or Amazon Elastic Kubernetes Service (Amazon EKS). Amazon MWAA is a fully managed service that makes it easy to run open-source versions of Apache Airflow on AWS, and to build workflows to run your extract, transform, and load (ETL) jobs and data pipelines.

Prerequisites

Before getting started, you must have the following prerequisites:

  • An AWS account that provides access to AWS services.
  • AWS Command Line Interface (AWS CLI) version 1.18.128 or later installed on your workstation.
  • An Amazon Simple Storage Service (Amazon S3) bucket that meets the following Amazon MWAA requirements:
    • The bucket must be in the same AWS Region where you create the MWAA environment.
    • The bucket name must start with airflow- and should be globally unique.
    • Bucket versioning is enabled.
    • A folder named dags must be created in the same bucket to store DAGs and associated support files.
  • An AWS Identity and Access Management (IAM) user with an access key and secret access key to configure the AWS CLI.
    • The IAM user has permissions to create an IAM role and policies, launch an EMR cluster, create an Amazon MWAA environment, and create stacks in AWS CloudFormation.
  • A possible limit increase for your account. (Usually a limit increase isn’t necessary. See AWS service quotas if you encounter a limit error while building the solution.)
  • An EMR notebook created through the Amazon EMR console, using the notebook file find_best_sellers.ipynb. See Creating a Notebook for instructions on creating an EMR notebook. Record the ID of the EMR notebook (for example, <e-*************************>); you will use this later in this post.

Architecture overview

At a high level, this solution uses Amazon MWAA with Amazon EMR to build pipelines for ETL workflow orchestration. The following diagram illustrates the solution architecture.

The following diagram illustrates the solution architecture.

We use the following services and configurations in this solution:

  • Amazon S3
  • VPC network configurations
  • VPC endpoints

Amazon S3

Amazon MWAA uses an S3 bucket to store DAGs and associated support files. You must create an S3 bucket before you can create the environment, with requirements as mentioned in the Prerequisites section. To use a bucket with an Amazon MWAA environment, you must create the bucket in the same Region where you create the environment. Refer to Create an Amazon S3 bucket for Amazon MWAA for further details.

VPC network configurations

Amazon MWAA requires a VPC network that meets the following requirements:

  • Includes two private subnets that are in two different Availability Zones within the same Region
  • Includes public subnets that are configured to route the private subnet data to the internet (via NAT gateways)

For more information, see Create the VPC network using a AWS CloudFormation template.

The Airflow UI in the Amazon MWAA environment is accessible over the internet by users granted access in the IAM policy. Amazon MWAA attaches an Application Load Balancer with an HTTPS endpoint for your web server as part of the Amazon MWAA managed service. For more information, see How it works.

VPC endpoints

VPC endpoints are highly available VPC components that enable private connections between your VPC and supported AWS services. Traffic between your VPC and the other services remains in your AWS network. For our example, we use the following VPC endpoints to ensure extra security, availability, and Amazon S3 data transfer performance:

  • An Amazon S3 gateway VPC endpoint to establish a private connection between the Amazon MWAA VPC and Amazon S3
  • An EMR interface VPC endpoint to securely route traffic directly to Amazon EMR from Amazon MWAA, instead of connecting over the internet

Setting up an Amazon MWAA environment

To make it easier to get started, we created a CloudFormation template that automatically configures and deploys the Amazon MWAA environment. The template takes care of the following tasks for you:

  • Create an Amazon MWAA execution IAM role.
  • Set up the VPC network for the Amazon MWAA environment, deploying the following resources:
    • A VPC with a pair of public and private subnets spread across two Availability Zones.
    • An internet gateway, with a default route on the public subnets.
    • A pair of NAT gateways (one in each Availability Zone), and default routes for them in the private subnets.
    • Amazon S3 gateway VPC endpoints and EMR interface VPC endpoints in the private subnets in two Availability Zones.
    • A security group to be used by the Amazon MWAA environment that only allows local inbound traffic and all outbound traffic.
  • Create an Amazon MWAA environment. For this post, we select mw1.small for the environment class and choose maximum worker count as 1. For monitoring, we choose to publish environment performance to CloudWatch Metrics. For Airflow logging configuration, we choose to send only the task logs and use log level INFO.

If you want to manually create, configure, and deploy the Amazon MWAA environment without using AWS CloudFormation, see Get started with Amazon Managed Workflows for Apache Airflow (MWAA).

Launching the CloudFormation template

To launch your stack and provision your resources, complete the following steps:

  1. Choose Launch Stack:

This automatically launches AWS CloudFormation in your AWS account with a template. It prompts you to sign in as needed. You can view the template on the AWS CloudFormation console as required. The Amazon MWAA environment is created in the same Region as you launched the CloudFormation stack. Make sure that you create the stack in your intended Region.

The CloudFormation stack requires a few parameters, as shown in the following screenshot.

The CloudFormation stack requires a few parameters, as shown in the following screenshot.

The following table describes the parameters.

Parameter Description Default Value
Stack name Enter a meaningful name for the stack. We use MWAAEmrNBDemo for this example. Replace it with your own value. None
AirflowBucketName Name of the S3 bucket to store DAGs and support files. The S3 bucket must be in the same Region where you create the environment. The name must start with airflow-. Enter the S3 bucket created as a prerequisite. We use the S3 bucket airflow-emr-demo-us-west-2 for this post. You must replace it with your own value for this field. None
EnvironmentName An MWAA environment name that is prefixed to resource names. All the resources created by this templated are named after the value saved for this field. We name our environment mwaa-emr-blog-demo for this post. Replace it with your own value for this field. mwaa-
PrivateSubnet1CIDR The IP range (CIDR notation) for the private subnet in the first Availability Zone. For more information, see AWS CloudFormation VPC stack specifications. 10.192.20.0/24
PrivateSubnet2CIDR The IP range (CIDR notation) for the private subnet in the second Availability Zone. For more information, see AWS CloudFormation VPC stack specifications.. 10.192.21.0/24
PublicSubnet1CIDR The IP range (CIDR notation) for the public subnet in the first Availability Zone. For more information, see AWS CloudFormation VPC stack specifications. 10.192.10.0/24
PublicSubnet2CIDR The IP range (CIDR notation) for the public subnet in the second Availability Zone. For more information, see AWS CloudFormation VPC stack specifications. 10.192.11.0/24
VpcCIDR The IP range (CIDR notation) for this VPC being created. For more information, see AWS CloudFormation VPC stack specifications. 10.192.0.0/16

The default values for the IP range (CIDR notation) fields refer to the AWS CloudFormation VPC stack specifications. You can make changes based on the requirements of your own network settings.

  1. Enter the parameter values from the preceding table.
  2. Review the details on the Capabilities section and select the check boxes confirming AWS CloudFormation might create IAM resources with custom names.
  3. Choose Create Stack.

Stack creation takes a few minutes. After the CloudFormation stack is complete, on the Resources tab, you can find the resources being created in this CloudFormation stack. Now, we’re ready to run our example.

Orchestrating Hive analytics jobs on EMR Notebooks using Apache Airflow

The following diagram illustrates the workflow: As a user, you first need to create the DAG file that describes how to run the analytics jobs and upload it to the dags folder under the S3 bucket specified. The DAG can be triggered in Apache Airflow UI to orchestrate the job workflow, which includes creating an EMR cluster, waiting for the cluster to be ready, running Hive analytics jobs on EMR notebooks, uploading the results to Amazon S3, and cleaning up the cluster after the job is complete.

The following diagram illustrates the workflow.

Input notebook file

Let’s take a look at the following input notebook file find_best_sellers.ipynb, which we use for our example.

Let’s take a look at the following input notebook file find_best_sellers.ipynb, which we use for our example.

find_best_sellers.ipynb is a Python script that does analysis on the public Amazon Customer Reviews Dataset. It generates the top 20 best sellers in a given list of categories over a given period of time and saves the results to the given S3 output location. For demonstration purpose only, we rank the seller simply by the sum of review star ratings from verified purchases.

The explanations of the default parameters in the first cell and each code block are included in the notebook itself.

The last line in the first cell, we have OUTPUT_LOCATION = "s3://airflow-emr-demo-us-west-2/query_output/” as a default value for the input parameter. Replace it with your own value for the output location. You can also supply a different value for this for this parameter in the Airflow Variables later.

DAG file

The DAG file test_dag.py is used to orchestrate our job flow via Apache Airflow. It performs the following tasks:

  1. Create an EMR cluster with one m5.xlarge primary and two m5.xlarge core nodes on release version 6.2.0 with Spark, Hive, Livy and JupyterEnterpriseGateway installed as applications.
  2. Wait until the cluster is up and ready.
  3. Run the notebook find_best_sellers.ipynb on the EMR cluster created in Step 1.
  4. Wait until the notebook run is complete.
  5. Clean up the EMR cluster.

Here is the full source code of the DAG:

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from time import sleep
from datetime import datetime
import boto3, time
from builtins import range
from pprint import pprint
from airflow.operators.sensors import BaseSensorOperator
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
from airflow.contrib.sensors.emr_job_flow_sensor import EmrJobFlowSensor
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.contrib.hooks.emr_hook import EmrHook
from airflow.contrib.sensors.emr_base_sensor import EmrBaseSensor
from airflow.models import Variable
from airflow.utils import apply_defaults
from airflow.utils.dates import days_ago

# Available categories:
#
# Apparel,Automotive,Baby,Beauty,Books,Camera,Digital_Ebook_Purchase,Digital_Music_Purchase,
# Digital_Software,Digital_Video_Download,Digital_Video_Games,Electronics,Furniture,Gift_Card,
# Grocery,Health_&_Personal_Care,Home,Home_Entertainment,Home_Improvement,Jewelry,Kitchen,
# Lawn_and_Garden,Luggage,Major_Appliances,Mobile_Apps,Mobile_Electronics,Music,Musical_Instruments,
# Office_Products,Outdoors,PC,Personal_Care_Appliances,Pet_Products,Shoes,Software,Sports,Tools,
# Toys,Video,Video_DVD,Video_Games,Watches,Wireless

# =============== VARIABLES ===============
NOTEBOOK_ID = Variable.get('NOTEBOOK_ID')
NOTEBOOK_FILE_NAME = Variable.get('NOTEBOOK_FILE_NAME')
CATEGORIES_CSV = Variable.get('CATEGORIES_CSV')
REGION = Variable.get('REGION')
SUBNET_ID = Variable.get('SUBNET_ID')
EMR_LOG_URI = Variable.get('EMR_LOG_URI')
OUTPUT_LOCATION = Variable.get('OUTPUT_LOCATION')
FROM_DATE = Variable.get('FROM_DATE')
TO_DATE = Variable.get('TO_DATE')
# =========================================

JOB_FLOW_OVERRIDES = {
    'Name': 'Test-Cluster',
    'ReleaseLabel': 'emr-6.2.0',
    'Applications': [{'Name':'Spark'}, {'Name':'Hive'}, {'Name':'Livy'}, {'Name':'JupyterEnterpriseGateway'}],
    'Configurations': [
          {
            "Classification": "hive-site",
            "Properties": {
                "hive.execution.engine": "spark"
            }
        }
    ],
    'Instances': {
        'Ec2SubnetId': SUBNET_ID,
        'InstanceGroups': [
            {
                'Name': 'Master node',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'MASTER',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 1,
            },
            {
                'Name': 'Core node',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'CORE',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 2,
            }
        ],
        'KeepJobFlowAliveWhenNoSteps': True,
        'TerminationProtected': False,
    },
    'JobFlowRole': 'EMR_EC2_DefaultRole',
    'ServiceRole': 'EMR_DefaultRole',
    'LogUri': EMR_LOG_URI
}


class CustomEmrJobFlowSensor(EmrJobFlowSensor):
    NON_TERMINAL_STATES = ['STARTING', 'BOOTSTRAPPING', 'TERMINATING']

class NotebookExecutionSensor(EmrBaseSensor):
    NON_TERMINAL_STATES = ['START_PENDING', 'STARTING', 'RUNNING', 'FINISHING', 'STOP_PENDING', 'STOPPING']
    FAILED_STATE = ['FAILING', 'FAILED']
    template_fields = ['notebook_execution_id']
    template_ext = ()
    @apply_defaults
    def __init__(self, notebook_execution_id, *args, **kwargs):
        super(NotebookExecutionSensor, self).__init__(*args, **kwargs)
        self.notebook_execution_id = notebook_execution_id
    def get_emr_response(self):
        emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()
        self.log.info('Poking notebook execution %s', self.notebook_execution_id)
        return emr.describe_notebook_execution(NotebookExecutionId=self.notebook_execution_id)
    @staticmethod
    def state_from_response(response):
        return response['NotebookExecution']['Status']
    @staticmethod
    def failure_message_from_response(response):
        state_change_reason = response['NotebookExecution']['LastStateChangeReason']
        if state_change_reason:
            return 'Execution failed with reason: ' + state_change_reason
        return None

def start_execution(**context):
    ti = context['task_instance']
    cluster_id = ti.xcom_pull(key='return_value', task_ids='create_cluster_task')
    print("Starting an execution using cluster: " + cluster_id)
    # generate a JSON key-pair of <String : String Array>, e.g. 
    # "\"CATEGORIES\": [\"Apparel\", \"Automotive\", \"Baby\", \"Books\"]"
    categories_escaped_quotes = ""
    for category in CATEGORIES_CSV.split(','):
        categories_escaped_quotes = categories_escaped_quotes + "\"" + category + "\","
    categories_escaped_quotes = categories_escaped_quotes[:-1]
    categories_parameter = "\"CATEGORIES\" : [" + categories_escaped_quotes + "]"

    output_location_parameter = "\"OUTPUT_LOCATION\": \"" + OUTPUT_LOCATION + "\""
    from_date_parameter = "\"FROM_DATE\": \"" + FROM_DATE + "\""
    to_date_parameter = "\"TO_DATE\": \"" + TO_DATE + "\""
    parameters = f"{{ {categories_parameter}, {output_location_parameter}, {from_date_parameter}, {to_date_parameter} }}"
    emr = boto3.client('emr', region_name=REGION)
    start_resp = emr.start_notebook_execution(
        EditorId=NOTEBOOK_ID,
        RelativePath=NOTEBOOK_FILE_NAME,
        ExecutionEngine={'Id': cluster_id, 'Type': 'EMR'},
        NotebookParams=parameters,
        ServiceRole='EMR_Notebooks_DefaultRole'
    )
    execution_id = start_resp['NotebookExecutionId']
    print("Started an execution: " + execution_id)
    return execution_id



with DAG('test_dag', description='test dag', schedule_interval='0 * * * *', start_date=datetime(2020,3,30), catchup=False) as dag:
    create_cluster = EmrCreateJobFlowOperator(
        task_id='create_cluster_task',
        job_flow_overrides=JOB_FLOW_OVERRIDES,
        aws_conn_id='aws_default',
        emr_conn_id='emr_default',
    )
    cluster_sensor = CustomEmrJobFlowSensor(
        task_id='check_cluster_task',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster_task', key='return_value') }}",
        aws_conn_id='aws_default',
    )
    start_execution = PythonOperator(
        task_id='start_execution_task', 
        python_callable=start_execution,
        provide_context=True
    )
    execution_sensor = NotebookExecutionSensor(
        task_id='check_execution_task',
        notebook_execution_id="{{ task_instance.xcom_pull(task_ids='start_execution_task', key='return_value') }}",
        aws_conn_id='aws_default',
    )

    cluster_remover = EmrTerminateJobFlowOperator(
        task_id='terminate_cluster',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster_task', key='return_value') }}",
        aws_conn_id='aws_default',
    )
    
    create_cluster >> cluster_sensor >> start_execution >> execution_sensor >> cluster_remover

The very last line of the DAG code explains how the tasks are linked in the orchestration workflow. It’s overloading the right shift >> operator to create a dependency, meaning that the task on the left should be run first, and the output passed to the task on the right.

Instead of hard-coding the variables in the DAG code, we choose to supply these variables by importing a JSON file in the Airflow UI before actually running the DAG. This way, we can also update the variables without having to update the DAG code, which requires updating the DAG file in Amazon S3. We walk you through how to do so in the later steps. You can see the lines for VARIABLES that we repeated:

# =============== VARIABLES ===============
NOTEBOOK_ID = Variable.get('NOTEBOOK_ID')
NOTEBOOK_FILE_NAME = Variable.get('NOTEBOOK_FILE_NAME')
CATEGORIES_CSV = Variable.get('CATEGORIES_CSV')
REGION = Variable.get('REGION')
SUBNET_ID = Variable.get('SUBNET_ID')
EMR_LOG_URI = Variable.get('EMR_LOG_URI')
OUTPUT_LOCATION = Variable.get('OUTPUT_LOCATION')
FROM_DATE = Variable.get('FROM_DATE')
TO_DATE = Variable.get('TO_DATE')

We create a JSON formatted file named variables.json for our example. See the following code:

{
    "REGION": "us-west-2",
    "SUBNET_ID": "<subnet-********>",
    "EMR_LOG_URI": "s3://<S3 path for EMR logs>/",
    "NOTEBOOK_ID": "<e-*************************>",
    "NOTEBOOK_FILE_NAME": "find_best_sellers.ipynb",
    "CATEGORIES_CSV": "Apparel,Automotive,Baby,Beauty,Books",
    "FROM_DATE": "2015-08-25",
    "TO_DATE": "2015-08-31",
    "OUTPUT_LOCATION": "s3://<S3 path for query output>/"
}

To use this JSON code, you need to replace all the variable values (subnet and S3 paths) with the actual values.

Accessing Apache Airflow UI and running the workflow

To run the workflow, complete the following steps:

  1. On the Amazon MWAA console, find the new environment mwaa-emr-blog-demo we created earlier with the CloudFormation template.

On the Amazon MWAA console, find the new environment mwaa-emr-blog-demo we created earlier with the CloudFormation template.

  1. Choose Open Airflow UI.
  2. Log in as an authenticated user.

Log in as an authenticated user.

Next, we import the JSON file for the variables into Airflow UI.

As we mentioned earlier, we want to supply the variable values for our DAG definition later upon triggering the DAG in Airflow UI instead of hard-coding the values.

  1. On the Admin menu, choose Variables.
  2. Choose Browse.
  3. Choose json.
  4. Choose Import Variables.

For more information about importing variables, see Variables.

  1. Run the following command in the same directory as where file test_dag.py is to upload the DAG file to the dags folder under the S3 bucket specified for the Airflow environment. Replace <your_airflow_bucket_name> with the S3 bucket name that you created as a prerequisite:
    aws s3 cp test_dag.py s3://<your_airflow_bucket_name>/dags/

test_dag.py should automatically appear in the Airflow UI.

  1. Trigger the DAG by turning it to On

Trigger the DAG by turning it to On

  1. Choose test_dag to go to the detail page for the DAG.

On the Graph View tab, we can see the whole workflow of our pipeline and each individual task as defined in our DAG code.

On the Graph View tab, we can see the whole workflow of our pipeline and each individual task as defined in our DAG code.

  1. Optionally, to trigger the DAG, choose Trigger DAG and add the following JSON formatted configuration before activate the DAG.

Optionally, to trigger the DAG, choose Trigger DAG and add the following JSON formatted configuration before activate the DAG.

You now get an email when failure happens on any of the tasks. You can also configure to get email notification when retry happens as well.

  1. On the Amazon EMR console, find the EMR cluster created by the create_cluster_task definition.

On the Amazon EMR console, find the EMR cluster created by the create_cluster_task definition.

  1. On the Airflow UI, you can switch tabs to check the status of the workflow tasks.

After a few minutes, we can see on the Tree View tab that the workflow is complete and all the tasks are successful.

After a few minutes, we can see on the Tree View tab that the workflow is complete and all the tasks are successful.

On the Gantt tab, we can see the time distribution of all the tasks of our workflow.

On the Gantt tab, we can see the time distribution of all the tasks of our workflow.

As specified in our DAG definition, the EMR cluster is stopped when the workflow is complete.

Because we use the cron expression 0 * * * * as the scheduled running interval for our workflow, if the triggered status of the DAG is ON, it runs every hour. You need to switch the status to OFF if you don’t want it to run again.

  1. On the Amazon S3 console, view the result of our notebook job in the S3 folder.

On the Amazon S3 console, view the result of our notebook job in the S3 folder.

For example, the following screenshot is the output for the Books category that we provided as a value in the CATEGORIES parameter. As we can see, Go Set a Watchman: A Novel is the best Books seller from the week of 8-25-2015 to 8-31-2015.

As we can see, Go Set a Watchman: A Novel is the best Books seller from the week of 8-25-2015 to 8-31-2015.

Cleaning up

To avoid ongoing charges, delete the CloudFormation stack and any files in Amazon S3 that were created by running the examples in this post.

Conclusion

This post showed how to use the Amazon EMR Notebooks API and use orchestration services such as Amazon MWAA to build ETL pipelines. It demonstrated how set up a secured Amazon MWAA environment using a CloudFormation template and run a sample workflow with Apache Airflow.

If you want to learn how to run Amazon EMR applications such as PySpark with Amazon MWAA, see Running Spark Jobs on Amazon EMR with Apache Airflow.


About the Authors

Fei Lang is a senior big data architect at Amazon Web Services. She is passionate about building the right big data solution for customers. In her spare time, she enjoys the scenery of the Pacific Northwest, going for a swim, and spending time with her family.

 

 

Ray Liu is a software development engineer at AWS. Besides work, he enjoys traveling and spending time with family.

Run Apache Spark 3.0 workloads 1.7 times faster with Amazon EMR runtime for Apache Spark

Post Syndicated from Al MS original https://aws.amazon.com/blogs/big-data/run-apache-spark-3-0-workloads-1-7-times-faster-with-amazon-emr-runtime-for-apache-spark/

With Amazon EMR release 6.1.0, Amazon EMR runtime for Apache Spark is now available for Spark 3.0.0. EMR runtime for Apache Spark is a performance-optimized runtime for Apache Spark that is 100% API compatible with open-source Apache Spark.

In our benchmark performance tests using TPC-DS benchmark queries at 3 TB scale, we found EMR runtime for Apache Spark 3.0 provides a 1.7 times performance improvement on average, and up to 8 times improved performance for individual queries over open-source Apache Spark 3.0.0. With Amazon EMR 6.1.0, you can now run your Apache Spark 3.0 applications faster and cheaper without requiring any changes to your applications.

Results observed using TPC-DS benchmarks

To evaluate the performance improvements, we used TPC-DS benchmark queries with 3 TB scale and ran them on a 6-node c4.8xlarge EMR cluster with data in Amazon Simple Storage Service (Amazon S3). We ran the tests with and without the EMR runtime for Apache Spark. The following two graphs compare the total aggregate runtime and geometric mean for all queries in the TPC-DS 3 TB query dataset between the Amazon EMR releases.

The following table shows the total runtime in seconds.

The following table shows the total runtime in seconds.

The following table shows the geometric mean of the runtime in seconds.

The following table shows the geometric mean of the runtime in seconds.

In our tests, all queries ran successfully on EMR clusters that used the EMR runtime for Apache Spark. However, when using Spark 3.0 without the EMR runtime, 34 out of the 104 benchmark queries failed due to SPARK-32663. To work around these issues, we disabled spark.shuffle.readHostLocalDisk configuration. However, even after this change, queries 14a and 14b continued to fail. Therefore, we chose to exclude these queries from our benchmark comparison.

The per-query speedup on Amazon EMR 6.1 with and without EMR runtime is illustrated in the following chart. The horizontal axis shows each query in the TPC-DS 3 TB benchmark. The vertical axis shows the speedup of each query due to the EMR runtime. We found a 1.7 times performance improvement as measured by the geometric mean of the per-query speedups, with all queries showing a performance improvement with the EMR Runtime.

The per-query speedup on Amazon EMR 6.1 with and without EMR runtime is also illustrated in the following chart.

Conclusion

You can run your Apache Spark 3.0 workloads faster and cheaper without making any changes to your applications by using Amazon EMR 6.1. To keep up to date, subscribe to the Big Data blog’s RSS feed to learn about more great Apache Spark optimizations, configuration best practices, and tuning advice.


About the Authors

AI MSAl MS is a product manager for Amazon EMR at Amazon Web Services.

 

 

 

 

Peter Gvozdjak is a senior engineering manager for EMR at Amazon Web Services.

Building complex workflows with Amazon MWAA, AWS Step Functions, AWS Glue, and Amazon EMR

Post Syndicated from Dipankar Ghosal original https://aws.amazon.com/blogs/big-data/building-complex-workflows-with-amazon-mwaa-aws-step-functions-aws-glue-and-amazon-emr/

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a fully managed service that makes it easy to run open-source versions of Apache Airflow on AWS and build workflows to run your extract, transform, and load (ETL) jobs and data pipelines.

You can use AWS Step Functions as a serverless function orchestrator to build scalable big data pipelines using services such as Amazon EMR to run Apache Spark and other open-source applications on AWS in a cost-effective manner, and use AWS Glue for a serverless environment to prepare (extract and transform) and load large amounts of datasets from a variety of sources for analytics and data processing with Apache Spark ETL jobs

For production pipelines, a common use case is to read data originating from a variety of sources. This data requires transformation to extract business value and generate insights before sending to downstream applications, such as machine learning algorithms, analytics dashboards, and business reports.

This post demonstrates how to use Amazon MWAA as a primary workflow management service to create and run complex workflows and extend the directed acyclic graph (DAG) to start and monitor a state machine created using Step Functions. In Airflow, a DAG is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies.

Architectural overview

The following diagram illustrates the architectural overview of the components involved in the orchestration of the workflow. This workflow uses Amazon EMR to preprocess data and starts a Step Functions state machine. The state machine transforms data using AWS Glue.

The state machine transforms data using AWS Glue.

The workflow includes the following core components:

  1. Airflow Scheduler triggers the DAG based on a schedule or manually.
  2. DAG uses PythonOperator to create an EMR cluster and waits for the cluster creation process to complete.
  3. DAG uses a custom operator EmrSubmitAndMonitorStepOperator to submit and monitor the Amazon EMR step.
  4. DAG uses PythonOperator to stop the EMR cluster when the preprocessing tasks are complete.
  5. DAG starts a Step Functions state machine and monitors it for completion using PythonOperator.

You can build complex ETL pipelines with Step Functions separately and trigger them from an Airflow DAG.

Prerequisites

Before starting, create an Amazon MWAA environment. If this is your first time using Amazon MWAA, see Introducing Amazon Managed Workflows for Apache Airflow (MWAA).

Take a note of the Amazon Simple Storage Service (Amazon S3) bucket that stores the DAGs. It’s located on the environment details page on the Amazon MWAA console.

Take a note of the Amazon Simple Storage Service (Amazon S3) bucket that stores the DAGs.

Also note the AWS Identity and Access Management (IAM) execution role. This role should be modified to allow MWAA to read and write from your S3 bucket, submit an Amazon EMR step, start a Step Functions state machine, and read from the AWS Systems Manager Parameter Store. The IAM role is available in the Permissions section of the environment details.

The IAM role is available in the Permissions section of the environment details.

The solution references Systems Manager parameters in an AWS CloudFormation template and scripts. For information on adding and removing IAM identity permissions, see Adding and removing IAM identity permissions. A sample IAM policy is also provided in the GitHub repository amazon-mwaa-complex-workflow-using-step-functions.

For this post, we use the MovieLens dataset. We concurrently convert the MovieLens CSV files to Parquet format and save them to Amazon S3 as part of preprocessing.

Setting up the state machine using Step Functions

Our solution extends the ETL pipeline to run a Step Functions state machine from the Airflow DAG. Step Functions lets you build visual workflows that enable fast translation of business requirements into technical requirements. With Step Functions, you can set up dependency management and failure handling using a JSON-based template. A workflow is a series of steps, such as tasks, choices, parallel runs, and timeouts with the output of one step acting as input into the next. For more information about other use cases, see AWS Step Functions Use Cases.

The following diagram shows the ETL process set up through a Step Functions state machine.

The following diagram shows the ETL process set up through a Step Functions state machine.

In the workflow, the Process Data step runs an AWS Glue job, and the Get Job Status step periodically checks for the job completion. The AWS Glue job reads the input datasets and creates output data for the most popular movies and top-rated movies. After the job is complete, the Run Glue Crawler step runs an AWS Glue crawler to catalog the data. The workflow also allows you to monitor and respond to failures at any stage.

Creating resources

Create your resources by following the installation instructions provided in the amazon-mwaa-complex-workflow-using-step-functions README.md.

Running the ETL workflow

To run your ETL workflow, complete the following steps:

  1. On the Amazon MWAA console, choose Open Airflow UI.
  2. Locate the mwaa_movielens_demo DAG.
  3. Turn on the DAG.

Turn on the DAG.

  1. Select the mwaa_movielens_demo DAG and choose Graph View.

This displays the overall ETL pipeline managed by Airflow.

This displays the overall ETL pipeline managed by Airflow.

  1. To view the DAG code, choose Code.

To view the DAG code, choose Code.

The code for the custom operator can be found in the amazon-mwaa-complex-workflow-using-step-functions GitHub repo. 

  1. From the Airflow UI, select the mwaa_movielens_demo DAG and choose Trigger DAG.
  2. Leave the Optional Configuration JSON box blank.

Leave the Optional Configuration JSON box blank.

When the Airflow DAG runs, the first task calls the PythonOperator to create an EMR cluster using Boto3. Boto is the AWS SDK for Python. It enables Python developers to create, configure, and manage AWS services, such as Amazon Elastic Compute Cloud (Amazon EC2) and Amazon S3. Boto provides object-oriented API, as well as low-level access to AWS services.

The second task waits until the EMR cluster is ready and in the Waiting state. As soon as the cluster is ready, the data load task runs, followed by the data preprocessing tasks, which are started in parallel using EmrSubmitAndMonitorStepOperator. Concurrency in the current Airflow DAG is set to 3, which runs three tasks in parallel. You can change the concurrency of Amazon EMR to run multiple Amazon EMR steps in parallel.

When the data preprocessing tasks are complete, the EMR cluster is stopped and the DAG starts the Step Functions state machine to initiate data transformation.

The final task in the DAG monitors the completion of the Step Functions state machine.

The DAG run should complete in approximately 10 minutes.

Verifying the DAG run

While the DAG is running, you can view the task logs.

  1. From Graph View, select any task and choose View Log.

From Graph View, select any task and choose View Log.

  1. When the DAG starts the Step Functions state machine, verify the status on the Step Functions console.

When the DAG starts the Step Functions state machine, verify the status on the Step Functions console.

  1. You can also monitor ETL process completion from the Airflow UI.

You can also monitor ETL process completion from the Airflow UI.

  1. On the Airflow UI, verify the completion from the log entries.

On the Airflow UI, verify the completion from the log entries.

Querying the data

After the successful completion of the Airflow DAG, two tables are created in the AWS Glue Data Catalog. To query the data with Amazon Athena, complete the following steps:

  1. On the Athena console, choose Databases.
  2. Select the mwaa-movielens-demo-db database.

You should see the two tables. If the tables aren’t listed, verify that the AWS Glue crawler run is complete and that the console is showing the correct Region.

  1. Run the following query:
    SELECT * FROM "mwaa-movielens-demo-db"."most_popular_movies" limit 10;

The following screenshot shows the output.

The following screenshot shows the output.

Cleaning up

To clean up the resources created as part of our CloudFormation template, delete the mwaa-demo-foundations stack. You can either use the AWS CloudFormation console or the AWS Command Line Interface (AWS CLI).

Conclusion

In this post, we used Amazon MWAA to orchestrate an ETL pipeline on Amazon EMR and AWS Glue with Step Functions. We created an Airflow DAG to demonstrate how to run data processing jobs concurrently and extended the DAG to start a Step Functions state machine to build a complex ETL pipeline. A custom Airflow operator submitted and then monitored the Amazon EMR steps synchronously.

If you have comments or feedback, please leave them in the comments section.


About the Author

Dipankar GhosalDipankar Ghosal is a Sr Data Architect at Amazon Web Services and is based out of Minneapolis, MN. He has a focus in analytics and enjoys helping customers solve their unique use cases. When he’s not working, he loves going hiking with his wife and daughter.

Introducing Amazon EMR integration with Apache Ranger

Post Syndicated from Varun Rao Bhamidimarri original https://aws.amazon.com/blogs/big-data/introducing-amazon-emr-integration-with-apache-ranger/

Data security is an important pillar in data governance. It includes authentication, authorization , encryption and audit.

Amazon EMR enables you to set up and run clusters of Amazon Elastic Compute Cloud (Amazon EC2) instances with open-source big data applications like Apache Spark, Apache Hive, Apache Flink, and Presto. You may also want to set up multi-tenant EMR clusters where different users (or teams) can use a shared EMR cluster to run big data analytics workloads. In a multi-tenant cluster, it becomes important to set up mechanisms for authentication (determine who is invoking the application and authenticate the user), authorization (set up who has access to what data), and audit (maintain a log of who accessed what data).

Apache Ranger is an open-source project that provides authorization and audit capabilities for Hadoop and related big data applications like Apache Hive, Apache HBase, and Apache Kafka.

We’re happy to share that starting with Amazon EMR 5.32, we’re including plugins to integrate with Apache Ranger 2.0 that enable authorization and audit capabilities for Apache SparkSQL, Amazon Simple Storage Service (Amazon S3), and Apache Hive.

You can set up a multi-tenant EMR cluster, use Kerberos for user authentication, use Apache Ranger 2.0 (managed separately outside the EMR cluster) for authorization, and configure fine-grained data access policies for databases, tables, columns, and S3 objects. In this post, we explain how you can set up Amazon EMR to use Apache Ranger for data access controls for Apache Spark and Apache Hive workloads on Amazon EMR. We show how you can set up multiple short-running and long-running EMR clusters with a single, centralized Apache Ranger server that maintains data access control policies.

Managed Apache Ranger plugins for PrestoSQL and PrestoDB will soon follow.

You should consider this solution if one or all of these apply:

  • Have experience setting up and managing Apache Ranger admin server (needs to be self-managed)
  • Want to port existing Apache Ranger Hive policies over to Amazon EMR
  • Need to use the database-backed Hive Metastore and can’t use the AWS Glue Data Catalogdue to limitations
  • Require authorization support for Apache Spark (SQL and storage and file access) and Amazon S3
  • Store Apache Ranger authorization audits in Amazon Cloudwatch, avoiding the need to maintain an Apache Solr infrastructure

With this native integration, you use the Amazon EMR security configuration to specify Apache Ranger details, without the need for custom bootstrap scripts. You can reuse existing Apache Hive Ranger policies, including support for row-level filters and column masking.

You can reuse existing Apache Hive Ranger policies, including support for row-level filters and column masking.

The following image shows table and column-level access set up for Apache SparkSQL.

Additionally, SSH users are blocked from getting AWS Identity and Access Management (IAM) permissions tied with the Amazon EMR instance profiles. This disables  access to Amazon S3 using tools like the AWS Command Line Interface(AWS CLI).

The following screenshot that shows access to Amazon S3 blocked when using AWS CLI.

The following screenshot that shows access to Amazon S3 blocked when using AWS CLI. 

The following screenshots shows how access to the same Amazon S3 location is set up and used through EMRFS (default EMR file system implementation for reading and writing files from Amazon S3).

Prerequisites

Before getting started, you must have the following prerequisites:

  • Self-managed Apache Ranger server (2.x only) outside of an EMR cluster
  • TLS mutual authentication enabled between Apache Ranger server and Apache Ranger plugins running on the EMR cluster
  • Additional IAM roles:
    • IAM role for Apache Ranger– Defines privileges that trusted processes have when submitting Spark and Hive jobs
    • IAM role for other AWS services– Defines privileges that end-users have when accessing services that aren’t protected by Apache Ranger plugins.
  • Updates to the Amazon EC2 EMR role:
  • New Apache Ranger service definitions installed for Apache Spark and Amazon S3
  • Apache Ranger server certificate and private key for plugins uploaded into Secrets Manager
  • A CloudWatch log group for Apache Ranger audits

Architecture overview

The following diagram illustrates the architecture for this solution.

The following diagram illustrates the architecture for this solution.

In the architecture, the Amazon EMR secret agent intercepts user requests and vends credentials based on user and resources. The Amazon EMR record server receives requests to access data from Spark, reads data from Amazon S3, and returns filtered data based on Apache Ranger policies.

See Amazon EMR Components to learn more about Amazon EMR Secret Agent and Record Server.

Setting up your resources

In this section, we walk you through setting up your resources manually.

If you want to use CloudFormation scripts to automate the setup, see the section Setting up your architecture with CloudFormation later in this post.

Uploading SSL private keys and certificates to Secrets Manager

Upload the private keys for the Apache Ranger plugins and SSL certification of the Apache Ranger server to Secrets Manager. When the EMR cluster starts up, it uses these files to configure the plugin. For reference, see this script.

Uploading SSL private keys and certificates to Secrets Manager

Upload the private keys for the Apache Ranger plugins and SSL certification of the Apache Ranger server to Secrets Manager. When the EMR cluster starts up, it uses these files to configure the plugin. For reference, see the script create-tls-certs.sh.

Setting up an Apache Ranger server

You need to set up a two-way SSL-enabled Apache Ranger server. To set up the server manually, refer to the script install-ranger-admin-server.sh.

Installing Apache Ranger service definitions

In this section, we review installing the Apache Ranger service definitions for Apache Spark and Amazon S3.

Apache Spark

To add a new Apache Ranger service definition, see the following script:

mkdir /tmp/emr-spark-plugin/
cd /tmp/emr-spark-plugin/

# Download the Service definition
wget https://s3.amazonaws.com/elasticmapreduce/ranger/service-definitions/version-2.0/ranger-servicedef-amazon-emr-spark.json

# Download Service implementation jar/class
wget https://s3.amazonaws.com/elasticmapreduce/ranger/service-definitions/version-2.0/ranger-spark-plugin-2.x.jar

# Copy Service implementation jar to Ranger server
export RANGER_HOME=.. # Replace this Ranger Admin's home directory eg /usr/lib/ranger/ranger-2.0.0-admin
mkdir $RANGER_HOME/ews/webapp/WEB-INF/classes/ranger-plugins/amazon-emr-spark
mv ranger-spark-plugin-2.x.jar $RANGER_HOME/ews/webapp/WEB-INF/classes/ranger-plugins/amazon-emr-spark

# Add the service definition using the Ranger REST API
curl -u <admin_user_login>:<password_for_ranger_admin_user> -X POST -d @ranger-servicedef-amazon-emr-spark.json \
-H "Accept: application/json" \
-H "Content-Type: application/json" \
-k 'https://*<RANGER SERVER ADDRESS>*:6182/service/public/v2/api/servicedef'

This script is included in the Apache Ranger server setup script, if you’re deploying resources with the CloudFormation template.

The policy definition is similar to Apache Hive, except that the actions are limited to select only. The following screenshot shows the definition settings.

The following screenshot shows the definition settings.

To change permissions, for the user, choose select.

To change permissions, for the user, choose select.

Amazon S3 (via Amazon EMR File System)

Similar to Apache Spark, we have a new Apache Ranger service definition for Amazon S3. See the following script:

mkdir /tmp/emr-emrfs-s3-plugin/
cd /tmp/emr-emrfs-s3-plugin/

# Download the Service definition
wget https://s3.amazonaws.com/elasticmapreduce/ranger/service-definitions/version-2.0/ranger-servicedef-amazon-emr-emrfs.json

# Download Service implementation jar/class
wget https://s3.amazonaws.com/elasticmapreduce/ranger/service-definitions/version-2.0/ranger-emr-emrfs-plugin-2.x.jar

# Copy Service implementation jar to Ranger server
export RANGER_HOME=.. # Replace this Ranger Admin's home directory eg /usr/lib/ranger/ranger-2.0.0-admin
mkdir $RANGER_HOME/ews/webapp/WEB-INF/classes/ranger-plugins/amazon-emr-emrfs
mv ranger-emrfs-s3-plugin-2.x.jar $RANGER_HOME/ews/webapp/WEB-INF/classes/ranger-plugins/amazon-emr-emrfs 

# Add the service definition using the Ranger REST API
curl -u <admin_user_login>:<password_for_ranger_admin_user> -X POST -d @ranger-servicedef-amazon-emr-emrfs.json \
-H "Accept: application/json" \
-H "Content-Type: application/json" \
-k 'https://*<RANGER SERVER ADDRESS>*:6182/service/public/v2/api/servicedef'

If you’re using the CloudFormation template, this script is included in the Apache Ranger server setup script.

The following screenshot shows the policy details.

The following screenshot shows the policy details.

You can enable standard Amazon S3 access permissions in this policy.

You can enable standard Amazon S3 access permissions in this policy. 

Importing your existing Apache Hive policies

You can import your existing Apache Hive policies into the Apache Ranger server tied to the EMR cluster. For more information, see User Guide for Import-Export.

The following image shows how to use Apache Ranger’s export and import option.

 

CloudWatch for Apache Ranger audits

Apache Ranger audits are sent to CloudWatch. You should create a new Cloudwatch log group and specify that in the security configuration. See the following code:

aws logs create-log-group --log-group-name /aws/emr/rangeraudit/

You can search audit information using CloudWatch Insights. The following screenshot shows a query.

The following screenshot shows a query.
The following screenshot shows a query.

New Amazon EMR security configuration

The new Amazon EMR security configuration requires the following inputs:

  • IP address of the Apache Ranger server
  • IAM role for the Apache Ranger service (see the GitHub repo) running on the EMR cluster and accessing other AWS services (see the GitHub repo)
  • Secrets Manager name with the Apache Ranger admin server certificate
  • Secrets Manager name with the private key used by the plugins
  • CloudWatch log group name

The following code is an example of using the AWS CLI to create this security configuration:

aws emr create-security-configuration --name MyEMRRangerSecurityConfig --security-configuration
'{
   "EncryptionConfiguration":{
      "EnableInTransitEncryption":false,
      "EnableAtRestEncryption":false
   },
   "AuthenticationConfiguration":{
      "KerberosConfiguration":{
         "Provider":"ClusterDedicatedKdc",
         "ClusterDedicatedKdcConfiguration":{
            "TicketLifetimeInHours":24
         }
      }
   },
   "AuthorizationConfiguration":{
      "RangerConfiguration":{
         "AdminServerURL":"https://<RANGER ADMIN SERVER IP>:8080",
         "RoleForRangerPluginsARN":"arn:aws:iam::<AWS ACCOUNT ID>:role/<RANGER PLUGIN DATA ACCESS ROLE NAME>",
         "RoleForOtherAWSServicesARN":"arn:aws:iam::<AWS ACCOUNT ID>:role/<USER ACCESS ROLE NAME>",
         "AdminServerSecretARN":"arn:aws:secretsmanager:us-east-1:<AWS ACCOUNT ID>:secret:<SECRET NAME THAT PROVIDES ADMIN SERVERS PUBLIC TLS CERTICATE>",
         "RangerPluginConfigurations":[
            {
               "App":"Spark",
               "ClientSecretARN":"arn:aws:secretsmanager:us-east-1:<AWS ACCOUNT ID>:secret:<SECRET NAME THAT PROVIDES SPARK PLUGIN PRIVATE TLS CERTICATE>",
               "PolicyRepositoryName":"spark-policy-repository"
            },
            {
               "App":"Hive",
               "ClientSecretARN":"arn:aws:secretsmanager:us-east-1:<AWS ACCOUNT ID>:secret:<SECRET NAME THAT PROVIDES HIVE PLUGIN PRIVATE TLS CERTICATE>",
               "PolicyRepositoryName":"hive-policy-repository"
            },
            {
               "App":"EMRFS-S3",
               "ClientSecretARN":"arn:aws:secretsmanager:us-east-1:<AWS ACCOUNT ID>:secret:<SECRET NAME THAT PROVIDES EMRFS S3 PLUGIN PRIVATE TLS CERTICATE>",
               "PolicyRepositoryName":"emrfs-policy-repository"
            }
         ],
         "AuditConfiguration":{
            "Destinations":{
               "AmazonCloudWatchLogs":{
                  "CloudWatchLogGroup":"arn:aws:logs:us-east-1:<AWS ACCOUNT ID>:log-group:<LOG GROUP NAME FOR AUDIT EVENTS>"
               }
            }
         }
      }
   }
}'

Install Amazon EMR cluster with Kerberos

Start the cluster by choosing Amazon EMR version 5.32 and this newly created security configuration.

Setting up your architecture with CloudFormation

To help you get started, we added a new GitHub repo with setup instructions. The following diagram shows the logical architecture after the CloudFormation stack is fully deployed. Review the roadmap for future enhancements.

Start the cluster by choosing Amazon EMR version 5.32 and this newly created security configuration.

To set up this architecture using CloudFormation, complete the following steps:

  1. Use the create-tls-certs.sh script to upload the SSL key and certifications to Secrets Manager.
  2. Set up the VPC or Active Directory server by launching the following CloudFormation template.
  3. Verify DHCP options to make sure the domain name servers for the VPC are listed in the right order (LDAP/AD server first, followed by AmazonProvidedDNS).
  4. Set up the Apache Ranger server,  Amazon Relational Database Service (Amazon RDS) instance, and EMR cluster by launching the following CloudFormation template.

Limitations

When using this solution, keep in mind the following limitations:

  • As of this writing, Amazon EMR 6.x isn’t supported (only Amazon EMR 5.32+ is supported)
  • Non-Kerberos clusters will not be supported.
  • Jobs must be submitted through Apache Zeppelin, Hue, Livy, and SSH.
  • Only selected applications can be installed on the Apache Ranger-enabled EMR cluster, such as Hadoop, Tez and Ganglia. For a full list, see Supported Applications. The cluster creation request is rejected if you choose applications outside this supported list.
  • As of this writing, the SparkSQL plugin doesn’t support column masking and row-level filters.
  • The SparkSQL INSERT INTO and INSERT OVERWRITE overrides aren’t supported.
  • You can’t view audits on the Apache Ranger UI as they’re sent to CloudWatch.
  • The AWS Glue Data Catalog isn’t supported as the Apache Hive Metastore.

Available now

Native support for Apache Ranger 2.0 with Apache Hive, Apache Spark, and Amazon S3 is available today in the following AWS Regions:

  • US East (Ohio)
  • US East (N. Virginia)
  • US West (N. California)
  • US West (Oregon)
  • Africa (Cape Town)
  • Asia Pacific (Hong Kong)
  • Asia Pacific (Mumbai)
  • Asia Pacific (Seoul)
  • Asia Pacific (Singapore)
  • Asia Pacific (Sydney)
  • Canada (Central)
  • Europe (Frankfurt)
  • Europe (Ireland)
  • Europe (London)
  • Europe (Paris)
  • Europe (Milan)
  • Europe (Stockholm)
  • South America (São Paulo)
  • Middle East (Bahrain)

For the latest Region availability, see Amazon EMR Management Guide.

Conclusion

Amazon EMR 5.32 includes plugins to integrate with Apache Ranger 2.0 that enable authorization and audit capabilities for Apache SparkSQL, Amazon S3, and Apache Hive. This post demonstrates how to set up Amazon EMR to use Apache Ranger for data access controls for Apache Spark and Apache Hive workloads on Amazon EMR. If you have any thoughts of questions, please leave them in the comments.


About the Author

Varun Rao Bhamidimarri is a Sr Manager, AWS Analytics Specialist Solutions Architect team. His focus is helping customers with adoption of cloud-enabled analytics solutions to meet their business requirements. Outside of work, he loves spending time with his wife and two kids, stay healthy, mediate and recently picked up garnering during the lockdown.

Testing data quality at scale with PyDeequ

Post Syndicated from Calvin Wang original https://aws.amazon.com/blogs/big-data/testing-data-quality-at-scale-with-pydeequ/

You generally write unit tests for your code, but do you also test your data? Incoming data quality can make or break your application. Incorrect, missing, or malformed data can have a large impact on production systems. Examples of data quality issues include the following:

  • Missing values can lead to failures in production system that require non-null values (NullPointerException)
  • Changes in the distribution of data can lead to unexpected outputs of machine learning (ML) models
  • Aggregations of incorrect data can lead to wrong business decisions

In this post, we introduce PyDeequ, an open-source Python wrapper over Deequ (an open-source tool developed and used at Amazon). Deequ is written in Scala, whereas PyDeequ allows you to use its data quality and testing capabilities from Python and PySpark, the language of choice of many data scientists. PyDeequ democratizes and extends the power of Deequ by allowing you to use it alongside the many data science libraries that are available in that language. Furthermore, PyDeequ allows for fluid interface with Pandas DataFrames as opposed to restricting within Apache Spark DataFrames.

Deequ allows you to calculate data quality metrics on your dataset, define and verify data quality constraints, and be informed about changes in the data distribution. Instead of implementing checks and verification algorithms on your own, you can focus on describing how your data should look. Deequ supports you by suggesting checks for you. Deequ is implemented on top of Apache Spark and is designed to scale with large datasets (billions of rows) that typically live in a data lake, distributed file system, or a data warehouse. PyDeequ gives you access to this capability, but also allows you to use it from the familiar environment of your Python Jupyter notebook.

Deequ at Amazon

Deequ is used internally at Amazon to verify the quality of many large production datasets. Dataset producers can add and edit data quality constraints. The system computes data quality metrics on a regular basis (with every new version of a dataset), verifies constraints defined by dataset producers, and publishes datasets to consumers in case of success. In error cases, dataset publication can be stopped, and producers are notified to take action. Data quality issues don’t propagate to consumer data pipelines, reducing their blast radius.

Deequ is also used within Amazon SageMaker Model Monitor. Now with the availability of PyDeequ, you can use it from a broader set of environments— Amazon SageMaker notebooks, AWS Glue, Amazon EMR, and more.

Overview of PyDeequ

Let’s look at PyDeequ’s main components, and how they relate to Deequ (shown in the following diagram):

  • Metrics computation – Deequ computes data quality metrics, that is, statistics such as completeness, maximum, or correlation. Deequ uses Spark to read from sources such as Amazon Simple Storage Service (Amazon S3) and compute metrics through an optimized set of aggregation queries. You have direct access to the raw metrics computed on the data.
  • Constraint verification – As a user, you focus on defining a set of data quality constraints to be verified. Deequ takes care of deriving the required set of metrics to be computed on the data. Deequ generates a data quality report, which contains the result of the constraint verification.
  • Constraint suggestion – You can choose to define your own custom data quality constraints or use the automated constraint suggestion methods that profile the data to infer useful constraints.
  • Python wrappers – You can call each Deequ function using Python syntax. The wrappers translate the commands to the underlying Deequ calls and return their response.

Let’s look at PyDeequ’s main components, and how they relate to Deequ (shown in the following diagram)

Use case overview

As a running example, we use a customer review dataset provided by Amazon on Amazon S3. We intentionally follow the example in the post Test data quality at scale with Deequ to show the similarity in functionality and implementation. We begin the way many data science projects do: with initial data exploration and assessment in a Jupyter notebook.

If you’d like to follow along with a live Jupyter notebook, check out the notebook on our GitHub repo.

During the data exploration phase, you want to easily answer some basic questions about the data:

  • Are the fields that are supposed to contain unique values really unique? Are there fields that are missing values?
  • How many distinct categories are there in the categorical fields?
  • Are there correlations between some key features?
  • If there are two supposedly similar datasets (such as different categories or different time periods), are they really similar?

We also show you how to scale this approach to large-scale datasets, using the same code on an Amazon EMR cluster. This is how you’d likely do your ML training, and later as you move into a production setting.

Starting a PySpark session in a SageMaker notebook

To follow along with this post, open up a SageMaker notebook instance, clone the PyDeequ GitHub on the Sagemaker notebook instance, and run the test_data_quality_at_scale.ipynb notebook from the tutorials directory from the PyDeequ repository.

Let’s install our dependencies first in a terminal window:

$ pip install pydeequ

Next, in a cell of our SageMaker notebook, we need to create a PySpark session:

import sagemaker_pyspark
import pydeequ

classpath = ":".join(sagemaker_pyspark.classpath_jars())

spark = (SparkSession
    .builder
    .config("spark.driver.extraClassPath", classpath)
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate())

Loading data

Load the dataset containing reviews for the category Electronics into our Jupyter notebook:

df = spark.read.parquet("s3a://amazon-reviews-pds/parquet/product_category=Electronics/")

After you load the DataFrame, you can run df.printSchema() to view the schema of the dataset:

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)
 |-- year: integer (nullable = true)

Data analysis

Before we define checks on the data, we want to calculate some statistics on the dataset; we call them metrics. As with Deequ, PyDeequ supports a rich set of metrics. For more information, see Test data quality at scale with Deequ or the GitHub repo. In the following example, we use the AnalysisRunner to capture the metrics you’re interested in:

from pydeequ.analyzers import *

analysisResult = AnalysisRunner(spark) \
                    .onData(df) \
                    .addAnalyzer(Size()) \
                    .addAnalyzer(Completeness("review_id")) \
                    .addAnalyzer(ApproxCountDistinct("review_id")) \
                    .addAnalyzer(Mean("star_rating")) \
                    .addAnalyzer(Compliance("top star_rating", "star_rating >= 4.0")) \
                    .addAnalyzer(Correlation("total_votes", "star_rating")) \
                    .addAnalyzer(Correlation("total_votes", "helpful_votes")) \
                    .run()
                    
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()

The following table summarizes our findings.

Name Instance Value
ApproxCountDistinct review_id 3010972
Completeness review_id 1
Compliance top star_rating 0.74941
Correlation helpful_votes,total_votes 0.99365
Correlation total_votes,star_rating -0.03451
Mean star_rating 4.03614
Size * 3120938

From this, we learn the following:

  • review_id has no missing values and approximately 3,010,972 unique values
  • 9% of reviews have a star_rating of 4 or higher
  • total_votes and star_rating are not correlated
  • helpful_votes and total_votes are strongly correlated
  • The average star_rating is 4.0
  • The dataset contains 3,120,938 reviews

Defining and running tests for data

After analyzing and understanding the data, we want to verify that the properties we have derived also hold for new versions of the dataset. By defining assertions on the data distribution as part of a data pipeline, we can ensure that every processed dataset is of high quality, and that any application consuming the data can rely on it.

For writing tests on data, we start with the VerificationSuite and add checks on attributes of the data. In this example, we test for the following properties of our data:

  • At least 3 million rows in total
  • review_id is never NULL
  • review_id is unique
  • star_rating has a minimum of 1.0 and maximum of 5.0
  • marketplace only contains US, UK, DE, JP, or FR
  • year does not contain negative values

This is the code that reflects the previous statements. For information about all available checks, see the GitHub repo. You can run this directly in the Spark shell as previously explained:

from pydeequ.checks import *
from pydeequ.verification import *

check = Check(spark, CheckLevel.Warning, "Amazon Electronics Review Check")

checkResult = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(
        check.hasSize(lambda x: x >= 3000000) \
        .hasMin("star_rating", lambda x: x == 1.0) \
        .hasMax("star_rating", lambda x: x == 5.0)  \
        .isComplete("review_id")  \
        .isUnique("review_id")  \
        .isComplete("marketplace")  \
        .isContainedIn("marketplace", ["US", "UK", "DE", "JP", "FR"]) \
        .isNonNegative("year")) \
    .run()
    
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show()

After calling run(), PyDeequ translates your test description into Deequ, which translates it into a series of Spark jobs that are run to compute metrics on the data. Afterwards, it invokes your assertion functions (for example, lambda x: x == 1.0 for the minimum star rating check) on these metrics to see if the constraints hold on the data. The following table summarizes our findings.

Constraint constraint_status constraint_message
SizeConstraint(Size(None)) Success
MinimumConstraint(Minimum(star_rating,None)) Success
MaximumConstraint(Maximum(star_rating,None)) Success
CompletenessConstraint(Completeness(review_id,None)) Success
UniquenessConstraint(Uniqueness(List(review_id))) Failure Value: 0.9926566948782706 does not meet the constraint requirement!
CompletenessConstraint(Completeness(marketplace,None)) Success
ComplianceConstraint(Compliance(marketplace contained in US,UK,DE,JP,FR,marketplace IS NULL OR marketplace IN (‘US’,’UK’,’DE’,’JP’,’FR’),None)) Success
ComplianceConstraint(Compliance(year is non-negative,COALESCE(year, 0.0) >= 0,None)) Success

Interestingly, the review_id column isn’t unique, which resulted in a failure of the check on uniqueness. We can also look at all the metrics that Deequ computed for this check by running the following:

checkResult_df = VerificationResult.successMetricsAsDataFrame(spark, checkResult)
checkResult_df.show()

The following table summarizes our findings.

Name Instance Value
Completeness review_id 1
Completeness marketplace 1
Compliance marketplace contained in US,UK,DE,JP,FR 1
Compliance year is non-negative 1
Maximum star_rating 5
Minimum star_rating 1
Size * 3120938
Uniqueness review_id 0.99266

Automated constraint suggestion

If you own a large number of datasets or if your dataset has many columns, it may be challenging for you to manually define appropriate constraints. Deequ can automatically suggest useful constraints based on the data distribution. Deequ first runs a data profiling method and then applies a set of rules on the result. For more information about how to run a data profiling method, see the GitHub repo.

from pydeequ.suggestions import *

suggestionResult = ConstraintSuggestionRunner(spark) \
             .onData(df) \
             .addConstraintRule(DEFAULT()) \
             .run()

# Constraint Suggestions in JSON format
print(json.dumps(suggestionResult, indent=2))

The result contains a list of constraints with descriptions and Python code, so that you can directly apply it in your data quality checks. Call print(json.dumps(result_json)) to inspect the suggested constraints; the following table shows a subset.

Column Constraint Python code
customer_id customer_id is not null .isComplete("customer_id")
customer_id customer_id has type Integral .hasDataType("customer_id", ConstrainableDataTypes.Integral)
customer_id customer_id has no negative values .isNonNegative("customer_id")
helpful_votes helpful_votes is not null .isComplete("helpful_votes")
helpful_votes helpful_votes has no negative values .isNonNegative("helpful_votes")
marketplace marketplace has value range “US”, “UK”, “DE”, “JP”, “FR” .isContainedIn("marketplace", ["US", "UK", "DE", "JP", "FR"])
product_title product_title is not null .isComplete("product_title")
star_rating star_rating is not null .isComplete("star_rating")
star_rating star_rating has no negative values .isNonNegative("star_rating")
vine vine has value range “N”, “Y” .isContainedIn("vine", ["N", "Y"])

You can explore the other tutorials in the PyDeequ GitHub repo.

Scaling to production

So far, we’ve shown you how to use these capabilities in the context of data exploration using a Jupyter notebook running on a SageMaker notebook instance. As your project matures, you need to use the same capabilities on larger and larger datasets, and in a production environment. With PyDeequ, it’s easy to make that transition. The following diagram illustrates deployment options for local and production purposes on AWS.

The following diagram illustrates deployment options for local and production purposes on AWS.

Amazon EMR and AWS Glue interface with PyDeequ through the PySpark drivers that PyDeequ utilizes as its main engine. PyDeequ can run as a PySpark application in both contexts when the Deequ JAR is added the Spark context. You can run PyDeequ’s data validation toolkit after the Spark context and drivers are configured and your data is loaded into a DataFrame. We describe the Amazon EMR configuration options and use cases in this section (configurations 2 and 3 in the diagram).

Data exploration from a SageMaker notebook via an EMR cluster

As shown in configuration 2 in the diagram, you can connect to an EMR cluster from a SageMaker notebook to run PyDeequ. This enables you to explore much larger volumes of data than you can using a single notebook. Your Amazon EMR cluster must be running Spark v2.4.6, available with Amazon EMR version 5.31 or higher, in order to work with PyDeequ. After you have a running cluster that has those components and a SageMaker notebook, you configure a SparkSession object using the following template to connect to your cluster. For more information about connecting a SageMaker notebook to Amazon EMR or the necessary IAM permissions, see Submitting User Applications with spark-submit.

In the SageMaker notebook, run the following JSON in a cell before you start your SparkSession to configure your EMR cluster:

%%configure -f
{ "conf":{
          "spark.pyspark.python": "python3",
          "spark.pyspark.virtualenv.enabled": "true",
          "spark.pyspark.virtualenv.type":"native",
          "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv",
          "spark.jars.packages": "com.amazon.deequ:deequ:1.0.3",
          "spark.jars.excludes": "net.sourceforge.f2j:arpack_combined_all"
         }
}

Start your SparkSession object in a cell after the preceding configuration by running spark. Then install PyDeequ onto your EMR cluster using the SparkContext (default named sc) with the following command:

sc.install_pypi_package('pydeequ')

Now you can start using PyDeequ from your notebook to run the same statements as before, but with much larger volumes of data.

Running a transient EMR cluster

Another way to leverage the power of an EMR cluster is to treat it as a transient cluster and run it in a headless configuration, as shown in configuration 3 in the diagram. We use spark-submit in an EMR add-step to run PyDeequ on Amazon EMR. For each of the following steps, make sure to replace the values in brackets accordingly.

  1. Create a bootstrap shell script and upload it to an S3 bucket. The following code is an example of pydeequ-emr-bootstrap.sh:
    #!/bin/bash
    
    sudo python3 -m pip install --no-deps pydeequ
    sudo python3 -m pip install pandas 

  1. Create an EMR cluster via the AWS Command Line Interface (AWS CLI):
    $ aws emr create-cluster \
    --name 'my-pydeequ-cluster' \
    --release-label emr-5.31.0 --applications Name=Spark Name=Hadoop Name=Hive Name=Livy Name=Pig Name=Hue 
    --use-default-roles \
    --instance-type m5.xlarge \
    --instance-count 2 \
    --bootstrap-actions \
        Path="s3://<S3_PATH_TO_BOOTSTRAP>/pydeequ-emr-bootstrap.sh",Name='install_pydeequ' \
    --visible-to-all-users \
    --enable-debugging \
    --ec2-attributes KeyName="<MY_SSH_KEY>",SubnetId="<MY_SUBNET>" \
    --auto-scaling-role EMR_AutoScaling_DefaultRole

  1. Create your PySpark PyDeequ run script and upload into Amazon S3. The following code is our example of pydeequ-test.py:
    import sys
    import pydeequ
    from pydeequ.checks import *
    from pydeequ.verification import *
    from pyspark.sql import SparkSession, Row
    
    if __name__ == "__main__":
    
        with SparkSession.builder.appName("pydeequ").getOrCreate() as spark:
    
            df = spark.sparkContext.parallelize([
                Row(a="foo", b=1, c=5),
                Row(a="bar", b=2, c=6),
                Row(a="baz", b=3, c=None)]).toDF()
    
            check = Check(spark, CheckLevel.Error, "Integrity checks")
    
            checkResult = VerificationSuite(spark) \
                .onData(df) \
                .addCheck(
                    check.hasSize(lambda x: x >= 3) \
                    .hasMin("b", lambda x: x == 0) \
                    .isComplete("c")  \
                    .isUnique("a")  \
                    .isContainedIn("a", ["foo", "bar", "baz"]) \
                    .isNonNegative("b")) \
                .run()
    
            checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
            checkResult_df.repartition(1).write.csv("s3a://<PATH_TO_OUTPUT>/pydeequ-out.csv", sep='|')

  1. When your cluster is running and in the WAITING stage, submit your Spark job to Amazon EMR via the AWS CLI:
    $ aws emr add-steps \
    --cluster-id <MY_CLUSTER_ID> \
    --steps Type=Spark,Name="pydeequ-spark-submit",Args=[--deploy-mode,cluster,--master,yarn,--conf,spark.yarn.submit.waitAppCompletion=true,--packages,com.amazon.deequ:deequ:1.0.3,--exclude-packages,net.sourceforge.f2j:arpack_combined_all,s3a://pydeequ-emr/setup/pydeequ-test.py],ActionOnFailure=CANCEL_AND_WAIT

Congratulations, you have now submitted a PyDeequ PySpark job to Amazon EMR. Give the job a few minutes to run, after which you can view your results at the S3 output path specified on the last line of pydeequ-test.py.

Afterwards, remember to clean up your results and spin down the EMR cluster using the following command:

$ aws emr terminate-clusters --cluster-ids <MY_CLUSTER_ID>

Now you can use Amazon EMR to process large datasets in batch using PyDeequ to plug into your pipelines and provide scalable tests on your data.

More examples on GitHub

You can find examples of more advanced features on the Deequ GitHub page:

  • Deequ provides more than data quality checks with fixed thresholds. Learn how to use anomaly detection on data quality metrics to apply tests on metrics that change over time.
  • Deequ offers support for storing and loading metrics. Learn how to use the MetricsRepository for this use case.
  • If your dataset grows over time or is partitioned, you can use Deequ’s incremental metrics computation For each partition, Deequ stores a state for each computed metric. To compute metrics for the union of partitions, Deequ can use these states to efficiently derive overall metrics without reloading the data.

Conclusion

This post showed you how to use PyDeequ for calculating data quality metrics, verifying data quality metrics, and profiling data to automate the configuration of data quality checks. PyDeequ is available via pip install and on GitHub now for you to build your own data quality management pipeline.

Learn more about the inner workings of Deequ in the VLDB 2018 paper Automating large-scale data quality verification.

Stay tuned for another post demonstrating production workflows on AWS Glue.


About the Authors

Calvin Wang is a Data Scientist at AWS AI/ML. He holds a B.S. in Computer Science from UC Santa Barbara and loves using machine learning to build cool stuff.

 

 

Chris Ghyzel is a Data Engineer for AWS Professional Services. Currently, he is working with customers to integrate machine learning solutions on AWS into their production pipelines.

 

 

 

Veronika Megler, PhD, is Principal Data Scientist for Amazon.com Consumer Packaging. Until recently she was the Principal Data Scientist for AWS Professional Services. She enjoys adapting innovative big data, AI, and ML technologies to help companies solve new problems, and to solve old problems more efficiently and effectively. Her work has lately been focused more heavily on economic impacts of ML models and exploring causality.

Dream11’s journey to building their Data Highway on AWS

Post Syndicated from Pradip Thoke original https://aws.amazon.com/blogs/big-data/dream11s-journey-to-building-their-data-highway-on-aws/

This is a guest post co-authored by Pradip Thoke of Dream11. In their own words, “Dream11, the flagship brand of Dream Sports, is India’s biggest fantasy sports platform, with more than 100 million users. We have infused the latest technologies of analytics, machine learning, social networks, and media technologies to enhance our users’ experience. Dream11 is the epitome of the Indian sports technology revolution.”

Since inception, Dream11 has been a data-driven sports technology brand. 

Since inception, Dream11 has been a data-driven sports technology brand. The systems that power Dream11, including their transactional data warehouse, run on AWS. As Dream11 hosts fantasy sports contests that are joined by millions of Indian sports fans, they have large volumes of transactional data that is organized in a well-defined Amazon Redshift data warehouse. Previously they were using 3rd party services to collect, analyze and build models over user interaction data combined with transactional data. Although this approach was convenient, it presented certain critical issues:

  • The approach wasn’t conducive to 360-degree user analytics. Dream11’s user interactions data wasn’t present on the cloud, where the rest of Dream11’s infrastructure and data were present (AWS, in this case). To get a complete picture of a user’s experience and journey, the user’s interaction data (client events) needs to be analyzed alongside their transactional data (server events). This is known as 360-degree user analytics.
  • It wasn’t possible to get accurate user journey funnel reports. Currently, there are limitations with every tool available on the market with respect to identifying and mapping a given user’s actions across multiple platforms (on the web, iOS, or Android), as well as multiple related apps. This use case is specifically important if your company is a parent to other companies.
  • The statistics on user behavior that Dream11 was getting weren’t as accurate as they wanted. Some of the popular services they were using for web & mobile analytics use the technique of sampling to be able to deal with high volumes of data. Although this is a well-regarded technique to deal with high volumes of data and provides reasonable accuracy in multiple cases, Dream11 wanted statistics to be as accurate as possible.
  • The analytics wasn’t real-time. Dream11 experiences intense use by their users just before and during the real-life sports matches, so real-time and near-real-time analytics is very critical for them. This need wasn’t sufficiently met by the plethora of services they were using.
  • Their approach was leading to high cost for custom analytics for Dream11’s user interactions data, consisting of hundreds of event types. Serverless query engines typically charge by the amount of data scanned and so it can get very expensive if events data isn’t organized properly in separate tables in a data lake to enable selective access.

All these concerns and needs, led Dream11 to conclude that they needed their own centralized 360-degree analytics platform.

All these concerns and needs, led Dream11 to conclude that they needed their own centralized 360-degree analytics platform. Therefore, they embarked on the Data Highway project on AWS.

This project has additional advantages. It is increasingly becoming important to store and process data securely. Having everything in-house can help Dream11 with data security and data privacy objectives. The platform enables 360-degree customer analytics, which further allows Dream11 to do intelligent user segmentation in-house and share only those segments (without exposing underlying transactional or interactions data) with third-party messaging service providers. 

Design goals

Dream11 had the following design goals for the project:

  • The system should be easy to maintain and should be able to handle a very high volume of data, consisting of billions of events and terabytes of data daily.
  • The cost should be low and should be pay-as-you-go.
  • Dream11’s web and mobile developers regularly create new types of events to capture new types of interactions. Whenever they add new types of events, they should be immediately available in the system for analytics, and their statistics should immediately reflect in relevant dashboards and reports without any human intervention.
  • Certain types of statistics (such as concurrency) should be available in real-time and near-real time—within 5 minutes or less.
  • Dream11 should be able to use custom logic to calculate key statistics. The analytics should be accurate—no more sampling.
  • The data for various events should be neatly organized in separate tables and analytics-friendly file formats.
  • Although Dream11 will have a common data lake, they shouldn’t be constrained to use a single analytics engine for all types of analytics. Different types of analytics engines excel for different types of queries.
  • The Product Management team should have access to views they commonly use in their decision-making process, such as funnels and user flow diagrams.
  • The system should be extensible by adding lanes in the system. Lanes allow you to reuse your basic setup without mixing events data for different business units. It also potentially allows you to study user behavior across different apps.
  • The system should be able to build 360-degree user profiles
  • The system should provide alerting on important changes to key business metrics.
  • Last but not the least, the system should be secure and reliable with 6 nines of availability guarantee.

Data Highway architecture

In less than 3 months, Dream11’s data team built a system that met all the aforementioned goals. The following diagram shows the high-level architecture.

The following diagram shows the high-level architecture.

For this project, they used the following components:

The rest of this post explains the various design choices and trade-offs made by the Dream11’s data engineers. 

Event ingestion, segregation, and organization

Dream11 has several hundred event types. These events have common attributes and specific attributes. The following diagram shows the logical structure of these events.

The following diagram shows the logical structure of these events.

When the front end receives an event, it saves fields up to common attributes into a message and posts it to Kafka_AllEvents_CommonAttributes. This Kafka topic is the source for the following systems:

  • Apache HBase on Amazon EMR – Provides real-time concurrency analytics
  • Apache Druid – Provides near real-time dimensional analytics
  • Amazon Redshift – Provides session analytics

The front end also saves events, as they are, into Kafka_AllEvents_AllAttributes. These events are further picked by Apache Ni-Fi, which forwards them to their respective topics. Apache Ni-Fi supports data routing, transformation, and system mediation logic using powerful and scalable directed graphs. Data is transformed and published to Kafka by using a combination of RouteOnAttribute and JoltTransformJSON processors (to parse JSON). Apache Ni-Fi basically reads event names and posts to the Kafka topic with matching names. If Kafka doesn’t have a topic with that name, it creates a new topic with that name. You can configure your Kafka brokers to auto-create a topic when a message is received for a non-existent topic.

The following diagram illustrates the Amazon S3 sink connector per Kafka topic.

  The following diagram illustrates the Amazon S3 sink connector per Kafka topic.

The following diagram summarizes the overall design of the system for event ingestion, segregation, and organization.

 

The following diagram summarizes the overall design of the system for event ingestion, segregation, and organization.

Storage, cataloging, ETL, and scheduling

In this section, we discuss how Dream11 updates their AWS Glue Data Catalog, performs extract, transform, and load (ETL) jobs with Amazon EMR Presto, and uses Apache Airflow for schedule management.

Updating the AWS Glue Data Catalog with metadata for the target table

The AWS Glue Data Catalog provides a unified metadata repository across a variety of data sources and data formats. It provides out-of-the-box integration with Amazon S3, Amazon Relational Database Service (Amazon RDS), Amazon Redshift, Amazon Redshift Spectrum, Athena, Amazon EMR, and any application compatible with the Apache Hive metastore. You can create your table definitions one time and query across engines. For more information, see FAQ: Upgrading to the AWS Glue Data Catalog.

Because this Data Catalog is accessible from multiple services that were going to be used for the Data Highway project, Dream11 decided to use it to register all the table definitions.

Registering tables with AWS Glue Data Catalog is easy. You can use an AWS Glue crawler. It can infer schema from files in Amazon S3 and register a table in the Data Catalog. It works quite well, but Dream11 needed additional actions, such as automatically configuring Kafka Amazon S3 sink connectors etc. Therefore, they developed two Python based crawlers.

The first Python based crawler runs every 2 hours and looks up Kafka topics. If it finds a new topic, it configures a Kafka Amazon S3 connector sink to dump its data to Amazon S3 every 30 minutes in JSON Gzip format. It also registers a table with Glue Data Catalog so that users can query the JSON data directly, if needed. 

The second Python based crawler runs once a day and registers a corresponding table for each new table created that day to hold flattened data (Parquet, Snappy). It infers schemas and registers tables with the Data Catalog using its Table API. It adds customization needed by the Dream11 team to the metadata. It then creates Amazon EMR Presto ETL jobs to convert JSON, Gzip data to Parquet, Snappy, and registers them with Apache Airflow to run every 24 hours.

ETL with Amazon EMR Presto

Dream11 has a multi node, long-running, multi-purpose EMR cluster. They decided to run scheduled ETL jobs on it for the Data Highway project.

ETL for an event table involves a simple SELECT FROM -> INSERT INTO command to convert JSON (Gzip) to Parquet (Snappy). Converted data takes up to 70% less space, results in 10 times improvement in Athena query performance. ETL happens once a day. Tables are partitioned by day.

Data received on Kafka_AllEvents_CommonAttributes topic is loaded to Redshift. ETL involves SELECT FROM -> INSERT INTO to convert JSON (Gzip) to CSV, followed by Amazon Redshift COPY.

Apache Airflow for schedule management

Apache Airflow is an open-source tool for authoring and orchestrating big data workflows. With Apache Airflow, data engineers define direct acyclic graphs (DAGs). DAGs describe how to run a workflow and are written in Python. Workflows are designed as a DAG that groups tasks that run independently. The DAG keeps track of the relationships and dependencies between tasks.

Dream11 uses Apache Airflow to schedule Python scripts and over few hundred ETL jobs on Amazon EMR Presto to convert JSON (Gzip) data for over few hundred events to Parquet (Snappy) format, and converts JSON data containing common attributes for all events to CSV before loading to Amazon Redshift. For more information, see Orchestrate big data workflows with Apache Airflow, Genie, and Amazon EMR: Part 1.

The following diagram shows the workflow to connect Apache Airflow to Amazon EMR.

The following diagram shows the workflow to connect Apache Airflow to Amazon EMR.

The following diagram summarizes the overall design of the system for storage, cataloging, ETL, and scheduling.

The following diagram summarizes the overall design of the system for storage, cataloging, ETL, and scheduling. 

Real-time and near-real-time analytics

In this section, we discuss the real-time and near-real-time analytics performed on Dream11’s data.

Concurrency analytics with Apache Druid

Apache Druid is an OLAP-style data store. It computes facts and metrics against various dimensions while data is being loaded. This avoids the need to compute results when a query is run.

Dream11’s web and mobile events are loaded from the Kafka_AllEvents_CommonAttributes topic to Apache Druid with the help of the Apache Druid Kafka indexing service. Dream11 has a dashboard with different granularity levels and dimensions such as app version, org, and other dimensions present in the common event attributes list.

Finding active users with Amazon EMR HBase

Dream11 also needs to identify individual active users at any given time or during a given window. This is required by other downstream teams such as the Data Science team and Digital User Engagement team.

With the help of a Java consumer, they push all events from the Kafka_AllEvents_ CommonAttributes topic to HBase on an EMR cluster with just required user dimensions. They can query the data in HBase with SQL syntax supported by the Apache Phoenix interface. 

Session analytics with Amazon Redshift

Dream11 maintains their transactional data warehouse on Amazon Redshift multi node cluster. Amazon Redshift allows them to run complex SQL queries efficiently. Amazon Redshift would have been a natural choice for event analytics for hundreds of event types. However, in Dream11’s case, events data grows very rapidly and this would be a lot of data in Amazon Redshift. Also, this data loses its value rapidly as time passes (relatively speaking) compared with transactional data. Therefore, they decided to do only session analytics in Amazon Redshift to benefit from its complex SQL query capabilities and to do analytics for individual events with the help of Athena (which we discuss in the next section).

Data received on Kafka_AllEvents_CommonAttributes is loaded into Amazon S3 every 30 minutes by the associated kafka connector sink. This data is in JSON format with Gzip compression. Every 24 hours, a job runs on Amazon EMR Presto that flattens this data into CSV format. The data is loaded into Amazon Redshift with the COPY command. The data gets loaded first into a staging table. Data in the staging table is aggregated to get sessions data. Amazon Redshift already has transactional data from other tables that, combined now with the session data, allows Dream11 to perform 360-degree user analytics. They can now easily segment users based on their interactions data and transactions data. They can then run campaigns for those users with the help of messaging platforms. 

Event analytics with Athena

Dream11 uses Athena to analyze the data in Amazon S3. Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run. It made perfect sense to organize data for over hundreds of event tables in Amazon S3 and analyze them with Athena on demand.

With Athena, you’re charged based on the amount of data scanned by each query. You can get significant cost savings and performance gains by compressing, partitioning, or converting your data to a columnar format, because each of those operations reduces the amount of data that Athena needs to scan to run a query. For more information, see Top 10 Performance Tuning Tips for Amazon Athena.

As discussed before, Dream11 has registered over hundreds of tables for events data in JSON format, and similar number of tables for events data in Parquet format with the AWS Glue Data Catalog. They observed a performance gain of 10 times on conversion of data format to Parquet, and an 80% savings in space. Data in Amazon S3 can be queried directly through the Athena UI with SQL queries. The other option they use is connecting to Athena using a JDBC driver from Looker and their custom Java UI for the Data Aware project.

Athena helps Dream11 produce funnel analytics and user path analytics reports and visualizations.

  Athena helps Dream11 produce funnel analytics and user path analytics reports and visualizations.

 The following diagram summarizes the overall design of the system for real-time and near-real-time analytics and visualization.

 The following diagram summarizes the overall design of the system for real-time and near-real-time analytics and visualization.

 

Conclusion

This architecture has enabled Dream11 to achieve all the design goals they set out with. Results of analytics for real-time requirements are available under millisecond latency, and the system costs 40% less than the previous system. Analytics is performed with all the data without sampling, so results are accurate and reliable. All the data and analytics engines are within Dream11’s AWS account, improving data security and privacy.

As of this writing, the system handles 14 TB of data per day and it has served 80 million requests per minute at peak during Dream11 IPL 2020.

Doing all their analytics in-house on AWS has not just improved speed, accuracy, and data security, it has also enabled newer possibilities. Now Dream11 has a 360-degree view of their users. They can study their users’ progress across multiple platforms – web, Android, and IOS. This new system is enabling novel applications of machine learning, digital user engagement, and social media technologies at Dream11.


About the Authors

Pradip Thoke is a AVP Data Engineering at Dream11 and leads their Data Engineering team. The team involved in this implementation includes Vikas Gite, Salman Dhariwala, Naincy Suman, Lavanya Pulijala, Ruturaj Bhokre, Dhanraj Gaikwad, Vishal Verma, Hitesh Bansal, Sandesh Shingare, Renu Yadav, Yash Anand, Akshay Rochwani, Alokh P, Sunaim and Nandeesh Bijoor.

 

Girish Patil is a Principal Architect AI, Big Data, India Scale Apps for Amazon.

Amazon EMR Studio (Preview): A new notebook-first IDE experience with Amazon EMR

Post Syndicated from Fei Lang original https://aws.amazon.com/blogs/big-data/amazon-emr-studio-preview-a-new-notebook-first-ide-experience-with-amazon-emr/

We’re happy to announce Amazon EMR Studio (Preview), an integrated development environment (IDE) that makes it easy for data scientists and data engineers to develop, visualize, and debug applications written in R, Python, Scala, and PySpark. EMR Studio provides fully managed Jupyter notebooks and tools like Spark UI and YARN Timeline Service to simplify debugging. EMR Studio uses AWS Single Sign-On (AWS SSO), and allows you to log in directly with your corporate credentials without signing in to the AWS Management Console.

With EMR Studio, you can run notebook code on Amazon EMR running on Amazon Elastic Compute Cloud (Amazon EC2) or Amazon EMR on Amazon Elastic Kubernetes Service (Amazon EKS), and debug your applications. For more information about Amazon EMR on Amazon EKS, see What is Amazon EMR on EKS.

EMR Studio kernels and applications run on EMR clusters, so you get the benefit of distributed data processing with the performance-optimized Apache Spark runtime that Amazon EMR provides. You can also install custom kernels and libraries, collaborate with peers using code repositories such as GitHub and Bitbucket, or run parameterized notebooks as part of scheduled workflows using orchestration services like Apache Airflow or Amazon Managed Workflows for Apache Airflow (Amazon MWAA). Administrators can set up EMR clusters that can be used by EMR Studio users, or create predefined AWS CloudFormation templates for Amazon EMR and allow you to simply choose a template for creating your own cluster.

In this post, we discuss the benefits that EMR Studio offers and we introduce to you some of its capabilities. To learn more about creating and using EMR Studios, see Use Amazon EMR Studio.

Benefits of using EMR Studio

EMR Studio offers the following benefits:

  • Set up a unified experience to develop and diagnose EMR Spark applications – Administrators can set up EMR Studio to allow you to log in using your corporate credentials without having to sign in to the AWS console. You get a single unified environment to interactively explore, process, and visualize data using notebooks, build and schedule pipelines, and debug applications without having to log in to EMR clusters.
  • Use fully managed Jupyter notebooks – With EMR Studio, you can develop analytics and data science applications in R, Python, Scala, and PySpark with fully managed Jupyter notebooks. You can take advantage of distributed processing using the performance-optimized Amazon EMR runtime for Apache Spark with Jupyter kernels and applications running on EMR clusters. you can attach notebooks to an existing cluster that uses Amazon EC2 instances, or to an EMR on EKS virtual cluster. You can also start your own clusters using templates pre-configured by administrators.
  • Collaborate with others using code repositories – From the EMR Studio notebooks environment, you can connect to code repositories such as AWS CodeCommit, GitHub, and Bitbucket to collaborate with peers.
  • Run custom Python libraries and kernels – From EMR Studio, you can install custom Python libraries or Jupyter kernels required for your applications directly to the EMR clusters.
  • Automate workflows using pipelines – EMR Studio makes it easy to move from prototyping to production. You can create EMR Studio notebooks that can be programmatically invoked with parameters, and use APIs to run the parameterized notebooks. You can also use orchestration tools such as Apache Airflow or Amazon MWAA to run notebooks in automated workflows.
  • Simplified debugging – With EMR Studio, you can debug jobs and access logs without logging in to the cluster. EMR Studio provides native application interfaces such as Spark UI and YARN Timeline. When a notebook is run in EMR Studio, the application logs are uploaded to Amazon Simple Storage Service (Amazon S3). As a result, you can access logs and diagnose applications even after your EMR cluster is terminated. You can quickly locate the job to debug by filtering based on the cluster or time when the application was run.

In the following section, we demonstrate some of the capabilities of Amazon EMR Studio using a sample notebook. For our sample notebook, we use the open-source, real-time COVID-19 US daily case reports provided by Johns Hopkins University CSSE from the following GitHub repo.

Notebook-first IDE experience with AWS SSO integration

EMR Studio makes it simple to interact with applications on an EMR cluster. After an administrator sets up EMR Studio and provides the access URL (which looks like https://es-*************************.emrstudio.us-east-1.amazonaws.com), you can log in to EMR Studio with your corporate credentials.

After you log in to EMR Studio, you get started by creating a Workspace. A Workspace is a collection of one or more notebooks for a project. The Workspaces and the notebooks that you create in EMR Studio are automatically saved in an Amazon S3 location.

Now, we create a Workspace by completing the following steps:

  1. On the EMR Studio Dashboard page, choose Create Workspace.
  2. On the Create a Workspace page, enter a Workspace name and a Description.

Naming the Workspace helps identify your project. Your workspace is automatically saved, and you can find it later on the Workspaces page. For this post, we name our Workspace EMR-Studio-WS-Demo1.

  1. On the Subnet drop-down menu, choose a subnet for your Workspace.

Each subnet belongs to the same Amazon Virtual Private Cloud (Amazon VPC) as your EMR Studio. Your administrator may have set up one or more subnets to use for your EMR clusters. You should choose a subnet that matches the subnet where you use EMR clusters. If you’re not sure about which subnet to use, contact your administrator.

  1. For S3 location, choose the Amazon S3 location where EMR Studio backs up all notebook files in the Workspace.

This location is where your Workspace and all the notebooks in the Workspace are automatically saved.

  1. In the Advanced configuration section, you can attach an EMR cluster to your Workspace.

For this post, we skip this step. EMR Studio allows you to create Workspaces and notebooks without attaching to an EMR cluster. You can attach an EMR cluster later when you’re ready to run your notebooks.

  1. Choose Create Workspace.

Fully managed environment for managing and running Jupyter-based notebooks

EMR Studio provides a fully managed environment to help organize and manage Workspaces. Workspaces are the primary building blocks of EMR Studio, and they preserve the state of your notebooks. You can create different Workspaces for each project. From within a Workspace, you can create notebooks, link your Workspace to a code repository, and attach your Workspace to an EMR cluster to run notebooks. Your Workspaces and the notebooks and settings it contains are automatically saved in the Amazon S3 location that you specify.

If you created the workspace EMR-Studio-WS-Demo1 by following the preceding steps, it appears on the Workspaces page with the name EMR-Studio-WS-Demo1 along with status Ready, creation time, and last modified timestamp.

The following table describes each possible Workspace status.

Status Meaning
Starting The Workspace is being prepared, but is not yet ready to use.
Ready You can open the Workspace to use the notebook editor. When a Workspace has a Ready status, you can open or delete it.
Attaching The Workspace is being attached to a cluster.
Attached The Workspace is attached to an EMR cluster. If a Workspace is not attached to an EMR cluster, you need to attach it to an EMR cluster before you can run any notebook code in the Workspace.
Idle

The Workspace is stopped and currently idle. When you launch an idle Workspace, the Workspace status changes from Idle to Starting to Ready.

 

Stopping The Workspace is being stopped.
Deleting When you delete a Workspace, it’s marked for deletion. EMR Studio automatically deletes Workspaces marked for deletion. After a Workspace is deleted, it no longer shows in the list of Workspaces.

You can choose the Workspace that you created (EMR-Studio-WS-Demo1) to open it. This opens a new web browser tab with the JupyterLab interface. The icon-denoted tabs on the left sidebar allow you to access tool panels such as the file browser or JupyterLab command palette. To learn more about the EMR Studio Workspace interface, see Understand the Workspace User Interface.

EMR Studio automatically creates an empty notebook with the same name as the Workspace. For this post, we the Workspace that we created, it automatically creates EMR-Studio-WS-Demo1.ipynb. In the following screenshot, no cluster or kernel is specified in the top right corner, because we didn’t choose to attach any cluster while creating the Workspace. You can write code in your new notebook, but before you run your code, you need to attach it to an EMR cluster and specify a kernel. To attach your workspace to a cluster, choose the EMR clusters icon on the left panel.

Linking Git-based code repositories with your Workspace

You can collaborate with your peers by sharing notebooks as code via code repositories. EMR Studio supports the following Git-based services:

This capability provides the following benefits:

  • Version control – Record code changes in a version control system so you can review the history of your changes and selectively revert them.
  • Collaboration – Share code with team members working in different Workspaces through remote Git-based repositories. Workspaces can clone or merge code from remote repositories and push changes back to those repositories.
  • Code reuse – Many Jupyter notebooks that demonstrate data analysis or machine learning techniques are available in publicly hosted repositories, such as GitHub. You can associate your Workspace with a GitHub repository to reuse the Jupyter notebooks contained in a repository.

To link Git repositories to your Workspace, you can link an existing repository or create a new one. When you link an existing repository, you choose from a list of Git repositories associated with the AWS account in which your EMR Studio was created.

We add a new repository by completing the following steps:

  1. Choose the Git icon.
  2. For Repository name¸ enter a name (for example, emr-notebook).
  3. For Git repository URL, enter the URL for the Git repo (for this post, we use the sample notebook at https://github.com/emrnotebooks/notebook_execution).
  4. For Git credentials, select your credentials. Because we’re using a public repo, we select Use a public repository without credentials.
  5. Choose Add repository.

After it’s added, we can see the repo on the Git repositories drop-down menu.

  1. Choose the repo to link to the Workspace.

You can link up to three Git repositories with an EMR Studio Workspace. For more information, see Link Git-Based Repositories to an EMR Studio Workspace.

  1. Choose the File browser icon to locate the Git repo we just linked.

Attaching and detaching Workspaces to and from EMR clusters

EMR Studio kernels and applications run on EMR clusters, so you get the benefit of distributed data processing using the performance-optimized EMR runtime for Apache Spark. You can attach your Workspace to an EMR cluster and get distributed data processing using Spark or custom kernels. You can use primary node capacity to run non-distributed applications.

In addition to using Amazon EMR clusters running on Amazon EC2, you can attach a Workspace to an Amazon EMR on EKS virtual cluster to run notebook code. For more information about how to use an Amazon EMR on EKS cluster in EMR Studio, see Use an Amazon EMR on EKS Cluster to Run Notebook Code.

Before you can run your notebooks, you must attach your Workspace to an EMR cluster. For more information about clusters, see Create and Use Clusters with EMR Studio.

To run the Git repo notebooks that we linked in the previous step, complete the following steps:

  1. Choose the EMR cluster
  2. Attach the Workspace to an existing EMR cluster running on Amazon EC2 instances.
  3. Open the notebook demo_pyspark.ipynb from the Git repo emr-notebook that we linked to the Workspace.

In the upper right corner of the Workspace UI, we can see the ID of the EMR cluster being attached to our Workspace, as well as the kernel selected to run the notebook.

  1. Record the value of the cluster ID (for example, <j-*************>).

We use this value later to locate the EMR cluster for application debugging purposes.

You can also detach the Workspace from the cluster in the Workspace UI and re-attach it to another cluster. For more information, see Detach a Cluster from Your Workspace.

Being able to easily attach and detach to and from any EMR cluster allows you to move any workload from prototyping into production. For example, you can start your prototype development by attaching your workspace to a development EMR cluster and working with test datasets. When you’re ready to run your notebook with larger production datasets, you can detach your workspace from the development EMR cluster and attach it to a larger production EMR cluster.

Installing and loading custom libraries and kernels

You can install notebook-scoped libraries with a PySpark kernel in EMR Studio. The libraries installed are isolated to your notebook session and don’t interfere with libraries installed via EMR bootstrap actions, or libraries installed by other EMR Studio notebook sessions that may be running on the same EMR cluster. After you install libraries for your Workspace, they’re available for other notebooks in the Workspace in the same session.

Our sample notebook demo_pyspark.ipynb is a Python script. It uses real-time COVID-19 US daily case reports as input data. The following parameters are defined in the first cell:

  • DATE – The given date used when the notebook job is started.
  • TOP_K – The top k US states with confirmed COVID-19 cases. We use this to plot Graph a.
  • US_STATES – The names of the specific US states being checked for the fatality rates of COVID-19 patients. We use this plot Graph b.

The parameters can be any of the Python data types.

Running this notebook plots two graphs:

  • Graph a – Visualizes the top k US states with most the COVID-19 cases on a given date
  • Graph b – Visualizes the fatality rates among specific US states on a given date

In our notebook, we install notebook-scoped libraries by running the following code from within a notebook cell:

sc.install_pypi_package("pandas==0.25.1")
sc.install_pypi_package("requests==2.24.0")
sc.install_pypi_package("numpy==1.19.1")
sc.install_pypi_package("kiwisolver==1.2.0")
sc.install_pypi_package("matplotlib==3.3.0")

We use these libraries in the subsequent cells for the further data analysis and visualization steps in the notebook.

The following set of parameters is used to run the notebook:

{"DATE": "10-15-2020",
 "TOP_K": 6,
"US_STATES": ["Wisconsin", "Texas", "Nevada"]}

Running all the notebook cells generates two graphs. Graph a shows the top six US states with confirmed COVID-19 cases on October 15, 2020.

Graph b shows the fatality rates of COVID-19 patients in Texas, Wisconsin, and Nevada on October 15, 2020.

EMR Studio also allows you to install Jupyter notebook kernels and Python libraries on a cluster primary node, which makes your custom environment available to any EMR Studio Workspace attached the cluster. To install the sas_kernel kernel on a cluster primary node, run the following code within a notebook cell:

!/emr/notebook-env/bin/pip install sas_kernel

The following screenshot shows your output.

For more information about how to install kernels and use libraries, see Installing and Using Kernels and Libraries.

Diagnosing applications and jobs with EMR Studio

In EMR Studio, you can quickly debug jobs and access logs without logging in to the cluster, such as setting up a web proxy through an SSH connection, for both active and stopped clusters. You can use native application interfaces such as Spark UI and YARN Timeline Service directly from EMR Studio. EMR Studio also allows you to quickly locate the cluster or job to debug by using filters such as cluster state, creation time, and cluster ID. For more information, see Diagnose Applications and Jobs with EMR Studio.

Now, we show you how to open a native application interface to debug the notebook job that already finished.

  1. On the EMR Studio page, choose Clusters.

A list appears with all the EMR clusters launched under the same AWS account. You can filter the list by cluster state, cluster ID, or creation time range by entering values in the provided fields.

  1. Choose the cluster ID of the EMR cluster that we attached to the Workspace EMR-Studio-WS-Demo1 for running notebook demo_pyspark.ipynb.
  2. For Spark job debugging, on the Launch application UIs menu, choose Spark History Server.

The following screenshot shows you the Spark job debugging UI.

We can traverse the details for our notebook application by checking actual logs from the Spark History Server, as in the following screenshot.

  1. For Yarn application debugging, on the Launch application UIs menu, choose Yarn Timeline Server.

The following screenshot shows the Yarn debugging UI.

Orchestrating analytics notebook jobs to build ETL production pipelines

EMR Studio makes it easy for you to move any analytics workload from prototyping to production. With EMR Studio, you can run parameterized notebooks as part of scheduled workflows using orchestration services like AWS Step Functions and Apache Airflow or Amazon MWAA.

In this section, we show a simple example of how to orchestrate running notebook workflows using Apache Airflow.

We have a fully tested notebook under an EMR Studio Workspace, and want to schedule a workflow that runs the notebook on an on-demand EMR cluster every 10 minutes.

Record the value of the Workspace ID (for example, e-*****************************) and the notebook file path relative to the home directory within the Workspace (for example, demo.ipynb or my_folder/demo.ipynb)

The workflow that we create takes care of the following tasks:

  1. Create an EMR cluster.
  2. Wait until the cluster is ready.
  3. Start running a notebook defined by the Workspace ID, notebook file path, and the cluster created.
  4. Wait until the notebook is complete.

The following screenshot is the tree view of this example DAG. The DAG definition is available on the GitHub repo. Make sure you replace any placeholder values with the actual ones before using.

When you open the Gantt chart of one of the successful notebooks, we can see the timeline of our workflow. The time spent creating the cluster and creating a notebook execution is negligible compared to the time spent waiting for the cluster to be ready and waiting for the notebook to finish, which meets the expectation of our SLA.

This example is a just starting point. Try it out and extend it with more sophisticated workflows that suit your needs.

Summary

In this post, we highlighted some of the capabilities of EMR Studio, such as the ability to log in via AWS SSO, access fully managed Jupyter notebooks, link Git-based code repositories, change clusters, load custom Python libraries and kernels, diagnose clusters and jobs using native application UIs, and orchestrate notebook jobs using Apache Airflow or Amazon MWAA.

There is no additional charge for using EMR Studio in public preview, and you only pay for the use of the EMR cluster or other AWS services such as AWS Service Catalog. For more information, see the EMR Studio FAQs.

EMR Studio is available on Amazon EMR release version 6.2 and later, in the US East (N. Virginia), US West (Oregon), and EU (Ireland) Regions for public preview. For the latest Region availability for the public preview, see Considerations.

If you have questions or suggestions, feel free to leave a comment.


About the  Authors

Fei Lang is a Senior Big Data Architect at Amazon Web Services. She is passionate about building the right big data solution for customers. In her spare time, she enjoys the scenery of the Pacific Northwest, going for a swim, and spending time with her family.

 

 

 

Shuang Li is a Senior Product Manager for Amazon EMR at AWS. She holds a doctoral degree in Computer Science and Engineering from Ohio State University.

 

 

Ray Liu is a Software Development Engineer at AWS. Besides work, he enjoys traveling and spending time with family.

 

 

 

Kendra Ellis is a Programmer Writer at AWS.

 

 

 

 

New – Amazon EMR on Amazon Elastic Kubernetes Service (EKS)

Post Syndicated from Channy Yun original https://aws.amazon.com/blogs/aws/new-amazon-emr-on-amazon-elastic-kubernetes-service-eks/

Tens of thousands of customers use Amazon EMR to run big data analytics applications on frameworks such as Apache Spark, Hive, HBase, Flink, Hudi, and Presto at scale. EMR automates the provisioning and scaling of these frameworks and optimizes performance with a wide range of EC2 instance types to meet price and performance requirements. Customer are now consolidating compute pools across organizations using Kubernetes. Some customers who manage Apache Spark on Amazon Elastic Kubernetes Service (EKS) themselves want to use EMR to eliminate the heavy lifting of installing and managing their frameworks and integrations with AWS services. In addition, they want to take advantage of the faster runtimes and development and debugging tools that EMR provides.

Today, we are announcing the general availability of Amazon EMR on Amazon EKS, a new deployment option in EMR that allows customers to automate the provisioning and management of open-source big data frameworks on EKS. With EMR on EKS, customers can now run Spark applications alongside other types of applications on the same EKS cluster to improve resource utilization and simplify infrastructure management.

Customers can deploy EMR applications on the same EKS cluster as other types of applications, which allows them to share resources and standardize on a single solution for operating and managing all their applications. Customers get all the same EMR capabilities on EKS that they use on EC2 today, such as access to the latest frameworks, performance optimized runtimes, EMR Notebooks for application development, and Spark user interface for debugging.

Amazon EMR automatically packages the application into a container with the big data framework and provides pre-built connectors for integrating with other AWS services. EMR then deploys the application on the EKS cluster and manages logging and monitoring. With EMR on EKS, you can get 3x faster performance using the performance-optimized Spark runtime included with EMR compared to standard Apache Spark on EKS.

Amazon EMR on EKS – Getting Started
If you already have a EKS cluster where you run Spark jobs, you simply register your existing EKS cluster with EMR using the AWS Management Console, AWS Command Line Interface (CLI) or APIs to deploy your Spark appication.

For exampe, here is a simple CLI command to register your EKS cluster.

$ aws emr create-virtual-cluster \
          --name <virtual_cluster_name> \
          --container-provider '{
             "id": "<eks_cluster_name>",
             "type": "EKS",
             "info": {
                 "eksInfo": {
                     "namespace": "<namespace_name>"
                 }
             } 
         }'

In the EMR Management console, you can see it in the list of virtual clusters.

When Amazon EKS clusters are registered, EMR workloads are deployed to Kubernates nodes and pods to manage application execution and auto-scaling, and sets up managed endpoints so that you can connect notebooks and SQL clients. EMR builds and deploys a performance-optimized runtime for the open source frameworks used in analytics applications.

You can simply start your Spark jobs.

$ aws emr start-job-run \
          --name <job_name> \
          --virtual-cluster-id <cluster_id> \
          --execution-role-arn <IAM_role_arn> \
          --virtual-cluster-id <cluster_id> \
          --release-label <<emr_release_label> \
          --job-driver '{
            "sparkSubmitJobDriver": {
              "entryPoint": <entry_point_location>,
              "entryPointArguments": ["<arguments_list>"],
              "sparkSubmitParameters": <spark_parameters>
            }
       }'

To monitor and debug jobs, you can use inspect logs uploaded to your Amazon CloudWatch and Amazon Simple Storage Service (S3) location configured as part of monitoringConfiguration. You can also use the one-click experience from the console to launch the Spark History Server.

Integration with Amazon EMR Studio

Now you can submit analytics applications using AWS SDKs and AWS CLI, Amazon EMR Studio notebooks, and workflow orchestration services like Apache Airflow. We have developed a new Airflow Operator for Amazon EMR on EKS. You can use this connector with self-managed Airflow or by adding it to the Plugin Location with Amazon Managed Workflows for Apache Airflow.

You can also use newly previewed Amazon EMR Studio to perform data analysis and data engineering tasks in a web-based integrated development environment (IDE). Amazon EMR Studio lets you submit notebook code to EMR clusters deployed on EKS using the Studio interface. After seting up one or more managed endpoints to which Studio users can attach a Workspace, EMR Studio can communicate with your virtual cluster.

For EMR Studio preview, there is no additional cost when you create managed endpoints for virtual clusters. To learn more, visit the guide document.

Now Available
Amazon EMR on Amazon EKS is available in US East (N. Virginia), US West (Oregon), and Europe (Ireland) Regions. You can run EMR workloads in AWS Fargate for EKS removing the need to provision and manage infrastructure for pods as a serverless option.

To learn more, visit the documentation. Please send feedback to the AWS forum for Amazon EMR or through your usual AWS support contacts.

Learn all the details about Amazon EMR on Amazon EKS and get started today.

Channy;

How the Allen Institute uses Amazon EMR and AWS Step Functions to process extremely wide transcriptomic datasets

Post Syndicated from Gautham Acharya original https://aws.amazon.com/blogs/big-data/how-the-allen-institute-uses-amazon-emr-and-aws-step-functions-to-process-extremely-wide-transcriptomic-datasets/

This is a guest post by Gautham Acharya, Software Engineer III at the Allen Institute for Brain Science, in partnership with AWS Data Lab Solutions Architect Ranjit Rajan, and AWS Sr. Enterprise Account Executive Arif Khan.

The human brain is one of the most complex structures in the universe. Billions of neurons and trillions of connections come together to form a labyrinthine network of activity. Understanding the mechanisms that guide our minds is one of the most challenging problems in modern scientific research.

The Allen Institute for Brain Science is dedicated to solving large-scale, fundamental problems in neuroscience. Our mission is to accelerate the rate at which the world understands the inner workings of the human brain and to uncover the essence of what makes us human.

Processing extremely wide datasets

As a part of “big science,” one of our core principles, we seek to tackle scientific challenges at scales no one else has attempted before. One of these challenges is processing large-scale transcriptomic datasets. Transcriptomics is the study of RNA. In particular, we’re interested in the genes that are expressed in individual neurons. The human brain contains almost 100 billion neurons—how do they differ from each other, and what genes do they express? After a series of complex analysis using cutting-edge techniques such as Smart-Seq and 10x Genomics Chromium Sequencing, we produce extremely large matrices of numeric values.

Such matrices are called feature matrices. Each column represents the feature of a cell, which in this case are genes. A genome is over 50,000 genes, so a single matrix can have over 50,000 columns! We expect the number of rows in our matrices to increase over time, reaching tens of millions, if not more. These matrices can reach 500 GB or more in size. Over the next few years, we want to be able to ingest tens or hundreds of such matrices.

Our goal is to provide low-latency visualizations on such matrices, allowing researchers to aggregate, slice, and dissect our data in real time. To do this, we run a series of precomputations that store expensive calculations in a database for future retrieval.

We wanted to create a flexible, scalable pipeline to run computations on these matrices and store the results for visualizations.

The pipeline

We wanted to build a pipeline that takes these large matrices as inputs, runs various Spark jobs, and stores the outputs in an Apache HBase cluster. We wanted to create something flexible so that we could easily add additional Spark transformations.

We decided on AWS Step Functions as our workflow-orchestration tool of choice. Step Functions allows us to create a state machine that orchestrates the dataflow from payload submission to database loading.

After close collaboration with the engineers at the AWS Data Lab, we came up with the following pipeline architecture.

At a high level, our pipeline has the following workflow:

  1. Trigger a state machine from an upload event to an Amazon Simple Storage Service (Amazon S3) bucket.
  2. Copy and unzip the input ZIP file containing a feature matrix into an Amazon S3 working directory.
  3. Run Spark jobs on Amazon EMR to transform input feature matrices into various pre-computed datasets. Store all intermittent results in a working directory on Amazon S3 and output the results of the Spark Jobs as HFiles.
  4. Bulk load the results of our Spark jobs into Apache HBase.

The preceding architecture diagram is deceptively simple. We found a number of challenges during our initial implementation, which we discuss in the following sections.

Lack of transaction support and rollbacks across tables in Apache HBase

The results of our Spark jobs are a number of precomputed views of our original input dataset. Each view is stored as a separate table in Apache HBase. A major drawback of Apache HBase is the lack of a native transactional system. HBase only provides row-level atomicity. Our worst-case scenario is writing partial data—cases where some views are updated, but not others, showing different results for different visualizations and resulting in scientifically incorrect data!

We worked around this by rolling our own blue/green system on top of Apache HBase. We suffix each set of tables related to a dataset with a universally unique identifier (UUID). We use Amazon DynamoDB to track the UUID associated with each individual dataset. When an update to a dataset is being written, the UUID is not switched in DynamoDB until we verify that all the new tables have been successfully written to Apache HBase. We have an API on top of HBase to facilitate reads. This API checks DynamoDB for the dataset UUID before querying HBase, so user traffic is never redirected toward a new view until we confirm a successful write. Our API involves an AWS Lambda function using HappyBase to connect to our HBase cluster, wrapped in an Amazon API Gateway layer to provide a REST interface. The following diagram illustrates this architecture.

The read path has the following steps:

  • R1 – API Gateway invokes a Lambda function to fetch data from a dataset
  • R2 – The Lambda function requests and receives the dataset UUID from DynamoDB
  • R3 – Lambda queries the Apache HBase cluster with the UUID

The write path has the following steps:

  • W1 – The state machine bulk loads new dataset tables to the Apache HBase cluster suffixed with the new UUID
  • W2 – After validation, the state machine updates DynamoDB so user traffic is directed towards those changes

Stalled Spark jobs on extremely wide datasets

Although Apache Spark is a fantastic engine for running distributed compute operations, it doesn’t do too well when scaling to extremely wide datasets. We routinely operate on data that surpasses 50,000 columns, which often causes issues such as a stalled JavaToPython step in our PySpark job. Although we have more investigating to do to figure out why our Spark jobs hang on these wide datasets, we found a simple workaround in the short term—batching!

A number of our jobs involve computing simple columnar aggregations on our data. This means that each calculation on a column is completely independent of all the other columns. This lends itself quite well to batching our compute. We can break our input columns into chunks and run our compute on each chunk.

The following code chunks Apache Spark aggregation functions into groups of columns:

def get_aggregation_for_matrix_and_metadata(matrix, metadata, group_by_arg, agg_func, cols_per_write):
   '''
   Performs an aggregation on the joined matrix, aggregating the desired column by the given function.
   agg_func must be a valid Pandas UDF function. Runs in batches so we don't overload the Task Scheduler with 50,000
   columns at once.
   '''
   # Chunk the data
   for col_group in pyspark_utilities.chunks(matrix.columns, cols_per_write):

       # Add the row key to the column group
       col_group.append(matrix.columns[0])

       selected_matrix = matrix.select(pyspark_utilities.escape_column_list(col_group))

       # create argument list for group by and then process
       cast_as_udf = pyspark_functions.pandas_udf(
                       agg_func,
                       pyspark_datatype.FloatType(),
                       pyspark_functions.PandasUDFType.GROUPED_AGG)

       udf_input = [cast_as_udf(selected_matrix [column_name]).alias(column_name)
                    for column_name in selected_matrix .columns
                    if column_name != group_by_arg]

       yield joined.groupby(group_by_arg).agg(*udf_input)

We then write the results of each batch to an HFile, which is then later bulk loaded into HBase.

Because the post-aggregation DataFrame was very small, we found a significant performance increase in coalescing the DataFrame post-aggregation and checkpointing the results before writing the HFiles. This forces Spark to compute the aggregation before writing the HFiles. HFiles need to be sorted by row key, so it’s easier to pass a smaller DataFrame to our HFile converter.

Using Apache Spark to write DataFrames as HFiles

Apache Spark supports writing DataFrames in multiple formats, including as HFiles. However, the documentation for doing so leaves a lot to be desired. To write out our Spark DataFrames as HFiles, we had to take the following steps:

  1. Convert a DataFrame into a HFile-compatible format, assuming that the first column is the HBase rowkey—(row_key, column_family, col, value).
  2. Create a JAR file containing a converter to convert input Python Objects into Java key-value byte classes. This step took a lot of trial and error—we couldn’t find clear documentation on how the Python object was serialized and passed into the Java function.
  3. Call the saveAsNewAPIHadoopFile function, passing in the relevant information: the ZooKeeper Quorum IP, port, and cluster DNS of our Apache HBase on the Amazon EMR cluster; the HBase table name; the class name of our Java converter function; and more.

The following code writes HFiles:

import src.spark_transforms.pyspark_jobs.pyspark_utilities as pyspark_utilities
import src.spark_transforms.pyspark_jobs.output_handler.emr_constants as constants


def csv_to_key_value(row, sorted_cols, column_family):
   '''
   This method is an RDD mapping function that will map each
   row in an RDD to an hfile-formatted tuple for hfile creation
   (rowkey, (rowkey, columnFamily, columnQualifier, value))
   '''
   result = []
   for index, col in enumerate(sorted_cols[constants.ROW_KEY_INDEX + 1:], 1):
       row_key = str(row[constants.ROW_KEY_INDEX])
       value = row[index]

       if value is None:
           raise ValueError(f'Null value found at {row_key}, {col}')

       # We store sparse representations, dropping all zeroes.
       if value != 0:
           result.append((row_key, (row_key, column_family, col, value)))

   return tuple(result)


def get_sorted_df_by_cols(df):
   '''
   Sorts the matrix by column. Retains the row key as the initial column.
   '''
   cols = [df.columns[0]] + sorted(df.columns[1:])
   escaped_cols = pyspark_utilities.escape_column_list(cols)
   return df.select(escaped_cols)


def flat_map_to_hfile_format(df, column_family):
   '''
   Flat maps the matrix DataFrame into an RDD formatted for conversion into HFiles.
   '''
   sorted_df = get_sorted_df_by_cols(df)
   columns = sorted_df.columns
   return sorted_df.rdd.flatMap(lambda row: csv_to_key_value(row, columns, column_family)).sortByKey(True)


def write_hfiles(df, output_path, zookeeper_quorum_ip, table_name, column_family):
   '''
   This method will sort and map the medians psyspark dataFrame and
   then write to hfiles in the output directory using the supplied
   hbase configuration.
   '''
   # sort columns other than the row key (first column)

   rdd = flat_map_to_hfile_format(df, column_family)

   conf = {
           constants.HBASE_ZOOKEEPER_QUORUM: zookeeper_quorum_ip,
           constants.HBASE_ZOOKEEPER_CLIENTPORT: constants.ZOOKEEPER_CLIENTPORT,
           constants.ZOOKEEPER_ZNODE_PARENT: constants.ZOOKEEPER_PARENT,
           constants.HBASE_TABLE_NAME: table_name
           }

   rdd.saveAsNewAPIHadoopFile(output_path,
                              constants.OUTPUT_FORMAT_CLASS,
                              keyClass=constants.KEY_CLASS,
                              valueClass=constants.VALUE_CLASS,
                              keyConverter=constants.KEY_CONVERTER,
                              valueConverter=constants.VALUE_CONVERTER,
                              conf=conf)

The following code describes all the constants configuration:

HBASE_ZOOKEEPER_QUORUM="hbase.zookeeper.quorum"
HBASE_ZOOKEEPER_CLIENTPORT="hbase.zookeeper.property.clientPort"
ZOOKEEPER_ZNODE_PARENT="zookeeper.znode.parent"
HBASE_TABLE_NAME="hbase.mapreduce.hfileoutputformat.table.name"

OUTPUT_FORMAT_CLASS='org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2'
KEY_CLASS='org.apache.hadoop.hbase.io.ImmutableBytesWritable'
VALUE_CLASS='org.apache.hadoop.hbase.KeyValue'
KEY_CONVERTER="org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
VALUE_CONVERTER="KeyValueConverter"

ZOOKEEPER_CLIENTPORT='2181'
ZOOKEEPER_PARENT='/hbase'

ROW_KEY_INDEX = 0

The following code is a Java class to serialize input PySpark RDDs:

import org.apache.spark.api.python.Converter;
import org.apache.hadoop.hbase.KeyValue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

/**
* This class is used to convert a tuple
* supplied by a spark job created in Python
* to the corresponding hbase keyValue type
* which is needed for hfile creation.
*
*/
@SuppressWarnings("rawtypes")
public class KeyValueConverter implements Converter {

  private static final long serialVersionUID = 1L;

  /**
   * this method will take a tuple object supplied
   * by Python spark job and convert and
   * return the corresponding hbase KeyValue object.
   */
  public Object convert(Object obj) {
     KeyValue cell;
     List<?> list = new ArrayList<>();
     if (obj.getClass().isArray()) {
          list = Arrays.asList((Object[])obj);
      } else if (obj instanceof Collection) {
          list = new ArrayList<>((Collection<?>)obj);
      }

     cell = new KeyValue(
           list.get(0).toString().getBytes(),
           list.get(1).toString().getBytes(),
           list.get(2).toString().getBytes(),
           list.get(3).toString().getBytes());

     return cell;
  }
}

Looking ahead

Our computation pipeline was a success, and you can see the resulting visualizations on https://transcriptomics.brain-map.org/.

We’ve been thrilled with AWS’s reliable and feature-rich ecosystem. We used Amazon EMR, Step Functions, and Amazon S3 to build a robust, large-scale data processing pipeline.

Since writing this post, we’ve done much more, including a cross-database transaction system, wide-matrix transposes in Spark, and more. Big Data problems in neuroscience never end, and we’re excited to share more with you in the future!


About the Authors

Gautham Acharya is a Software Engineer at the Allen Institute for Brain Science. He works on the backend data platform team responsible for integrating multimodal neuroscience data into a single cohesive system.

 

 

Ranjit Rajan is a Data Lab Solutions Architect with AWS. Ranjit works with AWS customers to help them design and build data and analytics applications in the cloud.

 

 

Arif Khan is a Senior Account Executive with Amazon Web Services. He works with nonprofit research customers to help shape and deliver on a strategy that focuses on customer success, building mind share and driving broad use of Amazon’s utility computing services to support their mission.

Amazon EMR now provides up to 30% lower cost and up to 15% improved performance for Spark workloads on Graviton2-based instances

Post Syndicated from Peter Gvozdjak original https://aws.amazon.com/blogs/big-data/amazon-emr-now-provides-up-to-30-lower-cost-and-up-to-15-improved-performance-for-spark-workloads-on-graviton2-based-instances/

Amazon EMR now supports M6g, C6g and R6g instances with Amazon EMR versions 6.1.0, 5.31.0 and later. These instances are powered by AWS Graviton2 processors that are custom designed by AWS using 64-bit Arm Neoverse cores to deliver the best price performance for cloud workloads running in Amazon Elastic Compute Cloud (Amazon EC2). On Graviton2 instances, Amazon EMR runtime for Apache Spark provides up to 15% improved performance at up to 30% lower costs relative to equivalent previous generation instances. In our TPC-DS 3 TB benchmark tests, we found that queries run up to 32 times faster using Amazon EMR runtime for Apache Spark. For more information, see Amazon EMR introduces EMR runtime for Apache Spark.

You can use Apache Spark for a wide array of analytics use cases ranging from large-scale transformations to streaming, data science, and machine learning. Amazon EMR provides the latest, stable, open-source innovations, performant storage with Amazon S3, and the unique cost savings capabilities of Spot Instances and Managed Scaling.

Amazon EMR runtime for Apache Spark is a performance-optimized runtime environment for Apache Spark, available and turned on by default on Amazon EMR release 5.28.0 and later. Amazon EMR runtime for Apache Spark provides 100% API compatibility with open-source Apache Spark. This means that your Apache Spark workloads run faster and incur lower compute costs when run on Amazon EMR without requiring any changes to your application.

In this post, we discuss the results that we observed by running Spark workloads on Graviton2 instances.

AWS Graviton2 and Amazon EMR runtime performance improvements

To measure improvements, we ran TPC-DS 3 TB benchmark queries on Amazon EMR 5.30.1 using Amazon EMR runtime for Apache Spark (compatible with Apache Spark version 2.4) with 5-10 node clusters of M6g instances with data in Amazon Simple Storage Service (Amazon S3) and compared it to equivalent configuration using M5 instances. We measured performance improvements using total query execution time and geometric mean of query execution time across the 104 TPC-DS 3 TB benchmark queries.

The results showed improved performance on M6g instance EMR clusters compared to equivalent M5 instance EMR clusters of between 11.61–15.61% improvement in total query runtime for different sizes within the instance family, and between 10.52–12.91% improvement in geometric mean. To measure cost improvement, we added up the Amazon EMR and Amazon EC2 cost per instance per hour and multiplied it by the total query runtime. We observed between 21.58–30.58% reduced instance hour cost on M6g instance EMR clusters compared equivalent M5 instance EMR clusters to execute the 104 TPC-DS benchmark queries.

The following table shows results from running TPC-DS 3 TB benchmark queries using Amazon EMR 5.30.1 over equivalent M5 and M6g instance EMR clusters.

Instance Size 16 XL 12 XL 8 XL 4 XL 2 XL
Number of core instances in EMR cluster 5 5 5 5 10
Total query runtime on M5 (seconds) 6157 6167 6857 10593 10676
Total query runtime on M6g (seconds) 5196 5389 6061 9313 9240
Total query execution time improvement with M6g 15.61% 12.63% 11.61% 12.08% 13.45%
Geometric mean query execution time on M5 (sec) 33 34 35 47 47
Geometric mean query execution time on M6g (sec) 29 30 32 41 42
Geometric mean query execution time improvement with M6g 12.73% 10.79% 10.52% 12.91% 11.24%
EC2 M5 instance price ($ per hour) $3.072 $2.304 $1.536 0.768 0.384
EMR M5 instance price ($ per hour) $0.27 $0.27 $0.27 0.192 0.096
(EC2 + EMR) M5 instance price ($ per hour) $3.342 $2.574 $1.806 $0.960 $0.480
Cost of running on M5 ($ per instance) $5.72 $4.41 $3.44 $2.82 $1.42
EC2 M6g instance price ($ per hour) $2.464 $1.848 $1.232 $0.616 $0.308
EMR M6g price ($ per hour per instance) $0.616 $0.462 $0.308 $0.15 $0.08
(EC2 + EMR) M6g instance price ($ per hour) $3.080 $2.310 $1.540 $0.770 $0.385
Cost of running on M6g ($ per instance) $4.45 $3.46 $2.59 $1.99 $0.99
Total cost reduction with M6g including performance improvement -22.22% -21.58% -24.63% -29.48% -30.58%

The following graph shows per query improvements observed on M6g 2XL instances with EMR Runtime for Spark on Amazon EMR version 5.30.1 compared to equivalent M5 2XL instances for the 104 queries in the TPC-DS 3 TB benchmark. Performance of 100 of 104 TPC-DS queries improved with M6g 2XL, and performance for 4 queries regressed (q41, q20, q42, and q52 – with the maximum regression of -20.99%). If you are evaluating migrating from M5 instances to M6g instances for EMR Spark workloads, we recommend that you test your workloads to check if any of your queries encounter slower performance.

 

R6g instances showed a similar performance improvement while running Apache Spark workloads compared to equivalent R5 instances. Our test results showed between 14.27-21.50% improvement in total query runtime for 5 different instance sizes within the instance family, and between 12.48-18.95% improvement in geometric mean. On cost comparison, we observed 23.26%-31.66% reduced instance hour cost on R6g instance EMR clusters compared to R5 EMR instance clusters to execute the 104 TPC-DS benchmark queries. We observed 4 benchmark queries (q6, q21, q41 and q26 – with a maximum regression of -18.28%) taking longer to execute on R6g instance clusters compared to R5 instance clusters.

With C6g instances, we observed improved performance compared to C5 instances for Spark workloads on 2XL, 4XL, 12XL and 16XL instance sizes. Query execution performance regressed on 8XL instances by -0.38%. We observed between 16.84-24.15% lower instance hour costs on C6g instance EMR clusters compared equivalent C5 EMR instance clusters to execute the 104 TPC-DS benchmark queries. Performance of 73 of 104 TPC-DS queries improved with C6g 4XL, and performance for 31 queries regressed (with a maximum regression of -31.38% for q78).

Summary

By using Amazon EMR with M6g, C6g and R6g instances powered by Graviton2 processors, we observed improved performance and reduced cost of running 104 TPC-DS benchmark queries. To keep up to date, subscribe to the Big Data blog’s RSS feed to learn about more Apache Spark optimizations, configuration best practices, and tuning advice.


About the Authors

Peter Gvozdjak is a senior engineering manager for EMR at Amazon Web Services.

 

 

 

Al MS is a product manager for Amazon EMR at Amazon Web Services.

Data preprocessing for machine learning on Amazon EMR made easy with AWS Glue DataBrew

Post Syndicated from Kartik Kannapur original https://aws.amazon.com/blogs/big-data/data-preprocessing-for-machine-learning-on-amazon-emr-made-easy-with-aws-glue-databrew/

The machine learning (ML) lifecycle consists of several key phases: data collection, data preparation, feature engineering, model training, model evaluation, and model deployment. The data preparation and feature engineering phases ensure an ML model is given high-quality data that is relevant to the model’s purpose. Because most raw datasets require multiple cleaning steps (such as addressing missing values and imbalanced data) and numerous data transformations to produce useful features ready for model training, these phases are often considered the most time-consuming in the ML lifecycle. Additionally, producing well-prepared training datasets has typically required extensive knowledge of multiple data analysis libraries and frameworks. This has presented a barrier to entry for new ML practitioners and reduced iteration speed for more experienced practitioners.

In this post, we show you how to address this challenge with the newly released AWS Glue DataBrew. DataBrew is a visual data preparation service, with over 250 pre-built transformations to automate data preparation tasks, without the need to write any code. We show you how to use DataBrew to analyze, prepare, and extract features from a dataset for ML, and subsequently train an ML model using PySpark on Amazon EMR. Amazon EMR is a managed cluster platform that provides the ability to process and analyze large amounts of data using frameworks such as Apache Spark and Apache Hadoop.

For more details about DataBrew, Amazon EMR, and each phase of the ML lifecycle, see the following:

Solution overview

The following diagram illustrates the architecture of our solution.

Loading the dataset to Amazon S3

We use the Census Income dataset from the UCI Machine Learning Repository to train an ML model that predicts whether a person’s income is above $50,000 a year. This multivariate dataset contains 48,842 observations and 14 attributes, such as age, nature of employment, educational background, and marital status.

For this post, we download the Adult dataset. The data folder contains five files, of which adult.data and adult.test are the train and test datasets, and adult.names contains the column names and description. Because the raw dataset doesn’t contain the column names, we add them to the first row of the train and test datasets and save the files with the extension .csv:

Column Names
age,workclass,fnlwgt,education,education-num,marital-status,occupation,relationship,race, sex,capital-gain,capital-loss,hours-per-week,native-country,target

Create a new bucket in Amazon Simple Storage Service (Amazon S3) and upload the train and test data files under a new folder titled raw-data.

Data preparation and feature engineering using DataBrew

In this section, we use DataBrew to explore a sample of the dataset uploaded to Amazon S3 and prepare the dataset to train an ML model.

Creating a DataBrew project

To get started with DataBrew, complete the following steps:

  1. On the DataBrew console, choose Projects.
  2. Choose Create a project.
  3. For Name, enter census-income.
  4. For Attached recipe, choose Create a new recipe.
  5. For Recipe name, enter census-income-recipe.
  6. For Select a dataset, select New dataset.
  7. For Dataset name¸ enter adult.data.

  1. Import the train dataset adult.data.csv from Amazon S3.
  2. Create a new AWS Identity and Access Management (IAM) policy and IAM role by following the steps on the DataBrew console, which provides DataBrew the necessary permissions to access the source data in Amazon S3.
  3. In the Sampling section, for Type, choose Random rows.
  4. Select 1,000.

Exploratory data analysis

The first step in the data preparation phase is to perform exploratory data analysis (EDA). EDA allows us to gain an intuitive understanding of the dataset by summarizing its main characteristics. Example outputs from EDA include identifying data types across columns, plotting the distribution of data points, and creating visuals that describe the relationship between columns. This process informs the data transformations and feature engineering steps that you need to apply prior to building an ML model.

After you create the project, DataBrew provides three different views of the dataset:

  • Grid view – Presents the 15 columns and first 1,000 rows sampled from the dataset and the distribution of data across each column
  • Schema view – In addition to information in the grid view, presents information about the data types (such as double, integer, or string) and the data quality that indicates the presence of missing or invalid values
  • Data profile view – Supported by a data profile job, generates summary statistics such as quartiles, standard deviation, variance, most frequently occurring values, and the correlation between columns

The following screenshot shows our view of the dataset.

Each view presents a unique piece of information that helps us gain a better understanding of the dataset. For instance, in the grid view, we can observe the distribution of data across the 15 columns and spot erroneous data points, such as those with ? in the workclass, occupation, or native-country columns.

In the schema view, we can observe six columns with continuous data, nine columns with categorical or binary data, and no missing or invalid observations in the sample of our dataset. The columns with continuous data also contain the corresponding minimum, maximum, mean, median, and mode values represented as a box plot.

In the data profile view, after running a data profile job, we can observe the summary statistics from the first 20,000 rows, such as the five-number summary, measures of central tendency, variance, skewness, kurtosis, correlations, and the most frequently occurring values in each column. For instance, we can combine the information from the grid view and the data profile view to replace erroneous data points such as ? by the most frequently occurring value in that column as a form of data cleaning. To run a data profile job on more than 20,000 rows, request for a limit increase at [email protected]

As part of the EDA phase, we can look at the distribution of data in the target column, which represents whether a person’s income is above $50,000 per year. The ratio of people whose income is greater than $50,000 per year to those whose income is less than or equal to $50,000 per year is 1:3, indicating that the distribution of the target classes is not imbalanced.

Building a set of data transformation steps and publishing a recipe

Now that we have an intuitive understanding of the dataset, let’s build out the data transformation steps. Based on our EDA, we replace the ? observation with the most frequently occurring value in each column.

  1. Choose the Replace value or pattern transformation.
  2. Replace ? with Private in the workclass column.
  3. Replace ? with United-States in the native-country column.

The occupation column also contains observations with ?, but the data points are spread across categories without a clear frequently occurring category. Therefore, we can categorically encode the observations in the occupation column, including those with ? observation, thereby treating ? as a separate category. The occupation column in the adult.data training dataset contains 15 categories, of which Protective-serv, Priv-house-serv, and Armed-Forces occur infrequently. To avoid excessive granularity in ML modeling, we can group these three categories into a single category named Other.

During ML model evaluation and prediction, we can also map categories that the model hasn’t encountered during model training to the Other category.

With that as the background, let’s apply the categorical mapping transformation to only the top 12 distinct values.

  1. Select Map top 12 values.
  2. Select Map values to numeric values.

This selects the top 12 categories and combines the other categories into a single category named Other. We now have a new column named occupation_mapped.

  1. Delete the occupation column to avoid redundancy.
  2. Similarly, apply the categorical mapping transformation to the top five values in the workclass column and the top one value in the native-country Remember to select Map values to numeric values.

This groups the remaining categories into a single category named Other.

  1. Delete the columns workclass and native-country.

The other four columns with categorical data—marital-status, relationship, race, and sex—have few categories with most of them occurring frequently. Let’s apply the categorical mapping transformation to these columns as well.

  1. Apply categorical mapping, with the following differences:
    1. Select Map all values.
    2. Select Map values to numeric values.
  2. Delete the original columns to avoid redundancy.
  3. Delete the fnlwgt column, because it represents the sampling weight and isn’t related to the target
  4. Delete the education column, because it has already been categorically mapped to education-num.
  5. Map the target column to numeric values, where income less than or equal to $50,000 per year is mapped to class 0 and income greater than $50,000 per year is mapped to class 1.
  6. Rename the destination column to label in order to align with our downstream PySpark model training code.

  1. Delete the original target column.

The data preparation phase is now complete, and the set of 20 transformations that consist of data cleaning and categorical mapping is combined into a recipe.

Because the data preparation and ML model training phases are highly iterative, we can save the set of data transformation steps applied by publishing the recipe. This provides version control, and allows us to maintain the data transformation steps and experiment with multiple versions of the recipe in order to determine the version with the best ML model performance. For more information about DataBrew recipes, see Creating and using AWS Glue DataBrew recipes.

Creating and running a DataBrew recipe job

The exploratory data analysis phase helped us gain an intuitive understanding of the dataset, from which we built a recipe to prepare and transform our data for ML modeling. We have been working with a random sample of 1,000 rows from the adult.data training dataset, and we need to apply the same set of data transformation steps to the over 32,000 rows in the adult.data dataset. A DataBrew recipe job provides the ability to scale the transformation steps from a sample of data to the entire dataset. To create our recipe job, complete the following steps:

  1. On the DataBrew console, choose Jobs.
  2. Choose Create recipe job.
  3. For Job name, enter a name.
  4. Create a new folder in Amazon S3 (s3://<YOUR-S3-BUCKET-NAME>/transformed-data/) for the recipe job to save the transformed dataset.

The recipe job should take under 2 minutes to complete.

Training an ML model on the transformed dataset using PySpark

With the data transformation job complete, we can use the transformed dataset to train a binary classification model to predict whether a person’s income is above $50,000 per year.

  1. Create an Amazon EMR notebook.
  2. When the notebook’s status is Ready, open the notebook in a JupyterLab or Jupyter Notebook environment.
  3. Choose the PySpark kernel.

For this post, we use Spark version 2.4.6.

  1. Load the transformed dataset into a PySpark DataFrame within the notebook:
    train_dataset = spark.read.csv(path='s3://<YOUR-S3-BUCKET-NAME>/transformed-data/<YOUR-RECIPE-JOB-NAME>_<TIMESTAMP>/<YOUR-RECIPE-JOB-NAME>_<TIMESTAMP>_part00000.csv', header=True, inferSchema=True)
    print('The transformed train dataset has {n_rows} rows and {n_cols} columns'.format(n_rows=train_dataset.count(), n_cols=len(train_dataset.columns)))
    The transformed train dataset has 32561 rows and 13 columns

  2. Inspect the schema of the transformed dataset:
    train_dataset.printSchema()
    root 
    |-- age: integer (nullable = true) 
    |-- workclass_mapped: double (nullable = true) 
    |-- education-num: double (nullable = true) 
    |-- marital_status_mapped: double (nullable = true) 
    |-- occupation_mapped: double (nullable = true) 
    |-- relationship_mapped: double (nullable = true) 
    |-- race_mapped: double (nullable = true) 
    |-- sex_mapped: double (nullable = true) 
    |-- capital-gain: double (nullable = true) 
    |-- capital-loss: double (nullable = true) 
    |-- hours-per-week: double (nullable = true) 
    |-- native_country_mapped: double (nullable = true) 
    |-- label: double (nullable = true)

Of the 13 columns in the dataset, we use the first 12 columns as features for the model and the label column as the final target value for prediction.

  1. Use the VectorAssembler method within PySpark to combine the 12 columns into a single feature vector column, which makes it convenient to train the ML model:
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import VectorAssembler
    stages = []
    arr_features = train_dataset.columns[:-1]
    # Transform input features into a vector using VectorAssembler
    features_vector_assembler = VectorAssembler(inputCols=arr_features, outputCol='features')
    stages.append(features_vector_assembler)
    # Run the train dataset through the pipeline
    pipeline = Pipeline(stages=stages)
    train_dataset_pipeline = pipeline.fit(train_dataset).transform(train_dataset)
    # Select the feature vector and label column
    train_dataset_pipeline = train_dataset_pipeline.select('features', 'label')

  2. To estimate the model performance on the unseen test dataset (test) split the transformed train dataset (train_dataset_pipline) into 70% for model training and 30% for model validation:
    df_train, df_val = train_dataset_pipeline.randomSplit([0.7, 0.3], seed=42)
    print('The train dataset has {n_rows} rows and {n_cols} columns'.format(n_rows=df_train.count(), n_cols=len(df_train.columns)))
    print('The validation dataset has {n_rows} rows and {n_cols} columns'.format(n_rows=df_val.count(), n_cols=len(df_val.columns)))
    The train dataset has 22841 rows and 2 columns
    The validation dataset has 9720 rows and 2 columns

  3. Train a Random Forest classifier on the training dataset df_train and evaluate its performance on the validation dataset df_val using the area under the ROC curve (AUC), which is a measure of model performance for binary classifiers at different classification thresholds:
    from pyspark.ml.classification import RandomForestClassifier
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    # Train a Random Forest classifier
    rf_classifier = RandomForestClassifier(featuresCol = 'features', labelCol = 'label')
    model = rf_classifier.fit(df_train)
    # Model predictions on the validation dataset
    preds = model.transform(df_val)
    # Evaluate model performance
    evaluator = BinaryClassificationEvaluator()
    auc = evaluator.evaluate(preds, {evaluator.metricName: "areaUnderROC"})
    print('Validation AUC: {}'.format(auc))
    Validation AUC: 0.8909629419656796

A validation AUC of 0.89 indicates strong model performance for the classifier. Because the data transformation and model training phases are highly iterative in nature, in order to improve the model performance, we can experiment with different data transformation steps, additional features, and other classification models. After we achieve a satisfactory model performance, we can evaluate the model predictions on the unseen test dataset, adult.test.

Evaluating the ML model on the test dataset

In the data transformation and ML model training sections, we have developed a reusable pipeline that we can use to evaluate the model predictions on the unseen test dataset.

  1. Create a new DataBrew project and load the raw test dataset (adult.test.csv) from Amazon S3, as we did in the data preparation section.
  2. Import the recipe we created earlier with the 20 data transformation steps to apply them on the adult.test dataset.



We can observe that all the columns have been transformed successfully, apart from the label column, which contains null values. This is because the adult.test dataset contains messy data in the target column, namely an extra punctuation mark at the end of the classes <=50k and >50k. To correct this, we can remove the last step of the recipe.

  1. Delete the column target.
  2. Edit the prior step in creating a categorical map to account for the extra punctuation mark.
  3. Delete the original target column to avoid redundancy.
  4. Create and run the recipe job to transform and store the over 16,000 rows in the adult.test dataset under s3://<YOUR-S3-BUCKET-NAME>/transformed-data/.

This job should take approximately 1 minute to complete.

When the train and test datasets don’t have any variation in the types of categories, we can create and run a recipe job directly from the DataBrew console, without having to create a separate project.

  1. When the data transformation job on the adult.test dataset is complete, load the transformed dataset into a PySpark dataframe to evaluate the performance of the binary classification model:
    # Load the transformed test dataset
    test_dataset = spark.read.csv(path='s3://<YOUR-S3-BUCKET-NAME>/transformed-data/<YOUR-RECIPE-JOB-NAME>_<TIMESTAMP>/<YOUR-RECIPE-JOB-NAME>_<TIMESTAMP>_part00000.csv', header=True, inferSchema=True)
    
    print('The transformed test dataset has {n_rows} rows and {n_cols} columns'.format(n_rows=test_dataset.count(), n_cols=len(test_dataset.columns)))

The transformed test dataset has 16281 rows and 13 columns

# Run the test dataset through the same feature vector pipeline
test_dataset_pipeline = pipeline.fit(test_dataset).transform(test_dataset)

# Select the feature vector and label column
test_dataset_pipeline = test_dataset_pipeline.select('features', 'label')

# Model predictions on the test dataset
preds_test = model.transform(test_dataset_pipeline)

# Evaluate model performance
evaluator = BinaryClassificationEvaluator()
auc = evaluator.evaluate(preds_test, {evaluator.metricName: "areaUnderROC"})
print('Test AUC: {}'.format(auc))
Test AUC: 0.8947235975486465

The model performance with an AUC of 0.89 on the unseen test dataset is about the same as the model performance on the validation set, which demonstrates strong model performance on the unseen test dataset as well.

Summary

In this post, we showed you how to use DataBrew and Amazon EMR to streamline and speed up the data preparation and feature engineering stages of the ML lifecycle. We explored a binary classification problem, but the wide selection of DataBrew pre-built transformations and PySpark ML libraries make this approach extendable to numerous ML use cases.

Get started today! Explore your use case with the services mentioned in this post and many others on the AWS Management Console


About the Authors

Kartik Kannapur is a Data Scientist with AWS Professional Services. He holds a Master’s degree in Applied Mathematics and Statistics from Stony Brook University and focuses on using machine learning to solve customer business problems.

 

 

 

Prithiviraj Jothikumar, PhD, is a Data Scientist with AWS Professional Services, where he helps customers build solutions using machine learning. He enjoys watching movies and sports and spending time to meditate.

 

 

 

Bala Krishnamoorthy is a Data Scientist with AWS Professional Services, where he helps customers solve problems and run machine learning workloads on AWS. He has worked with customers across diverse industries, including software, finance, and healthcare. In his free time, he enjoys spending time outdoors, running with his dog, beating his family and friends at board games and keeping up with the stock market.

Accessing and visualizing external tables in an Apache Hive metastore with Amazon Athena and Amazon QuickSight

Post Syndicated from James Sun original https://aws.amazon.com/blogs/big-data/accessing-and-visualizing-external-tables-in-an-apache-hive-metastore-with-amazon-athena-and-amazon-quicksight/

Many organizations have an Apache Hive metastore that stores the schemas for their data lake. You can use Amazon Athena due to its serverless nature; Athena makes it easy for anyone with SQL skills to quickly analyze large-scale datasets. You may also want to reliably query the rich datasets in the lake, with their schemas hosted in an external Hive metastore. In response to customers’ requests, AWS announced Athena’s support for Hive metastore in early 2020. This extends the ability in Athena to query external data stores, with a combined benefit of better performance and lower cost.

In this post, we provide an AWS CloudFormation template that configures a remote Hive metastore based on Amazon Relational Database Service (Amazon RDS) and MySQL with Amazon EMR located in a private subnet to perform ETL tasks. We then demonstrate how you can use a Spark step to pull COVID-19 datasets from a public repository and transform the data into a performant Parquet columnar storage format. We also walk through the steps to query the data with Athena and visualize it with Amazon QuickSight. QuickSight is a fully managed data visualization service; it lets the you easily create and publish interactive dashboards by analyzing data from various data sources, including Athena.

Solution walkthrough

The following diagram shows the architecture for our solution.


As shown in the preceding architecture, we have an Availability Zone within a VPC in an AWS Region. The Availability Zone hosts subnets that are either public or private. A multi-master EMR cluster that has Hive software components installed is launched in a private subnet with egress internet traffic through a NAT gateway to download data from public sites for analysis. The multi-master feature also ensures that the primary nodes are highly available. The Hive metastore is backed by a remote RDS for MySQL instance located in the same private subnet.

We also have an Amazon Simple Storage Service (Amazon S3)-based data lake. A Spark step in Amazon EMR retrieves the data in CSV format, saves it in the provisioned S3 bucket, and transforms the data into Parquet format. The Spark step creates an external Hive table referencing the Parquet data and is ready for Athena to query.

With Athena, you can create a data source connector based on an AWS Lambda function to access the Hive metastore hosted on the EMR cluster by using the Apache Thrift interface.

The connector is called a catalog, which when invoked in a SQL statement with Athena, invokes the Lambda function. The function exits if the connector is not active for 15 minutes. For queries that run longer than 15 minutes, it’s recommended to let the query complete before retrieving the query results in an Amazon S3 location you can specify.

In Athena, you can compose a SQL statement against the Hive tables with predicates to further limit the size of the query result for faster visualization by QuickSight.

Deploying the resources with AWS CloudFormation

To demonstrate our solution, we provide a CloudFormation template that you can download to easily deploy the necessary AWS resources. The template creates the following resources to simulate the environment:

  • VPC and subnets – A VPC with one public and one private subnets. A NAT gateway is also created to allow outbound internet access from the EMR cluster to download public COVID-19 datasets from the Johns Hopkins GitHub repo.
  • EMR cluster – A multi-master EMR cluster with Hive, running on three primary nodes (m5.xlarge) and two core nodes (m5.xlarge), is launched to support the thrift connection required by the Athena Lambda connectors.
  • Amazon RDS for MySQL database – An RDS for MySQL primary instance is launched in the same subnet as the EMR cluster. The RDS instance serves as the Hive metastore backend data store.
  • S3 bucket – An S3 bucket stores files in Parquet format by Amazon EMR and is accessed later by Athena.
  • AWS IAM users – Two AWS Identity and Access Management (IAM) users belonging to different user groups. The first user, the data engineer, has permissions to access the Lambda-based Athena data source connector. The other user, the salesperson, does not have permissions to access the connector.

To get started, you need to have an AWS account. If you don’t have one, go to aws.amazon.com to sign up for one. Then complete the following steps:

  1. Sign in to the AWS Management Console as an IAM power user, preferably an admin user.
  2. Choose Launch Stack to launch the CloudFormation template:

This template has been tested in the US East (N. Virginia) Region.

  1. Choose Next.

You’re prompted to enter a few launch parameters.

  1. For Stack name, enter a name for the stack (for example, athena-hms).
  2. For Hive Metastore Host Number, choose the Amazon EMR primary node to connect to (or use the default value).
  3. Continue to choose Next and leave other parameters at their default.
  4. On the review page, select the three check boxes to confirm that AWS CloudFormation might create resources.
  5. Choose Create stack.

The stack takes 15–20 minutes to complete.

  1. On the Outputs tab of the stack details, save the key-value pairs to use later.

When the EMR cluster is provisioned, it uses a bootstrap action to install the necessary Python libraries. It runs an Amazon EMR Spark step (a PySpark script) that downloads three COVID-19 datasets for confirmed, recovered, and death cases from the John Hopkins GitHub repo in CSV format and stores them in the csv subfolder of the S3 bucket created by the CloudFormation stack. Lastly, the final transformed data is converted to Parquet format and external Hive tables are created referencing the Parquet data located in the parquet subfolder of the S3 bucket.

The following are the source codes for the bootstrap and Spark step actions for your reference:

To validate the data, on the Amazon S3 console, choose the bucket name from the CloudFormation template outputs. You should see a covid_data folder in the bucket. The folder contains the two subfolders, csv and parquet, which store the raw CSV and transformed Parquet data, respectively.

Querying the data in Athena

The CloudFormation template creates two users belonging to two different AWS IAM groups. The de_user_us-east-1_athena-hms user is a data engineer with permissions to access the Lambda function to communicate with the Hive metastore using the Athena data source connector. They belong to the group athena-hms-DataEngineerGroup-xxxxxxx. The sales_user_us-east-1_athena-hms user is a salesperson with no access to the connector. They belong to the group athena-hms-SalesGroup-xxxxx. 

To query the data, first retrieve your secret values in AWS Secrets Manager:

  1. On the Secrets Manager console, choose Secrets.
  2. Choose DataEngineerUserCreds.

  1. Choose Retrieve secret value.

 

  1. Save the username and password.

 

  1. Repeat these steps for SalesUserCreds. 

With the data in place, we can now use Athena to query it.

Accessing data as the data engineer

To query the data as the de_user_us-east-1_athena-hms user, complete the following steps:

  1. Sign in to the Athena console as the de_user_us-east-1_athena-hms user with the credentials retrieved from Secrets Manager.

After logging in, we need to create a data source for Hive metastore.

  1. On the navigation bar, choose Data sources.
  2. Choose Connect data source.

  1. For Choose where you data is located, select Query data in Amazon S3.
  2. For Choose a metadata catalog, select Apache Hive metastore.
  3. Chose Next.

  1. For Lambda function, choose the function the CloudFormation template created with the key HiveMetastoreFunctionName.
  2. For Catalog name, enter a name for the Athena catalog (for example, demo_hive_metastore).
  3. Choose Connect.

You should now have a catalog named demo_hive_metastore with the catalog type Hive metastore.

  1. On the navigation bar, choose Query editor.
  2. Enter the following SQL statement:
    SELECT *
    FROM demo_hive_metastore.default.covid_confirmed_cases
    WHERE country = 'US'
            OR country = 'Italy'
            OR country = 'India'
            OR country = 'Brazil'
            OR country = 'Spain'
            OR country = 'United Kingdom'

This SQL statement selects all the columns in the covid_confirmed_cases table with predicates to only include a few countries of interest. We use a table name with the pattern <catalog name>.<hive database name>.<hive table name in the database>, which for this post translates to demo_hive_metastore.default.covid_confirmed_cases.

  1. Choose Run query.

The following screenshot shows your query results.

Make sure you completely sign out of the console before moving on to the next steps.

Accessing data as the salesperson

Sign in to the console as sales_user_us-east-1_athena-hms user. Because the salesperson user doesn’t have the appropriate IAM policies to access the Hive metastore connection, you can’t see the tables.

 

Permissions policies

The data engineer has additional policies attached to their IAM group in addition to the managed AmazonAthenaFullAccess policy: the <stack-name>-DataBucketReadAccessPolicy-xxxxx and <stack-name>-HiveMetastoreDataSourceAccessPolicy-xxxxx policies created by the CloudFormation template. Therefore, the data engineer can view the tables, but the salesperson can’t.

These policies are available on the IAM console, on the Permissions tab for the group <stack-name>-DataEngineerGroup-xxxxx.

The following sample JSON code is the <stack-name>-DataBucketReadWriteAccessPolicy-xxxxx policy to allow access to the provisioned S3 bucket:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "s3:Get*",
                "s3:List*",
                "s3:Put*"
            ],
            "Resource": [
                "arn:aws:s3:::<provisioned bucket name>",
                "arn:aws:s3:::<provisioned bucket name>/",
                "arn:aws:s3:::<provisioned bucket name>/*"
            ],
            "Effect": "Allow"
        }
    ]
}

The following sample JSON code is the <stack-name>-HiveMetastoreDataSourceAccessPolicy-xxxxx policy to allow access to the Lambda Hive metastore function:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "lambda:GetFunction",
                "lambda:GetLayerVersion",
                "lambda:InvokeFunction"
            ],
            "Resource": [
                "arn:aws:lambda:us-east-1:<account id>:function:athena-hms-HiveMetastoreFunction"
            ],
            "Effect": "Allow"
        },
        {
            "Action": [
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:PutObject",
                "s3:ListMultipartUploadParts",
                "s3:AbortMultipartUpload"
            ],
            "Resource": [
               "arn:aws:s3:::<provisioned bucket name>/hms_spill"
            ],
            "Effect": "Allow"
        },
        {
            "Action": [
                "lambda:ListFunctions"
            ],
            "Resource": "*",
            "Effect": "Allow"
        }
    ]
}

Next, we walk through using QuickSight to visualize the results. Make sure you completely sign out of the console as the salesperson user before proceeding to the next steps. 

Signing up for QuickSight

You can skip this section if you have already signed up for QuickSight previously.

  1. Sign in to the console as the IAM power user who deployed the CloudFormation template or any user with enough IAM privileges to set up QuickSight.
  2. On the QuickSight console, choose Sign up for QuickSight.
  3. Select either the Standard or Enterprise edition for QuickSight.
  4. Choose Continue.

  1. For QuickSight account name, enter your AWS account ID.
  2. For Notification email address, enter your email.
  3. Select Amazon S3.

  1. Select the provisioned S3 bucket to grant QuickSight permission to access.
  2. Choose Finish.

Your QuickSight account should now be set up.

Attaching the Hive metastore access policy

Before you can use QuickSight, you have to attach the Hive metastore access policy to the QuickSight service role.

  1. On the IAM console, search for the service role aws-quicksight-service-role-v0.
  2. Choose the role.

  1. Search for the <stack-name>-HiveMetastoreDataSourceAccessPolicy-xxxxx policy.
  2. Select the policy and attach it to the QuickSight service role.

Creating your data source and performing data conversions

Before we can create visualizations, we need to set up our data source.

  1. Download the SQL script covid-19.sql.
  2. On the QuickSight console, choose Datasets in the navigation pane.

  1. Choose New dataset.

  1. Choose Athena.

  1. In the pop-up window, for Data source name, enter demo_hive_metastore.
  2. Choose Validate.
  3. When the connection is validated, choose Create data source.

  1. In the next window, choose Use custom SQL.

  1. Enter the content of the covid-19.sql script in the query window.
  2. Choose Confirm query.

  1. Leave Import to SPICE for quicker analytics
  2. Choose Visualize.

Now we perform a few data type conversions before visualizing the data.

  1. Choose the Edit icon next to Data set on the menu bar.

  1. Choose the … icon.
  2. Choose Edit.

  1. Expand date and choose Change data type.
  2. Choose Date.

  1. Enter yyyy-MM-dd to convert the date format.
  2. Choose Update.

Now we create a coordinate using the latitude and longitude values.r

  1. Expand lat and choose Ad to coordinates.

  1. Leave Create new geospatial coordinates
  2. Chose Add.

  1. In the pop-up window, for Name your coordinates, enter coordinate.
  2. For Field to use for longitude, choose lon.
  3. Choose Create coordinates.

  1. Choose Save and visualize on the menu bar.

Creating visualizations in QuickSight

Now we can visualize our data. For this post, we create a map visualization.

  1. For Visual types, choose the map

  1. For Geospatial, drag coordinates.
  2. For Size, drag confirmed.
  3. For Color, drag country.

This world map shows the accumulated confirmed cases for selected countries over time; you need to use a filter to look at confirmed cases on a specific date.

  1. In the navigation pane, choose Filter.
  2. Choose the + icon.
  3. For Filter type, choose Time range.
  4. Choose start and end dates, such as 2020-09-10 00:00 and 2020-09-11 00:00, respectively.
  5. Choose Apply.

This plots the confirmed cases on September 10, 2020, for these countries.

Similarly, you can choose other visual types, such as a line chart, and generate the mortality rate for selected countries over time.

Using highly available primary nodes of the Amazon EMR cluster

The EMR cluster has a multi-master configuration with three primary nodes running to meet high availability requirements. At any time, the Lambda function communicates with one of these three EMR primary nodes. In the rare event that this node goes down, you can quickly re-establish the Athena data source connector to the external Hive metastore by failing over to another active primary node.

To perform this failover, complete the following steps:

  1. On the AWS CloudFormation console, choose Stacks.
  2. Choose athena-hms.
  3. Choose update.
  4. Choose Use current update.
  5. Choose Next.
  6. For Hive Metastore Host Number, choose a host other than the current one you’re using.

  1. Choose Next.
  2. Acknowledge that AWS CloudFormation might create IAM resources.
  3. Choose Update stack.

In less than a minute, you should be able to access the Hive metastore and continue to query on the Athena console.

Cleaning up

You may want to clean up the demo environment when you’re done. To do so, on the AWS CloudFormation console, select the template and choose Delete.

This action also deletes the S3 bucket and any data in it. If you want to retain the data for future use, you should make a copy of the bucket before you delete it.

Summary

In this post, we walked through a solution using Athena to query external Hive tables with public COVID-19 datasets hosted in an S3 bucket and visualizing the data with QuickSight. We provided a CloudFormation template to automate the deployment of necessary AWS services for the demo. We encourage you to use these managed and scalable services for your specific use cases in production.


About the Authors

James Sun is a Senior Solutions Architect with Amazon Web Services. James has over 15 years of experience in information technology. Prior to AWS, he held several senior technical positions at MapR, HP, NetApp, Yahoo, and EMC. He holds a PhD from Stanford University.

 

 

Chinmayi Narasimhadevara is a Solutions Architect with Amazon Web Services. Chinmayi has over 15 years of experience in information technology and has worked with organizations ranging from large enterprises to mid-sized startups. She helps AWS customers leverage the correct mix of AWS services to achieve success for their business goals.

 

 

Gagan Brahmi is a Solutions Architect focused on Big Data & Analytics at Amazon Web Services. Gagan has over 15 years of experience in information technology. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS.