Tag Archives: Amazon EMR

Migrate RDBMS or On-Premise data to EMR Hive, S3, and Amazon Redshift using EMR – Sqoop

Post Syndicated from Nivas Shankar original https://aws.amazon.com/blogs/big-data/migrate-rdbms-or-on-premise-data-to-emr-hive-s3-and-amazon-redshift-using-emr-sqoop/

This blog post shows how our customers can benefit by using the Apache Sqoop tool. This tool is designed to transfer and import data from a Relational Database Management System (RDBMS) into AWS – EMR Hadoop Distributed File System (HDFS), transform the data in Hadoop, and then export the data into a Data Warehouse (e.g. in Hive or Amazon Redshift).

To demonstrate the Sqoop tool, this post uses Amazon RDS for MySQL as a source and imports data in the following three scenarios:

  • Scenario 1AWS EMR (HDFS -> Hive and HDFS)
  • Scenario 2Amazon S3 (EMFRS), and then to EMR-Hive
  • Scenario 3 — S3 (EMFRS), and then to Redshift

 

These scenarios help customers initiate the data transfer simultaneously, so that the transfer can run more expediently and cost efficient than a traditional ETL tool. Once the script is developed, customers can reuse it to transfer a variety of RDBMS data sources into EMR-Hadoop. Examples of these data sources are PostgreSQL, SQL Server, Oracle, and MariaDB.

We can also simulate the same steps for an on-premise RDBMS. This requires us to have the correct JDBC driver installed, and a network connection set up between the Corporate Data Center and the AWS Cloud environment. In this scenario, consider using either the AWS Direct Connect or AWS Snowball methods, based upon the data load volume and network constraints.

Prerequisites

To complete the procedures in this post, you need to perform the following tasks.

Step 1 — Launch an RDS Instance

By using the AWS Management Console or AWS CLI commands, launch MySQL instances with the desired capacity. The following example use the T2.Medium class with default settings.

To call the right services, copy the endpoint and use the following JDBC connection string exactly as shown. This example uses the US East (N. Virginia) us-east-1 AWS Region.

jdbc:mysql:// <<Connection string>>.us-east-1.rds.amazonaws.com.us-east-1.rds.amazonaws.com:3306/sqoopblog

Step 2 — Test the connection and load sample data into RDS – MySQL

First, I used open source data sample from this location: https://bulkdata.uspto.gov/data/trademark/casefile/economics/2016/

Second, I loaded the following two tables:

Third, I used MySQL Workbench tool to load sample tables and the Import/Export wizard to load data.  This loads data automatically and creates the table structure.

Download Steps:

The following steps can help download the MySQL Database Engine and load above mentioned data source into tables: https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/CHAP_GettingStarted.CreatingConnecting.MySQL.html#CHAP_GettingStarted.Connecting.MySQL

I used the following instructions on a Mac:

Step A: Install Homebrew  Step B: Install MySQL
Homebrew is open source software package management system; At the time of this blog post, Homebrew has MySQL version 5.7.15 as the default formula in its main repository. Enter the following command: $ brew info MySQL
To install Homebrew, open terminal, and enter: Expected output: MySQL: stable 8.0.11 (bottled)
$ /usr/bin/ruby -e “$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)” To install MySQL, enter: $ brew install MySQL
Homebrew then downloads and installs command line tools for Xcode 8.0 as part of the installation process)

Fourth, when the download is complete, provide the connection string, port, SID to the connection parameter. In main console, click MySQL connections (+) sign à new connection window and provide connection parameter Name, hostname – RDS endpoint, port, username and password.

Step 3 — Launch EMR Cluster

Open the EMR console, choose Advanced option, and launch the cluster with the following options set:

Step 4 — Test the SSH access and install MySQL-connector-java-version-bin.jar in the EMR folder

a. From the security groups for Master – click link and edit inbound rule to allow your PC or laptop IP to access the Master cluster.

b. Download the MySQL JDBC driver to your local PC from the following location: http://www.mysql.com/downloads/connector/j/5.1.html

c. Unzip the folder and copy the latest version of MySQL Connector available. (In my example, the version I use is MySQL-connector-java-5.1.46-bin.jar file).

d. Copy the file to the /var/lib/sqoop/ directory EMR master cluster. (Note: Because EMR Master doesn’t allow public access to the master node, I had to do a manual download from a local PC. I then used FileZila (Cross platform FTP application) to push the file.)

e. From your terminal, SSH to Master cluster, navigate to the /usr/lib/sqoop directory and copy this JAR file.

Note: This driver copy can be automated by using a bootstrap script to copy the driver file into an S3 path, and then transferring it into a master node. An example script would be:

aws s3 cp s3://mybucket/myfilefolder/ MySQL-connector-java-5.1.46-bin.jar  /usr/lib/sqoop/  

Or, with temporary internet access to download file directly into Master node, copy code below:

wget http://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.46.tar.gz
tar -xvzf mysql-connector-java-5.1.46.tar.gz
sudo cp mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar /usr/lib/sqoop/ 

In the Master node directory /usr/lib/sqoop, it should look like below.

  1. Before you begin working with EMR, you need at least two AWS Identity and Access Management (IAM) service roles with sufficient permissions to access the resources in your account.
  2. Amazon RDS and EMR Master and Slave clusters must have access to connect and then initiate the importing and exporting of data from MySQL RDS instances. For example, I am editing the RDS MySQL instance security group to allow an incoming connection from the EMR nodes – the Master security group and Slave Security group.

Step 5 — Test the connection to MySQL RDS from EMR

After you are logged in, run the following command in the EMR master cluster to validate your connection. It also checks the MySQL RDS login and runs the sample query to check table record count.

sqoop eval --connect "jdbc:mysql:// <<connection String>>.us-east-1.rds.amazonaws.com:3306/sqoopblog"  --query "	
select count(*) from sqoopblog.event" --username admin -P

Note: This record count in the previous sample query should match with MySQL tables, as shown in the following example:

Import data into EMR – bulk load

To import data into EMR, you must first import the full data as a text file, by using the following query:

sqoop import --connect "jdbc:mysql://<<ConnectionString>>.us-east-1.rds.amazonaws.com:3306/sqoopblog" --table event --target-dir /user/hadoop/EVENT --username admin -P -m 1

After the import completes, validate the extract file in respective hadoop directory location.

As shown in the previous example, the original table was not partitioned. Hence, it is extracted as one file and imported into the Hadoop folder. If this had been a larger table, it would have caused performance issues.

To address this issue, I show how performance increases if we select a partition table and use the direct method to export faster and more efficiently. I updated the event table, EVENTS_PARTITION, with the EVENT_DT column as the KEY Partition. I then copied the original table data into this table.  In addition, I used the direct method to take advantage of utilizing MySQL partitions to optimize the efficiency of simultaneous data transfer.

Copy data and run stats.

Run the following query in MySQL Workbench to copy data and run stats:

insert into sqoopblog.event_partition select * from sqoopblog.event

analyze table sqoopblog.event_partition

After running the query in MySQL workbench, run the following Sqoop command in the EMR master node:

sqoop import --connect "jdbc:mysql:// <<ConnectionString>>.us-east-1.rds.amazonaws.com:3306/sqoopblog" --table event_partition --target-dir /user/hadoop/EVENTSPARTITION --username admin -P --split-by event_dt

This example shows the performance improvement for the same command with the added argument option, which is a partitioned table.  It also shows the data file split into four parts. Number of map reduce tasks automatically creates 4 based on table partition stats.

We can also use the m 10 argument to increase the map tasks, which equals to the number of input splits

sqoop import --connect "jdbc:mysql:// <<ConnectionString>>.us-east-1.rds.amazonaws.com:3306/sqoopblog" --table event_partition --target-dir /user/hadoop/EVENTSPARTITION --username admin -P --split-by event_dt -m 10

