
Best Practices for Securing Amazon EMR

Post Syndicated from Tony Nguyen original https://aws.amazon.com/blogs/big-data/best-practices-for-securing-amazon-emr/

Whatever your industry, data is central to how organizations function. When we discuss data strategy with customers, it’s often in the context of how they ingest, store, process, analyze, distribute, and ultimately secure data.

Amazon EMR is a managed Hadoop framework that you use to process vast amounts of data. One of the reasons that customers choose Amazon EMR is its security features. For example, customers in regulated industries such as financial services (FINRA, for example) and healthcare choose Amazon EMR as part of their data strategy. They do so to adhere to strict requirements such as the Payment Card Industry Data Security Standard (PCI DSS) and the Health Insurance Portability and Accountability Act (HIPAA).

This post walks you through some of the principles of Amazon EMR security. It also describes features that you can use in Amazon EMR to help you meet the security and compliance objectives for your business. We cover some common security best practices that we see used. We also show some sample configurations to get you started. For more in-depth information, see Security in the EMR Management Guide.

Encryption, encryption, encryption

Our CTO, Werner Vogels, is fond of saying “Dance like nobody’s watching, encrypt like everybody is.” Properly protecting your data at rest and in transit using encryption is a core component of the security pillar of the Well-Architected Framework. Amazon EMR security configurations (described in the EMR documentation) make it easy for you to encrypt data. A security configuration is essentially a template for encryption and other security settings that you can apply to any cluster when you launch it.

Encryption at rest

You have multiple options to encrypt data at rest in your EMR clusters. By default, EMR uses the EMR File System (EMRFS) to read data from and write data to Amazon S3. To encrypt data in Amazon S3, you can specify one of the following options:

  • SSE-S3: Amazon S3 manages the encryption keys for you
  • SSE-KMS: You use an AWS Key Management Service (AWS KMS) customer master key (CMK) to encrypt your data server-side on Amazon S3. Be sure to use policies that allow access by Amazon EMR.
  • CSE-KMS/CSE-C: Amazon S3 encryption and decryption takes place client-side on your Amazon EMR cluster. You can use keys provided by AWS KMS (CSE-KMS) or use a custom Java class that provides the master key (CSE-C).

With Amazon EMR, setting up the encryption type for Amazon S3 is easy. You just select it from a list.

Which option to choose depends on your specific workload requirements. With SSE-S3, the management of the keys is completely taken care of by Amazon. This is the simplest option. Using SSE-KMS or CSE-KMS enables you to have more control over the encryption keys and enables you to provide your own key material for encryption. The encryption is applied to objects within the bucket. The applications running on your cluster don’t need to be modified or made aware of the encryption in any way.

For even more control, you can use a custom key provider with CSE-C. For more information, see Amazon S3 Client-Side Encryption in the EMR documentation.

For local disk encryption, you have two approaches that complement each other. Let’s break them down by the specific encryption target:

  • To encrypt your root volume with local disk encryption, create a custom Amazon Machine Image (AMI) for Amazon EMR and specify Amazon EBS volume encryption. We cover details on custom AMIs a bit later in this post.
  • To encrypt your storage volumes with local disk encryption, use Amazon EMR security configurations (described in the EMR documentation). Security configurations use a combination of open-source HDFS encryption and LUKS encryption. To use this feature, specify either the Amazon Resource Name (ARN) for an AWS KMS key or provide a custom Java class with the encryption artifacts.

For more information about using KMS with EMR and S3, see How Amazon EMR Uses AWS KMS in the EMR documentation.

If you are using AWS KMS as part of your encryption strategy, see AWS KMS Limits in the EMR documentation for information about the request rates supported for your use case.

Encryption in transit

Amazon EMR security configurations enable you to choose a method for encrypting data in transit using Transport Layer Security (TLS) (as described in the EMR documentation). You can do either of the following:

  • Manually create PEM certificates, zip them into a file, and reference the file from Amazon S3.
  • Implement a certificate custom provider in Java and specify the S3 path to the JAR.

For more information on how these certificates are used with different big data technologies, check out In Transit Data Encryption with EMR.

You configure these in-transit and at-rest encryption options when creating a security configuration in the EMR console.

Expressing encryption as code

You can also specify encryption by using the AWS CLI, Boto3, or AWS CloudFormation. Here’s an example with Boto3 that uses SSE-S3 for S3 encryption and KMS for local disk encryption:

import boto3
client = boto3.client('emr')
response = client.create_security_configuration(
    Name='MySecurityConfig',
    SecurityConfiguration='''{
        "EncryptionConfiguration": {
            "EnableInTransitEncryption" : true,
            "EnableAtRestEncryption" : true,
            "AtRestEncryptionConfiguration" : {
                "S3EncryptionConfiguration" : {
                    "EncryptionMode" : "SSE-S3"
                },
                "LocalDiskEncryptionConfiguration" : {
                    "EncryptionKeyProviderType" : "AwsKms",
                    "AwsKmsKey" : "arn:aws:kms:us-east-1:123456789012:key/12345678-1234-1234-1234-123456789012"
                }
            },
            "InTransitEncryptionConfiguration" : {
                "TLSCertificateConfiguration" : {
                    "CertificateProviderType" : "PEM",
                    "S3Object" : "s3://MyConfigStore/artifacts/MyCerts.zip"
                }
            }
        }
    }'''
)

And the same snippet for AWS CloudFormation:

Resources:
    MySecurityConfig:
        Type: AWS::EMR::SecurityConfiguration
        Properties:
            Name: MySecurityConfig
            SecurityConfiguration:
                EncryptionConfiguration:
                    EnableInTransitEncryption: true
                    EnableAtRestEncryption: true
                    AtRestEncryptionConfiguration:
                        S3EncryptionConfiguration:
                            EncryptionMode: SSE-S3
                        LocalDiskEncryptionConfiguration:
                            EncryptionKeyProviderType: AwsKms
                            AwsKmsKey: arn:aws:kms:us-east-1:123456789012:key/12345678-1234-1234-1234-123456789012
                    InTransitEncryptionConfiguration:
                        TLSCertificateConfiguration:
                            CertificateProviderType: PEM
                            S3Object: arn:aws:s3:::MyConfigStore/artifacts/MyCerts.zip

For more information about setting up security configurations in Amazon EMR, see the AWS Big Data Blog post Secure Amazon EMR with Encryption.
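
After a security configuration exists, you reference it by name when you launch a cluster. The following is a minimal boto3 sketch using run_job_flow; the release label, subnet, key pair, and role names are placeholder assumptions for your own values:

import boto3

emr = boto3.client('emr')

# Launch a cluster that uses the security configuration created above.
response = emr.run_job_flow(
    Name='MySecureCluster',
    ReleaseLabel='emr-5.17.0',
    Applications=[{'Name': 'Hadoop'}, {'Name': 'Spark'}],
    SecurityConfiguration='MySecurityConfig',  # name from create_security_configuration
    Instances={
        'MasterInstanceType': 'm4.large',
        'SlaveInstanceType': 'm4.large',
        'InstanceCount': 3,
        'Ec2SubnetId': 'subnet-12345678',      # placeholder private subnet
        'Ec2KeyName': 'my-key-pair',           # placeholder key pair
        'KeepJobFlowAliveWhenNoSteps': True,
    },
    JobFlowRole='Custom_EMR_EC2_role',         # EC2 instance profile
    ServiceRole='Custom_EMR_Role',             # EMR service role
    VisibleToAllUsers=True,
)
print(response['JobFlowId'])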

Authentication and authorization

Authentication and authorization, otherwise known as AuthN and AuthZ, are two crucial components that need to be considered when controlling access to data. Authentication is the verification of an entity, and authorization is checking whether the entity actually has access to the data or resources it’s asking for. In other words, authentication checks whether you’re really who you say you are. Authorization checks whether you actually have access to what you’re asking for. Alice can be authenticated as indeed being Alice, but this doesn’t necessarily mean that Alice has authorization, or access, to look at Bob’s bank account.

Authentication on Amazon EMR

Kerberos, a network authentication protocol created by the Massachusetts Institute of Technology (MIT), uses secret-key cryptography to provide strong authentication. It helps you avoid having sensitive information such as passwords or other credentials sent over the network in an unencrypted and exposed format.

With Kerberos, you can maintain a set of services (known as a realm) and users that need to authenticate (known as principals). You provide a way for these principals to authenticate. You can also integrate your Kerberos setup with other realms. For example, you can have users authenticate from a Microsoft Active Directory domain and have a cross-realm trust set up. Such a setup can allow these authenticated users to be seamlessly authenticated to access your EMR clusters.

Amazon EMR installs open-source Hadoop-based applications on your cluster, meaning that you can use the existing security features that use Kerberos in these products. For example, you can enable Kerberos authentication for YARN, giving user-level authentication for applications running on YARN such as Apache Spark.

You can configure Kerberos on an EMR cluster (known as Kerberizing) to provide a means of authentication for cluster users. Before you configure Kerberos on Amazon EMR, we recommend that you become familiar with Kerberos concepts by reading Use Kerberos Authentication in the EMR documentation.

The following example shows Kerberizing with CloudFormation:

"SecurityConfiguration": {
      "Type": "AWS::EMR::SecurityConfiguration",
      "Properties": {
        "SecurityConfiguration": {
          "AuthenticationConfiguration": {
            "KerberosConfiguration": {
              "ClusterDedicatedKdcConfiguration": {
                "CrossRealmTrustConfiguration": {
                  "Realm": {
                    "Ref": "KerberosADdomain"
                  },
                  "KdcServer": {
                    "Ref": "DomainDNSName"
                  },
                  "Domain": {
                    "Ref": "DomainDNSName"
                  },
                  "AdminServer": {
                    "Ref": "DomainDNSName"
                  }
                },
                "TicketLifetimeInHours": 24
              },
              "Provider": "ClusterDedicatedKdc"
            }
          }
        }
      }
    }
...
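
If you script this outside of AWS CloudFormation, the following is a minimal boto3 sketch of a cluster-dedicated KDC without a cross-realm trust. The configuration name, realm, password, and cluster parameters are placeholder assumptions; in practice, pass secrets from a secure store rather than hardcoding them:

import boto3

emr = boto3.client('emr')

# Security configuration with a cluster-dedicated KDC and no cross-realm trust.
emr.create_security_configuration(
    Name='KerberizedSecurityConfig',
    SecurityConfiguration='''{
        "AuthenticationConfiguration": {
            "KerberosConfiguration": {
                "Provider": "ClusterDedicatedKdc",
                "ClusterDedicatedKdcConfiguration": {
                    "TicketLifetimeInHours": 24
                }
            }
        }
    }'''
)

# Reference the configuration and supply the Kerberos attributes at launch time.
emr.run_job_flow(
    Name='MyKerberizedCluster',
    ReleaseLabel='emr-5.17.0',
    Applications=[{'Name': 'Hadoop'}, {'Name': 'Hive'}, {'Name': 'Spark'}],
    SecurityConfiguration='KerberizedSecurityConfig',
    KerberosAttributes={
        'Realm': 'EC2.INTERNAL',                   # default realm for a cluster-dedicated KDC
        'KdcAdminPassword': 'REPLACE_WITH_SECRET'  # placeholder
    },
    Instances={
        'MasterInstanceType': 'm4.large',
        'SlaveInstanceType': 'm4.large',
        'InstanceCount': 3,
        'Ec2SubnetId': 'subnet-12345678',          # placeholder
    },
    JobFlowRole='Custom_EMR_EC2_role',
    ServiceRole='Custom_EMR_Role',
)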

Authorization on Amazon EMR

Amazon EMR uses AWS Identity and Access Management (IAM) to help you manage access to your clusters. You can use IAM to create policies for principals such as users and groups that control actions that can be performed with Amazon EMR and other AWS resources.

There are two roles associated with each cluster in Amazon EMR that typically are of interest—the service role and a role for the Amazon EC2 instance profile. You use the service role, or EMR role, for any action related to provisioning resources and other service-level tasks that aren’t performed in the context of an EC2 instance. The instance profile role is used by EC2 instances within the cluster. The policies associated with this role apply to processes that run on the cluster instances. For more information on these roles and others, see Configure IAM Roles for Amazon EMR Permissions to AWS Services in the EMR documentation.

It’s important to understand how IAM helps you control authorized access to your cluster in relation to these roles and where it does not. IAM controls API-level actions done on other AWS services. IAM helps you with things like controlling access to objects in S3, protecting against cluster modification, and restricting access to keys from AWS KMS.

This has important implications. By default, any process running on a cluster inherits the access permissions of the IAM role associated with the cluster. However, IAM doesn’t control activity inside your clusters, so you need to use other means to appropriately secure the processes running on each EC2 instance.

Because of these characteristics, Amazon EMR formerly presented a particular authorization challenge. By default, the IAM role attached to the EC2 instance profile of your cluster determined the data that could be accessed in Amazon S3. This effectively meant that access to data in S3 was granular only at the cluster level, which made it difficult for multiple users with potentially different levels of access to share the same cluster.

Amazon EMR versions 5.10.0 and later introduced EMRFS fine-grained authorization. This feature lets you specify the IAM role to assume at the user or group level when EMRFS accesses Amazon S3. It enables fine-grained access control for Amazon S3 on multitenant EMR clusters and also makes it easier to enable cross-account access to data in Amazon S3.

The EMRFS authorization feature specifically applies to access through HiveServer2. If your users use Spark or other applications that allow the execution of arbitrary code (for example, Jupyter, Zeppelin, SSH, or spark-shell), they can bypass the roles that EMRFS has mapped to them.

For more information on how to configure your security configurations and IAM roles appropriately, read Configure IAM Roles for EMRFS Requests to Amazon S3 in the EMR documentation.
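
As a sketch of what the role-mapping portion of a security configuration looks like, the following boto3 call maps hypothetical IAM roles to a group and to an S3 prefix. The configuration name, role ARNs, group name, and bucket prefix are illustrative assumptions:

import boto3

emr = boto3.client('emr')

# EMRFS assumes 'analyst-emrfs-role' for users in the 'analysts' group, and
# 'finance-emrfs-role' for requests against the given S3 prefix.
emr.create_security_configuration(
    Name='EmrFsRoleMappingConfig',
    SecurityConfiguration='''{
        "AuthorizationConfiguration": {
            "EmrFsConfiguration": {
                "RoleMappings": [
                    {
                        "Role": "arn:aws:iam::123456789012:role/analyst-emrfs-role",
                        "IdentifierType": "Group",
                        "Identifiers": ["analysts"]
                    },
                    {
                        "Role": "arn:aws:iam::123456789012:role/finance-emrfs-role",
                        "IdentifierType": "Prefix",
                        "Identifiers": ["s3://examplebucket/finance/"]
                    }
                ]
            }
        }
    }'''
)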

For a great in-depth guide on setting up a multitenant environment with both Kerberos authentication and EMRFS authorization, take a look at the following post on the AWS Big Data blog: Build a Multi-Tenant Amazon EMR Cluster with Kerberos, Microsoft Active Directory Integration and IAM Roles for EMRFS.

Network

Your network topology is also important when designing for security and privacy. We recommend placing your Amazon EMR clusters in private subnets and using a NAT gateway for outbound-only internet access.

Security groups control inbound and outbound access from your individual instances. With Amazon EMR, you can use both Amazon EMR-managed security groups and also your own security groups to control network access to your instance. By applying the principle of least privilege to your security groups, you can lock down your EMR cluster to only the applications or individuals who need access.

For more information, see Working With Amazon EMR-Managed Security Groups in the EMR Documentation.

The following example shows a security group with boto3:

import boto3
from botocore.exceptions import ClientError

ec2 = boto3.client('ec2')

response = ec2.describe_vpcs()
vpc_id = response.get('Vpcs', [{}])[0].get('VpcId', '')

try:
    response = ec2.create_security_group(GroupName='SECURITY_GROUP_NAME',
                                         Description='Allow SSH access',
                                         VpcId=vpc_id)
    security_group_id = response['GroupId']

    # For least privilege, replace 0.0.0.0/0 with a trusted CIDR range.
    data = ec2.authorize_security_group_ingress(
        GroupId=security_group_id,
        IpPermissions=[
            {'IpProtocol': 'tcp',
             'FromPort': 22,
             'ToPort': 22,
             'IpRanges': [{'CidrIp': '0.0.0.0/0'}]}
        ])
except ClientError as e:
    print(e)

The following example shows a security group with CloudFormation:

InstanceSecurityGroup:
  Type: AWS::EC2::SecurityGroup
  Properties:
    GroupDescription: Allow 80
    VpcId:
      Ref: myVPC
    SecurityGroupIngress:
    - IpProtocol: tcp
      FromPort: '80'
      ToPort: '80'
      CidrIp: 0.0.0.0/0
    SecurityGroupEgress:
    - IpProtocol: tcp
      FromPort: '80'
      ToPort: '80'
      CidrIp: 0.0.0.0/0
 ...

Minimal IAM policy

By default, the IAM policies that are associated with EMR are generally permissive, to enable you to easily integrate EMR with other AWS services. When securing EMR, a best practice is to start from the minimal set of permissions required for EMR to function and add permissions as necessary.

Following are three policies scoped to what EMR minimally requires for basic operation. You can potentially reduce these policies further by removing actions related to Spot pricing and automatic scaling. For clarity, the policies are annotated with comments; remove the comments before use.

Minimal EMR service-role policy

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Resource": "*",
            "Action": [
                "ec2:AuthorizeSecurityGroupEgress",
                "ec2:AuthorizeSecurityGroupIngress",
                "ec2:CancelSpotInstanceRequests",
                "ec2:CreateNetworkInterface",
                "ec2:CreateSecurityGroup",
                "ec2:CreateTags",
                "ec2:DeleteNetworkInterface", // This is only needed if you are launching clusters in a private subnet. 
                "ec2:DeleteTags",
                "ec2:DeleteSecurityGroup", // This is only needed if you are using Amazon managed security groups for private subnets. You can omit this action if you are using custom security groups.
                "ec2:DescribeAvailabilityZones",
                "ec2:DescribeAccountAttributes",
                "ec2:DescribeDhcpOptions",
                "ec2:DescribeImages",
                "ec2:DescribeInstanceStatus",
                "ec2:DescribeInstances",
                "ec2:DescribeKeyPairs",
                "ec2:DescribeNetworkAcls",
                "ec2:DescribeNetworkInterfaces",
                "ec2:DescribePrefixLists",
                "ec2:DescribeRouteTables",
                "ec2:DescribeSecurityGroups",
                "ec2:DescribeSpotInstanceRequests",
                "ec2:DescribeSpotPriceHistory",
                "ec2:DescribeSubnets",
                "ec2:DescribeTags",
                "ec2:DescribeVpcAttribute",
                "ec2:DescribeVpcEndpoints",
                "ec2:DescribeVpcEndpointServices",
                "ec2:DescribeVpcs",
                "ec2:DetachNetworkInterface",
                "ec2:ModifyImageAttribute",
                "ec2:ModifyInstanceAttribute",
                "ec2:RequestSpotInstances",
                "ec2:RevokeSecurityGroupEgress",
                "ec2:RunInstances",
                "ec2:TerminateInstances",
                "ec2:DeleteVolume",
                "ec2:DescribeVolumeStatus",
                "ec2:DescribeVolumes",
                "ec2:DetachVolume",
                "iam:GetRole",
                "iam:GetRolePolicy",
                "iam:ListInstanceProfiles",
                "iam:ListRolePolicies",
                "s3:CreateBucket",
                "sdb:BatchPutAttributes",
                "sdb:Select",
                "cloudwatch:PutMetricAlarm",
                "cloudwatch:DescribeAlarms",
                "cloudwatch:DeleteAlarms",
                "application-autoscaling:RegisterScalableTarget",
                "application-autoscaling:DeregisterScalableTarget",
                "application-autoscaling:PutScalingPolicy",
                "application-autoscaling:DeleteScalingPolicy",
                "application-autoscaling:Describe*"
            ]
        },
        {
            "Effect": "Allow",
            "Resource": ["arn:aws:s3:::examplebucket/*","arn:aws:s3:::examplebucket2/*"], // Here you can specify the list of buckets which are going to be storing cluster logs, bootstrap action script, custom JAR files, input & output paths for EMR steps
            "Action": [
                "s3:GetBucketLocation",
                "s3:GetBucketCORS",
                "s3:GetObjectVersionForReplication",
                "s3:GetObject",
                "s3:GetBucketTagging",
                "s3:GetObjectVersion",
                "s3:GetObjectTagging",
                "s3:ListMultipartUploadParts",
                "s3:ListBucketByTags",
                "s3:ListBucket",
                "s3:ListObjects",
                "s3:ListBucketMultipartUploads"
            ]
        },
        {
            "Effect": "Allow",
            "Resource": "arn:aws:sqs:*:123456789012:AWS-ElasticMapReduce-*", // This allows EMR to only perform actions (Creating queue, receiving messages, deleting queue, etc) on SQS queues whose names are prefixed with the literal string AWS-ElasticMapReduce-
            "Action": [
                "sqs:CreateQueue",
                "sqs:DeleteQueue",
                "sqs:DeleteMessage",
                "sqs:DeleteMessageBatch",
                "sqs:GetQueueAttributes",
                "sqs:GetQueueUrl",
                "sqs:PurgeQueue",
                "sqs:ReceiveMessage"
            ]
        },
        {
            "Effect": "Allow",  
            "Action": "iam:CreateServiceLinkedRole",  // EMR needs permissions to create this service-linked role for launching EC2 spot instances
            "Resource": "arn:aws:iam::*:role/aws-service-role/spot.amazonaws.com/AWSServiceRoleForEC2Spot*",
            "Condition": {
                "StringLike": {
                    "iam:AWSServiceName": "spot.amazonaws.com"
                }
            }
        },
        {
            "Effect": "Allow",
            "Action": "iam:PassRole", // We are passing the custom EC2 instance profile (defined following) which has bare minimum permissions
            "Resource": [
                "arn:aws:iam::*:role/Custom_EMR_EC2_role",
                "arn:aws:iam::*:role/EMR_AutoScaling_DefaultRole"
            ]
        }
    ]
}

Minimal EMR role for EC2 (instance profile) policy

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Resource": "*",
            "Action": [
                "ec2:Describe*",
                "elasticmapreduce:Describe*",
                "elasticmapreduce:ListBootstrapActions",
                "elasticmapreduce:ListClusters",
                "elasticmapreduce:ListInstanceGroups",
                "elasticmapreduce:ListInstances",
                "elasticmapreduce:ListSteps"
            ]
        },
        {
            "Effect": "Allow",
            "Resource": [    // Here you can specify the list of buckets which are going to be accessed by applications (Spark, Hive, etc) running on the nodes of the cluster
                "arn:aws:s3:::examplebucket1/*",
                "arn:aws:s3:::examplebucket1*",
                "arn:aws:s3:::examplebucket2/*",
                "arn:aws:s3:::examplebucket2*"
            ], 
            "Action": [
                "s3:GetBucketLocation",
                "s3:GetBucketCORS",
                "s3:GetObjectVersionForReplication",
                "s3:GetObject",
                "s3:GetBucketTagging",
                "s3:GetObjectVersion",
                "s3:GetObjectTagging",
                "s3:ListMultipartUploadParts",
                "s3:ListBucketByTags",
                "s3:ListBucket",
                "s3:ListObjects",
                "s3:ListBucketMultipartUploads",
                "s3:PutObject",
                "s3:PutObjectTagging",
                "s3:HeadBucket",
                "s3:DeleteObject"
            ]
        },
        {
            "Effect": "Allow",
            "Resource": "arn:aws:sqs:*:123456789012:AWS-ElasticMapReduce-*", // This allows EMR to only perform actions (creating queue, receiving messages, deleting queue, and so on) on SQS queues whose names are prefixed with the literal string AWS-ElasticMapReduce-
            "Action": [
                "sqs:CreateQueue",
                "sqs:DeleteQueue",
                "sqs:DeleteMessage",
                "sqs:DeleteMessageBatch",
                "sqs:GetQueueAttributes",
                "sqs:GetQueueUrl",
                "sqs:PurgeQueue",
                "sqs:ReceiveMessage"
            ]
        }
    ]
}

Minimal role policy for user launching EMR clusters

// This policy can be attached to an IAM user who will be launching EMR clusters. It provides minimum access to the user to launch, monitor and terminate EMR clusters

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Statement1",
            "Effect": "Allow",
            "Action": "iam:CreateServiceLinkedRole",
            "Resource": "*",
            "Condition": {
                "StringLike": {
                    "iam:AWSServiceName": [
                        "elasticmapreduce.amazonaws.com",
                        "elasticmapreduce.amazonaws.com.cn"
                    ]
                }
            }
        },
        {
            "Sid": "Statement2",
            "Effect": "Allow",
            "Action": [
                "iam:GetPolicyVersion",
                "ec2:AuthorizeSecurityGroupIngress",
                "ec2:DescribeInstances",
                "ec2:RequestSpotInstances",
                "ec2:DeleteTags",
                "ec2:DescribeSpotInstanceRequests",
                "ec2:ModifyImageAttribute",
                "cloudwatch:GetMetricData",
                "cloudwatch:GetMetricStatistics",
                "cloudwatch:ListMetrics",
                "ec2:DescribeVpcAttribute",
                "ec2:DescribeSpotPriceHistory",
                "ec2:DescribeAvailabilityZones",
                "ec2:CreateRoute",
                "ec2:RevokeSecurityGroupEgress",
                "ec2:CreateSecurityGroup",
                "ec2:DescribeAccountAttributes",
                "ec2:ModifyInstanceAttribute",
                "ec2:DescribeKeyPairs",
                "ec2:DescribeNetworkAcls",
                "ec2:DescribeRouteTables",
                "ec2:AuthorizeSecurityGroupEgress",
                "ec2:TerminateInstances", //This action can be scoped in similar manner like it has been done following for "elasticmapreduce:TerminateJobFlows"
                "iam:GetPolicy",
                "ec2:CreateTags",
                "ec2:DeleteRoute",
                "iam:ListRoles",
                "ec2:RunInstances",
                "ec2:DescribeSecurityGroups",
                "ec2:CancelSpotInstanceRequests",
                "ec2:CreateVpcEndpoint",
                "ec2:DescribeVpcs",
                "ec2:DescribeSubnets",
                "elasticmapreduce:*"
            ],
            "Resource": "*"
        },
        {
            "Sid": "Statement3",
            "Effect": "Allow",
            "Action": [
                "elasticmapreduce:TerminateJobFlows"
            ],
            "Resource":"*",
            "Condition": {
                "StringEquals": {
                  "elasticmapreduce:ResourceTag/custom_key": "custom_value"  // Here you can specify the key value pair of your custom tag so that this IAM user can only delete the clusters which are appropriately tagged by the user
                }
              }
        },
        {
            "Sid": "Statement4",
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": [
                "arn:aws:iam::*:role/Custom_EMR_Role",
                "arn:aws:iam::*:role/Custom_EMR_EC2_role",
                "arn:aws:iam::*:role/EMR_AutoScaling_DefaultRole"
            ]
        }
    ]
}
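
To put these policies to work, you create the roles and instance profile and attach the policy documents. A minimal boto3 sketch, assuming the three policy documents above (with comments removed) are saved locally as service_role.json, instance_role.json, and user_policy.json, and that the user name emr-operator is a placeholder:

import json
import boto3

iam = boto3.client('iam')

def trust_policy(service):
    # Trust policy allowing the given AWS service to assume the role.
    return json.dumps({
        "Version": "2012-10-17",
        "Statement": [{"Effect": "Allow",
                       "Principal": {"Service": service},
                       "Action": "sts:AssumeRole"}]
    })

# EMR service role
iam.create_role(RoleName='Custom_EMR_Role',
                AssumeRolePolicyDocument=trust_policy('elasticmapreduce.amazonaws.com'))
iam.put_role_policy(RoleName='Custom_EMR_Role',
                    PolicyName='MinimalEmrServicePolicy',
                    PolicyDocument=open('service_role.json').read())

# EC2 instance profile role
iam.create_role(RoleName='Custom_EMR_EC2_role',
                AssumeRolePolicyDocument=trust_policy('ec2.amazonaws.com'))
iam.put_role_policy(RoleName='Custom_EMR_EC2_role',
                    PolicyName='MinimalEmrInstancePolicy',
                    PolicyDocument=open('instance_role.json').read())
iam.create_instance_profile(InstanceProfileName='Custom_EMR_EC2_role')
iam.add_role_to_instance_profile(InstanceProfileName='Custom_EMR_EC2_role',
                                 RoleName='Custom_EMR_EC2_role')

# Inline policy for the IAM user who launches clusters
iam.put_user_policy(UserName='emr-operator',
                    PolicyName='MinimalEmrUserPolicy',
                    PolicyDocument=open('user_policy.json').read())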

Bootstrap actions

Bootstrap actions are commonly used to run custom code for setup before the execution of your cluster. You can use them to install software or configure instances in any language already installed on the cluster, including Bash, Perl, Python, Ruby, C++, or Java.

For more details on how to use bootstrap actions, see Create Bootstrap Actions to Install Additional Software in the EMR documentation.
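
You specify bootstrap actions when you launch the cluster. A minimal boto3 sketch follows; the hardening script path, its arguments, and the other cluster parameters are placeholder assumptions along the lines of the earlier examples:

import boto3

emr = boto3.client('emr')

response = emr.run_job_flow(
    Name='ClusterWithBootstrapAction',
    ReleaseLabel='emr-5.17.0',
    BootstrapActions=[
        {
            'Name': 'Harden instances',
            'ScriptBootstrapAction': {
                # Hypothetical hardening script stored in your own S3 bucket
                'Path': 's3://examplebucket/bootstrap/harden.sh',
                'Args': ['--disable-unused-services'],
            },
        },
    ],
    Instances={
        'MasterInstanceType': 'm4.large',
        'SlaveInstanceType': 'm4.large',
        'InstanceCount': 3,
        'Ec2SubnetId': 'subnet-12345678',  # placeholder
    },
    JobFlowRole='Custom_EMR_EC2_role',
    ServiceRole='Custom_EMR_Role',
)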

For the purposes of this blog post, we discuss using bootstrap actions to harden your cluster, apply security packages you might require, and set up third-party monitoring solutions.

When do bootstrap actions run in the cluster lifecycle?

Bootstrap actions run on each cluster instance after instance provisioning and before application installation. This timing matters if you want to use bootstrap actions to modify the security configuration of applications that EMR provisions. In that case, you can have your script wait for a particular trigger, as in the following snippet, which waits for Presto to finish installing:

# Work to do before Presto being installed executes before this line
while [ ! -f /var/run/presto/presto-presto-server.pid ]
do
  sleep 1
done

# Continue on with script execution

Be sure to run the preceding snippet in a background process so that it doesn’t block the provisioning process. Failing to do so results in a timeout failure.

For more information about the cluster lifecycle, refer to: Understanding the Cluster Lifecycle in the EMR documentation.

Custom AMIs and applying CIS controls to harden your AMI

Custom Amazon Machine Images (AMIs) provide another approach that you can take to help harden and secure your EMR cluster. Amazon EMR uses an Amazon Linux AMI to initialize Amazon EC2 instances when you create and launch a cluster. The AMI contains the Amazon Linux operating system, other software, and configurations required for each instance to host your cluster applications.

Specifying a custom AMI is useful for the following use cases:

  • Encrypting the EBS root device volumes (boot volumes) of EC2 instances in your cluster, which you can’t do with a security configuration. Security configurations only help you encrypt your storage volumes. For more information, see Creating a Custom AMI with an Encrypted Amazon EBS Root Device Volume in the EMR documentation.
  • Pre-installing applications and performing other customizations instead of using bootstrap actions, which can improve cluster start time and streamline the startup work flow. For more information and an example, see Creating a Custom Amazon Linux AMI from a Preconfigured Instance in the EMR documentation.
  • Implementing more sophisticated cluster and node configurations than bootstrap actions enable.

By using a custom AMI instead of a bootstrap action, you can have your hardening steps prebaked into the images you use, rather than having to run the bootstrap action scripts at instance provision time. You don’t have to choose between the two, either. You can create a custom AMI for the common security characteristics of your cluster that are less likely to change. You can use bootstrap actions to pull the latest configurations and scripts that might be cluster-specific.

One approach that many of our customers take is to apply the Center for Internet Security (CIS) benchmarks, available on the Center for Internet Security website, to harden their EMR clusters. When applying these benchmarks, it’s important to verify each control for necessity and to test its effect against your requirements.
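
One way to produce a custom AMI with an encrypted EBS root volume is to copy an existing Amazon Linux AMI with encryption enabled, then reference the resulting image ID as CustomAmiId as shown in the examples that follow. A minimal boto3 sketch; the source AMI ID, Region, and KMS key ARN are placeholders:

import boto3

ec2 = boto3.client('ec2', region_name='us-east-1')

# Copy a base Amazon Linux AMI, encrypting the EBS root volume with your KMS key.
response = ec2.copy_image(
    SourceImageId='ami-0123456789abcdef0',  # base Amazon Linux AMI (placeholder)
    SourceRegion='us-east-1',
    Name='emr-custom-encrypted-ami',
    Encrypted=True,
    KmsKeyId='arn:aws:kms:us-east-1:123456789012:key/12345678-1234-1234-1234-123456789012',
)

image_id = response['ImageId']

# Wait until the copy is available before referencing it as CustomAmiId.
ec2.get_waiter('image_available').wait(ImageIds=[image_id])
print(image_id)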

The following example shows using custom AMIs in CloudFormation:

Resources:
  cluster:
    Type: 'AWS::EMR::Cluster'
    Properties:
      CustomAmiId: "ami-7fb3bc69"
      Instances:
        MasterInstanceGroup:
...

The following example shows using custom AMIs in boto3:

response = client.run_job_flow(
    Name='string',
    LogUri='string',
    AdditionalInfo='string',
    CustomAmiId='ami-7fb3bc69',
...

Auditing

You might want the ability to audit compute environments, which is a key requirement for many customers. There are a variety of ways that you can support this requirement within EMR:

  • From EMR 5.14.0 onwards, EMRFS, Amazon EMR’s connector for S3, supports auditing of users who ran queries that accessed data in S3 through EMRFS. This feature is turned on by default and passes on user and group information to audit logs like CloudTrail. It provides you with comprehensive request tracking.
  • Where the application supports it, you can configure and implement application-specific auditing on EMR. For example, this AWS Big Data Blog post walks through how to configure a custom event listener on Presto to enable audit logging, debugging, and performance analysis: Custom Log Presto Query Events on Amazon EMR for Auditing and Performance Insights.
  • You can use tools such as Apache Ranger to implement another layer of auditing and authorization. For additional information, see this AWS Big Data Blog post: Implementing Authorization and Auditing using Apache Ranger on Amazon EMR
  • AWS CloudTrail, a service that provides a record of actions taken by a user, role, or an AWS service, is integrated with Amazon EMR. CloudTrail captures all API calls for Amazon EMR as events. The calls captured include calls from the Amazon EMR console and code calls to the Amazon EMR API operations. If you create a trail, you can enable continuous delivery of CloudTrail events to an Amazon S3 bucket, including events for Amazon EMR.
  • You can also audit the S3 objects that EMR accesses by using S3 access logs. AWS CloudTrail provides logs only for AWS API calls. Thus, if a user runs a job that reads and writes data to S3, the S3 data that EMR accessed doesn’t show up in CloudTrail. By using S3 access logs, you can comprehensively monitor and audit access to your data in S3 from anywhere, including EMR. (A sketch of enabling access logging follows this list.)
  • Because you have full control over your EMR cluster, you can always install your own third-party agents or tooling. You do so by using bootstrap actions or custom AMIs to help support your auditing requirements.
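
Enabling S3 server access logging, as mentioned in the list above, is a bucket-level setting. A minimal boto3 sketch, assuming a separate log bucket that already grants the S3 log delivery group permission to write to it; both bucket names are placeholders:

import boto3

s3 = boto3.client('s3')

# Turn on server access logging for a data bucket that EMR reads and writes.
# 'examplebucket-logs' must already permit the S3 log delivery group to write to it.
s3.put_bucket_logging(
    Bucket='examplebucket',
    BucketLoggingStatus={
        'LoggingEnabled': {
            'TargetBucket': 'examplebucket-logs',
            'TargetPrefix': 'emr-access-logs/',
        }
    },
)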

Verifying your security configuration

You also want to verify that your configuration works, of course. Following are some steps that you can take to verify your configuration.

S3 server-side encryption (SSE) on EMR

Here, we want to verify a particular case. We want to ensure that if an S3 object is uploaded using EMRFS and server-side encryption (SSE) is enabled in the EMRFS configuration, it has metadata indicating it’s encrypted. For example, calling getSSEAlgorithm on the S3 object should return AES256 if the object is encrypted using an S3 key. It should return aws:kms if the object is encrypted using a KMS key.

We also want to check that if the file is downloaded using EMRFS, the original file and the downloaded file match in terms of content.

To verify this, do the following:

  1. Use Secure Shell (SSH) to connect to the master node as described in Connect to the Master Node Using SSH in the EMR documentation.
  2. Upload an object to S3 using EMRFS:
    • hadoop fs -put <local path> <s3 bucket path>
  3. Check the metadata of the uploaded object directly (a boto3 sketch of this check follows the list):
    • aws s3api head-object --bucket <bucket name> --key <object key>
  4. Download the file from S3 using EMRFS:
    • hadoop fs -get <s3 bucket path> <local path>
  5. Use diff on the original and downloaded file and verify that they are the same.
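
You can also perform the metadata check from step 3 programmatically. A minimal boto3 sketch; the bucket and key are placeholders:

import boto3

s3 = boto3.client('s3')

# Inspect the server-side encryption header on the object uploaded through EMRFS.
response = s3.head_object(Bucket='examplebucket', Key='data/myfile')

# 'AES256' indicates SSE-S3; 'aws:kms' indicates SSE-KMS.
print(response.get('ServerSideEncryption'))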

S3 client-side encryption (CSE) on EMR

Similarly to verifying server-side encryption, we want to confirm another point. If an object is uploaded using EMRFS and client side encryption is enabled in the EMRFS configuration, the S3 object’s contents should be encrypted. A client that doesn’t possess the proper key shouldn’t be able to see them. If we get the object using an Amazon S3 client, the contents of the object shouldn’t be the same as the contents of the original object that was uploaded.

We also want to confirm that if we do use EMRFS to download the file, the contents of the downloaded file should match the contents of the original file.

To verify this, do the following:

  1. Use SSH to connect to the master node as described in Connect to the Master Node Using SSH.
  2. Upload an object to S3 using EMRFS:
    • hadoop fs -put <local path> <s3 bucket path>
  3. Download the object from S3 using the AWS CLI:
    • aws s3 cp <s3 bucket path> <local path>
    • This file should NOT match the original file.
  4. Download the object from S3 using EMRFS:
    • hadoop fs -get <s3 bucket path> <local path>
    • This file SHOULD match the original file.

Local disk encryption

We also want to verify that if local disk encryption is enabled, all the block devices on the cluster are of type crypto_LUKS. If there is a partition that is not a LUKS partition, the following command exits with a return code of 1:

! blkid | grep -v crypto_LUKS

In addition, we want to verify that the local disk encryption uses the kind of key that was defined in the configuration.

Let’s try to use KMS to decrypt our encrypted local disk passphrase. If it is decrypted, we try to use it to open the LUKS partitions. If it doesn’t get decrypted, we assume it is a custom key.

To do so, we run the following commands:

  1. base64 -d /var/setup-devices/.encrypted-diskKey > (local path for encrypted passphrase)
  2. aws kms decrypt --ciphertext-blob fileb://(local path for encrypted passphrase) --query Plaintext > (local path for decrypted passphrase)
    • If this step fails, the key is not from KMS
  3. sudo cryptsetup luksOpen --test-passphrase --key-slot 0 /dev/xvdb1 < (local path for decrypted passphrase) | cut -d '"' -f 2

Wrapping it up in AWS CloudFormation

We have provided the following AWS CloudFormation template to help you more easily deploy an EMR cluster that aligns with the patterns and practices described in this post. The template spins up an EMR cluster with the following characteristics:

  • Encryption at rest enabled for both S3 and the local disk
  • Encryption in transit enabled using a certificate bundle that you specify
  • A custom AMI CloudFormation parameter that can incorporate the concepts described in the custom AMI section
  • Kerberos-enabled
  • Fine-grained S3 authorization
---
Description: Sample CloudFormation template for creating an EMR cluster
Parameters:
  KeyName:
    Description: Name of an existing EC2 KeyPair to enable SSH to the instances
    Type: AWS::EC2::KeyPair::KeyName
  Subnet:
    Description: Subnet ID for creating the EMR cluster
    Type: AWS::EC2::Subnet::Id
Resources:
  EMRInstanceProfile:
    Properties:
      Roles:
      - Ref: EMRJobFlowRole
    Type: AWS::IAM::InstanceProfile
  EMRJobFlowRole:
    Properties:
      AssumeRolePolicyDocument:
        Statement:
        - Action:
          - sts:AssumeRole
          Effect: Allow
          Principal:
            Service:
            - ec2.amazonaws.com
      ManagedPolicyArns:
      - arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceforEC2Role
    Type: AWS::IAM::Role
  EMRSampleCluster:
    Properties:
      Applications:
      - Name: Hadoop
      - Name: Hive
      - Name: Spark
      AutoScalingRole: EMR_AutoScaling_DefaultRole # assumes the default autoscaling role already exists in your account
      BootstrapActions:
      - Name: Dummy bootstrap action
        ScriptBootstrapAction:
          Args:
          - dummy
          - parameter
          Path: file:/usr/share/aws/emr/scripts/install-hue
      Configurations:
      - Classification: core-site
        ConfigurationProperties:
          hadoop.security.groups.cache.secs: '250'
      - Classification: mapred-site
        ConfigurationProperties:
          mapred.tasktracker.map.tasks.maximum: '2'
          mapreduce.map.sort.spill.percent: '90'
          mapreduce.tasktracker.reduce.tasks.maximum: '5'
      Instances:
        CoreInstanceGroup:
          EbsConfiguration:
            EbsBlockDeviceConfigs:
            - VolumeSpecification:
                SizeInGB: '10'
                VolumeType: gp2
              VolumesPerInstance: '1'
            EbsOptimized: 'true'
          InstanceCount: '1'
          InstanceType: m4.large
          Name: Core Instance
        Ec2KeyName:
          Ref: KeyName
        Ec2SubnetId:
          Ref: Subnet
        MasterInstanceGroup:
          InstanceCount: '1'
          InstanceType: m4.large
          Name: Master Instance
      JobFlowRole:
        Ref: EMRInstanceProfile
      Name: EMR Sample Cluster
      ReleaseLabel: emr-5.5.0
      SecurityConfiguration:
        Ref: EMRSecurityConfiguration
      ServiceRole:
        Ref: EMRServiceRole
      Tags:
      - Key: Name
        Value: EMR Sample Cluster
      VisibleToAllUsers: 'true'
    Type: AWS::EMR::Cluster
  EMRSecurityConfiguration:
    Properties:
      Name: EMRSampleClusterSecurityConfiguration
      SecurityConfiguration:
        EncryptionConfiguration:
          AtRestEncryptionConfiguration:
            LocalDiskEncryptionConfiguration:
              AwsKmsKey: arn:aws:kms:us-east-1:123456789012:key/1234-1234-1234-1234-1234
              EncryptionKeyProviderType: AwsKms
            S3EncryptionConfiguration:
              AwsKmsKey: arn:aws:kms:us-east-1:123456789012:key/1234-1234-1234-1234-1234
              EncryptionMode: SSE-KMS
          EnableAtRestEncryption: 'true'
          EnableInTransitEncryption: 'true'
          InTransitEncryptionConfiguration:
            TLSCertificateConfiguration:
              CertificateProviderType: PEM
              S3Object: s3://MyConfigStore/artifacts/MyCerts.zip
    Type: AWS::EMR::SecurityConfiguration
  EMRServiceRole:
    Properties:
      AssumeRolePolicyDocument:
        Statement:
        - Action:
          - sts:AssumeRole
          Effect: Allow
          Principal:
            Service:
            - elasticmapreduce.amazonaws.com
      ManagedPolicyArns:
      - arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceRole
    Type: AWS::IAM::Role

Summary

In this post, we provide a set of best practices to consider and follow when securing your EMR clusters. If you have questions or suggestions, leave a comment.

Additional Reading

If you found this post useful, be sure to check out Restrict access to your AWS Glue Data Catalog with resource-level IAM permissions and resource-based policies, Implementing Authorization and Auditing using Apache Ranger on Amazon EMR, and Secure Amazon EMR with Encryption.

About the Author

Tony Nguyen is a Senior Consultant with AWS Professional Services. His specialty is in data and analytics, focused on helping public sector customers with their unique data challenges. He works directly with customers to design, architect, and implement big data and analytics solutions on AWS. When he’s not eyeballs-deep in data, he enjoys playing volleyball, cooking, and occasionally fooling himself into thinking that he’s a half-decent photographer.

Dr. Aaron Friedman is a Healthcare and Life Sciences Partner Solutions Architect at Amazon Web Services. He works with ISVs and SIs to architect healthcare solutions on AWS, and bring the best possible experience to their customers. His passion is working at the intersection of science, big data, and software. In his spare time, he’s exploring the outdoors, learning a new thing to cook, or spending time with his wife and his dog, Macaroon.

Connecting to and running ETL jobs across multiple VPCs using a dedicated AWS Glue VPC

Post Syndicated from Nivas Shankar original https://aws.amazon.com/blogs/big-data/connecting-to-and-running-etl-jobs-across-multiple-vpcs-using-a-dedicated-aws-glue-vpc/

Many organizations use a setup that includes multiple VPCs based on the Amazon VPC service, with databases isolated in separate VPCs for security, auditing, and compliance purposes. This blog post shows how you can use AWS Glue to perform extract, transform, load (ETL) and crawler operations for databases located in multiple VPCs.

The solution presented here uses a dedicated AWS Glue VPC and subnet to perform the following operations on databases located in different VPCs:

  • Scenario 1: Ingest data from an Amazon RDS for MySQL database, transform it in AWS Glue, and output the results to an Amazon Redshift data warehouse.
  • Scenario 2: Ingest data from an Amazon RDS for MySQL database, transform it in AWS Glue, and output the results to an Amazon RDS for PostgreSQL database.

In this blog post, we go through the steps needed to build an ETL pipeline that consumes from a source in one VPC and writes to a target in a different VPC. We set up multiple VPCs to reproduce a situation where your database instances are spread across VPCs for isolation related to security, audit, or other purposes.

For this solution, we create a VPC dedicated to AWS Glue. Next, we set up VPC peering between the AWS Glue VPC and all of the other database VPCs. Then we configure an Amazon S3 endpoint, route tables, security groups, and IAM so that AWS Glue can function properly. Lastly, we create AWS Glue connections and an AWS Glue job to perform the task at hand.

Step 1: Set up a VPC

To simulate these scenarios, we create four VPCs with their respective IPv4 CIDR ranges. (Note: CIDR ranges can’t overlap when you use VPC peering.)

VPC    Purpose                      IPv4 CIDR
VPC 1  Amazon Redshift              172.31.0.0/16
VPC 2  Amazon RDS for MySQL         172.32.0.0/16
VPC 3  Amazon RDS for PostgreSQL    172.33.0.0/16
VPC 4  AWS Glue                     172.30.0.0/16

Key configuration notes:

  1. The AWS Glue VPC needs at least one private subnet for AWS Glue to use.
  2. Ensure that DNS hostnames are enabled for all of your VPCs (unless you plan to refer to your databases by IP address later on, which isn’t recommended).
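
To enable DNS support and DNS hostnames on a VPC programmatically (item 2 above), you can use a sketch like the following; the VPC ID is a placeholder, and each attribute must be set in its own call:

import boto3

ec2 = boto3.client('ec2')

vpc_id = 'vpc-0123456789abcdef0'  # placeholder

# modify_vpc_attribute accepts only one attribute per call.
ec2.modify_vpc_attribute(VpcId=vpc_id, EnableDnsSupport={'Value': True})
ec2.modify_vpc_attribute(VpcId=vpc_id, EnableDnsHostnames={'Value': True})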

Step 2: Set up a VPC peering connection

Next, we peer our VPCs together to ensure that AWS Glue can communicate with all of the database targets and sources. This approach is necessary because AWS Glue resources are created with private addresses only. Thus, they can’t use an internet gateway to communicate with public addresses, such as public database endpoints. If your database endpoints are public, you can alternatively use a network address translation (NAT) gateway with AWS Glue rather than peer across VPCs.

Create the following peering connections.

Peering  Requester                Accepter
Peer 1   172.30.0.0/16 (VPC 4)    172.31.0.0/16 (VPC 1)
Peer 2   172.30.0.0/16 (VPC 4)    172.32.0.0/16 (VPC 2)
Peer 3   172.30.0.0/16 (VPC 4)    172.33.0.0/16 (VPC 3)

These peering connections can be across separate AWS Regions if needed. The database VPCs are not peered together; they are all peered with the AWS Glue VPC instead. We do this because AWS Glue connects to each database from its own VPC. The databases don’t connect to each other.

Key configuration notes:

  1. Create a VPC peering connection, as described in the Amazon VPC documentation. Select the AWS Glue VPC as the requester and the VPC for your database as the accepter.
  2. Accept the VPC peering request. If you are peering to a different AWS Region, switch to that AWS Region to accept the request.

Important: Enable Domain Name Service (DNS) settings for each of the peering connections. Doing this ensures that AWS Glue can retrieve the private IP address of your database endpoints. Otherwise, AWS Glue resolves your database endpoints to public IP addresses. AWS Glue can’t connect to public IP addresses without a NAT gateway.
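
The following boto3 sketch turns on DNS resolution across a peering connection after it has been accepted; the peering connection ID is a placeholder:

import boto3

ec2 = boto3.client('ec2')

# Allow each side of the peering connection to resolve the other side's
# private DNS names to private IP addresses. For cross-Region peering,
# set each side's options from its own Region instead of in one call.
ec2.modify_vpc_peering_connection_options(
    VpcPeeringConnectionId='pcx-0123456789abcdef0',  # placeholder
    RequesterPeeringConnectionOptions={'AllowDnsResolutionFromRemoteVpc': True},
    AccepterPeeringConnectionOptions={'AllowDnsResolutionFromRemoteVpc': True},
)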

Step 3: Create an Amazon S3 endpoint for the AWS Glue subnet

We need to add an Amazon S3 endpoint to the AWS Glue VPC (VPC 4). During setup, associate the endpoint with the route table that your private subnet uses. For more details on creating an S3 endpoint for AWS Glue, see Amazon VPC Endpoints for Amazon S3 in the AWS Glue documentation.

AWS Glue uses S3 to store your scripts and temporary data to load into Amazon Redshift.
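
A minimal boto3 sketch of creating the gateway endpoint and associating it with the private subnet’s route table; the VPC ID, route table ID, and Region in the service name are placeholders:

import boto3

ec2 = boto3.client('ec2', region_name='us-east-1')

# Gateway endpoint for S3, attached to the route table of the AWS Glue private subnet.
response = ec2.create_vpc_endpoint(
    VpcEndpointType='Gateway',
    VpcId='vpc-0123456789abcdef0',            # AWS Glue VPC (placeholder)
    ServiceName='com.amazonaws.us-east-1.s3',
    RouteTableIds=['rtb-0123456789abcdef0'],  # private subnet route table (placeholder)
)
print(response['VpcEndpoint']['VpcEndpointId'])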

Step 4: Create a route table configuration

Add the following routes to the route tables used by the respective services’ subnets. These routes are configured along with existing settings.

VPC 4 (AWS Glue) route table:
  Destination 172.31.0.0/16 (VPC 1)    Target Peer 1
  Destination 172.32.0.0/16 (VPC 2)    Target Peer 2
  Destination 172.33.0.0/16 (VPC 3)    Target Peer 3

VPC 1 (Amazon Redshift) route table:
  Destination 172.30.0.0/16 (VPC 4)    Target Peer 1

VPC 2 (Amazon RDS for MySQL) route table:
  Destination 172.30.0.0/16 (VPC 4)    Target Peer 2

VPC 3 (Amazon RDS for PostgreSQL) route table:
  Destination 172.30.0.0/16 (VPC 4)    Target Peer 3

Key configuration notes:

  • The route table for the AWS Glue VPC has peering connections to all VPCs. It has these so that AWS Glue can initiate connections to all of the databases.
  • All of the database VPCs have a peering connection back to the AWS Glue VPC. They have these connections to allow return traffic to reach AWS Glue.
  • Ensure that your S3 endpoint is present in the route table for the AWS Glue VPC.
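
Each of the routes above can be added programmatically. A minimal boto3 sketch for one of them; the route table ID and peering connection ID are placeholders:

import boto3

ec2 = boto3.client('ec2')

# In the AWS Glue VPC route table, send traffic destined for VPC 1 over Peer 1.
ec2.create_route(
    RouteTableId='rtb-0123456789abcdef0',           # placeholder
    DestinationCidrBlock='172.31.0.0/16',           # VPC 1 (Amazon Redshift)
    VpcPeeringConnectionId='pcx-0123456789abcdef0'  # Peer 1 (placeholder)
)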

Step 5: Update the database security groups

Each database’s security group must allow traffic to its listening port (3306, 5432, 5439, and so on) from the AWS Glue VPC for AWS Glue to be able to connect to it. It’s also a good idea to restrict the range of source IP addresses as much as possible.

There are two ways to accomplish this. If your AWS Glue job is in the same AWS Region as the resource, you can define the source as the security group that you use for AWS Glue. If you are using AWS Glue to connect across AWS Regions, specify the IP range of the private subnet in the AWS Glue VPC instead. In the examples following, we use a security group as the source, because the AWS Glue job and the data sources are all in the same AWS Region.

In addition to configuring the database’s security groups, AWS Glue requires a special security group that allows all inbound traffic from itself. Because it isn’t secure to allow traffic from 0.0.0.0/0, we create a self-referencing rule that simply allows all traffic originating from the security group. You can create a new security group for this purpose, or you can modify an existing security group. In the example following, we create a new security group to use later when AWS Glue connections are created.

For example: the Amazon RDS for MySQL security group allows inbound TCP 3306 from the AWS Glue security group; the Amazon RDS for PostgreSQL security group allows inbound TCP 5432 from it; the Amazon Redshift security group allows inbound TCP 5439 from it; and the AWS Glue security group allows all inbound traffic from itself.
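
A sketch of the two kinds of rules with boto3; the security group IDs are placeholders:

import boto3

ec2 = boto3.client('ec2')

glue_sg = 'sg-11111111'   # security group used by AWS Glue (placeholder)
mysql_sg = 'sg-22222222'  # security group of the Amazon RDS for MySQL instance (placeholder)

# Self-referencing rule: allow all inbound traffic that originates from the
# AWS Glue security group itself.
ec2.authorize_security_group_ingress(
    GroupId=glue_sg,
    IpPermissions=[{'IpProtocol': '-1',
                    'UserIdGroupPairs': [{'GroupId': glue_sg}]}]
)

# Allow AWS Glue to reach the MySQL listener on port 3306.
ec2.authorize_security_group_ingress(
    GroupId=mysql_sg,
    IpPermissions=[{'IpProtocol': 'tcp', 'FromPort': 3306, 'ToPort': 3306,
                    'UserIdGroupPairs': [{'GroupId': glue_sg}]}]
)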

Step 6: Set up IAM

Make sure that you have an AWS Glue IAM role with access to Amazon S3. You might want to provide your own policy for access to specific Amazon S3 resources. Data sources require s3:ListBucket and s3:GetObject permissions. Data targets require s3:ListBucket, s3:PutObject, and s3:DeleteObject permissions. For more information on creating an Amazon S3 policy for your resources, see Policies and Permissions in the IAM documentation.

The role needs the AWS Glue service permissions plus Amazon S3 access for the buckets that your jobs use. You can attach a broad S3 policy, or you can create an S3 policy that’s more restricted to suit your use case.
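
A minimal boto3 sketch of creating such a role, attaching the AWS Glue service managed policy and a restricted inline S3 policy; the role name and bucket name are placeholders:

import json
import boto3

iam = boto3.client('iam')

role_name = 'GlueCrossVpcEtlRole'  # placeholder

# Trust policy allowing AWS Glue to assume the role.
iam.create_role(
    RoleName=role_name,
    AssumeRolePolicyDocument=json.dumps({
        "Version": "2012-10-17",
        "Statement": [{"Effect": "Allow",
                       "Principal": {"Service": "glue.amazonaws.com"},
                       "Action": "sts:AssumeRole"}]
    })
)

# AWS Glue service permissions
iam.attach_role_policy(
    RoleName=role_name,
    PolicyArn='arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole'
)

# Restricted S3 access for script, temporary, and data locations
iam.put_role_policy(
    RoleName=role_name,
    PolicyName='GlueS3Access',
    PolicyDocument=json.dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Action": ["s3:ListBucket", "s3:GetObject",
                       "s3:PutObject", "s3:DeleteObject"],
            "Resource": ["arn:aws:s3:::examplebucket",
                         "arn:aws:s3:::examplebucket/*"]
        }]
    })
)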

Step 7: Set up an AWS Glue connection

Create one AWS Glue JDBC connection for each data store: Amazon RDS for MySQL, Amazon Redshift, and Amazon RDS for PostgreSQL. For each connection, specify the JDBC URL and credentials of the data store, along with the private subnet in the AWS Glue VPC and the AWS Glue security group created earlier.
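
If you script the connections, a minimal boto3 sketch for the MySQL connection looks like the following; the connection name, JDBC URL, credentials, subnet, security group, and Availability Zone are placeholders, and the other two connections follow the same pattern:

import boto3

glue = boto3.client('glue')

glue.create_connection(
    ConnectionInput={
        'Name': 'vpc-mysql',  # placeholder connection name
        'ConnectionType': 'JDBC',
        'ConnectionProperties': {
            'JDBC_CONNECTION_URL': 'jdbc:mysql://mysql-instance.abc123.us-east-1.rds.amazonaws.com:3306/mydb',
            'USERNAME': 'admin',
            'PASSWORD': 'REPLACE_WITH_SECRET',
        },
        # Network placement inside the dedicated AWS Glue VPC
        'PhysicalConnectionRequirements': {
            'SubnetId': 'subnet-0123456789abcdef0',
            'SecurityGroupIdList': ['sg-11111111'],  # the AWS Glue security group
            'AvailabilityZone': 'us-east-1a',
        },
    }
)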

Step 8: Set up an AWS Glue job

Key configuration notes:

  1. Create a crawler to import table metadata from the source database (Amazon RDS for MySQL) into the AWS Glue Data Catalog. The scenario includes a database in the catalog named gluedb, to which the crawler adds the sample tables from the source Amazon RDS for MySQL database.
  2. Use either the source connection or destination connection to create a sample job as shown following. (This step is required for the AWS Glue job to establish a network connection and create the necessary elastic network interfaces with the databases’ VPCs and peered connections.)
  3. This scenario uses PySpark code and performs the load operation from Amazon RDS for MySQL to Amazon Redshift. The ingest from Amazon RDS for MySQL to Amazon RDS for PostgreSQL uses a similar job.
  4. After running the job, verify that the table exists in the target database and that the counts match.

You can create the job in the AWS Glue console by specifying a name, the IAM role from Step 6, and the connections created in Step 7.
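
If you prefer to create the job programmatically, a minimal boto3 sketch looks like the following; the job name, role, script location, and connection names are placeholder assumptions matching the examples in this post:

import boto3

glue = boto3.client('glue')

glue.create_job(
    Name='mysql-to-redshift',                     # placeholder
    Role='GlueCrossVpcEtlRole',                   # IAM role from Step 6 (placeholder)
    Command={
        'Name': 'glueetl',
        'ScriptLocation': 's3://examplebucket/scripts/mysql_to_redshift.py',
    },
    # The connections determine which VPCs and subnets the job's elastic
    # network interfaces are created in.
    Connections={'Connections': ['vpc-mysql', 'Redshift']},
    DefaultArguments={'--TempDir': 's3://examplebucket/temp/'},
)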

Following are some examples of loading data from source tables to target instances. These are simple one-to-one mappings, with no transformations applied. Notice that the data source and data sink (target) connection configurations access multiple VPCs from a single AWS Glue job.

Sample script 1 (Amazon RDS for MySQL to Amazon Redshift)

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job


# Resolve the job parameters passed by AWS Glue (JOB_NAME and TempDir)
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'TempDir'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource = glueContext.create_dynamic_frame.from_catalog(database = "gluedb", table_name = "mysqldb_events", transformation_ctx = "datasource")


datasink = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource, catalog_connection = "Redshift", connection_options = {"dbtable": "mysqldb_events", "database": "dmartblog"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink")

job.commit()

Sample script 2: Amazon RDS for MySQL to Amazon RDS for PostgreSQL (you can substitute another RDS endpoint)

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Resolve the job parameters passed by AWS Glue
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)


datasource = glueContext.create_dynamic_frame.from_catalog(database = "gluedb", table_name = "mysqldb_events", transformation_ctx = "datasource")

datasink = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource, catalog_connection = "vpc-pgsql", connection_options = {"dbtable": "mysqldb_events", "database": "mypgsql"}, transformation_ctx = "datasink")

job.commit()

Summary

In this blog post, you learned how to configure AWS Glue to run in a separate VPC so that it can execute jobs for databases located in multiple VPCs.

The benefits of doing this include the following:

  • A separate VPC and a dedicated pool of resources for the running AWS Glue job, isolated from the database and compute nodes.
  • Dedicated ETL developer access to a single VPC, for better security control and provisioning.

 


Additional Reading

If you found this post useful, be sure to check out Restrict access to your AWS Glue Data Catalog with resource-level IAM permissions and resource-based policies, and Using Amazon Redshift Spectrum, Amazon Athena, and AWS Glue with Node.js in Production.

 


About the Author

Nivas Shankar is a Senior Big Data Consultant at Amazon Web Services. He helps and works closely with enterprise customers building big data applications on the AWS platform. He holds a Masters degree in physics and is highly passionate about theoretical physics concepts. He enjoys spending time with his wife and two adorable kids. In his spare time, he takes his kids to tennis and football practice.

 

 

Ian Eberhart is a Cloud Support Engineer on the Big Data team for AWS Premium Support. He works with customers on a daily basis to find solutions for moving and sorting their data on the AWS platform. In his spare time, Ian enjoys seeing independent and weird movies, riding his bike, and hiking in the mountains.

 

Re-affirming Long-Term Support for Java in Amazon Linux

Post Syndicated from Deepak Singh original https://aws.amazon.com/blogs/compute/re-affirming-long-term-support-for-java-in-amazon-linux/

In light of Oracle’s recent announcement indicating an end to free long-term support for OpenJDK after January 2019, we re-affirm that the OpenJDK 8 and OpenJDK 11 Java runtimes in Amazon Linux 2 will continue to receive free long-term support from Amazon until at least June 30, 2023. We are collaborating and contributing in the OpenJDK community to provide our customers with a free long-term supported Java runtime.

In addition, Amazon Linux AMI 2018.03, the last major release of Amazon Linux AMI, will receive support for the OpenJDK 8 runtime at least until June 30, 2020, to facilitate migration to Amazon Linux 2. Java runtimes provided by AWS services such as AWS Lambda, Amazon EMR, and AWS Elastic Beanstalk will also use the AWS-supported OpenJDK builds.

Amazon Linux users will not need to make any changes to get support for OpenJDK 8. OpenJDK 11 will be made available through the Amazon Linux 2 repositories at a future date. The Amazon Linux OpenJDK support posture will also apply to the on-premises virtual machine images and Docker base image of Amazon Linux 2.

Amazon Linux 2 provides a secure, stable, and high-performance execution environment. Amazon Linux AMI and Amazon Linux 2 include a Java runtime based on OpenJDK 8 and are available in all public AWS regions at no additional cost beyond the pricing for Amazon EC2 instance usage.

Dynamically scale up storage on Amazon EMR clusters

Post Syndicated from Jigar Mistry original https://aws.amazon.com/blogs/big-data/dynamically-scale-up-storage-on-amazon-emr-clusters/

In a managed Apache Hadoop environment—like an Amazon EMR cluster—when the storage capacity on your cluster fills up, there is no convenient solution to deal with it. This situation occurs because you set up Amazon Elastic Block Store (Amazon EBS) volumes and configure mount points when the cluster is launched, so it’s difficult to modify the storage capacity after the cluster is running. The feasible solutions usually involve adding more nodes to your cluster, backing up your data to a data lake, and then launching a new cluster with a higher storage capacity. Or, if the data that occupies the storage is expendable, removing the excess data is usually the way to go.

To help you deal with this issue in a manageable way in Amazon EMR, I will show you how to dynamically scale up the storage using the elastic volumes feature of Amazon EBS. With this feature, you can increase the volume size, adjust the performance, or change the volume type while the volume is in use. You can continue to use your EMR cluster to run big data applications while the changes take effect.

How HDFS and YARN use disk space on an Amazon EMR cluster

When you create an Amazon EMR cluster, by default, HDFS (Hadoop Distributed File System) and YARN (Yet Another Resource Negotiator) are configured to use the local disk storage available on all the core/task nodes. You configure this inside the yarn-site.xml and hdfs-site.xml configuration files.

Specifically, for HDFS, the dfs.datanode.data.dir parameter is configured to use local storage, where it stores the data blocks for the files maintained by the NameNode daemon. And for YARN, the yarn.nodemanager.local-dirs parameter is configured to store the intermediate files needed for the NodeManager daemon to run the YARN containers.

For example, when the cluster is running a MapReduce job, the map tasks store their output files inside the directories defined by yarn.nodemanager.local-dirs. Additionally, the yarn.nodemanager.log-dirs parameter is configured to store the container logs on the core and task nodes after the YARN application has finished.
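As an illustration of where these settings live, the following is a hedged sketch of the same properties expressed as Amazon EMR configuration classifications in Python (the mount-point paths shown are placeholders; EMR derives the real values from the instance type and attached volumes at launch time):

# Placeholder values shown for illustration only; Amazon EMR normally derives
# these directory paths per instance at cluster launch.
emr_configurations = [
    {
        "Classification": "hdfs-site",
        "Properties": {
            # Directories where DataNode daemons store HDFS blocks
            "dfs.datanode.data.dir": "/mnt/hdfs,/mnt1/hdfs"
        },
    },
    {
        "Classification": "yarn-site",
        "Properties": {
            # Intermediate files for running YARN containers
            "yarn.nodemanager.local-dirs": "/mnt/yarn,/mnt1/yarn",
            # Container logs kept after applications finish
            "yarn.nodemanager.log-dirs": "/mnt/yarn/logs,/mnt1/yarn/logs",
        },
    },
]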

General best practices for avoiding storage issues

As you plan to run your jobs on an Amazon EMR cluster, the following are some helpful tips to avoid exceeding the available storage on your cluster.

Estimate your future storage needs

Plan ahead regarding the storage needs of your jobs. When you launch a cluster using the default storage configuration, it might not be sufficient for your workloads, and you might have issues while your jobs are running. It is a good practice to estimate how much intermediate storage your jobs will need. And based on that, you can customize the storage configuration when launching a new cluster.

Store passive data in a data lake

Try to design your workloads in such a way that all your passive data is stored in a data lake like Amazon Simple Storage Service (Amazon S3). This way, you use your cluster only for data processing, performing miscellaneous computation tasks, and storing the results back to the data lake for persistent storage. This approach minimizes the storage requirements on the running cluster.

Plan for more capacity

If your use case dictates that the input or output data should be stored locally on the cluster (HDFS or local storage), you should plan the size of your cluster accordingly. For example, if you are using HDFS, you can create a cluster with a higher number of core nodes that will be enough to store your data. Or, you can customize the core instance group to have more EBS storage capacity than what the default configuration provides.

Possible issues if the storage reaches its maximum capacity

As the EMR cluster is being used to run different data processing applications, at some point, the storage capacity on the cluster might be used up. In that case, the following are some of the issues that can affect your cluster.

Issues from a YARN perspective

If the directories that are defined by the yarn.nodemanager.local-dirs or yarn.nodemanager.log-dirs parameters are filled up to at least 90 percent of the total storage of the volume, the NodeManager marks that particular disk as unhealthy. This action then causes the NodeManager to mark the node that has those disks as unhealthy also. So, if the node is unhealthy, the ResourceManager will not assign any containers to the node.

Additionally, if the termination protection feature is turned off on your EMR cluster, the EMR service eventually terminates the node from your cluster.

Issues from an HDFS perspective

If the HDFS usage on the cluster increases, it corresponds to an increase in the usage of local storage on EBS volumes also. In EMR, the HDFS data directories are configured under the same mount point as the YARN local and log directories. So, if the usage of the mount point exceeds the storage threshold (90 percent) due to HDFS, that again causes YARN to mark that disk as unhealthy, and the ResourceManager blacklists the node.

Dynamically resize the storage space on core and task nodes

To scale up the storage of core and task nodes on your cluster, use the following bootstrap action script:

s3://aws-bigdata-blog/artifacts/resize_storage/resize_storage.sh

Additionally, the EC2 instance profile of your cluster must have the ec2:ModifyVolume permission to be able to resize the volumes.

The script runs on all the nodes of an EMR cluster. It configures a cron job on the nodes and performs a disk utilization check every 2 minutes. On the master node, it performs the check on the root volume and the volumes that are storing the logs of various master daemons. On core and task nodes, it performs the check on volumes that YARN and HDFS use and determines whether there is a need to scale up the storage.

When it determines that a volume has exceeded 90 percent of its usage, the size of the volume is expanded by the percentage specified by the “--scaling-factor” parameter. During the resize process, the partition of the volume is expanded, and the file system is extended to reflect the updated capacity. All of this happens without affecting the applications that are running on the cluster.
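Under the hood, the resize relies on the Amazon EBS ModifyVolume API. The following is a minimal boto3 sketch of that call (the volume ID and scaling factor are placeholders); the bootstrap script additionally grows the partition and file system on the instance, which this sketch does not show:

import boto3

ec2 = boto3.client("ec2")

volume_id = "vol-0123456789abcdef0"   # placeholder volume ID
scaling_factor = 0.25                 # placeholder: grow the volume by 25 percent

# Look up the current size of the volume (in GiB)
current_size = ec2.describe_volumes(VolumeIds=[volume_id])["Volumes"][0]["Size"]
new_size = int(current_size * (1 + scaling_factor))

# Request the elastic volume modification; the volume stays in use while it resizes
ec2.modify_volume(VolumeId=volume_id, Size=new_size)

# After the modification, the instance still needs to expand the partition and
# extend the file system (for example, with growpart and resize2fs or xfs_growfs).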

Consider the following caveats before using this solution:

  • You can scale up the storage capacity of nodes in an EMR cluster only if the cluster uses EBS volumes as its storage backend. Certain EC2 instance types use only instance store volumes or both instance store and EBS volumes. You can’t resize the storage capacity on clusters that use such EC2 instance types.
  • While you are deciding on the scaling factor option of the script, plan ahead so that the increased volume size will last for quite some time. After a volume has been scaled up, you must wait at least 6 hours before applying further modifications to the same volume.

Conclusion

In this post, I explained how HDFS and YARN use the local storage on Amazon EMR cluster nodes. I covered how you can scale up the storage on an EMR cluster using the elastic volumes feature of Amazon EBS. You can use this feature to increase volume size, adjust performance, or change volume type while the volume is in use. You can then continue to use your EMR cluster to run big data applications while the changes are being applied.

If you have any questions or suggestions, please leave a comment below.

 


About the Author

Jigar Mistry is a Hadoop Systems Engineer with Amazon Web Services. He works with customers to provide them architectural guidance and technical support for processing large datasets in the cloud using open-source applications. In his spare time, he enjoys going for camping and exploring different restaurants in the Seattle area.

 

Migrate to Apache HBase on Amazon S3 on Amazon EMR: Guidelines and Best Practices

Post Syndicated from Francisco Oliveira original https://aws.amazon.com/blogs/big-data/migrate-to-apache-hbase-on-amazon-s3-on-amazon-emr-guidelines-and-best-practices/

This blog post provides guidance and best practices about how to migrate from Apache HBase on HDFS to Apache HBase on Amazon S3 on Amazon EMR.

Apache HBase on Amazon S3 on Amazon EMR

Amazon EMR version 5.2.0 or later lets you run Apache HBase on Amazon S3. By using Amazon S3 as a data store for Apache HBase, you can separate your cluster’s storage and compute nodes. This saves costs because you’re sizing your cluster for your compute requirements. You’re not paying to store your entire dataset with 3x replication in the on-cluster HDFS.

Many customers have taken advantage of the benefits of running Apache HBase on Amazon S3 for data storage. These benefits include lower costs, data durability, and more efficient scalability. Customers, such as the Financial Industry Regulatory Authority (FINRA), have lowered their costs by 60% by moving to an Apache HBase on Amazon S3 architecture. They have also experienced operational benefits that come with decoupling storage from compute and using Amazon S3 as the storage layer.

Whitepaper on Migrating to Apache HBase on Amazon S3 on Amazon EMR

This whitepaper walks you through the stages of a migration. It also helps you determine when to choose Apache HBase on Amazon S3 on Amazon EMR, plan for platform security, tune Apache HBase and EMRFS to support your application SLA, identify options to migrate and restore your data, and manage your cluster in production.

For more information, see Migrating to Apache HBase on Amazon S3 on Amazon EMR.


Additional Reading

If you found this post useful, be sure to check out Setting up Read Replica Clusters with HBase on Amazon S3, and Tips for Migrating to Apache HBase on Amazon S3 from HDFS.

 


About the Author

Francisco Oliveira is a Senior Big Data Engineer with AWS Professional Services. He focuses on building big data solutions with open source technology and AWS. In his free time, he likes to try new sports, travel and explore national parks.

 

Real-time bushfire alerting with Complex Event Processing in Apache Flink on Amazon EMR and IoT sensor network

Post Syndicated from Santoshkumar Kulkarni original https://aws.amazon.com/blogs/big-data/real-time-bushfire-alerting-with-complex-event-processing-in-apache-flink-on-amazon-emr-and-iot-sensor-network/

Bushfires are frequent events in the warmer months of the year when the climate is hot and dry. Countries like Australia and the United States are severely impacted by bushfires, which cause devastating effects to human lives and property. Over the years, the prediction of bushfires has been a subject of study in various research projects. Many of these projects use complex machine learning algorithms that learn to predict bushfires from the real-time spread of the fire over a particular geographical region.

In this blog post, we use the event processing paradigm provided by Apache Flink’s Complex Event Processing (CEP) library to detect potential bushfire patterns from incoming temperature events from IoT sensors in real time, and then send alerts via email. A real-time heat-map visualization of the area under surveillance is also integrated for monitoring purposes.

This post uses the following AWS services, which are described in the architecture sections that follow: AWS IoT, Amazon Kinesis Data Streams, Amazon EMR (running Apache Flink), Amazon SNS, Amazon Elasticsearch Service, Amazon EC2, and AWS CloudFormation.

Overview of the real-time bushfire prediction alert system

The development and deployment of a large-scale wireless sensor network for bushfire detection and alerting is a complex task. The scenario for this post assumes that the sensors are long-lived, battery powered, and deployed over a multi-hop wireless mesh network using technologies like LoRaWAN. It also assumes that the IoT sensors are placed strategically within the area under surveillance, and not under direct exposure to sunlight, which avoids excessive heating. The sensors constantly record and emit temperature readings from where they are installed, and can also communicate with each other to send and receive individual temperature readings to perceive the status of their surroundings. Key parameters recorded by the devices include temperature in degrees Celsius, time stamp, node id, and infectedBy, as illustrated in Figure 1.

Figure 1. List of sensor events containing measured temperature by the IoT sensor devices over different time points

Once a potential bushfire starts to spread, the IoT sensors placed within its path can detect the subsequent temperature increase and share this information with their neighboring sensors. This phenomenon is similar to spreading an epidemic over a network following a directional path, and is usually referred to as a Susceptible-Infected (SI) epidemic model in network science.

As shown in Figure 2, the parameter ‘infectedBy’ sent by a given node indicates that it has been infected by a neighboring IoT device (that is, the bushfire has been spread through this path) with the ‘node id’ listed as the parameter value. Here, we assume that once a node is infected within the network by one of its neighbors, it isn’t infected again by another node. Therefore, the value of the ‘infectedBy’ parameter remains the same.
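To make the event shape concrete, the following is a hedged sketch of how a simulated sensor could publish such a reading with boto3; the MQTT topic and field names here are illustrative placeholders and may differ from the simulator in the GitHub repository referenced later in this post:

import json
import time
import boto3

iot_data = boto3.client("iot-data")

# Example reading from a sensor that node-1 has already "infected" (placeholder values)
event = {
    "nodeId": 2,
    "temperature": 53.5,          # degrees Celsius
    "timeStamp": int(time.time()),
    "infectedBy": 1,              # the neighboring node the fire spread from
}

iot_data.publish(
    topic="weather-sensor-data",  # placeholder MQTT topic
    qos=1,
    payload=json.dumps(event),
)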

Figure 2. High-level overview of an IoT sensor network monitoring temperature of the surrounding geographical area

For the purposes of this scenario, an 11-node IoT sensor network is shown in Figure 2 that exhibits how the bushfire spreads over time. The placement of IoT sensors can be visualized as an undirected graph network where each node is an IoT sensor device. A link between two neighboring nodes denotes the wireless connectivity within a multi-hop wireless ad hoc network. Figure 2 shows the following details:

  • At time t1, the nodes are all emitting temperature data. However, none of them have reported any temperature greater than the bushfire alert threshold, which is set at 50° Celsius.
  • At time t2, node-1 reports a temperature of 50° Celsius, which reaches the pre-set threshold. In reality, it could have been a small-scale bushfire that recently started in the area under node-1’s surveillance.
  • As time moves forward to t3, we see that the fire rapidly spreads to the area monitored by node-2 in the network. Hence, we can now say that node-2 is infected by node-1. This is reflected in the ‘infectedBy’ parameter emitted by node-2, which now has the value 1, denoting how the fire is spreading over time.
  • Next, at time t4, the fire spreads further to node-3, followed by node-4 and node-5 at later time points.

This analogy helps us visualize the overall spread of the bushfire over a network of IoT devices. In this blog post, we use the CEP feature from Apache Flink to detect an event pattern where the measured temperature is higher than 50° Celsius and has an infection degree of 4 within 5 wirelessly connected IoT devices. Once this pattern is detected by real-time event stream processing in Amazon EMR, an SNS alert email is sent to the subscribed email address. The email highlights the path through which the fire has spread so far.

Architecture overview

In this section, we build a real-time event processing pipeline from start to finish. The pipeline streams the temperature measurements emitted by the IoT devices through the infrastructure layer to the predictive alerting and visualization systems used for potential bushfire monitoring. The high-level architectural diagram in Figure 3 shows the components required to build this system.

Figure 3. High-level block diagram of the real-time bushfire alert and visualization systems

The diagram shows that the IoT sensor events (that is, measured temperature) feed into an IoT Gateway. The gateway has internet connectivity to forward the records to a stream processing system. The gateway is the landing zone for all the IoT events, which ingests the records into a durable streaming storage. The IoT gateway should scale automatically to support over a billion devices without requiring us to manage any infrastructure.

Next, we store the events in durable storage, because the temperature events come from unreliable sources (the IoT sensor devices) that cannot replay events if any records are lost. The streaming storage should also support ingress and egress of a large number of events. The events are then consumed by a stream processing engine that matches the incoming events to a pattern and sends out alerts to the subscribers if necessary. The raw events are also ingested into a visualization system, which displays a real-time heat map of the temperatures in the area under surveillance by the IoT nodes.

Building a real-time bushfire alert and visualization system in AWS

Figure 4. Architecture of the real-time IoT stream processing pipeline using AWS services

In this section, we depict a component-level architecture for an event processing system using several AWS services, as shown in Figure 4. The IoT sensor data is sent to AWS IoT, which receives the incoming temperature records and acts as a gateway to the streaming pipeline. AWS IoT is equipped with a rules engine, which can be used to configure an action for the incoming events so that they are forwarded to another AWS service as a destination.

In this case, the Amazon Kinesis Data Streams service is chosen as the destination to act as a reliable underlying stream storage system with a 1-day retention period. Figure 5 below shows one such rule and action configuration used in this blog article. The events are then consumed by the Apache Flink processing engine running on an Amazon EMR cluster. Apache Flink consumes the records from the Amazon Kinesis Data Streams shards and matches the records against a pre-defined pattern to detect the possibility of a potential bushfire.

Figure 5. AWS IoT rule and action for the incoming temperature events
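A rough boto3 equivalent of the rule and Kinesis action shown in Figure 5 could look like the following; the rule name, topic, stream name, and IAM role ARN are placeholders for your own values:

import boto3

iot = boto3.client("iot")

iot.create_topic_rule(
    ruleName="ForwardSensorEventsToKinesis",   # placeholder rule name
    topicRulePayload={
        # Select every event published on the sensor topic (placeholder topic)
        "sql": "SELECT * FROM 'weather-sensor-data'",
        "awsIotSqlVersion": "2016-03-23",
        "ruleDisabled": False,
        "actions": [
            {
                "kinesis": {
                    "roleArn": "arn:aws:iam::123456789012:role/iot-to-kinesis-role",  # placeholder role
                    "streamName": "bushfire-sensor-stream",                           # placeholder stream
                    "partitionKey": "${newuuid()}",
                }
            }
        ],
    },
)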

Use Apache Flink as the stream processing engine

In this blog, we have chosen Apache Flink as the stream processing engine as it provides high throughput with low latency processing of real-time events. More importantly, it supports stream processing and windowing with Event Time semantics for Amazon Kinesis Data Streams. This is an important feature where events arrive out of order and may also be delayed due to unreliable wireless network communication. Another great feature available in Apache Flink is the Complex Event Processing (CEP) library. It allows you to detect patterns within the stream of incoming events over a period of time. Let’s explore these features in more detail and how they can be used in this particular use case.

Characteristics of IoT events, Event-time processing and Watermarks

Most IoT use cases deal with a large number of sensor devices continuously generating high volume of events over time. The events generally have a time stamp in the record, which indicates when it was generated. However, for consumers, the events can arrive out of order or with delays during processing. In a stream processing application, the consumer should be able to deal with out of order and delayed events.

Apache Flink does this by using event time windowing. The window slides not according to the processing time, but by the event time. This helps to make alerting decisions based on the event time, where it is more relevant. When the processing window is based on event time, we must know when to advance the event time. That tells us when we can close the window and trigger the next processing task. This is done through ‘Watermarks’ in Flink. There are various mechanisms to generate watermarks. We have used the TimeLagWatermarkGenerator, which generates watermarks that lag behind the processing time by a certain unit of time. This assumes that some of the records arrive in the Flink consumer after a certain delay. Another reason we chose Flink as the processing engine is that it provides the Flink CEP library to match events against the pattern we provide. This feature is currently not available in Spark Structured Streaming or Amazon Kinesis Data Analytics.

Complex Event Processing Apache Flink

Once the records are fetched from the Amazon Kinesis Data stream into the Apache Flink application running on Amazon EMR cluster, they must be matched against a pattern. The pattern filters records that first reached the threshold temperature of 50° Celsius and are next followed by another event (from another IoT sensor), which has also reached the same threshold temperature and has been infected by the first IoT sensor node corresponding to the first event on the pattern.

For example, among the incoming events, we get an event from the IoT sensor on node-1 where the temperature is greater than 50° Celsius, which is the first event in the CEP pattern. Next, we look for an event that follows this first event: for example, an event from node-2 that has a temperature that reached the 50° threshold mark and has its ‘infectedBy’ field set to 1, indicating node-1. If this condition repeats iteratively for the other three nodes (node-3, node-4, and node-5), then a complete four-degree network path (N1 -> N2 -> N3 -> N4 -> N5) of a potential bushfire initiated from node-1 is said to be detected.

In our implementation, we have chosen to send out an alert when the fire spreads to five connected nodes. But in reality, the number of nodes the fire spreads to before we send out an alert must be carefully chosen. A logical diagram of this particular pattern is shown in Figure 6. Finally, in response to this potential bushfire, an alert email is published to SNS and it delivers the email to the service’s subscribers. A sample SNS alert email is shown below in Figure 7.

Figure 6. The logical pattern diagram for predicting the bushfire

Figure 7. A sample Amazon SNS email alert to notify a potential bushfire and its traversing path

Real-time visualisation of the potential bushfire spread over time

All the incoming IoT event records (the unfiltered, raw events retrieved from the Amazon Kinesis data stream) are pushed into an Amazon Elasticsearch Service cluster for durable storage and visualization on the Kibana web UI. (For more information about Kibana, see What Is Kibana.) A real-time heat-map visualization and dashboard is created to continuously monitor the progress of the bushfire, as shown in Figure 8. As you can see from Figure 8, the bushfire is spreading from node-1 to node-2 to node-3, and then to node-4 and node-5. This visualization can be further enhanced by recording the geographical location of the IoT sensor nodes and then plotting the heat map over the geographical area under surveillance.

Figure 8. A sample bushfire heat-map visualisation from Amazon Elasticsearch Services

Setup and source code

The URL below explains in detail the steps to set up all the necessary AWS components and run the IoT simulator and Apache Flink CEP. Here, we provide an AWS CloudFormation template. It creates the architecture shown in Figure 4 from start to finish by setting up the IoT simulator on an EC2 instance along with all the other components, which then run automatically once the stack is created. After the stack creation is complete, users can visit the Kibana web UI to observe the real-time bushfire dashboard. They can also receive an SNS alert email for potential bushfires when they confirm their email subscription.

To launch the AWS CloudFormation stack, click on the following Launch Stack button to open the AWS CloudFormation console.

Specify the Amazon S3 URL for the template, and then proceed to the next step where you specify the details explained in the next section. After you provide all the parameters, you can proceed and then create the stack. The parameters that you use are as follows:

1. SNS subscription email: This should be a valid email address for the fire alert notification to be sent. Once the CloudFormation stack creation initiates, you get an email on your provided email account for confirming the subscription. Choose the Confirm subscription button to receive the SNS notification.

Note: You may safely delete the SNS subscription from the Amazon SNS console upon completion of the end-to-end observation.

2. Public subnet ID for EMR cluster: Choose a public subnet from the drop-down menu. This is the subnet where the EMR cluster is created. The EMR cluster requires access to the internet to access the Kinesis stream and the Elasticsearch domain. Therefore, choose a public subnet with the auto-assign public IPv4 address option enabled, within a VPC where the enableDnsHostnames and enableDnsSupport options are set to true.

3. S3 path location for EMR cluster logs: The S3 bucket where EMR puts the cluster logs.

Figure 9. Amazon CloudFormation console to create the AWS resource stack.

4. Public subnet ID for EC2 instance: The subnet where the EC2 instance is created to run the IoT simulator. Once the CloudFormation stack comes up, the IoT simulator automatically runs to ingest the IoT events to the AWS IoT gateway. The choice of this subnet follows the same guidelines set out for the subnet chosen for the EMR cluster shown in Figure 9.

5. Security group ID for EC2 instance: This is the security group attached to the EC2 instance running the IoT simulator. You can optionally add a new rule in this security group for SSH port 22. This allows access to the IoT simulator running in this EC2 instance from your workstation, and use the same public IP address for accessing Kibana web UI described later in this post.

6. Key pair ID for EC2 instance: The key pair to be associated with the EC2 instance and the EMR cluster. It allows you to log in to the instances for further exploration.

Figure 10. Amazon CloudFormation console to create the AWS resource stack.

7. Domain name for Amazon Elasticsearch domain: The Elasticsearch domain name to be created by the CloudFormation template.

8. Public IP address to access Kibana from local machine: The IP address of the local machine from which you want to access the Kibana dashboard for visualization. For simplicity, you can provide the public IP address of the workstation from which you are running the CloudFormation stack. This IP address is granted access on the Amazon Elasticsearch domain for displaying the real-time Kibana dashboard, and you must access the Kibana URL from this IP address only. If your IP address changes, modify the access policy of the Amazon Elasticsearch domain with the new IP address. For more information, see IP-based Policies in the Amazon Elasticsearch Service Developer Guide.

Once the stack creation is completed, the output section of the CloudFormation stack lists the web URLs to access the necessary resources in this architecture. Use the Kibana web URL and create an index pattern under the Management section with the name “weather-sensor-data,” and then choose the dashboard to see the visualization of the real-time spread of the bushfire covered by the IoT network.

Figure 11. Amazon Elasticsearch domain Kibana Web UI to create the index pattern.

The source code and a detailed README are provided on GitHub for interested users who want to explore the implementation of this architecture in further detail.

https://github.com/aws-samples/realtime-bushfire-alert-with-apache-flink-cep

Troubleshooting Common Issues

1. Kibana Web UI is not accessible

If you see the Message “User: anonymous is not authorized to perform: es:ESHttpGet” while trying to access the Kibana web UI, then this means that the public IP address that was specified during the CloudFormation stack creation time either is not correct or might have been changed. You can confirm the public IP address again from http://checkip.amazonaws.com. Then go to the AWS Management Console for Elasticsearch and modify the security access policy, as shown in the following example, to change the IP address only.

Figure 12. Modifying public IP address in the Amazon Elasticsearch domain access policy.

2.  No records ingested into Elasticsearch

This issue can occur if records fail to be ingested from the EMR cluster. To troubleshoot this, go to the AWS Management Console for IAM and search for the IAM role named “EMR_EC2_Default_Role” and make sure it has the default AWS managed policy “AmazonElasticMapReduceforEC2Role” attached to it.

Figure 13. Verifying the default AWS Managed policy attached to “EMR_EC2_Default_Role” in IAM.

3. No SNS alert E-mail notification

If you do not receive an SNS email alert about the potential bushfire after several minutes of observing the complete visualization, check your inbox to verify that you confirmed the SNS subscription while the CloudFormation stack was being created. Additionally, make sure that you provided a correct email address; if not, re-create the stack from scratch.

Summary

In this blog post, we discussed how to build a real-time IoT stream processing, visualization, and alerting pipeline using various AWS services. We took advantage of the Complex Event Processing feature provided by Apache Flink to detect patterns within a network from the incoming events. The GitHub repository contains the resources that are required to run through the example provided in this post, and includes further information that helps you get started quickly. We encourage you to explore the IoT simulator code and test with different network configurations to ingest more records with different patterns and visualize the bushfire spread path on the Kibana dashboard.


Additional Reading

If you found this post useful, be sure to check out Build a Real-time Stream Processing Pipeline with Apache Flink on AWS, Derive Insights from IoT in Minutes using AWS IoT, Amazon Kinesis Firehose, Amazon Athena, and Amazon QuickSight, and Integrating IoT Events into Your Analytic Platform.

 


About the Authors

Santoshkumar Kulkarni is a Cloud Support Engineer with AWS Big Data and Analytics Services. He works closely with AWS customers to provide them architectural and engineering assistance and guidance. He is passionate about distributed technologies and streaming systems. In his free time, he enjoys spending time with his family on the beautiful beaches of Sydney.

 

 

 

Joarder Kamal, PhD is a Cloud Support Engineer with AWS Big Data and Analytics Services. He likes building and automating systems that combines distributed communications, real-time data, and collective intelligence. During his spare time, he loves reading travel diaries, doing pencil sketches, and touring across Australia with his wife.

Launch an edge node for Amazon EMR to run RStudio

Post Syndicated from Tanzir Musabbir original https://aws.amazon.com/blogs/big-data/launch-an-edge-node-for-amazon-emr-to-run-rstudio/

RStudio Server provides a browser-based interface for R and is a popular tool among data scientists. Data scientists use Apache Spark clusters running on Amazon EMR to perform distributed training. In a previous blog post, the author showed how you can install RStudio Server on an Amazon EMR cluster. However, in certain scenarios you might want to install it on a standalone Amazon EC2 instance and connect to a remote Amazon EMR cluster. Benefits of running RStudio on EC2 include the following:

  • By running RStudio Server on an EC2 instance, you can keep your scientific models and model artifacts on the instance. You might have to relaunch your EMR cluster to meet your application requirements. By running RStudio Server separately, you have more flexibility and don’t have to depend entirely on an Amazon EMR cluster.
  • Installing RStudio on the master node of Amazon EMR requires sharing resources with the applications running on the same node. By running RStudio on a standalone Amazon EC2 instance, you can use resources as you need without having to share them with other applications.
  • You might have multiple Amazon EMR clusters in your environment. With RStudio on an edge node, you have the flexibility to connect to any EMR cluster in your environment.

There is one major difference between running RStudio Server on an Amazon EMR cluster vs. running it on a standalone Amazon EC2 instance. In the latter case, the instance needs to be configured as an Amazon EMR client (or edge node). By doing so, you can submit Apache Spark jobs and other Hadoop-based jobs from an instance other than EMR master node.

In this post, I walk you through a list of steps to configure an Amazon EC2 instance as an Amazon EMR edge node with RStudio Server configured for remote workloads.

Solution overview

In the next few sections, I describe creating an edge node, installing RStudio, and connecting to a remote Spark cluster from R running on the edge node.

At a high level, the solution includes the following steps:

  1. Create an Amazon EC2 instance.
  2. Install RStudio Server and required dependencies on that instance.
  3. Install the Apache Spark and Hadoop client libraries and dependencies on the same instance.
  4. Launch an Amazon EMR cluster with the Apache Spark, Livy, and Hive applications.
  5. Configure the Amazon EC2 instance as an EMR client.
  6. Test EMR client functionality by running sample remote jobs.
  7. Connect to the Spark cluster from R using Sparklyr.
  8. Interact with Hive tables on EMR from RStudio.
  9. Run R models on the data on the EMR cluster.

Let’s take a look at the steps to configure an Amazon EC2 instance as EMR edge node.

Creating an edge node for Amazon EMR

In this exercise, I create a Spark client on the edge node. Because Spark relies on Hadoop libraries, I also install Hadoop on the edge node. To make sure that the client works properly, I install the same Hadoop and Spark versions as those on the EMR cluster. Because most of the libraries also run on JVM, I recommend that you have the same JVM version as the one on the EMR cluster.

After I install Spark and Hadoop on the edge node, I configure the edge node to talk to the remote Amazon EMR cluster. To do that, several configuration files from the EMR cluster need to be copied to the edge node. There are two ways to copy the configuration files from a newly created EMR cluster to the edge node on the EC2 instance—manual and automated. In the next section, I discuss those two approaches.

Manual approach

After the EMR cluster is up and running, you can use the secure transfer tool scp to copy the required configuration files from an EMR master node to a local machine. In my case, I used my laptop.

> mkdir emr-config
> cd emr-config
> scp -i <key> hadoop@<master-node-dns>:/etc/hadoop/conf/*-site.xml .

You can also use the same tool to copy those files from that local machine to the edge node:

> scp -i <key> hdfs-site.xml ec2-user@<edge-node-dns>:/etc/hadoop/conf/.

PC users can use an application like WinSCP to connect and transfer files between an EMR master node and a PC.

Note: Depending on the applications installed on Amazon EMR, you might need to copy other libraries from the cluster. As an example, the open-source distributions of Hadoop and Spark packages that are used in this solution don’t have libraries for EMRFS. So, to use EMRFS, copy the EMRFS libraries to the edge node and update the classpath to include the libraries.

Automated approach (used in this solution)

As you might have noticed in the previous approach, you need to run the copy operation twice:

  1. From the EMR master node to a local machine
  2. From the local machine to the edge node

If you use a bastion host to access EMR, then the copy process also needs to go one extra hop. One way to automate this process is to execute a script as an EMR step, which uploads all the required libraries and configuration files to an Amazon S3 location. A second script on the edge node runs through cfn-init, which downloads files from the S3 location and places them in the right application paths. The following diagram illustrates a sequence of steps that take place during this process.

In this solution, the EMR step (CreateEMRClientDeps) executes the script create-emr-client.sh to copy the configuration files to Amazon S3. The script first creates an archive file awsemrdeps.tgz with all the required libraries. It then uploads that file into a temporary S3 bucket with a prefix ending in /emr-client/. On the edge node, the install-client-and-rstudio.sh script is used to copy the awsemrdeps.tgz file from S3 back to the edge node.
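As a hedged sketch, a step like CreateEMRClientDeps could also be added to a running cluster with boto3 as shown below; the cluster ID and the S3 location of create-emr-client.sh are placeholders, any arguments the script expects are omitted, and the script-runner JAR path is region-specific. (The CloudFormation template in this post configures the step for you, so this is for illustration only.)

import boto3

emr = boto3.client("emr")

emr.add_job_flow_steps(
    JobFlowId="j-XXXXXXXXXXXXX",   # placeholder EMR cluster ID
    Steps=[
        {
            "Name": "CreateEMRClientDeps",
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {
                # script-runner.jar downloads a script from S3 and runs it on the master node
                "Jar": "s3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar",
                "Args": [
                    "s3://my-temp-bucket/scripts/create-emr-client.sh",  # placeholder script path
                ],
            },
        }
    ],
)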

Let’s take a look at the AWS CloudFormation steps to create an edge node for Amazon EMR and run RStudio on the edge node.

Walkthrough using AWS CloudFormation

Following are the prerequisites to run the AWS CloudFormation template for this solution:

  • Create an Amazon VPC with at least one public subnet and one private subnet.
  • Update the IAM policy so that the user has access to create IAM policies, instance profile, roles, and security groups.
  • Enable VPC endpoints for Amazon S3.
  • Create an EC2 key-pair to connect to EC2 instances.

To set up this entire solution, you need to create a few AWS resources. The attached CloudFormation template creates all those required AWS resources and configures them to create an Amazon EMR edge node and running RStudio on it.

This CloudFormation template requires you to pass the following parameters during launch.

Parameter Description
EmrSubnet The subnet where the Amazon EMR cluster is deployed. It can be either a public or private subnet.
InstanceType The Amazon EC2 instance type used for the RStudio Server and edge node, which defaults to m4.xlarge.
KeyName The name of the existing EC2 key pair to access the Amazon EMR and edge node.
RStudioServerSubnet The public subnet where the RStudio Server and edge node are launched.
S3RepoPath The Amazon S3 path where all required files (template, scripts job, sample data, and so on) are stored.
S3TempUploadPath The S3 path in your AWS account for housing temporary dependency files and sample data for Hive.
VPC The ID of the virtual private cloud (VPC) where the EMR and edge node is deployed.

Important: This template is designed only to show how you can create an EMR edge node and configure RStudio for remote EMR workloads. This setup isn’t intended for production use without modification. If you try this solution outside of the US-East-1 Region, be sure to download the necessary files from s3://aws-data-analytics-blog/rstudio-edge-node. You then upload the files to the buckets in your AWS Region, edit the script as appropriate, and then run it.

To launch the CloudFormation stack, choose Launch Stack:

The following sample screenshot shows the stack parameters.

Launching this stack creates the following AWS resources.

Logical ID Resource type Description
EMRCluster Amazon EMR cluster The EMR cluster to run the Spark and Hive jobs
CreateEMRClientDeps EMR step job A job that runs a script to create client dependencies and uploads to S3
CreateHiveTables EMR step job A job to copy sample data for Hive and create Hive tables
RStudioConfigureWaitCondition CloudFormation wait condition A wait condition that works with the wait handler, and waits for the RStudio Server setup process to complete
RStudioEIP Elastic IP address The elastic IP address for RStudio Server
RStudioInstanceProfile Instance profile The instance profile for the RStudio and edge node instance (for this solution, I used the default role EMR_EC2_DefaultRole created during EMR launch)
RStudioSecGroup Amazon EC2 security group The security group that controls incoming traffic to the edge node
RStudioServerEC2 Amazon EC2 instance The EC2 instance for the edge node and RStudio Server
RStudioToEMRSecGroup Amazon EC2 security group The security group that controls traffic between EMR and the edge node
RStudioWaitHandle CloudFormation wait handler The wait handler that gets triggered after RStudio Server is launched
SecGroupSelfIngress Amazon EC2 security group ingress rule An ingress rule to RStudioToEMRSecGroup that allows the instance to talk an instance with the same security group

The CloudFormation template used in this solution configures S3 paths and stores files to their respective locations. The EMR client dependencies archive awsemrdeps.tgz is stored at the <<s3-temp-upload-path>>/emr-client/ location. The sample data file tripdata.csv is stored at <<s3-temp-upload-path>>/ny-taxi/.

The following screenshot shows how the S3 paths are configured after deployment. In this example, I passed an S3 full path, s3://<<my-bucket>>/rstudio-edge-node, which is on my Amazon S3 account.

When the CloudFormation template has successfully completed, the DNS address of RStudio Server is displayed on the Outputs tab, as shown following.

The address shown is the DNS address of the RStudio Server and edge node. A user should be able to connect to this address immediately after enabling FoxyProxy.

Test data and tables

For the source data, I have used New York City Taxi and Limousine Commission (TLC) trip record data. For a description of the data, see this detailed dictionary of the taxi data. The trip data is in comma-separated value (CSV) format with the first row as a header. The following image shows data from the trip dataset.

A second EMR step, CreateHiveTables, is created as part of the CloudFormation template. This step creates two Hive tables that will be later used by R on RStudio to run sample models. Both are external Hive tables—one is stored in HDFS on the EMR cluster and the other in Amazon S3. The goal is to demonstrate how RStudio can consume data with storage that is backed by HDFS and S3.

Table name Storage type Path
ny_taxi_hdfs HDFS /user/ruser/ny_taxi
ny_taxi_s3 S3 s3://<s3-temp-upload-path>/ny_taxi

The following section shows a list of steps to test Amazon EMR client functionality, which is optional.

Testing EMR client functionality (optional)

If the EMR client is configured correctly on the edge node, you should be able to submit Spark jobs from the edge node to the EMR cluster. You can apply the following few steps to the edge node to verify this functionality:

  1. Log in to the edge node using the default user ec2-user.
  2. Choose the host address from the CloudFormation Outputs tab:
ssh -i <<key-pair>> ec2-user@<<rstudio-server-address>>
  3. The CloudFormation template also creates a new user, called ruser, that you can use to submit Spark jobs or use the RStudio UI. Switch the user from ec2-user to ruser:
[ec2-user@<<edge-node>> ~]$ sudo -s
[root@<<edge-node>> ec2-user]# su - ruser
  4. Submit a Spark example job to the remote EMR cluster:
$SPARK_HOME/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn $SPARK_HOME/examples/jars/spark-examples_2.11-2.3.1.jar

  5. Check the job status in the terminal and also on the EMR console. The Spark example job should be able to finish successfully. In the terminal, it should display the value of Pi as shown following.

  6. Check the job status in the Resource Manager UI; notice that the Spark Pi job ran as ruser and completed successfully.

  7. Test this setup further by running spark-shell, and retrieve Hive table data from the remote EMR cluster:
[ruser@<<edge-node>> ~]$ $SPARK_HOME/bin/spark-shell

  8. Check the list of all available Hive tables and their content:
scala> spark.sql("show tables").show

scala> spark.sql("select * from ny_taxi_s3 limit 10").show

Running R and connecting to Apache Spark

In this section, let’s run some tests and models from RStudio consuming data from Amazon EMR. Locate the RStudio Server address on the Outputs tab on the CloudFormation console. The user name is ruser and the password is BigData26.

A successful login redirects you to this welcome window. The big window on the left is the console window, where you write R code.

Create a SparkContext on the R console. No additional configuration is needed, because RStudio is already set up with the required environment variables and files through the AWS CloudFormation stack. In this solution, Sparklyr is used to connect to Spark. Attach the required R packages before creating SparkContext as follows:

library(sparklyr)
library(dplyr)
sc <- spark_connect(master = "yarn")

When a connection to Spark is established, it creates a “yarn” connection channel (find this in the RStudio UI, on the Connections tab at the right corner). It also shows Hive metadata on the same widget. Because the CloudFormation template created two Hive tables, they appear under the “yarn” connection as shown following.

A YARN application is also placed under ruser. The status of the application is RUNNING as long as the connection is established. You can find the status of that application on the YARN ResourceManager UI also. Notice that the user is ruser and the name of the application is sparklyr.

For more information, check the YARN app log by choosing Log on the widget.

Now, test whether the data for those two Hive tables is accessible. Choose the ny_taxi_hdfs table to sample the data.

Choose the ny_taxi_s3 table to sample the data on S3.

By successfully running these two tests, you can see that RStudio running on an edge node can consume data stored in a remote EMR cluster.

During the development phase, users might want to write some data back to their S3 bucket. So it’s a good idea to verify whether a user can write data directly to S3 using R and Spark. To test this, I read the ny_taxi_hdfs Hive table using the spark_read_table API. Then I write the data to Amazon S3 by calling the spark_write_csv API and passing my S3 target path. For this solution, I used s3://tm-blogs-placeholder/write-from-rstudio as my new S3 path.

ny_taxi <- spark_read_table(sc, "ny_taxi_hdfs")
spark_write_csv(ny_taxi,path = "s3://tm-blogs-placeholder/write-from-rstudio")

After the write operation, the S3 location appears as follows.

You can also see Spark write logs in the YARN application log.

Now analyze the data with R and draw some plots. To do so, first check the count of ny_taxi data. It should return 20,000.

ny_taxi <- spark_read_table(sc, "ny_taxi_hdfs")
ny_taxi %>% count

Now, find the number of trips for each rate code type. There are six different rate code types where 1 is the standard rate code and 5 is the negotiated rate. For details, see this detailed dictionary of the taxi data.

library(ggplot2)
trip_by_rate_code_id <- ny_taxi %>%
  mutate(rate_code_id) %>%
  group_by(rate_code_id) %>%
  summarize(n = n()) %>%
  collect()

ggplot(trip_by_rate_code_id, aes(rate_code_id, n)) + 
  geom_bar(stat="Identity") +
  scale_y_continuous(labels = scales::comma) +
  labs(title = "Number of Trips by Rate Code", x = "Rate Code Id", y = "")

Based on the graph, I can say that (except for some passengers who paid a negotiated rate) the rest of the passengers paid the standard rate during their ride.

Now find the average trip duration between two New York areas—Queens and Manhattan. The pu_location_id value represents the taxi pick-up zone, and do_location_id represents the taxi drop-off zone. For this test, I use 129 as the pick-up zone and 82 as the drop-off zone. Taxi zone 129 represents the Jackson Heights area in Queens, and taxi zone 82 represents the Elmhurst area. For details, see this taxi zone lookup table.

trip_duration_tbl <- ny_taxi %>%
  filter(pu_location_id == 129 & do_location_id == 82) %>%
  mutate(pickup_time = hour(from_unixtime(unix_timestamp(lpep_pickup_datetime, "MM/dd/yy HH:mm")))) %>%
  mutate(trip_duration = unix_timestamp(lpep_dropoff_datetime, "MM/dd/yy HH:mm") - unix_timestamp(lpep_pickup_datetime, "MM/dd/yy HH:mm")) %>%
  group_by(pickup_time) %>% 
  summarize(n = n(),
            trip_duration_mean = mean(trip_duration),
            trip_duration_p10 = percentile(trip_duration, 0.10),
            trip_duration_p25 = percentile(trip_duration, 0.25),
            trip_duration_p50 = percentile(trip_duration, 0.50),
            trip_duration_p75 = percentile(trip_duration, 0.75),
            trip_duration_p90 = percentile(trip_duration, 0.90)) %>% 
  collect()
            
ggplot(trip_duration_tbl, aes(x = pickup_time)) +
          geom_line(aes(y = trip_duration_p50, alpha = "Median")) +
          geom_ribbon(aes(ymin = trip_duration_p25, ymax = trip_duration_p75, 
                          alpha = "25–75th percentile")) +
          geom_ribbon(aes(ymin = trip_duration_p10, ymax = trip_duration_p90, 
                          alpha = "10–90th percentile")) +
          scale_y_continuous("Trip Duration (in seconds)") + 
          scale_x_continuous("Pickup Time of the day")

Based on the plot, I can say that on average, each trip duration was about 10–12 minutes. There was a rare peak around 1 a.m. for some days, where the trip duration was more than 30 minutes.

Next steps

The goal of this post is to show, first, how to create an edge node or Amazon EMR client on an Amazon EC2 instance, and second, how other applications—RStudio in this case—can use that edge node or Amazon EMR client to submit workloads remotely. By following the same approach, you can also create an edge node for other Hadoop applications—Hive client, Oozie client, HBase client, and so on. Data scientists can keep enriching their R environment by adding packages while keeping it totally isolated from developers’ EMR environments. To enhance this solution further and make it production ready, you can explore the following options:

  • Use friendly URLs for Amazon EMR interfaces. For example, instead of thrift://ip-10-0-20-253.ec2.internal:9083 for the hive.metastore.uris value, you can use something like thrift://hive-metastore.dev.example.corp:9083. In the same way, instead of using ip-10-0-20-253.ec2.internal:8032 for the yarn.resourcemanager.address property value, you can use dev.emr.example.corp:8032. The benefit of this approach is that, even if you terminate your EMR cluster and recreate it again (with new IP addresses), you don’t have to change your client node’s configuration. This blog post shows how you can create friendly URLs for Amazon EMR.
  • If you already integrated Microsoft Active Directory into your Amazon EMR cluster, you can do the same with RStudio. That way, you can achieve single sign-on across your data analytics solutions.
  • Enable detailed Amazon CloudWatch logs to monitor your edge node behaviors and trigger alerts for different scenarios (disk space utilization, memory usage, and so on). With this approach, you can proactively notify your data scientists before a possible failure.
  • H2O is one of the popular packages used in R. It’s open-source software that allows users to fit thousands of potential models to discover patterns in user data. You can install H2O from CRAN, just like the way Sparklyr was installed in this solution, and run it from RStudio. Alternatively, you can add the H2O package to the installation process by placing it in the install-client-and-rstudio.sh script:
install.packages("h2o")
library(h2o)
localH2O = h2o.init()

Common issues

Although it’s hard to cover every possible scenario (because these vary across AWS environments), this section covers some common issues that can occur and ways to fix them.

Issue 1: Clicking the RStudio Server URL returns a There is no Internet connection error.

Solution:  Make sure that you configured FoxyProxy in your browser and that you are connecting to the public IP address of the RStudio EC2 instance. You can get this address from the AWS CloudFormation console on the Outputs tab.

Issue 2: The EMR step job CreateEMRClientDeps fails.

Solution: This EMR step job runs the create-emr-client.sh script, which creates an archive with all required dependencies and uploads it to the S3 location. If the cluster’s EC2 instance profile doesn’t have write access to S3, this step job fails. In this solution, the default EMR role EMR_EC2_DefaultRole is assigned to the edge node instance also. We assume that EMR_EC2_DefaultRole has write access to the S3 location given through the CloudFormation parameter S3TempUploadPath.

Issue 3: The AWS CloudFormation template Blog-EMR-Edge-Node-With-RStudio times out or fails.

Solution: A script called install-client-and-rstudio.sh runs through cfn-init on the edge node, and it writes logs to the /tmp/edge-node-rstudio-installation.log file. This script contains a sleep clause, where it waits for the awsemrdeps.tgz file to be available on S3. This clause times out after 20 minutes. If the script fails to find that file within that time period, subsequent execution fails. Also, in this solution, RStudio uses http://cran.rstudio.com/ as its repo when installing packages. If the Amazon EC2 instance can’t reach the internet, it can’t download and install those packages, and the template might fail. Make sure that you pick a public subnet or a private subnet with NAT for the edge node.

Issue 4: During Amazon EMR client testing, the Spark sample application fails with a NoClassDefFoundError or UnsupportedOperationException error.

Solution: This blog post uses Amazon EMR 5.16.0. Make sure to use the Hadoop and Spark versions that correspond to that EMR release. If the master node's application version differs from the edge node's application version, the client might fail with a NoClassDefFoundError or UnsupportedOperationException error. Make sure that you always install the same versions of Hadoop and Spark in both locations.

Cleaning up

When you’ve finished testing this solution, remember to clean up all those AWS resources that you created using AWS CloudFormation. Use the AWS CloudFormation console or AWS CLI to delete the stack named Blog-EMR-Edge-Node-With-RStudio.

Summary

In this post, I show you how to create a client for Amazon EMR. I also show how you can install RStudio on that client node and connect to Apache Spark clusters running on Amazon EMR. I used Sparklyr to connect to Spark, consumed data from both HDFS and S3, and analyzed the data using R models. Go ahead—give this solution a try and share your experience with us!

 


Additional Reading

If you found this post useful, be sure to check out Running sparklyr – RStudio’s R Interface to Spark on Amazon EMR, Statistical Analysis with Open-Source R and RStudio on Amazon EMR, and Running R on Amazon Athena.

 


About the Author

Tanzir Musabbir is an EMR Specialist Solutions Architect with AWS. He is an early adopter of open source Big Data technologies. At AWS, he works with our customers to provide them architectural guidance for running analytics solutions on Amazon EMR, Amazon Athena & AWS Glue. Tanzir is a big Real Madrid fan and he loves to travel in his free time.

 

Migrate RDBMS or On-Premise data to EMR Hive, S3, and Amazon Redshift using EMR – Sqoop

Post Syndicated from Nivas Shankar original https://aws.amazon.com/blogs/big-data/migrate-rdbms-or-on-premise-data-to-emr-hive-s3-and-amazon-redshift-using-emr-sqoop/

This blog post shows how our customers can benefit from the Apache Sqoop tool. Sqoop is designed to transfer and import data from a relational database management system (RDBMS) into the Hadoop Distributed File System (HDFS) on Amazon EMR, transform the data in Hadoop, and then export the data into a data warehouse (for example, Hive or Amazon Redshift).

To demonstrate the Sqoop tool, this post uses Amazon RDS for MySQL as a source and imports data in the following three scenarios:

  • Scenario 1 — AWS EMR (HDFS -> Hive and HDFS)
  • Scenario 2 — Amazon S3 (EMRFS), and then to EMR-Hive
  • Scenario 3 — S3 (EMRFS), and then to Redshift

 

These scenarios help customers initiate the data transfers simultaneously, so that the transfer can run more quickly and cost-effectively than with a traditional ETL tool. Once the script is developed, customers can reuse it to transfer a variety of RDBMS data sources into EMR-Hadoop. Examples of these data sources are PostgreSQL, SQL Server, Oracle, and MariaDB.

We can also follow the same steps for an on-premises RDBMS. This requires the correct JDBC driver to be installed, and a network connection set up between the corporate data center and the AWS Cloud environment. In this scenario, consider using either AWS Direct Connect or AWS Snowball, based on the data load volume and network constraints.

Prerequisites

To complete the procedures in this post, you need to perform the following tasks.

Step 1 — Launch an RDS Instance

By using the AWS Management Console or AWS CLI commands, launch a MySQL instance with the desired capacity. The following example uses the db.t2.medium instance class with default settings.

To call the right services, copy the endpoint and use the following JDBC connection string format. This example uses the US East (N. Virginia) us-east-1 AWS Region.

jdbc:mysql://<<connection string>>.us-east-1.rds.amazonaws.com:3306/sqoopblog

Step 2 — Test the connection and load sample data into RDS – MySQL

First, I used an open-source sample dataset from this location: https://bulkdata.uspto.gov/data/trademark/casefile/economics/2016/

Second, I loaded the following two tables:

Third, I used the MySQL Workbench tool to load the sample tables, using its Import/Export wizard to load the data. This loads the data automatically and creates the table structure.

Download Steps:

The following page can help you download the MySQL database engine and load the previously mentioned data source into tables: https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/CHAP_GettingStarted.CreatingConnecting.MySQL.html#CHAP_GettingStarted.Connecting.MySQL

I used the following instructions on a Mac:

Step A: Install Homebrew

Homebrew is an open-source software package management system. To install Homebrew, open a terminal and enter:

$ /usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"

(Homebrew then downloads and installs command line tools for Xcode 8.0 as part of the installation process.)

Step B: Install MySQL

At the time of this blog post, Homebrew has MySQL version 5.7.15 as the default formula in its main repository. Enter the following command: $ brew info MySQL

Expected output: MySQL: stable 8.0.11 (bottled)

To install MySQL, enter: $ brew install MySQL

Fourth, when the download is complete, provide the connection string, port, and SID as the connection parameters. In the main console, choose the MySQL connections (+) sign to open a new connection window, and provide the connection parameters: a name, the hostname (the RDS endpoint), the port, a username, and a password.
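
If you prefer the command line over MySQL Workbench, you can run a similar connectivity check with the MySQL client installed earlier (a sketch; replace the endpoint placeholder with your RDS endpoint):

# Prompts for the admin password and lists the tables in the sqoopblog database.
mysql -h <your-rds-endpoint>.us-east-1.rds.amazonaws.com -P 3306 -u admin -p -e "SHOW TABLES IN sqoopblog;"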

Step 3 — Launch EMR Cluster

Open the EMR console, choose the Advanced option, and launch the cluster with the following options set:

Step 4 — Test the SSH access and install MySQL-connector-java-version-bin.jar in the EMR folder

a. From the security group for the master node, choose the link and edit the inbound rule to allow your PC or laptop IP address to access the master cluster.

b. Download the MySQL JDBC driver to your local PC from the following location: http://www.mysql.com/downloads/connector/j/5.1.html

c. Unzip the folder and copy the latest version of the MySQL Connector available. (In my example, I use the mysql-connector-java-5.1.46-bin.jar file.)

d. Copy the file to the EMR master node. (Note: Because the EMR master node doesn't allow public access, I did a manual download to a local PC and then used FileZilla, a cross-platform FTP application, to push the file.)

e. From your terminal, use SSH to connect to the master node, navigate to the /usr/lib/sqoop directory, and copy the JAR file there.

Note: This driver copy can be automated by using a bootstrap script that copies the driver file from an S3 path onto the master node. An example command would be:

aws s3 cp s3://mybucket/myfilefolder/mysql-connector-java-5.1.46-bin.jar /usr/lib/sqoop/

Or, with temporary internet access to download the file directly onto the master node, use the following commands:

wget http://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.46.tar.gz
tar -xvzf mysql-connector-java-5.1.46.tar.gz
sudo cp mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar /usr/lib/sqoop/ 

In the master node directory /usr/lib/sqoop, the contents should look like the following.

  1. Before you begin working with EMR, you need at least two AWS Identity and Access Management (IAM) service roles with sufficient permissions to access the resources in your account.
  2. Amazon RDS and the EMR master and slave nodes must be able to connect to each other to initiate the import and export of data from the MySQL RDS instance. For example, I edited the RDS MySQL instance security group to allow incoming connections from the EMR nodes—the master security group and the slave security group.

Step 5 — Test the connection to MySQL RDS from EMR

After you are logged in, run the following command in the EMR master cluster to validate your connection. It also checks the MySQL RDS login and runs the sample query to check table record count.

sqoop eval --connect "jdbc:mysql://<<connection string>>.us-east-1.rds.amazonaws.com:3306/sqoopblog" --query "select count(*) from sqoopblog.event" --username admin -P

Note: The record count from the previous sample query should match the count in the MySQL table, as shown in the following example:

Import data into EMR – bulk load

To import data into EMR, you must first import the full data as a text file, by using the following query:

sqoop import --connect "jdbc:mysql://<<ConnectionString>>.us-east-1.rds.amazonaws.com:3306/sqoopblog" --table event --target-dir /user/hadoop/EVENT --username admin -P -m 1

After the import completes, validate the extracted file in the corresponding Hadoop directory location.

As shown in the previous example, the original table was not partitioned. Hence, it is extracted as one file and imported into the Hadoop folder. If this had been a larger table, it would have caused performance issues.

To address this issue, I show how performance increases when we select a partitioned table and use the direct method to export data faster and more efficiently. I created a copy of the event table, EVENT_PARTITION, partitioned with the EVENT_DT column as the partition key, and then copied the original table data into it. In addition, I used the direct method to take advantage of MySQL partitions and optimize the efficiency of simultaneous data transfer.
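
The following is a hedged sketch of the DDL behind that partitioned table, run with the MySQL client. The column names follow the table definitions used later in this post; the column types and the partition count of 4 are assumptions.

# Create the partitioned copy of the event table in the sqoopblog database.
mysql -h <your-rds-endpoint>.us-east-1.rds.amazonaws.com -u admin -p sqoopblog -e "
CREATE TABLE event_partition (
  event_cd      VARCHAR(25),
  event_dt      DATE,
  event_seq     VARCHAR(25),
  event_type_cd VARCHAR(25),
  serial_no     VARCHAR(25)
)
PARTITION BY KEY (event_dt)
PARTITIONS 4;"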

Copy data and run stats.

Run the following query in MySQL Workbench to copy data and run stats:

insert into sqoopblog.event_partition select * from sqoopblog.event

analyze table sqoopblog.event_partition

After running the query in MySQL workbench, run the following Sqoop command in the EMR master node:

sqoop import --connect "jdbc:mysql:// <<ConnectionString>>.us-east-1.rds.amazonaws.com:3306/sqoopblog" --table event_partition --target-dir /user/hadoop/EVENTSPARTITION --username admin -P --split-by event_dt

This example shows the performance improvement for the same command when it runs against the partitioned table with the --split-by argument added. It also shows the data file split into four parts; the number of map tasks is automatically set to 4 based on the table partition stats.

We can also use the -m 10 argument to increase the number of map tasks, which equals the number of input splits:

sqoop import --connect "jdbc:mysql:// <<ConnectionString>>.us-east-1.rds.amazonaws.com:3306/sqoopblog" --table event_partition --target-dir /user/hadoop/EVENTSPARTITION --username admin -P --split-by event_dt -m 10

Note: You can also split the extracted data into more files during the import process by increasing the map task argument (-m <<desired number>>), as shown in the previous sample code. Make sure that the extract files align with the partition distribution; otherwise, the output files will be out of order.

Consider the following additional options if you need to import only selected columns.

In the following example, the --columns argument restricts the import to selected fields.

sqoop import --connect "jdbc:mysql://<<connection String>>.us-east-1.rds.amazonaws.com:3306/sqoopblog" --table event_partition --columns "EVENT_CD,SERIAL_NO,EVENT_DT" --target-dir /user/hadoop/EVENTSSELECTED --split-by EVENT_DT --username admin -P -m 5

For scenario 2, we import the table data file into an S3 bucket. Before you do, make sure that the EMR EC2 instance profile has access to the S3 bucket. Run the following command in the EMR master cluster:

sqoop import --connect "jdbc:mysql:// <<connection String>>.us-east-1.rds.amazonaws.com:3306/sqoopblog" --table event_partition --target-dir s3://nivasblog/sqoopblog/ --username admin -P -m 1 --fields-terminated-by '\t' --lines-terminated-by '\n' --as-textfile 

Import as Hive table – Full Load

Now, let's try creating a Hive table directly from the Sqoop command. This is a more efficient way to create Hive tables dynamically, and we can later alter this table to an external table for any additional requirements. With this method, customers can save time creating and transforming data into Hive through an automated approach.

sqoop import --connect "jdbc:mysql://<<connection String>>.us-east-1.rds.amazonaws.com:3306/sqoopblog" --username admin -P --table event_partition  --hive-import --create-hive-table --hive-table HIVEIMPORT1 --delete-target-dir --target-dir /user/hadoop/EVENTSHIVE1 --split-by EVENT_DT --hive-overwrite -m 4

Now, let’s try a direct method to see how significantly the load performance and import time improves.

sqoop import --connect "jdbc:mysql://<<connection string>>.us-east-1.rds.amazonaws.com:3306/sqoopblog" --username admin -P --table event_partition --hive-import --create-hive-table --hive-table HIVEIMPORT2 --delete-target-dir --target-dir /user/hadoop/nivas/EVENTSHIVE2 --split-by EVENT_DT --hive-overwrite --direct

The following are additional options to consider.

In the following example, the --columns argument restricts the import to selected fields, which are then imported into EMR as a Hive table.

sqoop import --connect "jdbc:mysql://<<connection string>>.us-east-1.rds.amazonaws.com:3306/sqoopblog" --username admin -P --table event_partition --columns "event_cd,serial_no,event_dt" --hive-import --create-hive-table --hive-table HIVESELECTED --delete-target-dir --target-dir /user/hadoop/nivas/EVENTSELECTED --split-by EVENT_DT --hive-overwrite --direct

Perform a free-form query and import into EMR as a hive table.

sqoop import --connect "jdbc:mysql://<<connection string>>.us-east-1.rds.amazonaws.com:3306/sqoopblog" --username admin -P --query "select a.serial_no, a.event_cd, a.event_dt, b.us_class_cd, b.class_id from event_partition a, us_class b where a.serial_no=b.serial_no AND \$CONDITIONS" --hive-import --create-hive-table --hive-table HIVEQUERIED --delete-target-dir --target-dir /user/hadoop/EVENTSQUERIED -m 1 --hive-overwrite --direct

For scenario 2, create a Hive table manually from the S3 location. The following sample creates an external table from the S3 location. Run the select statement to check the data counts.
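
A sketch of such an external table DDL, run from the EMR master node, might look like the following. The table name event_s3 is hypothetical, the columns are assumed to mirror the event table, and the S3 path and tab delimiter match the scenario 2 import shown earlier.

hive -e "
CREATE EXTERNAL TABLE event_s3 (
  event_cd STRING,
  event_dt STRING,
  event_seq STRING,
  event_type_cd STRING,
  serial_no STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION 's3://nivasblog/sqoopblog/';

SELECT COUNT(*) FROM event_s3;"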

Important note: With Sqoop version 1.4.7, you can create Hive tables directly by using scripts, as shown in the following sample code. This feature is supported in EMR 5.15.0.

sqoop import --connect "jdbc:mysql://<<connection string>>.us-east-1.rds.amazonaws.com:3306/sqoopblog" --username admin -P --table event_partition  --hive-import --target-dir s3n://nivasblog/sqoopblog/1/ --create-hive-table --hive-table s3table --split-by EVENT_DT --fields-terminated-by '\t' --lines-terminated-by '\n' --as-textfile

For the previous code samples, validate in Hive or Hue, and confirm the table records.

Import all tables in the schema into Hive.

Note: Create a Hive database in Hue or Hive first, and then run the following command in the EMR master cluster.

sqoop import-all-tables --connect "jdbc:mysql://<<Connection string>>.us-east-1.rds.amazonaws.com:3306/sqoopblog" --username admin -P --hive-database sqoopimport --create-hive-table --hive-import --compression-codec=snappy --hive-overwrite --direct

Import as Hive table – Incremental Load

Now, let's try loading a sample incremental data feed into Hive for the partitioned table, with the event date as the key, using the following Sqoop command on an incremental basis.

In addition to the initial data in the EVENT_BASETABLE table, I loaded incremental data into that table. Follow the steps and commands below to perform incremental updates with Sqoop and import them into Hive.

sqoop import --connect "jdbc:mysql:// <<Connection string>>.us-east-1.rds.amazonaws.com:3306/sqoopblog" --username admin -P --table event_partition --target-dir /user/hadoop/INCRTAB --split-by event_dt -m 1 --check-column event_dt --incremental lastmodified --last-value '2018-06-29'

Once the incremental extracts are loaded into the Hadoop directory, you can create temporary, or incremental, tables in Hive and insert them into the main tables.

CREATE TABLE incremental_table (event_cd STRING, event_dt DATE, event_seq INT, event_type_cd STRING, serial_no INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '/user/hadoop/INCRTAB'

INSERT INTO default.hiveimport1 SELECT * FROM default.incremental_table

Alternatively, you can also perform the –query argument to do the incremental operation by joining various tables and condition arguments, and then inserting them into the main table.

--query "select * from EVENT_BASETABLE where modified_date > {last_import_date} AND $CONDITIONS"

All of these steps have been created as a Sqoop job to automate the flow.
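
A minimal sketch of such a saved Sqoop job follows; the job name is hypothetical, and in practice you might replace -P with --password-file so the job can run unattended.

# Create a saved job that remembers the last imported value of event_dt.
sqoop job --create incremental_event_import -- import \
  --connect "jdbc:mysql://<<Connection string>>.us-east-1.rds.amazonaws.com:3306/sqoopblog" \
  --username admin -P \
  --table event_partition \
  --target-dir /user/hadoop/INCRTAB \
  --split-by event_dt -m 1 \
  --check-column event_dt --incremental lastmodified --last-value '2018-06-29'

# Each execution imports only rows modified since the previous run.
sqoop job --exec incremental_event_import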

Export data to Redshift

Now that the data has been imported into EMR HDFS and the S3 data store, let's see how to use the Sqoop command to export data back into the data warehouse layer. In this case, we use an Amazon Redshift cluster and demonstrate with an example.

Download the JDBC driver that your SQL client tool or application uses. If you're not sure which one, download the latest version of the JDBC 4.2 API driver.

The driver file used here is 1.2.15.1025/RedshiftJDBC41-1.2.15.1025.jar, and the class name for the driver is com.amazon.redshift.jdbc42.Driver.

Copy this JAR file to the EMR master node. Use SSH to connect to the master node, navigate to the /usr/lib/sqoop directory, and copy the JAR file there.

Note: Because the EMR master node doesn't allow public access, I did a manual download to a local PC and then used FileZilla to push the file.

Log in to the EMR master cluster and run this Sqoop command to copy the S3 data file into the Redshift cluster.

Launch the Redshift cluster. This example uses the ds2.xlarge (dense storage) node type.

After Redshift launches, and the security group is associated with the EMR cluster to allow a connection, run the Sqoop command in EMR master node. This exports the data from the S3 location (shown previously in the Code 6 command) into the Redshift cluster as a table.

I created a table structure in Redshift as shown in the following example.

DROP TABLE IF EXISTS sqoopexport CASCADE;

CREATE TABLE sqoopexport
(
   event_cd       varchar(25)   NOT NULL,
   event_dt       varchar(25),
   event_seq      varchar(25)   NOT NULL,
   event_type_cd  varchar(25)   NOT NULL,
   serial_no      varchar(25)   NOT NULL
);

COMMIT;

When the table is created, run the following command to import data into the Redshift table.

sqoop export --connect jdbc:redshift://<<Connection String>>.us-east-1.redshift.amazonaws.com:5439/sqoopexport --table sqoopexport --export-dir s3://nivastest1/events/ --driver com.amazon.redshift.jdbc42.Driver --username admin -P --input-fields-terminated-by '\t'

This command inserts the data records into the table.

For more information, see Loading Data from Amazon EMR.

For information about how to copy data back into RDBMS, see Use Sqoop to Transfer Data from Amazon EMR to Amazon RDS.

Summary

You’ve learned how to use Apache Sqoop on EMR to transfer data from RDBMS to an EMR cluster. You created an EMR cluster with Sqoop, processed a sample dataset on Hive, built sample tables in MySQL-RDS, and then used Sqoop to import the data into EMR. You also created a Redshift cluster and exported data from S3 using Sqoop.

You saw that Sqoop can perform data transfers in parallel, so execution is quicker and more cost-effective. You also simplified ETL data processing from the source to the target layer.

The advantages of Sqoop are:

  • Fast, parallel data transfer into EMR — Sqoop takes advantage of EMR compute instances to run the import process, removing dependencies on external tools.
  • Faster imports using direct mode — the --direct option connects straight to MySQL, which speeds up query and data-pull performance into EMR Hadoop and S3.
  • Simple incremental imports — sequential datasets from the source system (provided the tables have primary keys) can be imported incrementally, which simplifies the growing need to migrate on-premises RDBMS data without re-architecting.

Sqoop pain points include the following:

  • Automation must be built by the developer or operations team. Sqoop jobs need to be orchestrated through a workflow or job-scheduling method, for example by using Airflow's Sqoop support or other tools.
  • Tables that don't have primary keys, or that carry legacy table dependencies, are challenging to import incrementally. The recommendation is to do a one-time migration through a Sqoop bulk transfer and re-architect your source ingestion mechanism.
  • Import and export work over JDBC connections only; Sqoop doesn't support other methods such as ODBC or API calls.

If you have questions or suggestions, please comment below.

 


Additional Reading

If you found this post useful, be sure to check out Use Sqoop to transfer data from Amazon EMR to Amazon RDS and Seven tips for using S3DistCp on Amazon EMR to move data efficiently between HDFS and Amazon S3.

 


About the Author

Nivas Shankar is a Senior Big Data Consultant at Amazon Web Services. He helps and works closely with enterprise customers building big data applications on the AWS platform. He holds a Masters degree in physics and is highly passionate about theoretical physics concepts. He enjoys spending time with his wife and two adorable kids. In his spare time, he takes his kids to tennis and football practice.

 

 

 

Build a Concurrent Data Orchestration Pipeline Using Amazon EMR and Apache Livy

Post Syndicated from Binal Jhaveri original https://aws.amazon.com/blogs/big-data/build-a-concurrent-data-orchestration-pipeline-using-amazon-emr-and-apache-livy/

Many customers use Amazon EMR and Apache Spark to build scalable big data pipelines. For large-scale production pipelines, a common use case is to read complex data originating from a variety of sources. This data must be transformed to make it useful to downstream applications, such as machine learning pipelines, analytics dashboards, and business reports. Such pipelines often require Spark jobs to be run in parallel on Amazon EMR. This post focuses on how to submit multiple Spark jobs in parallel on an EMR cluster using Apache Livy, which is available in EMR version 5.9.0 and later.

Apache Livy is a service that enables easy interaction with a Spark cluster over a REST interface. With Apache Livy, you can send simple Scala or Python code over REST API calls instead of having to manage and deploy large JAR files. This helps scale data pipelines easily, because multiple Spark jobs can run in parallel rather than serially through the EMR Step API. Customers can continue to take advantage of transient clusters as part of the workflow, resulting in cost savings.

For the purpose of this blog post, we use Apache Airflow to orchestrate the data pipeline. Airflow is an open-source task scheduler that helps manage ETL tasks. Customers love Apache Airflow because workflows can be scheduled and managed from one central location. With Airflow's Configuration as Code approach, automating the generation of workflows, ETL tasks, and dependencies is easy. It helps customers shift their focus from building and debugging data pipelines to focusing on the business problems.

High-level Architecture

Following is a detailed technical diagram showing the configuration of the architecture to be deployed.

We use an AWS CloudFormation script to launch the AWS services required to create this workflow. CloudFormation is a powerful service that allows you to describe and provision all the infrastructure and resources required for your cloud environment, in simple JSON or YAML templates. In this case, the template includes the following:

The Airflow server uses a LocalExecutor (tasks are executed as a subprocess), which helps to parallelize tasks locally. For production workloads, you should consider scaling out with the CeleryExecutor on a cluster with multiple worker nodes.

For demonstration purposes, we use the MovieLens dataset to concurrently convert its CSV files to Parquet format and save them to Amazon S3. This dataset is a popular open-source dataset used to explore data science algorithms. Each dataset file is a comma-separated file with a single header row. The following table describes each file in the dataset.

Dataset Description
movies.csv Has the title and list of genres for movies being reviewed.
ratings.csv Shows how users rated movies, using a scale from 1-5. The file also contains the time stamp for the movie review.
tags.csv Shows a user-generated tag for each movie. A tag is user-generated metadata about a movie. A tag can be a word or a short phrase. The file also contains the time stamp for the tag.
links.csv Contains identifiers to link to movies used by IMDB and MovieDB.
genome-scores.csv Shows the relevance of each tag for each movie.
genome-tags.csv Provides the tag descriptions for each tag in the genome-scores.csv file.

Building the Pipeline

Step 0: Prerequisites

Make sure that you have a bash-enabled machine with AWS CLI installed.

Step 1: Create an Amazon EC2 key pair

To build this ETL pipeline, connect to an EC2 instance using SSH. This requires access to an Amazon EC2 key pair in the AWS Region you’re launching your CloudFormation stack. If you have an existing Key Pair in your Region, go ahead and use that Key Pair for this exercise. If not, to create a key pair open the AWS Management Console and navigate to the EC2 console. In the EC2 console left navigation pane, choose Key Pairs.

Choose Create Key Pair, type airflow_key_pair (make sure to type it exactly as shown), then choose Create. This downloads a file called airflow_key_pair.pem. Be sure to keep this file in a safe and private place. Without access to this file, you lose the ability to use SSH to connect with your EC2 instance.
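
If you prefer the command line, a sketch of the equivalent steps with the AWS CLI follows (it saves the private key to the same file name used in this post):

aws ec2 create-key-pair \
  --key-name airflow_key_pair \
  --query 'KeyMaterial' \
  --output text > airflow_key_pair.pem
chmod 400 airflow_key_pair.pem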

Step 2: Execute the CloudFormation Script

Now, we’re ready to run the CloudFormation script!

Note: The CloudFormation script uses a DBSecurityGroup, which is NOT supported in all Regions.

On the next page, choose the key pair that you created in the previous step (airflow_key_pair), along with an S3 bucket name. The S3 bucket should NOT already exist, because the CloudFormation template creates a new S3 bucket. Default values for other parameters have been chosen for simplicity.

After filling out these parameters to fit your environment, choose Next. Finally, review all the settings on the next page. Select the box marked I acknowledge that AWS CloudFormation might create IAM resources (this is required since the script creates IAM resources), then choose Create. This creates all the resources required for this pipeline and takes some time to run. To view the stack’s progress, select the stack you created and choose the Events section or panel.

It takes a couple of minutes for the CloudFormation template to complete.
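
You can also poll the stack status from the command line (a sketch; the stack name is whatever you entered when launching the template):

aws cloudformation describe-stacks \
  --stack-name <your-stack-name> \
  --query 'Stacks[0].StackStatus' \
  --output text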

Step 3: Start the Airflow scheduler in the Airflow EC2 instance

To make these changes, we use SSH to connect to the EC2 instance created by the CloudFormation script. Assuming that your local machine has an SSH client, this can be accomplished from the command line. Navigate to the directory that contains the airflow_key_pair.pem file you downloaded earlier and enter the following commands, replacing your-public-ip and your-region with the relevant values from your EC2 instance. The public DNS name of the EC2 instance can be found on the Outputs tab.

Type yes when prompted after the SSH command.

chmod 400 airflow_key_pair.pem
ssh -i "airflow_key_pair.pem" ec2-user@<your-public-ip>

For more information or help with issues, see Connecting to Your Linux Instance Using SSH.

Now we need to run some commands as the root user.

# sudo as the root user
sudo su
# Navigate to the airflow directory which was created by the cloudformation template – Look at the user-data section.
cd ~/airflow
source ~/.bash_profile

Below is an image of how the '/root/airflow/' directory should look.

Now we need to start the airflow scheduler. The Airflow scheduler monitors all tasks and all directed acyclic graphs (DAGs), and triggers the task instances whose dependencies have been met. In the background, it monitors and stays in sync with a folder for all DAG objects that it may contain. It periodically (every minute or so) inspects active tasks to see whether they can be triggered.

The Airflow scheduler is designed to run as a persistent service in an Airflow production environment. To get it started, run the airflow scheduler. It will use the configuration specified in airflow.cfg.

To start a scheduler, run the below command in your terminal.

airflow scheduler

Your screen should look like the following with scheduler running.

Step 4: View the transform_movielens DAG on the Airflow Webserver

The Airflow webserver should be running on port 8080. To see the Airflow webserver, open any browser and type in the <EC2-public-dns-name>:8080. The public EC2 DNS name is the same one found in Step 3.

You should see a list of DAGs on the Airflow dashboard. The example DAGs are left there in case you want to experiment with them. But we focus on the transform_movielens DAG for the purposes of this blog post. Toggle the ON button next to the name of the DAG.

The following example shows how the dashboard should look.

Choose the transform_movielens DAG, then choose Graph View to view the following image.

This image shows the overall data pipeline. In the current setup, there are six transform tasks that convert each .csv file to parquet format from the movielens dataset. Parquet is a popular columnar storage data format used in big data applications. The DAG also takes care of spinning up and terminating the EMR cluster once the workflow is completed.

The DAG code can also be viewed by choosing the Code button.

Step 5: Run the Airflow DAG

To run the DAG, go back to the Airflow dashboard, and choose the Trigger DAG button for the transform_movielens DAG.

When the Airflow DAG is run, the first task calls the run_job_flow boto3 API to create an EMR cluster. The second task waits until the EMR cluster is ready to take on new tasks. As soon as the cluster is ready, the transform tasks are kicked off in parallel using Apache Livy, which runs on port 8998. Concurrency in the current Airflow DAG is set to 3, which runs three tasks in parallel. To run more tasks in parallel (multiple spark sessions) in Airflow without overwhelming the EMR cluster, you can throttle the concurrency.

How does Apache Livy run the Scala code on the EMR cluster in parallel?

Once the EMR cluster is ready, the transform tasks are triggered by the Airflow scheduler. Each transform task triggers Livy to create a new interactive Spark session. Each POST request brings up a new Spark context with a Spark interpreter. This remote Spark interpreter is used to receive and run code snippets, and return the result.
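
For reference, the same Livy calls can be made by hand with curl, which mirrors what the Python helper functions shown below do. This is a sketch; replace <master-public-dns> with your cluster's master node DNS, and note that the session ID (0 here) comes from the response to the first call.

# Create an interactive Scala Spark session.
curl -s -X POST -H 'Content-Type: application/json' \
  -d '{"kind": "spark"}' \
  http://<master-public-dns>:8998/sessions

# Once the session is idle, submit a code snippet to it.
curl -s -X POST -H 'Content-Type: application/json' \
  -d '{"code": "sc.parallelize(1 to 10).count()"}' \
  http://<master-public-dns>:8998/sessions/0/statements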

Let’s use one of the transform tasks as an example to understand the steps in detail.

# Converts each of the movielens datafile to parquet
def transform_movies_to_parquet(**kwargs):
    # ti is the Task Instance
    ti = kwargs['ti']
    cluster_id = ti.xcom_pull(task_ids='create_cluster')
    cluster_dns = emr.get_cluster_dns(cluster_id)
    headers = emr.create_spark_session(cluster_dns, 'spark')
    session_url = emr.wait_for_idle_session(cluster_dns, headers)
    statement_response = emr.submit_statement(session_url,   '/root/airflow/dags/transform/movies.scala')
    emr.track_statement_progress(cluster_dns, statement_response.headers)
    emr.kill_spark_session(session_url)

The first three lines of this code helps to look up the EMR cluster details. This is used to create an interactive spark session on the EMR cluster using Apache Livy.

create_spark_session

Apache Livy creates an interactive spark session for each transform task. The code for which is shown below. SparkSession provides a single point of entry to interact with underlying Spark functionality and allows programming Spark with DataFrame and Dataset APIs. The Spark session is created by calling the POST /sessions API.

Note: You can also change parameters such as driverMemory, executorMemory, and the number of driver and executor cores as part of the API call.

# Creates an interactive scala spark session. 
# Python(kind=pyspark), R(kind=sparkr) and SQL(kind=sql) spark sessions can also be created by changing the value of kind.
def create_spark_session(master_dns, kind='spark'):
    # 8998 is the port on which the Livy server runs
    host = 'http://' + master_dns + ':8998'
    data = {'kind': kind}
    headers = {'Content-Type': 'application/json'}
    response = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
    logging.info(response.json())
    return response.headers

submit_statement

Once the session finishes starting up, it transitions to the idle state. The transform task is then submitted to the session. The Scala code is submitted as a REST API call to the Livy server instead of directly to the EMR cluster, which provides good fault tolerance and concurrency.

# Submits the scala code as a simple JSON command to the Livy server
def submit_statement(session_url, statement_path):
    statements_url = session_url + '/statements'
    with open(statement_path, 'r') as f:
        code = f.read()
    data = {'code': code}
    response = requests.post(statements_url, data=json.dumps(data), headers={'Content-Type': 'application/json'})
    logging.info(response.json())
    return response

track_statement_progress

The progress of the statement can also be easily tracked and the logs are centralized on the Airflow webserver.

# Function to help track the progress of the scala code submitted to Apache Livy
def track_statement_progress(master_dns, response_headers):
    statement_status = ''
    host = 'http://' + master_dns + ':8998'
    session_url = host + response_headers['location'].split('/statements', 1)[0]
    # Poll the status of the submitted scala code
    while statement_status != 'available':
        # If a statement takes longer than a few milliseconds to execute, Livy returns early and provides a statement URL that can be polled until it is complete:
        statement_url = host + response_headers['location']
        statement_response = requests.get(statement_url, headers={'Content-Type': 'application/json'})
        statement_status = statement_response.json()['state']
        logging.info('Statement status: ' + statement_status)

        #logging the logs
        lines = requests.get(session_url + '/log', headers={'Content-Type': 'application/json'}).json()['log']
        for line in lines:
            logging.info(line)

        if 'progress' in statement_response.json():
            logging.info('Progress: ' + str(statement_response.json()['progress']))
        time.sleep(10)
    final_statement_status = statement_response.json()['output']['status']
    if final_statement_status == 'error':
        logging.info('Statement exception: ' + statement_response.json()['output']['evalue'])
        for trace in statement_response.json()['output']['traceback']:
            logging.info(trace)
        raise ValueError('Final Statement Status: ' + final_statement_status)
    logging.info('Final Statement Status: ' + final_statement_status)

The following is a snapshot of the centralized logs from the Airflow webserver.

Once the job is run successfully, the Spark session is ended and the EMR cluster is terminated.

Analyze the data in Amazon Athena

The output data in S3 can be analyzed in Amazon Athena by creating a crawler on AWS Glue. For information about automatically creating the tables in Athena, see the steps in Build a Data Lake Foundation with AWS Glue and Amazon S3.

Summary

In this post, we explored orchestrating a Spark data pipeline on Amazon EMR using Apache Livy and Apache Airflow. We created a simple Airflow DAG to demonstrate how to run Spark jobs concurrently. You can modify this to scale your ETL data pipelines and improve latency. Additionally, we saw how Livy hides the complexity of submitting Spark jobs over REST while making optimal use of EMR resources.

For more information about the code shown in this post, see AWS Concurrent Data Orchestration Pipeline EMR Livy. Feel free to leave questions and other feedback in the comments.

 


Additional Reading

If you found this post useful, be sure to check out Orchestrate Apache Spark applications using AWS Step Functions and Apache Livy.

 


About the Author

Binal Jhaveri is a Big Data Engineer at Amazon Web Services. Her passion is to build big data products in the cloud. During her spare time, she likes to travel, read detective fiction and loves exploring the abundant nature that the Pacific Northwest has to offer.

 

 

 

Exploratory data analysis of genomic datasets using ADAM and Mango with Apache Spark on Amazon EMR

Post Syndicated from Alyssa Marrow original https://aws.amazon.com/blogs/big-data/exploratory-data-analysis-of-genomic-datasets-using-adam-and-mango-with-apache-spark-on-amazon-emr/


As the cost of genomic sequencing has rapidly decreased, the amount of publicly available genomic data has soared over the past couple of years. New cohorts and studies have produced massive datasets consisting of over 100,000 individuals. Simultaneously, these datasets have been processed to extract genetic variation across populations, producing mass amounts of variation data for each cohort.

In this era of big data, tools like Apache Spark have provided a user-friendly platform for batch processing of large datasets. However, to use such tools as a sufficient replacement to current bioinformatics pipelines, we need more accessible and comprehensive APIs for processing genomic data. We also need support for interactive exploration of these processed datasets.

ADAM and Mango provide a unified environment for processing, filtering, and visualizing large genomic datasets on Apache Spark. ADAM allows you to programmatically load, process, and select raw genomic and variation data using Spark SQL, an SQL interface for aggregating and selecting data in Apache Spark. Mango supports the visualization of both raw and aggregated genomic data in a Jupyter notebook environment, allowing you to draw conclusions from large datasets at multiple resolutions.

With the combined power of ADAM and Mango, you can load, query, and explore datasets in a unified environment. You can interactively explore genomic data at a scale previously impossible using single node bioinformatics tools. In this post, we describe how to set up and run ADAM and Mango on Amazon EMR. We demonstrate how you can use these tools in an interactive notebook environment to explore the 1000 Genomes dataset, which is publicly available in Amazon S3 as a public dataset.

Configuring ADAM and Mango on Amazon EMR

First, you launch and configure an EMR cluster. Mango uses Docker containers to easily run on Amazon EMR. Upon cluster startup, EMR uses the following bootstrap action to install Docker and the required startup scripts. The scripts are available at /home/hadoop/mango-scripts.

aws emr create-cluster \
--release-label emr-5.14.0 \
--name 'emr-5.14.0 Mango example' \
--applications Name=Hadoop Name=Hive Name=Spark \
--ec2-attributes KeyName=<your-ec2-key>,InstanceProfile=EMR_EC2_DefaultRole \
--service-role EMR_DefaultRole \
--instance-groups \
InstanceGroupType=MASTER,InstanceCount=1,InstanceType=c5.4xlarge \
InstanceGroupType=CORE,InstanceCount=4,InstanceType=c5.4xlarge \
--region <your-aws-region> \
--log-uri s3://<your-s3-bucket>/emr-logs/ \
--bootstrap-actions \
Name='Install Mango',Path="s3://aws-bigdata-blog/artifacts/mango-emr/install-bdg-mango-emr5.sh"

To start the Mango notebook, run the following:

/home/hadoop/mango-scripts/run-notebook.sh

This file sets up all of the environment variables that are needed to run Mango in Docker on Amazon EMR. In your terminal, you will see the port and Jupyter notebook token for the Mango notebook session. Navigate to this port on the public DNS URL of the master node for your EMR cluster.

Loading data from the 1000 Genomes Project

Now that you have a working environment, you can use ADAM and Mango to discover interesting variants in the child from the genome sequencing data of a trio (data from a mother, father, and child). This data is available from the 1000 Genomes Project AWS public dataset. In this analysis, you will view a trio (NA19685, NA19661, and NA19660) and search for variants that are present in the child but not present in the parents.

In particular, we want to identify genetic variants that are found in the child but not in the parents, known as de novo variants. These are interesting regions, as they can indicate sites of de novo variation that might contribute to multiple disorders.

You can find the Jupyter notebook containing these examples in Mango’s GitHub repository, or at /opt/cgl-docker-lib/mango/example-files/notebooks/aws-1000genomes.ipynb in the running Docker container for Mango.

First, import the ADAM and Mango modules and any Spark modules that you need:

# Import ADAM modules
from bdgenomics.adam.adamContext import ADAMContext
from bdgenomics.adam.rdd import AlignmentRecordRDD, CoverageRDD
from bdgenomics.adam.stringency import LENIENT, _toJava

# Import Mango modules
from bdgenomics.mango.rdd import GenomicVizRDD
from bdgenomics.mango.QC import CoverageDistribution

# Import Spark modules
from pyspark.sql import functions as sf

Next, create a Spark session. You will use this session to run SQL queries on variants.

# Create ADAM Context
ac = ADAMContext(spark)
genomicRDD = GenomicVizRDD(spark)

Variant analysis with Spark SQL

Load in a subset of variant data from chromosome 17:

genotypesPath = 's3://1000genomes/phase1/analysis_results/integrated_call_sets/ALL.chr17.integrated_phase1_v3.20101123.snps_indels_svs.genotypes.vcf.gz'
genotypes = ac.loadGenotypes(genotypesPath)

# repartition genotypes to balance the load across memory
genotypes_df  = genotypes.toDF()

You can take a look at the schema by printing the columns in the dataframe.

# cache genotypes and show the schema
genotypes_df.columns

This genotypes dataset contains all samples from the 1000 Genomes Project. Therefore, you will next filter genotypes to only consider samples that are in the NA19685 trio, and cache the results in memory.

# trio IDs
IDs = ['NA19685', 'NA19661','NA19660']

# Filter by individuals in the trio
trio_df = genotypes_df.filter(genotypes_df["sampleId"].isin(IDs))

trio_df.cache()
trio_df.count()

Next, add a new column to your dataframe that determines the genomic location of each variant. This is defined by the chromosome (contigName) and the start and end position of the variant.

# Add ReferenceRegion column and group by referenceRegion
trios_with_referenceRegion = trio_df.withColumn('ReferenceRegion', 
                    sf.concat(sf.col('contigName'),sf.lit(':'), sf.col('start'), sf.lit('-'), sf.col('end')))

Now, you can query your dataset to find de novo variants. But first, you must register your dataframe with Spark SQL.

#  Register df with Spark SQL
trios_with_referenceRegion.createOrReplaceTempView("trios")

Now that your dataframe is registered, you can run SQL queries on it. For the first query, select the names of variants belonging to sample NA19685 that have at least one alternative (ALT) allele.

# filter by alleles. This is a list of variant names that have an alternate allele for the child
alternate_variant_sites = spark.sql("SELECT variant.names[0] AS snp FROM trios \
                                    WHERE array_contains(alleles, 'ALT') AND sampleId == 'NA19685'") 

collected_sites = map(lambda x: x.snp, alternate_variant_sites.collect())

For your next query, filter sites in which the parents have both reference alleles. Then filter these variants by the set produced previously from the child.

# get parent records and filter by only REF locations for variant names that were found in the child with an ALT
filtered1 = spark.sql("SELECT * FROM trios WHERE sampleId == 'NA19661' or sampleId == 'NA19660' \
            AND !array_contains(alleles, 'ALT')")
filtered2 = filtered1.filter(filtered1["variant.names"][0].isin(collected_sites))
snp_counts = filtered2.groupBy("variant.names").count().collect()

# collect snp names as a list
snp_names = map(lambda x: x.names, snp_counts)
denovo_snps = [item for sublist in snp_names for item in sublist]
denovo_snps[:10]

Now that you have found some interesting variants, you can unpersist your genotypes from memory.

trio_df.unpersist()

Working with alignment data

You have found a lot of potential de novo variant sites. Next, you can visually verify some of these sites to see if the raw alignments match up with these de novo hits.

First, load in the alignment data for the NA19685 trio:

# load in NA19685 exome from s3a

childReadsPath = 's3a://1000genomes/phase1/data/NA19685/exome_alignment/NA19685.mapped.illumina.mosaik.MXL.exome.20110411.bam'
parent1ReadsPath = 's3a://1000genomes/phase1/data/NA19685/exome_alignment/NA19660.mapped.illumina.mosaik.MXL.exome.20110411.bam'
parent2ReadsPath = 's3a://1000genomes/phase1/data/NA19685/exome_alignment/NA19661.mapped.illumina.mosaik.MXL.exome.20110411.bam'

childReads = ac.loadAlignments(childReadsPath, stringency=LENIENT)
parent1Reads = ac.loadAlignments(parent1ReadsPath, stringency=LENIENT)
parent2Reads = ac.loadAlignments(parent2ReadsPath, stringency=LENIENT)

Note that this example uses s3a:// instead of s3:// style URLs. The reason for this is that the ADAM formats use Java NIO to access BAM files. To do this, we are using a JSR 203 implementation for the Hadoop Distributed File System to access these files. This itself requires the s3a:// protocol. You can view that implementation in this GitHub repository.

You now have data alignment data for three individuals in your trio. However, the data has not yet been loaded into memory. To cache these datasets for fast subsequent access to the data, run the cache() function:

# cache child RDD and count records
# takes about 2 minutes, on 4 c3.4xlarge worker nodes 
childReads.cache()

# Count reads in the child
childReads.toDF().count()
# Output should be 95634679

Quality control of alignment data

One popular way to visually reaffirm the quality of genomic alignment data is to view the coverage distribution. The coverage distribution gives you an idea of the read coverage that you have across a sample.

Next, generate a sample coverage distribution plot for the child alignment data on chromosome 17:

# Calculate read coverage
# Takes 2-3 minutes
childCoverage = childReads.transform(lambda x: x.filter(x.contigName == "17")).toCoverage()

childCoverage.cache()
childCoverage.toDF().count()

# Output should be 51252612

Now that coverage data is calculated and cached, compute the coverage distribution of chromosome 17 and plot the coverage distribution:

# Calculate coverage distribution

# You can check the progress in the SparkUI by navigating to 
# <PUBLIC_MASTER_DNS>:8088 and clicking on the currently running Spark application.
cd = CoverageDistribution(sc, childCoverage)
x = cd.plot(normalize=True, cumulative=False, xScaleLog=True, labels="NA19685")

 

This looks pretty standard because the data you are viewing is exome data. Therefore, you can see a high number of sites with low coverage and a smaller number of genomic positions with more than 100 reads. Now that you are done with coverage, you can unpersist these datasets to clear space in memory for the next analysis.

childCoverage.unpersist()

Viewing sites with missense variants in the proband

After verifying alignment data and filtering variants, you have four genes with potential missense mutations in the proband, including YBX2, ZNF286B, KSR1, and GNA13. You can visually verify these sites by filtering and viewing the raw reads of the child and parents.

First, view the child reads. If you zoom in to the location of the GNA13 variant (63052580-63052581), you can see a heterozygous T to A call:

# missense variant at GNA13: 63052580-63052581 (SNP rs201316886)
# Takes about 2 minutes to collect data from workers
contig = "17"
start = 63052180
end = 63052981

genomicRDD.ViewAlignments(childReads, contig, start, end)

It looks like there indeed is a variant at this position, possibly a heterozygous SNP with alternate allele A. Look at the parent data to verify that this variant does not appear in the parents:

# view missense variant at GNA13: 63052580-63052581 in parent 1
contig = "17"
start = 63052180
end = 63052981

genomicRDD.ViewAlignments(parent1Reads, contig, start, end)

This confirms the filter that this variant is indeed present only in the proband, but not the parents.

Summary

To summarize, this post demonstrated how to set up and run ADAM and Mango on Amazon EMR. We demonstrated how to use these tools in an interactive notebook environment to explore the 1000 Genomes dataset, a publicly available dataset on Amazon S3. We used these tools to inspect 1000 Genomes data quality, query for interesting variants in the genome, and validate results through the visualization of raw data.

For more information about Mango, see the Mango User Guide. If you have questions or suggestions, please comment below.

 


Additional Reading

If you found this post useful, be sure to check out Genomic Analysis with Hail on Amazon EMR and Amazon Athena, Interactive Analysis of Genomic Datasets Using Amazon Athena, and, on the AWS Compute Blog, Building High-Throughput Genomics Batch Workflows on AWS: Introduction (Part 1 of 4).

 


About the Author

Alyssa Marrow is a graduate student in the RISELab and Yosef Lab at the University of California Berkeley. Her research interests lie at the intersection of systems and computational biology. This involves building scalable systems and easily parallelized algorithms to process and compute on all that ‘omics data.

 

 

 

Encrypt data in transit using a TLS custom certificate provider with Amazon EMR

Post Syndicated from Remek Hetman original https://aws.amazon.com/blogs/big-data/encrypt-data-in-transit-using-a-tls-custom-certificate-provider-with-amazon-emr/

Many enterprises have highly regulated policies around cloud security. Those policies might be even more restrictive for Amazon EMR where sensitive data is processed.

EMR provides security configurations that allow you to set up encryption for data at rest stored on Amazon S3 and local Amazon EBS volumes. It also allows the setup of Transport Layer Security (TLS) certificates for the encryption of data in transit.

When in-transit encryption is enabled, EMR supports the following components by default:

  • Hadoop MapReduce Encrypted Shuffle.
  • Secure Hadoop RPC set to Privacy and using SASL. This is activated in EMR when data at rest encryption is enabled in the security configuration.
  • Presto internal communication between nodes using SSL/TLS. This applies only to EMR version 5.6.0 and later.
  • Tez Shuffle Handler using TLS.
  • Internal RPC communication between Apache Spark components.
  • HTTP protocol communication with user interfaces, such as Spark History Server and HTTPS-enabled file servers encrypted using Spark’s Secure Sockets Layer (SSL) configuration.

For more information about EMR in-transit encryption, see Encryption Options.

A security configuration provides the following options to specify TLS certificates:

  1. As a path to a .zip file in an S3 bucket that contains all certificates
  2. Through a custom certificate provider as a Java class

In many cases, company security policies prohibit storing any type of sensitive information in an S3 bucket, including certificate private keys. For that reason, the only remaining option to secure data in transit on EMR is to configure the custom certificate provider.

In this post, I guide you through the configuration process, showing you how to secure data in transit on EMR using the TLS custom certificate provider.

Required knowledge

To walk through this solution, you should know or be familiar with:

Solution overview

The custom certificate provider is a Java class that implements the TLSArtifactsProvider interface and is compiled into a JAR file. The TLSArtifactsProvider interface is available in the AWS SDK for Java version 1.11+.

The TLSArtifactsProvider interface provides the getTlsArtifacts method, which returns a TLSArtifacts object containing the certificates.

To make this solution work, you need a secure place to store certificates that can also be accessed by Java code.

In this example, I use Parameter Store, a capability of AWS Systems Manager that supports encryption with an AWS Key Management Service (AWS KMS) key.

Another way would be to store encrypted certificates in Amazon DynamoDB.

The following diagram and steps show the configuration process from the Java standpoint:

 

  1. During bootstrap, EMR downloads the Java JAR file from the S3 bucket, and runs it on each node.
  2. Java invokes the Lambda function, requesting the value of a specific parameter key.
  3. Lambda calls Parameter Store to get the value. The value returned by Systems Manager remains encrypted.
  4. Lambda returns the encrypted value back to Java.
  5. Java decrypts the value using an AWS KMS API call.
  6. The decrypted value is converted to the correct format of the certificate.
  7. The process repeats for all certificates.
  8. Certificates are returned back to EMR through the TLSArtifactsProvider interface.

In this example, for the master node, I used a certificate signed by a certificate authority (CA), and a self-signed wildcard certificate for the slave nodes. Depending on your requirements, you can use CA certificates for all nodes or only a self-signed wildcard certificate.

Implementing in-transit encryption

This section walks you through all aspects of implementation and configuration for in-transit encryption using a custom certificate provider.

Create a self-signed wildcard certificate

To create a self-signed wildcard certificate, you can use OpenSSL:

openssl req -x509 -newkey rsa:4096 -keyout inter-nodes.key -out inter-nodes.crt -days 365 -subj "/C=US/ST=MA/L=Boston/O=EMR/OU=EMR/CN=*.ec2.internal" -nodes


This command creates a self-signed, 4096-bit certificate.

Explanation of the parameters:

-keyout – The output file in which to store the private key.

-out – The output file in which to store the certificate.

-days – The number of days for which to certify the certificate.

-subj – The subject name for a new request.  The CN must match the domain name specified in DHCP that is assigned to the virtual private cloud (VPC). The default is ec2.internal. The “*” prefix is the wildcard certificate.

-nodes – Allows you to create a private key without a password, which is without encryption.

For more information, see req command.
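
Before uploading the certificate, you can optionally confirm its subject and validity period:

openssl x509 -in inter-nodes.crt -noout -subject -dates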

Upload certificates

To upload certificates to the Parameter Store, run the following AWS Command Line Interface (AWS CLI) command for each certificate file, including private keys:

aws ssm put-parameter --name <parameter key name> --key-id < KMS key ID> --type SecureString --value file://<path to certificate file>

The following are examples of uploaded CA and self-signed certificate files:

aws ssm put-parameter --name /emr/certificate --value fileb://emr-ca-certificate.crt --type SecureString --key-id 00000000-0000-0000-0000-000000000000 --overwrite --region us-east-1

aws ssm put-parameter --name /emr/private-key --value fileb://emr-ca-private-key.key --type SecureString --key-id 00000000-0000-0000-0000-000000000000 --overwrite --region us-east-1

aws ssm put-parameter --name /emr/inter-nodes-certificate --value fileb://inter-nodes.crt --type SecureString --key-id 00000000-0000-0000-0000-000000000000 --overwrite --region us-east-1

aws ssm put-parameter --name /emr/inter-nodes-private-key --value fileb://inter-nodes.key --type SecureString --key-id 00000000-0000-0000-0000-000000000000 --overwrite --region us-east-1

The following are examples of uploading certificates when the wildcard certificate is used on all nodes:

aws ssm put-parameter --name /emr/certificate --value fileb://inter-nodes.crt --type SecureString --key-id 00000000-0000-0000-0000-000000000000 --overwrite --region us-east-1

aws ssm put-parameter --name /emr/private-key --value fileb://inter-nodes.key --type SecureString --key-id 00000000-0000-0000-0000-000000000000 --overwrite --region us-east-1

aws ssm put-parameter --name /emr/inter-nodes-certificate --value fileb://inter-nodes.crt --type SecureString --key-id 00000000-0000-0000-0000-000000000000 --overwrite --region us-east-1

aws ssm put-parameter --name /emr/inter-nodes-private-key --value fileb://inter-nodes.key --type SecureString --key-id 00000000-0000-0000-0000-000000000000 --overwrite --region us-east-1
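
To confirm that a certificate was stored correctly, you can read it back from Parameter Store (this requires kms:Decrypt permission on the KMS key used above):

aws ssm get-parameter --name /emr/certificate --with-decryption --region us-east-1 \
  --query 'Parameter.Value' --output text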

Using the Lambda function

The Lambda function in this solution is a broker that allows Java JAR to retrieve certificates from Parameter Store.

Create a new role for the Lambda function, using the following command:

aws iam create-role --role-name lambda-ssm-parameter-store-role --assume-role-policy-document "{\"Version\": \"2012-10-17\", \"Statement\": [{\"Effect\": \"Allow\",\"Principal\": {\"Service\": \"lambda.amazonaws.com\"},\"Action\": \"sts:AssumeRole\"}]}"

Grant permissions to Parameter Store, using the following command:

aws iam put-role-policy --role-name lambda-ssm-parameter-store-role --policy-name ssm --policy-document "{\"Version\": \"2012-10-17\",\"Statement\": [{\"Effect\": \"Allow\",\"Action\": \"ssm:GetParameter\",\"Resource\": \"*\"}]}"

Create a new Lambda function:

To create a new Lambda function, open the AWS Management Console, choose Lambda, and then choose Create function. On the Create function page, enter a name for the function (in this example, get-ssm-parameter-lambda).

For Runtime, choose Python 2.7, and specify the role that you created for the Lambda function.

When the new function is created, add the following code in the Function code section:

import json
import boto3

ssm = boto3.client('ssm')

def lambda_handler(event, context):
    # Read the requested parameter from Parameter Store.
    # WithDecryption is False on purpose: the SecureString value is returned
    # still encrypted, and the Java class decrypts it later with a KMS API call.
    ssmResp = ssm.get_parameter(
        Name=event['ParameterName'],
        WithDecryption=False
    )

    paramValue = ssmResp['Parameter']['Value']
    return(paramValue)

Change the timeout to 1 minute, and then save the function.
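If you prefer the AWS CLI to the console, the following sketch creates the same function. It assumes that you saved the code above as lambda_function.py and zipped it as function.zip; substitute your own account ID in the role ARN:

# Package the function code
zip function.zip lambda_function.py

# Create the function using the role created earlier
aws lambda create-function \
  --function-name get-ssm-parameter-lambda \
  --runtime python2.7 \
  --handler lambda_function.lambda_handler \
  --role arn:aws:iam::<account id>:role/lambda-ssm-parameter-store-role \
  --zip-file fileb://function.zip \
  --timeout 60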

Tag resources

For the Java class to call the Lambda function, you must provide the function name and the names of the parameter keys under which the certificates are stored.

To reuse the same Java JAR with different certificates and configurations, provide those values to Java through EMR tags rather than embedding them in the Java code.

In this example, I used the following tags:

  • ssm:ssl:certificate – The name of the Systems Manager parameter key storing the CA-signed certificate.
  • ssm:ssl:private-key – The name of the Systems Manager parameter key storing the CA-signed certificate private key.
  • ssm:ssl:inter-node-certificate – The name of the Systems Manager parameter key storing the self-signed certificate.
  • ssm:ssl:inter-node-private-key – The name of the Systems Manager parameter key storing the self-signed certificate private key.
  • tls:lambda-fn-name – The name of the Lambda function. In this example, this is get-ssm-parameter-lambda.
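These tags are supplied when you create the cluster, as shown in the full AWS CLI example later in this post. If the cluster already exists, a sketch like the following can apply them with the AWS CLI; the cluster ID is a placeholder:

aws emr add-tags --resource-id j-XXXXXXXXXXXXX --tags \
  ssm:ssl:certificate="/emr/certificate" \
  ssm:ssl:private-key="/emr/private-key" \
  ssm:ssl:inter-node-certificate="/emr/inter-nodes-certificate" \
  ssm:ssl:inter-node-private-key="/emr/inter-nodes-private-key" \
  tls:lambda-fn-name="get-ssm-parameter-lambda"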

Use the Java class flow

This section describes the flow in the Java code only. You can download the full code along with the compiled JAR file from GitHub. For more information, see the Java folder in the emr-tls-security GitHub repo.

Important

Because of EMR dependencies, all other methods must be implemented against the AWS SDK for Java version 1.10.75. Those dependencies do not include the TLSArtifactsProvider interface, which must be imported from the AWS SDK for Java version 1.11.170 (aws-java-sdk-emr-1.11.170.jar).

All necessary dependencies are included in the example project.

The following is an example of the basic structure of the Java class, with an implementation of the TLSArtifactsProvider interface:

public class emrtls extends TLSArtifactsProvider {

    public emrtls() {
    }

    @Override
    public TLSArtifacts getTlsArtifacts() {
        List<Certificate> crt = new ArrayList<Certificate>();
        List<Certificate> crtCA = new ArrayList<Certificate>();
        PrivateKey privkey;

        // here: code to retrieve certificates from a secure location
        // and assign them to the local variables

        TLSArtifacts tls = new TLSArtifacts(privkey, crt, crtCA);
        return tls;
    }
}

The code that you add is responsible for retrieving the certificates from a secure location.

In the provided code example from GitHub, the following logic was implemented. I’ve listed the methods used in each step.

  1. Read the names of the Systems Manager parameter keys, and the name of the AWS Lambda function, from the EMR tags (see the Tag resources section) – readTags().
  2. Invoke the Lambda function to download the certificates from Parameter Store – callLambda():
    • Decrypt the values returned by Lambda using the KMS API call – decryptValue().
    • Assign the decrypted values to local variables.
  3. If needed, save the CA-signed certificates to a local disk (for more information, see the Hue section later in this post) – createDirectoryForCerts() and writeCert().
  4. Convert the certificates to an X509 format – getX509FromString().
  5. Convert the private key to the correct format – getPrivateKey().
  6. Pass the certificates to the TLSArtifacts constructor and return the result from getTlsArtifacts().

You can use a wildcard certificate for all nodes without changing the code. Reference the same Systems Manager parameter keys in the ssm:ssl:certificate/ssm:ssl:private-key tags and in the ssm:ssl:inter-node-certificate/ssm:ssl:inter-node-private-key EMR tags.

If the implemented methods in the example code meet your requirements, you can use the provided Java JAR file in the EMR security configuration, as described in the next section. Otherwise, any code changes require recompiling the Java code into a JAR file.

Create the EMR security configuration

Before creating the security configuration, upload the compiled Java JAR file to an S3 bucket.

To create the security configuration:

  1. Log in to the Amazon EMR console.
  2. Choose Security configurations, Create.
  3. Type a name for your new security configuration; for example, emr-tls-ssm
  4. Select In-transit encryption.
  5. Under TLS certificate provider, for Certificate provider type, choose Custom.
  6. For S3 object, type the path to the uploaded Java JAR file.
  7. For Certificate provider class, type the name of the Java class. In the example code, the name is emrtls.
  8. Configure the At-rest encryption, as required.
  9. Choose Create.
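You can also create the same security configuration with the AWS CLI instead of the console. The following is a sketch; the S3 bucket and JAR path are placeholders, and the at-rest encryption settings are omitted for brevity:

aws emr create-security-configuration --name emr-tls-ssm --security-configuration '{
  "EncryptionConfiguration": {
    "EnableInTransitEncryption": true,
    "EnableAtRestEncryption": false,
    "InTransitEncryptionConfiguration": {
      "TLSCertificateConfiguration": {
        "CertificateProviderType": "Custom",
        "S3Object": "s3://<your bucket>/emrtls.jar",
        "CertificateProviderClass": "emrtls"
      }
    }
  }
}'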

Modify the instance profile role

Applications running on EMR assume the EMR role for EC2 (the EC2 instance profile) to interact with other AWS services.

To grant the Java class permissions to invoke the Lambda function and to decrypt certificates, add the following policy to your EC2 instance profile role:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "TLS",
            "Effect": "Allow",
            "Action": [
                "kms:Decrypt",
                "lambda:InvokeFunction"
            ],
            "Resource": [
                "arn:aws:lambda:us-east-1:xxxxxxxxxx:function:get-ssm-parameter-lambda",
                "arn:aws:kms:us-east-1:xxxxxxxxxx:key/<your KMS key used to encrypt certificates in AWS Systems Manager"
            ]
        }
    ]
}

Note

Before creating the policy, update resources with the correct Amazon Resource Name (ARN) for the Lambda function and KMS key.
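For example, assuming that the policy document is saved locally as emr-tls-policy.json and that your cluster uses the default EMR_EC2_DefaultRole instance profile role, you could attach it with the AWS CLI as follows:

aws iam put-role-policy \
  --role-name EMR_EC2_DefaultRole \
  --policy-name emr-tls \
  --policy-document file://emr-tls-policy.json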

Other available configurations

In addition to the applications that natively support in-transit encryption in EMR, the custom TLS certificate provider can also be used to secure communication (HTTPS) for other applications like Presto, Hue, and Zeppelin.

The sections that follow describe the configuration of each application that works with the certificates set up by the TLS security configuration.

Presto

For Presto, most configuration is done by EMR when TLS certificates are applied.

Depending on the type of certificates used, there are two additional configurations that must be added:

  1. When the CA-signed certificate with a single common name (not wildcard) is set on the master node, additional configurations are required:
    • The certificate common name must be registered in DNS. The EMR cluster must be able to resolve that name to the IP address of the master node. One solution would be to run a script on the bootstrap action to register the IP address of the EMR master node and name in DNS.
    • The Discovery URI in the Presto configuration file must match the certificate common name. The value of uri must be changed on all nodes. This can be accomplished by two provided scripts.

Each script must be uploaded to an S3 bucket to which the EMR cluster has permission.

The first script, emr-presto-conf.sh, must be run on the EMR bootstrap action, as follows:

{
  "Name": "PrestoConfiguration",
  "ScriptBootstrapAction": {
    "Path": "s3://xxxxx/emr-presto-conf.sh",
    "Args": [ "emr.mycluster.com" ]
  }
}

where the value of Args is the certificate common name.

The PrestoConfiguration bootstrap action downloads and runs a script (presto-update-dicovery-uri.sh) as a background process. This script waits for the Presto server to be installed and then modifies the configuration files.

Before uploading the emr-presto-conf.sh script to the Amazon S3 bucket, change the path so that it points to your copy of presto-update-dicovery-uri.sh.

Both scripts can be downloaded from GitHub:

https://github.com/aws-samples/emr-tls-security/tree/master/scripts

2. When a self-signed wildcard certificate is used on the master node, the certificate must be added to the Java default truststore. This can be accomplished by running the following script:

#!/bin/bash

# Read the truststore password from the Presto configuration.
truststorePass=$(grep -Po "(?<=^internal-communication.https.keystore.key = ).*" /etc/presto/conf/config.properties)

# Import the EMR truststore (which contains the self-signed wildcard certificate)
# into the default Java truststore (cacerts).
sudo keytool -importkeystore -srckeystore /usr/share/aws/emr/security/conf/truststore.jks -destkeystore /usr/lib/jvm/java/jre/lib/security/cacerts -deststorepass changeit -srcstorepass $truststorePass

The previous script can be run as an EMR step. The following is an AWS CloudFormation snippet:

"EMRPrestoTrustedStorStep": {
      "Type": "AWS::EMR::Step",
      "Properties": {
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
          "Jar": "s3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar",
          "Args": [
            "s3://xxxxxx/presto-update-trusted-store.sh"
          ]
        },
        "JobFlowId": {
          "Ref": "EMRCluster"
        },
        "Name": "EMR-Setup-Presto-Trusted-Store"
      }
    }

Hue

To configure access to the Hue UI over HTTPS, the path to the certificate and private key files must be specified in the hue.ini file. Because our Java class has the createDirectoryForCerts() and writeCert() methods, which export the TLS certificates to the local disk, the remaining configuration only needs to point to those files in hue.ini.

This configuration can be applied by adding the following configuration to the EMR cluster:

[
  {
    "Classification": "hue-ini",
    "Configurations": [
      {
        "Classification": "desktop",
        "ConfigurationProperties": {
          "ssl_certificate": "/etc/certs/public.crt",
          "ssl_private_key": "/etc/certs/private.key"
        }
      }
    ]
  }
]

The port for the HTTPS connection to Hue remains the same. The default is 8888.

Zeppelin

Unlike Hue, Zeppelin configuration files reference certificates from the Java keystore.

Because EMR already added all certificates to the Java keystore, the only modification needed is to reference the same Java keystore files and password in zeppelin-site.xml.

The path to the Java keystore and the password can be read directly from the Presto configuration file.

This configuration can be done by running the following script on EMR:

#!/bin/bash
# Create zeppelin-site.xml from the template.
sudo cp /etc/zeppelin/conf/zeppelin-site.xml.template /etc/zeppelin/conf/zeppelin-site.xml
# Read the keystore/truststore paths and passwords from the Presto configuration.
truststorePath=$(grep -Po "(?<=^internal-communication.https.keystore.path = ).*" /etc/presto/conf/config.properties)
truststorePass=$(grep -Po "(?<=^internal-communication.https.keystore.key = ).*" /etc/presto/conf/config.properties)
keystorePath=$(grep -Po "(?<=^http-server.https.keystore.path = ).*" /etc/presto/conf/config.properties)
keystorePass=$(grep -Po "(?<=^http-server.https.keystore.key = ).*" /etc/presto/conf/config.properties)
keymanager=$(grep -Po "(?<=^http-server.https.keymanager.password = ).*" /etc/presto/conf/config.properties)
# Enable SSL in Zeppelin and point it at the same keystore and truststore.
sudo sed -i '/<name>zeppelin.server.port<\/name>/!b;n;c<value>8890<\/value>' /etc/zeppelin/conf/zeppelin-site.xml
sudo sed -i '/<name>zeppelin.server.ssl.port<\/name>/!b;n;c<value>7773<\/value>' /etc/zeppelin/conf/zeppelin-site.xml
sudo sed -i '/<name>zeppelin.ssl<\/name>/!b;n;c<value>true<\/value>' /etc/zeppelin/conf/zeppelin-site.xml
sudo sed -i '/<name>zeppelin.ssl.keystore.path<\/name>/!b;n;c<value>'"$keystorePath"'<\/value>' /etc/zeppelin/conf/zeppelin-site.xml
sudo sed -i '/<name>zeppelin.ssl.keystore.password<\/name>/!b;n;c<value>'"$keystorePass"'<\/value>' /etc/zeppelin/conf/zeppelin-site.xml
sudo sed -i '/<name>zeppelin.ssl.truststore.path<\/name>/!b;n;c<value>'"$truststorePath"'<\/value>' /etc/zeppelin/conf/zeppelin-site.xml
# Add the truststore and key manager passwords as new properties.
CONTENT1="<property>\n  <name>zeppelin.ssl.truststore.password</name>\n  <value>${truststorePass}</value>\n</property>"
sudo sed -i '/<\/configuration>/i'"$CONTENT1" /etc/zeppelin/conf/zeppelin-site.xml
CONTENT2="<property>\n  <name>zeppelin.ssl.key.manager.password</name>\n  <value>${keymanager}</value>\n</property>"
sudo sed -i '/<\/configuration>/i'"$CONTENT2" /etc/zeppelin/conf/zeppelin-site.xml
# Restart Zeppelin to pick up the new configuration.
sudo stop zeppelin
sudo start zeppelin

The previous script sets the HTTPS port in Zeppelin to 7773, which can be changed as needed.

GitHub example

On GitHub, you can download an example of the CloudFormation templates that can be used to launch an EMR cluster with all the security features discussed in this post.

The following is an example of the AWS CLI command required to launch an EMR cluster with these security features.

Before running this command, you must change the <key pair name>, <subnet id>, and <security group id> values to the correct values.

aws emr create-cluster --configurations https://s3.amazonaws.com/tls-blog-cf/emr-configuration.json \
--applications Name=Hadoop Name=Hive Name=Presto Name=Spark Name=Hue Name=Zeppelin \
--instance-groups 'InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m4.xlarge' \
'InstanceGroupType=CORE,InstanceCount=2,InstanceType=m4.xlarge' \
--release-label emr-5.14.0 \
--service-role EMR_DefaultRole \
--ec2-attributes KeyName=<key pair name>,SubnetId=<subnet id>,\
EmrManagedMasterSecurityGroup=<security group id>,\
AdditionalMasterSecurityGroups=<security group id>,\
EmrManagedSlaveSecurityGroup=<security group id>,\
InstanceProfile=EMR_EC2_DefaultRole \
--name EMR-TLS-Demo \
--visible-to-all-users \
--security-configuration emr-tls-ssm \
--steps Type=CUSTOM_JAR,Name=ZeppelinSSL,ActionOnFailure=CONTINUE,\
Jar=s3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar,\
Args=s3://tls-blog-cf/zeppelin-ssl.sh \
Type=CUSTOM_JAR,Name=PrestoSSL,ActionOnFailure=CONTINUE,\
Jar=s3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar,\
Args=s3://tls-blog-cf/presto-update-trusted-store.sh \
--tags ssm:ssl:private-key="/emr/private-key" \
ssm:ssl:certificate="/emr/certificate" \
ssm:ssl:inter-node-private-key="/emr/inter-nodes-private-key" \
ssm:ssl:inter-node-certificate="/emr/inter-nodes-certificate" \
tls:lambda-fn-name="get-ssm-parameter-lambda" \
--region us-east-1

Note

Before running any of the provided examples, the certificates must be uploaded to the Parameter Store.

For test purposes, you can download the shell script from:

https://github.com/aws-samples/emr-tls-security/blob/master/scripts/upload-certificates.sh

This script uploads self-signed certificates issued for *.ec2.internal to the Parameter Store. Make sure that the DNS associated with your VPC, where you launch the EMR cluster, matches the certificate.

Use the following command:

sh upload-certificates.sh <KMS key ID> <AWS Region>

Where:

  • KMS key ID – The identifier of the KMS key that is used to encrypt the certificates.
  • AWS Region – The Region to which the certificates are uploaded.

Validation

There are a few methods to validate whether the TLS certificate was installed correctly.

Hue

To test the HTTPS connection to Hue from the browser, connect to https://<EMR URL or IP>:8888

Zeppelin
To test the HTTPS connection to Zeppelin from the browser, connect to https://<EMR URL or IP>:7773

Port 7773 is the one used in this example. If you changed it, make sure that you're connecting to the port on which Zeppelin is running on your EMR cluster.

In both of these scenarios, remember that if the certificate doesn't match the URL (or if you used a self-signed certificate), the browser displays a warning that the certificate is invalid.
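You can also inspect the certificate that a service presents without a browser. The following optional check uses OpenSSL against the Hue port; the same approach works for the Zeppelin and Presto HTTPS ports:

openssl s_client -connect <EMR master node>:8888 -servername <EMR master node> </dev/null 2>/dev/null | openssl x509 -noout -subject -issuer -dates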

Presto

You can test Presto using one of the following methods:

  1. HTTPS connection to the Presto UI
  2. Using the Presto CLI

To test the connection to Presto UI from the browser, connect to https://<EMR URL or IP>:8446

To test the certificate using the Presto CLI, follow these steps:

  1. Connect (SSH) to the EMR master node.
  2. Create a test Hive table by running the following command:

hive -e "create table test1 (id int, name string); insert into test1 values (1, 'John'), (2, 'Robert'), (3, 'David');"

  3. Run the following Presto CLI command:

presto-cli --server https://master-node-url:8446 --schema default --catalog hive --execute 'select count(*) from test1;'

Change 'master-node-url' to the correct value.

If the command succeeds, you should see 3 as the command output.

Conclusion

Amazon EMR security configurations provide options to encrypt data both at rest and in transit.

This post demonstrated how to create and apply a custom TLS certificate provider to an EMR cluster to secure data in transit without storing the certificate's private key in an S3 bucket. For companies with strict security policies, this approach might be the only acceptable way to encrypt data in transit.

 


Additional Reading

If you found this post useful, be sure to check out Build a Multi-Tenant Amazon EMR Cluster with Kerberos, Microsoft Active Directory Integration and IAM Roles for EMRFS and Encrypt Data At-Rest and In-Flight on Amazon EMR with Security Configurations.

 


About the Author

Remek Hetman is a Senior Cloud Infrastructure Architect with Amazon Web Services Professional Services. He works with AWS enterprise customers, providing technical guidance and assistance for Infrastructure, DevOps, and Big Data to help them make the best use of AWS services. Outside of work, he enjoys spending his time actively, and pursuing his passion – astronomy.

 

Best Practices for resizing and automatic scaling in Amazon EMR

Post Syndicated from Brandon Scheller original https://aws.amazon.com/blogs/big-data/best-practices-for-resizing-and-automatic-scaling-in-amazon-emr/

You can increase your savings by taking advantage of the dynamic scaling feature set available in Amazon EMR. The ability to scale the number of nodes in your cluster up and down on the fly is among the major features that make Amazon EMR elastic. You can take advantage of scaling in EMR by resizing your cluster down when you have little or no workload. You can also scale your cluster up to add processing power when the job gets too slow. This allows you to spend just enough to cover the cost of your job and little more.

Knowing the complex logic behind this feature can help you take advantage of it to save on cluster costs. In this post, I detail how EMR clusters resize, and I present some best practices for getting the maximum benefit and resulting cost savings for your own cluster through this feature.

EMR scaling is more complex than simply adding or removing nodes from the cluster. One common misconception is that scaling in Amazon EMR works exactly like Amazon EC2 scaling. With EC2 scaling, you can add/remove nodes almost instantly and without worry, but EMR has more complexity to it, especially when scaling a cluster down. This is because important data or jobs could be running on your nodes.

To prevent data loss, Amazon EMR scaling ensures that your node has no running Apache Hadoop tasks or unique data that could be lost before removing your node. It is worth considering this decommissioning delay when resizing your EMR cluster. By understanding and accounting for how this process works, you can avoid issues that have plagued others, such as slow cluster resizes and inefficient automatic scaling policies.

When an EMR cluster is scaled down, two different decommission processes are triggered on the nodes that will be terminated. The first process is the decommissioning of Hadoop YARN, which is the Hadoop resource manager. Hadoop tasks that are submitted to Amazon EMR generally run through YARN, so EMR must ensure that any running YARN tasks are complete before removing the node. If for some reason a YARN task is stuck, there is a configurable timeout to ensure that the decommissioning still finishes. When this timeout is reached, the YARN task is terminated and rescheduled to a different node so that the task can finish.

The second decommission process is that of the Hadoop Distributed File System or HDFS. HDFS stores data in blocks that are spread through the EMR cluster on any nodes that are running HDFS. When an HDFS node is decommissioning, it must replicate those data blocks to other HDFS nodes so that they are not lost when the node is terminated.

So how can you use this knowledge in Amazon EMR?

Tips for resizing clusters

The following are some issues to consider when resizing your clusters.

EMR clusters can use two types of nodes for Hadoop tasks: core nodes and task nodes. Core nodes host persistent data by running the HDFS DataNode process and run Hadoop tasks through YARN’s resource manager. Task nodes only run Hadoop tasks through YARN and DO NOT store data in HDFS.

When scaling down task nodes on a running cluster, expect a short delay while any running Hadoop tasks on those nodes finish decommissioning. This allows you to get the best usage of your task nodes by not losing task progress through interruption. However, if your job allows for this interruption, you can adjust the one-hour default timeout for the resize through the yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs property (in EMR 5.14) in yarn-site.xml. When this process times out, your task node is shut down regardless of any running tasks. This process is usually relatively quick, which makes it fast to scale down task nodes.

When you’re scaling down core nodes, Amazon EMR must also wait for HDFS to decommission to protect your data. HDFS can take a relatively long time to decommission. This is because HDFS block replication is throttled by design through configurations located in hdfs-site.xml. This in turn means that HDFS decommissioning is throttled. This protects your cluster from a spiked workload if a node goes down, but it slows down decommissioning. When scaling down a large number of core nodes, consider adjusting these configurations beforehand so that you can scale down more quickly.

For example, consider this exercise with HDFS and resizing speed.

The HDFS configurations, located in hdfs-site.xml, have some of the most significant impact on throttling block replication:

  • dfs.datanode.balance.bandwidthPerSec: Bandwidth for each node's replication
  • dfs.namenode.replication.max-streams: Max streams running for block replication
  • dfs.namenode.replication.max-streams-hard-limit: Hard limit on max streams
  • dfs.datanode.balance.max.concurrent.moves: Number of threads used by the block balancer for pending moves
  • dfs.namenode.replication.work.multiplier.per.iteration: Used to determine the number of blocks to begin transfers immediately during each replication interval

(Beware when modifying: Changing these configurations improperly, especially on a cluster with high load, can seriously degrade cluster performance.)

Cluster resizing speed exercise

Modifying these configurations can speed up the decommissioning time significantly. Try the following exercise to see this difference for yourself.

  1. Create an EMR cluster with the following hardware configuration:
    • Master: 1 node – m3.xlarge
    • Core: 6 nodes – m3.xlarge
  2. Connect to the master node of your cluster using SSH (Secure Shell).

For more information, see Connect to the Master Node Using SSH in the Amazon EMR documentation.

  3. Load data into HDFS by using the following jobs:

$ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar teragen 1000000000 /user/hadoop/data1/
$ s3-dist-cp --src s3://aws-bigdata-blog/artifacts/ClusterResize/smallfiles25k/ --dest hdfs:///user/hadoop/data2/

  4. Edit your hdfs-site.xml configs:

$ sudo vim /etc/hadoop/conf/hdfs-site.xml

Then paste in the following configuration setup in the hdfs-site properties.

Disclaimer: These values are relatively high for example purposes and should not necessarily be used in production. Be sure to test config values for production clusters under load before modifying them.

<property>
  <name>dfs.datanode.balance.bandwidthPerSec</name>
  <value>100m</value>
</property>

<property>
  <name>dfs.namenode.replication.max-streams</name>
  <value>100</value>
</property>

<property>
  <name>dfs.namenode.replication.max-streams-hard-limit</name>
  <value>200</value>
</property>

<property>
  <name>dfs.datanode.balance.max.concurrent.moves</name>
  <value>500</value>
</property>

<property>
  <name>dfs.namenode.replication.work.multiplier.per.iteration</name>
  <value>30</value>
</property>
  5. Resize your EMR cluster from six to five core nodes, and look in the EMR events tab to see how long the resize took.
  6. Repeat the previous steps without modifying the configurations, and check the difference in resize time.

While performing this exercise, I saw resizing time lower from 45+ minutes (without config changes) down to about 6 minutes (with modified hdfs-site configs). This exercise demonstrates how much HDFS is throttled under default configurations. Although removing these throttles is dangerous and performance using them should be tested first, they can significantly speed up decommissioning time and therefore resizing.
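For reference, the resize in step 5 can also be triggered from the AWS CLI. The following is a sketch; the cluster ID and instance group ID are placeholders that you can look up with aws emr describe-cluster:

# Find the ID of the CORE instance group
aws emr describe-cluster --cluster-id j-XXXXXXXXXXXXX \
  --query "Cluster.InstanceGroups[?InstanceGroupType=='CORE'].Id"

# Request the resize from six to five core nodes
aws emr modify-instance-groups --cluster-id j-XXXXXXXXXXXXX \
  --instance-groups InstanceGroupId=ig-XXXXXXXXXXXXX,InstanceCount=5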

The following are some additional tips for resizing clusters:

  • Shrink resizing timeouts. You can configure EMR nodes in two ways: instance groups or instance fleets. For more information, see Create a Cluster with Instance Fleets or Uniform Instance Groups. EMR has implemented shrink resize timeouts when nodes are configured in instance fleets. This timeout prevents an instance fleet from attempting to resize forever if something goes wrong during the resize. It currently defaults to one day, so keep it in mind when you are resizing an instance fleet down.

If an instance fleet shrink request takes longer than one day, it finishes and pauses at however many instances are currently running. On the other hand, instance groups have no default shrink resize timeout. However, both types have the one-hour YARN timeout described earlier in the yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs property (in EMR 5.14) in yarn-site.xml.

  • Watch out for high frequency HDFS writes when resizing core nodes. If HDFS is receiving a lot of writes, it will modify a large number of blocks that require replication. This replication can interfere with the block replication from any decommissioning core nodes and significantly slow down the resizing process.

Setting up policies for automatic scaling

Although manual scaling is useful, a majority of the time cluster resizes are executed dynamically through Amazon EMR automatic scaling. Generally, the details of the automatic scaling policy must be tailored to the specific Hadoop job, so I won’t go into detail there. Instead, I provide some general guidelines for setting up your cluster’s auto scaling policies.

The following are some considerations when setting up your auto scaling policy.

Metrics for scaling

Choose the right metrics for your node types to trigger scaling. For example, scaling core nodes solely on the YARNMemoryAvailablePercentage metric doesn't make sense, because you would be increasing or decreasing total HDFS size when you really only need more processing power. Scaling task nodes on HDFSUtilization also doesn't make sense, because the additional HDFS storage space that you would want doesn't come with task nodes. A common automatic scaling metric for core nodes is HDFSUtilization. Common automatic scaling metrics for task nodes include ContainerPendingRatio and YARNMemoryAvailablePercentage.

Note: Keep in mind that Amazon EMR currently requires HDFS, so you must have at least one core node in your cluster. Core nodes can also provide CPU and memory resources. But if you don’t need to scale HDFS, and you just need more CPU or memory resources for your job, we recommend that you use task nodes for that purpose.

Scaling core nodes

As described earlier, one of the two EMR node types in your cluster is the core node. Core nodes run HDFS, so they have a longer decommissioning delay. This means that they are slow to scale and should not be aggressively scaled. Only adding and removing a few core nodes at a time will help you avoid scaling issues. Unless you need the HDFS storage, scaling task nodes is usually a better option. If you find that you have to scale large numbers of core nodes, consider changing hdfs-site.xml configurations to allow faster decommission time and faster scale down.

Scaling task nodes

Task nodes don’t run HDFS, which makes them perfect for aggressively scaling with a dynamic job. When your Hadoop task has spikes of work between periods of downtime, this is the node type that you want to use.

You can set up task nodes with a very aggressive auto scaling policy, and they can be scaled up or down easily. If you don’t need HDFS space, you can use task nodes in your cluster.

Using Spot Instances

Automatic scaling is a perfect time to use EMR Spot Instance types. The tendency of Spot Instances to disappear and reappear makes them perfect for task nodes. Because these task nodes are already used to scale in and out aggressively, Spot Instances can have very little disadvantage here. However, for time-sensitive Hadoop tasks, On-Demand Instances might be prioritized for the guaranteed availability.

Scale-in vs. scale-out policies for core nodes

Don’t fall into the trap of making your scale-in policy the exact opposite of your scale-out policy, especially for core nodes. Many times, scaling in results in additional delays for decommissioning. Take this into account and allow your scale-in policy to be more forgiving than your scale-out policy. This means longer cooldowns and higher metric requirements to trigger resizes.

You can think of scale-out policies as easily triggered with a low cooldown and small node increments. Scale-in policies should be hard to trigger, with larger cooldowns and node increments.
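To illustrate that asymmetry, the following sketch attaches an automatic scaling policy to a task instance group that scales out quickly when available YARN memory is low and scales in more conservatively. The cluster ID, instance group ID, thresholds, and cooldowns are assumptions that you should adapt to your own workload:

aws emr put-auto-scaling-policy \
  --cluster-id j-XXXXXXXXXXXXX \
  --instance-group-id ig-XXXXXXXXXXXXX \
  --auto-scaling-policy '{
    "Constraints": {"MinCapacity": 1, "MaxCapacity": 20},
    "Rules": [
      {
        "Name": "ScaleOutOnLowMemory",
        "Action": {"SimpleScalingPolicyConfiguration": {
          "AdjustmentType": "CHANGE_IN_CAPACITY", "ScalingAdjustment": 4, "CoolDown": 300}},
        "Trigger": {"CloudWatchAlarmDefinition": {
          "ComparisonOperator": "LESS_THAN", "EvaluationPeriods": 1,
          "MetricName": "YARNMemoryAvailablePercentage", "Namespace": "AWS/ElasticMapReduce",
          "Period": 300, "Statistic": "AVERAGE", "Threshold": 15.0, "Unit": "PERCENT"}}
      },
      {
        "Name": "ScaleInOnHighMemory",
        "Action": {"SimpleScalingPolicyConfiguration": {
          "AdjustmentType": "CHANGE_IN_CAPACITY", "ScalingAdjustment": -1, "CoolDown": 900}},
        "Trigger": {"CloudWatchAlarmDefinition": {
          "ComparisonOperator": "GREATER_THAN", "EvaluationPeriods": 3,
          "MetricName": "YARNMemoryAvailablePercentage", "Namespace": "AWS/ElasticMapReduce",
          "Period": 300, "Statistic": "AVERAGE", "Threshold": 75.0, "Unit": "PERCENT"}}
      }
    ]
  }'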

Minimum nodes for core node auto scaling

One last thing to consider when scaling core nodes is the yarn.app.mapreduce.am.labels property located in yarn-site.xml. In Amazon EMR, yarn.app.mapreduce.am.labels is set to “CORE” by default, which means that the application master always runs on core nodes and not task nodes. This is to help prevent application failure in a scenario where Spot Instances are used for the task nodes.

This means that when setting a minimum number of core nodes, you should choose a number that is greater than or at least equal to the number of simultaneous application masters that you plan to have running on your cluster. If you want the application master to also run on task nodes, you should modify this property to include “TASK.” However, as a best practice, don’t set the yarn.app.mapreduce.am.labels property to TASK if Spot Instances are used for task nodes.

Aggregating data using S3DistCp

Before wrapping up this post, I have one last piece of information to share about cluster resizing. When resizing core nodes, you might notice that HDFS decommissioning takes a very long time. Often this is the result of storing many small files in your cluster’s HDFS. Having many small files within HDFS (files smaller than the HDFS block size of 128 MB) adds lots of metadata overhead and can cause slowdowns in both decommissioning and Hadoop tasks.

Keeping your small files to a minimum by aggregating your data can help your cluster and jobs run more smoothly. For information about how to aggregate files, see the post Seven Tips for Using S3DistCp on Amazon EMR to Move Data Efficiently Between HDFS and Amazon S3.
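For example, the following sketch uses S3DistCp to combine many small JSON files into larger aggregates while copying them into HDFS; the paths, grouping pattern, and target size are assumptions that you should adapt to your own data layout:

s3-dist-cp \
  --src s3://<your bucket>/smallfiles/ \
  --dest hdfs:///user/hadoop/aggregated/ \
  --groupBy '.*/(\w+)/.*\.json' \
  --targetSize 128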

Summary

In this post, you read about how Amazon EMR resizing logic works to protect your data and Hadoop tasks. I also provided some additional considerations for EMR resizing and automatic scaling. Keeping these practices in mind can help you maximize cluster savings by allowing you to use only the required cluster resources.

If you have questions or suggestions, please leave a comment below.

 


Additional Reading

If you found this post useful, be sure to check out Seven Tips for Using S3DistCp on Amazon EMR to Move Data Efficiently Between HDFS and Amazon S3 and Dynamically Scale Applications on Amazon EMR with Auto Scaling.

 


About the Author

Brandon Scheller is a software development engineer for Amazon EMR. His passion lies in developing and advancing the applications of the Hadoop ecosystem and working with the open source community. He enjoys mountaineering in the Cascades with his free time.

Build a blockchain analytic solution with AWS Lambda, Amazon Kinesis, and Amazon Athena

Post Syndicated from Jonathan Shapiro-Ward original https://aws.amazon.com/blogs/big-data/build-a-blockchain-analytic-solution-with-aws-lambda-amazon-kinesis-and-amazon-athena/

There are many potential benefits to using a blockchain. A blockchain is a distributed data structure that can record transactions in a verifiable and immutable manner. Depending upon the use case, there are opportunities for reducing costs, improving speed and efficiency, stronger regulatory compliance, and greater resilience and scalability.

Early adopters of the blockchain are finding innovative ways of using it in such areas as finance, healthcare, eGovernment, and non-profit organizations. Blockchain was initially pioneered as the key technology behind the cryptocurrency Bitcoin.

Many of the opportunities to use blockchains arise from their design. They are typically large-scale distributed systems that often consist of many thousands of nodes. It can be challenging to gain insight into user activity, events, anomalies, and other state changes on a blockchain. But AWS analytics services provide the ability to analyze blockchain applications and provide meaningful information about these areas.

Walkthrough

In this post, we'll show you how to deploy an Ethereum blockchain using the AWS Blockchain Templates, deploy and run transactions against a smart contract, and build an analytics pipeline for the resulting blockchain events using AWS Lambda, Amazon Kinesis, Amazon Athena, and Amazon QuickSight.

You can readily adapt this Ethereum deployment and the blockchain analytics for use with a wide range of blockchain scenarios.

Prerequisites

This post assumes that you are familiar with AWS and Ethereum. The AWS and Ethereum documentation provides background reading to help you perform the steps described in this post.

Additionally, it's useful to be familiar with Amazon Kinesis, AWS Lambda, Amazon QuickSight, and Amazon Athena to get the most out of this blog post. For more information, see the documentation for those services.

For an introduction to serverless computing with AWS Lambda, see Introduction to AWS Lambda – Serverless Compute on Amazon Web Services.

Blockchain 101

Before we proceed with the solution in this post, we’ll provide a short discussion regarding blockchains and Ethereum, which is the blockchain implementation used in this solution.

In short, blockchains are a means of achieving consensus. The motivation behind the blockchain was to allow the Bitcoin network to agree upon the order of financial transactions while resisting vulnerabilities, malicious threats, and omission errors. Other blockchain implementations are used to agree upon the state of generic computation. This is achieved through a process called mining, whereby an arbitrary computational problem is solved to make falsifying transactions computationally challenging.

Ethereum is a major blockchain implementation. Unlike Bitcoin and other earlier blockchain systems, Ethereum is not solely a cryptocurrency platform, though it does have its own cryptocurrency called Ether. Ethereum extends the blockchain concept by building an Ethereum virtual machine (VM) that is Turing-complete on top of the blockchain. This allows for the development of smart contracts, which are programs that run on the blockchain. The appeal of smart contracts is the ability to translate natural language contracts, such as insurance contracts, into code that can run on Ethereum. This allows contractual agreements to be built without the need for a centralized authority, such as a bank or notary, potentially decreasing time to market and reducing costs.

An overview of the blockchain solution

The following is an overview of the solution provided in this post. The solution consists of:

  • An Ethereum blockchain running on Amazon Elastic Container Service (Amazon ECS) via the AWS Blockchain Template
  • An Application Load Balancer, providing access to the various Ethereum APIs.
  • A Lambda function, which deploys a smart contract to the blockchain
  • A Lambda function, which runs transactions against the smart contract
  • A Lambda function, which listens for events on the smart contract and pushes those events to Amazon Kinesis
  • An Amazon DynamoDB table used to share the blockchain state between Lambda functions
  • A blockchain analytics pipeline that uses Amazon Kinesis Data Firehose, Amazon Kinesis Data Analytics, Amazon Kinesis Data Streams, and Amazon Athena.
  • An analytics dashboard built using Amazon QuickSight

The solution is presented in the following architectural diagram:

As shown, the solution is comprised of two main portions:

  • The blockchain hosted on Amazon Elastic Compute Cloud (Amazon EC2) and the Lambda functions that interact with the blockchain.
  • The analytics pipeline based around Kinesis that consumes data from the blockchain.

The AWS CloudFormation template we provide deploys the left side of that architecture diagram up to and including Kinesis Data Streams. It is the right side of the diagram that we’re going to build in this post.

Create the initial resources

  1. First, download the AWS CloudFormation template from: https://s3.amazonaws.com/blockchainblog/blockchainblogpost.template
  2. Use AWS CloudFormation to launch the template. The AWS CloudFormation stack deploys a virtual private cloud (VPC), two subnets, and a series of Lambda functions, which interact with the blockchain. This provides a foundation on which to build the analytics pipeline. You can either provide your own CIDR blocks or use the default parameters. Each subnet must have at least eight IP addresses.
  3. Deploy the AWS Blockchain Templates. The AWS Blockchain Templates make it efficient to deploy Ethereum and Hyperledger blockchain networks on AWS. In this case, we’re deploying an Ethereum network into the VPC created by the AWS CloudFormation template in step 2.
  4. Launch the following AWS CloudFormation template: https://aws-blockchain-templates-us-east-1.s3.us-east-1.amazonaws.com/ethereum/templates/latest/ethereum-network.template.yaml This template requires a number of parameters:
  • Set the Initial List of Accounts to the following predefined accounts the Lambda functions use:
0x34db0A1D7FE9D482C389b191e703Bf0182E0baE3,0xB3Bbce5d76aF28EcE4318c28479565F802f96808,0x877108a8825222cf669Ca9bFA3397D6973fE1640,0xb272056E07C94C7E762F642685bE822df6d08D03,0x0c00e92343f7AA255e0BBC17b21a02f188b53D6C,0xaDf205a5fcb846C4f8D5e9f5228196e3c157e8E0,0x1373a92b9BEbBCda6B87a4B5F94137Bc64E47261,0x9038284431F878f17F4387943169d5263eA55650,0xe1cd3399F6b0A1Ef6ac8Cebe228D7609B601ca8a,0x0A67cCC3FD9d664D815D229CEA7EF215d4C00A0a
  • In VPC Network Configuration:
    • Set the VPC ID to the blockchainblog VPC created by the first AWS CloudFormation template.
    • Add the blockchainblog-public subnet to the list of subnets to use.
    • Add blockchainblog-public and blockchainblog-private to the list of ALB subnets.
  • In Security Configuration:
    • Choose your Amazon EC2 key pair.
    • Provide the blockchainblog security group.
    • Provide the blockchainblog-ec2-role for the Amazon EC2 role.
    • Provide the blockchainblog-ecs-role for the Amazon ECS role.
    • Set the ALB security group to the blockchainblog security group.
  5. Leave all other variables unchanged, create the template, and wait for all resources to be deployed. This deploys an Ethereum blockchain, starts the mining process, and exposes the Web3 API through an Application Load Balancer.

After the resources are created, move on to deploying the smart contract.

Deploy a smart contract

To use the blockchain, deploy a smart contract to it. This smart contract is not complex — it provides the functions for holding an auction.

The auction contract represents a public auction, which is an auction whereby all parties involved can be identified. The user offering the item to be auctioned deploys the contract and other users can bid using the contract. The auction is considered completed after a pre-defined number of blocks have been mined. When the auction ends, losing bids can then be withdrawn and the funds returned to the bidders. Later, the user who created the auction can withdraw the funds of the winning bid.

Note that the contract does nothing to ensure that the winner receives the commodity in question. In fact, this contract is entirely separate from what is being auctioned. The contract could be extended to provide this functionality, but for the scope of this post, we’re keeping the contract simple.

The auction contract is located at https://s3.amazonaws.com/blockchainblog/Auction.sol.

Examine the auction contract

The auction contract is automatically pulled by our Lambda function and deployed to our blockchain. This contract is written in a domain-specific language called Solidity. The syntax is inspired by the C family of languages; however, unlike C it doesn’t compile to object code. Instead, it compiles to bytecode, which runs on the Ethereum VM.

This smart contract has two functions: bid and withdraw. Bid allows users to bid in the auction, and withdraw allows users to withdraw funds from the contract when the auction has finished. The auction owner can obtain the winning bid and the losers can recoup their funds. Note that the data structure BidEvent is similar to a C struct, and is how we’ll trigger Solidity events. The Solidity events are captured and sent to our analytics pipeline.

Now it’s time to deploy our smart contract, run transactions against it, and listen for events by using pre-built Lambda functions. The following diagram shows the interactions of these Lambda functions:

DeployContract is a Lambda function created by the AWS CloudFormation stack that we deployed earlier. This function takes our Solidity source code from the Amazon Simple Storage Service (Amazon S3) bucket, compiles it to EVM bytecode using the solc compiler, deploys that to our blockchain, and stores the blockchain address of the contract in a DynamoDB table. The function interacts with the Ethereum blockchain on our Amazon EC2 instance via the web3 1.0.0 API. You can see the source code for this function at https://s3.amazonaws.com/blockchainblog/DeployContract.zip.

After deploying the AWS CloudFormation template, wait about 5 minutes before deploying the contract to give the blockchain time to start the mining process. The majority of this time is the blockchain generating the initial directed acyclic graph (DAG).

DeployContract can be invoked in the Lambda console by testing it with an empty test event. Before invoking the function, provide it with the address of the blockchain. To do this, locate the output of the AWS Blockchain Template and obtain the EthJSONRPCURL value from the output. Later, provide this value in an environment variable named BLOCKCHAIN_HOST, for the DeployContract function, as shown in the following example:

Now invoke the DeployContract function. It should print various states, including the blockchain address of the deployed contract and the JSON ABI of the contract. After the function completes, the contract is deployed to our private blockchain and available for use. If the function produces an error, it’s likely because the blockchain has not yet been initialized. Wait a few minutes after creating the AWS CloudFormation template before invoking DeployContract.

Execute Transactions

To generate some transaction data to analyze, we must first have some transactions. To get transactions, we are using a second Lambda function named ExecuteTransactions.

In the smart contract, an event is specified at the start of the file. Events are a useful mechanism in Solidity that can be used as a callback to code outside of the blockchain. The final Lambda function, ListenForTransactions, listens for events occurring against the contract and then sends those events to Kinesis for analysis.

Ethereum currently does not support sending events directly to Kinesis. So we'll run the ListenForTransactions function to pull events from the blockchain. We can do this manually by invoking the function with an empty test event. ListenForTransactions pulls all events from the blockchain since the last time it was run. However, if we wanted transactions to be pulled from the blockchain in real time, we'd want the function running perpetually. In the following section, you can optionally schedule the Lambda function to run on a regular schedule. Once again, provide the address of the Ethereum RPC endpoint through the BLOCKCHAIN_HOST environment variable, as you did for DeployContract, for both ListenForTransactions and ExecuteTransactions.

Optional: Use an Amazon CloudWatch event to schedule ListenForTransactions

To have ListenForTransactions run continually, we'll use Amazon CloudWatch Events as a trigger for our Lambda function. In the Amazon CloudWatch console, choose the Triggers tab, and add a new Amazon CloudWatch Events trigger with the schedule expression rate(5 minutes). This ensures that the function runs continually and thus that all events are sent to Kinesis soon after they occur. This allows us to do near real-time processing of events against our smart contract. However, if we want to reduce costs, or if real-time analytics isn't a key objective, we could run our ListenForTransactions function periodically. Running the function periodically fetches all events since the last time it was run; however, this is less timely than having it wait for events to occur.

To configure a CloudWatch event to trigger ListenForTransactions:

  1. In the designer section on the Lambda console for ListenForTransactions, select CloudWatch events
  2. Click on configure and scroll down to the CloudWatch event configuration
  3. Select Create New Rule from the rule dropdown menu
  4. Name the rule and provide a description
  5. Select schedule expression
  6. Provide the expression: rate(5 minutes)
  7. Select enable trigger
  8. Click add

After the function is scheduled, we can then generate some events against our contract. We can run ExecuteTransactions, with an empty test event. We can do this any number of times to generate more transactions to send to our analytics pipeline. ExecuteTransactions produces batches of 10 transactions at a time.

Analyze Transactions with Kinesis Data Analytics

Because our Lambda function is listening to events on our smart contract, all bidding activity is sent to a Kinesis data stream that was already created by the AWS CloudFormation template, called BlockchainBlogEvents.

Right now, all events go to Kinesis but no further. We’ll persist our events for analysis with Athena later on. To do so, navigate to the Kinesis Data Streams console and choose the BlockchainBlog stream that was created for you.

  1. In the upper right-hand corner, choose Connect to Firehose. This forwards all events to a Kinesis Data Firehose delivery stream, which delivers them to an S3 bucket.
  2. Name the delivery stream, choose Next, and don't enable record transformation.
  3. Provide an S3 bucket in which to store your results. Remember it so that you can use it later with Athena.

All events coming from the blockchain should now be persisted to Amazon S3.

Now that our events are being persisted, we’ll use Kinesis Data Analytics to perform a series of real-time analytics on the Kinesis Data Stream. Later, we’ll perform batch analytics on the data stored in Amazon S3 via Athena.

First, look at Kinesis Data Analytics. Our ListenForTransactions Lambda function sends a message to a stream each time a transaction is run against our Auction smart contract.

The message is a JSON object. It contains the address of the bidder who initiated the transaction, how much they bid, the contract they bid on, when the transaction was run, and which block the transaction was added to.

Kinesis Data Analytics processes each incoming message to the stream and lets us perform analysis over the stream. In this example, we use Kinesis Data Analytics to:

  1. Calculate the amount of Ether being bid in each block within a sliding window of 15 seconds.
  2. Detect the number of unique bidders occurring within a sliding window of 15 seconds.

Our sliding window is 15 seconds because this is the Ethereum target block time. This is the measure of how long it takes to add a block to the blockchain. By setting the sliding window to 15 seconds, we can gain insight into transactions occurring within the mining interval. We could also set the window to be longer to learn how it pertains to our auction application.

To start with our real time analytics, we must create a Kinesis data analytics application. To do so:

  1. Navigate to the Kinesis data analytics application console on the AWS Management Console.
  2. Create a new Kinesis data analytics application with appropriate name and description, then specify the pre-made blockchainblog Kinesis Data Stream as the source.
  3. Run ExecuteTransactions to send a set of transactions to Kinesis and automatically discover the schema.
  4. Open the SQL editor for the application.

Next, we’re going to add SQL to our Kinesis data analytics application to find out the amount of Ether being sent in each block. This includes all bids sent to the contract and all funds withdrawn from a completed auction.

Copy the following SQL, paste it into the SQL editor in Kinesis Data Analytics, then execute it.

CREATE OR REPLACE STREAM "SPEND_PER_BLOCK_STREAM" (block INTEGER, spend INTEGER);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "SPEND_PER_BLOCK_STREAM"

SELECT STREAM "Block", SUM("Amount") AS block_sum
FROM "SOURCE_SQL_STREAM_001"
GROUP BY "Block", STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '15' SECOND);

This simple piece of SQL provides some insight into our smart contract. The output of SPEND_PER_BLOCK_STREAM yields the block number and the volume of funds, from our contract, in that block. This output explains how much cryptocurrency is spent in relation to our smart contract and when it’s spent.

Make sure that there is data for the Kinesis data analytics application to process by running the ExecuteTransactions and ListenForTransactions functions. You can run these functions either with an Amazon CloudWatch event or manually.

Now, we’ll modify our application to detect the number of unique bidders placing bids within a 15-second window. This is about the time required to add a block to the blockchain. To do so, add the following code to our Kinesis data analytics application:

CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
    NUMBER_OF_DISTINCT_ITEMS BIGINT
);

CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
   INSERT INTO "DESTINATION_SQL_STREAM" 
      SELECT STREAM * 
      FROM TABLE(COUNT_DISTINCT_ITEMS_TUMBLING(
          CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),
            'Bidder',                                     
            10                                                 
      )
);

The resulting output of this code is the count of unique bidders occurring within the 15-second window. This is useful in helping us understand who is running transactions against our contract. For example, if it’s a large number of blockchain addresses responsible for the bids or if it is a smaller number of addresses bidding.

Finally, as denoted in our architectural diagram, we can add a destination stream to our Kinesis data analytics application. We’ll send the output of our application to Kinesis Data Firehose to persist the results. Then we’ll enable the resulting data to be used in batch analytics with Athena or other tools. To send the output, create a destination for the analytics output stream and point it at a Kinesis Data Firehose stream.

This section has shown real time analytics that can be run on blockchain transactions. The next section shows using Athena to run batch analytics against our stored transactions.

Analyze Transactions with Athena

In this section, we’ll create a table in Athena so we can query our transaction data in batch form.

  1. Create a database in Athena and then create a table, specifying the location that you provided earlier to Kinesis Data Firehose. Your configuration should look like the following example:
  2. Choose Next, choose JSON as the input format, and then choose Next.
  3. In Columns, provide the data types for each of our columns. Choose Bulk add columns, then copy and paste the following column information:

Block int, Blockhash string, Bidder string, Maxbidder string, Contractowner string, Amount int, Auction string, EventTimestamp string

The columns are as follows:

  • Block – The block that this event pertains to.
  • Auction – Which auction smart contract the event pertains to.
  • ContractOwner – The address of the owner of the contract.
  • Bidder – The address of the bidder.
  • BlockHash – The SHA hash of the block.
  • Address – The address of the transaction.
  • MaxBidder – The address of the currently winning bidder (current as of when the event was generated).
  • Amount – The amount of the bid.

  4. Choose Next, and then create the table.

After you configure Athena, you can then explore the data. First, look at whether the user who created the auction has bid in their own auction. Most auctions typically disallow this bidding, but our smart contract doesn’t prohibit this. We could solve this by modifying the contract, but for now let’s see if we can detect this via Athena. Run the following query:

select * from events where contractowner=bidder

The result should resemble the following:

You should see at least one instance where the contract owner has bid on their own contract. Note that the Lambda function running transactions does this at random. Bidding on one’s own contract could be permissible or it might violate the terms of the auction. In that scenario, we can easily detect this violation.

This scenario is an example of using analytics to detect and enforce compliance in a blockchain-backed system. Compliance remains an open question for many blockchain users, as detecting regulatory and compliance issues involving smart contracts often involves significant complexity. Analytics is one way to gain insight and answer these regulatory questions.

Useful queries for analyzing transactions

This section provides some other queries that we can use to analyze our smart contract transactions.

Find the number of transactions per block

SELECT block, COUNT(amount) as transactions FROM events Group By block    

This query yields results similar to the following:

Find the winning bid for each auction

SELECT DISTINCT t.auction, t.amount
    FROM events t
        INNER JOIN (SELECT auction, MAX(amount) AS maxamount
                        FROM events
                        GROUP BY auction) q
            ON t.auction = q.auction
                AND t.amount = q.maxamount

This query yields a set of results such as the following:

The results show each auction that you’ve created on the blockchain and the resulting highest bid.

Visualize queries with Amazon QuickSight

Instead of querying data in plain SQL, it is often beneficial to have a graphical representation of your analysis. You can do this with Amazon QuickSight, which can use Athena as a data source. As a result, with little effort we can build a dashboard solution on top of what we’ve already built. We’re going to use Amazon QuickSight to visualize data stored in Amazon S3, via Athena.

In Amazon QuickSight, we can create a new data source and use the Athena database and table that we created earlier.

To create a new data source

  1. Open the Amazon QuickSight console, then choose New Dataset.
  2. From the list of data sources, choose Athena, then name your data source.

  1. Choose the database and table in Athena that you created earlier.

  1. Import the data into SPICE. SPICE is instrumental for faster querying and visualization of data, without having to go directly to the source data. For more information about SPICE, see the Amazon QuickSight Documentation.
  2. Choose Visualize to start investigating the data.

With Amazon QuickSight, we can visualize the behavior of our simulated blockchain users. We’ll choose Amount as our measurement and Auction as our dimension from the QuickSight side pane. This shows us how much ether has been bid in each auction. Doing so yields results similar to the following:

The amount depends on the number of times you ran the ExecuteTransactions function.

If we look at MaxBidder, we see a pie chart. In the chart, we can see which blockchain address (user) is most often our highest bidder. This looks like the following:

This sort of information can be challenging to obtain from within a blockchain-based application. But in Amazon QuickSight, with our analytics pipeline, getting the information can be easier.

Finally, we can look at the mining time in Amazon QuickSight by choosing Eventtimestamp as the x-axis, choosing block as the y-axis, and using the minimum aggregate function. This produces a line graph that resembles the following:

The graph shows that we start at around block 9200 and have a steady rate of mining occurring. This is roughly consistent with around a 15 to 20 second block mining time. Note that the time stamp is in Unix time.

This section has shown analysis that can be performed on a blockchain event to understand the behavior of both the blockchain and the smart contracts deployed to it. Using the same methodology, you can build your own analytics pipelines that perform useful analytics that shed light on your blockchain-backed applications.

Conclusion

Blockchain is an emerging technology with a great deal of potential. AWS analytics services provide a means to gain insight into blockchain applications that run over thousands of nodes and deal with millions of transactions. This allows developers to better understand the complexities of blockchain applications and aid in the creation of new applications. Moreover, the analytics portion can all be done without provisioning servers, reducing the need for managing infrastructure. This allows you to focus on building the blockchain applications that you want.

Important: Remember to destroy the stacks created by AWS CloudFormation. Also delete the resources you deployed, including the scheduled Lambda function that listens for blockchain events.


Additional Reading

If you found this post useful, be sure to check out Analyze Apache Parquet optimized data using 10 visualizations to try in Amazon QuickSight with sample data and Analyzing Bitcoin Data: AWS CloudFormation Support for AWS Glue.

 


About the Author

Dr. Jonathan Shapiro-Ward is an AWS Solutions Architect based in Toronto. He helps customers across Canada to build industry leading cloud solutions. He has a background in distributed systems and big data and holds a PhD from the University of St Andrews.

 

 

Analyze Apache Parquet optimized data using Amazon Kinesis Data Firehose, Amazon Athena, and Amazon Redshift

Post Syndicated from Roy Hasson original https://aws.amazon.com/blogs/big-data/analyzing-apache-parquet-optimized-data-using-amazon-kinesis-data-firehose-amazon-athena-and-amazon-redshift/

Amazon Kinesis Data Firehose is the easiest way to capture and stream data into a data lake built on Amazon S3. This data can be anything—from AWS service logs like AWS CloudTrail log files, Amazon VPC Flow Logs, Application Load Balancer logs, and others. It can also be IoT events, game events, and much more. To efficiently query this data, a time-consuming ETL (extract, transform, and load) process is required to massage and convert the data to an optimal file format, which increases the time to insight. This situation is less than ideal, especially for real-time data that loses its value over time.

To solve this common challenge, Kinesis Data Firehose can now save data to Amazon S3 in Apache Parquet or Apache ORC format. These are optimized columnar formats that are highly recommended for best performance and cost-savings when querying data in S3. This feature directly benefits you if you use Amazon Athena, Amazon Redshift, AWS Glue, Amazon EMR, or any other big data tools that are available from the AWS Partner Network and through the open-source community.

Amazon Connect is a simple-to-use, cloud-based contact center service that makes it easy for any business to provide a great customer experience at a lower cost than common alternatives. Its open platform design enables easy integration with other systems. One of those systems is Amazon Kinesis—in particular, Kinesis Data Streams and Kinesis Data Firehose.

What’s really exciting is that you can now save events from Amazon Connect to S3 in Apache Parquet format. You can then perform analytics using Amazon Athena and Amazon Redshift Spectrum in real time, taking advantage of this key performance and cost optimization. Of course, Amazon Connect is only one example. This new capability opens the door for a great deal of opportunity, especially as organizations continue to build their data lakes.

Amazon Connect includes an array of analytics views in the Administrator dashboard. But you might want to run other types of analysis. In this post, I describe how to set up a data stream from Amazon Connect through Kinesis Data Streams and Kinesis Data Firehose and out to S3, and then perform analytics using Athena and Amazon Redshift Spectrum. I focus primarily on the Kinesis Data Firehose support for Parquet and its integration with the AWS Glue Data Catalog, Amazon Athena, and Amazon Redshift.

Solution overview

Here is how the solution is laid out:

 

 

The following sections walk you through each of these steps to set up the pipeline.

1. Define the schema

When Kinesis Data Firehose processes incoming events and converts the data to Parquet, it needs to know which schema to apply. The reason is that incoming events often contain only some of the expected fields, depending on which values the producers send. A typical process is to normalize the schema during a batch ETL job so that you end up with a consistent schema that can easily be understood and queried. Doing this introduces latency due to the nature of the batch process. To overcome this issue, Kinesis Data Firehose requires the schema to be defined in advance.

To see the available columns and structures, see Amazon Connect Agent Event Streams. For the purpose of simplicity, I opted to make all the columns of type String rather than create the nested structures. But you can definitely do that if you want.

The simplest way to define the schema is to create a table in the Amazon Athena console. Open the Athena console, and paste the following create table statement, substituting your own S3 bucket and prefix for where your event data will be stored. A Data Catalog database is a logical container that holds the different tables that you can create. The default database name shown here should already exist. If it doesn’t, you can create it or use another database that you’ve already created.

CREATE EXTERNAL TABLE default.kfhconnectblog (
  awsaccountid string,
  agentarn string,
  currentagentsnapshot string,
  eventid string,
  eventtimestamp string,
  eventtype string,
  instancearn string,
  previousagentsnapshot string,
  version string
)
STORED AS parquet
LOCATION 's3://your_bucket/kfhconnectblog/'
TBLPROPERTIES ("parquet.compression"="SNAPPY")

That’s all you have to do to prepare the schema for Kinesis Data Firehose.
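If you prefer to script this step rather than use the Athena console, the same table can be registered directly in the AWS Glue Data Catalog with boto3. The following sketch mirrors the CREATE TABLE statement above; the bucket, region, and database name are the same placeholders used there:

import boto3

glue = boto3.client('glue', region_name='us-east-1')

columns = [
    'awsaccountid', 'agentarn', 'currentagentsnapshot', 'eventid', 'eventtimestamp',
    'eventtype', 'instancearn', 'previousagentsnapshot', 'version',
]

# Register the same Parquet table that the CREATE TABLE statement above defines.
glue.create_table(
    DatabaseName='default',
    TableInput={
        'Name': 'kfhconnectblog',
        'TableType': 'EXTERNAL_TABLE',
        'Parameters': {'classification': 'parquet', 'parquet.compression': 'SNAPPY'},
        'StorageDescriptor': {
            'Columns': [{'Name': name, 'Type': 'string'} for name in columns],
            'Location': 's3://your_bucket/kfhconnectblog/',
            'InputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat',
            'OutputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat',
            'SerdeInfo': {
                'SerializationLibrary': 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
            },
        },
    },
)

Either route produces the same Data Catalog entry that Kinesis Data Firehose reads in the next steps.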

2. Define the data streams

Next, you need to define the Kinesis data streams that will be used to stream the Amazon Connect events.  Open the Kinesis Data Streams console and create two streams.  You can configure them with only one shard each because you don’t have a lot of data right now.
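If you’d rather create the streams from code than from the console, a short boto3 sketch follows; the stream names are placeholders for whatever you choose to call your streams:

import boto3

kinesis = boto3.client('kinesis', region_name='us-east-1')

# One shard per stream is enough for this walkthrough.
for stream_name in ('connect-agent-events', 'connect-ctr-events'):
    kinesis.create_stream(StreamName=stream_name, ShardCount=1)
    # Wait until the stream is ACTIVE before wiring it to Amazon Connect or Kinesis Data Firehose.
    kinesis.get_waiter('stream_exists').wait(StreamName=stream_name)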

3. Define the Kinesis Data Firehose delivery stream for Parquet

Let’s configure the Data Firehose delivery stream using the data stream as the source and Amazon S3 as the output. Start by opening the Kinesis Data Firehose console and creating a new data delivery stream. Give it a name, and associate it with the Kinesis data stream that you created in Step 2.

As shown in the following screenshot, enable Record format conversion (1) and choose Apache Parquet (2). As you can see, Apache ORC is also supported. Scroll down and provide the AWS Glue Data Catalog database name (3) and table names (4) that you created in Step 1. Choose Next.

To make things easier, the output S3 bucket and prefix fields are automatically populated using the values that you defined in the LOCATION parameter of the create table statement from Step 1. Pretty cool. Additionally, you have the option to save the raw events into another location as defined in the Source record S3 backup section. Don’t forget to add a trailing forward slash “ / “ so that Data Firehose creates the date partitions inside that prefix.

On the next page, in the S3 buffer conditions section, there is a note about configuring a large buffer size. The Parquet file format is highly efficient in how it stores and compresses data. Increasing the buffer size allows you to pack more rows into each output file, which is preferred and gives you the most benefit from Parquet.

Compression using Snappy is automatically enabled for both Parquet and ORC. You can modify the compression algorithm by using the Kinesis Data Firehose API and updating the OutputFormatConfiguration.
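For example, switching the Parquet compression from Snappy to GZIP could look roughly like the following boto3 call. The delivery stream name is assumed to match the one created above, and the version and destination IDs come from describe_delivery_stream:

import boto3

firehose = boto3.client('firehose', region_name='us-east-1')

# The update call needs the current version ID and destination ID of the delivery stream.
description = firehose.describe_delivery_stream(DeliveryStreamName='kfhconnectblog')
stream_desc = description['DeliveryStreamDescription']

firehose.update_destination(
    DeliveryStreamName='kfhconnectblog',
    CurrentDeliveryStreamVersionId=stream_desc['VersionId'],
    DestinationId=stream_desc['Destinations'][0]['DestinationId'],
    ExtendedS3DestinationUpdate={
        'DataFormatConversionConfiguration': {
            'OutputFormatConfiguration': {
                'Serializer': {'ParquetSerDe': {'Compression': 'GZIP'}}
            }
        }
    },
)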

Be sure to also enable Amazon CloudWatch Logs so that you can debug any issues that you might run into.

Lastly, finalize the creation of the Firehose delivery stream, and continue on to the next section.

4. Set up the Amazon Connect contact center

After setting up the Kinesis pipeline, you now need to set up a simple contact center in Amazon Connect. The Getting Started page provides clear instructions on how to set up your environment, acquire a phone number, and create an agent to accept calls.

After setting up the contact center, in the Amazon Connect console, choose your Instance Alias, and then choose Data Streaming. Under Agent Event, choose the Kinesis data stream that you created in Step 2, and then choose Save.

At this point, your pipeline is complete.  Agent events from Amazon Connect are generated as agents go about their day. Events are sent via Kinesis Data Streams to Kinesis Data Firehose, which converts the event data from JSON to Parquet and stores it in S3. Athena and Amazon Redshift Spectrum can simply query the data without any additional work.

So let’s generate some data. Go back into the Administrator console for your Amazon Connect contact center, and create an agent to handle incoming calls. In this example, I creatively named mine Agent One. After it is created, Agent One can get to work and log into their console and set their availability to Available so that they are ready to receive calls.

To make the data a bit more interesting, I also created a second agent, Agent Two. I then made some incoming and outgoing calls and caused some failures to occur, so I now have enough data available to analyze.

5. Analyze the data with Athena

Let’s open the Athena console and run some queries. One thing you’ll notice is that when we created the schema for the dataset, we defined some of the fields as Strings even though in the documentation they were complex structures.  The reason for doing that was simply to show some of the flexibility of Athena to be able to parse JSON data. However, you can define nested structures in your table schema so that Kinesis Data Firehose applies the appropriate schema to the Parquet file.

Let’s run the first query to see which agents have logged into the system.

The query might look complex, but it’s fairly straightforward:

WITH dataset AS (
  SELECT 
    from_iso8601_timestamp(eventtimestamp) AS event_ts,
    eventtype,
    -- CURRENT STATE
    json_extract_scalar(
      currentagentsnapshot,
      '$.agentstatus.name') AS current_status,
    from_iso8601_timestamp(
      json_extract_scalar(
        currentagentsnapshot,
        '$.agentstatus.starttimestamp')) AS current_starttimestamp,
    json_extract_scalar(
      currentagentsnapshot, 
      '$.configuration.firstname') AS current_firstname,
    json_extract_scalar(
      currentagentsnapshot,
      '$.configuration.lastname') AS current_lastname,
    json_extract_scalar(
      currentagentsnapshot, 
      '$.configuration.username') AS current_username,
    json_extract_scalar(
      currentagentsnapshot, 
      '$.configuration.routingprofile.defaultoutboundqueue.name') AS current_outboundqueue,
    json_extract_scalar(
      currentagentsnapshot, 
      '$.configuration.routingprofile.inboundqueues[0].name') as current_inboundqueue,
    -- PREVIOUS STATE
    json_extract_scalar(
      previousagentsnapshot, 
      '$.agentstatus.name') as prev_status,
    from_iso8601_timestamp(
      json_extract_scalar(
        previousagentsnapshot, 
       '$.agentstatus.starttimestamp')) as prev_starttimestamp,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.firstname') as prev_firstname,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.lastname') as prev_lastname,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.username') as prev_username,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.routingprofile.defaultoutboundqueue.name') as prev_outboundqueue,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.routingprofile.inboundqueues[0].name') as prev_inboundqueue
  from kfhconnectblog
  where eventtype <> 'HEART_BEAT'
)
SELECT
  current_status as status,
  current_username as username,
  event_ts
FROM dataset
WHERE eventtype = 'LOGIN' AND current_username <> ''
ORDER BY event_ts DESC

The query output looks something like this:

Here is another query that shows the sessions each of the agents engaged with. It tells us whether they were incoming or outgoing, whether they were completed, and whether there were missed or failed calls.

WITH src AS (
  SELECT
     eventid,
     json_extract_scalar(currentagentsnapshot, '$.configuration.username') as username,
     cast(json_extract(currentagentsnapshot, '$.contacts') AS ARRAY(JSON)) as c,
     cast(json_extract(previousagentsnapshot, '$.contacts') AS ARRAY(JSON)) as p
  from kfhconnectblog
),
src2 AS (
  SELECT *
  FROM src CROSS JOIN UNNEST (c, p) AS contacts(c_item, p_item)
),
dataset AS (
SELECT 
  eventid,
  username,
  json_extract_scalar(c_item, '$.contactid') as c_contactid,
  json_extract_scalar(c_item, '$.channel') as c_channel,
  json_extract_scalar(c_item, '$.initiationmethod') as c_direction,
  json_extract_scalar(c_item, '$.queue.name') as c_queue,
  json_extract_scalar(c_item, '$.state') as c_state,
  from_iso8601_timestamp(json_extract_scalar(c_item, '$.statestarttimestamp')) as c_ts,
  
  json_extract_scalar(p_item, '$.contactid') as p_contactid,
  json_extract_scalar(p_item, '$.channel') as p_channel,
  json_extract_scalar(p_item, '$.initiationmethod') as p_direction,
  json_extract_scalar(p_item, '$.queue.name') as p_queue,
  json_extract_scalar(p_item, '$.state') as p_state,
  from_iso8601_timestamp(json_extract_scalar(p_item, '$.statestarttimestamp')) as p_ts
FROM src2
)
SELECT 
  username,
  c_channel as channel,
  c_direction as direction,
  p_state as prev_state,
  c_state as current_state,
  c_ts as current_ts,
  c_contactid as id
FROM dataset
WHERE c_contactid = p_contactid
ORDER BY id DESC, current_ts ASC

The query output looks similar to the following:

6. Analyze the data with Amazon Redshift Spectrum

With Amazon Redshift Spectrum, you can query data directly in S3 using your existing Amazon Redshift data warehouse cluster. Because the data is already in Parquet format, Redshift Spectrum gets the same great benefits that Athena does.

Here is a simple query to show querying the same data from Amazon Redshift. Note that to do this, you need to first create an external schema in Amazon Redshift that points to the AWS Glue Data Catalog.

SELECT 
  eventtype,
  json_extract_path_text(currentagentsnapshot,'agentstatus','name') AS current_status,
  json_extract_path_text(currentagentsnapshot, 'configuration','firstname') AS current_firstname,
  json_extract_path_text(currentagentsnapshot, 'configuration','lastname') AS current_lastname,
  json_extract_path_text(
    currentagentsnapshot,
    'configuration','routingprofile','defaultoutboundqueue','name') AS current_outboundqueue
FROM default_schema.kfhconnectblog

The following shows the query output:

Summary

In this post, I showed you how to use Kinesis Data Firehose to ingest and convert data to columnar file format, enabling real-time analysis using Athena and Amazon Redshift. This great feature enables a level of optimization in both cost and performance that you need when storing and analyzing large amounts of data. This feature is equally important if you are investing in building data lakes on AWS.

 


Additional Reading

If you found this post useful, be sure to check out Analyzing VPC Flow Logs with Amazon Kinesis Firehose, Amazon Athena, and Amazon QuickSight and Work with partitioned data in AWS Glue.


About the Author

Roy Hasson is a Global Business Development Manager for AWS Analytics. He works with customers around the globe to design solutions to meet their data processing, analytics and business intelligence needs. Roy is a big Manchester United fan, cheering his team on and hanging out with his family.

 

 

 

Analyze data in Amazon DynamoDB using Amazon SageMaker for real-time prediction

Post Syndicated from YongSeong Lee original https://aws.amazon.com/blogs/big-data/analyze-data-in-amazon-dynamodb-using-amazon-sagemaker-for-real-time-prediction/

Many companies across the globe use Amazon DynamoDB to store and query historical user-interaction data. DynamoDB is a fast NoSQL database used by applications that need consistent, single-digit millisecond latency.

Often, customers want to turn their valuable data in DynamoDB into insights by analyzing a copy of their table stored in Amazon S3. Doing this separates their analytical queries from their low-latency critical paths. This data can be the primary source for understanding customers’ past behavior, predicting future behavior, and generating downstream business value. Customers often turn to DynamoDB because of its great scalability and high availability. After a successful launch, many customers want to use the data in DynamoDB to predict future behaviors or provide personalized recommendations.

DynamoDB is a good fit for low-latency reads and writes, but it’s not practical to scan all data in a DynamoDB database to train a model. In this post, I demonstrate how you can use DynamoDB table data copied to Amazon S3 by AWS Data Pipeline to predict customer behavior. I also demonstrate how you can use this data to provide personalized recommendations for customers using Amazon SageMaker. You can also run ad hoc queries using Amazon Athena against the data. DynamoDB recently released on-demand backups to create full table backups with no performance impact. However, it’s not suitable for our purposes in this post, so I chose AWS Data Pipeline instead to create managed backups that are accessible from other services.

To do this, I describe how to read the DynamoDB backup file format in Data Pipeline. I also describe how to convert the objects in S3 to a CSV format that Amazon SageMaker can read. In addition, I show how to schedule regular exports and transformations using Data Pipeline. The sample data used in this post is from Bank Marketing Data Set of UCI.

The solution that I describe provides the following benefits:

  • Separates analytical queries from production traffic on your DynamoDB table, preserving your DynamoDB read capacity units (RCUs) for important production requests
  • Automatically updates your model to get real-time predictions
  • Optimizes for performance (so it doesn’t compete with DynamoDB RCUs after the export) and for cost (using data you already have)
  • Makes it easier for developers of all skill levels to use Amazon SageMaker

All code and data set in this post are available in this .zip file.

Solution architecture

The following diagram shows the overall architecture of the solution.

The steps that data follows through the architecture are as follows:

  1. Data Pipeline regularly copies the full contents of a DynamoDB table as JSON into an S3 bucket.
  2. Exported JSON files are converted to comma-separated value (CSV) format to use as a data source for Amazon SageMaker.
  3. Amazon SageMaker renews the model artifact and updates the endpoint.
  4. The converted CSV is available for ad hoc queries with Amazon Athena.
  5. Data Pipeline controls this flow and repeats the cycle based on the schedule defined by customer requirements.

Building the auto-updating model

This section discusses details about how to read the DynamoDB exported data in Data Pipeline and build automated workflows for real-time prediction with a regularly updated model.

Download sample scripts and data

Before you begin, take the following steps:

  1. Download sample scripts in this .zip file.
  2. Unzip the src.zip file.
  3. Find the automation_script.sh file and edit it for your environment. For example, you need to replace 's3://<your bucket>/<datasource path>/' with your own S3 path to the data source for Amazon ML. In the script, the text enclosed by angle brackets—< and >—should be replaced with your own path.
  4. Upload the json-serde-1.3.6-SNAPSHOT-jar-with-dependencies.jar file to your S3 path so that the ADD jar command in Apache Hive can refer to it.

For this solution, the banking.csv file should be imported into a DynamoDB table.
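If you need a quick way to load that CSV, a small boto3 script such as the following works. The table name, the synthetic customer_id partition key, and the simple type conversion are assumptions for illustration; adjust them to match your own table definition and how the Hive script later reads each attribute:

import csv
from decimal import Decimal, InvalidOperation

import boto3

dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
table = dynamodb.Table('bank_marketing')        # assumed table name

def to_attribute(value):
    # Store numeric-looking values as DynamoDB numbers and everything else as strings.
    try:
        return Decimal(value)
    except InvalidOperation:
        return value

with open('banking.csv') as f, table.batch_writer() as batch:
    reader = csv.DictReader(f)
    for i, row in enumerate(reader):
        item = {'customer_id': str(i)}          # synthetic partition key; the sample data has no natural ID
        item.update({k: to_attribute(v) for k, v in row.items() if v != ''})
        batch.put_item(Item=item)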

Export a DynamoDB table

To export the DynamoDB table to S3, open the Data Pipeline console and choose the Export DynamoDB table to S3 template. In this template, Data Pipeline creates an Amazon EMR cluster and performs an export in the EMRActivity activity. Set proper intervals for backups according to your business requirements.

One core node (m3.xlarge) provides the default capacity for the EMR cluster and should be suitable for the solution in this post. In the TableBackupActivity activity, leave the option to resize the cluster before running enabled so that Data Pipeline can scale the cluster to match the table size. The process of converting to CSV format and renewing models happens in this EMR cluster.

For a more in-depth look at how to export data from DynamoDB, see Export Data from DynamoDB in the Data Pipeline documentation.

Add the script to an existing pipeline

After you export your DynamoDB table, you add an additional EMR step to EMRActivity by following these steps:

  1. Open the Data Pipeline console and choose the ID for the pipeline that you want to add the script to.
  2. For Actions, choose Edit.
  3. In the editing console, choose the Activities category and add an EMR step using the custom script downloaded in the previous section, as shown below.

Paste the following command into the new step after the data upload step:

s3://#{myDDBRegion}.elasticmapreduce/libs/script-runner/script-runner.jar,s3://<your bucket name>/automation_script.sh,#{output.directoryPath},#{myDDBRegion}

The element #{output.directoryPath} references the S3 path where the data pipeline exports DynamoDB data as JSON. The path should be passed to the script as an argument.

The bash script has two goals, converting data formats and renewing the Amazon SageMaker model. Subsequent sections discuss the contents of the automation script.

Automation script: Convert JSON data to CSV with Hive

We use Apache Hive to transform the data into a new format. The Hive QL script to create an external table and transform the data is included in the custom script that you added to the Data Pipeline definition.

When you run the Hive scripts, do so with the -e option. Also, define the Hive table with the 'org.openx.data.jsonserde.JsonSerDe' row format to parse and read JSON format. The SQL creates a Hive EXTERNAL table, and it reads the DynamoDB backup data on the S3 path passed to it by Data Pipeline.

Note: You should create the table with the “EXTERNAL” keyword to avoid the backup data being accidentally deleted from S3 if you drop the table.

The full automation script for converting follows. Add your own bucket name and data source path in the highlighted areas.

#!/bin/bash
hive -e "
ADD jar s3://<your bucket name>/json-serde-1.3.6-SNAPSHOT-jar-with-dependencies.jar ; 
DROP TABLE IF EXISTS blog_backup_data ;
CREATE EXTERNAL TABLE blog_backup_data (
 customer_id map<string,string>,
 age map<string,string>, job map<string,string>, 
 marital map<string,string>,education map<string,string>, 
 default map<string,string>, housing map<string,string>,
 loan map<string,string>, contact map<string,string>, 
 month map<string,string>, day_of_week map<string,string>, 
 duration map<string,string>, campaign map<string,string>,
 pdays map<string,string>, previous map<string,string>, 
 poutcome map<string,string>, emp_var_rate map<string,string>, 
 cons_price_idx map<string,string>, cons_conf_idx map<string,string>,
 euribor3m map<string,string>, nr_employed map<string,string>, 
 y map<string,string> ) 
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' 
LOCATION '$1/';

INSERT OVERWRITE DIRECTORY 's3://<your bucket name>/<datasource path>/' 
SELECT concat( customer_id['s'],',', 
 age['n'],',', job['s'],',', 
 marital['s'],',', education['s'],',', default['s'],',', 
 housing['s'],',', loan['s'],',', contact['s'],',', 
 month['s'],',', day_of_week['s'],',', duration['n'],',', 
 campaign['n'],',',pdays['n'],',',previous['n'],',', 
 poutcome['s'],',', emp_var_rate['n'],',', cons_price_idx['n'],',',
 cons_conf_idx['n'],',', euribor3m['n'],',', nr_employed['n'],',', y['n'] ) 
FROM blog_backup_data
WHERE customer_id['s'] > 0 ;"

After creating the external table, the script reads its data and uses the INSERT OVERWRITE DIRECTORY ... SELECT command to write CSV data to the S3 path that you designated as the data source for Amazon SageMaker.

Depending on your requirements, you can eliminate or process the columns in the SELECT clause in this step to optimize data analysis. For example, you might remove some columns that have unpredictable correlations with the target value, because keeping the wrong columns might expose your model to “overfitting” during the training. In this post, the customer_id column is removed. Overfitting can make your prediction weak. More information about overfitting can be found in the topic Model Fit: Underfitting vs. Overfitting in the Amazon ML documentation.

Automation script: Renew the Amazon SageMaker model

After the CSV data is replaced and ready to use, create a new model artifact for Amazon SageMaker with the updated dataset on S3. To renew the model artifact, you must create a new training job. Training jobs can be run using the AWS SDKs (for example, the Amazon SageMaker boto3 client), the Amazon SageMaker Python SDK (which you can install with the “pip install sagemaker” command), or the AWS CLI for Amazon SageMaker, which is what this post uses.

In addition, consider how to smoothly renew your existing model without service impact, because your model is called by applications in real time. To do this, first create a new endpoint configuration, and then update the current endpoint with the newly created endpoint configuration.

#!/bin/bash
## Define variable 
REGION=$2
DTTIME=`date +%Y-%m-%d-%H-%M-%S`
ROLE="<your AmazonSageMaker-ExecutionRole>" 


# Select containers image based on region.  
case "$REGION" in
"us-west-2" )
    IMAGE="174872318107.dkr.ecr.us-west-2.amazonaws.com/linear-learner:latest"
    ;;
"us-east-1" )
    IMAGE="382416733822.dkr.ecr.us-east-1.amazonaws.com/linear-learner:latest" 
    ;;
"us-east-2" )
    IMAGE="404615174143.dkr.ecr.us-east-2.amazonaws.com/linear-learner:latest" 
    ;;
"eu-west-1" )
    IMAGE="438346466558.dkr.ecr.eu-west-1.amazonaws.com/linear-learner:latest" 
    ;;
 *)
    echo "Invalid Region Name"
    exit 1 ;  
esac

# Start training job and creating model artifact 
TRAINING_JOB_NAME=TRAIN-${DTTIME} 
S3OUTPUT="s3://<your bucket name>/model/" 
INSTANCETYPE="ml.m4.xlarge"
INSTANCECOUNT=1
VOLUMESIZE=5 
aws sagemaker create-training-job --training-job-name ${TRAINING_JOB_NAME} --region ${REGION}  --algorithm-specification TrainingImage=${IMAGE},TrainingInputMode=File --role-arn ${ROLE}  --input-data-config '[{ "ChannelName": "train", "DataSource": { "S3DataSource": { "S3DataType": "S3Prefix", "S3Uri": "s3://<your bucket name>/<datasource path>/", "S3DataDistributionType": "FullyReplicated" } }, "ContentType": "text/csv", "CompressionType": "None" , "RecordWrapperType": "None"  }]'  --output-data-config S3OutputPath=${S3OUTPUT} --resource-config  InstanceType=${INSTANCETYPE},InstanceCount=${INSTANCECOUNT},VolumeSizeInGB=${VOLUMESIZE} --stopping-condition MaxRuntimeInSeconds=120 --hyper-parameters feature_dim=20,predictor_type=binary_classifier  

# Wait until job completed 
aws sagemaker wait training-job-completed-or-stopped --training-job-name ${TRAINING_JOB_NAME}  --region ${REGION}

# Get newly created model artifact and create model
MODELARTIFACT=`aws sagemaker describe-training-job --training-job-name ${TRAINING_JOB_NAME} --region ${REGION}  --query 'ModelArtifacts.S3ModelArtifacts' --output text `
MODELNAME=MODEL-${DTTIME}
aws sagemaker create-model --region ${REGION} --model-name ${MODELNAME}  --primary-container Image=${IMAGE},ModelDataUrl=${MODELARTIFACT}  --execution-role-arn ${ROLE}

# create a new endpoint configuration 
CONFIGNAME=CONFIG-${DTTIME}
aws sagemaker  create-endpoint-config --region ${REGION} --endpoint-config-name ${CONFIGNAME}  --production-variants  VariantName=Users,ModelName=${MODELNAME},InitialInstanceCount=1,InstanceType=ml.m4.xlarge

# create or update the endpoint
STATUS=`aws sagemaker describe-endpoint --endpoint-name  ServiceEndpoint --query 'EndpointStatus' --output text --region ${REGION} `
if [[ "$STATUS" != "InService" ]] ;
then
    aws sagemaker  create-endpoint --endpoint-name  ServiceEndpoint  --endpoint-config-name ${CONFIGNAME} --region ${REGION}    
else
    aws sagemaker  update-endpoint --endpoint-name  ServiceEndpoint  --endpoint-config-name ${CONFIGNAME} --region ${REGION}
fi
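If you prefer to drive the same endpoint swap from Python (for example, from a small scheduled Lambda function) instead of the AWS CLI, a minimal boto3 sketch follows. It assumes the same ServiceEndpoint name and an endpoint configuration name produced by the script above:

import boto3
from botocore.exceptions import ClientError

sm = boto3.client('sagemaker', region_name='us-west-2')

endpoint_name = 'ServiceEndpoint'
config_name = 'CONFIG-2018-04-01-00-00-00'   # assumed: the endpoint configuration created earlier

try:
    status = sm.describe_endpoint(EndpointName=endpoint_name)['EndpointStatus']
except ClientError:
    status = None                            # the endpoint doesn't exist yet

if status == 'InService':
    # Point the existing endpoint at the new configuration without taking it out of service.
    sm.update_endpoint(EndpointName=endpoint_name, EndpointConfigName=config_name)
else:
    sm.create_endpoint(EndpointName=endpoint_name, EndpointConfigName=config_name)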

Grant permission

Before you execute the script, you must grant proper permission to Data Pipeline. Data Pipeline uses the DataPipelineDefaultResourceRole role by default. I added the following policy to DataPipelineDefaultResourceRole to allow Data Pipeline to create, delete, and update the Amazon SageMaker model and data source in the script.

{
 "Version": "2012-10-17",
 "Statement": [
 {
 "Effect": "Allow",
 "Action": [
 "sagemaker:CreateTrainingJob",
 "sagemaker:DescribeTrainingJob",
 "sagemaker:CreateModel",
 "sagemaker:CreateEndpointConfig",
 "sagemaker:DescribeEndpoint",
 "sagemaker:CreateEndpoint",
 "sagemaker:UpdateEndpoint",
 "iam:PassRole"
 ],
 "Resource": "*"
 }
 ]
}

Use real-time prediction

After you deploy a model into production using Amazon SageMaker hosting services, your client applications use this API to get inferences from the model hosted at the specified endpoint. This approach is useful for interactive web, mobile, or desktop applications.

Following, I provide a simple Python code example that queries the Amazon SageMaker endpoint by its name (“ServiceEndpoint”) and then uses the response for real-time prediction.

=== Python sample for real-time prediction ===

#!/usr/bin/env python
import boto3
import json 

client = boto3.client('sagemaker-runtime', region_name ='<your region>' )
new_customer_info = '34,10,2,4,1,2,1,1,6,3,190,1,3,4,3,-1.7,94.055,-39.8,0.715,4991.6'
response = client.invoke_endpoint(
    EndpointName='ServiceEndpoint',
    Body=new_customer_info, 
    ContentType='text/csv'
)
result = json.loads(response['Body'].read().decode())
print(result)
--- output(response) ---
{u'predictions': [{u'score': 0.7528127431869507, u'predicted_label': 1.0}]}

Solution summary

The solution takes the following steps:

  1. Data Pipeline exports DynamoDB table data into S3. The original JSON data should be kept to recover the table in the rare event that this is needed. Data Pipeline then converts JSON to CSV so that Amazon SageMaker can read the data. Note: You should select only meaningful attributes when you convert to CSV. For example, if you judge that the “campaign” attribute is not correlated, you can eliminate this attribute from the CSV.
  2. Train the Amazon SageMaker model with the new data source.
  3. When a new customer comes to your site, you can judge how likely it is for this customer to subscribe to your new product based on “predictedScores” provided by Amazon SageMaker.
  4. If the new user subscribes to your new product, your application must update the attribute “y” to the value 1 (for yes). This updated data is provided for the next model renewal as a new data source. It serves to improve the accuracy of your prediction. With each new entry, your application can become smarter and deliver better predictions.

Running ad hoc queries using Amazon Athena

Amazon Athena is a serverless query service that makes it easy to analyze large amounts of data stored in Amazon S3 using standard SQL. Athena is useful for examining data and collecting statistics or informative summaries about data. You can also use the powerful analytic functions of Presto, as described in the topic Aggregate Functions of Presto in the Presto documentation.

With the Data Pipeline scheduled activity, recent CSV data is always located in S3 so that you can run ad hoc queries against the data using Amazon Athena. I show this with the example SQL statements that follow. For an in-depth description of this process, see the post Interactive SQL Queries for Data in Amazon S3 on the AWS News Blog.

Creating an Amazon Athena table and running it

You can simply create an EXTERNAL table for the CSV data on S3 in the Amazon Athena console.

=== Table Creation ===
CREATE EXTERNAL TABLE datasource (
 age int, 
 job string, 
 marital string , 
 education string, 
 default string, 
 housing string, 
 loan string, 
 contact string, 
 month string, 
 day_of_week string, 
 duration int, 
 campaign int, 
 pdays int , 
 previous int , 
 poutcome string, 
 emp_var_rate double, 
 cons_price_idx double,
 cons_conf_idx double, 
 euribor3m double, 
 nr_employed double, 
 y int 
)
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY ',' ESCAPED BY '\\' LINES TERMINATED BY '\n' 
LOCATION 's3://<your bucket name>/<datasource path>/';

The following query calculates the correlation coefficient between the target attribute and other attributes using Amazon Athena.

=== Sample Query ===

SELECT corr(age,y) AS correlation_age_and_target, 
 corr(duration,y) AS correlation_duration_and_target, 
 corr(campaign,y) AS correlation_campaign_and_target,
 corr(contact,y) AS correlation_contact_and_target
FROM ( SELECT age , duration , campaign , y , 
 CASE WHEN contact = 'telephone' THEN 1 ELSE 0 END AS contact 
 FROM datasource 
 ) datasource ;

Conclusion

In this post, I introduce an example of how to analyze data in DynamoDB by using table data in Amazon S3 to optimize DynamoDB table read capacity. You can then use the analyzed data as a new data source to train an Amazon SageMaker model for accurate real-time prediction. In addition, you can run ad hoc queries against the data on S3 using Amazon Athena. I also present how to automate these procedures by using Data Pipeline.

You can adapt this example to your specific use case at hand, and hopefully this post helps you accelerate your development. You can find more examples and use cases for Amazon SageMaker in the video AWS 2017: Introducing Amazon SageMaker on the AWS website.

 


Additional Reading

If you found this post useful, be sure to check out Serving Real-Time Machine Learning Predictions on Amazon EMR and Analyzing Data in S3 using Amazon Athena.

 


About the Author

Yong Seong Lee is a Cloud Support Engineer for AWS Big Data Services. He is interested in every technology related to data/databases and helping customers who have difficulties in using AWS services. His motto is “Enjoy life, be curious and have maximum experience.”

 

 

How to retain system tables’ data spanning multiple Amazon Redshift clusters and run cross-cluster diagnostic queries

Post Syndicated from Karthik Sonti original https://aws.amazon.com/blogs/big-data/how-to-retain-system-tables-data-spanning-multiple-amazon-redshift-clusters-and-run-cross-cluster-diagnostic-queries/

Amazon Redshift is a data warehouse service that logs the history of the system in STL log tables. The STL log tables manage disk space by retaining only two to five days of log history, depending on log usage and available disk space.

To retain STL tables’ data for an extended period, you usually have to create a replica table for every system table. Then, for each one, you load the data from the system table into the replica at regular intervals. By maintaining replica tables for STL tables, you can run diagnostic queries on historical data from the STL tables. You then can derive insights from query execution times, query plans, and disk-spill patterns, and make better cluster-sizing decisions. However, refreshing replica tables with live data from STL tables at regular intervals requires schedulers such as Cron or AWS Data Pipeline. Also, these tables are specific to one cluster and they are not accessible after the cluster is terminated. This is especially true for transient Amazon Redshift clusters that last for only a finite period of ad hoc query execution.

In this blog post, I present a solution that exports system tables from multiple Amazon Redshift clusters into an Amazon S3 bucket. This solution is serverless, and you can schedule it as frequently as every five minutes. The AWS CloudFormation deployment template that I provide automates the solution setup in your environment. The system tables’ data in the Amazon S3 bucket is partitioned by cluster name and query execution date to enable efficient joins in cross-cluster diagnostic queries.

I also provide another CloudFormation template later in this post. This second template helps to automate the creation of tables in the AWS Glue Data Catalog for the system tables’ data stored in Amazon S3. After the system tables are exported to Amazon S3, you can run cross-cluster diagnostic queries on the system tables’ data and derive insights about query executions in each Amazon Redshift cluster. You can do this using Amazon QuickSight, Amazon Athena, Amazon EMR, or Amazon Redshift Spectrum.

You can find all the code examples in this post, including the CloudFormation templates, AWS Glue extract, transform, and load (ETL) scripts, and the resolution steps for common errors you might encounter in this GitHub repository.

Solution overview

The solution in this post uses AWS Glue to export system tables’ log data from Amazon Redshift clusters into Amazon S3. The AWS Glue ETL jobs are invoked at a scheduled interval by AWS Lambda. AWS Systems Manager, which provides secure, hierarchical storage for configuration data management and secrets management, maintains the details of Amazon Redshift clusters for which the solution is enabled. The last-fetched time stamp values for the respective cluster-table combination are maintained in an Amazon DynamoDB table.
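To make the orchestration concrete, the core of the invoking Lambda function could look roughly like the following sketch. The parameter and job names follow the naming conventions described later in this post; the complete implementation is in the GitHub repository mentioned above:

import boto3

ssm = boto3.client('ssm')
glue = boto3.client('glue')

def lambda_handler(event, context):
    # Read the comma-separated list of clusters for which system table export is enabled.
    parameter = ssm.get_parameter(Name='redshift_query_logs.global.enabled_cluster_list')
    clusters = parameter['Parameter']['Value'].split(',')

    # Start one AWS Glue ETL job run per enabled cluster.
    for cluster in clusters:
        job_name = '{}_extract_rs_query_logs'.format(cluster.strip())
        run = glue.start_job_run(JobName=job_name)
        print('Started {} (run {})'.format(job_name, run['JobRunId']))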

The following diagram covers the key steps involved in this solution.

The solution as illustrated in the preceding diagram flows like this:

  1. The Lambda function, invoke_rs_stl_export_etl, is triggered at regular intervals, as controlled by Amazon CloudWatch. It’s triggered to look up the AWS Systems Manager parameter store to get the details of the Amazon Redshift clusters for which the system table export is enabled.
  2. The same Lambda function, based on the Amazon Redshift cluster details obtained in step 1, invokes the AWS Glue ETL job designated for the Amazon Redshift cluster. If an ETL job for the cluster is not found, the Lambda function creates one.
  3. The ETL job invoked for the Amazon Redshift cluster gets the cluster credentials from the parameter store. It gets from the DynamoDB table the last exported time stamp of when each of the system tables was exported from the respective Amazon Redshift cluster.
  4. The ETL job unloads the system tables’ data from the Amazon Redshift cluster into an Amazon S3 bucket.
  5. The ETL job updates the DynamoDB table with the last exported time stamp value for each system table exported from the Amazon Redshift cluster.
  6. The Amazon Redshift cluster system tables’ data is available in Amazon S3 and is partitioned by cluster name and date for running cross-cluster diagnostic queries.

Understanding the configuration data

This solution uses AWS Systems Manager parameter store to store the Amazon Redshift cluster credentials securely. The parameter store also securely stores other configuration information that the AWS Glue ETL job needs for extracting and storing system tables’ data in Amazon S3. Systems Manager comes with a default AWS Key Management Service (AWS KMS) key that it uses to encrypt the password component of the Amazon Redshift cluster credentials.

The following table explains the global parameters and cluster-specific parameters required in this solution. The global parameters are defined once and applicable at the overall solution level. The cluster-specific parameters are specific to an Amazon Redshift cluster and repeat for each cluster for which you enable this post’s solution. The CloudFormation template explained later in this post creates these parameters as part of the deployment process.

Parameter name Type Description
Global parameters (defined once and applied to all jobs)
redshift_query_logs.global.s3_prefix String The Amazon S3 path where the query logs are exported. Under this path, each exported table is partitioned by cluster name and date.
redshift_query_logs.global.tempdir String The Amazon S3 path that AWS Glue ETL jobs use for temporarily staging the data.
redshift_query_logs.global.role String The name of the role that the AWS Glue ETL jobs assume. Just the role name is sufficient. The complete Amazon Resource Name (ARN) is not required.
redshift_query_logs.global.enabled_cluster_list StringList A comma-separated list of cluster names for which system tables’ data export is enabled. This gives flexibility for a user to exclude certain clusters.
Cluster-specific parameters (for each cluster specified in the enabled_cluster_list parameter)
redshift_query_logs.<<cluster_name>>.connection String The name of the AWS Glue Data Catalog connection to the Amazon Redshift cluster. For example, if the cluster name is product_warehouse, the entry is redshift_query_logs.product_warehouse.connection.
redshift_query_logs.<<cluster_name>>.user String The user name that AWS Glue uses to connect to the Amazon Redshift cluster.
redshift_query_logs.<<cluster_name>>.password Secure String The password that AWS Glue uses to connect to the Amazon Redshift cluster, encrypted with a key that is managed in AWS KMS.

For example, suppose that you have two Amazon Redshift clusters, product-warehouse and category-management, for which the solution described in this post is enabled. In this case, the parameters shown in the following screenshot are created by the solution deployment CloudFormation template in the AWS Systems Manager parameter store.

Solution deployment

To make it easier for you to get started, I created a CloudFormation template that automatically configures and deploys the solution—only one step is required after deployment.

Prerequisites

To deploy the solution, you must have one or more Amazon Redshift clusters in a private subnet. This subnet must have a network address translation (NAT) gateway or a NAT instance configured, and also a security group with a self-referencing inbound rule for all TCP ports. For more information about why AWS Glue ETL needs the configuration it does, described previously, see Connecting to a JDBC Data Store in a VPC in the AWS Glue documentation.
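If the security group attached to your Amazon Redshift cluster doesn’t yet have that self-referencing rule, you can add one with a call like the following; the security group ID is a placeholder:

import boto3

ec2 = boto3.client('ec2', region_name='us-east-1')
sg_id = 'sg-0123456789abcdef0'   # placeholder: the security group used by your Amazon Redshift cluster

# Allow all TCP traffic from members of the same security group, which AWS Glue requires.
ec2.authorize_security_group_ingress(
    GroupId=sg_id,
    IpPermissions=[{
        'IpProtocol': 'tcp',
        'FromPort': 0,
        'ToPort': 65535,
        'UserIdGroupPairs': [{'GroupId': sg_id}],
    }],
)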

To start the deployment, launch the CloudFormation template:

CloudFormation stack parameters

The following table lists and describes the parameters for deploying the solution to export query logs from multiple Amazon Redshift clusters.

Property Default Description
S3Bucket mybucket The bucket this solution uses to store the exported query logs, stage code artifacts, and perform unloads from Amazon Redshift. For example, the mybucket/extract_rs_logs/data bucket is used for storing all the exported query logs for each system table partitioned by the cluster. The mybucket/extract_rs_logs/temp/ bucket is used for temporarily staging the unloaded data from Amazon Redshift. The mybucket/extract_rs_logs/code bucket is used for storing all the code artifacts required for Lambda and the AWS Glue ETL jobs.
ExportEnabledRedshiftClusters Requires Input A comma-separated list of cluster names from which the system table logs need to be exported.
DataStoreSecurityGroups Requires Input A list of security groups with an inbound rule to the Amazon Redshift clusters provided in the parameter, ExportEnabledClusters. These security groups should also have a self-referencing inbound rule on all TCP ports, as explained on Connecting to a JDBC Data Store in a VPC.

After you launch the template and create the stack, you see that the following resources have been created:

  1. AWS Glue connections for each Amazon Redshift cluster you provided in the CloudFormation stack parameter, ExportEnabledRedshiftClusters.
  2. All parameters required for this solution created in the parameter store.
  3. The Lambda function that invokes the AWS Glue ETL jobs for each configured Amazon Redshift cluster at a regular interval of five minutes.
  4. The DynamoDB table that captures the last exported time stamps for each exported cluster-table combination.
  5. The AWS Glue ETL jobs to export query logs from each Amazon Redshift cluster provided in the CloudFormation stack parameter, ExportEnabledRedshiftClusters.
  6. The IAM roles and policies required for the Lambda function and AWS Glue ETL jobs.

After the deployment

For each Amazon Redshift cluster for which you enabled the solution through the CloudFormation stack parameter, ExportEnabledRedshiftClusters, the automated deployment includes temporary credentials that you must update after the deployment:

  1. Go to the parameter store.
  2. Note the parameters redshift_query_logs.<<cluster_name>>.user and redshift_query_logs.<<cluster_name>>.password that correspond to each Amazon Redshift cluster for which you enabled this solution. Edit these parameters to replace the placeholder values with the right credentials.

For example, if product-warehouse is one of the clusters for which you enabled system table export, you edit these two parameters with the right user name and password and choose Save parameter.

Querying the exported system tables

Within a few minutes after the solution deployment, you should see Amazon Redshift query logs being exported to the Amazon S3 location, <<S3Bucket_you_provided>>/extract_redshift_query_logs/data/. In that bucket, you should see the eight system tables partitioned by cluster name and date: stl_alert_event_log, stl_dlltext, stl_explain, stl_query, stl_querytext, stl_scan, stl_utilitytext, and stl_wlm_query.

To run cross-cluster diagnostic queries on the exported system tables, create external tables in the AWS Glue Data Catalog. To make it easier for you to get started, I provide a CloudFormation template that creates an AWS Glue crawler, which crawls the exported system tables stored in Amazon S3 and builds the external tables in the AWS Glue Data Catalog.

Launch this CloudFormation template to create external tables that correspond to the Amazon Redshift system tables. S3Bucket is the only input parameter required for this stack deployment. Provide the same Amazon S3 bucket name where the system tables’ data is being exported. After you successfully create the stack, you can see the eight tables in the database, redshift_query_logs_db, as shown in the following screenshot.

Now, navigate to the Athena console to run cross-cluster diagnostic queries. The following screenshot shows a diagnostic query executed in Athena that retrieves query alerts logged across multiple Amazon Redshift clusters.

You can build the following example Amazon QuickSight dashboard by running cross-cluster diagnostic queries on Athena to identify the hourly query count and the key query alert events across multiple Amazon Redshift clusters.

How to extend the solution

You can extend this post’s solution in two ways:

  • Add any new Amazon Redshift clusters that you spin up after you deploy the solution.
  • Add other system tables or custom query results to the list of exports from an Amazon Redshift cluster.

Extend the solution to other Amazon Redshift clusters

To extend the solution to more Amazon Redshift clusters, add the three cluster-specific parameters in the AWS Systems Manager parameter store following the guidelines earlier in this post. Modify the redshift_query_logs.global.enabled_cluster_list parameter to append the new cluster to the comma-separated string.
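You can make that change in the console or with a short boto3 call such as the following; the cluster names here are only examples:

import boto3

ssm = boto3.client('ssm')

# Overwrite the StringList parameter with the new cluster appended to the existing list.
ssm.put_parameter(
    Name='redshift_query_logs.global.enabled_cluster_list',
    Value='product-warehouse,category-management,new-cluster',
    Type='StringList',
    Overwrite=True,
)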

Extend the solution to add other tables or custom queries to an Amazon Redshift cluster

The current solution ships with the export functionality for the following Amazon Redshift system tables:

  • stl_alert_event_log
  • stl_dlltext
  • stl_explain
  • stl_query
  • stl_querytext
  • stl_scan
  • stl_utilitytext
  • stl_wlm_query

You can easily add another system table or custom query by adding a few lines of code to the AWS Glue ETL job, <<cluster-name>>_extract_rs_query_logs. For example, suppose that from the product-warehouse Amazon Redshift cluster you want to export orders greater than $2,000. To do so, add the following five lines of code to the AWS Glue ETL job product-warehouse_extract_rs_query_logs, where product-warehouse is your cluster name:

  1. Get the last-processed time-stamp value. The function creates a value if it doesn’t already exist.

salesLastProcessTSValue = functions.getLastProcessedTSValue(trackingEntry="mydb.sales_2000",job_configs=job_configs)

  2. Run the custom query with the time stamp.

returnDF=functions.runQuery(query="select * from sales s join order o where o.order_amnt > 2000 and sale_timestamp > '{}'".format (salesLastProcessTSValue) ,tableName="mydb.sales_2000",job_configs=job_configs)

  3. Save the results to Amazon S3.

functions.saveToS3(dataframe=returnDF,s3Prefix=s3Prefix,tableName="mydb.sales_2000",partitionColumns=["sale_date"],job_configs=job_configs)

  4. Get the latest time-stamp value from the returned data frame in Step 2.

latestTimestampVal=functions.getMaxValue(returnDF,"sale_timestamp",job_configs)

  5. Update the last-processed time-stamp value in the DynamoDB table.

functions.updateLastProcessedTSValue("mydb.sales_2000",latestTimestampVal[0],job_configs)

Conclusion

In this post, I demonstrate a serverless solution to retain the system tables’ log data across multiple Amazon Redshift clusters. By using this solution, you can incrementally export the data from system tables into Amazon S3. By performing this export, you can build cross-cluster diagnostic queries, build audit dashboards, and derive insights into capacity planning by using services such as Athena. I also demonstrate how you can extend this solution to other ad hoc query use cases or tables other than system tables by adding a few lines of code.


Additional Reading

If you found this post useful, be sure to check out Using Amazon Redshift Spectrum, Amazon Athena, and AWS Glue with Node.js in Production and Amazon Redshift – 2017 Recap.


About the Author

Karthik Sonti is a senior big data architect at Amazon Web Services. He helps AWS customers build big data and analytical solutions and provides guidance on architecture and best practices.

 

 

 

 

AWS Online Tech Talks – April & Early May 2018

Post Syndicated from Betsy Chernoff original https://aws.amazon.com/blogs/aws/aws-online-tech-talks-april-early-may-2018/

We have several upcoming tech talks in the month of April and early May. Come join us to learn about AWS services and solution offerings. We’ll have AWS experts online to help answer questions in real-time. Sign up now to learn more, we look forward to seeing you.

Note – All sessions are free and in Pacific Time.

April & early May — 2018 Schedule

Compute

April 30, 2018 | 01:00 PM – 01:45 PM PT – Best Practices for Running Amazon EC2 Spot Instances with Amazon EMR (300) – Learn about the best practices for scaling big data workloads, as well as how to process, store, and analyze big data securely and cost-effectively with Amazon EMR and Amazon EC2 Spot Instances.

May 1, 2018 | 01:00 PM – 01:45 PM PT – How to Bring Microsoft Apps to AWS (300) – Learn more about how to save significant money by bringing your Microsoft workloads to AWS.

May 2, 2018 | 01:00 PM – 01:45 PM PT – Deep Dive on Amazon EC2 Accelerated Computing (300) – Get a technical deep dive on how AWS’ GPU and FPGA-based compute services can help you to optimize and accelerate your ML/DL and HPC workloads in the cloud.

Containers

April 23, 2018 | 11:00 AM – 11:45 AM PT – New Features for Building Powerful Containerized Microservices on AWS (300) – Learn about how this new feature works and how you can start using it to build and run modern, containerized applications on AWS.

Databases

April 23, 2018 | 01:00 PM – 01:45 PM PT – ElastiCache: Deep Dive Best Practices and Usage Patterns (200) – Learn about Redis-compatible in-memory data store and cache with Amazon ElastiCache.

April 25, 2018 | 01:00 PM – 01:45 PM PT – Intro to Open Source Databases on AWS (200) – Learn how to tap the benefits of open source databases on AWS without the administrative hassle.

DevOps

April 25, 2018 | 09:00 AM – 09:45 AM PT – Debug your Container and Serverless Applications with AWS X-Ray in 5 Minutes (300) – Learn how AWS X-Ray makes debugging your Container and Serverless applications fun.

Enterprise & Hybrid

April 23, 2018 | 09:00 AM – 09:45 AM PT – An Overview of Best Practices of Large-Scale Migrations (300) – Learn about the tools and best practices on how to migrate to AWS at scale.

April 24, 2018 | 11:00 AM – 11:45 AM PT – Deploy your Desktops and Apps on AWS (300) – Learn how to deploy your desktops and apps on AWS with Amazon WorkSpaces and Amazon AppStream 2.0.

IoT

May 2, 2018 | 11:00 AM – 11:45 AM PT – How to Easily and Securely Connect Devices to AWS IoT (200) – Learn how to easily and securely connect devices to the cloud and reliably scale to billions of devices and trillions of messages with AWS IoT.

Machine Learning

April 24, 2018 | 09:00 AM – 09:45 AM PT – Automate for Efficiency with Amazon Transcribe and Amazon Translate (200) – Learn how you can increase the efficiency and reach of your operations with Amazon Translate and Amazon Transcribe.

April 26, 2018 | 09:00 AM – 09:45 AM PT – Perform Machine Learning at the IoT Edge using AWS Greengrass and Amazon SageMaker (200) – Learn more about developing machine learning applications for the IoT edge.

Mobile

April 30, 2018 | 11:00 AM – 11:45 AM PT – Offline GraphQL Apps with AWS AppSync (300) – Come learn how to enable real-time and offline data in your applications with GraphQL using AWS AppSync.

Networking

May 2, 2018 | 09:00 AM – 09:45 AM PT – Taking Serverless to the Edge (300) – Learn how to run your code closer to your end users in a serverless fashion. Also, David Von Lehman from Aerobatic will discuss how they used Lambda@Edge to reduce latency and cloud costs for their customers’ websites.

Security, Identity & Compliance

April 30, 2018 | 09:00 AM – 09:45 AM PT – Amazon GuardDuty – Let’s Attack My Account! (300) – Amazon GuardDuty Test Drive – Practical steps on generating test findings.

May 3, 2018 | 09:00 AM – 09:45 AM PT – Protect Your Game Servers from DDoS Attacks (200) – Learn how to use the new AWS Shield Advanced for EC2 to protect your internet-facing game servers against network layer DDoS attacks and application layer attacks of all kinds.

Serverless

April 24, 2018 | 01:00 PM – 01:45 PM PT Tips and Tricks for Building and Deploying Serverless Apps In Minutes (200) – Learn how to build and deploy apps in minutes.

Storage

May 1, 2018 | 11:00 AM – 11:45 AM PT Building Data Lakes That Cost Less and Deliver Results Faster (300) – Learn how Amazon S3 Select and Amazon Glacier Select increase application performance by up to 400% and reduce total cost of ownership by extending your data lake into cost-effective archive storage.

May 3, 2018 | 11:00 AM – 11:45 AM PT Integrating On-Premises Vendors with AWS for Backup (300) – Learn how to work with AWS and technology partners to build backup and restore solutions for your on-premises, hybrid, and cloud-native environments.

How to migrate a Hue database from an existing Amazon EMR cluster

Post Syndicated from Anvesh Ragi original https://aws.amazon.com/blogs/big-data/how-to-migrate-a-hue-database-from-an-existing-amazon-emr-cluster/

Hadoop User Experience (Hue) is an open-source, web-based, graphical user interface for use with Amazon EMR and Apache Hadoop. The Hue database stores things like users, groups, authorization permissions, Apache Hive queries, Apache Oozie workflows, and so on.

There might come a time when you want to migrate your Hue database to a new EMR cluster. For example, you might want to upgrade from an older version of the Amazon EMR AMI (Amazon Machine Image), but your Hue application and its database have had a lot of customization. You can avoid re-creating these user entities and retain query and workflow histories in Hue by migrating the existing Hue database, or the remote database in Amazon RDS, to a new cluster.

By default, Hue user information and query histories are stored in a local MySQL database on the EMR cluster’s master node. However, you can create one or more Hue-enabled clusters using a configuration stored in Amazon S3 and a remote MySQL database in Amazon RDS. This allows you to preserve user information and query history that Hue creates without keeping your Amazon EMR cluster running.
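For reference, a cluster configuration of roughly the following shape can be supplied at launch (for example, with aws emr create-cluster --configurations) to point Hue at a remote MySQL database. The endpoint and credentials below are placeholders, so treat this as a sketch to adapt rather than a drop-in configuration:

[
  {
    "Classification": "hue-ini",
    "Properties": {},
    "Configurations": [
      {
        "Classification": "desktop",
        "Properties": {},
        "Configurations": [
          {
            "Classification": "database",
            "Properties": {
              "engine": "mysql",
              "host": "<your-rds-endpoint>.rds.amazonaws.com",
              "port": "3306",
              "name": "huedb",
              "user": "hue",
              "password": "<your-password>"
            }
          }
        ]
      }
    ]
  }
]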

This post describes the step-by-step process for migrating the Hue database from an existing EMR cluster.

Note: Amazon EMR supports different Hue versions across different AMI releases. Keep in mind the compatibility of Hue versions between the old and new clusters in this migration activity. Currently, Hue 3.x.x versions are not compatible with Hue 4.x.x versions, and therefore a migration between these two Hue versions might create issues. In addition, Hue 3.10.0 is not backward compatible with its previous 3.x.x versions.
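One quick way to check which Hue version each cluster runs is to list the cluster's installed applications with the AWS CLI; the cluster ID below is a placeholder:

$ aws emr describe-cluster --cluster-id j-XXXXXXXXXXXXX \
    --query 'Cluster.Applications' --output table

Compare the Hue version reported for the old and new clusters before you start the migration.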

Before you begin

First, let’s create a new testUser in Hue on an existing EMR cluster, as shown following:

You will use these credentials later to log in to Hue on the new EMR cluster and validate whether you have successfully migrated the Hue database.

Let’s get started!

Migration how-to

Follow these steps to migrate your database to a new EMR cluster and then validate the migration process.

1.) Make a backup of the existing Hue database.

Use SSH to connect to the master node of the old cluster, as shown following (if you are using Linux/Unix/macOS), and dump the Hue database to a JSON file.

$ ssh -i ~/key.pem hadoop@<old-master-public-dns>
$ /usr/lib/hue/build/env/bin/hue dumpdata > ./hue-mysql.json

Edit the hue-mysql.json output file by removing all JSON objects that have useradmin.userprofile in the model field, and save the file. For example, remove objects like the following (a scripted alternative is shown after the example):

{
  "pk": 1,
  "model": "useradmin.userprofile",
  "fields": {
    "last_activity": "2018-01-10T11:41:04",
    "creation_method": "HUE",
    "first_login": false,
    "user": 1,
    "home_directory": "/user/hue_admin"
  }
},
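If you prefer to script that cleanup instead of editing the file by hand, the following sketch uses jq (which you might need to install first); it assumes the dump is a single JSON array, which is the default dumpdata output:

$ sudo yum install -y jq
$ jq 'map(select(.model != "useradmin.userprofile"))' ./hue-mysql.json > ./hue-mysql-clean.json

If you take this approach, carry the cleaned file (hue-mysql-clean.json) forward in the following steps, or rename it back to hue-mysql.json.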

2.) Store the hue-mysql.json file on persistent storage like Amazon S3.

You can copy the file from the old EMR cluster to Amazon S3 using the AWS CLI or Secure Copy (SCP) client. For example, the following uses the AWS CLI:

$ aws s3 cp ./hue-mysql.json s3://YourBucketName/folder/
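If you use the SCP route instead, a minimal sketch is to pull the dump down to your workstation first and then upload it with the same aws s3 cp command shown above; the hostname below is a placeholder:

$ scp -i ~/key.pem hadoop@<old-master-public-dns>:~/hue-mysql.json ./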

3.) Recover/reload the backed-up Hue database into the new EMR cluster.

a.) Use SSH to connect to the master node of the new EMR cluster, and stop the Hue service that is already running.

$ ssh -i ~/key.pem hadoop@<new-master-public-dns>
$ sudo stop hue
hue stop/waiting
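Note: sudo stop hue applies to the upstart-based Amazon Linux AMIs that EMR used at the time of writing. On newer EMR releases that run Amazon Linux 2 with systemd, the equivalent command would likely be the following (verify on your release):

$ sudo systemctl stop hue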

b.) Using the mysql client, connect to the Hue database for your cluster, which is either the local MySQL database or the remote database in Amazon RDS, as shown following.

$ mysql -h HOST -u USER -pPASSWORD

For a local MySQL database, you can find the hostname, user name, and password for connecting to the database in the /etc/hue/conf/hue.ini file on the master node.

[[database]]
    engine = mysql
    name = huedb
    case_insensitive_collation = utf8_unicode_ci
    test_charset = utf8
    test_collation = utf8_bin
    host = ip-172-31-37-133.us-west-2.compute.internal
    user = hue
    test_name = test_huedb
    password = QdWbL3Ai6GcBqk26
    port = 3306

Based on the preceding example configuration, the sample command is as follows. (Replace the host, user, and password details based on your EMR cluster settings.)

$ mysql -h ip-172-31-37-133.us-west-2.compute.internal -u hue -pQdWbL3Ai6GcBqk26
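As a convenience, you can also pull just the [[database]] block out of hue.ini with a fixed-string grep instead of opening the file; the line count here is an assumption sized to the block shown earlier:

$ sudo grep -F -A 12 '[[database]]' /etc/hue/conf/hue.ini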

c.) Drop the existing Hue database with the name huedb from the MySQL server.

mysql> DROP DATABASE IF EXISTS huedb;

d.) Create a new empty database with the same name huedb.

mysql> CREATE DATABASE huedb DEFAULT CHARACTER SET utf8 DEFAULT COLLATE=utf8_bin;

e.) Now, synchronize Hue with its database huedb.

$ sudo /usr/lib/hue/build/env/bin/hue syncdb --noinput
$ sudo /usr/lib/hue/build/env/bin/hue migrate

(This populates the new huedb with all Hue tables that are required.)
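To confirm that the schema was created, you can list the tables in huedb from the same mysql session used in step b:

mysql> SHOW TABLES IN huedb;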

f.) Log in to MySQL again, and drop the foreign key constraint so that the tables can be cleaned.

mysql> SHOW CREATE TABLE huedb.auth_permission;

In the following example, replace <id value> with the actual value from the preceding output.

mysql> ALTER TABLE huedb.auth_permission DROP FOREIGN KEY
content_type_id_refs_id_<id value>;
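If you would rather look the constraint name up directly than read it out of the SHOW CREATE TABLE output, an equivalent query against information_schema (not part of the original steps) is:

mysql> SELECT CONSTRAINT_NAME FROM information_schema.KEY_COLUMN_USAGE
       WHERE TABLE_SCHEMA = 'huedb' AND TABLE_NAME = 'auth_permission'
       AND COLUMN_NAME = 'content_type_id'
       AND REFERENCED_TABLE_NAME = 'django_content_type';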

g.) Delete the contents of the django_content_type table.

mysql> DELETE FROM huedb.django_content_type;

h.) Download the backed-up Hue database dump from Amazon S3 to the new EMR cluster, and load it into Hue.

$ aws s3 cp s3://YourBucketName/folder/hue-mysql.json ./
$ sudo /usr/lib/hue/build/env/bin/hue loaddata ./hue-mysql.json
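As an optional sanity check, you can confirm that the user records were loaded; auth_user is the Django table that Hue uses for accounts:

mysql> SELECT COUNT(*) FROM huedb.auth_user;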

i.) In MySQL, add the foreign key content_type_id back to the auth_permission table.

mysql> use huedb;
mysql> ALTER TABLE huedb.auth_permission ADD FOREIGN KEY (`content_type_id`) REFERENCES `django_content_type` (`id`);

j.) Start the Hue service again.

$ sudo start hue
hue start/running, process XXXX

That’s it! Now, verify whether you can successfully access the Hue UI, and sign in using your existing testUser credentials.
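Before opening a browser, you can confirm that Hue is answering on its default port (8888) on the new master node; the hostname below is a placeholder, and a redirect to the login page is a healthy response:

$ curl -I http://<new-master-public-dns>:8888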

After a successful sign-in to Hue on the new EMR cluster, you should see the familiar Hue homepage with testUser shown as the signed-in user.

Conclusion

You have now learned how to migrate an existing Hue database to a new Amazon EMR cluster and validate the migration process. If you have any similar Amazon EMR administration topics that you want to see covered in a future post, please let us know in the comments below.


Additional Reading

If you found this post useful, be sure to check out Anomaly Detection Using PySpark, Hive, and Hue on Amazon EMR and Dynamically Create Friendly URLs for Your Amazon EMR Web Interfaces.


About the Author


Anvesh Ragi is a Big Data Support Engineer with Amazon Web Services. He works closely with AWS customers to provide them architectural and engineering assistance for their data processing workflows. In his free time, he enjoys traveling and going for hikes.