Tag Archives: Amazon EMR

Improve clinical trial outcomes by using AWS technologies

Post Syndicated from Mayank Thakkar original https://aws.amazon.com/blogs/big-data/improve-clinical-trial-outcomes-by-using-aws-technologies/

We are living in a golden age of innovation, where personalized medicine is making it possible to cure diseases that we never thought curable. Digital medicine is helping people with diseases get healthier, and we are constantly discovering how to use the body’s immune system to target and eradicate cancer cells. According to a report published by ClinicalTrials.gov, the number of registered studies hit 293,000 in 2018, representing a 250x growth since 2000.

However, an internal rate of return (IRR) analysis by Endpoints News, using data from EvaluatePharma, highlights some interesting trends. The growth in registered studies reflects flourishing pharma innovation, yet the IRR shows a rapidly declining trend: from around 17 percent in 2000 to below the cost of capital in 2017, and it is projected to reach 0 percent by 2020.

This blog post is the first installment in a series that focuses on the end-to-end workflow of collecting, storing, processing, visualizing, and acting on clinical trials data in a compliant and secure manner. The series also discusses the application of artificial intelligence and machine learning technologies to the world of clinical trials. In this post, we highlight common architectural patterns that AWS customers use to modernize their clinical trials. These patterns incorporate mobile technologies to generate better evidence, reduce costs, increase quality, improve access, and make medicine more personalized for patients.

Improving the outcomes of clinical trials and reducing costs

Biotech and pharma organizations are feeling the pressure to use resources as efficiently as possible. This pressure forces them to search for any opportunity to streamline processes, get faster, and stay more secure, all while decreasing costs. More and more life sciences companies are targeting biologics, CAR-T, and precision medicine therapeutics, with focus shifting towards smaller, geographically distributed patient segments. This shift has resulted in an increasing mandate to capture data from previously unavailable, nontraditional sources. These sources include mobile devices, IoT devices, and in-home and clinical devices. Life sciences companies merge data from these sources with data from traditional clinical trials to build robust evidence around the safety and efficacy of a drug.

Early last year, the Clinical Trials Transformation Initiative (CTTI) provided recommendations about using mobile technologies for capturing holistic, high quality, attributable, real-world data from patients and for submission to the U.S. Food and Drug Administration (FDA). By using mobile technologies, life sciences companies can reduce barriers to trial participation and lower costs associated with conducting clinical trials. Global regulatory groups such as the FDA, Health Canada, and Medicines and Healthcare products Regulatory Agency (MHRA), among others, are also in favor of using mobile technologies. Mobile technologies can make patient recruitment more efficient, reach endpoints faster, and reduce the cost and time required to conduct clinical trials.

Improved data ingestion using mobile technologies can speed up outcomes, reduce costs, and improve the accuracy of clinical trials. This is especially true when mobile data ingestion is supplemented with artificial intelligence and machine learning (AI/ML) technologies.

Together, they can usher in a new age of smart clinical trials.

At the same time, traditional clinical trial processes and technology designed for mass-marketed blockbuster drugs can’t effectively meet emerging industry needs. This leaves life sciences and pharmaceutical companies in need of assistance for evolving their clinical trial operations. These circumstances result in making clinical trials one of the largest areas of investment for bringing a new drug to market.

Using mobile technologies with traditional technologies in clinical trials can improve the outcomes of the trials and simultaneously reduce costs. Some of the use cases that the integration of various technologies enables include these:

  • Identifying and tracking participants in clinical trials
    • Identifying participants for clinical trial recruitment
    • Educating and informing patients participating in clinical trials
    • Implementing standardized protocols and sharing associated information with trial participants
    • Tracking adverse events and safety profiles
  • Integrating genomic and phenotypic data for identifying novel biomarkers
  • Integrating mobile data into clinical trials for better clinical trial management
  • Creating a patient-control arm based on historical data
  • Stratifying cohorts based on treatment, claims, and registry datasets
  • Building a collaborative, interoperable network for data sharing and knowledge creation
  • Building compliance-ready infrastructure for clinical trial management

The AWS Cloud provides HIPAA eligible services and solutions. As an AWS customer, you can use these to build solutions for global implementation of mobile devices and sensors in trials, secure capture of streaming Internet of Things (IoT) data, and advanced analytics through visualization tools or AI/ML capabilities. Some of the use cases these services and solutions enable are finding and recruiting patients using smart analytics, facilitating global data management, and remote or in-patient site monitoring. Others include predicting lack of adherence, detecting adverse events, and accelerating trial outcomes along with optimizing trial costs.

Clinical Trials 2.0 (CT2.0) at AWS is geared toward facilitating wider adoption of cloud-native services to enable data ingestion from disparate sources, cost-optimized and reliable storage, and holistic analytics. At the same time, CT2.0 provides the granular access control, end-to-end security, and global scalability needed to conduct clinical trials more efficiently.

Reference architecture

One of the typical architectures for managing a clinical trial using mobile technologies is shown following. This architecture focuses on capturing real-time data from mobile sources and providing a way to process it.

* – Additional considerations such as data security, access control, and compliance need to be incorporated into the architecture and are discussed in the remainder of this post.

Managing a trial by using this architecture consists of the following five major steps.

Step 1: Collect data

Mobile devices, personal wearables, instruments, and smart devices are being used extensively (or are being considered) by global pharmaceutical companies in patient care and clinical trials to provide data for activity tracking, vital signs monitoring, and so on, in real time. Devices like infusion pumps and personal-use dialysis machines require tracking and alerting on device consumables and calibration status. Remote settings management is also a major use case for these kinds of devices. The end-user mobile devices used in a clinical trial emit a large amount of telemetry data that requires real-time data capture, data cleansing, transformation, and analysis.

Typically, these devices are connected to an edge node or a smartphone. Such a connection provides sufficient computing resources to stream data to AWS IoT Core. AWS IoT Core can then be configured to write data to Amazon Kinesis Data Firehose in near real time. Kinesis Data Firehose is a fully managed service for delivering real-time streaming data to destinations such as Amazon S3. S3 provides online, flexible, cost-efficient, pay-as-you-go storage that replicates your data across three Availability Zones within an AWS Region. The edge node or smartphone can use the AWS IoT SDKs to publish and subscribe device data to AWS IoT Core with MQTT. This process supports the AWS method of authentication (called 'SigV4'), X.509 certificate-based authentication, and customer-created token-based authentication (through custom authorizers). This authenticated approach enables you to map your choice of policies to each certificate and remotely control device or application access. You can also use the Kinesis Data Firehose encryption feature to enable server-side data encryption.
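
As an illustration, the following Python (boto3) sketch shows how an edge node or companion application might publish a telemetry reading to AWS IoT Core over the SigV4-authenticated data endpoint. The region, topic name, and payload fields are hypothetical; production devices typically use the AWS IoT Device SDK with X.509 certificate-based MQTT instead.

import json
import time

import boto3  # assumes AWS credentials are available for SigV4 authentication

iot_client = boto3.client("iot-data", region_name="us-east-1")

# Hypothetical telemetry reading from a wearable used in the trial.
reading = {
    "device_id": "wearable-001",
    "patient_id": "subject-042",
    "heart_rate": 72,
    "timestamp": int(time.time()),
}

# Publish the reading to an MQTT topic; an AWS IoT rule can then route matching
# messages to Kinesis Data Firehose for delivery to Amazon S3.
iot_client.publish(
    topic="clinicaltrial/telemetry/wearable-001",
    qos=1,
    payload=json.dumps(reading),
)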

You can also capture additional data such as Case Report Forms (CRF), Electronic Medical Records (EMR), and medical images using Picture Archiving and Communication Systems (PACS). In addition, you can capture laboratory data (Labs) and other Patient Reported Outcomes data (ePRO). AWS provides multiple tools and services to effectively and securely connect to these data sources, enabling you to ingest data in various volumes, variety, and velocities. For more information about creating a HealthCare Data Hub and ingesting Digital Imaging and Communications in Medicine (DICOM) data, see the AWS Big Data Blog post Create a Healthcare Data Hub with AWS and Mirth Connect.

Step 2: Store data

After data is ingested from the devices and wearables used in the clinical trial, Kinesis Data Firehose is used to store the data on Amazon S3. This stored data serves as a raw copy and can later be used for historical analysis and pattern prediction. Using Amazon S3 lifecycle policies, you can periodically move your data to lower-cost storage such as Amazon S3 Glacier to further optimize your storage costs. Amazon S3 Intelligent-Tiering can automatically optimize costs when data access patterns change, by moving data between two access tiers (frequent access and infrequent access) without performance impact or operational overhead. You can also choose to encrypt data at rest and in motion using the various encryption options available on S3.
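
For example, a minimal Python (boto3) sketch of such a lifecycle policy might look like the following. The bucket name, prefix, and transition windows are hypothetical and should be tuned to your retention requirements.

import boto3

s3 = boto3.client("s3")

# Move raw telemetry to S3 Intelligent-Tiering after 30 days and to Glacier after a year.
s3.put_bucket_lifecycle_configuration(
    Bucket="clinical-trial-raw-data",
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "tier-then-archive-raw-telemetry",
                "Filter": {"Prefix": "raw/"},
                "Status": "Enabled",
                "Transitions": [
                    {"Days": 30, "StorageClass": "INTELLIGENT_TIERING"},
                    {"Days": 365, "StorageClass": "GLACIER"},
                ],
            }
        ]
    },
)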

Amazon S3 offers an extremely durable, highly available, and infinitely scalable data storage infrastructure, simplifying most data processing, backup, and replication tasks.

Step 3: Data processing (fast lane)

After collecting and storing a raw copy of the data, Amazon S3 is configured to publish events to AWS Lambda, invoking a Lambda function by passing the event data as a parameter. The Lambda function is used to extract key performance indicators (KPIs) such as adverse event notifications, medication adherence, and treatment schedule management from the incoming data. You can use Lambda to process these KPIs and store them, encrypted at rest, in Amazon DynamoDB, which powers a near-real-time clinical trial status dashboard. This dashboard alerts clinical trial coordinators in real time so that appropriate interventions can take place.
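
The following Python sketch illustrates what such a Lambda function might look like. It is invoked by S3 object-created events, reads each new object, extracts a couple of illustrative KPIs, and writes them to DynamoDB. The table name, object format (one JSON reading per object), and KPI fields are assumptions made for this example.

import json
import urllib.parse

import boto3

s3 = boto3.client("s3")
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("TrialStatusKPIs")  # hypothetical table name


def handler(event, context):
    """Triggered by S3 object-created events on the raw telemetry bucket."""
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        key = urllib.parse.unquote_plus(record["s3"]["object"]["key"])

        # Read the newly delivered object and extract simple KPIs from it.
        body = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
        reading = json.loads(body)

        table.put_item(
            Item={
                "patient_id": reading["patient_id"],
                "reading_ts": reading["timestamp"],
                "dose_taken": reading.get("dose_taken", False),
                "heart_rate": reading.get("heart_rate"),
            }
        )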

In addition to this, using a data warehouse full of medical records, you can train and implement a machine learning model. This model can predict which patients are about to switch medications or might exhibit adherence challenges in the future. Such prediction enables clinical trial coordinators to focus mitigation strategies on those patients.

Step 4: Data processing (batch)

For historical analysis and pattern prediction, the staged data (stored in S3) is processed in batches. A Lambda function triggers the extract, transform, and load (ETL) process every time new data is added to the raw data S3 bucket. The ETL process runs on AWS Glue, a fully managed ETL service that makes it easy for you to prepare and load your data for analytics. This approach helps in mining current and historical data to derive actionable insights, which are stored on Amazon S3.
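
A minimal Python sketch of the triggering Lambda function follows; the AWS Glue job name and job arguments are hypothetical.

import boto3

glue = boto3.client("glue")


def handler(event, context):
    """Starts the Glue ETL job whenever new objects land in the raw data bucket."""
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        key = record["s3"]["object"]["key"]

        # The job arguments are passed through to the Glue job script.
        glue.start_job_run(
            JobName="clinical-trial-etl",
            Arguments={
                "--SOURCE_BUCKET": bucket,
                "--SOURCE_KEY": key,
            },
        )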

From there, data is loaded on to Amazon Redshift, a cost-effective, petabyte-scale data warehouse offering from AWS. You can also use Amazon Redshift Spectrum to extend data warehousing out to exabytes without loading any data to Amazon Redshift, as detailed in the Big Data blog post Amazon Redshift Spectrum Extends Data Warehousing Out to Exabytes—No Loading Required. This enables you to provide an all-encompassing picture of the entire clinical trial to your clinical trial coordinators, enabling you to react and respond faster.

In addition to this, you can train and implement a machine learning model to identify patients who might be at risk for adherence challenges. This enables clinical trial coordinators to reinforce patient education and support.

Step 5: Visualize and act on data

After the data is processed and ready to be consumed, you can use Amazon QuickSight, a cloud-native business intelligence service from AWS that offers native Amazon Redshift connectivity. Amazon QuickSight is serverless and so can be rolled out to your audiences in hours. You can also use a host of third-party reporting tools, which can use AWS-supplied JDBC or ODBC drivers or open-source PostgreSQL drivers to connect with Amazon Redshift. These tools include TIBCO Spotfire Analytics, Tableau Server, Qlik Sense Enterprise, Looker, and others. Real-time data processing (step 3 preceding) combines with historical-view batch processing (step 4). Together, they empower contract research organizations (CROs), study managers, trial coordinators, and other entities involved in the clinical trial journey to make effective and informed decisions at a speed and frequency that was previously unavailable. Using Amazon QuickSight’s unique Pay-per-Session pricing model, you can optimize costs for your bursty usage models by paying only when users access the dashboards.

Using Amazon Simple Notification Service (Amazon SNS), real-time feedback based on incoming data and telemetry is sent to patients by using text messages, mobile push, and emails. In addition, study managers and coordinators can send Amazon SNS notifications to patients. Amazon SNS provides fully managed pub/sub messaging for microservices, distributed systems, and serverless applications. It's designed for high-throughput, push-based, many-to-many messaging. Alerts and notifications can be based on current data or on a combination of current and historical data.
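
As a simple illustration, a notification could be published from Python (boto3) as follows; the topic ARN and message text are hypothetical.

import boto3

sns = boto3.client("sns")

# Hypothetical topic subscribed to by SMS, mobile push, and email endpoints.
TOPIC_ARN = "arn:aws:sns:us-east-1:123456789012:trial-participant-alerts"

sns.publish(
    TopicArn=TOPIC_ARN,
    Subject="Medication reminder",  # used for email endpoints
    Message=(
        "Our records show that today's dose has not been confirmed. "
        "Please take your medication and confirm it in the study app."
    ),
)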

To encrypt messages published to Amazon SNS, you can follow the steps listed in the post Encrypting messages published to Amazon SNS with AWS KMS, on the AWS Compute Blog.   

Data security, data privacy, data integrity, and compliance considerations

At AWS, customer trust is our top priority. We deliver services to millions of active customers, including enterprises, educational institutions, and government agencies in over 190 countries. Our customers include financial services providers, healthcare providers, and governmental agencies, who trust us with some of their most sensitive information.

To facilitate this, along with the services mentioned earlier, you should also use the AWS Identity and Access Management (IAM) service. IAM enables you to maintain segregation of access, enforce fine-grained access control, and secure end-user mobile and web applications. You can also use AWS Security Token Service (AWS STS) to provide secure, self-expiring, time-boxed, temporary security credentials to third-party administrators and service providers, greatly strengthening your security posture. You can use AWS CloudTrail to log IAM and STS API calls. Additionally, AWS IoT Device Management makes it easy to securely onboard, organize, monitor, and remotely manage IoT devices at scale.
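
For example, issuing time-boxed credentials with AWS STS from Python (boto3) might look like the following sketch; the role ARN, session name, and duration are hypothetical.

import boto3

sts = boto3.client("sts")

# Assume a narrowly scoped role created for a third-party administrator.
response = sts.assume_role(
    RoleArn="arn:aws:iam::123456789012:role/TrialVendorReadOnly",
    RoleSessionName="vendor-support-session",
    DurationSeconds=3600,  # credentials self-expire after one hour
)

credentials = response["Credentials"]
s3 = boto3.client(
    "s3",
    aws_access_key_id=credentials["AccessKeyId"],
    aws_secret_access_key=credentials["SecretAccessKey"],
    aws_session_token=credentials["SessionToken"],
)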

With AWS, you can add an additional layer of security to your data at rest in the cloud. AWS provides scalable and efficient encryption features for services like Amazon EBS, Amazon S3, Amazon Redshift, Amazon SNS, AWS Glue, and many more. Flexible key management options, including AWS Key Management Service, enable you to choose whether to have AWS manage the encryption keys or to keep complete control over your own keys. In addition, AWS provides APIs for you to integrate encryption and data protection with any of the services that you develop or deploy in an AWS environment.

As a customer, you maintain ownership of your data, and select which AWS services can process, store, and host the content. Generally speaking, AWS doesn’t access or use customers’ content for any purpose without their consent. AWS never uses customer data to derive information for marketing or advertising.

When evaluating the security of a cloud solution, it’s important that you understand and distinguish between the security of the cloud and security in the cloud. The AWS Shared Responsibility Model details this relationship.

To assist you with your compliance efforts, AWS continues to add services to the scope of compliance regulations, attestations, certifications, and programs across the world. To decide which services are suitable for you, see the services in scope page.

You can also use services such as AWS CloudTrail, AWS Config, Amazon GuardDuty, and AWS Key Management Service (AWS KMS), among others, to enhance your compliance and auditing efforts. Find more details in the AWS Compliance Solutions Guide.

Final thoughts

With the ever-growing interconnectivity and technological advances in the field of medical devices, mobile devices and sensors can improve numerous aspects of clinical trials. They can help in recruitment, informed consent, patient counseling, and patient communication management. They can also improve protocol and medication adherence, clinical endpoints measurement, and the process of alerting participants on adverse events. Smart sensors, smart mobile devices, and robust interconnecting systems can be central in conducting clinical trials.

Every biopharma organization conducting or sponsoring a clinical trial activity faces the conundrum of advancing their approach to trials while maintaining overall trial performance and data consistency. The AWS Cloud enables a new dimension for how data is collected, stored, and used for clinical trials. It thus addresses that conundrum as we march towards a new reality of how drugs are brought to market. The AWS Cloud abstracts away technical challenges such as scaling, security, and establishing a cost-efficient IT infrastructure. In doing so, it allows biopharma organizations to focus on their core mission of improving patient lives through the development of effective, groundbreaking treatments.

About the Authors

Mayank Thakkar – Global Solutions Architect, AWS HealthCare and Life Sciences

Deven Atnoor, Ph.D. – Industry Specialist, AWS HealthCare and Life Sciences

Best practices for successfully managing memory for Apache Spark applications on Amazon EMR

Post Syndicated from Karunanithi Shanmugam original https://aws.amazon.com/blogs/big-data/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/

In the world of big data, a common use case is performing extract and transform (ET) operations and data analytics on huge amounts of data from a variety of data sources. Often, you then analyze the data to get insights. One of the most popular cloud-based solutions for processing such vast amounts of data is Amazon EMR.

Amazon EMR is a managed cluster platform that simplifies running big data frameworks, such as Apache Hadoop and Apache Spark, on AWS. Amazon EMR enables organizations to spin up a cluster with multiple instances in a matter of minutes. It also enables you to process various data engineering and business intelligence workloads through parallel processing. By doing this, you can greatly reduce the data processing times, effort, and costs involved in establishing and scaling a cluster.

Apache Spark is a cluster-computing software framework that is open-source, fast, and general-purpose. It is widely used in distributed processing of big data. Apache Spark relies heavily on cluster memory (RAM) as it performs parallel computing in memory across nodes to reduce the I/O and execution times of tasks.

Generally, you perform the following steps when running a Spark application on Amazon EMR:

  1. Upload the Spark application package to Amazon S3.
  2. Configure and launch the Amazon EMR cluster with configured Apache Spark.
  3. Install the application package from Amazon S3 onto the cluster and then run the application.
  4. Terminate the cluster after the application is completed.

It’s important to configure the Spark application appropriately based on data and processing requirements for it to be successful. With default settings, Spark might not use all the available resources of the cluster and might end up with physical or virtual memory issues, or both. Thousands of questions have been raised on stackoverflow.com related to this specific topic.

This blog post is intended to assist you by detailing best practices to prevent memory-related issues with Apache Spark on Amazon EMR.

Common memory issues in Spark applications with default or improper configurations

Listed following are a few sample out-of-memory errors that can occur in a Spark application with default or improper configurations.

Out of Memory Error, Java Heap Space

WARN TaskSetManager: Loss was due to 
java.lang.OutOfMemoryError
java.lang.OutOfMemoryError: Java heap space

Out of Memory Error, Exceeding Physical Memory

Error: ExecutorLostFailure Reason: Container killed by YARN for exceeding limits.
12.4 GB of 12.3 GB physical memory used. 
Consider boosting spark.yarn.executor.memoryOverhead.
Error: ExecutorLostFailure Reason: Container killed by YARN for exceeding limits.
4.5 GB of 3 GB physical memory used.
Consider boosting spark.yarn.executor.memoryOverhead.

Out of Memory Error, Exceeding Virtual Memory

Container killed by YARN for exceeding memory limits.
1.1gb of 1.0gb virtual memory used. Killing container.

Out of Memory Error, Exceeding Executor Memory

Required executor memory (1024+384 MB) is above 
the max threshold (896 MB) of this cluster! Please check the values of 'yarn.scheduler.maximum-allocation-mb'
and/or 'yarn.nodemanager.resource.memory-mb'.

These issues occur for various reasons, some of which are listed following:

  1. When the number of Spark executor instances, the amount of executor memory, the number of cores, or parallelism is not set appropriately to handle large volumes of data.
  2. When the Spark executor’s physical memory exceeds the memory allocated by YARN. In this case, the total of Spark executor instance memory plus memory overhead is not enough to handle memory-intensive operations. Memory-intensive operations include caching, shuffling, and aggregating (using reduceByKey, groupBy, and so on). Or, in some cases, the total of Spark executor instance memory plus memory overhead can be more than what is defined in yarn.scheduler.maximum-allocation-mb.
  3. The memory required to perform system operations such as garbage collection is not available in the Spark executor instance.

In the following sections, I discuss how to configure Spark applications properly to prevent out-of-memory issues, including but not limited to those preceding.

Configuring for a successful Spark application on Amazon EMR

The following steps can help you configure a successful Spark application on Amazon EMR.

1. Determine the type and number of instances based on application needs

Amazon EMR has three types of nodes:

  1. Master: An EMR cluster has one master, which acts as the resource manager and manages the cluster and tasks.
  2. Core: The core nodes are managed by the master node. Core nodes run YARN NodeManager daemons, Hadoop MapReduce tasks, and Spark executors to manage storage, execute tasks, and send a heartbeat to the master.
  3. Task: The optional task-only nodes perform tasks and don’t store any data, in contrast to core nodes.

Best practice 1: Choose the right type of instance for each of the node types in an Amazon EMR cluster. Doing this is one key to success in running any Spark application on Amazon EMR.

There are numerous instance types offered by AWS with varying ranges of vCPUs, storage, and memory, as described in the Amazon EMR documentation. Based on whether an application is compute-intensive or memory-intensive, you can choose the right instance type with the right compute and memory configuration.

For memory-intensive applications, prefer R type instances over the other instance types. For compute-intensive applications, prefer C type instances. For applications balanced between memory and compute, prefer M type general-purpose instances.

To understand the possible use cases for each instance type offered by AWS, see Amazon EC2 Instance Types on the EC2 service website.

After deciding the instance type, determine the number of instances for each of the node types. You do this based on the size of the input datasets, application execution times, and frequency requirements.

2. Determine the Spark configuration parameters

Before we dive into the details on Spark configuration, let’s get an overview of how the executor container memory is organized using the diagram following.

As the preceding diagram shows, the executor container has multiple memory compartments. Of these, only one (execution memory) is actually used for executing the tasks. These compartments should be properly configured for running the tasks efficiently and without failure.

Calculate and set the following Spark configuration parameters carefully for the Spark application to run successfully:

  • spark.executor.memory – Size of memory to use for each executor that runs the task.
  • spark.executor.cores – Number of virtual cores.
  • spark.driver.memory – Size of memory to use for the driver.
  • spark.driver.cores – Number of virtual cores to use for the driver.
  • spark.executor.instances – Number of executors. Set this parameter unless spark.dynamicAllocation.enabled is set to true.
  • spark.default.parallelism – Default number of partitions in resilient distributed datasets (RDDs) returned by transformations like join, reduceByKey, and parallelize when no partition number is set by the user.

Amazon EMR provides high-level information on how it sets the default values for Spark parameters in the release guide. These values are automatically set in the spark-defaults settings based on the core and task instance types in the cluster.

To use all the resources available in a cluster, set the maximizeResourceAllocation parameter to true. This EMR-specific option calculates the maximum compute and memory resources available for an executor on an instance in the core instance group. It then sets these parameters in the spark-defaults settings. Even with this setting, generally the default numbers are low and the application doesn’t use the full strength of the cluster. For example, the default for spark.default.parallelism is only 2 x the number of virtual cores available, though parallelism can be higher for a large cluster.

Spark on YARN can dynamically scale the number of executors used for a Spark application based on the workloads. Using Amazon EMR release version 4.4.0 and later, dynamic allocation is enabled by default (as described in the Spark documentation).

The problem with the spark.dynamicAllocation.enabled property is that it requires you to set subproperties. Some example subproperties are spark.dynamicAllocation.initialExecutors, minExecutors, and maxExecutors. Subproperties are required for most cases to use the right number of executors in a cluster for an application, especially when you need multiple applications to run simultaneously. Setting subproperties requires a lot of trial and error to get the numbers right. If they’re not right, the capacity might be reserved but never actually used. This leads to wastage of resources or memory errors for other applications.

Best practice 2: Set spark.dynamicAllocation.enabled to true only if the numbers are properly determined for spark.dynamicAllocation.initialExecutors/minExecutors/maxExecutors parameters. Otherwise, set spark.dynamicAllocation.enabled to false and control the driver memory, executor memory, and CPU parameters yourself. To do this, calculate and set these properties manually for each application (see the example following).

Let’s assume that we are going to process 200 terabytes of data spread across thousands of file stores in Amazon S3. Further, let’s assume that we do this through an Amazon EMR cluster with 1 r5.12xlarge master node and 19 r5.12xlarge core nodes. Each r5.12xlarge instance has 48 virtual cores (vCPUs) and 384 GB RAM. All these calculations are for the --deploy-mode cluster, which we recommend for production use.

The following list describes how to set some important Spark properties, using the preceding case as an example.

spark.executor.cores

Assigning executors with a large number of virtual cores leads to a low number of executors and reduced parallelism. Assigning a low number of virtual cores leads to a high number of executors, causing a larger amount of I/O operations. Based on historical data, we suggest that you have five virtual cores for each executor to achieve optimal results in any sized cluster.

For the preceding cluster, the property spark.executor.cores should be assigned as follows: spark.executor.cores = 5 (vCPU)

spark.executor.memory

After you decide on the number of virtual cores per executor, calculating this property is much simpler. First, get the number of executors per instance using total number of virtual cores and executor virtual cores. Subtract one virtual core from the total number of virtual cores to reserve it for the Hadoop daemons.

Number of executors per instance = (total number of virtual cores per instance - 1) / spark.executor.cores

Number of executors per instance = (48 - 1)/ 5 = 47 / 5 = 9 (rounded down)

Then, get the total executor memory by using the total RAM per instance and number of executors per instance. Leave 1 GB for the Hadoop daemons.

Total executor memory = (total RAM per instance - 1 GB) / number of executors per instance
Total executor memory = (384 - 1) / 9 = 383 / 9 = 42 (rounded down)

This total executor memory includes the executor memory and overhead (spark.yarn.executor.memoryOverhead). Assign 10 percent from this total executor memory to the memory overhead and the remaining 90 percent to executor memory.

spark.executor.memory = total executor memory * 0.90
spark.executor.memory = 42 * 0.9 = 37 (rounded down)

spark.yarn.executor.memoryOverhead = total executor memory * 0.10
spark.yarn.executor.memoryOverhead = 42 * 0.1 = 5 (rounded up)

spark.driver.memory

We recommend setting this to equal spark.executor.memory.

spark.driver.memory = spark.executor.memory

spark.driver.cores

We recommend setting this to equal spark.executor.cores.

spark.driver.cores = spark.executor.cores

spark.executor.instances

Calculate this by multiplying the number of executors per instance by the number of core instances, and then leaving one executor for the driver.

spark.executor.instances = (number of executors per instance * number of core instances) minus 1 for the driver

spark.executor.instances = (9 * 19) - 1 = 170

spark.default.parallelism

Set this property using the following formula.

spark.default.parallelism = spark.executor.instances * spark.executor.cores * 2

spark.default.parallelism = 170 * 5 * 2 = 1,700

Warning: Although this calculation results in 1,700 partitions, we recommend that you estimate the size of each partition and adjust this number accordingly by using coalesce or repartition.

In the case of DataFrames, configure the parameter spark.sql.shuffle.partitions along with spark.default.parallelism.
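
To make the preceding arithmetic concrete, here is a short Python sketch that reproduces the calculation for the example cluster of r5.12xlarge instances. It only encodes the rules of thumb from this section; substitute your own instance specifications as needed.

import math

# Example cluster: r5.12xlarge core nodes (48 vCPUs, 384 GB RAM each).
VCPUS_PER_INSTANCE = 48
RAM_PER_INSTANCE_GB = 384
CORE_INSTANCE_COUNT = 19
EXECUTOR_CORES = 5  # recommended virtual cores per executor

# Reserve 1 vCPU and 1 GB per instance for the Hadoop daemons.
executors_per_instance = (VCPUS_PER_INSTANCE - 1) // EXECUTOR_CORES           # 9
total_executor_memory = (RAM_PER_INSTANCE_GB - 1) // executors_per_instance   # 42

executor_memory_gb = int(total_executor_memory * 0.90)                        # 37
memory_overhead_gb = math.ceil(total_executor_memory * 0.10)                  # 5

# Leave one executor for the driver.
executor_instances = executors_per_instance * CORE_INSTANCE_COUNT - 1         # 170
default_parallelism = executor_instances * EXECUTOR_CORES * 2                 # 1700

print("spark.executor.cores =", EXECUTOR_CORES)
print("spark.executor.memory =", f"{executor_memory_gb}g")
print("spark.yarn.executor.memoryOverhead =", f"{memory_overhead_gb}g")
print("spark.executor.instances =", executor_instances)
print("spark.default.parallelism =", default_parallelism)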

Though the preceding parameters are critical for any Spark application, the following parameters also help in running the applications smoothly to avoid other timeout and memory-related errors. We advise that you set these in the spark-defaults configuration file.

  • spark.network.timeout – Timeout for all network transactions.
  • spark.executor.heartbeatInterval – Interval between each executor’s heartbeats to the driver. This value should be significantly less than spark.network.timeout.
  • spark.memory.fraction – Fraction of JVM heap space used for Spark execution and storage. The lower this is, the more frequently spills and cached data eviction occur.
  • spark.memory.storageFraction – Expressed as a fraction of the size of the region set aside by spark.memory.fraction. The higher this is, the less working memory might be available to execution. This means that tasks might spill to disk more often.
  • spark.yarn.scheduler.reporterThread.maxFailures – Maximum number of executor failures allowed before YARN can fail the application.
  • spark.rdd.compress – When set to true, this property can save substantial space at the cost of some extra CPU time by compressing the RDDs.
  • spark.shuffle.compress – When set to true, this property compresses the map output to save space.
  • spark.shuffle.spill.compress – When set to true, this property compresses the data spilled during shuffles.
  • spark.sql.shuffle.partitions – Sets the number of partitions for joins and aggregations.
  • spark.serializer – Sets the serializer to serialize or deserialize data. As a serializer, I prefer Kryo (org.apache.spark.serializer.KryoSerializer), which is faster and more compact than the Java default serializer.

To understand more about each of the parameters mentioned preceding, see the Spark documentation.

We recommend you consider these additional programming techniques for efficient Spark processing:

  • coalesce – Reduces the number of partitions to allow for less data movement.
  • repartition – Reduces or increases the number of partitions and performs full shuffle of data as opposed to coalesce.
  • partitionBy – Distributes data horizontally across partitions.
  • bucketBy – Decomposes data into more manageable parts (buckets) based on hashed columns.
  • cache/persist – Pulls datasets into a clusterwide in-memory cache. Doing this is useful when data is accessed repeatedly, such as when querying a small lookup dataset or when running an iterative algorithm.

Best practice 3: Carefully calculate the preceding additional properties based on application requirements. Set these properties appropriately in spark-defaults, when submitting a Spark application (spark-submit), or within a SparkConf object.
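
As an example of the SparkConf route, the following PySpark sketch applies the values calculated earlier for the r5.12xlarge cluster. This is only an illustration; executor-level settings must be in place before the SparkContext is created, and driver memory and cores are best supplied at submit time (spark-submit or spark-defaults) rather than in application code.

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("emr-memory-tuned-app")
    .config("spark.dynamicAllocation.enabled", "false")
    .config("spark.executor.cores", "5")
    .config("spark.executor.memory", "37g")
    .config("spark.yarn.executor.memoryOverhead", "5g")
    .config("spark.executor.instances", "170")
    .config("spark.default.parallelism", "1700")
    .config("spark.network.timeout", "800s")
    .config("spark.executor.heartbeatInterval", "60s")
    .getOrCreate()
)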

3. Implement a proper garbage collector to clear memory effectively

Garbage collection can lead to out-of-memory errors in certain cases. These include cases when there are multiple large RDDs in the application. Other cases occur when there is an interference between the task execution memory and RDD cached memory.

You can use multiple garbage collectors to evict the old objects and place the new ones into the memory. However, the latest Garbage First Garbage Collector (G1GC) overcomes the latency and throughput limitations with the old garbage collectors.

Best practice 4: Always set up a garbage collector when handling large volume of data through Spark.

The parameter -XX:+UseG1GC specifies that the G1GC garbage collector should be used. (The default is -XX:+UseParallelGC.) To understand the frequency and execution time of the garbage collection, use the parameters -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps. To initiate garbage collection sooner, set InitiatingHeapOccupancyPercent to 35 (the default is 45). Doing this helps avoid a full garbage collection over the entire heap, which can take a significant amount of time. An example follows.

"spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
"spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",

4. Set the YARN configuration parameters

Even if all the Spark configuration properties are calculated and set correctly, virtual out-of-memory errors can still occur occasionally, because the OS bumps up virtual memory aggressively. To prevent these application failures, set the following flags in the YARN site settings.

Best practice 5: Always set the virtual and physical memory check flag to false.

"yarn.nodemanager.vmem-check-enabled":"false",
"yarn.nodemanager.pmem-check-enabled":"false"

5. Perform debugging and monitoring

To get details on where the Spark configuration options are coming from, you can run spark-submit with the --verbose option. Also, you can use Ganglia and the Spark UI to monitor application progress, cluster RAM usage, network I/O, and so on.

In the following example, we compare the outcomes between configured and non-configured Spark applications using Ganglia graphs.

When configured following the methods described, a Spark application can process 10 TB data successfully without any memory issues on an Amazon EMR cluster whose specs are as follows:

  • 1 r5.12xlarge master node
  • 19 r5.12xlarge core nodes
  • 8 TB total RAM
  • 960 total virtual CPUs
  • 170 executor instances
  • 5 virtual CPUs/executor
  • 37 GB memory/executor
  • Parallelism equals 1,700

Following, you can find Ganglia graphs for reference.

If you run the same Spark application with default configurations on the same cluster, it fails with an out-of-physical-memory error. This is because the default configurations (two executor instances, parallelism of 2, one vCPU/executor, 8-GB memory/executor) aren’t enough to process 10 TB data. Though the cluster had 7.8 TB memory, the default configurations limited the application to use only 16 GB memory, leading to the following out-of-memory error.

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 8, executor 7): ExecutorLostFailure (executor 7 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 10.5 GB of 8 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

Also, for large datasets, the default garbage collectors don’t clear the memory efficiently enough for the tasks to run in parallel, causing frequent failures. The following charts help in comparing the RAM usage and garbage collection with the default and G1GC garbage collectors. With G1GC, the RAM used is maintained below 5 TB (see the blue area in the graph).

With the default garbage collector (CMS), the RAM used goes above 5 TB. This can lead to the failure of the Spark job when running many tasks continuously.

Example: EMR instance template with configuration

There are different ways to set the Spark and YARN configuration parameters. One way is to pass them when creating the EMR cluster.

To do this, in the Amazon EMR console’s Edit software settings section, you can enter the appropriately updated configuration template (Enter configuration). Or the configuration can be passed from S3 (Load JSON from S3).

Following is a configuration template with sample values. At a minimum, calculate and set the following parameters for a successful Spark application.

{
      "InstanceGroups":[
         {
            "Name":"AmazonEMRMaster",
            "Market":"ON_DEMAND",
            "InstanceRole":"MASTER",
            "InstanceType":"r5.12xlarge",
            "InstanceCount":1,
            "Configurations":[
               {
                 "Classification": "yarn-site",
                 "Properties": {
                   "yarn.nodemanager.vmem-check-enabled": "false",
                   "yarn.nodemanager.pmem-check-enabled": "false"
                 }
               },
               {
                 "Classification": "spark",
                 "Properties": {
                   "maximizeResourceAllocation": "false"
                 }
               },
               {
                 "Classification": "spark-defaults",
                 "Properties": {
                   "spark.network.timeout": "800s",
                   "spark.executor.heartbeatInterval": "60s",
                   "spark.dynamicAllocation.enabled": "false",
                   "spark.driver.memory": "21000M",
                   "spark.executor.memory": "21000M",
                   "spark.executor.cores": "5",
                   "spark.executor.instances": "171",
                   "spark.yarn.executor.memoryOverhead": "21000M",
                   "spark.yarn.driver.memoryOverhead": "21000M",
                   "spark.memory.fraction": "0.80",
                   "spark.memory.storageFraction": "0.30",
                   "spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
                   "spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
                   "spark.yarn.scheduler.reporterThread.maxFailures": "5",
                   "spark.storage.level": "MEMORY_AND_DISK_SER",
                   "spark.rdd.compress": "true",
                   "spark.shuffle.compress": "true",
                   "spark.shuffle.spill.compress": "true",
                   "spark.default.parallelism": "3400"
                 }
               },
               {
                 "Classification": "mapred-site",
                 "Properties": {
                   "mapreduce.map.output.compress": "true"
                 }
               },
               {
                 "Classification": "hadoop-env",
                 "Configurations": [{
                   "Classification": "export",
                   "Configurations": [],
                   "Properties": {
                     "JAVA_HOME": "/usr/lib/jvm/java-1.8.0"
                   }
                 }],
                 "Properties": {}
               },
               {
                 "Classification": "spark-env",
                 "Configurations": [{
                   "Classification": "export",
                   "Properties": {
                     "JAVA_HOME": "/usr/lib/jvm/java-1.8.0"
                   }
                 }],
                 "Properties": {}
               }
            ]
        },
        {
            "Name":"AmazonEMRCore",
            "Market":"ON_DEMAND",
             "InstanceRole":"CORE",
             "InstanceType":"r5.12xlarge",
             "InstanceCount":19,
             "Configurations":[        
        ..............
        ..............
        ..............
        }
      ],
      "Ec2KeyName":"KEY_NAME"
  } 

Conclusion

In this blog post, I detailed the possible out-of-memory errors, their causes, and a list of best practices to prevent these errors when submitting a Spark application on Amazon EMR.

My colleagues and I formed these best practices after thorough research and understanding of various Spark configuration properties and testing multiple Spark applications. These best practices apply to most out-of-memory scenarios, though there might be some rare scenarios where they don’t apply. However, we believe that this blog post provides all the details needed so you can tweak parameters and successfully run a Spark application.

About the Author

Karunanithi Shanmugam is a data engineer with AWS Tech and Finance.

Best practices for running Apache Spark applications using Amazon EC2 Spot Instances with Amazon EMR

Post Syndicated from Ran Sheinberg original https://aws.amazon.com/blogs/big-data/best-practices-for-running-apache-spark-applications-using-amazon-ec2-spot-instances-with-amazon-emr/

Apache Spark has become one of the most popular tools for running analytics jobs. This popularity is due to its ease of use, fast performance, utilization of memory and disk, and built-in fault tolerance. These features strongly correlate with the concepts of cloud computing, where instances can be disposable and ephemeral.

Amazon EC2 Spot Instances offer spare compute capacity available in the AWS Cloud at steep discounts compared to On-Demand prices. EC2 can interrupt Spot Instances with two minutes of notification when EC2 needs the capacity back. You can use Spot Instances for various fault-tolerant and flexible applications. Some examples are analytics, containerized workloads, high-performance computing (HPC), stateless web servers, rendering, CI/CD, and other test and development workloads.

Amazon EMR provides a managed Hadoop framework that makes it easy, fast, and cost-effective to process vast amounts of data using EC2 instances. When using Amazon EMR, you don’t need to worry about installing, upgrading, and maintaining Spark software (or any other tool from the Hadoop framework). You also don’t need to worry about installing and maintaining underlying hardware or operating systems. Instead, you can focus on your business applications and use Amazon EMR to remove the undifferentiated heavy lifting.

In this blog post, we are going to focus on cost-optimizing and efficiently running Spark applications on Amazon EMR by using Spot Instances. We recommend several best practices to increase the fault tolerance of your Spark applications and use Spot Instances. These work without compromising availability or having a large impact on performance or the length of your jobs.

Use the Spot Instance Advisor to target instance types with suitable interruption rates

As mentioned, Spot Instances can be interrupted if EC2 needs the capacity back. In this blog post, we share best practices on how to increase the fault tolerance of your Spark applications to withstand occasional loss of underlying EC2 instances due to Spot interruptions. However, even then, targeting EC2 Spot Instances with lower interruption rates can help further. This approach helps by decreasing occurrences where your job gets prolonged because Spark needs to redo some of the work when interruptions occur.

Use the Spot Instance Advisor to check the interruption rates and try to create your Amazon EMR cluster using instance types that historically have lower interruption rates. For example, the frequency of interruption for r4.2xlarge in the US East (Ohio) region at the time of writing this post is less than 5 percent. This means that less than 5 percent of all r4.2xlarge Spot Instances launched in the last 30 days were interrupted by EC2.

Run your Spot workloads on a diversified set of instance types

When running workloads (analytics or others) on EC2 instances and using On-Demand or Reserved Instances purchase options, you can generally use a single instance type across your entire cluster. You might do so after benchmarking to find the right instance type to fit the application’s requirement. However, with Spot Instances, using multiple Spot capacity pools (an instance type in an Availability Zone) in a cluster is key. This practice enables you to achieve scale and preserve capacity for running your jobs.

For example, suppose that I run my Spark application using On-Demand r4.xlarge instances (30.5 GiB memory and four vCPUs). When I start using Spot Instances, I can configure my Amazon EMR cluster’s Core or Task Instance Fleets with several instance types that have a similar vCPU-to-memory ratio (roughly 7 GB per vCPU) and let EMR choose the right instance type to run in the cluster. These include r4.2xlarge, r5.xlarge, i3.2xlarge, and i3.4xlarge. Taking this approach makes it more likely that I’ll have sufficient Spot capacity to launch the cluster. It also increases the chance that Amazon EMR will be able to replenish the required capacity for the cluster to continue running (from other capacity pools) in case some of the capacity in the cluster is terminated by EC2 Spot interruptions.

Instance type    Number of vCPUs    RAM (in GB)
r4.xlarge        4                  30.5
r4.2xlarge       8                  61
r5.xlarge        4                  32
i3.2xlarge       8                  61
i3.4xlarge       16                 122

Size your Spark executors to allow using multiple instance types

As described just previously, a key factor for running on Spot instances is using a diversified fleet of instances. This also helps decrease the impact of Spot interruptions on your jobs. This approach dictates the architecture for your Spark applications.

Running with memory intensive executors (over 20 GB of RAM) ties your application to a specific set of instance types. These might not have sufficient Spot capacity for you to stand up your cluster. These also might have high Spot interruption rates, which might have an impact on your running jobs.

For example, for a Spark application with 90 GiB of RAM and 15 cores per executor, only 11 instance types fit the hardware requirements and fall below the 20 percent Spot interruption rate. Suppose that we break down the executors to two cores per executor, keeping the ratio of 6 GiB of RAM per core. Doing so opens up 20 additional instance types that our job can run on (below the 20 percent interruption rate).

A fair approach to resizing an executor is to decide on the minimum number of cores to run your application on. Two is a good start. You then allocate memory using the following calculation:

NUM_CORES * ((EXECUTOR_MEMORY + MEMORY_OVERHEAD) / EXECUTOR_CORES)

In our example, that is 2 * ((90 + 20) / 15) ≈ 15 GB

For more information about the memoryOverhead setting, see the Spark documentation.

Avoid large shuffles in Spark

To reduce the amount of data that Spark needs to reprocess if a Spot Instance is interrupted in your Amazon EMR cluster, you should avoid large shuffles.

Wide dependency operations like GroupBy and some types of joins can produce vast amounts of intermediate data. Intermediate data is stored on local disk and then transferred (shuffled) to other executors in your cluster.

Although you can’t always do so, we recommend to either avoid shuffle operations or work toward minimizing the amount of shuffle data. We recommend this for two reasons:

  • This is a general Spark best practice, because shuffle is an expensive operation.
  • In the context of Spot Instances, doing this decreases the fault tolerance of the job. This is because losing one node that either contains shuffle data or relies on shuffled data for computations (usually both) requires you to rerun some part of the shuffle process.

There are several patterns that we encounter that produce unnecessary amounts of shuffle data, described following.

The explode to group pattern

From a developer point of view, using explode on complex data types might be a quick solution to some use cases (exploding an array into multiple rows). Doing so multiplies the number of rows, which we later group or join back together in the job.

For example, suppose that our data contains user IDs and an array of dates that describe visits to a website:

user_id    visit_dates_array
0          ["28/01/2018", "29/01/2018", "01/01/2019"]
100000     ["01/11/2017", "01/12/2017"]
999999     ["01/01/2017", "02/01/2017", "03/01/2017", "04/01/2017", "05/01/2017", "06/01/2017"]

 

Suppose that we run a Spark application that counts the number of website visits for each user. In this case, an easy solution is to use explode and then aggregate the data, as shown following.

Explode the data:

df.selectExpr("user_id", "explode(visit_dates_array) visit_day").createOrReplaceTempView("visits")

Aggregate back the data:

spark.sql("""select count(visit_day), user_id
             from visits
             group by user_id""")

Although this method is quick and easy, it bloats our data to three times more than the original data. To accurately sum the visits for each user_id, our data also has to be shipped across the network to other executors.

What are the alternatives to exploding and grouping?

One option is to create a UDF that does the calculations in place, avoiding or minimizing shuffles. The following example is in Scala.

val countVisitsUDF = (array: Seq[String]) => {
    array.length
}

spark.udf.register("countVisits",  countVisitsUDF  )

// "tab" holds the original (unexploded) data, with the visit dates array in column "arr"
spark.sql("""SELECT user_id, countVisits(arr)
             FROM tab""").show
+-------+--------------------+
|user_id|UDF:countVisits(arr)|
+-------+--------------------+
|  20000|                   3|
| 100000|                   2|
|   9999|                   6|
+-------+--------------------+

Another option that was recently introduced in Spark 2.4 is the aggregate function. This function can also reduce the amount of shuffle data to a bare minimum, just the user_id and the count of their visits:

spark.sql("""SELECT user_id,
             sum(aggregate(arr, 0, (acc, x) -> acc + 1)) summary
             FROM tab
             GROUP BY user_id""").show
+-------+-------+
|user_id|summary|
+-------+-------+
| 100000|      2|
|   9999|      6|
|  00000|      3|
+-------+-------+

Huge data joins (bucketing)

When performing join operations, Spark repartitions (shuffles) the data by the join keys.

If you perform multiple joins on the same table or tables with the same key, you can use bucketing to shuffle the data only once. When persisting the data, any subsequent joins on that same key don’t require shuffle because the data is already “pre-shuffled” on Amazon S3.

To bucket your data, you need to decide on the number of buckets to divide your data into and the columns on which the bucketing occurs.

df.write.bucketBy(4,"user_id").saveAsTable("ExampleTable")
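
For repeated joins, both sides can be bucketed on the same key with the same bucket count. The following PySpark sketch (with illustrative DataFrame and table names) shows a subsequent join reading the pre-shuffled tables.

# Bucket both tables on the join key with the same number of buckets.
visits_df.write.bucketBy(4, "user_id").sortBy("user_id").saveAsTable("visits_bucketed")
users_df.write.bucketBy(4, "user_id").sortBy("user_id").saveAsTable("users_bucketed")

# The join on the bucketing key can now avoid a full shuffle of both tables.
joined = spark.table("visits_bucketed").join(spark.table("users_bucketed"), "user_id")
joined.explain()  # the physical plan should no longer require an Exchange on both sides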

Work with data skew

In some cases, data doesn’t distribute uniformly between partitions. This is an issue for several reasons:

  • Generally, most of the executors finish in a timely manner. However, those that handle the large outliers run for a longer time. This increases your risk of getting your Spot Instances interrupted and having to recompute the whole job. It also has a negative impact on overall performance and prolongs the length of the job or causes resources to be underutilized.
  • Data skew can also be a source for large amounts of shuffle data, which can cause issues as discussed previously.

To handle data skew, we recommend that you try to do the computation that you’re interested in locally on the executors, and then compute over those partial results. This approach is also known as a combine operation.

A common technique to handle data skew is salting the keys.
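
The following PySpark sketch illustrates salting for a skewed aggregation; df, the column names, and the salt count are illustrative.

from pyspark.sql import functions as F

NUM_SALTS = 8  # size this to the degree of skew

# Stage 1: add a random salt so that a single hot user_id is spread across
# several partitions, then compute partial counts per (user_id, salt).
partial = (
    df.withColumn("salt", (F.rand() * NUM_SALTS).cast("int"))
      .groupBy("user_id", "salt")
      .agg(F.count("*").alias("partial_count"))
)

# Stage 2: combine the partial results per user_id. This second shuffle moves
# only one small row per (user_id, salt) pair instead of every raw event.
visits_per_user = (
    partial.groupBy("user_id")
           .agg(F.sum("partial_count").alias("visits"))
)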

Break huge Spark jobs into smaller ones to increase resiliency

One antipattern that we encounter is large applications that perform numerous jobs that can take hours or days to complete.

This kind of job creates an all-or-nothing situation: a failure anywhere during the long runtime can cause the entire run to be lost, wasting time and money.

It might sound obvious, but breaking up your jobs to a chain of smaller jobs increases your resiliency to handle failures and Spot interruptions. Breaking up jobs also means that you can remediate any issues preventing the job from finishing successfully. In addition, it decreases the chances of losing the effort already invested in the process.

Work with Amazon EMR Instance fleets

You can use a couple of techniques with Amazon EMR instance fleets to work effectively with Spark.

Diversify the EC2 instance types in your cluster

By configuring Amazon EMR instance fleets, you can set up a fleet of up to five EC2 instance types for each Amazon EMR node type (Master, Core, Task). As discussed earlier, being instance-flexible is key to the ability to launch and maintain Spot capacity for your Amazon EMR cluster.

For the Master node group, one instance out of your selection is chosen by Amazon EMR. In the Core and Task node groups, Amazon EMR selects the best instance types to use in the cluster based on capacity availability and low price. Also, you can specify multiple subnets in different Availability Zones. In this case, Amazon EMR selects the AZ that best fits the target capacity to launch the entire cluster in.

Size Amazon EMR instance fleets according to the job’s hardware requirements

Amazon EMR instance fleets enable you to define a pool of resources by specifying which instance types fit your application. You can also specify the weight each instance type carries in the pool toward your target capacity.

By default, instances are given weight equivalent to the number of their vCPUs. However, you can also provide weights according to other instance characteristics, such as memory, which I demonstrate in this section.

Sizing by CPU:

For example, suppose that I have a job that requires four cores per executor and 1 GB RAM per core, so the Spark configuration is as follows:

--executor-cores 4 --executor-memory 4G

We want the job to run with 20 executors, which means that we need 80 cores (20*4):

The screenshot shows 80 Spot Units as a representation of the 80 cores that are needed to run the job. It also shows the selection of different instance types that fit the hardware requirements.

Amazon EMR chooses any combination of these instance types to fulfill my target capacity of 80 Spot units; some of the larger instance types might run more than one executor.
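
As a rough sketch of how such a fleet could be expressed programmatically, the following Python snippet defines a Task instance fleet targeting 80 Spot units, with each instance type weighted by its vCPU count. The instance types, timeout values, and the idea of passing this as part of Instances["InstanceFleets"] to a boto3 run_job_flow call are assumptions for illustration, not the exact configuration from the screenshot.

# One element of Instances["InstanceFleets"] in a boto3 emr.run_job_flow(...) call.
task_fleet = {
    "Name": "task-fleet",
    "InstanceFleetType": "TASK",
    "TargetSpotCapacity": 80,  # 80 units = 20 executors * 4 cores
    "InstanceTypeConfigs": [
        # WeightedCapacity is set to the vCPU count of each instance type.
        {"InstanceType": "r4.xlarge", "WeightedCapacity": 4},
        {"InstanceType": "r4.2xlarge", "WeightedCapacity": 8},
        {"InstanceType": "r4.4xlarge", "WeightedCapacity": 16},
    ],
    "LaunchSpecifications": {
        "SpotSpecification": {
            "TimeoutDurationMinutes": 10,
            "TimeoutAction": "SWITCH_TO_ON_DEMAND",
        }
    },
}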

 

Sizing by memory

Some Spark application requirements are memory-intensive and require a different weight strategy.

For example, if our job runs with four cores and 6 GB per core (--executor-cores 4 --executor-memory 24G), we first choose instances that have at least 28 GB of RAM:

As you can see in the screenshot, in this configuration the instance type selection is set to accommodate the memory requirement. This leaves about 15–20 percent memory free for other processes running inside the instance operating system.

You then calculate the total units by multiplying the number of units of the smallest eligible instance by the desired number of executors (25*100).

As in the CPU-intensive job, some instance types run only one executor while others run several.

Compensating for performance differences between instance generations

Some workloads can see performance improvements of up to 50 percent just by running on newer instance types. This effect is due to AWS Nitro technology, faster CPU clock speeds, a newer CPU architecture (for example, moving from Haswell/Broadwell to Skylake), or a combination of these.

If decreasing application running time is a major requirement, you can offset the performance difference between instance type generations by assigning smaller weights to the older instance generations.

For example, suppose that your job runs for an hour with 10 r5.2xlarge instances and two hours with 10 r4.2xlarge instances. In this case, you might prefer defining your instance fleet as follows:

Select the right purchase option for each node type

Spot Blocks are defined-duration Spot Instances that can run for up to six hours without being interrupted, and they come at a smaller discount than regular Spot Instances. You can use Spot Blocks when your jobs can’t tolerate Spot interruptions, provided that the forecasted cluster run time is under six hours.

Master node: Unless your cluster is very short-lived and the runs are cost-driven, avoid running your Master node on a Spot Instance. We suggest this because a Spot interruption on the Master node terminates the entire cluster. As an alternative to On-Demand, you can set up the Master node on a Spot Block by setting a defined duration for the node and failing over to On-Demand if the Spot Block capacity is unavailable.

Core nodes: Avoid using Spot Instances for Core nodes if the jobs on the cluster use HDFS. This prevents a situation where Spot interruptions cause the loss of data written to the HDFS volumes on those instances.

Task nodes: Use Spot Instances for your Task nodes by selecting up to five instance types that match your hardware requirements. Amazon EMR fulfills the most suitable capacity by price and capacity availability.

Get EC2 Spot interruption notifications

When EC2 needs to interrupt Spot Instances, a 2-minute warning is issued for each instance that is going to be interrupted. You can programmatically react to the warning in two ways: from within the instance by polling the instance’s metadata service, and by using Amazon CloudWatch Events. You can find the specifics in the documentation.

The use of this warning varies between types of workloads. For example, you can opt to detach the soon-to-be-interrupted instances from an Elastic Load Balancer to drain in-flight connections before the instance gets shut down. Alternatively, you can copy the logs to a centralized location, or gracefully shut down an application.
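
A minimal Python sketch of the polling approach is shown below; it checks the standard EC2 Spot instance-action metadata path, and the shutdown logic at the end is a placeholder you would replace with your own drain or cleanup steps.

import time
import urllib.error
import urllib.request

INSTANCE_ACTION_URL = "http://169.254.169.254/latest/meta-data/spot/instance-action"

def interruption_pending():
    """Return True if EC2 has issued the 2-minute Spot interruption warning."""
    try:
        with urllib.request.urlopen(INSTANCE_ACTION_URL, timeout=1) as response:
            return response.getcode() == 200  # the body holds the action and its time
    except urllib.error.URLError:
        return False  # a 404 (or no response) means no interruption is scheduled

while not interruption_pending():
    time.sleep(5)

# Placeholder: copy logs, drain connections, or gracefully stop the application here.
print("Spot interruption warning received; starting graceful shutdown.")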

To learn more about how EMR handles EC2 Spot interruptions, see the AWS Big Data blog post Spark enhancements for elasticity and resiliency on Amazon EMR.

You might still want to track the Spot interruptions, possibly to correlate between Amazon EMR job failures and Spot interruptions or job length. In this case, you can set up a CloudWatch Event to trigger an AWS Lambda function to feed the interruption into a data store. This approach means that you can query the historical interruptions in your account. For smaller scale or even initial testing, you can use Amazon SNS with an email target to simply get the interruption notifications by email.
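
A minimal sketch of such a Lambda function follows, assuming a CloudWatch Events rule for the EC2 Spot Instance Interruption Warning detail type and a DynamoDB table named spot-interruptions; the table name and attribute names are assumptions for the example.

import boto3

table = boto3.resource("dynamodb").Table("spot-interruptions")

def handler(event, context):
    # CloudWatch Events delivers the interruption warning with the instance details.
    detail = event.get("detail", {})
    table.put_item(
        Item={
            "instance_id": detail.get("instance-id", "unknown"),
            "instance_action": detail.get("instance-action", "unknown"),
            "event_time": event.get("time", ""),
        }
    )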

Tag your Amazon EMR cluster and track your costs

Tagging your resources in the AWS Cloud is a fundamental best practice. You can read more about tagging strategies on this AWS Answers page. In Amazon EMR, after you tag the cluster, your tags propagate to the underlying EC2 instances and the Amazon EBS volumes that are created by the cluster. This enables you to have a holistic view of the costs of running your Amazon EMR clusters, and can be easily visualized with AWS Cost Explorer.
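
For example, a boto3 sketch that tags an existing cluster might look like the following; the cluster ID and the tag keys and values are placeholders.

import boto3

emr = boto3.client("emr")

# Tags propagate to the EC2 instances and EBS volumes created by the cluster.
emr.add_tags(
    ResourceId="j-XXXXXXXXXXXXX",
    Tags=[
        {"Key": "team", "Value": "data-engineering"},
        {"Key": "environment", "Value": "production"},
        {"Key": "cost-center", "Value": "1234"},
    ],
)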

Summary

In this blog post, we list best practices for cost-optimizing your Spark applications on Amazon EMR by using Spot Instances. We hope that you find them useful and that you apply them to cost-optimize your own Spark workloads.

 


About the authors

Ran Sheinberg is a specialist solutions architect for EC2 Spot Instances with Amazon Web Services. He works with AWS customers on cost optimizing their compute spend by utilizing Spot Instances across different types of workloads: stateless web applications, queue workers, containerized workloads, analytics, HPC and others.

 

 

 

Daniel Haviv is a specialist solutions architect for Analytics with Amazon Web Services.

 

 

 

 

Improve Apache Spark write performance on Apache Parquet formats with the EMRFS S3-optimized committer

Post Syndicated from Peter Slawski original https://aws.amazon.com/blogs/big-data/improve-apache-spark-write-performance-on-apache-parquet-formats-with-the-emrfs-s3-optimized-committer/

The EMRFS S3-optimized committer is a new output committer available for use with Apache Spark jobs as of Amazon EMR 5.19.0. This committer improves performance when writing Apache Parquet files to Amazon S3 using the EMR File System (EMRFS). In this post, we run a performance benchmark to compare this new optimized committer with existing committer algorithms, namely FileOutputCommitter algorithm versions 1 and 2. We close with a discussion on current limitations for the new committer, providing workarounds where possible.

Comparison with FileOutputCommitter

In Amazon EMR version 5.19.0 and earlier, Spark jobs that write Parquet to Amazon S3 use a Hadoop commit algorithm called FileOutputCommitter by default. There are two versions of this algorithm, version 1 and 2. Both versions rely on writing intermediate task output to temporary locations. They subsequently perform rename operations to make the data visible at task or job completion time.

Algorithm version 1 has two phases of rename: one to commit the individual task output, and the other to commit the overall job output from completed/successful tasks. Algorithm version 2 is more efficient because task commits rename files directly to the final output location. This eliminates the second rename phase, but it makes partial data visible before the job completes, which not all workloads can tolerate.

The renames that are performed are fast, metadata-only operations on the Hadoop Distributed File System (HDFS). However, when output is written to object stores such as Amazon S3, renames are implemented by copying data to the target and then deleting the source. This rename “penalty” is exacerbated with directory renames, which can happen in both phases of FileOutputCommitter v1. Whereas these are single metadata-only operations on HDFS, committers must execute N copy-and-delete operations on S3.

To partially mitigate this, Amazon EMR 5.14.0+ defaults to FileOutputCommitter v2 when writing Parquet data to S3 with EMRFS in Spark. The new EMRFS S3-optimized committer improves on that work to avoid rename operations altogether by using the transactional properties of Amazon S3 multipart uploads. Tasks may then write their data directly to the final output location, but defer completion of each output file until task commit time.

Performance test

We evaluated the write performance of the different committers by executing the following INSERT OVERWRITE Spark SQL query. The SELECT * FROM range(…) clause generated data at execution time. This produced ~15 GB of data across exactly 100 Parquet files in Amazon S3.

SET rows=4e9; -- 4 Billion
SET partitions=100;

INSERT OVERWRITE DIRECTORY 's3://${bucket}/perf-test/${trial_id}'
USING PARQUET SELECT * FROM range(0, ${rows}, 1, ${partitions});

Note: The EMR cluster ran in the same AWS Region as the S3 bucket. The trial_id property used a UUID generator to ensure that there was no conflict between test runs.

We executed our test on an EMR cluster created with the emr-5.19.0 release label, with a single m5d.2xlarge instance in the master group, and eight m5d.2xlarge instances in the core group. We used the default Spark configuration properties set by Amazon EMR for this cluster configuration, which include the following:

spark.dynamicAllocation.enabled true
spark.executor.memory 11168M
spark.executor.cores 4

After running 10 trials for each committer, we captured and summarized query execution times in the following chart. Whereas FileOutputCommitter v2 averaged 49 seconds, the EMRFS S3-optimized committer averaged only 31 seconds—a 1.6x speedup.

As mentioned earlier, FileOutputCommitter v2 eliminates some, but not all, rename operations that FileOutputCommitter v1 uses. To illustrate the full performance impact of renames against S3, we reran the test using FileOutputCommitter v1. In this scenario, we observed an average runtime of 450 seconds, which is 14.5x slower than the EMRFS S3-optimized committer.

The last scenario we evaluated is the case when EMRFS consistent view is enabled, which addresses issues that can arise due to the Amazon S3 data consistency model. In this mode, the EMRFS S3-optimized committer time was unaffected by this change and still averaged 30 seconds. On the other hand, FileOutputCommitter v2 averaged 53 seconds, which was slower than when the consistent view feature was turned off, widening the overall performance difference to 1.8x.

Job correctness

The EMRFS S3-optimized committer has the same limitations that FileOutputCommitter v2 has because both improve performance by fully delegating commit responsibilities to the individual tasks. The following is a discussion of the notable consequences of this design choice.

Partial results from incomplete or failed jobs

Because both committers have their tasks write to the final output location, concurrent readers of that output location can view partial results when using either of them. If a job fails, partial results are left behind from any tasks that have committed before the overall job failed. This situation can lead to duplicate output if the job is run again without first cleaning up the output location.

One way to mitigate this issue is to ensure that a job uses a different output location each time it runs, publishing the location to downstream readers only if the job succeeds. The following code block is an example of this strategy for workloads that use Hive tables. Notice how output_location is set to a unique value each time the job is run, and that the table partition is registered only if the rest of the query succeeds. As long as readers exclusively access data via the table abstraction, they cannot see results before the job finishes.

SET attempt_id=<a random UUID>;
SET output_location=s3://bucket/${attempt_id};

INSERT OVERWRITE DIRECTORY '${output_location}'
USING PARQUET SELECT * FROM input;

ALTER TABLE output ADD PARTITION (dt = '2018-11-26')
LOCATION '${output_location}';

This approach requires treating the locations that partitions point to as immutable. Updates to partition contents require restating all results into a new location in S3, and then updating the partition metadata to point to that new location.

Duplicate results from non-idempotent tasks

Another scenario that can cause both committers to produce incorrect results is when jobs composed of non-idempotent tasks produce outputs into non-deterministic locations for each task attempt.

The following is an example of a query that illustrates the issue. It uses a timestamp-based table partitioning scheme, which causes each task attempt to write to a different location.

SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

INSERT INTO data PARTITION (time) SELECT 42, current_timestamp();

You can avoid the issue of duplicate results in this scenario by ensuring that tasks write to a consistent location across task attempts. For example, instead of calling functions that return the current timestamp within tasks, consider providing the current timestamp as an input to the job. Similarly, if a random number generator is used within jobs, consider using a fixed seed or one that is based on the task’s partition number to ensure that task reattempts use the same value.

Note: Spark’s built-in random functions rand(), randn(), and uuid() are already designed with this in mind.

Enabling the EMRFS S3-optimized committer

Starting with Amazon EMR version 5.20.0, the EMRFS S3-optimized committer is enabled by default. In Amazon EMR version 5.19.0, you can enable the committer by setting the spark.sql.parquet.fs.optimized.committer.optimization-enabled property to true from within Spark or when creating clusters. The committer takes effect when you use Spark’s built-in Parquet support to write Parquet files into Amazon S3 with EMRFS. This includes using the Parquet data source with Spark SQL, DataFrames, or Datasets. However, there are some use cases when the EMRFS S3-optimized committer does not take effect, and some use cases where Spark performs its own renames entirely outside of the committer. For more information about the committer and about these special cases, see Using the EMRFS S3-optimized Committer in the Amazon EMR Release Guide.
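
For example, on Amazon EMR 5.19.0 you might enable the committer from within a Spark session as sketched below (on 5.20.0 and later it is already on by default); the DataFrame df and the output path are placeholders.

# Enable the EMRFS S3-optimized committer for this Spark session (EMR 5.19.0).
spark.conf.set(
    "spark.sql.parquet.fs.optimized.committer.optimization-enabled", "true"
)

# Parquet writes to S3 through EMRFS now use the optimized committer.
df.write.mode("overwrite").parquet("s3://my-bucket/output/")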

Related Work – S3A Committers

The EMRFS S3-optimized committer was inspired by concepts used by committers that support the S3A file system. The key take-away is that these committers use the transactional nature of S3 multipart uploads to eliminate some or all of the rename costs. This is also the core concept used by the EMRFS S3-optimized committer.

For more information about the various committers available within the ecosystem, including those that support the S3A file system, see the official Apache Hadoop documentation.

Summary

The EMRFS S3-optimized committer improves write performance compared to FileOutputCommitter. Starting with Amazon EMR version 5.19.0, you can use it with Spark’s built-in Parquet support. For more information, see Using the EMRFS S3-optimized Committer in the Amazon EMR Release Guide.

 


About the authors

Peter Slawski is a software development engineer with Amazon Web Services.

 

 

 

 

Jonathan Kelly is a senior software development engineer with Amazon Web Services.

 

 

 

 

Spark enhancements for elasticity and resiliency on Amazon EMR

Post Syndicated from Udit Mehrotra original https://aws.amazon.com/blogs/big-data/spark-enhancements-for-elasticity-and-resiliency-on-amazon-emr/

Customers take advantage of the elasticity in Amazon EMR to save costs by scaling in clusters when workflows are completed, or when running lighter jobs. This also applies to launching clusters with low-cost Amazon EC2 Spot Instances.

The Automatic Scaling feature in Amazon EMR lets customers dynamically scale clusters in and out, based on cluster usage or other job-related metrics. These features help you use resources efficiently, but they can also cause EC2 instances to shut down in the middle of a running job. This could result in the loss of computation and data, which can affect the stability of the job or result in duplicate work through recomputing.

To gracefully shut down nodes without affecting running jobs, Amazon EMR uses Apache Hadoop's decommissioning mechanism, which the Amazon EMR team developed and contributed back to the community. This works well for most Hadoop workloads, but not so much for Apache Spark. Spark currently faces various shortcomings while dealing with node loss. This can cause jobs to get stuck trying to recover and recompute lost tasks and data, and in some cases eventually crash the job. Several of these shortcomings are tracked as open issues in the Apache Spark community.

To avoid some of these issues and help customers take full advantage of Amazon EMR’s elasticity features with Spark, Amazon EMR has customizations to open-source Spark that make it more resilient to node loss. Recomputation is minimized, and jobs can recover faster from node failures and EC2 instance termination. These improvements are in Amazon EMR release version 5.9.0 and later.

This blog post provides an overview of the issues with how open-source Spark handles node loss and the improvements in Amazon EMR to address the issues.

How Spark handles node loss

When a node goes down during an active Spark job, it has the following risks:

  • Tasks that are actively running on the node might fail to complete and have to run on another node.
  • Cached RDDs (resilient distributed dataset) on the node might be lost. While this does impact performance, it does not cause failures or impact the stability of the application.
  • Shuffle output files in memory, or those written to disk on the node, would be lost. Because Amazon EMR enables the External Shuffle Service by default, the shuffle output is written to disk. Losing shuffle files can bring the application to a halt until they are recomputed on another active node, because future tasks might depend on them. For more information about shuffle operations, see Shuffle operations.

To recover from node loss, Spark should be able to do the following:

  • If actively running tasks are lost, they must be scheduled on another node. In addition, computing for the unscheduled remaining tasks must resume.
  • Shuffle output that was computed on the lost node must be recomputed by re-executing the tasks that produced those shuffle blocks.

The following is the sequence of events for Spark to recover when a node is lost:

  • Spark considers actively running tasks on the node as failed and reruns them on another active node.
  • If the node had shuffle output files that are needed by future tasks, the target executors on other active nodes get a FetchFailedException while trying to fetch missing shuffle blocks from the failed node.
  • When the FetchFailedException happens, the target executors retry fetching the blocks from the failed node for a time determined by the spark.shuffle.io.maxRetries and spark.shuffle.io.retryWait configuration values. After all the retry attempts are exhausted, the failure is propagated to the driver.
  • When the driver receives the FetchFailedException, it marks the currently running shuffle stage during which the failure occurred as failed and stops its execution. It also marks the shuffle output on the node or executors from which shuffle blocks could not be fetched as unavailable/lost, so that they can be recomputed. This triggers the previous Map stage to re-attempt recomputing those missing shuffle blocks.
  • After the missing shuffle output is computed, a re-attempt of the failed shuffle stage is triggered to resume the job from where it stopped. It then runs tasks that failed or had not been scheduled yet.

Issues with Spark’s handling of node loss

Spark’s recovery process helps it recover random executor and node failures that can occur in any cloud environment. However, the recovery process begins only after the node has already failed and Spark gets a FetchFailedException while trying to fetch shuffle blocks. This causes some of the issues described in this section.

Amazon EMR can begin the recovery early, as it knows when and which nodes are going down because of a manual resize, an EC2-triggered Spot instance termination, or an automatic scaling event. It can inform Spark immediately about these nodes, so that Spark can take proactive actions to gracefully handle loss of nodes and start recovery early. However, Spark currently does not have any mechanism through which it can be notified that a node is going down, such as YARN decommissioning. Therefore, it cannot take immediate and relevant actions to help recover faster. As a result, here are some of the issues with Spark’s recovery:

  • The node goes down in the middle of the Map stage, as shown in the following diagram:

In this scenario, the shuffle stage is scheduled unnecessarily, and the application must wait for the FetchFailedException before recomputing the lost shuffle. This takes a lot of time. Instead, it would be better if all lost shuffles could be immediately recomputed in the Map stage before even proceeding to the shuffle stage.

  • The node goes down in the middle of a shuffle stage, as shown in the following diagram:

If there were a way to immediately inform Spark about node loss, instead of it depending on the FetchFailedException and retrying the fetch, that would save recovery time.

  • The Spark driver starts recomputation when it gets the first FetchFailedException. It considers the shuffle files on the lost node as missing. However, if multiple nodes go down at the same time, in its first re-attempt of the previous Map stage, the Spark driver recomputes only the shuffle output for the first node from which it received a FetchFailedException. In the short time between receiving the first fetch failure and starting the re-attempt, it is possible that the driver receives fetch failures from other failed nodes. As a result, it can recompute shuffles for multiple lost nodes in the same re-attempt, but there is no guarantee.

    In most cases, even though nodes go down at the same time, Spark requires multiple re-attempts of the map and shuffle stages to recompute all of the lost shuffle output. This can easily cause a job to be blocked for a significant amount of time. Ideally, Spark could recompute in only one retry the shuffle output on all nodes that were lost around the same time.

  • As long as it can reach a node that is about to go down, Spark can continue to schedule more tasks on it. This causes more shuffle outputs to be computed, which may eventually need to be recomputed. Ideally, these tasks can be redirected to healthy nodes to prevent recomputation and improve recovery time.
  • Spark has a limit on the number of consecutive failed attempts allowed for a stage before it aborts a job. This is configurable with spark.stage.maxConsecutiveAttempts. When a node fails and a FetchFailedException occurs, Spark marks the running shuffle stage as failed and triggers a re-attempt after computing the missing shuffle outputs. Frequent scaling of nodes during shuffle stages can easily cause stage failures to reach the threshold and abort the jobs. Ideally, when a stage fails for valid reasons such as a manual scale in, an automatic scaling event, or an EC2-triggered Spot instance termination, there should be a way to tell Spark not to count this toward spark.stage.maxConsecutiveAttempts for that stage.

How Amazon EMR resolves these issues

This section describes the three main enhancements that Amazon EMR has made to its Spark distribution to resolve the issues described in the previous section.

Integrate with YARN’s decommissioning mechanism

 Spark on Amazon EMR uses YARN as the underlying manager for cluster resources. Amazon EMR has its own implementation of a graceful decommissioning mechanism for YARN that provides a way to gracefully shut down YARN node managers by not scheduling new containers on a node in the Decommissioning state. Amazon EMR does this by waiting for the existing tasks on running containers to complete, or time out, before the node is decommissioned. This decommissioning mechanism has recently been contributed back to open source Hadoop.

We integrated Spark with YARN’s decommissioning mechanism so that the Spark driver is notified when a node goes through Decommissioning or Decommissioned states in YARN. This is shown in the following diagram:

This notification allows the driver to take appropriate actions and start the recovery early, because all nodes go through the decommissioning process before being removed.

Extend Spark’s blacklisting mechanism

YARN’s decommissioning mechanism works well for Hadoop MapReduce jobs by not launching any more containers on decommissioning nodes. This prevents more Hadoop MapReduce tasks from being scheduled on that node. However, this does not work well for Spark jobs because in Spark each executor is assigned a YARN container that is long-lived and keeps receiving tasks.

Preventing new containers from being launched only prevents more executors from being assigned to the node. Already active executors/containers continue to schedule new tasks until the node goes down, and they can end up failing and have to be rerun. Also, if these tasks write shuffle output, they would also be lost. This increases the recomputation and the time that it takes for recovery.

To address this, Amazon EMR extends Spark’s blacklisting mechanism to blacklist a node when the Spark driver receives a YARN decommissioning signal for it. This is shown in the following diagram:

This prevents new tasks from being scheduled on the blacklisted node. Instead they are scheduled on healthy nodes. As soon as tasks already running on the node are complete, the node can be safely decommissioned without the risk of task failures or losses. This also speeds up the recovery process by not producing more shuffle output on a node that is going down. This reduces the number of shuffle outputs to be recomputed. If the node comes out of the Decommissioning state and is active again, Amazon EMR removes the node from the blacklists so that new tasks can be scheduled on it.

This blacklisting extension is enabled by default in Amazon EMR with the spark.blacklist.decommissioning.enabled property set to true. You can control the time for which the node is blacklisted using the spark.blacklist.decommissioning.timeout property, which is set to 1 hour by default, equal to the default value for yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs. We recommend setting spark.blacklist.decommissioning.timeout to a value equal to or greater than yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs to make sure that Amazon EMR blacklists the node for the entire decommissioning period.
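
If you need to override these defaults, one option is to pass a spark-defaults configuration classification when you create the cluster. The following Python sketch shows the shape of such a classification; the timeout value is illustrative only.

# Passed through the Configurations parameter when creating the EMR cluster
# (for example, in a boto3 run_job_flow call).
spark_decommissioning_config = [
    {
        "Classification": "spark-defaults",
        "Properties": {
            "spark.blacklist.decommissioning.enabled": "true",
            # Keep the node blacklisted for the full graceful-decommission period.
            "spark.blacklist.decommissioning.timeout": "2h",
        },
    }
]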

Actions for decommissioned nodes

After a node enters the Decommissioning state, no new tasks are scheduled on it. When its active containers become idle (or the timeout expires), the node is decommissioned. When the Spark driver receives the decommissioned signal, it can take the following additional actions to start the recovery process sooner rather than waiting for a fetch failure to occur:

  • All of the shuffle outputs on the decommissioned node are unregistered, thus marking them as unavailable. Amazon EMR enables this by default with the setting spark.resourceManager.cleanupExpiredHost set to true. This has the following advantages:
    • If a node is lost in the middle of a map stage and gets decommissioned, Spark initiates recovery and recomputes the lost shuffle outputs on the decommissioned node, before proceeding to the next stage. This prevents fetch failures in the shuffle stage, because Spark has all of the shuffle blocks computed and available at the end of the map stage, which significantly speeds up recovery.
    • If a node is lost in the middle of a shuffle stage, the target executors trying to get shuffle blocks from the lost node immediately notice that the shuffle output is unavailable. They then send the failure to the driver instead of retrying and failing multiple times to fetch the blocks. The driver then immediately fails the stage and starts recomputing the lost shuffle output. This reduces the time spent trying to fetch shuffle blocks from lost nodes.
    • The most significant advantage of unregistering shuffle outputs is when a cluster is scaled in by a large number of nodes. Because all of the nodes go down around the same time, they all get decommissioned around the same time, and their shuffle outputs are unregistered. When Spark schedules the first re-attempt to compute the missing blocks, it notices all of the missing blocks from decommissioned nodes and recovers in only one attempt. This speeds up the recovery process significantly over the open-source Spark implementation, where stages might be rescheduled multiple times to recompute missing shuffles from all nodes, and prevent jobs from being stuck for hours failing and recomputing.
  • When a stage fails because of fetch failures from a node being decommissioned, by default, Amazon EMR does not count the stage failure toward the maximum number of failures allowed for a stage as set by spark.stage.maxConsecutiveAttempts. This is determined by the setting spark.stage.attempt.ignoreOnDecommissionFetchFailure being set to true. This prevents a job from failing if a stage fails multiple times because of node failures for valid reasons such as a manual resize, an automatic scaling event, or an EC2-triggered Spot instance termination.

Conclusion

This post described how Spark handles node loss and some of the issues that can occur if a cluster is scaled in during an active Spark job. It also showed the customizations that Amazon EMR has built on Spark, and the configurations available to make Spark on Amazon EMR more resilient, so that you can take full advantage of the elasticity features offered by Amazon EMR.

If you have questions or suggestions, please leave a comment.

 


About the Author

Udit Mehrotra is a software development engineer at Amazon Web Services. He works on cutting-edge features of EMR and is also involved in open source projects such as Apache Spark, Apache Hadoop and Apache Hive. In his spare time, he likes to play guitar, travel, binge watch and hang out with friends.

Metadata classification, lineage, and discovery using Apache Atlas on Amazon EMR

Post Syndicated from Nikita Jaggi original https://aws.amazon.com/blogs/big-data/metadata-classification-lineage-and-discovery-using-apache-atlas-on-amazon-emr/

With the ever-evolving and growing role of data in today’s world, data governance is an essential aspect of effective data management. Many organizations use a data lake as a single repository to store data that is in various formats and belongs to a business entity of the organization. The use of metadata, cataloging, and data lineage is key for effective use of the lake.

This post walks you through how Apache Atlas installed on Amazon EMR can provide these capabilities. You can use this setup to dynamically classify data and view the lineage of data as it moves through various processes. As part of this, you can use a domain-specific language (DSL) in Atlas to search the metadata.

Introduction to Amazon EMR and Apache Atlas

Amazon EMR is a managed service that simplifies the implementation of big data frameworks such as Apache Hadoop and Spark. If you use Amazon EMR, you can choose from a defined set of applications or choose your own from a list.

Apache Atlas is an enterprise-scale data governance and metadata framework for Hadoop. Atlas provides open metadata management and governance capabilities for organizations to build a catalog of their data assets. Atlas supports classification of data, including storage lineage, which depicts how data has evolved. It also provides features to search for key elements and their business definition.

Among all the features that Apache Atlas offers, the core feature of our interest in this post is the Apache Hive metadata management and data lineage. After you successfully set up Atlas, it uses a native tool to import Hive tables and analyze the data to present data lineage intuitively to the end users. To read more about Atlas and its features, see the Atlas website.

AWS Glue Data Catalog vs. Apache Atlas

The AWS Glue Data Catalog provides a unified metadata repository across a variety of data sources and data formats. AWS Glue Data Catalog integrates with Amazon EMR, and also Amazon RDS, Amazon Redshift, Redshift Spectrum, and Amazon Athena. The Data Catalog can work with any application compatible with the Hive metastore.

The scope of the Apache Atlas installation on Amazon EMR is only what’s needed for the Hive metastore on Amazon EMR to provide lineage, discovery, and classification capabilities. You can also use this solution for cataloging in AWS Regions that don’t have AWS Glue.

Architecture

Apache Atlas requires that you launch an Amazon EMR cluster with prerequisite applications such as Apache Hadoop, HBase, Hue, and Hive. Apache Atlas uses Apache Solr for search functions and Apache HBase for storage. Both Solr and HBase are installed on the persistent Amazon EMR cluster as part of the Atlas installation.

This solution’s architecture supports both internal and external Hive tables. For the Hive metastore to persist across multiple Amazon EMR clusters, you should use an external Amazon RDS or Amazon Aurora database to contain the metastore. A sample configuration file for the Hive service to reference an external RDS Hive metastore can be found in the Amazon EMR documentation.

The following diagram illustrates the architecture of our solution.

Amazon EMR–Apache Atlas workflow

To demonstrate the functionality of Apache Atlas, we do the following in this post:

  1. Launch an Amazon EMR cluster using the AWS CLI or AWS CloudFormation
  2. Using Hue, populate external Hive tables
  3. View the data lineage of a Hive table
  4. Create a classification
  5. Discover metadata using the Atlas domain-specific language

1a. Launch an Amazon EMR cluster with Apache Atlas using the AWS CLI

The steps following guide you through the installation of Atlas on Amazon EMR by using the AWS CLI. This installation creates an Amazon EMR cluster with Hadoop, HBase, Hive, and Zookeeper. It also executes a step in which a script located in an Amazon S3 bucket runs to install Apache Atlas under the /apache/atlas folder.

The automation shell script assumes the following:

  • You have a working local copy of the AWS CLI package configured, with access and secret keys.
  • You have a default key pair, VPC, and subnet in the AWS Region where you plan to deploy your cluster.
  • You have sufficient permissions to create S3 buckets and Amazon EMR clusters in the default AWS Region configured in the AWS CLI.
aws emr create-cluster --applications Name=Hive Name=HBase Name=Hue Name=Hadoop Name=ZooKeeper \
  --tags Name="EMR-Atlas" \
  --release-label emr-5.16.0 \
  --ec2-attributes SubnetId=<subnet-xxxxx>,KeyName=<Key Name> \
  --use-default-roles \
  --ebs-root-volume-size 100 \
  --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m4.xlarge InstanceGroupType=CORE,InstanceCount=1,InstanceType=m4.xlarge \
  --log-uri '<S3 location for logging>' \
  --steps Name='Run Remote Script',Jar=command-runner.jar,Args=[bash,-c,'curl https://s3.amazonaws.com/aws-bigdata-blog/artifacts/aws-blog-emr-atlas/apache-atlas-emr.sh -o /tmp/script.sh; chmod +x /tmp/script.sh; /tmp/script.sh']

On successful execution of the command, output containing a cluster ID is displayed:

{
    "ClusterId": "j-2V3BNEB9XQ54H"
}

Use the following command to list the names of active clusters (your cluster shows on the list after it is ready):

aws emr list-clusters --active

In the output of the previous command, look for the cluster name EMR-Atlas (unless you changed the default name in the script). If you have the jq command line utility available, you can run the following command to filter everything but the name and its cluster ID:

aws emr list-clusters --active | jq '.[][] | {(.Name): .Id}'
Sample output:
{
  "external hive store on rds-external-store": "j-1MO3L3XSXZ45V"
}
{
  "EMR-Atlas": "j-301TZ1GBCLK4K"
}

After your cluster shows up on the active list, Amazon EMR and Atlas are ready for operation.

1b. Launch an Amazon EMR cluster with Apache Atlas using AWS CloudFormation

You can also launch your cluster with CloudFormation. Use the emr-atlas.template to set up your Amazon EMR cluster, or launch directly from the AWS Management Console by using this button:

To launch, provide values for the following parameters:

  • VPC: <VPC>
  • Subnet: <Subnet>
  • EMRLogDir: <Amazon EMR logging directory, for example s3://xxx>
  • KeyName: <EC2 key pair name>

Provisioning an Amazon EMR cluster by using the CloudFormation template achieves the same result as the CLI commands outlined previously.

Before proceeding, wait until the CloudFormation stack events show that the status of the stack has reached “CREATE_COMPLETE”.

2. Use Hue to create Hive tables

Next, you log in to Apache Atlas and Hue and use Hue to create Hive tables.

To log in to Atlas, first find the master public DNS name for your cluster by using the Amazon EMR Management Console. Then, use the following command to create a Secure Shell (SSH) tunnel to the Atlas web UI.

ssh -L 21000:localhost:21000 -i key.pem hadoop@<EMR Master IP Address>

If the command preceding doesn’t work, make sure that your key file (*.pem) has appropriate permissions. You also might have to add an inbound rule for SSH (port 22) to the master’s security group.

After successfully creating an SSH tunnel, use the following URL to access the Apache Atlas UI.

http://localhost:21000

You should see a screen like that shown following. The default login details are username admin and password admin.

To set up a web interface for Hue, follow the steps in the Amazon EMR documentation. As you did for Apache Atlas, create an SSH tunnel on remote port 8888 for the console access:

ssh -L 8888:localhost:8888 -i key.pem hadoop@<EMR Master IP Address>

After the tunnel is up, use the following URL for Hue console access.

http://localhost:8888/

At first login, you are asked to create a Hue superuser, as shown following. Do not lose the superuser credentials.

After creating the Hue superuser, you can use the Hue console to run Hive queries.

After you log in to Hue, take the following steps and run the following Hive queries:

    • Run the HQL to create a new database:
create database atlas_emr;
use atlas_emr;
    • Create a new external table called trip_details with data stored on S3. Change the S3 location to a bucket you own.
CREATE external TABLE trip_details
(
  pickup_date        string ,
  pickup_time        string ,
  location_id        int ,
  trip_time_in_secs  int ,
  trip_number        int ,
  dispatching_app    string ,
  affiliated_app     string 
)
row format delimited
fields terminated by ',' stored as textfile
LOCATION 's3://aws-bigdata-blog/artifacts/aws-blog-emr-atlas/trip_details/';
    • Create a new lookup external table called trip_zone_lookup with data stored on S3.
CREATE external TABLE trip_zone_lookup 
(
LocationID     int ,
Borough        string ,
Zone           string ,
service_zone   string
)
row format delimited
fields terminated by ',' stored as textfile
LOCATION 's3://aws-bigdata-blog/artifacts/aws-blog-emr-atlas/zone_lookup/';
    • Create an intersect table of trip_details and trip_zone_lookup by joining these tables:
create table trip_details_by_zone as select *  from trip_details  join trip_zone_lookup on LocationID = location_id;

Next, you perform the Hive import. For metadata to be imported into Atlas, the Atlas Hive import tool is only available from the command line on the Amazon EMR master node (there’s no web UI). To start, log in to the Amazon EMR master by using SSH:

ssh -i key.pem hadoop@<EMR Master IP Address>

Then execute the following command. The script asks for your user name and password for Atlas. The default user name is admin and password is admin.

/apache/atlas/bin/import-hive.sh

A successful import looks like the following:

Enter username for atlas :- admin
Enter password for atlas :- 
2018-09-06T13:23:33,519 INFO [main] org.apache.atlas.AtlasBaseClient - Client has only one service URL, will use that for all actions: http://localhost:21000
2018-09-06T13:23:33,543 INFO [main] org.apache.hadoop.hive.conf.HiveConf - Found configuration file file:/etc/hive/conf.dist/hive-site.xml
2018-09-06T13:23:34,394 WARN [main] org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2018-09-06T13:23:35,272 INFO [main] hive.metastore - Trying to connect to metastore with URI thrift://ip-172-31-90-79.ec2.internal:9083
2018-09-06T13:23:35,310 INFO [main] hive.metastore - Opened a connection to metastore, current connections: 1
2018-09-06T13:23:35,365 INFO [main] hive.metastore - Connected to metastore.
2018-09-06T13:23:35,591 INFO [main] org.apache.atlas.hive.bridge.HiveMetaStoreBridge - Importing Hive metadata
2018-09-06T13:23:35,602 INFO [main] org.apache.atlas.hive.bridge.HiveMetaStoreBridge - Found 2 databases
2018-09-06T13:23:35,713 INFO [main] org.apache.atlas.AtlasBaseClient - method=GET path=api/atlas/v2/entity/uniqueAttribute/type/ contentType=application/json; charset=UTF-8 accept=application/json status=200
2018-09-06T13:23:35,987 INFO [main] org.apache.atlas.hive.bridge.HiveMetaStoreBridge - Database atlas_emr is already registered - id=cc311c0e-df88-40dc-ac12-6a1ce139ca88. Updating it.
2018-09-06T13:23:36,130 INFO [main] org.apache.atlas.AtlasBaseClient - method=POST path=api/atlas/v2/entity/ contentType=application/json; charset=UTF-8 accept=application/json status=200
2018-09-06T13:23:36,144 INFO [main] org.apache.atlas.hive.bridge.HiveMetaStoreBridge - Updated hive_db entity: [email protected], guid=cc311c0e-df88-40dc-ac12-6a1ce139ca88
2018-09-06T13:23:36,164 INFO [main] org.apache.atlas.hive.bridge.HiveMetaStoreBridge - Found 3 tables to import in database atlas_emr
2018-09-06T13:23:36,287 INFO [main] org.apache.atlas.AtlasBaseClient - method=GET path=api/atlas/v2/entity/uniqueAttribute/type/ contentType=application/json; charset=UTF-8 accept=application/json status=200
2018-09-06T13:23:36,294 INFO [main] org.apache.atlas.hive.bridge.HiveMetaStoreBridge - Table atlas_emr.trip_details is already registered with id c2935940-5725-4bb3-9adb-d153e2e8b911. Updating entity.
2018-09-06T13:23:36,688 INFO [main] org.apache.atlas.AtlasBaseClient - method=POST path=api/atlas/v2/entity/ contentType=application/json; charset=UTF-8 accept=application/json status=200
2018-09-06T13:23:36,689 INFO [main] org.apache.atlas.hive.bridge.HiveMetaStoreBridge - Updated hive_table entity: [email protected], guid=c2935940-5725-4bb3-9adb-d153e2e8b911
2018-09-06T13:23:36,702 INFO [main] org.apache.atlas.AtlasBaseClient - method=GET path=api/atlas/v2/entity/uniqueAttribute/type/ contentType=application/json; charset=UTF-8 accept=application/json status=200
2018-09-06T13:23:36,703 INFO [main] org.apache.atlas.hive.bridge.HiveMetaStoreBridge - Process [email protected]:1536239968000 is already registered
2018-09-06T13:23:36,791 INFO [main] org.apache.atlas.AtlasBaseClient - method=GET path=api/atlas/v2/entity/uniqueAttribute/type/ contentType=application/json; charset=UTF-8 accept=application/json status=200
2018-09-06T13:23:36,802 INFO [main] org.apache.atlas.hive.bridge.HiveMetaStoreBridge - Table atlas_emr.trip_details_by_zone is already registered with id c0ff33ae-ca82-4048-9671-c0b6597e1475. Updating entity.
2018-09-06T13:23:36,988 INFO [main] org.apache.atlas.AtlasBaseClient - method=POST path=api/atlas/v2/entity/ contentType=application/json; charset=UTF-8 accept=application/json status=200
2018-09-06T13:23:36,989 INFO [main] org.apache.atlas.hive.bridge.HiveMetaStoreBridge - Updated hive_table entity: [email protected], guid=c0ff33ae-ca82-4048-9671-c0b6597e1475
2018-09-06T13:23:37,035 INFO [main] org.apache.atlas.AtlasBaseClient - method=GET path=api/atlas/v2/entity/uniqueAttribute/type/ contentType=application/json; charset=UTF-8 accept=application/json status=200
2018-09-06T13:23:37,038 INFO [main] org.apache.atlas.hive.bridge.HiveMetaStoreBridge - Table atlas_emr.trip_zone_lookup is already registered with id 834d102a-6f92-4fc9-a498-4adb4a3e7897. Updating entity.
2018-09-06T13:23:37,213 INFO [main] org.apache.atlas.AtlasBaseClient - method=POST path=api/atlas/v2/entity/ contentType=application/json; charset=UTF-8 accept=application/json status=200
2018-09-06T13:23:37,214 INFO [main] org.apache.atlas.hive.bridge.HiveMetaStoreBridge - Updated hive_table entity: [email protected], guid=834d102a-6f92-4fc9-a498-4adb4a3e7897
2018-09-06T13:23:37,228 INFO [main] org.apache.atlas.AtlasBaseClient - method=GET path=api/atlas/v2/entity/uniqueAttribute/type/ contentType=application/json; charset=UTF-8 accept=application/json status=200
2018-09-06T13:23:37,228 INFO [main] org.apache.atlas.hive.bridge.HiveMetaStoreBridge - Process [email protected]:1536239987000 is already registered
2018-09-06T13:23:37,229 INFO [main] org.apache.atlas.hive.bridge.HiveMetaStoreBridge - Successfully imported 3 tables from database atlas_emr
2018-09-06T13:23:37,243 INFO [main] org.apache.atlas.AtlasBaseClient - method=GET path=api/atlas/v2/entity/uniqueAttribute/type/ contentType=application/json; charset=UTF-8 accept=application/json status=404
2018-09-06T13:23:37,353 INFO [main] org.apache.atlas.AtlasBaseClient - method=POST path=api/atlas/v2/entity/ contentType=application/json; charset=UTF-8 accept=application/json status=200
2018-09-06T13:23:37,361 INFO [main] org.apache.atlas.AtlasBaseClient - method=GET path=api/atlas/v2/entity/guid/ contentType=application/json; charset=UTF-8 accept=application/json status=200
2018-09-06T13:23:37,362 INFO [main] org.apache.atlas.hive.bridge.HiveMetaStoreBridge - Created hive_db entity: [email protected], guid=798fab06-ad75-4324-b7cd-e4d02b6525e8
2018-09-06T13:23:37,365 INFO [main] org.apache.atlas.hive.bridge.HiveMetaStoreBridge - No tables to import in database default
Hive Meta Data imported successfully!!!

After a successful Hive import, you can return to the Atlas Web UI to search the Hive database or the tables that were imported. On the left pane of the Atlas UI, ensure Search is selected, and enter the following information in the two fields listed following:

      • Search By Type: hive_table
      • Search By Text: trip_details

The output of the preceding query should look like this:

3. View the data lineage of your Hive tables using Atlas

To view the lineage of the created tables, you can use the Atlas web search. For example, to see the lineage of the intersect table trip_details_by_zone created earlier, enter the following information:

      • Search By Type: hive_table
      • Search By Text: trip_details_by_zone

The output of the preceding query should look like this:

Now choose the table name trip_details_by_zone to view the details of the table as shown following.

Now when you choose Lineage, you should see the lineage of the table. As shown following, the lineage provides information about its base tables and shows that the table is an intersection of two tables.

4. Create a classification for metadata management

Atlas can help you to classify your metadata to comply with data governance requirements specific to your organization. We create an example classification next.

To create a classification, take the following steps:

      1. Choose Classification from the left pane, and choose the + icon.
      2. Type PII in the Name field, and Personally Identifiable Information in the Description field.
      3. Choose Create.

Next, classify the table as PII:

      1. Return to the Search tab on the left pane.
      2. In the Search By Text field, type: trip_zone_lookup
      3. Choose the Classification tab and choose the add icon (+).
      4. Choose the classification that you created (PII) from the list.
      5. Choose Add.

You can classify columns and databases in a similar manner.

Next, view all the entities belonging to this classification.

      1. Choose the Classification tab.
      2. Choose the PII classification that you created.
      3. View all the entities belonging to this classification, displayed on the main pane.

5. Discover metadata using the Atlas domain-specific language (DSL)

Next, you can search Atlas for entities using the Atlas domain-specific language (DSL), which is a SQL-like query language. This language has simple constructs that help users navigate Atlas data repositories. The syntax loosely emulates the popular SQL from the relational database world.

To search a table using DSL:

      1. Choose Search.
      2. Choose Advanced Search.
      3. In Search By Type, choose hive_table.
      4. In Search By Query, search for the table trip_details using the following DSL snippet:
from hive_table where name = trip_details

As shown following, Atlas shows the table’s schema, lineage, and classification information.

Next, search a column using DSL:

      1. Choose Search.
      2. Choose Advanced Search.
      3. In Search By Type, choose hive_column.
      4. In Search By Query, search for column location_id using the following DSL snippet:
from hive_column where name = 'location_id'

As shown following, Atlas shows the existence of column location_id in both of the tables created previously:

You can also count tables using DSL:

      1. Choose Search.
      2. Choose Advanced Search.
      3. In Search By Type, choose hive_table.
      4. In Search By Query, count the tables by using the following DSL command:
hive_table select count()

As shown following, Atlas shows the total number of tables.

The final step is to clean up. To avoid unnecessary charges, you should remove your Amazon EMR cluster after you’re done experimenting with it.

The simplest way to do so, if you used CloudFormation, is to remove the CloudFormation stack that you created earlier. By default, the cluster is created with termination protection enabled. To remove the cluster, you first need to turn termination protection off, which you can do by using the Amazon EMR console.

Conclusion

In this post, we outline the steps required to install and configure an Amazon EMR cluster with Apache Atlas by using the AWS CLI or CloudFormation. We also explore how you can import data into Atlas and use the Atlas console to perform queries and view the lineage of our data artifacts.

For more information about Amazon EMR or any other big data topics on AWS, see the EMR blog posts on the AWS Big Data blog.

 


About the Authors

Nikita Jaggi is a senior big data consultant with AWS.

 

 

 

 

Andrew Park is a cloud infrastructure architect at AWS. In addition to being operationally focused in customer engagements, he often works directly with customers to build and to deliver custom AWS solutions.  Having been a Linux solutions engineer for a long time, Andrew loves deep dives into Linux-related challenges. He is an open source advocate, loves baseball, is a recent winner of the “Happy Camper” award in local AWS practice, and loves being helpful in all contexts.

Reduce costs by migrating Apache Spark and Hadoop to Amazon EMR

Post Syndicated from Nikki Rouda original https://aws.amazon.com/blogs/big-data/reduce-costs-by-migrating-apache-spark-and-hadoop-to-amazon-emr/

Apache Spark and Hadoop are popular frameworks to process data for analytics, often at a fraction of the cost of legacy approaches, yet at scale they may still become expensive propositions. This blog post discusses ways to reduce your total costs of ownership, while also improving staff productivity at the same time. This can be accomplished by migrating your on-premises workloads to Amazon EMR, making good architectural choices, and taking advantage of features designed to reduce the resource consumption. The advice included has been learned from hundreds of engagements with customers and many points have been validated by the findings of a recently sponsored business value study conducted by IDC’s Carl Olofson and Harsh Singh in the IDC White Paper, sponsored by Amazon Web Services (AWS), “The Economic Benefits of Migrating Apache Spark and Hadoop to Amazon EMR” (November 2018).

Let’s begin with a few headline statistics to demonstrate the positive economic impact from migrating to Amazon EMR. IDC’s survey of nine Amazon EMR customers found an average 57 percent reduced cost of ownership. This was accompanied by a 342 percent five-year ROI, and eight months to break even on the investment. Those customers varied significantly in the size of their deployments for Spark and Hadoop, and accordingly in their results. However, these are compelling numbers for IT and finance leaders to consider as they set their long-term strategy for big data processing.

Now, how exactly does Amazon EMR save you money over on-premises deployments of Spark and Hadoop? The IDC White Paper identified three common answers, which are detailed in this post.

Reducing physical infrastructure costs

An on-premises deployment inevitably leaves its hardware underused, for several reasons. One is the provisioning cycle in which servers are selected, ordered, and installed in anticipation of future needs. Despite the efforts to estimate resource needs, this cycle usually brings a tendency to over-provision. In other words, it invests capital in assets that aren’t used immediately. This cycle can also result in reaching resource capacity limits, which restricts the ability to complete needed big data processing in a timely fashion.

This is further magnified by the Spark and Hadoop architecture because of the way distinct resources are provisioned in fixed ratios in servers. Each server has a given number of processors and memory (compute) and a set amount of storage capacity. Some workloads may be storage intensive, with vast quantities of data being retained on servers for possible future use cases. Meanwhile, the purchased compute resources sit idle until the day that data is processed and analyzed. Alternately, some workloads may require large quantities of memory or many processors for complex operations, but otherwise run against relatively small volumes of data. In these cases, the local storage may not be fully used.

Achieving durability in HDFS on premises requires multiple copies of data, which increases the hardware requirement. Durability is already built into Amazon S3, so decoupling compute and storage also reduces the hardware footprint by removing the need to replicate for durability. Although Spark and Hadoop use commodity hardware, which is far more cost efficient than traditional data warehouse appliances, the on-premises approach is inherently wasteful, rigid, and not agile enough to meet varying needs over time.

AWS solutions architect Bruno Faria recently gave a talk at re:Invent 2018 about more sophisticated approaches to “Lower Costs on Amazon EMR: Auto Scaling, Spot Pricing, and Expert Strategies”. Let’s recap those points.

Amazon EMR has several natural advantages over the challenges faced on premises. Customers pay only for the resources that they actually consume, and only when they use them. On-demand resources lessen the over- and under-provisioning problem. More advanced customers can decouple their compute and storage resources too. They can store as much data as they need in Amazon S3 and scale their costs only as data volumes grow, not in advance.

Moreover, they can implement their choice of compute instances that are appropriately sized to the job at hand. They are also on-demand, and charged on a granular “per-second of usage” basis. This brings both more cost savings and more agility. Sometimes, customers “lift and shift” their on-premises workloads to AWS and run in Amazon EC2 without Amazon EMR. However, this doesn’t take advantage of the decoupling effect, and is recommended only as an intermediate step of migration.

Once a customer is running their Spark and Hadoop workloads in AWS, they can further reduce compute costs. They can choose Reserved Instances, which means paying for an expected baseline at a significant discount. These are complemented by On-Demand Instances, which are available at any time and billed per second, and by Spot Instances. Spot Instances provide interruptible resources for less sensitive workloads at heavily reduced prices. Instance fleets also include the ability for you to specify:

  • A defined duration (Spot block) to keep the Spot Instances running.
  • The maximum Spot price that you’re willing to pay.
  • A timeout period for provisioning Spot Instances.
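
The following sketch, using boto3, shows what blending On-Demand and Spot capacity with instance fleets can look like. The cluster name, release label, instance types, subnet, log bucket, and all capacity and timeout values are illustrative placeholders rather than recommendations.

# A minimal sketch (not a complete cluster definition): launching an EMR cluster
# with instance fleets that blend On-Demand and Spot capacity.
import boto3

emr = boto3.client('emr')

response = emr.run_job_flow(
    Name='cost-optimized-cluster',
    ReleaseLabel='emr-5.20.0',
    Applications=[{'Name': 'Spark'}],
    ServiceRole='EMR_DefaultRole',
    JobFlowRole='EMR_EC2_DefaultRole',
    LogUri='s3://my-emr-logs/',                      # placeholder bucket
    Instances={
        'Ec2SubnetId': 'subnet-12345678',            # placeholder subnet
        'KeepJobFlowAliveWhenNoSteps': False,
        'InstanceFleets': [
            {
                'Name': 'Master',
                'InstanceFleetType': 'MASTER',
                'TargetOnDemandCapacity': 1,
                'InstanceTypeConfigs': [{'InstanceType': 'm4.large'}],
            },
            {
                'Name': 'Core',
                'InstanceFleetType': 'CORE',
                'TargetOnDemandCapacity': 2,          # predictable baseline
                'TargetSpotCapacity': 4,              # cheaper burst capacity
                'InstanceTypeConfigs': [
                    # Cap the Spot price at 50% of the On-Demand price.
                    {'InstanceType': 'm4.xlarge',
                     'BidPriceAsPercentageOfOnDemandPrice': 50.0},
                ],
                'LaunchSpecifications': {
                    'SpotSpecification': {
                        'TimeoutDurationMinutes': 20,            # provisioning timeout
                        'TimeoutAction': 'SWITCH_TO_ON_DEMAND',  # fall back if Spot is unavailable
                        'BlockDurationMinutes': 60,              # defined duration (Spot block)
                    }
                },
            },
        ],
    },
)
print(response['JobFlowId'])

Here, the core fleet keeps a small On-Demand baseline for predictable SLAs and adds Spot capacity for cheaper burst processing.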

To blend these purchasing options for the best results, you should understand the following:

  • Baseline demand, with predictable SLAs and needs.
  • Periodic or unexpected peaks, which can be handled at reduced cost while still meeting SLAs.
  • Nature of the jobs, whether they are transient or long running.

For more information about the instance purchasing options, see Instance Purchasing Options.

The Auto Scaling feature in EMR, available since 2016, can make this even more efficient to implement. It lets the service automatically manage user demand versus resources consumed as this ratio varies over time. For more information, see these best practices for automatic scaling.
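
As a rough sketch of what such a policy can look like with boto3, the following attaches a scale-out rule to a core instance group. The cluster ID, instance group ID, and thresholds are placeholders.

# A sketch: attach an automatic scaling policy to a core instance group that
# adds nodes when available YARN memory drops below 15 percent.
import boto3

emr = boto3.client('emr')

emr.put_auto_scaling_policy(
    ClusterId='j-XXXXXXXXXXXXX',          # placeholder cluster ID
    InstanceGroupId='ig-XXXXXXXXXXXX',    # placeholder core instance group ID
    AutoScalingPolicy={
        'Constraints': {'MinCapacity': 2, 'MaxCapacity': 10},
        'Rules': [
            {
                'Name': 'ScaleOutOnLowMemory',
                'Description': 'Add nodes when YARN memory is scarce',
                'Action': {
                    'SimpleScalingPolicyConfiguration': {
                        'AdjustmentType': 'CHANGE_IN_CAPACITY',
                        'ScalingAdjustment': 2,
                        'CoolDown': 300,
                    }
                },
                'Trigger': {
                    'CloudWatchAlarmDefinition': {
                        'ComparisonOperator': 'LESS_THAN',
                        'EvaluationPeriods': 1,
                        'MetricName': 'YARNMemoryAvailablePercentage',
                        'Namespace': 'AWS/ElasticMapReduce',
                        'Period': 300,
                        'Statistic': 'AVERAGE',
                        'Threshold': 15.0,
                        'Unit': 'PERCENT',
                    }
                },
            }
        ],
    },
)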

Even storage costs can be reduced further. EMRFS, which decouples storage by using Amazon S3, lets you scale a cluster's compute resources independently of storage capacity. In other words, EMRFS alone should already help reduce costs. Another way to save on storage is to partition data, which reduces the amount that must be scanned for processing and analytics. This is subject to the "Goldilocks" principle: a customer wants partitions small enough to avoid paying to read unneeded data, but large enough to avoid excess overhead in finding the data that is needed.

An example of automatic partitioning of Apache Hive external tables is available here. Optimizing file sizes can reduce the number of Amazon S3 requests, and compressing data can minimize the bandwidth required to read data into compute instances. Not least, columnar formats are usually more efficient for big data processing and analytics than row-based layouts.
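
To make the partitioning and columnar-format ideas concrete, here is a minimal PySpark sketch; the bucket paths and the event_date column are assumed placeholders.

# A sketch (PySpark): write data to Amazon S3 in a compressed, columnar format,
# partitioned by a column that queries commonly filter on.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("partitioned-writer").getOrCreate()

events = spark.read.json("s3://my-raw-bucket/events/")   # placeholder input path

(events
    .repartition("event_date")                # avoid many tiny files per partition
    .write
    .mode("overwrite")
    .partitionBy("event_date")                # readers scan only the dates they need
    .option("compression", "snappy")          # reduce bytes read from S3
    .parquet("s3://my-curated-bucket/events/"))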

Applying these recommended methods led to the 57 percent reduction in total cost of ownership, cited earlier in this post.

Capturing these ideas nicely is a quote from the IDC White Paper where an Amazon EMR customer said the following:

"Amazon EMR gave us the best bang for the buck. One of the key factors is that our data is obviously growing. Running our big data operations on [Amazon] EMR increases confidence. It's really good since we get cheap storage for huge amounts of data. The second thing is that the computation that we need fluctuates highly. Some of the data in our database is only occasionally used by our business or data analysts. We choose EMR because it is the most cost-effective solution as well as providing need-based computational expansion."

Driving higher IT staff productivity

While infrastructure savings can be the most obvious driver for moving Apache Spark and Hadoop workloads to the public cloud with Amazon EMR, improved IT staff productivity can be a significant benefit as well. Because Amazon EMR is a managed service, staff don't need to spend time evaluating, purchasing, installing, provisioning, integrating, maintaining, or supporting hardware infrastructure. Nor do they need to evaluate, install, patch, upgrade, or troubleshoot software infrastructure, which in the rapidly innovating open-source software world can be a never-ending task.

All that effort and the associated "soft" costs go away, because the Amazon EMR environment is managed and kept current for customers. IT staff can instead focus their time on assisting data engineers, data scientists, and business analysts in their strategic endeavors, rather than doing infrastructure administration. The IDC White Paper linked these benefits to a 62 percent reduction in staff time cost per year versus on premises, along with a 54 percent reduction in staff costs for big data environment managers. Respondents also said it helped them be more agile, improve quality, and develop faster.

Another customer interviewed by IDC summed this up by saying the following:

“We went with Amazon EMR’s ready-made integration site. It is all about not having to spend time on integration…If we choose another Hadoop technology, then our researchers would have to make that work but if we run into a road block and it doesn’t work, we might learn that the hard way. In a way, we would be doing more testing, which would have meant we needed to hire three more people to do the integration work if we weren’t on Amazon EMR.”

Providing stronger big data environment availability

The third major area of savings cited was improved risk mitigation. Because AWS services are built on many years of lessons learned in efficient and resilient operations, they deliver on promises of greater than 99.99% availability and durability, often with many more nines. The IDC study noted that avoiding unplanned downtime brought a 99% reduction in lost productivity among IT and analytics staff.

A customer noted, “We have made systems much more resilient. It is really all about performance and resiliency.”

There are many other economic benefits of migrating to Amazon EMR. They are often linked to improved staff productivity, and they deliver not only cost savings but also improved performance and a measurable ROI on analytics. But let's not spoil the whole IDC White Paper; you can read it for yourself!

 


About the Author

Nikki Rouda is the principal product marketing manager for data lakes and big data at AWS. Nikki has spent 20+ years helping enterprises in 40+ countries develop and implement solutions to their analytics and IT infrastructure challenges. Nikki holds an MBA from the University of Cambridge and an ScB in geophysics and math from Brown University.

 

 

 

 

 

Best Practices for Securing Amazon EMR

Post Syndicated from Tony Nguyen original https://aws.amazon.com/blogs/big-data/best-practices-for-securing-amazon-emr/

Whatever your industry, data is central to how organizations function. When we discuss data strategy with customers, it's often in the context of how they ingest, store, process, analyze, distribute, and ultimately secure data.

Amazon EMR is a managed Hadoop framework that you use to process vast amounts of data. One of the reasons that customers choose Amazon EMR is its security features. For example, customers in regulated industries such as financial services (like FINRA) and healthcare choose Amazon EMR as part of their data strategy. They do so to adhere to strict requirements such as the Payment Card Industry Data Security Standard (PCI DSS) and the Health Insurance Portability and Accountability Act (HIPAA).

This post walks you through some of the principles of Amazon EMR security. It also describes features that you can use in Amazon EMR to help you meet the security and compliance objectives for your business. We cover some common security best practices that we see used. We also show some sample configurations to get you started. For more in-depth information, see Security in the EMR Management Guide.

Encryption, encryption, encryption

Our CTO, Werner Vogels, is fond of saying "Dance like nobody's watching, encrypt like everybody is." Properly protecting your data at rest and in transit using encryption is a core component of the security pillar of the AWS Well-Architected Framework. Amazon EMR security configurations (described in the EMR documentation) make it easy for you to encrypt data. A security configuration is like a template for encryption and other security settings that you can apply to any cluster when you launch it.

Encryption at rest

You have multiple options to encrypt data at rest in your EMR clusters. By default, EMR uses the EMR File System (EMRFS) to read data from and write data to Amazon S3. To encrypt data in Amazon S3, you can specify one of the following options:

  • SSE-S3: Amazon S3 manages the encryption keys for you
  • SSE-KMS: You use an AWS Key Management Service (AWS KMS) customer master key (CMK) to encrypt your data server-side on Amazon S3. Be sure to use policies that allow access by Amazon EMR.
  • CSE-KMS/CSE-C: Amazon S3 encryption and decryption takes place client-side on your Amazon EMR cluster. You can use keys provided by AWS KMS (CSE-KMS) or use a custom Java class that provides the master key (CSE-C).

With Amazon EMR, setting up the encryption type for Amazon S3 is easy. You just select it from a list.

Which option to choose depends on your specific workload requirements. With SSE-S3, Amazon S3 manages the encryption keys for you entirely; this is the simplest option. Using SSE-KMS or CSE-KMS gives you more control over the encryption keys and lets you provide your own key material for encryption. The encryption is applied to objects within the bucket, and the applications running on your cluster don't need to be modified or made aware of the encryption in any way.

For even more control, you can use a custom key provider with CSE-C. For more information, see Amazon S3 Client-Side Encryption in the EMR documentation.

For local disk encryption, you have two approaches that complement each other. Let’s break them down by the specific encryption target:

  • To encrypt your root volume with local disk encryption, create a custom Amazon Machine Image (AMI) for Amazon EMR and specify Amazon EBS volume encryption. We cover details on custom AMIs a bit later in this post.
  • To encrypt your storage volumes with local disk encryption, use Amazon EMR security configurations (described in the EMR documentation). Security configurations use a combination of open-source HDFS encryption and LUKS encryption. To use this feature, specify either the Amazon Resource Name (ARN) for an AWS KMS key or provide a custom Java class with the encryption artifacts.

For more information about using KMS with EMR and S3, see How Amazon EMR Uses AWS KMS in the EMR documentation.

If you are using AWS KMS as part of your encryption strategy, see AWS KMS Limits in the EMR documentation for information about the request rates supported for your use case.

Encryption in transit

Amazon EMR security configurations enable you to choose a method for encrypting data in transit using Transport Layer Security (TLS) (as described in the EMR documentation). You can do either of the following:

  • Manually create PEM certificates, zip them in a file, and reference from Amazon S3.
  • Implement a custom certificate provider in Java and specify the S3 path to the JAR file.

For more information on how these certificates are used with different big data technologies, check out In Transit Data Encryption with EMR.

In the EMR console, you choose these encryption options when you create a security configuration.

Expressing encryption as code

You can also specify encryption by using the AWS CLI, Boto3, or AWS CloudFormation. Here’s an example with Boto3 that uses SSE-S3 for S3 encryption and KMS for local disk encryption:

import boto3
client = boto3.client('emr')
response = client.create_security_configuration(
    Name='MySecurityConfig',
    SecurityConfiguration='''{
        "EncryptionConfiguration": {
            "EnableInTransitEncryption" : true,
            "EnableAtRestEncryption" : true,
            "AtRestEncryptionConfiguration" : {
                "S3EncryptionConfiguration" : {
                    "EncryptionMode" : "SSE-S3"
                },
                "LocalDiskEncryptionConfiguration" : {
                    "EncryptionKeyProviderType" : "AwsKms",
                    "AwsKmsKey" : "arn:aws:kms:us-east-1:123456789012:key/12345678-1234-1234-1234-123456789012"
                }
            },
            "InTransitEncryptionConfiguration" : {
                "TLSCertificateConfiguration" : {
                    "CertificateProviderType" : "PEM",
                    "S3Object" : "s3://MyConfigStore/artifacts/MyCerts.zip"
                }
            }
        }
    }'''
)

And the same snippet for AWS CloudFormation:

Resources:
    MySecurityConfig:
        Type: AWS::EMR::SecurityConfiguration
        Properties:
            Name: MySecurityConfig
            SecurityConfiguration:
                EncryptionConfiguration:
                    EnableInTransitEncryption: true
                    EnableAtRestEncryption: true
                    AtRestEncryptionConfiguration:
                        S3EncryptionConfiguration:
                            EncryptionMode: SSE-S3
                        LocalDiskEncryptionConfiguration:
                            EncryptionKeyProviderType: AwsKms
                            AwsKmsKey: arn:aws:kms:us-east-1:123456789012:key/12345678-1234-1234-1234-123456789012
                    InTransitEncryptionConfiguration:
                        TLSCertificateConfiguration:
                            CertificateProviderType: PEM
                            S3Object: arn:aws:s3:::MyConfigStore/artifacts/MyCerts.zip

For more information about setting up security configurations in Amazon EMR, see the AWS Big Data Blog post Secure Amazon EMR with Encryption.

Authentication and authorization

Authentication and authorization, otherwise known as AuthN and AuthZ, are two crucial components that need to be considered when controlling access to data. Authentication is the verification of an entity, and authorization is checking whether the entity actually has access to the data or resources it’s asking for. In other words, authentication checks whether you’re really who you say you are. Authorization checks whether you actually have access to what you’re asking for. Alice can be authenticated as indeed being Alice, but this doesn’t necessarily mean that Alice has authorization, or access, to look at Bob’s bank account.

Authentication on Amazon EMR

Kerberos, a network authentication protocol created by the Massachusetts Institute of Technology (MIT), uses secret-key cryptography to provide strong authentication. It helps you avoid having sensitive information such as passwords or other credentials sent over the network in an unencrypted and exposed format.

With Kerberos, you can maintain a set of services (known as a realm) and users that need to authenticate (known as principals). You provide a way for these principals to authenticate. You can also integrate your Kerberos setup with other realms. For example, you can have users authenticate from a Microsoft Active Directory domain and have a cross-realm trust set up. Such a setup can allow these authenticated users to be seamlessly authenticated to access your EMR clusters.

Amazon EMR installs open-source Hadoop-based applications on your cluster, meaning that you can use the existing security features that use Kerberos in these products. For example, you can enable Kerberos authentication for YARN, giving user-level authentication for applications running on YARN such as Apache Spark.

You can configure Kerberos on an EMR cluster (known as Kerberizing) to provide a means of authentication for cluster users. Before you configure Kerberos on Amazon EMR, we recommend that you become familiar with Kerberos concepts by reading Use Kerberos Authentication in the EMR documentation.

The following example shows Kerberizing with CloudFormation:

"SecurityConfiguration": {
      "Type": "AWS::EMR::SecurityConfiguration",
      "Properties": {
        "SecurityConfiguration": {
          "AuthenticationConfiguration": {
            "KerberosConfiguration": {
              "ClusterDedicatedKdcConfiguration": {
                "CrossRealmTrustConfiguration": {
                  "Realm": {
                    "Ref": "KerberosADdomain"
                  },
                  "KdcServer": {
                    "Ref": "DomainDNSName"
                  },
                  "Domain": {
                    "Ref": "DomainDNSName"
                  },
                  "AdminServer": {
                    "Ref": "DomainDNSName"
                  }
                },
                "TicketLifetimeInHours": 24
              },
              "Provider": "ClusterDedicatedKdc"
            }
          }
        }
      }
    }
...

Authorization on Amazon EMR

Amazon EMR uses AWS Identity and Access Management (IAM) to help you manage access to your clusters. You can use IAM to create policies for principals such as users and groups that control actions that can be performed with Amazon EMR and other AWS resources.

There are two roles associated with each cluster in Amazon EMR that typically are of interest—the service role and a role for the Amazon EC2 instance profile. You use the service role, or EMR role, for any action related to provisioning resources and other service-level tasks that aren’t performed in the context of an EC2 instance. The instance profile role is used by EC2 instances within the cluster. The policies associated with this role apply to processes that run on the cluster instances. For more information on these roles and others, see Configure IAM Roles for Amazon EMR Permissions to AWS Services in the EMR documentation.

It’s important to understand how IAM helps you control authorized access to your cluster in relation to these roles and where it does not. IAM controls API-level actions done on other AWS services. IAM helps you with things like controlling access to objects in S3, protecting against cluster modification, and restricting access to keys from AWS KMS.

This has important implications. By default, any process running in a cluster inherits the access permissions of the IAM role associated with the cluster. In contrast, IAM doesn’t control activity inside of your clusters. You need to use other means to appropriately secure the processes running on each EC2 instance.

Because of these characteristics, Amazon EMR formerly posed a particular authorization challenge. By default, the IAM role attached to the cluster's EC2 instance profile determined the data that could be accessed in Amazon S3. This effectively meant that data access to S3 was granular only at the cluster level, which made it difficult for multiple users with different levels of access to data to share the same cluster.

With Amazon EMR versions 5.10.0 and later, EMRFS fine-grained authorization was introduced. It enables you to specify the IAM role to assume at the user or group level when EMRFS is accessing Amazon S3. This enables fine-grained access control for Amazon S3 on multitenant EMR clusters and also makes it easier to enable cross-account Amazon S3 access to data.

The EMRFS authorization feature specifically applies to access through HiveServer2. If your users use Spark or other applications that allow the execution of arbitrary code (for example, Jupyter, Zeppelin, SSH, spark-shell…), they can bypass the roles that EMRFS has mapped to them.

For more information on how to configure your security configurations and IAM roles appropriately, read Configure IAM Roles for EMRFS Requests to Amazon S3 in the EMR documentation.
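
As a rough sketch, and assuming placeholder role ARNs and user names, a security configuration with EMRFS role mappings can be created with boto3 along these lines:

# A rough sketch of a security configuration that maps specific users to an IAM
# role for EMRFS requests to Amazon S3. The role ARN and user names are placeholders.
import boto3

emr = boto3.client('emr')

emr.create_security_configuration(
    Name='EmrfsRoleMappingConfig',
    SecurityConfiguration='''{
        "AuthorizationConfiguration": {
            "EmrFsConfiguration": {
                "RoleMappings": [
                    {
                        "Role": "arn:aws:iam::123456789012:role/analyst_s3_role",
                        "IdentifierType": "User",
                        "Identifiers": ["analyst1", "analyst2"]
                    }
                ]
            }
        }
    }'''
)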

For a great in-depth guide on setting up a multitenant environment with both Kerberos authentication and EMRFS authorization, take a look at the following post on the AWS Big Data blog: Build a Multi-Tenant Amazon EMR Cluster with Kerberos, Microsoft Active Directory Integration and IAM Roles for EMRFS.

Network

Your network topology is also important when designing for security and privacy. We recommend placing your Amazon EMR clusters in private subnets and using a NAT gateway for outbound-only internet access.

Security groups control inbound and outbound access from your individual instances. With Amazon EMR, you can use both Amazon EMR-managed security groups and also your own security groups to control network access to your instance. By applying the principle of least privilege to your security groups, you can lock down your EMR cluster to only the applications or individuals who need access.

For more information, see Working With Amazon EMR-Managed Security Groups in the EMR Documentation.

The following example shows a security group with boto3:

import boto3
from botocore.exceptions import ClientError

ec2 = boto3.client('ec2')

response = ec2.describe_vpcs()
vpc_id = response.get('Vpcs', [{}])[0].get('VpcId', '')

try:
    # Create a security group that allows inbound SSH (port 22).
    response = ec2.create_security_group(GroupName='SECURITY_GROUP_NAME',
                                         Description='Allow SSH',
                                         VpcId=vpc_id)
    security_group_id = response['GroupId']

    # In production, restrict the source CIDR range to trusted addresses
    # rather than 0.0.0.0/0.
    data = ec2.authorize_security_group_ingress(
        GroupId=security_group_id,
        IpPermissions=[
            {'IpProtocol': 'tcp',
             'FromPort': 22,
             'ToPort': 22,
             'IpRanges': [{'CidrIp': '0.0.0.0/0'}]}
        ])
except ClientError as e:
    print(e)

The following example shows a security group with CloudFormation:

InstanceSecurityGroup:
  Type: AWS::EC2::SecurityGroup
  Properties:
    GroupDescription: Allow 80
    VpcId:
      Ref: myVPC
    SecurityGroupIngress:
    - IpProtocol: tcp
      FromPort: '80'
      ToPort: '80'
      CidrIp: 0.0.0.0/0
    SecurityGroupEgress:
    - IpProtocol: tcp
      FromPort: '80'
      ToPort: '80'
      CidrIp: 0.0.0.0/0
 ...

Minimal IAM policy

By default, the IAM policies that are associated with EMR are generally permissive, to enable you to easily integrate EMR with other AWS services. When securing EMR, a best practice is to start from the minimal set of permissions required for EMR to function and add permissions as necessary.

Following are three policies that are scoped around what EMR minimally requires for basic operation. You can potentially further minimize these policies by removing actions related to spot pricing and autoscaling. For clarity, the policies are annotated with comments—remove the comments before use.

Minimal EMR service-role policy

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Resource": "*",
            "Action": [
                "ec2:AuthorizeSecurityGroupEgress",
                "ec2:AuthorizeSecurityGroupIngress",
                "ec2:CancelSpotInstanceRequests",
                "ec2:CreateNetworkInterface",
                "ec2:CreateSecurityGroup",
                "ec2:CreateTags",
                "ec2:DeleteNetworkInterface", // This is only needed if you are launching clusters in a private subnet. 
                "ec2:DeleteTags",
                "ec2:DeleteSecurityGroup", // This is only needed if you are using Amazon managed security groups for private subnets. You can omit this action if you are using custom security groups.
                "ec2:DescribeAvailabilityZones",
                "ec2:DescribeAccountAttributes",
                "ec2:DescribeDhcpOptions",
                "ec2:DescribeImages",
                "ec2:DescribeInstanceStatus",
                "ec2:DescribeInstances",
                "ec2:DescribeKeyPairs",
                "ec2:DescribeNetworkAcls",
                "ec2:DescribeNetworkInterfaces",
                "ec2:DescribePrefixLists",
                "ec2:DescribeRouteTables",
                "ec2:DescribeSecurityGroups",
                "ec2:DescribeSpotInstanceRequests",
                "ec2:DescribeSpotPriceHistory",
                "ec2:DescribeSubnets",
                "ec2:DescribeTags",
                "ec2:DescribeVpcAttribute",
                "ec2:DescribeVpcEndpoints",
                "ec2:DescribeVpcEndpointServices",
                "ec2:DescribeVpcs",
                "ec2:DetachNetworkInterface",
                "ec2:ModifyImageAttribute",
                "ec2:ModifyInstanceAttribute",
                "ec2:RequestSpotInstances",
                "ec2:RevokeSecurityGroupEgress",
                "ec2:RunInstances",
                "ec2:TerminateInstances",
                "ec2:DeleteVolume",
                "ec2:DescribeVolumeStatus",
                "ec2:DescribeVolumes",
                "ec2:DetachVolume",
                "iam:GetRole",
                "iam:GetRolePolicy",
                "iam:ListInstanceProfiles",
                "iam:ListRolePolicies",
                "s3:CreateBucket",
                "sdb:BatchPutAttributes",
                "sdb:Select",
                "cloudwatch:PutMetricAlarm",
                "cloudwatch:DescribeAlarms",
                "cloudwatch:DeleteAlarms",
                "application-autoscaling:RegisterScalableTarget",
                "application-autoscaling:DeregisterScalableTarget",
                "application-autoscaling:PutScalingPolicy",
                "application-autoscaling:DeleteScalingPolicy",
                "application-autoscaling:Describe*"
            ]
        },
        {
            "Effect": "Allow",
            "Resource": ["arn:aws:s3:::examplebucket/*","arn:aws:s3:::examplebucket2/*"], // Here you can specify the list of buckets which are going to be storing cluster logs, bootstrap action script, custom JAR files, input & output paths for EMR steps
            "Action": [
                "s3:GetBucketLocation",
                "s3:GetBucketCORS",
                "s3:GetObjectVersionForReplication",
                "s3:GetObject",
                "s3:GetBucketTagging",
                "s3:GetObjectVersion",
                "s3:GetObjectTagging",
                "s3:ListMultipartUploadParts",
                "s3:ListBucketByTags",
                "s3:ListBucket",
                "s3:ListObjects",
                "s3:ListBucketMultipartUploads"
            ]
        },
        {
            "Effect": "Allow",
            "Resource": "arn:aws:sqs:*:123456789012:AWS-ElasticMapReduce-*", // This allows EMR to only perform actions (Creating queue, receiving messages, deleting queue, etc) on SQS queues whose names are prefixed with the literal string AWS-ElasticMapReduce-
            "Action": [
                "sqs:CreateQueue",
                "sqs:DeleteQueue",
                "sqs:DeleteMessage",
                "sqs:DeleteMessageBatch",
                "sqs:GetQueueAttributes",
                "sqs:GetQueueUrl",
                "sqs:PurgeQueue",
                "sqs:ReceiveMessage"
            ]
        },
        {
            "Effect": "Allow",  
            "Action": "iam:CreateServiceLinkedRole",  // EMR needs permissions to create this service-linked role for launching EC2 spot instances
            "Resource": "arn:aws:iam::*:role/aws-service-role/spot.amazonaws.com/AWSServiceRoleForEC2Spot*",
            "Condition": {
                "StringLike": {
                    "iam:AWSServiceName": "spot.amazonaws.com"
                }
            }
        },
        {
            "Effect": "Allow",
            "Action": "iam:PassRole", // We are passing the custom EC2 instance profile (defined following) which has bare minimum permissions
            "Resource": [
                "arn:aws:iam::*:role/Custom_EMR_EC2_role",
                "arn:aws:iam::*:role/EMR_AutoScaling_DefaultRole"
            ]
        }
    ]
}

Minimal EMR role for EC2 (instance profile) policy

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Resource": "*",
            "Action": [
                "ec2:Describe*",
                "elasticmapreduce:Describe*",
                "elasticmapreduce:ListBootstrapActions",
                "elasticmapreduce:ListClusters",
                "elasticmapreduce:ListInstanceGroups",
                "elasticmapreduce:ListInstances",
                "elasticmapreduce:ListSteps"
            ]
        },
        {
            "Effect": "Allow",
            "Resource": [    // Here you can specify the list of buckets which are going to be accessed by applications (Spark, Hive, etc) running on the nodes of the cluster
                "arn:aws:s3:::examplebucket1/*",
                "arn:aws:s3:::examplebucket1*",
                "arn:aws:s3:::examplebucket2/*",
                "arn:aws:s3:::examplebucket2*"
            ], 
            "Action": [
                "s3:GetBucketLocation",
                "s3:GetBucketCORS",
                "s3:GetObjectVersionForReplication",
                "s3:GetObject",
                "s3:GetBucketTagging",
                "s3:GetObjectVersion",
                "s3:GetObjectTagging",
                "s3:ListMultipartUploadParts",
                "s3:ListBucketByTags",
                "s3:ListBucket",
                "s3:ListObjects",
                "s3:ListBucketMultipartUploads",
                "s3:PutObject",
                "s3:PutObjectTagging",
                "s3:HeadBucket",
                "s3:DeleteObject"
            ]
        },
        {
            "Effect": "Allow",
            "Resource": "arn:aws:sqs:*:123456789012:AWS-ElasticMapReduce-*", // This allows EMR to only perform actions (creating queue, receiving messages, deleting queue, and so on) on SQS queues whose names are prefixed with the literal string AWS-ElasticMapReduce-
            "Action": [
                "sqs:CreateQueue",
                "sqs:DeleteQueue",
                "sqs:DeleteMessage",
                "sqs:DeleteMessageBatch",
                "sqs:GetQueueAttributes",
                "sqs:GetQueueUrl",
                "sqs:PurgeQueue",
                "sqs:ReceiveMessage"
            ]
        }
    ]
}

Minimal role policy for user launching EMR clusters

// This policy can be attached to an IAM user who will be launching EMR clusters. It provides minimum access to the user to launch, monitor and terminate EMR clusters

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Statement1",
            "Effect": "Allow",
            "Action": "iam:CreateServiceLinkedRole",
            "Resource": "*",
            "Condition": {
                "StringLike": {
                    "iam:AWSServiceName": [
                        "elasticmapreduce.amazonaws.com",
                        "elasticmapreduce.amazonaws.com.cn"
                    ]
                }
            }
        },
        {
            "Sid": "Statement2",
            "Effect": "Allow",
            "Action": [
                "iam:GetPolicyVersion",
                "ec2:AuthorizeSecurityGroupIngress",
                "ec2:DescribeInstances",
                "ec2:RequestSpotInstances",
                "ec2:DeleteTags",
                "ec2:DescribeSpotInstanceRequests",
                "ec2:ModifyImageAttribute",
                "cloudwatch:GetMetricData",
                "cloudwatch:GetMetricStatistics",
                "cloudwatch:ListMetrics",
                "ec2:DescribeVpcAttribute",
                "ec2:DescribeSpotPriceHistory",
                "ec2:DescribeAvailabilityZones",
                "ec2:CreateRoute",
                "ec2:RevokeSecurityGroupEgress",
                "ec2:CreateSecurityGroup",
                "ec2:DescribeAccountAttributes",
                "ec2:ModifyInstanceAttribute",
                "ec2:DescribeKeyPairs",
                "ec2:DescribeNetworkAcls",
                "ec2:DescribeRouteTables",
                "ec2:AuthorizeSecurityGroupEgress",
                "ec2:TerminateInstances", //This action can be scoped in similar manner like it has been done following for "elasticmapreduce:TerminateJobFlows"
                "iam:GetPolicy",
                "ec2:CreateTags",
                "ec2:DeleteRoute",
                "iam:ListRoles",
                "ec2:RunInstances",
                "ec2:DescribeSecurityGroups",
                "ec2:CancelSpotInstanceRequests",
                "ec2:CreateVpcEndpoint",
                "ec2:DescribeVpcs",
                "ec2:DescribeSubnets",
                "elasticmapreduce:*"
            ],
            "Resource": "*"
        },
        {
            "Sid": "Statement3",
            "Effect": "Allow",
            "Action": [
                "elasticmapreduce:TerminateJobFlows"
            ],
            "Resource":"*",
            "Condition": {
                "StringEquals": {
                  "elasticmapreduce:ResourceTag/custom_key": "custom_value"  // Here you can specify the key value pair of your custom tag so that this IAM user can only delete the clusters which are appropriately tagged by the user
                }
              }
        },
        {
            "Sid": "Statement4",
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": [
                "arn:aws:iam::*:role/Custom_EMR_Role",
                "arn:aws:iam::*:role/Custom_EMR_EC2_role",
                "arn:aws:iam::*:role/EMR_AutoScaling_DefaultRole"
            ]
        }
    ]
}

Bootstrap actions

Bootstrap actions are commonly used to run custom setup code before your cluster starts executing work. You can use them to install software or configure instances, using any language already installed on the cluster, including Bash, Perl, Python, Ruby, C++, or Java.

For more details on how to use bootstrap actions, see Create Bootstrap Actions to Install Additional Software in the EMR documentation.

For the purposes of this blog post, we’ll discuss bootstrap actions to harden your cluster, apply security packages you might require, and set up third-party monitoring solutions.

When do bootstrap actions run in the cluster lifecycle?

Bootstrap actions run on each cluster instance after instance provisioning and before application installation. This timing has implications if you want to use bootstrap actions to modify the security configuration of applications that are provisioned by EMR. If this is the case, you can have your script wait on a particular trigger, as in the following bootstrap action that waits for Presto to be installed:

# Work to do before Presto being installed executes before this line
while [ ! -f /var/run/presto/presto-presto-server.pid ]
do
  sleep 1
done

# Continue on with script execution

Be sure to run the preceding snippet in a background process so that it doesn’t block the provisioning process. Failing to do so results in a timeout failure.

For more information about the cluster lifecycle, refer to: Understanding the Cluster Lifecycle in the EMR documentation.

Custom AMIs and applying CIS controls to harden your AMI

Custom Amazon Machine Images (AMIs) provide another approach that you can take to help harden and secure your EMR cluster. Amazon EMR uses an Amazon Linux AMI to initialize Amazon EC2 instances when you create and launch a cluster. The AMI contains the Amazon Linux operating system, other software, and configurations required for each instance to host your cluster applications.

Specifying a custom AMI is useful for the following use cases:

  • Encrypting the EBS root device volumes (boot volumes) of EC2 instances in your cluster, which you can’t do with a security configuration. Security configurations only help you encrypt your storage volumes. For more information, see Creating a Custom AMI with an Encrypted Amazon EBS Root Device Volume in the EMR documentation.
  • Pre-installing applications and performing other customizations instead of using bootstrap actions, which can improve cluster start time and streamline the startup work flow. For more information and an example, see Creating a Custom Amazon Linux AMI from a Preconfigured Instance in the EMR documentation.
  • Implementing more sophisticated cluster and node configurations than bootstrap actions enable.

By using a custom AMI instead of a bootstrap action, you can have your hardening steps prebaked into the images you use, rather than having to run the bootstrap action scripts at instance provision time. You don’t have to choose between the two, either. You can create a custom AMI for the common security characteristics of your cluster that are less likely to change. You can use bootstrap actions to pull the latest configurations and scripts that might be cluster-specific.
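
For the first use case in the preceding list, encrypting the EBS root device volume, one way to produce such an AMI is to copy a base Amazon Linux AMI with encryption enabled. The following boto3 sketch assumes placeholder values for the source AMI ID, Region, and KMS key ARN:

# A sketch: create an encrypted copy of a base Amazon Linux AMI to use as a
# custom AMI for EMR. The source AMI ID, Region, and KMS key ARN are placeholders.
import boto3

ec2 = boto3.client('ec2', region_name='us-east-1')

response = ec2.copy_image(
    Name='emr-base-encrypted-root',
    SourceImageId='ami-0123456789abcdef0',    # base Amazon Linux AMI (placeholder)
    SourceRegion='us-east-1',
    Encrypted=True,                           # encrypt the EBS root device volume
    KmsKeyId='arn:aws:kms:us-east-1:123456789012:key/12345678-1234-1234-1234-123456789012',
)
print(response['ImageId'])   # pass this ID as CustomAmiId when creating the cluster

You then pass the resulting image ID as the CustomAmiId when you create the cluster, as the CloudFormation and boto3 examples following show.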

One approach that many of our customers take is to apply the Center for Internet Security (CIS) benchmarks, found on the Center for Internet Security website, to harden their EMR clusters. When applying these benchmarks, it's important to verify each control for necessity and to test it against your requirements.

The following example shows using custom AMIs in CloudFormation:

Resources:
  cluster:
    Type: 'AWS::EMR::Cluster'
    Properties:
      CustomAmiId: "ami-7fb3bc69"
      Instances:
        MasterInstanceGroup:
...

The following example shows using custom AMIs in boto3:

response = client.run_job_flow(
    Name='string',
    LogUri='string',
    AdditionalInfo='string',
    CustomAmiId='ami-7fb3bc69',
...

Auditing

You might want the ability to audit compute environments, which is a key requirement for many customers. There are a variety of ways that you can support this requirement within EMR:

  • From EMR 5.14.0 onwards, EMRFS, Amazon EMR’s connector for S3, supports auditing of users who ran queries that accessed data in S3 through EMRFS. This feature is turned on by default and passes on user and group information to audit logs like CloudTrail. It provides you with comprehensive request tracking.
  • If it exists, you can configure and implement application-specific auditing on EMR. For example, this AWS Big Data Blog post walks through how to configure a custom event listener on Presto to enable audit logging, debugging, and performance analysis: Custom Log Presto Query Events on Amazon EMR for Auditing and Performance Insights.
  • You can use tools such as Apache Ranger to implement another layer of auditing and authorization. For additional information, see this AWS Big Data Blog post: Implementing Authorization and Auditing using Apache Ranger on Amazon EMR
  • AWS CloudTrail, a service that provides a record of actions taken by a user, role, or AWS service, is integrated with Amazon EMR. CloudTrail captures all API calls for Amazon EMR as events, including calls from the Amazon EMR console and code calls to the Amazon EMR API operations. If you create a trail, you can enable continuous delivery of CloudTrail events, including events for Amazon EMR, to an Amazon S3 bucket (see the sketch after this list).
  • You can also audit the S3 objects that EMR accesses by using S3 access logs. AWS CloudTrail provides logs only for AWS API calls. Thus, if a user runs a job that reads and writes data to S3, the S3 data that was accessed by EMR doesn’t show up in CloudTrail. By using S3 access logs, you can comprehensively monitor and audit access against your data in S3 from anywhere, including EMR.
  • Because you have full control over your EMR cluster, you can always install your own third-party agents or tooling. You do so by using bootstrap actions or custom AMIs to help support your auditing requirements.
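
For example, the following boto3 sketch queries recent CloudTrail events for Amazon EMR API calls. It assumes CloudTrail is enabled in the account, and the seven-day lookback window is arbitrary.

# A sketch: list recent CloudTrail events for Amazon EMR API calls, for example
# to see who created or terminated clusters.
from datetime import datetime, timedelta
import boto3

cloudtrail = boto3.client('cloudtrail')

events = cloudtrail.lookup_events(
    LookupAttributes=[
        {'AttributeKey': 'EventSource', 'AttributeValue': 'elasticmapreduce.amazonaws.com'}
    ],
    StartTime=datetime.utcnow() - timedelta(days=7),
    EndTime=datetime.utcnow(),
)

for event in events['Events']:
    print(event['EventTime'], event['EventName'], event.get('Username', 'unknown'))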

Verifying your security configuration

You also want to verify that your configuration works, of course. Following are some steps that you can take to verify your configuration.

S3 server-side encryption (SSE) on EMR

Here, we want to verify a particular case. We want to ensure that if an S3 object is uploaded using EMRFS and server-side encryption (SSE) is enabled in the EMRFS configuration, it has metadata indicating it’s encrypted. For example, calling getSSEAlgorithm on the S3 object should return AES256 if the object is encrypted using an S3 key. It should return aws:kms if the object is encrypted using a KMS key.

We also want to check that if the file is downloaded using EMRFS, the original file and the downloaded file match in terms of content.

To verify this, do the following:

  1. Use Secure Shell (SSH) to connect to the master node as described in Connect to the Master Node Using SSH in the EMR documentation.
  2. Upload an object to S3 using EMRFS:
    • hadoop fs -put <local path> <s3 bucket path>
  3. Check the metadata of the uploaded object directly:
    • aws s3api head-object --bucket <bucket name> --key <object key>
  4. Download the file from S3 using EMRFS:
    • hadoop fs -get <s3 bucket path> <local path>
  5. Use diff on the original and downloaded file and verify that they are the same.
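
You can also check the encryption metadata programmatically. The following boto3 sketch assumes placeholder bucket and key names:

# A sketch: check the server-side encryption metadata on an object written through EMRFS.
import boto3

s3 = boto3.client('s3')

head = s3.head_object(Bucket='examplebucket', Key='data/part-00000')
print(head.get('ServerSideEncryption'))   # expect 'AES256' for SSE-S3, 'aws:kms' for SSE-KMS
print(head.get('SSEKMSKeyId'))            # populated only for SSE-KMS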

S3 client-side encryption (CSE) on EMR

Similarly to verifying server-side encryption, we want to confirm another point. If an object is uploaded using EMRFS and client side encryption is enabled in the EMRFS configuration, the S3 object’s contents should be encrypted. A client that doesn’t possess the proper key shouldn’t be able to see them. If we get the object using an Amazon S3 client, the contents of the object shouldn’t be the same as the contents of the original object that was uploaded.

We also want to confirm that if we do use EMRFS to download the file, the contents of the downloaded file should match the contents of the original file.

To verify this, do the following:

  1. Use SSH to connect to the master node as described in Connect to the Master Node Using SSH.
  2. Upload an object to S3 using EMRFS:
    • hadoop fs -put <local path> <s3 bucket path>
  3. Download the object from S3 using the AWS CLI:
    • aws s3 mv <s3 bucket path> <local path>
    • This file should NOT match the original file.
  4. Download the object from S3 using EMRFS:
    • hadoop fs -get <s3 bucket path> <local path>
    • This file SHOULD match the original file.

Local disk encryption

We also want to verify that if local disk encryption is enabled, all the block devices on the cluster are of type crypto_LUKS. If there is a partition that is not a LUKS partition, the following command exits with a return code of 1:

! blkid | grep -v crypto_LUKS

In addition, we want to verify that the local disk encryption uses the kind of key that was defined in the configuration.

Let’s try to use KMS to decrypt our encrypted local disk passphrase. If it is decrypted, we try to use it to open the LUKS partitions. If it doesn’t get decrypted, we assume it is a custom key.

To do so, we run the following commands:

  1. base64 -d /var/setup-devices/.encrypted-diskKey > (local path for encrypted passphrase)
  2. aws kms decrypt --ciphertext-blob fileb://(local path for encrypted passphrase) --query Plaintext > (local path for decrypted passphrase)
    • If this step fails, the key is not from KMS
  3. sudo cryptsetup luksOpen --test-passphrase --key-slot 0 /dev/xvdb1 < (local path for decrypted passphrase) | cut -d '"' -f 2

Wrapping it up in AWS CloudFormation

We have provided the following AWS CloudFormation template to help you more easily deploy an EMR cluster that aligns with the patterns and practices described in this post. The template spins up an EMR cluster with these characteristics:

  • Encryption at rest enabled for both S3 and the local disk
  • Encryption in transit enabled using a certificate bundle that you specify
  • A custom AMI CloudFormation parameter that can incorporate the concepts described in the custom AMI section
  • Kerberos-enabled
  • Fine-grained S3 authorization
---
Description: Sample CloudFormation template for creating an EMR cluster
Parameters:
  KeyName:
    Description: Name of an existing EC2 KeyPair to enable SSH to the instances
    Type: AWS::EC2::KeyPair::KeyName
  Subnet:
    Description: Subnet ID for creating the EMR cluster
    Type: AWS::EC2::Subnet::Id
Resources:
  EMRInstanceProfile:
    Properties:
      Roles:
      - Ref: EMRJobFlowRole
    Type: AWS::IAM::InstanceProfile
  EMRJobFlowRole:
    Properties:
      AssumeRolePolicyDocument:
        Statement:
        - Action:
          - sts:AssumeRole
          Effect: Allow
          Principal:
            Service:
            - ec2.amazonaws.com
      ManagedPolicyArns:
      - arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceforEC2Role
    Type: AWS::IAM::Role
  EMRSampleCluster:
    Properties:
      Applications:
      - Name: Hadoop
      - Name: Hive
      - Name: Spark
      AutoScalingRole: EMR_AutoScaling_DefaultRole
      BootstrapActions:
      - Name: Dummy bootstrap action
        ScriptBootstrapAction:
          Args:
          - dummy
          - parameter
          Path: file:/usr/share/aws/emr/scripts/install-hue
      Configurations:
      - Classification: core-site
        ConfigurationProperties:
          hadoop.security.groups.cache.secs: '250'
      - Classification: mapred-site
        ConfigurationProperties:
          mapred.tasktracker.map.tasks.maximum: '2'
          mapreduce.map.sort.spill.percent: '90'
          mapreduce.tasktracker.reduce.tasks.maximum: '5'
      Instances:
        CoreInstanceGroup:
          EbsConfiguration:
            EbsBlockDeviceConfigs:
            - VolumeSpecification:
                SizeInGB: '10'
                VolumeType: gp2
              VolumesPerInstance: '1'
            EbsOptimized: 'true'
          InstanceCount: '1'
          InstanceType: m4.large
          Name: Core Instance
        Ec2KeyName:
          Ref: KeyName
        Ec2SubnetId:
          Ref: Subnet
        MasterInstanceGroup:
          InstanceCount: '1'
          InstanceType: m4.large
          Name: Master Instance
      JobFlowRole:
        Ref: EMRInstanceProfile
      Name: EMR Sample Cluster
      ReleaseLabel: emr-5.5.0
      SecurityConfiguration:
        Ref: EMRSecurityConfiguration
      ServiceRole:
        Ref: EMRServiceRole
      Tags:
      - Key: Name
        Value: EMR Sample Cluster
      VisibleToAllUsers: 'true'
    Type: AWS::EMR::Cluster
  EMRSecurityConfiguration:
    Properties:
      Name: EMRSampleClusterSecurityConfiguration
      SecurityConfiguration:
        EncryptionConfiguration:
          AtRestEncryptionConfiguration:
            LocalDiskEncryptionConfiguration:
              AwsKmsKey: arn:aws:kms:us-east-1:123456789012:key/1234-1234-1234-1234-1234
              EncryptionKeyProviderType: AwsKms
            S3EncryptionConfiguration:
              AwsKmsKey: arn:aws:kms:us-east-1:123456789012:key/1234-1234-1234-1234-1234
              EncryptionMode: SSE-KMS
          EnableAtRestEncryption: 'true'
          EnableInTransitEncryption: 'true'
          InTransitEncryptionConfiguration:
            TLSCertificateConfiguration:
              CertificateProviderType: PEM
              S3Object: s3://MyConfigStore/artifacts/MyCerts.zip
    Type: AWS::EMR::SecurityConfiguration
  EMRServiceRole:
    Properties:
      AssumeRolePolicyDocument:
        Statement:
        - Action:
          - sts:AssumeRole
          Effect: Allow
          Principal:
            Service:
            - elasticmapreduce.amazonaws.com
      ManagedPolicyArns:
      - arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceRole
    Type: AWS::IAM::Role

Summary

In this post, we provide a set of best practices to consider and follow when securing your EMR clusters. If you have questions or suggestions, leave a comment.

 


Additional Reading

If you found this post useful, be sure to check out Restrict access to your AWS Glue Data Catalog with resource-level IAM permissions and resource-based policies, Implementing Authorization and Auditing using Apache Ranger on Amazon EMR, and Secure Amazon EMR with Encryption.

 


About the Author

Tony Nguyen is a Senior Consultant with AWS Professional Services. His specialty is in data and analytics, focused on helping public sector customers with their unique data challenges. He works directly with customers to design, architect, and implement big data and analytics solutions on AWS. When he’s not eyeballs-deep in data, he enjoys playing volleyball, cooking, and occasionally fooling himself into thinking that he’s a half-decent photographer.

 

 

 

 

Dr. Aaron Friedman is a Healthcare and Life Sciences Partner Solutions Architect at Amazon Web Services. He works with ISVs and SIs to architect healthcare solutions on AWS, and bring the best possible experience to their customers. His passion is working at the intersection of science, big data, and software. In his spare time, he’s exploring the outdoors, learning a new thing to cook, or spending time with his wife and his dog, Macaroon.

Connecting to and running ETL jobs across multiple VPCs using a dedicated AWS Glue VPC

Post Syndicated from Nivas Shankar original https://aws.amazon.com/blogs/big-data/connecting-to-and-running-etl-jobs-across-multiple-vpcs-using-a-dedicated-aws-glue-vpc/

Many organizations use a setup that includes multiple VPCs based on the Amazon VPC service, with databases isolated in separate VPCs for security, auditing, and compliance purposes. This blog post shows how you can use AWS Glue to perform extract, transform, load (ETL) and crawler operations for databases located in multiple VPCs.

The solution presented here uses a dedicated AWS Glue VPC and subnet to perform the following operations on databases located in different VPCs:

  • Scenario 1: Ingest data from an Amazon RDS for MySQL database, transform it in AWS Glue, and output the results to an Amazon Redshift data warehouse.
  • Scenario 2: Ingest data from an Amazon RDS for MySQL database, transform it in AWS Glue, and output the results to an Amazon RDS for PostgreSQL database.

In this blog post, we go through the steps needed to build an ETL pipeline that reads from a source in one VPC and writes the results to a target in a different VPC. We set up multiple VPCs to reproduce a situation where your database instances are isolated in separate VPCs for security, audit, or other purposes.

For this solution, we create a VPC dedicated to AWS Glue. Next, we set up VPC peering between the AWS Glue VPC and all of the other database VPCs. Then we configure an Amazon S3 endpoint, route tables, security groups, and IAM so that AWS Glue can function properly. Lastly, we create AWS Glue connections and an AWS Glue job to perform the task at hand.

Step 1: Set up a VPC

To simulate these scenarios, we create four VPCs with their respective IPv4 CIDR ranges. (Note: CIDR ranges can’t overlap when you use VPC peering.)

  • VPC 1: Amazon Redshift, 172.31.0.0/16
  • VPC 2: Amazon RDS for MySQL, 172.32.0.0/16
  • VPC 3: Amazon RDS for PostgreSQL, 172.33.0.0/16
  • VPC 4: AWS Glue, 172.30.0.0/16

Key configuration notes:

  1. The AWS Glue VPC needs at least one private subnet for AWS Glue to use.
  2. Ensure that DNS hostnames are enabled for all of your VPCs (unless you plan to refer to your databases by IP address later on, which isn’t recommended).

Step 2: Set up a VPC peering connection

Next, we peer our VPCs together to ensure that AWS Glue can communicate with all of the database targets and sources. This approach is necessary because AWS Glue resources are created with private addresses only. Thus, they can’t use an internet gateway to communicate with public addresses, such as public database endpoints. If your database endpoints are public, you can alternatively use a network address translation (NAT) gateway with AWS Glue rather than peer across VPCs.

Create the following peering connections.

  • Peer 1: requester 172.30.0.0/16 (VPC 4), accepter 172.31.0.0/16 (VPC 1)
  • Peer 2: requester 172.30.0.0/16 (VPC 4), accepter 172.32.0.0/16 (VPC 2)
  • Peer 3: requester 172.30.0.0/16 (VPC 4), accepter 172.33.0.0/16 (VPC 3)

These peering connections can be across separate AWS Regions if needed. The database VPCs are not peered together; they are all peered with the AWS Glue VPC instead. We do this because AWS Glue connects to each database from its own VPC. The databases don’t connect to each other.

Key configuration notes:

  1. Create a VPC peering connection, as described in the Amazon VPC documentation. Select the AWS Glue VPC as the requester and the VPC for your database as the accepter.
  2. Accept the VPC peering request. If you are peering to a different AWS Region, switch to that AWS Region to accept the request.

Important: Enable Domain Name Service (DNS) settings for each of the peering connections. Doing this ensures that AWS Glue can retrieve the private IP address of your database endpoints. Otherwise, AWS Glue resolves your database endpoints to public IP addresses. AWS Glue can’t connect to public IP addresses without a NAT gateway.
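If you prefer to script this setup, the following is a minimal boto3 sketch of the peering and DNS steps described above. It assumes both VPCs are in the same account and AWS Region, and the VPC IDs are placeholders for your own environment.

import boto3

ec2 = boto3.client("ec2")

# Request peering from the AWS Glue VPC (VPC 4) to a database VPC (for example, VPC 1).
peering = ec2.create_vpc_peering_connection(
    VpcId="vpc-0aaaaaaaaaaaaaaaa",      # requester: AWS Glue VPC
    PeerVpcId="vpc-0bbbbbbbbbbbbbbbb",  # accepter: database VPC
)
pcx_id = peering["VpcPeeringConnection"]["VpcPeeringConnectionId"]

# Accept the request (switch Regions first if you peer across Regions).
ec2.accept_vpc_peering_connection(VpcPeeringConnectionId=pcx_id)

# Enable DNS resolution over the peering connection so that AWS Glue
# resolves your database endpoints to private IP addresses.
ec2.modify_vpc_peering_connection_options(
    VpcPeeringConnectionId=pcx_id,
    RequesterPeeringConnectionOptions={"AllowDnsResolutionFromRemoteVpc": True},
    AccepterPeeringConnectionOptions={"AllowDnsResolutionFromRemoteVpc": True},
)

Repeat the same calls for the other two database VPCs (Peer 2 and Peer 3).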

Step 3: Create an Amazon S3 endpoint for the AWS Glue subnet

We need to add an Amazon S3 endpoint to the AWS Glue VPC (VPC 4). During setup, associate the endpoint with the route table that your private subnet uses. For more details on creating an S3 endpoint for AWS Glue, see Amazon VPC Endpoints for Amazon S3 in the AWS Glue documentation.

AWS Glue uses S3 to store your scripts and temporary data to load into Amazon Redshift.
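The console steps above can also be scripted. The following boto3 sketch creates a gateway endpoint for S3 and associates it with the route table of the AWS Glue private subnet; the VPC ID, Region, and route table ID are placeholders.

import boto3

ec2 = boto3.client("ec2", region_name="us-east-1")

ec2.create_vpc_endpoint(
    VpcId="vpc-0aaaaaaaaaaaaaaaa",             # AWS Glue VPC (VPC 4)
    ServiceName="com.amazonaws.us-east-1.s3",  # S3 gateway endpoint for the Region
    VpcEndpointType="Gateway",
    RouteTableIds=["rtb-0ccccccccccccccccc"],  # route table of the AWS Glue private subnet
)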

Step 4: Create a route table configuration

Add the following routes to the route tables used by the respective services’ subnets. Add these routes alongside the existing entries in each route table.

VPC 4 (AWS Glue) route table:
  Destination                Target
  172.31.0.0/16 (VPC 1)      Peer 1
  172.32.0.0/16 (VPC 2)      Peer 2
  172.33.0.0/16 (VPC 3)      Peer 3

VPC 1 (Amazon Redshift) route table:
  Destination                Target
  172.30.0.0/16 (VPC 4)      Peer 1

VPC 2 (Amazon RDS for MySQL) route table:
  Destination                Target
  172.30.0.0/16 (VPC 4)      Peer 2

VPC 3 (Amazon RDS for PostgreSQL) route table:
  Destination                Target
  172.30.0.0/16 (VPC 4)      Peer 3
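For reference, the following boto3 sketch adds one of these routes, the return route in the Amazon Redshift VPC that points back to the AWS Glue VPC CIDR through Peer 1. The route table and peering connection IDs are placeholders.

import boto3

ec2 = boto3.client("ec2")

ec2.create_route(
    RouteTableId="rtb-0ddddddddddddddddd",          # route table in VPC 1 (Amazon Redshift)
    DestinationCidrBlock="172.30.0.0/16",           # AWS Glue VPC (VPC 4)
    VpcPeeringConnectionId="pcx-0eeeeeeeeeeeeeeee",  # Peer 1
)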

Key configuration notes:

  • The route table for the AWS Glue VPC has peering connections to all VPCs. It has these so that AWS Glue can initiate connections to all of the databases.
  • All of the database VPCs have a peering connection back to the AWS Glue VPC. They have these connections to allow return traffic to reach AWS Glue.
  • Ensure that your S3 endpoint is present in the route table for the AWS Glue VPC.

Step 5: Update the database security groups

Each database’s security group must allow traffic to its listening port (3306, 5432, 5439, and so on) from the AWS Glue VPC for AWS Glue to be able to connect to it. It’s also a good idea to restrict the range of source IP addresses as much as possible.

There are two ways to accomplish this. If your AWS Glue job will be in the same AWS Region as the resource, you can define the source as the security group that you use for AWS Glue. If you are using AWS Glue to connect across AWS Regions, specify the IP range of the private subnet in the AWS Glue VPC instead. The examples following use a security group as the source because our AWS Glue job and data sources are all in the same AWS Region.

In addition to configuring the database’s security groups, AWS Glue requires a special security group that allows all inbound traffic from itself. Because it isn’t secure to allow traffic from 0.0.0.0/0, we create a self-referencing rule that simply allows all traffic originating from the security group. You can create a new security group for this purpose, or you can modify an existing security group. In the example following, we create a new security group to use later when AWS Glue connections are created.
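The following boto3 sketch shows one way to create such a self-referencing security group; the VPC ID and group name are placeholders.

import boto3

ec2 = boto3.client("ec2")

# Create the security group in the AWS Glue VPC.
sg = ec2.create_security_group(
    GroupName="glue-self-referencing-sg",
    Description="Self-referencing rule for AWS Glue elastic network interfaces",
    VpcId="vpc-0aaaaaaaaaaaaaaaa",
)
sg_id = sg["GroupId"]

# Allow all inbound traffic, but only from members of this same security group.
ec2.authorize_security_group_ingress(
    GroupId=sg_id,
    IpPermissions=[{
        "IpProtocol": "-1",                        # all protocols and ports
        "UserIdGroupPairs": [{"GroupId": sg_id}],  # self-reference instead of 0.0.0.0/0
    }],
)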

The Amazon RDS for MySQL security group needs to allow traffic on its listening port from AWS Glue:

The Amazon RDS for PostgreSQL security group allows traffic to its listening port from the same source:

The Amazon Redshift security group does the same:

The AWS Glue security group allows all inbound traffic from itself through the self-referencing rule:

Step 6: Set up IAM

Make sure that you have an AWS Glue IAM role with access to Amazon S3. You might want to provide your own policy for access to specific Amazon S3 resources. Data sources require s3:ListBucket and s3:GetObject permissions. Data targets require s3:ListBucket, s3:PutObject, and s3:DeleteObject permissions. For more information on creating an Amazon S3 policy for your resources, see Policies and Permissions in the IAM documentation.

The role should look like this:

Or you can create an S3 policy that’s more restricted to suit your use case.
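As a rough sketch of what such a restricted policy could look like, the following boto3 snippet creates a customer managed policy that grants only the permissions listed above, scoped to a single bucket. The bucket and policy names are placeholders.

import json
import boto3

iam = boto3.client("iam")

policy_doc = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["s3:ListBucket"],
            "Resource": ["arn:aws:s3:::my-glue-etl-bucket"],
        },
        {
            "Effect": "Allow",
            "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
            "Resource": ["arn:aws:s3:::my-glue-etl-bucket/*"],
        },
    ],
}

iam.create_policy(
    PolicyName="glue-s3-restricted-access",
    PolicyDocument=json.dumps(policy_doc),
)

Attach the resulting policy to your AWS Glue IAM role in place of broader S3 access.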

Step 7: Set up an AWS Glue connection

The Amazon RDS for MySQL connection in AWS Glue should look like this:

The Amazon Redshift connection should look like this:

The Amazon RDS for PostgreSQL connection should look like this:
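If you script your infrastructure, a connection equivalent to the screenshots above can be created with boto3. The following sketch creates the Amazon RDS for MySQL connection; the JDBC URL, credentials, subnet, security group, and Availability Zone are placeholders, and the subnet and security group should be the AWS Glue private subnet and the self-referencing security group created earlier.

import boto3

glue = boto3.client("glue")

glue.create_connection(
    ConnectionInput={
        "Name": "glue-rds-mysql",
        "ConnectionType": "JDBC",
        "ConnectionProperties": {
            "JDBC_CONNECTION_URL": "jdbc:mysql://your-mysql-endpoint:3306/mysqldb",
            "USERNAME": "admin",
            "PASSWORD": "your-password",
        },
        "PhysicalConnectionRequirements": {
            "SubnetId": "subnet-0123456789abcdef0",          # AWS Glue private subnet
            "SecurityGroupIdList": ["sg-0123456789abcdef0"],  # self-referencing security group
            "AvailabilityZone": "us-east-1a",
        },
    }
)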

Step 8: Set up an AWS Glue job

Key configuration notes:

  1. Create a crawler to import table metadata from the source database (Amazon RDS for MySQL) into the AWS Glue Data Catalog. The scenario includes a database in the catalog named gluedb, to which the crawler adds the sample tables from the source Amazon RDS for MySQL database.
  2. Use either the source connection or destination connection to create a sample job as shown following. (This step is required for the AWS Glue job to establish a network connection and create the necessary elastic network interfaces with the databases’ VPCs and peered connections.)
  3. This scenario uses PySpark code to perform the load operation from Amazon RDS for MySQL to Amazon Redshift. A similar job handles the ingest from Amazon RDS for MySQL to Amazon RDS for PostgreSQL.
  4. After running the job, verify that the table exists in the target database and that the counts match.

The following screenshots show the steps to create a job in the AWS Glue Management Console.
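If you prefer the AWS SDK over the console, the following boto3 sketch creates and starts the crawler described in step 1. The crawler role, connection name, and JDBC include path are placeholders that should match your own setup.

import boto3

glue = boto3.client("glue")

# Crawl the source MySQL tables into the gluedb catalog database.
glue.create_crawler(
    Name="mysql-events-crawler",
    Role="AWSGlueServiceRoleDefault",
    DatabaseName="gluedb",
    Targets={"JdbcTargets": [{
        "ConnectionName": "glue-rds-mysql",  # the AWS Glue connection for the source database
        "Path": "mysqldb/%",                 # crawl every table in the mysqldb database
    }]},
)
glue.start_crawler(Name="mysql-events-crawler")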

Following are some examples of loading data from source tables to target instances. These are simple one-to-one mappings, with no transformations applied. Notice that the data source and data sink (target) connection configurations access multiple VPCs from a single AWS Glue job.

Sample script 1 (Amazon RDS for MySQL to Amazon Redshift)

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Resolve the job arguments, including the temporary directory that
# AWS Glue uses to stage data for the Amazon Redshift load.
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'TempDir'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read the source table that the crawler added to the gluedb catalog database.
datasource = glueContext.create_dynamic_frame.from_catalog(database = "gluedb", table_name = "mysqldb_events", transformation_ctx = "datasource")

# Write the data to Amazon Redshift through the "Redshift" AWS Glue connection.
datasink = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource, catalog_connection = "Redshift", connection_options = {"dbtable": "mysqldb_events", "database": "dmartblog"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink")

job.commit()

Sample script 2: Amazon RDS for MySQL to Amazon RDS for PostgreSQL (you can substitute another Amazon RDS endpoint)

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Resolve the job arguments.
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read the source table from the gluedb catalog database.
datasource = glueContext.create_dynamic_frame.from_catalog(database = "gluedb", table_name = "mysqldb_events", transformation_ctx = "datasource")

# Write the data to Amazon RDS for PostgreSQL through the "vpc-pgsql" AWS Glue connection.
datasink = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource, catalog_connection = "vpc-pgsql", connection_options = {"dbtable": "mysqldb_events", "database": "mypgsql"}, transformation_ctx = "datasink")

job.commit()

Summary

In this blog post, you learn how to configure AWS Glue to run in a separate VPC so that it can execute jobs for databases located in multiple VPCs.

The benefits of doing this include the following:

  • A separate VPC and dedicated pool of resources for the running AWS Glue job, isolated from the database and compute nodes.
  • Dedicated ETL developer access to a single VPC for better security control and provisioning.

 


Additional Reading

If you found this post useful, be sure to check out Restrict access to your AWS Glue Data Catalog with resource-level IAM permissions and resource-based policies, and Using Amazon Redshift Spectrum, Amazon Athena, and AWS Glue with Node.js in Production.

 


About the Author

Nivas Shankar is a Senior Big Data Consultant at Amazon Web Services. He helps and works closely with enterprise customers building big data applications on the AWS platform. He holds a Masters degree in physics and is highly passionate about theoretical physics concepts. He enjoys spending time with his wife and two adorable kids. In his spare time, he takes his kids to tennis and football practice.


Ian Eberhart is a Cloud Support Engineer on the Big Data team for AWS Premium Support. He works with customers on a daily basis to find solutions for moving and sorting their data on the AWS platform. In his spare time, Ian enjoys seeing independent and weird movies, riding his bike, and hiking in the mountains.

 

Re-affirming Long-Term Support for Java in Amazon Linux

Post Syndicated from Deepak Singh original https://aws.amazon.com/blogs/compute/re-affirming-long-term-support-for-java-in-amazon-linux/

In light of Oracle’s recent announcement indicating an end to free long-term support for OpenJDK after January 2019, we re-affirm that the OpenJDK 8 and OpenJDK 11 Java runtimes in Amazon Linux 2 will continue to receive free long-term support from Amazon until at least June 30, 2023. We are collaborating and contributing in the OpenJDK community to provide our customers with a free long-term supported Java runtime.

In addition, Amazon Linux AMI 2018.03, the last major release of Amazon Linux AMI, will receive support for the OpenJDK 8 runtime at least until June 30, 2020, to facilitate migration to Amazon Linux 2. Java runtimes provided by AWS services such as AWS Lambda, Amazon EMR, and AWS Elastic Beanstalk will also use the AWS-supported OpenJDK builds.

Amazon Linux users will not need to make any changes to get support for OpenJDK 8. OpenJDK 11 will be made available through the Amazon Linux 2 repositories at a future date. The Amazon Linux OpenJDK support posture will also apply to the on-premises virtual machine images and Docker base image of Amazon Linux 2.

Amazon Linux 2 provides a secure, stable, and high-performance execution environment. Amazon Linux AMI and Amazon Linux 2 include a Java runtime based on OpenJDK 8 and are available in all public AWS regions at no additional cost beyond the pricing for Amazon EC2 instance usage.

Dynamically scale up storage on Amazon EMR clusters

Post Syndicated from Jigar Mistry original https://aws.amazon.com/blogs/big-data/dynamically-scale-up-storage-on-amazon-emr-clusters/

In a managed Apache Hadoop environment—like an Amazon EMR cluster—when the storage capacity on your cluster fills up, there is no convenient solution to deal with it. This situation occurs because you set up Amazon Elastic Block Store (Amazon EBS) volumes and configure mount points when the cluster is launched, so it’s difficult to modify the storage capacity after the cluster is running. The feasible solutions usually involve adding more nodes to your cluster, backing up your data to a data lake, and then launching a new cluster with a higher storage capacity. Or, if the data that occupies the storage is expendable, removing the excess data is usually the way to go.

To help you deal with this issue in a manageable way in Amazon EMR, I will show you how to dynamically scale up the storage using the elastic volumes feature of Amazon EBS. With this feature, you can increase the volume size, adjust the performance, or change the volume type while the volume is in use. You can continue to use your EMR cluster to run big data applications while the changes take effect.

How HDFS and YARN use disk space on an Amazon EMR cluster

When you create an Amazon EMR cluster, by default, HDFS (Hadoop Distributed File System) and YARN (Yet Another Resource Negotiator) are configured to use the local disk storage available on all the core/task nodes. You configure this inside the yarn-site.xml and hdfs-site.xml configuration files.

Specifically, for HDFS, the dfs.datanode.data.dir parameter is configured to use local storage, where it stores the data blocks for the files maintained by the NameNode daemon. And for YARN, the yarn.nodemanager.local-dirs parameter is configured to store the intermediate files needed for the NodeManager daemon to run the YARN containers.

For example, when the cluster is running a MapReduce job, the map tasks store their output files inside the directories defined by yarn.nodemanager.local-dirs. Additionally, the yarn.nodemanager.log-dirs parameter is configured to store the container logs on the core and task nodes after the YARN application has finished.

General best practices for avoiding storage issues

As you plan to run your jobs on an Amazon EMR cluster, the following are some helpful tips to avoid exceeding the available storage on your cluster.

Estimate your future storage needs

Plan ahead regarding the storage needs of your jobs. When you launch a cluster using the default storage configuration, it might not be sufficient for your workloads, and you might have issues while your jobs are running. It is a good practice to estimate how much intermediate storage your jobs will need. And based on that, you can customize the storage configuration when launching a new cluster.

Store passive data in a data lake

Try to design your workloads in such a way that all your passive data is stored in a data lake like Amazon Simple Storage Service (Amazon S3). This way, you use your cluster only for data processing, performing miscellaneous computation tasks, and storing the results back to the data lake for persistent storage. This approach minimizes the storage requirements on the running cluster.

Plan for more capacity

If your use case dictates that the input or output data should be stored locally on the cluster (HDFS or local storage), you should plan the size of your cluster accordingly. For example, if you are using HDFS, you can create a cluster with a higher number of core nodes that will be enough to store your data. Or, you can customize the core instance group to have more EBS storage capacity than what the default configuration provides.

Possible issues if the storage reaches its maximum capacity

As the EMR cluster is being used to run different data processing applications, at some point, the storage capacity on the cluster might be used up. In that case, the following are some of the issues that can affect your cluster.

Issues from a YARN perspective

If the directories that are defined by the yarn.nodemanager.local-dirs or yarn.nodemanager.log-dirs parameters are filled up to at least 90 percent of the total storage of the volume, the NodeManager marks that particular disk as unhealthy. This action then causes the NodeManager to mark the node that has those disks as unhealthy also. So, if the node is unhealthy, the ResourceManager will not assign any containers to the node.

Additionally, if the termination protection feature is turned off on your EMR cluster, the EMR service eventually terminates the node from your cluster.

Issues from an HDFS perspective

If the HDFS usage on the cluster increases, it corresponds to an increase in the usage of local storage on EBS volumes also. In EMR, the HDFS data directories are configured under the same mount point as the YARN local and log directories. So, if the usage of the mount point exceeds the storage threshold (90 percent) due to HDFS, that again causes YARN to mark that disk as unhealthy, and the ResourceManager blacklists the node.

Dynamically resize the storage space on core and task nodes

To scale up the storage of core and task nodes on your cluster, use the following bootstrap action script:

s3://aws-bigdata-blog/artifacts/resize_storage/resize_storage.sh

Additionally, the EC2 instance profile of your cluster must have the ec2:ModifyVolume permissions to be able to resize the volume.

The script runs on all the nodes of an EMR cluster. It configures a cron job on the nodes and performs a disk utilization check every 2 minutes. On the master node, it performs the check on the root volume and the volumes that are storing the logs of various master daemons. On core and task nodes, it performs the check on volumes that YARN and HDFS use and determines whether there is a need to scale up the storage.

When it determines that a volume has exceeded 90 percent of its usage, the size of the volume is expanded by the percentage specified by the “--scaling-factor” parameter. During the resize process, the partition of the volume is expanded, and the file system is extended to reflect the updated capacity. All of this happens without affecting the applications that are running on the cluster.
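The following is a simplified Python sketch of the idea behind the script, not the bootstrap action itself: check disk utilization for a mount point, and once it crosses 90 percent, grow the backing EBS volume by a scaling factor through the ModifyVolume API. The volume ID, mount point, and scaling factor are placeholders.

import shutil
import boto3

ec2 = boto3.client("ec2")

VOLUME_ID = "vol-0123456789abcdef0"   # EBS volume backing the mount point
MOUNT_POINT = "/mnt"
SCALING_FACTOR = 0.25                 # grow by 25 percent

usage = shutil.disk_usage(MOUNT_POINT)
if usage.used / usage.total > 0.90:
    current_size = ec2.describe_volumes(VolumeIds=[VOLUME_ID])["Volumes"][0]["Size"]
    new_size = int(current_size * (1 + SCALING_FACTOR))
    # Requires the ec2:ModifyVolume permission on the instance profile.
    ec2.modify_volume(VolumeId=VOLUME_ID, Size=new_size)
    # After the volume modification starts, the partition and file system still
    # need to be extended (for example, with growpart and resize2fs or xfs_growfs).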

Consider the following caveats before using this solution:

  • You can scale up the storage capacity of nodes in an EMR cluster only if the cluster uses EBS volumes as its storage backend. Certain EC2 instance types use only instance store volumes or both instance store and EBS volumes. You can’t resize the storage capacity on clusters that use such EC2 instance types.
  • While you are deciding on the scaling factor option of the script, choose a value large enough that the updated configuration lasts for a while. After a volume has been modified, you must wait at least six hours before applying further modifications to the same volume.

Conclusion

In this post, I explained how HDFS and YARN use the local storage on Amazon EMR cluster nodes. I covered how you can scale up the storage on an EMR cluster using the elastic volumes feature of Amazon EBS. You can use this feature to increase volume size, adjust performance, or change volume type while the volume is in use. You can then continue to use your EMR cluster to run big data applications while the changes are being applied.

If you have any questions or suggestions, please leave a comment below.

 


About the Author

Jigar Mistry is a Hadoop Systems Engineer with Amazon Web Services. He works with customers to provide them architectural guidance and technical support for processing large datasets in the cloud using open-source applications. In his spare time, he enjoys going for camping and exploring different restaurants in the Seattle area.

 

Migrate to Apache HBase on Amazon S3 on Amazon EMR: Guidelines and Best Practices

Post Syndicated from Francisco Oliveira original https://aws.amazon.com/blogs/big-data/migrate-to-apache-hbase-on-amazon-s3-on-amazon-emr-guidelines-and-best-practices/

This blog post provides guidance and best practices about how to migrate from Apache HBase on HDFS to Apache HBase on Amazon S3 on Amazon EMR.

Apache HBase on Amazon S3 on Amazon EMR

Amazon EMR version 5.2.0 or later lets you run Apache HBase on Amazon S3. By using Amazon S3 as a data store for Apache HBase, you can separate your cluster’s storage and compute nodes. This saves costs because you size your cluster for your compute requirements. You’re not paying to store your entire dataset with 3x replication in the on-cluster HDFS.

Many customers have taken advantage of the benefits of running Apache HBase on Amazon S3 for data storage. These benefits include lower costs, data durability, and more efficient scalability. Customers, such as the Financial Industry Regulatory Authority (FINRA), have lowered their costs by 60% by moving to an Apache HBase on Amazon S3 architecture. They have also experienced operational benefits that come with decoupling storage from compute and using Amazon S3 as the storage layer.

Whitepaper on Migrating to Apache HBase on Amazon S3 on Amazon EMR

This whitepaper walks you through the stages of a migration. It also helps you determine when to choose Apache HBase on Amazon S3 on Amazon EMR, plan for platform security, tune Apache HBase and EMRFS to support your application SLA, identify options to migrate and restore your data, and manage your cluster in production.

For more information, see Migrating to Apache HBase on Amazon S3 on Amazon EMR.


Additional Reading

If you found this post useful, be sure to check out Setting up Read Replica Clusters with HBase on Amazon S3, and Tips for Migrating to Apache HBase on Amazon S3 from HDFS.

 


About the Author

Francisco Oliveira is a Senior Big Data Engineer with AWS Professional Services. He focuses on building big data solutions with open source technology and AWS. In his free time, he likes to try new sports, travel and explore national parks.

 

Real-time bushfire alerting with Complex Event Processing in Apache Flink on Amazon EMR and IoT sensor network

Post Syndicated from Santoshkumar Kulkarni original https://aws.amazon.com/blogs/big-data/real-time-bushfire-alerting-with-complex-event-processing-in-apache-flink-on-amazon-emr-and-iot-sensor-network/

Bushfires are frequent events in the warmer months of the year when the climate is hot and dry. Countries like Australia and the United States are severely impacted, with bushfires causing devastating effects to human lives and property. Over the years, the prediction of bushfires has been a subject of study in various research projects. Many of these projects use complex machine learning algorithms. These algorithms learn to predict bushfires from the real-time spread of the fire over a particular geographical region.

In this blog post, we use the event processing paradigm provided by Apache Flink’s Complex Event Processing (CEP) library to detect potential bushfire patterns from incoming temperature events from IoT sensors in real time, and then send alerts via email. A real-time heat-map visualization of the area under surveillance is also integrated for monitoring purposes.

This post uses the following AWS services:

Overview of the real-time bushfire prediction alert system

The development and deployment of a large-scale wireless sensor network for bushfire detection and alerting is a complex task. The scenario for this post assumes that the sensors are long-lived, battery-powered devices deployed over a multi-hop wireless mesh network using technologies like LoRaWAN. It also assumes that the IoT sensors are placed strategically within the area under surveillance, and not under direct exposure to sunlight, to avoid excessive heating. The sensors constantly record and emit temperature readings from where they are installed. They can also communicate with each other to send and receive individual temperature readings and perceive the status of their surroundings. Key parameters recorded by the devices include temperature in degrees Celsius, time stamp, node id, and infectedBy, as illustrated in Figure 1.
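To make the event shape concrete, the following boto3 sketch publishes one simulated reading to AWS IoT Core. The topic name and field names mirror the parameters described above, but they are illustrative assumptions rather than the exact schema used by the simulator in this post.

import json
import time
import boto3

iot_data = boto3.client("iot-data")

event = {
    "temp": 52,                    # temperature in degrees Celsius
    "eventTime": int(time.time()), # time stamp of the reading
    "node": 2,                     # node id of the reporting sensor
    "infectedBy": 1,               # neighbor that spread the fire to this node
}

iot_data.publish(
    topic="weather-sensor-data",
    qos=1,
    payload=json.dumps(event),
)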

Figure 1. List of sensor events containing measured temperature by the IoT sensor devices over different time points

Once a potential bushfire starts to spread, the IoT sensors placed within its path can detect the subsequent temperature increase. It can then share the news with their neighboring sensors. This phenomenon is similar to spreading an epidemic over a network following a directional path. It is usually referred to as a Susceptible-Infected (SI) epidemic model in network science.

As shown in Figure 2, the parameter ‘infectedBy’ sent by a given node indicates that it has been infected by a neighboring IoT device (that is, the bushfire has been spread through this path) with the ‘node id’ listed as the parameter value. Here, we assume that once a node is infected within the network by one of its neighbors, it isn’t infected again by another node. Therefore, the value of the ‘infectedBy’ parameter remains the same.

Figure 2. High-level overview of an IoT sensor network monitoring temperature of the surrounding geographical area

For the purposes of this scenario, an 11-node IoT sensor network is shown in Figure 2 that exhibits how the bushfire spreads over time. The placement of IoT sensors can be visualized as an undirected graph network where each node is an IoT sensor device. A link between two neighboring nodes denotes the wireless connectivity within a multi-hop wireless ad hoc network. Figure 2 shows the following details:

  • At time t1, the nodes are all emitting temperature data. However, none of them have reported a temperature greater than the bushfire alert threshold, which is set at 50° Celsius.
  • At time t2, node-1 reports a temperature of 50° Celsius, which is over the preset threshold. In reality, it could have been a small-scale bushfire that recently ignited in the area under node-1’s surveillance.
  • As time moves forward to t3, we see that the fire rapidly spreads to the area monitored by node-2 in the network. Hence, we can now say that node-2 is infected by node-1. This is reflected in the ‘infectedBy’ parameter emitted by node-2 at t3, which has the value 1, denoting how the fire is spreading over time.
  • Next, at time t4, the fire spreads further to node-3, followed by node-4 and node-5 by time t5.

This analogy helps us visualize the overall spread of the bushfire over a network of IoT devices. In this blog post, we use the CEP feature from Apache Flink to detect an event pattern where the measured temperature is higher than 50° Celsius and has an infection degree of 4 within 5 wirelessly connected IoT devices. Once this pattern is detected by real-time event stream processing in Amazon EMR, an SNS alert email is sent to the subscribed email address. The email highlights the path through which the fire has spread so far.

Architecture overview

In this section, we build a real-time event processing pipeline from start to finish. This streamlines the temperature measurements emitted by the IoT devices over the infrastructure layer to build predictive alert and visualization systems for potential bushfire monitoring. The high-level architectural diagram in Figure 3 shows the components required to build this system.

Figure 3. High-level block diagram of the real-time bushfire alert and visualization systems

The diagram shows that the IoT sensor events (that is, the measured temperatures) feed into an IoT gateway. The gateway has internet connectivity to forward the records to a stream processing system. The gateway is the landing zone for all the IoT events and ingests the records into durable streaming storage. The IoT gateway should scale automatically to support over a billion devices without requiring us to manage any infrastructure.

Next, we store the events in durable storage. Because the temperature events come from unreliable sources like the IoT sensor devices, the events cannot be replayed if any of the records are lost. The streaming storage should also support ingress and egress of a large number of events. The events are then consumed by a stream processing engine that matches the incoming events to a pattern and later sends out alerts to the subscribers, if necessary. The raw events are also ingested into a visualization system. This system displays a real-time heat map of the temperatures in the area under surveillance by the IoT nodes.

Building a real-time bushfire alert and visualization system in AWS

Figure 4. Architecture of the real-time IoT stream processing pipeline using AWS services

In this section, we depict a component-level architecture for an event processing system using several AWS services, as shown in Figure 4. The IoT sensor data is sent to AWS IoT, which receives the incoming temperature records and acts as a gateway to the streaming pipeline. AWS IoT is equipped with a rules engine that can be used to configure an action for the incoming events so that they can be forwarded to another AWS service as a destination.

In this case, the Amazon Kinesis Data Streams service is chosen as the destination to act as a reliable underlying stream storage system with a retention period of one day. Figure 5 below shows one such rule and action configuration used in this blog article. The events are then consumed by the Apache Flink processing engine running on an Amazon EMR cluster. Apache Flink consumes the records from the Amazon Kinesis Data Streams shards and matches the records against a pre-defined pattern to detect the possibility of a potential bushfire.
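For reference, the following boto3 sketch creates a stream and topic rule similar to the ones shown in Figure 5. The stream name, topic, rule name, and role ARN are placeholders; the default Kinesis retention period is already 24 hours, which matches the one-day retention used here.

import boto3

kinesis = boto3.client("kinesis")
iot = boto3.client("iot")

# Stream that stores the incoming sensor events for one day (the default retention).
kinesis.create_stream(StreamName="bushfire-sensor-stream", ShardCount=2)

# Topic rule that forwards every event on the sensor topic to the stream.
iot.create_topic_rule(
    ruleName="forward_sensor_events_to_kinesis",
    topicRulePayload={
        "sql": "SELECT * FROM 'weather-sensor-data'",
        "actions": [{
            "kinesis": {
                "roleArn": "arn:aws:iam::123456789012:role/iot-to-kinesis-role",
                "streamName": "bushfire-sensor-stream",
                "partitionKey": "${newuuid()}",
            }
        }],
    },
)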

Figure 5. AWS IoT rule and action for the incoming temperature events

Use Apache Flink as the stream processing engine

In this blog post, we have chosen Apache Flink as the stream processing engine because it provides high-throughput, low-latency processing of real-time events. More importantly, it supports stream processing and windowing with event time semantics for Amazon Kinesis Data Streams. This is important when events arrive out of order or are delayed due to unreliable wireless network communication. Another great feature available in Apache Flink is the Complex Event Processing (CEP) library. It allows you to detect patterns within the stream of incoming events over a period of time. Let’s explore these features in more detail and how they can be used in this particular use case.

Characteristics of IoT events, Event-time processing and Watermarks

Most IoT use cases deal with a large number of sensor devices continuously generating high volume of events over time. The events generally have a time stamp in the record, which indicates when it was generated. However, for consumers, the events can arrive out of order or with delays during processing. In a stream processing application, the consumer should be able to deal with out of order and delayed events.

Apache Flink does this by using event-time windowing. The window slides not according to the processing time, but by the event time. This helps make alerting decisions based on the event time, where it is more relevant. When the processing window is based on event time, we must know when to advance the event time. That tells us when we can close the window and trigger the next processing task. This is done through watermarks in Flink. There are various mechanisms to generate watermarks. We have used the TimeLagWatermarkGenerator, which generates watermarks that lag behind the processing time by a certain unit of time. This assumes that some of the records arrive in the Flink consumer after a certain delay. It is also worth mentioning that we chose Flink as the processing engine because its CEP library can match events against the pattern we provide. This feature is currently not available in Spark Structured Streaming or Amazon Kinesis Data Analytics.

Complex Event Processing in Apache Flink

Once the records are fetched from the Amazon Kinesis data stream into the Apache Flink application running on the Amazon EMR cluster, they must be matched against a pattern. The pattern filters records that first reach the threshold temperature of 50° Celsius and are then followed by another event (from another IoT sensor) that has also reached the same threshold temperature and has been infected by the IoT sensor node corresponding to the first event in the pattern.

For example, among the incoming events, we get an event from the IoT sensor on node-1 where the temperature is greater than 50° Celsius, which is the first event in the CEP pattern. Next, we look for an event that follows this first event. For example, an event from node-2 whose temperature has reached the 50° threshold and whose ‘infectedBy’ field is set to 1, indicating node-1. If this condition repeats iteratively for the other three nodes (node-3, node-4, and node-5), then a complete four-degree network path (N1 -> N2 -> N3 -> N4 -> N5) of a potential bushfire initiated from node-1 is said to be detected.

In our implementation, we have chosen to send out an alert when the fire spreads to five connected nodes. But in reality, the number of nodes the fire spreads to before we send out an alert must be carefully chosen. A logical diagram of this particular pattern is shown in Figure 6. Finally, in response to this potential bushfire, an alert email is published to SNS and it delivers the email to the service’s subscribers. A sample SNS alert email is shown below in Figure 7.
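The actual pattern is written with the Flink CEP API (see the GitHub repository linked later in this post). To illustrate only the matching logic, the following plain-Python sketch walks an ordered list of events and returns the infection path once five connected nodes have crossed the threshold; the event field names are the illustrative ones used earlier in this post.

THRESHOLD = 50      # degrees Celsius
CHAIN_LENGTH = 5    # alert once the fire has spread across five connected nodes

def detect_spread(events):
    """Return the infection path, for example [1, 2, 3, 4, 5], or None."""
    path = []
    for event in events:                      # events ordered by event time
        if event["temp"] < THRESHOLD:
            continue
        if not path:
            path.append(event["node"])        # first infected node starts the chain
        elif event.get("infectedBy") == path[-1]:
            path.append(event["node"])        # fire spread from the previously infected node
        if len(path) == CHAIN_LENGTH:
            return path
    return None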

Figure 6. The logical pattern diagram for predicting the bushfire

Figure 7. A sample Amazon SNS email alert to notify a potential bushfire and its traversing path

Real-time visualisation of the potential bushfire spread over time

All the incoming IoT event records (the unfiltered, raw events retrieved from the Amazon Kinesis data stream) are pushed into an Amazon Elasticsearch Service cluster for durable storage and visualization on the Kibana web UI. (For more information about Kibana, see What Is Kibana.) A real-time heat-map visualization and dashboard is created to continuously monitor the progress of the bushfire, as shown in Figure 8. As you can see from Figure 8, the bushfire is spreading from node-1 to node-2 to node-3, and then to node-4 and node-5. This visualization can be further enhanced by recording the geographical location of the IoT sensor nodes and then plotting the heat map over the geographical area under surveillance.

Figure 8. A sample bushfire heat-map visualisation from Amazon Elasticsearch Services

Setup and source code

The URL below explains in detail the steps to set up all the necessary AWS components and run the IoT simulator and Apache Flink CEP. Here, we provide an AWS CloudFormation template that creates the architecture shown in Figure 4 from start to finish by setting up the IoT simulator on an EC2 instance with all the other respective components, and then automatically runs the stack. Once the stack creation is complete, users can visit the Kibana web UI to observe the real-time bushfire dashboard. They can also receive an SNS alert email for potential bushfires after they confirm their email subscription.

To launch the AWS CloudFormation stack, click on the following Launch Stack button to open the AWS CloudFormation console.

Specify the Amazon S3 URL for the template, and then proceed to the next step where you specify the details explained in the next section. After you provide all the parameters, you can proceed and then create the stack. The parameters that you use are as follows:

1. SNS subscription email: This should be a valid email address for the fire alert notification to be sent. Once the CloudFormation stack creation initiates, you get an email on your provided email account for confirming the subscription. Choose the Confirm subscription button to receive the SNS notification.

Note: You may safely delete the SNS subscription from the Amazon SNS console upon completion of the end-to-end observation.

2. Public subnet ID for EMR cluster: Choose a public subnet from the drop-down menu. This is the subnet where the EMR cluster is created. The EMR cluster requires access to the internet to reach the Kinesis stream and the Elasticsearch domain. Therefore, choose a public subnet with the auto-assign public IPv4 address option enabled, within a VPC where the enableDnsHostnames and enableDnsSupport options are set to true.

3. S3 path location for EMR cluster logs: The S3 bucket where EMR puts the cluster logs.

Figure 9. Amazon CloudFormation console to create the AWS resource stack.

4. Public subnet ID for EC2 instance: The subnet where the EC2 instance is created to run the IoT simulator. Once the CloudFormation stack comes up, the IoT simulator automatically runs to ingest the IoT events to the AWS IoT gateway. The choice of this subnet follows the same guidelines set out for the subnet chosen for the EMR cluster shown in Figure 9.

5. Security group ID for EC2 instance: This is the security group attached to the EC2 instance running the IoT simulator. You can optionally add a new rule in this security group for SSH port 22. This allows access to the IoT simulator running in this EC2 instance from your workstation, and use the same public IP address for accessing Kibana web UI described later in this post.

6. Key pair ID for EC2 instance: The key pair to be associated with the EC2 instance and the EMR cluster. It allows you to log in to the instances for further exploration.

Figure 10. Amazon CloudFormation console to create the AWS resource stack.

7. Domain name for Amazon Elasticsearch domain: The Elasticsearch domain name to be created by the CloudFormation template.

8. Public IP address to access Kibana from local machine: The IP address of the local machine from which you want to access the Kibana dashboard for visualization. For simplicity, you can provide the public IP address of the workstation from which you are running the CloudFormation stack. This IP address is granted access to the Amazon Elasticsearch domain for displaying the real-time Kibana dashboard, and you must access the Kibana URL from this IP address only. If your IP address changes, modify the policy of the Amazon Elasticsearch cluster with the new IP address. For more information, see IP-based Policies in the Amazon Elasticsearch Service Developer Guide.

Once the stack creation is completed, the output section of the CloudFormation stack lists the web URLs to access the necessary resources in this architecture. Use the Kibana web URL and create an index pattern under the Management section with the name “weather-sensor-data,” and then choose the dashboard to see the visualization of the real-time spread of the bushfire covered by the IoT network.

Figure 11. Amazon Elasticsearch domain Kibana Web UI to create the index pattern.

The source code and a detailed README are provided on GitHub for users who want to explore the implementation of this architecture in further detail.

https://github.com/aws-samples/realtime-bushfire-alert-with-apache-flink-cep

Troubleshooting Common Issues

1. Kibana Web UI is not accessible

If you see the message “User: anonymous is not authorized to perform: es:ESHttpGet” while trying to access the Kibana web UI, then the public IP address that you specified during CloudFormation stack creation either is not correct or has changed. You can confirm your public IP address again from http://checkip.amazonaws.com. Then go to the AWS Management Console for Elasticsearch and modify the security access policy, as shown in the following example, to change the IP address only.

Figure 12. Modifying public IP address in the Amazon Elasticsearch domain access policy.

2.  No records ingested into Elasticsearch

This issue can occur if records fail to be ingested from the EMR cluster. To troubleshoot this, go to the AWS Management Console for IAM and search for the IAM role named “EMR_EC2_Default_Role” and make sure it has the default AWS managed policy “AmazonElasticMapReduceforEC2Role” attached to it.

Figure 13. Verifying the default AWS Managed policy attached to “EMR_EC2_Default_Role” in IAM.

3. No SNS alert E-mail notification

If you do not receive an SNS email alert about the potential bushfire after several minutes of observing the complete visualization, check your inbox to confirm that you accepted the SNS subscription while the CloudFormation stack was being created. Additionally, make sure that you provided a correct email address; if not, re-create the stack from scratch.

Summary

In this blog post, we discussed how to build a real-time IoT stream processing, visualization, and alerting pipeline using various AWS services. We took advantage of the Complex Event Processing feature provided by Apache Flink to detect patterns within a network from the incoming events. The GitHub repository contains the resources that are required to run through the example provided in this post, and it includes further information that helps you get started quickly. We encourage you to explore the IoT simulator code, test with different network configurations to ingest more records with different patterns, and visualize the bushfire spread path pattern on the Kibana dashboard.


Additional Reading

If you found this post useful, be sure to check out Build a Real-time Stream Processing Pipeline with Apache Flink on AWS, Derive Insights from IoT in Minutes using AWS IoT, Amazon Kinesis Firehose, Amazon Athena, and Amazon QuickSight, and Integrating IoT Events into Your Analytic Platform.

 


About the Authors

Santoshkumar Kulkarni is a Cloud Support Engineer with AWS Big Data and Analytics Services. He works closely with AWS customers to provide them architectural and engineering assistance and guidance. He is passionate about distributed technologies and streaming systems. In his free time, he enjoys spending time with his family on the beautiful beaches of Sydney.


Joarder Kamal, PhD is a Cloud Support Engineer with AWS Big Data and Analytics Services. He likes building and automating systems that combine distributed communications, real-time data, and collective intelligence. During his spare time, he loves reading travel diaries, doing pencil sketches, and touring across Australia with his wife.

Launch an edge node for Amazon EMR to run RStudio

Post Syndicated from Tanzir Musabbir original https://aws.amazon.com/blogs/big-data/launch-an-edge-node-for-amazon-emr-to-run-rstudio/

RStudio Server provides a browser-based interface for R and is a popular tool among data scientists. Data scientists use Apache Spark clusters running on Amazon EMR to perform distributed training. In a previous blog post, the author showed how you can install RStudio Server on an Amazon EMR cluster. However, in certain scenarios you might want to install it on a standalone Amazon EC2 instance and connect to a remote Amazon EMR cluster. Benefits of running RStudio on EC2 include the following:

  • By running RStudio Server on an EC2 instance, you can keep your scientific models and model artifacts on the instance. You might have to relaunch your EMR cluster to meet your application requirements. By running RStudio Server separately, you have more flexibility and don’t have to depend entirely on an Amazon EMR cluster.
  • Installing RStudio on the master node of Amazon EMR requires sharing resources with the applications running on the same node. By running RStudio on a standalone Amazon EC2 instance, you can use resources as you need without having to share them with other applications.
  • You might have multiple Amazon EMR clusters in your environment. With RStudio on an edge node, you have the flexibility to connect to any EMR cluster in your environment.

There is one major difference between running RStudio Server on an Amazon EMR cluster vs. running it on a standalone Amazon EC2 instance. In the latter case, the instance needs to be configured as an Amazon EMR client (or edge node). By doing so, you can submit Apache Spark jobs and other Hadoop-based jobs from an instance other than EMR master node.

In this post, I walk you through a list of steps to configure an Amazon EC2 instance as an Amazon EMR edge node with RStudio Server configured for remote workloads.

Solution overview

In the next few sections, I describe creating an edge node, installing RStudio, and connecting to a remote Spark cluster from R running on the edge node.

At a high level, the solution includes the following steps:

  1. Create an Amazon EC2 instance.
  2. Install RStudio Server and required dependencies on that instance.
  3. Install the Apache Spark and Hadoop client libraries and dependencies on the same instance.
  4. Launch an Amazon EMR cluster with the Apache Spark, Livy, and Hive applications.
  5. Configure the Amazon EC2 instance as an EMR client.
  6. Test EMR client functionality by running sample remote jobs.
  7. Connect to the Spark cluster from R using Sparklyr.
  8. Interact with Hive tables on EMR from RStudio.
  9. Run R models on the data on the EMR cluster.

Let’s take a look at the steps to configure an Amazon EC2 instance as EMR edge node.

Creating an edge node for Amazon EMR

In this exercise, I create a Spark client on the edge node. Because Spark relies on Hadoop libraries, I also install Hadoop on the edge node. To make sure that the client works properly, I install the same Hadoop and Spark versions as those on the EMR cluster. Because most of the libraries also run on JVM, I recommend that you have the same JVM version as the one on the EMR cluster.

After I install Spark and Hadoop on the edge node, I configure the edge node to talk to the remote Amazon EMR cluster. To do that, several configuration files from the EMR cluster need to be copied to the edge node. There are two ways to copy the configuration files from a newly created EMR cluster to the edge node on the EC2 instance: manual and automated. In the next section, I discuss those two approaches.

Manual approach

After the EMR cluster is up and running, you can use the secure transfer tool scp to copy the required configuration files from an EMR master node to a local machine. In my case, I used my laptop.

> mkdir emr-config
> cd emr-config
> scp -i <key> hadoop@<master-node-dns>:/etc/hadoop/conf/*-site.xml .

You can also use the same tool to copy those files from that local machine to the edge node:

> scp -i <key> hdfs-site.xml ec2-user@<edge-node-dns>:/etc/hadoop/conf/.

PC users can use an application like WinSCP to connect and transfer files between an EMR master node and a PC.

Note: Depending on the applications installed on Amazon EMR, you might need to copy other libraries from the cluster. As an example, the open-source distributions of Hadoop and Spark packages that are used in this solution don’t have libraries for EMRFS. So, to use EMRFS, copy the EMRFS libraries to the edge node and update the classpath to include the libraries.

Automated approach (used in this solution)

As you might have noticed in the previous approach, you need to run the copy operation twice:

  1. From the EMR master node to a local machine
  2. From the local machine to the edge node

If you use a bastion host to access EMR, then the copy process also needs to go one extra hop. One way to automate this process is to execute a script as an EMR step, which uploads all the required libraries and configuration files to an Amazon S3 location. A second script on the edge node runs through cfn-init, which downloads files from the S3 location and places them in the right application paths. The following diagram illustrates a sequence of steps that take place during this process.

In this solution, the EMR step (CreateEMRClientDeps) executes the script create-emr-client.sh to copy the configuration files to Amazon S3. The script first creates an archive file awsemrdeps.tgz with all the required libraries. It then uploads that file into a temporary S3 bucket with a prefix ending in /emr-client/. On the edge node, the install-client-and-rstudio.sh script is used to copy the awsemrdeps.tgz file from S3 back to the edge node.
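If you want to reproduce the automated approach outside of CloudFormation, the following boto3 sketch adds the CreateEMRClientDeps step to a running cluster with script-runner. The cluster ID, script location, and script argument are placeholders; the CloudFormation template in this post configures this step for you.

import boto3

emr = boto3.client("emr")

emr.add_job_flow_steps(
    JobFlowId="j-XXXXXXXXXXXXX",
    Steps=[{
        "Name": "CreateEMRClientDeps",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "s3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar",
            "Args": [
                "s3://<<s3-temp-upload-path>>/scripts/create-emr-client.sh",
                "s3://<<s3-temp-upload-path>>/emr-client/",   # where awsemrdeps.tgz is uploaded
            ],
        },
    }],
)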

Let’s take a look at the AWS CloudFormation steps to create an edge node for Amazon EMR and run RStudio on the edge node.

Walkthrough using AWS CloudFormation

Following are the prerequisites to run the AWS CloudFormation template for this solution:

  • Create an Amazon VPC with at least one public subnet and one private subnet.
  • Update the IAM policy so that the user has access to create IAM policies, instance profile, roles, and security groups.
  • Enable VPC endpoints for Amazon S3.
  • Create an EC2 key-pair to connect to EC2 instances.

To set up this entire solution, you need to create a few AWS resources. The attached CloudFormation template creates all those required AWS resources and configures them to create an Amazon EMR edge node and running RStudio on it.

This CloudFormation template requires you to pass the following parameters during launch.

Parameter             Description
EmrSubnet             The subnet where the Amazon EMR cluster is deployed. It can be either a public or private subnet.
InstanceType          The Amazon EC2 instance type used for the RStudio Server and edge node, which defaults to m4.xlarge.
KeyName               The name of the existing EC2 key pair to access the Amazon EMR and edge node.
RStudioServerSubnet   The public subnet where the RStudio Server and edge node are launched.
S3RepoPath            The Amazon S3 path where all required files (template, scripts, jobs, sample data, and so on) are stored.
S3TempUploadPath      The S3 path in your AWS account for housing temporary dependency files and sample data for Hive.
VPC                   The ID of the virtual private cloud (VPC) where the EMR cluster and edge node are deployed.

Important: This template is designed only to show how you can create an EMR edge node and configure RStudio for remote EMR workloads. This setup isn’t intended for production use without modification. If you try this solution outside of the US-East-1 Region, be sure to download the necessary files from s3://aws-data-analytics-blog/rstudio-edge-node. You then upload the files to the buckets in your AWS Region, edit the script as appropriate, and then run it.

To launch the CloudFormation stack, choose Launch Stack:

The following sample screenshot shows the stack parameters.

Launching this stack creates the following AWS resources.

EMRCluster (Amazon EMR cluster): The EMR cluster to run the Spark and Hive jobs.
CreateEMRClientDeps (EMR step job): A job that runs a script to create client dependencies and uploads them to S3.
CreateHiveTables (EMR step job): A job to copy sample data for Hive and create Hive tables.
RStudioConfigureWaitCondition (CloudFormation wait condition): A wait condition that works with the wait handler and waits for the RStudio Server setup process to complete.
RStudioEIP (Elastic IP address): The Elastic IP address for RStudio Server.
RStudioInstanceProfile (Instance profile): The instance profile for the RStudio and edge node instance (for this solution, I used the default role EMR_EC2_DefaultRole created during EMR launch).
RStudioSecGroup (Amazon EC2 security group): The security group that controls incoming traffic to the edge node.
RStudioServerEC2 (Amazon EC2 instance): The EC2 instance for the edge node and RStudio Server.
RStudioToEMRSecGroup (Amazon EC2 security group): The security group that controls traffic between EMR and the edge node.
RStudioWaitHandle (CloudFormation wait handler): The wait handler that is triggered after RStudio Server is launched.
SecGroupSelfIngress (Amazon EC2 security group ingress rule): An ingress rule to RStudioToEMRSecGroup that allows an instance to talk to another instance with the same security group.

The CloudFormation template used in this solution configures S3 paths and stores files to their respective locations. The EMR client dependencies archive awsemrdeps.tgz is stored at the <<s3-temp-upload-path>>/emr-client/ location. The sample data file tripdata.csv is stored at <<s3-temp-upload-path>>/ny-taxi/.

The following screenshot shows how the S3 paths are configured after deployment. In this example, I passed an S3 full path, s3://<<my-bucket>>/rstudio-edge-node, which is on my Amazon S3 account.

When the CloudFormation template has successfully completed, the DNS address of RStudio Server is displayed on the Outputs tab, as shown following.

The address shown is the DNS address of the RStudio Server and edge node. A user should be able to connect to this address immediately after enabling FoxyProxy.

Test data and tables

For the source data, I have used New York City Taxi and Limousine Commission (TLC) trip record data. For a description of the data, see this detailed dictionary of the taxi data. The trip data is in comma-separated value (CSV) format with the first row as a header. The following image shows data from the trip dataset.

A second EMR step, CreateHiveTables, is created as part of the CloudFormation template. This step creates two Hive tables that will be later used by R on RStudio to run sample models. Both are external Hive tables—one is stored in HDFS on the EMR cluster and the other in Amazon S3. The goal is to demonstrate how RStudio can consume data with storage that is backed by HDFS and S3.

Table name      Storage type   Path
ny_taxi_hdfs    HDFS           /user/ruser/ny_taxi
ny_taxi_s3      Amazon S3      s3://<s3-temp-upload-path>/ny_taxi

The following section shows a list of steps to test Amazon EMR client functionality, which is optional.

Testing EMR client functionality (optional)

If the EMR client is configured correctly on the edge node, you should be able to submit Spark jobs from the edge node to the EMR cluster. You can apply the following few steps to the edge node to verify this functionality:

  1. Log in to the edge node using the default user ec2-user.
  2. Choose the host address from the CloudFormation Outputs tab:
ssh -i <<key-pair>> ec2-user@<<rstudio-server-address>>
  3. The CloudFormation template also creates a new user, called ruser, that you can use to submit Spark jobs or use the RStudio UI. Switch the user from ec2-user to ruser:
[ec2-user@<<edge-node>> ~]$ sudo -s
[root@<<edge-node>> ec2-user]# su - ruser
  4. Submit a Spark example job to the remote EMR cluster:
$SPARK_HOME/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn $SPARK_HOME/examples/jars/spark-examples_2.11-2.3.1.jar

  5. Check the job status in the terminal and also on the EMR console. The Spark example job should finish successfully. In the terminal, it should display the value of Pi as shown following.

  6. Check the job status in the Resource Manager UI; notice that the Spark Pi job ran as ruser and completed successfully.

  7. Test this setup further by running spark-shell, and retrieve Hive table data from the remote EMR cluster:
[ruser@<<edge-node>> ~]$ $SPARK_HOME/bin/spark-shell

  8. Check the list of all available Hive tables and their content:
scala> spark.sql("show tables").show

scala> spark.sql("select * from ny_taxi_s3 limit 10").show

Running R and connecting to Apache Spark

In this section, let’s run some tests and models from RStudio consuming data from Amazon EMR. Locate the RStudio Server address on the Outputs tab on the CloudFormation console. The user name is ruser and the password is BigData26.

A successful login redirects you to this welcome window. The large window on the left is the console window, where you write R code.

Create a SparkContext on R console. No additional configuration is needed, because RStudio is already set up with the required environment variables and files through the AWS CloudFormation stack. In this solution, Sparklyr is used to connect to Spark. Attach the required R packages before creating SparkContext as follows:

library(sparklyr)
library(dplyr)
sc <- spark_connect(master = "yarn")

When a connection to Spark is established, it creates a “yarn” connection channel (find this in the RStudio UI, on the Connections tab at the right corner). It also shows Hive metadata on the same widget. Because the CloudFormation template created two Hive tables, they appear under the “yarn” connection as shown following.

A YARN application is also placed under ruser. The status of the application is RUNNING as long as the connection is established. You can find the status of that application on the YARN ResourceManager UI also. Notice that the user is ruser and the name of the application is sparklyr.

For more information, check the YARN app log by choosing Log on the widget.

Now, test whether the data for those two Hive tables is accessible. Choose the ny_taxi_hdfs table to sample the data.

Choose the ny_taxi_s3 table to sample the data on S3.

By successfully running these two tests, you can see that RStudio running on an edge node can consume data stored in a remote EMR cluster.

During the development phase, users might want to write some data back to your S3 bucket. So it’s a good idea to verify whether a user can write data directly to S3 using R and Spark. To test this, I read the ny_taxi_hdfs Hive table using the spark_read_table API. Then I write the data to Amazon S3 by calling the spark_write_csv API and passing my S3 target path. For this solution, I used s3://tm-blogs-placeholder/write-from-rstudio as my new S3 path.

ny_taxi <- spark_read_table(sc, "ny_taxi_hdfs")
spark_write_csv(ny_taxi,path = "s3://tm-blogs-placeholder/write-from-rstudio")

After the write operation, the S3 location appears as follows.

You can also see Spark write logs in the YARN application log.

Now analyze the data with R and draw some plots. To do so, first check the count of ny_taxi data. It should return 20,000.

ny_taxi <- spark_read_table(sc, "ny_taxi_hdfs")
ny_taxi %>% count

Now, find the number of trips for each rate code type. There are six different rate code types where 1 is the standard rate code and 5 is the negotiated rate. For details, see this detailed dictionary of the taxi data.

library(ggplot2)
trip_by_rate_code_id <- ny_taxi %>%
  mutate(rate_code_id) %>%
  group_by(rate_code_id) %>%
  summarize(n = n()) %>%
  collect()

ggplot(trip_by_rate_code_id, aes(rate_code_id, n)) + 
  geom_bar(stat="Identity") +
  scale_y_continuous(labels = scales::comma) +
  labs(title = "Number of Trips by Rate Code", x = "Rate Code Id", y = "")

Based on the graph, I can say that (except for some passengers who paid a negotiated rate) the rest of the passengers paid the standard rate during their ride.

Now find the average trip duration between two New York areas—Queens and Manhattan. The pu_location_id value represents the taxi pick-up zone, and do_location_id represents the taxi drop-off zone. For this test, I use 129 as the pick-up zone and 82 as the drop-off zone. Taxi zone 129 represents the Jackson Heights area in Queens, and taxi zone 82 represents the Elmhurst area. For details, see this taxi zone lookup table.

trip_duration_tbl <- ny_taxi %>%
  filter(pu_location_id == 129 & do_location_id == 82) %>%
  mutate(pickup_time = hour(from_unixtime(unix_timestamp(lpep_pickup_datetime, "MM/dd/yy HH:mm")))) %>%
  mutate(trip_duration = unix_timestamp(lpep_dropoff_datetime, "MM/dd/yy HH:mm") - unix_timestamp(lpep_pickup_datetime, "MM/dd/yy HH:mm")) %>%
  group_by(pickup_time) %>% 
  summarize(n = n(),
            trip_duration_mean = mean(trip_duration),
            trip_duration_p10 = percentile(trip_duration, 0.10),
            trip_duration_p25 = percentile(trip_duration, 0.25),
            trip_duration_p50 = percentile(trip_duration, 0.50),
            trip_duration_p75 = percentile(trip_duration, 0.75),
            trip_duration_p90 = percentile(trip_duration, 0.90)) %>% 
  collect()
            
ggplot(trip_duration_tbl, aes(x = pickup_time)) +
          geom_line(aes(y = trip_duration_p50, alpha = "Median")) +
          geom_ribbon(aes(ymin = trip_duration_p25, ymax = trip_duration_p75, 
                          alpha = "25–75th percentile")) +
          geom_ribbon(aes(ymin = trip_duration_p10, ymax = trip_duration_p90, 
                          alpha = "10–90th percentile")) +
          scale_y_continuous("Trip Duration (in seconds)") + 
          scale_x_continuous("Pickup Time of the day")

Based on the plot, I can say that on average, each trip duration was about 10–12 minutes. There was a rare peak around 1 a.m. for some days, where the trip duration was more than 30 minutes.

Next steps

The goal of this post is to show, first, how to create an edge node or Amazon EMR client on an Amazon EC2 instance, and second, how other applications (RStudio in this case) can use that edge node or Amazon EMR client to submit workloads remotely. By following the same approach, you can also create an edge node for other Hadoop applications: a Hive client, Oozie client, HBase client, and so on. Data scientists can keep enriching their R environment by adding packages while keeping it totally isolated from the developers' EMR environments. To enhance this solution further and make it production ready, you can explore the following options:

  • Use friendly URLs for Amazon EMR interfaces. For example, instead of thrift://ip-10-0-20-253.ec2.internal:9083 for the hive.metastore.uris value, you can use something like thrift://hive-metastore.dev.example.corp:9083. In the same way, instead of using ip-10-0-20-253.ec2.internal:8032 for the yarn.resourcemanager.address property value, you can use dev.emr.example.corp:8032. The benefit of this approach is that even if you terminate your EMR cluster and re-create it (with new IP addresses), you don't have to change your client node's configuration. This blog post shows how you can create friendly URLs for Amazon EMR; a minimal boto3 sketch of the idea follows this list.
  • If you already integrated Microsoft Active Directory into your Amazon EMR cluster, you can do the same with RStudio. That way, you can achieve single sign-on across your data analytics solutions.
  • Enable detailed Amazon CloudWatch logs to monitor your edge node behaviors and trigger alerts for different scenarios (disk space utilization, memory usage, and so on). With this approach, you can proactively notify your data scientists before a possible failure.
  • H2O is one of the popular packages used in R. It's open-source software that allows users to fit thousands of potential models to discover patterns in their data. You can install H2O from CRAN, just like Sparklyr was installed in this solution, by running the following in RStudio. Alternatively, you can add the H2O package as part of the installation process by placing these lines in the install-client-and-rstudio.sh script:
install.packages("h2o")
library(h2o)
localH2O = h2o.init()
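
To illustrate the friendly URL idea in the first bullet above, the following is a minimal sketch (not part of the original solution) that uses boto3 to upsert a CNAME record in a Route 53 private hosted zone, pointing a stable name at the current EMR master node. The hosted zone ID and record values are placeholders.

import boto3

route53 = boto3.client('route53')

# Assumptions: a private hosted zone for dev.example.corp already exists, and the
# master node's internal DNS name is known (for example, from describe-cluster).
HOSTED_ZONE_ID = 'Z1234567890ABC'              # hypothetical hosted zone ID
EMR_MASTER_DNS = 'ip-10-0-20-253.ec2.internal'

def upsert_friendly_name(record_name, target_dns):
    """Create or update a CNAME so clients keep using a stable name even after
    the EMR cluster is re-created with new IP addresses."""
    route53.change_resource_record_sets(
        HostedZoneId=HOSTED_ZONE_ID,
        ChangeBatch={
            'Changes': [{
                'Action': 'UPSERT',
                'ResourceRecordSet': {
                    'Name': record_name,
                    'Type': 'CNAME',
                    'TTL': 60,
                    'ResourceRecords': [{'Value': target_dns}],
                },
            }]
        },
    )

upsert_friendly_name('hive-metastore.dev.example.corp', EMR_MASTER_DNS)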

Common issues

Although it’s hard to cover every possible scenario (because these vary on AWS environments), this section covers some common issues that can occur and ways to fix them.

Issue 1: Clicking the RStudio Server URL returns a There is no Internet connection error.

Solution:  Make sure that you configured FoxyProxy in your browser and that you are connecting to the public IP address of the RStudio EC2 instance. You can get this address from the AWS CloudFormation console on the Outputs tab.

Issue 2: The EMR step job CreateClientDeps fails.

Solution: This EMR step job runs the create-emr-client.sh script, which creates an archive with all required dependencies and uploads it to the S3 location. If the edge node doesn’t have write access to S3, this step job fails. In this solution, the default EMR role EMR_EC2_DefaultRole is assigned to the edge node instance also. We assume that EMR_EC2_DefaultRole has write access to the S3 location given through the CloudFormation parameter S3TempUploadPath.

Issue 3: The AWS CloudFormation template Blog-EMR-Edge-Node-With-RStudio times out or fails.

Solution: A script called install-client-and-rstudio.sh runs through cfn-init on the edge node, and it writes logs to the /tmp/edge-node-rstudio-installation.log file. This script contains a sleep clause, where it waits for the awsemrdeps.tgz file to be available on S3. This clause times out after 20 minutes. If the script fails to find that file within that time period, subsequent execution fails. Also, in this solution, RStudio uses http://cran.rstudio.com/ as its repo when installing packages. If the Amazon EC2 instance can’t reach the internet, it can’t download and install those packages, and the template might fail. Make sure that you pick a public subnet or a private subnet with NAT for the edge node.

Issue 4: During Amazon EMR client testing, the Spark sample application fails with a NoClassDefFoundError or UnsupportedOperationException error.

Solution: This blog post uses Amazon EMR 5.16.0. Make sure to use the Hadoop and Spark versions corresponding to the EMR release. If the master node's application version is different from the edge node's application version, the client might fail with a NoClassDefFoundError or UnsupportedOperationException error. Make sure that you always install the same versions of Hadoop and Spark in both locations.

Cleaning up

When you’ve finished testing this solution, remember to clean up all those AWS resources that you created using AWS CloudFormation. Use the AWS CloudFormation console or AWS CLI to delete the stack named Blog-EMR-Edge-Node-With-RStudio.

Summary

In this post, I show you how to create a client for Amazon EMR. I also show how you can install RStudio on that client node and connect to Apache Spark clusters running on Amazon EMR. I used Sparklyr to connect to Spark, consume data from both HDFS and S3, and analyze the data using R models. Go ahead—give this solution a try and share your experience with us!

 


Additional Reading

If you found this post useful, be sure to check out Running sparklyr – RStudio’s R Interface to Spark on Amazon EMR, Statistical Analysis with Open-Source R and RStudio on Amazon EMR, and Running R on Amazon Athena.

 


About the Author

Tanzir Musabbir is an EMR Specialist Solutions Architect with AWS. He is an early adopter of open source Big Data technologies. At AWS, he works with our customers to provide them architectural guidance for running analytics solutions on Amazon EMR, Amazon Athena & AWS Glue. Tanzir is a big Real Madrid fan and he loves to travel in his free time.

 

Migrate RDBMS or On-Premise data to EMR Hive, S3, and Amazon Redshift using EMR – Sqoop

Post Syndicated from Nivas Shankar original https://aws.amazon.com/blogs/big-data/migrate-rdbms-or-on-premise-data-to-emr-hive-s3-and-amazon-redshift-using-emr-sqoop/

This blog post shows how our customers can benefit from using the Apache Sqoop tool. Sqoop is designed to transfer and import data from a relational database management system (RDBMS) into the Amazon EMR Hadoop Distributed File System (HDFS), transform the data in Hadoop, and then export the data into a data warehouse (for example, Hive or Amazon Redshift).

To demonstrate the Sqoop tool, this post uses Amazon RDS for MySQL as a source and imports data in the following three scenarios:

  • Scenario 1: Amazon EMR (HDFS to Hive and HDFS)
  • Scenario 2: Amazon S3 (EMRFS), and then to EMR Hive
  • Scenario 3: Amazon S3 (EMRFS), and then to Amazon Redshift

 

These scenarios let customers run the data transfers in parallel, so that the transfer completes faster and more cost efficiently than with a traditional ETL tool. Once the script is developed, customers can reuse it to transfer a variety of RDBMS data sources into EMR Hadoop. Examples of these data sources are PostgreSQL, SQL Server, Oracle, and MariaDB.

We can also follow the same steps for an on-premises RDBMS. This requires the correct JDBC driver to be installed, and a network connection set up between the corporate data center and the AWS Cloud environment. In this scenario, consider using either AWS Direct Connect or AWS Snowball, based on the data load volume and network constraints.

Prerequisites

To complete the procedures in this post, you need to perform the following tasks.

Step 1 — Launch an RDS Instance

By using the AWS Management Console or AWS CLI commands, launch a MySQL instance with the desired capacity. The following example uses the db.t2.medium instance class with default settings.

Copy the instance endpoint and use it to build a JDBC connection string in the following format. This example uses the US East (N. Virginia) us-east-1 AWS Region.

jdbc:mysql://<<Connection string>>.us-east-1.rds.amazonaws.com:3306/sqoopblog

Step 2 — Test the connection and load sample data into RDS – MySQL

First, I used an open source data sample from this location: https://bulkdata.uspto.gov/data/trademark/casefile/economics/2016/

Second, I loaded the following two tables:

Third, I used the MySQL Workbench tool and its Import/Export wizard to load the sample tables. This loads the data automatically and creates the table structure.

Download Steps:

The following steps can help you download the MySQL database engine and load the previously mentioned data source into tables: https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/CHAP_GettingStarted.CreatingConnecting.MySQL.html#CHAP_GettingStarted.Connecting.MySQL

I used the following instructions on a Mac:

Step A: Install Homebrew

Homebrew is an open source software package management system. To install Homebrew, open a terminal and enter:

$ /usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"

(Homebrew then downloads and installs command line tools for Xcode 8.0 as part of the installation process.)

Step B: Install MySQL

At the time of this blog post, Homebrew has MySQL version 5.7.15 as the default formula in its main repository. Enter the following command:

$ brew info MySQL

Expected output: MySQL: stable 8.0.11 (bottled)

To install MySQL, enter:

$ brew install MySQL

Fourth, when the installation is complete, create a connection. In the MySQL Workbench main console, choose the MySQL Connections (+) sign to open the new connection window, and provide the connection parameters: connection name, hostname (the RDS endpoint), port, user name, and password.

Step 3 — Launch EMR Cluster

Open the EMR console, choose Advanced option, and launch the cluster with the following options set:

Step 4 — Test the SSH access and install MySQL-connector-java-version-bin.jar in the EMR folder

a. From the security group for the master node, open the linked security group and edit the inbound rules to allow your PC or laptop IP address to access the master node.

b. Download the MySQL JDBC driver to your local PC from the following location: http://www.mysql.com/downloads/connector/j/5.1.html

c. Unzip the archive and locate the latest version of the MySQL Connector JAR. (In my example, the file is mysql-connector-java-5.1.46-bin.jar.)

d. Copy the file to the /usr/lib/sqoop/ directory on the EMR master node. (Note: Because the master node doesn't allow public access, I downloaded the file manually on a local PC and then used FileZilla, a cross-platform FTP application, to push the file.)

e. From your terminal, SSH to Master cluster, navigate to the /usr/lib/sqoop directory and copy this JAR file.

Note: This driver copy can be automated by using a bootstrap script to copy the driver file into an S3 path, and then transferring it into a master node. An example script would be:

aws s3 cp s3://mybucket/myfilefolder/mysql-connector-java-5.1.46-bin.jar /usr/lib/sqoop/

Or, with temporary internet access to download file directly into Master node, copy code below:

wget http://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.46.tar.gz
tar -xvzf mysql-connector-java-5.1.46.tar.gz
sudo cp mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar /usr/lib/sqoop/ 

In the Master node directory /usr/lib/sqoop, it should look like below.

  1. Before you begin working with EMR, you need at least two AWS Identity and Access Management (IAM) service roles with sufficient permissions to access the resources in your account.
  2. The EMR master and slave nodes must be able to connect to Amazon RDS to initiate the import and export of data from the MySQL RDS instance. For example, I edit the RDS MySQL instance's security group to allow incoming connections from the EMR nodes, that is, from the master security group and the slave security group (see the boto3 sketch after this list).
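
The following is a minimal boto3 sketch of how the same ingress rules could be added programmatically. The security group IDs are placeholders, and this is only one way to do it; editing the rules in the console, as described above, works just as well.

import boto3

ec2 = boto3.client('ec2')

RDS_SG = 'sg-0aaa1111bbbb2222c'         # hypothetical RDS MySQL security group
EMR_SGS = ['sg-0ccc3333dddd4444e',      # hypothetical EMR master security group
           'sg-0eee5555ffff6666a']      # hypothetical EMR slave security group

# Allow inbound MySQL (port 3306) from each EMR security group
for emr_sg in EMR_SGS:
    ec2.authorize_security_group_ingress(
        GroupId=RDS_SG,
        IpPermissions=[{
            'IpProtocol': 'tcp',
            'FromPort': 3306,
            'ToPort': 3306,
            'UserIdGroupPairs': [{'GroupId': emr_sg}],
        }],
    )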

Step 5 — Test the connection to MySQL RDS from EMR

After you are logged in, run the following command in the EMR master cluster to validate your connection. It also checks the MySQL RDS login and runs the sample query to check table record count.

sqoop eval --connect "jdbc:mysql://<<connection String>>.us-east-1.rds.amazonaws.com:3306/sqoopblog" --query "select count(*) from sqoopblog.event" --username admin -P

Note: The record count from the previous sample query should match the count in the MySQL table, as shown in the following example:

Import data into EMR – bulk load

To import data into EMR, you must first import the full data as a text file, by using the following query:

sqoop import --connect "jdbc:mysql://<<ConnectionString>>.us-east-1.rds.amazonaws.com:3306/sqoopblog" --table event --target-dir /user/hadoop/EVENT --username admin -P -m 1

After the import completes, validate the extract file in respective hadoop directory location.

As shown in the previous example, the original table was not partitioned. Hence, it is extracted as one file and imported into the Hadoop folder. If this had been a larger table, it would have caused performance issues.

To address this issue, I show how performance improves if we use a partitioned table and the direct method to export data faster and more efficiently. I created a partitioned version of the event table, EVENT_PARTITION, with the EVENT_DT column as the partition key, and then copied the original table data into it. In addition, I used the direct method to take advantage of MySQL partitions and optimize the efficiency of simultaneous data transfers.

Copy data and run stats.

Run the following query in MySQL Workbench to copy data and run stats:

insert into sqoopblog.event_partition select * from sqoopblog.event

analyze table sqoopblog.event_partition

After running the query in MySQL workbench, run the following Sqoop command in the EMR master node:

sqoop import --connect "jdbc:mysql:// <<ConnectionString>>.us-east-1.rds.amazonaws.com:3306/sqoopblog" --table event_partition --target-dir /user/hadoop/EVENTSPARTITION --username admin -P --split-by event_dt

This example shows the performance improvement for the same command when it runs against the partitioned table with the added --split-by argument. It also shows the data file split into four parts. The number of map tasks is automatically set to four, based on the table partition stats.

We can also use the -m 10 argument to increase the number of map tasks, which equals the number of input splits:

sqoop import --connect "jdbc:mysql:// <<ConnectionString>>.us-east-1.rds.amazonaws.com:3306/sqoopblog" --table event_partition --target-dir /user/hadoop/EVENTSPARTITION --username admin -P --split-by event_dt -m 10

Note: You can also split the data into more extract files during the import process by increasing the map task argument (-m <<desired number>>), as shown in the preceding sample code. Make sure that the extract files align with the partition distribution; otherwise, the output files can be out of order.

Consider the following additional options if you need to import only selected columns.

In the following example, add the --columns argument to import only the selected fields.

sqoop import --connect "jdbc:mysql://<<connection String>>.us-east-1.rds.amazonaws.com:3306/sqoopblog" --table event_partition --columns "EVENT_CD,SERIAL_NO,EVENT_DT" --target-dir /user/hadoop/EVENTSSELECTED --split-by EVENT_DT --username admin -P -m 5

For scenario 2, we import the table data file into an S3 bucket. Before you do, make sure that the EMR EC2 instance group has access to the S3 bucket. Run the following command in the EMR master cluster:

sqoop import --connect "jdbc:mysql:// <<connection String>>.us-east-1.rds.amazonaws.com:3306/sqoopblog" --table event_partition --target-dir s3://nivasblog/sqoopblog/ --username admin -P -m 1 --fields-terminated-by '\t' --lines-terminated-by '\n' --as-textfile 

Import as Hive table – Full Load

Now, let's try creating a Hive table directly from the Sqoop command. This is a more efficient way to create Hive tables dynamically, and we can later alter the table to an external table for any additional requirements. With this method, customers save time creating and transforming data into Hive through an automated approach.

sqoop import --connect "jdbc:mysql://<<connection String>>.us-east-1.rds.amazonaws.com:3306/sqoopblog" --username admin -P --table event_partition  --hive-import --create-hive-table --hive-table HIVEIMPORT1 --delete-target-dir --target-dir /user/hadoop/EVENTSHIVE1 --split-by EVENT_DT --hive-overwrite -m 4

Now, let’s try a direct method to see how significantly the load performance and import time improves.

sqoop import --connect "jdbc:mysql://<<connection string>>us-east-1.rds.amazonaws.com:3306/sqoopblog"  --username admin -P --table event_partition  --hive-import --create-hive-table --hive-table HIVEIMPORT2 --delete-target-dir --target-dir /user/hadoop/nivas/EVENTSHIVE2 --split-by EVENT_DT --hive-overwrite --direct

Following are additional options to consider.

In the following example, add the --columns argument to import only the selected fields into EMR as a Hive table.

sqoop import --connect "jdbc:mysql://<<connection string>>.us-east-1.rds.amazonaws.com:3306/sqoopblog" --username admin -P --table event_partition  --columns "event_cd,serial_no,event_dt" --hive-import --create-hive-table --hive-table HIVESELECTED --delete-target-dir --target-dir /user/hadoop/nivas/EVENTSELECTED --split-by EVENT_DT --hive-overwrite –direct

Perform a free-form query and import into EMR as a hive table.

sqoop import --connect "jdbc:mysql:// <<connection string>>.us-east-1.rds.amazonaws.com:3306/sqoopblog" --username admin -P --query "select a.serial_no, a.event_cd, a.event_dt, b.us_class_cd, b.class_id from event_partition a, us_class b where a.serial_no=b.serial_no AND \$CONDITIONS" --hive-import --create-hive-table --hive-table HIVEQUERIED --delete-target-dir --target-dir /user/hadoop/EVENTSQUERIED -m 1 --hive-overwrite -direct

For scenario 2, create a Hive table manually from the S3 location. The following sample creates an external table from the S3 location. Run a select statement to check the data counts.

Important note: With Sqoop version 1.4.7, you can directly create Hive tables by using scripts, as shown in the following sample code. This feature is supported in EMR 5.15.0 and later.

sqoop import --connect "jdbc:mysql://<<connection string>>.us-east-1.rds.amazonaws.com:3306/sqoopblog" --username admin -P --table event_partition  --hive-import --target-dir s3n://nivasblog/sqoopblog/1/ --create-hive-table --hive-table s3table --split-by EVENT_DT --fields-terminated-by '\t' --lines-terminated-by '\n' --as-textfile

For the previous code samples, validate in Hive or Hue, and confirm the table records.

Import the full schema table into Hive.

Note: Create a Hive database in Hue or Hive first, and then run the following command in the EMR master cluster.

sqoop import-all-tables --connect "jdbc:mysql://<<Connection string>>.us-east-1.rds.amazonaws.com:3306/sqoopblog" --username admin -P --hive-database sqoopimport --create-hive-table --hive-import --compression-codec=snappy --hive-overwrite --direct

Import as Hive table – Incremental Load

Now, let’s try loading into Hive a sample incremental data feed for the partition table with the event date as the key. Use the following Sqoop command on an incremental basis.

In addition to the initial data in the EVENT_BASETABLE table, I loaded incremental data into it. Let's follow the steps and commands below to perform incremental updates with Sqoop and import them into Hive.

sqoop import --connect "jdbc:mysql:// <<Connection string>>.us-east-1.rds.amazonaws.com:3306/sqoopblog" --username admin -P --table event_partition --target-dir /user/hadoop/INCRTAB --split-by event_dt -m 1 --check-column event_dt --incremental lastmodified --last-value '2018-06-29'

Once the incremental extracts are loaded into the Hadoop directory, you can create temporary, or incremental, tables in Hive and insert them into the main tables.

CREATE TABLE incremental_table (event_cd STRING, event_dt DATE, event_seq INT, event_type_cd STRING, serial_no INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '/user/hadoop/INCRTAB'

Insert into default.hiveimport1 select * from default.incremental_table

Alternatively, you can also perform the –query argument to do the incremental operation by joining various tables and condition arguments, and then inserting them into the main table.

--query "select * from EVENT_BASETABLE where modified_date > {last_import_date} AND $CONDITIONS"

All of these steps have been created as a Sqoop job to automate the flow.
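
The post doesn't show the job definition itself, but as one possible sketch of that automation, the following boto3 snippet submits the incremental Sqoop command as an EMR step from a script or scheduler. The cluster ID, password file location, and last-value checkpoint are assumptions for illustration.

import boto3

emr = boto3.client('emr')

CLUSTER_ID = 'j-XXXXXXXXXXXXX'   # hypothetical EMR cluster ID
LAST_VALUE = '2018-06-29'        # checkpoint recorded from the previous run

sqoop_args = [
    'sqoop', 'import',
    '--connect', 'jdbc:mysql://<<Connection string>>.us-east-1.rds.amazonaws.com:3306/sqoopblog',
    '--username', 'admin',
    '--password-file', 'hdfs:///user/hadoop/mysql.password',  # assumed instead of the interactive -P prompt
    '--table', 'event_partition',
    '--target-dir', '/user/hadoop/INCRTAB',
    '--split-by', 'event_dt', '-m', '1',
    '--check-column', 'event_dt',
    '--incremental', 'lastmodified',
    '--last-value', LAST_VALUE,
]

emr.add_job_flow_steps(
    JobFlowId=CLUSTER_ID,
    Steps=[{
        'Name': 'Incremental Sqoop import',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {'Jar': 'command-runner.jar', 'Args': sqoop_args},
    }],
)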

Export data to Redshift

Now that the data is imported into EMR HDFS and the S3 data store, let's see how to use the Sqoop command to export data into the data warehouse layer. In this case, we use an Amazon Redshift cluster and demonstrate with an example.

Download the Amazon Redshift JDBC driver that your SQL client tool or application uses. If you're not sure, download the latest version of the JDBC 4.2 API driver.

The driver file is RedshiftJDBC42-1.2.15.1025.jar, and the class name for this driver is com.amazon.redshift.jdbc42.Driver.

Copy this JAR file to the EMR master node: use SSH to connect to the master node, navigate to the /usr/lib/sqoop directory, and place the JAR file there.

Note: Because the master node doesn't allow public access, I downloaded the file manually on a local PC. I then used FileZilla to push the file.

Next, launch the Redshift cluster. This example uses the ds2.xlarge (dense storage) node type.

After Redshift launches, and its security group allows a connection from the EMR cluster, log in to the EMR master node and run the Sqoop command. This exports the data from the S3 location (shown previously in the S3 import command) into the Redshift cluster as a table.

I created a table structure in Redshift as shown in the following example.

DROP TABLE IF EXISTS sqoopexport CASCADE;

CREATE TABLE sqoopexport
(
   event_cd       varchar(25)   NOT NULL,
   event_dt       varchar(25),
   event_seq      varchar(25)   NOT NULL,
   event_type_cd  varchar(25)   NOT NULL,
   serial_no      varchar(25)   NOT NULL
);

COMMIT;

When the table is created, run the following command to import data into the Redshift table.

sqoop export --connect jdbc:redshift://<<Connection String>>.us-east-1.redshift.amazonaws.com:5439/sqoopexport --table sqoopexport --export-dir s3://nivastest1/events/ --driver com.amazon.redshift.jdbc42.Driver --username admin -P --input-fields-terminated-by '\t'

This command inserts the data records into the table.

For more information, see Loading Data from Amazon EMR.

For information about how to copy data back into RDBMS, see Use Sqoop to Transfer Data from Amazon EMR to Amazon RDS.

Summary

You’ve learned how to use Apache Sqoop on EMR to transfer data from RDBMS to an EMR cluster. You created an EMR cluster with Sqoop, processed a sample dataset on Hive, built sample tables in MySQL-RDS, and then used Sqoop to import the data into EMR. You also created a Redshift cluster and exported data from S3 using Sqoop.

You proved that Sqoop can perform data transfer in parallel, so execution is quick and more cost effective. You also simplified ETL data processing from the source to a target layer.

The advantages of Sqoop are:

  • Fast, parallel data transfer into EMR, taking advantage of EMR compute instances for the import process and removing external tool dependencies.
  • A direct-to-MySQL import option (--direct) that expedites query and data-pull performance into EMR Hadoop and S3.
  • Incremental imports of sequential datasets from the source system (provided the tables have primary keys), simplifying the growing need to migrate on-premises RDBMS data without re-architecting.

Sqoop pain points include:

  • Automation is left to the developer and operations teams. It requires building workflows or jobs with tools such as Apache Airflow (which has Sqoop support) or other schedulers.
  • Tables that don't have primary keys, or that carry legacy table dependencies, are challenging to import incrementally. The recommendation is to do a one-time migration through a Sqoop bulk transfer and re-architect your source ingestion mechanism.
  • Import and export work over JDBC connections only; other methods such as ODBC or API calls aren't supported.

If you have questions or suggestions, please comment below.

 


Additional Reading

If you found this post useful, be sure to check out Use Sqoop to transfer data from Amazon EMR to Amazon RDS and Seven tips for using S3DistCp on Amazon EMR to move data efficiently between HDFS and Amazon S3.

 


About the Author

Nivas Shankar is a Senior Big Data Consultant at Amazon Web Services. He helps and works closely with enterprise customers building big data applications on the AWS platform. He holds a Masters degree in physics and is highly passionate about theoretical physics concepts. He enjoys spending time with his wife and two adorable kids. In his spare time, he takes his kids to tennis and football practice.

 

 

 

Build a Concurrent Data Orchestration Pipeline Using Amazon EMR and Apache Livy

Post Syndicated from Binal Jhaveri original https://aws.amazon.com/blogs/big-data/build-a-concurrent-data-orchestration-pipeline-using-amazon-emr-and-apache-livy/

Many customers use Amazon EMR and Apache Spark to build scalable big data pipelines. For large-scale production pipelines, a common use case is to read complex data originating from a variety of sources. This data must be transformed to make it useful to downstream applications, such as machine learning pipelines, analytics dashboards, and business reports. Such pipelines often require Spark jobs to be run in parallel on Amazon EMR. This post focuses on how to submit multiple Spark jobs in parallel on an EMR cluster using Apache Livy, which is available in EMR version 5.9.0 and later.

Apache Livy is a service that enables easy interaction with a Spark cluster over a REST interface. Apache Livy lets you send simple Scala or Python code over REST API calls instead of having to manage and deploy large JAR files. This helps scale data pipelines easily by running multiple Spark jobs in parallel, rather than running them serially using the EMR Step API. Customers can continue to take advantage of transient clusters as part of the workflow, resulting in cost savings.

For the purpose of this blog post, we use Apache Airflow to orchestrate the data pipeline. Airflow is an open-source task scheduler that helps manage ETL tasks. Customers love Apache Airflow because workflows can be scheduled and managed from one central location. With Airflow's Configuration as Code approach, automating the generation of workflows, ETL tasks, and dependencies is easy. It helps customers shift their focus from building and debugging data pipelines to solving business problems.

High-level Architecture

Following is a detailed technical diagram showing the configuration of the architecture to be deployed.

We use an AWS CloudFormation script to launch the AWS services required to create this workflow. CloudFormation is a powerful service that allows you to describe and provision all the infrastructure and resources required for your cloud environment, in simple JSON or YAML templates. In this case, the template includes the following:

The Airflow server uses a LocalExecutor (tasks are executed as a subprocess), which helps to parallelize tasks locally. For production workloads, you should consider scaling out with the CeleryExecutor on a cluster with multiple worker nodes.

For demonstration purposes, we use the movielens dataset to concurrently convert the csv files to parquet format and save it to Amazon S3. This dataset is a popular open-source dataset, which is used in exploring data science algorithms. Each dataset file is a comma-separated file with a single header row. The following table describes each file in the dataset.

  • movies.csv – Has the title and list of genres for movies being reviewed.
  • ratings.csv – Shows how users rated movies, using a scale from 1-5. The file also contains the time stamp for the movie review.
  • tags.csv – Shows a user-generated tag for each movie. A tag is user-generated metadata about a movie; it can be a word or a short phrase. The file also contains the time stamp for the tag.
  • links.csv – Contains identifiers to link to movies used by IMDB and MovieDB.
  • genome-scores.csv – Shows the relevance of each tag for each movie.
  • genome-tags.csv – Provides the tag descriptions for each tag in the genome-scores.csv file.

Building the Pipeline

Step 0: Prerequisites

Make sure that you have a bash-enabled machine with AWS CLI installed.

Step 1: Create an Amazon EC2 key pair

To build this ETL pipeline, connect to an EC2 instance using SSH. This requires access to an Amazon EC2 key pair in the AWS Region you’re launching your CloudFormation stack. If you have an existing Key Pair in your Region, go ahead and use that Key Pair for this exercise. If not, to create a key pair open the AWS Management Console and navigate to the EC2 console. In the EC2 console left navigation pane, choose Key Pairs.

Choose Create Key Pair, type airflow_key_pair (make sure to type it exactly as shown), then choose Create. This downloads a file called airflow_key_pair.pem. Be sure to keep this file in a safe and private place. Without access to this file, you lose the ability to use SSH to connect with your EC2 instance.
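
If you prefer to script this step, the following is a minimal boto3 sketch that creates the same key pair and saves the private key locally. It assumes that your AWS credentials and default Region are already configured.

import os
import boto3

ec2 = boto3.client('ec2')

# Create the key pair and save the private key material with restrictive permissions
response = ec2.create_key_pair(KeyName='airflow_key_pair')
with open('airflow_key_pair.pem', 'w') as f:
    f.write(response['KeyMaterial'])
os.chmod('airflow_key_pair.pem', 0o400)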

Step 2: Execute the CloudFormation Script

Now, we’re ready to run the CloudFormation script!

Note: The CloudFormation script uses a DBSecurityGroup, which is NOT supported in all Regions.

On the next page, choose the key pair that you created in the previous step (airflow_key_pair), and enter an S3 bucket name. The S3 bucket should NOT already exist, because the CloudFormation template creates a new S3 bucket with that name. Default values for the other parameters have been chosen for simplicity.

After filling out these parameters to fit your environment, choose Next. Finally, review all the settings on the next page. Select the box marked I acknowledge that AWS CloudFormation might create IAM resources (this is required since the script creates IAM resources), then choose Create. This creates all the resources required for this pipeline and takes some time to run. To view the stack’s progress, select the stack you created and choose the Events section or panel.

It takes a couple of minutes for the CloudFormation template to complete.

Step 3: Start the Airflow scheduler in the Airflow EC2 instance

To make these changes, we use SSH to connect to the EC2 instance created by the CloudFormation script. Assuming your local machine has an SSH client, this can be accomplished from the command line. Navigate to the directory that contains the airflow_key_pair.pem file you downloaded earlier and enter the following commands, replacing your-public-ip and your-region with the relevant values from your EC2 instance. The public DNS name of the EC2 instance can be found on the Outputs tab.

Type yes when prompted after the SSH command.

chmod 400 airflow_key_pair.pem
ssh -i "airflow_key_pair.pem" [email protected]

For more information or help with issues, see Connecting to Your Linux Instance Using SSH.

Now we need to run some commands as the root user.

# sudo as the root user
sudo su
# Navigate to the airflow directory which was created by the cloudformation template – Look at the user-data section.
cd ~/airflow
source ~/.bash_profile

Below is an image of how the '/root/airflow/' directory should look.

Now we need to start the airflow scheduler. The Airflow scheduler monitors all tasks and all directed acyclic graphs (DAGs), and triggers the task instances whose dependencies have been met. In the background, it monitors and stays in sync with a folder for all DAG objects that it may contain. It periodically (every minute or so) inspects active tasks to see whether they can be triggered.

The Airflow scheduler is designed to run as a persistent service in an Airflow production environment. To get it started, run the airflow scheduler. It will use the configuration specified in airflow.cfg.

To start a scheduler, run the below command in your terminal.

airflow scheduler

Your screen should look like the following with scheduler running.

Step 4: View the transform_movielens DAG on the Airflow Webserver

The Airflow webserver should be running on port 8080. To see the Airflow webserver, open any browser and type in the <EC2-public-dns-name>:8080. The public EC2 DNS name is the same one found in Step 3.

You should see a list of DAGs on the Airflow dashboard. The example DAGs are left there in case you want to experiment with them. However, we focus on the transform_movielens DAG for the purposes of this blog. Toggle the ON button next to the name of the DAG.

The following example shows how the dashboard should look.

Choose the transform_movielens DAG, then choose Graph View to view the following image.

This image shows the overall data pipeline. In the current setup, there are six transform tasks that convert each .csv file to parquet format from the movielens dataset. Parquet is a popular columnar storage data format used in big data applications. The DAG also takes care of spinning up and terminating the EMR cluster once the workflow is completed.

The DAG code can also be viewed by choosing the Code button.

Step 5: Run the Airflow DAG

To run the DAG, go back to the Airflow dashboard, and choose the Trigger DAG button for the transform_movielens DAG.

When the Airflow DAG is run, the first task calls the run_job_flow boto3 API to create an EMR cluster. The second task waits until the EMR cluster is ready to take on new tasks. As soon as the cluster is ready, the transform tasks are kicked off in parallel using Apache Livy, which runs on port 8998. Concurrency in the current Airflow DAG is set to 3, which runs three tasks in parallel. To run more tasks in parallel (multiple spark sessions) in Airflow without overwhelming the EMR cluster, you can throttle the concurrency.
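
For reference, the DAG-level setting that controls this in Airflow 1.x looks roughly like the following sketch; the arguments shown are illustrative and not copied from the transform_movielens DAG.

from datetime import datetime
from airflow import DAG

# Illustrative DAG arguments; tune concurrency to what the EMR cluster can absorb
dag = DAG(
    dag_id='transform_movielens',
    start_date=datetime(2018, 1, 1),
    schedule_interval=None,
    concurrency=3,        # at most three task instances of this DAG run in parallel
    max_active_runs=1,    # only one active DAG run at a time
)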

How does Apache Livy run the Scala code on the EMR cluster in parallel?

Once the EMR cluster is ready, the transform tasks are triggered by the Airflow scheduler. Each transform task triggers Livy to create a new interactive spark session. Each POST request brings up a new Spark context with a Spark interpreter. This remote Spark interpreter is used to receive and run code snippets, and return back the result.

Let’s use one of the transform tasks as an example to understand the steps in detail.

# Converts each of the movielens datafile to parquet
def transform_movies_to_parquet(**kwargs):
    # ti is the Task Instance
    ti = kwargs['ti']
    cluster_id = ti.xcom_pull(task_ids='create_cluster')
    cluster_dns = emr.get_cluster_dns(cluster_id)
    headers = emr.create_spark_session(cluster_dns, 'spark')
    session_url = emr.wait_for_idle_session(cluster_dns, headers)
    statement_response = emr.submit_statement(session_url,   '/root/airflow/dags/transform/movies.scala')
    emr.track_statement_progress(cluster_dns, statement_response.headers)
    emr.kill_spark_session(session_url)

The first three lines of this code help to look up the EMR cluster details. These details are used to create an interactive Spark session on the EMR cluster using Apache Livy.

create_spark_session

Apache Livy creates an interactive spark session for each transform task. The code for which is shown below. SparkSession provides a single point of entry to interact with underlying Spark functionality and allows programming Spark with DataFrame and Dataset APIs. The Spark session is created by calling the POST /sessions API.

Note: You can also change parameters such as driverMemory, executorMemory, and the number of driver and executor cores as part of the API call (a tuned example follows the code below).

# Creates an interactive scala spark session. 
# Python(kind=pyspark), R(kind=sparkr) and SQL(kind=sql) spark sessions can also be created by changing the value of kind.
def create_spark_session(master_dns, kind='spark'):
    # 8998 is the port on which the Livy server runs
    host = 'http://' + master_dns + ':8998'
    data = {'kind': kind}
    headers = {'Content-Type': 'application/json'}
    response = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
    logging.info(response.json())
    return response.headers
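
For example, a session request with tuned resources might look like the following sketch. The values are illustrative and not taken from the post's code; Livy accepts these Spark settings in the POST /sessions body.

import json
import requests

def create_tuned_spark_session(master_dns):
    # Request a Scala Spark session with explicit driver and executor resources
    data = {
        'kind': 'spark',
        'driverMemory': '2g',
        'executorMemory': '4g',
        'executorCores': 2,
        'numExecutors': 4,
    }
    headers = {'Content-Type': 'application/json'}
    response = requests.post('http://' + master_dns + ':8998/sessions',
                             data=json.dumps(data), headers=headers)
    return response.headers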

submit_statement

Once the session has finished starting up, it transitions to the idle state. The transform task is then submitted to the session. The Scala code is submitted as a REST API call to the Livy server rather than directly to the EMR cluster, which provides good fault tolerance and concurrency.

# Submits the scala code as a simple JSON command to the Livy server
def submit_statement(session_url, statement_path):
    statements_url = session_url + '/statements'
    with open(statement_path, 'r') as f:
        code = f.read()
    data = {'code': code}
    response = requests.post(statements_url, data=json.dumps(data), headers={'Content-Type': 'application/json'})
    logging.info(response.json())
    return response

track_statement_progress

The progress of the statement can also be easily tracked and the logs are centralized on the Airflow webserver.

# Function to help track the progress of the scala code submitted to Apache Livy
def track_statement_progress(master_dns, response_headers):
    statement_status = ''
    host = 'http://' + master_dns + ':8998'
    session_url = host + response_headers['location'].split('/statements', 1)[0]
    # Poll the status of the submitted scala code
    while statement_status != 'available':
        # If a statement takes longer than a few milliseconds to execute, Livy returns early and provides a statement URL that can be polled until it is complete:
        statement_url = host + response_headers['location']
        statement_response = requests.get(statement_url, headers={'Content-Type': 'application/json'})
        statement_status = statement_response.json()['state']
        logging.info('Statement status: ' + statement_status)

        #logging the logs
        lines = requests.get(session_url + '/log', headers={'Content-Type': 'application/json'}).json()['log']
        for line in lines:
            logging.info(line)

        if 'progress' in statement_response.json():
            logging.info('Progress: ' + str(statement_response.json()['progress']))
        time.sleep(10)
    final_statement_status = statement_response.json()['output']['status']
    if final_statement_status == 'error':
        logging.info('Statement exception: ' + statement_response.json()['output']['evalue'])
        for trace in statement_response.json()['output']['traceback']:
            logging.info(trace)
        raise ValueError('Final Statement Status: ' + final_statement_status)
    logging.info('Final Statement Status: ' + final_statement_status)

The following is a snapshot of the centralized logs from the Airflow webserver.

Once the job is run successfully, the Spark session is ended and the EMR cluster is terminated.

Analyze the data in Amazon Athena

The output data in S3 can be analyzed in Amazon Athena by creating a crawler on AWS Glue. For information about automatically creating the tables in Athena, see the steps in Build a Data Lake Foundation with AWS Glue and Amazon S3.
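
If you prefer to script the crawler instead of using the console, the following is a minimal boto3 sketch. The crawler name, IAM role, database name, and S3 path are placeholders for your own values.

import boto3

glue = boto3.client('glue')

# All names below are hypothetical placeholders
glue.create_crawler(
    Name='movielens-parquet-crawler',
    Role='AWSGlueServiceRole-movielens',    # IAM role with access to the output bucket
    DatabaseName='movielens',
    Targets={'S3Targets': [{'Path': 's3://<your-s3-bucket>/movielens/parquet/'}]},
)
glue.start_crawler(Name='movielens-parquet-crawler')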

Summary

In this post, we explored orchestrating a Spark data pipeline on Amazon EMR using Apache Livy and Apache Airflow. We created a simple Airflow DAG to demonstrate how to run Spark jobs concurrently. You can modify this to scale your ETL data pipelines and improve latency. Additionally, we saw how Livy hides the complexity of submitting Spark jobs via REST while making optimal use of EMR resources.

For more information about the code shown in this post, see AWS Concurrent Data Orchestration Pipeline EMR Livy. Feel free to leave questions and other feedback in the comments.

 


Additional Reading

If you found this post useful, be sure to check out Orchestrate Apache Spark applications using AWS Step Functions and Apache Livy.

 


About the Author

Binal Jhaveri is a Big Data Engineer at Amazon Web Services. Her passion is to build big data products in the cloud. During her spare time, she likes to travel, read detective fiction and loves exploring the abundant nature that the Pacific Northwest has to offer.

 

 

 

Exploratory data analysis of genomic datasets using ADAM and Mango with Apache Spark on Amazon EMR

Post Syndicated from Alyssa Marrow original https://aws.amazon.com/blogs/big-data/exploratory-data-analysis-of-genomic-datasets-using-adam-and-mango-with-apache-spark-on-amazon-emr/


As the cost of genomic sequencing has rapidly decreased, the amount of publicly available genomic data has soared over the past couple of years. New cohorts and studies have produced massive datasets consisting of over 100,000 individuals. Simultaneously, these datasets have been processed to extract genetic variation across populations, producing mass amounts of variation data for each cohort.

In this era of big data, tools like Apache Spark have provided a user-friendly platform for batch processing of large datasets. However, to use such tools as a sufficient replacement to current bioinformatics pipelines, we need more accessible and comprehensive APIs for processing genomic data. We also need support for interactive exploration of these processed datasets.

ADAM and Mango provide a unified environment for processing, filtering, and visualizing large genomic datasets on Apache Spark. ADAM allows you to programmatically load, process, and select raw genomic and variation data using Spark SQL, an SQL interface for aggregating and selecting data in Apache Spark. Mango supports the visualization of both raw and aggregated genomic data in a Jupyter notebook environment, allowing you to draw conclusions from large datasets at multiple resolutions.

With the combined power of ADAM and Mango, you can load, query, and explore datasets in a unified environment. You can interactively explore genomic data at a scale previously impossible using single node bioinformatics tools. In this post, we describe how to set up and run ADAM and Mango on Amazon EMR. We demonstrate how you can use these tools in an interactive notebook environment to explore the 1000 Genomes dataset, which is publicly available in Amazon S3 as a public dataset.

Configuring ADAM and Mango on Amazon EMR

First, you launch and configure an EMR cluster. Mango uses Docker containers to easily run on Amazon EMR. Upon cluster startup, EMR uses the following bootstrap action to install Docker and the required startup scripts. The scripts are available at /home/hadoop/mango-scripts.

aws emr create-cluster \
--release-label emr-5.14.0 \
--name 'emr-5.14.0 Mango example' \
--applications Name=Hadoop Name=Hive Name=Spark \
--ec2-attributes KeyName=<your-ec2-key>,InstanceProfile=EMR_EC2_DefaultRole \
--service-role EMR_DefaultRole \
--instance-groups \
  InstanceGroupType=MASTER,InstanceCount=1,InstanceType=c5.4xlarge \
  InstanceGroupType=CORE,InstanceCount=4,InstanceType=c5.4xlarge \
--region <your-aws-region> \
--log-uri s3://<your-s3-bucket>/emr-logs/ \
--bootstrap-actions \
  Name='Install Mango',Path="s3://aws-bigdata-blog/artifacts/mango-emr/install-bdg-mango-emr5.sh"

To start the Mango notebook, run the following:

/home/hadoop/mango-scripts/run-notebook.sh

This file sets up all of the environment variables that are needed to run Mango in Docker on Amazon EMR. In your terminal, you will see the port and Jupyter notebook token for the Mango notebook session. Navigate to this port on the public DNS URL of the master node for your EMR cluster.

Loading data from the 1000 Genomes Project

Now that you have a working environment, you can use ADAM and Mango to discover interesting variants in the child from the genome sequencing data of a trio (data from a mother, father, and child). This data is available from the 1000 Genomes Project AWS public dataset. In this analysis, you will view a trio (NA19685, NA19661, and NA19660) and search for variants that are present in the child but not present in the parents.

In particular, we want to identify genetic variants that are found in the child but not in the parents, known as de novo variants. These are interesting regions, as they can indicate sites of de novo variation that might contribute to multiple disorders.

You can find the Jupyter notebook containing these examples in Mango’s GitHub repository, or at /opt/cgl-docker-lib/mango/example-files/notebooks/aws-1000genomes.ipynb in the running Docker container for Mango.

First, import the ADAM and Mango modules and any Spark modules that you need:

# Import ADAM modules
from bdgenomics.adam.adamContext import ADAMContext
from bdgenomics.adam.rdd import AlignmentRecordRDD, CoverageRDD
from bdgenomics.adam.stringency import LENIENT, _toJava

# Import Mango modules
from bdgenomics.mango.rdd import GenomicVizRDD
from bdgenomics.mango.QC import CoverageDistribution

# Import Spark modules
from pyspark.sql import functions as sf

Next, create a Spark session. You will use this session to run SQL queries on variants.

# Create ADAM Context
ac = ADAMContext(spark)
genomicRDD = GenomicVizRDD(spark)

Variant analysis with Spark SQL

Load in a subset of variant data from chromosome 17:

genotypesPath = 's3://1000genomes/phase1/analysis_results/integrated_call_sets/ALL.chr17.integrated_phase1_v3.20101123.snps_indels_svs.genotypes.vcf.gz'
genotypes = ac.loadGenotypes(genotypesPath)

# repartition genotypes to balance the load across memory
genotypes_df  = genotypes.toDF()

You can take a look at the schema by printing the columns in the dataframe.

# cache genotypes and show the schema
genotypes_df.columns

This genotypes dataset contains all samples from the 1000 Genomes Project. Therefore, you will next filter genotypes to only consider samples that are in the NA19685 trio, and cache the results in memory.

# trio IDs
IDs = ['NA19685', 'NA19661','NA19660']

# Filter by individuals in the trio
trio_df = genotypes_df.filter(genotypes_df["sampleId"].isin(IDs))

trio_df.cache()
trio_df.count()

Next, add a new column to your dataframe that determines the genomic location of each variant. This is defined by the chromosome (contigName) and the start and end position of the variant.

# Add ReferenceRegion column and group by referenceRegion
trios_with_referenceRegion = trio_df.withColumn('ReferenceRegion', 
                    sf.concat(sf.col('contigName'),sf.lit(':'), sf.col('start'), sf.lit('-'), sf.col('end')))

Now, you can query your dataset to find de novo variants. But first, you must register your dataframe with Spark SQL.

#  Register df with Spark SQL
trios_with_referenceRegion.createOrReplaceTempView("trios")

Now that your dataframe is registered, you can run SQL queries on it. For the first query, select the names of variants belonging to sample NA19685 that have at least one alternative (ALT) allele.

# filter by alleles. This is a list of variant names that have an alternate allele for the child
alternate_variant_sites = spark.sql("SELECT variant.names[0] AS snp FROM trios \
                                    WHERE array_contains(alleles, 'ALT') AND sampleId == 'NA19685'") 

collected_sites = map(lambda x: x.snp, alternate_variant_sites.collect())

For your next query, filter sites in which the parents have both reference alleles. Then filter these variants by the set produced previously from the child.

# get parent records and filter by only REF locations for variant names that were found in the child with an ALT
filtered1 = spark.sql("SELECT * FROM trios WHERE sampleId == 'NA19661' or sampleId == 'NA19660' \
            AND !array_contains(alleles, 'ALT')")
filtered2 = filtered1.filter(filtered1["variant.names"][0].isin(collected_sites))
snp_counts = filtered2.groupBy("variant.names").count().collect()

# collect snp names as a list
snp_names = map(lambda x: x.names, snp_counts)
denovo_snps = [item for sublist in snp_names for item in sublist]
denovo_snps[:10]

Now that you have found some interesting variants, you can unpersist your genotypes from memory.

trio_df.unpersist()

Working with alignment data

You have found a lot of potential de novo variant sites. Next, you can visually verify some of these sites to see if the raw alignments match up with these de novo hits.

First, load in the alignment data for the NA19685 trio:

# load in NA19685 exome from s3a

childReadsPath = 's3a://1000genomes/phase1/data/NA19685/exome_alignment/NA19685.mapped.illumina.mosaik.MXL.exome.20110411.bam'
parent1ReadsPath = 's3a://1000genomes/phase1/data/NA19685/exome_alignment/NA19660.mapped.illumina.mosaik.MXL.exome.20110411.bam'
parent2ReadsPath = 's3a://1000genomes/phase1/data/NA19685/exome_alignment/NA19661.mapped.illumina.mosaik.MXL.exome.20110411.bam'

childReads = ac.loadAlignments(childReadsPath, stringency=LENIENT)
parent1Reads = ac.loadAlignments(parent1ReadsPath, stringency=LENIENT)
parent2Reads = ac.loadAlignments(parent2ReadsPath, stringency=LENIENT)

Note that this example uses s3a:// instead of s3:// style URLs. The reason for this is that the ADAM formats use Java NIO to access BAM files. To do this, we are using a JSR 203 implementation for the Hadoop Distributed File System to access these files. This itself requires the s3a:// protocol. You can view that implementation in this GitHub repository.

You now have data alignment data for three individuals in your trio. However, the data has not yet been loaded into memory. To cache these datasets for fast subsequent access to the data, run the cache() function:

# cache child RDD and count records
# takes about 2 minutes, on 4 c3.4xlarge worker nodes 
childReads.cache()

# Count reads in the child
childReads.toDF().count()
# Output should be 95634679

Quality control of alignment data

One popular analysis to visually re-affirm the quality of genomic alignment data is by viewing coverage distribution. Coverage distribution gives you an idea of the read coverage that you have across a sample.

Next, generate a sample coverage distribution plot for the child alignment data on chromosome 17:

# Calculate read coverage
# Takes 2-3 minutes
childCoverage = childReads.transform(lambda x: x.filter(x.contigName == "17")).toCoverage()

childCoverage.cache()
childCoverage.toDF().count()

# Output should be 51252612

Now that coverage data is calculated and cached, compute the coverage distribution of chromosome 17 and plot the coverage distribution:

# Calculate coverage distribution

# You can check the progress in the SparkUI by navigating to 
# <PUBLIC_MASTER_DNS>:8088 and clicking on the currently running Spark application.
cd = CoverageDistribution(sc, childCoverage)
x = cd.plot(normalize=True, cumulative=False, xScaleLog=True, labels="NA19685")

 

This looks pretty standard because the data you are viewing is exome data. Therefore, you can see a high number of sites with low coverage and a smaller number of genomic positions with more than 100 reads. Now that you are done with coverage, you can unpersist these datasets to clear space in memory for the next analysis.

childCoverage.unpersist()

Viewing sites with missense variants in the proband

After verifying alignment data and filtering variants, you have four genes with potential missense mutations in the proband, including YBX2, ZNF286B, KSR1, and GNA13. You can visually verify these sites by filtering and viewing the raw reads of the child and parents.

First, view the child reads. If you zoom in to the location of the GNA13 variant (63052580-63052581), you can see a heterozygous T to A call:

# missense variant at GNA13: 63052580-63052581 (SNP rs201316886)
# Takes about 2 minutes to collect data from workers
contig = "17"
start = 63052180
end = 63052981

genomicRDD.ViewAlignments(childReads, contig, start, end)

It looks like there indeed is a variant at this position, possibly a heterozygous SNP with alternate allele A. Look at the parent data to verify that this variant does not appear in the parents:

# view missense variant at GNA13: 63052580-63052581 in parent 1
contig = "17"
start = 63052180
end = 63052981

genomicRDD.ViewAlignments(parent1Reads, contig, start, end)

This confirms the filter that this variant is indeed present only in the proband, but not the parents.

Summary

To summarize, this post demonstrated how to set up and run ADAM and Mango in Amazon EMR. We demonstrated how to use these tools in an interactive notebook environment to explore the 1000 Genomes dataset, a publicly available dataset on Amazon S3. We used these tools to inspect 1000 Genomes data quality, query for interesting variants in the genome, and validate results through the visualization of raw data.

For more information about Mango, see the Mango User Guide. If you have questions or suggestions, please comment below.

 


Additional Reading

If you found this post useful, be sure to check out Genomic Analysis with Hail on Amazon EMR and Amazon Athena, Interactive Analysis of Genomic Datasets Using Amazon Athena, and, on the AWS Compute Blog, Building High-Throughput Genomics Batch Workflows on AWS: Introduction (Part 1 of 4).

 


About the Author

Alyssa Marrow is a graduate student in the RISELab and Yosef Lab at the University of California Berkeley. Her research interests lie at the intersection of systems and computational biology. This involves building scalable systems and easily parallelized algorithms to process and compute on all that ‘omics data.

 

 

 

Encrypt data in transit using a TLS custom certificate provider with Amazon EMR

Post Syndicated from Remek Hetman original https://aws.amazon.com/blogs/big-data/encrypt-data-in-transit-using-a-tls-custom-certificate-provider-with-amazon-emr/

Many enterprises have highly regulated policies around cloud security. Those policies might be even more restrictive for Amazon EMR where sensitive data is processed.

EMR provides security configurations that allow you to set up encryption for data at rest stored on Amazon S3 and local Amazon EBS volumes. It also allows the setup of Transport Layer Security (TLS) certificates for the encryption of data in transit.

When in-transit encryption is enabled, EMR supports the following components by default:

  • Hadoop MapReduce Encrypted Shuffle.
  • Secure Hadoop RPC set to Privacy and using SASL. This is activated in EMR when data at rest encryption is enabled in the security configuration.
  • Presto internal communication between nodes using SSL/TLS. This applies only to EMR version 5.6.0 and later.
  • Tez Shuffle Handler using TLS.
  • Internal RPC communication between Apache Spark
  • HTTP protocol communication with user interfaces, such as Spark History Server and HTTPS-enabled file servers encrypted using Spark’s Secure Sockets Layer (SSL) configuration.

For more information about EMR in-transit encryption, see Encryption Options.

A security configuration provides the following options to specify TLS certificates:

  1. As a path to a .zip file in an S3 bucket that contains all certificates
  2. Through a custom certificate provider as a Java class

In many cases, company security policies prohibit storing any type of sensitive information in an S3 bucket, including certificate private keys. For that reason, the only remaining option to secure data in transit on EMR is to configure the custom certificate provider.

In this post, I guide you through the configuration process, showing you how to secure data in transit on EMR using the TLS custom certificate provider.

Required knowledge

To walk through this solution, you should know or be familiar with:

Solution overview

The custom certificate provider is a Java class that implements the TLSArtifactsProvider interface and is compiled into a JAR file. The TLSArtifactsProvider interface is available in the AWS SDK for Java version 1.11.+.

The TLSArtifactsProvider interface defines the getTlsArtifacts method, which returns a TLSArtifacts object containing the certificates and private key.

To make this solution work, you need a secure place to store certificates that can also be accessed by Java code.

In this example, I use AWS Systems Manager Parameter Store, which supports encryption with an AWS Key Management Service (AWS KMS) key.

Another way would be to store encrypted certificates in Amazon DynamoDB.

The following diagram and steps show the configuration process from the Java standpoint:

 

  1. During bootstrap, EMR downloads the Java JAR file from the S3 bucket, and runs it on each node.
  2. Java invokes the Lambda function, requesting the value of a specific parameter key.
  3. Lambda calls Parameter Store to get the value. The value returned by Systems Manager remains encrypted.
  4. Lambda returns the encrypted value back to Java.
  5. Java decrypts the value using an AWS KMS API call.
  6. The decrypted value is converted to the correct format of the certificate.
  7. The process repeats for all certificates.
  8. Certificates are returned back to EMR through the TLSArtifactsProvider interface.
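The actual implementation of this flow is the Java class described later in this post, but the same idea can be sketched in a few lines of Python with boto3. In the following sketch, the function name comes from the tls:lambda-fn-name tag used in this post and the parameter name matches the upload examples in the next section; everything else is illustrative only, not the blog's Java code.

import base64
import json
import boto3

lambda_client = boto3.client("lambda")
kms = boto3.client("kms")

def fetch_certificate(parameter_name, function_name="get-ssm-parameter-lambda"):
    """Ask the broker Lambda function for a parameter, then decrypt it locally with KMS."""
    # Step 2: invoke the Lambda function, passing the parameter key name.
    response = lambda_client.invoke(
        FunctionName=function_name,
        Payload=json.dumps({"ParameterName": parameter_name}),
    )
    # Steps 3-4: the Lambda function returns the SecureString value still encrypted
    # (it calls get_parameter with WithDecryption=False).
    encrypted_value = json.loads(response["Payload"].read())
    # Step 5: decrypt the value with an AWS KMS API call.
    plaintext = kms.decrypt(CiphertextBlob=base64.b64decode(encrypted_value))["Plaintext"]
    # Step 6: the decoded PEM text can then be converted to certificate objects.
    return plaintext.decode("utf-8")

# Step 7: repeat for each certificate and private key.
certificate_pem = fetch_certificate("/emr/certificate")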

In this example, for the master node, I used a certificate signed by a certificate authority (CA) and wildcard self-signed certificate for slave nodes. Depending on requirements, you can use CA certificates for all nodes or only a self-signed wildcard certificate.

Implementing in-transit encryption

This section walks you through all aspects of implementation and configuration for in-transit encryption using a custom certificate provider.

Create a self-signed wildcard certificate

To create a self-signed wildcard certificate, you can use OpenSSL:

openssl req -x509 -newkey rsa:4096 -keyout inter-nodes.key -out inter-nodes.crt -days 365 -subj "/C=US/ST=MA/L=Boston/O=EMR/OU=EMR/CN=*.ec2.internal" -nodes


This command creates a self-signed, 4096-bit certificate.

Explanation of the parameters:

-keyout – The output file in which to store the private key.

-out – The output file in which to store the certificate.

-days – The number of days for which to certify the certificate.

-subj – The subject name for the new request. The CN must match the domain name specified in the DHCP options set assigned to the virtual private cloud (VPC). The default is ec2.internal. The “*” prefix makes this a wildcard certificate.

-nodes – Creates the private key without a passphrase, that is, without encryption.

For more information, see req command.

Upload certificates

To upload certificates to the Parameter Store, run the following AWS Command Line Interface (AWS CLI) command for each certificate file, including private keys:

aws ssm put-parameter --name <parameter key name> --key-id <KMS key ID> --type SecureString --value file://<path to certificate file>

The following are examples of uploaded CA and self-signed certificate files:

aws ssm put-parameter --name /emr/certificate --value fileb://emr-ca-certificate.crt --type SecureString --key-id 00000000-0000-0000-0000-000000000000 --overwrite --region us-east-1

aws ssm put-parameter --name /emr/private-key --value fileb://emr-ca-private-key.key --type SecureString --key-id 00000000-0000-0000-0000-000000000000 --overwrite --region us-east-1

aws ssm put-parameter --name /emr/inter-nodes-certificate --value fileb://inter-nodes.crt --type SecureString --key-id 00000000-0000-0000-0000-000000000000 --overwrite --region us-east-1

aws ssm put-parameter --name /emr/inter-nodes-private-key --value fileb://inter-nodes.key --type SecureString --key-id 00000000-0000-0000-0000-000000000000 --overwrite --region us-east-1

The following are examples of uploading certificates when the wildcard certificate is used on all nodes:

aws ssm put-parameter --name /emr/certificate --value fileb://inter-nodes.crt --type SecureString --key-id 00000000-0000-0000-0000-000000000000 --overwrite --region us-east-1

aws ssm put-parameter --name /emr/private-key --value fileb://inter-nodes.key --type SecureString --key-id 00000000-0000-0000-0000-000000000000 --overwrite --region us-east-1

aws ssm put-parameter --name /emr/inter-nodes-certificate --value fileb://inter-nodes.crt --type SecureString --key-id 00000000-0000-0000-0000-000000000000 --overwrite --region us-east-1

aws ssm put-parameter --name /emr/inter-nodes-private-key --value fileb://inter-nodes.key --type SecureString --key-id 00000000-0000-0000-0000-000000000000 --overwrite --region us-east-1
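If you script this step instead of running the AWS CLI, the boto3 equivalent of one of the preceding commands might look like the following sketch; the KMS key ID is a placeholder that you would replace with your own.

import boto3

ssm = boto3.client("ssm", region_name="us-east-1")

# Upload one certificate file as an encrypted SecureString parameter.
with open("inter-nodes.crt") as f:
    ssm.put_parameter(
        Name="/emr/inter-nodes-certificate",
        Value=f.read(),
        Type="SecureString",
        KeyId="00000000-0000-0000-0000-000000000000",  # replace with your KMS key ID
        Overwrite=True,
    )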

Using the Lambda function

The Lambda function in this solution is a broker that allows the Java JAR to retrieve certificates from Parameter Store.

Create a new role for the Lambda function, using the following command:

aws iam create-role --role-name lambda-ssm-parameter-store-role --assume-role-policy-document "{\"Version\": \"2012-10-17\", \"Statement\": [{\"Effect\": \"Allow\",\"Principal\": {\"Service\": \"lambda.amazonaws.com\"},\"Action\": \"sts:AssumeRole\"}]}"

Grant permissions to Parameter Store, using the following command:

aws iam put-role-policy --role-name lambda-ssm-parameter-store-role --policy-name ssm --policy-document "{\"Version\": \"2012-10-17\",\"Statement\": [{\"Effect\": \"Allow\",\"Action\": \"ssm:GetParameter\",\"Resource\": \"*\"}]}"

Create a new Lambda function:

To create a new Lambda function, open the AWS Management Console, choose Lambda, and choose Create function. On the Create function page, complete the form as shown in the following screenshot:

For the runtime, choose Python 2.7, and specify the role that you created for the Lambda function.

When the new function is created, add the following code in the Function code section:

import json
import boto3

ssm = boto3.client('ssm')

def lambda_handler(event, context):

    # Read the requested parameter without decrypting it. The value stays
    # encrypted here and is decrypted later on the EMR node with a KMS API call.
    ssmResp = ssm.get_parameter(
        Name=event['ParameterName'],
        WithDecryption=False
    )

    paramValue = ssmResp['Parameter']['Value']
    return(paramValue)

Change the timeout to 1 minute, and then save the function.

Tag resources

For the Java class to call the Lambda function, you must provide information about the function name and names of parameter keys under which the certificates are stored.

To reuse the same Java JAR with different certificates and configurations, provide those values to Java through EMR tags, rather than embedding them in Java code.

In this example, I used the following tags:

  • ssm:ssl:certificate – The name of the Systems Manager parameter key storing the CA-signed certificate.
  • ssm:ssl:private-key – The name of the Systems Manager parameter key storing the CA-signed certificate private key.
  • ssm:ssl:inter-node-certificate – The name of the Systems Manager parameter key storing the self-signed certificate.
  • ssm:ssl:inter-node-private-key – The name of the Systems Manager parameter key storing the self-signed certificate private key.
  • tls:lambda-fn-name – The name of the Lambda function. In this example, this is get-ssm-parameter-lambda.
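Amazon EMR propagates cluster tags to the underlying EC2 instances, so code running on a node can look these values up through the EC2 API (the instance profile needs ec2:DescribeTags for this). The following Python sketch only illustrates that lookup; it is not the Java implementation from the repo.

import boto3
import urllib.request

def read_emr_tags(region="us-east-1"):
    """Look up the EMR tags on the current node via the EC2 API."""
    # The instance ID comes from the EC2 instance metadata service.
    instance_id = urllib.request.urlopen(
        "http://169.254.169.254/latest/meta-data/instance-id", timeout=2
    ).read().decode()

    ec2 = boto3.client("ec2", region_name=region)
    response = ec2.describe_tags(
        Filters=[{"Name": "resource-id", "Values": [instance_id]}]
    )
    return {tag["Key"]: tag["Value"] for tag in response["Tags"]}

tags = read_emr_tags()
print(tags.get("tls:lambda-fn-name"), tags.get("ssm:ssl:certificate"))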

Use the Java class flow

This section describes the flow in the Java code only. You can download the full code along with the compiled JAR file from GitHub. For more information, see the Java folder in the emr-tls-security GitHub repo.

Important

Because of EMR dependencies, all other methods must be implemented against the AWS SDK for Java version 1.10.75. These dependencies do not include the TLSArtifactsProvider interface, which must be imported from the AWS SDK for Java version 1.11.170 (aws-java-sdk-emr-1.11.170.jar).

All necessary dependencies are included in the example project.

The following is an example of the basic structure of the Java class, with an implementation of the TLSArtifactsProvider interface:

public class emrtls extends TLSArtifactsProvider {

    public emrtls() {
    }

    @Override
    public TLSArtifacts getTlsArtifacts() {
        List<Certificate> crt = new ArrayList<Certificate>();
        List<Certificate> crtCA = new ArrayList<Certificate>();
        PrivateKey privkey;

        // Here, add code to retrieve certificates from a secure location
        // and assign them to the local variables.

        TLSArtifacts tls = new TLSArtifacts(privkey, crt, crtCA);
        return tls;
    }
}

The code that you add retrieves the certificates from a secure location.

In the provided code example from GitHub, the following logic was implemented. I’ve listed the methods used in each step.

  1. Read the names of the Systems Manager parameter key. Also read the name of the AWS Lambda function from the EMR tags (see “Tagging” section) – readTags()
  2. Invoke the Lambda function to download certificates from Parameter Store – callLambda():
    • Decrypt the values returned by Lambda using the KMS API call – decryptValue().
    • Assign decrypted values to local variables.
  3. If needed, save the CA-signed certificates to a local disk (for more information, see the Hue section later in this post) – createDirectoryForCerts() and writeCert().
  4. Convert certificates to an X509 format – getX509FromString().
  5. Convert the private key to the correct format – getPrivateKey().
  6. Call the getTlsArtifacts() method to provide certificates in arguments.

You can use a wildcard certificate for all nodes without changing code. To do so, reference the same Systems Manager parameter keys in the ssm:ssl:certificate/ssm:ssl:private-key tags and in the ssm:ssl:inter-node-certificate/ssm:ssl:inter-node-private-key EMR tags.

If the implemented methods in the example code meet your requirements, you can use the provided Java JAR file in the EMR security configuration, as described in the next section. Otherwise, any code changes require recompiling the Java code into a JAR file.

Create the EMR security configuration

Before creating the security configuration, upload the compiled Java JAR file to an S3 bucket.

To create the security configuration:

  1. Log in to the Amazon EMR console.
  2. Choose Security configurations, Create.
  3. Type a name for your new security configuration; for example, emr-tls-ssm
  4. Select In-transit encryption.
  5. Under TLS certificate provider, for Certificate provider type, choose Custom.
  6. For S3 object, type the path to the uploaded Java JAR file.
  7. For Certificate provider class, type the name of the Java class. In the example code, the name is emrtls.
  8. Configure the At-rest encryption, as required.
  9. Choose Create.

Modify the instance profile role

Applications running on EMR assume the EMR role for Amazon EC2 (the instance profile) to interact with other AWS services.

To grant Java permissions to invoke Lambda, and to decrypt certificates, add the following policy to your EC2 instance profile role:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "TLS",
            "Effect": "Allow",
            "Action": [
                "kms:Decrypt",
                "lambda:InvokeFunction"
            ],
            "Resource": [
                "arn:aws:lambda:us-east-1:xxxxxxxxxx:function:get-ssm-parameter-lambda",
                "arn:aws:kms:us-east-1:xxxxxxxxxx:key/<your KMS key used to encrypt certificates in AWS Systems Manager"
            ]
        }
    ]
}

Note

Before creating the policy, update resources with the correct Amazon Resource Name (ARN) for the Lambda function and KMS key.

Other available configurations

In addition to the applications that natively support in-transit encryption in EMR, the custom TLS certificate provider can also be used to secure communication (HTTPS) for other applications like Presto, Hue, and Zeppelin.

The sections that follow describe the configuration of each application that works with the certificates set up by the TLS security configuration.

Presto

For Presto, most configuration is done by EMR when TLS certificates are applied.

Depending on the type of certificates used, there are two additional configurations that must be added:

  1. When the CA-signed certificate with a single common name (not wildcard) is set on the master node, additional configurations are required:
    • The certificate common name must be registered in DNS. The EMR cluster must be able to resolve that name to the IP address of the master node. One solution would be to run a script on the bootstrap action to register the IP address of the EMR master node and name in DNS.
    • The Discovery URI in the Presto configuration file must match the certificate common name. The value of the URI must be changed on all nodes, which can be accomplished with the two provided scripts.

Each script must be uploaded to an S3 bucket to which the EMR cluster has permission.

The first script, emr-presto-conf.sh, must be run on the EMR bootstrap action, as follows:

          {
            "Name": "PrestoConfiguration",
            "ScriptBootstrapAction": {
              "Path": "s3://xxxxx/emr-presto-conf.sh",
              "Args": [ "emr.mycluster.com" ]
            }
          }
Where the value of Args is the certificate common name.

The PrestoConfiguration bootstrap action downloads and runs a script (presto-update-dicovery-uri.sh) as a background process. This script waits for the Presto server to be installed and then modifies the configuration files.

Before uploading the emr-presto-conf.sh script to the Amazon S3 bucket, change the path to “presto-update-dicovery-uri.sh” in that script.

Both scripts can be downloaded from GitHub:

https://github.com/aws-samples/emr-tls-security/tree/master/scripts

2. When a self-signed wildcard certificate is used on the master node, the certificate must be added to the Java default truststore. This can be accomplished by running the following script:

#!/bin/bash

truststorePass=$(grep -Po "(?<=^internal-communication.https.keystore.key = ).*" /etc/presto/conf/config.properties)

sudo keytool -importkeystore -srckeystore /usr/share/aws/emr/security/conf/truststore.jks -destkeystore /usr/lib/jvm/java/jre/lib/security/cacerts -deststorepass changeit -srcstorepass $truststorePass

The previous script can be run as an EMR step. The following is an AWS CloudFormation snippet:

"EMRPrestoTrustedStorStep": {
      "Type": "AWS::EMR::Step",
      "Properties": {
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
          "Jar": "s3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar",
          "Args": [
            "s3://xxxxxx/presto-update-trusted-store.sh"
          ]
        },
        "JobFlowId": {
          "Ref": "EMRCluster"
        },
        "Name": "EMR-Setup-Presto-Trusted-Store"
      }
    }

Hue

To configure access to the Hue UI over HTTPS, the path to the certificate and private key files must be specified in the hue.ini file. Because our Java class has the createDirectoryForCerts() and writeCert() methods, which export the TLS certificates to the local disk, the remaining configuration only needs to point to those files in the hue.ini file.

This configuration can be applied by adding the following configuration to the EMR cluster:

[
  {
    "Classification": "hue-ini",
    "Configurations": [
      {
        "Classification": "desktop",
        "ConfigurationProperties": {
          "ssl_certificate": "/etc/certs/public.crt",
          "ssl_private_key": "/etc/certs/private.key"
        }
      }
    ]
  }
]

The port for the HTTPS connection to Hue remains the same; the default is 8888.

Zeppelin

Unlike Hue, Zeppelin configuration files reference certificates from the Java keystore.

Because EMR already added all certificates to the Java keystore, the only modification needed is to reference the same Java keystore files and password in zeppelin-site.xml.

The path to the Java keystore and the password can be read directly from the Presto configuration file.

This configuration can be done by running the following script on EMR:

#!/bin/bash
sudo cp /etc/zeppelin/conf/zeppelin-site.xml.template /etc/zeppelin/conf/zeppelin-site.xml
truststorePath=$(grep -Po "(?<=^internal-communication.https.keystore.path = ).*" /etc/presto/conf/config.properties)
truststorePass=$(grep -Po "(?<=^internal-communication.https.keystore.key = ).*" /etc/presto/conf/config.properties)
keystorePath=$(grep -Po "(?<=^http-server.https.keystore.path = ).*" /etc/presto/conf/config.properties)
keystorePass=$(grep -Po "(?<=^http-server.https.keystore.key = ).*" /etc/presto/conf/config.properties)
keymanager=$(grep -Po "(?<=^http-server.https.keymanager.password = ).*" /etc/presto/conf/config.properties)
sudo sed -i '/<name>zeppelin.server.port<\/name>/!b;n;c<value>8890<\/value>' /etc/zeppelin/conf/zeppelin-site.xml
sudo sed -i '/<name>zeppelin.server.ssl.port<\/name>/!b;n;c<value>7773<\/value>' /etc/zeppelin/conf/zeppelin-site.xml
sudo sed -i '/<name>zeppelin.ssl<\/name>/!b;n;c<value>true<\/value>' /etc/zeppelin/conf/zeppelin-site.xml
sudo sed -i '/<name>zeppelin.ssl.keystore.path<\/name>/!b;n;c<value>'"$keystorePath"'<\/value>' /etc/zeppelin/conf/zeppelin-site.xml
sudo sed -i '/<name>zeppelin.ssl.keystore.password<\/name>/!b;n;c<value>'"$keystorePass"'<\/value>' /etc/zeppelin/conf/zeppelin-site.xml
sudo sed -i '/<name>zeppelin.ssl.truststore.path<\/name>/!b;n;c<value>'"$truststorePath"'<\/value>' /etc/zeppelin/conf/zeppelin-site.xml
CONTENT1="<property>\n  <name>zeppelin.ssl.truststore.password</name>\n  <value>${truststorePass}</value>\n</property>"
sudo sed -i '/<\/configuration>/i'"$CONTENT1" /etc/zeppelin/conf/zeppelin-site.xml
CONTENT2="<property>\n  <name>zeppelin.ssl.key.manager.password</name>\n  <value>${keymanager}</value>\n</property>"
sudo sed -i '/<\/configuration>/i'"$CONTENT2" /etc/zeppelin/conf/zeppelin-site.xml
sudo stop zeppelin
sudo start zeppelin

The previous script sets the HTTPS port in Zeppelin to 7773, which can be changed as needed.

GitHub example

On GitHub, you can download an example of the CloudFormation templates that can be used to launch an EMR cluster with all the security features discussed in this post.

The following is an example of the AWS CLI command required to launch an EMR cluster with these security features.

Before running this command, change the <key pair name>, <subnet id>, and <security group id> values to the correct values for your environment.

aws emr create-cluster --configurations https://s3.amazonaws.com/tls-blog-cf/emr-configuration.json \
--applications Name=Hadoop Name=Hive Name=Presto Name=Spark Name=Hue Name=Zeppelin \
--instance-groups 'InstanceGroupType=MASTER,InstanceCount=1,InstanceType='m4.xlarge'' \
'InstanceGroupType=CORE,InstanceCount='2',InstanceType='m4.xlarge'' \
--release-label emr-5.14.0 \
--service-role EMR_DefaultRole \
--ec2-attributes KeyName=<key pair name>,SubnetId=<subnet id>,\
EmrManagedMasterSecurityGroup=<security group id>,\
AdditionalMasterSecurityGroups=<security group id>,\
EmrManagedSlaveSecurityGroup=<security group id>,\
InstanceProfile=EMR_EC2_DefaultRole \
--name EMR-TLS-Demo \
--visible-to-all-users \
--security-configuration emr-tls-ssm \
--steps Type=CUSTOM_JAR,Name=ZeppelinSSL,ActionOnFailure=CONTINUE,\
Jar=s3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar,\
Args=s3://tls-blog-cf/zeppelin-ssl.sh \
Type=CUSTOM_JAR,Name=PrestoSSL,ActionOnFailure=CONTINUE,\
Jar=s3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar,\
Args=s3://tls-blog-cf/presto-update-trusted-store.sh \
--tags ssm:ssl:private-key="/emr/private-key" \
ssm:ssl:certificate="/emr/certificate" \
ssm:ssl:inter-node-private-key="/emr/inter-nodes-private-key" \
ssm:ssl:inter-node-certificate="/emr/inter-nodes-certificate" \
tls:lambda-fn-name="get-ssm-parameter-lambda" \
--region us-east-1

Note

Before running any of the provided examples, the certificates must be uploaded to the Parameter Store.

For test purposes, you can download the shell script from:

https://github.com/aws-samples/emr-tls-security/blob/master/scripts/upload-certificates.sh

This script uploads self-signed certificates issued for *.ec2.internal to the Parameter Store. Make sure that the DNS associated with your VPC, where you launch the EMR cluster, matches the certificate.

Use the following command:

sh upload-certificates.sh <KMS key ID> <AWS Region>

Where:

KMS key ID – The identifier of the KMS key that is used to encrypt the certificates.

AWS Region – The Region to which the certificates are uploaded.

Validation

There are a few methods to validate whether the TLS certificate was installed correctly.

Hue

To test the HTTPS connection to Hue from the browser, connect to https://<EMR URL or IP>:8888

Zeppelin
To test the HTTPS connection to Zeppelin from the browser, connect to https://<EMR URL or IP>:7773

Port 7773 is the one used in this example. If you changed it, make sure you’re connecting to the port on which Zeppelin is running on your EMR cluster.

In both scenarios, keep in mind that if the certificate doesn’t match the URL you’re connecting to (or if you used a self-signed certificate), the browser displays a warning that the certificate is invalid.
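If you prefer to check from a terminal rather than a browser, a small Python sketch such as the following retrieves the certificate that the server presents so that you can compare it with the one you uploaded to Parameter Store; the host name here is a placeholder.

import ssl

# Replace with your master node's DNS name or IP address and the UI port
# (8888 for Hue, 7773 for Zeppelin in this example, 8446 for the Presto UI).
host, port = "ec2-xx-xx-xx-xx.compute-1.amazonaws.com", 8888

pem = ssl.get_server_certificate((host, port))
print(pem)  # compare this PEM block with the certificate you uploaded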

Presto

You can test Presto using one of the following methods:

  1. HTTPS connection to the Presto UI
  2. Using the Presto CLI

To test the connection to Presto UI from the browser, connect to https://<EMR URL or IP>:8446

To test the certificate using the Presto CLI, follow these steps:

  1. Connect (SSH) to the EMR master node
  2. Create a test Hive table by running the following command:

hive -e "create table test1 (id int, name string); insert into test1 values (1, 'John'), (2, 'Robert'), (3, 'David');"

  3. Run the following Presto command:

presto-cli --server https://master-node-url:8446 --schema default --catalog hive --execute 'select count(*) from test1;'

Change 'master-node-url' to the correct value.

If the command succeeds, you should see 3 as the command output.

Conclusion

Amazon EMR security configurations provide options to encrypt data at rest as well as data in transit.

This post demonstrated how to create and apply a TLS custom certificate provider to an EMR cluster to secure data in transit without storing the certificates’ private keys in an S3 bucket. For a company with strict security policies, this might be the only acceptable way to encrypt data in transit on EMR.

 


Additional Reading

If you found this post useful, be sure to check out Build a Multi-Tenant Amazon EMR Cluster with Kerberos, Microsoft Active Directory Integration and IAM Roles for EMRFS and Encrypt Data At-Rest and In-Flight on Amazon EMR with Security Configurations.

 


About the Author

Remek Hetman is a Senior Cloud Infrastructure Architect with Amazon Web Services Professional Services. He works with AWS enterprise customers, providing technical guidance and assistance for Infrastructure, DevOps, and Big Data to help them make the best use of AWS services. Outside of work, he enjoys spending his time actively, and pursuing his passion – astronomy.

 

Best Practices for resizing and automatic scaling in Amazon EMR

Post Syndicated from Brandon Scheller original https://aws.amazon.com/blogs/big-data/best-practices-for-resizing-and-automatic-scaling-in-amazon-emr/

You can increase your savings by taking advantage of the dynamic scaling feature set available in Amazon EMR. The ability to scale the number of nodes in your cluster up and down on the fly is among the major features that make Amazon EMR elastic. You can take advantage of scaling in EMR by resizing your cluster down when you have little or no workload. You can also scale your cluster up to add processing power when the job gets too slow. This allows you to spend just enough to cover the cost of your job and little more.

Knowing the complex logic behind this feature can help you take advantage of it to save on cluster costs. In this post, I detail how EMR clusters resize, and I present some best practices for getting the maximum benefit and resulting cost savings for your own cluster through this feature.

EMR scaling is more complex than simply adding or removing nodes from the cluster. One common misconception is that scaling in Amazon EMR works exactly like Amazon EC2 scaling. With EC2 scaling, you can add/remove nodes almost instantly and without worry, but EMR has more complexity to it, especially when scaling a cluster down. This is because important data or jobs could be running on your nodes.

To prevent data loss, Amazon EMR scaling ensures that your node has no running Apache Hadoop tasks or unique data that could be lost before removing your node. It is worth considering this decommissioning delay when resizing your EMR cluster. By understanding and accounting for how this process works, you can avoid issues that have plagued others, such as slow cluster resizes and inefficient automatic scaling policies.

When an EMR cluster is scaled down, two different decommission processes are triggered on the nodes that will be terminated. The first process is the decommissioning of Hadoop YARN, which is the Hadoop resource manager. Hadoop tasks that are submitted to Amazon EMR generally run through YARN, so EMR must ensure that any running YARN tasks are complete before removing the node. If for some reason the YARN task is stuck, there is a configurable timeout to ensure that the decommissioning still finishes. When this timeout happens, the YARN task is terminated and is instead rescheduled to a different node so that the task can finish.

The second decommission process is that of the Hadoop Distributed File System or HDFS. HDFS stores data in blocks that are spread through the EMR cluster on any nodes that are running HDFS. When an HDFS node is decommissioning, it must replicate those data blocks to other HDFS nodes so that they are not lost when the node is terminated.

So how can you use this knowledge in Amazon EMR?

Tips for resizing clusters

The following are some issues to consider when resizing your clusters.

EMR clusters can use two types of nodes for Hadoop tasks: core nodes and task nodes. Core nodes host persistent data by running the HDFS DataNode process and run Hadoop tasks through YARN’s resource manager. Task nodes only run Hadoop tasks through YARN and DO NOT store data in HDFS.

When scaling down task nodes on a running cluster, expect a short delay for any running Hadoop task on the cluster to decommission. This allows you to get the best usage of your task node by not losing task progress through interruption. However, if your job allows for this interruption, you can adjust the one hour default timeout on the resize by adjusting the yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs property (in EMR 5.14) in yarn-site.xml. When this process times out, your task node is shut down regardless of any running tasks. This process is usually relatively quick, which makes it fast to scale down task nodes.
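As a rough illustration, the following boto3 sketch shows one way to set this property at cluster launch through the yarn-site classification. The instance types, counts, roles, and the 10-minute value are placeholders, not recommendations.

import boto3

emr = boto3.client("emr")

# Minimal sketch of a cluster launch; adapt every value to your environment.
emr.run_job_flow(
    Name="resize-timeout-demo",
    ReleaseLabel="emr-5.14.0",
    Configurations=[{
        "Classification": "yarn-site",
        "Properties": {
            # Shrink the default one-hour YARN graceful decommission timeout to 10 minutes.
            "yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs": "600"
        },
    }],
    Instances={
        "MasterInstanceType": "m4.xlarge",
        "SlaveInstanceType": "m4.xlarge",
        "InstanceCount": 3,
        "KeepJobFlowAliveWhenNoSteps": True,
    },
    JobFlowRole="EMR_EC2_DefaultRole",
    ServiceRole="EMR_DefaultRole",
    VisibleToAllUsers=True,
)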

When you’re scaling down core nodes, Amazon EMR must also wait for HDFS to decommission to protect your data. HDFS can take a relatively long time to decommission. This is because HDFS block replication is throttled by design through configurations located in hdfs-site.xml. This in turn means that HDFS decommissioning is throttled. This protects your cluster from a spiked workload if a node goes down, but it slows down decommissioning. When scaling down a large number of core nodes, consider adjusting these configurations beforehand so that you can scale down more quickly.

For example, consider this exercise with HDFS and resizing speed.

The HDFS configurations, located in hdfs-site.xml, have some of the most significant impact on throttling block replication:

  • dfs.datanode.balance.bandwidthPerSec: Bandwidth for each node’s replication
  • dfs.namenode.replication.max-streams: Maximum streams running for block replication
  • dfs.namenode.replication.max-streams-hard-limit: Hard limit on maximum streams
  • dfs.datanode.balance.max.concurrent.moves: Number of threads used by the block balancer for pending moves
  • dfs.namenode.replication.work.multiplier.per.iteration: Determines the number of block transfers to begin immediately during each replication interval

(Beware when modifying: Changing these configurations improperly, especially on a cluster with high load, can seriously degrade cluster performance.)

Cluster resizing speed exercise

Modifying these configurations can speed up the decommissioning time significantly. Try the following exercise to see this difference for yourself.

  1. Create an EMR cluster with the following hardware configuration:
  • Master: 1 node – m3.xlarge
  • Core: 6 nodes – m3.xlarge
  2. Connect to the master node of your cluster using SSH (Secure Shell).

For more information, see Connect to the Master Node Using SSH in the Amazon EMR documentation.

  3. Load data into HDFS by using the following jobs:
$ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar teragen 1000000000 /user/hadoop/data1/
$ s3-dist-cp --src s3://aws-bigdata-blog/artifacts/ClusterResize/smallfiles25k/ --dest  hdfs:///user/hadoop/data2/
  4. Edit your hdfs-site.xml configs:
$ sudo vim /etc/hadoop/conf/hdfs-site.xml

Then paste the following configuration into the hdfs-site properties.

Disclaimer: These values are relatively high for example purposes and should not necessarily be used in production. Be sure to test config values for production clusters under load before modifying them.

<property>
  <name>dfs.datanode.balance.bandwidthPerSec</name>
  <value>100m</value>
</property>

<property>
  <name>dfs.namenode.replication.max-streams</name>
  <value>100</value>
</property>

<property>
  <name>dfs.namenode.replication.max-streams-hard-limit</name>
  <value>200</value>
</property>

<property>
  <name>dfs.datanode.balance.max.concurrent.moves</name>
  <value>500</value>
</property>

<property>
  <name>dfs.namenode.replication.work.multiplier.per.iteration</name>
  <value>30</value>
</property>
  5. Resize your EMR cluster from six to five core nodes, and look in the EMR events tab to see how long the resize took.
  6. Repeat the previous steps without modifying the configurations, and check the difference in resize time.

While performing this exercise, I saw resizing time lower from 45+ minutes (without config changes) down to about 6 minutes (with modified hdfs-site configs). This exercise demonstrates how much HDFS is throttled under default configurations. Although removing these throttles is dangerous and performance using them should be tested first, they can significantly speed up decommissioning time and therefore resizing.

The following are some additional tips for resizing clusters:

  • Shrink resizing timeouts. You can configure EMR nodes in two ways: instance groups or instance fleets. For more information, see Create a Cluster with Instance Fleets or Uniform Instance Groups. EMR has implemented shrink resize timeouts when nodes are configured in instance fleets. This timeout prevents an instance fleet from attempting to resize forever if something goes wrong during the resize. It currently defaults to one day, so keep it in mind when you are resizing an instance fleet down.

If an instance fleet shrink request takes longer than one day, it finishes and pauses at however many instances are currently running. On the other hand, instance groups have no default shrink resize timeout. However, both types have the one-hour YARN timeout described earlier in the yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs property (in EMR 5.14) in yarn-site.xml.

  • Watch out for high frequency HDFS writes when resizing core nodes. If HDFS is receiving a lot of writes, it will modify a large number of blocks that require replication. This replication can interfere with the block replication from any decommissioning core nodes and significantly slow down the resizing process.

Setting up policies for automatic scaling

Although manual scaling is useful, a majority of the time cluster resizes are executed dynamically through Amazon EMR automatic scaling. Generally, the details of the automatic scaling policy must be tailored to the specific Hadoop job, so I won’t go into detail there. Instead, I provide some general guidelines for setting up your cluster’s auto scaling policies.

The following are some considerations when setting up your auto scaling policy.

Metrics for scaling

Choose the right metrics for your node types to trigger scaling. For example, scaling core nodes solely on the YARNMemoryAvailablePercentage metric doesn’t make sense, because you would be increasing or decreasing HDFS total size when you really only need more processing power. Scaling task nodes on HDFSUtilization also doesn’t make sense, because you would be trying to add HDFS storage space, which task nodes don’t provide. A common automatic scaling metric for core nodes is HDFSUtilization. Common automatic scaling metrics for task nodes include ContainerPendingRatio and YARNMemoryAvailablePercentage.

Note: Keep in mind that Amazon EMR currently requires HDFS, so you must have at least one core node in your cluster. Core nodes can also provide CPU and memory resources. But if you don’t need to scale HDFS, and you just need more CPU or memory resources for your job, we recommend that you use task nodes for that purpose.
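Before writing a policy, it can help to see how these metrics actually behave on your cluster. The following boto3 sketch, with a placeholder cluster ID, pulls the last hour of YARNMemoryAvailablePercentage from CloudWatch.

from datetime import datetime, timedelta
import boto3

cloudwatch = boto3.client("cloudwatch")

stats = cloudwatch.get_metric_statistics(
    Namespace="AWS/ElasticMapReduce",
    MetricName="YARNMemoryAvailablePercentage",
    Dimensions=[{"Name": "JobFlowId", "Value": "j-XXXXXXXXXXXXX"}],  # your cluster ID
    StartTime=datetime.utcnow() - timedelta(hours=1),
    EndTime=datetime.utcnow(),
    Period=300,
    Statistics=["Average"],
)

# Print the datapoints in time order to see how the metric trends.
for point in sorted(stats["Datapoints"], key=lambda p: p["Timestamp"]):
    print(point["Timestamp"], round(point["Average"], 1))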

Scaling core nodes

As described earlier, one of the two EMR node types in your cluster is the core node. Core nodes run HDFS, so they have a longer decommissioning delay. This means that they are slow to scale and should not be aggressively scaled. Only adding and removing a few core nodes at a time will help you avoid scaling issues. Unless you need the HDFS storage, scaling task nodes is usually a better option. If you find that you have to scale large numbers of core nodes, consider changing hdfs-site.xml configurations to allow faster decommission time and faster scale down.

Scaling task nodes

Task nodes don’t run HDFS, which makes them perfect for aggressively scaling with a dynamic job. When your Hadoop task has spikes of work between periods of downtime, this is the node type that you want to use.

You can set up task nodes with a very aggressive auto scaling policy, and they can be scaled up or down easily. If you don’t need HDFS space, you can use task nodes in your cluster.

Using Spot Instances

Automatic scaling is a perfect time to use EMR Spot Instances. The tendency of Spot Instances to disappear and reappear makes them perfect for task nodes. Because these task nodes are already used to scale in and out aggressively, Spot Instances have very little disadvantage here. However, for time-sensitive Hadoop tasks, On-Demand Instances might be prioritized for their guaranteed availability.

Scale-in vs. scale-out policies for core nodes

Don’t fall into the trap of making your scale-in policy the exact opposite of your scale-out policy, especially for core nodes. Many times, scaling in results in additional delays for decommissioning. Take this into account and allow your scale-in policy to be more forgiving than your scale-out policy. This means longer cooldowns and higher metric requirements to trigger resizes.

You can think of scale-out policies as easily triggered with a low cooldown and small node increments. Scale-in policies should be hard to trigger, with larger cooldowns and node increments.
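To make the asymmetry concrete, the following boto3 sketch attaches an automatic scaling policy to a task instance group with an easy-to-trigger scale-out rule and a more conservative scale-in rule. The cluster ID, instance group ID, thresholds, and cooldowns are placeholders to adapt to your own workload.

import boto3

emr = boto3.client("emr")

emr.put_auto_scaling_policy(
    ClusterId="j-XXXXXXXXXXXXX",           # your cluster ID
    InstanceGroupId="ig-XXXXXXXXXXXXX",    # the task instance group
    AutoScalingPolicy={
        "Constraints": {"MinCapacity": 0, "MaxCapacity": 20},
        "Rules": [
            {
                "Name": "scale-out-on-low-memory",
                "Action": {"SimpleScalingPolicyConfiguration": {
                    "AdjustmentType": "CHANGE_IN_CAPACITY",
                    "ScalingAdjustment": 2,
                    "CoolDown": 300,       # short cooldown: easy to trigger
                }},
                "Trigger": {"CloudWatchAlarmDefinition": {
                    "ComparisonOperator": "LESS_THAN",
                    "EvaluationPeriods": 1,
                    "MetricName": "YARNMemoryAvailablePercentage",
                    "Namespace": "AWS/ElasticMapReduce",
                    "Period": 300,
                    "Statistic": "AVERAGE",
                    "Threshold": 15.0,
                    "Unit": "PERCENT",
                }},
            },
            {
                "Name": "scale-in-on-idle-memory",
                "Action": {"SimpleScalingPolicyConfiguration": {
                    "AdjustmentType": "CHANGE_IN_CAPACITY",
                    "ScalingAdjustment": -2,
                    "CoolDown": 900,       # longer cooldown: harder to trigger
                }},
                "Trigger": {"CloudWatchAlarmDefinition": {
                    "ComparisonOperator": "GREATER_THAN",
                    "EvaluationPeriods": 3,  # require a sustained idle period
                    "MetricName": "YARNMemoryAvailablePercentage",
                    "Namespace": "AWS/ElasticMapReduce",
                    "Period": 300,
                    "Statistic": "AVERAGE",
                    "Threshold": 75.0,
                    "Unit": "PERCENT",
                }},
            },
        ],
    },
)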

Minimum nodes for core node auto scaling

One last thing to consider when scaling core nodes is the yarn.app.mapreduce.am.labels property located in yarn-site.xml. In Amazon EMR, yarn.app.mapreduce.am.labels is set to “CORE” by default, which means that the application master always runs on core nodes and not task nodes. This is to help prevent application failure in a scenario where Spot Instances are used for the task nodes.

This means that when setting a minimum number of core nodes, you should choose a number that is greater than or at least equal to the number of simultaneous application masters that you plan to have running on your cluster. If you want the application master to also run on task nodes, you should modify this property to include “TASK.” However, as a best practice, don’t set the yarn.app.mapreduce.am.labels property to TASK if Spot Instances are used for task nodes.

Aggregating data using S3DistCp

Before wrapping up this post, I have one last piece of information to share about cluster resizing. When resizing core nodes, you might notice that HDFS decommissioning takes a very long time. Often this is the result of storing many small files in your cluster’s HDFS. Having many small files within HDFS (files smaller than the HDFS block size of 128 MB) adds lots of metadata overhead and can cause slowdowns in both decommissioning and Hadoop tasks.

Keeping your small files to a minimum by aggregating your data can help your cluster and jobs run more smoothly. For information about how to aggregate files, see the post Seven Tips for Using S3DistCp on Amazon EMR to Move Data Efficiently Between HDFS and Amazon S3.

Summary

In this post, you read about how Amazon EMR resizing logic works to protect your data and Hadoop tasks. I also provided some additional considerations for EMR resizing and automatic scaling. Keeping these practices in mind can help you maximize cluster savings by allowing you to use only the required cluster resources.

If you have questions or suggestions, please leave a comment below.

 


Additional Reading

If you found this post useful, be sure to check out Seven Tips for Using S3DistCp on Amazon EMR to Move Data Efficiently Between HDFS and Amazon S3 and Dynamically Scale Applications on Amazon EMR with Auto Scaling.

 


About the Author

Brandon Scheller is a software development engineer for Amazon EMR. His passion lies in developing and advancing the applications of the Hadoop ecosystem and working with the open source community. He enjoys mountaineering in the Cascades in his free time.

Build a blockchain analytic solution with AWS Lambda, Amazon Kinesis, and Amazon Athena

Post Syndicated from Jonathan Shapiro-Ward original https://aws.amazon.com/blogs/big-data/build-a-blockchain-analytic-solution-with-aws-lambda-amazon-kinesis-and-amazon-athena/

There are many potential benefits to using a blockchain. A blockchain is a distributed data structure that can record transactions in a verifiable and immutable manner. Depending upon the use case, there are opportunities for reducing costs, improving speed and efficiency, strengthening regulatory compliance, and gaining greater resilience and scalability.

Early adopters of the blockchain are finding innovative ways of using it in such areas as finance, healthcare, eGovernment, and non-profit organizations. The blockchain was initially pioneered as the key technology behind the cryptocurrency Bitcoin.

Many of the opportunities to use blockchains arise from their design. They are typically large-scale distributed systems that often consist of many thousands of nodes. It can be challenging to gain insight into user activity, events, anomalies, and other state changes on a blockchain. But AWS analytics services provide the ability to analyze blockchain applications and provide meaningful information about these areas.

Walkthrough

In this post, we’ll show you how to:

You can readily adapt this Ethereum deployment and the blockchain analytics for use with a wide range of blockchain scenarios.

Prerequisites

This post assumes that you are familiar with AWS and Ethereum. The following documentation provides background reading to help you perform the steps described in this post:

Additionally, it’s useful to be familiar with Amazon Kinesis, AWS Lambda, Amazon QuickSight, and Amazon Athena to get the most out of this blog post. For more information, see:

For an introduction to serverless computing with AWS Lambda, see Introduction to AWS Lambda – Serverless Compute on Amazon Web Services.

Blockchain 101

Before we proceed with the solution in this post, we’ll provide a short discussion regarding blockchains and Ethereum, which is the blockchain implementation used in this solution.

In short, blockchains are a means for achieving consensus. The motivation behind the blockchain was to allow the Bitcoin network to agree upon the order of financial transactions while resisting vulnerability, malicious threats, and omission errors. Other blockchain implementations are used to agree upon the state of generic computation. This is achieved through a process called mining, whereby an arbitrary computational problem is solved to make falsifying transactions computationally challenging.

Ethereum is a major blockchain implementation. Unlike Bitcoin and other earlier blockchain systems, Ethereum is not solely a cryptocurrency platform, though it does have its own cryptocurrency called Ether. Ethereum extends the blockchain concept by building an Ethereum virtual machine (VM) that is Turing-complete on top of the blockchain. This allows for the development of smart contracts, which are programs that run on the blockchain. The appeal of smart contracts is the ability to translate natural language contracts, such as insurance contracts, into code that can run on Ethereum. This allows contractual agreements to be built without the need for a centralized authority, such as a bank or notary, potentially decreasing time to market and reducing costs.

An overview of the blockchain solution

The following is an overview of the solution provided in this post. The solution consists of:

  • An Ethereum blockchain running on Amazon Elastic Container Service (Amazon ECS) via the AWS Blockchain Template
  • An Application Load Balancer, providing access to the various Ethereum APIs.
  • A Lambda function, which deploys a smart contract to the blockchain
  • A Lambda function, which runs transactions against the smart contract
  • A Lambda function, which listens for events on the smart contract and pushes those events to Amazon Kinesis
  • An Amazon DynamoDB table used to share the blockchain state between Lambda functions
  • A blockchain analytics pipeline that uses Amazon Kinesis Data Firehose, Amazon Kinesis Data Analytics, Amazon Kinesis Data Streams, and Amazon Athena.
  • An analytics dashboard built using Amazon QuickSight

The solution is presented in the following architectural diagram:

As shown, the solution is comprised of two main portions:

  • The blockchain hosted on Amazon Elastic Compute Cloud (Amazon EC2) and the Lambda functions that interact with the blockchain.
  • The analytics pipeline based around Kinesis that consumes data from the blockchain.

The AWS CloudFormation template we provide deploys the left side of that architecture diagram up to and including Kinesis Data Streams. It is the right side of the diagram that we’re going to build in this post.

Create the initial resources

  1. First, download the AWS CloudFormation template from: https://s3.amazonaws.com/blockchainblog/blockchainblogpost.template
  2. Use AWS CloudFormation to launch the template. The AWS CloudFormation stack deploys a virtual private cloud (VPC), two subnets, and a series of Lambda functions, which interact with the blockchain. This provides a foundation on which to build the analytics pipeline. You can either provide your own CIDR blocks or use the default parameters. Each subnet must have at least eight IP addresses.
  3. Deploy the AWS Blockchain Templates. The AWS Blockchain Templates make it efficient to deploy Ethereum and Hyperledger blockchain networks on AWS. In this case, we’re deploying an Ethereum network into the VPC created by the AWS CloudFormation template in step 2.
  4. Launch the following AWS CloudFormation template: https://aws-blockchain-templates-us-east-1.s3.us-east-1.amazonaws.com/ethereum/templates/latest/ethereum-network.template.yaml. This template requires a number of parameters:
  • Set the Initial List of Accounts to the following predefined accounts the Lambda functions use:
0x34db0A1D7FE9D482C389b191e703Bf0182E0baE3,0xB3Bbce5d76aF28EcE4318c28479565F802f96808,0x877108a8825222cf669Ca9bFA3397D6973fE1640,0xb272056E07C94C7E762F642685bE822df6d08D03,0x0c00e92343f7AA255e0BBC17b21a02f188b53D6C,0xaDf205a5fcb846C4f8D5e9f5228196e3c157e8E0,0x1373a92b9BEbBCda6B87a4B5F94137Bc64E47261,0x9038284431F878f17F4387943169d5263eA55650,0xe1cd3399F6b0A1Ef6ac8Cebe228D7609B601ca8a,0x0A67cCC3FD9d664D815D229CEA7EF215d4C00A0a
  • In VPC Network Configuration:
    • Set the VPC ID to the blockchainblog VPC created by the first AWS CloudFormation template.
    • Add the blockchainblog-public subnet to the list of subnets to use.
    • Add blockchainblog-public and blockchainblog-private to the list of ALB subnets.
  • In Security Configuration:
    • Choose your Amazon EC2 key pair.
    • Provide the blockchainblog security group.
    • Provide the blockchainblog-ec2-role for the Amazon EC2 role.
    • Provide the blockchainblog-ecs-role for the Amazon ECS role.
    • Set the ALB security group to the blockchainblog security group.
  5. Leave all other variables unchanged, create the template, and wait for all resources to be deployed. This deploys an Ethereum blockchain, starts the mining process, and exposes the Web3 API through an Application Load Balancer.

After the resources are created, move on to deploying the smart contract.

Deploy a smart contract

To use the blockchain, deploy a smart contract to it. This smart contract is not complex — it provides the functions for holding an auction.

The auction contract represents a public auction, which is an auction whereby all parties involved can be identified. The user offering the item to be auctioned deploys the contract and other users can bid using the contract. The auction is considered completed after a pre-defined number of blocks have been mined. When the auction ends, losing bids can then be withdrawn and the funds returned to the bidders. Later, the user who created the auction can withdraw the funds of the winning bid.

Note that the contract does nothing to ensure that the winner receives the commodity in question. In fact, this contract is entirely separate from what is being auctioned. The contract could be extended to provide this functionality, but for the scope of this post, we’re keeping the contract simple.

The auction contract is located at https://s3.amazonaws.com/blockchainblog/Auction.sol.

Examine the auction contract

The auction contract is automatically pulled by our Lambda function and deployed to our blockchain. This contract is written in a domain-specific language called Solidity. The syntax is inspired by the C family of languages; however, unlike C it doesn’t compile to object code. Instead, it compiles to bytecode, which runs on the Ethereum VM.

This smart contract has two functions: bid and withdraw. Bid allows users to bid in the auction, and withdraw allows users to withdraw funds from the contract when the auction has finished. The auction owner can obtain the winning bid and the losers can recoup their funds. Note that the data structure BidEvent is similar to a C struct, and is how we’ll trigger Solidity events. The Solidity events are captured and sent to our analytics pipeline.

Now it’s time to deploy our smart contract, run transactions against it, and listen for events by using pre-built Lambda functions. The following diagram shows the interactions of these Lambda functions:

DeployContract is a Lambda function created by the AWS CloudFormation stack that we deployed earlier. This function takes our Solidity source code from the Amazon Simple Storage Service (Amazon S3) bucket, compiles it to EVM bytecode using the solc compiler, deploys that to our blockchain, and stores the blockchain address of the contract in a DynamoDB table. The function interacts with the Ethereum blockchain on our Amazon EC2 instance via the web3 1.0.0 API. You can see the source code for this function at https://s3.amazonaws.com/blockchainblog/DeployContract.zip.

After deploying the AWS CloudFormation template, wait about 5 minutes before deploying the contract to give the blockchain time to start the mining process. The majority of this time is spent generating the initial directed acyclic graph (DAG).

DeployContract can be invoked in the Lambda console by testing it with an empty test event. Before invoking the function, provide it with the address of the blockchain. To do this, locate the output of the AWS Blockchain Template and obtain the EthJSONRPCURL value from the output. Then provide this value in an environment variable named BLOCKCHAIN_HOST for the DeployContract function, as shown in the following example:

Now invoke the DeployContract function. It should print various states, including the blockchain address of the deployed contract and the JSON ABI of the contract. After the function completes, the contract is deployed to our private blockchain and available for use. If the function produces an error, it’s likely because the blockchain has not yet been initialized. Wait a few minutes after creating the AWS CloudFormation template before invoking DeployContract.
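You can also invoke the function from a script instead of the console. The following boto3 sketch assumes the function is named DeployContract; adjust the name if your stack prefixes it.

import json
import boto3

lambda_client = boto3.client("lambda")

# Equivalent of testing the function with an empty event in the Lambda console.
response = lambda_client.invoke(
    FunctionName="DeployContract",   # adjust if your stack prefixes the name
    Payload=json.dumps({}),
)
print(response["Payload"].read().decode("utf-8"))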

Execute Transactions

To generate some transaction data to analyze, we must first have some transactions. To get transactions, we are using a second Lambda function named ExecuteTransactions.

In the smart contract, an event is specified at the start of the file. Events are a useful mechanism in Solidity that can be used as a callback to code outside of the blockchain. The final Lambda function, ListenForTransactions, listens for events occurring against the contract and then sends those events to Kinesis for analysis.

Ethereum currently does not support sending events directly to Kinesis. So we’ll run the ListenForTransactions function to pull events from the blockchain. We can do this manually by invoking the function with an empty test event. ListenForTransactions pulls all events from the blockchain since the last time it was run. However, if we wanted transactions to be pulled from the blockchain in real time, we’d want the function running perpetually. In the following section, you can optionally schedule the Lambda function to run periodically or regularly. Once again, provide the address of the Ethereum RPC endpoint through the BLOCKCHAIN_HOST environment variable, as you did for DeployContract, for both ListenForTransactions and ExecuteTransactions.

Optional: Use an Amazon CloudWatch event to schedule ListenForTransactions

To have ListenForTransactions run continually, we’ll use Amazon CloudWatch Events as a trigger for our Lambda function. In the Amazon CloudWatch console, choose the Triggers tab, and add a new Amazon CloudWatch Events trigger with the schedule expression rate(5 minutes). This ensures that the function is continually running and thus that all events are sent to Kinesis as soon as they occur. This allows us to do near real-time processing of events against our smart contract. However, if we want to reduce costs, or if real-time analytics isn’t a key objective, we could run our ListenForTransactions function periodically. Running the function periodically fetches all events since the last time it was run; however, this is less timely than having it wait for events to occur.

To configure a CloudWatch event to trigger ListenForTransactions:

  1. In the designer section on the Lambda console for ListenForTransactions, select CloudWatch events
  2. Click on configure and scroll down to the CloudWatch event configuration
  3. Select Create New Rule from the rule dropdown menu
  4. Name the rule and provide a description
  5. Select schedule expression
  6. Provide the expression: rate(5 minutes)
  7. Select enable trigger
  8. Click add

After the function is scheduled, we can then generate some events against our contract. We can run ExecuteTransactions, with an empty test event. We can do this any number of times to generate more transactions to send to our analytics pipeline. ExecuteTransactions produces batches of 10 transactions at a time.

Analyze Transactions with Kinesis Data Analytics

Because our Lambda function is listening to events on our smart contract, all bidding activity is sent to a Kinesis Data Stream named BlockchainBlogEvents, which was already created by the AWS CloudFormation template.

Right now, all events go to Kinesis but no further. We’ll persist our events for analysis with Athena later on. To do so, navigate to the Kinesis Data Streams console and choose the BlockchainBlog stream that was created for you.

  1. In the upper right-hand corner, choose Connect to Firehose. This forwards all events to a Kinesis Data Firehose stream, which delivers them to an S3 bucket.
  2. Name the delivery stream, choose Next, and don’t enable record transformation.
  3. Provide an S3 bucket in which to store your results. Remember the bucket name so that you can use it later with Athena.

All events coming from the blockchain should now be persisted to Amazon S3.

Now that our events are being persisted, we’ll use Kinesis Data Analytics to perform a series of real-time analytics on the Kinesis Data Stream. Later, we’ll perform batch analytics on the data stored in Amazon S3 via Athena.

First, look at Kinesis Data Analytics. Our ListenForTransactions Lambda function sends a message to a stream each time a transaction is run against our Auction smart contract.

The message is a JSON object. It contains the address of the bidder who initiated the transaction, how much they bid, the contract they bid on, when the transaction was run, and which block the transaction was added to.
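For illustration, a record with roughly that shape could be written to the stream as follows. The field names mirror the columns described later in the Athena section, the stream name is the one created by the AWS CloudFormation template, and the values themselves are made up.

import json
import boto3

kinesis = boto3.client("kinesis")

# A bid event shaped like the messages ListenForTransactions emits (illustrative values).
event = {
    "Block": 1024,
    "Blockhash": "0xabc123...",
    "Bidder": "0x34db0A1D7FE9D482C389b191e703Bf0182E0baE3",
    "Maxbidder": "0x34db0A1D7FE9D482C389b191e703Bf0182E0baE3",
    "Contractowner": "0xB3Bbce5d76aF28EcE4318c28479565F802f96808",
    "Amount": 7,
    "Auction": "0x877108a8825222cf669Ca9bFA3397D6973fE1640",
    "EventTimestamp": "2018-10-01T12:00:00Z",
}

kinesis.put_record(
    StreamName="BlockchainBlogEvents",
    Data=json.dumps(event).encode("utf-8"),
    PartitionKey=event["Bidder"],
)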

Kinesis Data Analytics processes each incoming message to the stream and lets us perform analysis over the stream. In this example, we use Kinesis Data Analytics to:

  1. Calculate the amount of Ether being bid in each block within a sliding window of 15 seconds.
  2. Detect the number of unique bidders occurring within a sliding window of 15 seconds.

Our sliding window is 15 seconds because this is the Ethereum target block time. This is the measure of how long it takes to add a block to the blockchain. By setting the sliding window to 15 seconds, we can gain insight into transactions occurring within the mining interval. We could also set the window to be longer to learn how it pertains to our auction application.

To start with our real time analytics, we must create a Kinesis data analytics application. To do so:

  1. Navigate to the Kinesis data analytics application console on the AWS Management Console.
  2. Create a new Kinesis data analytics application with appropriate name and description, then specify the pre-made blockchainblog Kinesis Data Stream as the source.
  3. Run ExecuteTransactions to send a set of transactions to Kinesis and automatically discover the schema.
  4. Open the SQL editor for the application.

Next, we’re going to add SQL to our Kinesis data analytics application to find out the amount of Ether being sent in each block. This includes all bids sent to the contract and all funds withdrawn from a completed auction.

Copy the following SQL, paste it into the SQL editor in Kinesis Data Analytics, then execute it.

CREATE OR REPLACE STREAM "SPEND_PER_BLOCK_STREAM" (block INTEGER, spend INTEGER);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "SPEND_PER_BLOCK_STREAM"

SELECT STREAM "Block", SUM("Amount") AS block_sum
FROM "SOURCE_SQL_STREAM_001"
GROUP BY "Block", STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '15' SECOND);

This simple piece of SQL provides some insight into our smart contract. The output of SPEND_PER_BLOCK_STREAM is the block number and the volume of funds moved through our contract in that block, which tells us how much cryptocurrency is spent in relation to our smart contract and when it’s spent.

Make sure that there is data for the Kinesis data analytics application to process by running the ExecuteTransactions and ListenForTransactions functions. You can run these functions either with an Amazon CloudWatch event or manually.

Now, we’ll modify our application to detect the number of unique bidders placing bids within a 15-second tumbling window, which is roughly the time required to add a block to the blockchain. To do so, add the following code to our Kinesis data analytics application:

-- In-application stream that holds the distinct-bidder counts.
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    NUMBER_OF_DISTINCT_ITEMS BIGINT
);

-- Pump that counts distinct values of the Bidder column over a
-- 15-second tumbling window, matching the block time used above.
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
    INSERT INTO "DESTINATION_SQL_STREAM"
        SELECT STREAM *
        FROM TABLE(COUNT_DISTINCT_ITEMS_TUMBLING(
            CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),
            'Bidder',  -- column to count distinct values of
            15         -- tumbling window size, in seconds
        ));

The resulting output of this code is the count of unique bidders occurring within the 15-second window. This is useful in helping us understand who is running transactions against our contract: for example, whether a large number of blockchain addresses are responsible for the bids or whether a smaller set of addresses is doing most of the bidding.

Finally, as denoted in our architectural diagram, we can add a destination stream to our Kinesis data analytics application. We’ll send the output of our application to Kinesis Data Firehose to persist the results. Then we’ll enable the resulting data to be used in batch analytics with Athena or other tools. To send the output, create a destination for the analytics output stream and point it at a Kinesis Data Firehose stream.

This section has shown real-time analytics that can be run on blockchain transactions. The next section shows how to use Athena to run batch analytics against our stored transactions.

Analyze Transactions with Athena

In this section, we’ll create a table in Athena so we can query our transaction data in batch form.

  1. Create a database in Athena and then create a table, specifying the location that you provided earlier to Kinesis Data Firehose. Your configuration should look like the following example:
  2. Choose Next, choose JSON as the input format, then choose Next.
  3. In Columns, provide the data types for each of our columns. Choose Bulk add columns, then copy and paste the following column information:

Block int, Blockhash string, Bidder string, Maxbidder string, Contractowner string, Amount int, Auction string, EventTimestamp string
Column: Description
Block: The block that this event pertains to.
Auction: The auction smart contract that the event pertains to.
ContractOwner: The address of the owner of the contract.
Bidder: The address of the bidder.
BlockHash: The SHA hash of the block.
Address: The address of the transaction.
MaxBidder: The address of the currently winning bidder (current to when the event was generated).
Amount: The amount of the bid.

  4. Choose Next, and then create the table. (Alternatively, you can define the same table with a SQL DDL statement; see the sketch that follows.)
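If you prefer to create the table with a DDL statement instead of the console wizard, something along the following lines should produce an equivalent table. This is only a sketch: the S3 location is a placeholder for the bucket you chose earlier, and it assumes the OpenX JSON SerDe, one of the JSON SerDes that Athena supports.

CREATE EXTERNAL TABLE IF NOT EXISTS events (
  block int,
  blockhash string,
  bidder string,
  maxbidder string,
  contractowner string,
  amount int,
  auction string,
  eventtimestamp string
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://your-firehose-bucket/';   -- placeholder: the bucket you configured for Kinesis Data Firehose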

After you configure Athena, you can explore the data. First, look at whether the user who created an auction has bid in their own auction. Most auctions disallow this practice, but our smart contract doesn’t prohibit it. We could solve this by modifying the contract, but for now let’s see whether we can detect it with Athena. Run the following query:

SELECT * FROM events WHERE contractowner = bidder

The result should resemble the following:

You should see at least one instance where the contract owner has bid on their own contract. (The Lambda function that runs transactions does this at random.) Bidding on one’s own contract could be permissible, or it might violate the terms of the auction; if it does, we can easily detect the violation this way.

This scenario is an example of using analytics to detect and enforce compliance in a blockchain-backed system. Compliance remains an open question for many blockchain users, as detecting regulatory and compliance issues involving smart contracts often involves significant complexity. Analytics is one way to gain insight and answer these regulatory questions.
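As a small extension of the query above (not part of the original walkthrough), the following sketch counts how many self-bids occurred in each auction, which makes it easier to see where the behavior is concentrated:

-- Count bids where the bidder is also the contract owner, per auction
SELECT auction, COUNT(*) AS self_bids
FROM events
WHERE contractowner = bidder
GROUP BY auction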

Useful queries for analyzing transactions

This section provides some other queries that we can use to analyze our smart contract transactions.

Find the number of transactions per block

SELECT block, COUNT(amount) AS transactions FROM events GROUP BY block

This query yields results similar to the following:
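A closely related query, shown here as a sketch, sums the amount bid in each block. This roughly mirrors the SPEND_PER_BLOCK_STREAM output from the real-time pipeline, but over the full history stored in Amazon S3:

-- Total amount bid per block across all stored events
SELECT block, SUM(amount) AS total_bid
FROM events
GROUP BY block
ORDER BY block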

Find the winning bid for each auction

SELECT DISTINCT t.auction, t.amount
    FROM events t
        INNER JOIN (SELECT auction, MAX(amount) AS maxamount
                        FROM events
                        GROUP BY auction) q
            ON t.auction = q.auction
                AND t.amount = q.maxamount

This query yields a set of results such as the following:

The results show each auction that you’ve created on the blockchain and the resulting highest bid.
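If you also want to see who placed the winning bid, the join can be extended as in the following sketch. It assumes that the bidder recorded on the highest-amount event for an auction is the winner, which may not hold if withdrawal events carry the same amount:

-- Winning bidder and amount for each auction
SELECT DISTINCT t.auction, t.bidder AS winning_bidder, t.amount
    FROM events t
        INNER JOIN (SELECT auction, MAX(amount) AS maxamount
                        FROM events
                        GROUP BY auction) q
            ON t.auction = q.auction
                AND t.amount = q.maxamount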

Visualize queries with Amazon QuickSight

Rather than viewing query results as plain SQL output, it is often beneficial to have a graphical representation of your analysis. You can do this with Amazon QuickSight, which can use Athena as a data source. As a result, with little effort we can build a dashboard solution on top of what we’ve already built. We’re going to use Amazon QuickSight to visualize the data stored in Amazon S3, via Athena.

In Amazon QuickSight, we can create a new data source and use the Athena database and table that we created earlier.

To create a new data source

  1. Open the Amazon QuickSight console, then choose New Dataset.
  2. From the list of data sources, choose Athena, then name your data source.
  3. Choose the database and table in Athena that you created earlier.
  4. Import the data into SPICE. SPICE enables faster querying and visualization of the data without having to go directly to the source data. For more information about SPICE, see the Amazon QuickSight documentation.
  5. Choose Visualize to start investigating the data.

With Amazon QuickSight, we can visualize the behavior of our simulated blockchain users. We’ll choose Amount as our measure and Auction as our dimension in the QuickSight side pane. This shows us how much Ether has been bid in each auction, yielding results similar to the following:

The amount depends on the number of times you ran the ExecuteTransactions function.
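If you want the raw numbers behind this chart, an equivalent Athena query (a sketch, independent of the QuickSight setup) is:

-- Total amount bid per auction
SELECT auction, SUM(amount) AS total_bid
FROM events
GROUP BY auction
ORDER BY total_bid DESC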

If we instead look at MaxBidder, we can build a pie chart that shows which blockchain address (user) is most often our highest bidder. It looks like the following:

This sort of information can be challenging to obtain from within a blockchain-based application. But in Amazon QuickSight, with our analytics pipeline, getting the information can be easier.
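The same question can also be answered directly in Athena. The following sketch counts how often each address appears as the highest bidder in our stored events:

-- How often each address appears as the current highest bidder
SELECT maxbidder, COUNT(*) AS times_highest
FROM events
GROUP BY maxbidder
ORDER BY times_highest DESC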

Finally, we can look at the mining time in Amazon QuickSight by choosing Eventtimestamp as the x-axis, choosing block as the y-axis, and using the minimum aggregate function. This produces a line graph that resembles the following:

The graph shows that we start at around block 9200 and have a steady rate of mining occurring. This is roughly consistent with a block mining time of around 15 to 20 seconds. Note that the time stamp is in Unix time.
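If you’d rather compute the block interval in Athena than read it off the graph, a query along the following lines should work. It’s a sketch that assumes eventtimestamp holds Unix epoch seconds, as noted above:

-- First event time per block, and seconds elapsed since the previous observed block
SELECT block,
       MIN(CAST(eventtimestamp AS bigint)) AS first_seen,
       MIN(CAST(eventtimestamp AS bigint))
           - LAG(MIN(CAST(eventtimestamp AS bigint))) OVER (ORDER BY block)
           AS seconds_since_previous_block
FROM events
GROUP BY block
ORDER BY block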

This section has shown analysis that can be performed on a blockchain event to understand the behavior of both the blockchain and the smart contracts deployed to it. Using the same methodology, you can build your own analytics pipelines that perform useful analytics that shed light on your blockchain-backed applications.

Conclusion

Blockchain is an emerging technology with a great deal of potential. AWS analytics services provide a means to gain insight into blockchain applications that run over thousands of nodes and deal with millions of transactions. This allows developers to better understand the complexities of blockchain applications and aids in the creation of new applications. Moreover, the analytics portion can all be done without provisioning servers, reducing the need to manage infrastructure and freeing you to focus on building the blockchain applications that you want.

Important: Remember to destroy the stacks created by AWS CloudFormation. Also delete the resources you deployed, including the scheduled Lambda function that listens for blockchain events.


Additional Reading

If you found this post useful, be sure to check out Analyze Apache Parquet optimized data using 10 visualizations to try in Amazon QuickSight with sample data and Analyzing Bitcoin Data: AWS CloudFormation Support for AWS Glue.

 


About the Author

Dr. Jonathan Shapiro-Ward is an AWS Solutions Architect based in Toronto. He helps customers across Canada to build industry leading cloud solutions. He has a background in distributed systems and big data and holds a PhD from the University of St Andrews.