Note: You can also split more data extract files during the import process by increasing the map reduce engine argument ( -m <<desired #> , as shown in the above sample code. Make sure that the extract files align with partition distribution, otherwise the output files will be out of order.

Consider the following additional options, if required to import selective columns.

In the following example, add the – COLUMN argument to the selective field.

sqoop import --connect "jdbc:mysql://<<connection String>>.us-east-1.rds.amazonaws.com:3306/sqoopblog" --table event_partition --columns "EVENT_CD,SERIAL_NO,EVENT_DT" --target-dir /user/hadoop/EVENTSSELECTED --split-by EVENT_DT --username admin -P -m 5

For scenario 2, we will import the table data file into S3 bucket. Before you do, make sure that the EMR-EC2 instance group has added security to the S3 bucket. Run the following command in the EMR master cluster:

sqoop import --connect "jdbc:mysql:// <<connection String>>.us-east-1.rds.amazonaws.com:3306/sqoopblog" --table event_partition --target-dir s3://nivasblog/sqoopblog/ --username admin -P -m 1 --fields-terminated-by '\t' --lines-terminated-by '\n' --as-textfile 

Import as Hive table – Full Load

Now, let’s try creating a hive table directly from the Sqoop command. This is a more efficient way to create hive tables dynamically, and we can later alter this table as an external table for any additional requirements. With this method, customers can save time creating and transforming data into hive through an automated approach.

sqoop import --connect "jdbc:mysql://<<connection String>>.us-east-1.rds.amazonaws.com:3306/sqoopblog" --username admin -P --table event_partition  --hive-import --create-hive-table --hive-table HIVEIMPORT1 --delete-target-dir --target-dir /user/hadoop/EVENTSHIVE1 --split-by EVENT_DT --hive-overwrite -m 4

Now, let’s try a direct method to see how significantly the load performance and import time improves.

sqoop import --connect "jdbc:mysql://<<connection string>>us-east-1.rds.amazonaws.com:3306/sqoopblog"  --username admin -P --table event_partition  --hive-import --create-hive-table --hive-table HIVEIMPORT2 --delete-target-dir --target-dir /user/hadoop/nivas/EVENTSHIVE2 --split-by EVENT_DT --hive-overwrite --direct

Following are additional optional to consider.

In the following example, add the COLUMN argument to the selective field and import into EMR as hive table.

sqoop import --connect "jdbc:mysql://<<connection string>>.us-east-1.rds.amazonaws.com:3306/sqoopblog" --username admin -P --table event_partition  --columns "event_cd,serial_no,event_dt" --hive-import --create-hive-table --hive-table HIVESELECTED --delete-target-dir --target-dir /user/hadoop/nivas/EVENTSELECTED --split-by EVENT_DT --hive-overwrite –direct

Perform a free-form query and import into EMR as a hive table.

sqoop import --connect "jdbc:mysql:// <<connection string>>.us-east-1.rds.amazonaws.com:3306/sqoopblog" --username admin -P --query "select a.serial_no, a.event_cd, a.event_dt, b.us_class_cd, b.class_id from event_partition a, us_class b where a.serial_no=b.serial_no AND \$CONDITIONS" --hive-import --create-hive-table --hive-table HIVEQUERIED --delete-target-dir --target-dir /user/hadoop/EVENTSQUERIED -m 1 --hive-overwrite -direct

– For scenario 2, create a hive table manually from the S3 location.  The following sample creates an external table from the S3 location. Run the select statement to check data counts.

Import note: Using Sqoop version 1.4.7, you can directly create hive tables by using scripts, as shown in the following sample code.  This feature is supported in EMR 5.15.0.

sqoop import --connect "jdbc:mysql://<<connection string>>.us-east-1.rds.amazonaws.com:3306/sqoopblog" --username admin -P --table event_partition  --hive-import --target-dir s3n://nivasblog/sqoopblog/1/ --create-hive-table --hive-table s3table --split-by EVENT_DT --fields-terminated-by '\t' --lines-terminated-by '\n' --as-textfile

For the previous code samples, validate in Hive or Hue, and confirm the table records.

Import the full schema table into Hive.

Note: Create a Hive database in Hue or Hive first, and then run the following command in the EMR master cluster.

sqoop import-all-tables --connect "jdbc:mysql://<<Connection string>>.us-east-1.rds.amazonaws.com:3306/sqoopblog" --username admin -P --hive-database sqoopimport --create-hive-table --hive-import --compression-codec=snappy --hive-overwrite –direct

Import as Hive table – Incremental Load

Now, let’s try loading into Hive a sample incremental data feed for the partition table with the event date as the key. Use the following Sqoop command on an incremental basis.

In addition to initial data in table called EVENT_BASETABLE. I loaded the incremental data into  EVENT_BASETABLE table. Let’s follow below steps and command to do incremental updates by sqoop, and import into Hive.

sqoop import --connect "jdbc:mysql:// <<Connection string>>.us-east-1.rds.amazonaws.com:3306/sqoopblog" --username admin -P --table event_partition --target-dir /user/hadoop/INCRTAB --split-by event_dt -m 1 --check-column event_dt --incremental lastmodified --last-value '2018-06-29'

Once the incremental extracts are loaded into the Hadoop directory, you can create temporary, or incremental, tables in Hive and insert them into the main tables.

CREATE TABLE incremental_table (event_cd text, event_dt date, event_seq int(11),event_type_cd text,serial_no int(11)) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','LOCATION '/user/hadoop/INCRTAB'

Insert into default.hiveimport1 select * from default.incremental_table

Alternatively, you can also perform the –query argument to do the incremental operation by joining various tables and condition arguments, and then inserting them into the main table.

--query "select * from EVENT_BASETABLE where modified_date > {last_import_date} AND $CONDITIONS"

All of these steps have been created as a Sqoop job to automate the flow.

Export data to Redshift

Now that data is imported into EMR- HDFS, S3 data store, let’s see how to use the Sqoop command to export data back into the Datawarehouse layer. In this case, we will use the Redshift cluster and demonstrate with an example.

Download the following JDBC API that our SQL client tool or application uses. If you’re not sure, download the latest version of the JDBC 4.2 API driver.

The class name for this driver is 1.2.15.1025/RedshiftJDBC41-1.2.15.1025.jar

com.amazon.redshift.jdbc42.Driver.

Copy this JAR file into the EMR master cluster node. SSH to Master cluster, navigate to /usr/lib/sqoop directory and copy this JAR file.

Note: Because EMR Master doesn’t allow public access to the master node, I had to do a manual download from a local PC. Also, I used FileZila to push the file.

Log in to the EMR master cluster and run this Sqoop command to copy the S3 data file into the Redshift cluster.

Launch the Redshift cluster. This example uses ds2.xLarge(Storage Node).

After Redshift launches, and the security group is associated with the EMR cluster to allow a connection, run the Sqoop command in EMR master node. This exports the data from the S3 location (shown previously in the Code 6 command) into the Redshift cluster as a table.

I created a table structure in Redshift as shown in the following example.

DROP TABLE IF EXISTS sqoopexport CASCADE;

CREATE TABLE sqoopexport
(
   event_cd       varchar(25)   NOT NULL,
   event_dt       varchar(25),
   event_seq      varchar(25)   NOT NULL,
   event_type_cd  varchar(25)   NOT NULL,
   serial_no      varchar(25)   NOT NULL
);

COMMIT;

When the table is created, run the following command to import data into the Redshift table.

sqoop export --connect jdbc:redshift://<<Connection String>>.us-east-1.redshift.amazonaws.com:5439/sqoopexport --table sqoopexport --export-dir s3://nivastest1/events/ --driver com.amazon.redshift.jdbc42.Driver --username admin -P --input-fields-terminated-by '\t'

This command inserts the data records into the table.

For more information, see Loading Data from Amazon EMR.

For information about how to copy data back into RDBMS, see Use Sqoop to Transfer Data from Amazon EMR to Amazon RDS.

Summary

You’ve learned how to use Apache Sqoop on EMR to transfer data from RDBMS to an EMR cluster. You created an EMR cluster with Sqoop, processed a sample dataset on Hive, built sample tables in MySQL-RDS, and then used Sqoop to import the data into EMR. You also created a Redshift cluster and exported data from S3 using Sqoop.

You proved that Sqoop can perform data transfer in parallel, so execution is quick and more cost effective. You also simplified ETL data processing from the source to a target layer.

The advantages of Sqoop are:

  • Fast and parallel data transfer into EMR — taking advantage of EMR compute instances to do an import process by removing external tool dependencies.
  • An import process by using a direct-to-MySQL expediting query and pull performance into EMR Hadoop and S3.
  • An Import Sequential dataset from Source system (Provided tables have primary keys) maintained simplifying growing need to migrate on-premised RDBMS data without re-architect

Sqoop pain points include.

  • Automation by developer/Operations team community. This requires automating through workflow/job method either using Airflow support for Sqoop or other tools.
  • For those tables doesn’t have primary keys and maintains legacy tables dependencies, will have challenges importing data incrementally. Recommendation is to do one-time migration through Sqoop bulk transfer and re-architect your source ingestion mechanism.
  • Import/Export follows JDBC connection and doesn’t support other methods like ODBC or API calls.

If you have questions or suggestions, please comment below.

 


Additional Reading

If you found this post useful, be sure to check out Use Sqoop to transfer data from Amazon EMR to Amazon RDS and Seven tips for using S3DistCp on AMazon EMR to move data efficiently between HDFS and Amazon S3.

 


About the Author

Nivas Shankar is a Senior Big Data Consultant at Amazon Web Services. He helps and works closely with enterprise customers building big data applications on the AWS platform. He holds a Masters degree in physics and is highly passionate about theoretical physics concepts. He enjoys spending time with his wife and two adorable kids. In his spare time, he takes his kids to tennis and football practice.

 

 

 

Build a Concurrent Data Orchestration Pipeline Using Amazon EMR and Apache Livy

Post Syndicated from Binal Jhaveri original https://aws.amazon.com/blogs/big-data/build-a-concurrent-data-orchestration-pipeline-using-amazon-emr-and-apache-livy/

Many customers use Amazon EMR and Apache Spark to build scalable big data pipelines. For large-scale production pipelines, a common use case is to read complex data originating from a variety of sources. This data must be transformed to make it useful to downstream applications, such as machine learning pipelines, analytics dashboards, and business reports. Such pipelines often require Spark jobs to be run in parallel on Amazon EMR. This post focuses on how to submit multiple Spark jobs in parallel on an EMR cluster using Apache Livy, which is available in EMR version 5.9.0 and later.

Apache Livy is a service that enables easy interaction with a Spark cluster over a REST interface. Apache Livy lets you send simple Scala or Python code over REST API calls instead of having to manage and deploy large jar files. This helps because it scales data pipelines easily with multiple spark jobs running in parallel, rather than running them serially using EMR Step API. Customers can continue to take advantage of transient clusters as part of the workflow resulting in cost savings.

For the purpose of this blog post, we use Apache Airflow to orchestrate the data pipeline. Airflow is an open-sourced task scheduler that helps manage ETL tasks. Customers love Apache Airflow because workflows can be scheduled and managed from one central location. With Airflow’s Configuration as Code approach, automating the generation of workflows, ETL tasks, and dependencies is easy. It helps customers shift their focus from building and debugging data pipelines to focusing on the business problems.

High-level Architecture

Following is a detailed technical diagram showing the configuration of the architecture to be deployed.

We use an AWS CloudFormation script to launch the AWS services required to create this workflow. CloudFormation is a powerful service that allows you to describe and provision all the infrastructure and resources required for your cloud environment, in simple JSON or YAML templates. In this case, the template includes the following:

The Airflow server uses a LocalExecutor (tasks are executed as a subprocess), which helps to parallelize tasks locally. For production workloads, you should consider scaling out with the CeleryExecutor on a cluster with multiple worker nodes.

For demonstration purposes, we use the movielens dataset to concurrently convert the csv files to parquet format and save it to Amazon S3. This dataset is a popular open-source dataset, which is used in exploring data science algorithms. Each dataset file is a comma-separated file with a single header row. The following table describes each file in the dataset.

Dataset Description
movies.tsv Has the title and list of genres for movies being reviewed.
ratings.csv Shows how users rated movies, using a scale from 1-5. The file also contains the time stamp for the movie review.
tags.csv Shows a user-generated tag for each movie. A tag is user-generated metadata about a movie. A tag can be a word or a short phrase. The file also contains the time stamp for the tag.
links.csv Contains identifiers to link to movies used by IMDB and MovieDB.
genome-scores.csv Shows the relevance of each tag for each movie.
genome-tags.csv Provides the tag descriptions for each tag in the genome-scores.csv file.

Building the Pipeline

Step 0: Prerequisites

Make sure that you have a bash-enabled machine with AWS CLI installed.

Step 1: Create an Amazon EC2 key pair

To build this ETL pipeline, connect to an EC2 instance using SSH. This requires access to an Amazon EC2 key pair in the AWS Region you’re launching your CloudFormation stack. If you have an existing Key Pair in your Region, go ahead and use that Key Pair for this exercise. If not, to create a key pair open the AWS Management Console and navigate to the EC2 console. In the EC2 console left navigation pane, choose Key Pairs.

Choose Create Key Pair, type airflow_key_pair (make sure to type it exactly as shown), then choose Create. This downloads a file called airflow_key_pair.pem. Be sure to keep this file in a safe and private place. Without access to this file, you lose the ability to use SSH to connect with your EC2 instance.

Step 2: Execute the CloudFormation Script

Now, we’re ready to run the CloudFormation script!

Note: The CloudFormation script uses a DBSecurityGroup, which is NOT supported in all Regions.

On the next page, choose the key pair that you created in the previous step (airflow_key_pair) along with a S3 bucket name. The S3 bucket should NOT exist as the cloudformation creates a new S3 bucket. Default values for other parameters have been chosen for simplicity.

After filling out these parameters to fit your environment, choose Next. Finally, review all the settings on the next page. Select the box marked I acknowledge that AWS CloudFormation might create IAM resources (this is required since the script creates IAM resources), then choose Create. This creates all the resources required for this pipeline and takes some time to run. To view the stack’s progress, select the stack you created and choose the Events section or panel.

It takes a couple of minutes for the CloudFormation template to complete.

Step 3: Start the Airflow scheduler in the Airflow EC2 instance

To make these changes, we use SSH to connect to the EC2 instance created by the CloudFormation script. Assuming your local machine has an SSH client, this can be accomplished from the command line. Navigate to the directory that contains the airflow_key_pair.pem file you downloaded earlier and insert the following commands, replacing your-public-ip and your-region with the relevant values from your EC2 instance. The public DNS Name of the EC2 instance can be found on the Outputs tab

Type yes when prompted after the SSH command.

chmod 400 airflow_key_pair.pem
ssh -i "airflow_key_pair.pem" [email protected]

For more information or help with issues, see Connecting to Your Linux Instance Using SSH.

Now we need to run some commands as the root user.

# sudo as the root user
sudo su
# Navigate to the airflow directory which was created by the cloudformation template – Look at the user-data section.
cd ~/airflow
source ~/.bash_profile

Below is an image of how the ‘/root/airflow/’ directory should look like.

Now we need to start the airflow scheduler. The Airflow scheduler monitors all tasks and all directed acyclic graphs (DAGs), and triggers the task instances whose dependencies have been met. In the background, it monitors and stays in sync with a folder for all DAG objects that it may contain. It periodically (every minute or so) inspects active tasks to see whether they can be triggered.

The Airflow scheduler is designed to run as a persistent service in an Airflow production environment. To get it started, run the airflow scheduler. It will use the configuration specified in airflow.cfg.

To start a scheduler, run the below command in your terminal.

airflow scheduler

Your screen should look like the following with scheduler running.

Step 4: View the transform_movielens DAG on the Airflow Webserver

The Airflow webserver should be running on port 8080. To see the Airflow webserver, open any browser and type in the <EC2-public-dns-name>:8080. The public EC2 DNS name is the same one found in Step 3.

You should see a list of DAGs on the Airflow dashboard. The example DAGs are left there in case you want you experiment with them. But we focus on the transform_movielens DAG for the purposes of this blog. Toggle the ON button next to the name of the DAG.

The following example shows how the dashboard should look.

Choose the transform_movielens DAG, then choose Graph View to view the following image.

This image shows the overall data pipeline. In the current setup, there are six transform tasks that convert each .csv file to parquet format from the movielens dataset. Parquet is a popular columnar storage data format used in big data applications. The DAG also takes care of spinning up and terminating the EMR cluster once the workflow is completed.

The DAG code can also be viewed by choosing the Code button.

Step 5: Run the Airflow DAG

To run the DAG, go back to the Airflow dashboard, and choose the Trigger DAG button for the transform_movielens DAG.

When the Airflow DAG is run, the first task calls the run_job_flow boto3 API to create an EMR cluster. The second task waits until the EMR cluster is ready to take on new tasks. As soon as the cluster is ready, the transform tasks are kicked off in parallel using Apache Livy, which runs on port 8998. Concurrency in the current Airflow DAG is set to 3, which runs three tasks in parallel. To run more tasks in parallel (multiple spark sessions) in Airflow without overwhelming the EMR cluster, you can throttle the concurrency.

How does Apache Livy run the Scala code on the EMR cluster in parallel?

Once the EMR cluster is ready, the transform tasks are triggered by the Airflow scheduler. Each transform task triggers Livy to create a new interactive spark session. Each POST request brings up a new Spark context with a Spark interpreter. This remote Spark interpreter is used to receive and run code snippets, and return back the result.

Let’s use one of the transform tasks as an example to understand the steps in detail.

# Converts each of the movielens datafile to parquet
def transform_movies_to_parquet(**kwargs):
    # ti is the Task Instance
    ti = kwargs['ti']
    cluster_id = ti.xcom_pull(task_ids='create_cluster')
    cluster_dns = emr.get_cluster_dns(cluster_id)
    headers = emr.create_spark_session(cluster_dns, 'spark')
    session_url = emr.wait_for_idle_session(cluster_dns, headers)
    statement_response = emr.submit_statement(session_url,   '/root/airflow/dags/transform/movies.scala')
    emr.track_statement_progress(cluster_dns, statement_response.headers)
    emr.kill_spark_session(session_url)

The first three lines of this code helps to look up the EMR cluster details. This is used to create an interactive spark session on the EMR cluster using Apache Livy.

create_spark_session

Apache Livy creates an interactive spark session for each transform task. The code for which is shown below. SparkSession provides a single point of entry to interact with underlying Spark functionality and allows programming Spark with DataFrame and Dataset APIs. The Spark session is created by calling the POST /sessions API.

Note: You can also change different parameters like driverMemory, executor Memory, number of driver and executor cores as part of the API call.

# Creates an interactive scala spark session. 
# Python(kind=pyspark), R(kind=sparkr) and SQL(kind=sql) spark sessions can also be created by changing the value of kind.
def create_spark_session(master_dns, kind='spark'):
    # 8998 is the port on which the Livy server runs
    host = 'http://' + master_dns + ':8998'
    data = {'kind': kind}
    headers = {'Content-Type': 'application/json'}
    response = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
    logging.info(response.json())
    return response.headers

submit_statement

Once the session has completed starting up, it transitions to the idle state. The transform task is then submitted to the session. The scala code is submitted as a REST API call to the Livy Server instead of the EMR cluster, to have good fault tolerance and concurrency.

# Submits the scala code as a simple JSON command to the Livy server
def submit_statement(session_url, statement_path):
    statements_url = session_url + '/statements'
    with open(statement_path, 'r') as f:
        code = f.read()
    data = {'code': code}
    response = requests.post(statements_url, data=json.dumps(data), headers={'Content-Type': 'application/json'})
    logging.info(response.json())
    return response

track_statement_progress

The progress of the statement can also be easily tracked and the logs are centralized on the Airflow webserver.

# Function to help track the progress of the scala code submitted to Apache Livy
def track_statement_progress(master_dns, response_headers):
    statement_status = ''
    host = 'http://' + master_dns + ':8998'
    session_url = host + response_headers['location'].split('/statements', 1)[0]
    # Poll the status of the submitted scala code
    while statement_status != 'available':
        # If a statement takes longer than a few milliseconds to execute, Livy returns early and provides a statement URL that can be polled until it is complete:
        statement_url = host + response_headers['location']
        statement_response = requests.get(statement_url, headers={'Content-Type': 'application/json'})
        statement_status = statement_response.json()['state']
        logging.info('Statement status: ' + statement_status)

        #logging the logs
        lines = requests.get(session_url + '/log', headers={'Content-Type': 'application/json'}).json()['log']
        for line in lines:
            logging.info(line)

        if 'progress' in statement_response.json():
            logging.info('Progress: ' + str(statement_response.json()['progress']))
        time.sleep(10)
    final_statement_status = statement_response.json()['output']['status']
    if final_statement_status == 'error':
        logging.info('Statement exception: ' + statement_response.json()['output']['evalue'])
        for trace in statement_response.json()['output']['traceback']:
            logging.info(trace)
        raise ValueError('Final Statement Status: ' + final_statement_status)
    logging.info('Final Statement Status: ' + final_statement_status)

The below is a snapshot of the centralized logs from the Airflow webserver.

Once the job is run successfully, the Spark session is ended and the EMR cluster is terminated.

Analyze the data in Amazon Athena

The output data in S3 can be analyzed in Amazon Athena by creating a crawler on AWS Glue. For information about automatically creating the tables in Athena, see the steps in Build a Data Lake Foundation with AWS Glue and Amazon S3.

Summary

In this post, we explored orchestrating a Spark data pipeline on Amazon EMR using Apache Livy and Apache Airflow. We created a simple Airflow DAG to demonstrate how to run spark jobs concurrently. You can modify this to scale your ETL data pipelines and improve latency. Additionally, we saw how Livy helps to hide the complexity to submit spark jobs via REST by using optimal EMR resources.

For more information about the code shown in this post, see AWS Concurrent Data Orchestration Pipeline EMR Livy. Feel free to leave questions and other feedback in the comments.

 


Additional Reading

If you found this post useful, be sure to check out Orchestrate Apache Spark applications using AWS Step Functions and Apache Livy.

 


About the Author

Binal Jhaveri is a Big Data Engineer at Amazon Web Services. Her passion is to build big data products in the cloud. During her spare time, she likes to travel, read detective fiction and loves exploring the abundant nature that the Pacific Northwest has to offer.

 

 

 

Exploratory data analysis of genomic datasets using ADAM and Mango with Apache Spark on Amazon EMR

Post Syndicated from Alyssa Marrow original https://aws.amazon.com/blogs/big-data/exploratory-data-analysis-of-genomic-datasets-using-adam-and-mango-with-apache-spark-on-amazon-emr/

Exploratory data analysis of genomic datasets using ADAM and Mango with Apache Spark on Amazon EMR

As the cost of genomic sequencing has rapidly decreased, the amount of publicly available genomic data has soared over the past couple of years. New cohorts and studies have produced massive datasets consisting of over 100,000 individuals. Simultaneously, these datasets have been processed to extract genetic variation across populations, producing mass amounts of variation data for each cohort.

In this era of big data, tools like Apache Spark have provided a user-friendly platform for batch processing of large datasets. However, to use such tools as a sufficient replacement to current bioinformatics pipelines, we need more accessible and comprehensive APIs for processing genomic data. We also need support for interactive exploration of these processed datasets.

ADAM and Mango provide a unified environment for processing, filtering, and visualizing large genomic datasets on Apache Spark. ADAM allows you to programmatically load, process, and select raw genomic and variation data using Spark SQL, an SQL interface for aggregating and selecting data in Apache Spark. Mango supports the visualization of both raw and aggregated genomic data in a Jupyter notebook environment, allowing you to draw conclusions from large datasets at multiple resolutions.

With the combined power of ADAM and Mango, you can load, query, and explore datasets in a unified environment. You can interactively explore genomic data at a scale previously impossible using single node bioinformatics tools. In this post, we describe how to set up and run ADAM and Mango on Amazon EMR. We demonstrate how you can use these tools in an interactive notebook environment to explore the 1000 Genomes dataset, which is publicly available in Amazon S3 as a public dataset.

Configuring ADAM and Mango on Amazon EMR

First, you launch and configure an EMR cluster. Mango uses Docker containers to easily run on Amazon EMR. Upon cluster startup, EMR uses the following bootstrap action to install Docker and the required startup scripts. The scripts are available at /home/hadoop/mango-scripts.

aws emr create-cluster 
--release-label emr-5.14.0 \   
--name 'emr-5.14.0 Mango example' \   
--applications Name=Hadoop Name=Hive Name=Spark \   
--ec2-attributes KeyName=<your-ec2-key>,InstanceProfile=EMR_EC2_DefaultRole \   
--service-role EMR_DefaultRole \     
--instance-groups \     InstanceGroupType=MASTER,InstanceCount=1,InstanceType=c5.4xlarge \     InstanceGroupType=CORE,InstanceCount=4,InstanceType=c5.4xlarge \   --region <your-aws-region> \   
--log-uri s3://<your-s3-bucket>/emr-logs/ \   
--bootstrap-actions \     
Name='Install Mango', Path="s3://aws-bigdata-blog/artifacts/mango-emr/install-bdg-mango-emr5.sh"

To start the Mango notebook, run the following:

/home/hadoop/mango-scripts/run-notebook.sh

This file sets up all of the environment variables that are needed to run Mango in Docker on Amazon EMR. In your terminal, you will see the port and Jupyter notebook token for the Mango notebook session. Navigate to this port on the public DNS URL of the master node for your EMR cluster.

Loading data from the 1000 Genomes Project

Now that you have a working environment, you can use ADAM and Mango to discover interesting variants in the child from the genome sequencing data of a trio (data from a mother, father, and child). This data is available from the 1000 Genomes Project AWS public dataset. In this analysis, you will view a trio (NA19685, NA19661, and NA19660) and search for variants that are present in the child but not present in the parents.

In particular, we want to identify genetic variants that are found in the child but not in the parents, known as de novo variants. These are interesting regions, as they can indicate sights of de novo variation that might contribute to multiple disorders.

You can find the Jupyter notebook containing these examples in Mango’s GitHub repository, or at /opt/cgl-docker-lib/mango/example-files/notebooks/aws-1000genomes.ipynb in the running Docker container for Mango.

First, import the ADAM and Mango modules and any Spark modules that you need:

# Import ADAM modules
from bdgenomics.adam.adamContext import ADAMContext
from bdgenomics.adam.rdd import AlignmentRecordRDD, CoverageRDD
from bdgenomics.adam.stringency import LENIENT, _toJava

# Import Mango modules
from bdgenomics.mango.rdd import GenomicVizRDD
from bdgenomics.mango.QC import CoverageDistribution

# Import Spark modules
from pyspark.sql import functions as sf

Next, create a Spark session. You will use this session to run SQL queries on variants.

# Create ADAM Context
ac = ADAMContext(spark)
genomicRDD = GenomicVizRDD(spark)

Variant analysis with Spark SQL

Load in a subset of variant data from chromosome 17:

genotypesPath = 's3://1000genomes/phase1/analysis_results/integrated_call_sets/ALL.chr17.integrated_phase1_v3.20101123.snps_indels_svs.genotypes.vcf.gz'
genotypes = ac.loadGenotypes(genotypesPath)

# repartition genotypes to balance the load across memory
genotypes_df  = genotypes.toDF()

You can take a look at the schema by printing the columns in the dataframe.

# cache genotypes and show the schema
genotypes_df.columns

This genotypes dataset contains all samples from the 1000 Genomes Project. Therefore, you will next filter genotypes to only consider samples that are in the NA19685 trio, and cache the results in memory.

# trio IDs
IDs = ['NA19685', 'NA19661','NA19660']

# Filter by individuals in the trio
trio_df = genotypes_df.filter(genotypes_df["sampleId"].isin(IDs))

trio_df.cache()
trio_df.count()

Next, add a new column to your dataframe that determines the genomic location of each variant. This is defined by the chromosome (contigName) and the start and end position of the variant.

# Add ReferenceRegion column and group by referenceRegion
trios_with_referenceRegion = trio_df.withColumn('ReferenceRegion', 
                    sf.concat(sf.col('contigName'),sf.lit(':'), sf.col('start'), sf.lit('-'), sf.col('end')))

Now, you can query your dataset to find de novo variants. But first, you must register your dataframe with Spark SQL.

#  Register df with Spark SQL
trios_with_referenceRegion.createOrReplaceTempView("trios")

Now that your dataframe is registered, you can run SQL queries on it. For the first query, select the names of variants belonging to sample NA19685 that have at least one alternative (ALT) allele.

# filter by alleles. This is a list of variant names that have an alternate allele for the child
alternate_variant_sites = spark.sql("SELECT variant.names[0] AS snp FROM trios \
                                    WHERE array_contains(alleles, 'ALT') AND sampleId == 'NA19685'") 

collected_sites = map(lambda x: x.snp, alternate_variant_sites.collect())

For your next query, filter sites in which the parents have both reference alleles. Then filter these variants by the set produced previously from the child.

# get parent records and filter by only REF locations for variant names that were found in the child with an ALT
filtered1 = spark.sql("SELECT * FROM trios WHERE sampleId == 'NA19661' or sampleId == 'NA19660' \
            AND !array_contains(alleles, 'ALT')")
filtered2 = filtered1.filter(filtered1["variant.names"][0].isin(collected_sites))
snp_counts = filtered2.groupBy("variant.names").count().collect()

# collect snp names as a list
snp_names = map(lambda x: x.names, snp_counts)
denovo_snps = [item for sublist in snp_names for item in sublist]
denovo_snps[:10]

Now that you have found some interesting variants, you can unpersist your genotypes from memory.

trio_df.unpersist()

Working with alignment data

You have found a lot of potential de novo variant sites. Next, you can visually verify some of these sites to see if the raw alignments match up with these de novo hits.

First, load in the alignment data for the NA19685 trio:

# load in NA19685 exome from s3a

childReadsPath = 's3a://1000genomes/phase1/data/NA19685/exome_alignment/NA19685.mapped.illumina.mosaik.MXL.exome.20110411.bam'
parent1ReadsPath = 's3a://1000genomes/phase1/data/NA19685/exome_alignment/NA19660.mapped.illumina.mosaik.MXL.exome.20110411.bam'
parent2ReadsPath = 's3a://1000genomes/phase1/data/NA19685/exome_alignment/NA19661.mapped.illumina.mosaik.MXL.exome.20110411.bam'

childReads = ac.loadAlignments(childReadsPath, stringency=LENIENT)
parent1Reads = ac.loadAlignments(parent1ReadsPath, stringency=LENIENT)
parent2Reads = ac.loadAlignments(parent2ReadsPath, stringency=LENIENT)

Note that this example uses s3a:// instead of s3:// style URLs. The reason for this is that the ADAM formats use Java NIO to access BAM files. To do this, we are using a JSR 203 implementation for the Hadoop Distributed File System to access these files. This itself requires the s3a:// protocol. You can view that implementation in this GitHub repository.

You now have data alignment data for three individuals in your trio. However, the data has not yet been loaded into memory. To cache these datasets for fast subsequent access to the data, run the cache() function:

# cache child RDD and count records
# takes about 2 minutes, on 4 c3.4xlarge worker nodes 
childReads.cache()

# Count reads in the child
childReads.toDF().count()
# Output should be 95634679

Quality control of alignment data

One popular analysis to visually re-affirm the quality of genomic alignment data is by viewing coverage distribution. Coverage distribution gives you an idea of the read coverage that you have across a sample.

Next, generate a sample coverage distribution plot for the child alignment data on chromosome 17:

# Calculate read coverage
# Takes 2-3 minutes
childCoverage = childReads.transform(lambda x: x.filter(x.contigName == "17")).toCoverage()

childCoverage.cache()
childCoverage.toDF().count()

# Output should be 51252612

Now that coverage data is calculated and cached, compute the coverage distribution of chromosome 17 and plot the coverage distribution:

# Calculate coverage distribution

# You can check the progress in the SparkUI by navigating to 
# <PUBLIC_MASTER_DNS>:8088 and clicking on the currently running Spark application.
cd = CoverageDistribution(sc, childCoverage)
x = cd.plot(normalize=True, cumulative=False, xScaleLog=True, labels="NA19685")

 

This looks pretty standard because the data you are viewing is exome data. Therefore, you can see a high number of sights with low coverage and a smaller number of genomic positions with more than 100 reads. Now that you are done with coverage, you can unpersist these datasets to clear space in memory for the next analysis.

childCoverage.unpersist()

Viewing sites with missense variants in the proband

After verifying alignment data and filtering variants, you have four genes with potential missense mutations in the proband, including YBX2, ZNF286B, KSR1, and GNA13. You can visually verify these sites by filtering and viewing the raw reads of the child and parents.

First, view the child reads. If you zoom in to the location of the GNA13 variant (63052580-63052581), you can see a heterozygous T to A call:

# missense variant at GNA13: 63052580-63052581 (SNP rs201316886)
# Takes about 2 minutes to collect data from workers
contig = "17"
start = 63052180
end = 63052981

genomicRDD.ViewAlignments(childReads, contig, start, end)

It looks like there indeed is a variant at this position, possibly a heterozygous SNP with alternate allele A. Look at the parent data to verify that this variant does not appear in the parents:

# view missense variant at GNA13: 63052580-63052581 in parent 1
contig = "17"
start = 63052180
end = 63052981

genomicRDD.ViewAlignments(parent1Reads, contig, start, end)

This confirms the filter that this variant is indeed present only in the proband, but not the parents.

Summary

To summarize, this post demonstrated how to set up and run ADAM and Mango in Amazon EMR. We demonstrated how to use these tools in an interactive notebook environment to explore the 1000 Genomes dataset, a publicly available dataset on Amazon S3. We used these tools inspect 1000 Genomes data quality, query for interesting variants in the genome, and validate results through the visualization of raw data.

For more information about Mango, see the Mango User Guide. If you have questions or suggestions, please comment below.

 


Additional Reading

If you found this post useful, be sure to check out Genomic Analysis with Hail on Amazon EMR and Amazon Athena, Interactive Analysis of Genomic Datasets Using Amazon Athena, and, on the AWS Compute Blog, Building High-Throughput Genomics Batch Workflows on AWS: Introduction (Part 1 of 4).

 


About the Author

Alyssa Marrow is a graduate student in the RISELab and Yosef Lab at the University of California Berkeley. Her research interests lie at the intersection of systems and computational biology. This involves building scalable systems and easily parallelized algorithms to process and compute on all that ‘omics data.

 

 

 

Encrypt data in transit using a TLS custom certificate provider with Amazon EMR

Post Syndicated from Remek Hetman original https://aws.amazon.com/blogs/big-data/encrypt-data-in-transit-using-a-tls-custom-certificate-provider-with-amazon-emr/

Many enterprises have highly regulated policies around cloud security. Those policies might be even more restrictive for Amazon EMR where sensitive data is processed.

EMR provides security configurations that allow you to set up encryption for data at rest stored on Amazon S3 and local Amazon EBS volumes. It also allows the setup of Transport Layer Security (TLS) certificates for the encryption of data in transit.

When in-transit encryption is enabled, EMR supports the following components by default:

  • Hadoop MapReduce Encrypted Shuffle.
  • Secure Hadoop RPC set to Privacy and using SASL, which is activated in EMR when data at rest encryption is enabled.
  • Secure Hadoop RPC set to Privacy and using SASL. This is activated in EMR when data at rest encryption is enabled in the security configuration.
  • Presto internal communication between nodes using SSL/TLS. This applies only to EMR version 5.6.0 and later.
  • Tez Shuffle Handler using TLS.
  • Internal RPC communication between Apache Spark
  • HTTP protocol communication with user interfaces, such as Spark History Server and HTTPS-enabled file servers encrypted using Spark’s Secure Sockets Layer (SSL) configuration.

For more information about EMR in-transit encryption, see Encryption Options.

A security configuration provides the following options to specify TLS certificates:

  1. As a path to a .zip file in an S3 bucket that contains all certificates
  2. Through a custom certificate provider as a Java class

In many cases, company security policies prohibit storing any type of sensitive information in an S3 bucket, including certificate private keys. For that reason, the only remaining option to secure data in transit on EMR is to configure the custom certificate provider.

In this post, I guide you through the configuration process, showing you how to secure data in transit on EMR using the TLS custom certificate provider.

Required knowledge

To walk through this solution, you should know or be familiar with:

Solution overview

The custom certificate provider is a Java class that implements the TLSArtifactsProvider interface and compiles it into a JAR file. The TLSArtifactsProvider interface is available in the AWS SDK for Java version 1.11.+.

The TLSArtifactsProvider interface provides the TLSArtifacs method, which as argument expects certificates.

To make this solution work, you need a secure place to store certificates that can also be accessed by Java code.

In this example, use Parameter Store, which supports encryption using the AWS Key Management Service (AWS KMS) key.

Another way would be to store encrypted certificates in Amazon DynamoDB.

The following diagram and steps show the configuration process from the Java standpoint:

 

  1. During bootstrap, EMR downloads the Java JAR file from the S3 bucket, and runs it on each node.
  2. Java invokes the Lambda function, requesting the value of a specific parameter key.
  3. Lambda calls Parameter Store to get the value. The value returned by Systems Manager remains encrypted.
  4. Lambda returns the encrypted value back to Java.
  5. Java decrypts the value using an AWS KMS API call.
  6. The decrypted value is converted to the correct format of the certificate.
  7. The process repeats for all certificates.
  8. Certificates are returned back to EMR through the TLSArtifactsProvider interface.

In this example, for the master node, I used a certificate signed by a certificate authority (CA) and wildcard self-signed certificate for slave nodes. Depending on requirements, you can use CA certificates for all nodes or only a self-signed wildcard certificate.

Implementing in-transit encryption

This section walks you through all aspects of implementation and configuration for in-transit encryption using a custom certificate provider.

Create a self-signed wildcard certificate

To create a self-signed wildcard certificate, you can use OpenSSL:

openssl req -x509 -newkey rsa:4096 -keyout inter-nodes.key -out inter-nodes.crt -days 365 -subj "/C=US/ST=MA/L=Boston/O=EMR/OU=EMR/CN=*.ec2.internal" -nodes


This command creates a self-signed, 4096-bit certificate.

Explanation of parameter:

-keyout – The output file in which to store the private key.

-out – The output file in which to store the certificate.

-days – The number of days for which to certify the certificate.

-subj – The subject name for a new request.  The CN must match the domain name specified in DHCP that is assigned to the virtual private cloud (VPC). The default is ec2.internal. The “*” prefix is the wildcard certificate.

-nodes – Allows you to create a private key without a password, which is without encryption.

For more information, see req command.

Upload certificates

To upload certificates to the Parameter Store, run the following AWS Command Line Interface (AWS CLI) command for each certificate file, including private keys:

aws ssm put-parameter --name <parameter key name> --key-id < KMS key ID> --type SecureString --value file://<path to certificate file>

The following are examples of uploaded CA and self-signed certificate files:

aws ssm put-parameter --name /emr/certificate --value fileb://emr-ca-certificate.crt --type SecureString --key-id 00000000-0000-0000-0000-000000000000 --overwrite --region us-east-1

aws ssm put-parameter --name /emr/private-key --value fileb://emr-ca-private-key.key --type SecureString --key-id 00000000-0000-0000-0000-000000000000 --overwrite --region us-east-1

aws ssm put-parameter --name /emr/inter-nodes-certificate --value fileb://inter-nodes.crt --type SecureString --key-id 00000000-0000-0000-0000-000000000000 --overwrite --region us-east-1

aws ssm put-parameter --name /emr/inter-nodes-private-key --value fileb://inter-nodes.key --type SecureString --key-id 00000000-0000-0000-0000-000000000000 --overwrite --region us-east-1

The following are examples of uploading certificates when the wildcard certificate is used on all nodes:

aws ssm put-parameter --name /emr/certificate --value fileb:// inter-nodes.crt --type SecureString --key-id 00000000-0000-0000-0000-000000000000 --overwrite --region us-east-1

aws ssm put-parameter --name /emr/private-key --value fileb:// inter-nodes.key --type SecureString --key-id 00000000-0000-0000-0000-000000000000 --overwrite --region us-east-1

aws ssm put-parameter --name /emr/inter-nodes-certificate --value fileb://inter-nodes.crt --type SecureString --key-id 00000000-0000-0000-0000-000000000000 --overwrite --region us-east-1

aws ssm put-parameter --name /emr/inter-nodes-private-key --value fileb://inter-nodes.key --type SecureString --key-id 00000000-0000-0000-0000-000000000000 --overwrite --region us-east-1

Using the Lambda function

The Lambda function in this solution is a broker that allows Java JAR to retrieve certificates from Parameter Store.

Create a new role for the Lambda function, using the following command:

aws iam create-role --role-name lambda-ssm-parameter-store-role --assume-role-policy-document "{\"Version\": \"2012-10-17\", \"Statement\": [{\"Effect\": \"Allow\",\"Principal\": {\"Service\": \"lambda.amazonaws.com\"},\"Action\": \"sts:AssumeRole\"}]}"

Grant permissions to Parameter Store, using the following command:

aws iam put-role-policy --role-name lambda-ssm-parameter-store-role --policy-name ssm --policy-document "{\"Version\": \"2012-10-17\",\"Statement\": [{\"Effect\": \"Allow\",\"Action\": \"ssm:GetParameter\",\"Resource\": \"*\"}]}"

Create a new Lambda function:

To create a new Lambda function, open the AWS Management Console, choose Lambda, and choose Create function. On the Create function page, complete the form as shown in the following screenshot:

Choose runtime as Python 2.7, and specify the role that you created for the Lambda function.

When the new function is created, add the following code in the Function code section:

import json
import boto3

ssm = boto3.client('ssm')

def lambda_handler(event, context):

    ssmResp = ssm.get_parameter(
        Name=event['ParameterName'],
        WithDecryption=False
    )

    paramValue = ssmResp['Parameter']['Value']
    return(paramValue)

Change the timeout to 1 minute, and then save the function.

Tag resources

For the Java class to call the Lambda function, you must provide information about the function name and names of parameter keys under which the certificates are stored.

To reuse the same Java JAR with different certificates and configurations, provide those values to Java through EMR tags, rather than embedding them in Java code.

In this example, I used the following tags:

  • ssm:ssl:certificate – The name of the Systems Manager parameter key storing the CA-signed certificate.
  • ssm:ssl:private-key – The name of the Systems Manager parameter key storing the CA-signed certificate private key.
  • ssm:ssl:inter-node-certificate – The name of the Systems Manager parameter key storing the self-signed certificate.
  • ssm:ssl:inter-node-private-key – The name of the Systems Manager parameter key storing the self-signed certificate private key.
  • tls:lambda-fn-name – The name of the Lambda function. In this example, this is get-ssm-parameter-lambda.

Use the Java class flow

This section describes the flow in the Java code only. You can download the full code alone with the compiled JAR file from GitHub. For more information, see the Java folder in the emr-tls-security GitHub repo.

Important

Because of EMR dependencies, all other methods must be implemented based on the AWS SDK for Java version 1.10.75. These dependencies do not include the TLSArtifactsProvider interface that should be imported from the AWS SDK for Java version 1.11.170 (aws-java-sdk-emr-1.11.170.jar).

All necessary dependencies are included in the example project.

The following is an example of the basic structure of the Java class, with an implementation of the TLSArtifactsProvider interface:

public class emrtls extends TLSArtifactsProvider {

            public emrtls() {

            }

            @Override

            public TLSArtifacts getTlsArtifacts() {

 

                        List<Certificate> crt = new ArrayList<Certificate>();

                        List<Certificate> crtCA = new ArrayList<Certificate>();

                        PrivateKey privkey;

           

                        //here code to retrieve certificates from secure location

                        // and assign them to local variables

                       

                        TLSArtifacts tls = new TLSArtifacts(privkey,crt,crtCA);

                        return tls;

            }

}

The code to add is related to getting certificates from a secure location.

In the provided code example from GitHub, the following logic was implemented. I’ve listed the methods used in each step.

  1. Read the names of the Systems Manager parameter key. Also read the name of the AWS Lambda function from the EMR tags (see “Tagging” section) – readTags()
  2. Invoke the Lambda function to download certificates from Parameter Store – callLambda():
    • Decrypt the values returned by Lambda using the KMS API call – decryptValue().
    • Assign decrypted values to local variables.
  3. If needed, save CA-signed certificates to a local disk. For more information, see Other Communication – Hue section later in this post– createDirectoryForCerts() and writeCert().
  4. Convert certificates to an X509 format – getX509FromString().
  5. Convert the private key to the correct format – getPrivateKey().
  6. Call the getTlsArtifacts() method to provide certificates in arguments.

You can use a wildcard certificate for all nodes without changing code. Reference the same Systems Manager parameter key in ssm:ssl:certificate/ssm:ssl:private-key, and in the ssm:ssl:inter-node-certificate/ ssm:ssl:inter-node-private-key in EMR tags.

If the implemented methods in the example code meet requirements, you can use the provided Java JAR file in the EMR security configuration, as described in the next section. Otherwise, any changes in code require a compile of Java code into a JAR file.

Create the EMR security configuration

Before creating the security configuration, upload the compiled Java JAR file to an S3 bucket.

To create the security configuration:

  1. Log in to the Amazon EMR console.
  2. Choose Security configurations, Create.
  3. Type a name for your new security configuration; for example, emr-tls-ssm
  4. Select In-transit encryption.
  5. Under TLS certificate provider, for Certificate provider type, choose Custom.
  6. For S3 object, type the path to the uploaded Java JAR file.
  7. For Certificate provider class, type the name of the Java class. In the example code, the name is emrtls.
  8. Configure the At-rest encryption, as required.
  9. Choose Create.

Modify the instance profile role

Applications running on EMR assumes and uses the EMR role for EC2 to interact with other AWS services.

To grant Java permissions to invoke Lambda, and to decrypt certificates, add the following policy to your EC2 instance profile role:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "TLS",
            "Effect": "Allow",
            "Action": [
                "kms:Decrypt",
                "lambda:InvokeFunction"
            ],
            "Resource": [
                "arn:aws:lambda:us-east-1:xxxxxxxxxx:function:get-ssm-parameter-lambda",
                "arn:aws:kms:us-east-1:xxxxxxxxxx:key/<your KMS key used to encrypt certificates in AWS Systems Manager"
            ]
        }
    ]
}

