Tag Archives: Spark

Design considerations for Amazon EMR on EKS in a multi-tenant Amazon EKS environment

Post Syndicated from Lotfi Mouhib original https://aws.amazon.com/blogs/big-data/design-considerations-for-amazon-emr-on-eks-in-a-multi-tenant-amazon-eks-environment/

Many AWS customers use Amazon Elastic Kubernetes Service (Amazon EKS) to take advantage of Kubernetes without the burden of managing the Kubernetes control plane. With Kubernetes, you can centrally manage your workloads and offer administrators a multi-tenant environment where they can create, update, scale, and secure workloads using a single API. Kubernetes also allows you to improve resource utilization, reduce cost, and simplify infrastructure management to support different application deployments. This model is beneficial for those running Apache Spark workloads, for several reasons. For example, it allows you to have multiple Spark environments running concurrently with different configurations and dependencies that are segregated from each other through Kubernetes multi-tenancy features. In addition, the same cluster can be used for various workloads like machine learning (ML), hosted applications, and data streaming, thereby reducing the operational overhead of managing multiple clusters.

AWS offers Amazon EMR on EKS, a managed service that enables you to run your Apache Spark workloads on Amazon EKS. This service uses the Amazon EMR runtime for Apache Spark, which increases the performance of your Spark jobs so that they run faster and cost less. When you run Spark jobs on EMR on EKS instead of on self-managed Apache Spark on Kubernetes, you can take advantage of automated provisioning, scaling, faster runtimes, and the development and debugging tools that Amazon EMR provides.

In this post, we show how to configure and run EMR on EKS in a multi-tenant EKS cluster that can be used by your various teams. We tackle multi-tenancy through four topics: network, resource management, cost management, and security.

Concepts

Throughout this post, we use terminology that is either specific to EMR on EKS, Spark, or Kubernetes:

  • Multi-tenancy – Multi-tenancy in Kubernetes can come in three forms: hard multi-tenancy, soft multi-tenancy, and sole multi-tenancy. Hard multi-tenancy means each business unit or group of applications gets a dedicated Kubernetes cluster; there is no sharing of the control plane. This model is out of scope for this post. Soft multi-tenancy is where pods might share the same underlying compute resource (node) and are logically separated using Kubernetes constructs such as namespaces, resource quotas, or network policies. Another way to achieve multi-tenancy in Kubernetes is to assign pods to specific nodes that are pre-provisioned and allocated to a specific team; this is known as sole multi-tenancy. Unless your security posture requires hard or sole multi-tenancy, consider using soft multi-tenancy for the following reasons:
    • Soft multi-tenancy avoids underutilization of resources and waste of compute resources.
    • The number of managed node groups that an EKS cluster supports is limited, so for large deployments this quota can quickly become a limiting factor.
    • With sole multi-tenancy, there is a high chance of ghost nodes (nodes with no pods scheduled on them) caused by misconfiguration, because pods are forced onto dedicated nodes using labels, taints and tolerations, and anti-affinity rules.
  • Namespace – Namespaces are core in Kubernetes and a pillar to implement soft multi-tenancy. With namespaces, you can divide the cluster into logical partitions. These partitions are then referenced in quotas, network policies, service accounts, and other constructs that help isolate environments in Kubernetes.
  • Virtual cluster – An EMR virtual cluster is mapped to a Kubernetes namespace that Amazon EMR is registered with. Amazon EMR uses virtual clusters to run jobs and host endpoints. Multiple virtual clusters can be backed by the same physical cluster. However, each virtual cluster maps to one namespace on an EKS cluster. Virtual clusters don’t create any active resources that contribute to your bill or require lifecycle management outside the service.
  • Pod template – In EMR on EKS, you can provide a pod template to control pod placement, or define a sidecar container. This pod template can be defined for executor pods and driver pods, and stored in an Amazon Simple Storage Service (Amazon S3) bucket. The S3 locations are then submitted as part of the applicationConfiguration object that is part of configurationOverrides, as defined in the EMR on EKS job submission API.
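For illustration, the following AWS CLI sketch shows how a virtual cluster can be registered against a namespace and how pod template locations in Amazon S3 can be referenced when submitting a job; the cluster, bucket, and role names are placeholders, not values from this post:

# Register a virtual cluster against an existing namespace (names are placeholders).
aws emr-containers create-virtual-cluster \
  --name team-a-virtual-cluster \
  --container-provider '{"id": "my-eks-cluster", "type": "EKS", "info": {"eksInfo": {"namespace": "team-a"}}}'

# Submit a job that references driver and executor pod templates stored in Amazon S3.
aws emr-containers start-job-run \
  --virtual-cluster-id <virtual-cluster-id> \
  --name sample-job \
  --execution-role-arn arn:aws:iam::111122223333:role/job-execution-role \
  --release-label emr-6.9.0-latest \
  --job-driver '{"sparkSubmitJobDriver": {"entryPoint": "s3://my-bucket/scripts/job.py"}}' \
  --configuration-overrides '{
    "applicationConfiguration": [{
      "classification": "spark-defaults",
      "properties": {
        "spark.kubernetes.driver.podTemplateFile": "s3://my-bucket/templates/driver.yaml",
        "spark.kubernetes.executor.podTemplateFile": "s3://my-bucket/templates/executor.yaml"
      }
    }]
  }'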

Security considerations

In this section, we address security from different angles. We first discuss how to protect the IAM role that is used for running the job. We then address how to protect secrets used in jobs, and finally we discuss how you can protect data while it is processed by Spark.

IAM role protection

A job submitted to EMR on EKS needs an AWS Identity and Access Management (IAM) execution role to interact with AWS resources, for example with Amazon S3 to get data, with Amazon CloudWatch Logs to publish logs, or to use an encryption key in AWS Key Management Service (AWS KMS). It’s a best practice in AWS to apply least privilege for IAM roles. In Amazon EKS, this is achieved through IAM Roles for Service Accounts (IRSA). This mechanism allows a pod to assume an IAM role at the pod level and not at the node level, while using short-term credentials that are provided through the EKS OpenID Connect (OIDC) provider.

IRSA creates a trust relationship between the EKS OIDC provider and the IAM role. This method allows only pods with a service account (annotated with an IAM role ARN) to assume a role that has a trust policy with the EKS OIDC provider. However, this isn’t enough, because it would allow any pod with a service account within the EKS cluster that is annotated with a role ARN to assume the execution role. This must be further scoped down using conditions on the role trust policy. The condition allows the role to be assumed only if the calling service account is the one used for running a job associated with the virtual cluster. The following code shows the structure of the condition to add to the trust policy:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Federated": "<OIDC_PROVIDER_ARN>"
            },
            "Action": "sts:AssumeRoleWithWebIdentity",
            "Condition": {
                "StringLike": {
                    "<OIDC_PROVIDER>:sub": "system:serviceaccount:<NAMESPACE>:emr-containers-sa-*-*-<AWS_ACCOUNT_ID>-<BASE36_ENCODED_ROLE_NAME>"
                }
            }
        }
    ]
}

To scope down the trust policy using the service account condition, run the following command with the AWS CLI:

aws emr-containers update-role-trust-policy \
--cluster-name cluster \
--namespace namespace \
--role-name iam_role_name_for_job_execution

The command adds the service account that will be used by the Spark client, Jupyter Enterprise Gateway, Spark kernel, driver, or executor. The service account names have the following structure: emr-containers-sa-*-*-<AWS_ACCOUNT_ID>-<BASE36_ENCODED_ROLE_NAME>.

In addition to the role segregation offered by IRSA, we recommend blocking access to instance metadata because a pod can still inherit the rights of the instance profile assigned to the worker node. For more information about how you can block access to metadata, refer to Restrict access to the instance profile assigned to the worker node.
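As an illustration of one common mitigation (a sketch, assuming worker nodes that support IMDSv2), you can require IMDSv2 and lower the hop limit so that containers cannot reach the node's instance metadata with the instance profile credentials:

# Require IMDSv2 and limit the response hop count to 1 so that traffic from
# containers (which adds an extra network hop) can't reach the node's credentials.
aws ec2 modify-instance-metadata-options \
  --instance-id i-0123456789abcdef0 \
  --http-tokens required \
  --http-put-response-hop-limit 1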

Secret protection

Sometimes a Spark job needs to consume data stored in a database or retrieved from APIs. Most of the time, these are protected with a password or access key. The most common way to pass these secrets is through environment variables. However, in a multi-tenant environment, this means any user with access to the Kubernetes API can potentially access the secrets in the environment variables if this access isn’t scoped well to the namespaces the user has access to.

To overcome this challenge, we recommend using a secrets store like AWS Secrets Manager that can be mounted through the Secrets Store CSI Driver. The benefit of using Secrets Manager is the ability to use IRSA and grant only the role assumed by the pod access to the given secret, thereby improving your security posture. You can refer to the best practices guide for sample code showing the use of Secrets Manager with EMR on EKS.
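The following is a minimal sketch of what this integration could look like; the secret name, namespace, and resource names are hypothetical, and the exact fields may vary with the provider version you deploy:

# Sketch only: map a Secrets Manager secret into the job namespace through the
# AWS provider for the Secrets Store CSI Driver.
kubectl apply -f - <<'EOF'
apiVersion: secrets-store.csi.x-k8s.io/v1
kind: SecretProviderClass
metadata:
  name: spark-db-credentials
  namespace: emr-team-a
spec:
  provider: aws
  parameters:
    objects: |
      - objectName: "prod/spark/db-password"
        objectType: "secretsmanager"
EOF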

Spark data encryption

When a Spark application is running, the driver and executors produce intermediate data. This data is written to the node local storage. Anyone who is able to exec into the pods would be able to read this data. Spark supports encryption of this data, and it can be enabled by passing --conf spark.io.encryption.enabled=true. Because this configuration adds a performance penalty, we recommend enabling data encryption only for workloads that store and access highly sensitive data and run in untrusted environments.

Network considerations

In this section, we discuss how to manage networking within the cluster as well as outside the cluster. We first address how Spark handles communication between the driver and executors and how to secure it. Then we discuss how to restrict network traffic between pods in the EKS cluster and allow only traffic destined for EMR on EKS. Last, we discuss how to restrict traffic from driver and executor pods to external AWS services using security groups.

Network encryption

The communication between the driver and executors uses the RPC protocol and is not encrypted by default. Starting with Spark 3 on Kubernetes, Spark offers a mechanism to encrypt this communication using AES encryption.

The driver generates a key and shares it with the executors through an environment variable. Because the key is shared through the environment variable, potentially any user with access to the Kubernetes API (kubectl) can read the key. We recommend securing access so that only authorized users can have access to the EMR virtual cluster. In addition, you should set up Kubernetes role-based access control so that the ability to read pod specs in the namespace where the EMR virtual cluster runs is granted to only a few selected service accounts. This method of passing secrets through the environment variable would change in the future with a proposal to use Kubernetes secrets.

To enable encryption, RPC authentication must also be enabled in your Spark configuration. To enable encryption in-transit in Spark, you should use the following parameters in your Spark config:

--conf spark.authenticate=true

--conf spark.network.crypto.enabled=true

Note that these are the minimal parameters to set; refer to Encryption for the complete list of parameters.

Additionally, applying encryption in Spark has a negative impact on processing speed. You should only apply it when there is a compliance or regulatory need.
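As an illustration, the following sketch shows how these settings, together with spark.io.encryption.enabled from the previous section, could be passed to an EMR on EKS job; the virtual cluster ID, execution role, and entry point are placeholders:

aws emr-containers start-job-run \
  --virtual-cluster-id <virtual-cluster-id> \
  --name encrypted-sample-job \
  --execution-role-arn arn:aws:iam::111122223333:role/job-execution-role \
  --release-label emr-6.9.0-latest \
  --job-driver '{
    "sparkSubmitJobDriver": {
      "entryPoint": "s3://my-bucket/scripts/job.py",
      "sparkSubmitParameters": "--conf spark.authenticate=true --conf spark.network.crypto.enabled=true --conf spark.io.encryption.enabled=true"
    }
  }'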

Securing network traffic within the cluster

In Kubernetes, pods can by default communicate over the network across different namespaces in the same cluster. This behavior is not always desirable in a multi-tenant environment. In some instances, for example in regulated industries, to be compliant you want to enforce strict control over the network and send and receive traffic only from the namespace that you’re interacting with. For EMR on EKS, this is the namespace associated with the EMR virtual cluster. Kubernetes offers constructs that allow you to implement network policies and define fine-grained control over pod-to-pod communication. These policies are implemented by the CNI plugin; in Amazon EKS, the default plugin is the VPC CNI. A policy is defined as follows and is applied with kubectl:

apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: default-np-ns1
  namespace: <EMR-VC-NAMESPACE>
spec:
  podSelector: {}
  policyTypes:
  - Ingress
  - Egress
  ingress:
  - from:
    - namespaceSelector:
        matchLabels:
          nsname: <EMR-VC-NAMESPACE>

Network traffic outside the cluster

In Amazon EKS, when you deploy pods on Amazon Elastic Compute Cloud (Amazon EC2) instances, all the pods use the security group associated with the node. This can be an issue if your pods (executor pods) are accessing a data source (such as a database) that allows traffic based on the source security group. Database servers often restrict network access only from where they are expecting it. In the case of a multi-tenant EKS cluster, this means pods from other teams that shouldn’t have access to the database servers would be able to send traffic to them.

To overcome this challenge, you can use security groups for pods. This feature allows you to assign a specific security group to your pods, thereby controlling the network traffic to your database server or data source. You can also refer to the best practices guide for a reference implementation.
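The following is a minimal sketch of a SecurityGroupPolicy custom resource; the namespace, pod selector label, and security group ID are assumptions for illustration only:

# Sketch only: attach a dedicated security group to EMR on EKS job pods
# that carry a team-specific label.
kubectl apply -f - <<'EOF'
apiVersion: vpcresources.k8s.aws/v1beta1
kind: SecurityGroupPolicy
metadata:
  name: team-a-sgp
  namespace: emr-team-a
spec:
  podSelector:
    matchLabels:
      team: team-a
  securityGroups:
    groupIds:
      - sg-0123456789abcdef0
EOF

In practice, the matching label could be added to driver and executor pods through the pod templates discussed earlier.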

Cost management and chargeback

In a multi-tenant environment, cost management is a critical subject. You have multiple users from various business units, and you need to be able to precisely charge back the cost of the compute resources they have used. At the beginning of the post, we introduced three models of multi-tenancy in Amazon EKS: hard multi-tenancy, soft multi-tenancy, and sole multi-tenancy. Hard multi-tenancy is out of scope because the cost tracking is trivial; all the resources are dedicated to the team using the cluster, which is not the case for sole multi-tenancy and soft multi-tenancy. In the next sections, we discuss how to track the cost for each of these two models.

Soft multi-tenancy

In a soft multi-tenant environment, you can perform chargeback to your data engineering teams based on the resources they consumed and not the nodes allocated. In this method, you use the namespaces associated with the EMR virtual cluster to track how much resources were used for processing jobs. The following diagram illustrates an example.

Diagram to illustrate soft multi-tenancy

Diagram -1 Soft multi-tenancy

Tracking resources based on the namespace isn’t an easy task because jobs are transient in nature and fluctuate in their duration. However, there are partner tools available that allow you to keep track of the resources used, such as Kubecost, CloudZero, Vantage, and many others. For instructions on using Kubecost on Amazon EKS, refer to this blog post on cost monitoring for EKS customers.

Sole multi-tenancy

For sole multi-tenancy, the chargeback is done at the instance (node) level. Each team uses a specific set of nodes that are dedicated to it. These nodes aren’t always running, and are spun up using the Kubernetes auto scaling mechanism. The following diagram illustrates an example.

Diagram to illustrate Sole tenancy

Diagram -2 Sole tenancy

With sole multi-tenancy, you use a cost allocation tag, which is an AWS mechanism that allows you to track how much each resource has consumed. Although the method of sole multi-tenancy isn’t efficient in terms of resource utilization, it provides a simplified strategy for chargebacks. With the cost allocation tag, you can chargeback a team based on all the resources they used, like Amazon S3, Amazon DynamoDB, and other AWS resources. The chargeback mechanism based on the cost allocation tag can be augmented using the recently launched AWS Billing Conductor, which allows you to issue bills internally for your team.
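For example, the nodes dedicated to a team can be tagged and the tag key activated as a cost allocation tag. The following sketch assumes a hypothetical team tag key and placeholder instance ID; activation can also be done from the Billing console:

# Tag the EC2 instances (or the launch template / Auto Scaling group) backing a team's nodes.
aws ec2 create-tags \
  --resources i-0123456789abcdef0 \
  --tags Key=team,Value=data-engineering

# Activate the tag key as a cost allocation tag so it appears in cost reports.
aws ce update-cost-allocation-tags-status \
  --cost-allocation-tags-status TagKey=team,Status=Active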

Resource management

In this section, we discuss considerations regarding resource management in multi-tenant clusters. We briefly cover topics such as sharing resources fairly, setting guardrails on resource consumption, techniques for ensuring resources for time-sensitive or critical jobs, meeting quick resource scaling requirements, and finally cost optimization practices with node selectors.

Sharing resources

In a multi-tenant environment, the goal is to share resources like compute and memory for better resource utilization. However, this requires careful capacity management and resource allocation to make sure each tenant gets their fair share. In Kubernetes, resource allocation is controlled and enforced by using ResourceQuota and LimitRange. ResourceQuota limits resources on the namespace level, and LimitRange allows you to make sure that all the containers are submitted with a resource requirement and a limit. In this section, we demonstrate how a data engineer or Kubernetes administrator can set up ResourceQuota and LimitRange configurations.

The administrator creates one ResourceQuota per namespace that provides constraints for aggregate resource consumption:

apiVersion: v1
kind: ResourceQuota
metadata:
  name: compute-resources
  namespace: teamA
spec:
  hard:
    requests.cpu: "1000"
    requests.memory: 4000Gi
    limits.cpu: "2000"
    limits.memory: 6000Gi

For LimitRange, the administrator can review the following sample configuration. We recommend using default and defaultRequest to enforce the limit and request field on containers. Lastly, from a data engineer perspective while submitting the EMR on EKS jobs, you need to make sure the Spark parameters of resource requirements are within the range of the defined LimitRange. For example, in the following configuration, the request for spark.executor.cores=7 will fail because the max limit for CPU is 6 per container:

apiVersion: v1
kind: LimitRange
metadata:
  name: cpu-min-max
  namespace: teamA
spec:
  limits:
  - max:
      cpu: "6"
    min:
      cpu: "100m"
    default:
      cpu: "500m"
    defaultRequest:
      cpu: "100m"
    type: Container

Priority-based resource allocation

Diagram Illustrates an example of resource allocation with priority

Diagram – 3 Illustrates an example of resource allocation with priority.

As all the EMR virtual clusters share the same EKS computing platform with limited resources, there will be scenarios in which you need to prioritize jobs on a sensitive timeline. In this case, high-priority jobs can utilize the resources and finish the job, whereas low-priority jobs that are running get stopped and any new pods must wait in the queue. EMR on EKS can achieve this with the help of pod templates, where you specify a priority class for the given job.

When a pod priority is enabled, the Kubernetes scheduler orders pending pods by their priority and places them in the scheduling queue. As a result, the higher-priority pod may be scheduled sooner than pods with lower priority if its scheduling requirements are met. If this pod can’t be scheduled, the scheduler continues and tries to schedule other lower-priority pods.

The preemptionPolicy field on the PriorityClass defaults to PreemptLowerPriority, and the pods of that PriorityClass can preempt lower-priority pods. If preemptionPolicy is set to Never, pods of that PriorityClass are non-preempting. In other words, they can’t preempt any other pods. When lower-priority pods are preempted, the victim pods get a grace period to finish their work and exit. If the pod doesn’t exit within that grace period, that pod is stopped by the Kubernetes scheduler. Therefore, there is usually a time gap between the point when the scheduler preempts victim pods and the time that a higher-priority pod is scheduled. If you want to minimize this gap, you can set a deletion grace period of lower-priority pods to zero or a small number. You can do this by setting the terminationGracePeriodSeconds option in the victim Pod YAML.

See the following code samples for priority class:

apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
  name: high-priority
value: 100
globalDefault: false
description: " High-priority Pods and for Driver Pods."

apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
  name: low-priority
value: 50
globalDefault: false
description: " Low-priority Pods."

One of the key considerations when templatizing the driver pods, especially for low-priority jobs, is to avoid using the same low-priority class for both driver and executor. This saves the driver pods from getting evicted and losing the progress of all their executors in a resource congestion scenario. In this low-priority job example, we use a high-priority class for the driver pod template and a low-priority class only for executor templates. This way, we can ensure the driver pods are safe during the eviction process of low-priority jobs. In this case, only executors are evicted, and the driver can bring back the evicted executor pods as resources become free. See the following code:

apiVersion: v1
kind: Pod
spec:
  priorityClassName: "high-priority"
  nodeSelector:
    eks.amazonaws.com/capacityType: ON_DEMAND
  containers:
  - name: spark-kubernetes-driver # This will be interpreted as Spark driver container

apiVersion: v1
kind: Pod
spec:
  priorityClassName: "low-priority"
  nodeSelector:
    eks.amazonaws.com/capacityType: SPOT
  containers:
  - name: spark-kubernetes-executors # This will be interpreted as Spark executor container

Overprovisioning with priority

Diagram Illustrates an example of overprovisioning with priority

Diagram – 4 Illustrates an example of overprovisioning with priority.

As pods wait in a pending state due to resource availability, additional capacity can be added to the cluster with Amazon EKS auto scaling. The time it takes to scale the cluster by adding new nodes for deployment has to be considered for time-sensitive jobs. Overprovisioning is an option to mitigate the auto scaling delay using temporary pods with negative priority. These pods occupy space in the cluster. When pods with high priority are unschedulable, the temporary pods are preempted to make room. This causes the auto scaler to scale out new nodes due to overprovisioning. Be aware that this is a trade-off because it adds higher cost while minimizing scheduling latency. For more information about overprovisioning best practices, refer to Overprovisioning.
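A minimal sketch of this pattern follows; the replica count, resource requests, and names are placeholders you would tune for your cluster:

# Sketch only: negative-priority placeholder pods that reserve spare capacity
# and are preempted first when real workloads need to be scheduled.
kubectl apply -f - <<'EOF'
apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
  name: overprovisioning
value: -10
globalDefault: false
description: "Placeholder pods that are preempted first."
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: overprovisioning
  namespace: kube-system
spec:
  replicas: 2
  selector:
    matchLabels:
      run: overprovisioning
  template:
    metadata:
      labels:
        run: overprovisioning
    spec:
      priorityClassName: overprovisioning
      containers:
      - name: reserve-resources
        image: registry.k8s.io/pause
        resources:
          requests:
            cpu: "1"
            memory: 2Gi
EOF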

Node selectors

EKS clusters can span multiple Availability Zones in a VPC. A Spark application whose driver and executor pods are distributed across multiple Availability Zones can incur inter-Availability Zone data transfer costs. To minimize or eliminate the data transfer cost, you should configure the job to run in a specific Availability Zone or even on a specific node type with the help of node labels. Amazon EKS places a set of default labels to identify capacity type (On-Demand or Spot Instance), Availability Zone, instance type, and more. In addition, we can use custom labels to meet workload-specific node affinity.

EMR on EKS allows you to choose specific nodes in two ways:

  • At the job level. Refer to EKS Node Placement for more details.
  • At the driver and executor level, using pod templates.

When using pod templates, we recommend using On-Demand Instances for driver pods. You can also consider including Spot Instances for executor pods for workloads that are tolerant of occasional periods when the target capacity is not completely available. Using Spot Instances allows you to save cost for jobs that are not critical and can be terminated. Refer to Define a NodeSelector in PodTemplates for more details.

Conclusion

In this post, we provided guidance on how to design and deploy EMR on EKS in a multi-tenant EKS environment through different lenses: network, security, cost management, and resource management. For any deployment, we recommend the following:

  • Use IRSA with a condition scoped on the EMR on EKS service account
  • Use a secret manager to store credentials and the Secret Store CSI Driver to access them in your Spark application
  • Use ResourceQuota and LimitRange to specify the resources that each of your data engineering teams can use and avoid compute resource abuse and starvation
  • Implement a network policy to segregate network traffic between pods

Lastly, if you are considering migrating your Spark workloads to EMR on EKS, you can further learn about design patterns to manage Apache Spark workloads on EMR on EKS in this blog post and about migrating your EMR transient cluster to EMR on EKS in this blog post.


About the Authors

Lotfi Mouhib is a Senior Solutions Architect working for the Public Sector team with Amazon Web Services. He helps public sector customers across EMEA realize their ideas, build new services, and innovate for citizens. In his spare time, Lotfi enjoys cycling and running.

Ajeeb Peter is a Senior Solutions Architect with Amazon Web Services based in Charlotte, North Carolina, where he guides global financial services customers to build highly secure, scalable, reliable, and cost-efficient applications on the cloud. He brings over 20 years of technology experience on Software Development, Architecture and Analytics from industries like finance and telecom.

Disaster recovery considerations with Amazon EMR on Amazon EC2 for Spark workloads

Post Syndicated from Bharat Gamini original https://aws.amazon.com/blogs/big-data/disaster-recovery-considerations-with-amazon-emr-on-amazon-ec2-for-spark-workloads/

Amazon EMR is a cloud big data platform for running large-scale distributed data processing jobs, interactive SQL queries, and machine learning (ML) applications using open-source analytics frameworks such as Apache Spark, Apache Hive, and Presto. Amazon EMR launches all nodes for a given cluster in the same Amazon Elastic Compute Cloud (Amazon EC2) Availability Zone to improve performance. During an Availability Zone failure or due to any unexpected interruption, Amazon EMR may not be accessible, and we need a disaster recovery (DR) strategy to mitigate this problem.

Part of architecting a resilient, highly available Amazon EMR solution is the consideration that failures do occur. These unexpected interruptions can be caused by natural disasters, technical failures, and human interactions resulting in an Availability Zone outage. The EMR cluster could also become unreachable due to failure of critical services running on the EMR master node, network issues, or other issues.

In this post, we show you how to architect your Amazon EMR environment for disaster recovery to maintain business continuity with minimum Recovery Time Objective (RTO) during Availability Zone failure or when your EMR cluster is inoperable.

Although various disaster recovery strategies are available in the cloud, we discuss active-active and active-passive DR strategies for Amazon EMR in this post. We focus on a use case for Spark batch workloads where persistent storage is decoupled from Amazon EMR and the EMR cluster is running with a single master node. If the EMR cluster is used for persistent storage, it requires an additional strategy to replicate data from the EMR cluster, which we will cover in subsequent posts.

Prerequisites

To follow along with this post, you should have a knowledge of Amazon Managed Workflows for Apache Airflow (Amazon MWAA) and an understanding of Network Load Balancers.

Solution overview

The following diagram illustrates the solution architecture.

Customers often use Amazon MWAA to submit Spark jobs to an EMR cluster using an Apache Livy REST interface. We can configure Apache Livy to use a Network Load Balancer hostname instead of an Amazon EMR master hostname, so that we don’t need to update Livy connections from Amazon MWAA whenever a new cluster is created or stopped. You can register Network Load Balancer target groups with multiple EMR cluster master nodes for an active-active setup. In the case of an active-passive setup, we can create a new EMR cluster when a failure is detected and register the new EMR master with the Network Load Balancer target group. The Network Load Balancer automatically performs health checks and distributes requests to healthy targets. With this solution, we can maintain business continuity when an EMR cluster isn’t reachable due to Availability Zone failure or when the cluster is unhealthy due to any other reason.

Active-active DR strategy

An active-active DR setup focuses on running two EMR clusters with identical configuration in two different Availability Zones. To reduce the running costs of two active EMR clusters, we can launch both clusters with minimum capacity, and managed scaling automatically scales the cluster based on the workload. EMR managed scaling only launches instances when there is demand for resources and stops the unneeded instances when the work is finished. With this strategy, we can reduce our recovery time to near zero with optimal cost. This active-active DR strategy is suitable when businesses want to have near-zero downtime with automatic failover for your analytics workloads.

In the following section, we walk through the steps to implement the solution and provide references to related resources that provide more detailed guidance.

Create EMR clusters

We create two EMR clusters in different Availability Zones within the same Region of your choice. Use the following AWS Command Line Interface (AWS CLI) command and modify or add required configurations as per your needs:

aws emr create-cluster \
  --name "<emr-cluster-az-a>" \
  --release-label emr-6.4.0 \
  --log-uri "s3://<your-log-bucket>" \
  --applications Name=Spark Name=Livy \
  --ec2-attributes "KeyName=<your-key-name>,SubnetId=<private-subnet-in-az-a>" \
  --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m4.large InstanceGroupType=CORE,InstanceCount=1,InstanceType=m4.large \
  --use-default-roles

We can create the cluster with EMR managed scaling, which lets you automatically increase or decrease the number of instances or units in your cluster based on workload. Amazon EMR continuously evaluates cluster metrics to make scaling decisions that optimize your clusters for cost and speed.
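If you prefer the AWS CLI, a managed scaling policy can be attached to each cluster roughly as follows; the unit type and capacity limits are illustrative values only, not a recommendation from this post:

aws emr put-managed-scaling-policy \
  --cluster-id <emr-cluster-id> \
  --managed-scaling-policy '{
    "ComputeLimits": {
      "UnitType": "Instances",
      "MinimumCapacityUnits": 2,
      "MaximumCapacityUnits": 10,
      "MaximumOnDemandCapacityUnits": 10,
      "MaximumCoreCapacityUnits": 3
    }
  }'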

Create and configure a Network Load Balancer

You can create a Network Load Balancer using the AWS CLI (see Create a Network Load Balancer using the AWS CLI) or the AWS Management Console (see Create a Network Load Balancer). For this post, we do so on the console.

  • Create a target group (emr-livy-dr) and register both EMR clusters’ master IP addresses in the target group.

  • Create an internal Network Load Balancer in the same VPC or Region as your EMR clusters, and choose two different Availability Zones and select the private subnets.
    These subnets don’t need to be in the same subnets as the EMR clusters, but the clusters must allow the traffic from the Network Load Balancer, which is discussed in next steps.

  • Create a TCP listener on port 8998 (the default EMR cluster Livy port) to forward requests to the target group you created.

  • Modify the EMR clusters’ master security groups to allow the Network Load Balancer’s private IP addresses to access port 8998.

You can find the Network Load Balancer’s private IP address by searching the elastic network interfaces for the Network Load Balancer’s name. For access control instructions, refer to How do I attach a security group to my Elastic Load Balancer.

When the target groups become healthy, the Network Load Balancer forwards requests to registered targets when it receives requests on Livy port 8998.

  • Get the DNS name of the Network Load Balancer.

We can also use an Amazon Route 53 alias record to use our own domain name to route traffic to the Network Load Balancer DNS name. We use this DNS name in our Amazon MWAA Livy connection.
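If you prefer to script the preceding console steps, they can be approximated with the AWS CLI roughly as follows; the VPC ID, subnet IDs, and master node IP addresses are placeholders:

# Target group for Livy on port 8998, registering each EMR master node by IP.
TG_ARN=$(aws elbv2 create-target-group \
  --name emr-livy-dr --protocol TCP --port 8998 \
  --vpc-id vpc-0123456789abcdef0 --target-type ip \
  --query 'TargetGroups[0].TargetGroupArn' --output text)

aws elbv2 register-targets --target-group-arn "$TG_ARN" \
  --targets Id=10.0.1.10 Id=10.0.2.10

# Internal Network Load Balancer across two private subnets.
NLB_ARN=$(aws elbv2 create-load-balancer \
  --name emr-livy-nlb --type network --scheme internal \
  --subnets subnet-0aaa1111bbb22222c subnet-0ddd3333eee44444f \
  --query 'LoadBalancers[0].LoadBalancerArn' --output text)

# TCP listener on 8998 forwarding to the target group.
aws elbv2 create-listener --load-balancer-arn "$NLB_ARN" \
  --protocol TCP --port 8998 \
  --default-actions Type=forward,TargetGroupArn="$TG_ARN"

# DNS name to use in the Amazon MWAA Livy connection.
aws elbv2 describe-load-balancers --load-balancer-arns "$NLB_ARN" \
  --query 'LoadBalancers[0].DNSName' --output text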

Create and configure Amazon MWAA

Complete the following steps:

  • Make sure the execution role you’re using with Amazon MWAA has proper access to EMR clusters and other required services.
  • Update the Amazon MWAA Livy connection (livy_default) host with the Network Load Balancer hostname you created.
  • Create a new Livy connection ID if it’s not already available.

  • Use the following sample DAG to submit a sample Spark application using LivyOperator. We assign the livy_default connection to the livy_conn_id in the DAG code.
  • Enable the DAG and verify if the Spark application is successful on one of the EMR clusters.
from datetime import timedelta, datetime
from airflow.utils.dates import days_ago
from airflow import DAG
from airflow.providers.apache.livy.operators.livy import LivyOperator

default_args = {
    'owner': 'airflow',
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

dag_name = "livy_spark_dag"
# Replace S3 bucket name
# You can use sample spark jar from EMR cluster master node
# /usr/lib/spark/examples/jars/spark-examples.jar
s3_bucket = "artifacts-bucket"
jar_location = "s3://{}/spark-examples.jar".format(s3_bucket)

dag = DAG(
    dag_id = dag_name,
    default_args=default_args,
    schedule_interval='@once',
    start_date = days_ago(1),
    catchup=False,
    tags=['emr', 'spark', 'livy']
)

livy_spark = LivyOperator(
    file=jar_location,
    class_name="org.apache.spark.examples.SparkPi",
    driver_memory="1g",
    driver_cores=1,
    executor_memory="1g",
    executor_cores=2,
    num_executors=1,
    task_id="livy_spark",
    conf={
    "spark.submit.deployMode": "cluster",
    "spark.app.name": dag_name
    },
    livy_conn_id="livy_default",
    dag=dag,
)

livy_spark

Test the DR plan

We can test our DR plan by creating scenarios that could be caused by real disasters. Perform the following steps to validate if our DR strategy works automatically during a disaster:

  1. Run the sample DAG multiple times and verify if Spark applications are randomly submitted to the registered EMR clusters.
  2. Stop one of the clusters and verify if jobs are automatically submitted to the other cluster in a different Availability Zone without any issues.

Active-passive DR strategy

Although the active-active DR strategy has benefits of maintaining near-zero recovery time, it’s complex to maintain two environments because both environments require patching and constant monitoring. In cases where Recovery Time Objective (RTO) and Recovery Point Objective (RPO) aren’t critical for your workloads, we can adopt an active-passive strategy. This approach offers a more economical and operationally less complex approach.

In this approach, we use a single EMR cluster as an active cluster and in case of disaster (due to Availability Zone failures or any other reason the EMR cluster is unhealthy), we launch a second EMR cluster in a different Availability Zone and redirect all our workloads to the newly launched cluster. End-users may notice some delay because launching a second EMR cluster takes time.

The high-level architecture of the active-passive DR solution is shown in the following diagram.

Complete the following steps to implement this solution:

  • Create an EMR cluster in a single Availability Zone.
  • Create target groups and register the EMR cluster master node IP address. Create a target group each for the Resource Manager (8088), Name Node (9870), and Livy (8998) services. Change the port numbers if the services are running on different ports.

  • Create a Network Load Balancer and add TCP listeners and forward requests to the respective target groups.

  • Create an Amazon MWAA environment with proper access to the EMR cluster in the same Region.
  • Edit the Amazon MWAA Livy connection to use the Network Load Balancer DNS name.
  • Use the updated Livy connection in Amazon MWAA DAGs to submit Spark applications.
  • Validate if we can successfully submit Spark applications via Livy to the EMR cluster.
  • Set up a DAG on Amazon MWAA or similar scheduling tool that continuously monitors the existing EMR cluster health.
  • Monitor the following key services running on the Amazon EMR master host using REST APIs or commands provided by each service. Add more health checks as required.
  • If the health check process detects a failure of the first EMR cluster, create a new EMR cluster in a different Availability Zone.
  • Automatically register the newly created EMR cluster master IP address to the Network Load Balancer target groups.
  • When the Network Load Balancer health checks are successful with the new EMR cluster master IP, delete the unhealthy EMR cluster master IP address from the target group and stop the old EMR cluster.
  • Validate the DR plan.

Follow the steps mentioned in the active-active DR strategy to create the following resources:

  • Amazon EMR
  • Amazon MWAA
  • Network Load Balancer

The following sample script provides the functionality described in this section. Use this as reference and modify it accordingly to fit your use case.

#!/bin/bash

usage() {
	cat <<EOF
   Usage: ./dr_health_check.sh j-2NPQWXK1U4E6G

   This script takes current EMR cluster id as argument and monitors the cluster health and
   creates new EMR cluster in different AZ if existing cluster is unhealthy/unreachable

EOF
	exit 1
}

[[ $# -lt 1 ]] && {
	echo Specify cluster id as argument to the script
	usage
}

#Set NLB DNS name and region
hostname="emr-ap-ae4ffe5g23fd9245.elb.us-west-2.amazonaws.com"
region="us-west-2"
cluster_id=$1
cluster_status=""

export AWS_DEFAULT_REGION=$region

#Depending on the use case perform below health checks for more than one time in a loop and if cluster state is still unhealthy then only perform remaining steps
#Ports and SSL properties for curl command may differ depending on how services are set up
rm_state=$(curl -s --connect-timeout 5 --max-time 10 http://$hostname:8088/ws/v1/cluster | jq -r .clusterInfo.state)
if [[ $? -ne 0 || "$rm_state" != "STARTED" ]]; then
	echo "ResourceManager port not reachable or service not running"
	cluster_status="unhealthy"
fi

nn_state=$(curl -s --connect-timeout 5 --max-time 10 http://$hostname:9870/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus | jq -r .beans[0].State)
if [[ $? -ne 0 || "$nn_state" != "active" ]]; then
	echo "NameNode port not reachable or service not running"
	cluster_status="unhealthy"
fi

livy_state=$(curl -s --connect-timeout 5 --max-time 10 http://$hostname:8998/sessions)
if [[ $? -ne 0 ]]; then
	echo "Livy port not reachable"
	cluster_status="unhealthy"
fi

cluster_name=$(aws emr describe-cluster --cluster-id $cluster_id | jq -r ".Cluster.Name")

update_target_groups() {
	new_master_ip=$1
	current_master_ip=$2
	current_az=$3

	nlb_arn=$(aws elbv2 describe-load-balancers --query "LoadBalancers[?DNSName==\`$hostname\`].[LoadBalancerArn]" --output text)
	target_groups=$(aws elbv2 describe-target-groups --load-balancer-arn $nlb_arn --query "TargetGroups[*].TargetGroupArn" --output text)
	IFS=" " read -a tg_array <<<$target_groups
	for tg in "${tg_array[@]}"; do
		echo "Registering new EMR master IP with target group $tg"
		aws elbv2 register-targets --target-group-arn $tg --targets Id=$new_master_ip,AvailabilityZone=all

		echo "De-registering old/unhealthy EMR master IP from target group $tg"
		aws elbv2 deregister-targets --target-group-arn $tg --targets Id=$current_master_ip,AvailabilityZone=all
	done
}

if [[ $cluster_status == "unhealthy" ]]; then
	echo "Cluster status is $cluster_status, creating new EMR cluster"
	current_az=$(aws emr describe-cluster --cluster-id $cluster_id | jq -r ".Cluster.Ec2InstanceAttributes.Ec2AvailabilityZone")
	new_az=$(aws ec2 describe-availability-zones --filters "Name=region-name,Values=$region" --query "AvailabilityZones[?ZoneName!=\`$current_az\`].ZoneName|[0]" --output text)
	current_master_ip=$(aws emr list-instances --cluster-id $cluster_id --instance-group-types MASTER --query "Instances[*].PrivateIpAddress" --output text)
	echo "Current/unhealthy cluster id $cluster_id, cluster name $cluster_name,AZ $current_az, Master private ip $current_master_ip"

	echo "Creating new EMR cluster in $new_az"
	emr_op=$(aws emr create-cluster \
		--name "$cluster_name-$new_az" \
		--release-label emr-6.4.0 \
		--applications Name=Spark Name=Livy \
		--ec2-attributes "AvailabilityZone=$new_az" \
		--instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m4.large InstanceGroupType=CORE,InstanceCount=1,InstanceType=m4.large \
		--use-default-roles \
		--region $region)

	new_cluster_id=$(echo $emr_op | jq -r ".ClusterId")

	#wait for cluster provisioning to get master ip address
	sleep 2m

	new_master_ip=$(aws emr list-instances --cluster-id $new_cluster_id --instance-group-types MASTER --query "Instances[*].PrivateIpAddress" --output text)
	echo "New EMR cluster id $new_cluster_id and Master node IP $new_master_ip"

	echo "Terminating unhealthy cluster $cluster_id/$cluster_name in $current_az"
	aws emr modify-cluster-attributes --cluster-id $cluster_id --no-termination-protected
	aws emr terminate-clusters --cluster-ids $cluster_id

	echo "Register new EMR master IP address with NLB target groups and de-register unhealthy EMR master"
	update_target_groups $new_master_ip $current_master_ip $current_az
else
	echo "Current cluster $cluster_id/$cluster_name is healthy"
fi

Summary

In this post, we shared some solutions and considerations to improve DR implementation using Amazon EMR on Amazon EC2, Network Load Balancer, and Amazon MWAA. Based on your use case, you can determine the type of DR strategy you want to deploy. We have provided the steps required to create the necessary environments and set up a successful DR strategy.


About the Author

Bharat Gamini is a Data Architect focused on Big Data & Analytics at Amazon Web Services. He helps customers architect and build highly scalable, robust and secure cloud-based analytical solutions on AWS.

Simplify and optimize Python package management for AWS Glue PySpark jobs with AWS CodeArtifact

Post Syndicated from Ashok Padmanabhan original https://aws.amazon.com/blogs/big-data/simplify-and-optimize-python-package-management-for-aws-glue-pyspark-jobs-with-aws-codeartifact/

Data engineers use various Python packages to meet their data processing requirements while building data pipelines with AWS Glue PySpark jobs. Languages like Python and Scala are commonly used in data pipeline development. Developers can take advantage of open-source packages or even build their own to make it easier and faster to handle use cases such as data manipulation and analysis. However, managing standardized packages can be cumbersome with multiple teams using different versions of packages, installing non-approved packages, and causing duplicate development effort due to the lack of visibility of what is available at the enterprise level. This can be especially challenging in large enterprises with multiple data engineering teams.

ETL developers often need additional packages for their AWS Glue ETL jobs. With security being job zero for customers, many restrict egress traffic from their VPC to the public internet, and they need a way to manage the packages used by applications, including their data processing pipelines.

Our proposed solution enables customers with network egress restrictions to manage packages centrally with AWS CodeArtifact and use their favorite libraries in their AWS Glue ETL PySpark code. In this post, we’ll describe how CodeArtifact can be used for managing packages and modules for AWS Glue ETL jobs, and we’ll demo a solution using Glue PySpark jobs that run within VPC subnets that have no internet access.

Solution overview

The solution uses CodeArtifact as a tool to make it easier for organizations of any size to securely store, publish, and share software packages used in their ETL with AWS Glue. VPC Endpoints will be enabled for CodeArtifact and Glue to enable private link connections. AWS Step Functions makes it easy to coordinate the orchestration of components used in the data processing pipeline. Native integrations with both CodeArtifact and AWS Glue enable the workflow to both authenticate the request to CodeArtifact and start the AWS Glue ETL job.

The following architecture shows an implementation of a solution using AWS Glue, CodeArtifact, and Step Functions to use additional Python modules without egress internet access. The solution is deployed using AWS Cloud Development Kit (AWS CDK), an open-source software development framework to define your cloud application resources using familiar programming languages.

Solution Architecture for the blog post

Fig 1: Architecture Diagram for the Solution

To illustrate how to set up this architecture, we’ll walk you through the following steps:

  1. Deploying an AWS CDK stack to provision the following AWS Resources
    1. CodeArtifact
    2. An AWS Glue job
    3. Step Functions workflow
    4. Amazon Simple Storage Service (Amazon S3) bucket
    5. A VPC with a private Subnet and VPC Endpoints to Amazon S3 and CodeArtifact
  2. Validate the Deployment.
  3. Run a Sample Workflow – This workflow will run an AWS Glue PySpark job that uses a custom Python library, and an upgraded version of boto3.
  4. Cleaning up your resources.

Prerequisites

Make sure that you complete the following steps as prerequisites:

The solution

Launching your AWS CDK Stack

Step 1: Using your device’s command line, check out our Git repository to a local directory on your device:

git clone https://github.com/aws-samples/python-lib-management-without-internet-for-aws-glue-in-private-subnets.git

Step 2: Change directories to the new directory Amazon S3 script location:

cd python-lib-management-without-internet-for-aws-glue-in-private-subnets/scripts/s3

Step 3: Download the following CSV file, which contains New York City Taxi and Limousine Commission (TLC) weekly trip data. This will serve as the input source for the AWS Glue job:

aws s3 cp s3://nyc-tlc/misc/FOIL_weekly_trips_apps.csv .

Step 4: Change directories to the path where the app.py file is located (relative to the previous step, run the following):

cd ../..

Step 5: Create a virtual environment:

macOS/Linux:
python3 -m venv .env

Windows:
python -m venv .env

Step 6: Activate the virtual environment after the init process completes and the virtual environment is created:

macOS/Linux:
source .env/bin/activate

Windows:
.env\Scripts\activate.bat

Step 7: Install the required dependencies:

pip3 install -r requirements.txt

Step 8: Make sure that your AWS profile is setup along with the region that you want to deploy as mentioned in the prerequisite. Synthesize the templates. AWS CDK apps use code to define the infrastructure, and when run they produce or “synthesize” a CloudFormation template for each stack defined in the application:

cdk synthesize

Step 9: BootStrap the cdk app using the following command:

cdk bootstrap aws://<AWS_ACCOUNTID>/<AWS_REGION>

Replace the place holder AWS_ACCOUNTID and AWS_REGION with your AWS account ID and the region to be deployed.

This step provisions the initial resources, including an Amazon S3 bucket for storing files and IAM roles that grant permissions needed to perform deployments.

Step 10: Deploy the solution. By default, some actions that could potentially make security changes require approval. In this deployment, you’re creating an IAM role. The following command overrides the approval prompts, but if you would like to manually accept the prompts, then omit the --require-approval never flag:

cdk deploy "*" --require-approval never

While the AWS CDK deploys the CloudFormation stacks, you can follow the deployment progress in your terminal:

AWS CDK Deployment progress in terminal

Fig 2: AWS CDK Deployment progress in terminal

Once the deployment is successful, you’ll see the successful status as follows:

AWS CDK Deployment completion success

Fig 3: AWS CDK Deployment completion success

Step 11: Log in to the AWS Console, go to CloudFormation, and see the output of the ApplicationStack stack:

AWS CloudFormation stack output

Fig 4: AWS CloudFormation stack output

Note the values of the DomainName and RepositoryName variables. We’ll use them in the next step to upload our artifacts.

Step 12: We will upload a custom library into the repo that we created. This will be used by our Glue ETL job.

  • Install twine using pip:
python3 -m pip install twine

The custom python package glueutils-0.2.0.tar.gz can be found under this folder of the cloned repo:

cd scripts/custom_glue_library
  • Configure twine with the login command (additional details here). Refer to step 11 for the DomainName and RepositoryName values from the CloudFormation output:
aws codeartifact login --tool twine --domain <DomainName> --domain-owner <AWS_ACCOUNTID> --repository <RepositoryName>
  • Publish Python package assets:
twine upload --repository codeartifact glueutils-0.2.0.tar.gz
Python package publishing using twine

Fig 5: Python package publishing using twine

Validate the Deployment

The AWS CDK stack will deploy the following AWS resources:

  1. Amazon Virtual Private Cloud (Amazon VPC)
    1. One Private Subnet
  2. AWS CodeArtifact
    1. CodeArtifact Repository
    2. CodeArtifact Domain
    3. CodeArtifact Upstream Repository
  3. AWS Glue
    1. AWS Glue Job
    2. AWS Glue Database
    3. AWS Glue Connection
  4. AWS Step Function
  5. Amazon S3 Bucket for AWS CDK and also for storing scripts and CSV file
  6. IAM Roles and Policies
  7. Amazon Elastic Compute Cloud (Amazon EC2) Security Group

Step 1: Browse to the AWS account and region via the AWS Console to which the resources are deployed.

Step 2: Browse to the Subnets page (https://<region>.console.aws.amazon.com/vpc/home?region=<region>#subnets:). (Replace <region> with the actual AWS Region to which your resources are deployed.)

Step 3: Select the subnet named ApplicationStack/enterprise-repo-vpc/Enterprise-Repo-Private-Subnet1.

Step 4: Select the route table and validate that there are no routes to the internet through an internet gateway or NAT gateway, similar to the following image:

Route table validation

Fig 6: Route table validation

Step 5: Navigate to the CodeArtifact console and review the repositories created. The enterprise-repo is your local repository, and pypi-store is the upstream repository connected to the PyPI, providing artifacts from pypi.org.

AWS CodeArtifact repositories created

Fig 7: AWS CodeArtifact repositories created

Step 6: Navigate to enterprise-repo and search for glueutils. This is the custom python package that we published.

AWS CodeArtifact custom Python package published

Fig 8: AWS CodeArtifact custom Python package published

Step 7: Navigate to Step Functions Console and review the enterprise-repo-step-function as follows:

AWS Step Functions workflow

Fig 9: AWS Step Functions workflow

The diagram shows how the Step Functions workflow will orchestrate the pattern.

  1. The first step CodeArtifactGetAuthorizationToken calls the getAuthorizationToken API to generate a temporary authorization token for accessing repositories in the domain (this token is valid for 15 mins.).
  2. The next step GenerateCodeArtifactURL takes the authorization token from the response and generates the CodeArtifact URL.
  3. Then, this will move into the GlueStartJobRun state, which makes a synchronous API call to run the AWS Glue job.

Step 8: Navigate to the AWS Glue Console and select the Jobs tab, then select enterprise-repo-glue-job.

The AWS Glue job is created with the following script and AWS Glue Connection enterprise-repo-glue-connection. The AWS Glue connection is a Data Catalog object that enables the job to connect to sources and APIs from within the VPC. The network type connection runs the job from within the private subnet to make requests to Amazon S3 and CodeArtifact over the VPC endpoint connection. This enables the job to run without any traffic through the internet.

Note the connections section in the AWS Glue PySpark Job, which makes the Glue job run on the private subnet in the VPC provisioned.

AWS Glue network connections

Fig 10: AWS Glue network connections

The job takes an Amazon S3 bucket, Glue Database, Python Job Installer Option, and Additional Python Modules as job parameters. The parameters --additional-python-modules and --python-modules-installer-option are passed to install the selected Python module from a PyPI repository hosted in AWS CodeArtifact.
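As a quick way to test the same parameters outside the Step Functions workflow, they could be passed directly with the AWS CLI; the index URL placeholder below stands for the URL that the workflow normally generates from a short-lived CodeArtifact token:

aws glue start-job-run \
  --job-name enterprise-repo-glue-job \
  --arguments '{
    "--additional-python-modules": "boto3,glueutils==0.2.0",
    "--python-modules-installer-option": "--index-url <codeartifact-index-url>"
  }'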

The script itself first reads the Amazon S3 input path of the taxi data in CSV format. A light transformation to sum the total trips by year, week, and app is performed. Then the output is written to an Amazon S3 path as Parquet. A partitioned table in the AWS Glue Data Catalog is either created or updated if it already exists.

You can find the Glue PySpark script here.

Run a sample workflow

The following steps will demonstrate how to run a sample workflow:

Step 1: Navigate to the Step Functions Console and select the enterprise-repo-step-function.

Step 2: Select Start execution and input the following. We’re including the glueutils and latest boto3 libraries as part of the job run. It is always recommended to pin your Python dependencies to avoid breaking changes due to future versions of a dependency. In the following example, the latest available version of boto3 and version 0.2.0 of glueutils will be installed. To pin boto3 to a specific release, you can use boto3==1.24.2 (the latest release at the time of publishing this post):

{"pythonmodules": "boto3,glueutils==0.2.0"}

Step 3: Select Start execution and wait until Execution Status is Succeeded. This may take a few minutes.

Step 4: Navigate to the CodeArtifact Console to review the enterprise-repo repository. You’ll see the cached PyPi packages and all of their dependencies pulled down from PyPi.

Step 5: In the Glue Console under the Runs section of the enterprise-repo-glue-job, you’ll see the parameters passed:

Fig 11: AWS Glue job execution history

Note the --index-url that was passed as a parameter to the Glue ETL job. The token is valid only for 15 minutes.
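For reference, the following sketch shows how such an index URL is typically constructed; it assumes the domain and repository names from the CloudFormation outputs noted earlier:

# Fetch a temporary CodeArtifact authorization token (here requested for 15 minutes).
TOKEN=$(aws codeartifact get-authorization-token \
  --domain <DomainName> --domain-owner <AWS_ACCOUNTID> \
  --duration-seconds 900 \
  --query authorizationToken --output text)

# Build a pip-compatible index URL for the private repository.
INDEX_URL="https://aws:${TOKEN}@<DomainName>-<AWS_ACCOUNTID>.d.codeartifact.<region>.amazonaws.com/pypi/<RepositoryName>/simple/"
echo "$INDEX_URL"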

Step 6: Navigate to the Amazon CloudWatch Console and go to the /aws/glue-jobs log group to verify that the packages were installed from the local repo.

You will see that the 2 package names passed as parameters are installed with the corresponding versions.

Fig 12: Amazon CloudWatch logs details for the Glue job

Step 7: Navigate to the Amazon Athena console and select Query Editor.

Step 8: Run the following query to validate the output of the AWS Glue job:

SELECT year, app, SUM(total_trips) as sum_of_total_trips 
FROM 
"codeartifactblog_glue_db"."taxidataparquet" 
GROUP BY year, app;

Clean up

Make sure that you clean up all of the other AWS resources that you created in the AWS CDK Stack deployment. You can delete these resources via the AWS CDK Destroy command as follows or the CloudFormation console.

To destroy the resources using AWS CDK, follow these steps:

  1. Follow Steps 1-6 from the ‘Launching your CDK Stack’ section.
  2. Destroy the app by executing the following command:
    cdk destroy

Conclusion

In this post, we demonstrated how CodeArtifact can be used for managing Python packages and modules for AWS Glue jobs that run within VPC subnets that have no internet access. We also demonstrated how the versions of existing packages (for example, boto3) can be updated, and how a custom Python library (glueutils) developed locally can also be managed through CodeArtifact.

This post enables you to use your favorite Python packages with AWS Glue ETL PySpark jobs by modifying the input to the AWS Step Functions workflow (Step 2 in the Run a sample workflow section).


About the Authors

Bret Pontillo is a Data & ML Engineer with AWS Professional Services. He works closely with enterprise customers building data lakes and analytical applications on the AWS platform. In his free time, Bret enjoys traveling, watching sports, and trying new restaurants.

Gaurav Gundal is a DevOps consultant with AWS Professional Services, helping customers build solutions on the customer platform. When not building, designing, or developing solutions, Gaurav spends time with his family, plays guitar, and enjoys traveling to different places.

Ashok Padmanabhan is a Sr. IOT Data Architect with AWS Professional Services, helping customers build data and analytics platform and solutions. When not helping customers build and design data lakes, Ashok enjoys spending time at the beach near his home in Florida.

A new Spark plugin for CPU and memory profiling

Post Syndicated from Bo Xiong original https://aws.amazon.com/blogs/devops/a-new-spark-plugin-for-cpu-and-memory-profiling/

Introduction

Have you ever wondered if there are low-hanging optimization opportunities to improve the performance of a Spark app? Profiling can help you gain visibility regarding the runtime characteristics of the Spark app to identify its bottlenecks and inefficiencies. We’re excited to announce the release of a new Spark plugin that enables profiling for JVM based Spark apps via Amazon CodeGuru. The plugin is open sourced on GitHub and published to Maven.

Walkthrough

This post shows how you can onboard this plugin with two steps in under 10 minutes.

  • Step 1: Create a profiling group in Amazon CodeGuru Profiler and grant permission to your Amazon EMR on EC2 role, so that profiler agents can emit metrics to CodeGuru. Detailed instructions can be found here.
  • Step 2: Reference codeguru-profiler-for-spark when submitting your Spark job, along with PROFILING_CONTEXT and ENABLE_AMAZON_PROFILER defined.

Prerequisites

Your app must be built against Spark 3 and run on Amazon EMR release 6.x or newer. It doesn’t matter whether you’re using Amazon EMR on Amazon Elastic Compute Cloud (Amazon EC2) or on Amazon Elastic Kubernetes Service (Amazon EKS).

Illustrative Example

For the purposes of illustration, consider the following example where profiling results are collected by the plugin and emitted to the “CodeGuru-Spark-Demo” profiling group.

spark-submit \
--master yarn \
--deploy-mode cluster \
--class \
--packages software.amazon.profiler:codeguru-profiler-for-spark:1.0 \
--conf spark.plugins=software.amazon.profiler.AmazonProfilerPlugin \
--conf spark.executorEnv.PROFILING_CONTEXT="{\\\"profilingGroupName\\\":\\\"CodeGuru-Spark-Demo\\\"}" \
--conf spark.executorEnv.ENABLE_AMAZON_PROFILER=true \
--conf spark.dynamicAllocation.enabled=false \

An alternative way to specify PROFILING_CONTEXT and ENABLE_AMAZON_PROFILER is under the yarn-env.export classification for instance groups in the Amazon EMR web console. Note that PROFILING_CONTEXT, if configured in the web console, must also escape all of its commas, in addition to the escaping required for the spark-submit command above.

[
  {
    "classification": "yarn-env",
    "properties": {},
    "configurations": [
      {
        "classification": "export",
        "properties": {
          "ENABLE_AMAZON_PROFILER": "true",
          "PROFILING_CONTEXT": "{\\\"profilingGroupName\\\":\\\"CodeGuru-Spark-Demo\\\"\\,\\\"driverEnabled\\\":\\\"true\\\"}"
        },
        "configurations": []
      }
    ]
  }
]

Once the job above is launched on Amazon EMR, profiling results should show up in your CodeGuru web console in about 10 minutes, similar to the following screenshot. Internally, it has helped us identify issues such as thread contentions (revealed by the BLOCKED state in the latency flame graph) and unnecessary creation of AWS Java clients (revealed by the CPU Hotspots view).

Go to your profiling group in the Amazon CodeGuru web console. Choose the Visualize CPU button to render a flame graph displaying CPU usage. Switch to the latency view to identify latency bottlenecks, and switch to the heap summary view to identify the objects consuming the most memory.

Troubleshooting

To help with troubleshooting, use a sample Spark app provided in the plugin to check if everything is set up correctly. Note that the profilingGroupName value specified in PROFILING_CONTEXT should match what’s created in CodeGuru.

spark-submit \
--master yarn \
--deploy-mode cluster \
--class software.amazon.profiler.SampleSparkApp \
--packages software.amazon.profiler:codeguru-profiler-for-spark:1.0 \
--conf spark.plugins=software.amazon.profiler.AmazonProfilerPlugin \
--conf spark.executorEnv.PROFILING_CONTEXT="{\\\"profilingGroupName\\\":\\\"CodeGuru-Spark-Demo\\\"}" \
--conf spark.executorEnv.ENABLE_AMAZON_PROFILER=true \
--conf spark.yarn.appMasterEnv.PROFILING_CONTEXT="{\\\"profilingGroupName\\\":\\\"CodeGuru-Spark-Demo\\\",\\\"driverEnabled\\\":\\\"true\\\"}" \
--conf spark.yarn.appMasterEnv.ENABLE_AMAZON_PROFILER=true \
--conf spark.dynamicAllocation.enabled=false \
/usr/lib/hadoop-yarn/hadoop-yarn-server-tests.jar

Running the command above from the master node of your EMR cluster should produce logs similar to the following:

21/11/21 21:27:21 INFO Profiler: Starting the profiler : ProfilerParameters{profilingGroupName='CodeGuru-Spark-Demo', threadSupport=BasicThreadSupport (default), excludedThreads=[Signal Dispatcher, Attach Listener], shouldProfile=true, integrationMode='', memoryUsageLimit=104857600, heapSummaryEnabled=true, stackDepthLimit=1000, samplingInterval=PT1S, reportingInterval=PT5M, addProfilerOverheadAsSamples=true, minimumTimeForReporting=PT1M, dontReportIfSampledLessThanTimes=1}
21/11/21 21:27:21 INFO ProfilingCommandExecutor: Profiling scheduled, sampling rate is PT1S
...
21/11/21 21:27:23 INFO ProfilingCommand: New agent configuration received : AgentConfiguration(AgentParameters={MaxStackDepth=1000, MinimumTimeForReportingInMilliseconds=60000, SamplingIntervalInMilliseconds=1000, MemoryUsageLimitPercent=10, ReportingIntervalInMilliseconds=300000}, PeriodInSeconds=300, ShouldProfile=true)
21/11/21 21:32:23 INFO ProfilingCommand: Attempting to report profile data: start=2021-11-21T21:27:23.227Z end=2021-11-21T21:32:22.765Z force=false memoryRefresh=false numberOfTimesSampled=300
21/11/21 21:32:23 INFO javaClass: [HeapSummary] Processed 20 events.
21/11/21 21:32:24 INFO ProfilingCommand: Successfully reported profile

Note that the CodeGuru Profiler agent uses a reporting interval of five minutes. Therefore, any executor process shorter than five minutes won’t be reflected in the profiling result. If the right profiling group is not specified, or if it’s associated with the wrong EC2 role in CodeGuru, the log will show a message similar to “CodeGuruProfilerSDKClient: Exception while calling agent orchestration” along with a stack trace that includes a 403 status code. To rule out any network issues (for example, your EMR job running in a VPC without an outbound gateway, or a misconfigured outbound security group), you can remote into an EMR host and ping the CodeGuru endpoint in your Region (for example, ping codeguru-profiler.us-east-1.amazonaws.com).

Cleaning up

To avoid incurring future charges, you can delete the profiling group configured in CodeGuru and/or set the ENABLE_AMAZON_PROFILER environment variable to false.

Conclusion

In this post, we described how to onboard this plugin in two steps. Consider giving it a try for your Spark app. You can find the Maven artifacts here. If you have feature requests, bug reports, or feedback of any kind, or if you would like to contribute, please head over to the GitHub repository.

Author:

Bo Xiong

Bo Xiong is a software engineer with Amazon Ads, leveraging big data technologies to process petabytes of data for billing and reporting. His main interests include performance tuning and optimization for Spark on Amazon EMR, and data mining for actionable business insights.

Open-sourcing Polynote: an IDE-inspired polyglot notebook

Post Syndicated from Netflix Technology Blog original https://medium.com/netflix-techblog/open-sourcing-polynote-an-ide-inspired-polyglot-notebook-7f929d3f447?source=rss----2615bd06b42e---4

Jeremy Smith, Jonathan Indig, Faisal Siddiqi

We are pleased to announce the open-source launch of Polynote: a new, polyglot notebook with first-class Scala support, Apache Spark integration, multi-language interoperability including Scala, Python, and SQL, as-you-type autocomplete, and more.

Polynote provides data scientists and machine learning researchers with a notebook environment that allows them the freedom to seamlessly integrate our JVM-based ML platform — which makes heavy use of Scala — with the Python ecosystem’s popular machine learning and visualization libraries. It has seen substantial adoption among Netflix’s personalization and recommendation teams, and it is now being integrated with the rest of our research platform.

At Netflix, we have always felt strongly about sharing with the open source community, and believe that Polynote has a great potential to address similar needs outside of Netflix.

Feature Overview

Reproducibility

Polynote promotes notebook reproducibility by design. By taking a cell’s position in the notebook into account when executing it, Polynote helps prevent bad practices that make notebooks difficult to re-run from the top.

Editing Improvements

Polynote provides IDE-like features such as interactive autocomplete and parameter hints, in-line error highlighting, and a rich text editor with LaTeX support.

Visibility

The Polynote UI provides at-a-glance insights into the state of the kernel by showing kernel status, highlighting currently-running cell code, and showing currently executing tasks.

Polyglot

Each cell in a notebook can be written in a different language with variables shared between them. Currently Scala, Python, and SQL cell types are supported.

Dependency and Configuration Management

Polynote provides configuration and dependency setup saved within the notebook itself, and helps solve some of the dependency problems commonly experienced by Spark developers.

Data Visualization

Native data exploration and visualization helps users learn more about their data without cluttering their notebooks. Integration with matplotlib and Vega allows power users to communicate with others through beautiful visualizations.

Reimagining the Scala notebook experience

On the Netflix Personalization Infrastructure team, our job is to accelerate machine learning innovation by building tools that can remove pain points and allow researchers to focus on research. Polynote originated from a frustration with the shortcomings of existing notebook tools, especially with respect to their support of Scala.

For example, while Python developers are used to working inside an environment constructed using a package manager with a relatively small number of dependencies, Scala developers typically work in a project-based environment with a build tool managing hundreds of (often) conflicting dependencies. With Spark, developers are working in a cluster computing environment where it is imperative that their distributed code runs in a consistent environment no matter which node is being used. Finally, we found that our users were also frustrated with the code editing experience within notebooks, especially those accustomed to using IntelliJ IDEA or Eclipse.

Some problems are unique to the notebook experience. A notebook execution is a record of a particular piece of code, run at a particular point in time, in a particular environment. This combination of code, data, and execution results in a single document makes notebooks powerful, but also difficult to reproduce. Indeed, the scientific computing community has documented some notebook reproducibility concerns as well as some best practices for reproducible notebooks.

Finally, another problem that might be unique to the ML space is the need for polyglot support. Machine learning researchers often work in multiple programming languages — for example, researchers might use Scala and Spark to generate training data (cleaning, subsampling, etc), while actual training might be done with popular Python ML libraries like tensorflow or scikit-learn.

Next, we’ll go through a deeper dive of Polynote’s features.

Reproducible by Design

Two of Polynote’s guiding principles are reproducibility and visibility. To further these goals, one of our earliest design decisions was to build Polynote’s code interpretation from scratch, rather than relying on a REPL like a traditional notebook.

We feel that while REPLs are great in general, they are fundamentally unfit for the notebook model. In order to understand the problems with REPLs and notebooks, let’s take a look at the design of a typical notebook environment.

A notebook is an ordered collection of cells, each of which can hold code or text. The contents of each cell can be modified and executed independently. Cells can be rearranged, inserted, and deleted. They can also depend on the output of other cells in the notebook.

Contrast this with a REPL environment. In a REPL session, a user inputs expressions into the prompt one at a time. Once evaluated, expressions and the results of their evaluation are immutable. Evaluation results are appended to the global state available to the next expression.

Unfortunately, the disconnect between these two models means that a typical notebook environment, which uses a REPL session to evaluate cell code, causes hidden state to accrue as users interact with the notebook. Cells can be executed in any order, mutating this global hidden state that in turn affects the execution of other cells. More often than not, notebooks are unable to be reliably rerun from the top, which makes them very difficult to reproduce and share with others. The hidden state also makes it difficult for users to reason about what’s going on in the notebook.

In other notebooks, hidden state means that a variable is still available after its cell is deleted.
In a Polynote notebook, there is no hidden state. A deleted cell’s variables are no longer available.

Writing Polynote’s code interpretation from scratch allowed us to do away with this global, mutable state. By keeping track of the variables defined in each cell, Polynote constructs the input state for a given cell based on the cells that have run above it. Making the position of a cell important in its execution semantics enforces the principle of least surprise, allowing users to read the notebook from top to bottom. It ensures reproducibility by making it far more likely that running the notebook sequentially will work.

Better editing

Let’s face it — for someone used to IDEs, writing a nontrivial amount of code in a notebook can feel like going back in time a few decades. We’ve seen users who prefer to write code in an IDE instead, and paste it into the notebook to run. While it’s not our goal to provide all the features of a full-fledged modern IDE, there are a few quality-of-life code editing enhancements that go a long way toward improving usability.

Code editing in Polynote integrates with the Monaco editor for interactive auto-complete.
Polynote highlights errors inside the code to help users quickly figure out what’s gone wrong.
Polynote provides a rich text editor for text cells.
The rich text editor allows users to easily insert LaTeX equations.

Visibility

As we mentioned earlier, visibility is one of Polynote’s guiding principles. We want it to be easy to see what the kernel is doing at any given time, without needing to dive into logs. To that end, Polynote provides a variety of UI treatments that let users know what’s going on.

Here’s a snapshot of Polynote in the midst of some code execution.

There’s quite a bit of information available to the user from a single glance at this UI. First, it is clear from both the notebook view and task list that Cell 1 is currently running. We can also see that Cells 2 through 4 are queued to be run, in that order.

We can also see that the exact statement currently being run is highlighted in blue: the line defining the value `sumOfRandomNumbers`. Finally, since evaluating that statement launches a Spark job, we can also see job- and stage-level Spark progress information in the task list.

Here’s an animation of that execution so we can see how Polynote makes it easy to follow along with the state of the kernel.

Executing a Polynote notebook

The symbol table provides insight into the notebook’s internal state. When a cell is selected, the symbol table shows any values that resulted from the current cell’s execution above a black line, and any values available to the cell (from previous cells) below the line. At the end of the animation, we show the symbol table updating as we click on each cell in turn.

Finally, the kernel status area provides information about the execution status of the kernel. Below, we show a closeup view of how the kernel status changes from idle and connected, in green, to busy, in yellow. Other states include disconnected, in gray, and dead or not started, in red.

Kernel status changing from green (idle and connected) to yellow (busy)

Polyglot

You may have noticed in the screenshots shown earlier that each cell has a language dropdown in its toolbar. That’s because Polynote supports truly polyglot notebooks, where each cell can be written in a different language!

When a cell is run, the kernel provides the available typed input values to the cell’s language interpreter. In turn, the interpreter provides the resulting typed output values back to the kernel. This allows cells in Polynote notebooks to operate within the same context, and use the same shared state, regardless of which language they are defined in — so users can pick the best tool for the job at hand.

Here’s an example using scikit-learn, a Python library, to compute an isotonic regression of a dataset generated with Scala. This code is adapted from the Isotonic Regression example on the scikit-learn website.

A polyglot example showing data generation in Scala and data analysis in Python

As this example shows, Polynote enables users to fluently move from one language to another within the same notebook.
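The animation itself isn’t reproduced here, so the following is a rough, self-contained Python sketch of what the analysis half of such a notebook might look like. The variable names and the generated data are stand-ins of our own; in the actual notebook, the input arrays would come from the preceding Scala cell and be shared by Polynote.

import numpy as np
from sklearn.isotonic import IsotonicRegression

# Stand-in data so the sketch runs on its own; in the notebook these arrays
# would be produced by a Scala cell and shared with this Python cell.
rng = np.random.RandomState(0)
xs = np.arange(100, dtype=float)
ys = 50.0 * np.log1p(xs) + rng.normal(size=100) * 10.0

# Fit a non-decreasing curve to the noisy data.
ir = IsotonicRegression()
y_fit = ir.fit_transform(xs, ys)
print(y_fit[:5])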

Dependency and Configuration Management

In order to better facilitate reproducibility, Polynote stores configuration and dependency information directly in the notebook itself, rather than relying on external files or a cluster/server level configuration. We found that managing dependencies directly in the notebook code was clunky and could be confusing to users. Instead, Polynote provides a user-friendly Configuration section where users can set dependencies for each notebook.

Polynote’s Configuration UI, providing user-friendly, notebook-level configuration and dependency management

With this configuration, Polynote constructs an environment for the notebook. It fetches the dependencies locally (using Coursier or pip to fetch them from a repository) and loads the Scala dependencies into an isolated ClassLoader to reduce the chances of a class conflict with Spark libraries. Python dependencies are loaded into an isolated virtualenv. When Polynote is used in Spark mode, it creates a Spark Session for the notebook which uses the provided configuration. The Python and Scala dependencies are automatically added to the Spark Session.

Data Visualization

One of the most important use cases of notebooks is the ability to explore and visualize data. Polynote integrates with two of the most popular open source visualization libraries, Vega and Matplotlib.

While matplotlib integration is quite standard among notebooks, Polynote also has native support for data exploration — including a data schema view, table inspector, plot constructor and Vega support.

We’ll walk through a quick example of some data analysis and exploration using the tools mentioned above, using the Wine Reviews dataset from Kaggle. First, here’s a quick example of just loading the data in Spark, seeing the Schema, plotting it and saving that plot in the notebook.

Example of data exploration using the plot constructor

Let’s focus on some of what we’re seeing here.

View of the quick inspector, showing the DataFrame’s schema. The blue arrow points to the quick access buttons to the table view (left) and plot view (right)

If the last statement of a cell is an expression, it gets assigned to the cell’s Out variable. Polynote will display a representation of the result in a fashion determined by its data type. If it’s a table-like data type, such as a DataFrame or collection of case classes, Polynote shows the quick inspector, allowing users to see schema and type information at a glance.

The quick inspector also provides two buttons that bring up the full data inspector — the button on the left brings up the table view, while the button on the right brings up the plot constructor. The animation also shows the plot constructor and how users can drag and drop measures and dimensions to create different plots.

We also show how to save a plot to the notebook as its own cell. Because Polynote natively supports Vega specs, saving the plot simply inserts a new Vega cell with a generated spec. As with any other language, Vega specs can leverage polyglot support to refer to values from previous cells. In this case, we’re using the Out value (a DataFrame) and performing additional aggregations on it. This enables efficient plotting without having to bring millions of data points to the client. Polynote’s Vega spec language provides an API for aggregating and otherwise modifying table-like data streams.

A Vega cell generated by the plot constructor, showing its spec

Vega cells don’t need to be authored using the plot constructor — any Vega spec can be put into a Vega cell and plotted directly, as seen below.

Vega’s Stacked Area Chart Example displayed in Polynote

In addition to the cell result value, any variable in the symbol table can be inspected with a click.

Inspecting a variable in the symbol table

The road ahead

We have described some of the key features of Polynote here. We’re proud to share Polynote widely by open sourcing it, and we’d love to hear your feedback. Take it for a spin today by heading over to our website or directly to the code and let us know what you think! Take a look at our currently open issues to see what we’re planning, and, of course, PRs are always welcome! Polynote is still very much in its infancy, so you may encounter some rough edges. It is also a powerful tool that enables arbitrary code execution (“with great power comes great responsibility”), so please be cognizant of this when you use it in your environment.

Plenty of exciting work lies ahead. We are very optimistic about the potential of Polynote and we hope to learn from the community just as much as we hope they will find value from Polynote. If you are interested in working on Polynote or other Machine Learning research, engineering and infrastructure problems, check out the Netflix Research site as well as some of the current openings.

Acknowledgements

Many colleagues at Netflix helped us in the early stages of Polynote’s development. We would like to express our tremendous gratitude to Aish Fenton, Hua Jiang, Kedar Sadekar, Devesh Parekh, Christopher Alvino, and many others who provided thoughtful feedback along their journey as early adopters of Polynote.


Open-sourcing Polynote: an IDE-inspired polyglot notebook was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

EMR Notebooks: A managed analytics environment based on Jupyter notebooks

Post Syndicated from Vignesh Rajamani original https://aws.amazon.com/blogs/big-data/emr-notebooks-a-managed-analytics-environment-based-on-jupyter-notebooks/

Notebooks are increasingly becoming the standard tool for interactively developing big data applications. It’s easy to see why. Their flexible architecture allows you to experiment with data in multiple languages, test code interactively, and visualize large datasets. To help scientists and developers easily access notebook tools, we launched Amazon EMR Notebooks, a managed notebook environment that is based on the popular open-source Jupyter notebook application. EMR Notebooks supports Sparkmagic kernels, which allow you to submit jobs remotely to your EMR cluster using languages like PySpark, Spark SQL, SparkR, and Scala. The kernels submit your Spark code through Apache Livy, which is a REST server for Spark running on your cluster.

EMR Notebooks is designed to make it easy for you to experiment and build applications with Apache Spark. In this blog post, I first cover some of the benefits that EMR Notebooks offers. Then I introduce you to some of its capabilities such as detaching and attaching a notebook to different EMR clusters, monitoring Spark activity from within the notebook, using tags to control user permissions, and setting up user-impersonation to track notebook users and their actions. To learn about creating and using EMR Notebooks, you can visit Using Amazon EMR Notebooks or follow along with the AWS Online Tech Talks webinar.
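As a quick illustration, a PySpark cell run from an EMR notebook might look like the following sketch. The S3 path and column name are hypothetical; the Sparkmagic kernel ships the code to the cluster through Livy, where a Spark session is already available.

from pyspark.sql import SparkSession

# In an EMR notebook the kernel already provides a session named `spark`;
# getOrCreate() simply returns that existing session.
spark = SparkSession.builder.getOrCreate()

# Hypothetical dataset path and column, for illustration only.
df = spark.read.parquet("s3://my-bucket/nyc-taxi/")
df.groupBy("vendor_id").count().show()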

Benefits of EMR Notebooks

One of the useful features of EMR Notebooks is the separation of the notebook environment from your underlying cluster infrastructure. The separation makes it easy for you to execute notebook code against transient clusters without worrying about deploying or configuring your notebook infrastructure every time you bring up a new cluster. You can create multiple serverless notebooks from the AWS Management Console for EMR and access the notebook UI without spending time setting up SSH access or configuring your browser for port-forwarding. Each notebook you create is launched instantly with its own Spark context. This capability enables you to attach multiple notebooks to a single shared cluster and submit parallel jobs without fear of job conflicts in a multi-tenant environment. This way you make efficient use of your clusters.

You can also connect EMR Notebooks to an EMR cluster as small as one node. This gives you a budget-friendly sandbox environment to develop your Spark application.

Finally, with EMR Notebooks, you don’t have to spend time manually configuring your notebook to store files persistently. Your notebook files are saved automatically to a chosen Amazon S3 bucket periodically, so you don’t have to worry about losing your work if your cluster is shut down. You can retrieve your saved notebooks from the console or download them locally from your S3 bucket in Jupyter “ipynb” format.

Detaching and attaching EMR Notebooks to different clusters

With EMR Notebooks, you can detach an active notebook from a cluster and attach it to a different cluster and promptly resume your work. This capability can be useful in scenarios where you want to move your notebook from a sandbox development cluster to a production environment or attach to a different cluster with appropriate CPU or memory resources and library packages required to execute your notebook against large datasets. To detach an active notebook:

First select the notebook name and then choose Stop.

Wait for the notebook status shown next to the notebook name to change from Ready to Stopped and then choose Change Cluster.

After you stop the notebook, you can choose to attach it to a different cluster in the same VPC or create a new cluster. EMR Notebooks automatically attaches the notebook to the cluster and re-starts the notebook.

Monitoring and debugging Spark jobs

EMR Notebooks supports a built-in Jupyter notebook widget called SparkMonitor that allows you to monitor the status of all your Spark jobs launched from the notebook without connecting to the Spark web UI server.

The widget automatically integrates within the cell structure of your notebook and displays detailed status about the job submitted from each cell, providing you with real-time progress of the different stages of the job. For any failed jobs, this widget also offers embedded links to the container logs in Amazon S3, allowing you to get to the relevant logs and debug your jobs.

Additionally, if you have configured your cluster to accept SSH connections, then you can access the Spark application web UI and Hadoop jobs history server from within the notebook. This allows you to view the event timelines, visualize Directed Acyclic Graphs (DAG) of each job, and view the detailed system and runtime information to inspect and debug your code. These web UIs are available automatically the first time you start to run your Spark code from a notebook.

Writing tag-based policies to control user permissions for notebooks and users

EMR Notebooks are by default shared resources that anyone from your organization with access to your AWS account can open, edit, or even delete. If you want more control over your notebook, you can use tags to label your notebook and write IAM policies that control access for other users. To get you started, when you create a notebook, a default tag with a key string, creatorUserId, is set to the value of the IAM User ID of the user who created the notebook.

You can use this tag to limit allowed actions on the notebook to only the creator of the notebook. For example, the permissions policy statement below, when attached to a role or user, enables the IAM user to view, start, stop, edit, or delete only those notebooks that they have created. This policy statement uses the default tag that is applied by EMR Notebooks when you create the notebook.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "elasticmapreduce:DescribeEditor",
                "elasticmapreduce:StartEditor",
                "elasticmapreduce:StopEditor",
                "elasticmapreduce:DeleteEditor",
                "elasticmapreduce:OpenEditorInConsole"
            ],
            "Effect": "Allow",
            "Resource": "*",
            "Condition": {
                "StringEquals": {
                    "elasticmapreduce:ResourceTag/creatorUserId": "${aws:userId}"
                }
            }
        }
    ]
}

You can write policies that enforce the creation of tags before starting a notebook. For example, the policy below requires that the user not change or delete the creatorUserId tag that is added by default. The variable ${aws:userId} specifies the currently active user’s ID, which is the default value of the tag.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "elasticmapreduce:CreateEditor"
            ],
            "Effect": "Allow",
            "Resource": "*",
            "Condition": {
                "StringEquals": {
                    "elasticmapreduce:RequestTag/creatorUserId": "${aws:userid}"
                }
            }
        }
    ]
}

You can use the notebook tags along with EMR cluster tags to control notebook user access to your cluster. Tagging your notebooks and clusters, in addition to securing your resources, also allows you to categorize, track, and allocate your EMR cluster costs across your different lines of business. For example, the policy below allows a user to start a notebook only if the notebook has a tag with the key string “department” and its value set to “Analytics,” and only if the notebook is attached to an EMR cluster that has a tag with the key string “cost-center” and value set to “12345.”

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "elasticmapreduce:StartEditor"
            ],
            "Effect": "Allow",
            "Resource": "arn:aws:elasticmapreduce:*:123456789012:editor/*",
            "Condition": {
                "StringEquals": {
                    "elasticmapreduce:ResourceTag/department": [
                        "Analytics"
                    ]
                }
            }
        },
        {
            "Action": [
                "elasticmapreduce:StartEditor"
            ],
            "Effect": "Allow",
            "Resource": "arn:aws:elasticmapreduce:*:123456789012:cluster/*",
            "Condition": {
                "StringEquals": {
                    "elasticmapreduce:ResourceTag/cost-center": [
                        "12345"
                    ]
                }
            }
        }
    ]
}

You can learn more about notebook and cluster tags by visiting Using Tags to Control User Permissions and Tag Clusters. Also, visit Using Cost Allocation Tags to learn more about using tags to generate cost allocation reports in the AWS Billing and Cost Management console.

Tracking notebook users by enforcing user-impersonation

EMR Notebooks enables multiple users to execute their notebooks’ code concurrently on a shared EMR cluster, improving cluster utilization. By default, all Spark jobs spawned by these different users from their notebooks run as the same user (the livy user) on your EMR cluster. If your corporate policy requires you to set up an audit trail and track individual notebook user actions on shared clusters, you can use the user-impersonation feature of EMR Notebooks. This capability allows you to distinguish and audit notebook users by associating the jobs they run from their notebooks with each user’s IAM identity. To use this feature, enable Livy user-impersonation by configuring the core-site and livy-conf configuration classifications when you create and launch an EMR cluster, as follows:

[
    {
        "Classification": "core-site",
        "Properties": {
          "hadoop.proxyuser.livy.groups": "*",
          "hadoop.proxyuser.livy.hosts": "*"
        }
    },
    {
        "Classification": "livy-conf",
        "Properties": {
          "livy.impersonation.enabled": "true"
        }
    }
]

Visit Configuring Applications to learn more about configuring applications. After this feature is enabled, EMR Notebooks creates HDFS user directories on the master node for each user identity. This means all the Spark jobs from the notebook run as the IAM user instead of the indistinct user livy. For example, if user NB_User1 runs code from the notebook editor, then a user directory named user_NB_User1 is created on the master node and all Spark jobs run as user_NB_User1. You can then use a service like AWS CloudTrail to audit the record of actions by the user NB_User1 by creating a trail. To learn more about setting up audit trails, see logging Amazon EMR API calls in AWS CloudTrail.
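As a hedged example of what that audit might look like, the following boto3 sketch looks up recent CloudTrail events for a given notebook user. The Region and username are placeholders; lookup_events is the standard CloudTrail API for querying recent account activity.

import boto3

cloudtrail = boto3.client("cloudtrail", region_name="us-east-1")

# Placeholder username; use the IAM identity associated with the notebook user.
response = cloudtrail.lookup_events(
    LookupAttributes=[{"AttributeKey": "Username", "AttributeValue": "NB_User1"}],
    MaxResults=50,
)

for event in response["Events"]:
    print(event["EventTime"], event["EventSource"], event["EventName"])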

Conclusion

In this post, I highlighted some of the capabilities of EMR Notebooks such as the ability to change clusters, monitor Spark jobs from each notebook cell, control user permission, and categorize resource costs. You can do this by using notebook and cluster tags and setting up user-impersonation to track notebook user actions.

Oh, by the way, there is no additional charge for using EMR Notebooks; you only pay for the use of the EMR cluster as usual!

If you have questions or suggestions, feel free to leave a comment.



About the Authors

Vignesh Rajamani is a senior product manager for EMR at AWS.


Nikki Rouda is the principal product marketing manager for data lakes and big data at AWS. Nikki has spent 20+ years helping enterprises in 40+ countries develop and implement solutions to their analytics and IT infrastructure challenges. Nikki holds an MBA from the University of Cambridge and an ScB in geophysics and math from Brown University.


Best practices for successfully managing memory for Apache Spark applications on Amazon EMR

Post Syndicated from Karunanithi Shanmugam original https://aws.amazon.com/blogs/big-data/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/

In the world of big data, a common use case is performing extract and transform (ET) operations and data analytics on huge amounts of data from a variety of data sources. Often, you then analyze the data to get insights. One of the most popular cloud-based solutions to process such vast amounts of data is Amazon EMR.

Amazon EMR is a managed cluster platform that simplifies running big data frameworks, such as Apache Hadoop and Apache Spark, on AWS. Amazon EMR enables organizations to spin up a cluster with multiple instances in a matter of minutes. It also enables you to process various data engineering and business intelligence workloads through parallel processing. By doing this, you can greatly reduce the data processing times, effort, and costs involved in establishing and scaling a cluster.

Apache Spark is a cluster-computing software framework that is open-source, fast, and general-purpose. It is widely used in distributed processing of big data. Apache Spark relies heavily on cluster memory (RAM) as it performs parallel computing in memory across nodes to reduce the I/O and execution times of tasks.

Generally, you perform the following steps when running a Spark application on Amazon EMR:

  1. Upload the Spark application package to Amazon S3.
  2. Configure and launch the Amazon EMR cluster with configured Apache Spark.
  3. Install the application package from Amazon S3 onto the cluster and then run the application.
  4. Terminate the cluster after the application is completed.

It’s important to configure the Spark application appropriately based on data and processing requirements for it to be successful. With default settings, Spark might not use all the available resources of the cluster and might end up with physical or virtual memory issues, or both. There are thousands of questions on stackoverflow.com related to this specific topic.

This blog post is intended to assist you by detailing best practices to prevent memory-related issues with Apache Spark on Amazon EMR.

Common memory issues in Spark applications with default or improper configurations

Listed following are a few sample out-of-memory errors that can occur in a Spark application with default or improper configurations.

Out of Memory Error, Java Heap Space

WARN TaskSetManager: Loss was due to 
java.lang.OutOfMemoryError
java.lang.OutOfMemoryError: Java heap space

Out of Memory Error, Exceeding Physical Memory

Error: ExecutorLostFailure Reason: Container killed by YARN for exceeding limits.
12.4 GB of 12.3 GB physical memory used. 
Consider boosting spark.yarn.executor.memoryOverhead.
Error: ExecutorLostFailure Reason: Container killed by YARN for exceeding limits.
4.5GB of 3GB physical memory used limits.
Consider boosting spark.yarn.executor.memoryOverhead.

Out of Memory Error, Exceeding Virtual Memory

Container killed by YARN for exceeding memory limits.
1.1gb of 1.0gb virtual memory used. Killing container.

Out of Memory Error, Exceeding Executor Memory

Required executor memory (1024+384 MB) is above 
the max threshold (896 MB) of this cluster! Please check the values of 'yarn.scheduler.maximum-allocation-mb'
and/or 'yarn.nodemanager.resource.memory-mb'.

These issues occur for various reasons, some of which are listed following:

  1. When the number of Spark executor instances, the amount of executor memory, the number of cores, or parallelism is not set appropriately to handle large volumes of data.
  2. When the Spark executor’s physical memory exceeds the memory allocated by YARN. In this case, the total of Spark executor instance memory plus memory overhead is not enough to handle memory-intensive operations. Memory-intensive operations include caching, shuffling, and aggregating (using reduceByKey, groupBy, and so on). Or, in some cases, the total of Spark executor instance memory plus memory overhead can be more than what is defined in yarn.scheduler.maximum-allocation-mb.
  3. The memory required to perform system operations such as garbage collection is not available in the Spark executor instance.

In the following sections, I discuss how to properly configure to prevent out-of-memory issues, including but not limited to those preceding.

Configuring for a successful Spark application on Amazon EMR

The following steps can help you configure a successful Spark application on Amazon EMR.

1. Determine the type and number of instances based on application needs

Amazon EMR has three types of nodes:

  1. Master: An EMR cluster has one master, which acts as the resource manager and manages the cluster and tasks.
  2. Core: The core nodes are managed by the master node. Core nodes run YARN NodeManager daemons, Hadoop MapReduce tasks, and Spark executors to manage storage, execute tasks, and send a heartbeat to the master.
  3. Task: The optional task-only nodes perform tasks and don’t store any data, in contrast to core nodes.

Best practice 1: Choose the right type of instance for each of the node types in an Amazon EMR cluster. Doing this is one key to success in running any Spark application on Amazon EMR.

There are numerous instance types offered by AWS with varying ranges of vCPUs, storage, and memory, as described in the Amazon EMR documentation. Based on whether an application is compute-intensive or memory-intensive, you can choose the right instance type with the right compute and memory configuration.

For memory-intensive applications, prefer R type instances over the other instance types. For compute-intensive applications, prefer C type instances. For applications balanced between memory and compute, prefer M type general-purpose instances.

To understand the possible use cases for each instance type offered by AWS, see Amazon EC2 Instance Types on the EC2 service website.

After deciding the instance type, determine the number of instances for each of the node types. You do this based on the size of the input datasets, application execution times, and frequency requirements.

2. Determine the Spark configuration parameters

Before we dive into the details on Spark configuration, let’s get an overview of how the executor container memory is organized using the diagram following.

As the preceding diagram shows, the executor container has multiple memory compartments. Of these, only one (execution memory) is actually used for executing the tasks. These compartments should be properly configured for running the tasks efficiently and without failure.

Calculate and set the following Spark configuration parameters carefully for the Spark application to run successfully:

  • spark.executor.memory – Size of memory to use for each executor that runs the task.
  • spark.executor.cores – Number of virtual cores.
  • spark.driver.memory – Size of memory to use for the driver.
  • spark.driver.cores – Number of virtual cores to use for the driver.
  • spark.executor.instances ­– Number of executors. Set this parameter unless spark.dynamicAllocation.enabled is set to true.
  • spark.default.parallelism – Default number of partitions in resilient distributed datasets (RDDs) returned by transformations like join, reduceByKey, and parallelize when no partition number is set by the user.

Amazon EMR provides high-level information on how it sets the default values for Spark parameters in the release guide. These values are automatically set in the spark-defaults settings based on the core and task instance types in the cluster.

To use all the resources available in a cluster, set the maximizeResourceAllocation parameter to true. This EMR-specific option calculates the maximum compute and memory resources available for an executor on an instance in the core instance group. It then sets these parameters in the spark-defaults settings. Even with this setting, generally the default numbers are low and the application doesn’t use the full strength of the cluster. For example, the default for spark.default.parallelism is only 2 x the number of virtual cores available, though parallelism can be higher for a large cluster.

Spark on YARN can dynamically scale the number of executors used for a Spark application based on the workloads. Using Amazon EMR release version 4.4.0 and later, dynamic allocation is enabled by default (as described in the Spark documentation).

The problem with the spark.dynamicAllocation.enabled property is that it requires you to set subproperties. Some example subproperties are spark.dynamicAllocation.initialExecutors, minExecutors, and maxExecutors. Subproperties are required for most cases to use the right number of executors in a cluster for an application, especially when you need multiple applications to run simultaneously. Setting subproperties requires a lot of trial and error to get the numbers right. If they’re not right, the capacity might be reserved but never actually used. This leads to wastage of resources or memory errors for other applications.

Best practice 2: Set spark.dynamicAllocation.enabled to true only if the numbers are properly determined for spark.dynamicAllocation.initialExecutors/minExecutors/maxExecutors parameters. Otherwise, set spark.dynamicAllocation.enabled to false and control the driver memory, executor memory, and CPU parameters yourself. To do this, calculate and set these properties manually for each application (see the example following).

Let’s assume that we are going to process 200 terabytes of data spread across thousands of file stores in Amazon S3. Further, let’s assume that we do this through an Amazon EMR cluster with 1 r5.12xlarge master node and 19 r5.12xlarge core nodes. Each r5.12xlarge instance has 48 virtual cores (vCPUs) and 384 GB RAM. All these calculations are for the --deploy-mode cluster, which we recommend for production use.

The following list describes how to set some important Spark properties, using the preceding case as an example.

spark.executor.cores

Assigning executors with a large number of virtual cores leads to a low number of executors and reduced parallelism. Assigning a low number of virtual cores leads to a high number of executors, causing a larger amount of I/O operations. Based on historical data, we suggest that you have five virtual cores for each executor to achieve optimal results in any sized cluster.

For the preceding cluster, the property spark.executor.cores should be assigned as follows: spark.executor.cores = 5 (vCPU)

spark.executor.memory

After you decide on the number of virtual cores per executor, calculating this property is much simpler. First, get the number of executors per instance using total number of virtual cores and executor virtual cores. Subtract one virtual core from the total number of virtual cores to reserve it for the Hadoop daemons.

Number of executors per instance = (total number of virtual cores per instance - 1) / spark.executor.cores

Number of executors per instance = (48 - 1)/ 5 = 47 / 5 = 9 (rounded down)

Then, get the total executor memory by using the total RAM per instance and number of executors per instance. Leave 1 GB for the Hadoop daemons.

Total executor memory = total RAM per instance / number of executors per instance
Total executor memory = 383 / 9 = 42 (rounded down)

This total executor memory includes the executor memory and overhead (spark.yarn.executor.memoryOverhead). Assign 10 percent from this total executor memory to the memory overhead and the remaining 90 percent to executor memory.

spark.executor.memory = total executor memory * 0.90
spark.executor.memory = 42 * 0.9 = 37 (rounded down)

spark.yarn.executor.memoryOverhead = total executor memory * 0.10
spark.yarn.executor.memoryOverhead = 42 * 0.1 = 5 (rounded up)

spark.driver.memory

We recommend setting this to equal spark.executor.memory.

spark.driver.memory = spark.executor.memory

spark.driver.cores

We recommend setting this to equal spark.executor.cores.

spark.driver.cores = spark.executor.cores

spark.executor.instances

Calculate this by multiplying the number of executors and total number of instances. Leave one executor for the driver.

spark.executor.instances = (number of executors per instance * number of core instances) minus 1 for the driver

spark.executor.instances = (9 * 19) - 1 = 170

spark.default.parallelism

Set this property using the following formula.

spark.default.parallelism = spark.executor.instances * spark.executor.cores * 2

spark.default.parallelism = 170 * 5 * 2 = 1,700

Warning: Although this calculation gives partitions of 1,700, we recommend that you estimate the size of each partition and adjust this number accordingly by using coalesce or repartition.

In case of dataframes, configure the parameter spark.sql.shuffle.partitions along with spark.default.parallelism.
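To make these calculations easy to repeat for other instance shapes, here is a small Python sketch that reproduces them. The helper and its assumptions (one vCPU and 1 GB reserved for Hadoop daemons, five cores per executor, a 90/10 memory split) are ours, mirroring the worked example above rather than an official formula.

import math

def spark_sizing(vcpus_per_instance, ram_gb_per_instance, core_instances,
                 cores_per_executor=5):
    # Reserve 1 vCPU and 1 GB per instance for the Hadoop daemons.
    executors_per_instance = (vcpus_per_instance - 1) // cores_per_executor
    total_executor_mem_gb = (ram_gb_per_instance - 1) // executors_per_instance
    executor_instances = executors_per_instance * core_instances - 1  # one for the driver
    return {
        "spark.executor.cores": cores_per_executor,
        "spark.executor.memory": f"{math.floor(total_executor_mem_gb * 0.90)}g",
        "spark.yarn.executor.memoryOverhead": f"{math.ceil(total_executor_mem_gb * 0.10)}g",
        "spark.executor.instances": executor_instances,
        "spark.default.parallelism": executor_instances * cores_per_executor * 2,
    }

# r5.12xlarge example from above: 48 vCPUs, 384 GB RAM, 19 core nodes.
print(spark_sizing(48, 384, 19))
# {'spark.executor.cores': 5, 'spark.executor.memory': '37g',
#  'spark.yarn.executor.memoryOverhead': '5g',
#  'spark.executor.instances': 170, 'spark.default.parallelism': 1700}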

Though the preceding parameters are critical for any Spark application, the following parameters also help in running the applications smoothly to avoid other timeout and memory-related errors. We advise that you set these in the spark-defaults configuration file.

  • spark.network.timeout – Timeout for all network transactions.
  • spark.executor.heartbeatInterval – Interval between each executor’s heartbeats to the driver. This value should be significantly less than spark.network.timeout.
  • spark.memory.fraction – Fraction of JVM heap space used for Spark execution and storage. The lower this is, the more frequently spills and cached data eviction occur.
  • spark.memory.storageFraction – Expressed as a fraction of the size of the region set aside by spark.memory.fraction. The higher this is, the less working memory might be available to execution. This means that tasks might spill to disk more often.
  • spark.yarn.scheduler.reporterThread.maxFailures – Maximum number executor failures allowed before YARN can fail the application.
  • spark.rdd.compress – When set to true, this property can save substantial space at the cost of some extra CPU time by compressing the RDDs.
  • spark.shuffle.compress – When set to true, this property compresses the map output to save space.
  • spark.shuffle.spill.compress – When set to true, this property compresses the data spilled during shuffles.
  • spark.sql.shuffle.partitions – Sets the number of partitions for joins and aggregations.
  • spark.serializer – Sets the serializer to serialize or deserialize data. As a serializer, I prefer Kryo (org.apache.spark.serializer.KryoSerializer), which is faster and more compact than the default Java serializer.

To understand more about each of the parameters mentioned preceding, see the Spark documentation.

We recommend that you consider these additional programming techniques for efficient Spark processing; a short sketch follows the list:

  • coalesce – Reduces the number of partitions to allow for less data movement.
  • repartition – Reduces or increases the number of partitions and performs full shuffle of data as opposed to coalesce.
  • partitionBy – Distributes data horizontally across partitions.
  • bucketBy – Decomposes data into more manageable parts (buckets) based on hashed columns.
  • cache/persist – Pulls datasets into a clusterwide in-memory cache. Doing this is useful when data is accessed repeatedly, such as when querying a small lookup dataset or when running an iterative algorithm.
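The following PySpark sketch shows several of these techniques together; the S3 paths, column names, and partition counts are hypothetical and only illustrative.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.read.parquet("s3://my-bucket/input/")               # hypothetical path
lookup = spark.read.parquet("s3://my-bucket/lookup/").cache()  # small dataset reused repeatedly

# repartition performs a full shuffle into 1,700 partitions keyed by customer_id.
joined = df.repartition(1700, "customer_id").join(lookup, "customer_id")

# coalesce reduces the partition count without a full shuffle before writing,
# and partitionBy lays the output out by year on S3.
(joined
 .coalesce(200)
 .write
 .partitionBy("year")
 .mode("overwrite")
 .parquet("s3://my-bucket/output/"))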

Best practice 3: Carefully calculate the preceding additional properties based on application requirements. Set these properties appropriately in spark-defaults, when submitting a Spark application (spark-submit), or within a SparkConf object.
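For example, here is a sketch of applying the values derived earlier from application code through the SparkSession builder (which populates the underlying SparkConf). Properties that size the driver or YARN containers are generally better supplied through spark-defaults or spark-submit, because they must be known before the JVMs start.

from pyspark.sql import SparkSession

# Values taken from the worked r5.12xlarge example above.
spark = (
    SparkSession.builder
    .appName("memory-tuned-app")
    .config("spark.dynamicAllocation.enabled", "false")
    .config("spark.executor.cores", "5")
    .config("spark.executor.memory", "37g")
    .config("spark.yarn.executor.memoryOverhead", "5g")
    .config("spark.executor.instances", "170")
    .config("spark.default.parallelism", "1700")
    .config("spark.sql.shuffle.partitions", "1700")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)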

3. Implement a proper garbage collector to clear memory effectively

Garbage collection can lead to out-of-memory errors in certain cases. These include cases when there are multiple large RDDs in the application. Other cases occur when there is an interference between the task execution memory and RDD cached memory.

You can use multiple garbage collectors to evict the old objects and place the new ones into memory. However, the latest Garbage First Garbage Collector (G1GC) overcomes the latency and throughput limitations of the older garbage collectors.

Best practice 4: Always set up a garbage collector when handling large volume of data through Spark.

The parameter -XX:+UseG1GC specifies that the G1GC garbage collector should be used. (The default is -XX:+UseParallelGC.) To understand the frequency and execution time of the garbage collection, use the parameters -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps. To initiate garbage collection sooner, set InitiatingHeapOccupancyPercent to 35 (the default is 45). Doing this helps avoid potential garbage collection of the total memory, which can take a significant amount of time. An example follows.

"spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
"spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",

4. Set the YARN configuration parameters

Even if all the Spark configuration properties are calculated and set correctly, virtual out-of-memory errors can still occur rarely as virtual memory is bumped up aggressively by the OS. To prevent these application failures, set the following flags in the YARN site settings.

Best practice 5: Always set the virtual and physical memory check flag to false.

"yarn.nodemanager.vmem-check-enabled":"false",
"yarn.nodemanager.pmem-check-enabled":"false"

5. Perform debugging and monitoring

To get details on where the Spark configuration options are coming from, you can run spark-submit with the --verbose option. Also, you can use Ganglia and the Spark UI to monitor application progress, cluster RAM usage, network I/O, and so on.

In the following example, we compare the outcomes between configured and non-configured Spark applications using Ganglia graphs.

When configured following the methods described, a Spark application can process 10 TB data successfully without any memory issues on an Amazon EMR cluster whose specs are as follows:

  • 1 r5.12xlarge master node
  • 19 r5.12xlarge core nodes
  • 8 TB total RAM
  • 960 total virtual CPUs
  • 170 executor instances
  • 5 virtual CPUs/executor
  • 37 GB memory/executor
  • Parallelism equals 1,700

Following, you can find Ganglia graphs for reference.

If you run the same Spark application with default configurations on the same cluster, it fails with an out-of-physical-memory error. This is because the default configurations (two executor instances, parallelism of 2, one vCPU/executor, 8-GB memory/executor) aren’t enough to process 10 TB data. Though the cluster had 7.8 TB memory, the default configurations limited the application to use only 16 GB memory, leading to the following out-of-memory error.

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 8, executor 7): ExecutorLostFailure (executor 7 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 10.5 GB of 8 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

Also, for large datasets, the default garbage collectors don’t clear the memory efficiently enough for the tasks to run in parallel, causing frequent failures. The following charts help in comparing the RAM usage and garbage collection with the default and G1GC garbage collectors. With G1GC, the RAM used is maintained below 5 TB (see the blue area in the graph).

With the default garbage collector (CMS), the RAM used goes above 5 TB. This can lead to the failure of the Spark job when running many tasks continuously.

Example: EMR instance template with configuration

There are different ways to set the Spark and YARN configuration parameters. One way is to pass them when creating the EMR cluster.

To do this, in the Amazon EMR console’s Edit software settings section, you can enter the appropriately updated configuration template (Enter configuration). Or the configuration can be passed from S3 (Load JSON from S3).

Following is a configuration template with sample values. At a minimum, calculate and set the following parameters for a successful Spark application.

{
      "InstanceGroups":[
         {
            "Name":"AmazonEMRMaster",
            "Market":"ON_DEMAND",
            "InstanceRole":"MASTER",
            "InstanceType":"r5.12xlarge",
            "InstanceCount":1,
            "Configurations":[
               {
                 "Classification": "yarn-site",
                 "Properties": {
                   "yarn.nodemanager.vmem-check-enabled": "false",
                   "yarn.nodemanager.pmem-check-enabled": "false"
                 }
               },
               {
                 "Classification": "spark",
                 "Properties": {
                   "maximizeResourceAllocation": "false"
                 }
               },
               {
                 "Classification": "spark-defaults",
                 "Properties": {
                   "spark.network.timeout": "800s",
                   "spark.executor.heartbeatInterval": "60s",
                   "spark.dynamicAllocation.enabled": "false",
                   "spark.driver.memory": "21000M",
                   "spark.executor.memory": "21000M",
                   "spark.executor.cores": "5",
                   "spark.executor.instances": "171",
                   "spark.yarn.executor.memoryOverhead": "21000M",
                   "spark.yarn.driver.memoryOverhead": "21000M",
                   "spark.memory.fraction": "0.80",
                   "spark.memory.storageFraction": "0.30",
                   "spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
                   "spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
                   "spark.yarn.scheduler.reporterThread.maxFailures": "5",
                   "spark.storage.level": "MEMORY_AND_DISK_SER",
                   "spark.rdd.compress": "true",
                   "spark.shuffle.compress": "true",
                   "spark.shuffle.spill.compress": "true",
                   "spark.default.parallelism": "3400"
                 }
               },
               {
                 "Classification": "mapred-site",
                 "Properties": {
                   "mapreduce.map.output.compress": "true"
                 }
               },
               {
                 "Classification": "hadoop-env",
                 "Configurations": [{
                   "Classification": "export",
                   "Configurations": [],
                   "Properties": {
                     "JAVA_HOME": "/usr/lib/jvm/java-1.8.0"
                   }
                 }],
                 "Properties": {}
               },
               {
                 "Classification": "spark-env",
                 "Configurations": [{
                   "Classification": "export",
                   "Properties": {
                     "JAVA_HOME": "/usr/lib/jvm/java-1.8.0"
                   }
                 }],
                 "Properties": {}
               }
            ]
        },
        {
            "Name":"AmazonEMRCore",
            "Market":"ON_DEMAND",
             "InstanceRole":"CORE",
             "InstanceType":"r5.12xlarge",
             "InstanceCount":19,
             "Configurations":[        
        ..............
        ..............
        ..............
        }
      ],
      "Ec2KeyName":"KEY_NAME"
  } 
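
If you script cluster creation instead of using the console, the same configuration can be passed programmatically. The following is a minimal sketch using the AWS SDK for Python (Boto3); the cluster name, release label, roles, and the subset of Spark properties repeated here are illustrative placeholders rather than part of the original walkthrough.

import boto3

emr = boto3.client("emr", region_name="us-east-1")

# Only a subset of the spark-defaults properties from the template above.
spark_defaults = {
    "Classification": "spark-defaults",
    "Properties": {
        "spark.dynamicAllocation.enabled": "false",
        "spark.executor.memory": "21000M",
        "spark.executor.cores": "5",
        "spark.executor.instances": "171",
        "spark.yarn.executor.memoryOverhead": "21000M",
        "spark.default.parallelism": "3400",
    },
}

response = emr.run_job_flow(
    Name="spark-oom-tuned-cluster",      # placeholder name
    ReleaseLabel="emr-5.19.0",           # example release label
    Applications=[{"Name": "Spark"}],
    Instances={
        "InstanceGroups": [
            {
                "Name": "AmazonEMRMaster",
                "Market": "ON_DEMAND",
                "InstanceRole": "MASTER",
                "InstanceType": "r5.12xlarge",
                "InstanceCount": 1,
                "Configurations": [spark_defaults],
            },
            {
                "Name": "AmazonEMRCore",
                "Market": "ON_DEMAND",
                "InstanceRole": "CORE",
                "InstanceType": "r5.12xlarge",
                "InstanceCount": 19,
                "Configurations": [spark_defaults],
            },
        ],
        "Ec2KeyName": "KEY_NAME",        # placeholder key pair
        "KeepJobFlowAliveWhenNoSteps": True,
    },
    JobFlowRole="EMR_EC2_DefaultRole",
    ServiceRole="EMR_DefaultRole",
)
print(response["JobFlowId"])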

Conclusion

In this blog post, I detailed the possible out-of-memory errors, their causes, and a list of best practices to prevent these errors when submitting a Spark application on Amazon EMR.

My colleagues and I formed these best practices after thorough research and an understanding of various Spark configuration properties, and after testing multiple Spark applications. These best practices apply to most out-of-memory scenarios, though there might be some rare scenarios where they don’t apply. However, we believe that this blog post provides all the details needed so you can tweak parameters and successfully run a Spark application.

 


About the Author

Karunanithi Shanmugam is a data engineer with AWS Tech and Finance.

 

 

 

 

Improve Apache Spark write performance on Apache Parquet formats with the EMRFS S3-optimized committer

Post Syndicated from Peter Slawski original https://aws.amazon.com/blogs/big-data/improve-apache-spark-write-performance-on-apache-parquet-formats-with-the-emrfs-s3-optimized-committer/

The EMRFS S3-optimized committer is a new output committer available for use with Apache Spark jobs as of Amazon EMR 5.19.0. This committer improves performance when writing Apache Parquet files to Amazon S3 using the EMR File System (EMRFS). In this post, we run a performance benchmark to compare this new optimized committer with existing committer algorithms, namely FileOutputCommitter algorithm versions 1 and 2. We close with a discussion on current limitations for the new committer, providing workarounds where possible.

Comparison with FileOutputCommitter

In Amazon EMR version 5.19.0 and earlier, Spark jobs that write Parquet to Amazon S3 use a Hadoop commit algorithm called FileOutputCommitter by default. There are two versions of this algorithm, version 1 and 2. Both versions rely on writing intermediate task output to temporary locations. They subsequently perform rename operations to make the data visible at task or job completion time.

Algorithm version 1 has two phases of rename: one to commit the individual task output, and the other to commit the overall job output from completed/successful tasks. Algorithm version 2 is more efficient because task commits rename files directly to the final output location. This eliminates the second rename phase, but it makes partial data visible before the job completes, which not all workloads can tolerate.

The renames that are performed are fast, metadata-only operations on the Hadoop Distributed File System (HDFS). However, when output is written to object stores such as Amazon S3, renames are implemented by copying data to the target and then deleting the source. This rename “penalty” is exacerbated with directory renames, which can happen in both phases of FileOutputCommitter v1. Whereas these are single metadata-only operations on HDFS, committers must execute N copy-and-delete operations on S3.

To partially mitigate this, Amazon EMR 5.14.0+ defaults to FileOutputCommitter v2 when writing Parquet data to S3 with EMRFS in Spark. The new EMRFS S3-optimized committer improves on that work to avoid rename operations altogether by using the transactional properties of Amazon S3 multipart uploads. Tasks may then write their data directly to the final output location, but defer completion of each output file until task commit time.

Performance test

We evaluated the write performance of the different committers by executing the following INSERT OVERWRITE Spark SQL query. The SELECT * FROM range(…) clause generated data at execution time. This produced ~15 GB of data across exactly 100 Parquet files in Amazon S3.

SET rows=4e9; -- 4 Billion
SET partitions=100;

INSERT OVERWRITE DIRECTORY 's3://${bucket}/perf-test/${trial_id}'
USING PARQUET SELECT * FROM range(0, ${rows}, 1, ${partitions});

Note: The EMR cluster ran in the same AWS Region as the S3 bucket. The trial_id property used a UUID generator to ensure that there was no conflict between test runs.

We executed our test on an EMR cluster created with the emr-5.19.0 release label, with a single m5d.2xlarge instance in the master group, and eight m5d.2xlarge instances in the core group. We used the default Spark configuration properties set by Amazon EMR for this cluster configuration, which include the following:

spark.dynamicAllocation.enabled true
spark.executor.memory 11168M
spark.executor.cores 4

After running 10 trials for each committer, we captured and summarized query execution times in the following chart. Whereas FileOutputCommitter v2 averaged 49 seconds, the EMRFS S3-optimized committer averaged only 31 seconds—a 1.6x speedup.

As mentioned earlier, FileOutputCommitter v2 eliminates some, but not all, rename operations that FileOutputCommitter v1 uses. To illustrate the full performance impact of renames against S3, we reran the test using FileOutputCommitter v1. In this scenario, we observed an average runtime of 450 seconds, which is 14.5x slower than the EMRFS S3-optimized committer.

The last scenario we evaluated is the case when EMRFS consistent view is enabled, which addresses issues that can arise due to the Amazon S3 data consistency model. In this mode, the EMRFS S3-optimized committer time was unaffected by this change and still averaged 30 seconds. On the other hand, FileOutputCommitter v2 averaged 53 seconds, which was slower than when the consistent view feature was turned off, widening the overall performance difference to 1.8x.

Job correctness

The EMRFS S3-optimized committer has the same limitations that FileOutputCommitter v2 has because both improve performance by fully delegating commit responsibilities to the individual tasks. The following is a discussion of the notable consequences of this design choice.

Partial results from incomplete or failed jobs

Because both committers have their tasks write to the final output location, concurrent readers of that output location can view partial results when using either of them. If a job fails, partial results are left behind from any tasks that have committed before the overall job failed. This situation can lead to duplicate output if the job is run again without first cleaning up the output location.

One way to mitigate this issue is to ensure that a job uses a different output location each time it runs, publishing the location to downstream readers only if the job succeeds. The following code block is an example of this strategy for workloads that use Hive tables. Notice how output_location is set to a unique value each time the job is run, and that the table partition is registered only if the rest of the query succeeds. As long as readers exclusively access data via the table abstraction, they cannot see results before the job finishes.

SET attempt_id=<a random UUID>;
SET output_location=s3://bucket/${attempt_id};

INSERT OVERWRITE DIRECTORY '${output_location}'
USING PARQUET SELECT * FROM input;

ALTER TABLE output ADD PARTITION (dt = '2018-11-26')
LOCATION '${output_location}';

This approach requires treating the locations that partitions point to as immutable. Updates to partition contents require restating all results into a new location in S3, and then updating the partition metadata to point to that new location.

Duplicate results from non-idempotent tasks

Another scenario that can cause both committers to produce incorrect results is when jobs composed of non-idempotent tasks produce outputs into non-deterministic locations for each task attempt.

The following is an example of a query that illustrates the issue. It uses a timestamp-based table partitioning scheme to ensure that it writes to a different location for each task attempt.

SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

INSERT INTO data PARTITION (time) SELECT 42, current_timestamp();

You can avoid the issue of duplicate results in this scenario by ensuring that tasks write to a consistent location across task attempts. For example, instead of calling functions that return the current timestamp within tasks, consider providing the current timestamp as an input to the job. Similarly, if a random number generator is used within jobs, consider using a fixed seed or one that is based on the task’s partition number to ensure that task reattempts use the same value.

Note: Spark’s built-in random functions rand(), randn(), and uuid() are already designed with this in mind.
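
For example, here is a minimal PySpark sketch of supplying the run timestamp as a job argument instead of computing it inside tasks, so that every task attempt writes to the same partition location. The output path, column name, and generated data are illustrative only, not part of the original post.

import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder.appName("deterministic-partition-example").getOrCreate()

# The partition value is computed once, outside the tasks, and passed in
# as a job argument (for example, "2018-11-26 12:00:00").
run_timestamp = sys.argv[1]

# Illustrative data; every row carries the same, externally supplied timestamp.
df = spark.range(0, 1000).withColumn("time", lit(run_timestamp))

# Every task attempt now writes to the same partition location.
df.write.mode("append").partitionBy("time").parquet("s3://bucket/data/")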

Enabling the EMRFS S3-optimized committer

Starting with Amazon EMR version 5.20.0, the EMRFS S3-optimized committer is enabled by default. In Amazon EMR version 5.19.0, you can enable the committer by setting the spark.sql.parquet.fs.optimized.committer.optimization-enabled property to true from within Spark or when creating clusters. The committer takes effect when you use Spark’s built-in Parquet support to write Parquet files into Amazon S3 with EMRFS. This includes using the Parquet data source with Spark SQL, DataFrames, or Datasets. However, there are some use cases when the EMRFS S3-optimized committer does not take effect, and some use cases where Spark performs its own renames entirely outside of the committer. For more information about the committer and about these special cases, see Using the EMRFS S3-optimized Committer in the Amazon EMR Release Guide.
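
For example, the following is a minimal PySpark sketch that sets the property at session creation time on Amazon EMR 5.19.0 (on 5.20.0 and later it is already on by default); the application name and bucket path are placeholders.

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("emrfs-optimized-committer-example")
    # Explicitly enable the EMRFS S3-optimized committer (needed on EMR 5.19.0).
    .config("spark.sql.parquet.fs.optimized.committer.optimization-enabled", "true")
    .getOrCreate()
)

# Any Parquet write through Spark's built-in Parquet support to an
# EMRFS (s3://) path can now use the optimized committer.
spark.range(0, 1000000).write.mode("overwrite").parquet("s3://bucket/perf-test/example/")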

Related Work – S3A Committers

The EMRFS S3-optimized committer was inspired by concepts used by committers that support the S3A file system. The key take-away is that these committers use the transactional nature of S3 multipart uploads to eliminate some or all of the rename costs. This is also the core concept used by the EMRFS S3-optimized committer.

For more information about the various committers available within the ecosystem, including those that support the S3A file system, see the official Apache Hadoop documentation.

Summary

The EMRFS S3-optimized committer improves write performance compared to FileOutputCommitter. Starting with Amazon EMR version 5.19.0, you can use it with Spark’s built-in Parquet support. For more information, see Using the EMRFS S3-optimized Committer in the Amazon EMR Release Guide.

 


About the authors

Peter Slawski is a software development engineer with Amazon Web Services.

 

 

 

 

Jonathan Kelly is a senior software development engineer with Amazon Web Services.

 

 

 

 

AWS Online Tech Talks – May and Early June 2018

Post Syndicated from Devin Watson original https://aws.amazon.com/blogs/aws/aws-online-tech-talks-may-and-early-june-2018/

AWS Online Tech Talks – May and Early June 2018  

Join us this month to learn about some of the exciting new services and solution best practices at AWS. We also have our first re:Invent 2018 webinar series, “How to re:Invent”. Sign up now to learn more, we look forward to seeing you.

Note – All sessions are free and in Pacific Time.

Tech talks featured this month:

Analytics & Big Data

May 21, 2018 | 11:00 AM – 11:45 AM PT – Integrating Amazon Elasticsearch with your DevOps Tooling – Learn how you can easily integrate Amazon Elasticsearch Service into your DevOps tooling and gain valuable insight from your log data.

May 23, 2018 | 11:00 AM – 11:45 AM PT – Data Warehousing and Data Lake Analytics, Together – Learn how to query data across your data warehouse and data lake without moving data.

May 24, 2018 | 11:00 AM – 11:45 AM PT – Data Transformation Patterns in AWS – Discover how to perform common data transformations on the AWS Data Lake.

Compute

May 29, 2018 | 01:00 PM – 01:45 PM PT – Creating and Managing a WordPress Website with Amazon Lightsail – Learn about Amazon Lightsail and how you can create, run and manage your WordPress websites with Amazon’s simple compute platform.

May 30, 2018 | 01:00 PM – 01:45 PM PT – Accelerating Life Sciences with HPC on AWS – Learn how you can accelerate your Life Sciences research workloads by harnessing the power of high performance computing on AWS.

Containers

May 24, 2018 | 01:00 PM – 01:45 PM PT – Building Microservices with the 12 Factor App Pattern on AWS – Learn best practices for building containerized microservices on AWS, and how traditional software design patterns evolve in the context of containers.

Databases

May 21, 2018 | 01:00 PM – 01:45 PM PT – How to Migrate from Cassandra to Amazon DynamoDB – Get the benefits, best practices and guides on how to migrate your Cassandra databases to Amazon DynamoDB.

May 23, 2018 | 01:00 PM – 01:45 PM PT – 5 Hacks for Optimizing MySQL in the Cloud – Learn how to optimize your MySQL databases for high availability, performance, and disaster resilience using RDS.

DevOps

May 23, 2018 | 09:00 AM – 09:45 AM PT – .NET Serverless Development on AWS – Learn how to build a modern serverless application in .NET Core 2.0.

Enterprise & Hybrid

May 22, 2018 | 11:00 AM – 11:45 AM PT – Hybrid Cloud Customer Use Cases on AWS – Learn how customers are leveraging AWS hybrid cloud capabilities to easily extend their datacenter capacity, deliver new services and applications, and ensure business continuity and disaster recovery.

IoT

May 31, 2018 | 11:00 AM – 11:45 AM PT – Using AWS IoT for Industrial Applications – Discover how you can quickly onboard your fleet of connected devices, keep them secure, and build predictive analytics with AWS IoT.

Machine Learning

May 22, 2018 | 09:00 AM – 09:45 AM PT – Using Apache Spark with Amazon SageMaker – Discover how to use Apache Spark with Amazon SageMaker for training jobs and application integration.

May 24, 2018 | 09:00 AM – 09:45 AM PT – Introducing AWS DeepLens – Learn how AWS DeepLens provides a new way for developers to learn machine learning by pairing the physical device with a broad set of tutorials, examples, source code, and integration with familiar AWS services.

Management Tools

May 21, 2018 | 09:00 AM – 09:45 AM PT – Gaining Better Observability of Your VMs with Amazon CloudWatch – Learn how CloudWatch Agent makes it easy for customers like Rackspace to monitor their VMs.

Mobile

May 29, 2018 | 11:00 AM – 11:45 AM PT – Deep Dive on Amazon Pinpoint Segmentation and Endpoint Management – See how segmentation and endpoint management with Amazon Pinpoint can help you target the right audience.

Networking

May 31, 2018 | 09:00 AM – 09:45 AM PT – Making Private Connectivity the New Norm via AWS PrivateLink – See how PrivateLink enables service owners to offer private endpoints to customers outside their company.

Security, Identity, & Compliance

May 30, 2018 | 09:00 AM – 09:45 AM PT – Introducing AWS Certificate Manager Private Certificate Authority (CA) – Learn how AWS Certificate Manager (ACM) Private Certificate Authority (CA), a managed private CA service, helps you easily and securely manage the lifecycle of your private certificates.

June 1, 2018 | 09:00 AM – 09:45 AM PT – Introducing AWS Firewall Manager – Centrally configure and manage AWS WAF rules across your accounts and applications.

Serverless

May 22, 2018 | 01:00 PM – 01:45 PM PT – Building API-Driven Microservices with Amazon API Gateway – Learn how to build a secure, scalable API for your application in our tech talk about API-driven microservices.

Storage

May 30, 2018 | 11:00 AM – 11:45 AM PT – Accelerate Productivity by Computing at the Edge – Learn how AWS Snowball Edge support for compute instances helps accelerate data transfers, execute custom applications, and reduce overall storage costs.

June 1, 2018 | 11:00 AM – 11:45 AM PT – Learn to Build a Cloud-Scale Website Powered by Amazon EFS – Technical deep dive where you’ll learn tips and tricks for integrating WordPress, Drupal and Magento with Amazon EFS.

 

 

 

 

10 visualizations to try in Amazon QuickSight with sample data

Post Syndicated from Karthik Kumar Odapally original https://aws.amazon.com/blogs/big-data/10-visualizations-to-try-in-amazon-quicksight-with-sample-data/

If you’re not already familiar with building visualizations for quick access to business insights using Amazon QuickSight, consider this your introduction. In this post, we’ll walk through some common scenarios with sample datasets to provide an overview of how you can connect your data, perform advanced analysis, and access the results from any web browser or mobile device.

The following visualizations are built from the public datasets available in the links below. Before we jump into that, let’s take a look at the supported data sources, file formats and a typical QuickSight workflow to build any visualization.

Which data sources does Amazon QuickSight support?

At the time of publication, you can use the following data methods:

  • Connect to AWS data sources, including:
    • Amazon RDS
    • Amazon Aurora
    • Amazon Redshift
    • Amazon Athena
    • Amazon S3
  • Upload Excel spreadsheets or flat files (CSV, TSV, CLF, and ELF)
  • Connect to on-premises databases like Teradata, SQL Server, MySQL, and PostgreSQL
  • Import data from SaaS applications like Salesforce and Snowflake
  • Use big data processing engines like Spark and Presto

This list is constantly growing. For more information, see Supported Data Sources.

Answers in instants

SPICE is the Amazon QuickSight super-fast, parallel, in-memory calculation engine, designed specifically for ad hoc data visualization. SPICE stores your data in a system architected for high availability, where it is saved until you choose to delete it. Improve the performance of database datasets by importing the data into SPICE instead of using a direct database query. To calculate how much SPICE capacity your dataset needs, see Managing SPICE Capacity.

Typical Amazon QuickSight workflow

When you create an analysis, the typical workflow is as follows:

  1. Connect to a data source, and then create a new dataset or choose an existing dataset.
  2. (Optional) If you created a new dataset, prepare the data (for example, by changing field names or data types).
  3. Create a new analysis.
  4. Add a visual to the analysis by choosing the fields to visualize. Choose a specific visual type, or use AutoGraph and let Amazon QuickSight choose the most appropriate visual type, based on the number and data types of the fields that you select.
  5. (Optional) Modify the visual to meet your requirements (for example, by adding a filter or changing the visual type).
  6. (Optional) Add more visuals to the analysis.
  7. (Optional) Add scenes to the default story to provide a narrative about some aspect of the analysis data.
  8. (Optional) Publish the analysis as a dashboard to share insights with other users.

The following graphic illustrates a typical Amazon QuickSight workflow.

Visualizations created in Amazon QuickSight with sample datasets

Visualizations for a data analyst

Source:  https://data.worldbank.org/

Download and Resources:  https://datacatalog.worldbank.org/dataset/world-development-indicators

Data catalog:  The World Bank invests in multiple development projects at the national, regional, and global levels. It’s a great source of information for data analysts.

The following graph shows the percentage of the population that has access to electricity (rural and urban) during 2000 in Asia, Africa, the Middle East, and Latin America.

The following graph shows the share of healthcare costs that are paid out-of-pocket (private vs. public). Also, you can maneuver over the graph to get detailed statistics at a glance.

Visualizations for a trading analyst

Source:  Deutsche Börse Public Dataset (DBG PDS)

Download and resources:  https://aws.amazon.com/public-datasets/deutsche-boerse-pds/

Data catalog:  The DBG PDS project makes real-time data derived from Deutsche Börse’s trading market systems available to the public for free. This is the first time that such detailed financial market data has been shared freely and continually from the source provider.

The following graph shows the market trend of max trade volume for different EU banks. It builds on the data available on XETRA engines, which is made up of a variety of equities, funds, and derivative securities. This graph can be scrolled to visualize trade for a period of an hour or more.

The following graph shows the common stock beating the rest of the maximum trade volume over a period of time, grouped by security type.

Visualizations for a data scientist

Source:  https://catalog.data.gov/

Download and resources:  https://catalog.data.gov/dataset/road-weather-information-stations-788f8

Data catalog:  Data derived from different sensor stations placed on the city bridges and surface streets are a core information source. The road weather information station has a temperature sensor that measures the temperature of the street surface. It also has a sensor that measures the ambient air temperature at the station each second.

The following graph shows the present max air temperature in Seattle from different RWI station sensors.

The following graph shows the minimum temperature of the road surface at different times, which helps predicts road conditions at a particular time of the year.

Visualizations for a data engineer

Source:  https://www.kaggle.com/

Download and resources:  https://www.kaggle.com/datasnaek/youtube-new/data

Data catalog:  Kaggle provides a platform where people can contribute open datasets. Data engineers and other community members have open access to these datasets and can contribute to the open data movement. Kaggle hosts more than 350 datasets in total, with more than 200 featured datasets, including several interesting datasets that aren’t available elsewhere, and it’s also a good place to connect with other data enthusiasts.

The following graph shows the trending YouTube videos and presents the max likes for the top 20 channels. This is one of the most popular datasets for data engineers.

The following graph shows the YouTube daily statistics for the max views of video titles published during a specific time period.

Visualizations for a business user

Source:  New York Taxi Data

Download and resources:  https://data.cityofnewyork.us/Transportation/2016-Green-Taxi-Trip-Data/hvrh-b6nb

Data catalog: NYC Open Data hosts some very popular open datasets for all New Yorkers. The platform lets you dive deep into the datasets and pull out useful visualizations. The 2016 Green Taxi Trip dataset includes records from all trips completed in green taxis in NYC in 2016. Records include fields capturing pick-up and drop-off dates/times, pick-up and drop-off locations, trip distances, itemized fares, rate types, payment types, and driver-reported passenger counts.

The following graph presents the maximum fare amount grouped by passenger count over the course of a day. This can be expanded to cover different days of the month, depending on the business need.

The following graph shows the New York taxi data from January 2016, highlighting the dip in the number of taxi rides on January 23, 2016, across all types of taxis.

A quick search for that date and location shows you the following news report:

Summary

Using Amazon QuickSight, you can see patterns across time-series data by building visualizations, performing ad hoc analysis, and quickly generating insights. We hope you’ll give it a try today!

 


Additional Reading

If you found this post useful, be sure to check out Amazon QuickSight Adds Support for Combo Charts and Row-Level Security and Visualize AWS Cloudtrail Logs Using AWS Glue and Amazon QuickSight.


Karthik Odapally is a Sr. Solutions Architect in AWS. His passion is to build cost effective and highly scalable solutions on the cloud. In his spare time, he bakes cookies and cupcakes for family and friends here in the PNW. He loves vintage racing cars.

 

 

 

Pranabesh Mandal is a Solutions Architect in AWS. He has over a decade of IT experience. He is passionate about cloud technology and focuses on Analytics. In his spare time, he likes to hike and explore the beautiful nature and wild life of most divine national parks around the United States alongside his wife.

 

 

 

 

Implement continuous integration and delivery of serverless AWS Glue ETL applications using AWS Developer Tools

Post Syndicated from Prasad Alle original https://aws.amazon.com/blogs/big-data/implement-continuous-integration-and-delivery-of-serverless-aws-glue-etl-applications-using-aws-developer-tools/

AWS Glue is an increasingly popular way to develop serverless ETL (extract, transform, and load) applications for big data and data lake workloads. Organizations that transform their ETL applications to cloud-based, serverless ETL architectures need a seamless, end-to-end continuous integration and continuous delivery (CI/CD) pipeline: from source code, to build, to deployment, to product delivery. Having a good CI/CD pipeline can help your organization discover bugs before they reach production and deliver updates more frequently. It can also help developers write quality code and automate the ETL job release management process, mitigate risk, and more.

AWS Glue is a fully managed data catalog and ETL service. It simplifies and automates the difficult and time-consuming tasks of data discovery, conversion, and job scheduling. AWS Glue crawls your data sources and constructs a data catalog using pre-built classifiers for popular data formats and data types, including CSV, Apache Parquet, JSON, and more.

When you are developing ETL applications using AWS Glue, you might come across some of the following CI/CD challenges:

  • Iterative development with unit tests
  • Continuous integration and build
  • Pushing the ETL pipeline to a test environment
  • Pushing the ETL pipeline to a production environment
  • Testing ETL applications using real data (live test)
  • Exploring and validating data

In this post, I walk you through a solution that implements a CI/CD pipeline for serverless AWS Glue ETL applications supported by AWS Developer Tools (including AWS CodePipeline, AWS CodeCommit, and AWS CodeBuild) and AWS CloudFormation.

Solution overview

The following diagram shows the pipeline workflow:

This solution uses AWS CodePipeline, which lets you orchestrate and automate the test and deploy stages for ETL application source code. The solution consists of a pipeline that contains the following stages:

1.) Source Control: In this stage, the AWS Glue ETL job source code and the AWS CloudFormation template file for deploying the ETL jobs are both committed to version control. I chose to use AWS CodeCommit for version control.

To get the ETL job source code and AWS CloudFormation template, download the gluedemoetl.zip file. This solution is developed based on a previous post, Build a Data Lake Foundation with AWS Glue and Amazon S3.

2.) LiveTest: In this stage, all resources—including AWS Glue crawlers, jobs, S3 buckets, roles, and other resources that are required for the solution—are provisioned, deployed, live tested, and cleaned up.

The LiveTest stage includes the following actions:

  • Deploy: In this action, all the resources that are required for this solution (crawlers, jobs, buckets, roles, and so on) are provisioned and deployed using an AWS CloudFormation template.
  • AutomatedLiveTest: In this action, all the AWS Glue crawlers and jobs are executed and data exploration and validation tests are performed. These validation tests include, but are not limited to, record counts in both raw tables and transformed tables in the data lake and any other business validations. I used AWS CodeBuild for this action.
  • LiveTestApproval: This action is included for the cases in which a pipeline administrator approval is required to deploy/promote the ETL applications to the next stage. The pipeline pauses in this action until an administrator manually approves the release.
  • LiveTestCleanup: In this action, all the LiveTest stage resources, including test crawlers, jobs, roles, and so on, are deleted using the AWS CloudFormation template. This action helps minimize cost by ensuring that the test resources exist only for the duration of the AutomatedLiveTest and LiveTestApproval actions.

3.) DeployToProduction: In this stage, all the resources are deployed using the AWS CloudFormation template to the production environment.

Try it out

This code pipeline takes approximately 20 minutes to complete the LiveTest test stage (up to the LiveTest approval stage, in which manual approval is required).

To get started with this solution, choose Launch Stack:

This creates the CI/CD pipeline with all of its stages, as described earlier. It performs an initial commit of the sample AWS Glue ETL job source code to trigger the first release change.

In the AWS CloudFormation console, choose Create. After the template finishes creating resources, you see the pipeline name on the stack Outputs tab.

After that, open the CodePipeline console and select the newly created pipeline. Initially, your pipeline’s CodeCommit stage shows that the source action failed.

Allow a few minutes for your new pipeline to detect the initial commit applied by the CloudFormation stack creation. As soon as the commit is detected, your pipeline starts. You will see the successful stage completion status as soon as the CodeCommit source stage runs.

In the CodeCommit console, choose Code in the navigation pane to view the solution files.

Next, you can watch how the pipeline goes through the LiveTest stage of the deploy and AutomatedLiveTest actions, until it finally reaches the LiveTestApproval action.

At this point, if you check the AWS CloudFormation console, you can see that a new template has been deployed as part of the LiveTest deploy action.

At this point, make sure that the AWS Glue crawlers and the AWS Glue job ran successfully. Also check whether the corresponding databases and external tables have been created in the AWS Glue Data Catalog. Then verify that the data is validated using Amazon Athena, as shown following.

Open the AWS Glue console, and choose Databases in the navigation pane. You will see the following databases in the Data Catalog:

Open the Amazon Athena console, and run the following queries. Verify that the record counts are matching.

SELECT count(*) FROM "nycitytaxi_gluedemocicdtest"."data";
SELECT count(*) FROM "nytaxiparquet_gluedemocicdtest"."datalake";
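
If you prefer to script this verification (for example, as part of the AutomatedLiveTest action), the following is a minimal sketch using the AWS SDK for Python (Boto3); the Athena output location is a placeholder, and the database and table names are the ones from the queries above.

import time
import boto3

athena = boto3.client("athena", region_name="us-east-1")

def run_count(query, output_location):
    """Run an Athena query and return the single count value it produces."""
    qid = athena.start_query_execution(
        QueryString=query,
        ResultConfiguration={"OutputLocation": output_location},
    )["QueryExecutionId"]
    # Poll until the query reaches a terminal state.
    while True:
        state = athena.get_query_execution(QueryExecutionId=qid)["QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            break
        time.sleep(2)
    rows = athena.get_query_results(QueryExecutionId=qid)["ResultSet"]["Rows"]
    return int(rows[1]["Data"][0]["VarCharValue"])  # row 0 is the header row

output = "s3://YourBucketName/athena-results/"  # placeholder output location
raw = run_count('SELECT count(*) FROM "nycitytaxi_gluedemocicdtest"."data"', output)
transformed = run_count('SELECT count(*) FROM "nytaxiparquet_gluedemocicdtest"."datalake"', output)
assert raw == transformed, "record counts do not match"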

The following shows the raw data:

The following shows the transformed data:

The pipeline pauses the action until the release is approved. After validating the data, manually approve the revision on the LiveTestApproval action on the CodePipeline console.

Add comments as needed, and choose Approve.
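
If you want to automate this step instead, the approval can also be recorded through the CodePipeline API. The following is a minimal Boto3 sketch; the pipeline name is a placeholder that must match the pipeline created by the CloudFormation stack, and the stage and action names follow the LiveTest/LiveTestApproval names described earlier.

import boto3

codepipeline = boto3.client("codepipeline", region_name="us-east-1")

pipeline_name = "glue-etl-pipeline"  # placeholder; use the name from the stack outputs
state = codepipeline.get_pipeline_state(name=pipeline_name)

# Find the approval token for the LiveTestApproval action.
token = None
for stage in state["stageStates"]:
    if stage["stageName"] == "LiveTest":
        for action in stage["actionStates"]:
            if action["actionName"] == "LiveTestApproval":
                token = action["latestExecution"]["token"]

codepipeline.put_approval_result(
    pipelineName=pipeline_name,
    stageName="LiveTest",
    actionName="LiveTestApproval",
    result={"summary": "Validated record counts in Athena", "status": "Approved"},
    token=token,
)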

The LiveTestApproval stage now appears as Approved on the console.

After the revision is approved, the pipeline proceeds to use the AWS CloudFormation template to destroy the resources that were deployed in the LiveTest deploy action. This helps reduce cost and ensures a clean test environment on every deployment.

Production deployment is the final stage. In this stage, all the resources—AWS Glue crawlers, AWS Glue jobs, Amazon S3 buckets, roles, and so on—are provisioned and deployed to the production environment using the AWS CloudFormation template.

After successfully running the whole pipeline, feel free to experiment with it by changing the source code stored on AWS CodeCommit. For example, if you modify the AWS Glue ETL job to generate an error, it should make the AutomatedLiveTest action fail. Or if you change the AWS CloudFormation template to make its creation fail, it should affect the LiveTest deploy action. The objective of the pipeline is to ensure that all changes deployed to production work as expected.

Conclusion

In this post, you learned how easy it is to implement CI/CD for serverless AWS Glue ETL solutions with AWS developer tools like AWS CodePipeline and AWS CodeBuild at scale. Implementing such solutions can help you accelerate ETL development and testing at your organization.

If you have questions or suggestions, please comment below.

 


Additional Reading

If you found this post useful, be sure to check out Implement Continuous Integration and Delivery of Apache Spark Applications using AWS and Build a Data Lake Foundation with AWS Glue and Amazon S3.

 


About the Authors

Prasad Alle is a Senior Big Data Consultant with AWS Professional Services. He spends his time leading and building scalable, reliable Big data, Machine learning, Artificial Intelligence and IoT solutions for AWS Enterprise and Strategic customers. His interests extend to various technologies such as Advanced Edge Computing, Machine learning at Edge. In his spare time, he enjoys spending time with his family.

 
Luis Caro is a Big Data Consultant for AWS Professional Services. He works with our customers to provide guidance and technical assistance on big data projects, helping them improve the value of their solutions when using AWS.

 

 

 

Safety first: a Raspberry Pi safety helmet

Post Syndicated from Alex Bate original https://www.raspberrypi.org/blog/safety-helmet/

Jennifer Fox is back, this time with a Raspberry Pi Zero–controlled impact force monitor that will notify you if your collision is worth a trip to the doctor.

Make an Impact Force Monitor!

Check out my latest Hacker in Residence project for SparkFun Electronics: the Helmet Guardian! It’s a Pi Zero powered impact force monitor that turns on an LED if your head/body experiences a potentially dangerous impact. Install in your sports helmets, bicycle, or car to keep track of impact and inform you when it’s time to visit the doctor.

Concussion

We’ve all knocked our heads at least once in our lives, maybe due to tripping over a loose paving slab, or to falling off a bike, or to walking into the corner of the overhead cupboard door for the third time this week — will I ever learn?! More often than not, even when we’re seeing stars, we brush off the accident and continue with our day, oblivious to the long-term damage we may be doing.

Force of impact

After some thorough research, Jennifer Fox, founder of FoxBot Industries, concluded that forces of 4 to 6 G sustained for more than a few seconds are dangerous to the human body. With this in mind, she decided to use a Raspberry Pi Zero W and an accelerometer to create a helmet with an impact force monitor that notifies its wearer if this level of G-force has been met.


Obviously, if you do have a serious fall, you should always seek medical advice. This project is an example of how affordable technology can be used to create medical and citizen science builds, and not a replacement for professional medical services.

Setting up the impact monitor

Jennifer’s monitor requires only a few pieces of tech: a Zero W, an accelerometer and breakout board, a rechargeable USB battery, and an LED, plus the standard wires and resistors for these components.

After installing Raspbian, Jennifer enabled SSH and I2C on the Zero W to make it run headlessly, and then accessed it from a laptop. This allows her to control the Pi without physically connecting to it, and it makes for a wireless finished project.

Jen wired the Pi to the accelerometer breakout board and LED as shown in the schematic below.

Jennifer Fox Raspberry Pi Impact Force Monitor

The LED acts as a signal of significant impacts, turning on when the G-force threshold is reached, and not turning off again until the program is reset.


Make your own and more

Jennifer’s full code for the impact monitor is on GitHub, and she’s put together a complete tutorial on SparkFun’s website.

For more tutorials from Jennifer Fox, such as her ‘Bark Back’ IoT Pet Monitor, be sure to follow her on YouTube. And for similar projects, check out Matt’s smart bike light and Amelia Day’s physical therapy soccer ball.

The post Safety first: a Raspberry Pi safety helmet appeared first on Raspberry Pi.

How to migrate a Hue database from an existing Amazon EMR cluster

Post Syndicated from Anvesh Ragi original https://aws.amazon.com/blogs/big-data/how-to-migrate-a-hue-database-from-an-existing-amazon-emr-cluster/

Hadoop User Experience (Hue) is an open-source, web-based, graphical user interface for use with Amazon EMR and Apache Hadoop. The Hue database stores things like users, groups, authorization permissions, Apache Hive queries, Apache Oozie workflows, and so on.

There might come a time when you want to migrate your Hue database to a new EMR cluster. For example, you might want to upgrade from an older version of the Amazon EMR AMI (Amazon Machine Image), but your Hue application and its database have had a lot of customization. You can avoid re-creating these user entities and retain query/workflow histories in Hue by migrating the existing Hue database, or the remote database in Amazon RDS, to a new cluster.

By default, Hue user information and query histories are stored in a local MySQL database on the EMR cluster’s master node. However, you can create one or more Hue-enabled clusters using a configuration stored in Amazon S3 and a remote MySQL database in Amazon RDS. This allows you to preserve user information and query history that Hue creates without keeping your Amazon EMR cluster running.

This post describes the step-by-step process for migrating the Hue database from an existing EMR cluster.

Note: Amazon EMR supports different Hue versions across different AMI releases. Keep in mind the compatibility of Hue versions between the old and new clusters in this migration activity. Currently, Hue 3.x.x versions are not compatible with Hue 4.x.x versions, and therefore a migration between these two Hue versions might create issues. In addition, Hue 3.10.0 is not backward compatible with its previous 3.x.x versions.

Before you begin

First, let’s create a new testUser in Hue on an existing EMR cluster, as shown following:

You will use these credentials later to log in to Hue on the new EMR cluster and validate whether you have successfully migrated the Hue database.

Let’s get started!

Migration how-to

Follow these steps to migrate your database to a new EMR cluster and then validate the migration process.

1.) Make a backup of the existing Hue database.

Use SSH to connect to the master node of the old cluster, as shown following (if you are using Linux/Unix/macOS), and dump the Hue database to a JSON file.

$ ssh -i ~/key.pem hadoop@<old-cluster-master-public-dns>
$ /usr/lib/hue/build/env/bin/hue dumpdata > ./hue-mysql.json

Edit the hue-mysql.json output file by removing all JSON objects that have useradmin.userprofile in the model field, and save the file. For example, remove the objects as shown following:

{
  "pk": 1,
  "model": "useradmin.userprofile",
  "fields": {
    "last_activity": "2018-01-10T11:41:04",
    "creation_method": "HUE",
    "first_login": false,
    "user": 1,
    "home_directory": "/user/hue_admin"
  }
},
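
Instead of editing the file by hand, you could filter the dump with a short Python sketch like the following. This assumes the dump is a single JSON array, which is what hue dumpdata produces; the file name matches the one used above.

import json

# Load the full Hue dump produced by "hue dumpdata".
with open("hue-mysql.json") as f:
    objects = json.load(f)

# Drop every object whose model is useradmin.userprofile.
filtered = [obj for obj in objects if obj.get("model") != "useradmin.userprofile"]

# Write the filtered dump back in place.
with open("hue-mysql.json", "w") as f:
    json.dump(filtered, f, indent=2)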

2.) Store the hue-mysql.json file on persistent storage like Amazon S3.

You can copy the file from the old EMR cluster to Amazon S3 using the AWS CLI or Secure Copy (SCP) client. For example, the following uses the AWS CLI:

$ aws s3 cp ./hue-mysql.json s3://YourBucketName/folder/

3.) Recover/reload the backed-up Hue database into the new EMR cluster.

a.) Use SSH to connect to the master node of the new EMR cluster, and stop the Hue service that is already running.

$ ssh -i ~/key.pem hadoop@<new-cluster-master-public-dns>
$ sudo stop hue
hue stop/waiting

b.) Connect to the Hue database—either the local MySQL database or the remote database in Amazon RDS for your cluster as shown following, using the mysql client.

$ mysql -h HOST -u USER -pPASSWORD

For a local MySQL database, you can find the hostname, user name, and password for connecting to the database in the /etc/hue/conf/hue.ini file on the master node.

[[database]]
    engine = mysql
    name = huedb
    case_insensitive_collation = utf8_unicode_ci
    test_charset = utf8
    test_collation = utf8_bin
    host = ip-172-31-37-133.us-west-2.compute.internal
    user = hue
    test_name = test_huedb
    password = QdWbL3Ai6GcBqk26
    port = 3306

Based on the preceding example configuration, the sample command is as follows. (Replace the host, user, and password details based on your EMR cluster settings.)

$ mysql -h ip-172-31-37-133.us-west-2.compute.internal -u hue -pQdWbL3Ai6GcBqk26

c.) Drop the existing Hue database with the name huedb from the MySQL server.

mysql> DROP DATABASE IF EXISTS huedb;

d.) Create a new empty database with the same name huedb.

mysql> CREATE DATABASE huedb DEFAULT CHARACTER SET utf8 DEFAULT COLLATE=utf8_bin;

e.) Now, synchronize Hue with its database huedb.

$ sudo /usr/lib/hue/build/env/bin/hue syncdb --noinput
$ sudo /usr/lib/hue/build/env/bin/hue migrate

(This populates the new huedb with all Hue tables that are required.)

f.) Log in to MySQL again, and drop the foreign key to clean tables.

mysql> SHOW CREATE TABLE huedb.auth_permission;

In the following example, replace <id value> with the actual value from the preceding output.

mysql> ALTER TABLE huedb.auth_permission DROP FOREIGN KEY
content_type_id_refs_id_<id value>;

g.) Delete the contents of the django_content_type table.

mysql> DELETE FROM huedb.django_content_type;

h.) Download the backed-up Hue database dump from Amazon S3 to the new EMR cluster, and load it into Hue.

$ aws s3 cp s3://YourBucketName/folder/hue-mysql.json ./
$ sudo /usr/lib/hue/build/env/bin/hue loaddata ./hue-mysql.json

i.) In MySQL, add the foreign key content_type_id back to the auth_permission table.

mysql> use huedb;
mysql> ALTER TABLE huedb.auth_permission ADD FOREIGN KEY (`content_type_id`) REFERENCES `django_content_type` (`id`);

j.) Start the Hue service again.

$ sudo start hue
hue start/running, process XXXX

That’s it! Now, verify whether you can successfully access the Hue UI, and sign in using your existing testUser credentials.

After a successful sign in to Hue on the new EMR cluster, you should see a similar Hue homepage as shown following with testUser as the user signed in:

Conclusion

You have now learned how to migrate an existing Hue database to a new Amazon EMR cluster and validate the migration process. If you have any similar Amazon EMR administration topics that you want to see covered in a future post, please let us know in the comments below.


Additional Reading

If you found this post useful, be sure to check out Anomaly Detection Using PySpark, Hive, and Hue on Amazon EMR and Dynamically Create Friendly URLs for Your Amazon EMR Web Interfaces.


About the Author


Anvesh Ragi is a Big Data Support Engineer with Amazon Web Services. He works closely with AWS customers to provide them architectural and engineering assistance for their data processing workflows. In his free time, he enjoys traveling and going for hikes.

Best Practices for Running Apache Kafka on AWS

Post Syndicated from Prasad Alle original https://aws.amazon.com/blogs/big-data/best-practices-for-running-apache-kafka-on-aws/

This post was written in partnership with Intuit to share learnings, best practices, and recommendations for running an Apache Kafka cluster on AWS. Thanks to Vaishak Suresh and his colleagues at Intuit for their contribution and support.

Intuit, in their own words: Intuit, a leading enterprise customer for AWS, is a creator of business and financial management solutions. For more information on how Intuit partners with AWS, see our previous blog post, Real-time Stream Processing Using Apache Spark Streaming and Apache Kafka on AWS. Apache Kafka is an open-source, distributed streaming platform that enables you to build real-time streaming applications.

The best practices described in this post are based on our experience in running and operating large-scale Kafka clusters on AWS for more than two years. Our intent for this post is to help AWS customers who are currently running Kafka on AWS, and also customers who are considering migrating on-premises Kafka deployments to AWS.

AWS offers Amazon Kinesis Data Streams, a Kafka alternative that is fully managed.

Running your Kafka deployment on Amazon EC2 provides a high performance, scalable solution for ingesting streaming data. AWS offers many different instance types and storage option combinations for Kafka deployments. However, given the number of possible deployment topologies, it’s not always trivial to select the most appropriate strategy suitable for your use case.

In this blog post, we cover the following aspects of running Kafka clusters on AWS:

  • Deployment considerations and patterns
  • Storage options
  • Instance types
  • Networking
  • Upgrades
  • Performance tuning
  • Monitoring
  • Security
  • Backup and restore

Note: While implementing Kafka clusters in a production environment, make sure also to consider factors like your number of messages, message size, monitoring, failure handling, and any operational issues.

Deployment considerations and patterns

In this section, we discuss various deployment options available for Kafka on AWS, along with pros and cons of each option. A successful deployment starts with thoughtful consideration of these options. Considering availability, consistency, and operational overhead of the deployment helps when choosing the right option.

Single AWS Region, Three Availability Zones, All Active

One typical deployment pattern (all active) is in a single AWS Region with three Availability Zones (AZs). One Kafka cluster is deployed in each AZ along with Apache ZooKeeper and Kafka producer and consumer instances as shown in the illustration following.

In this pattern, this is the Kafka cluster deployment:

  • Kafka producers and Kafka cluster are deployed on each AZ.
  • Data is distributed evenly across three Kafka clusters by using Elastic Load Balancer.
  • Kafka consumers aggregate data from all three Kafka clusters.

Kafka cluster failover occurs this way:

  • Mark down all Kafka producers
  • Stop consumers
  • Debug and restack Kafka
  • Restart consumers
  • Restart Kafka producers

Following are the pros and cons of this pattern.

Pros:

  • Highly available
  • Can sustain the failure of two AZs
  • No message loss during failover
  • Simple deployment

Cons:

  • Very high operational overhead:
    • All changes need to be deployed three times, one for each Kafka cluster
    • Maintaining and monitoring three Kafka clusters
    • Maintaining and monitoring three consumer clusters

A restart is required for patching and upgrading brokers in a Kafka cluster. In this approach, a rolling upgrade is done separately for each cluster.

Single Region, Three Availability Zones, Active-Standby

Another typical deployment pattern (active-standby) is in a single AWS Region with a single Kafka cluster and Kafka brokers and Zookeepers distributed across three AZs. Another similar Kafka cluster acts as a standby as shown in the illustration following. You can use Kafka mirroring with MirrorMaker to replicate messages between any two clusters.

In this pattern, this is the Kafka cluster deployment:

  • Kafka producers are deployed on all three AZs.
  • Only one Kafka cluster is deployed across three AZs (active).
  • ZooKeeper instances are deployed on each AZ.
  • Brokers are spread evenly across all three AZs.
  • Kafka consumers can be deployed across all three AZs.
  • Standby Kafka producers and a Multi-AZ Kafka cluster are part of the deployment.

Kafka cluster failover occurs this way:

  • Switch traffic to standby Kafka producers cluster and Kafka cluster.
  • Restart consumers to consume from standby Kafka cluster.

Following are the pros and cons of this pattern.

Pros:

  • Less operational overhead when compared to the first option
  • Only one Kafka cluster to manage and consume data from
  • Can handle single AZ failures without activating a standby Kafka cluster

Cons:

  • Added latency due to cross-AZ data transfer among Kafka brokers
  • For Kafka versions before 0.10, replicas for topic partitions have to be assigned so they’re distributed to the brokers on different AZs (rack awareness)
  • The cluster can become unavailable in case of a network glitch, where ZooKeeper does not see the Kafka brokers
  • Possibility of in-transit message loss during failover

Intuit recommends using a single Kafka cluster in one AWS Region, with brokers distributed across three AZs (single Region, three AZs). This approach offers stronger fault tolerance, because a failed AZ won’t cause Kafka downtime.

Storage options

There are two storage options for file storage in Amazon EC2: instance store (ephemeral storage) and Amazon Elastic Block Store (Amazon EBS).

Ephemeral storage is local to the Amazon EC2 instance. It can provide high IOPS based on the instance type. On the other hand, Amazon EBS volumes offer higher resiliency and you can configure IOPS based on your storage needs. EBS volumes also offer some distinct advantages in terms of recovery time. Your choice of storage is closely related to the type of workload supported by your Kafka cluster.

Kafka provides built-in fault tolerance by replicating data partitions across a configurable number of instances. If a broker fails, you can recover it by fetching all the data from other brokers in the cluster that host the other replicas. Depending on the size of the data transfer, it can affect the recovery process and network traffic. These, in turn, eventually affect the cluster’s performance.

The following contrasts the benefits of using an instance store versus using EBS for storage.

Instance store:

  • Instance storage is recommended for large and medium-sized Kafka clusters. For a large cluster, read/write traffic is distributed across a high number of brokers, so the loss of a single broker has less of an impact. For smaller clusters, however, quick recovery of a failed node is important, and recovering a failed broker takes longer and requires more network traffic.
  • Storage-optimized instances like h1, i3, and d2 are an ideal choice for distributed applications like Kafka.

EBS:

  • The primary advantage of using EBS in a Kafka deployment is that it significantly reduces data-transfer traffic when a broker fails or must be replaced. The replacement broker joins the cluster much faster.
  • Data stored on EBS is persisted in case of an instance failure or termination. The broker’s data stored on an EBS volume remains intact, and you can mount the EBS volume to a new EC2 instance. Most of the replicated data for the replacement broker is already available in the EBS volume and need not be copied over the network from another broker. Only the changes made after the original broker failure need to be transferred across the network. That makes this process much faster.

Intuit chose EBS because of their frequent instance restacking requirements and also other benefits provided by EBS.

Generally, Kafka deployments use a replication factor of three. EBS offers replication within its service, so Intuit chose a replication factor of two instead of three.
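
As an illustration of how the replication factor is set per topic, the following is a minimal sketch using the kafka-python client; the broker addresses, topic name, and partition count are placeholders, not values from the original post.

from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers=["broker1:9092", "broker2:9092"])  # placeholder brokers

# With EBS-backed brokers, a replication factor of 2 (instead of 3) can be used,
# as described above.
topic = NewTopic(name="clickstream-events", num_partitions=12, replication_factor=2)
admin.create_topics([topic])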

Instance types

The choice of instance types is generally driven by the type of storage required for your streaming applications on a Kafka cluster. If your application requires ephemeral storage, h1, i3, and d2 instances are your best option.

Intuit used r3.xlarge instances for their brokers and r3.large for ZooKeeper, with ST1 (throughput optimized HDD) EBS for their Kafka cluster.

Here are sample benchmark numbers from Intuit tests.

Configuration:

  • r3.xlarge
  • ST1 EBS
  • 12 brokers
  • 12 partitions

Aggregate broker throughput: 346.9 MB/s

If you need EBS storage, then AWS has a newer-generation r4 instance. The r4 instance is superior to R3 in many ways:

  • It has a faster processor (Broadwell).
  • EBS is optimized by default.
  • It features networking based on Elastic Network Adapter (ENA), with up to 10 Gbps on smaller sizes.
  • It costs 20 percent less than R3.

Note: It’s always best practice to check for the latest changes in instance types.

Networking

The network plays a very important role in a distributed system like Kafka. A fast and reliable network ensures that nodes can communicate with each other easily. The available network throughput controls the maximum amount of traffic that Kafka can handle. Network throughput, combined with disk storage, is often the governing factor for cluster sizing.

If you expect your cluster to receive high read/write traffic, select an instance type that offers 10-Gb/s performance.

In addition, choose an option that keeps interbroker network traffic on the private subnet, because this approach allows clients to connect to the brokers. Communication between brokers and clients uses the same network interface and port. For more details, see the documentation about IP addressing for EC2 instances.

If you are deploying in more than one AWS Region, you can connect the two VPCs in the two AWS Regions using cross-region VPC peering. However, be aware of the networking costs associated with cross-AZ deployments.

Upgrades

Kafka has a history of not being backward compatible, but its support of backward compatibility is getting better. During a Kafka upgrade, you should keep your producer and consumer clients on a version equal to or lower than the version you are upgrading from. After the upgrade is finished, you can start using a new protocol version and any new features it supports. There are three upgrade approaches available, discussed following.

Rolling or in-place upgrade

In a rolling or in-place upgrade scenario, upgrade one Kafka broker at a time. Take into consideration the recommendations for doing rolling restarts to avoid downtime for end users.

Downtime upgrade

If you can afford the downtime, you can take your entire cluster down, upgrade each Kafka broker, and then restart the cluster.

Blue/green upgrade

Intuit followed the blue/green deployment model for their workloads, as described following.

If you can afford to create a separate Kafka cluster and upgrade it, we highly recommend the blue/green upgrade scenario. In this scenario, we recommend that you keep your clusters up-to-date with the latest Kafka version. For additional details on Kafka version upgrades or more details, see the Kafka upgrade documentation.

The following illustration shows a blue/green upgrade.

In this scenario, the upgrade plan works like this:

  • Create a new Kafka cluster on AWS.
  • Create a new Kafka producers stack to point to the new Kafka cluster.
  • Create topics on the new Kafka cluster.
  • Test the green deployment end to end (sanity check).
  • Using Amazon Route 53, change the new Kafka producers stack on AWS to point to the new green Kafka environment that you have created.

The roll-back plan works like this:

  • Switch Amazon Route 53 to the old Kafka producers stack on AWS to point to the old Kafka environment.

For additional details on blue/green deployment architecture using Kafka, see the re:Invent presentation Leveraging the Cloud with a Blue-Green Deployment Architecture.

Performance tuning

You can tune Kafka performance in multiple dimensions. Following are some best practices for performance tuning.

These are some general performance tuning techniques:

  • If throughput is less than network capacity, try the following (see the producer configuration sketch after this list):
    • Add more threads
    • Increase batch size
    • Add more producer instances
    • Add more partitions
  • To improve latency when acks = -1 (that is, acks = all), increase the num.replica.fetchers value on your brokers.
  • For cross-AZ data transfer, tune your buffer settings for sockets and for OS TCP.
  • Make sure that num.io.threads is greater than the number of disks dedicated for Kafka.
  • Adjust num.network.threads based on the number of producers plus the number of consumers plus the replication factor.
  • Your message size affects your network bandwidth. To get higher performance from a Kafka cluster, select an instance type that offers 10 Gb/s performance.
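
As a concrete illustration of the batching and acknowledgment settings called out above, here is a minimal producer configuration sketch. It assumes the kafka-clients library, a hypothetical pair of brokers, and a hypothetical topic; the values shown are starting points to benchmark against your own workload rather than recommendations.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TunedProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092"); // hypothetical brokers
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Throughput-oriented settings (illustrative values):
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 262144);       // larger batches per partition
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);            // wait briefly so batches can fill
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864L); // more room for in-flight batches
        props.put(ProducerConfig.ACKS_CONFIG, "all");              // acks = -1; pair with num.replica.fetchers on the brokers

        // Broker-side counterparts (num.replica.fetchers, num.io.threads, num.network.threads)
        // are set in server.properties, not in the client.

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test-topic", "key", "value")); // hypothetical topic
        }
    }
}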

For Java and JVM tuning, try the following:

  • Minimize GC pauses by using a recent JDK with the G1 garbage-first collector enabled (-XX:+UseG1GC).
  • Try to keep the Kafka heap size below 4 GB.

Monitoring

Knowing whether a Kafka cluster is working correctly in a production environment is critical. Sometimes, just knowing that the cluster is up is enough, but Kafka applications have many moving parts to monitor. In fact, it can easily become confusing to understand what’s important to watch and what you can set aside. Items to monitor range from simple metrics about the overall rate of traffic, to producers, consumers, brokers, controller, ZooKeeper, topics, partitions, messages, and so on.

For monitoring, Intuit used several tools, including New Relic, Wavefront, Amazon CloudWatch, and AWS CloudTrail. Our recommended monitoring approach follows.

For system metrics, we recommend that you monitor:

  • CPU load
  • Network metrics
  • File handle usage
  • Disk space
  • Disk I/O performance
  • Garbage collection
  • ZooKeeper

For producers, we recommend that you monitor:

  • Batch-size-avg
  • Compression-rate-avg
  • Waiting-threads
  • Buffer-available-bytes
  • Record-queue-time-max
  • Record-send-rate
  • Records-per-request-avg

For consumers, we recommend that you monitor the following (a sketch for reading Kafka client metrics programmatically follows this list):

  • Records-lag-max
  • Records-consumed-rate
  • Bytes-consumed-rate
  • Fetch-rate
  • Fetch-size-avg
  • Fetch-latency-avg
  • Commit-latency-avg
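
Kafka exposes these client metrics through JMX, which is how most monitoring agents collect them. For ad hoc checks, you can also read them programmatically; the following is a minimal sketch, assuming the kafka-clients library and a hypothetical broker on localhost, that prints the producer-level metrics listed above.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerMetricsDump {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test-topic", "key", "value")); // hypothetical topic
            producer.flush();

            // Print producer-level metrics such as batch-size-avg and record-send-rate.
            producer.metrics().forEach((name, metric) -> {
                if ("producer-metrics".equals(name.group())) {
                    System.out.printf("%s = %s%n", name.name(), metric.metricValue());
                }
            });
        }
    }
}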

Security

Like most distributed systems, Kafka provides the mechanisms to transfer data with relatively high security across the components involved. Depending on your setup, security might involve different services such as encryption, Kerberos, Transport Layer Security (TLS) certificates, and advanced access control list (ACL) setup in brokers and ZooKeeper. The following tells you more about the Intuit approach. For details on Kafka security not covered in this section, see the Kafka documentation.

Encryption at rest

For EBS-backed EC2 instances, you can enable encryption at rest by using Amazon EBS volumes with encryption enabled. Amazon EBS uses AWS Key Management Service (AWS KMS) for encryption. For more details, see Amazon EBS Encryption in the EBS documentation. For instance store–backed EC2 instances, you can enable encryption at rest by using Amazon EC2 instance store encryption.
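
As a minimal sketch of provisioning an encrypted data volume with the AWS SDK for Java v2 (the Availability Zone, size, and KMS key alias are hypothetical placeholders; omit kmsKeyId to use the default EBS key):

import software.amazon.awssdk.services.ec2.Ec2Client;
import software.amazon.awssdk.services.ec2.model.CreateVolumeRequest;
import software.amazon.awssdk.services.ec2.model.VolumeType;

public class CreateEncryptedKafkaVolume {
    public static void main(String[] args) {
        try (Ec2Client ec2 = Ec2Client.create()) {
            ec2.createVolume(CreateVolumeRequest.builder()
                    .availabilityZone("us-east-1a") // hypothetical AZ
                    .size(1000)                     // GiB
                    .volumeType(VolumeType.ST1)     // throughput-optimized HDD for Kafka logs
                    .encrypted(true)                // encrypt the volume at rest
                    .kmsKeyId("alias/kafka-data")   // hypothetical KMS key alias
                    .build());
        }
    }
}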

Encryption in transit

Kafka supports TLS for encrypting client and interbroker communication. It is not enabled by default; you configure it on the broker listeners and in your clients.
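
The following is a minimal sketch of the client-side properties for a TLS-encrypted connection, assuming the brokers already expose an SSL listener; the keystore and truststore paths and passwords are hypothetical placeholders.

import java.util.Properties;

public class SslClientConfig {
    // Client properties for TLS-encrypted connections to the brokers.
    public static Properties sslProperties() {
        Properties props = new Properties();
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", "/etc/kafka/ssl/client.truststore.jks"); // hypothetical path
        props.put("ssl.truststore.password", "changeit");                             // hypothetical password
        // Needed only if the brokers require mutual TLS (client authentication):
        props.put("ssl.keystore.location", "/etc/kafka/ssl/client.keystore.jks");
        props.put("ssl.keystore.password", "changeit");
        props.put("ssl.key.password", "changeit");
        return props;
    }
}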

Authentication

Authentication of connections to brokers, whether from clients (producers and consumers), from other brokers, or from tools, uses either Secure Sockets Layer (SSL) or Simple Authentication and Security Layer (SASL).

Kafka supports Kerberos authentication. If you already have a Kerberos server, you can add Kafka to your current configuration.
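
The following is a minimal sketch of the client-side properties for Kerberos (SASL/GSSAPI) authentication, assuming the brokers expose a SASL listener and trust your Kerberos realm; the keytab path and principal are hypothetical placeholders.

import java.util.Properties;

public class KerberosClientConfig {
    // Client properties for SASL/GSSAPI (Kerberos) authentication over TLS.
    public static Properties saslProperties() {
        Properties props = new Properties();
        props.put("security.protocol", "SASL_SSL");       // or SASL_PLAINTEXT without TLS
        props.put("sasl.mechanism", "GSSAPI");            // Kerberos
        props.put("sasl.kerberos.service.name", "kafka"); // must match the brokers' service principal
        props.put("sasl.jaas.config",
                "com.sun.security.auth.module.Krb5LoginModule required "
                + "useKeyTab=true storeKey=true "
                + "keyTab=\"/etc/security/keytabs/kafka-client.keytab\" " // hypothetical keytab
                + "principal=\"kafka-client@EXAMPLE.COM\";");             // hypothetical principal
        return props;
    }
}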

Authorization

In Kafka, authorization is pluggable: the broker ships with a built-in ACL-based authorizer, and integration with external authorization services is also supported.
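
For example, with the built-in ACL authorizer enabled on the brokers, you can manage ACLs through the AdminClient API. The following is a minimal sketch that assumes a recent kafka-clients version and a hypothetical topic and principal; against a secured cluster you would also supply the SSL or SASL client properties shown earlier.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public class GrantTopicRead {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9093"); // hypothetical broker with security enabled

        try (AdminClient admin = AdminClient.create(props)) {
            // Allow the analytics-app principal to read the clickstream topic from any host.
            AclBinding binding = new AclBinding(
                    new ResourcePattern(ResourceType.TOPIC, "clickstream", PatternType.LITERAL),
                    new AccessControlEntry("User:analytics-app", "*",
                            AclOperation.READ, AclPermissionType.ALLOW));
            admin.createAcls(Collections.singleton(binding)).all().get();
        }
    }
}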

Backup and restore

The type of storage used in your deployment dictates your backup and restore strategy.

The best way to back up a Kafka cluster based on instance storage is to set up a second cluster and replicate messages using MirrorMaker. Kafka’s mirroring feature makes it possible to maintain a replica of an existing Kafka cluster. Depending on your setup and requirements, your backup cluster might be in the same AWS Region as your main cluster or in a different one.

For EBS-based deployments, you can enable automatic snapshots of EBS volumes to back up volumes. You can easily create new EBS volumes from these snapshots to restore. We recommend storing backup files in Amazon S3.
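
The following is a minimal sketch of taking such a snapshot with the AWS SDK for Java v2; the volume ID is a hypothetical placeholder, and in practice you would schedule snapshots (for example, with Amazon Data Lifecycle Manager) rather than run them by hand.

import software.amazon.awssdk.services.ec2.Ec2Client;
import software.amazon.awssdk.services.ec2.model.CreateSnapshotRequest;

public class SnapshotKafkaVolume {
    public static void main(String[] args) {
        try (Ec2Client ec2 = Ec2Client.create()) {
            ec2.createSnapshot(CreateSnapshotRequest.builder()
                    .volumeId("vol-0123456789abcdef0") // hypothetical broker data volume
                    .description("Snapshot of Kafka broker data volume")
                    .build());
        }
    }
}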

For more information on how to back up in Kafka, see the Kafka documentation.

Conclusion

In this post, we discussed several patterns for running Kafka in the AWS Cloud. AWS also provides an alternative managed solution, Amazon Kinesis Data Streams. With Kinesis Data Streams, there are no servers to manage or scaling cliffs to worry about, you can scale the size of your streaming pipeline in seconds without downtime, data replication across Availability Zones is automatic, and you get security out of the box. Kinesis Data Streams is tightly integrated with a wide variety of AWS services, such as Lambda, Amazon Redshift, and Elasticsearch, and it supports open-source frameworks like Storm, Spark, and Flink. If you want to bridge the two, see the kafka-kinesis connector.

If you have questions or suggestions, please comment below.


Additional Reading

If you found this post useful, be sure to check out Implement Serverless Log Analytics Using Amazon Kinesis Analytics and Real-time Clickstream Anomaly Detection with Amazon Kinesis Analytics.


About the Author

Prasad Alle is a Senior Big Data Consultant with AWS Professional Services. He spends his time leading and building scalable, reliable big data, machine learning, artificial intelligence, and IoT solutions for AWS enterprise and strategic customers. His interests extend to technologies such as advanced edge computing and machine learning at the edge. In his spare time, he enjoys spending time with his family.