Tag Archives: Amazon EMR

Install Python libraries on a running cluster with EMR Notebooks

Post Syndicated from Parag Chaudhari original https://aws.amazon.com/blogs/big-data/install-python-libraries-on-a-running-cluster-with-emr-notebooks/

Last year, AWS introduced EMR Notebooks, a managed notebook environment based on the open-source Jupyter notebook application.

This post discusses installing notebook-scoped libraries on a running cluster directly via an EMR Notebook. Before this feature, you had to rely on bootstrap actions or use a custom AMI to install additional libraries that are not pre-packaged with the EMR AMI when you provision the cluster. This post also discusses how to use the pre-installed Python libraries available locally within EMR Notebooks to analyze and plot your results. This capability is useful in scenarios in which you don’t have access to a PyPI repository but need to analyze and visualize a dataset.

Benefits of using notebook-scoped libraries with EMR Notebooks

Notebook-scoped libraries provide you the following benefits:

  • Runtime installation – You can import your favorite Python libraries from PyPI repositories and install them on your remote cluster on the fly when you need them. These libraries are instantly available to your Spark runtime environment. There is no need to restart the notebook session or recreate your cluster.
  • Dependency isolation – The libraries you install using EMR Notebooks are isolated to your notebook session and don’t interfere with bootstrapped cluster libraries or libraries installed from other notebook sessions. These notebook-scoped libraries take precedence over bootstrapped libraries. Multiple notebook users can import their preferred version of the library and use it without dependency clashes on the same cluster.
  • Portable library environment – The library package installation happens from your notebook file. This allows you to recreate the library environment when you switch the notebook to a different cluster by re-executing the notebook code. At the end of the notebook session, the libraries you install through EMR Notebooks are automatically removed from the hosting EMR cluster.

Prerequisites

To use this feature in EMR Notebooks, you need a notebook attached to a cluster running EMR release 5.26.0 or later. The cluster should have access to the public or private PyPI repository from which you want to import the libraries. For more information, see Creating a Notebook.

There are different ways to configure your VPC networking to allow clusters inside the VPC to connect to an external repository. For more information, see Scenarios and Examples in the Amazon VPC User Guide.

Using notebook-scoped libraries

This post demonstrates the notebook-scoped libraries feature of EMR Notebooks by analyzing the publicly available Amazon customer reviews dataset for books. For more information, see Amazon Customer Reviews Dataset on the Registry of Open Data for AWS.

Open your notebook and make sure the kernel is set to PySpark. Run the following command from the notebook cell:

print("Welcome to my EMR Notebook!")

You get the following output:

Output shows newly created spark session.

You can examine the current notebook session configuration by running the following command:

%%info

You get the following output:

Output shows spark session properties which include Python version and properties to enable this new feature.

The notebook session is configured for Python 3 by default (through spark.pyspark.python). If you prefer to use Python 2, reconfigure your notebook session by running the following command from your notebook cell:

%%configure -f { "conf":{ "spark.pyspark.python": "python", 
                "spark.pyspark.virtualenv.enabled": "true", 
                "spark.pyspark.virtualenv.type":"native", 
                "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv" }}

You can also verify the Python version used in your current notebook session by running the following code:

import sys
sys.version

You get the following output:

Output shows Python version 3.6.8

Before starting your analysis, check the libraries that are already available on the cluster. You can do this using the list_packages() PySpark API, which lists all the Python libraries on the cluster. Run the following code:

sc.list_packages()

You get an output similar to the following code, which shows all the available Python 3-compatible packages on your cluster:

Output shows list of Python packages along with their versions.

Load the Amazon customer reviews data for books into a Spark DataFrame with the following code:

df = spark.read.parquet('s3://amazon-reviews-pds/parquet/product_category=Books/*.parquet')

You are now ready to explore the data. Determine the schema and number of available columns in your dataset with the following code:

# Total columns
print(f'Total Columns: {len(df.dtypes)}')
df.printSchema()

The following code is the output:

Output shows that Spark DataFrame has 15 columns. It also shows the schema of the Spark DataFrame.

This dataset has a total of 15 columns. You can also check the total rows in your dataset by running the following code:

# Total row
print(f'Total Rows: {df.count():,}')

You get the following output:

Output shows that Spark DataFrame has more than 20 million rows.

Check the total number of books with the following code:

# Total number of books
num_of_books = df.select('product_id').distinct().count()
print(f'Number of Books: {num_of_books:,}')

You get the following output:

Output shows that there are more than 3 million books.

You can also analyze the number of book reviews by year and find the distribution of customer ratings. To do this, import the Pandas library version 0.25.1 and the latest Matplotlib library from the public PyPI repository. Install them on the cluster attached to your notebook using the install_pypi_package API. See the following code:

sc.install_pypi_package("pandas==0.25.1") #Install pandas version 0.25.1 
sc.install_pypi_package("matplotlib", "https://pypi.org/simple") #Install matplotlib from given PyPI repository

You get the following output:

Output shows that “pandas” and “matplotlib” packages are successfully installed.

The install_pypi_package PySpark API installs your libraries along with any associated dependencies. By default, it installs the latest version of the library that is compatible with the Python version you are using. You can also install a specific version of the library by specifying the version, as shown in the previous Pandas example.

Verify that your imported packages installed successfully by running the following code:

sc.list_packages()

You get the following output:

Output shows list of Python packages along with their versions.

You can also analyze the trend in the number of reviews provided across multiple years. Use toPandas() to convert the Spark DataFrame to a Pandas DataFrame, which you can visualize with Matplotlib. See the following code:

# Number of reviews across years
num_of_reviews_by_year = df.groupBy('year').count().orderBy('year').toPandas()

import matplotlib.pyplot as plt
plt.clf()
num_of_reviews_by_year.plot(kind='area', x='year',y='count', rot=70, color='#bc5090', legend=None, figsize=(8,6))
plt.xticks(num_of_reviews_by_year.year)
plt.xlim(1995, 2015)
plt.title('Number of reviews across years')
plt.xlabel('Year')
plt.ylabel('Number of Reviews')

The preceding commands render the plot on the attached EMR cluster. To visualize the plot within your notebook, use %matplot magic. See the following code:

%matplot plt

The following graph shows that the number of reviews provided by customers increased exponentially from 1995 to 2015. Interestingly, 2001, 2002, and 2015 are outliers, when the number of reviews dropped compared with the previous year.

A line chart showing number of reviews across years.

You can analyze the distribution of star ratings and visualize it using a pie chart. See the following code:

# Distribution of overall star ratings
product_ratings_dist = df.groupBy('star_rating').count().orderBy('count').toPandas()

plt.clf()
labels = [f"Star Rating: {rating}" for rating in product_ratings_dist['star_rating']]
reviews = [num_reviews for num_reviews in product_ratings_dist['count']]
colors = ['#00876c', '#89c079', '#fff392', '#fc9e5a', '#de425b']
fig, ax = plt.subplots(figsize=(8,5))
w,a,b = ax.pie(reviews, autopct='%1.1f%%', colors=colors)
plt.title('Distribution of star ratings for books')
ax.legend(w, labels, title="Star Ratings", loc="center left", bbox_to_anchor=(1, 0, 0.5, 1))

Print the pie chart using %matplot magic and visualize it from your notebook with the following code:

%matplot plt

The following pie chart shows that 80% of users gave a rating of 4 or higher. Approximately 10% of users rated their books 2 or lower. In general, customers are happy about their book purchases from Amazon.

Output shows pie chart depicting distribution of star ratings.

Lastly, use the uninstall_package PySpark API to uninstall the Pandas library that you installed using the install_pypi_package API. This is useful in scenarios in which you want to use a different version of a library that you previously installed using EMR Notebooks. See the following code:

sc.uninstall_package('pandas')

You get the following output:

Output shows that “pandas” package is successfully uninstalled.

Next, run the following code:

sc.list_packages()

You get the following output:

Output shows list of Python packages along with their versions.

After you close your notebook, the Pandas and Matplotlib libraries that you installed on the cluster using the install_pypi_package API are garbage-collected off the cluster.

Using local Python libraries in EMR Notebooks

The notebook-scoped libraries discussed previously require your EMR cluster to have access to a PyPI repository. If you cannot connect your EMR cluster to a repository, use the Python libraries pre-packaged with EMR Notebooks to analyze and visualize your results locally within the notebook. Unlike the notebook-scoped libraries, these local libraries are only available to the Python kernel and are not available to the Spark environment on the cluster. To use these local libraries, export your results from your Spark driver on the cluster to your notebook and use the notebook magic to plot your results locally. Because you are using the notebook and not the cluster to analyze and render your plots, the dataset that you export to the notebook has to be small (we recommend less than 100 MB).

To see the list of local libraries, run the following command from the notebook cell:

%%local
conda list

You get a list of all the libraries available in the notebook. Because the list is rather long, this post doesn’t include them.

For this analysis, find out the top 10 children’s books from your book reviews dataset and analyze the star rating distribution for these children’s books.

You can identify the children’s books by using customers’ written reviews with the following code:

kids_books = (
df
.where("lower(review_body) LIKE '%child%' OR lower(review_body) LIKE '%kid%' OR lower(review_body) LIKE '%infant%' OR lower(review_body) LIKE '%baby%'")
.select("customer_id", "product_id", "star_rating", "product_title", "year")
)

Plot the top 10 children’s books by number of customer reviews with the following code:

top_10_book_titles = kids_books.groupBy('product_title') \
                       .count().orderBy('count', ascending=False) \
                       .limit(10)
top_10_book_titles.show(10, False)

You get the following output:

Output shows list of book titles with their corresponding review count.

Analyze the customer rating distribution for these books with the following code:

top_10 = kids_books.groupBy('product_title', 'star_rating') \
           .count().join(top_10_book_titles, ['product_title'], 'leftsemi') \
           .orderBy('count', ascending=False) 
top_10.show(truncate=False)

You get the following output:

Output shows list of book titles along with star ratings and review counts.

To plot these results locally within your notebook, export the data from the Spark driver and cache it in your local notebook as a Pandas DataFrame. To achieve this, first register a temporary table with the following code:

top_10.createOrReplaceTempView("top_10_kids_books")

Use the local SQL magic to extract the data from this table with the following code:

%%sql -o top_10 -n -1
SELECT product_title, star_rating, count from top_10_kids_books
GROUP BY product_title, star_rating, count
ORDER BY count Desc

For more information about these magic commands, see the GitHub repo.

After you execute the code, you get a user-interface to interactively plot your results. The following pie chart shows the distribution of ratings:

Output shows pie chart depicting distribution of star ratings.

You can also plot more complex charts by using the local Matplotlib and Seaborn libraries available with EMR Notebooks. See the following code:

%%local 
%matplotlib inline
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
sns.set()
plt.clf()
top_10['book_name'] = top_10['product_title'].str.slice(0,30)
colormap = sns.color_palette("hls", 8)
pivot_df = top_10.pivot(index= 'book_name', columns='star_rating', values='count')
pivot_df.plot.barh(stacked=True, color = colormap, figsize=(15,11))
plt.title('Top 10 children books',fontsize=16)
plt.xlabel('Number of reviews',fontsize=14)
plt.ylabel('Book',fontsize=14)
plt.subplots_adjust(bottom=0.2)

You get the following output:

Output shows stacked bar chart for top 10 children books with star ratings and review counts.

Summary

This post showed how to use the notebook-scoped libraries feature of EMR Notebooks to import and install your favorite Python libraries at runtime on your EMR cluster, and use these libraries to enhance your data analysis and visualize your results in rich graphical plots. The post also demonstrated how to use the pre-packaged local Python libraries available in EMR Notebook to analyze and plot your results.

 


About the Author

Parag Chaudhari is a software development engineer at AWS.

Secure your Amazon EMR cluster from unintentional network exposure with Block Public Access configuration

Post Syndicated from Vignesh Rajamani original https://aws.amazon.com/blogs/big-data/secure-your-amazon-emr-cluster-from-unintentional-network-exposure-with-block-public-access-configuration/

AWS security groups act as a network firewall that lets you restrict access to your cluster to only trusted IP addresses. Proper management of security group rules is critical to protecting your application and data on the cluster. Amazon EMR strongly recommends creating restrictive security group rules that include only the necessary network ports, protocols, and IP addresses based on your application requirements.

While AWS account administrators can protect cloud network security in different ways, a new feature helps them prevent account users from launching clusters with misconfigured security group rules. Misconfiguration can open a broad range of cluster ports to unrestricted traffic from the public internet and expose cluster resources to outside threats.

This post discusses a new account level feature called Block Public Access (BPA) configuration that helps administrators enforce a common public access rule across all of their EMR clusters in a region.

Overview of Block Public Access configuration

BPA configuration is an account-level configuration that helps you centrally manage public network access to EMR clusters in a region. You can enable this configuration in a region and block your account users from launching clusters that allow unrestricted inbound traffic from public IP addresses (source set to 0.0.0.0/0 for IPv4 or ::/0 for IPv6) on any port. Your applications may require specific ports to be open to the internet. In that case, configure these ports (or port ranges) in the BPA configuration as exceptions to allow public access before you launch clusters.

When account users launch clusters in the region where you have enabled BPA configuration, EMR checks the port rules defined in this configuration and compares them with the inbound traffic rules specified in the security groups associated with the clusters. If these security groups have inbound rules that open ports to public IP addresses, but you did not configure these ports as exceptions in the BPA configuration, EMR fails the cluster creation and sends an exception to the user.

Enabling BPA configuration from the AWS Management Console

To enable BPA configuration, you need permission to call the PutBlockPublicAccessConfiguration API.

  • Log in to the AWS Management Console. From the console, navigate to the Amazon EMR console.
  • From the navigation panel, choose Block Public Access.
  • Choose Change and select On to enable BPA.

By default, all ports are blocked except port 22 for SSH traffic. To allow more ports for public access, add them as exceptions.

  • Choose Add a port range.

Before launching your cluster, define these exceptions. The port numbers or ranges you add here should be the only ones in your security group rules that allow an inbound source IP address of 0.0.0.0/0 for IPv4 or ::/0 for IPv6.

  • Enter a port number or the range of ports for public access.
  • Choose Save Changes.

Block public access section with the "Change" hyperlink circled in red.

Block public access settings, under Exceptions section +Add a port range circled in red.

For information about configuring BPA using the AWS CLI, see Configure Block Public Access.
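
If you prefer to script this configuration instead of using the console, the same account-level settings can be applied through the API. The following is a minimal sketch using the boto3 EMR client; the port 8443 exception is purely illustrative, so replace it with whatever ports your applications actually need.

import boto3

emr = boto3.client("emr", region_name="us-east-1")

# Enable BPA for this account and region, keeping port 22 open for SSH
# and adding an illustrative exception for port 8443.
emr.put_block_public_access_configuration(
    BlockPublicAccessConfiguration={
        "BlockPublicSecurityGroupRules": True,
        "PermittedPublicSecurityGroupRuleRanges": [
            {"MinRange": 22, "MaxRange": 22},
            {"MinRange": 8443, "MaxRange": 8443},
        ],
    }
)

# Read the settings back to confirm they took effect.
current = emr.get_block_public_access_configuration()
print(current["BlockPublicAccessConfiguration"])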

Summary

In this post, we discussed a new account-level feature on Amazon EMR called Block Public Access (BPA) configuration that helps administrators manage public access to their EMR clusters. You can enable BPA configuration today and prevent your EMR clusters in a region from being unintentionally exposed to the public network.


About the Author

Vignesh Rajamani is a senior product manager for EMR at AWS.

Implement perimeter security in EMR using Apache Knox

Post Syndicated from Varun Rao Bhamidimarri original https://aws.amazon.com/blogs/big-data/implement-perimeter-security-in-emr-using-apache-knox/

Perimeter security helps protect Apache Hadoop cluster resources from users accessing the cluster from outside the network perimeter. It enables a single access point for all REST and HTTP interactions with Apache Hadoop clusters and simplifies client interaction with the cluster. For example, client applications must acquire Kerberos tickets using kinit or SPNEGO before interacting with services on Kerberos-enabled clusters. In this post, we walk through the setup of Apache Knox to enable perimeter security for EMR clusters.

Apache Knox provides the following benefits:

  • Simplify authentication of various Hadoop services and UIs
  • Hide service-specific URLs and ports by acting as a proxy
  • Enable SSL termination at the perimeter
  • Ease management of published endpoints across multiple clusters

Overview

Apache Knox

Apache Knox provides a gateway to access Hadoop clusters using REST API endpoints. It simplifies client’s interaction with services on the Hadoop cluster by integrating with enterprise identity management solutions and hiding cluster deployment details.

In this post, we run the following setup:

  • Create a virtual private cloud (VPC) using the Amazon VPC service.
  • Provision an Amazon EC2 Windows instance for the Active Directory domain controller.
  • Create an Amazon EMR security configuration for Kerberos and cross-realm trust.
  • Set up Knox on the EMR master node and enable LDAP authentication.

Visually, we are creating the following resources:

Figure 1: Provisioned infrastructure from CloudFormation

Prerequisites and assumptions

Before getting started, the following prerequisites must be met:

IMPORTANT: The templates use hardcoded user name and passwords, and open security groups. They are not intended for production use without modification.

NOTE:

  • Single VPC has been used to simplify networking
  • CloudFormation templates use hardcoded user names and passwords and open security groups for simplicity.

Implementation

Single-click solution deployment

If you don’t want to set up each component individually, you can use the single-step AWS CloudFormation template. The single-step template is a master template that uses nested stacks (additional templates) to launch and configure all the resources for the solution in one operation.

To launch the entire solution, choose the Launch Stack button below, which directs you to the console. Do not change to a different Region, because the template is designed to work only in the US-EAST-1 Region.

This template requires several parameters that you must provide. See the list below, noting the parameters marked with *, for which you have to provide values. The remaining parameters have default values and should not be edited.

  1. Domain Controller Name – DC1
  2. Active Directory domain – awsknox.com
  3. Domain NetBIOS name – AWSKNOX (NetBIOS name of the domain, up to 15 characters).
  4. Domain admin user – User name for the account to be added as Domain administrator (awsadmin).
  5. Domain admin password * – Password for the domain admin user. Must be at least eight characters containing letters, numbers, and symbols; for example, CheckSum123.
  6. Key pair name * – Name of an existing EC2 key pair to enable access to the domain controller instance.
  7. Instance type – Instance type for the domain controller EC2 instance.
  8. LDAP Bind user name – LDAP Bind user name. Default value is CN=awsadmin,CN=Users,DC=awsknox,DC=com.
  9. EMR Kerberos realm – EMR Kerberos realm name. This is usually the VPC’s domain name in uppercase letters, for example, EC2.INTERNAL.
  10. Cross-realm trust password * – Password for the cross-realm trust, for example, CheckSum123.
  11. Trusted Active Directory Domain – The Active Directory domain that you want to trust. This is the same as the Active Directory domain name, but in uppercase letters. Default value is AWSKNOX.COM.
  12. Instance type – Instance type for the EMR cluster EC2 instances. Default: m4.xlarge.
  13. Instance count – Number of core instances of the EMR cluster. Default: 2.
  14. Allowed IP address – The client IP address that can reach your cluster. Specify an IP address range in CIDR notation (for example, 203.0.113.5/32). By default, only the VPC CIDR (10.0.0.0/16) can reach the cluster. Be sure to add your client IP range so that you can connect to the cluster using SSH.
  15. EMR applications – Comma-separated list of applications to install on the cluster. By default, “Hadoop,” “Spark,” “Ganglia,” “Hive,” and “HBase” are selected.
  16. LDAP search base – LDAP search base. Only value is: “CN=Users,DC=awshadoop,DC=com”.
  17. LDAP search attribute – LDAP user search attribute. Only value is: “sAMAccountName”.
  18. LDAP user object class – LDAP user object class value. Only value is: “person”.
  19. LDAP group search base – LDAP group search base value. Only value is: “dc=awshadoop, dc=com”.
  20. LDAP group object class – LDAP group object class. Only value is: “group”.
  21. LDAP member attribute – LDAP member attribute. Only value is: “member”.
  22. EMRLogDir * – Amazon S3 bucket where the EMR logs are stored. Provide “s3://” as the prefix.
  23. S3 Bucket – Amazon S3 bucket where the artifacts are stored. In this case, all the artifacts are stored in the “aws-bigdata-blog” public S3 bucket. Do not change this value.

Deploying each component individually

If you used the CloudFormation template in the single-step solution, you can skip this section and start from the Accessing the cluster section. This section describes how to use AWS CloudFormation templates to perform each step of the solution separately.

1. Create and configure an Amazon VPC

In this step, we set up an Amazon VPC, a public subnet, an internet gateway, a route table, and a security group.

In order for you to establish a cross-realm trust between an Amazon EMR Kerberos realm and an Active Directory domain, your Amazon VPC must meet the following requirements:

  • The subnet used for the Amazon EMR cluster must have a CIDR block of fewer than nine digits (for example, 10.0.1.0/24).
  • Both DNS resolution and DNS hostnames must be enabled (set to “yes”).
  • The Active Directory domain controller must be the DNS server for instances in the Amazon VPC (this is configured in the next step).

To launch directly through the console, choose Launch Stack.
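
If you build the VPC yourself instead of using the provided template, remember to switch on both DNS attributes. The following is a minimal boto3 sketch; the VPC ID is a hypothetical placeholder, and the template handles this step for you automatically.

import boto3

ec2 = boto3.client("ec2", region_name="us-east-1")

vpc_id = "vpc-0123456789abcdef0"  # hypothetical; use the VPC you created for the EMR cluster

# Cross-realm trust requires both DNS resolution and DNS hostnames to be enabled on the VPC.
ec2.modify_vpc_attribute(VpcId=vpc_id, EnableDnsSupport={"Value": True})
ec2.modify_vpc_attribute(VpcId=vpc_id, EnableDnsHostnames={"Value": True})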

2. Launch and configure an Active Directory domain controller

In this step, you use an AWS CloudFormation template to automatically launch and configure a new Active Directory domain controller and cross-realm trust.

Next, launch a Windows EC2 instance and install and configure an Active Directory domain controller. In addition to launching and configuring an Active Directory domain controller and cross-realm trust, this AWS CloudFormation template also sets the domain controller as the DNS server (name server) for your Amazon VPC.

To launch directly through the console, choose Launch Stack.

3. Launch and configure the EMR cluster with Apache Knox

To launch a Kerberized Amazon EMR cluster, first we must create a security configuration containing the cross-realm trust configuration. For more details, see the blog post Use Kerberos Authentication to Integrate Amazon EMR with Microsoft Active Directory.

In addition to the steps described in that post, this template adds an extra EMR step that creates a Kerberos principal for Knox.

The CloudFormation script also updates parameters in the core-site.xml, hive-site.xml, webhcat-site.xml, and oozie-site.xml files. You can see these in the “create_emr.py” script. Once the EMR cluster is created, it also runs a shell script as an EMR step. This shell script downloads and installs the Knox software on the EMR master node. It also creates a Knox topology file with the name emr-cluster-top.

To launch directly through the console, choose Launch Stack.

Accessing the cluster

API access to Hadoop Services

One of the main reasons to use Apache Knox is to isolate the Hadoop cluster from direct connectivity by users. Below, we demonstrate how you can interact with several Hadoop services like WebHDFS, WebHCat, Oozie, HBase, Hive, and YARN applications through the Knox endpoint using REST API calls. The REST calls can be made from the EMR cluster or from outside it. However, in a production environment, the EMR cluster’s security groups should be set to only allow traffic on Knox’s port number and block traffic to all other applications.

For the purposes of this blog, we make the REST calls on the EMR cluster by using SSH to connect to the master node with the LDAP credentials:

ssh awsadmin@<EMR-Master-Machine-Public-DNS>

Replace <EMR-Master-Machine-Public-DNS> with the value from the CloudFormation outputs to the EMR cluster’s master node. Find this CloudFormation Output value from the stack you deployed in Step 3 above.

You are prompted for the ‘awsadmin’ LDAP password. Please use the password you selected during the CloudFormation stack creation.

NOTE: In order to connect, your client machine’s IP should fall within the CIDR range specified by the “Allowed IP address” CloudFormation parameter. If you are not able to connect to the master node, check that the master instance’s security group for the EMR cluster has a rule that allows traffic from your client. Otherwise, your organization’s firewall may be blocking your traffic.

Demonstrating access to the WebHDFS service API:

Here we invoke the LISTSTATUS operation on WebHDFS via the Knox gateway. In our setup, Knox is running on port number 8449. The command below returns a directory listing of the root directory of HDFS.

curl -ku awsadmin 'https://localhost:8449/gateway/emr-cluster-top/webhdfs/v1/?op=LISTSTATUS'

You can use either “localhost” or the private DNS of the EMR master node.

You are prompted for the password. This is the same “Domain admin password” that was passed as the parameter into the CloudFormation stack.
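
The same call can be made from any HTTP client, not just curl. As a rough sketch, here is the equivalent request in Python using the requests library, assuming the Knox port (8449) and topology name (emr-cluster-top) from this setup; verify=False mirrors curl’s -k flag because the gateway uses a self-signed certificate, and the password placeholder is hypothetical.

import requests
from requests.auth import HTTPBasicAuth

KNOX_BASE = "https://localhost:8449/gateway/emr-cluster-top"

# List the root directory of HDFS through the Knox gateway.
resp = requests.get(
    f"{KNOX_BASE}/webhdfs/v1/",
    params={"op": "LISTSTATUS"},
    auth=HTTPBasicAuth("awsadmin", "<your-ldap-password>"),
    verify=False,  # self-signed certificate, same as curl -k
)
resp.raise_for_status()
for status in resp.json()["FileStatuses"]["FileStatus"]:
    print(status["type"], status["pathSuffix"])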

Demonstrating access to the Resource Manager service API:

The Resource Manager REST API provides information about the Hadoop cluster: its status, the applications running on the cluster, and so on. We can use the command below to get the cluster information.

curl -ikv -u awsadmin -X GET 'https://localhost:8449/gateway/emr-cluster-top/resourcemanager/v1/cluster'

You are prompted for the password. This is the same “Domain admin password” that was passed as the parameter into the CloudFormation stack.

Demonstrating connecting to Hive using Beeline through Apache Knox:

We can use Beeline, a JDBC client tool, to connect to HiveServer2. Here we connect to Beeline through Knox.

Use the following command to open the Hive shell:

$hive

Use the following syntax to connect to Hive from Beeline:

!connect jdbc:hive2://<EMR-Master-Machine-Public-DNS>:8449/;transportMode=http;httpPath=gateway/emr-cluster-top/hive;ssl=true;sslTrustStore=/home/knox/knox/data/security/keystores/gateway.jks;trustStorePassword=CheckSum123

NOTE: You must update the <EMR-Master-Machine-Public-DNS> with the public DNS name of the EMR master node.

Demonstrating submitting a Spark job using Apache Livy through Apache Knox

You can use the following command to submit a Spark job to an EMR cluster. In this example, we run the SparkPi program that is available in spark-examples.jar.

curl -i -k -u awsadmin -X POST --data '{"file": "s3://aws-bigdata-blog/artifacts/aws-blog-emr-knox/spark-examples.jar", "className": "org.apache.spark.examples.SparkPi", "args": ["100"]}' -H "Content-Type: application/json" https://localhost:8449/gateway/emr-cluster-top/livy/v1/batches

You can use either “localhost” or the private DNS of the EMR master node.

Securely accessing Hadoop Web UIs

In addition to providing API access to Hadoop clusters, Knox also provides proxying service for Hadoop UIs. Below is a table of available UIs:

  1. Resource Manager – https://<EMRClusterURL>:8449/gateway/emr-cluster-top/yarn/
  2. Ganglia – https://<EMRClusterURL>:8449/gateway/emr-cluster-top/ganglia/
  3. Apache HBase – https://<EMRClusterURL>:8449/gateway/emr-cluster-top/hbase/webui/master-status
  4. WebHDFS – https://<EMRClusterURL>:8449/gateway/emr-cluster-top/hdfs/
  5. Spark History – https://<EMRClusterURL>:8449/gateway/emr-cluster-top/sparkhistory/

On the first visit of any UI above, you are presented with a drop-down for login credentials. Enter the login user awsadmin and the password you specified as a parameter to your CloudFormation template.

You can now browse the UI as if you were directly connected to the cluster. Below is a sample of the YARN UI:

And the scheduler information in the Yarn UI:

Ganglia:

Spark History UI:

Lastly, the HBase UI. The entire URL to the “master-status” page must be provided.

Troubleshooting

It’s not always clear what went wrong when there is an error interacting with Apache Knox. Below are a few troubleshooting steps.

I cannot connect to the UI. I do not get any error codes.

  • Apache Knox may not be running. Check that it’s running by logging in to the master node of your cluster and running “ps -ef | grep knox”. There should be a process running.
ps -ef | grep knox
Knox 114022 1 0 Aug24 ? 00:04:21 /usr/lib/jvm/java/bin/java -Djava.library.path=/home/knox/knox/ext/native -jar /home/knox/knox/bin/gateway.jar

If the process is not running, start it by running “/home/knox/knox/bin/gateway.sh start” as the knox user (sudo su - knox).

  • Your browser may not have connectivity to the cluster. Even though you may be able to SSH to the cluster, a firewall rule or security group rule may be blocking traffic on the port that Knox is running on. You can route traffic through SSH by building an SSH tunnel and enabling port forwarding.

I get an HTTP 400, 404 or 503 code when accessing a UI:

  • Ensure that the URL you are entering is correct. If you do not enter the correct path, then Knox provides an HTTP 404.
  • There may be an issue with the routing rules within Apache Knox, so it does not know how to route the requests. The Knox logs are at INFO level by default and are available in /home/knox/knox/logs/. To change the logging level to DEBUG, change the following lines in /home/knox/knox/conf/gateway-log4j.properties:

    log4j.logger.org.apache.knox.gateway=INFO
    #log4j.logger.org.apache.knox.gateway=DEBUG

    to

    #log4j.logger.org.apache.knox.gateway=INFO
    log4j.logger.org.apache.knox.gateway=DEBUG

    The DEBUG logs provide much more information, such as how Knox is rewriting URLs, which shows whether Knox is translating URLs correctly. You can use the “ldap”, “knoxcli”, and “curl” commands below to verify that the setup is correct. Run these commands as the “knox” user.
  • To verify search base, search attribute and search class, run the below ldap command
    ldapsearch -h <Active-Directory-Domain-Private-IP-Address> -p 389 -x -D 'CN=awsadmin,CN=Users,DC=awsknox,DC=com' -w 'CheckSum123' -b 'CN=Users,DC=awsknox,DC=com' -z 5 '(objectClass=person)' sAMAccountName

  • Replace “<Active-Directory-Domain-Private-IP-Address>” with the private IP address of the Active Directory EC2 instance. You can get this IP address from the output of the second CloudFormation template.
  • To verify the values for server host, port, username, and password, run the below ldap command.
    ldapwhoami -h <Active-Directory-Domain-Private-IP-Address> -p 389 -x -D 'CN=awsadmin,CN=Users,DC=awsknox,DC=com' -w 'CheckSum123'

  • Replace “<Active-Directory-Domain-Private-IP-Address>” with the private IP address of the Active Directory EC2 instance. You can get this IP address from the output of the second CloudFormation template.
  • It should display the below output:

  • To verify the System LDAP bind successful or not:
    /home/knox/knox/bin/knoxcli.sh user-auth-test --cluster emr-cluster-top --u awsadmin --p 'CheckSum123'

  • Here “emr-cluster-top” is the topology file that defines the applications that are available and the endpoints that Knox should connect to service the application.
  • The command should return the following output:

“System LDAP Bind successful!”

  • To verify LDAP authentication successful or not, run the below command.
    /home/knox/knox/bin/knoxcli.sh user-auth-test --cluster emr-cluster-top --u awsadmin --p 'CheckSum123'

  • Here “emr-cluster-top” is the topology file name that we created.
  • The command should return the following output:

“LDAP authentication successful!”

  • Verify if WebHDFS is reachable directly using the service
  • First, we must get a valid Kerberos TGT. For that, use the kinit command as shown below, then call WebHDFS directly with curl:
    kinit -kt /mnt/var/lib/bigtop_keytabs/knox.keytab knox/<EMR-Master-Machine-Private-DNS>@EC2.INTERNAL
    curl --negotiate -u : http://<EMR-Master-Machine-Private-DNS>:50070/webhdfs/v1/?op=GETHOMEDIRECTORY

  • For example: EMR-Master-Machine-Private-DNS appears in this format: ip-xx-xx-xx-xx.ec2.internal
  • It should return a JSON object containing a “Path” variable of the user’s home directory.

Cleanup

Delete the CloudFormation stack to clean up all the resources created for this setup. If you used the nested stack, CloudFormation deletes all resources in one operation. If you deployed the templates individually, delete them in the reverse order of creation, deleting the VPC stack last.

Conclusion

In this post, we went through the setup, configuration, and validation of perimeter security for EMR clusters using Apache Knox. This helps simplify authentication for various Hadoop services. In our next post, we will show you how to integrate Apache Knox and Apache Ranger to enable authorization and audits.

Stay tuned!


About the Author


Varun Rao is an enterprise solutions architect. He works with enterprise customers in their journey to the cloud with a focus on data strategy and security. In his spare time, he tries to keep up with his 4-year-old.

Mert Hocanin is a big data architect with AWS, covering several products, including EMR, Athena, and Managed Blockchain. Prior to working at AWS, he worked on Amazon.com’s retail business as a Senior Software Development Engineer, building a data lake to process vast amounts of data from all over the company for reporting purposes. When not building and designing data lakes, Mert enjoys traveling and food.

Srikanth Kodali is a Sr. IoT Data Analytics Architect at Amazon Web Services. He works with AWS customers to provide guidance and technical assistance on building IoT data and analytics solutions, helping them improve the value of their solutions when using AWS.

Run Spark applications with Docker using Amazon EMR 6.0.0 (Beta)

Post Syndicated from Paul Codding original https://aws.amazon.com/blogs/big-data/run-spark-applications-with-docker-using-amazon-emr-6-0-0-beta/

The Amazon EMR team is excited to announce the public beta release of EMR 6.0.0 with Spark 2.4.3, Hadoop 3.1.0, Amazon Linux 2, and Amazon Corretto 8. With this beta release, Spark users can use Docker images from Docker Hub and Amazon Elastic Container Registry (Amazon ECR) to define environment and library dependencies. Using Docker, users can easily define their dependencies and use them for individual jobs, avoiding the need to install dependencies on individual cluster hosts.

This post shows you how to use Docker with the EMR release 6.0.0 Beta. You’ll learn how to launch an EMR release 6.0.0 Beta cluster and run Spark jobs using Docker containers from both Docker Hub and Amazon ECR.

Hadoop 3 Docker support

EMR 6.0.0 (Beta) includes Hadoop 3.1.0, which allows the YARN NodeManager to launch containers either directly on the host machine of the cluster, or inside a Docker container. Docker containers provide a custom execution environment in which the application’s code runs isolated from the execution environment of the YARN NodeManager and other applications.

These containers can include special libraries needed by the application, and even provide different versions of native tools and libraries such as R, Python, and Python libraries. This allows you to easily define the libraries and runtime dependencies that your applications need, using familiar Docker tooling.

Clusters running the EMR 6.0.0 (Beta) release are configured by default to allow YARN applications such as Spark to run using Docker containers. To customize this, use the configuration for Docker support defined in the yarn-site.xml and container-executor.cfg files available in the /etc/hadoop/conf directory. For details on each configuration option and how it is used, see Launching Applications Using Docker Containers.

You can choose to use Docker when submitting a job. On job submission, the following variables are used to specify the Docker runtime and Docker image used:

    • YARN_CONTAINER_RUNTIME_TYPE=docker
    • YARN_CONTAINER_RUNTIME_DOCKER_IMAGE={DOCKER_IMAGE_NAME}

When you use Docker containers to execute your YARN applications, YARN downloads the Docker image specified when you submit your job. For YARN to resolve this Docker image, it must be configured with a Docker registry. Options to configure a Docker registry differ based on how you chose to deploy EMR (using either a public or private subnet).

Docker registries

A Docker registry is a storage and distribution system for Docker images. For EMR 6.0.0 (Beta), the following Docker registries can be configured:

  • Docker Hub: A public Docker registry containing over 100,000 popular Docker images.
  • Amazon ECR: A fully-managed Docker container registry that allows you to create your own custom images and host them in a highly available and scalable architecture.

Deployment considerations

Docker registries require network access from each host in the cluster, as each host downloads images from the Docker registry when your YARN application is running on the cluster. How you choose to deploy your EMR cluster (launching it into a public or private subnet) may limit your choice of Docker registry due to network connectivity requirements.

Public subnet

With EMR public subnet clusters, nodes running YARN NodeManager can directly access any registry available over the internet, such as Docker Hub, as shown in the following diagram.

Private Subnet

With EMR private subnet clusters, nodes running YARN NodeManager don’t have direct access to the internet. Docker images can be hosted in Amazon ECR and accessed through AWS PrivateLink, as shown in the following diagram.

For details on how to use AWS PrivateLink to allow access to ECR in a private subnet scenario, see Setting up AWS PrivateLink for Amazon ECS, and Amazon ECR.
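
As a rough outline of what that setup involves, a private-subnet cluster typically needs interface endpoints for the two ECR services plus a gateway endpoint for Amazon S3, where ECR stores image layers. The following boto3 sketch uses hypothetical VPC, subnet, security group, and route table IDs; adjust the Region in the service names to match your deployment.

import boto3

ec2 = boto3.client("ec2", region_name="us-east-1")

vpc_id = "vpc-0123456789abcdef0"            # hypothetical IDs -- replace with your own
subnet_ids = ["subnet-0123456789abcdef0"]
sg_ids = ["sg-0123456789abcdef0"]
route_table_ids = ["rtb-0123456789abcdef0"]

# Interface endpoints so instances can call the ECR API and pull image manifests.
for service in ("com.amazonaws.us-east-1.ecr.api", "com.amazonaws.us-east-1.ecr.dkr"):
    ec2.create_vpc_endpoint(
        VpcEndpointType="Interface",
        VpcId=vpc_id,
        ServiceName=service,
        SubnetIds=subnet_ids,
        SecurityGroupIds=sg_ids,
        PrivateDnsEnabled=True,
    )

# Gateway endpoint for S3, which ECR uses to store image layers.
ec2.create_vpc_endpoint(
    VpcEndpointType="Gateway",
    VpcId=vpc_id,
    ServiceName="com.amazonaws.us-east-1.s3",
    RouteTableIds=route_table_ids,
)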

Configuring Docker registries

Docker must be configured to trust the specific registry used to resolve Docker images. The default trust registries are local (private) and centos (on public Docker Hub). You can override docker.trusted.registries in /etc/hadoop/conf/container-executor.cfg to use other public repositories or ECR. To override this configuration, use the EMR Classification API with the container-executor classification key.

The following example shows how to configure the cluster to trust both a public repository (your-public-repo) and an ECR registry (123456789123.dkr.ecr.us-east-1.amazonaws.com). When using ECR, replace this endpoint with your specific ECR endpoint.  When using Docker Hub, please replace this repository name with your actual repository name.

[
  {
    "Classification": "container-executor",
    "Configurations": [
        {
            "Classification": "docker",
            "Properties": {
                "docker.trusted.registries": "local,centos, your-public-repo,123456789123.dkr.ecr.us-east-1.amazonaws.com",
                "docker.privileged-containers.registries": "local,centos, your-public-repo,123456789123.dkr.ecr.us-east-1.amazonaws.com"
            }
        }
    ]
  }
]

To launch an EMR 6.0.0 (Beta) cluster with this configuration using the AWS Command Line Interface (AWS CLI), create a file named container-executor.json with the contents of the preceding JSON configuration.  Then, use the following commands to launch the cluster:

$ export KEYPAIR=<Name of your Amazon EC2 key-pair>
$ export SUBNET_ID=<ID of the subnet to which to deploy the cluster>
$ export INSTANCE_TYPE=<Name of the instance type to use>
$ export REGION=<Region to which to deploy the cluster>

$ aws emr create-cluster \
    --name "EMR-6-Beta Cluster" \
    --region $REGION \
    --release-label emr-6.0.0-beta \
    --applications Name=Hadoop Name=Spark \
    --service-role EMR_DefaultRole \
    --ec2-attributes KeyName=$KEYPAIR,InstanceProfile=EMR_EC2_DefaultRole,SubnetId=$SUBNET_ID \
    --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=$INSTANCE_TYPE InstanceGroupType=CORE,InstanceCount=2,InstanceType=$INSTANCE_TYPE \
    --configuration file://container-executor.json

Using ECR

If you’re new to ECR, follow the instructions in Getting Started with Amazon ECR and verify you have access to ECR from each instance in your EMR cluster.

To access ECR using the docker command, you must first generate credentials. To make sure that YARN can access images from ECR, pass a reference to those generated credentials using the container environment variable YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG.

Run the following command on one of the core nodes to get the login line for your ECR account.

$ aws ecr get-login --region us-east-1 --no-include-email

The get-login command generates the correct Docker CLI command to run to create credentials. Copy and run the output from get-login.

$ sudo docker login -u AWS -p <password> https://<account-id>.dkr.ecr.us-east-1.amazonaws.com

This command generates a config.json file in the /root/.docker folder.  Copy this file to HDFS so that jobs submitted to the cluster can use it to authenticate to ECR.

Execute the commands below to copy the config.json file to your home directory.

$ mkdir -p ~/.docker
$ sudo cp /root/.docker/config.json ~/.docker/config.json
$ sudo chmod 644 ~/.docker/config.json

Execute the commands below to put the config.json in HDFS so it may be used by jobs running on the cluster.

$ hadoop fs -put ~/.docker/config.json /user/hadoop/

At this point, YARN can access ECR as a Docker image registry and pull containers during job execution.

Using Spark with Docker

With EMR 6.0.0 (Beta), Spark applications can use Docker containers to define their library dependencies, instead of requiring dependencies to be installed on the individual Amazon EC2 instances in the cluster. This integration requires configuration of the Docker registry, and definition of additional parameters when submitting a Spark application.

When the application is submitted, YARN invokes Docker to pull the specified Docker image and run the Spark application inside of a Docker container. This allows you to easily define and isolate dependencies. It reduces the time spent bootstrapping or preparing instances in the EMR cluster with the libraries needed for job execution.

When using Spark with Docker, make sure that you consider the following:

  • The docker package and CLI are only installed on core and task nodes.
  • The spark-submit command should always be run from a master instance on the EMR cluster.
  • The Docker registries used to resolve Docker images must be defined using the Classification API with the container-executor classification key to define additional parameters when launching the cluster:
    • docker.trusted.registries
    • docker.privileged-containers.registries
  • To execute a Spark application in a Docker container, the following configuration options are necessary:
    • YARN_CONTAINER_RUNTIME_TYPE=docker
    • YARN_CONTAINER_RUNTIME_DOCKER_IMAGE={DOCKER_IMAGE_NAME}
  • When using ECR to retrieve Docker images, you must configure the cluster to authenticate itself. To do so, you must use the following configuration option:
    • YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG={DOCKER_CLIENT_CONFIG_PATH_ON_HDFS}
  • Mount the /etc/passwd file into the container so that the user running the job can be identified in the Docker container.
    • YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS=/etc/passwd:/etc/passwd:ro
  • Any Docker image used with Spark must have Java installed in the Docker image.

Creating a Docker image

Docker images are created using a Dockerfile, which defines the packages and configuration to include in the image.  The following two example Dockerfiles use PySpark and SparkR.

PySpark Dockerfile

Docker images created from this Dockerfile include Python 3 and the numpy Python package.  This Dockerfile uses Amazon Linux 2 and the Amazon Corretto JDK 8.

FROM amazoncorretto:8

RUN yum -y update
RUN yum -y install yum-utils
RUN yum -y groupinstall development

RUN yum list python3*
RUN yum -y install python3 python3-devel python3-pip python3-virtualenv

RUN python -V
RUN python3 -V

ENV PYSPARK_DRIVER_PYTHON python3
ENV PYSPARK_PYTHON python3

RUN pip3 install --upgrade pip
RUN pip3 install numpy pandas

RUN python3 -c "import numpy as np"

SparkR Dockerfile

Docker images created from this Dockerfile include R and the randomForest CRAN package. This Dockerfile includes Amazon Linux 2 and the Amazon Corretto JDK 8.

FROM amazoncorretto:8

RUN java -version

RUN yum -y update
RUN amazon-linux-extras enable R3.4

RUN yum -y install R R-devel openssl-devel
RUN yum -y install curl

#setup R configs
RUN echo "r <- getOption('repos'); r['CRAN'] <- 'http://cran.us.r-project.org'; options(repos = r);" > ~/.Rprofile

RUN Rscript -e "install.packages('randomForest')"

For more information on Dockerfile syntax, see the Dockerfile reference documentation.

Using Docker images from ECR

Amazon Elastic Container Registry (ECR) is a fully-managed Docker container registry that makes it easy for developers to store, manage, and deploy Docker container images. When using ECR, the cluster must be configured to trust your instance of ECR, and you must configure authentication in order for the cluster to use Docker images from ECR.

In this example, our cluster must be created with the following additional configuration, to ensure the ECR registry is trusted. Please replace the 123456789123.dkr.ecr.us-east-1.amazonaws.com endpoint with your ECR endpoint.

[
  {
    "Classification": "container-executor",
    "Configurations": [
        {
            "Classification": "docker",
            "Properties": {
                "docker.trusted.registries": "local,centos,123456789123.dkr.ecr.us-east-1.amazonaws.com",
                "docker.privileged-containers.registries": "local,centos, 123456789123.dkr.ecr.us-east-1.amazonaws.com"
            }
        }
    ]
  }
]

Using PySpark with ECR

This example uses the PySpark Dockerfile. The image is tagged and uploaded to ECR. After it is uploaded, you run the PySpark job and reference the Docker image from ECR.

After you launch the cluster, use SSH to connect to a core node and run the following commands to build the local Docker image from the PySpark Dockerfile example.

First, create a directory and a Dockerfile for our example.

$ mkdir pyspark

$ vi pyspark/Dockerfile

Paste the contents of the PySpark Dockerfile and run the following commands to build a Docker image.

$ sudo docker build -t local/pyspark-example pyspark/

Create the emr-docker-examples ECR repository for our examples.

$ aws ecr create-repository --repository-name emr-docker-examples

Tag and upload the locally built image to ECR, replacing 123456789123.dkr.ecr.us-east-1.amazonaws.com with your ECR endpoint.

$ sudo docker tag local/pyspark-example 123456789123.dkr.ecr.us-east-1.amazonaws.com/emr-docker-examples:pyspark-example
$ sudo docker push 123456789123.dkr.ecr.us-east-1.amazonaws.com/emr-docker-examples:pyspark-example

Use SSH to connect to the master node and prepare a Python script with the filename main.py. Paste the following content into the main.py file and save it.

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("docker-numpy").getOrCreate()
sc = spark.sparkContext

import numpy as np
a = np.arange(15).reshape(3, 5)
print(a)

To submit the job, reference the name of the Docker image. Define the additional configuration parameters to make sure that the job execution uses Docker as the runtime. When using ECR, the YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG must reference the config.json file containing the credentials used to authenticate to ECR.

$ DOCKER_IMAGE_NAME=123456789123.dkr.ecr.us-east-1.amazonaws.com/emr-docker-examples:pyspark-example
$ DOCKER_CLIENT_CONFIG=hdfs:///user/hadoop/config.json
$ spark-submit --master yarn \
--deploy-mode cluster \
--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_TYPE=docker \
--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=$DOCKER_IMAGE_NAME \
--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG=$DOCKER_CLIENT_CONFIG \
--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS=/etc/passwd:/etc/passwd:ro \
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_TYPE=docker \
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=$DOCKER_IMAGE_NAME \
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG=$DOCKER_CLIENT_CONFIG \
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS=/etc/passwd:/etc/passwd:ro \
--num-executors 2 \
main.py -v

When the job has completed, take note of the YARN application ID, and use the following command to obtain the output of the PySpark job.

$ yarn logs --applicationId application_id | grep -C2 '\[\['
LogLength:55
LogContents:
[[ 0  1  2  3  4]
 [ 5  6  7  8  9]
 [10 11 12 13 14]]

Using SparkR with ECR

This example uses the SparkR Dockerfile. The image is tagged and uploaded to ECR. After it is uploaded, you run the SparkR job and reference the Docker image from ECR.

After you launch the cluster, use SSH to connect to a core node and run the following commands to build the local Docker image from the SparkR Dockerfile example.

First, create a directory and the Dockerfile for this example.

$ mkdir sparkr

$ vi sparkr/Dockerfile

Paste the contents of the SparkR Dockerfile and run the following commands to build a Docker image.

$ sudo docker build -t local/sparkr-example sparkr/

Tag and upload the locally built image to ECR, replacing 123456789123.dkr.ecr.us-east-1.amazonaws.com with your ECR endpoint.

$ sudo docker tag local/sparkr-example 123456789123.dkr.ecr.us-east-1.amazonaws.com/emr-docker-examples:sparkr-example
$ sudo docker push 123456789123.dkr.ecr.us-east-1.amazonaws.com/emr-docker-examples:sparkr-example

Use SSH to connect to the master node and prepare an R script with name sparkR.R. Paste the following contents into the sparkR.R file.

library(SparkR)
sparkR.session(appName = "R with Spark example", sparkConfig = list(spark.some.config.option = "some-value"))

sqlContext <- sparkRSQL.init(spark.sparkContext)
library(randomForest)
# check release notes of randomForest
rfNews()

sparkR.session.stop()

To submit the job, reference the name of the Docker image. Define the additional configuration parameters to make sure that the job execution uses Docker as the runtime. When using ECR, the YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG must reference the config.json file containing the credentials used to authenticate to ECR.

$ DOCKER_IMAGE_NAME=123456789123.dkr.ecr.us-east-1.amazonaws.com/emr-docker-examples:sparkr-example
$ DOCKER_CLIENT_CONFIG=hdfs:///user/hadoop/config.json
$ spark-submit --master yarn \
--deploy-mode cluster \
--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_TYPE=docker \
--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=$DOCKER_IMAGE_NAME \
--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG=$DOCKER_CLIENT_CONFIG \
--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS=/etc/passwd:/etc/passwd:ro \
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_TYPE=docker \
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=$DOCKER_IMAGE_NAME \
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG=$DOCKER_CLIENT_CONFIG \
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS=/etc/passwd:/etc/passwd:ro \
sparkR.R

When the job has completed, note the YARN application ID, and use the following command to obtain the output of the SparkR job. This example includes testing to make sure that the randomForest library, version installed, and release notes are available.

$ yarn logs --applicationId application_id | grep -B4 -A10 "Type rfNews"
randomForest 4.6-14
Type rfNews() to see new features/changes/bug fixes.
Wishlist (formerly TODO):

* Implement the new scheme of handling classwt in classification.

* Use more compact storage of proximity matrix.

* Allow case weights by using the weights in sampling?

========================================================================
Changes in 4.6-14:

Using a Docker image from Docker Hub

To use Docker Hub, you must deploy your cluster to a public subnet and configure it to use Docker Hub as a trusted registry. In this example, the cluster needs the following additional configuration to make sure that the your-public-repo repository on Docker Hub is trusted. When using Docker Hub, replace this repository name with your actual repository.

[
  {
    "Classification": "container-executor",
    "Configurations": [
        {
            "Classification": "docker",
            "Properties": {
                "docker.trusted.registries": "local,centos,your-public-repo ",
                "docker.privileged-containers.registries": "local,centos,your-public-repo"
            }
        }
    ]
  }
]

Beta limitations

EMR 6.0.0 (Beta) focuses on helping you get value from using Docker with Spark to simplify dependency management. You can also use EMR 6.0.0 (Beta) to get familiar with Amazon Linux 2, and Amazon Corretto JDK 8.

The EMR 6.0.0 (Beta) supports the following applications:

  • Spark 2.4.3
  • Livy 0.6.0
  • ZooKeeper 3.4.14
  • Hadoop 3.1.0

This beta release is supported in the following Regions:

  • US East (N. Virginia)
  • US West (Oregon)

The following EMR features are currently not available with this beta release:

  • Cluster integration with AWS Lake Formation
  • Native encryption of Amazon EBS volumes attached to an EMR cluster

Conclusion

In this post, you learned how to use an EMR 6.0.0 (Beta) cluster to run Spark jobs in Docker containers and integrate with both Docker Hub and ECR. You’ve seen examples of both PySpark and SparkR Dockerfiles.

The EMR team looks forward to hearing about how you’ve used this integration to simplify dependency management in your projects. If you have questions or suggestions, feel free to leave a comment.


About the Authors

Paul Codding is a senior product manager for EMR at Amazon Web Services.

Ajay Jadhav is a software development engineer for EMR at Amazon Web Services.

Rentao Wu is a software development engineer for EMR at Amazon Web Services.

Stephen Wu is a software development engineer for EMR at Amazon Web Services.

Migrate and deploy your Apache Hive metastore on Amazon EMR

Post Syndicated from Tanzir Musabbir original https://aws.amazon.com/blogs/big-data/migrate-and-deploy-your-apache-hive-metastore-on-amazon-emr/

Combining the speed and flexibility of Amazon EMR with the utility and ubiquity of Apache Hive provides you with the best of both worlds. However, getting started with big data projects can feel intimidating. Whether you want to deploy new data on EMR or migrate an existing project, this post provides you with the basics to get started.

Apache Hive is an open-source data warehouse and analytics package that runs on top of an Apache Hadoop cluster. A Hive metastore contains descriptions of tables and the underlying data on which they are built, including partition names and data types. Hive is one of the applications that can run on EMR.

Most of the solutions that this post presents assume that you use Apache Hadoop to manage your metastore, which provides scalability for Hive. If you don’t use Hadoop, see documentation for Amazon EMR.

Hive metastore deployment

You can choose one of three configuration patterns for your Hive metastore: embedded, local, or remote. When migrating an on-premises Hadoop cluster to EMR, your migration strategy depends on your existing Hive metastore’s configuration.

Bear in mind a few key facts while considering your set-up. Apache Hive ships with the Derby database, which you can use for embedded metastores. However, Derby can’t scale for production-level workloads.

By default on EMR, Hive records metastore information in a MySQL database on the master node's file system, which is ephemeral storage; this creates a local metastore. When a cluster terminates, all cluster nodes shut down, including the master node, which erases your data.

To get around these problems, create an external Hive metastore. This helps ensure that the Hive metadata store can scale with your implementation and that the metastore persists even if the cluster terminates.

There are two options for creating an external Hive metastore for EMR: using the AWS Glue Data Catalog, or using Amazon RDS or Amazon Aurora.

Using the AWS Glue Data Catalog as the Hive metastore

The AWS Glue Data Catalog is flexible and reliable, making it a great choice when you’re new to building or maintaining a metastore. Because AWS manages the service for you, it means investing less time and resources to the process, but it also sacrifices some fine control. The Data Catalog is highly available, fault-tolerant, maintains data replicas to avoid failure, and expands hardware depending on usage.

You don’t have to manage the Hive metastore database instance separately, maintain ongoing replication, or scale up the instance. An AWS Glue Data Catalog can supply one EMR cluster or many, as well as supporting Amazon Athena and Amazon Redshift Spectrum. You can also download the source code for the AWS Glue Data Catalog client for Apache Hive Metastore and use that code as a reference implementation for building a compatible client.

The AWS Glue Data Catalog still allows you plenty of control. You can enable encryption on your files, or configure access policies to allow or forbid specific actions. Bear in mind that the Data Catalog doesn't currently support column statistics, Hive authorizations, or Hive constraints.

An AWS Glue Data Catalog has versions, which means a table can have multiple schema versions. AWS Glue stores that information in the Data Catalog, including the Hive metastore data. Based on the catalog configuration, you can adopt the new schema version or ignore new versions.

When you create an EMR cluster using release version 5.8.0 and later, you can choose a Data Catalog as the Hive metastore. The Data Catalog is not available with earlier releases.

Specify the AWS Glue Data Catalog using the EMR console

When you set up an EMR cluster, choose Advanced Options to enable AWS Glue Data Catalog settings in Step 1. Apache Hive, Presto, and Apache Spark all use the Hive metastore. Within EMR, you have options to use the AWS Glue Data Catalog for any of these applications.

Specify the AWS Glue Data Catalog using the AWS CLI or EMR API

To specify the AWS Glue Data Catalog when you create a cluster in either the AWS CLI or the EMR API, use the hive-site configuration classification. Set the value of hive.metastore.client.factory.class property to com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory.

[
  {
    "Classification": "hive-site",
    "Properties": {
      "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
    }
  }
]  

When you create an EMR cluster, save the configuration classification to a JSON file and then specify that file when you create the cluster. For more information, see Configuring Applications in the Amazon EMR Release Guide.
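For example, assuming the classification above is saved locally as glue-hive-configuration.json (a hypothetical file name), the cluster creation command might look like the following sketch, modeled on the CLI examples later in this post:

aws emr create-cluster --release-label emr-5.17.0 --instance-type m4.large --instance-count 2 \
--applications Name=Hive --configurations ./glue-hive-configuration.json --use-default-roles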

Using Amazon RDS or Amazon Aurora as the Hive metastore

If you want full control of your Hive metastore and want to integrate with other open-source applications such as Apache Ranger or Apache Atlas, then you can host your Hive metastore on Amazon RDS.

Always keep in mind that your Hive metastore is a single point of failure. Amazon RDS doesn’t automatically replicate databases, so you should enable replication when using Amazon RDS to avoid any data loss in the event of failure.

There are three main steps to set up your Hive metastore using RDS or Aurora:

  1. Create a MySQL or Aurora database.
  2. Configure the hive-site.xml file to point to MySQL or Aurora database.
  3. Specify an external Hive metastore.

Create a MySQL or Aurora database

Begin by setting up either your MySQL database on Amazon RDS or an Amazon Aurora database. Make a note of the URL, username, password, and database name, as you need all this information for the configuration process.

Update your database’s security group to allow JDBC connections between the EMR cluster and a MySQL database port (default: 3306).
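As a sketch, if the database and the EMR cluster instances each use a dedicated security group (the group IDs below are placeholders), you might open the MySQL port with a rule like this:

$ aws ec2 authorize-security-group-ingress --group-id <database-security-group-id> \
--protocol tcp --port 3306 --source-group <emr-cluster-security-group-id>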

Configure EMR for an external Hive metastore

To configure EMR, create a configuration file containing the following Hive site classification information:

  • javax.jdo.option.ConnectionDriverName should be set to org.mariadb.jdbc.Driver (the preferred driver).
  • javax.jdo.option.ConnectionURL, javax.jdo.option.ConnectionUserName, and javax.jdo.option.ConnectionPassword should all point to the newly created database.
[
    {
      "Classification": "hive-site",
      "Properties": {
        "javax.jdo.option.ConnectionURL": "jdbc:mysql:\/\/hostname:3306\/hive?createDatabaseIfNotExist=true",
        "javax.jdo.option.ConnectionDriverName": "org.mariadb.jdbc.Driver",
        "javax.jdo.option.ConnectionUserName": "username",
        "javax.jdo.option.ConnectionPassword": "password"
      }
    }
]

Specify an external Hive metastore

After you save your configuration, specify an external Hive metastore. You can do this with either the EMR console or the AWS CLI.

On the EMR console, enter the classification settings created in the previous step either as a JSON file from Amazon S3 or as embedded text.

If you are using the AWS CLI, save the classification information as a file named hive-configuration.json and pass the configuration file as a local file or from S3.

  • hive-configuration.json file in a local path:

aws emr create-cluster --release-label emr-5.17.0 --instance-type m4.large --instance-count 2 \
--applications Name=Hive --configurations ./hive-configuration.json --use-default-roles

  • hive-configuration.json file in Amazon S3:

aws emr create-cluster --release-label emr-5.17.0 --instance-type m4.large --instance-count 2 \
--applications Name=Hive --configurations s3://emr-sample/hive-configuration.json --use-default-roles

Hive metastore migration options

When migrating Hadoop-based workloads from on-premises to the cloud, you must migrate your Hive metastore as well. Depending on the migration plan or your requirements, you can migrate a metastore one of two ways:

  • A one-time metastore migration, which moves an existing Hive metastore completely to AWS.
  • An ongoing metastore sync, which migrates the Hive metastore but also keeps a copy on-premises so that the two metastores can sync in real time during the migration phase.

One-time metastore migration

A one-and-done migration option allows you to shift your workspace entirely and never worry about migrating again. This situation is perfect if you plan to run your existing Hive workloads on EMR. The following diagram illustrates this scenario.

Migrating your Hive metastore to AWS Glue Data Catalog

In this case, your goal is to migrate existing Hive metastore from on-premises to an AWS Glue Data Catalog. There are multiple ways to navigate this migration, but the easiest uses an AWS Glue ETL job to extract metadata from your Hive metastore.  You then use AWS Glue jobs to load the metadata and update the AWS Glue Data Catalog. Many scripts to manage this process already exist on GitHub.

Migrating your Hive metastore to Amazon RDS or Amazon Aurora

Instead of using the AWS Glue Data Catalog, you can move your Hive metastore data from an on-premises database to AWS-based storage. Depending on your database source and the desired target in AWS, the process requires different steps. For more information, see the AWS documentation for your source and target databases.

Ongoing metastore sync

Large-scale migrations benefit from an ongoing sync process, allowing you to keep running your Hive metastore in your data center as well as in the cloud during the migration phase.

The ongoing sync process keeps both Hive metastores accurate and up to date with any changes made during the migration. Use only one application to update the Hive metastore; otherwise, the metastores fall out of sync.

AWS DMS is a data migration service that is ideal for ongoing replication and custom-built for this need. You can also replicate the external database to Amazon RDS using the binary log file positions of replicated transactions.

Conclusion

This post pointed you to the various existing resources that can make your Hive migration as smooth and easy as possible.

The content of this blog post is part of the EMR Migration guide, which provides a comprehensive overview of advantages and disadvantages of each migration approach of Hadoop ecosystems. To read the paper, download the Amazon EMR Migration Guide now.

If you have additional insights or feedback, leave a comment here or reach out on Twitter!

 


About the Author

Tanzir Musabbir is an EMR Specialist Solutions Architect with AWS. He is an early adopter of open source Big Data technologies. At AWS, he works with our customers to provide them architectural guidance for running analytics solutions on Amazon EMR, Amazon Athena & AWS Glue. Tanzir is a big Real Madrid fan and he loves to travel in his free time.

Modifying your cluster on the fly with Amazon EMR reconfiguration

Post Syndicated from Brandon Scheller original https://aws.amazon.com/blogs/big-data/modifying-your-cluster-on-the-fly-with-amazon-emr-reconfiguration/

If you are a developer or data scientist using long-running Amazon EMR clusters, you face fast-changing workloads. These changes often require different application configurations to run optimally on your cluster.

With the reconfiguration feature, you can now change configurations on running EMR clusters. Starting with EMR release emr-5.21.0, this feature allows you to modify configurations without creating a new cluster or manually connecting by SSH into each node.

In this post, I go over the following topics:

  • Using reconfiguration
  • Instance group states, configuration versions, and events
  • Reconfiguration example use cases
  • Reconfiguration benefits

Using reconfiguration

The following tasks are updated in EMR release emr-5.21.0:

  • Submitting a reconfiguration
  • Modifying configurations
  • Defining configuration levels

Submitting a reconfiguration

You can submit a reconfiguration through the EMR console, SDK, or AWS CLI. For more information, see Submitting a Reconfiguration in the Amazon EMR documentation.

Modifying configurations

When submitting a reconfiguration, you must include all of the configurations you want to apply to the cluster. The update only applies those items, removing all others. As you modify configurations, the EMR console also tracks your previous cluster configurations for you.

Defining configuration levels

Define cluster-level and instance-group-level configurations for your applications. Supply cluster-level configurations as you create a cluster. These configurations are then automatically applied to all your instance groups, even those added after the cluster is up and running. After the cluster starts, you can't modify your cluster-level configurations, but you can supplement or override them at the instance-group level through reconfiguration requests. Whenever you submit a reconfiguration request for an instance group, the new instance-group-level configurations take precedence over inherited cluster-level configurations.

To better understand how cluster-level and instance-group-level configurations work together on an instance group, look at a simple demonstration in the EMR console:

Under the Configuration tab, select an instance group in the Filter drop-down list. Navigate to the desired instance group’s configuration table. The Source column of the configuration table indicates the level of your configurations.

This cluster starts with the cluster-level configuration set:

[
  {
    "Classification": "core-site",
    "Properties": {
      "Key-A": "Value-1",
      "Key-B": "Value-2"
    }
  }
]

As you can see in the console, the instance group ig-Y4E3MN8C4YBP automatically inherited the cluster-level configuration set. Now, reconfigure the instance group as follows:

[
  {
    "Classification": "core-site",
    "Properties": {
      "Key-A": "Value-a",
      "Key-C": "Value-3"
    }
  }
]

Once the request goes through, the value of configuration “Key-A” gets overridden by the instance-group-level configuration and changes from “Value-1” to “Value-a.”  In contrast, the value of configuration “Key-B” remains unchanged. Meanwhile, your request introduces the new, supplemental configuration, “Key-C.” The configuration table in your console always displays these kinds of subtle changes.

For more information about how to customize cluster-level and instance-group-level configurations, see Supplying a Configuration when Creating a Cluster.

Instance group states, configuration versions, and events

The states of your reconfiguration requests appear in instance group state transitions, configuration version increases, and CloudWatch events. Understand how each works to keep from losing track of any reconfiguration request:

  • Instance group states: After an instance group receives a reconfiguration request, it transitions from the RUNNING state to the RECONFIGURING state. The RECONFIGURING state indicates the start of the reconfiguration process. After the process completes and the new configurations have taken effect, the instance group returns to the RUNNING state. Then, you can verify your configurations either via your application’s Web UI or application-specific commands.
  • Configuration versions: Every reconfiguration request you submit establishes a new configuration set, distinguished by a new version number. Configuration versions start from 0 and increase by 1 for each new configuration set that you submit. Each instance group keeps its respective configuration version number. Version numbers increase depending on the number of times that you reconfigure the different instance groups.
  • Events: EMR posts a state for each reconfiguration request as an Amazon CloudWatch event. These events list the exact times when the request is submitted, when the reconfiguration operation starts, and when it completes. For easy tracking, each request is posted together with its associated configuration version. For example, the following event flow shows how EMR executes a typical reconfiguration request in an instance group:

For a complete list of EMR events and instance group state transitions for reconfiguration operation, see the EMR Management Guide.
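For example, one way to watch an instance group move through these states and track its configuration version from the AWS CLI might look like the following; the cluster ID is a placeholder, and ConfigurationsVersion is my assumption of the relevant output field on releases that support reconfiguration:

$ aws emr list-instance-groups --cluster-id j-MyClusterID \
--query 'InstanceGroups[*].[Id,Status.State,ConfigurationsVersion]' \
--output table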

Reconfiguration example use cases

Here are some use case examples of reconfiguration operations:

  • Reconfiguring HDFS blocksize
  • Configuring capacity-scheduler queues

Reconfiguring HDFS blocksize

You may deal with fluctuating workloads. These changes can call for new application configurations throughout the lifetime of a cluster.

For example, suppose that you’ve recently seen growth in the workload and filesize for your long-running cluster. You’d like to account for this growth without replacing your current cluster.

To increase your HDFS block size for better performance, take advantage of the new reconfiguration feature. The HDFS NameNode tracks each data block in your cluster, so increasing the block size can improve HDFS performance by reducing the number of blocks the NameNode tracks. In addition, a larger block size can improve job performance by reducing the number of required mappers.

To increase the HDFS block size from the default of 128 MB to 256 MB, submit a reconfiguration request to the master instance group, which runs the NameNode:

$ aws emr modify-instance-groups --cli-input-json file://reconfiguration.json

Here’s the example reconfiguration.json file.

reconfiguration.json:
{
  "ClusterId": "j-MyClusterID",
  "InstanceGroups": [
    {
      "InstanceGroupId": "ig-MyMasterId",
      "Configurations": [
        {
          "Classification": "hdfs-site",
          "Properties": {
            "dfs.blocksize": "256m"
          },
          "Configurations": []
        }
      ]
    }
  ]
}

The EMR reconfiguration process then modifies the "dfs.blocksize" parameter to the provided "256m" value within the hdfs-site.xml file. The reconfiguration process also automatically restarts the NameNode to pick up the new configuration. Any new blocks added to the cluster automatically use the new default block size of 256 MB. If you'd like any existing blocks to pick up this default, follow these steps:

  1. Copy the blocks to a new location.
  2. Delete the originals.
  3. Copy the blocks back to their original location.

The restored blocks pick up the new default block size. Note that the NameNode is briefly unavailable during the restart period.
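As a minimal sketch, assuming the data you want to rewrite lives under a hypothetical /user/hadoop/mydata directory, the copy, delete, and copy-back cycle might look like the following from the master node:

# 1. Copy the blocks to a temporary location
$ hadoop fs -mkdir -p /tmp/blocksize-rewrite
$ hadoop fs -cp /user/hadoop/mydata /tmp/blocksize-rewrite/
# 2. Delete the originals
$ hadoop fs -rm -r /user/hadoop/mydata
# 3. Copy the blocks back to their original location, then clean up
$ hadoop fs -cp /tmp/blocksize-rewrite/mydata /user/hadoop/
$ hadoop fs -rm -r /tmp/blocksize-rewrite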

Configuring capacity-scheduler queues

Do you want to change cluster resources sharing strategies among different Hadoop jobs? Modify YARN CapacityScheduler configurations on a running cluster? Add new queues on a large shared cluster that you manage with another organization? Alter the capacity allocation between different queues to meet your changing workloads?

Using the EMR reconfiguration feature, you can make these changes by submitting a reconfiguration request to the master instance group. New configurations take effect on your queues in a few minutes. You don't have to go through the hassle of logging in to the master node, directly updating the configuration file, or manually refreshing queues.

EMR clusters come with a single queue by default. Suppose you want to create two additional queues, alpha and beta, and allocate 30% of the total resource capacity of your cluster to each of them. Here's a sample command that submits a reconfiguration request to accomplish the desired change:

$ aws emr modify-instance-groups --cli-input-json file://reconfiguration.json

Here’s the example reconfiguration.json file.

reconfiguration.json:
{
   "ClusterId":"j-MyClusterID",
   "InstanceGroups":[
      {
         "InstanceGroupId":"ig-MyMasterId",
         "Configurations":[
            {
               "Classification":"capacity-scheduler",
               "Properties":{
                  "yarn.scheduler.capacity.root.queues":"default,alpha,beta",
                  "yarn.scheduler.capacity.root.default.capacity":"40",
                  "yarn.scheduler.capacity.root.default.accessible-node-labels.CORE.capacity":"40",
                  "yarn.scheduler.capacity.root.alpha.capacity":"30",
                  "yarn.scheduler.capacity.root.alpha.accessible-node-labels":"*",
                  "yarn.scheduler.capacity.root.alpha.accessible-node-labels.CORE.capacity":"30",
                  "yarn.scheduler.capacity.root.beta.capacity":"30",
                  "yarn.scheduler.capacity.root.beta.accessible-node-labels":"*",
                  "yarn.scheduler.capacity.root.beta.accessible-node-labels.CORE.capacity":"30"
               },
               "Configurations":[]
            }
         ]
      }
   ]
}

Access to the “*” label was given to both queues so that each can access labeled core nodes. Additionally, the sum of capacities for all queues must be equal to 100. The capacity of the default queue decreases to 40%.

Finally, the capacity for each queue’s access to the core label matches the capacity of the queue itself. That means that the core partition splits between queues at the same ratio as the rest of the cluster.

After completing this step, go to the YARN ResourceManager Web UI to verify that your modifications have taken place.
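If you prefer the command line to the Web UI, the YARN CLI on the master node can also report queue status; for example, for the alpha queue created above:

$ yarn queue -status alpha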

EMR reconfiguration benefits

The following are EMR reconfiguration benefits:

  • Rolling reconfiguration process
  • Reconfiguration failure and reversion

Rolling reconfiguration process

One key benefit of EMR reconfiguration is a rolling reconfiguration process. From the documentation:

“Amazon EMR follows a ‘rolling’ process to reconfigure the instances in the Task and Core instance groups. Only 10 percent of the instances in an instance group are modified and restarted at a time. This process takes longer to finish but reduces the chance of potential application failure in a running cluster.”

Rolling reconfiguration protects against HDFS downtime by keeping 90 percent of core nodes running during reconfiguration. YARN on EMR additionally has NodeManager recovery enabled, so the NodeManager recovers running containers after the reconfiguration restart.

Because containers are always active, some MapReduce jobs can continue to run successfully during the reconfiguration process. However, not all applications can recover after a restart. For example, Spark on YARN (the EMR default) may encounter executor issues and job failure after NodeManager restart.

Test applications with the type of reconfiguration that you plan to do in a safe environment before reconfiguring in production.

Finally, the rolling reconfiguration process might result in a temporary mismatch of your instance group’s configuration state. While mismatched, some instances may have old configurations while others may have the newly requested ones. When reconfiguring your cluster, consider any possible side effects of this situation.

Reconfiguration failure and reversion

EMR can also recover your instance group from a reconfiguration failure.

To make your new configuration take effect, EMR restarts your reconfigured applications and ensures that they are running before declaring the reconfiguration operation complete.

However, if any application fails to restart successfully on any node, the reconfiguration operation fails and the instance group remains in the RECONFIGURING state. Such failures might result from problematic configuration values. For example, an invalid address for yarn.resourcemanager.scheduler.address can cause the YARN ResourceManager to fail to restart.

In such situations, EMR automatically triggers a configuration reversion, which re-applies the previous working configuration set to the instance group. As soon as the reversion completes, the instance group returns to the RUNNING state. Your instance group thus returns to a functioning state and maintains the availability of your applications on the cluster. Rolling reconfiguration continues throughout the process.

If applications still fail to start after the previous working configurations have been re-applied, EMR places the instance group in the ARRESTED state rather than making further reconfiguration attempts. To release the instance group from the ARRESTED state, submit a new reconfiguration request.

Summary

In this post, I showed you the basics of how to reconfigure instance groups on running clusters using the new EMR cluster reconfiguration feature. I walked through the semantics of submitting reconfiguration requests, important configuration-level concepts, and ways to track reconfigurations. I provided some real-world reconfiguration examples and covered two useful aspects of the feature.

Try the new cluster reconfiguration feature and share your experience with us in the comments below!

 


About the Authors

Brandon Scheller is a software development engineer for Amazon EMR. His passion lies in developing and advancing the applications of the Hadoop ecosystem and working with the open source community. He enjoys mountaineering in the Cascades in his free time.

Junyang Li is a software development engineer for Amazon EMR. She works on cutting-edge features of EMR and is also involved in open source projects. Besides work, she enjoys arts and crafts, exercising, and traveling.

Performance updates to Apache Spark in Amazon EMR 5.24 – Up to 13x better performance compared to Amazon EMR 5.16

Post Syndicated from Paul Codding original https://aws.amazon.com/blogs/big-data/performance-updates-to-apache-spark-in-amazon-emr-5-24-up-to-13x-better-performance-compared-to-amazon-emr-5-16/

Amazon EMR release 5.24.0 includes several optimizations in Spark that improve query performance. To evaluate the performance improvements, we used TPC-DS benchmark queries with 3-TB scale and ran them on a 6-node c4.8xlarge EMR cluster with data in Amazon S3. We observed up to 13X better query performance on EMR 5.24 compared to EMR 5.16 when operating with a similar configuration.

Customers use Spark for a wide array of analytics use cases ranging from large-scale transformations to streaming, data science, and machine learning. They choose to run Spark on EMR because EMR provides the latest, stable open source community innovations, performant storage with Amazon S3, and the unique cost savings capabilities of Spot Instances and Auto Scaling.

Each monthly EMR release offers the latest open source packages, alongside new features such as multiple master nodes and cluster reconfiguration. The team also adds performance improvements with each release.

Each of those optimizations helps you run faster and reduce costs. With EMR 5.24, we have made several new optimizations and are detailing three critical ones in this post.

Setup

To get started with EMR, sign into the console, launch a cluster, and process data.

To replicate the setup for the benchmarking queries, use the following configuration:

  • Applications installed on the cluster: Ganglia, Hive, Spark, Hadoop (installed by default).
  • EMR release: EMR 5.24.0
  • Cluster configuration
    • Master instance group: 1 c4.8xlarge instance with 512 GiB of GP2 EBS storage (4 volumes of 128 GiB each)
    • Core instance group: 5 c4.8xlarge instances with 512 GiB of GP2 EBS storage (4 volumes of 128 GiB each)
Classification: yarn-site
  yarn.nodemanager.resource.memory-mb: 53248
  yarn.scheduler.maximum-allocation-vcores: 36

Classification: spark-defaults
  spark.executor.memory: 4743m
  spark.driver.memory: 2g
  spark.sql.optimizer.distinctBeforeIntersect.enabled: true
  spark.sql.dynamicPartitionPruning.enabled: true
  spark.sql.optimizer.flattenScalarSubqueriesWithAggregates.enabled: true
  spark.executor.cores: 4
  spark.executor.memoryOverhead: 890m

Results observed using TPC-DS benchmarks

The following two graphs compare the total aggregate runtime and geometric mean for all queries in the TPC-DS 3TB query dataset between the EMR releases.

The per-query runtime improvement between EMR 5.16 and EMR 5.24 is also illustrated in the following chart. The horizontal axis shows each of the queries in the TPC-DS 3 TB benchmark. The vertical axis shows the orders of magnitude of performance improvement seen in EMR 5.24.0 relative to EMR 5.16.0 as measured by query execution time. The largest performance improvements can be seen in 26 of the queries. In each of these queries, the performance was at least 2X better than EMR 5.16.

Performance optimizations in EMR 5.24

While AWS made several incremental performance improvements aggregating to the overall speedup, this post describes three major improvements in EMR 5.24 that affect the most common customer workloads:

  • Dynamic partition pruning
  • Flatten scalar subqueries
  • DISTINCT before INTERSECT

Dynamic partition pruning

Dynamic partition pruning improves job performance by selecting specific partitions within a table that must be read and processed for a query. By reducing the amount of data read and processed, queries run faster. The open source version of Spark (2.4.2) only supports pushing down static predicates that can be resolved at plan time. Examples of static predicate push down include the following:

partition_col = 5

partition_col IN (1,3,5)

partition_col BETWEEN 1 AND 3

partition_col = 1 + 3

With dynamic partition pruning turned on, Spark on EMR infers the partitions that must be read at runtime. Dynamic partition pruning is disabled by default, and can be enabled by setting the Spark property spark.sql.dynamicPartitionPruning.enabled from within Spark or when creating clusters. For more information, see Configure Spark.
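As a minimal sketch, you could enable the property for a single job at submission time (the script name below is a placeholder), or place it in the spark-defaults classification when creating the cluster, as shown in the Setup section earlier:

$ spark-submit --conf spark.sql.dynamicPartitionPruning.enabled=true my_query.py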

Here’s an example that joins two tables and relies on dynamic partition pruning to improve performance. The store_sales table contains total sales data partitioned by region, and store_regions table contains a mapping of regions for each country. In this representative query, you want to only get data from a specific country.

SELECT ss.quarter, ss.region, ss.store, ss.total_sales 
FROM store_sales ss, store_regions sr
WHERE ss.region = sr.region AND sr.country = 'North America'

Without dynamic partition pruning, this query reads all regions, before filtering out the subset of regions that match the results of the subquery. With dynamic partition pruning, only the partitions for the regions returned in the subquery are read and processed. This saves time and resources by both reading less data from storage, and processing fewer records.

The following graph shows the performance improvements to Queries 72, 80, 17, and 25 from the TPC-DS suite that we tested with 3-TB data.

Flatten scalar subqueries

This optimization can improve query performance where multiple conditions must be applied to rows from a specific table. The optimization prevents the table from being read multiple times for each condition. This optimization detects such cases, and optimizes the query to read the table only one time.

Flatten scalar subqueries is disabled by default and can be enabled by setting the Spark property spark.sql.optimizer.flattenScalarSubqueriesWithAggregates.enabled from within Spark or when creating clusters.

To give an example of how this works, use the same store_sales table from the previous optimization. In this example, you want to compute the average sales for several specific ranges of total sales.

SELECT (SELECT avg(total_sales) FROM store_sales 
WHERE total_sales BETWEEN 5000000 AND 10000000) AS group1, 
(SELECT avg(total_sales) FROM store_sales 
WHERE total_sales BETWEEN 10000000 AND 15000000) AS group2, 
(SELECT avg(total_sales) FROM store_sales 
WHERE total_sales BETWEEN 15000000 AND 20000000) AS group3  

With this optimization disabled, the store_sales table is read once for each subquery. With the optimization enabled, the query is rewritten as follows so that each condition is applied to the rows returned by reading the table only one time.

SELECT c1 AS group1, c2 AS group2, c3 AS group3 
FROM (SELECT avg (IF(total_sales BETWEEN 5000000 AND 10000000, total_sales, null)) AS c1, 
avg (IF(total_sales BETWEEN 10000000 AND 15000000, total_sales, null)) AS c2, 
avg (IF(total_sales BETWEEN 15000000 AND 20000000, total_sales, null)) AS c3 FROM store_sales);  

This optimization saves time and resources by both reading less data from storage, and processing fewer records.

To illustrate, take the example of Q9 from the TPC-DS suite. The query runs 2.9x faster in EMR 5.24 compared to EMR 5.16 when the relevant Spark property is switched on.

DISTINCT before INTERSECT

When producing the intersection of two collections, the result of that intersection is a set of unique values found in each collection. When dealing with large collections, many duplicate records must be both processed, and shuffled between hosts to finally calculate the intersection. This optimization eliminates duplicate values in each collection before computing the intersection, improving performance by reducing the amount of data shuffled between hosts.

This optimization is disabled by default and can be enabled by setting the Spark property spark.sql.optimizer.distinctBeforeIntersect.enabled from within Spark or when creating clusters.

For example (simplified from TPC-DS query 14), you want to find all of the brands that are sold in both the store and catalog sales channels. In this example, the store_sales table contains sales made through the store channel, the catalog_sales table contains sales made through the catalog channel, and the item table contains each unique product's details (for example, brand and manufacturer).

(SELECT item.brand ss_brand FROM store_sales, item
WHERE store_sales.item_id = item.item_id)
INTERSECT
(SELECT item.brand cs_brand FROM catalog_sales, item 
WHERE catalog_sales.item_id = item.item_id) 

With this optimization disabled, the first SELECT statement produces 2,600,000 records (same number of records as store_sales) with only 1,200 unique brands. The second SELECT statement produces 1,500,000 records (same number of records as catalog_sales) with 300 unique brands. This results in all 4,100,000 rows being fed into the intersect operation to produce the 200 brands that exist in both results.

With the optimization enabled, a distinct operation is performed on each collection before being fed into the intersect operator, resulting in only 1,200 + 300 records being fed into the intersect operator. This optimization saves time and resources by shuffling less data between hosts.

Summary

With each of these performance optimizations to Apache Spark, you benefit from better query performance on EMR 5.24 compared to EMR 5.16. We look forward to feedback on how these optimizations benefit your real world workloads.

Stay tuned as we roll out additional updates to improve Apache Spark performance in EMR. To keep up-to-date, subscribe to the Big Data blog’s RSS feed to learn about more great Apache Spark optimizations, configuration best practices, and tuning advice. Be sure not to miss other great optimizations like using S3 Select with Spark, and the EMRFS S3-Optimized Committer from previous EMR releases.

 


About the Author

Paul Codding is a senior product manager for EMR at Amazon Web Services.

Peter Gvozdjak is a senior engineering manager for EMR at Amazon Web Services.

Joseph Marques is a principal engineer for EMR at Amazon Web Services.

Yuzhou Sun is a software development engineer for EMR at Amazon Web Services.

Atul Payapilly is a software development engineer for EMR at Amazon Web Services.

Surya Vadan Akivikolanu is a software development engineer for EMR at Amazon Web Services.

Amazon EMR Migration Guide

Post Syndicated from Nikki Rouda original https://aws.amazon.com/blogs/big-data/amazon-emr-migration-guide/

Businesses worldwide are discovering the power of new big data processing and analytics frameworks like Apache Hadoop and Apache Spark, but they are also discovering some of the challenges of operating these technologies in on-premises data lake environments. They may also have concerns about the future of their current distribution vendor.

To address this, we've introduced the Amazon EMR Migration Guide (first published June 2019). This paper is a comprehensive guide that offers sound technical advice to help customers plan how to move from on-premises big data deployments to EMR.

Common problems of on-premises big data environments include a lack of agility, excessive costs, and administrative headaches, as IT organizations wrestle with the effort of provisioning resources, handling uneven workloads at large scale, and keeping up with the pace of rapidly changing, community-driven, open-source software innovation. Many big data initiatives suffer from the delay and burden of evaluating, selecting, purchasing, receiving, deploying, integrating, provisioning, patching, maintaining, upgrading, and supporting the underlying hardware and software infrastructure.

A subtler, if equally critical, problem is the way companies’ data center deployments of Apache Hadoop and Apache Spark directly tie together the compute and storage resources in the same servers, creating an inflexible model where they must scale in lock step. This means that almost any on-premises environment pays for high amounts of under-used disk capacity, processing power, or system memory, as each workload has different requirements for these components. Typical workloads run on different types of clusters, at differing frequencies and times of day. These big data workloads should be freed to run whenever and however is most efficient, while still accessing the same shared underlying storage or data lake. See Figure 1. below for an illustration.

How can smart businesses find success with their big data initiatives? Migrating big data (and machine learning) to the cloud offers many advantages. Cloud infrastructure service providers, such as AWS offer a broad choice of on-demand and elastic compute resources, resilient and inexpensive persistent storage, and managed services that provide up-to-date, familiar environments to develop and operate big data applications. Data engineers, developers, data scientists, and IT personnel can focus their efforts on preparing data and extracting valuable insights.

Services like Amazon EMR, AWS Glue, and Amazon S3 enable you to decouple and scale your compute and storage independently, while providing an integrated, well-managed, highly resilient environment, immediately reducing so many of the problems of on-premises approaches. This approach leads to faster, more agile, easier to use, and more cost-efficient big data and data lake initiatives.

However, the conventional wisdom of traditional on-premises Apache Hadoop and Apache Spark isn’t always the best strategy in cloud-based deployments. A simple lift and shift approach to running cluster nodes in the cloud is conceptually easy but suboptimal in practice. Different design decisions go a long way towards maximizing your gains as you migrate big data to a cloud architecture.

This guide provides the best practices for:

  • Migrating data, applications, and catalogs
  • Using persistent and transient resources
  • Configuring security policies, access controls, and audit logs
  • Estimating and minimizing costs, while maximizing value
  • Leveraging the AWS Cloud for high availability (HA) and disaster recovery (DR)
  • Automating common administrative tasks

Although not intended as a replacement for professional services, this guide covers a wide range of common questions, and scenarios as you migrate your big data and data lake initiatives to the cloud.

When starting your journey to migrate your big data platform to the cloud, you must first decide how to approach the migration. One approach is to re-architect your platform to maximize the benefits of the cloud. The other approach, known as lift and shift, is to take your existing architecture and complete a straight migration to the cloud. A final option is a hybrid approach, where you blend lift and shift with re-architecture. This decision is not straightforward, because there are advantages and disadvantages to both approaches.

A lift and shift approach is usually simpler with less ambiguity and risk. Additionally, this approach is better when you are working against tight deadlines, such as when your lease is expiring for a data center. However, the disadvantage to a lift and shift is that it is not always the most cost effective, and the existing architecture may not readily map to a solution in the cloud.

A re-architecture unlocks many advantages, including optimization of costs and efficiencies. With re-architecture, you move to the latest and greatest software, have better integration with native cloud tools, and lower operational burden by leveraging native cloud products and services.

This paper provides advantages and disadvantages of each migration approach from the perspective of the Apache Spark and Hadoop ecosystems. To read the paper, download the Amazon EMR Migration Guide now.

For a more general resource on deciding which approach is ideal for your workflow, see An E-Book of Cloud Best Practices for Your Enterprise, which outlines the best practices for performing migrations to the cloud at a higher level.

 


About the Author

Nikki Rouda is the principal product marketing manager for data lakes and big data at AWS. Nikki has spent 20+ years helping enterprises in 40+ countries develop and implement solutions to their analytics and IT infrastructure challenges. Nikki holds an MBA from the University of Cambridge and an ScB in geophysics and math from Brown University.

Optimizing downstream data processing with Amazon Kinesis Data Firehose and Amazon EMR running Apache Spark

Post Syndicated from Srikanth Kodali original https://aws.amazon.com/blogs/big-data/optimizing-downstream-data-processing-with-amazon-kinesis-data-firehose-and-amazon-emr-running-apache-spark/

For most organizations, working with ever-increasing volumes of data and incorporating new data sources can be a challenge.  Often, AWS customers have messages coming from various connected devices and sensors that must be efficiently ingested and processed before further analysis.  Amazon S3 is a natural landing spot for data of all types.  However, the way data is stored in Amazon S3 can make a significant difference in the efficiency and cost of downstream data processing.  Specifically, Apache Spark can be over-burdened with file operations if it is processing a large number of small files versus fewer larger files.  Each of these files has its own overhead of a few milliseconds for opening, reading metadata information, and closing. This overhead of file operations on these large numbers of files results in slow processing. This blog post shows how to use Amazon Kinesis Data Firehose to merge many small messages into larger messages for delivery to Amazon S3.  This results in faster processing with Amazon EMR running Spark.

Like Amazon Kinesis Data Streams, Kinesis Data Firehose accepts a maximum incoming message size of 1 MB.  If a single message is greater than 1 MB, it can be compressed before placing it on the stream.  However, at large volumes, a message or file size of 1 MB or less is usually too small.  Although there is no right answer for file size, 1 MB for many datasets would just yield too many files and file operations.

This post also shows how to use Apache Spark to read compressed files in Amazon S3 that do not have a proper file name extension, and to store the results back in Amazon S3 in parquet format.

Solution overview

The steps we follow in this blog post are:

  1. Create a virtual private cloud (VPC) and an Amazon S3 bucket.
  2. Provision a Kinesis data stream, and an AWS Lambda function to process the messages from the Kinesis data stream.
  3. Provision Kinesis Data Firehose to deliver messages to Amazon S3 sent from the Lambda function in step 2. This step also provisions an Amazon EMR cluster to process the data in Amazon S3.
  4. Generate test data with custom code running on an Amazon EC2 instance.
  5. Run a sample Spark program from the Amazon EMR cluster’s master instance to read the files from Amazon S3, convert them into parquet format and write back to an Amazon S3 destination.

The following diagram explains how the services work together:

The AWS Lambda function in the diagram reads the messages, appends additional data to them, and compresses them with gzip before sending them to Amazon Kinesis Data Firehose. The reason for this is that most customers need some enrichment of the data before it arrives in Amazon S3.

Amazon Kinesis Data Firehose can buffer incoming messages into larger records before delivering them to your Amazon S3 bucket. It does so according to two conditions, buffer size (up to 128 MB) and buffer interval (up to 900 seconds). Record delivery is triggered once either of these conditions has been satisfied.
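For illustration, these buffering conditions can be specified when you create the delivery stream. The following AWS CLI sketch uses placeholder stream, role, and bucket names and assumes the standard extended S3 destination settings:

$ aws firehose create-delivery-stream \
--delivery-stream-name my-delivery-stream \
--extended-s3-destination-configuration '{
    "RoleARN": "arn:aws:iam::123456789012:role/my-firehose-role",
    "BucketARN": "arn:aws:s3:::my-destination-bucket",
    "BufferingHints": {"SizeInMBs": 128, "IntervalInSeconds": 300},
    "CompressionFormat": "UNCOMPRESSED"
}'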

An Apache Spark job reads the messages from Amazon S3, and stores them in parquet format. With parquet, data is stored in a columnar format that provides more efficient scanning and enables ad hoc querying or further processing by services like Amazon Athena.

Considerations

The maximum size of a record sent to Kinesis Data Firehose is 1,000 KB. If your message size is greater than this value, compressing the message before it is sent to Kinesis Data Firehose is the best approach. Kinesis Data Firehose  also offers compression of messages after they are written to the Kinesis Data Firehose data stream. Unfortunately, this does not overcome the message size limitation, because this compression happens after the message is written. When Kinesis Data Firehose delivers a previously compressed message to Amazon S3 it is written as an object without a file extension. For example, if a message is compressed with gzip before it is written to Kinesis Data Firehose, it is delivered to Amazon S3 without the .gz extension. This is problematic if you are using Apache Spark for downstream processing because a “.gz” extension is required.

We will see how to overcome this issue by reading the files using the Amazon S3 API operations later in this blog.
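In the meantime, a quick way to confirm that an extension-less object delivered by Kinesis Data Firehose is in fact gzip-compressed is to stream it through gunzip from the command line; the bucket and object key below are placeholders:

$ aws s3 cp s3://my-destination-bucket/<firehose-object-key> - | gunzip | head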

Prerequisites and assumptions

To follow the steps outlined in this blog post, you need the following:

  • An AWS account that provides access to AWS services.
  • An AWS Identity and Access Management (IAM) user with an access key and secret access key to configure the AWS CLI.
  • The templates and code are intended to work in the US East (N. Virginia) Region only.

Additionally, be aware of the following:

  • We configure all services in the same VPC to simplify networking considerations.
  • Important: The AWS CloudFormation templates and the sample code that we provide use hardcoded user names and passwords and open security groups. These are for testing purposes only. They aren’t intended for production use without any modifications.

Implementing the solution

You can use this downloadable template for single-click deployment. This template is launched in the US East (N. Virginia) Region by default. Do not change to a different Region. The template is designed to work only in the US East (N. Virginia) Region. To launch directly through the console, choose the Launch Stack button.

This template takes the following parameters. Some of the parameters have default values, and you can’t edit these. These predefined names are hardcoded in the code. For some of the parameters, you must provide the values. The following table provides additional details.

StackName: Provide the stack name.
ClientIP: The IP address range of the client that is allowed to connect to the cluster using SSH.
FirehoseDeliveryStreamName: The name of the Amazon Kinesis Data Firehose delivery stream. The default value is "AWSBlogs-LambdaToFireHose".
InstanceType: The EC2 instance type.
KeyName: The name of an existing EC2 key pair to enable SSH login access.
KinesisStreamName: The name of the Amazon Kinesis data stream. The default value is "AWS-Blog-BaseKinesisStream".
Region: The AWS Region. By default it is us-east-1 (US East (N. Virginia)). Do not change this, as the scripts are developed to work in this Region only.
EMRClusterName: A name for the EMR cluster.
S3BucketName: The name of the bucket that is created in your account. Provide a unique name for this bucket. It is used for storing the messages and the output from the Spark code.

After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, select the check boxes for I acknowledge that AWS CloudFormation might create IAM resources with custom names and I acknowledge that AWS CloudFormation might require the following capability: CAPABILITY_AUTO_EXPAND, and then choose Create.

If you use this one-step solution, you can skip to Step 7: Generate test dataset and load into Kinesis Data Streams.

To create each component individually, use the following steps.

1. Use the AWS CloudFormation template to configure Amazon VPC and create an Amazon S3 bucket

In this step, we set up a VPC, public subnet, internet gateway, route table, and a security group. The security group has two inbound access rules. The first inbound rule allows access to TCP port 22 (SSH) from the provided client IP CIDR range, and the second inbound rule allows access to any TCP port from any host within the same security group. We use this VPC and subnet for all other services that are created in the next steps. In addition to these resources, we also create a standard Amazon S3 bucket with a provided bucket name to store the incoming data and processed data. You can use this downloadable AWS CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.

This template takes the following parameters. The following table provides additional details.

StackName: Provide the stack name.
S3BucketName: Provide a unique Amazon S3 bucket name. This bucket is created in your account.
ClientIp: Provide a CIDR IP address range that is added to the inbound rule of the security group. You can get your current IP address from the checkip.amazonaws.com web URL.

After you specify the template details, choose Next. On the Review page, choose Create.
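If you prefer the AWS CLI to the Launch Stack button, launching the stack might look like the following sketch; the stack name and template URL are placeholders, and the parameter names come from the table above:

$ aws cloudformation create-stack --stack-name my-vpc-s3-stack \
--template-url https://s3.amazonaws.com/<template-bucket>/<template-file> \
--parameters ParameterKey=S3BucketName,ParameterValue=my-unique-bucket \
             ParameterKey=ClientIp,ParameterValue=203.0.113.0/24 \
--region us-east-1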

When the stack launch is complete, it should return outputs similar to the following.

StackName: Name
VPCID: vpc-xxxxxxx
SubnetID: subnet-xxxxxxxx
SecurityGroup: sg-xxxxxxxxxx
S3BucketDomain: <S3_BUCKET_NAME>.s3.amazonaws.com
S3BucketARN: arn:aws:s3:::<S3_BUCKET_NAME>

Make a note of the output, because you use this information in the next step. You can view the stack outputs on the AWS Management Console or by using the following AWS CLI command:

$ aws cloudformation describe-stacks --stack-name <stack_name> --region us-east-1 --query 'Stacks[0].Outputs'

2.  Use the AWS CloudFormation template to create necessary IAM Roles

In this step, we set up two AWS IAM roles. One of the IAM roles will be used by an AWS Lambda function to allow access to Amazon S3 service, Amazon Kinesis Data Firehose, Amazon CloudWatch Logs, and Amazon EC2 instances.  The second IAM role is used by the Amazon Kinesis Data Firehose service to access Amazon S3 service. You can use this downloadable CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.

This template takes the following parameters. The following table provides additional details.

StackName: Provide the stack name.

After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, select the check box for I acknowledge that AWS CloudFormation might create IAM resources with custom names. Choose Create.

When the stack launch is complete, it should return outputs similar to the following.

LambdaRoleArn: arn:aws:iam::<ACCOUNT_NUMBER>:role/small-files-lamdarole
FirehoseRoleArn: arn:aws:iam::<ACCOUNT_NUMBER>:role/small-files-firehoserole

Make a note of the output, because you use this information in the next step. You can view the stack outputs on the AWS Management Console or by using the following AWS CLI command:

$ aws cloudformation describe-stacks --stack-name <stack_name> --region us-east-1 --query 'Stacks[0].Outputs'

3. Use an AWS CloudFormation template to configure the Amazon Kinesis Data Firehose data stream

In this step, we set up Amazon Kinesis Data Firehose with Amazon S3 as the destination for the incoming messages. We select the Uncompressed option for the compression format, and buffering options of 128 MB for the size and 300 seconds for the interval. You can use this downloadable AWS CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.

This template takes the following parameters. The following table provides additional details.

StackName: Provide the stack name.
FirehoseDeliveryStreamName: Provide the name of the Amazon Kinesis Data Firehose delivery stream. The default value is "AWSBlogs-LambdaToFirehose".
Role: Provide the Kinesis Data Firehose IAM role ARN that was created as part of step 2.
S3BucketARN: Select the S3BucketARN. You can get this from the step 1 AWS CloudFormation output.

After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, choose Create.

4. Use an AWS CloudFormation template to create a Kinesis data stream and a Lambda function

In this step, we set up a Kinesis data stream and an AWS Lambda function. We can use the AWS Lambda function to process incoming messages in a Kinesis data stream. An event source mapping is also created as part of this template. This adds a trigger to the AWS Lambda function for the Kinesis data stream source. For more information about creating event source mapping, see Creating an Event Source Mapping. This Kinesis data stream is created with 10 shards and the Lambda function is created with a Java 8 runtime. We allocate memory size of 1920 MB and timeout of 300 seconds. You can use this downloadable AWS CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.
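For reference, the event source mapping the template creates is roughly equivalent to creating one by hand with the AWS CLI; the function name, account ID, and batch size below are placeholders:

$ aws lambda create-event-source-mapping \
--function-name my-kinesis-processor \
--event-source-arn arn:aws:kinesis:us-east-1:123456789012:stream/AWS-Blog-BaseKinesisStream \
--starting-position LATEST \
--batch-size 100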

This template takes the following parameters. The following table provides details.

StackName: Provide the stack name.
KinesisStreamName: Provide the name of the Amazon Kinesis data stream. The default value is "AWS-Blog-BaseKinesisStream".
Role: Provide the IAM role created for the Lambda function as part of the second AWS CloudFormation template. Get the value from the output of the second AWS CloudFormation template.
S3Bucket: Provide the existing Amazon S3 bucket name that was created using the first AWS CloudFormation template. Do not use the domain name. Provide the bucket name only.
Region: Select the AWS Region. By default it is us-east-1 (US East (N. Virginia)).

After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, choose Create.

5. Use an AWS CloudFormation template to configure the Amazon EMR cluster

In this step, we set up an Amazon EMR 5.16.0 cluster with “Spark”, “Ganglia” and “Hive” applications. We create this cluster with one master and two core nodes, and use an r4.xlarge instance type. The template uses an AWS Glue metastore for the Amazon EMR hive metastore. This Amazon EMR cluster is used to process the messages in Amazon S3 bucket that are created by the Amazon Kinesis Data Firehose data stream. You can use this downloadable AWS CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.

This template takes the following parameters. The following table provides additional details.

EMRClusterName: Provide the name for the EMR cluster.
ClusterSecurityGroup: Select the security group ID that was created as part of the first AWS CloudFormation template.
ClusterSubnetID: Select the subnet ID that was created as part of the first AWS CloudFormation template.
AllowedCIDR: Provide the IP address range of the client that is allowed to connect to the cluster.
KeyName: Provide the name of an existing EC2 key pair to access the Amazon EMR cluster.

After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, choose Create.
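To see roughly what the template does under the hood, the following boto3 sketch creates a comparable cluster; the cluster name, subnet, and key pair values are placeholders, and the template also wires up the security group and other details for you:

import boto3

emr = boto3.client("emr", region_name="us-east-1")

# EMR 5.16.0 cluster with Spark, Ganglia, and Hive, one master and two core
# nodes of r4.xlarge, using the AWS Glue Data Catalog as the Hive metastore.
emr.run_job_flow(
    Name="blog-downstream-processing-cluster",  # placeholder
    ReleaseLabel="emr-5.16.0",
    Applications=[{"Name": "Spark"}, {"Name": "Ganglia"}, {"Name": "Hive"}],
    Instances={
        "MasterInstanceType": "r4.xlarge",
        "SlaveInstanceType": "r4.xlarge",
        "InstanceCount": 3,  # 1 master + 2 core nodes
        "Ec2SubnetId": "subnet-0123456789abcdef0",  # placeholder
        "Ec2KeyName": "my-key-pair",                # placeholder
        "KeepJobFlowAliveWhenNoSteps": True,
    },
    Configurations=[
        {
            "Classification": "hive-site",
            "Properties": {
                "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
            },
        }
    ],
    JobFlowRole="EMR_EC2_DefaultRole",
    ServiceRole="EMR_DefaultRole",
    VisibleToAllUsers=True,
)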

When the stack launch is complete, it should return outputs similar to the following.

Key | Value
EMRClusterMaster | ssh hadoop@<EMR-master-public-DNS> -i <KEY_PAIR_NAME>.pem

Make a note of the output, because you use this information in the next step. You can view the stack outputs on the AWS Management Console or by using the following AWS CLI command:

$ aws cloudformation describe-stacks --stack-name <stack_name> --region us-east-1 --query 'Stacks[0].Outputs'

6. Use an AWS CloudFormation template to create an Amazon EC2 Instance to generate test data

In this step, we set up an Amazon EC2 instance and install OpenJDK 1.8. The AWS CloudFormation script that creates this EC2 instance runs two additional steps. First, it downloads and installs OpenJDK 1.8. Second, it downloads a Java program JAR file to the ec2-user home directory on the EC2 instance. We use this Java program to generate test data messages with an approximate size of 900 KB each. We then send them to the Kinesis data stream that was created as part of the previous steps. The Java JAR file name is "sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar".

You can use this downloadable AWS CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.

This template takes the following parameters. The following table provides additional details.

For this parameter | Do this
EC2SecurityGroup | Select the security group ID that was created from the first AWS CloudFormation template.
EC2Subnet | Select the subnet that was created from the first AWS CloudFormation template.
InstanceType | Select the provided instance type. By default, it selects the r4.4xlarge instance.
KeyName | Name of an existing EC2 key pair to enable SSH access to the EC2 instance.

After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, select the "I acknowledge that AWS CloudFormation might create IAM resources with custom names" option, and then choose Create.

When the stack launch is complete, it should return outputs similar to the following.

Key | Value
EC2Instance | ssh ec2-user@<EC2-instance-public-IP> -i <KEY_PAIR_NAME>.pem

Make a note of the output, because you use this information in the next step. You can view the stack outputs on the AWS Management Console or by using the following AWS CLI command:

$ aws cloudformation describe-stacks --stack-name <stack_name> --region us-east-1 --query 'Stacks[0].Outputs'

7. Generate the test dataset and load into Kinesis Data Streams

After all of the previous AWS CloudFormation stacks are created successfully, log in to the EC2 instance that was created as part of step 6. Use the ssh command shown in the CloudFormation stack output. This template copies the "sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar" file, which we use to generate the test data and send it to Amazon Kinesis Data Streams. You can find the code for this sample Kinesis producer in this Git repository.

Make sure that your EC2 instance's security group allows inbound SSH access (port 22) from your IP address. If not, update your security group's inbound rules.
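If you need to open port 22, a minimal boto3 sketch looks like the following; the security group ID and CIDR range are placeholders:

import boto3

ec2 = boto3.client("ec2", region_name="us-east-1")

# Allow inbound SSH from your public IP address only.
ec2.authorize_security_group_ingress(
    GroupId="sg-0123456789abcdef0",  # placeholder: the group from the first template
    IpProtocol="tcp",
    FromPort=22,
    ToPort=22,
    CidrIp="203.0.113.10/32",        # placeholder: your IP address
)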

Run the following commands to generate some test data.

$ cd;

$ ls -ltra sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar
-rwxr-xr-x 1 ec2-user ec2-user 27536802 Oct 29 21:19 sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar

$ java -Xms1024m -Xmx25600m -XX:+UseG1GC -cp sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar com.optimize.downstream.entry.Main 10000

This Java program uses the PutRecords API operation, which allows many records to be sent in a single HTTP request. For more information, see this AWS blog post. When you run the Java program, you see the following output, which shows that messages are being sent to the Kinesis data stream.

“Starting producer and consumer.....
Inserting a message into blocking queue before sending to Kinesis Firehose and Message number is : 0
Producer Thread # 9 is going to sleep mode for 500 ms.
Inserting a message into blocking queue before sending to Kinesis Firehose and Message number is : 1
Inserting a message into blocking queue before sending to Kinesis Firehose and Message number is : 2
Inserting a message into blocking queue before sending to Kinesis Firehose and Message number is : 3
::
::
Record sent to Kinesis Stream. Record size is ::: 5042850 KB
Sending a record to Kinesis Stream with 5 messages grouped together.
Record sent to Kinesis Stream. Record size is ::: 5042726 KB
Sending a record to Kinesis Stream with 5 messages grouped together.
Record sent to Kinesis Stream. Record size is ::: 5042729 KB”

When running the sample Kinesis producer JAR, notice that the number of messages is 10,000. This program generates the test data messages; it is not a replacement for your load testing tool. It is provided to demonstrate the use case presented in this post.

After all of the messages are generated and sent to Amazon Kinesis Data Streams, the program exits gracefully.
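If you want to experiment with the same batching idea outside the provided JAR, the following is a minimal boto3 sketch of a PutRecords call; the payload contents are placeholders and not the sensor schema shown below:

import json
import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")

# Group several messages into a single PutRecords request.
records = [
    {
        "Data": json.dumps({"messageNumber": i, "payload": "x" * 1024}).encode("utf-8"),
        "PartitionKey": str(i),
    }
    for i in range(5)
]

response = kinesis.put_records(
    StreamName="AWS-Blog-BaseKinesisStream",
    Records=records,
)
print("Failed record count:", response["FailedRecordCount"])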

The sample JSON input message format is shown as follows:

   "processedDate":"2018/10/30 19:05:19",
   "currentDate":"2018/10/30 19:05:07",
   "hashDeviceId":"0c2745e4-c2d6-4d43-8339-9c2401e80e92",
   "deviceId":"94581b5f-a117-484a-8e3c-4fcc2dbd53b7",
   "accelerometerSensorList":[  
      {  
         "accelerometer_Y":8,
         "gravitySensor_X":5,
         "accelerometer_X":9,
         "gravitySensor_Z":4,
         "accelerometer_Z":1,
         "gravitySensor_Y":5,
         "linearAccelerationSensor_Z":3,
         "linearAccelerationSensor_Y":9,
         "linearAccelerationSensor_X":9
      },
      {  
         "accelerometer_Y":1,
         "gravitySensor_X":3,
         "accelerometer_X":5,
         "gravitySensor_Z":5,
         "accelerometer_Z":7,
         "gravitySensor_Y":9,
         "linearAccelerationSensor_Z":6,
         "linearAccelerationSensor_Y":5,
         "linearAccelerationSensor_X":3
      },
 {
   …
 },
 {
   …
 },
 :
 :
   ],
   "tempSensorList":[  
      {  
         "kelvin":585.4928040286752,
         "celsius":43.329574923775425,
         "fahrenheit":50.13864584530086
      },
      {  
         "kelvin":349.95625855125814,
         "celsius":95.68423052685313,
         "fahrenheit":7.854854574219985
      },
 {
   …
 },
 {
   …
 },
 :
 :
 
   ],
   "illuminancesSensorList":[  
      {  
         "illuminance":44.65135784368194
      },
      {  
         "illuminance":98.15404017082403
      },
 {
   …
 },
 {
   …
 },
 :
 :
   ],
   "gpsSensorList":[  
      {  
         "altitude":4.38273213294682,
         "heading":7.416314616289915,
         "latitude":5.759723677991661,
         "longitude":1.4732885894731842
      },
      {  
         "altitude":9.816473807569487,
         "heading":5.118919157684835,
         "latitude":3.581361614110458,
         "longitude":1.3699272610616127
      },
 {
   …
 },
 {
   …
 },
 :
 :
   ]
}

 

Log in to the Kinesis Data Streams console, then choose the Kinesis data stream that was created as part of step 4. Choose the Monitor tab to see the graphs. Run the data generation utility for at least 15 minutes to generate enough data.

8. Processing Kinesis Data Streams messages using AWS Lambda

As part of the previously described setup, we also use an AWS Lambda function (name: LambdaForProcessingKinesisRecords) to process the messages from the Kinesis data stream. This Lambda function reads each message's content and appends additional data to it. This demonstrates that the incoming message from the Kinesis data stream is read and enriched with additional information so that the message size exceeds 1 MB. Several customers have a use case like this, in which they enrich incoming messages by adding additional information. After the AWS Lambda function appends additional data to the incoming messages, it sends them to Amazon Kinesis Data Firehose. Because Kinesis Data Firehose accepts only messages that are smaller than 1 MB, we must compress the messages before sending them. In the Lambda function, we compress each message using gzip before sending it to Kinesis Data Firehose. In addition to compressing each message, we also append a newline character ("\n") to each message after compressing it, to separate the messages.

We set the buffer size to 128 MB and the buffer interval to 900 seconds when creating the Kinesis Data Firehose delivery stream. This helps merge the incoming compressed messages into larger objects that are written to the provided Amazon S3 bucket.

The AWS Lambda function appends the following content to the original message in Kinesis Data Streams after reading it.

"testAdditonalDataList": [
  {
    "dimesnion_X": 9,
    "dimesnion_Y": 2,
    "dimesnion_Z": 2
  },
  {
    "dimesnion_X": 3,
    "dimesnion_Y": 10,
    "dimesnion_Z": 5
  },
  {
    …
  },
  {
    …
  },
  :
  :
]

If we do not compress the message before sending it to Kinesis Data Firehose, the service returns the following error message, which you can see in Amazon CloudWatch Logs.

Here is the code snippet where we are compressing the message in the AWS Lambda function. The complete code can be found in this Git repository.

private String sendToFireHose(String mergedJsonString)
{
    PutRecordResult res = null;
    try {
        //To Firehose -
        System.out.println("MESSAGE SIZE BEFORE COMPRESSION IS : " + mergedJsonString.toString().getBytes(charset).length);
        System.out.println("MESSAGE SIZE AFTER GZIP COMPRESSION IS : " + compressMessage(mergedJsonString.toString().getBytes(charset)).length);
        PutRecordRequest req = new PutRecordRequest()
                .withDeliveryStreamName(firehoseStreamName);

        // Without compression - Send to Firehose
        //Record record = new Record().withData(ByteBuffer.wrap((mergedJsonString.toString() + "\r\n").getBytes()));

        // With compression - send to Firehose
        Record record = new Record().withData(ByteBuffer.wrap(compressMessage((mergedJsonString.toString() + "\r\n").getBytes())));
        req.setRecord(record);
        res = kinesisFirehoseClient.putRecord(req);
    }
    catch (IOException ie) {
        ie.printStackTrace();
    }
    return res.getRecordId();
}
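For readers who prefer Python, the same compress-and-append idea can be sketched with boto3 as follows. This is illustrative only; the send_to_firehose function name is not part of the sample code, and the full implementation is the Java version in the Git repository.

import gzip
import boto3

firehose = boto3.client("firehose")

def send_to_firehose(merged_json_string, stream_name="AWSBlogs-LambdaToFirehose"):
    # Append a newline separator, then gzip-compress the enriched message
    # before handing it to Kinesis Data Firehose (mirrors the Java snippet above).
    payload = gzip.compress((merged_json_string + "\n").encode("utf-8"))
    response = firehose.put_record(
        DeliveryStreamName=stream_name,
        Record={"Data": payload},
    )
    return response["RecordId"]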

You can check the provided bucket to see if the messages are flowing into the bucket. The Amazon S3 bucket should show something similar to the following example:

You see that the files generated by Kinesis Data Firehose do not have any extension. By default, Kinesis Data Firehose does not add an extension to the files that it generates in the Amazon S3 bucket unless you select a compression option. In our use case, because the size of the uncompressed input message is greater than 1 MB, we compress it before sending it to Kinesis Data Firehose. Because the message is already compressed, we do not select any compression option in Kinesis Data Firehose; doing so would compress the message twice, and the downstream Spark application would not be able to process it.

9. Reading and converting the data into parquet format using Apache Spark program with Amazon EMR

As we noted from the previous screenshot, Kinesis Data Firehose by default does not add a file extension to the files that it writes to the Amazon S3 bucket. This creates a problem when reading the files using Apache Spark. By default, Apache Spark checks for a valid file name extension when a file is compressed. In this case, for gzip compression, it looks for <filename>.gz to read the file successfully.

To overcome this issue, we can use Amazon S3 API operations, particularly the AmazonS3Client class, to list all the Amazon S3 keys, and then use Spark's parallelize method to read the contents of the files. After reading the file content, we can uncompress it using the GZIPInputStream class. You can find the code snippet below. The complete code can be found in the Git repository.

val allLinesRDD = spark.sparkContext.parallelize(s3ObjectKeys).flatMap { key =>
  Source.fromInputStream(
    new GZIPInputStream(s3Client.getObject(bucketName, key).getObjectContent: InputStream)
  ).getLines
}

var finalDF = spark.read.json(allLinesRDD).toDF()

Once the Amazon EMR cluster creation is complete, log in to the Amazon EMR master node using the following command. You can get the ssh login command from the "EMRClusterMaster" output parameter of the AWS CloudFormation stack created in step 5.

  • ssh hadoop@<EMR-master-public-DNS> -i <KEYPAIR_NAME>.pem
  • Make sure that security port 22 is open so that you can connect to the Amazon EMR master node.

Run the Spark program using the following Spark submit command.

spark-submit --class com.optimize.downstream.process.ProcessFilesFromS3AndConvertToParquet --master yarn --deploy-mode client s3://aws-bigdata-blog/artifacts/aws-blog-optimize-downstream-data-processing/appjars/spark-process-1.0-SNAPSHOT-jar-with-dependencies.jar <S3_BUCKET_NAME> fromfirehose/<YEAR>/ output-emr-parquet/

Change the S3_BUCKET_NAME and YEAR values in the previous Spark command.

Argument # | Property | Value
1 | --class | com.optimize.downstream.process.ProcessFilesFromS3AndConvertToParquet
2 | --master | yarn
3 | --deploy-mode | client
4 | (application JAR) | s3://aws-bigdata-blog/artifacts/aws-blog-avoid-small-files/appjars/spark-process-1.0-SNAPSHOT-jar-with-dependencies.jar
5 | <S3_BUCKET_NAME> | The Amazon S3 bucket name that was created as part of the AWS CloudFormation template. The source files are created in this bucket.
6 | <INPUT S3 LOCATION> | "fromfirehose/<YYYY>/". The input files are created in this Amazon S3 key location under the bucket that was created. "YYYY" represents the current year. For example, "fromfirehose/2018/".
7 | <OUTPUT S3 LOCATION> | Provide an output directory name that will be created under the above provided Amazon S3 bucket. For example, "output-emr-parquet/".

 

When the program finishes running, you can check the Amazon S3 output location to see the files that are written in parquet format.

Cleaning up after the migration

After completing and testing this solution, clean up the resources by stopping your tasks and deleting the AWS CloudFormation stacks. The stack deletion fails if you have any files in the created Amazon S3 bucket, so make sure that you empty the Amazon S3 bucket that was created before you delete the stacks.

Conclusion

In this post, we described the process of avoiding small file creation in Amazon S3 by sending the incoming messages to Amazon Kinesis Data Firehose. We also went through the process of reading the data and storing it in Parquet format using Apache Spark on an Amazon EMR cluster.

 


About the Author

Photo of Srikanth KodaliSrikanth Kodali is a Sr. IOT Data analytics architect at Amazon Web Services. He works with AWS customers to provide guidance and technical assistance on building IoT data and analytics solutions, helping them improve the value of their solutions when using AWS.

 

 

Trigger cross-region replication of pre-existing objects using Amazon S3 inventory, Amazon EMR, and Amazon Athena

Post Syndicated from Michael Sambol original https://aws.amazon.com/blogs/big-data/trigger-cross-region-replication-of-pre-existing-objects-using-amazon-s3-inventory-amazon-emr-and-amazon-athena/

In Amazon Simple Storage Service (Amazon S3), you can use cross-region replication (CRR) to copy objects automatically and asynchronously across buckets in different AWS Regions. CRR is a bucket-level configuration, and it can help you meet compliance requirements and minimize latency by keeping copies of your data in different Regions. CRR replicates all objects in the source bucket, or optionally a subset, controlled by prefix and tags.

Objects that exist before you enable CRR (pre-existing objects) are not replicated. Similarly, objects might fail to replicate (failed objects) if permissions aren’t in place, either on the IAM role used for replication or the bucket policy (if the buckets are in different AWS accounts).

In our work with customers, we have seen situations where large numbers of objects aren’t replicated for the previously mentioned reasons. In this post, we show you how to trigger cross-region replication for pre-existing and failed objects.

Methodology

At a high level, our strategy is to perform a copy-in-place operation on pre-existing and failed objects. This operation uses the Amazon S3 API to copy the objects over the top of themselves, preserving tags, access control lists (ACLs), metadata, and encryption keys. The operation also resets the Replication_Status flag on the objects. This triggers cross-region replication, which then copies the objects to the destination bucket.
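At its core, a copy-in-place call is a single Amazon S3 CopyObject request in which the source and destination are the same object. The following minimal boto3 sketch illustrates the idea; the bucket and key are placeholders, and the Spark application shown later in this post also carries over storage class, encryption settings, and ACLs:

import boto3

s3 = boto3.client("s3")

def copy_in_place(bucket, key):
    # Fetch the current user-defined metadata and add a marker value,
    # because S3 rejects an in-place copy that changes nothing.
    metadata = s3.head_object(Bucket=bucket, Key=key).get("Metadata", {})
    metadata["forcedreplication"] = "2019-02-24-04-00"  # any changing value works

    # Copy the object over the top of itself, which resets its
    # replication status and triggers CRR.
    s3.copy_object(
        Bucket=bucket,
        Key=key,
        CopySource={"Bucket": bucket, "Key": key},
        Metadata=metadata,
        MetadataDirective="REPLACE",
        TaggingDirective="COPY",
    )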

To accomplish this, we use the following:

  • Amazon S3 inventory to identify objects to copy in place. These objects don’t have a replication status, or they have a status of FAILED.
  • Amazon Athena and AWS Glue to expose the S3 inventory files as a table.
  • Amazon EMR to execute an Apache Spark job that queries the AWS Glue table and performs the copy-in-place operation.

Object filtering

To reduce the size of the problem (we’ve seen buckets with billions of objects!) and eliminate S3 List operations, we use Amazon S3 inventory. S3 inventory is enabled at the bucket level, and it provides a report of S3 objects. The inventory files contain the objects’ replication status: PENDING, COMPLETED, FAILED, or REPLICA. Pre-existing objects do not have a replication status in the inventory.

Interactive analysis

To simplify working with the files that are created by S3 inventory, we create a table in the AWS Glue Data Catalog. You can query this table using Amazon Athena and analyze the objects.  You can also use this table in the Spark job running on Amazon EMR to identify the objects to copy in place.

Copy-in-place execution

We use a Spark job running on Amazon EMR to perform concurrent copy-in-place operations of the S3 objects. This step allows the number of simultaneous copy operations to be scaled up. This improves performance on a large number of objects compared to doing the copy operations consecutively with a single-threaded application.

Account setup

For the purpose of this example, we created three S3 buckets. The buckets are specific to our demonstration. If you’re following along, you need to create your own buckets (with different names).

We’re using a source bucket named crr-preexisting-demo-source and a destination bucket named crr-preexisting-demo-destination. The source bucket contains the pre-existing objects and the objects with the replication status of FAILED. We store the S3 inventory files in a third bucket named crr-preexisting-demo-inventory.

The following diagram illustrates the basic setup.

You can use any bucket to store the inventory, but the bucket policy must include the following statement (change Resource and aws:SourceAccount to match yours).

{
    "Version": "2012-10-17",
    "Id": "S3InventoryPolicy",
    "Statement": [
        {
            "Sid": "S3InventoryStatement",
            "Effect": "Allow",
            "Principal": {
                "Service": "s3.amazonaws.com"
            },
            "Action": "s3:PutObject",
            "Resource": "arn:aws:s3:::crr-preexisting-demo-inventory/*",
            "Condition": {
                "StringEquals": {
                    "s3:x-amz-acl": "bucket-owner-full-control",
                    "aws:SourceAccount": "111111111111"
                }
            }
        }
    ]
}

In our example, we uploaded six objects to crr-preexisting-demo-source. We added three objects (preexisting-*.txt) before CRR was enabled. We also added three objects (failed-*.txt) after permissions were removed from the CRR IAM role, causing CRR to fail.

Enable S3 inventory

You need to enable S3 inventory on the source bucket. You can do this on the Amazon S3 console as follows:

On the Management tab for the source bucket, choose Inventory.

Choose Add new, and complete the settings as shown, choosing the CSV format and selecting the Replication status check box. For detailed instructions for creating an inventory, see How Do I Configure Amazon S3 Inventory? in the Amazon S3 Console User Guide.
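If you would rather enable the inventory programmatically, a boto3 sketch of the same configuration might look like the following; the bucket names are the demo names used in this post:

import boto3

s3 = boto3.client("s3")

# Daily CSV inventory of current object versions, including the
# ReplicationStatus field, delivered to the inventory bucket.
s3.put_bucket_inventory_configuration(
    Bucket="crr-preexisting-demo-source",
    Id="crr-preexisting-demo",
    InventoryConfiguration={
        "Id": "crr-preexisting-demo",
        "IsEnabled": True,
        "IncludedObjectVersions": "Current",
        "Schedule": {"Frequency": "Daily"},
        "OptionalFields": ["ReplicationStatus"],
        "Destination": {
            "S3BucketDestination": {
                "Bucket": "arn:aws:s3:::crr-preexisting-demo-inventory",
                "Format": "CSV",
            }
        },
    },
)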

After enabling S3 inventory, you need to wait for the inventory files to be delivered. It can take up to 48 hours to deliver the first report. If you’re following the demo, ensure that the inventory report is delivered before proceeding.

Here’s what our example inventory file looks like:

You can also look on the S3 console on the objects’ Overview tab. The pre-existing objects do not have a replication status, but the failed objects show the following:

Register the table in the AWS Glue Data Catalog using Amazon Athena

To be able to query the inventory files using SQL, first you need to create an external table in the AWS Glue Data Catalog. Open the Amazon Athena console at https://console.aws.amazon.com/athena/home.

On the Query Editor tab, run the following SQL statement. This statement registers the external table in the AWS Glue Data Catalog.

CREATE EXTERNAL TABLE IF NOT EXISTS
crr_preexisting_demo (
    `bucket` string,
    key string,
    replication_status string
)
PARTITIONED BY (dt string)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    ESCAPED BY '\\'
    LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
LOCATION 's3://crr-preexisting-demo-inventory/crr-preexisting-demo-source/crr-preexisting-demo/hive';

After creating the table, you need to make the AWS Glue Data Catalog aware of any existing data and partitions by adding partition metadata to the table. To do this, you use the Metastore Consistency Check utility to scan for and add partition metadata to the AWS Glue Data Catalog.

MSCK REPAIR TABLE crr_preexisting_demo;

To learn more about why this is required, see the documentation on MSCK REPAIR TABLE and data partitioning in the Amazon Athena User Guide.

Now that the table and partitions are registered in the Data Catalog, you can query the inventory files with Amazon Athena.

SELECT * FROM crr_preexisting_demo where dt='2019-02-24-04-00';

The results of the query are as follows.

The query returns all rows in the S3 inventory for a specific delivery date. You’re now ready to launch an EMR cluster to copy in place the pre-existing and failed objects.

Note: If your goal is to fix FAILED objects, make sure that you correct what caused the failure (IAM permissions or S3 bucket policies) before proceeding to the next step.

Create an EMR cluster to copy objects

To parallelize the copy-in-place operations, run a Spark job on Amazon EMR. To facilitate EMR cluster creation and EMR step submission, we wrote a bash script (available in this GitHub repository).

To run the script, clone the GitHub repo. Then launch the EMR cluster as follows:

$ git clone https://github.com/aws-samples/amazon-s3-crr-preexisting-objects
$ ./launch_emr.sh

Note: Running the bash script results in AWS charges. By default, it creates two Amazon EC2 instances, one m4.xlarge and one m4.2xlarge. Auto-termination is enabled so when the cluster is finished with the in-place copies, it terminates.

The script performs the following tasks:

  1. Creates the default EMR roles (EMR_EC2_DefaultRole and EMR_DefaultRole).
  2. Uploads the files used for bootstrap actions and steps to Amazon S3 (we use crr-preexisting-demo-inventory to store these files).
  3. Creates an EMR cluster with Apache Spark installed, using the create-cluster command.

After the cluster is provisioned:

  1. A bootstrap action installs boto3 and awscli.
  2. Two steps execute, copying the Spark application to the master node and then running the application.

The following are highlights from the Spark application. You can find the complete code for this example in the amazon-s3-crr-preexisting-objects repo on GitHub.

Here we select records from the table registered with the AWS Glue Data Catalog, filtering for objects with a replication_status of "FAILED" or "" (empty, which indicates pre-existing objects with no replication status).

query = """
        SELECT bucket, key
        FROM {}
        WHERE dt = '{}'
        AND (replication_status = '""'
        OR replication_status = '"FAILED"')
        """.format(inventory_table, inventory_date)

print('Query: {}'.format(query))

crr_failed = spark.sql(query)

We call the copy_object function for each key returned by the previous query.

def copy_object(self, bucket, key, copy_acls):
        dest_bucket = self._s3.Bucket(bucket)
        dest_obj = dest_bucket.Object(key)

        src_bucket = self._s3.Bucket(bucket)
        src_obj = src_bucket.Object(key)

        # Get the S3 Object's Storage Class, Metadata, 
        # and Server Side Encryption
        storage_class, metadata, sse_type, last_modified = \
            self._get_object_attributes(src_obj)

        # Update the Metadata so the copy will work
        metadata['forcedreplication'] = runtime

        # Get and copy the current ACL
        if copy_acls:
            src_acl = src_obj.Acl()
            src_acl.load()
            dest_acl = {
                'Grants': src_acl.grants,
                'Owner': src_acl.owner
            }

        params = {
            'CopySource': {
                'Bucket': bucket,
                'Key': key
            },
            'MetadataDirective': 'REPLACE',
            'TaggingDirective': 'COPY',
            'Metadata': metadata,
            'StorageClass': storage_class
        }

        # Set Server Side Encryption
        if sse_type == 'AES256':
            params['ServerSideEncryption'] = 'AES256'
        elif sse_type == 'aws:kms':
            kms_key = src_obj.ssekms_key_id
            params['ServerSideEncryption'] = 'aws:kms'
            params['SSEKMSKeyId'] = kms_key

        # Copy the S3 Object over the top of itself, 
        # with the Storage Class, updated Metadata, 
        # and Server Side Encryption
        result = dest_obj.copy_from(**params)

        # Put the ACL back on the Object
        if copy_acls:
            dest_obj.Acl().put(AccessControlPolicy=dest_acl)

        return {
            'CopyInPlace': 'TRUE',
            'LastModified': str(result['CopyObjectResult']['LastModified'])
        }

Note: The Spark application adds a forcedreplication key to the objects’ metadata. It does this because Amazon S3 doesn’t allow you to copy in place without changing the object or its metadata.

Verify the success of the EMR job by running a query in Amazon Athena

The Spark application outputs its results to S3. You can create another external table with Amazon Athena and register it with the AWS Glue Data Catalog. You can then query the table with Athena to ensure that the copy-in-place operation was successful.

CREATE EXTERNAL TABLE IF NOT EXISTS
crr_preexisting_demo_results (
  `bucket` string,
  key string,
  replication_status string,
  last_modified string
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ','
  LINES TERMINATED BY '\n'
  STORED AS TEXTFILE
LOCATION 's3://crr-preexisting-demo-inventory/results';

SELECT * FROM crr_preexisting_demo_results;

The results appear as follows on the console.

Although this shows that the copy-in-place operation was successful, CRR still needs to replicate the objects. Subsequent inventory files show the objects’ replication status as COMPLETED. You can also verify on the console that preexisting-*.txt and failed-*.txt are COMPLETED.

It is worth noting that because CRR requires versioned buckets, the copy-in-place operation produces another version of the objects. You can use S3 lifecycle policies to manage noncurrent versions.
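For example, a lifecycle rule that expires noncurrent versions could be configured with boto3 as follows; the bucket name and the 30-day retention period are examples, not recommendations:

import boto3

s3 = boto3.client("s3")

# Expire the noncurrent versions produced by the copy-in-place operation
# after 30 days.
s3.put_bucket_lifecycle_configuration(
    Bucket="crr-preexisting-demo-source",
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "expire-noncurrent-versions",
                "Status": "Enabled",
                "Filter": {"Prefix": ""},
                "NoncurrentVersionExpiration": {"NoncurrentDays": 30},
            }
        ]
    },
)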

Conclusion

In this post, we showed how to use Amazon S3 inventory, Amazon Athena, the AWS Glue Data Catalog, and Amazon EMR to perform copy-in-place operations on pre-existing and failed objects at scale.

Note: Amazon S3 batch operations is an alternative for copying objects. The difference is that S3 batch operations will not check each object’s existing properties and set object ACLs, storage class, and encryption on an object-by-object basis. For more information, see Introduction to Amazon S3 Batch Operations in the Amazon S3 Console User Guide.

 


About the Authors

Michael Sambol is a senior consultant at AWS. He holds an MS in computer science from Georgia Tech. Michael enjoys working out, playing tennis, traveling, and watching Western movies.

 

 

 

 

Chauncy McCaughey is a senior data architect at AWS. His current side project is using statistical analysis of driving habits and traffic patterns to understand how he always ends up in the slow lane.

 

 

 

EMR Notebooks: A managed analytics environment based on Jupyter notebooks

Post Syndicated from Vignesh Rajamani original https://aws.amazon.com/blogs/big-data/emr-notebooks-a-managed-analytics-environment-based-on-jupyter-notebooks/

Notebooks are increasingly becoming the standard tool for interactively developing big data applications. It’s easy to see why. Their flexible architecture allows you to experiment with data in multiple languages, test code interactively, and visualize large datasets. To help scientists and developers easily access notebook tools, we launched Amazon EMR Notebooks, a managed notebook environment that is based on the popular open-source Jupyter notebook application. EMR Notebooks support Spark Magic kernels, which allows you to submit jobs remotely on your EMR cluster using languages like PySpark, Spark SQL, Spark R, and Scala. The kernels submit your Spark code through Apache Livy, which is a REST server for Spark running on your cluster.

EMR Notebooks is designed to make it easy for you to experiment and build applications with Apache Spark. In this blog post, I first cover some of the benefits that EMR Notebooks offers. Then I introduce you to some of its capabilities such as detaching and attaching a notebook to different EMR clusters, monitoring Spark activity from within the notebook, using tags to control user permissions, and setting up user-impersonation to track notebook users and their actions. To learn about creating and using EMR Notebooks, you can visit Using Amazon EMR Notebooks or follow along with the AWS Online Tech Talks webinar.

Benefits of EMR Notebooks

One of the useful features of EMR Notebooks is the separation of the notebook environment from your underlying cluster infrastructure. The separation makes it easy for you to execute notebook code against transient clusters without worrying about deploying or configuring your notebook infrastructure every time you bring up a new cluster. You can create multiple serverless notebooks from the AWS Management Console for EMR and access the notebook UI without spending time setting up SSH access or configuring your browser for port-forwarding. Each notebook you create is launched instantly with its own Spark context. This capability enables you to attach multiple notebooks to a single shared cluster and submit parallel jobs without fear of job conflicts in a multi-tenant environment. This way you make efficient use of your clusters.

You can also connect EMR Notebooks to an EMR cluster as small as one node. This gives you a budget-friendly sandbox environment to develop your Spark applications.

Finally, with EMR Notebooks, you don't have to spend time manually configuring your notebook to store files persistently. Your notebook files are saved automatically to a chosen Amazon S3 bucket periodically, so you don't have to worry about losing your work if your cluster is shut down. You can retrieve your saved notebooks from the console or download them locally from your S3 bucket in Jupyter "ipynb" format.

Detaching and attaching EMR Notebooks to different clusters

With EMR Notebooks, you can detach an active notebook from a cluster and attach it to a different cluster and promptly resume your work. This capability can be useful in scenarios where you want to move your notebook from a sandbox development cluster to a production environment or attach to a different cluster with appropriate CPU or memory resources and library packages required to execute your notebook against large datasets. To detach an active notebook:

First select the notebook name and then choose Stop.

Wait for the notebook status shown next to the notebook name to change from Ready to Stopped and then choose Change Cluster.

After you stop the notebook, you can choose to attach it to a different cluster in the same VPC or create a new cluster. EMR Notebooks automatically attaches the notebook to the cluster and restarts the notebook.

Monitoring and debugging Spark jobs

EMR Notebooks supports a built-in Jupyter notebook widget called SparkMonitor that allows you to monitor the status of all your Spark jobs launched from the notebook without connecting to the Spark web UI server.

A widget appears and automatically integrates within the cell structure of your notebook and displays detailed status about the job submitted from each cell of your notebook, providing you with real-time progress of the different stages of the job. For any failed jobs, this widget also offers embedded links to the container logs in Amazon S3, allowing you to get to the relevant logs and debug your jobs.

Additionally, if you have configured your cluster to accept SSH connections, then you can access the Spark application web UI and Hadoop jobs history server from within the notebook. This allows you to view the event timelines, visualize Directed Acyclic Graphs (DAG) of each job, and view the detailed system and runtime information to inspect and debug your code. These web UIs are available automatically the first time you start to run your Spark code from a notebook.

Writing tag-based policies to control user permissions for notebooks and users

EMR Notebooks are by default shared resources that anyone from your organization with access to your AWS account can open, edit, or even delete. If you want more control over your notebook, you can use tags to label your notebook and write IAM policies that control access for other users. To get you started, when you create a notebook, a default tag with a key string, creatorUserId, is set to the value of the IAM User ID of the user who created the notebook.

You can use this tag to limit allowed actions on the notebook to only the creator of the notebook. For example, the permissions policy statement below, when attached to a role or user, enables the IAM user to view, start, stop, edit, or delete only those notebooks that they have created. This policy statement uses the default tag that is applied by EMR Notebooks when you create the notebook.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "elasticmapreduce:DescribeEditor",
                "elasticmapreduce:StartEditor",
                "elasticmapreduce:StopEditor",
                "elasticmapreduce:DeleteEditor",
                "elasticmapreduce:OpenEditorInConsole"
            ],
            "Effect": "Allow",
            "Resource": "*",
            "Condition": {
                "StringEquals": {
                    "elasticmapreduce:ResourceTag/creatorUserId": "${aws:userId}"
                }
            }
        }
    ]

You can write policies that enforce the creation of tags when a notebook is created. For example, the policy below requires that the user not change or delete the creatorUserId tag that is added by default. The variable ${aws:userid} specifies the currently active user's ID, which is the default value of the tag.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "elasticmapreduce:CreateEditor"
            ],
            "Effect": "Allow",
            "Resource": "*",
            "Condition": {
                "StringEquals": {
                    "elasticmapreduce:RequestTag/creatorUserId": "${aws:userid}"
                }
            }
        }
    ]
}

You can use the notebook tags along with EMR cluster tags to control notebook users' access to your cluster. Tagging your notebooks and clusters, in addition to securing your resources, also allows you to categorize, track, and allocate your EMR cluster costs across your different lines of business. For example, the policy below allows a user to start a notebook only if the notebook has a tag with the key string "department" and the value "Analytics", and only if the notebook is attached to an EMR cluster that has a tag with the key string "cost-center" and the value "12345."

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "elasticmapreduce:StartEditor"
            ],
            "Effect": "Allow",
            "Resource": "arn:aws:elasticmapreduce:*:123456789012:editor/*",
            "Condition": {
                "StringEquals": {
                    "elasticmapreduce:ResourceTag/department": [
                        "Analytics"
                    ]
                }
            }
        },
        {
            "Action": [
                "elasticmapreduce:StartEditor"
            ],
            "Effect": "Allow",
            "Resource": "arn:aws:elasticmapreduce:*:123456789012:cluster/*",
            "Condition": {
                "StringEquals": {
                    "elasticmapreduce:ResourceTag/cost-center": [
                        "12345"
                    ]
                }
            }
        }
    ]
}

You can learn more about notebook and cluster tags by visiting Using Tags to Control User Permissions and Tag Clusters. Also, visit Using Cost Allocation Tags to learn more about using tags to generate cost allocation reports in the AWS Billing and Cost Management console.

Tracking notebook users by enforcing user-impersonation

EMR Notebooks enables multiple users to execute their notebooks' code concurrently on a shared EMR cluster, improving cluster utilization. By default, all Spark jobs spawned by these different users from their notebooks run as the same user (the livy user) on your EMR cluster. If your corporate policy requires you to set up an audit trail and track individual notebook user actions on shared clusters, you can use the user-impersonation feature of EMR Notebooks. This capability allows you to distinguish and audit all notebook users by associating the jobs they run from their notebooks with each user's IAM identity. To use this feature, enable Livy user impersonation by configuring the core-site and livy-conf configuration classifications when you create and launch an EMR cluster, as follows:

[
    {
        "Classification": "core-site",
        "Properties": {
          "hadoop.proxyuser.livy.groups": "*",
          "hadoop.proxyuser.livy.hosts": "*"
        }
    },
    {
        "Classification": "livy-conf",
        "Properties": {
          "livy.impersonation.enabled": "true"
        }
    }
]
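As an illustration, the same classifications can be passed when you create the cluster programmatically. The following boto3 sketch uses placeholder names, instance types, and release label:

import boto3

emr = boto3.client("emr")

livy_impersonation = [
    {
        "Classification": "core-site",
        "Properties": {
            "hadoop.proxyuser.livy.groups": "*",
            "hadoop.proxyuser.livy.hosts": "*",
        },
    },
    {
        "Classification": "livy-conf",
        "Properties": {"livy.impersonation.enabled": "true"},
    },
]

# Cluster creation with the Livy impersonation classifications applied.
emr.run_job_flow(
    Name="emr-notebooks-cluster",   # placeholder
    ReleaseLabel="emr-5.26.0",      # example release label
    Applications=[{"Name": "Spark"}, {"Name": "Livy"}],
    Configurations=livy_impersonation,
    Instances={
        "MasterInstanceType": "m5.xlarge",
        "SlaveInstanceType": "m5.xlarge",
        "InstanceCount": 3,
        "KeepJobFlowAliveWhenNoSteps": True,
        "Ec2KeyName": "my-key-pair",  # placeholder
    },
    JobFlowRole="EMR_EC2_DefaultRole",
    ServiceRole="EMR_DefaultRole",
    VisibleToAllUsers=True,
)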

Visit Configuring Applications to learn more about configuring applications. After this feature is enabled, EMR Notebooks creates HDFS user directories on the master node for each user identity. This means that all the Spark jobs from the notebook run as the IAM user instead of the indistinct livy user. For example, if user NB_User1 runs code from the notebook editor, then a user directory named user_NB_User1 is created on the master node and all Spark jobs run as user_NB_User1. You can then use a service like AWS CloudTrail to audit the record of actions by the user NB_User1 by creating a trail. To learn more about setting up audit trails, see Logging Amazon EMR API Calls in AWS CloudTrail.

Conclusion

In this post, I highlighted some of the capabilities of EMR Notebooks such as the ability to change clusters, monitor Spark jobs from each notebook cell, control user permission, and categorize resource costs. You can do this by using notebook and cluster tags and setting up user-impersonation to track notebook user actions.

Oh, and by the way, there is no additional charge for using EMR Notebooks; you only pay for the use of the EMR cluster as usual!

If you have questions or suggestions, feel free to leave a comment.

 


About the Authors

Vignesh Rajamani is a senior product manager for EMR at AWS.

 

 

 

 

Nikki Rouda is the principal product marketing manager for data lakes and big data at AWS. Nikki has spent 20+ years helping enterprises in 40+ countries develop and implement solutions to their analytics and IT infrastructure challenges. Nikki holds an MBA from the University of Cambridge and an ScB in geophysics and math from Brown University.

 

 

 

Test data quality at scale with Deequ

Post Syndicated from Dustin Lange original https://aws.amazon.com/blogs/big-data/test-data-quality-at-scale-with-deequ/

You generally write unit tests for your code, but do you also test your data? Incorrect or malformed data can have a large impact on production systems. Examples of data quality issues are:

  • Missing values can lead to failures in production system that require non-null values (NullPointerException).
  • Changes in the distribution of data can lead to unexpected outputs of machine learning models.
  • Aggregations of incorrect data can lead to wrong business decisions.

In this blog post, we introduce Deequ, an open source tool developed and used at Amazon. Deequ allows you to calculate data quality metrics on your dataset, define and verify data quality constraints, and be informed about changes in the data distribution. Instead of implementing checks and verification algorithms on your own, you can focus on describing how your data should look. Deequ supports you by suggesting checks for you. Deequ is implemented on top of Apache Spark and is designed to scale with large datasets (think billions of rows) that typically live in a distributed filesystem or a data warehouse.

Deequ at Amazon

Deequ is being used internally at Amazon for verifying the quality of many large production datasets. Dataset producers can add and edit data quality constraints. The system computes data quality metrics on a regular basis (with every new version of a dataset), verifies constraints defined by dataset producers, and publishes datasets to consumers in case of success. In error cases, dataset publication can be stopped, and producers are notified to take action. Data quality issues do not propagate to consumer data pipelines, reducing their blast radius.

Overview of Deequ

To use Deequ, let’s look at its main components (also shown in Figure 1).

  • Metrics Computation — Deequ computes data quality metrics, that is, statistics such as completeness, maximum, or correlation. Deequ uses Spark to read from sources such as Amazon S3, and to compute metrics through an optimized set of aggregation queries. You have direct access to the raw metrics computed on the data.
  • Constraint Verification — As a user, you focus on defining a set of data quality constraints to be verified. Deequ takes care of deriving the required set of metrics to be computed on the data. Deequ generates a data quality report, which contains the result of the constraint verification.
  • Constraint Suggestion — You can choose to define your own custom data quality constraints, or use the automated constraint suggestion methods that profile the data to infer useful constraints.

Figure 1: Overview of Deequ components

Setup: Launch the Spark cluster

This section shows the steps to use Deequ on your own data. First, set up Spark and Deequ on an Amazon EMR cluster. Then, load a sample dataset provided by AWS, run some analysis, and then run data tests.

Deequ is built on top of Apache Spark to support fast, distributed calculations on large datasets. Deequ depends on Spark version 2.2.0 or later. As a first step, create a cluster with Spark on Amazon EMR. Amazon EMR takes care of the configuration of Spark for you. Also, you can use the EMR File System (EMRFS) to directly access data in Amazon S3. For testing, you can also install Spark on a single machine in standalone mode.

Connect to the Amazon EMR master node using SSH. Load the latest Deequ JAR from Maven Repository. To load the JAR of version 1.0.1, use the following:

wget http://repo1.maven.org/maven2/com/amazon/deequ/deequ/1.0.1/deequ-1.0.1.jar

Launch Spark Shell and use the spark.jars argument for referencing the Deequ JAR file:

spark-shell --conf spark.jars=deequ-1.0.1.jar

For more information about how to set up Spark, see the Spark Quick Start guide, and the overview of Spark configuration options.

Load data

As a running example, we use a customer review dataset provided by Amazon on Amazon S3. Let’s load the dataset containing reviews for the category “Electronics” in Spark. Make sure to enter the code in the Spark shell:

val dataset = spark.read.parquet("s3://amazon-reviews-pds/parquet/product_category=Electronics/")

You can see the following selected attributes if you run dataset.printSchema() in the Spark shell:

root
|-- marketplace: string (nullable = true)
|-- customer_id: string (nullable = true)
|-- review_id: string (nullable = true)
|-- product_title: string (nullable = true)
|-- star_rating: integer (nullable = true)
|-- helpful_votes: integer (nullable = true)
|-- total_votes: integer (nullable = true)
|-- vine: string (nullable = true)
|-- year: integer (nullable = true)

Data analysis

Before we define checks on the data, we want to calculate some statistics on the dataset; we call them metrics. Deequ supports the following metrics (they are defined in this Deequ package):

Metric | Description | Usage Example
ApproxCountDistinct | Approximate number of distinct values, computed with HyperLogLogPlusPlus sketches. | ApproxCountDistinct("review_id")
ApproxQuantile | Approximate quantile of a distribution. | ApproxQuantile("star_rating", quantile = 0.5)
ApproxQuantiles | Approximate quantiles of a distribution. | ApproxQuantiles("star_rating", quantiles = Seq(0.1, 0.5, 0.9))
Completeness | Fraction of non-null values in a column. | Completeness("review_id")
Compliance | Fraction of rows that comply with the given column constraint. | Compliance("top star_rating", "star_rating >= 4.0")
Correlation | Pearson correlation coefficient, which measures the linear correlation between two columns. The result is in the range [-1, 1], where 1 means positive linear correlation, -1 means negative linear correlation, and 0 means no correlation. | Correlation("total_votes", "star_rating")
CountDistinct | Number of distinct values. | CountDistinct("review_id")
DataType | Distribution of data types such as Boolean, Fractional, Integral, and String. The resulting histogram allows filtering by relative or absolute fractions. | DataType("year")
Distinctness | Fraction of distinct values of a column over the number of all values of a column. Distinct values occur at least once. Example: [a, a, b] contains two distinct values a and b, so distinctness is 2/3. | Distinctness("review_id")
Entropy | Entropy is a measure of the level of information contained in an event (value in a column) when considering all possible events (values in a column). It is measured in nats (natural units of information). Entropy is estimated using observed value counts as the negative sum of (value_count/total_count) * log(value_count/total_count). Example: [a, b, b, c, c] has three distinct values with counts [1, 2, 2]. Entropy is then (-1/5*log(1/5)-2/5*log(2/5)-2/5*log(2/5)) = 1.055. | Entropy("star_rating")
Maximum | Maximum value. | Maximum("star_rating")
Mean | Mean value; null values are excluded. | Mean("star_rating")
Minimum | Minimum value. | Minimum("star_rating")
MutualInformation | Mutual information describes how much information about one column (one random variable) can be inferred from another column (another random variable). If the two columns are independent, mutual information is zero. If one column is a function of the other column, mutual information is the entropy of the column. Mutual information is symmetric and nonnegative. | MutualInformation(Seq("total_votes", "star_rating"))
PatternMatch | Fraction of rows that comply with a given regular expression. | PatternMatch("marketplace", pattern = raw"\w{2}".r)
Size | Number of rows in a DataFrame. | Size()
Sum | Sum of all values of a column. | Sum("total_votes")
UniqueValueRatio | Fraction of unique values over the number of all distinct values of a column. Unique values occur exactly once; distinct values occur at least once. Example: [a, a, b] contains one unique value b, and two distinct values a and b, so the unique value ratio is 1/2. | UniqueValueRatio("star_rating")
Uniqueness | Fraction of unique values over the number of all values of a column. Unique values occur exactly once. Example: [a, a, b] contains one unique value b, so uniqueness is 1/3. | Uniqueness("star_rating")

In the following example, we show how to use the AnalysisRunner to define the metrics you are interested in. You can run the following code in the Spark shell by either just pasting it in the shell or by saving it in a local file on the master node and loading it in the Spark shell with the following command:

:load PATH_TO_FILE

import com.amazon.deequ.analyzers.runners.{AnalysisRunner, AnalyzerContext}
import com.amazon.deequ.analyzers.runners.AnalyzerContext.successMetricsAsDataFrame
import com.amazon.deequ.analyzers.{Compliance, Correlation, Size, Completeness, Mean, ApproxCountDistinct}

val analysisResult: AnalyzerContext = { AnalysisRunner
  // data to run the analysis on
  .onData(dataset)
  // define analyzers that compute metrics
  .addAnalyzer(Size())
  .addAnalyzer(Completeness("review_id"))
  .addAnalyzer(ApproxCountDistinct("review_id"))
  .addAnalyzer(Mean("star_rating"))
  .addAnalyzer(Compliance("top star_rating", "star_rating >= 4.0"))
  .addAnalyzer(Correlation("total_votes", "star_rating"))
  .addAnalyzer(Correlation("total_votes", "helpful_votes"))
  // compute metrics
  .run()
}

// retrieve successfully computed metrics as a Spark data frame
val metrics = successMetricsAsDataFrame(spark, analysisResult)

The resulting data frame contains the calculated metrics (call metrics.show() in the Spark shell):

name | instance | value
ApproxCountDistinct | review_id | 3010972
Completeness | review_id | 1
Compliance | top star_rating | 0.74941
Correlation | helpful_votes,total_votes | 0.99365
Correlation | total_votes,star_rating | -0.03451
Mean | star_rating | 4.03614
Size | * | 3120938

We can learn that:

  • review_id has no missing values and approximately 3,010,972 unique values.
  • 74.9 % of reviews have a star_rating of 4 or higher.
  • total_votes and star_rating are not correlated.
  • helpful_votes and total_votes are strongly correlated.
  • The average star_rating is 4.0.
  • The dataset contains 3,120,938 reviews.

Define and Run Tests for Data

After analyzing and understanding the data, we want to verify that the properties we have derived also hold for new versions of the dataset. By defining assertions on the data distribution as part of a data pipeline, we can ensure that every processed dataset is of high quality, and that any application consuming the data can rely on it.

For writing tests on data, we start with the VerificationSuite and add Checks on attributes of the data. In this example, we test for the following properties of our data:

  • There are at least 3 million rows in total.
  • review_id is never NULL.
  • review_id is unique.
  • star_rating has a minimum of 1.0 and a maximum of 5.0.
  • marketplace only contains “US”, “UK”, “DE”, “JP”, or “FR”.
  • year does not contain negative values.

This is the code that reflects the previous statements. For information about all available checks, see this GitHub repository. You can run this directly in the Spark shell as previously explained:

import com.amazon.deequ.{VerificationSuite, VerificationResult}
import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
import com.amazon.deequ.checks.{Check, CheckLevel}

val verificationResult: VerificationResult = { VerificationSuite()
  // data to run the verification on
  .onData(dataset)
  // define a data quality check
  .addCheck(
    Check(CheckLevel.Error, "Review Check") 
      .hasSize(_ >= 3000000) // at least 3 million rows
      .hasMin("star_rating", _ == 1.0) // min is 1.0
      .hasMax("star_rating", _ == 5.0) // max is 5.0
      .isComplete("review_id") // should never be NULL
      .isUnique("review_id") // should not contain duplicates
      .isComplete("marketplace") // should never be NULL
      // contains only the listed values
      .isContainedIn("marketplace", Array("US", "UK", "DE", "JP", "FR"))
      .isNonNegative("year")) // should not contain negative values
  // compute metrics and verify check conditions
  .run()
}

// convert check results to a Spark data frame
val resultDataFrame = checkResultsAsDataFrame(spark, verificationResult)

After calling run, Deequ translates your test description into a series of Spark jobs, which are executed to compute metrics on the data. Afterwards, it invokes your assertion functions (e.g., _ == 1.0 for the minimum star-rating check) on these metrics to see if the constraints hold on the data.

Call resultDataFrame.show(truncate=false) in the Spark shell to inspect the result. The resulting table shows the verification result for every test, for example:

constraint | constraint_status | constraint_message
SizeConstraint(Size(None)) | Success |
MinimumConstraint(Minimum(star_rating,None)) | Success |
MaximumConstraint(Maximum(star_rating,None)) | Success |
CompletenessConstraint(Completeness(review_id,None)) | Success |
UniquenessConstraint(Uniqueness(List(review_id))) | Failure | Value: 0.9926566948782706 does not meet the constraint requirement!
CompletenessConstraint(Completeness(marketplace,None)) | Success |
ComplianceConstraint(Compliance(marketplace contained in US,UK,DE,JP,FR,marketplace IS NULL OR marketplace IN ('US','UK','DE','JP','FR'),None)) | Success |
ComplianceConstraint(Compliance(year is non-negative,COALESCE(year, 0.0) >= 0,None)) | Success |

Interestingly, the review_id column is not unique, which resulted in a failure of the check on uniqueness.

We can also look at all the metrics that Deequ computed for this check:

VerificationResult.successMetricsAsDataFrame(spark, verificationResult).show(truncate=false)

Result:

name | instance | value
Completeness | review_id | 1
Completeness | marketplace | 1
Compliance | marketplace contained in US,UK,DE,JP,FR | 1
Compliance | year is non-negative | 1
Maximum | star_rating | 5
Minimum | star_rating | 1
Size | * | 3120938
Uniqueness | review_id | 0.99266

Automated Constraint Suggestion

If you own a large number of datasets or if your dataset has many columns, it may be challenging for you to manually define appropriate constraints. Deequ can automatically suggest useful constraints based on the data distribution. Deequ first runs a data profiling method and then applies a set of rules on the result. For more information about how to run a data profiling method, see this GitHub repository.

import com.amazon.deequ.suggestions.{ConstraintSuggestionRunner, Rules}
import spark.implicits._ // for toDS method

// We ask deequ to compute constraint suggestions for us on the data
val suggestionResult = { ConstraintSuggestionRunner()
  // data to suggest constraints for
  .onData(dataset)
  // default set of rules for constraint suggestion
  .addConstraintRules(Rules.DEFAULT)
  // run data profiling and constraint suggestion
  .run()
}

// We can now investigate the constraints that Deequ suggested. 
val suggestionDataFrame = suggestionResult.constraintSuggestions.flatMap { 
  case (column, suggestions) => 
    suggestions.map { constraint =>
      (column, constraint.description, constraint.codeForConstraint)
    } 
}.toSeq.toDS()

The result contains a list of constraints with descriptions and Scala code, so that you can directly apply it in your data quality checks. Call suggestionDataFrame.show(truncate=false) in the Spark shell to inspect the suggested constraints; here we show a subset:

column | constraint | scala code
customer_id | 'customer_id' is not null | .isComplete("customer_id")
customer_id | 'customer_id' has type Integral | .hasDataType("customer_id", ConstrainableDataTypes.Integral)
customer_id | 'customer_id' has no negative values | .isNonNegative("customer_id")
helpful_votes | 'helpful_votes' is not null | .isComplete("helpful_votes")
helpful_votes | 'helpful_votes' has no negative values | .isNonNegative("helpful_votes")
marketplace | 'marketplace' has value range 'US', 'UK', 'DE', 'JP', 'FR' | .isContainedIn("marketplace", Array("US", "UK", "DE", "JP", "FR"))
product_title | 'product_title' is not null | .isComplete("product_title")
star_rating | 'star_rating' is not null | .isComplete("star_rating")
star_rating | 'star_rating' has no negative values | .isNonNegative("star_rating")
vine | 'vine' has value range 'N', 'Y' | .isContainedIn("vine", Array("N", "Y"))

Note that the constraint suggestion is based on heuristic rules and assumes that the data it profiles is correct, which might not be the case. We recommend reviewing the suggestions before applying them in production.

More Examples on GitHub

You can find examples of more advanced features at Deequ’s GitHub page:

  • Deequ not only provides data quality checks with fixed thresholds. Learn how to use anomaly detection on data quality metrics to apply tests on metrics that change over time.
  • Deequ offers support for storing and loading metrics. Learn how to use the MetricsRepository for this use case.
  • If your dataset grows over time or is partitioned, you can use Deequ’s incremental metrics computation capability. For each partition, Deequ stores a state for each computed metric. To compute metrics for the union of partitions, Deequ can use these states to efficiently derive overall metrics without reloading the data.

Additional Resources

Learn more about the inner workings of Deequ in our VLDB 2018 paper, "Automating large-scale data quality verification."

Conclusion

This blog post showed you how to use Deequ for calculating data quality metrics, verifying data quality metrics, and profiling data to automate the configuration of data quality checks. Deequ is available for you now to build your own data quality management pipeline.

 


About the Authors

Dustin Lange is an Applied Science Manager at Amazon Search in Berlin. Dustin’s team develops algorithms for improving the search customer experience through machine learning and data quality tracking. He completed his PhD in similarity search in databases in 2013 and started his Amazon career as an Applied Scientist in forecasting the same year.

 

 

Sebastian Schelter is a Senior Applied Scientist at Amazon Search, working on problems at the intersection of data management and machine learning. He holds a Ph.D. in large-scale data processing from TU Berlin and is an elected member of the Apache Software Foundation, where he currently serves as a mentor for the Apache Incubator.

 

 

Philipp Schmidt is an ML Engineer at Amazon Search. After his graduation from TU Berlin he worked at the University of Potsdam and in several startups in Berlin. At Amazon he is working on enabling data quality tracking for large scale datasets and refining the customer shopping experience through machine learning.

 

 

Tammo Rukat is an Applied Scientist at Amazon Search in Berlin. He holds a PhD in statistical machine learning from the University of Oxford. At Amazon he makes use of the abundance and complexity of the company’s large-scale noisy datasets to contribute to a more intelligent customer experience.

 

 

 

 

Optimize Amazon EMR costs with idle checks and automatic resource termination using advanced Amazon CloudWatch metrics and AWS Lambda

Post Syndicated from Praveen Krishnamoorthy Ravikumar original https://aws.amazon.com/blogs/big-data/optimize-amazon-emr-costs-with-idle-checks-and-automatic-resource-termination-using-advanced-amazon-cloudwatch-metrics-and-aws-lambda/

Many customers use Amazon EMR to run big data workloads, such as Apache Spark and Apache Hive queries, in their development environment. Data analysts and data scientists frequently use these types of clusters, known as analytics EMR clusters. Users often forget to terminate the clusters after their work is done. This leaves the clusters running idle and, in turn, adds unnecessary costs.

To avoid this overhead, you must track the idleness of the EMR cluster and terminate it if it has been idle for a long time. Amazon EMR provides the native IsIdle Amazon CloudWatch metric, which determines whether the cluster is idle by checking whether a YARN job is running. However, you should consider additional metrics, such as connected SSH users or running Presto queries, to determine whether the cluster is idle. Also, when you execute Spark jobs in Apache Zeppelin, the IsIdle metric remains active (1) for long hours, even after the job has finished executing. In such cases, the IsIdle metric is not ideal for deciding whether a cluster is inactive.

In this blog post, we propose a solution to cut down this overhead cost. We implemented a bash script to be installed in the master node of the EMR cluster, and the script is scheduled to run every 5 minutes. The script monitors the clusters and sends a CUSTOM metric EMR-INUSE (0=inactive; 1=active) to CloudWatch every 5 minutes. If CloudWatch receives 0 (inactive) for some predefined set of data points, it triggers an alarm, which in turn executes an AWS Lambda function that terminates the cluster.

Prerequisites

You must have the following before you can create and deploy this framework:

Note: This solution is designed as an additional feature. It can be applied to any existing EMR clusters by executing the scheduler script (explained later in the post) as an EMR step. If you want to implement this solution as a mandatory feature for your future clusters, you can include the EMR step as part of your cluster deployment. You can also apply this solution to EMR clusters that are spun up through AWS CloudFormation, the AWS CLI, and even the AWS Management Console.

Key components

The following are the key components of the solution.

Analytics EMR cluster

Amazon EMR provides a managed Apache Hadoop framework that lets you easily process large amounts of data across dynamically scalable Amazon EC2 instances. Data scientists use analytics EMR clusters for data analysis, machine learning using notebook applications (such as Apache Zeppelin or JupyterHub), and running big data workloads based on Apache Spark, Presto, etc.

Scheduler script

The schedule_script.sh is the shell script to be executed as an Amazon EMR step. When executed, it copies the monitoring script from the Amazon S3 artifacts folder and schedules the monitoring script to run every 5 minutes. The S3 location of the monitoring script should be passed as an argument.

Monitoring script

The pushShutDownMetrin.sh script is a monitoring script that is implemented using shell commands. It should be installed in the master node of the EMR cluster as an Amazon EMR step. The script is scheduled to run every 5 minutes and sends the cluster activity status to CloudWatch.  

JupyterHub API token script

The jupyterhub_addAdminToken.sh script is a shell script to be executed as an Amazon EMR step if JupyterHub is enabled on the cluster. In our design, the monitoring script uses REST APIs provided by JupyterHub to check whether the application is in use.

To send the request to JupyterHub, you must pass an API token along with the request. By default, the application does not generate API tokens. This script generates the API token and assigns it to the admin user, which is then picked up by the jupyterhub module in the monitoring script to track the activity of the application.

Custom CloudWatch metric

All Amazon EMR clusters send data for several metrics to CloudWatch. Metrics are updated every 5 minutes, automatically collected, and pushed to CloudWatch. For this use case, we created the Amazon EMR metric EMR-INUSE. This metric represents the active status of the cluster based on the module checks implemented in the monitoring script. The metric is set to 0 when the cluster is inactive and 1 when active.
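
As a rough illustration only (the actual solution publishes the metric from a shell script with the AWS CLI), a minimal Python sketch of this metric publication might look like the following. The namespace matches the EMRShutdown/Cluster-Metric namespace used later for the alarm; the JobFlowId dimension name is an assumption for illustration.

# Minimal sketch: publish the EMR-INUSE custom metric with boto3.
# The namespace EMRShutdown/Cluster-Metric is used later for the alarm;
# the dimension name JobFlowId is an assumption. The actual solution uses
# a shell script and the AWS CLI instead.
import boto3

def push_emr_inuse_metric(job_flow_id, in_use):
    cloudwatch = boto3.client("cloudwatch")
    cloudwatch.put_metric_data(
        Namespace="EMRShutdown/Cluster-Metric",
        MetricData=[{
            "MetricName": "EMR-INUSE",
            "Dimensions": [{"Name": "JobFlowId", "Value": job_flow_id}],
            "Value": 1 if in_use else 0,
        }],
    )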

Amazon CloudWatch

CloudWatch is a monitoring service that you can use to set high-resolution alarms to take automated actions. In this case, CloudWatch triggers an alarm if it receives 0 continuously for the configured number of hours.

AWS Lambda

Lambda is a serverless technology that lets you run code without provisioning or managing servers. With Lambda, you can run code for virtually any type of application or backend service—all with zero administration. You can set up your code to automatically trigger from other AWS services. In this case, the triggered CloudWatch alarm mentioned earlier signals Lambda to terminate the cluster.

Architectural diagram

The following diagram illustrates the sequence of events when the solution is enabled, showing what happens to the EMR cluster that is spun up via AWS CloudFormation.

 

The diagram shows the following steps:

  1. The AWS CloudFormation stack is launched to spin up an EMR cluster.
  2. The Amazon EMR step is executed (installs pushShutDownMetrin.sh and then schedules it as a cron job to run every 5 minutes).
  3. If the EMR cluster is active (executing jobs), the master node sets the EMR-INUSE metric to 1 and sends it to CloudWatch.
  4. If the EMR cluster is inactive, the master node sets the EMR-INUSE metric to 0 and sends it to CloudWatch.
  5. On receiving 0 for a predefined number of data points, CloudWatch triggers a CloudWatch alarm.
  6. The CloudWatch alarm sends notification to AWS Lambda to terminate the cluster.
  7. AWS Lambda executes the Lambda function.
  8. The Lambda function then deletes all the stack resources associated with the cluster.
  9. Finally, the EMR cluster is terminated, and the Stack ID is removed from AWS CloudFormation.

Modules in the monitoring script

Following are the different activity checks that are implemented in the monitoring script (pushShutDownMetrin.sh). The script is designed in a modular fashion so that you can easily include new modules without modifying the core functionality.

ActiveSSHCheck

The ActiveSSHCheck module checks whether there are any active SSH connections to the master node. If there is an active SSH connection, and it’s idle for less than 10 minutes, the function sets the EMR-INUSE metric to 1 and pushes it to CloudWatch.

YARNCheck

Apache Hadoop YARN is the resource manager of the EMR Hadoop ecosystem. All the Spark submits and Hive queries reach YARN initially. It then schedules and processes these jobs. The YARNCheck module checks whether there are any running jobs in YARN or jobs completed within the last 5 minutes. If it finds any, the function sets the EMR-INUSE metric to 1 and pushes it to CloudWatch. The checks are performed by calling REST APIs exposed by YARN.

The API to fetch the running jobs is http://localhost:8088/ws/v1/cluster/apps?state=RUNNING.

The API to fetch the completed jobs is

http://localhost:8088/ws/v1/cluster/apps?state=FINISHED.
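
A minimal Python sketch of this check follows, assuming the requests library is available on the master node; the real monitoring script performs the equivalent calls from shell.

# Minimal sketch of the YARN activity check, assuming the requests library.
# The real monitoring script performs the equivalent call from shell and also
# inspects recently finished applications (state=FINISHED).
import requests

def yarn_has_activity():
    running = requests.get(
        "http://localhost:8088/ws/v1/cluster/apps", params={"state": "RUNNING"}
    ).json()
    # The response contains {"apps": null} when there are no applications.
    apps = (running.get("apps") or {}).get("app", [])
    return len(apps) > 0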

PRESTOCheck

Presto is an open-source distributed query engine for running interactive analytic queries. It is included in EMR release version 5.0.0 and later. The PRESTOCheck module checks whether there are any running Presto queries or whether any queries have been completed within the last 5 minutes. If it finds any, the function sets the EMR-INUSE metric to 1 and pushes it to CloudWatch. These checks are performed by calling REST APIs exposed by the Presto server.

The API to fetch the Presto jobs is http://localhost:8889/v1/query.

ZeppelinCheck

Amazon EMR users use Apache Zeppelin as a notebook for interactive data exploration. The ZeppelinCheck module checks whether there are any jobs running or if any have been completed within the last 5 minutes. If so, the function sets the EMR-INUSE metric to 1 and pushes it to CloudWatch. These checks are performed by calling the REST APIs exposed by Zeppelin.

The API to fetch the list of notebook IDs is http://localhost:8890/api/notebook.

The API to fetch the status of each cell inside each notebook ID is http://localhost:8890/api/notebook/job/$notebookID.

JupyterHubCheck

Jupyter Notebook is an open-source web application that you can use to create and share documents that contain live code, equations, visualizations, and narrative text. JupyterHub allows you to host multiple instances of a single-user Jupyter notebook server. The JupyterHubCheck module checks whether any Jupyter notebook is currently in use.

The function uses REST APIs exposed by JupyterHub to fetch the list of Jupyter notebook users and gathers the data about individual notebook servers. From the response, it extracts the last activity time of the servers and checks whether any server was used in the last 5 minutes. If so, the function sets the EMR-INUSE metric to 1 and pushes it to CloudWatch. The jupyterhub_addAdminToken.sh script needs to be executed as an EMR step before enabling the scheduler script.

The API to fetch the list of notebook users is https://localhost:9443/hub/api/users -H "Authorization: token $admin_token".

The API to fetch individual server information is https://localhost:9443/hub/api/users/$user -H "Authorization: token $admin_token".
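
A minimal Python sketch of this check follows, assuming the requests library and that the admin API token generated by jupyterhub_addAdminToken.sh is passed in; the last_activity field comes from the JupyterHub REST API user model, and the 5-minute threshold mirrors the description above.

# Minimal sketch of the JupyterHub activity check, assuming the requests
# library and an admin API token. The last_activity field follows the
# JupyterHub REST API user model; the real monitoring script uses shell.
from datetime import datetime, timedelta
import requests

def jupyterhub_recently_used(admin_token, threshold_minutes=5):
    users = requests.get(
        "https://localhost:9443/hub/api/users",
        headers={"Authorization": "token " + admin_token},
        verify=False,  # EMR's JupyterHub endpoint typically uses a self-signed certificate
    ).json()
    cutoff = datetime.utcnow() - timedelta(minutes=threshold_minutes)
    for user in users:
        last = user.get("last_activity")
        if last and datetime.strptime(last[:19], "%Y-%m-%dT%H:%M:%S") > cutoff:
            return True
    return False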

If none of these checks detects any activity, the cluster is considered inactive, and the monitoring script sets the EMR-INUSE metric to 0 and pushes it to CloudWatch.

Note:

The scheduler script schedules the monitoring script (pushShutDownMetrin.sh) to run every 5 minutes. Internal cron jobs that run for only a few minutes are not considered when calibrating the EMR-INUSE metric.

Deploying each component

Follow the steps in this section to deploy each component of the proposed design.

Step 1. Create the Lambda function and SNS subscription

The Lambda function and the SNS subscription are the core components of the design. You must set up these components initially, and they are common for every cluster. The following are the AWS resources to be created for these components:

  • Execution role for the Lambda function
  • Terminate Idle EMR Lambda function
  • SNS topic and Lambda subscription

For one-step deployment, use this AWS CloudFormation template to launch and configure the resources in a single go.

The following parameters are available in the template.

ParameterDefaultDescription
s3Bucketemr-shutdown-blogartifactsThe name of the S3 bucket that contains the Lambda file
s3KeyEMRTerminate.zipThe Amazon S3 key of the Lambda file

For manual deployment, follow these steps on the AWS Management Console.

Execution role for the Lambda function

  1. Open the AWS Identity and Access Management (IAM) console and choose Policies, Create policy.
  2. Choose the JSON tab, paste the following policy text, and then choose Review policy.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "s3:ListAllMyBuckets",
                "s3:HeadBucket",
                "s3:ListObjects",
                "s3:GetObject",
                "cloudformation:ListStacks",
                "cloudformation:DeleteStack",
                "cloudformation:DescribeStacks",
                "cloudformation:ListStackResources",
                "elasticmapreduce:TerminateJobFlows"
            ],
            "Resource": "*",
            "Effect": "Allow",
            "Sid": "GenericAccess"
        },
        {
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*",
            "Effect": "Allow",
            "Sid": "LogAccess"
        }
    ]
}
  3. For Name, enter TerminateEMRPolicy and choose Create policy.
  4. Choose Roles, Create role.
  5. Under Choose the service that will use this role, choose Lambda, and then choose Next: Permissions.
  6. For Attach permissions policies, choose the arrow next to Filter policies and choose Customer managed in the drop-down list.
  7. Attach the TerminateEMRPolicy policy that you just created, and choose Review.
  8. For Role name, enter TerminateEMRLambdaRole and then choose Create role.

Terminate idle EMR – Lambda function

I created a deployment package to use with this function.

  1. Open the Lambda console and choose Create function.
  2. Choose Author from scratch, and provide the details as shown in the following screenshot:
  • Name: lambdaTerminateEMR
  • Runtime: Python 2.7
  • Role: Choose an existing role
  • Existing role: TerminateEMRLambdaRole

  3. Choose Create function.
  4. In the Function code section, for Code entry type, choose Upload a file from Amazon S3, and for Runtime, choose Python 2.7.

The Lambda function S3 link URL is

s3://emr-shutdown-blogartifacts/EMRTerminate.zip.

Link to the function: https://s3.amazonaws.com/emr-shutdown-blogartifacts/EMRTerminate.zip

This Lambda function is triggered by a CloudWatch alarm. It parses the input event, retrieves the JobFlowId, and deletes the AWS CloudFormation stack of the corresponding JobFlowId.
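
A minimal sketch of such a handler follows; the packaged EMRTerminate.zip is the authoritative implementation. The JobFlowId dimension name and the aws:cloudformation:stack-name tag lookup are assumptions made for illustration.

# Minimal sketch of the terminate-idle-EMR Lambda handler. The packaged
# EMRTerminate.zip is the authoritative implementation. Assumes the alarm's
# SNS message carries the cluster ID in a dimension named JobFlowId and that
# CloudFormation-created clusters carry the aws:cloudformation:stack-name tag.
import json
import boto3

emr = boto3.client("emr")
cloudformation = boto3.client("cloudformation")

def lambda_handler(event, context):
    # The CloudWatch alarm notification arrives wrapped in an SNS record.
    message = json.loads(event["Records"][0]["Sns"]["Message"])
    dimensions = message["Trigger"]["Dimensions"]
    job_flow_id = next(d["value"] for d in dimensions if d["name"] == "JobFlowId")

    # Look up the CloudFormation stack that created the cluster, if any.
    tags = emr.describe_cluster(ClusterId=job_flow_id)["Cluster"]["Tags"]
    stack_name = next(
        (t["Value"] for t in tags if t["Key"] == "aws:cloudformation:stack-name"), None
    )

    if stack_name:
        # Deleting the stack removes the cluster and its associated resources.
        cloudformation.delete_stack(StackName=stack_name)
    else:
        emr.terminate_job_flows(JobFlowIds=[job_flow_id])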

SNS topic and Lambda subscription

For setting the CloudWatch alarm in the further stages, you must create an Amazon SNS topic that notifies the preceding Lambda function to execute. Follow these steps to create an SNS topic and configure the Lambda endpoint.

  1. Navigate to the Amazon SNS console, and choose Create topic.
  2. Enter the Topic name and Display name, and choose Create topic.

  3. The topic is created and displayed in the Topics list.
  4. Select the topic and choose Actions, Subscribe to topic.
  5. In the Create subscription dialog box, choose the AWS Lambda protocol, choose lambdaTerminateEMR as the endpoint, and then choose Create subscription.

Step 2. Execute the JupyterHub API token script as an EMR step

This step is required only when JupyterHub is enabled in the cluster.

Navigate to the EMR cluster to be monitored, and execute the JupyterHub API token script as an EMR step.

Command: s3://emr-shutdown-blogartifacts/jupyterhub_addAdminToken.sh

This script generates an API token and assigns it to the admin user. It is then picked up by the jupyterhub module in the monitoring script to track the activity of the application.

Step 3. Execute the scheduler script as an EMR step

Navigate to the EMR cluster to be monitored and execute the scheduler script as an EMR step.

Note:

Ensure that termination protection is disabled in the cluster. The termination protection flag causes the Lambda function to fail.

Command: s3://emr-shutdown-blogartifacts/schedule_script.sh

Parameter: s3://emr-shutdown-blogartifacts/pushShutDownMetrin.sh

The step function copies the pushShutDownMetrin.sh script to the master node and schedules it to run every 5 minutes.

The schedule_script.sh is at https://s3.amazonaws.com/emr-shutdown-blogartifacts/schedule_script.sh.

The pushShutDownMetrin.sh is at https://s3.amazonaws.com/emr-shutdown-blogartifacts/pushShutDownMetrin.sh.
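
If you prefer to submit the step programmatically instead of through the console, a hedged boto3 sketch follows; the cluster ID is a placeholder, and the script-runner.jar Region prefix should match your cluster's Region.

# Minimal sketch: submit the scheduler script as an EMR step with boto3.
# script-runner.jar is the standard EMR helper for running a script stored in
# S3; replace us-east-1 with your cluster's Region. The cluster ID below is a
# hypothetical placeholder.
import boto3

emr = boto3.client("emr")
emr.add_job_flow_steps(
    JobFlowId="j-XXXXXXXXXXXXX",  # hypothetical cluster ID
    Steps=[{
        "Name": "Schedule idle-check monitoring script",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "s3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar",
            "Args": [
                "s3://emr-shutdown-blogartifacts/schedule_script.sh",
                "s3://emr-shutdown-blogartifacts/pushShutDownMetrin.sh",
            ],
        },
    }],
)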

Step 4. Create a CloudWatch alarm

For single-step deployment, use this AWS CloudFormation template to launch and configure the resources in a single go.

The following parameters are available in the template.

Parameter | Default | Description
AlarmName | TerminateIDLE-EMRAlarm | The name for the alarm.
EMRJobFlowID | Requires input | The JobFlowId of the cluster.
EvaluationPeriod | Requires input | The idle timeout value—input should be in data points (1 data point equals 5 minutes). For example, to terminate the cluster if it is idle for 20 minutes, the input should be 4.
SNSSubscribeTopic | Requires input | The Amazon Resource Name (ARN) of the SNS topic to be triggered on the alarm.

 

The AWS CloudFormation CLI command is as follows:

aws cloudformation create-stack --stack-name EMRAlarmStack \
      --template-url https://s3.amazonaws.com/emr-shutdown-blogartifacts/Cloudformation/alarm.json \
      --parameters ParameterKey=AlarmName,ParameterValue=TerminateIDLE-EMRAlarm \
                   ParameterKey=EMRJobFlowID,ParameterValue=<Input> \
                   ParameterKey=EvaluationPeriod,ParameterValue=4 \
                   ParameterKey=SNSSubscribeTopic,ParameterValue=<Input>

For manual deployment, follow these steps to create the alarm.

  1. Open the Amazon CloudWatch console and choose Alarms.
  2. Choose Create Alarm.
  3. On the Select Metric page, under Custom Metrics, choose EMRShutdown/Cluster-Metric.

  4. Choose the EMR-INUSE metric of the EMR JobFlowId, and then choose Next.

  5. Define the alarm as required. In this case, the alarm is set to send a notification to the SNS topic shutDownEMRTest when CloudWatch receives the EMR-INUSE metric as 0 for every data point in the last 2 hours.

  6. Choose Create Alarm.

Summary

In this post, we focused on building a framework to cut down the additional cost that you might incur due to the idle running of an EMR cluster. The modular checks implemented in the shell script, which track the execution status of Spark jobs and Hive/Presto queries through lightweight REST API calls, make this approach an efficient solution.

If you have questions or suggestions, please comment below.

 


About the Author

Praveen Krishnamoorthy Ravikumar is an associate big data consultant with Amazon Web Services.

 

 

 

 

Improve clinical trial outcomes by using AWS technologies

Post Syndicated from Mayank Thakkar original https://aws.amazon.com/blogs/big-data/improve-clinical-trial-outcomes-by-using-aws-technologies/

We are living in a golden age of innovation, where personalized medicine is making it possible to cure diseases that we never thought curable. Digital medicine is helping people with diseases get healthier, and we are constantly discovering how to use the body’s immune system to target and eradicate cancer cells. According to a report published by ClinicalTrials.gov, the number of registered studies hit 293,000 in 2018, representing a 250x growth since 2000.

However, an internal rate of return (IRR) analysis by Endpoints News, using data from EvaluatePharma, highlights some interesting trends. A flourishing trend in pharma innovation is supported by strong growth in registered studies. However, the IRR shows a rapidly declining trend, from around 17 percent in 2000 to below the cost of capital in 2017 and projected to go to 0 percent by 2020.

This blog post is the first installment in a series that focuses on the end-to-end workflow of collecting, storing, processing, visualizing, and acting on clinical trials data in a compliant and secure manner. The series also discusses the application of artificial intelligence and machine learning technologies to the world of clinical trials. In this post, we highlight common architectural patterns that AWS customers use to modernize their clinical trials. These incorporate mobile technologies for better evidence generation, cost reduction, increasing quality, improving access, and making medicine more personalized for patients.

Improving the outcomes of clinical trials and reducing costs

Biotech and pharma organizations are feeling the pressure to use resources as efficiently as possible. This pressure forces them to search for any opportunity to streamline processes, get faster, and stay more secure, all while decreasing costs. More and more life sciences companies are targeting biologics, CAR-T, and precision medicine therapeutics, with focus shifting towards smaller, geographically distributed patient segments. This shift has resulted in an increasing mandate to capture data from previously unavailable, nontraditional sources. These sources include mobile devices, IoT devices, and in-home and clinical devices. Life sciences companies merge data from these sources with data from traditional clinical trials to build robust evidence around the safety and efficacy of a drug.

Early last year, the Clinical Trials Transformation Initiative (CTTI) provided recommendations about using mobile technologies for capturing holistic, high quality, attributable, real-world data from patients and for submission to the U.S. Food and Drug Administration (FDA). By using mobile technologies, life sciences companies can reduce barriers to trial participation and lower costs associated with conducting clinical trials. Global regulatory groups such as the FDA, Health Canada, and Medicines and Healthcare products Regulatory Agency (MHRA), among others, are also in favor of using mobile technologies. Mobile technologies can make patient recruitment more efficient, reach endpoints faster, and reduce the cost and time required to conduct clinical trials.

Improved data ingestion using mobile technologies can speed up outcomes, reduce costs, and improve the accuracy of clinical trials. This is especially true when mobile data ingestion is supplemented with artificial intelligence and machine learning (AI/ML) technologies.

Together, they can usher in a new age of smart clinical trials.

At the same time, traditional clinical trial processes and technology designed for mass-marketed blockbuster drugs can’t effectively meet emerging industry needs. This leaves life sciences and pharmaceutical companies in need of assistance for evolving their clinical trial operations. These circumstances result in making clinical trials one of the largest areas of investment for bringing a new drug to market.

Using mobile technologies with traditional technologies in clinical trials can improve the outcomes of the trials and simultaneously reduce costs. Some of the use cases that the integration of various technologies enables include these:

  • Identifying and tracking participants in clinical trials
    • Identifying participants for clinical trials recruitment
    • Educating and informing patients participating in clinical trials
    • Implementing standardized protocols and sharing associated information to trial participants
    • Tracking adverse events and safety profiles
  • Integrating genomic and phenotypic data for identifying novel biomarkers
  • Integrating mobile data into clinical trials for better clinical trial management
  • Creation of a patient-control arm based on historical data
  • Stratifying cohorts based on treatment, claims, and registry datasets
  • Building a collaborative, interoperable network for data sharing and knowledge creation
  • Building compliance-ready infrastructure for clinical trial management

The AWS Cloud provides HIPAA eligible services and solutions. As an AWS customer, you can use these to build solutions for global implementation of mobile devices and sensors in trials, secure capture of streaming Internet of Things (IoT) data, and advanced analytics through visualization tools or AI/ML capabilities. Some of the use cases these services and solutions enable are finding and recruiting patients using smart analytics, facilitating global data management, and remote or in-patient site monitoring. Others include predicting lack of adherence, detecting adverse events, and accelerating trial outcomes along with optimizing trial costs.

Clinical Trials 2.0 (CT2.0) at AWS is geared toward facilitating wider adoption of cloud-native services to enable data ingestion from disparate sources, cost-optimized and reliable storage, and holistic analytics. At the same time, CT2.0 provides the granular access control, end-to-end security, and global scalability needed to conduct clinical trials more efficiently.

Reference architecture

One of the typical architectures for managing a clinical trial using mobile technologies is shown following. This architecture focuses on capturing real-time data from mobile sources and providing a way to process it.

* – Additional considerations such as data security, access control, and compliance need to be incorporated into the architecture and are discussed in the remainder of this post.

Managing a trial by using this architecture consists of the following five major steps.

Step 1: Collect data

Mobile devices, personal wearables, instruments, and smart-devices are extensively being used (or being considered) by global pharmaceutical companies in patient care and clinical trials to provide data for activity tracking, vital signs monitoring, and so on, in real-time. Devices like infusion pumps, personal use dialysis machines, and so on require tracking and alerting of device consumables and calibration status. Remote settings management is also a major use case for these kinds of devices. The end-user mobile devices used in the clinical trial emit a lot of telemetry data that requires real-time data capture, data cleansing, transformation, and analysis.

Typically, these devices are connected to an edge node or a smart phone. Such a connection provides sufficient computing resources to stream data to AWS IoT Core. AWS IoT Core can then be configured to write data to Amazon Kinesis Data Firehose in near real time. Kinesis Data Firehose is a fully managed service for delivering real-time streaming data to destinations such as Amazon S3. S3 provides online, flexible, cost efficient, pay-as-you-go storage that can replicate your data on three Availability Zones within an AWS Region. The edge node or smart phone can use the AWS IoT SDKs to publish and subscribe device data to AWS IoT Core with MQTT. This process uses the AWS method of authentication (called ‘SigV4’), X.509 certificate–based authentication, and customer-created token-based authentication (through custom authorizers). This authenticated approach enables you to map your choice of policies to each certificate and remotely control device or application access. You can also use the Kinesis Data Firehose encryption feature to enable server-side data encryption.
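
As a rough illustration of this ingestion step, the following hedged boto3 sketch publishes a single reading to AWS IoT Core; the topic name and payload fields are assumptions, and real devices would normally use the AWS IoT device SDKs with certificate-based authentication as described above.

# Rough sketch only: publish a device reading to AWS IoT Core with boto3.
# Topic name and payload fields are illustrative assumptions; real devices
# would normally use the AWS IoT device SDKs with X.509 certificates.
import json
import boto3

iot_data = boto3.client("iot-data")
iot_data.publish(
    topic="trial/site-01/patient-1234/vitals",  # hypothetical topic
    qos=1,
    payload=json.dumps({
        "patient_id": "1234",
        "heart_rate": 72,
        "timestamp": "2019-06-01T12:00:00Z",
    }),
)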

You can also capture additional data such as Case Report Forms (CRF), Electronic Medical Records (EMR), and medical images using Picture Archiving and Communication Systems (PACS). In addition, you can capture laboratory data (Labs) and other Patient Reported Outcomes data (ePRO). AWS provides multiple tools and services to effectively and securely connect to these data sources, enabling you to ingest data in various volumes, variety, and velocities. For more information about creating a HealthCare Data Hub and ingesting Digital Imaging and Communications in Medicine (DICOM) data, see the AWS Big Data Blog post Create a Healthcare Data Hub with AWS and Mirth Connect.

Step 2: Store data

After data is ingested from the devices and wearables used in the clinical trial, Kinesis Data Firehose is used to store the data on Amazon S3. This stored data serves as a raw copy and can later be used for historical analysis and pattern prediction. Using Amazon S3 lifecycle policies, you can periodically move your data to reduced cost storage such as Amazon S3 Glacier to further optimize your storage costs. Amazon S3 Intelligent-Tiering can automatically optimize costs when data access patterns change, without performance impact or operational overhead, by moving data between two access tiers: frequent access and infrequent access. You can also choose to encrypt data at rest and in motion using various encryption options available on S3.

Amazon S3 offers an extremely durable, highly available, and infinitely scalable data storage infrastructure, simplifying most data processing, backup, and replication tasks.
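
A hedged sketch of such a lifecycle rule follows; the bucket name, prefix, and 90-day threshold are illustrative assumptions.

# Minimal sketch: transition raw trial data to Amazon S3 Glacier after 90 days.
# Bucket name, prefix, and the 90-day threshold are illustrative assumptions.
import boto3

s3 = boto3.client("s3")
s3.put_bucket_lifecycle_configuration(
    Bucket="clinical-trial-raw-data",  # hypothetical bucket
    LifecycleConfiguration={
        "Rules": [{
            "ID": "archive-raw-telemetry",
            "Filter": {"Prefix": "raw/"},
            "Status": "Enabled",
            "Transitions": [{"Days": 90, "StorageClass": "GLACIER"}],
        }]
    },
)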

Step 3: Data processing—fast lane

After collecting and storing a raw copy of the data, Amazon S3 is configured to publish events to AWS Lambda and invoke a Lambda function by passing the event data as a parameter. The Lambda function is used to extract the key performance indicators (KPIs) such as adverse event notifications, medication adherence, and treatment schedule management from the incoming data. You can use Lambda to process these KPIs and store them in Amazon DynamoDB, along with encryption at rest, which powers a near-real-time clinical trial status dashboard. This alerts clinical trial coordinators in real time so that appropriate interventions can take place.
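
A hedged sketch of such a function follows; the object layout, table name, and KPI fields are illustrative assumptions, and a production implementation would add validation and error handling.

# Minimal sketch: an S3-triggered Lambda function that extracts KPIs from a
# newly arrived record and writes them to DynamoDB. Object layout, table name,
# and KPI fields are illustrative assumptions.
import json
import boto3

s3 = boto3.client("s3")
table = boto3.resource("dynamodb").Table("TrialStatus")  # hypothetical table

def lambda_handler(event, context):
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        key = record["s3"]["object"]["key"]
        body = json.loads(s3.get_object(Bucket=bucket, Key=key)["Body"].read())
        table.put_item(Item={
            "patient_id": body["patient_id"],
            "medication_taken": body.get("medication_taken", False),
            "adverse_event": body.get("adverse_event", False),
            "reported_at": body["timestamp"],
        })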

In addition to this, using a data warehouse full of medical records, you can train and implement a machine learning model. This model can predict which patients are about to switch medications or might exhibit adherence challenges in the future. Such prediction can enable clinical trial coordinators to narrow in on those patients with mitigation strategies.

Step 4: Data processing—batch

For historical analysis and pattern prediction, the staged data (stored in S3) is processed in batches. A Lambda function is used to trigger the extract, transform, and load (ETL) process every time new data is added to the raw data S3 bucket. This Lambda function triggers an ETL process using AWS Glue, a fully managed ETL service that makes it easy for you to prepare and load your data for analytics. This approach helps in mining current and historical data to derive actionable insights, which is stored on Amazon S3.

From there, data is loaded on to Amazon Redshift, a cost-effective, petabyte-scale data warehouse offering from AWS. You can also use Amazon Redshift Spectrum to extend data warehousing out to exabytes without loading any data to Amazon Redshift, as detailed in the Big Data blog post Amazon Redshift Spectrum Extends Data Warehousing Out to Exabytes—No Loading Required. This enables you to provide an all-encompassing picture of the entire clinical trial to your clinical trial coordinators, enabling you to react and respond faster.

In addition to this, you can train and implement a machine learning model to identify patients who might be at risk for adherence challenges. This enables clinical trial coordinators to reinforce patient education and support.

Step 5: Visualize and act on data

After the data is processed and ready to be consumed, you can use Amazon QuickSight, a cloud-native business intelligence service from AWS that offers native Amazon Redshift connectivity. Amazon QuickSight is serverless and so can be rolled out to your audiences in hours. You can also use a host of third-party reporting tools, which can use AWS-supplied JDBC or ODBC drivers or open-source PostgreSQL drivers to connect with Amazon Redshift. These tools include TIBCO Spotfire Analytics, Tableau Server, Qlik Sense Enterprise, Looker, and others. Real-time data processing (step 3 preceding) combines with historical-view batch processing (step 4). Together, they empower contract research organizations (CROs), study managers, trial coordinators, and other entities involved in the clinical trial journey to make effective and informed decisions at a speed and frequency that was previously unavailable. Using Amazon QuickSight’s unique Pay-per-Session pricing model, you can optimize costs for your bursty usage models by paying only when users access the dashboards.

Using Amazon Simple Notification Service (Amazon SNS), real-time feedback based on incoming data and telemetry is sent to patients by using text messages, mobile push, and emails. In addition, study managers and coordinators can send Amazon SNS notifications to patients. Amazon SNS provides a fully managed pub/sub messaging for micro services, distributed systems, and serverless applications. It’s designed for high-throughput, push-based, many-to-many messaging. Alerts and notifications can be based on current data or a combination of current and historical data.
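
For example, a hedged sketch of sending a reminder through Amazon SNS might look like the following; the topic ARN and message text are assumptions.

# Minimal sketch: send a medication reminder through Amazon SNS.
# The topic ARN and message text are illustrative assumptions.
import boto3

sns = boto3.client("sns")
sns.publish(
    TopicArn="arn:aws:sns:us-east-1:123456789012:trial-patient-notifications",  # hypothetical topic
    Subject="Medication reminder",
    Message="Please remember to take your 8 PM study medication and log it in the app.",
)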

To encrypt messages published to Amazon SNS, you can follow the steps listed in the post Encrypting messages published to Amazon SNS with AWS KMS, on the AWS Compute Blog.   

Data security, data privacy, data integrity, and compliance considerations

At AWS, customer trust is our top priority. We deliver services to millions of active customers, including enterprises, educational institutions, and government agencies in over 190 countries. Our customers include financial services providers, healthcare providers, and governmental agencies, who trust us with some of their most sensitive information.

To facilitate this, along with the services mentioned earlier, you should also use AWS Identity and Access Management (IAM) service. IAM enables you to maintain segregation of access, fine-grained access control, and securing end user mobile and web applications. You can also use AWS Security Token Service (AWS STS) to provide secure, self-expiring, time-boxed, temporary security credentials to third-party administrators and service providers, greatly strengthening your security posture. You can use AWS CloudTrail to log IAM and STS API calls. Additionally, AWS IoT Device Management makes it easy to securely onboard, organize, monitor, and remotely manage IoT devices at scale.

With AWS, you can add an additional layer of security to your data at rest in the cloud. AWS provides scalable and efficient encryption features for services like Amazon EBS, Amazon S3, Amazon Redshift, Amazon SNS, AWS Glue, and many more. Flexible key management options, including AWS Key Management Service, enable you to choose whether to have AWS manage the encryption keys or to keep complete control over your keys. In addition, AWS provides APIs for you to integrate encryption and data protection with any of the services that you develop or deploy in an AWS environment.

As a customer, you maintain ownership of your data, and select which AWS services can process, store, and host the content. Generally speaking, AWS doesn’t access or use customers’ content for any purpose without their consent. AWS never uses customer data to derive information for marketing or advertising.

When evaluating the security of a cloud solution, it’s important that you understand and distinguish between the security of the cloud and security in the cloud. The AWS Shared Responsibility Model details this relationship.

To assist you with your compliance efforts, AWS continues to add more services to the various compliance regulations, attestations, certifications, and programs across the world. To decide which services are suitable for you, see the services in scope page.

You can also use various services like, but not limited to, AWS CloudTrail, AWS Config, Amazon GuardDuty, and AWS Key Management Service (AWS KMS) to enhance your compliance and auditing efforts. Find more details in the AWS Compliance Solutions Guide.

Final thoughts

With the ever-growing interconnectivity and technological advances in the field of medical devices, mobile devices and sensors can improve numerous aspects of clinical trials. They can help in recruitment, informed consent, patient counseling, and patient communication management. They can also improve protocol and medication adherence, clinical endpoints measurement, and the process of alerting participants on adverse events. Smart sensors, smart mobile devices, and robust interconnecting systems can be central in conducting clinical trials.

Every biopharma organization conducting or sponsoring a clinical trial activity faces the conundrum of advancing their approach to trials while maintaining overall trial performance and data consistency. The AWS Cloud enables a new dimension for how data is collected, stored, and used for clinical trials. It thus addresses that conundrum as we march towards a new reality of how drugs are brought to market. The AWS Cloud abstracts away technical challenges such as scaling, security, and establishing a cost-efficient IT infrastructure. In doing so, it allows biopharma organizations to focus on their core mission of improving patient lives through the development of effective, groundbreaking treatments.

 


About the Author

Mayank Thakkar – Global Solutions Architect, AWS HealthCare and Life Sciences

 

 

 

 

Deven Atnoor, Ph.D. – Industry Specialist, AWS HealthCare and Life Sciences

 

 

 

 

Best practices for successfully managing memory for Apache Spark applications on Amazon EMR

Post Syndicated from Karunanithi Shanmugam original https://aws.amazon.com/blogs/big-data/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/

In the world of big data, a common use case is performing extract and transform (ET) operations and data analytics on huge amounts of data from a variety of data sources. Often, you then analyze the data to get insights. One of the most popular cloud-based solutions to process such vast amounts of data is Amazon EMR.

Amazon EMR is a managed cluster platform that simplifies running big data frameworks, such as Apache Hadoop and Apache Spark, on AWS. Amazon EMR enables organizations to spin up a cluster with multiple instances in a matter of minutes. It also enables you to process various data engineering and business intelligence workloads through parallel processing. Doing this greatly reduces the data processing times, effort, and costs involved in establishing and scaling a cluster.

Apache Spark is a cluster-computing software framework that is open-source, fast, and general-purpose. It is widely used in distributed processing of big data. Apache Spark relies heavily on cluster memory (RAM) as it performs parallel computing in memory across nodes to reduce the I/O and execution times of tasks.

Generally, you perform the following steps when running a Spark application on Amazon EMR:

  1. Upload the Spark application package to Amazon S3.
  2. Configure and launch the Amazon EMR cluster with configured Apache Spark.
  3. Install the application package from Amazon S3 onto the cluster and then run the application.
  4. Terminate the cluster after the application is completed.

It’s important to configure the Spark application appropriately based on data and processing requirements for it to be successful. With default settings, Spark might not use all the available resources of the cluster and might end up with physical or virtual memory issues, or both. There are thousands of questions raised in stackoverflow.com related to this specific topic.

This blog post is intended to assist you by detailing best practices to prevent memory-related issues with Apache Spark on Amazon EMR.

Common memory issues in Spark applications with default or improper configurations

Listed following are a few sample out-of-memory errors that can occur in a Spark application with default or improper configurations.

Out of Memory Error, Java Heap Space

WARN TaskSetManager: Loss was due to 
java.lang.OutOfMemoryError
java.lang.OutOfMemoryError: Java heap space

Out of Memory Error, Exceeding Physical Memory

Error: ExecutorLostFailure Reason: Container killed by YARN for exceeding limits.
12.4 GB of 12.3 GB physical memory used. 
Consider boosting spark.yarn.executor.memoryOverhead.
Error: ExecutorLostFailure Reason: Container killed by YARN for exceeding limits.
4.5 GB of 3 GB physical memory used.
Consider boosting spark.yarn.executor.memoryOverhead.

Out of Memory Error, Exceeding Virtual Memory

Container killed by YARN for exceeding memory limits.
1.1gb of 1.0gb virtual memory used. Killing container.

Out of Memory Error, Exceeding Executor Memory

Required executor memory (1024+384 MB) is above 
the max threshold (896 MB) of this cluster! Please check the values of 'yarn.scheduler.maximum-allocation-mb'
and/or 'yarn.nodemanager.resource.memory-mb'.

These issues occur for various reasons, some of which are listed following:

  1. When the number of Spark executor instances, the amount of executor memory, the number of cores, or parallelism is not set appropriately to handle large volumes of data.
  2. When the Spark executor’s physical memory exceeds the memory allocated by YARN. In this case, the total of Spark executor instance memory plus memory overhead is not enough to handle memory-intensive operations. Memory-intensive operations include caching, shuffling, and aggregating (using reduceByKey, groupBy, and so on). Or, in some cases, the total of Spark executor instance memory plus memory overhead can be more than what is defined in yarn.scheduler.maximum-allocation-mb.
  3. The memory required to perform system operations such as garbage collection is not available in the Spark executor instance.

In the following sections, I discuss how to properly configure to prevent out-of-memory issues, including but not limited to those preceding.

Configuring for a successful Spark application on Amazon EMR

The following steps can help you configure a successful Spark application on Amazon EMR.

1. Determine the type and number of instances based on application needs

Amazon EMR has three types of nodes:

  1. Master: An EMR cluster has one master, which acts as the resource manager and manages the cluster and tasks.
  2. Core: The core nodes are managed by the master node. Core nodes run YARN NodeManager daemons, Hadoop MapReduce tasks, and Spark executors to manage storage, execute tasks, and send a heartbeat to the master.
  3. Task: The optional task-only nodes perform tasks and don’t store any data, in contrast to core nodes.

Best practice 1: Choose the right type of instance for each of the node types in an Amazon EMR cluster. Doing this is one key to success in running any Spark application on Amazon EMR.

There are numerous instance types offered by AWS with varying ranges of vCPUs, storage, and memory, as described in the Amazon EMR documentation. Based on whether an application is compute-intensive or memory-intensive, you can choose the right instance type with the right compute and memory configuration.

For memory-intensive applications, prefer R type instances over the other instance types. For compute-intensive applications, prefer C type instances. For applications balanced between memory and compute, prefer M type general-purpose instances.

To understand the possible use cases for each instance type offered by AWS, see Amazon EC2 Instance Types on the EC2 service website.

After deciding the instance type, determine the number of instances for each of the node types. You do this based on the size of the input datasets, application execution times, and frequency requirements.

2. Determine the Spark configuration parameters

Before we dive into the details on Spark configuration, let’s get an overview of how the executor container memory is organized using the diagram following.

As the preceding diagram shows, the executor container has multiple memory compartments. Of these, only one (execution memory) is actually used for executing the tasks. These compartments should be properly configured for running the tasks efficiently and without failure.

Calculate and set the following Spark configuration parameters carefully for the Spark application to run successfully:

  • spark.executor.memory – Size of memory to use for each executor that runs the task.
  • spark.executor.cores – Number of virtual cores.
  • spark.driver.memory – Size of memory to use for the driver.
  • spark.driver.cores – Number of virtual cores to use for the driver.
  • spark.executor.instances ­– Number of executors. Set this parameter unless spark.dynamicAllocation.enabled is set to true.
  • spark.default.parallelism – Default number of partitions in resilient distributed datasets (RDDs) returned by transformations like join, reduceByKey, and parallelize when no partition number is set by the user.

Amazon EMR provides high-level information on how it sets the default values for Spark parameters in the release guide. These values are automatically set in the spark-defaults settings based on the core and task instance types in the cluster.

To use all the resources available in a cluster, set the maximizeResourceAllocation parameter to true. This EMR-specific option calculates the maximum compute and memory resources available for an executor on an instance in the core instance group. It then sets these parameters in the spark-defaults settings. Even with this setting, generally the default numbers are low and the application doesn’t use the full strength of the cluster. For example, the default for spark.default.parallelism is only 2 x the number of virtual cores available, though parallelism can be higher for a large cluster.

Spark on YARN can dynamically scale the number of executors used for a Spark application based on the workloads. Using Amazon EMR release version 4.4.0 and later, dynamic allocation is enabled by default (as described in the Spark documentation).

The problem with the spark.dynamicAllocation.enabled property is that it requires you to set subproperties. Some example subproperties are spark.dynamicAllocation.initialExecutors, minExecutors, and maxExecutors. Subproperties are required for most cases to use the right number of executors in a cluster for an application, especially when you need multiple applications to run simultaneously. Setting subproperties requires a lot of trial and error to get the numbers right. If they’re not right, the capacity might be reserved but never actually used. This leads to wastage of resources or memory errors for other applications.

Best practice 2: Set spark.dynamicAllocation.enabled to true only if the numbers are properly determined for spark.dynamicAllocation.initialExecutors/minExecutors/maxExecutors parameters. Otherwise, set spark.dynamicAllocation.enabled to false and control the driver memory, executor memory, and CPU parameters yourself. To do this, calculate and set these properties manually for each application (see the example following).

Let’s assume that we are going to process 200 terabytes of data spread across thousands of file stores in Amazon S3. Further, let’s assume that we do this through an Amazon EMR cluster with 1 r5.12xlarge master node and 19 r5.12xlarge core nodes. Each r5.12xlarge instance has 48 virtual cores (vCPUs) and 384 GB RAM. All these calculations are for the --deploy-mode cluster, which we recommend for production use.

The following list describes how to set some important Spark properties, using the preceding case as an example.

spark.executor.cores

Assigning executors with a large number of virtual cores leads to a low number of executors and reduced parallelism. Assigning a low number of virtual cores leads to a high number of executors, causing a larger amount of I/O operations. Based on historical data, we suggest that you have five virtual cores for each executor to achieve optimal results in any sized cluster.

For the preceding cluster, the property spark.executor.cores should be assigned as follows: spark.executor.cores = 5 (vCPU)

spark.executor.memory

After you decide on the number of virtual cores per executor, calculating this property is much simpler. First, get the number of executors per instance using total number of virtual cores and executor virtual cores. Subtract one virtual core from the total number of virtual cores to reserve it for the Hadoop daemons.

Number of executors per instance = (total number of virtual cores per instance - 1) / spark.executor.cores

Number of executors per instance = (48 - 1)/ 5 = 47 / 5 = 9 (rounded down)

Then, get the total executor memory by using the total RAM per instance and number of executors per instance. Leave 1 GB for the Hadoop daemons.

Total executor memory = total RAM per instance / number of executors per instance
Total executor memory = 383 / 9 = 42 (rounded down)

This total executor memory includes the executor memory and overhead (spark.yarn.executor.memoryOverhead). Assign 10 percent from this total executor memory to the memory overhead and the remaining 90 percent to executor memory.

spark.executor.memory = total executor memory * 0.90
spark.executor.memory = 42 * 0.9 = 37 (rounded down)

spark.yarn.executor.memoryOverhead = total executor memory * 0.10
spark.yarn.executor.memoryOverhead = 42 * 0.1 = 5 (rounded up)

spark.driver.memory

We recommend setting this to equal spark.executor.memory.

spark.driver.memory = spark.executor.memory

spark.driver.cores

We recommend setting this to equal spark.executor.cores.

spark.driver.cores = spark.executor.cores

spark.executor.instances

Calculate this by multiplying the number of executors and total number of instances. Leave one executor for the driver.

spark.executor.instances = (number of executors per instance * number of core instances) minus 1 for the driver

spark.executor.instances = (9 * 19) - 1 = 170

spark.default.parallelism

Set this property using the following formula.

spark.default.parallelism = spark.executor.instances * spark.executor.cores * 2

spark.default.parallelism = 170 * 5 * 2 = 1,700

Warning: Although this calculation gives partitions of 1,700, we recommend that you estimate the size of each partition and adjust this number accordingly by using coalesce or repartition.

In case of dataframes, configure the parameter spark.sql.shuffle.partitions along with spark.default.parallelism.
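
The preceding arithmetic can also be scripted so that you can re-derive these values for other instance types. The following minimal Python sketch encodes the same heuristics (5 vCPUs per executor, 1 vCPU and 1 GB per instance reserved for Hadoop daemons, 10 percent memory overhead, and one executor reserved for the driver).

# Minimal sketch encoding the sizing heuristics above: 5 vCPUs per executor,
# 1 vCPU and 1 GB per instance reserved for Hadoop daemons, 10 percent memory
# overhead, and one executor reserved for the driver.
import math

def spark_sizing(vcpus_per_instance, ram_gb_per_instance, core_instances,
                 cores_per_executor=5):
    executors_per_instance = (vcpus_per_instance - 1) // cores_per_executor
    total_executor_memory = (ram_gb_per_instance - 1) // executors_per_instance
    executor_memory = int(total_executor_memory * 0.90)       # rounded down
    memory_overhead = math.ceil(total_executor_memory * 0.10)  # rounded up
    executor_instances = executors_per_instance * core_instances - 1
    return {
        "spark.executor.cores": cores_per_executor,
        "spark.executor.memory": "%dg" % executor_memory,
        "spark.yarn.executor.memoryOverhead": "%dg" % memory_overhead,
        "spark.executor.instances": executor_instances,
        "spark.default.parallelism": executor_instances * cores_per_executor * 2,
    }

# For 19 r5.12xlarge core nodes (48 vCPUs, 384 GB RAM each):
print(spark_sizing(48, 384, 19))
# {'spark.executor.cores': 5, 'spark.executor.memory': '37g',
#  'spark.yarn.executor.memoryOverhead': '5g', 'spark.executor.instances': 170,
#  'spark.default.parallelism': 1700}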

Though the preceding parameters are critical for any Spark application, the following parameters also help in running the applications smoothly to avoid other timeout and memory-related errors. We advise that you set these in the spark-defaults configuration file.

  • spark.network.timeout – Timeout for all network transactions.
  • spark.executor.heartbeatInterval – Interval between each executor’s heartbeats to the driver. This value should be significantly less than spark.network.timeout.
  • spark.memory.fraction – Fraction of JVM heap space used for Spark execution and storage. The lower this is, the more frequently spills and cached data eviction occur.
  • spark.memory.storageFraction – Expressed as a fraction of the size of the region set aside by spark.memory.fraction. The higher this is, the less working memory might be available to execution. This means that tasks might spill to disk more often.
  • spark.yarn.scheduler.reporterThread.maxFailures – Maximum number of executor failures allowed before YARN can fail the application.
  • spark.rdd.compress – When set to true, this property can save substantial space at the cost of some extra CPU time by compressing the RDDs.
  • spark.shuffle.compress – When set to true, this property compresses the map output to save space.
  • spark.shuffle.spill.compress – When set to true, this property compresses the data spilled during shuffles.
  • spark.sql.shuffle.partitions – Sets the number of partitions for joins and aggregations.
  • spark.serializer – Sets the serializer to serialize or deserialize data. As a serializer, I prefer Kryo (org.apache.spark.serializer.KryoSerializer), which is faster and more compact than the Java default serializer.

To understand more about each of the parameters mentioned preceding, see the Spark documentation.

We recommend you consider these additional programming techniques for efficient Spark processing:

  • coalesce – Reduces the number of partitions to allow for less data movement.
  • repartition – Reduces or increases the number of partitions and performs full shuffle of data as opposed to coalesce.
  • partitionBy – Distributes data horizontally across partitions.
  • bucketBy – Decomposes data into more manageable parts (buckets) based on hashed columns.
  • cache/persist – Pulls datasets into a clusterwide in-memory cache. Doing this is useful when data is accessed repeatedly, such as when querying a small lookup dataset or when running an iterative algorithm.

Best practice 3: Carefully calculate the preceding additional properties based on application requirements. Set these properties appropriately in spark-defaults, when submitting a Spark application (spark-submit), or within a SparkConf object.
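
For example, a hedged PySpark sketch that applies the values derived earlier through a SparkConf object might look like the following; the same settings can equally be passed as --conf options to spark-submit or placed in spark-defaults.

# Minimal sketch: apply the derived values through a SparkConf object.
# The numbers correspond to the 19-node r5.12xlarge example worked above.
from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = (SparkConf()
        .set("spark.dynamicAllocation.enabled", "false")
        .set("spark.executor.cores", "5")
        .set("spark.executor.memory", "37g")
        .set("spark.yarn.executor.memoryOverhead", "5g")
        .set("spark.executor.instances", "170")
        .set("spark.driver.cores", "5")
        .set("spark.driver.memory", "37g")
        .set("spark.default.parallelism", "1700")
        .set("spark.sql.shuffle.partitions", "1700"))

spark = SparkSession.builder.config(conf=conf).appName("tuned-app").getOrCreate()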

3. Implement a proper garbage collector to clear memory effectively

Garbage collection can lead to out-of-memory errors in certain cases. These include cases when there are multiple large RDDs in the application. Other cases occur when there is an interference between the task execution memory and RDD cached memory.

You can use multiple garbage collectors to evict the old objects and place the new ones into the memory. However, the latest Garbage First Garbage Collector (G1GC) overcomes the latency and throughput limitations with the old garbage collectors.

Best practice 4: Always set up a garbage collector when handling large volume of data through Spark.

The parameter -XX:+UseG1GC specifies that the G1GC garbage collector should be used. (The default is -XX:+UseParallelGC.) To understand the frequency and execution time of the garbage collection, use the parameters -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps. To initiate garbage collection sooner, set InitiatingHeapOccupancyPercent to 35 (the default is 45). Doing this helps avoid potential garbage collection for the total memory, which can take a significant amount of time. An example follows.

"spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
"spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",

4. Set the YARN configuration parameters

Even if all the Spark configuration properties are calculated and set correctly, virtual out-of-memory errors can still occur rarely as virtual memory is bumped up aggressively by the OS. To prevent these application failures, set the following flags in the YARN site settings.

Best practice 5: Always set the virtual and physical memory check flag to false.

"yarn.nodemanager.vmem-check-enabled":"false",
"yarn.nodemanager.pmem-check-enabled":"false"

5. Perform debugging and monitoring

To see where the Spark configuration options are coming from, you can run spark-submit with the --verbose option. You can also use Ganglia and the Spark UI to monitor application progress, cluster RAM usage, network I/O, and so on.
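If you prefer to inspect the effective configuration from inside the application instead, a small sketch like the following (assuming an existing SparkSession named spark) prints every property the driver resolved:

// Print the configuration values the running application actually resolved. Useful for
// confirming that spark-defaults, --conf flags, and SparkConf settings combined as expected.
spark.sparkContext.getConf.getAll
  .sortBy(_._1)
  .foreach { case (key, value) => println(s"$key=$value") }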

In the following example, we compare the outcomes between configured and non-configured Spark applications using Ganglia graphs.

When configured following the methods described, a Spark application can process 10 TB data successfully without any memory issues on an Amazon EMR cluster whose specs are as follows:

  • 1 r5.12xlarge master node
  • 19 r5.12xlarge core nodes
  • 8 TB total RAM
  • 960 total virtual CPUs
  • 170 executor instances
  • 5 virtual CPUs/executor
  • 37 GB memory/executor
  • Parallelism equals 1,700

Following, you can find Ganglia graphs for reference.

If you run the same Spark application with default configurations on the same cluster, it fails with an out-of-physical-memory error. This is because the default configurations (two executor instances, parallelism of 2, one vCPU/executor, 8-GB memory/executor) aren’t enough to process 10 TB data. Though the cluster had 7.8 TB memory, the default configurations limited the application to use only 16 GB memory, leading to the following out-of-memory error.

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 8, executor 7): ExecutorLostFailure (executor 7 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 10.5 GB of 8 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

Also, for large datasets, the default garbage collectors don’t clear the memory efficiently enough for the tasks to run in parallel, causing frequent failures. The following charts help in comparing the RAM usage and garbage collection with the default and G1GC garbage collectors. With G1GC, the RAM used is maintained below 5 TB (see the blue area in the graph).

With the default garbage collector (CMS), the RAM used goes above 5 TB. This can lead to the failure of the Spark job when running many tasks continuously.

Example: EMR instance template with configuration

There are different ways to set the Spark and YARN configuration parameters. One way is to pass them when creating the EMR cluster.

To do this, in the Amazon EMR console’s Edit software settings section, enter the appropriately updated configuration template (Enter configuration), or load the configuration from Amazon S3 (Load JSON from S3).

Following is a configuration template with sample values. At a minimum, calculate and set the following parameters for a successful Spark application.

{
      "InstanceGroups":[
         {
            "Name":"AmazonEMRMaster",
            "Market":"ON_DEMAND",
            "InstanceRole":"MASTER",
            "InstanceType":"r5.12xlarge",
            "InstanceCount":1,
            "Configurations":[
               {
                 "Classification": "yarn-site",
                 "Properties": {
                   "yarn.nodemanager.vmem-check-enabled": "false",
                   "yarn.nodemanager.pmem-check-enabled": "false"
                 }
               },
               {
                 "Classification": "spark",
                 "Properties": {
                   "maximizeResourceAllocation": "false"
                 }
               },
               {
                 "Classification": "spark-defaults",
                 "Properties": {
                   "spark.network.timeout": "800s",
                   "spark.executor.heartbeatInterval": "60s",
                   "spark.dynamicAllocation.enabled": "false",
                   "spark.driver.memory": "21000M",
                   "spark.executor.memory": "21000M",
                   "spark.executor.cores": "5",
                   "spark.executor.instances": "171",
                   "spark.yarn.executor.memoryOverhead": "21000M",
                   "spark.yarn.driver.memoryOverhead": "21000M",
                   "spark.memory.fraction": "0.80",
                   "spark.memory.storageFraction": "0.30",
                   "spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
                   "spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
                   "spark.yarn.scheduler.reporterThread.maxFailures": "5",
                   "spark.storage.level": "MEMORY_AND_DISK_SER",
                   "spark.rdd.compress": "true",
                   "spark.shuffle.compress": "true",
                   "spark.shuffle.spill.compress": "true",
                   "spark.default.parallelism": "3400"
                 }
               },
               {
                 "Classification": "mapred-site",
                 "Properties": {
                   "mapreduce.map.output.compress": "true"
                 }
               },
               {
                 "Classification": "hadoop-env",
                 "Configurations": [{
                   "Classification": "export",
                   "Configurations": [],
                   "Properties": {
                     "JAVA_HOME": "/usr/lib/jvm/java-1.8.0"
                   }
                 }],
                 "Properties": {}
               },
               {
                 "Classification": "spark-env",
                 "Configurations": [{
                   "Classification": "export",
                   "Properties": {
                     "JAVA_HOME": "/usr/lib/jvm/java-1.8.0"
                   }
                 }],
                 "Properties": {}
               }
            ]
        },
        {
            "Name":"AmazonEMRCore",
            "Market":"ON_DEMAND",
             "InstanceRole":"CORE",
             "InstanceType":"r5.12xlarge",
             "InstanceCount":19,
             "Configurations":[        
        ..............
        ..............
        ..............
        }
      ],
      "Ec2KeyName":"KEY_NAME"
  } 

Conclusion

In this blog post, I detailed the possible out-of-memory errors, their causes, and a list of best practices to prevent these errors when submitting a Spark application on Amazon EMR.

My colleagues and I formed these best practices after thorough research and understanding of various Spark configuration properties and testing multiple Spark applications. These best practices apply to most out-of-memory scenarios, although there might be some rare scenarios where they don’t apply. However, we believe that this blog post provides all the details needed so you can tweak parameters and successfully run a Spark application.

 


About the Author

Karunanithi Shanmugam is a data engineer with AWS Tech and Finance.

 

 

 

 

Best practices for running Apache Spark applications using Amazon EC2 Spot Instances with Amazon EMR

Post Syndicated from Ran Sheinberg original https://aws.amazon.com/blogs/big-data/best-practices-for-running-apache-spark-applications-using-amazon-ec2-spot-instances-with-amazon-emr/

Apache Spark has become one of the most popular tools for running analytics jobs. This popularity is due to its ease of use, fast performance, utilization of memory and disk, and built-in fault tolerance. These features strongly correlate with the concepts of cloud computing, where instances can be disposable and ephemeral.

Amazon EC2 Spot Instances offer spare compute capacity available in the AWS Cloud at steep discounts compared to On-Demand prices. EC2 can interrupt Spot Instances with two minutes of notification when EC2 needs the capacity back. You can use Spot Instances for various fault-tolerant and flexible applications. Some examples are analytics, containerized workloads, high-performance computing (HPC), stateless web servers, rendering, CI/CD, and other test and development workloads.

Amazon EMR provides a managed Hadoop framework that makes it easy, fast, and cost-effective to process vast amounts of data using EC2 instances. When using Amazon EMR, you don’t need to worry about installing, upgrading, and maintaining Spark software (or any other tool from the Hadoop framework). You also don’t need to worry about installing and maintaining underlying hardware or operating systems. Instead, you can focus on your business applications and use Amazon EMR to remove the undifferentiated heavy lifting.

In this blog post, we are going to focus on cost-optimizing and efficiently running Spark applications on Amazon EMR by using Spot Instances. We recommend several best practices to increase the fault tolerance of your Spark applications and use Spot Instances. These work without compromising availability or having a large impact on performance or the length of your jobs.

Use the Spot Instance Advisor to target instance types with suitable interruption rates

As mentioned, Spot Instances can be interrupted if EC2 needs the capacity back. In this blog post, we share best practices on how to increase the fault tolerance of your Spark applications to withstand occasional loss of underlying EC2 instances due to Spot interruptions. However, even then, targeting EC2 Spot Instances with lower interruption rates can help further. This approach helps by decreasing occurrences where your job gets prolonged because Spark needs to redo some of the work when interruptions occur.

Use the Spot Instance Advisor to check the interruption rates and try to create your Amazon EMR cluster using instance types that historically have lower interruption rates. For example, the frequency of interruption for r4.2xlarge in the US East (Ohio) region at the time of writing this post is less than 5 percent. This means that less than 5 percent of all r4.2xlarge Spot Instances launched in the last 30 days were interrupted by EC2.

Run your Spot workloads on a diversified set of instance types

When running workloads (analytics or others) on EC2 instances and using On-Demand or Reserved Instances purchase options, you can generally use a single instance type across your entire cluster. You might do so after benchmarking to find the right instance type to fit the application’s requirement. However, with Spot Instances, using multiple Spot capacity pools (an instance type in an Availability Zone) in a cluster is key. This practice enables you to achieve scale and preserve capacity for running your jobs.

For example, suppose that I run my Spark application using On-Demand r4.xlarge instances (30.5 GiB memory and four vCPUs). When I start using Spot Instances, I can configure my Amazon EMR cluster’s Core or Task Instance Fleets with several instance types that have a similar vCPU-to-memory ratio (roughly 7 GB per vCPU) and let Amazon EMR choose the right instance type to run in the cluster. These include r4.2xlarge, r5.xlarge, i3.2xlarge, and i3.4xlarge. Taking this approach makes it more likely I’ll have sufficient Spot capacity to launch the cluster. It also increases the chance that Amazon EMR will be able to replenish the required capacity for the cluster to continue running (from other capacity pools) in case some of the capacity in the cluster is terminated by EC2 Spot Interruptions.

Instance type    Number of vCPUs    RAM (in GB)
r4.xlarge        4                  30.5
r4.2xlarge       8                  61
r5.xlarge        4                  32
i3.2xlarge       8                  61
i3.4xlarge       16                 122

Size your Spark executors to allow using multiple instance types

As described just previously, a key factor for running on Spot instances is using a diversified fleet of instances. This also helps decrease the impact of Spot interruptions on your jobs. This approach dictates the architecture for your Spark applications.

Running with memory intensive executors (over 20 GB of RAM) ties your application to a specific set of instance types. These might not have sufficient Spot capacity for you to stand up your cluster. These also might have high Spot interruption rates, which might have an impact on your running jobs.

For example, for a Spark application with 90 GiB of RAM and 15 cores per executor, only 11 instance types fit the hardware requirements and fall below the 20 percent Spot interruption rate. Suppose that we break down the executors, keeping the ratio of 6 GiB of RAM per core, to two cores per executor. If we do so, we open up to 20 additional instance types that our job can run on (below the 20 percent interruption rate).

A fair approach to resizing an executor is to decide on the minimum number of cores to run your application on. Two is a good start. You then allocate memory using the following calculation:

NUM_CORES * ((EXECUTOR_MEMORY + MEMORY_OVERHEAD) / EXECUTOR_CORES)

In our example, that is 2 * ((90 + 20) / 15) ≈ 15 GB

For more information about the memoryOverhead setting, see the Spark documentation.
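To make the arithmetic explicit, the following is a tiny Scala sketch of the calculation as a helper function; the parameter names are ours for illustration, not Spark settings.

// Memory (in GB) to request per resized executor, given the original executor shape.
// numCores is the new, smaller core count you want per executor.
def resizedExecutorMemoryGb(numCores: Int,
                            executorMemoryGb: Double,
                            memoryOverheadGb: Double,
                            executorCores: Int): Double =
  numCores * ((executorMemoryGb + memoryOverheadGb) / executorCores)

// The example from the text: 2 * ((90 + 20) / 15) ≈ 14.7, roughly 15 GB per executor.
val memoryGb = resizedExecutorMemoryGb(2, 90, 20, 15)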

Avoid large shuffles in Spark

To reduce the amount of data that Spark needs to reprocess if a Spot Instance is interrupted in your Amazon EMR cluster, you should avoid large shuffles.

Wide dependency operations like GroupBy and some types of joins can produce vast amounts of intermediate data. Intermediate data is stored on local disk and then transferred (shuffled) to other executors in your cluster.

Although you can’t always do so, we recommend that you either avoid shuffle operations or work toward minimizing the amount of shuffle data. We recommend this for two reasons:

  • This is a general Spark best practice, because shuffle is an expensive operation.
  • In the context of Spot Instances, shuffling decreases the fault tolerance of the job. This is because losing one node that either contains shuffle data or relies on shuffled data for computations (usually both) requires you to rerun some part of the shuffle process.

There are several patterns that we encounter that produce unnecessary amounts of shuffle data, described following.

The explode to group pattern

From a developer point of view, using explode on complex data types might be a quick solution for some use cases (exploding an array into multiple rows). Doing so multiplies the number of rows, which later in the job have to be grouped or joined back together.

For example, suppose that our data contains user IDs and an array of dates that describe visits to a website:

user_id    visit_dates_array
0          ["28/01/2018", "29/01/2018", "01/01/2019"]
100000     ["01/11/2017", "01/12/2017"]
999999     ["01/01/2017", "02/01/2017", "03/01/2017", "04/01/2017", "05/01/2017", "06/01/2017"]

 

Suppose that we run a Spark application that sums the number of visits of users in the website. In this case, an easy solution is to use explode and then aggregate the data, as shown following.

Explode the data:

df.selectExpr("user_id", "explode(visit_dates_array) visit_day").createOrReplaceTempView("visits")

Aggregate back the data:

spark.sql("select count(visit_day), user_id 
                  from visits
                  group by user_id")

Although this method is quick and easy, it bloats our data to three times the size of the original. To accurately sum the visits for each user_id, the data also has to be shipped across the network to other executors.

What are the alternatives to exploding and grouping?

One option is to create a UDF that does the calculations in place, avoiding or minimizing shuffles. The following example is in Scala.

val countVisitsUDF = (array: Seq[String]) => {
    array.length
}

spark.udf.register("countVisits",  countVisitsUDF  )

spark.sql("SELECT user_id, countVisits(arr) 
           FROM tab").show
+-------+--------------------+
|user_id|UDF:countVisits(arr)|
+-------+--------------------+
|  20000|                   3|
| 100000|                   2|
|   9999|                   6|
+-------+--------------------+

Another option that was recently introduced in Spark 2.4 is the aggregate function. This function can also reduce the amount of shuffle data to a bare minimum, just the user_id and the count of their visits:

spark.sql("SELECT user_id, 
           sum(aggregate(arr, 0, (acc, x) -> acc +1)) summary 
           FROM tab 
           GROUP BY user_id").show
+-------+-------+
|user_id|summary|
+-------+-------+
| 100000|      2|
|   9999|      6|
|  00000|      3|
+-------+-------+

Huge data joins (bucketing)

When performing join operations, Spark repartitions (shuffles) the data by the join keys.

If you perform multiple joins on the same table or tables with the same key, you can use bucketing to shuffle the data only once. When persisting the data, any subsequent joins on that same key don’t require shuffle because the data is already “pre-shuffled” on Amazon S3.

To bucket your data, you need to decide on the number of buckets to divide your data into and the columns on which the bucketing occurs.

df.write.bucketBy(4,"user_id").saveAsTable("ExampleTable")
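As a hedged sketch of how this pays off (the DataFrame and table names below are placeholders), bucketing both sides of a repeated join by the same key and bucket count lets Spark read the pre-bucketed data and join without reshuffling either table:

// Persist both tables bucketed (and sorted) by the join key, using the same bucket count.
ordersDf.write.bucketBy(4, "user_id").sortBy("user_id").saveAsTable("orders_bucketed")
usersDf.write.bucketBy(4, "user_id").sortBy("user_id").saveAsTable("users_bucketed")

// Subsequent joins on user_id use the bucketing metadata and avoid a full shuffle.
val joined = spark.table("orders_bucketed")
  .join(spark.table("users_bucketed"), "user_id")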

Work with data skew

In some cases, data doesn’t distribute uniformly between partitions. This is an issue for several reasons:

  • Generally, most of the executors finish in a timely manner. However, those that handle the large outliers run for a longer time. This increases your risk of getting your Spot Instances interrupted and having to recompute the whole job. It also has a negative impact on overall performance and prolongs the length of the job or causes resources to be underutilized.
  • Data skew can also be a source for large amounts of shuffle data, which can cause issues as discussed previously.

To handle data skew, we recommend that you try to do the computation that you’re interested in locally on the executors and then compute over the results. This approach is also known as a combine operation.

A common technique to handle data skew is salting the keys.
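The following is a minimal Scala sketch of key salting, under assumed DataFrame and column names (skewed, lookup, and key are placeholders): the skewed side gets a random salt, the smaller side is replicated across every salt value, and the join uses both columns so a hot key spreads over several partitions.

import org.apache.spark.sql.functions._

val saltBuckets = 10

// Add a random salt (0 to saltBuckets - 1) on the skewed side.
val saltedSkewed = skewed.withColumn("salt", (rand() * saltBuckets).cast("int"))

// Replicate each lookup row once per salt value so every (key, salt) pair can match.
val saltedLookup = lookup.withColumn("salt", explode(array((0 until saltBuckets).map(lit): _*)))

// Join on both the original key and the salt, then drop the helper column.
val joined = saltedSkewed.join(saltedLookup, Seq("key", "salt")).drop("salt")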

Break huge Spark jobs into smaller ones to increase resiliency

One antipattern that we encounter is large applications that perform numerous jobs that can take hours or days to complete.

This kind of job creates an all-or-nothing situation, where a failure anywhere in the runtime of the job can cause the loss of the time and money already invested.

It might sound obvious, but breaking up your jobs to a chain of smaller jobs increases your resiliency to handle failures and Spot interruptions. Breaking up jobs also means that you can remediate any issues preventing the job from finishing successfully. In addition, it decreases the chances of losing the effort already invested in the process.

Work with Amazon EMR Instance fleets

You can apply a couple of techniques with Amazon EMR instance fleets to work effectively with Spark.

Diversify the EC2 instance types in your cluster

By configuring Amazon EMR instance fleets, you can set up a fleet of up to five EC2 instance types for each Amazon EMR node type (Master, Core, Task). As discussed earlier, being instance-flexible is key to the ability to launch and maintain Spot capacity for your Amazon EMR cluster.

For the Master node group, one instance out of your selection is chosen by Amazon EMR. In the Core and Task node groups, Amazon EMR selects the best instance types to use in the cluster based on capacity availability and low price. Also, you can specify multiple subnets in different Availability Zones. In this case, Amazon EMR selects the AZ that best fits the target capacity to launch the entire cluster in.

Size Amazon EMR instance fleets according to the job’s hardware requirements

Amazon EMR instance fleets enable you to define a pool of resources by specifying which instance types fit your application. You can also specify the weight each instance type carries in the pool toward your target capacity.

By default, instances are given weight equivalent to the number of their vCPUs. However, you can also provide weights according to other instance characteristics, such as memory, which I demonstrate in this section.

Sizing by CPU:

For example, suppose that I have a job that requires four cores per executor and 1 GB RAM per core, so the Spark configuration is as follows:

--executor-cores 4 --executor-memory 4G

We want the job to run with 20 executors, which means that we need 80 cores (20*4):

The screenshot shows 80 Spot Units as a representation of the 80 cores that are needed to run the job. It also shows the selection of different instance types that fit the hardware requirements.

Amazon EMR chooses any combination of these instance types to fulfill my target capacity of 80 Spot units; some of the larger instance types might run more than one executor.

 

Sizing by memory

Some Spark application requirements are memory-intensive and require a different weight strategy.

For example, if our job runs with four cores and 6 GB per core (--executor-cores 4 --executor-memory 24G), we first choose instances that have at least 28 GB of RAM:

As you can see in the screenshot, in this configuration the instance type selection is set to accommodate the memory requirement. This leaves about 15–20 percent memory free for other processes running inside the instance operating system.

You then calculate the total units by multiplying the number of units of the smallest eligible instance by the desired number of executors (25*100).

As in the CPU intensive job, some instance types run only one executor while some run several.

Compensating for performance differences between instance generations

Some workloads can see performance improvements of up to 50 percent just by running on newer instance types. This effect is due to AWS Nitro technology, faster CPU clock speeds, a different CPU architecture (moving from Haswell/Broadwell to Skylake), or a combination of these.

If decreasing application running time is a major requirement, you can offset the performance difference between instance type generations by specifying smaller weights to the older instance generations.

For example, suppose that your job runs for an hour with 10 r5.2xlarge instances and two hours with 10 r4.2xlarge instances. In this case, you might prefer defining your instance fleet as follows:

Select the right purchase option for each node type

Spot Blocks are defined-duration Spot Instances that can run for up to six hours without being interrupted, at a smaller discount compared to regular Spot Instances. You can use Spot Blocks if your jobs can’t tolerate Spot interruptions, provided the cluster run time is forecast to be under six hours.

Master node: Unless your cluster is very short-lived and the runs are cost-driven, avoid running your Master node on a Spot Instance. We suggest this because a Spot interruption on the Master node terminates the entire cluster. As an alternative to On-Demand, you can set up the Master node on a Spot Block by setting the defined duration of the node and failing over to On-Demand if the Spot Block capacity is unavailable.

Core nodes: Avoid using Spot Instances for Core nodes if the jobs on the cluster use HDFS. That prevents a situation where Spot interruptions cause data loss for data that was written to the HDFS volumes on the instances.

Task nodes: Use Spot Instances for your task nodes by selecting up to five instance types that match your hardware requirements. Amazon EMR fulfills the most suitable capacity by price and capacity availability.

Get EC2 Spot interruption notifications

When EC2 needs to interrupt Spot Instances, a 2-minute warning is issued for each instance that is going to be interrupted. You can programmatically react to the warning in two ways: from within the instance by polling the instance’s metadata service, and by using Amazon CloudWatch Events. You can find the specifics in the documentation.

The use of this warning varies between types of workloads. For example, you can opt to detach the soon-to-be-interrupted instances from an Elastic Load Balancer to drain in-flight connections before the instance gets shut down. Alternatively, you can copy the logs to a centralized location, or gracefully shut down an application.

To learn more about how EMR handles EC2 Spot interruptions, see the AWS Big Data blog post Spark enhancements for elasticity and resiliency on Amazon EMR.

You might still want to track the Spot interruptions, possibly to correlate between Amazon EMR job failures and Spot interruptions or job length. In this case, you can set up a CloudWatch Event to trigger an AWS Lambda function to feed the interruption into a data store. This approach means that you can query the historical interruptions in your account. For smaller scale or even initial testing, you can use Amazon SNS with an email target to simply get the interruption notifications by email.

Tag your Amazon EMR cluster and track your costs

Tagging your resources in the AWS Cloud is a fundamental best practice. You can read more about tagging strategies on this AWS Answers page. In Amazon EMR, after you tag the cluster, your tags propagate to the underlying EC2 instances and the Amazon EBS volumes that are created by the cluster. This enables you to have a holistic view of the costs of running your Amazon EMR clusters, and can be easily visualized with AWS Cost Explorer.

Summary

In this blog post, we list best practices for cost-optimizing your Spark applications on Amazon EMR by using Spot Instances. We hope that you find these useful and that you test these best practices with your Spark applications to cost-optimize your workloads.

 


About the authors

Ran Sheinberg is a specialist solutions architect for EC2 Spot Instances with Amazon Web Services. He works with AWS customers on cost optimizing their compute spend by utilizing Spot Instances across different types of workloads: stateless web applications, queue workers, containerized workloads, analytics, HPC and others.

 

 

 

Daniel Haviv is a specialist solutions architect for Analytics with Amazon Web Services.

 

 

 

 

Improve Apache Spark write performance on Apache Parquet formats with the EMRFS S3-optimized committer

Post Syndicated from Peter Slawski original https://aws.amazon.com/blogs/big-data/improve-apache-spark-write-performance-on-apache-parquet-formats-with-the-emrfs-s3-optimized-committer/

The EMRFS S3-optimized committer is a new output committer available for use with Apache Spark jobs as of Amazon EMR 5.19.0. This committer improves performance when writing Apache Parquet files to Amazon S3 using the EMR File System (EMRFS). In this post, we run a performance benchmark to compare this new optimized committer with existing committer algorithms, namely FileOutputCommitter algorithm versions 1 and 2. We close with a discussion on current limitations for the new committer, providing workarounds where possible.

Comparison with FileOutputCommitter

In Amazon EMR version 5.19.0 and earlier, Spark jobs that write Parquet to Amazon S3 use a Hadoop commit algorithm called FileOutputCommitter by default. There are two versions of this algorithm, version 1 and 2. Both versions rely on writing intermediate task output to temporary locations. They subsequently perform rename operations to make the data visible at task or job completion time.

Algorithm version 1 has two phases of rename: one to commit the individual task output, and the other to commit the overall job output from completed/successful tasks. Algorithm version 2 is more efficient because task commits rename files directly to the final output location. This eliminates the second rename phase, but it makes partial data visible before the job completes, which not all workloads can tolerate.

The renames that are performed are fast, metadata-only operations on the Hadoop Distributed File System (HDFS). However, when output is written to object stores such as Amazon S3, renames are implemented by copying data to the target and then deleting the source. This rename “penalty” is exacerbated with directory renames, which can happen in both phases of FileOutputCommitter v1. Whereas these are single metadata-only operations on HDFS, committers must execute N copy-and-delete operations on S3.

To partially mitigate this, Amazon EMR 5.14.0+ defaults to FileOutputCommitter v2 when writing Parquet data to S3 with EMRFS in Spark. The new EMRFS S3-optimized committer improves on that work to avoid rename operations altogether by using the transactional properties of Amazon S3 multipart uploads. Tasks may then write their data directly to the final output location, but defer completion of each output file until task commit time.

Performance test

We evaluated the write performance of the different committers by executing the following INSERT OVERWRITE Spark SQL query. The SELECT * FROM range(…) clause generated data at execution time. This produced ~15 GB of data across exactly 100 Parquet files in Amazon S3.

SET rows=4e9; -- 4 Billion
SET partitions=100;

INSERT OVERWRITE DIRECTORY 's3://${bucket}/perf-test/${trial_id}'
USING PARQUET SELECT * FROM range(0, ${rows}, 1, ${partitions});

Note: The EMR cluster ran in the same AWS Region as the S3 bucket. The trial_id property used a UUID generator to ensure that there was no conflict between test runs.

We executed our test on an EMR cluster created with the emr-5.19.0 release label, with a single m5d.2xlarge instance in the master group, and eight m5d.2xlarge instances in the core group. We used the default Spark configuration properties set by Amazon EMR for this cluster configuration, which include the following:

spark.dynamicAllocation.enabled true
spark.executor.memory 11168M
spark.executor.cores 4

After running 10 trials for each committer, we captured and summarized query execution times in the following chart. Whereas FileOutputCommitter v2 averaged 49 seconds, the EMRFS S3-optimized committer averaged only 31 seconds—a 1.6x speedup.

As mentioned earlier, FileOutputCommitter v2 eliminates some, but not all, rename operations that FileOutputCommitter v1 uses. To illustrate the full performance impact of renames against S3, we reran the test using FileOutputCommitter v1. In this scenario, we observed an average runtime of 450 seconds, which is 14.5x slower than the EMRFS S3-optimized committer.

The last scenario we evaluated is the case when EMRFS consistent view is enabled, which addresses issues that can arise due to the Amazon S3 data consistency model. In this mode, the EMRFS S3-optimized committer time was unaffected by this change and still averaged 30 seconds. On the other hand, FileOutputCommitter v2 averaged 53 seconds, which was slower than when the consistent view feature was turned off, widening the overall performance difference to 1.8x.

Job correctness

The EMRFS S3-optimized committer has the same limitations that FileOutputCommitter v2 has because both improve performance by fully delegating commit responsibilities to the individual tasks. The following is a discussion of the notable consequences of this design choice.

Partial results from incomplete or failed jobs

Because both committers have their tasks write to the final output location, concurrent readers of that output location can view partial results when using either of them. If a job fails, partial results are left behind from any tasks that have committed before the overall job failed. This situation can lead to duplicate output if the job is run again without first cleaning up the output location.

One way to mitigate this issue is to ensure that a job uses a different output location each time it runs, publishing the location to downstream readers only if the job succeeds. The following code block is an example of this strategy for workloads that use Hive tables. Notice how output_location is set to a unique value each time the job is run, and that the table partition is registered only if the rest of the query succeeds. As long as readers exclusively access data via the table abstraction, they cannot see results before the job finishes.

SET attempt_id=<a random UUID>;
SET output_location=s3://bucket/${attempt_id};

INSERT OVERWRITE DIRECTORY '${output_location}'
USING PARQUET SELECT * FROM input;

ALTER TABLE output ADD PARTITION (dt = '2018-11-26')
LOCATION '${output_location}';

This approach requires treating the locations that partitions point to as immutable. Updates to partition contents require restating all results into a new location in S3, and then updating the partition metadata to point to that new location.

Duplicate results from non-idempotent tasks

Another scenario that can cause both committers to produce incorrect results is when jobs composed of non-idempotent tasks produce outputs into non-deterministic locations for each task attempt.

The following is an example of a query that illustrates the issue. It uses a timestamp-based table partitioning scheme to ensure that it writes to a different location for each task attempt.

SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

INSERT INTO data PARTITION (time) SELECT 42, current_timestamp();

You can avoid the issue of duplicate results in this scenario by ensuring that tasks write to a consistent location across task attempts. For example, instead of calling functions that return the current timestamp within tasks, consider providing the current timestamp as an input to the job. Similarly, if a random number generator is used within jobs, consider using a fixed seed or one that is based on the task’s partition number to ensure that task reattempts use the same value.

Note: Spark’s built-in random functions rand(), randn(), and uuid() are already designed with this in mind.
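As a hedged illustration (the dataset and argument names below are placeholders, not from the original post), you can pass the timestamp in as a job argument and seed any random columns so that every task attempt produces identical output:

import org.apache.spark.sql.functions._

// Assumes an existing DataFrame named input, and that the batch timestamp arrives as a
// job argument instead of being computed inside the tasks.
val batchTimestamp = args(0) // for example "2018-11-26T00:00:00Z", supplied by the scheduler

val output = input
  .withColumn("batch_ts", lit(batchTimestamp)) // identical across task attempts
  .withColumn("sample", rand(42L))             // fixed seed, so reattempts repeat the same values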

Enabling the EMRFS S3-optimized committer

Starting with Amazon EMR version 5.20.0, the EMRFS S3-optimized committer is enabled by default. In Amazon EMR version 5.19.0, you can enable the committer by setting the spark.sql.parquet.fs.optimized.committer.optimization-enabled property to true from within Spark or when creating clusters. The committer takes effect when you use Spark’s built-in Parquet support to write Parquet files into Amazon S3 with EMRFS. This includes using the Parquet data source with Spark SQL, DataFrames, or Datasets. However, there are some use cases when the EMRFS S3-optimized committer does not take effect, and some use cases where Spark performs its own renames entirely outside of the committer. For more information about the committer and about these special cases, see Using the EMRFS S3-optimized Committer in the Amazon EMR Release Guide.
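For example, on an EMR 5.19.0 cluster you might enable the committer from within a Spark session as in the following sketch (the DataFrame name and output path are placeholders); on 5.20.0 and later this step isn’t needed because the committer is on by default.

// Opt in to the EMRFS S3-optimized committer from within Spark (Amazon EMR 5.19.0).
spark.conf.set("spark.sql.parquet.fs.optimized.committer.optimization-enabled", "true")

// Parquet writes to S3 through EMRFS from this session can now use the optimized committer.
df.write.mode("overwrite").parquet("s3://my-bucket/parquet-output/")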

Related Work – S3A Committers

The EMRFS S3-optimized committer was inspired by concepts used by committers that support the S3A file system. The key take-away is that these committers use the transactional nature of S3 multipart uploads to eliminate some or all of the rename costs. This is also the core concept used by the EMRFS S3-optimized committer.

For more information about the various committers available within the ecosystem, including those that support the S3A file system, see the official Apache Hadoop documentation.

Summary

The EMRFS S3-optimized committer improves write performance compared to FileOutputCommitter. Starting with Amazon EMR version 5.19.0, you can use it with Spark’s built-in Parquet support. For more information, see Using the EMRFS S3-optimized Committer in the Amazon EMR Release Guide.

 


About the authors

Peter Slawski is a software development engineer with Amazon Web Services.

 

 

 

 

Jonathan Kelly is a senior software development engineer with Amazon Web Services.

 

 

 

 

Spark enhancements for elasticity and resiliency on Amazon EMR

Post Syndicated from Udit Mehrotra original https://aws.amazon.com/blogs/big-data/spark-enhancements-for-elasticity-and-resiliency-on-amazon-emr/

Customers take advantage of the elasticity in Amazon EMR to save costs by scaling in clusters when workflows are completed, or when running lighter jobs. This also applies to launching clusters with low-cost Amazon EC2 spot instances.

The Automatic Scaling feature in Amazon EMR lets customers dynamically scale clusters in and out, based on cluster usage or other job-related metrics. These features help you use resources efficiently, but they can also cause EC2 instances to shut down in the middle of a running job. This could result in the loss of computation and data, which can affect the stability of the job or result in duplicate work through recomputing.

To gracefully shut down nodes without affecting running jobs, Amazon EMR uses Apache Hadoop's decommissioning mechanism, which the Amazon EMR team developed and contributed back to the community. This works well for most Hadoop workloads, but not so much for Apache Spark. Spark currently faces various shortcomings while dealing with node loss. This can cause jobs to get stuck trying to recover and recompute lost tasks and data, and in some cases eventually crash the job. For more information, see the open issues tracked in the Apache Spark project.

To avoid some of these issues and help customers take full advantage of Amazon EMR’s elasticity features with Spark, Amazon EMR has customizations to open-source Spark that make it more resilient to node loss. Recomputation is minimized, and jobs can recover faster from node failures and EC2 instance termination. These improvements are in Amazon EMR release version 5.9.0 and later.

This blog post provides an overview of the issues with how open-source Spark handles node loss and the improvements in Amazon EMR to address the issues.

How Spark handles node loss

When a node goes down during an active Spark job, the following risks arise:

  • Tasks that are actively running on the node might fail to complete and have to run on another node.
  • Cached RDDs (resilient distributed dataset) on the node might be lost. While this does impact performance, it does not cause failures or impact the stability of the application.
  • Shuffle output files in memory, or those written to disk on the node, would be lost. Because Amazon EMR enables the External Shuffle Service by default, the shuffle output is written to disk. Losing shuffle files can bring the application to a halt until they are recomputed on another active node, because future tasks might depend on them. For more information about shuffle operations, see Shuffle operations.

To recover from node loss, Spark should be able to do the following:

  • If actively running tasks are lost, they must be scheduled on another node. In addition, computing for the unscheduled remaining tasks must resume.
  • Shuffle output that was computed on the lost node must be recomputed by re-executing the tasks that produced those shuffle blocks.

The following is the sequence of events for Spark to recover when a node is lost:

  • Spark considers actively running tasks on the node as failed and reruns them on another active node.
  • If the node had shuffle output files that are needed by future tasks, the target executors on other active nodes get a FetchFailedException while trying to fetch missing shuffle blocks from the failed node.
  • When the FetchFailedException happens, the target executors retry fetching the blocks from the failed node for a time determined by the spark.shuffle.io.maxRetries and spark.shuffle.io.retryWait configuration values. After all the retry attempts are exhausted, the failure is propagated to the driver.
  • When the driver receives the FetchFailedException, it marks the currently running shuffle stage during which the failure occurred as failed and stops its execution. It also marks the shuffle output on the node or executors from which shuffle blocks could not be fetched as unavailable/lost, so that they can be recomputed. This triggers the previous Map stage to re-attempt recomputing those missing shuffle blocks.
  • After the missing shuffle output is computed, a re-attempt of the failed shuffle stage is triggered to resume the job from where it stopped. It then runs tasks that failed or had not been scheduled yet.

Issues with Spark’s handling of node loss

Spark’s recovery process helps it recover random executor and node failures that can occur in any cloud environment. However, the recovery process begins only after the node has already failed and Spark gets a FetchFailedException while trying to fetch shuffle blocks. This causes some of the issues described in this section.

Amazon EMR can begin the recovery early, as it knows when and which nodes are going down because of a manual resize, an EC2-triggered Spot instance termination, or an automatic scaling event. It can inform Spark immediately about these nodes, so that Spark can take proactive actions to gracefully handle loss of nodes and start recovery early. However, Spark currently does not have any mechanism through which it can be notified that a node is going down, such as YARN decommissioning. Therefore, it cannot take immediate and relevant actions to help recover faster. As a result, here are some of the issues with Spark’s recovery:

  • The node goes down in the middle of the Map stage, as shown in the following diagram:

In this scenario, the shuffle stage is scheduled unnecessarily, and the application must wait for the FetchFailedException before recomputing the lost shuffle. This takes a lot of time. Instead, it would be better if all lost shuffles could be immediately recomputed in the Map stage before even proceeding to the shuffle stage.

  • The node goes down in the middle of a shuffle stage, as shown in the following diagram:

If there were a way to immediately inform Spark about node loss, instead of depending on the FetchFailedException and fetch retries, that would save recovery time.

  • The Spark driver starts recomputation when it gets the first FetchFailedException. It considers the shuffle files on the lost node as missing. However, if multiple nodes go down at the same time, in its first re-attempt of the previous Map stage, the Spark driver recomputes only the shuffle output for the first node from which it received a FetchFailedException. In the short time between receiving the first fetch failure and starting the re-attempt, it is possible that the driver receives fetch failures from other failed nodes. As a result, it can recompute shuffles for multiple lost nodes in the same re-attempt, but there is no guarantee.

    In most cases, even though nodes go down at the same time, Spark requires multiple re-attempts of the map and shuffle stages to recompute all of the lost shuffle output. This can easily cause a job to be blocked for a significant amount of time. Ideally, Spark could recompute in only one retry the shuffle output on all nodes that were lost around the same time.

  • As long as it can reach a node that is about to go down, Spark can continue to schedule more tasks on it. This causes more shuffle outputs to be computed, which may eventually need to be recomputed. Ideally, these tasks can be redirected to healthy nodes to prevent recomputation and improve recovery time.
  • Spark has a limit on the number of consecutive failed attempts allowed for a stage before it aborts a job. This is configurable with spark.stage.maxConsecutiveAttempts. When a node fails and a FetchFailedException occurs, Spark marks the running shuffle stage as failed and triggers a re-attempt after computing the missing shuffle outputs. Frequent scaling of nodes during shuffle stages can easily cause stage failures to reach the threshold and abort the jobs. Ideally, when a stage fails for valid reasons such as a manual scale-in, an automatic scaling event, or an EC2-triggered Spot instance termination, there should be a way to tell Spark not to count this toward spark.stage.maxConsecutiveAttempts for that stage.

How Amazon EMR resolves these issues

This section describes the three main enhancements that Amazon EMR made to Spark to resolve the issues described in the previous section.

Integrate with YARN’s decommissioning mechanism

 Spark on Amazon EMR uses YARN as the underlying manager for cluster resources. Amazon EMR has its own implementation of a graceful decommissioning mechanism for YARN that provides a way to gracefully shut down YARN node managers by not scheduling new containers on a node in the Decommissioning state. Amazon EMR does this by waiting for the existing tasks on running containers to complete, or time out, before the node is decommissioned. This decommissioning mechanism has recently been contributed back to open source Hadoop.

We integrated Spark with YARN’s decommissioning mechanism so that the Spark driver is notified when a node goes through Decommissioning or Decommissioned states in YARN. This is shown in the following diagram:

This notification allows the driver to take appropriate actions and start the recovery early, because all nodes go through the decommissioning process before being removed.

Extend Spark’s blacklisting mechanism

YARN’s decommissioning mechanism works well for Hadoop MapReduce jobs by not launching any more containers on decommissioning nodes. This prevents more Hadoop MapReduce tasks from being scheduled on that node. However, this does not work well for Spark jobs because in Spark each executor is assigned a YARN container that is long-lived and keeps receiving tasks.

Preventing new containers from being launched only prevents more executors from being assigned to the node. Already active executors/containers continue to schedule new tasks until the node goes down, and they can end up failing and have to be rerun. Also, if these tasks write shuffle output, they would also be lost. This increases the recomputation and the time that it takes for recovery.

To address this, Amazon EMR extends Spark’s blacklisting mechanism to blacklist a node when the Spark driver receives a YARN decommissioning signal for it. This is shown in the following diagram:

This prevents new tasks from being scheduled on the blacklisted node. Instead they are scheduled on healthy nodes. As soon as tasks already running on the node are complete, the node can be safely decommissioned without the risk of task failures or losses. This also speeds up the recovery process by not producing more shuffle output on a node that is going down. This reduces the number of shuffle outputs to be recomputed. If the node comes out of the Decommissioning state and is active again, Amazon EMR removes the node from the blacklists so that new tasks can be scheduled on it.

This blacklisting extension is enabled by default in Amazon EMR with the spark.blacklist.decommissioning.enabled property set to true. You can control the time for which the node is blacklisted using the spark.blacklist.decommissioning.timeout property, which is set to 1 hour by default, equal to the default value for yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs. We recommend setting spark.blacklist.decommissioning.timeout to a value equal to or greater than yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs to make sure that Amazon EMR blacklists the node for the entire decommissioning period.
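A minimal sketch of setting these two properties on a SparkConf follows; the two-hour value is hypothetical and assumes you have raised the YARN graceful decommission timeout to match, per the guidance above.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.blacklist.decommissioning.enabled", "true") // the Amazon EMR default
  .set("spark.blacklist.decommissioning.timeout", "2h")   // keep >= the YARN graceful decommission timeout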

Actions for decommissioned nodes

After a node is decommissioning, no new tasks are getting scheduled, and the active containers become idle (or the timeout expires), the node gets decommissioned. When the Spark driver receives the decommissioned signal, it can take the following additional actions to start the recovery process sooner rather than waiting for a fetch failure to occur:

  • All of the shuffle outputs on the decommissioned node are unregistered, thus marking them as unavailable. Amazon EMR enables this by default with the setting spark.resourceManager.cleanupExpiredHost set to true. This has the following advantages:
    • If a node is lost in the middle of a map stage and gets decommissioned, Spark initiates recovery and recomputes the lost shuffle outputs on the decommissioned node, before proceeding to the next Stage. This prevents fetch failures in the shuffle stage, because Spark has all of the shuffle blocks computed and available at the end of map stage, which significantly speeds up recovery.
    • If a node is lost in the middle of a shuffle stage, the target executors trying to get shuffle blocks from the lost node immediately notice that the shuffle output is unavailable. It then sends the failure to the driver instead of retrying and failing multiple times to fetch them. The driver then immediately fails the stage and starts recomputing the lost shuffle output. This reduces the time spent trying to fetch shuffle blocks from lost nodes.
    • The most significant advantage of unregistering shuffle outputs is when a cluster is scaled in by a large number of nodes. Because all of the nodes go down around the same time, they all get decommissioned around the same time, and their shuffle outputs are unregistered. When Spark schedules the first re-attempt to compute the missing blocks, it notices all of the missing blocks from decommissioned nodes and recovers in only one attempt. This speeds up the recovery process significantly over the open-source Spark implementation, where stages might be rescheduled multiple times to recompute missing shuffles from all nodes, and prevent jobs from being stuck for hours failing and recomputing.
  • When a stage fails because of fetch failures from a node being decommissioned, by default, Amazon EMR does not count the stage failure toward the maximum number of failures allowed for a stage as set by spark.stage.maxConsecutiveAttempts. This is determined by the setting spark.stage.attempt.ignoreOnDecommissionFetchFailure being set to true. This prevents a job from failing if a stage fails multiple times because of node failures for valid reasons such as a manual resize, an automatic scaling event, or an EC2-triggered Spot instance termination.

Conclusion

This post described how Spark handles node loss and some of the issues that can occur if a cluster is scaled in during an active Spark job. It also showed the customizations that Amazon EMR has built on Spark, and the configurations available to make Spark on Amazon EMR more resilient, so that you can take full advantage of the elasticity features offered by Amazon EMR.

If you have questions or suggestions, please leave a comment.

 


About the Author

Udit Mehrotra is a software development engineer at Amazon Web Services. He works on cutting-edge features of EMR and is also involved in open source projects such as Apache Spark, Apache Hadoop and Apache Hive. In his spare time, he likes to play guitar, travel, binge watch and hang out with friends.

Metadata classification, lineage, and discovery using Apache Atlas on Amazon EMR

Post Syndicated from Nikita Jaggi original https://aws.amazon.com/blogs/big-data/metadata-classification-lineage-and-discovery-using-apache-atlas-on-amazon-emr/

With the ever-evolving and growing role of data in today’s world, data governance is an essential aspect of effective data management. Many organizations use a data lake as a single repository to store data that is in various formats and belongs to a business entity of the organization. The use of metadata, cataloging, and data lineage is key for effective use of the lake.

This post walks you through how Apache Atlas installed on Amazon EMR can provide this capability. You can use this setup to dynamically classify data and view the lineage of data as it moves through various processes. As part of this, you can use a domain-specific language (DSL) in Atlas to search the metadata.

Introduction to Amazon EMR and Apache Atlas

Amazon EMR is a managed service that simplifies the implementation of big data frameworks such as Apache Hadoop and Spark. If you use Amazon EMR, you can choose from a defined set of applications or choose your own from a list.

Apache Atlas is an enterprise-scale data governance and metadata framework for Hadoop. Atlas provides open metadata management and governance capabilities for organizations to build a catalog of their data assets. Atlas supports classification of data, including storage lineage, which depicts how data has evolved. It also provides features to search for key elements and their business definition.

Among all the features that Apache Atlas offers, the core feature of our interest in this post is the Apache Hive metadata management and data lineage. After you successfully set up Atlas, it uses a native tool to import Hive tables and analyze the data to present data lineage intuitively to the end users. To read more about Atlas and its features, see the Atlas website.

AWS Glue Data Catalog vs. Apache Atlas

The AWS Glue Data Catalog provides a unified metadata repository across a variety of data sources and data formats. AWS Glue Data Catalog integrates with Amazon EMR, and also Amazon RDS, Amazon Redshift, Redshift Spectrum, and Amazon Athena. The Data Catalog can work with any application compatible with the Hive metastore.

The scope of installation of Apache Atlas on Amazon EMR is merely what’s needed for the Hive metastore on Amazon EMR to provide capability for lineage, discovery, and classification. Also, you can use this solution for cataloging for AWS Regions that don’t have AWS Glue.

Architecture

Apache Atlas requires that you launch an Amazon EMR cluster with prerequisite applications such as Apache Hadoop, HBase, Hue, and Hive. Apache Atlas uses Apache Solr for search functions and Apache HBase for storage. Both Solr and HBase are installed on the persistent Amazon EMR cluster as part of the Atlas installation.

This solution’s architecture supports both internal and external Hive tables. For the Hive metastore to persist across multiple Amazon EMR clusters, you should use an external Amazon RDS or Amazon Aurora database to contain the metastore. A sample configuration file for the Hive service to reference an external RDS Hive metastore can be found in the Amazon EMR documentation.

The following diagram illustrates the architecture of our solution.

Amazon EMR–Apache Atlas workflow

To demonstrate the functionality of Apache Atlas, we do the following in this post:

  1. Launch an Amazon EMR cluster using the AWS CLI or AWS CloudFormation
  2. Using Hue, populate external Hive tables
  3. View the data lineage of a Hive table
  4. Create a classification
  5. Discover metadata using the Atlas domain-specific language

1a. Launch an Amazon EMR cluster with Apache Atlas using the AWS CLI

The following steps guide you through the installation of Atlas on Amazon EMR by using the AWS CLI. This installation creates an Amazon EMR cluster with Hadoop, HBase, Hive, and ZooKeeper. It also executes a step in which a script located in an Amazon S3 bucket runs to install Apache Atlas under the /apache/atlas folder.

The automation shell script assumes the following:

  • You have a working local copy of the AWS CLI package configured, with access and secret keys.
  • You have a default key pair, VPC, and subnet in the AWS Region where you plan to deploy your cluster.
  • You have sufficient permissions to create S3 buckets and Amazon EMR clusters in the default AWS Region configured in the AWS CLI.
aws emr create-cluster --applications Name=Hive Name=HBase Name=Hue Name=Hadoop Name=ZooKeeper \
  --tags Name="EMR-Atlas" \
  --release-label emr-5.16.0 \
  --ec2-attributes SubnetId=<subnet-xxxxx>,KeyName=<Key Name> \
  --use-default-roles \
  --ebs-root-volume-size 100 \
  --instance-groups 'InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m4.xlarge InstanceGroupType=CORE,InstanceCount=1,InstanceType=m4.xlarge' \
  --log-uri '<S3 location for logging>' \
--steps Name='Run Remote Script',Jar=command-runner.jar,Args=[bash,-c,'curl https://s3.amazonaws.com/aws-bigdata-blog/artifacts/aws-blog-emr-atlas/apache-atlas-emr.sh -o /tmp/script.sh; chmod +x /tmp/script.sh; /tmp/script.sh']

On successful execution of the command, output containing a cluster ID is displayed:

{
    "ClusterId": "j-2V3BNEB9XQ54H"
}

Use the following command to list the names of active clusters (your cluster shows on the list after it is ready):

aws emr list-clusters --active

In the output of the previous command, look for the server name EMR-Atlas (unless you changed the default name in the script). If you have the jq command line utility available, you can run the following command to filter everything but the name and its cluster ID:

aws emr list-clusters --active | jq '.[][] | {(.Name): .Id}'
Sample output:
{
  "external hive store on rds-external-store": "j-1MO3L3XSXZ45V"
}
{
  "EMR-Atlas": "j-301TZ1GBCLK4K"
}

After your cluster shows up on the active list, Amazon EMR and Atlas are ready for operation.
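
If you prefer to check readiness from the command line, you can also query the cluster state directly. The following is a minimal sketch using the cluster ID returned earlier; the cluster is ready for work when it reaches the WAITING state.

# Check the cluster state (replace the cluster ID with your own)
aws emr describe-cluster --cluster-id j-2V3BNEB9XQ54H \
  --query 'Cluster.Status.State' --output text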

1b. Launch an Amazon EMR cluster with Apache Atlas using AWS CloudFormation

You can also launch your cluster with CloudFormation. Use the emr-atlas.template to set up your Amazon EMR cluster, or launch directly from the AWS Management Console by using this button:

To launch, provide values for the following parameters:

  • VPC: <VPC>
  • Subnet: <Subnet>
  • EMRLogDir: <Amazon EMR logging directory, for example s3://xxx>
  • KeyName: <EC2 key pair name>

Provisioning an Amazon EMR cluster by using the CloudFormation template achieves the same result as the CLI commands outlined previously.

Before proceeding, wait until the CloudFormation stack events show that the status of the stack has reached “CREATE_COMPLETE”.
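
If you prefer to watch the stack from the CLI rather than the console, the following sketch blocks until creation finishes. The stack name emr-atlas is an assumption; use the name you gave your stack.

# Wait until the stack reaches CREATE_COMPLETE (the command fails if creation fails)
aws cloudformation wait stack-create-complete --stack-name emr-atlas

# Or check the current status on demand
aws cloudformation describe-stacks --stack-name emr-atlas \
  --query 'Stacks[0].StackStatus' --output text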

2. Use Hue to create Hive tables

Next, you log in to Apache Atlas and Hue and use Hue to create Hive tables.

To log in to Atlas, first find the master public DNS name of the cluster by using the Amazon EMR console. Then, use the following command to create a Secure Shell (SSH) tunnel to the Atlas web UI.

ssh -L 21000:localhost:21000 -i key.pem hadoop@<EMR Master IP Address>

If the command preceding doesn’t work, make sure that your key file (*.pem) has appropriate permissions. You also might have to add an inbound rule for SSH (port 22) to the master’s security group.
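
You can also look up the master public DNS name from the CLI instead of the console. This is a sketch; substitute your own cluster ID.

# Print the master public DNS name to use in the SSH tunnel commands
aws emr describe-cluster --cluster-id j-2V3BNEB9XQ54H \
  --query 'Cluster.MasterPublicDnsName' --output text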

After successfully creating an SSH tunnel, use the following URL to access the Apache Atlas UI.

http://localhost:21000

You should see a screen like that shown following. The default login details are username admin and password admin.

To set up a web interface for Hue, follow the steps in the Amazon EMR documentation. As you did for Apache Atlas, create an SSH tunnel on remote port 8888 for the console access:

ssh -L 8888:localhost:8888 -i key.pem hadoop@<EMR Master IP Address>

After the tunnel is up, use the following URL for Hue console access.

http://localhost:8888/

At first login, you are asked to create a Hue superuser, as shown following. Do not lose the superuser credentials.

After creating the Hue superuser, you can use the Hue console to run Hive queries.

After you log in to Hue, run the following Hive queries (an optional command-line check follows this list):

    • Run the HQL to create a new database:
create database atlas_emr;
use atlas_emr;
    • Create a new external table called trip_details with data stored on S3. Change the S3 location to a bucket you own.
CREATE external TABLE trip_details
(
  pickup_date        string ,
  pickup_time        string ,
  location_id        int ,
  trip_time_in_secs  int ,
  trip_number        int ,
  dispatching_app    string ,
  affiliated_app     string 
)
row format delimited
fields terminated by ',' stored as textfile
LOCATION 's3://aws-bigdata-blog/artifacts/aws-blog-emr-atlas/trip_details/';
    • Create a new lookup external table called trip_zone_lookup with data stored on S3.
CREATE external TABLE trip_zone_lookup 
(
LocationID     int ,
Borough        string ,
Zone           string ,
service_zone   string
)
row format delimited
fields terminated by ',' stored as textfile
LOCATION 's3://aws-bigdata-blog/artifacts/aws-blog-emr-atlas/zone_lookup/';
    • Create an intersect table of trip_details and trip_zone_lookup by joining these tables:
create table trip_details_by_zone as select *  from trip_details  join trip_zone_lookup on LocationID = location_id;
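
As an optional check, you can run the same statements from the master node with the Hive CLI instead of Hue. The following is a minimal sketch; run it after you SSH to the master node, as described in the next paragraph.

# Confirm the tables exist and that the joined table contains rows
hive -e "show tables in atlas_emr;"
hive -e "select count(*) from atlas_emr.trip_details_by_zone;"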

Next, you perform the Hive import. The Atlas Hive import tool is available only from the command line on the Amazon EMR master node (there is no web UI). To start, log in to the Amazon EMR master node by using SSH:

ssh -i key.pem hadoop@<EMR Master IP Address>

Then execute the following command. The script asks for your user name and password for Atlas. The default user name is admin and password is admin.

/apache/atlas/bin/import-hive.sh

A successful import looks like the following:

Enter username for atlas :- admin
Enter password for atlas :- 
2018-09-06T13:23:33,519 INFO [main] org.apache.atlas.AtlasBaseClient - Client has only one service URL, will use that for all actions: http://localhost:21000
2018-09-06T13:23:33,543 INFO [main] org.apache.hadoop.hive.conf.HiveConf - Found configuration file file:/etc/hive/conf.dist/hive-site.xml
2018-09-06T13:23:34,394 WARN [main] org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2018-09-06T13:23:35,272 INFO [main] hive.metastore - Trying to connect to metastore with URI thrift://ip-172-31-90-79.ec2.internal:9083
2018-09-06T13:23:35,310 INFO [main] hive.metastore - Opened a connection to metastore, current connections: 1
2018-09-06T13:23:35,365 INFO [main] hive.metastore - Connected to metastore.
2018-09-06T13:23:35,591 INFO [main] org.apache.atlas.hive.bridge.HiveMetaStoreBridge - Importing Hive metadata
2018-09-06T13:23:35,602 INFO [main] org.apache.atlas.hive.bridge.HiveMetaStoreBridge - Found 2 databases
2018-09-06T13:23:35,713 INFO [main] org.apache.atlas.AtlasBaseClient - method=GET path=api/atlas/v2/entity/uniqueAttribute/type/ contentType=application/json; charset=UTF-8 accept=application/json status=200
2018-09-06T13:23:35,987 INFO [main] org.apache.atlas.hive.bridge.HiveMetaStoreBridge - Database atlas_emr is already registered - id=cc311c0e-df88-40dc-ac12-6a1ce139ca88. Updating it.
2018-09-06T13:23:36,130 INFO [main] org.apache.atlas.AtlasBaseClient - method=POST path=api/atlas/v2/entity/ contentType=application/json; charset=UTF-8 accept=application/json status=200
2018-09-06T13:23:36,144 INFO [main] org.apache.atlas.hive.bridge.HiveMetaStoreBridge - Updated hive_db entity: [email protected], guid=cc311c0e-df88-40dc-ac12-6a1ce139ca88
2018-09-06T13:23:36,164 INFO [main] org.apache.atlas.hive.bridge.HiveMetaStoreBridge - Found 3 tables to import in database atlas_emr
2018-09-06T13:23:36,287 INFO [main] org.apache.atlas.AtlasBaseClient - method=GET path=api/atlas/v2/entity/uniqueAttribute/type/ contentType=application/json; charset=UTF-8 accept=application/json status=200
2018-09-06T13:23:36,294 INFO [main] org.apache.atlas.hive.bridge.HiveMetaStoreBridge - Table atlas_emr.trip_details is already registered with id c2935940-5725-4bb3-9adb-d153e2e8b911. Updating entity.
2018-09-06T13:23:36,688 INFO [main] org.apache.atlas.AtlasBaseClient - method=POST path=api/atlas/v2/entity/ contentType=application/json; charset=UTF-8 accept=application/json status=200
2018-09-06T13:23:36,689 INFO [main] org.apache.atlas.hive.bridge.HiveMetaStoreBridge - Updated hive_table entity: [email protected], guid=c2935940-5725-4bb3-9adb-d153e2e8b911
2018-09-06T13:23:36,702 INFO [main] org.apache.atlas.AtlasBaseClient - method=GET path=api/atlas/v2/entity/uniqueAttribute/type/ contentType=application/json; charset=UTF-8 accept=application/json status=200
2018-09-06T13:23:36,703 INFO [main] org.apache.atlas.hive.bridge.HiveMetaStoreBridge - Process [email protected]:1536239968000 is already registered
2018-09-06T13:23:36,791 INFO [main] org.apache.atlas.AtlasBaseClient - method=GET path=api/atlas/v2/entity/uniqueAttribute/type/ contentType=application/json; charset=UTF-8 accept=application/json status=200
2018-09-06T13:23:36,802 INFO [main] org.apache.atlas.hive.bridge.HiveMetaStoreBridge - Table atlas_emr.trip_details_by_zone is already registered with id c0ff33ae-ca82-4048-9671-c0b6597e1475. Updating entity.
2018-09-06T13:23:36,988 INFO [main] org.apache.atlas.AtlasBaseClient - method=POST path=api/atlas/v2/entity/ contentType=application/json; charset=UTF-8 accept=application/json status=200
2018-09-06T13:23:36,989 INFO [main] org.apache.atlas.hive.bridge.HiveMetaStoreBridge - Updated hive_table entity: [email protected], guid=c0ff33ae-ca82-4048-9671-c0b6597e1475
2018-09-06T13:23:37,035 INFO [main] org.apache.atlas.AtlasBaseClient - method=GET path=api/atlas/v2/entity/uniqueAttribute/type/ contentType=application/json; charset=UTF-8 accept=application/json status=200
2018-09-06T13:23:37,038 INFO [main] org.apache.atlas.hive.bridge.HiveMetaStoreBridge - Table atlas_emr.trip_zone_lookup is already registered with id 834d102a-6f92-4fc9-a498-4adb4a3e7897. Updating entity.
2018-09-06T13:23:37,213 INFO [main] org.apache.atlas.AtlasBaseClient - method=POST path=api/atlas/v2/entity/ contentType=application/json; charset=UTF-8 accept=application/json status=200
2018-09-06T13:23:37,214 INFO [main] org.apache.atlas.hive.bridge.HiveMetaStoreBridge - Updated hive_table entity: [email protected], guid=834d102a-6f92-4fc9-a498-4adb4a3e7897
2018-09-06T13:23:37,228 INFO [main] org.apache.atlas.AtlasBaseClient - method=GET path=api/atlas/v2/entity/uniqueAttribute/type/ contentType=application/json; charset=UTF-8 accept=application/json status=200
2018-09-06T13:23:37,228 INFO [main] org.apache.atlas.hive.bridge.HiveMetaStoreBridge - Process [email protected]:1536239987000 is already registered
2018-09-06T13:23:37,229 INFO [main] org.apache.atlas.hive.bridge.HiveMetaStoreBridge - Successfully imported 3 tables from database atlas_emr
2018-09-06T13:23:37,243 INFO [main] org.apache.atlas.AtlasBaseClient - method=GET path=api/atlas/v2/entity/uniqueAttribute/type/ contentType=application/json; charset=UTF-8 accept=application/json status=404
2018-09-06T13:23:37,353 INFO [main] org.apache.atlas.AtlasBaseClient - method=POST path=api/atlas/v2/entity/ contentType=application/json; charset=UTF-8 accept=application/json status=200
2018-09-06T13:23:37,361 INFO [main] org.apache.atlas.AtlasBaseClient - method=GET path=api/atlas/v2/entity/guid/ contentType=application/json; charset=UTF-8 accept=application/json status=200
2018-09-06T13:23:37,362 INFO [main] org.apache.atlas.hive.bridge.HiveMetaStoreBridge - Created hive_db entity: [email protected], guid=798fab06-ad75-4324-b7cd-e4d02b6525e8
2018-09-06T13:23:37,365 INFO [main] org.apache.atlas.hive.bridge.HiveMetaStoreBridge - No tables to import in database default
Hive Meta Data imported successfully!!!

After a successful Hive import, you can return to the Atlas Web UI to search the Hive database or the tables that were imported. On the left pane of the Atlas UI, ensure Search is selected, and enter the following information in the two fields listed following:

      • Search By Type: hive_table
      • Search By Text: trip_details

The output of the preceding query should look like this:

3. View the data lineage of your Hive tables using Atlas

To view the lineage of the created tables, you can use the Atlas web search. For example, to see the lineage of the intersect table trip_details_by_zone created earlier, enter the following information:

      • Search By Type: hive_table
      • Search By Text: trip_details_by_zone

The output of the preceding query should look like this:

Now choose the table name trip_details_by_zone to view the details of the table as shown following.

Now when you choose Lineage, you should see the lineage of the table. As shown following, the lineage provides information about its base tables and shows that it is an intersect table of two tables.

4. Create a classification for metadata management

Atlas can help you to classify your metadata to comply with data governance requirements specific to your organization. We create an example classification next.

To create a classification, take the following steps:

      1. Choose Classification from the left pane, and choose the + icon.
      2. Type PII in the Name field, and Personally Identifiable Information in the Description field.
      3. Choose Create.

Next, classify the table as PII:

      1. Return to the Search tab on the left pane.
      2. In the Search By Text field, type: trip_zone_lookup
      3. Choose the Classification tab and choose the add icon (+).
      4. Choose the classification that you created (PII) from the list.
      5. Choose Add.

You can classify columns and databases in a similar manner.
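
You can also attach classifications programmatically through the Atlas REST API. The following curl sketch assumes the default admin/admin credentials and the SSH tunnel on port 21000 created earlier, and it reuses the trip_zone_lookup GUID shown in the sample import output; substitute the GUID of your own entity.

# Attach the PII classification to an entity by GUID (GUID shown is from the sample output)
curl -u admin:admin \
  -H 'Content-Type: application/json' \
  -X POST http://localhost:21000/api/atlas/v2/entity/guid/834d102a-6f92-4fc9-a498-4adb4a3e7897/classifications \
  -d '[{"typeName": "PII"}]'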

Next, view all the entities belonging to this classification.

      1. Choose the Classification tab.
      2. Choose the PII classification that you created.
      3. The entities belonging to this classification are displayed on the main pane.

5. Discover metadata using the Atlas domain-specific language (DSL)

Next, you can search Atlas for entities using the Atlas domain-specific language (DSL), which is a SQL-like query language. This language has simple constructs that help users navigate Atlas data repositories. The syntax loosely emulates the popular SQL from the relational database world.

To search a table using DSL:

      1. Choose Search.
      2. Choose Advanced Search.
      3. In Search By Type, choose hive_table.
      4. In Search By Query, search for the table trip_details using the following DSL snippet:
from hive_table where name = 'trip_details'

As shown following, Atlas shows the table’s schema, lineage, and classification information.

Next, search a column using DSL:

      1. Choose Search.
      2. Choose Advanced Search.
      3. In Search By Type, choose hive_column.
      4. In Search By Query, search for column location_id using the following DSL snippet:
from hive_column where name = 'location_id'

As shown following, Atlas shows the existence of column location_id in both of the tables created previously:

You can also count tables using DSL:

      1. Choose Search.
      2. Choose Advanced Search.
      3. In Search By Type, choose hive_table.
      4. In Search By Query, count the tables by using the following DSL command:
hive_table select count()

As shown following, Atlas shows the total number of tables.

The final step is to clean up. To avoid unnecessary charges, you should remove your Amazon EMR cluster after you’re done experimenting with it.

The simplest way to do so, if you used CloudFormation, is to remove the CloudFormation stack that you created earlier. By default, the cluster is created with termination protection enabled. To remove the cluster, you first need to turn termination protection off, which you can do by using the Amazon EMR console.
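
The same cleanup can also be done from the CLI. The following is a sketch; substitute your own cluster ID, and note that the stack name emr-atlas is an assumption.

# Turn off termination protection if it is enabled, then terminate the cluster
aws emr modify-cluster-attributes --cluster-id j-2V3BNEB9XQ54H --no-termination-protected
aws emr terminate-clusters --cluster-ids j-2V3BNEB9XQ54H

# If you launched with CloudFormation, deleting the stack removes the cluster it created
aws cloudformation delete-stack --stack-name emr-atlas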

Conclusion

In this post, we outline the steps required to install and configure an Amazon EMR cluster with Apache Atlas by using the AWS CLI or CloudFormation. We also explore how you can import data into Atlas and use the Atlas console to perform queries and view the lineage of our data artifacts.

For more information about Amazon EMR or any other big data topics on AWS, see the EMR blog posts on the AWS Big Data blog.

 


About the Authors

Nikita Jaggi is a senior big data consultant with AWS.

Andrew Park is a cloud infrastructure architect at AWS. In addition to being operationally focused in customer engagements, he often works directly with customers to build and to deliver custom AWS solutions.  Having been a Linux solutions engineer for a long time, Andrew loves deep dives into Linux-related challenges. He is an open source advocate, loves baseball, is a recent winner of the “Happy Camper” award in local AWS practice, and loves being helpful in all contexts.

Reduce costs by migrating Apache Spark and Hadoop to Amazon EMR

Post Syndicated from Nikki Rouda original https://aws.amazon.com/blogs/big-data/reduce-costs-by-migrating-apache-spark-and-hadoop-to-amazon-emr/

Apache Spark and Hadoop are popular frameworks for processing data for analytics, often at a fraction of the cost of legacy approaches, yet at scale they can still become expensive propositions. This blog post discusses ways to reduce your total cost of ownership while also improving staff productivity. You can accomplish this by migrating your on-premises workloads to Amazon EMR, making good architectural choices, and taking advantage of features designed to reduce resource consumption. The advice included here has been learned from hundreds of customer engagements, and many points have been validated by the findings of a business value study conducted by IDC's Carl Olofson and Harsh Singh and sponsored by Amazon Web Services (AWS): the IDC White Paper "The Economic Benefits of Migrating Apache Spark and Hadoop to Amazon EMR" (November 2018).

Let's begin with a few headline statistics that demonstrate the positive economic impact of migrating to Amazon EMR. IDC's survey of nine Amazon EMR customers found an average 57 percent reduction in total cost of ownership. This was accompanied by a 342 percent five-year ROI and eight months to break even on the investment. Those customers varied significantly in the size of their Spark and Hadoop deployments, and accordingly in their results. However, these are compelling numbers for IT and finance leaders to consider as they set their long-term strategy for big data processing.

Now, how exactly does Amazon EMR save you money over on-premises deployments of Spark and Hadoop? The IDC White Paper identified three common answers, which are detailed in this post.

Reducing physical infrastructure costs

An on-premises deployment inevitably underutilizes its hardware, for several reasons. One is the provisioning cycle in which servers are selected, ordered, and installed in anticipation of future needs. Despite efforts to estimate resource needs, this cycle usually leads to over-provisioning: investing capital in assets that aren't used immediately. The cycle can also result in hitting resource capacity limits, which restricts the ability to complete needed big data processing in a timely fashion.

This is further magnified by the Spark and Hadoop architecture because of the way distinct resources are provisioned in fixed ratios in servers. Each server has a given number of processors and memory (compute) and a set amount of storage capacity. Some workloads may be storage intensive, with vast quantities of data being retained on servers for possible future use cases. Meanwhile, the purchased compute resources sit idle until the day that data is processed and analyzed. Alternatively, some workloads may require large quantities of memory or many processors for complex operations, but otherwise run against relatively small volumes of data. In these cases, the local storage may not be fully used.

Achieving durability in HDFS on premises requires multiple copies of data, which increases the hardware requirement. Durability is already built into Amazon S3, so decoupling compute and storage also reduces the hardware footprint by removing the need to replicate for durability. Although Spark and Hadoop use commodity hardware, which is far more cost efficient than traditional data warehouse appliances, the on-premises approach is inherently wasteful, rigid, and not agile enough to meet varying needs over time.

AWS solutions architect Bruno Faria recently gave a talk at re:Invent 2018 about more sophisticated approaches to “Lower Costs on Amazon EMR: Auto Scaling, Spot Pricing, and Expert Strategies”. Let’s recap those points.

Amazon EMR has several natural advantages over the challenges faced on premises. Customers pay only for the resources they actually consume, and only when they use them. On-demand resources lessen the over- and under-provisioning problem. More advanced customers can decouple their compute and storage resources too. They can store as much data as they need in Amazon S3 and scale their costs only as data volumes grow, not in advance.

Moreover, they can implement their choice of compute instances that are appropriately sized to the job at hand. They are also on-demand, and charged on a granular “per-second of usage” basis. This brings both more cost savings and more agility. Sometimes, customers “lift and shift” their on-premises workloads to AWS and run in Amazon EC2 without Amazon EMR. However, this doesn’t take advantage of the decoupling effect, and is recommended only as an intermediate step of migration.

Once customers are running Spark and Hadoop on AWS, they can further reduce compute costs. They can choose Reserved Instances, committing to an expected baseline in exchange for significant discounts. These are complemented by On-Demand Instances, which are available at any time and billed per second, and by Spot Instances, which provide interruptible capacity for less sensitive workloads at heavily reduced prices. Instance fleets also include the ability for you to specify the following (a CLI sketch combining these options appears after the list):

  • A defined duration (Spot block) to keep the Spot Instances running.
  • The maximum Spot price that you’re willing to pay.
  • A timeout period for provisioning Spot Instances.
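
As a rough sketch of how these options fit together, the following create-cluster call combines an On-Demand master fleet with a Spot core fleet that caps the Spot price at 50 percent of the On-Demand price and falls back to On-Demand capacity if Spot capacity isn't provisioned within the timeout. The instance types, counts, subnet, and key name are placeholders.

# Sketch only: an On-Demand master fleet plus a Spot core fleet with a provisioning timeout
aws emr create-cluster --release-label emr-5.16.0 \
  --applications Name=Spark Name=Hadoop \
  --use-default-roles \
  --ec2-attributes SubnetIds=['subnet-xxxxx'],KeyName=<Key Name> \
  --instance-fleets \
    InstanceFleetType=MASTER,TargetOnDemandCapacity=1,InstanceTypeConfigs=['{InstanceType=m4.xlarge}'] \
    InstanceFleetType=CORE,TargetSpotCapacity=4,InstanceTypeConfigs=['{InstanceType=m4.xlarge,BidPriceAsPercentageOfOnDemandPrice=50}'],LaunchSpecifications={SpotSpecification='{TimeoutDurationMinutes=20,TimeoutAction=SWITCH_TO_ON_DEMAND}'}
# A defined duration (Spot block) can be requested by adding BlockDurationMinutes to the SpotSpecification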

To blend these purchasing options for the best results, you should understand the:

  • Baseline demand, with predictable SLAs and needs.
  • Periodic or unexpected peaks, to exceed SLA at reduced costs.
  • Nature of the jobs, whether they are transient or long running.

For more information about the instance purchasing options, see Instance Purchasing Options.

The Auto Scaling feature in Amazon EMR, available since 2016, can make this even more efficient to implement. It lets the service automatically match the resources consumed to user demand as that demand varies over time. For more information, see these best practices for automatic scaling.
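
As a sketch of what this looks like in practice, you can attach an automatic scaling policy to a core or task instance group with the put-auto-scaling-policy command. The cluster ID, instance group ID, and thresholds below are placeholders, and the cluster must have been created with an Auto Scaling IAM role.

# Sketch only: add one node when available YARN memory stays below 15 percent
cat > scale-policy.json <<'EOF'
{
  "Constraints": {"MinCapacity": 2, "MaxCapacity": 10},
  "Rules": [
    {
      "Name": "ScaleOutOnLowYarnMemory",
      "Action": {
        "SimpleScalingPolicyConfiguration": {
          "AdjustmentType": "CHANGE_IN_CAPACITY",
          "ScalingAdjustment": 1,
          "CoolDown": 300
        }
      },
      "Trigger": {
        "CloudWatchAlarmDefinition": {
          "ComparisonOperator": "LESS_THAN",
          "EvaluationPeriods": 1,
          "MetricName": "YARNMemoryAvailablePercentage",
          "Namespace": "AWS/ElasticMapReduce",
          "Period": 300,
          "Statistic": "AVERAGE",
          "Threshold": 15,
          "Unit": "PERCENT"
        }
      }
    }
  ]
}
EOF

aws emr put-auto-scaling-policy --cluster-id j-XXXXXXXXXXXXX \
  --instance-group-id ig-XXXXXXXXXXXX \
  --auto-scaling-policy file://scale-policy.json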

Storage costs can be reduced further still. EMRFS, which decouples storage by keeping data in Amazon S3, provides the ability to scale a cluster's compute resources independently of storage capacity. In other words, EMRFS alone should already help to reduce costs. Another way to save on storage costs is to partition data to reduce the amount that needs to be scanned for processing and analytics. This is subject to the "Goldilocks" principle: a customer wants partitions small enough to avoid paying for reading unneeded data, but large enough to avoid excess overhead in finding the data needed.

An example of automatic partitioning of Apache Hive external tables is here. Optimizing file sizes can reduce Amazon S3 requests, and compressing data can minimize the bandwidth required to read data into compute instances. Not least, columnar formats for data storage are usually more efficient for big data processing and analytics than row-based layouts.
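
As a small illustration of the partitioning and columnar-format advice, the following Hive sketch rewrites a raw CSV-backed table as a Parquet table partitioned by date. The table names, columns, and S3 location are placeholders.

# Sketch only: convert a CSV-backed table into a partitioned, columnar (Parquet) table
hive -e "
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
CREATE TABLE events_parquet (event_id BIGINT, payload STRING)
  PARTITIONED BY (event_date STRING)
  STORED AS PARQUET
  LOCATION 's3://<your-bucket>/events_parquet/';
INSERT OVERWRITE TABLE events_parquet PARTITION (event_date)
  SELECT event_id, payload, event_date FROM events_csv;
"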

Applying these recommended methods led to the 57 percent reduction in total cost of ownership, cited earlier in this post.

Capturing these ideas nicely is a quote from the IDC White Paper where an Amazon EMR customer said the following:

“Amazon EMR gave us the best bang for the buck. One of the key factors is that our data is obviously growing. Running our big data operations on [Amazon] EMR increases confidence. It’s really good since we get cheap storage for huge amounts of data. The second thing is that the computation that we need fluctuates highly. Some of the data in our database is only occasionally used by our business or data analysts. We choose EMR because it is the most cost-effective solution as well as providing need-based computational expansion.”

Driving higher IT staff productivity

While infrastructure savings can be the most obvious driver for moving Apache Spark and Hadoop to the public cloud, improved IT staff productivity can also be a significant benefit. Because Amazon EMR is a managed service, there is no need for staff to spend time evaluating, purchasing, installing, provisioning, integrating, maintaining, or supporting hardware infrastructure. Nor do they need to evaluate, install, patch, upgrade, or troubleshoot the software infrastructure, which in the rapidly innovating open-source software world can be a never-ending task.

All that effort and its associated “soft” costs go away, because the Amazon EMR environment is managed and kept current for customers. IT staff can instead focus their time on assisting data engineers, data scientists, and business analysts in their strategic endeavors, rather than on infrastructure administration. The IDC White Paper linked these benefits to a 62 percent reduction in staff time cost per year versus on premises, along with a 54 percent reduction in staff costs for big data environment managers. Respondents also said it helped them be more agile, improve quality, and develop faster.

Another customer interviewed by IDC summed this up by saying the following:

“We went with Amazon EMR’s ready-made integration site. It is all about not having to spend time on integration…If we choose another Hadoop technology, then our researchers would have to make that work but if we run into a road block and it doesn’t work, we might learn that the hard way. In a way, we would be doing more testing, which would have meant we needed to hire three more people to do the integration work if we weren’t on Amazon EMR.”

Providing stronger big data environment availability

The third major area of savings cited was improved risk mitigation. Because AWS services are built upon many years of lessons learned in efficient and resilient operations, they deliver against promises of greater than 99.99% availability and durability, often with many more nines. Avoiding unplanned downtime was noted by the IDC study to bring a 99% reduction in lost productivity among IT and analytics staff.

A customer noted, “We have made systems much more resilient. It is really all about performance and resiliency.”

There are many other economic benefits of migrating to Amazon EMR. They are often linked to improved staff productivity, and they deliver not only cost savings but also improved performance and a measurable ROI on analytics. But let's not spoil the whole IDC White Paper; you can read it for yourself!

About the Author

Nikki Rouda is the principal product marketing manager for data lakes and big data at AWS. Nikki has spent 20+ years helping enterprises in 40+ countries develop and implement solutions to their analytics and IT infrastructure challenges. Nikki holds an MBA from the University of Cambridge and an ScB in geophysics and math from Brown University.