Note

Before creating the policy, update resources with the correct Amazon Resource Name (ARN) for the Lambda function and KMS key.

Other available configurations

In addition to the applications that natively support in-transit encryption in EMR, the custom TLS certificate provider can also be used to secure communication (HTTPS) for other applications like Presto, Hue, and Zeppelin.

The sections that follow describe the configuration of each application that works with the certificates set up by the TLS security configuration.

Presto

For Presto, most configuration is done by EMR when TLS certificates are applied.

Depending on the type of certificates used, there are two additional configurations that must be added:

  1. When the CA-signed certificate with a single common name (not wildcard) is set on the master node, additional configurations are required:
    • The certificate common name must be registered in DNS. The EMR cluster must be able to resolve that name to the IP address of the master node. One solution would be to run a script on the bootstrap action to register the IP address of the EMR master node and name in DNS.
    • The Discovery URI in the Presto configuration file must match the certificate common name. The value of uri must be changed on all nodes. This can be accomplished by two provided scripts.

Each script must be uploaded to an S3 bucket to which the EMR cluster has permission.

The first script, emr-presto-conf.sh, must be run on the EMR bootstrap action, as follows:

          {
            "Name": "PrestoConfiguration",
            "ScriptBootstrapAction": {
              "Path": "s3://xxxxx/emr-presto-conf.sh",
              "Args": [ "emr.mycluster.com" ]
            }
          }
Where the value of Args is the certificate common name

The PrestoConfiguration bootstrap action downloads and runs a script (presto-update-dicovery-uri.sh) as a background process. This script waits for the Presto server to be installed and then modify the configuration files.

Before uploading the emr-presto-conf.sh script to the Amazon S3 bucket, change the path to “presto-update-dicovery-uri.sh”

Both scripts can be downloaded from GitHub:

https://github.com/aws-samples/emr-tls-security/tree/master/scripts

2. When a self-signed wildcard certificate is used on the master node, the certificate must be added to the Java default truststore. This can be accomplished by running the following script:

#!/bin/bash

truststorePass=$(grep -Po "(?<=^internal-communication.https.keystore.key = ).*" /etc/presto/conf/config.properties)

sudo keytool -importkeystore -srckeystore /usr/share/aws/emr/security/conf/truststore.jks -destkeystore /usr/lib/jvm/java/jre/lib/security/cacerts -deststorepass changeit -srcstorepass $truststorePass

The previous script can be run on the EMR step. The following is an AWS CloudFormation snippet:

"EMRPrestoTrustedStorStep": {
      "Type": "AWS::EMR::Step",
      "Properties": {
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
          "Jar": "s3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar",
          "Args": [
            "s3://xxxxxx/presto-update-trusted-store.sh"
          ]
        },
        "JobFlowId": {
          "Ref": "EMRCluster"
        },
        "Name": "EMR-Setup-Presto-Trusted-Store"
      }
    }

Hue

To configure access to the Hue UI over HTTPS, the path to the certificate and private key files must be specified in the hue.ini file. Because our Java class has methods, createDirectoryForCerts() and writeCert(), which support exporting TLS certificates to the local disk, the remaining configuration should point to those files in the hue.ini file.

This configuration can be applied by adding the following configuration to the EMR cluster:

         [{
            "Classification": "hue-ini",
            "Configurations": [
              {
                "Classification": "desktop",
                "ConfigurationProperties": {
                  	"ssl_certificate": "/etc/certs/public.crt",
                 	"ssl_private_key": "/etc/certs/private.key"
                }
	}]
          }]

The port for the HTTPS connection to Hue remains the same. The default is: 8888

Zeppelin

Unlike Hue, Zeppelin configuration files reference certificates from the Java keystore.

Because EMR already added all certificates to the Java keystore, the only modification needed is to reference the same Java keystore files and password in zeppelin-site.xml.

The path to the Java keystore and the password can be read directly from the Presto configuration file.

This configuration can be done by running the following script on EMR:

#!/bin/bash
sudo cp /etc/zeppelin/conf/zeppelin-site.xml.template /etc/zeppelin/conf/zeppelin-site.xml
truststorePath=$(grep -Po "(?<=^internal-communication.https.keystore.path = ).*" /etc/presto/conf/config.properties)
truststorePass=$(grep -Po "(?<=^internal-communication.https.keystore.key = ).*" /etc/presto/conf/config.properties)
keystorePath=$(grep -Po "(?<=^http-server.https.keystore.path = ).*" /etc/presto/conf/config.properties)
keystorePass=$(grep -Po "(?<=^http-server.https.keystore.key = ).*" /etc/presto/conf/config.properties)
keymanager=$(grep -Po "(?<=^http-server.https.keymanager.password = ).*" /etc/presto/conf/config.properties)
sudo sed -i '/<name>zeppelin.server.port<\/name>/!b;n;c<value>8890<\/value>' /etc/zeppelin/conf/zeppelin-site.xml
sudo sed -i '/<name>zeppelin.server.ssl.port<\/name>/!b;n;c<value>7773<\/value>' /etc/zeppelin/conf/zeppelin-site.xml
sudo sed -i '/<name>zeppelin.ssl<\/name>/!b;n;c<value>true<\/value>' /etc/zeppelin/conf/zeppelin-site.xml
sudo sed -i '/<name>zeppelin.ssl.keystore.path<\/name>/!b;n;c<value>'"$keystorePath"'<\/value>' /etc/zeppelin/conf/zeppelin-site.xml
sudo sed -i '/<name>zeppelin.ssl.keystore.password<\/name>/!b;n;c<value>'"$keystorePass"'<\/value>' /etc/zeppelin/conf/zeppelin-site.xml
sudo sed -i '/<name>zeppelin.ssl.truststore.path<\/name>/!b;n;c<value>'"$truststorePath"'<\/value>' /etc/zeppelin/conf/zeppelin-site.xml
CONTENT1="<property>\n  <name>zeppelin.ssl.truststore.password</name>\n  <value>${truststorePass}</value>\n</property>"
sudo sed -i '/<\/configuration>/i'"$CONTENT1" /etc/zeppelin/conf/zeppelin-site.xml
CONTENT2="<property>\n  <name>zeppelin.ssl.key.manager.password</name>\n  <value>${keymanager}</value>\n</property>"
sudo sed -i '/<\/configuration>/i'"$CONTENT2" /etc/zeppelin/conf/zeppelin-site.xml
sudo stop zeppelin
sudo start zeppelin

The previous script sets the HTTPS port in Zeppelin to 7773, which can be changed as needed.

GitHub example

On GitHub, you can download an example of the CloudFormation templates that can be used to launch an EMR cluster with all the security features discussed in this post.

The following is an example of the AWS CLI command required to launch an EMR cluster with security features.

Before running this command, you must change the <key pair name>, <subnet id> and <security group id> values to the correct one. 

aws emr create-cluster --configurations https://s3.amazonaws.com/tls-blog-cf/emr-configuration.json \

--applications Name=Hadoop Name=Hive Name=Presto Name=Spark Name=Hue Name=Zeppelin \

--instance-groups 'InstanceGroupType=MASTER,InstanceCount=1,InstanceType='m4.xlarge'' \

'InstanceGroupType=CORE,InstanceCount='2',InstanceType='m4.xlarge'' \

--release-label emr-5.14.0 \

--service-role EMR_DefaultRole \

--ec2-attributes KeyName=<key pair name>,SubnetId=<subnet id>,\

EmrManagedMasterSecurityGroup=<security group id>,\

AdditionalMasterSecurityGroups=<security group id>,\

EmrManagedSlaveSecurityGroup=<security group id>,\

InstanceProfile=EMR_EC2_DefaultRole \

--name EMR-TLS-Demo \

--visible-to-all-users \

--security-configuration emr-tls-ssm \

--steps Type=CUSTOM_JAR,Name=ZeppelinSSL,ActionOnFailure=CONTINUE,\

Jar=s3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar,\

Args=s3://tls-blog-cf/zeppelin-ssl.sh \

Type=CUSTOM_JAR,Name=PrestoSSL,ActionOnFailure=CONTINUE,\

Jar=s3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar,\

Args=s3://tls-blog-cf/presto-update-trusted-store.sh \

--tags ssm:ssl:private-key="/emr/private-key" \

ssm:ssl:certificate="/emr/certificate" \

ssm:ssl:inter-node-private-key="/emr/inter-nodes-private-key" \

ssm:ssl:inter-node-certificate="/emr/inter-nodes-certificate" \

tls:lambda-fn-name="get-ssm-parameter-lambda" \

--region us-east-1

Note

Before running any of the provided examples, the certificates must be uploaded to the Parameter Store.

For test purposes, you can download the shell script from:

https://github.com/aws-samples/emr-tls-security/blob/master/scripts/upload-certificates.sh

This script uploads self-signed certificates issued for *.ec2.internal to the Parameter Store. Make sure that the DNS associated with your VPC, where you launch the EMR cluster, matches the certificate.

Use the following command:

sh upload-certificates.sh <KMS key ID> <AWS Region>

Where:

KMS key ID – is the identifier of the KMS key that is used to encrypt certificates.

AWS Region – is the Region where certificates are uploaded.

Validation

There are a few methods to validate whether the TLS certificate was installed correctly.

Hue

To test the HTTPS connection to Hue from the browser, connect to https://<EMR URL or IP>:8888

Zeppelin
To test the HTTPS connection to Zeppelin from the browser, connect to https://<EMR URL or IP>:7773

Port 7773 is the one used in this example. If you changed it, make sure you’re connecting to the port under which the Zeppelin is running on your EMR cluster.

There’s something to remember in both of these scenarios. If the certificate doesn’t match the provided URL (or when you created a self-signed certificate), you get a warning message in the browser that the certificate is invalid.

Presto

You can test Presto using one of the following methods:

  1. HTTPS connection to the Presto UI
  2. Using the Presto CLI

To test the connection to Presto UI from the browser, connect to https://<EMR URL or IP>:8446

To test the certificate using the Presto CLI, follow these steps:

  1. Connect (SSH) to the EMR master node
  2. Create a test Hive table by running the following command:

hive -e “create table test1 (id int, name string); insert into test1 values (1, ‘John’), (2,’Robert’), (3,’David’);”

  1. Run the following Presto command:

presto-cli –server https://master-node-url:8446 –schema default –catalog hive –execute ‘select count(*) from test;’

Change ‘master-node-url’ to the correct value.

If the command succeeds, you should see 3 as the command output.

Conclusion

Amazon EMR security configurations provide options to encrypt data at rest, but also data in transit.

This post demonstrated how to create and apply the TLS custom certificate provider to an EMR cluster to secure data in-transit without storing certificate’s private key in an S3 bucket. For a company with strict policies, this might be the only solution to encrypt data in-transit.

 


Additional Reading

If you found this post useful, be sure to check out Build a Multi-Tenant Amazon EMR Cluster with Kerberos, Microsoft Active Directory Integration and IAM Roles for EMRFS and Encrypt Data At-Rest and In-Flight on Amazon EMR with Security Configurations.

 


About the Author

Remek Hetman is a Senior Cloud Infrastructure Architect with Amazon Web Services Professional Services. He works with AWS enterprise customers, providing technical guidance and assistance for Infrastructure, DevOps, and Big Data to help them make the best use of AWS services. Outside of work, he enjoys spending his time actively, and pursuing his passion – astronomy.

 

Best Practices for resizing and automatic scaling in Amazon EMR

Post Syndicated from Brandon Scheller original https://aws.amazon.com/blogs/big-data/best-practices-for-resizing-and-automatic-scaling-in-amazon-emr/

You can increase your savings by taking advantage of the dynamic scaling feature set available in Amazon EMR. The ability to scale the number of nodes in your cluster up and down on the fly is among the major features that make Amazon EMR elastic. You can take advantage of scaling in EMR by resizing your cluster down when you have little or no workload. You can also scale your cluster up to add processing power when the job gets too slow. This allows you to spend just enough to cover the cost of your job and little more.

Knowing the complex logic behind this feature can help you take advantage of it to save on cluster costs. In this post, I detail how EMR clusters resize, and I present some best practices for getting the maximum benefit and resulting cost savings for your own cluster through this feature.

EMR scaling is more complex than simply adding or removing nodes from the cluster. One common misconception is that scaling in Amazon EMR works exactly like Amazon EC2 scaling. With EC2 scaling, you can add/remove nodes almost instantly and without worry, but EMR has more complexity to it, especially when scaling a cluster down. This is because important data or jobs could be running on your nodes.

To prevent data loss, Amazon EMR scaling ensures that your node has no running Apache Hadoop tasks or unique data that could be lost before removing your node. It is worth considering this decommissioning delay when resizing your EMR cluster. By understanding and accounting for how this process works, you can avoid issues that have plagued others, such as slow cluster resizes and inefficient automatic scaling policies.

When an EMR scale cluster is scaled down, two different decommission processes are triggered on the nodes that will be terminated. The first process is the decommissioning of Hadoop YARN, which is the Hadoop resource manager. Hadoop tasks that are submitted to Amazon EMR generally run through YARN, so EMR must ensure that any running YARN tasks are complete before removing the node. If for some reason the YARN task is stuck, there is a configurable timeout to ensure that the decommissioning still finishes. When this timeout happens, the YARN task is terminated and is instead rescheduled to a different node so that the task can finish.

The second decommission process is that of the Hadoop Distributed File System or HDFS. HDFS stores data in blocks that are spread through the EMR cluster on any nodes that are running HDFS. When an HDFS node is decommissioning, it must replicate those data blocks to other HDFS nodes so that they are not lost when the node is terminated.

So how can you use this knowledge in Amazon EMR?

Tips for resizing clusters

The following are some issues to consider when resizing your clusters.

EMR clusters can use two types of nodes for Hadoop tasks: core nodes and task nodes. Core nodes host persistent data by running the HDFS DataNode process and run Hadoop tasks through YARN’s resource manager. Task nodes only run Hadoop tasks through YARN and DO NOT store data in HDFS.

When scaling down task nodes on a running cluster, expect a short delay for any running Hadoop task on the cluster to decommission. This allows you to get the best usage of your task node by not losing task progress through interruption. However, if your job allows for this interruption, you can adjust the one hour default timeout on the resize by adjusting the yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs property (in EMR 5.14) in yarn-site.xml. When this process times out, your task node is shut down regardless of any running tasks. This process is usually relatively quick, which makes it fast to scale down task nodes.

When you’re scaling down core nodes, Amazon EMR must also wait for HDFS to decommission to protect your data. HDFS can take a relatively long time to decommission. This is because HDFS block replication is throttled by design through configurations located in hdfs-site.xml. This in turn means that HDFS decommissioning is throttled. This protects your cluster from a spiked workload if a node goes down, but it slows down decommissioning. When scaling down a large number of core nodes, consider adjusting these configurations beforehand so that you can scale down more quickly.

For example, consider this exercise with HDFS and resizing speed.

The HDFS configurations, located in hdfs-site.xml, have some of the most significant impact on throttling block replication:

  • datanode.balance.bandwidthPerSec: Bandwidth for each node’s replication
  • namenode.replication.max-streams: Max streams running for block replication
  • namenode.replication.max-streams-hard-limit: Hard limit on max streams
  • datanode.balance.max.concurrent.moves: Number of threads used by the block balancer for pending moves
  • namenode.replication.work.multiplier.per.iteration: Used to determine the number of blocks to begin transfers immediately during each replication interval

(Beware when modifying: Changing these configurations improperly, especially on a cluster with high load, can seriously degrade cluster performance.)

Cluster resizing speed exercise

Modifying these configurations can speed up the decommissioning time significantly. Try the following exercise to see this difference for yourself.

  1. Create an EMR cluster with the following hardware configuration:
  • Master: 1 node – m3.xlarge
  • Core: 6 nodes – m3.xlarge
  1. Connect to the master node of your cluster using SSH (Secure Shell).

For more information, see Connect to the Master Node Using SSH in the Amazon EMR documentation.

  1. Load data into HDFS by using the following jobs:
$ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar teragen 1000000000 /user/hadoop/data1/
$ s3-dist-cp --src s3://aws-bigdata-blog/artifacts/ClusterResize/smallfiles25k/ --dest  hdfs:///user/hadoop/data2/
  1. Edit your hdfs-site.xml configs:
$ sudo vim /etc/hadoop/conf/hdfs-site.xml

Then paste in the following configuration setup in the hdfs-site properties.

Disclaimer: These values are relatively high for example purposes and should not necessarily be used in production. Be sure to test config values for production clusters under load before modifying them.

<property>
  <name>dfs.datanode.balance.bandwidthPerSec</name>
  <value>100m</value>
</property>

<property>
  <name>dfs.namenode.replication.max-streams</name>
  <value>100</value>
</property>

<property>
  <name>dfs.namenode.replication.max-streams-hard-limit</name>
  <value>200</value>
</property>

<property>
  <name>dfs.datanode.balance.max.concurrent.moves</name>
  <value>500</value>
</property>

<property>
  <name>dfs.namenode.replication.work.multiplier.per.iteration</name>
  <value>30</value>
</property>
  1. Resize your EMR cluster from six to five core nodes, and look in the EMR events tab to see how long the resize took.
  2. Repeat the previous steps without modifying the configurations, and check the difference in resize time.

While performing this exercise, I saw resizing time lower from 45+ minutes (without config changes) down to about 6 minutes (with modified hdfs-site configs). This exercise demonstrates how much HDFS is throttled under default configurations. Although removing these throttles is dangerous and performance using them should be tested first, they can significantly speed up decommissioning time and therefore resizing.

The following are some additional tips for resizing clusters:

  • Shrink resizing timeouts. You can configure EMR nodes in two ways: instance groups or instance fleets. For more information, see Create a Cluster with Instance Fleets or Uniform Instance Groups. EMR has implemented shrink resize timeouts when nodes are configured in instance fleets. This timeout prevents an instance fleet from attempting to resize forever if something goes wrong during the resize. It currently defaults to one day, so keep it in mind when you are resizing an instance fleet down.

If an instance fleet shrink request takes longer than one day, it finishes and pauses at however many instances are currently running. On the other hand, instance groups have no default shrink resize timeout. However, both types have the one-hour YARN timeout described earlier in the yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs property (in EMR 5.14) in yarn-site.xml.

  • Watch out for high frequency HDFS writes when resizing core nodes. If HDFS is receiving a lot of writes, it will modify a large number of blocks that require replication. This replication can interfere with the block replication from any decommissioning core nodes and significantly slow down the resizing process.

Setting up policies for automatic scaling

Although manual scaling is useful, a majority of the time cluster resizes are executed dynamically through Amazon EMR automatic scaling. Generally, the details of the automatic scaling policy must be tailored to the specific Hadoop job, so I won’t go into detail there. Instead, I provide some general guidelines for setting up your cluster’s auto scaling policies.

The following are some considerations when setting up your auto scaling policy.

Metrics for scaling

Choose the right metrics for your node types to trigger scaling. For example, scaling core nodes solely on the YARNMemoryAvailablePercentage metric doesn’t make sense. This is because you would be increasing/decreasing HDFS total size when really you only need more processing power. Scaling task nodes on HDFSUtilization also doesn’t make sense because you would want more HDFS storage space that does not come with task nodes. A common automatic scaling metric for core nodes is HDFSUtilization. Common auto scaling metrics for task nodes include ContainerPending-Out and YarnMemoryAvailablePercentage.

Note: Keep in mind that Amazon EMR currently requires HDFS, so you must have at least one core node in your cluster. Core nodes can also provide CPU and memory resources. But if you don’t need to scale HDFS, and you just need more CPU or memory resources for your job, we recommend that you use task nodes for that purpose.

Scaling core nodes

As described earlier, one of the two EMR node types in your cluster is the core node. Core nodes run HDFS, so they have a longer decommissioning delay. This means that they are slow to scale and should not be aggressively scaled. Only adding and removing a few core nodes at a time will help you avoid scaling issues. Unless you need the HDFS storage, scaling task nodes is usually a better option. If you find that you have to scale large numbers of core nodes, consider changing hdfs-site.xml configurations to allow faster decommission time and faster scale down.

Scaling task nodes

Task nodes don’t run HDFS, which makes them perfect for aggressively scaling with a dynamic job. When your Hadoop task has spikes of work between periods of downtime, this is the node type that you want to use.

You can set up task nodes with a very aggressive auto scaling policy, and they can be scaled up or down easily. If you don’t need HDFS space, you can use task nodes in your cluster.

Using Spot Instances

Automatic scaling is a perfect time to use EMR Spot Instance types. The tendency of Spot Instances to disappear and reappear makes them perfect for task nodes. Because these task nodes are already used to scale in and out aggressively, Spot Instances can have very little disadvantage here. However, for time-sensitive Hadoop tasks, On-Demand Instances might be prioritized for the guaranteed availability.

Scale-in vs. scale-out policies for core nodes

Don’t fall into the trap of making your scale-in policy the exact opposite of your scale-out policy, especially for core nodes. Many times, scaling in results in additional delays for decommissioning. Take this into account and allow your scale-in policy to be more forgiving than your scale-out policy. This means longer cooldowns and higher metric requirements to trigger resizes.

You can think of scale-out policies as easily triggered with a low cooldown and small node increments. Scale-in policies should be hard to trigger, with larger cooldowns and node increments.

Minimum nodes for core node auto scaling

One last thing to consider when scaling core nodes is the yarn.app.mapreduce.am.labels property located in yarn-site.xml. In Amazon EMR, yarn.app.mapreduce.am.labels is set to “CORE” by default, which means that the application master always runs on core nodes and not task nodes. This is to help prevent application failure in a scenario where Spot Instances are used for the task nodes.

This means that when setting a minimum number of core nodes, you should choose a number that is greater than or at least equal to the number of simultaneous application masters that you plan to have running on your cluster. If you want the application master to also run on task nodes, you should modify this property to include “TASK.” However, as a best practice, don’t set the yarn.app.mapreduce.am.labels property to TASK if Spot Instances are used for task nodes.

Aggregating data using S3DistCp

Before wrapping up this post, I have one last piece of information to share about cluster resizing. When resizing core nodes, you might notice that HDFS decommissioning takes a very long time. Often this is the result of storing many small files in your cluster’s HDFS. Having many small files within HDFS (files smaller than the HDFS block size of 128 MB) adds lots of metadata overhead and can cause slowdowns in both decommissioning and Hadoop tasks.

Keeping your small files to a minimum by aggregating your data can help your cluster and jobs run more smoothly. For information about how to aggregate files, see the post Seven Tips for Using S3DistCp on Amazon EMR to Move Data Efficiently Between HDFS and Amazon S3.

Summary

In this post, you read about how Amazon EMR resizing logic works to protect your data and Hadoop tasks. I also provided some additional considerations for EMR resizing and automatic scaling. Keeping these practices in mind can help you maximize cluster savings by allowing you to use only the required cluster resources.

If you have questions or suggestions, please leave a comment below.

 


Additional Reading

If you found this post useful, be sure to check out Seven Tips for Using S3DistCp on Amazon EMR to Move Data Efficiently Between HDFS and Amazon S3 and Dynamically Scale Applications on Amazon EMR with Auto Scaling.

 


About the Author

Brandon Scheller is a software development engineer for Amazon EMR. His passion lies in developing and advancing the applications of the Hadoop ecosystem and working with the open source community. He enjoys mountaineering in the Cascades with his free time.

Build a blockchain analytic solution with AWS Lambda, Amazon Kinesis, and Amazon Athena

Post Syndicated from Jonathan Shapiro-Ward original https://aws.amazon.com/blogs/big-data/build-a-blockchain-analytic-solution-with-aws-lambda-amazon-kinesis-and-amazon-athena/

There are many potential benefits to using a blockchain. A blockchain is a distributed data structure that can record transactions in a verifiable and immutable manner. Depending upon the use case, there are opportunities for reducing costs, improving speed and efficiency, stronger regulatory compliance, and greater resilience and scalability.

Early adopters of the blockchain are finding innovative ways of using it in such areas as finance, healthcare, eGovernment, and non-profit organizations. The blockchain was even initially pioneered as the key technology behind the cryptocurrency Bitcoin.

Many of the opportunities to use blockchains arise from their design. They are typically large-scale distributed systems that often consist of many thousands of nodes. It can be challenging to gain insight into user activity, events, anomalies, and other state changes on a blockchain. But AWS analytics services provide the ability to analyze blockchain applications and provide meaningful information about these areas.

Walkthrough

In this post, we’ll show you how to:

You can readily adapt this Ethereum deployment and the blockchain analytics for use with a wide range of blockchain scenarios.

Prerequisites

This post assumes that you are familiar with AWS and Ethereum. The following documentation provides background reading to help you perform the steps described in this post:

Additionally, it’s useful to be familiar with Amazon Kinesis, AWS Lambda, Amazon QuickSight, and Amazon Athena to get the most out of this blog post. For more information, see:

For an introduction to serverless computing with AWS Lambda, see Introduction to AWS Lambda – Serverless Compute on Amazon Web Services.

Blockchain 101

Before we proceed with the solution in this post, we’ll provide a short discussion regarding blockchains and Ethereum, which is the blockchain implementation used in this solution.

In short, blockchains are a means for achieving consensus. The motivation behind blockchain was in allowing the Bitcoin network to agree upon the order of financial transactions while resisting vulnerability, malicious threats, and omission errors. Other blockchain implementations are used to agree upon the state of generic computation. This is achieved through a process called mining, whereby an arbitrary computational problem is solved to make falsifying transactions computationally challenging.

Ethereum is a major blockchain implementation. Unlike Bitcoin and other earlier blockchain systems, Ethereum is not solely a cryptocurrency platform, though it does have its own cryptocurrency called Ether. Ethereum extends the blockchain concept by building an Ethereum virtual machine (VM) that is Turing-complete on top of the blockchain. This allows for the development of smart contracts, which are programs that run on the blockchain. The appeal of smart contracts is the ability to translate natural language contracts, such as insurance contracts, into code that can run on Ethereum. This allows contractual agreements to be built without the need for a centralized authority, such as a bank or notary, potentially decreasing time to market and reducing costs.

An overview of the blockchain solution

The following is an overview of the solution provided in this post. The solution consists of:

  • An Ethereum blockchain running on Amazon Elastic Container Service (Amazon ECS) via the AWS Blockchain Template
  • An Application Load Balancer, providing access to the various Ethereum APIs.
  • A Lambda function, which deploys a smart contract to the blockchain
  • A Lambda function, which runs transactions against the smart contract
  • A Lambda function, which listens for events on the smart contract and pushes those events to Amazon Kinesis
  • An Amazon DynamoDB table used to share the blockchain state between Lambda functions
  • A blockchain analytics pipeline that uses Amazon Kinesis Data Firehose, Amazon Kinesis Data Analytics, Amazon Kinesis Data Streams, and Amazon Athena.
  • An analytics dashboard built using Amazon QuickSight

The solution is presented in the following architectural diagram:

As shown, the solution is comprised of two main portions:

  • The blockchain hosted on Amazon Elastic Compute Cloud (Amazon EC2) and the Lambda functions that interact with the blockchain.
  • The analytics pipeline based around Kinesis that consumes data from the blockchain.

The AWS CloudFormation template we provide deploys the left side of that architecture diagram up to and including Kinesis Data Streams. It is the right side of the diagram that we’re going to build in this post.

Create the initial resources

  1. First, download the AWS CloudFormation template from: https://s3.amazonaws.com/blockchainblog/blockchainblogpost.template
  2. Use AWS CloudFormation to launch the template. The AWS CloudFormation stack deploys a virtual private cloud (VPC), two subnets, and a series of Lambda functions, which interact with the blockchain. This provides a foundation on which to build the analytics pipeline. You can either provide your own CIDR blocks or use the default parameters. Each subnet must have at least eight IP addresses.
  3. Deploy the AWS Blockchain Templates. The AWS Blockchain Templates make it efficient to deploy Ethereum and Hyperledger blockchain networks on AWS. In this case, we’re deploying an Ethereum network into the VPC created by the AWS CloudFormation template in step 2.
  4. Launch the following AWS CloudFormation template: https://aws-blockchain-templates-us-east-1.s3.us-east-1.amazonaws.com/ethereum/templates/latest/ethereum-network.template.yaml This template requires a number of parameters:
  • Set the Initial List of Accounts to the following predefined accounts the Lambda functions use:
0x34db0A1D7FE9D482C389b191e703Bf0182E0baE3,0xB3Bbce5d76aF28EcE4318c28479565F802f96808,0x877108a8825222cf669Ca9bFA3397D6973fE1640,0xb272056E07C94C7E762F642685bE822df6d08D03,0x0c00e92343f7AA255e0BBC17b21a02f188b53D6C,0xaDf205a5fcb846C4f8D5e9f5228196e3c157e8E0,0x1373a92b9BEbBCda6B87a4B5F94137Bc64E47261,0x9038284431F878f17F4387943169d5263eA55650,0xe1cd3399F6b0A1Ef6ac8Cebe228D7609B601ca8a,0x0A67cCC3FD9d664D815D229CEA7EF215d4C00A0a
  • In VPC Network Configuration:
    • Set the VPC ID to the blockchainblog VPC created by the first AWS CloudFormation template.
    • Add the blockchainblog-public subnet to the list of subnets to use.
    • Add blockchainblog-public and blockchainblog-private to the list of ALB subnets.
  • In Security Configuration:
    • Choose your Amazon EC2 key pair.
    • Provide the blockchainblog security group.
    • Provide the blockchainblog-ec2-role for the Amazon EC2 role.
    • Provide the blockchainblog-ecs-role for the Amazon ECS role.
    • Set the ALB security group to the blockchainblog security group.
  1. Leave all other variables unchanged, create the template, and wait for all resources to be deployed. This deploys an Ethereum blockchain, starts the mining process, and exposes the Web3 API through an Application Load Balancer.

After the resources are created, move on to deploying the smart contract.

Deploy a smart contract

To use the blockchain, deploy a smart contract to it. This smart contract is not complex — it provides the functions for holding an auction.

The auction contract represents a public auction, which is an auction whereby all parties involved can be identified. The user offering the item to be auctioned deploys the contract and other users can bid using the contract. The auction is considered completed after a pre-defined number of blocks have been mined. When the auction ends, losing bids can then be withdrawn and the funds returned to the bidders. Later, the user who created the auction can withdraw the funds of the winning bid.

Note that the contract does nothing to ensure that the winner receives the commodity in question. In fact, this contract is entirely separate from what is being auctioned. The contract could be extended to provide this functionality, but for the scope of this post, we’re keeping the contract simple.

The auction contract is located at https://s3.amazonaws.com/blockchainblog/Auction.sol.

Examine the auction contract

The auction contract is automatically pulled by our Lambda function and deployed to our blockchain. This contract is written in a domain-specific language called Solidity. The syntax is inspired by the C family of languages; however, unlike C it doesn’t compile to object code. Instead, it compiles to bytecode, which runs on the Ethereum VM.

This smart contract has two functions: bid and withdraw. Bid allows users to bid in the auction, and withdraw allows users to withdraw funds from the contract when the auction has finished. The auction owner can obtain the winning bid and the losers can recoup their funds. Note that the data structure BidEvent is similar to a C struct, and is how we’ll trigger Solidity events. The Solidity events are captured and sent to our analytics pipeline.

Now it’s time to deploy our smart contract, run transactions against it, and listen for events by using pre-built Lambda functions. The following diagram shows the interactions of these Lambda functions:

DeployContract is a Lambda function created by the AWS CloudFormation stack that we deployed earlier. This function takes our Solidity source code from the Amazon Simple Storage Service (Amazon S3) bucket, compiles it to EVM bytecode using the solc compiler, deploys that to our blockchain, and stores the blockchain address of the contract in a DynamoDB table. The function interacts with the Ethereum blockchain on our Amazon EC2 instance via the web3 1.0.0 API. You can see the source code for this function at https://s3.amazonaws.com/blockchainblog/DeployContract.zip.

After deploying the AWS CloudFormation template, wait about 5 minutes before deploying the contract to give the blockchain time to start the mining process. The majority of this time is the blockchain generating the initial directed acyclic graph (DAG).

DeployContract can be invoked in the Lambda console by testing it with an empty test event. Before invoking the function, provide it with the address of the blockchain. To do this, locate the output of the AWS Blockchain Template and obtain the EthJSONRPCURL value from the output. Later, provide this value in an environment variable named BLOCKCHAIN_HOST, for the DeployContract function, as shown in the following example:

Now invoke the DeployContract function. It should print various states, including the blockchain address of the deployed contract and the JSON ABI of the contract. After the function completes, the contract is deployed to our private blockchain and available for use. If the function produces an error, it’s likely because the blockchain has not yet been initialized. Wait a few minutes after creating the AWS CloudFormation template before invoking DeployContract.

Execute Transactions

To generate some transaction data to analyze, we must first have some transactions. To get transactions, we are using a second Lambda function named ExecuteTransactions.

In the smart contract, an event is specified at the start of the file. Events are a useful mechanism in Solidity that can be used as a callback to code outside of the blockchain. The final Lambda function, ListenForTransactions, listens for events occurring against the contract and then sends those events to Kinesis for analysis.

Ethereum currently does not support sending events directly to Kinesis. So we’ll run the ListenForTransactions function to pull events from the blockchain. We can do this manually by invoking the function with an empty test event. ListenForTransactions pulls all events from the blockchain since the last time it was run. However, if we wanted transactions to be pulled from the blockchain in real time, we’d want the function running perpetually. In the following section, you can optionally schedule the Lambda function to run periodically or regularly. Once again, provide the address of the Ethereum RPC endpoint via the BLOCKCHAIN_HOST environment variable, per DeployContract for both ListenForTransactions and for ExecuteTransactions.

Optional: Use an Amazon CloudWatch event to schedule ListenForTransactions

To have ListenForTransactions run continually, we’ll use Amazon CloudWatch Events as a trigger for our Lambda function. In the Amazon CloudWatch console, choose the Triggers tab, and add a new Amazon CloudWatch Events trigger, with the schedule pattern rate(5). This ensures that the function is continually running and thus ensure that all events are sent to Kinesis as soon as they occur. This allows us to do near real-time processing of events against our smart contract. However, if we want to reduce costs, or if real-time analytics isn’t a key objective, we could run our ListenForTransactions function periodically. Running the function periodically fetches all events since the last time it was run; however, this is less timely than having it wait for events to occur.

To configure a CloudWatch event to trigger ListenForTransactions:

  1. In the designer section on the Lambda console for ListenForTransactions, select CloudWatch events
  2. Click on configure and scroll down to the CloudWatch event configuration
  3. Select Create New Rule from the rule dropdown menu
  4. Name the rule and provide a description
  5. Select schedule expression
  6. Provide the expression: rate(5)
  7. Select enable trigger
  8. Click add

After the function is scheduled, we can then generate some events against our contract. We can run ExecuteTransactions, with an empty test event. We can do this any number of times to generate more transactions to send to our analytics pipeline. ExecuteTransactions produces batches of 10 transactions at a time.

Analyze Transactions with Kinesis Data Analytics

Because our Lambda function is listening to events on our smart contract, all voting activity is sent to a Kinesis Data Stream that was already by an AWS CloudFormation called BlockchainBlogEvents.

Right now, all events go to Kinesis but no further. We’ll persist our events for analysis with Athena later on. To do so, navigate to the Kinesis Data Streams console and choose the BlockchainBlog stream that was created for you.

  1. In the upper right-hand corner, choose Connect to Firehose. This forwards all events to a Kinesis Data Firehose stream, which delivers them to an S3 bucket.
  2. Name the delivery stream choose Next, and don’t enable record transformation.
  3. Provide an S3 bucket in which to store your results. Remember so you can use it later with Athena.

All events coming from the blockchain should now be persisted to Amazon S3.

Now that our events are being persisted, we’ll use Kinesis Data Analytics to perform a series of real-time analytics on the Kinesis Data Stream. Later, we’ll perform batch analytics on the data stored in Amazon S3 via Athena.

First, look at Kinesis Data Analytics. Our ListenForTransactions Lambda function sends a message to a stream each time a transaction is run against our Auction smart contract.

The message is a JSON object. It contains the address of the bidder who initiated the transaction, how much they bid, the contract they bid on, when the transaction was run, and which block the transaction was added to.

Kinesis Data Analytics processes each incoming message to the stream and lets us perform analysis over the stream. In this example, we use Kinesis Data Analytics to:

  1. Calculate the amount of Ether being bid in each block within a sliding window of 15 seconds.
  2. Detect the number of unique bidders occurring within a sliding window of 15 seconds.

Our sliding window is 15 seconds because this is the Ethereum target block time. This is the measure of how long it takes to add a block to the blockchain. By setting the sliding window to 15 seconds, we can gain insight into transactions occurring within the mining interval. We could also set the window to be longer to learn how it pertains to our auction application.

To start with our real time analytics, we must create a Kinesis data analytics application. To do so:

  1. Navigate to the Kinesis data analytics application console on the AWS Management Console.
  2. Create a new Kinesis data analytics application with appropriate name and description, then specify the pre-made blockchainblog Kinesis Data Stream as the source.
  3. Run ExecuteTransactions to send a set of transactions to Kinesis and automatically discover the schema.
  4. Open the SQL editor for the application.

Next, we’re going to add SQL to our Kinesis data analytics application to find out the amount of Ether being sent in each block. This includes all bids sent to the contract and all funds withdrawn from a completed auction.

Copy the following SQL, paste it into the SQL editor in Kinesis Data Analytics, then execute it.

CREATE OR REPLACE STREAM "SPEND_PER_BLOCK_STREAM" (block INTEGER, spend INTEGER);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "SPEND_PER_BLOCK_STREAM"

SELECT STREAM "Block", SUM("Amount") AS block_sum
FROM "SOURCE_SQL_STREAM_001"
GROUP BY "Block", STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '15' SECOND);

This simple piece of SQL provides some insight into our smart contract. The output of SPEND_PER_BLOCK_STREAM yields the block number and the volume of funds, from our contract, in that block. This output explains how much cryptocurrency is spent in relation to our smart contract and when it’s spent.

Make sure that there is data for the Kinesis data analytics application to process by running the ExecuteTransactions and ListenForTransactions functions. You can run these functions either with an Amazon CloudWatch event or manually.

Now, we’ll modify our application to detect the number of unique bidders placing bids within a 15-second window. This is about the time required to add a block to the blockchain. To do so, add the following code to our Kinesis data analytics application:

CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
    NUMBER_OF_DISTINCT_ITEMS BIGINT
);

CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
   INSERT INTO "DESTINATION_SQL_STREAM" 
      SELECT STREAM * 
      FROM TABLE(COUNT_DISTINCT_ITEMS_TUMBLING(
          CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),
            'Bidder',                                     
            10                                                 
      )
);

The resulting output of this code is the count of unique bidders occurring within the 15-second window. This is useful in helping us understand who is running transactions against our contract. For example, if it’s a large number of blockchain addresses responsible for the bids or if it is a smaller number of addresses bidding.

Finally, as denoted in our architectural diagram, we can add a destination stream to our Kinesis data analytics application. We’ll send the output of our application to Kinesis Data Firehose to persist the results. Then we’ll enable the resulting data to be used in batch analytics with Athena or other tools. To send the output, create a destination for the analytics output stream and point it at a Kinesis Data Firehose stream.

This section has shown real time analytics that can be run on blockchain transactions. The next section shows using Athena to run batch analytics against our stored transactions.

Analyze Transactions with Athena

In this section, we’ll create a table in Athena so we can query our transaction data in batch form.

  1. Create a database in Athena and then create a table, specifying the location that you provided earlier to Kinesis Data Firehose. Your configuration should look like the following example:

  1. Choose Next, choose JSON as the input format, then click next.
  2. In Columns, provide the data types for each of our columns. Choose Bulk add columns, then copy and paste the following column information:
Block int, Blockhash string, Bidder string, Maxbidder string, Contractowner string, Amount int, Auction string, EventTimestamp string
Column Description
Block The block that this event pertains to.
Auction Which auction smart contract the event pertains to
ContractOwner The address of the owner of the contract
Bidder The address of the bidder
BlockHash The SHA hash of the block
Address The address of the transaction
MaxBidder The address of the currently winning bidder (current to when the event was generated)
Amount The amount of the bid

 

  1. Click next and then create the table.

After you configure Athena, you can then explore the data. First, look at whether the user who created the auction has bid in their own auction. Most auctions typically disallow this bidding, but our smart contract doesn’t prohibit this. We could solve this by modifying the contract, but for now let’s see if we can detect this via Athena. Run the following query:

select * from events where contractowner=bidder

The result should resemble the following:

You should see at least one instance where the contract owner has bid on their own contract. Note that the Lambda function running transactions does this at random. Bidding on one’s own contract could be permissible or it might violate the terms of the auction. In that scenario, we can easily detect this violation.

This scenario is an example of using analytics to detect and enforce compliance in a blockchain-backed system. Compliance remains an open question for many blockchain users, as detecting regulatory and compliance issues involving smart contracts often involves significant complexity. Analytics is one way to gain insight and answer these regulatory questions.

Useful queries for analyzing transactions

This section provides some other queries that we can use to analyze our smart contract transactions.

Find the number of transactions per block

SELECT block, COUNT(amount) as transactions FROM events Group By block    

This query yields results similar to the following:

Find the winning bid for each auction

SELECT DISTINCT t.auction, t.amount
    FROM events t
        INNER JOIN (SELECT auction, MAX(amount) AS maxamount
                        FROM events
                        GROUP BY auction) q
            ON t.auction = q.auction
                AND t.amount = q.maxamount

This query yields a set of results such as the following:

The results show each auction that you’ve created on the blockchain and the resulting highest bid.

Visualize queries with Amazon QuickSight

Instead of querying data in plain SQL, it is often beneficial to have a graphical representation of your analysis. You can do this with Amazon QuickSight, which can use Athena as a data source. As a result, with little effort we can build a dashboard solution on top of what we’ve already built. We’re going to use Amazon QuickSight to visualize data stored in Amazon S3, via Athena.

In Amazon QuickSight, we can create a new data source and use the Athena database and table that we created earlier.

To create a new data source

  1. Open the Amazon QuickSight console, then choose New Dataset.
  2. From the list of data sources, choose Athena, then name your data source.

  1. Choose the database and table in Athena that you created earlier.

  1. Import the data into SPICE. SPICE is instrumental for faster querying and visualization of data, without having to go directly to the source data. For more information about SPICE, see the Amazon QuickSight Documentation.
  2. Choose Visualize to start investigating the data.

With Amazon QuickSight, we can visualize the behavior of our simulated blockchain users. We’ll choose Amount as our measurement and Auction as our dimension from teh QuickSight side pane. This shows us how much ether has been bid in each auction. Doing so yields results similar to the following:

The amount depends on the number of times you ran the ExecuteTransactions function.

If we look at MaxBidder, we see a pie chart. In the chart, we can see which blockchain address (user) is most often our highest bidder. This looks like the following:

This sort of information can be challenging to obtain from within a blockchain-based application. But in Amazon QuickSight, with our analytics pipeline, getting the information can be easier.

Finally, we can look at the mining time in Amazon QuickSight by choosing Eventtimestamp as the x-axis, choosing block as the y-axis, and using the minimum aggregate function. This produces a line graph that resembles the following:

The graph shows that we start at around block 9200 and have a steady rate of mining occurring. This is roughly consistent with around a 15 to 20 second block mining time. Note that the time stamp is in Unix time.

This section has shown analysis that can be performed on a blockchain event to understand the behavior of both the blockchain and the smart contracts deployed to it. Using the same methodology, you can build your own analytics pipelines that perform useful analytics that shed light on your blockchain-backed applications.

Conclusion

Blockchain is an emerging technology with a great deal of potential. AWS analytics services provide a means to gain insight into blockchain applications that run over thousands of nodes and deal with millions of transactions. This allows developers to better understand the complexities of blockchain applications and aid in the creation of new applications. Moreover, the analytics portion can all be done without provisioning servers, reducing the need for managing infrastructure. This allows you to focus on building the blockchain applications that you want.

Important: Remember to destroy the stacks created by AWS CloudFormation. Also delete the resources you deployed, including the scheduled Lambda function that listens for blockchain events.


Additional Reading

If you found this post useful, be sure to check out Analyze Apache Parquet optimized data using 10 visualizatinos to try in Amazon QuickSight with sample data and Analyzing Bitcoin Data: AWS CloudFormation Support for AWS Glue.

 


About the Author

Dr. Jonathan Shapiro-Ward is an AWS Solutions Architect based in Toronto. He helps customers across Canada to build industry leading cloud solutions. He has a background in distributed systems and big data and holds a PhD from the University of St Andrews.

 

 

Analyze Apache Parquet optimized data using Amazon Kinesis Data Firehose, Amazon Athena, and Amazon Redshift

Post Syndicated from Roy Hasson original https://aws.amazon.com/blogs/big-data/analyzing-apache-parquet-optimized-data-using-amazon-kinesis-data-firehose-amazon-athena-and-amazon-redshift/

Amazon Kinesis Data Firehose is the easiest way to capture and stream data into a data lake built on Amazon S3. This data can be anything—from AWS service logs like AWS CloudTrail log files, Amazon VPC Flow Logs, Application Load Balancer logs, and others. It can also be IoT events, game events, and much more. To efficiently query this data, a time-consuming ETL (extract, transform, and load) process is required to massage and convert the data to an optimal file format, which increases the time to insight. This situation is less than ideal, especially for real-time data that loses its value over time.

To solve this common challenge, Kinesis Data Firehose can now save data to Amazon S3 in Apache Parquet or Apache ORC format. These are optimized columnar formats that are highly recommended for best performance and cost-savings when querying data in S3. This feature directly benefits you if you use Amazon Athena, Amazon Redshift, AWS Glue, Amazon EMR, or any other big data tools that are available from the AWS Partner Network and through the open-source community.

Amazon Connect is a simple-to-use, cloud-based contact center service that makes it easy for any business to provide a great customer experience at a lower cost than common alternatives. Its open platform design enables easy integration with other systems. One of those systems is Amazon Kinesis—in particular, Kinesis Data Streams and Kinesis Data Firehose.

What’s really exciting is that you can now save events from Amazon Connect to S3 in Apache Parquet format. You can then perform analytics using Amazon Athena and Amazon Redshift Spectrum in real time, taking advantage of this key performance and cost optimization. Of course, Amazon Connect is only one example. This new capability opens the door for a great deal of opportunity, especially as organizations continue to build their data lakes.

Amazon Connect includes an array of analytics views in the Administrator dashboard. But you might want to run other types of analysis. In this post, I describe how to set up a data stream from Amazon Connect through Kinesis Data Streams and Kinesis Data Firehose and out to S3, and then perform analytics using Athena and Amazon Redshift Spectrum. I focus primarily on the Kinesis Data Firehose support for Parquet and its integration with the AWS Glue Data Catalog, Amazon Athena, and Amazon Redshift.

Solution overview

Here is how the solution is laid out:

 

 

The following sections walk you through each of these steps to set up the pipeline.

1. Define the schema

When Kinesis Data Firehose processes incoming events and converts the data to Parquet, it needs to know which schema to apply. The reason is that many times, incoming events contain all or some of the expected fields based on which values the producers are advertising. A typical process is to normalize the schema during a batch ETL job so that you end up with a consistent schema that can easily be understood and queried. Doing this introduces latency due to the nature of the batch process. To overcome this issue, Kinesis Data Firehose requires the schema to be defined in advance.

To see the available columns and structures, see Amazon Connect Agent Event Streams. For the purpose of simplicity, I opted to make all the columns of type String rather than create the nested structures. But you can definitely do that if you want.

The simplest way to define the schema is to create a table in the Amazon Athena console. Open the Athena console, and paste the following create table statement, substituting your own S3 bucket and prefix for where your event data will be stored. A Data Catalog database is a logical container that holds the different tables that you can create. The default database name shown here should already exist. If it doesn’t, you can create it or use another database that you’ve already created.

CREATE EXTERNAL TABLE default.kfhconnectblog (
  awsaccountid string,
  agentarn string,
  currentagentsnapshot string,
  eventid string,
  eventtimestamp string,
  eventtype string,
  instancearn string,
  previousagentsnapshot string,
  version string
)
STORED AS parquet
LOCATION 's3://your_bucket/kfhconnectblog/'
TBLPROPERTIES ("parquet.compression"="SNAPPY")

That’s all you have to do to prepare the schema for Kinesis Data Firehose.

2. Define the data streams

Next, you need to define the Kinesis data streams that will be used to stream the Amazon Connect events.  Open the Kinesis Data Streams console and create two streams.  You can configure them with only one shard each because you don’t have a lot of data right now.

3. Define the Kinesis Data Firehose delivery stream for Parquet

Let’s configure the Data Firehose delivery stream using the data stream as the source and Amazon S3 as the output. Start by opening the Kinesis Data Firehose console and creating a new data delivery stream. Give it a name, and associate it with the Kinesis data stream that you created in Step 2.

As shown in the following screenshot, enable Record format conversion (1) and choose Apache Parquet (2). As you can see, Apache ORC is also supported. Scroll down and provide the AWS Glue Data Catalog database name (3) and table names (4) that you created in Step 1. Choose Next.

To make things easier, the output S3 bucket and prefix fields are automatically populated using the values that you defined in the LOCATION parameter of the create table statement from Step 1. Pretty cool. Additionally, you have the option to save the raw events into another location as defined in the Source record S3 backup section. Don’t forget to add a trailing forward slash “ / “ so that Data Firehose creates the date partitions inside that prefix.

On the next page, in the S3 buffer conditions section, there is a note about configuring a large buffer size. The Parquet file format is highly efficient in how it stores and compresses data. Increasing the buffer size allows you to pack more rows into each output file, which is preferred and gives you the most benefit from Parquet.

Compression using Snappy is automatically enabled for both Parquet and ORC. You can modify the compression algorithm by using the Kinesis Data Firehose API and update the OutputFormatConfiguration.

Be sure to also enable Amazon CloudWatch Logs so that you can debug any issues that you might run into.

Lastly, finalize the creation of the Firehose delivery stream, and continue on to the next section.

4. Set up the Amazon Connect contact center

After setting up the Kinesis pipeline, you now need to set up a simple contact center in Amazon Connect. The Getting Started page provides clear instructions on how to set up your environment, acquire a phone number, and create an agent to accept calls.

After setting up the contact center, in the Amazon Connect console, choose your Instance Alias, and then choose Data Streaming. Under Agent Event, choose the Kinesis data stream that you created in Step 2, and then choose Save.

At this point, your pipeline is complete.  Agent events from Amazon Connect are generated as agents go about their day. Events are sent via Kinesis Data Streams to Kinesis Data Firehose, which converts the event data from JSON to Parquet and stores it in S3. Athena and Amazon Redshift Spectrum can simply query the data without any additional work.

So let’s generate some data. Go back into the Administrator console for your Amazon Connect contact center, and create an agent to handle incoming calls. In this example, I creatively named mine Agent One. After it is created, Agent One can get to work and log into their console and set their availability to Available so that they are ready to receive calls.

To make the data a bit more interesting, I also created a second agent, Agent Two. I then made some incoming and outgoing calls and caused some failures to occur, so I now have enough data available to analyze.

5. Analyze the data with Athena

Let’s open the Athena console and run some queries. One thing you’ll notice is that when we created the schema for the dataset, we defined some of the fields as Strings even though in the documentation they were complex structures.  The reason for doing that was simply to show some of the flexibility of Athena to be able to parse JSON data. However, you can define nested structures in your table schema so that Kinesis Data Firehose applies the appropriate schema to the Parquet file.

Let’s run the first query to see which agents have logged into the system.

The query might look complex, but it’s fairly straightforward:

WITH dataset AS (
  SELECT 
    from_iso8601_timestamp(eventtimestamp) AS event_ts,
    eventtype,
    -- CURRENT STATE
    json_extract_scalar(
      currentagentsnapshot,
      '$.agentstatus.name') AS current_status,
    from_iso8601_timestamp(
      json_extract_scalar(
        currentagentsnapshot,
        '$.agentstatus.starttimestamp')) AS current_starttimestamp,
    json_extract_scalar(
      currentagentsnapshot, 
      '$.configuration.firstname') AS current_firstname,
    json_extract_scalar(
      currentagentsnapshot,
      '$.configuration.lastname') AS current_lastname,
    json_extract_scalar(
      currentagentsnapshot, 
      '$.configuration.username') AS current_username,
    json_extract_scalar(
      currentagentsnapshot, 
      '$.configuration.routingprofile.defaultoutboundqueue.name') AS               current_outboundqueue,
    json_extract_scalar(
      currentagentsnapshot, 
      '$.configuration.routingprofile.inboundqueues[0].name') as current_inboundqueue,
    -- PREVIOUS STATE
    json_extract_scalar(
      previousagentsnapshot, 
      '$.agentstatus.name') as prev_status,
    from_iso8601_timestamp(
      json_extract_scalar(
        previousagentsnapshot, 
       '$.agentstatus.starttimestamp')) as prev_starttimestamp,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.firstname') as prev_firstname,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.lastname') as prev_lastname,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.username') as prev_username,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.routingprofile.defaultoutboundqueue.name') as current_outboundqueue,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.routingprofile.inboundqueues[0].name') as prev_inboundqueue
  from kfhconnectblog
  where eventtype <> 'HEART_BEAT'
)
SELECT
  current_status as status,
  current_username as username,
  event_ts
FROM dataset
WHERE eventtype = 'LOGIN' AND current_username <> ''
ORDER BY event_ts DESC

The query output looks something like this:

Here is another query that shows the sessions each of the agents engaged with. It tells us where they were incoming or outgoing, if they were completed, and where there were missed or failed calls.

WITH src AS (
  SELECT
     eventid,
     json_extract_scalar(currentagentsnapshot, '$.configuration.username') as username,
     cast(json_extract(currentagentsnapshot, '$.contacts') AS ARRAY(JSON)) as c,
     cast(json_extract(previousagentsnapshot, '$.contacts') AS ARRAY(JSON)) as p
  from kfhconnectblog
),
src2 AS (
  SELECT *
  FROM src CROSS JOIN UNNEST (c, p) AS contacts(c_item, p_item)
),
dataset AS (
SELECT 
  eventid,
  username,
  json_extract_scalar(c_item, '$.contactid') as c_contactid,
  json_extract_scalar(c_item, '$.channel') as c_channel,
  json_extract_scalar(c_item, '$.initiationmethod') as c_direction,
  json_extract_scalar(c_item, '$.queue.name') as c_queue,
  json_extract_scalar(c_item, '$.state') as c_state,
  from_iso8601_timestamp(json_extract_scalar(c_item, '$.statestarttimestamp')) as c_ts,
  
  json_extract_scalar(p_item, '$.contactid') as p_contactid,
  json_extract_scalar(p_item, '$.channel') as p_channel,
  json_extract_scalar(p_item, '$.initiationmethod') as p_direction,
  json_extract_scalar(p_item, '$.queue.name') as p_queue,
  json_extract_scalar(p_item, '$.state') as p_state,
  from_iso8601_timestamp(json_extract_scalar(p_item, '$.statestarttimestamp')) as p_ts
FROM src2
)
SELECT 
  username,
  c_channel as channel,
  c_direction as direction,
  p_state as prev_state,
  c_state as current_state,
  c_ts as current_ts,
  c_contactid as id
FROM dataset
WHERE c_contactid = p_contactid
ORDER BY id DESC, current_ts ASC

The query output looks similar to the following:

6. Analyze the data with Amazon Redshift Spectrum

With Amazon Redshift Spectrum, you can query data directly in S3 using your existing Amazon Redshift data warehouse cluster. Because the data is already in Parquet format, Redshift Spectrum gets the same great benefits that Athena does.

Here is a simple query to show querying the same data from Amazon Redshift. Note that to do this, you need to first create an external schema in Amazon Redshift that points to the AWS Glue Data Catalog.

SELECT 
  eventtype,
  json_extract_path_text(currentagentsnapshot,'agentstatus','name') AS current_status,
  json_extract_path_text(currentagentsnapshot, 'configuration','firstname') AS current_firstname,
  json_extract_path_text(currentagentsnapshot, 'configuration','lastname') AS current_lastname,
  json_extract_path_text(
    currentagentsnapshot,
    'configuration','routingprofile','defaultoutboundqueue','name') AS current_outboundqueue,
FROM default_schema.kfhconnectblog

The following shows the query output:

Summary

In this post, I showed you how to use Kinesis Data Firehose to ingest and convert data to columnar file format, enabling real-time analysis using Athena and Amazon Redshift. This great feature enables a level of optimization in both cost and performance that you need when storing and analyzing large amounts of data. This feature is equally important if you are investing in building data lakes on AWS.

 


Additional Reading

If you found this post useful, be sure to check out Analyzing VPC Flow Logs with Amazon Kinesis Firehose, Amazon Athena, and Amazon QuickSight and Work with partitioned data in AWS Glue.


About the Author

Roy Hasson is a Global Business Development Manager for AWS Analytics. He works with customers around the globe to design solutions to meet their data processing, analytics and business intelligence needs. Roy is big Manchester United fan cheering his team on and hanging out with his family.

 

 

 

Analyze data in Amazon DynamoDB using Amazon SageMaker for real-time prediction

Post Syndicated from YongSeong Lee original https://aws.amazon.com/blogs/big-data/analyze-data-in-amazon-dynamodb-using-amazon-sagemaker-for-real-time-prediction/

Many companies across the globe use Amazon DynamoDB to store and query historical user-interaction data. DynamoDB is a fast NoSQL database used by applications that need consistent, single-digit millisecond latency.

Often, customers want to turn their valuable data in DynamoDB into insights by analyzing a copy of their table stored in Amazon S3. Doing this separates their analytical queries from their low-latency critical paths. This data can be the primary source for understanding customers’ past behavior, predicting future behavior, and generating downstream business value. Customers often turn to DynamoDB because of its great scalability and high availability. After a successful launch, many customers want to use the data in DynamoDB to predict future behaviors or provide personalized recommendations.

DynamoDB is a good fit for low-latency reads and writes, but it’s not practical to scan all data in a DynamoDB database to train a model. In this post, I demonstrate how you can use DynamoDB table data copied to Amazon S3 by AWS Data Pipeline to predict customer behavior. I also demonstrate how you can use this data to provide personalized recommendations for customers using Amazon SageMaker. You can also run ad hoc queries using Amazon Athena against the data. DynamoDB recently released on-demand backups to create full table backups with no performance impact. However, it’s not suitable for our purposes in this post, so I chose AWS Data Pipeline instead to create managed backups are accessible from other services.

To do this, I describe how to read the DynamoDB backup file format in Data Pipeline. I also describe how to convert the objects in S3 to a CSV format that Amazon SageMaker can read. In addition, I show how to schedule regular exports and transformations using Data Pipeline. The sample data used in this post is from Bank Marketing Data Set of UCI.

The solution that I describe provides the following benefits:

  • Separates analytical queries from production traffic on your DynamoDB table, preserving your DynamoDB read capacity units (RCUs) for important production requests
  • Automatically updates your model to get real-time predictions
  • Optimizes for performance (so it doesn’t compete with DynamoDB RCUs after the export) and for cost (using data you already have)
  • Makes it easier for developers of all skill levels to use Amazon SageMaker

All code and data set in this post are available in this .zip file.

Solution architecture

The following diagram shows the overall architecture of the solution.

The steps that data follows through the architecture are as follows:

  1. Data Pipeline regularly copies the full contents of a DynamoDB table as JSON into an S3
  2. Exported JSON files are converted to comma-separated value (CSV) format to use as a data source for Amazon SageMaker.
  3. Amazon SageMaker renews the model artifact and update the endpoint.
  4. The converted CSV is available for ad hoc queries with Amazon Athena.
  5. Data Pipeline controls this flow and repeats the cycle based on the schedule defined by customer requirements.

Building the auto-updating model

This section discusses details about how to read the DynamoDB exported data in Data Pipeline and build automated workflows for real-time prediction with a regularly updated model.

Download sample scripts and data

Before you begin, take the following steps:

  1. Download sample scripts in this .zip file.
  2. Unzip the src.zip file.
  3. Find the automation_script.sh file and edit it for your environment. For example, you need to replace 's3://<your bucket>/<datasource path>/' with your own S3 path to the data source for Amazon ML. In the script, the text enclosed by angle brackets—< and >—should be replaced with your own path.
  4. Upload the json-serde-1.3.6-SNAPSHOT-jar-with-dependencies.jar file to your S3 path so that the ADD jar command in Apache Hive can refer to it.

For this solution, the banking.csv  should be imported into a DynamoDB table.

Export a DynamoDB table

To export the DynamoDB table to S3, open the Data Pipeline console and choose the Export DynamoDB table to S3 template. In this template, Data Pipeline creates an Amazon EMR cluster and performs an export in the EMRActivity activity. Set proper intervals for backups according to your business requirements.

One core node(m3.xlarge) provides the default capacity for the EMR cluster and should be suitable for the solution in this post. Leave the option to resize the cluster before running enabled in the TableBackupActivity activity to let Data Pipeline scale the cluster to match the table size. The process of converting to CSV format and renewing models happens in this EMR cluster.

For a more in-depth look at how to export data from DynamoDB, see Export Data from DynamoDB in the Data Pipeline documentation.

Add the script to an existing pipeline

After you export your DynamoDB table, you add an additional EMR step to EMRActivity by following these steps:

  1. Open the Data Pipeline console and choose the ID for the pipeline that you want to add the script to.
  2. For Actions, choose Edit.
  3. In the editing console, choose the Activities category and add an EMR step using the custom script downloaded in the previous section, as shown below.

Paste the following command into the new step after the data ­­upload step:

s3://#{myDDBRegion}.elasticmapreduce/libs/script-runner/script-runner.jar,s3://<your bucket name>/automation_script.sh,#{output.directoryPath},#{myDDBRegion}

The element #{output.directoryPath} references the S3 path where the data pipeline exports DynamoDB data as JSON. The path should be passed to the script as an argument.

The bash script has two goals, converting data formats and renewing the Amazon SageMaker model. Subsequent sections discuss the contents of the automation script.

Automation script: Convert JSON data to CSV with Hive

We use Apache Hive to transform the data into a new format. The Hive QL script to create an external table and transform the data is included in the custom script that you added to the Data Pipeline definition.

When you run the Hive scripts, do so with the -e option. Also, define the Hive table with the 'org.openx.data.jsonserde.JsonSerDe' row format to parse and read JSON format. The SQL creates a Hive EXTERNAL table, and it reads the DynamoDB backup data on the S3 path passed to it by Data Pipeline.

Note: You should create the table with the “EXTERNAL” keyword to avoid the backup data being accidentally deleted from S3 if you drop the table.

The full automation script for converting follows. Add your own bucket name and data source path in the highlighted areas.

#!/bin/bash
hive -e "
ADD jar s3://<your bucket name>/json-serde-1.3.6-SNAPSHOT-jar-with-dependencies.jar ; 
DROP TABLE IF EXISTS blog_backup_data ;
CREATE EXTERNAL TABLE blog_backup_data (
 customer_id map<string,string>,
 age map<string,string>, job map<string,string>, 
 marital map<string,string>,education map<string,string>, 
 default map<string,string>, housing map<string,string>,
 loan map<string,string>, contact map<string,string>, 
 month map<string,string>, day_of_week map<string,string>, 
 duration map<string,string>, campaign map<string,string>,
 pdays map<string,string>, previous map<string,string>, 
 poutcome map<string,string>, emp_var_rate map<string,string>, 
 cons_price_idx map<string,string>, cons_conf_idx map<string,string>,
 euribor3m map<string,string>, nr_employed map<string,string>, 
 y map<string,string> ) 
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' 
LOCATION '$1/';

INSERT OVERWRITE DIRECTORY 's3://<your bucket name>/<datasource path>/' 
SELECT concat( customer_id['s'],',', 
 age['n'],',', job['s'],',', 
 marital['s'],',', education['s'],',', default['s'],',', 
 housing['s'],',', loan['s'],',', contact['s'],',', 
 month['s'],',', day_of_week['s'],',', duration['n'],',', 
 campaign['n'],',',pdays['n'],',',previous['n'],',', 
 poutcome['s'],',', emp_var_rate['n'],',', cons_price_idx['n'],',',
 cons_conf_idx['n'],',', euribor3m['n'],',', nr_employed['n'],',', y['n'] ) 
FROM blog_backup_data
WHERE customer_id['s'] > 0 ; 

After creating an external table, you need to read data. You then use the INSERT OVERWRITE DIRECTORY ~ SELECT command to write CSV data to the S3 path that you designated as the data source for Amazon SageMaker.

Depending on your requirements, you can eliminate or process the columns in the SELECT clause in this step to optimize data analysis. For example, you might remove some columns that have unpredictable correlations with the target value because keeping the wrong columns might expose your model to “overfitting” during the training. In this post, customer_id  columns is removed. Overfitting can make your prediction weak. More information about overfitting can be found in the topic Model Fit: Underfitting vs. Overfitting in the Amazon ML documentation.

Automation script: Renew the Amazon SageMaker model

After the CSV data is replaced and ready to use, create a new model artifact for Amazon SageMaker with the updated dataset on S3.  For renewing model artifact, you must create a new training job.  Training jobs can be run using the AWS SDK ( for example, Amazon SageMaker boto3 ) or the Amazon SageMaker Python SDK that can be installed with “pip install sagemaker” command as well as the AWS CLI for Amazon SageMaker described in this post.

In addition, consider how to smoothly renew your existing model without service impact, because your model is called by applications in real time. To do this, you need to create a new endpoint configuration first and update a current endpoint with the endpoint configuration that is just created.

#!/bin/bash
## Define variable 
REGION=$2
DTTIME=`date +%Y-%m-%d-%H-%M-%S`
ROLE="<your AmazonSageMaker-ExecutionRole>" 


# Select containers image based on region.  
case "$REGION" in
"us-west-2" )
    IMAGE="174872318107.dkr.ecr.us-west-2.amazonaws.com/linear-learner:latest"
    ;;
"us-east-1" )
    IMAGE="382416733822.dkr.ecr.us-east-1.amazonaws.com/linear-learner:latest" 
    ;;
"us-east-2" )
    IMAGE="404615174143.dkr.ecr.us-east-2.amazonaws.com/linear-learner:latest" 
    ;;
"eu-west-1" )
    IMAGE="438346466558.dkr.ecr.eu-west-1.amazonaws.com/linear-learner:latest" 
    ;;
 *)
    echo "Invalid Region Name"
    exit 1 ;  
esac

# Start training job and creating model artifact 
TRAINING_JOB_NAME=TRAIN-${DTTIME} 
S3OUTPUT="s3://<your bucket name>/model/" 
INSTANCETYPE="ml.m4.xlarge"
INSTANCECOUNT=1
VOLUMESIZE=5 
aws sagemaker create-training-job --training-job-name ${TRAINING_JOB_NAME} --region ${REGION}  --algorithm-specification TrainingImage=${IMAGE},TrainingInputMode=File --role-arn ${ROLE}  --input-data-config '[{ "ChannelName": "train", "DataSource": { "S3DataSource": { "S3DataType": "S3Prefix", "S3Uri": "s3://<your bucket name>/<datasource path>/", "S3DataDistributionType": "FullyReplicated" } }, "ContentType": "text/csv", "CompressionType": "None" , "RecordWrapperType": "None"  }]'  --output-data-config S3OutputPath=${S3OUTPUT} --resource-config  InstanceType=${INSTANCETYPE},InstanceCount=${INSTANCECOUNT},VolumeSizeInGB=${VOLUMESIZE} --stopping-condition MaxRuntimeInSeconds=120 --hyper-parameters feature_dim=20,predictor_type=binary_classifier  

# Wait until job completed 
aws sagemaker wait training-job-completed-or-stopped --training-job-name ${TRAINING_JOB_NAME}  --region ${REGION}

# Get newly created model artifact and create model
MODELARTIFACT=`aws sagemaker describe-training-job --training-job-name ${TRAINING_JOB_NAME} --region ${REGION}  --query 'ModelArtifacts.S3ModelArtifacts' --output text `
MODELNAME=MODEL-${DTTIME}
aws sagemaker create-model --region ${REGION} --model-name ${MODELNAME}  --primary-container Image=${IMAGE},ModelDataUrl=${MODELARTIFACT}  --execution-role-arn ${ROLE}

# create a new endpoint configuration 
CONFIGNAME=CONFIG-${DTTIME}
aws sagemaker  create-endpoint-config --region ${REGION} --endpoint-config-name ${CONFIGNAME}  --production-variants  VariantName=Users,ModelName=${MODELNAME},InitialInstanceCount=1,InstanceType=ml.m4.xlarge

# create or update the endpoint
STATUS=`aws sagemaker describe-endpoint --endpoint-name  ServiceEndpoint --query 'EndpointStatus' --output text --region ${REGION} `
if [[ $STATUS -ne "InService" ]] ;
then
    aws sagemaker  create-endpoint --endpoint-name  ServiceEndpoint  --endpoint-config-name ${CONFIGNAME} --region ${REGION}    
else
    aws sagemaker  update-endpoint --endpoint-name  ServiceEndpoint  --endpoint-config-name ${CONFIGNAME} --region ${REGION}
fi

Grant permission

Before you execute the script, you must grant proper permission to Data Pipeline. Data Pipeline uses the DataPipelineDefaultResourceRole role by default. I added the following policy to DataPipelineDefaultResourceRole to allow Data Pipeline to create, delete, and update the Amazon SageMaker model and data source in the script.

{
 "Version": "2012-10-17",
 "Statement": [
 {
 "Effect": "Allow",
 "Action": [
 "sagemaker:CreateTrainingJob",
 "sagemaker:DescribeTrainingJob",
 "sagemaker:CreateModel",
 "sagemaker:CreateEndpointConfig",
 "sagemaker:DescribeEndpoint",
 "sagemaker:CreateEndpoint",
 "sagemaker:UpdateEndpoint",
 "iam:PassRole"
 ],
 "Resource": "*"
 }
 ]
}

Use real-time prediction

After you deploy a model into production using Amazon SageMaker hosting services, your client applications use this API to get inferences from the model hosted at the specified endpoint. This approach is useful for interactive web, mobile, or desktop applications.

Following, I provide a simple Python code example that queries against Amazon SageMaker endpoint URL with its name (“ServiceEndpoint”) and then uses them for real-time prediction.

=== Python sample for real-time prediction ===

#!/usr/bin/env python
import boto3
import json 

client = boto3.client('sagemaker-runtime', region_name ='<your region>' )
new_customer_info = '34,10,2,4,1,2,1,1,6,3,190,1,3,4,3,-1.7,94.055,-39.8,0.715,4991.6'
response = client.invoke_endpoint(
    EndpointName='ServiceEndpoint',
    Body=new_customer_info, 
    ContentType='text/csv'
)
result = json.loads(response['Body'].read().decode())
print(result)
--- output(response) ---
{u'predictions': [{u'score': 0.7528127431869507, u'predicted_label': 1.0}]}

Solution summary

The solution takes the following steps:

  1. Data Pipeline exports DynamoDB table data into S3. The original JSON data should be kept to recover the table in the rare event that this is needed. Data Pipeline then converts JSON to CSV so that Amazon SageMaker can read the data.Note: You should select only meaningful attributes when you convert CSV. For example, if you judge that the “campaign” attribute is not correlated, you can eliminate this attribute from the CSV.
  2. Train the Amazon SageMaker model with the new data source.
  3. When a new customer comes to your site, you can judge how likely it is for this customer to subscribe to your new product based on “predictedScores” provided by Amazon SageMaker.
  4. If the new user subscribes your new product, your application must update the attribute “y” to the value 1 (for yes). This updated data is provided for the next model renewal as a new data source. It serves to improve the accuracy of your prediction. With each new entry, your application can become smarter and deliver better predictions.

Running ad hoc queries using Amazon Athena

Amazon Athena is a serverless query service that makes it easy to analyze large amounts of data stored in Amazon S3 using standard SQL. Athena is useful for examining data and collecting statistics or informative summaries about data. You can also use the powerful analytic functions of Presto, as described in the topic Aggregate Functions of Presto in the Presto documentation.

With the Data Pipeline scheduled activity, recent CSV data is always located in S3 so that you can run ad hoc queries against the data using Amazon Athena. I show this with example SQL statements following. For an in-depth description of this process, see the post Interactive SQL Queries for Data in Amazon S3 on the AWS News Blog. 

Creating an Amazon Athena table and running it

Simply, you can create an EXTERNAL table for the CSV data on S3 in Amazon Athena Management Console.

=== Table Creation ===
CREATE EXTERNAL TABLE datasource (
 age int, 
 job string, 
 marital string , 
 education string, 
 default string, 
 housing string, 
 loan string, 
 contact string, 
 month string, 
 day_of_week string, 
 duration int, 
 campaign int, 
 pdays int , 
 previous int , 
 poutcome string, 
 emp_var_rate double, 
 cons_price_idx double,
 cons_conf_idx double, 
 euribor3m double, 
 nr_employed double, 
 y int 
)
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY ',' ESCAPED BY '\\' LINES TERMINATED BY '\n' 
LOCATION 's3://<your bucket name>/<datasource path>/';

The following query calculates the correlation coefficient between the target attribute and other attributes using Amazon Athena.

=== Sample Query ===

SELECT corr(age,y) AS correlation_age_and_target, 
 corr(duration,y) AS correlation_duration_and_target, 
 corr(campaign,y) AS correlation_campaign_and_target,
 corr(contact,y) AS correlation_contact_and_target
FROM ( SELECT age , duration , campaign , y , 
 CASE WHEN contact = 'telephone' THEN 1 ELSE 0 END AS contact 
 FROM datasource 
 ) datasource ;

Conclusion

In this post, I introduce an example of how to analyze data in DynamoDB by using table data in Amazon S3 to optimize DynamoDB table read capacity. You can then use the analyzed data as a new data source to train an Amazon SageMaker model for accurate real-time prediction. In addition, you can run ad hoc queries against the data on S3 using Amazon Athena. I also present how to automate these procedures by using Data Pipeline.

You can adapt this example to your specific use case at hand, and hopefully this post helps you accelerate your development. You can find more examples and use cases for Amazon SageMaker in the video AWS 2017: Introducing Amazon SageMaker on the AWS website.

 


Additional Reading

If you found this post useful, be sure to check out Serving Real-Time Machine Learning Predictions on Amazon EMR and Analyzing Data in S3 using Amazon Athena.

 


About the Author

Yong Seong Lee is a Cloud Support Engineer for AWS Big Data Services. He is interested in every technology related to data/databases and helping customers who have difficulties in using AWS services. His motto is “Enjoy life, be curious and have maximum experience.”

 

 

How to retain system tables’ data spanning multiple Amazon Redshift clusters and run cross-cluster diagnostic queries

Post Syndicated from Karthik Sonti original https://aws.amazon.com/blogs/big-data/how-to-retain-system-tables-data-spanning-multiple-amazon-redshift-clusters-and-run-cross-cluster-diagnostic-queries/

Amazon Redshift is a data warehouse service that logs the history of the system in STL log tables. The STL log tables manage disk space by retaining only two to five days of log history, depending on log usage and available disk space.

To retain STL tables’ data for an extended period, you usually have to create a replica table for every system table. Then, for each you load the data from the system table into the replica at regular intervals. By maintaining replica tables for STL tables, you can run diagnostic queries on historical data from the STL tables. You then can derive insights from query execution times, query plans, and disk-spill patterns, and make better cluster-sizing decisions. However, refreshing replica tables with live data from STL tables at regular intervals requires schedulers such as Cron or AWS Data Pipeline. Also, these tables are specific to one cluster and they are not accessible after the cluster is terminated. This is especially true for transient Amazon Redshift clusters that last for only a finite period of ad hoc query execution.

In this blog post, I present a solution that exports system tables from multiple Amazon Redshift clusters into an Amazon S3 bucket. This solution is serverless, and you can schedule it as frequently as every five minutes. The AWS CloudFormation deployment template that I provide automates the solution setup in your environment. The system tables’ data in the Amazon S3 bucket is partitioned by cluster name and query execution date to enable efficient joins in cross-cluster diagnostic queries.

I also provide another CloudFormation template later in this post. This second template helps to automate the creation of tables in the AWS Glue Data Catalog for the system tables’ data stored in Amazon S3. After the system tables are exported to Amazon S3, you can run cross-cluster diagnostic queries on the system tables’ data and derive insights about query executions in each Amazon Redshift cluster. You can do this using Amazon QuickSight, Amazon Athena, Amazon EMR, or Amazon Redshift Spectrum.

You can find all the code examples in this post, including the CloudFormation templates, AWS Glue extract, transform, and load (ETL) scripts, and the resolution steps for common errors you might encounter in this GitHub repository.

Solution overview

The solution in this post uses AWS Glue to export system tables’ log data from Amazon Redshift clusters into Amazon S3. The AWS Glue ETL jobs are invoked at a scheduled interval by AWS Lambda. AWS Systems Manager, which provides secure, hierarchical storage for configuration data management and secrets management, maintains the details of Amazon Redshift clusters for which the solution is enabled. The last-fetched time stamp values for the respective cluster-table combination are maintained in an Amazon DynamoDB table.

The following diagram covers the key steps involved in this solution.

The solution as illustrated in the preceding diagram flows like this:

  1. The Lambda function, invoke_rs_stl_export_etl, is triggered at regular intervals, as controlled by Amazon CloudWatch. It’s triggered to look up the AWS Systems Manager parameter store to get the details of the Amazon Redshift clusters for which the system table export is enabled.
  2. The same Lambda function, based on the Amazon Redshift cluster details obtained in step 1, invokes the AWS Glue ETL job designated for the Amazon Redshift cluster. If an ETL job for the cluster is not found, the Lambda function creates one.
  3. The ETL job invoked for the Amazon Redshift cluster gets the cluster credentials from the parameter store. It gets from the DynamoDB table the last exported time stamp of when each of the system tables was exported from the respective Amazon Redshift cluster.
  4. The ETL job unloads the system tables’ data from the Amazon Redshift cluster into an Amazon S3 bucket.
  5. The ETL job updates the DynamoDB table with the last exported time stamp value for each system table exported from the Amazon Redshift cluster.
  6. The Amazon Redshift cluster system tables’ data is available in Amazon S3 and is partitioned by cluster name and date for running cross-cluster diagnostic queries.

Understanding the configuration data

This solution uses AWS Systems Manager parameter store to store the Amazon Redshift cluster credentials securely. The parameter store also securely stores other configuration information that the AWS Glue ETL job needs for extracting and storing system tables’ data in Amazon S3. Systems Manager comes with a default AWS Key Management Service (AWS KMS) key that it uses to encrypt the password component of the Amazon Redshift cluster credentials.

The following table explains the global parameters and cluster-specific parameters required in this solution. The global parameters are defined once and applicable at the overall solution level. The cluster-specific parameters are specific to an Amazon Redshift cluster and repeat for each cluster for which you enable this post’s solution. The CloudFormation template explained later in this post creates these parameters as part of the deployment process.

Parameter name Type Description
Global parametersdefined once and applied to all jobs
redshift_query_logs.global.s3_prefix String The Amazon S3 path where the query logs are exported. Under this path, each exported table is partitioned by cluster name and date.
redshift_query_logs.global.tempdir String The Amazon S3 path that AWS Glue ETL jobs use for temporarily staging the data.
redshift_query_logs.global.role> String The name of the role that the AWS Glue ETL jobs assume. Just the role name is sufficient. The complete Amazon Resource Name (ARN) is not required.
redshift_query_logs.global.enabled_cluster_list StringList A comma-separated list of cluster names for which system tables’ data export is enabled. This gives flexibility for a user to exclude certain clusters.
Cluster-specific parametersfor each cluster specified in the enabled_cluster_list parameter
redshift_query_logs.<<cluster_name>>.connection String The name of the AWS Glue Data Catalog connection to the Amazon Redshift cluster. For example, if the cluster name is product_warehouse, the entry is redshift_query_logs.product_warehouse.connection.
redshift_query_logs.<<cluster_name>>.user String The user name that AWS Glue uses to connect to the Amazon Redshift cluster.
redshift_query_logs.<<cluster_name>>.password Secure String The password that AWS Glue uses to connect the Amazon Redshift cluster’s encrypted-by key that is managed in AWS KMS.

For example, suppose that you have two Amazon Redshift clusters, product-warehouse and category-management, for which the solution described in this post is enabled. In this case, the parameters shown in the following screenshot are created by the solution deployment CloudFormation template in the AWS Systems Manager parameter store.

Solution deployment

To make it easier for you to get started, I created a CloudFormation template that automatically configures and deploys the solution—only one step is required after deployment.

Prerequisites

To deploy the solution, you must have one or more Amazon Redshift clusters in a private subnet. This subnet must have a network address translation (NAT) gateway or a NAT instance configured, and also a security group with a self-referencing inbound rule for all TCP ports. For more information about why AWS Glue ETL needs the configuration it does, described previously, see Connecting to a JDBC Data Store in a VPC in the AWS Glue documentation.

To start the deployment, launch the CloudFormation template:

CloudFormation stack parameters

The following table lists and describes the parameters for deploying the solution to export query logs from multiple Amazon Redshift clusters.

Property Default Description
S3Bucket mybucket The bucket this solution uses to store the exported query logs, stage code artifacts, and perform unloads from Amazon Redshift. For example, the mybucket/extract_rs_logs/data bucket is used for storing all the exported query logs for each system table partitioned by the cluster. The mybucket/extract_rs_logs/temp/ bucket is used for temporarily staging the unloaded data from Amazon Redshift. The mybucket/extract_rs_logs/code bucket is used for storing all the code artifacts required for Lambda and the AWS Glue ETL jobs.
ExportEnabledRedshiftClusters Requires Input A comma-separated list of cluster names from which the system table logs need to be exported.
DataStoreSecurityGroups Requires Input A list of security groups with an inbound rule to the Amazon Redshift clusters provided in the parameter, ExportEnabledClusters. These security groups should also have a self-referencing inbound rule on all TCP ports, as explained on Connecting to a JDBC Data Store in a VPC.

After you launch the template and create the stack, you see that the following resources have been created:

  1. AWS Glue connections for each Amazon Redshift cluster you provided in the CloudFormation stack parameter, ExportEnabledRedshiftClusters.
  2. All parameters required for this solution created in the parameter store.
  3. The Lambda function that invokes the AWS Glue ETL jobs for each configured Amazon Redshift cluster at a regular interval of five minutes.
  4. The DynamoDB table that captures the last exported time stamps for each exported cluster-table combination.
  5. The AWS Glue ETL jobs to export query logs from each Amazon Redshift cluster provided in the CloudFormation stack parameter, ExportEnabledRedshiftClusters.
  6. The IAM roles and policies required for the Lambda function and AWS Glue ETL jobs.

After the deployment

For each Amazon Redshift cluster for which you enabled the solution through the CloudFormation stack parameter, ExportEnabledRedshiftClusters, the automated deployment includes temporary credentials that you must update after the deployment:

  1. Go to the parameter store.
  2. Note the parameters <<cluster_name>>.user and redshift_query_logs.<<cluster_name>>.password that correspond to each Amazon Redshift cluster for which you enabled this solution. Edit these parameters to replace the placeholder values with the right credentials.

For example, if product-warehouse is one of the clusters for which you enabled system table export, you edit these two parameters with the right user name and password and choose Save parameter.

Querying the exported system tables

Within a few minutes after the solution deployment, you should see Amazon Redshift query logs being exported to the Amazon S3 location, <<S3Bucket_you_provided>>/extract_redshift_query_logs/data/. In that bucket, you should see the eight system tables partitioned by customer name and date: stl_alert_event_log, stl_dlltext, stl_explain, stl_query, stl_querytext, stl_scan, stl_utilitytext, and stl_wlm_query.

To run cross-cluster diagnostic queries on the exported system tables, create external tables in the AWS Glue Data Catalog. To make it easier for you to get started, I provide a CloudFormation template that creates an AWS Glue crawler, which crawls the exported system tables stored in Amazon S3 and builds the external tables in the AWS Glue Data Catalog.

Launch this CloudFormation template to create external tables that correspond to the Amazon Redshift system tables. S3Bucket is the only input parameter required for this stack deployment. Provide the same Amazon S3 bucket name where the system tables’ data is being exported. After you successfully create the stack, you can see the eight tables in the database, redshift_query_logs_db, as shown in the following screenshot.

Now, navigate to the Athena console to run cross-cluster diagnostic queries. The following screenshot shows a diagnostic query executed in Athena that retrieves query alerts logged across multiple Amazon Redshift clusters.

You can build the following example Amazon QuickSight dashboard by running cross-cluster diagnostic queries on Athena to identify the hourly query count and the key query alert events across multiple Amazon Redshift clusters.

How to extend the solution

You can extend this post’s solution in two ways:

  • Add any new Amazon Redshift clusters that you spin up after you deploy the solution.
  • Add other system tables or custom query results to the list of exports from an Amazon Redshift cluster.

Extend the solution to other Amazon Redshift clusters

To extend the solution to more Amazon Redshift clusters, add the three cluster-specific parameters in the AWS Systems Manager parameter store following the guidelines earlier in this post. Modify the redshift_query_logs.global.enabled_cluster_list parameter to append the new cluster to the comma-separated string.

Extend the solution to add other tables or custom queries to an Amazon Redshift cluster

The current solution ships with the export functionality for the following Amazon Redshift system tables:

  • stl_alert_event_log
  • stl_dlltext
  • stl_explain
  • stl_query
  • stl_querytext
  • stl_scan
  • stl_utilitytext
  • stl_wlm_query

You can easily add another system table or custom query by adding a few lines of code to the AWS Glue ETL job, <<cluster-name>_extract_rs_query_logs. For example, suppose that from the product-warehouse Amazon Redshift cluster you want to export orders greater than $2,000. To do so, add the following five lines of code to the AWS Glue ETL job product-warehouse_extract_rs_query_logs, where product-warehouse is your cluster name:

  1. Get the last-processed time-stamp value. The function creates a value if it doesn’t already exist.

salesLastProcessTSValue = functions.getLastProcessedTSValue(trackingEntry=”mydb.sales_2000",job_configs=job_configs)

  1. Run the custom query with the time stamp.

returnDF=functions.runQuery(query="select * from sales s join order o where o.order_amnt > 2000 and sale_timestamp > '{}'".format (salesLastProcessTSValue) ,tableName="mydb.sales_2000",job_configs=job_configs)

  1. Save the results to Amazon S3.

functions.saveToS3(dataframe=returnDF,s3Prefix=s3Prefix,tableName="mydb.sales_2000",partitionColumns=["sale_date"],job_configs=job_configs)

  1. Get the latest time-stamp value from the returned data frame in Step 2.

latestTimestampVal=functions.getMaxValue(returnDF,"sale_timestamp",job_configs)

  1. Update the last-processed time-stamp value in the DynamoDB table.

functions.updateLastProcessedTSValue(“mydb.sales_2000",latestTimestampVal[0],job_configs)

Conclusion

In this post, I demonstrate a serverless solution to retain the system tables’ log data across multiple Amazon Redshift clusters. By using this solution, you can incrementally export the data from system tables into Amazon S3. By performing this export, you can build cross-cluster diagnostic queries, build audit dashboards, and derive insights into capacity planning by using services such as Athena. I also demonstrate how you can extend this solution to other ad hoc query use cases or tables other than system tables by adding a few lines of code.


Additional Reading

If you found this post useful, be sure to check out Using Amazon Redshift Spectrum, Amazon Athena, and AWS Glue with Node.js in Production and Amazon Redshift – 2017 Recap.


About the Author

Karthik Sonti is a senior big data architect at Amazon Web Services. He helps AWS customers build big data and analytical solutions and provides guidance on architecture and best practices.

 

 

 

 

AWS Online Tech Talks – April & Early May 2018

Post Syndicated from Betsy Chernoff original https://aws.amazon.com/blogs/aws/aws-online-tech-talks-april-early-may-2018/

We have several upcoming tech talks in the month of April and early May. Come join us to learn about AWS services and solution offerings. We’ll have AWS experts online to help answer questions in real-time. Sign up now to learn more, we look forward to seeing you.

Note – All sessions are free and in Pacific Time.

April & early May — 2018 Schedule

Compute

April 30, 2018 | 01:00 PM – 01:45 PM PTBest Practices for Running Amazon EC2 Spot Instances with Amazon EMR (300) – Learn about the best practices for scaling big data workloads as well as process, store, and analyze big data securely and cost effectively with Amazon EMR and Amazon EC2 Spot Instances.

May 1, 2018 | 01:00 PM – 01:45 PM PTHow to Bring Microsoft Apps to AWS (300) – Learn more about how to save significant money by bringing your Microsoft workloads to AWS.

May 2, 2018 | 01:00 PM – 01:45 PM PTDeep Dive on Amazon EC2 Accelerated Computing (300) – Get a technical deep dive on how AWS’ GPU and FGPA-based compute services can help you to optimize and accelerate your ML/DL and HPC workloads in the cloud.

Containers

April 23, 2018 | 11:00 AM – 11:45 AM PTNew Features for Building Powerful Containerized Microservices on AWS (300) – Learn about how this new feature works and how you can start using it to build and run modern, containerized applications on AWS.

Databases

April 23, 2018 | 01:00 PM – 01:45 PM PTElastiCache: Deep Dive Best Practices and Usage Patterns (200) – Learn about Redis-compatible in-memory data store and cache with Amazon ElastiCache.

April 25, 2018 | 01:00 PM – 01:45 PM PTIntro to Open Source Databases on AWS (200) – Learn how to tap the benefits of open source databases on AWS without the administrative hassle.

DevOps

April 25, 2018 | 09:00 AM – 09:45 AM PTDebug your Container and Serverless Applications with AWS X-Ray in 5 Minutes (300) – Learn how AWS X-Ray makes debugging your Container and Serverless applications fun.

Enterprise & Hybrid

April 23, 2018 | 09:00 AM – 09:45 AM PTAn Overview of Best Practices of Large-Scale Migrations (300) – Learn about the tools and best practices on how to migrate to AWS at scale.

April 24, 2018 | 11:00 AM – 11:45 AM PTDeploy your Desktops and Apps on AWS (300) – Learn how to deploy your desktops and apps on AWS with Amazon WorkSpaces and Amazon AppStream 2.0

IoT

May 2, 2018 | 11:00 AM – 11:45 AM PTHow to Easily and Securely Connect Devices to AWS IoT (200) – Learn how to easily and securely connect devices to the cloud and reliably scale to billions of devices and trillions of messages with AWS IoT.

Machine Learning

April 24, 2018 | 09:00 AM – 09:45 AM PT Automate for Efficiency with Amazon Transcribe and Amazon Translate (200) – Learn how you can increase the efficiency and reach your operations with Amazon Translate and Amazon Transcribe.

April 26, 2018 | 09:00 AM – 09:45 AM PT Perform Machine Learning at the IoT Edge using AWS Greengrass and Amazon Sagemaker (200) – Learn more about developing machine learning applications for the IoT edge.

Mobile

April 30, 2018 | 11:00 AM – 11:45 AM PTOffline GraphQL Apps with AWS AppSync (300) – Come learn how to enable real-time and offline data in your applications with GraphQL using AWS AppSync.

Networking

May 2, 2018 | 09:00 AM – 09:45 AM PT Taking Serverless to the Edge (300) – Learn how to run your code closer to your end users in a serverless fashion. Also, David Von Lehman from Aerobatic will discuss how they used [email protected] to reduce latency and cloud costs for their customer’s websites.

Security, Identity & Compliance

April 30, 2018 | 09:00 AM – 09:45 AM PTAmazon GuardDuty – Let’s Attack My Account! (300) – Amazon GuardDuty Test Drive – Practical steps on generating test findings.

May 3, 2018 | 09:00 AM – 09:45 AM PTProtect Your Game Servers from DDoS Attacks (200) – Learn how to use the new AWS Shield Advanced for EC2 to protect your internet-facing game servers against network layer DDoS attacks and application layer attacks of all kinds.

Serverless

April 24, 2018 | 01:00 PM – 01:45 PM PTTips and Tricks for Building and Deploying Serverless Apps In Minutes (200) – Learn how to build and deploy apps in minutes.

Storage

May 1, 2018 | 11:00 AM – 11:45 AM PTBuilding Data Lakes That Cost Less and Deliver Results Faster (300) – Learn how Amazon S3 Select And Amazon Glacier Select increase application performance by up to 400% and reduce total cost of ownership by extending your data lake into cost-effective archive storage.

May 3, 2018 | 11:00 AM – 11:45 AM PTIntegrating On-Premises Vendors with AWS for Backup (300) – Learn how to work with AWS and technology partners to build backup & restore solutions for your on-premises, hybrid, and cloud native environments.

How to migrate a Hue database from an existing Amazon EMR cluster

Post Syndicated from Anvesh Ragi original https://aws.amazon.com/blogs/big-data/how-to-migrate-a-hue-database-from-an-existing-amazon-emr-cluster/

Hadoop User Experience (Hue) is an open-source, web-based, graphical user interface for use with Amazon EMR and Apache Hadoop. The Hue database stores things like users, groups, authorization permissions, Apache Hive queries, Apache Oozie workflows, and so on.

There might come a time when you want to migrate your Hue database to a new EMR cluster. For example, you might want to upgrade from an older version of the Amazon EMR AMI (Amazon Machine Image), but your Hue application and its database have had a lot of customization.You can avoid re-creating these user entities and retain query/workflow histories in Hue by migrating the existing Hue database, or remote database in Amazon RDS, to a new cluster.

By default, Hue user information and query histories are stored in a local MySQL database on the EMR cluster’s master node. However, you can create one or more Hue-enabled clusters using a configuration stored in Amazon S3 and a remote MySQL database in Amazon RDS. This allows you to preserve user information and query history that Hue creates without keeping your Amazon EMR cluster running.

This post describes the step-by-step process for migrating the Hue database from an existing EMR cluster.

Note: Amazon EMR supports different Hue versions across different AMI releases. Keep in mind the compatibility of Hue versions between the old and new clusters in this migration activity. Currently, Hue 3.x.x versions are not compatible with Hue 4.x.x versions, and therefore a migration between these two Hue versions might create issues. In addition, Hue 3.10.0 is not backward compatible with its previous 3.x.x versions.

Before you begin

First, let’s create a new testUser in Hue on an existing EMR cluster, as shown following:

You will use these credentials later to log in to Hue on the new EMR cluster and validate whether you have successfully migrated the Hue database.

Let’s get started!

Migration how-to

Follow these steps to migrate your database to a new EMR cluster and then validate the migration process.

1.) Make a backup of the existing Hue database.

Use SSH to connect to the master node of the old cluster, as shown following (if you are using Linux/Unix/macOS), and dump the Hue database to a JSON file.

$ ssh -i ~/key.pem [email protected]
$ /usr/lib/hue/build/env/bin/hue dumpdata > ./hue-mysql.json

Edit the hue-mysql.json output file by removing all JSON objects that have useradmin.userprofile in the model field, and save the file. For example, remove the objects as shown following:

{
  "pk": 1,
  "model": "useradmin.userprofile",
  "fields": {
    "last_activity": "2018-01-10T11:41:04",
    "creation_method": "HUE",
    "first_login": false,
    "user": 1,
    "home_directory": "/user/hue_admin"
  }
},

2.) Store the hue-mysql.json file on persistent storage like Amazon S3.

You can copy the file from the old EMR cluster to Amazon S3 using the AWS CLI or Secure Copy (SCP) client. For example, the following uses the AWS CLI:

$ aws s3 cp ./hue-mysql.json s3://YourBucketName/folder/

3.) Recover/reload the backed-up Hue database into the new EMR cluster.

a.) Use SSH to connect to the master node of the new EMR cluster, and stop the Hue service that is already running.

$ ssh -i ~/key.pem [email protected]
$ sudo stop hue
hue stop/waiting

b.) Connect to the Hue database—either the local MySQL database or the remote database in Amazon RDS for your cluster as shown following, using the mysql client.

$ mysql -h HOST –u USER –pPASSWORD

For a local MySQL database, you can find the hostname, user name, and password for connecting to the database in the /etc/hue/conf/hue.ini file on the master node.

[[database]]
    engine = mysql
    name = huedb
    case_insensitive_collation = utf8_unicode_ci
    test_charset = utf8
    test_collation = utf8_bin
    host = ip-172-31-37-133.us-west-2.compute.internal
    user = hue
    test_name = test_huedb
    password = QdWbL3Ai6GcBqk26
    port = 3306

Based on the preceding example configuration, the sample command is as follows. (Replace the host, user, and password details based on your EMR cluster settings.)

$ mysql -h ip-172-31-37-133.us-west-2.compute.internal -u hue -pQdWbL3Ai6GcBqk26

c.) Drop the existing Hue database with the name huedb from the MySQL server.

mysql> DROP DATABASE IF EXISTS huedb;

d.) Create a new empty database with the same name huedb.

mysql> CREATE DATABASE huedb DEFAULT CHARACTER SET utf8 DEFAULT COLLATE=utf8_bin;

e.) Now, synchronize Hue with its database huedb.

$ sudo /usr/lib/hue/build/env/bin/hue syncdb --noinput
$ sudo /usr/lib/hue/build/env/bin/hue migrate

(This populates the new huedb with all Hue tables that are required.)

f.) Log in to MySQL again, and drop the foreign key to clean tables.

mysql> SHOW CREATE TABLE huedb.auth_permission;

In the following example, replace <id value> with the actual value from the preceding output.

mysql> ALTER TABLE huedb.auth_permission DROP FOREIGN KEY
content_type_id_refs_id_<id value>;

g.) Delete the contents of the django_content_type

mysql> DELETE FROM huedb.django_content_type;

h.) Download the backed-up Hue database dump from Amazon S3 to the new EMR cluster, and load it into Hue.

$ aws s3 cp s3://YourBucketName/folder/hue-mysql.json ./
$ sudo /usr/lib/hue/build/env/bin/hue loaddata ./hue-mysql.json

i.) In MySQL, add the foreign key content_type_id back to the auth_permission

mysql> use huedb;
mysql> ALTER TABLE huedb.auth_permission ADD FOREIGN KEY (`content_type_id`) REFERENCES `django_content_type` (`id`);

j.) Start the Hue service again.

$ sudo start hue
hue start/running, process XXXX

That’s it! Now, verify whether you can successfully access the Hue UI, and sign in using your existing testUser credentials.

After a successful sign in to Hue on the new EMR cluster, you should see a similar Hue homepage as shown following with testUser as the user signed in:

Conclusion

You have now learned how to migrate an existing Hue database to a new Amazon EMR cluster and validate the migration process. If you have any similar Amazon EMR administration topics that you want to see covered in a future post, please let us know in the comments below.


Additional Reading

If you found this post useful, be sure to check out Anomaly Detection Using PySpark, Hive, and Hue on Amazon EMR and Dynamically Create Friendly URLs for Your Amazon EMR Web Interfaces.


About the Author


Anvesh Ragi is a Big Data Support Engineer with Amazon Web Services. He works closely with AWS customers to provide them architectural and engineering assistance for their data processing workflows. In his free time, he enjoys traveling and going for hikes.

How I built a data warehouse using Amazon Redshift and AWS services in record time

Post Syndicated from Stephen Borg original https://aws.amazon.com/blogs/big-data/how-i-built-a-data-warehouse-using-amazon-redshift-and-aws-services-in-record-time/

This is a customer post by Stephen Borg, the Head of Big Data and BI at Cerberus Technologies.

Cerberus Technologies, in their own words: Cerberus is a company founded in 2017 by a team of visionary iGaming veterans. Our mission is simple – to offer the best tech solutions through a data-driven and a customer-first approach, delivering innovative solutions that go against traditional forms of working and process. This mission is based on the solid foundations of reliability, flexibility and security, and we intend to fundamentally change the way iGaming and other industries interact with technology.

Over the years, I have developed and created a number of data warehouses from scratch. Recently, I built a data warehouse for the iGaming industry single-handedly. To do it, I used the power and flexibility of Amazon Redshift and the wider AWS data management ecosystem. In this post, I explain how I was able to build a robust and scalable data warehouse without the large team of experts typically needed.

In two of my recent projects, I ran into challenges when scaling our data warehouse using on-premises infrastructure. Data was growing at many tens of gigabytes per day, and query performance was suffering. Scaling required major capital investment for hardware and software licenses, and also significant operational costs for maintenance and technical staff to keep it running and performing well. Unfortunately, I couldn’t get the resources needed to scale the infrastructure with data growth, and these projects were abandoned. Thanks to cloud data warehousing, the bottleneck of infrastructure resources, capital expense, and operational costs have been significantly reduced or have totally gone away. There is no more excuse for allowing obstacles of the past to delay delivering timely insights to decision makers, no matter how much data you have.

With Amazon Redshift and AWS, I delivered a cloud data warehouse to the business very quickly, and with a small team: me. I didn’t have to order hardware or software, and I no longer needed to install, configure, tune, or keep up with patches and version updates. Instead, I easily set up a robust data processing pipeline and we were quickly ingesting and analyzing data. Now, my data warehouse team can be extremely lean, and focus more time on bringing in new data and delivering insights. In this post, I show you the AWS services and the architecture that I used.

Handling data feeds

I have several different data sources that provide everything needed to run the business. The data includes activity from our iGaming platform, social media posts, clickstream data, marketing and campaign performance, and customer support engagements.

To handle the diversity of data feeds, I developed abstract integration applications using Docker that run on Amazon EC2 Container Service (Amazon ECS) and feed data to Amazon Kinesis Data Streams. These data streams can be used for real time analytics. In my system, each record in Kinesis is preprocessed by an AWS Lambda function to cleanse and aggregate information. My system then routes it to be stored where I need on Amazon S3 by Amazon Kinesis Data Firehose. Suppose that you used an on-premises architecture to accomplish the same task. A team of data engineers would be required to maintain and monitor a Kafka cluster, develop applications to stream data, and maintain a Hadoop cluster and the infrastructure underneath it for data storage. With my stream processing architecture, there are no servers to manage, no disk drives to replace, and no service monitoring to write.

Setting up a Kinesis stream can be done with a few clicks, and the same for Kinesis Firehose. Firehose can be configured to automatically consume data from a Kinesis Data Stream, and then write compressed data every N minutes to Amazon S3. When I want to process a Kinesis data stream, it’s very easy to set up a Lambda function to be executed on each message received. I can just set a trigger from the AWS Lambda Management Console, as shown following.

I also monitor the duration of function execution using Amazon CloudWatch and AWS X-Ray.

Regardless of the format I receive the data from our partners, I can send it to Kinesis as JSON data using my own formatters. After Firehose writes this to Amazon S3, I have everything in nearly the same structure I received but compressed, encrypted, and optimized for reading.

This data is automatically crawled by AWS Glue and placed into the AWS Glue Data Catalog. This means that I can immediately query the data directly on S3 using Amazon Athena or through Amazon Redshift Spectrum. Previously, I used Amazon EMR and an Amazon RDS–based metastore in Apache Hive for catalog management. Now I can avoid the complexity of maintaining Hive Metastore catalogs. Glue takes care of high availability and the operations side so that I know that end users can always be productive.

Working with Amazon Athena and Amazon Redshift for analysis

I found Amazon Athena extremely useful out of the box for ad hoc analysis. Our engineers (me) use Athena to understand new datasets that we receive and to understand what transformations will be needed for long-term query efficiency.

For our data analysts and data scientists, we’ve selected Amazon Redshift. Amazon Redshift has proven to be the right tool for us over and over again. It easily processes 20+ million transactions per day, regardless of the footprint of the tables and the type of analytics required by the business. Latency is low and query performance expectations have been more than met. We use Redshift Spectrum for long-term data retention, which enables me to extend the analytic power of Amazon Redshift beyond local data to anything stored in S3, and without requiring me to load any data. Redshift Spectrum gives me the freedom to store data where I want, in the format I want, and have it available for processing when I need it.

To load data directly into Amazon Redshift, I use AWS Data Pipeline to orchestrate data workflows. I create Amazon EMR clusters on an intra-day basis, which I can easily adjust to run more or less frequently as needed throughout the day. EMR clusters are used together with Amazon RDS, Apache Spark 2.0, and S3 storage. The data pipeline application loads ETL configurations from Spring RESTful services hosted on AWS Elastic Beanstalk. The application then loads data from S3 into memory, aggregates and cleans the data, and then writes the final version of the data to Amazon Redshift. This data is then ready to use for analysis. Spark on EMR also helps with recommendations and personalization use cases for various business users, and I find this easy to set up and deliver what users want. Finally, business users use Amazon QuickSight for self-service BI to slice, dice, and visualize the data depending on their requirements.

Each AWS service in this architecture plays its part in saving precious time that’s crucial for delivery and getting different departments in the business on board. I found the services easy to set up and use, and all have proven to be highly reliable for our use as our production environments. When the architecture was in place, scaling out was either completely handled by the service, or a matter of a simple API call, and crucially doesn’t require me to change one line of code. Increasing shards for Kinesis can be done in a minute by editing a stream. Increasing capacity for Lambda functions can be accomplished by editing the megabytes allocated for processing, and concurrency is handled automatically. EMR cluster capacity can easily be increased by changing the master and slave node types in Data Pipeline, or by using Auto Scaling. Lastly, RDS and Amazon Redshift can be easily upgraded without any major tasks to be performed by our team (again, me).

In the end, using AWS services including Kinesis, Lambda, Data Pipeline, and Amazon Redshift allows me to keep my team lean and highly productive. I eliminated the cost and delays of capital infrastructure, as well as the late night and weekend calls for support. I can now give maximum value to the business while keeping operational costs down. My team pushed out an agile and highly responsive data warehouse solution in record time and we can handle changing business requirements rapidly, and quickly adapt to new data and new user requests.


Additional Reading

If you found this post useful, be sure to check out Deploy a Data Warehouse Quickly with Amazon Redshift, Amazon RDS for PostgreSQL and Tableau Server and Top 8 Best Practices for High-Performance ETL Processing Using Amazon Redshift.


About the Author

Stephen Borg is the Head of Big Data and BI at Cerberus Technologies. He has a background in platform software engineering, and first became involved in data warehousing using the typical RDBMS, SQL, ETL, and BI tools. He quickly became passionate about providing insight to help others optimize the business and add personalization to products. He is now the Head of Big Data and BI at Cerberus Technologies.

 

 

 

Build a Multi-Tenant Amazon EMR Cluster with Kerberos, Microsoft Active Directory Integration and EMRFS Authorization

Post Syndicated from Songzhi Liu original https://aws.amazon.com/blogs/big-data/build-a-multi-tenant-amazon-emr-cluster-with-kerberos-microsoft-active-directory-integration-and-emrfs-authorization/

One of the challenges faced by our customers—especially those in highly regulated industries—is balancing the need for security with flexibility. In this post, we cover how to enable multi-tenancy and increase security by using EMRFS (EMR File System) authorization, the Amazon S3 storage-level authorization on Amazon EMR.

Amazon EMR is an easy, fast, and scalable analytics platform enabling large-scale data processing. EMRFS authorization provides Amazon S3 storage-level authorization by configuring EMRFS with multiple IAM roles. With this functionality enabled, different users and groups can share the same cluster and assume their own IAM roles respectively.

Simply put, on Amazon EMR, we can now have an Amazon EC2 role per user assumed at run time instead of one general EC2 role at the cluster level. When the user is trying to access Amazon S3 resources, Amazon EMR evaluates against a predefined mappings list in EMRFS configurations and picks up the right role for the user.

In this post, we will discuss what EMRFS authorization is (Amazon S3 storage-level access control) and show how to configure the role mappings with detailed examples. You will then have the desired permissions in a multi-tenant environment. We also demo Amazon S3 access from HDFS command line, Apache Hive on Hue, and Apache Spark.

EMRFS authorization for Amazon S3

There are two prerequisites for using this feature:

  1. Users must be authenticated, because EMRFS needs to map the current user/group/prefix to a predefined user/group/prefix. There are several authentication options. In this post, we launch a Kerberos-enabled cluster that manages the Key Distribution Center (KDC) on the master node, and enable a one-way trust from the KDC to a Microsoft Active Directory domain.
  2. The application must support accessing Amazon S3 via Applications that have their own S3FileSystem APIs (for example, Presto) are not supported at this time.

EMRFS supports three types of mapping entries: user, group, and Amazon S3 prefix. Let’s use an example to show how this works.

Assume that you have the following three identities in your organization, and they are defined in the Active Directory:

To enable all these groups and users to share the EMR cluster, you need to define the following IAM roles:

In this case, you create a separate Amazon EC2 role that doesn’t give any permission to Amazon S3. Let’s call the role the base role (the EC2 role attached to the EMR cluster), which in this example is named EMR_EC2_RestrictedRole. Then, you define all the Amazon S3 permissions for each specific user or group in their own roles. The restricted role serves as the fallback role when the user doesn’t belong to any user/group, nor does the user try to access any listed Amazon S3 prefixes defined on the list.

Important: For all other roles, like emrfs_auth_group_role_data_eng, you need to add the base role (EMR_EC2_RestrictedRole) as the trusted entity so that it can assume other roles. See the following example:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "ec2.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    },
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::511586466501:role/EMR_EC2_RestrictedRole"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

The following is an example policy for the admin user role (emrfs_auth_user_role_admin_user):

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "s3:*",
            "Resource": "*"
        }
    ]
}

We are assuming the admin user has access to all buckets in this example.

The following is an example policy for the data science group role (emrfs_auth_group_role_data_sci):

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Resource": [
                "arn:aws:s3:::emrfs-auth-data-science-bucket-demo/*",
                "arn:aws:s3:::emrfs-auth-data-science-bucket-demo"
            ],
            "Action": [
                "s3:*"
            ]
        }
    ]
}

This role grants all Amazon S3 permissions to the emrfs-auth-data-science-bucket-demo bucket and all the objects in it. Similarly, the policy for the role emrfs_auth_group_role_data_eng is shown below:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Resource": [
                "arn:aws:s3:::emrfs-auth-data-engineering-bucket-demo/*",
                "arn:aws:s3:::emrfs-auth-data-engineering-bucket-demo"
            ],
            "Action": [
                "s3:*"
            ]
        }
    ]
}

Example role mappings configuration

To configure EMRFS authorization, you use EMR security configuration. Here is the configuration we use in this post

Consider the following scenario.

First, the admin user admin1 tries to log in and run a command to access Amazon S3 data through EMRFS. The first role emrfs_auth_user_role_admin_user on the mapping list, which is a user role, is mapped and picked up. Then admin1 has access to the Amazon S3 locations that are defined in this role.

Then a user from the data engineer group (grp_data_engineering) tries to access a data bucket to run some jobs. When EMRFS sees that the user is a member of the grp_data_engineering group, the group role emrfs_auth_group_role_data_eng is assumed, and the user has proper access to Amazon S3 that is defined in the emrfs_auth_group_role_data_eng role.

Next, the third user comes, who is not an admin and doesn’t belong to any of the groups. After failing evaluation of the top three entries, EMRFS evaluates whether the user is trying to access a certain Amazon S3 prefix defined in the last mapping entry. This type of mapping entry is called the prefix type. If the user is trying to access s3://emrfs-auth-default-bucket-demo/, then the prefix mapping is in effect, and the prefix role emrfs_auth_prefix_role_default_s3_prefix is assumed.

If the user is not trying to access any of the Amazon S3 paths that are defined on the list—which means it failed the evaluation of all the entries—it only has the permissions defined in the EMR_EC2RestrictedRole. This role is assumed by the EC2 instances in the cluster.

In this process, all the mappings defined are evaluated in the defined order, and the first role that is mapped is assumed, and the rest of the list is skipped.

Setting up an EMR cluster and mapping Active Directory users and groups

Now that we know how EMRFS authorization role mapping works, the next thing we need to think about is how we can use this feature in an easy and manageable way.

Active Directory setup

Many customers manage their users and groups using Microsoft Active Directory or other tools like OpenLDAP. In this post, we create the Active Directory on an Amazon EC2 instance running Windows Server and create the users and groups we will be using in the example below. After setting up Active Directory, we use the Amazon EMR Kerberos auto-join capability to establish a one-way trust from the KDC running on the EMR master node to the Active Directory domain on the EC2 instance. You can use your own directory services as long as it talks to the LDAP (Lightweight Directory Access Protocol).

To create and join Active Directory to Amazon EMR, follow the steps in the blog post Use Kerberos Authentication to Integrate Amazon EMR with Microsoft Active Directory.

After configuring Active Directory, you can create all the users and groups using the Active Directory tools and add users to appropriate groups. In this example, we created users like admin1, dataeng1, datascientist1, grp_data_engineering, and grp_data_science, and then add the users to the right groups.

Join the EMR cluster to an Active Directory domain

For clusters with Kerberos, Amazon EMR now supports automated Active Directory domain joins. You can use the security configuration to configure the one-way trust from the KDC to the Active Directory domain. You also configure the EMRFS role mappings in the same security configuration.

The following is an example of the EMR security configuration with a trusted Active Directory domain EMRKRB.TEST.COM and the EMRFS role mappings as we discussed earlier:

The EMRFS role mapping configuration is shown in this example:

We will also provide an example AWS CLI command that you can run.

Launching the EMR cluster and running the tests

Now you have configured Kerberos and EMRFS authorization for Amazon S3.

Additionally, you need to configure Hue with Active Directory using the Amazon EMR configuration API in order to log in using the AD users created before. The following is an example of Hue AD configuration.

[
  {
    "Classification":"hue-ini",
    "Properties":{

    },
    "Configurations":[
      {
        "Classification":"desktop",
        "Properties":{

        },
        "Configurations":[
          {
            "Classification":"ldap",
            "Properties":{

            },
            "Configurations":[
              {
                "Classification":"ldap_servers",
                "Properties":{

                },
                "Configurations":[
                  {
                    "Classification":"AWS",
                    "Properties":{
                      "base_dn":"DC=emrkrb,DC=test,DC=com",
                      "ldap_url":"ldap://emrkrb.test.com",
                      "search_bind_authentication":"false",
                      "bind_dn":"CN=adjoiner,CN=users,DC=emrkrb,DC=test,DC=com",
                      "bind_password":"Abc123456",
                      "create_users_on_login":"true",
                      "nt_domain":"emrkrb.test.com"
                    },
                    "Configurations":[

                    ]
                  }
                ]
              }
            ]
          },
          {
            "Classification":"auth",
            "Properties":{
              "backend":"desktop.auth.backend.LdapBackend"
            },
            "Configurations":[

            ]
          }
        ]
      }
    ]
  }

Note: In the preceding configuration JSON file, change the values as required before pasting it into the software setting section in the Amazon EMR console.

Now let’s use this configuration and the security configuration you created before to launch the cluster.

In the Amazon EMR console, choose Create cluster. Then choose Go to advanced options. On the Step1: Software and Steps page, under Edit software settings (optional), paste the configuration in the box.

The rest of the setup is the same as an ordinary cluster setup, except in the Security Options section. In Step 4: Security, under Permissions, choose Custom, and then choose the RestrictedRole that you created before.

Choose the appropriate subnets (these should meet the base requirement in order for a successful Active Directory join—see the Amazon EMR Management Guide for more details), and choose the appropriate security groups to make sure it talks to the Active Directory. Choose a key so that you can log in and configure the cluster.

Most importantly, choose the security configuration that you created earlier to enable Kerberos and EMRFS authorization for Amazon S3.

You can use the following AWS CLI command to create a cluster.

aws emr create-cluster --name "TestEMRFSAuthorization" \ 
--release-label emr-5.10.0 \ --instance-type m3.xlarge \ 
--instance-count 3 \ 
--ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,KeyName=MyEC2KeyPair \ --service-role EMR_DefaultRole \ 
--security-configuration MyKerberosConfig \ 
--configurations file://hue-config.json \
--applications Name=Hadoop Name=Hive Name=Hue Name=Spark \ 
--kerberos-attributes Realm=EC2.INTERNAL, \ KdcAdminPassword=<YourClusterKDCAdminPassword>, \ ADDomainJoinUser=<YourADUserLogonName>,ADDomainJoinPassword=<YourADUserPassword>, \ 
CrossRealmTrustPrincipalPassword=<MatchADTrustPwd>

Note: If you create the cluster using CLI, you need to save the JSON configuration for Hue into a file named hue-config.json and place it on the server where you run the CLI command.

After the cluster gets into the Waiting state, try to connect by using SSH into the cluster using the Active Directory user name and password.

ssh -l [email protected] <EMR IP or DNS name>

Quickly run two commands to show that the Active Directory join is successful:

  1. id [user name] shows the mapped AD users and groups in Linux.
  2. hdfs groups [user name] shows the mapped group in Hadoop.

Both should return the current Active Directory user and group information if the setup is correct.

Now, you can test the user mapping first. Log in with the admin1 user, and run a Hadoop list directory command:

hadoop fs -ls s3://emrfs-auth-data-science-bucket-demo/

Now switch to a user from the data engineer group.

Retry the previous command to access the admin’s bucket. It should throw an Amazon S3 Access Denied exception.

When you try listing the Amazon S3 bucket that a data engineer group member has accessed, it triggers the group mapping.

hadoop fs -ls s3://emrfs-auth-data-engineering-bucket-demo/

It successfully returns the listing results. Next we will test Apache Hive and then Apache Spark.

 

To run jobs successfully, you need to create a home directory for every user in HDFS for staging data under /user/<username>. Users can configure a step to create a home directory at cluster launch time for every user who has access to the cluster. In this example, you use Hue since Hue will create the home directory in HDFS for the user at the first login. Here Hue also needs to be integrated with the same Active Directory as explained in the example configuration described earlier.

First, log in to Hue as a data engineer user, and open a Hive Notebook in Hue. Then run a query to create a new table pointing to the data engineer bucket, s3://emrfs-auth-data-engineering-bucket-demo/table1_data_eng/.

You can see that the table was created successfully. Now try to create another table pointing to the data science group’s bucket, where the data engineer group doesn’t have access.

It failed and threw an Amazon S3 Access Denied error.

Now insert one line of data into the successfully create table.

Next, log out, switch to a data science group user, and create another table, test2_datasci_tb.

The creation is successful.

The last task is to test Spark (it requires the user directory, but Hue created one in the previous step).

Now let’s come back to the command line and run some Spark commands.

Login to the master node using the datascientist1 user:

Start the SparkSQL interactive shell by typing spark-sql, and run the show tables command. It should list the tables that you created using Hive.

As a data science group user, try select on both tables. You will find that you can only select the table defined in the location that your group has access to.

Conclusion

EMRFS authorization for Amazon S3 enables you to have multiple roles on the same cluster, providing flexibility to configure a shared cluster for different teams to achieve better efficiency. The Active Directory integration and group mapping make it much easier for you to manage your users and groups, and provides better auditability in a multi-tenant environment.


Additional Reading

If you found this post useful, be sure to check out Use Kerberos Authentication to Integrate Amazon EMR with Microsoft Active Directory and Launching and Running an Amazon EMR Cluster inside a VPC.


About the Authors

Songzhi Liu is a Big Data Consultant with AWS Professional Services. He works closely with AWS customers to provide them Big Data & Machine Learning solutions and best practices on the Amazon cloud.

 

 

 

 

AWS Glue Now Supports Scala Scripts

Post Syndicated from Mehul Shah original https://aws.amazon.com/blogs/big-data/aws-glue-now-supports-scala-scripts/

We are excited to announce AWS Glue support for running ETL (extract, transform, and load) scripts in Scala. Scala lovers can rejoice because they now have one more powerful tool in their arsenal. Scala is the native language for Apache Spark, the underlying engine that AWS Glue offers for performing data transformations.

Beyond its elegant language features, writing Scala scripts for AWS Glue has two main advantages over writing scripts in Python. First, Scala is faster for custom transformations that do a lot of heavy lifting because there is no need to shovel data between Python and Apache Spark’s Scala runtime (that is, the Java virtual machine, or JVM). You can build your own transformations or invoke functions in third-party libraries. Second, it’s simpler to call functions in external Java class libraries from Scala because Scala is designed to be Java-compatible. It compiles to the same bytecode, and its data structures don’t need to be converted.

To illustrate these benefits, we walk through an example that analyzes a recent sample of the GitHub public timeline available from the GitHub archive. This site is an archive of public requests to the GitHub service, recording more than 35 event types ranging from commits and forks to issues and comments.

This post shows how to build an example Scala script that identifies highly negative issues in the timeline. It pulls out issue events in the timeline sample, analyzes their titles using the sentiment prediction functions from the Stanford CoreNLP libraries, and surfaces the most negative issues.

Getting started

Before we start writing scripts, we use AWS Glue crawlers to get a sense of the data—its structure and characteristics. We also set up a development endpoint and attach an Apache Zeppelin notebook, so we can interactively explore the data and author the script.

Crawl the data

The dataset used in this example was downloaded from the GitHub archive website into our sample dataset bucket in Amazon S3, and copied to the following locations:

s3://aws-glue-datasets-<region>/examples/scala-blog/githubarchive/data/

Choose the best folder by replacing <region> with the region that you’re working in, for example, us-east-1. Crawl this folder, and put the results into a database named githubarchive in the AWS Glue Data Catalog, as described in the AWS Glue Developer Guide. This folder contains 12 hours of the timeline from January 22, 2017, and is organized hierarchically (that is, partitioned) by year, month, and day.

When finished, use the AWS Glue console to navigate to the table named data in the githubarchive database. Notice that this data has eight top-level columns, which are common to each event type, and three partition columns that correspond to year, month, and day.

Choose the payload column, and you will notice that it has a complex schema—one that reflects the union of the payloads of event types that appear in the crawled data. Also note that the schema that crawlers generate is a subset of the true schema because they sample only a subset of the data.

Set up the library, development endpoint, and notebook

Next, you need to download and set up the libraries that estimate the sentiment in a snippet of text. The Stanford CoreNLP libraries contain a number of human language processing tools, including sentiment prediction.

Download the Stanford CoreNLP libraries. Unzip the .zip file, and you’ll see a directory full of jar files. For this example, the following jars are required:

  • stanford-corenlp-3.8.0.jar
  • stanford-corenlp-3.8.0-models.jar
  • ejml-0.23.jar

Upload these files to an Amazon S3 path that is accessible to AWS Glue so that it can load these libraries when needed. For this example, they are in s3://glue-sample-other/corenlp/.

Development endpoints are static Spark-based environments that can serve as the backend for data exploration. You can attach notebooks to these endpoints to interactively send commands and explore and analyze your data. These endpoints have the same configuration as that of AWS Glue’s job execution system. So, commands and scripts that work there also work the same when registered and run as jobs in AWS Glue.

To set up an endpoint and a Zeppelin notebook to work with that endpoint, follow the instructions in the AWS Glue Developer Guide. When you are creating an endpoint, be sure to specify the locations of the previously mentioned jars in the Dependent jars path as a comma-separated list. Otherwise, the libraries will not be loaded.

After you set up the notebook server, go to the Zeppelin notebook by choosing Dev Endpoints in the left navigation pane on the AWS Glue console. Choose the endpoint that you created. Next, choose the Notebook Server URL, which takes you to the Zeppelin server. Log in using the notebook user name and password that you specified when creating the notebook. Finally, create a new note to try out this example.

Each notebook is a collection of paragraphs, and each paragraph contains a sequence of commands and the output for that command. Moreover, each notebook includes a number of interpreters. If you set up the Zeppelin server using the console, the (Python-based) pyspark and (Scala-based) spark interpreters are already connected to your new development endpoint, with pyspark as the default. Therefore, throughout this example, you need to prepend %spark at the top of your paragraphs. In this example, we omit these for brevity.

Working with the data

In this section, we use AWS Glue extensions to Spark to work with the dataset. We look at the actual schema of the data and filter out the interesting event types for our analysis.

Start with some boilerplate code to import libraries that you need:

%spark

import com.amazonaws.services.glue.DynamicRecord
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.types._
import org.apache.spark.SparkContext

Then, create the Spark and AWS Glue contexts needed for working with the data:

@transient val spark: SparkContext = SparkContext.getOrCreate()
val glueContext: GlueContext = new GlueContext(spark)

You need the transient decorator on the SparkContext when working in Zeppelin; otherwise, you will run into a serialization error when executing commands.

Dynamic frames

This section shows how to create a dynamic frame that contains the GitHub records in the table that you crawled earlier. A dynamic frame is the basic data structure in AWS Glue scripts. It is like an Apache Spark data frame, except that it is designed and optimized for data cleaning and transformation workloads. A dynamic frame is well-suited for representing semi-structured datasets like the GitHub timeline.

A dynamic frame is a collection of dynamic records. In Spark lingo, it is an RDD (resilient distributed dataset) of DynamicRecords. A dynamic record is a self-describing record. Each record encodes its columns and types, so every record can have a schema that is unique from all others in the dynamic frame. This is convenient and often more efficient for datasets like the GitHub timeline, where payloads can vary drastically from one event type to another.

The following creates a dynamic frame, github_events, from your table:

val github_events = glueContext
                    .getCatalogSource(database = "githubarchive", tableName = "data")
                    .getDynamicFrame()

The getCatalogSource() method returns a DataSource, which represents a particular table in the Data Catalog. The getDynamicFrame() method returns a dynamic frame from the source.

Recall that the crawler created a schema from only a sample of the data. You can scan the entire dataset, count the rows, and print the complete schema as follows:

github_events.count
github_events.printSchema()

The result looks like the following:

The data has 414,826 records. As before, notice that there are eight top-level columns, and three partition columns. If you scroll down, you’ll also notice that the payload is the most complex column.

Run functions and filter records

This section describes how you can create your own functions and invoke them seamlessly to filter records. Unlike filtering with Python lambdas, Scala scripts do not need to convert records from one language representation to another, thereby reducing overhead and running much faster.

Let’s create a function that picks only the IssuesEvents from the GitHub timeline. These events are generated whenever someone posts an issue for a particular repository. Each GitHub event record has a field, “type”, that indicates the kind of event it is. The issueFilter() function returns true for records that are IssuesEvents.

def issueFilter(rec: DynamicRecord): Boolean = { 
    rec.getField("type").exists(_ == "IssuesEvent") 
}

Note that the getField() method returns an Option[Any] type, so you first need to check that it exists before checking the type.

You pass this function to the filter transformation, which applies the function on each record and returns a dynamic frame of those records that pass.

val issue_events =  github_events.filter(issueFilter)

Now, let’s look at the size and schema of issue_events.

issue_events.count
issue_events.printSchema()

It’s much smaller (14,063 records), and the payload schema is less complex, reflecting only the schema for issues. Keep a few essential columns for your analysis, and drop the rest using the ApplyMapping() transform:

val issue_titles = issue_events.applyMapping(Seq(("id", "string", "id", "string"),
                                                 ("actor.login", "string", "actor", "string"), 
                                                 ("repo.name", "string", "repo", "string"),
                                                 ("payload.action", "string", "action", "string"),
                                                 ("payload.issue.title", "string", "title", "string")))
issue_titles.show()

The ApplyMapping() transform is quite handy for renaming columns, casting types, and restructuring records. The preceding code snippet tells the transform to select the fields (or columns) that are enumerated in the left half of the tuples and map them to the fields and types in the right half.

Estimating sentiment using Stanford CoreNLP

To focus on the most pressing issues, you might want to isolate the records with the most negative sentiments. The Stanford CoreNLP libraries are Java-based and offer sentiment-prediction functions. Accessing these functions through Python is possible, but quite cumbersome. It requires creating Python surrogate classes and objects for those found on the Java side. Instead, with Scala support, you can use those classes and objects directly and invoke their methods. Let’s see how.

First, import the libraries needed for the analysis:

import java.util.Properties
import edu.stanford.nlp.ling.CoreAnnotations
import edu.stanford.nlp.neural.rnn.RNNCoreAnnotations
import edu.stanford.nlp.pipeline.{Annotation, StanfordCoreNLP}
import edu.stanford.nlp.sentiment.SentimentCoreAnnotations
import scala.collection.convert.wrapAll._

The Stanford CoreNLP libraries have a main driver that orchestrates all of their analysis. The driver setup is heavyweight, setting up threads and data structures that are shared across analyses. Apache Spark runs on a cluster with a main driver process and a collection of backend executor processes that do most of the heavy sifting of the data.

The Stanford CoreNLP shared objects are not serializable, so they cannot be distributed easily across a cluster. Instead, you need to initialize them once for every backend executor process that might need them. Here is how to accomplish that:

val props = new Properties()
props.setProperty("annotators", "tokenize, ssplit, parse, sentiment")
props.setProperty("parse.maxlen", "70")

object myNLP {
    lazy val coreNLP = new StanfordCoreNLP(props)
}

The properties tell the libraries which annotators to execute and how many words to process. The preceding code creates an object, myNLP, with a field coreNLP that is lazily evaluated. This field is initialized only when it is needed, and only once. So, when the backend executors start processing the records, each executor initializes the driver for the Stanford CoreNLP libraries only one time.

Next is a function that estimates the sentiment of a text string. It first calls Stanford CoreNLP to annotate the text. Then, it pulls out the sentences and takes the average sentiment across all the sentences. The sentiment is a double, from 0.0 as the most negative to 4.0 as the most positive.

def estimatedSentiment(text: String): Double = {
    if ((text == null) || (!text.nonEmpty)) { return Double.NaN }
    val annotations = myNLP.coreNLP.process(text)
    val sentences = annotations.get(classOf[CoreAnnotations.SentencesAnnotation])
    sentences.foldLeft(0.0)( (csum, x) => { 
        csum + RNNCoreAnnotations.getPredictedClass(x.get(classOf[SentimentCoreAnnotations.SentimentAnnotatedTree])) 
    }) / sentences.length
}

Now, let’s estimate the sentiment of the issue titles and add that computed field as part of the records. You can accomplish this with the map() method on dynamic frames:

val issue_sentiments = issue_titles.map((rec: DynamicRecord) => { 
    val mbody = rec.getField("title")
    mbody match {
        case Some(mval: String) => { 
            rec.addField("sentiment", ScalarNode(estimatedSentiment(mval)))
            rec }
        case _ => rec
    }
})

The map() method applies the user-provided function on every record. The function takes a DynamicRecord as an argument and returns a DynamicRecord. The code above computes the sentiment, adds it in a top-level field, sentiment, to the record, and returns the record.

Count the records with sentiment and show the schema. This takes a few minutes because Spark must initialize the library and run the sentiment analysis, which can be involved.

issue_sentiments.count
issue_sentiments.printSchema()

Notice that all records were processed (14,063), and the sentiment value was added to the schema.

Finally, let’s pick out the titles that have the lowest sentiment (less than 1.5). Count them and print out a sample to see what some of the titles look like.

val pressing_issues = issue_sentiments.filter(_.getField("sentiment").exists(_.asInstanceOf[Double] < 1.5))
pressing_issues.count
pressing_issues.show(10)

Next, write them all to a file so that you can handle them later. (You’ll need to replace the output path with your own.)

glueContext.getSinkWithFormat(connectionType = "s3", 
                              options = JsonOptions("""{"path": "s3://<bucket>/out/path/"}"""), 
                              format = "json")
            .writeDynamicFrame(pressing_issues)

Take a look in the output path, and you can see the output files.

Putting it all together

Now, let’s create a job from the preceding interactive session. The following script combines all the commands from earlier. It processes the GitHub archive files and writes out the highly negative issues:

import com.amazonaws.services.glue.DynamicRecord
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.types._
import org.apache.spark.SparkContext
import java.util.Properties
import edu.stanford.nlp.ling.CoreAnnotations
import edu.stanford.nlp.neural.rnn.RNNCoreAnnotations
import edu.stanford.nlp.pipeline.{Annotation, StanfordCoreNLP}
import edu.stanford.nlp.sentiment.SentimentCoreAnnotations
import scala.collection.convert.wrapAll._

object GlueApp {

    object myNLP {
        val props = new Properties()
        props.setProperty("annotators", "tokenize, ssplit, parse, sentiment")
        props.setProperty("parse.maxlen", "70")

        lazy val coreNLP = new StanfordCoreNLP(props)
    }

    def estimatedSentiment(text: String): Double = {
        if ((text == null) || (!text.nonEmpty)) { return Double.NaN }
        val annotations = myNLP.coreNLP.process(text)
        val sentences = annotations.get(classOf[CoreAnnotations.SentencesAnnotation])
        sentences.foldLeft(0.0)( (csum, x) => { 
            csum + RNNCoreAnnotations.getPredictedClass(x.get(classOf[SentimentCoreAnnotations.SentimentAnnotatedTree])) 
        }) / sentences.length
    }

    def main(sysArgs: Array[String]) {
        val spark: SparkContext = SparkContext.getOrCreate()
        val glueContext: GlueContext = new GlueContext(spark)

        val dbname = "githubarchive"
        val tblname = "data"
        val outpath = "s3://<bucket>/out/path/"

        val github_events = glueContext
                            .getCatalogSource(database = dbname, tableName = tblname)
                            .getDynamicFrame()

        val issue_events =  github_events.filter((rec: DynamicRecord) => {
            rec.getField("type").exists(_ == "IssuesEvent")
        })

        val issue_titles = issue_events.applyMapping(Seq(("id", "string", "id", "string"),
                                                         ("actor.login", "string", "actor", "string"), 
                                                         ("repo.name", "string", "repo", "string"),
                                                         ("payload.action", "string", "action", "string"),
                                                         ("payload.issue.title", "string", "title", "string")))

        val issue_sentiments = issue_titles.map((rec: DynamicRecord) => { 
            val mbody = rec.getField("title")
            mbody match {
                case Some(mval: String) => { 
                    rec.addField("sentiment", ScalarNode(estimatedSentiment(mval)))
                    rec }
                case _ => rec
            }
        })

        val pressing_issues = issue_sentiments.filter(_.getField("sentiment").exists(_.asInstanceOf[Double] < 1.5))

        glueContext.getSinkWithFormat(connectionType = "s3", 
                              options = JsonOptions(s"""{"path": "$outpath"}"""), 
                              format = "json")
                    .writeDynamicFrame(pressing_issues)
    }
}

Notice that the script is enclosed in a top-level object called GlueApp, which serves as the script’s entry point for the job. (You’ll need to replace the output path with your own.) Upload the script to an Amazon S3 location so that AWS Glue can load it when needed.

To create the job, open the AWS Glue console. Choose Jobs in the left navigation pane, and then choose Add job. Create a name for the job, and specify a role with permissions to access the data. Choose An existing script that you provide, and choose Scala as the language.

For the Scala class name, type GlueApp to indicate the script’s entry point. Specify the Amazon S3 location of the script.

Choose Script libraries and job parameters. In the Dependent jars path field, enter the Amazon S3 locations of the Stanford CoreNLP libraries from earlier as a comma-separated list (without spaces). Then choose Next.

No connections are needed for this job, so choose Next again. Review the job properties, and choose Finish. Finally, choose Run job to execute the job.

You can simply edit the script’s input table and output path to run this job on whatever GitHub timeline datasets that you might have.

Conclusion

In this post, we showed how to write AWS Glue ETL scripts in Scala via notebooks and how to run them as jobs. Scala has the advantage that it is the native language for the Spark runtime. With Scala, it is easier to call Scala or Java functions and third-party libraries for analyses. Moreover, data processing is faster in Scala because there’s no need to convert records from one language runtime to another.

You can find more example of Scala scripts in our GitHub examples repository: https://github.com/awslabs/aws-glue-samples. We encourage you to experiment with Scala scripts and let us know about any interesting ETL flows that you want to share.

Happy Glue-ing!

 


Additional Reading

If you found this post useful, be sure to check out Simplify Querying Nested JSON with the AWS Glue Relationalize Transform and Genomic Analysis with Hail on Amazon EMR and Amazon Athena.

 


About the Authors

Mehul Shah is a senior software manager for AWS Glue. His passion is leveraging the cloud to build smarter, more efficient, and easier to use data systems. He has three girls, and, therefore, he has no spare time.

 

 

 

Ben Sowell is a software development engineer at AWS Glue.

 

 

 

 
Vinay Vivili is a software development engineer for AWS Glue.