Tag Archives: Amazon EMR

Optimizing Amazon EMR for resilience and cost with capacity-optimized Spot Instances

Post Syndicated from Ran Sheinberg original https://aws.amazon.com/blogs/big-data/optimizing-amazon-emr-for-resilience-and-cost-with-capacity-optimized-spot-instances/

Amazon EMR now supports the capacity-optimized allocation strategy for Amazon Elastic Compute Cloud (Amazon EC2) Spot Instances, which launches Spot Instances from the most available Spot Instance capacity pools by analyzing capacity metrics in real time. You can now specify up to 15 instance types in your EMR task instance fleet configuration. This gives Amazon EMR more options when choosing the optimal pools to launch Spot Instances from, which decreases the chance of Spot interruptions and increases the ability to relaunch capacity using other instance types if Spot Instances are interrupted when Amazon EC2 needs the capacity back.

Background

Amazon EMR is the industry-leading cloud big data platform for processing vast amounts of data using open-source tools such as Apache Spark, Apache Hive, Apache HBase, Apache Flink, Apache Hudi, and Presto. With Amazon EMR, you can run petabyte-scale analysis at less than half of the cost of traditional on-premises solutions, and over three times as fast on Amazon EMR runtime for Apache Spark compared to running without the runtime. If you have existing on-premises deployments of open-source tools such as Apache Spark and Apache Hive, you can also run Amazon EMR clusters on AWS Outposts.

Spot Instances are spare Amazon EC2 compute capacity in the AWS Cloud available to you at savings of up to 90% compared to On-Demand Instance prices. The only difference between On-Demand Instances and Spot Instances is that Amazon EC2 can interrupt Spot Instances with 2 minutes of notification when Amazon EC2 needs the capacity back. Using Spot Instances in Amazon EMR is a common pattern that allows AWS customers to achieve significant cost savings.

The capacity-optimized allocation strategy in the Amazon EC2 fleet (also available for Amazon EC2 Auto Scaling and Spot Fleet) provisions Spot Instances from the most-available Spot Instance pools by analyzing capacity metrics. By offering the possibility of fewer interruptions, the capacity-optimized strategy can lower the overall cost of your workload. For more information about how AWS customers are benefiting from decreased Spot interruptions with the capacity-optimized allocation strategy, see Capacity-Optimized Spot Instance Allocation in Action at Mobileye and Skyscanner.

Amazon EMR uses the Amazon EC2 RunInstances API to provision compute capacity. We are enhancing the way Amazon EMR provisions EC2 instances to provide more flexibility and increased cluster resilience using EC2 Fleet (CreateFleet) in Instant mode, as a drop-in replacement for RunInstances.

Optimizing capacity for greater resilience

With this launch, you can configure Amazon EMR to use allocation strategies for both Spot and On-Demand Instances in instance fleets.

The capacity-optimized allocation strategy uses real-time capacity data to allocate instances from the Spot Instance pools with the optimal capacity for the number of instances that are launching. This allocation strategy is appropriate for workloads that have a higher cost of interruption. Examples include long-running jobs and multi-tenant persistent clusters running Apache Spark, Apache Hive, and Presto. This allocation strategy lets you specify up to 15 EC2 instance types on task instance fleets to diversify your Spot requests and get steep discounts. Previously, instance fleets allowed a maximum of five instance types. You can now diversify your Spot requests across these 15 pools within each Availability Zone and prioritize deploying into a deeper capacity pool to lower the chance of interruptions. With more instance type diversification, Amazon EMR has more capacity pools to allocate capacity from, and chooses the Spot Instances which are least likely to be interrupted.

For example, if you’re initially using EC2 memory-optimized r5.4xlarge instances (with 16 vCPUs and 128 GB of RAM) for your EMR task nodes, you can configure the EMR task instance fleet with different instance types. First, explore different-sized instance types such as r5.2xlarge and r5.8xlarge. Second, add previous-generation r4.4xlarge and other R4 instance sizes. After you’ve added different sizes within the same family, as well as previous-generation instance types, you can add extra instance types with similar hardware characteristics and vCPU-to-memory ratio, such as the R5a instance family with AMD processors, the R5d instance family with locally attached NVMe storage, and more.

The allocation strategy is also taken into account in case the cluster scales out after the initial provisioning phase—for example, if you manually resize the core or task fleets, or if you’re using managed scaling to automatically increase or decrease the number of instances or units in your cluster based on workload.
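
If you use managed scaling with an instance fleets cluster, the scale-out capacity that managed scaling adds flows through the same allocation strategy. The following is a minimal sketch of enabling managed scaling from the AWS CLI; the cluster ID and capacity limits are placeholder values, not outputs from this post:

# Enable managed scaling on an existing instance fleets cluster (placeholder values)
aws emr put-managed-scaling-policy \
  --cluster-id j-XXXXXXXXXXXXX \
  --managed-scaling-policy ComputeLimits='{UnitType=InstanceFleetUnits,MinimumCapacityUnits=4,MaximumCapacityUnits=64}'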

For more information about Spot Instance configuration if you’re using Amazon EMR to run Apache Spark workloads, see Best practices for running Apache Spark applications using Amazon EC2 Spot Instances with Amazon EMR. This blog emphasizes best practices that will help you build your Spark workloads with Amazon EMR to achieve deep cost savings.

Amazon EMR has made significant enhancements to improve elasticity and resilience, including graceful decommissioning of Spot Instances running Apache Spark and Apache Hadoop applications. Amazon EMR has customizations to open-source Apache Spark that make it more resilient to node loss: it integrates with YARN’s decommissioning mechanism, extends Apache Spark’s blacklisting mechanism, and takes actions on decommissioned nodes.

For example, when Spot Instances are interrupted in a running EMR cluster, stage failures don’t count towards the total number of failures that trigger a total job failure. For more information, see Spark enhancements for elasticity and resilience on Amazon EMR.

New configuration options and IAM policy requirements

To use the allocation strategies in your EMR clusters, you need to use a new AllocationStrategy parameter in your cluster configurations. Amazon EMR also added support for an On-Demand allocation strategy: you can specify multiple On-Demand Instance types in your core or task instance fleets and specify an allocation strategy of lowest-price to have Amazon EMR provision the On-Demand Instances with the lowest cost. This also allows you to be flexible with your selection of On-Demand Instance types.

The following is an example snippet from an Amazon EMR JSON configuration file with the new capabilities:

{
    "Name": "Taskfleet",
    "InstanceFleetType": "TASK",
    "TargetSpotCapacity": 1,
    "TargetOnDemandCapacity": 1,
    "LaunchSpecifications": {
        "OnDemandSpecification": {
            "AllocationStrategy": "lowest-price"
        },
        "SpotSpecification": {
            "AllocationStrategy": "capacity-optimized",
            "TimeoutDurationMinutes": 120,
            "TimeoutAction": "TERMINATE_CLUSTER"
        }
    }
}

For more information about the API options, see InstanceFleetProvisioningSpecifications.

The following IAM policy shows the additional service role permissions required to create a cluster that uses the instance fleet allocation strategy option. If your clusters use the default role EMR_DefaultRole (which has the default managed policy AmazonElasticMapReduceRole attached), the managed policy is already updated to include these new role permissions. If your clusters use a different role or policy, make sure you add these new permissions to your policy. See the following IAM policy:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "ec2:DeleteLaunchTemplate",
                "ec2:CreateLaunchTemplate",
                "ec2:DescribeLaunchTemplates",
                "ec2:CreateFleet"
            ],
            "Resource": "*"
        }
    ]
}

Launching an EMR cluster with capacity-optimized Spot Instances and a diversified task fleet

In this section, we look at how to create an EMR cluster that includes allocation strategy configurations and a diversified task fleet. Specifying more instance types allows Amazon EMR to launch instances from the most optimal capacity pools and to replenish the cluster’s target capacity if Spot Instances are interrupted. Moreover, with the capacity-optimized allocation strategy, Spot Instances are launched from the most available capacity pools, effectively decreasing the chances of Spot interruptions.

The following AWS Command Line Interface (AWS CLI) command launches an EMR cluster in the default AWS Region configured in your AWS CLI configuration, with the master and core nodes running on On-Demand Instances, and a task fleet running Spot Instances.

Amazon EMR uses a wide selection of instance types in the task instance fleet and uses "AllocationStrategy": "CAPACITY_OPTIMIZED" to launch instances from the most available Spot capacity pools and decrease the chances of workload interruptions. By providing a WeightedCapacity for each instance type that is equal to its number of vCPUs (or YARN vCores), you can specify a TargetSpotCapacity that defines the number of vCPUs (YARN vCores) in your task fleet and be flexible around the instance sizes, effectively providing more capacity pools to choose from. You should specify a subnet ID per Availability Zone in the AWS Region. While each EMR cluster runs in a single Availability Zone, specifying multiple Availability Zones allows you to architect your workload with increased fault tolerance.

See the following AWS CLI command for an example of launching an Amazon EMR cluster that adheres to the recommendations in this blog post (uses Allocation Strategies and a diversified set of instance types in the task fleet):

aws emr create-cluster \
--use-default-roles --release-label emr-5.30.1 \
--ec2-attributes SubnetIds=['subnet-1234567890abcdef1','subnet-1234567890abcdef2','subnet-1234567890abcdef3'] \
--name 'EMRCluster-TaskFleet' \
--instance-fleets \
InstanceFleetType=MASTER,TargetOnDemandCapacity=1,InstanceTypeConfigs=['{WeightedCapacity=1,InstanceType=m5.xlarge}'] \
InstanceFleetType=CORE,TargetOnDemandCapacity=4,LaunchSpecifications={OnDemandSpecification='{AllocationStrategy=LOWEST_PRICE}'},InstanceTypeConfigs=['{WeightedCapacity=4,InstanceType=r4.xlarge},{WeightedCapacity=4,InstanceType=r5.xlarge}'] \
InstanceFleetType=TASK,TargetSpotCapacity=64,LaunchSpecifications={SpotSpecification='{TimeoutDurationMinutes=60,AllocationStrategy=CAPACITY_OPTIMIZED,TimeoutAction=TERMINATE_CLUSTER}'},InstanceTypeConfigs=['{WeightedCapacity=4,InstanceType=r5.xlarge},{WeightedCapacity=4,InstanceType=r4.xlarge},{WeightedCapacity=8,InstanceType=r5.2xlarge},{WeightedCapacity=8,InstanceType=r4.2xlarge},{WeightedCapacity=16,InstanceType=r5.4xlarge},{WeightedCapacity=16,InstanceType=r4.4xlarge},{WeightedCapacity=32,InstanceType=r5.8xlarge},{WeightedCapacity=32,InstanceType=r4.8xlarge},{WeightedCapacity=64,InstanceType=r5.16xlarge},{WeightedCapacity=64,InstanceType=r4.16xlarge},{WeightedCapacity=16,InstanceType=r5d.4xlarge},{WeightedCapacity=16,InstanceType=r5a.4xlarge}']

The following screenshot shows the result on the Amazon EMR console.

Conclusion

With this new functionality in Amazon EMR, you can increase the resilience of your organization’s data-processing workloads and optimize your costs by using Spot Instances. The capacity-optimized allocation strategy works to decrease the possibility of Spot interruptions in your cluster and allows you to specify up to 15 different instance types for your task fleet, enabling Amazon EMR to find the most available Spot capacity pools for your workload. With the Amazon EMR enhancements for increased resilience and the capacity-optimized allocation strategy for Spot Instances, you can achieve deep cost savings without compromising on availability.


About the authors

Ran Sheinberg is a principal solutions architect in the EC2 Spot team with Amazon Web Services. He works with AWS customers on optimizing their compute spend by using Spot Instances across different types of workloads: stateless web applications, queue workers, containerized workloads, analytics, HPC, and others.

Apply record level changes from relational databases to Amazon S3 data lake using Apache Hudi on Amazon EMR and AWS Database Migration Service

Post Syndicated from Ninad Phatak original https://aws.amazon.com/blogs/big-data/apply-record-level-changes-from-relational-databases-to-amazon-s3-data-lake-using-apache-hudi-on-amazon-emr-and-aws-database-migration-service/

Data lakes give organizations the ability to harness data from multiple sources in less time. Users across different roles are now empowered to collaborate and analyze data in different ways, leading to better, faster decision-making. Amazon Simple Storage Service (Amazon S3) is the highly performant object storage service for structured and unstructured data and the storage service of choice to build a data lake.

However, many use cases like performing change data capture (CDC) from an upstream relational database to an Amazon S3-based data lake require handling data at a record level. Performing an operation like inserting, updating, and deleting individual records from a dataset requires the processing engine to read all the objects (files), make the changes, and rewrite the entire dataset as new files. Furthermore, making the data available in the data lake in near-real time often leads to the data being fragmented over many small files, resulting in poor query performance. Apache Hudi is an open-source data management framework that enables you to manage data at the record level in Amazon S3 data lakes, thereby simplifying building CDC pipelines and making it efficient to do streaming data ingestion. Datasets managed by Hudi are stored in Amazon S3 using open storage formats, and integrations with Presto, Apache Hive, Apache Spark, and the AWS Glue Data Catalog give you near real-time access to updated data using familiar tools. Hudi is supported in Amazon EMR and is automatically installed when you choose Spark, Hive, or Presto when deploying your EMR cluster.

In this post, we show you how to build a CDC pipeline that captures the data from an Amazon Relational Database Service (Amazon RDS) for MySQL database using AWS Database Migration Service (AWS DMS) and applies those changes to a dataset in Amazon S3 using Apache Hudi on Amazon EMR. Apache Hudi includes the utility HoodieDeltaStreamer, which provides an easy way to ingest data from many sources, such as a distributed file system or Kafka. It manages checkpointing, rollback, and recovery so you don’t need to keep track of what data has been read and processed from the source, which makes it easy to consume change data. It also allows for lightweight SQL-based transformations on the data as it is being ingested. For more information, see Writing Hudi Tables. Support for AWS DMS with HoodieDeltaStreamer is provided with Apache Hudi version 0.5.2 and is available on Amazon EMR 5.30.x and 6.1.0.

Architecture overview

The following diagram illustrates the architecture we deploy to build our CDC pipeline.

In this architecture, we have a MySQL instance on Amazon RDS. AWS DMS pulls full and incremental data (using the CDC feature of AWS DMS) into an S3 bucket in Parquet format. HoodieDeltaStreamer on an EMR cluster is used to process the full and incremental data to create a Hudi dataset. As the data in the MySQL database gets updated, the AWS DMS task picks up the changes and takes them to the raw S3 bucket. The HoodieDeltastreamer job can be run on the EMR cluster at a certain frequency or in a continuous mode to apply these changes to the Hudi dataset in the Amazon S3 data lake. You can query this data with tools such as SparkSQL, Presto, Apache Hive running on the EMR cluster, and Amazon Athena.

Deploying the solution resources

We use AWS CloudFormation to deploy these components in your AWS account. Choose an AWS Region for deployment where the services used in this solution are available.

You need to meet the following prerequisites before deploying the CloudFormation template:

  • Have a VPC with at least two public subnets in your account.
  • Have an S3 bucket where you want to collect logs from the EMR cluster. This should be in the same AWS Region where you launch the CloudFormation stack.
  • Have an AWS Identity and Access Management (IAM) role dms-vpc-role. For instructions on creating one, see Security in AWS Database Migration Service.
  • If you’re deploying the stack in an account using the AWS Lake Formation permission model, validate the following settings:
    • The IAM user used to deploy the stack is added as a data lake administrator under Lake Formation or the IAM user used to deploy the stack has IAM privileges to create databases in the AWS Glue Data Catalog.
    • The Data Catalog settings under Lake Formation are configured to use only IAM access control for new databases and new tables in new databases. This makes sure that all access to the newly created databases and tables in the Data Catalog are controlled solely using IAM permissions.
    • IAMAllowedPrincipals is granted database creator privilege on the Lake Formation Database creators page.

If this privilege is not in place, grant it by choosing Grant and selecting the Create database permission.

These Lake Formation settings are required so that all permissions to the Data Catalog objects are controlled using IAM only.

Launching the CloudFormation stack

To launch the CloudFormation stack, complete the following steps:

  1. Choose Launch Stack:
  2. Provide the mandatory parameters in the Parameters section, including an S3 bucket to store the Amazon EMR logs and a CIDR IP range from where you want to access Amazon RDS for MySQL.
  3. Follow through the CloudFormation stack creation wizard, leaving rest of the default values unchanged.
  4. On the final page, select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  5. Choose Create stack.
  6. When the stack creation is complete, record the details of the S3 bucket, EMR cluster, and Amazon RDS for MySQL details on the Outputs tab of the CloudFormation stack.

The CloudFormation template uses m5.xlarge and m5.2xlarge instances for the EMR cluster. If these instance types aren’t available in the Region or Availability Zone you have selected for deployment, the creation of the CloudFormation stack fails. If that happens, choose a Region or subnet where the instance type is available. For more information about working around this issue, see Instance Type Not Supported.

CloudFormation also creates and configures the AWS DMS endpoints and tasks with requisite connection attributes such as dataFormat, timestampColumnName, and parquetTimestampInMillisecond. For more information, see Extra connection attributes when using Amazon S3 as a target for AWS DMS.
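
The stack creates these endpoints for you; if you were configuring the S3 target endpoint manually, a sketch with the AWS CLI might look like the following (the endpoint identifier, role ARN, and bucket name are placeholders):

aws dms create-endpoint \
  --endpoint-identifier hudiblog-s3-target \
  --endpoint-type target \
  --engine-name s3 \
  --s3-settings '{"ServiceAccessRoleArn":"arn:aws:iam::<account-id>:role/<dms-s3-role>","BucketName":"<s3-bucket-name>","BucketFolder":"dmsdata","DataFormat":"parquet","TimestampColumnName":"dms_received_ts","ParquetTimestampInMillisecond":true}'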

The database instance deployed as part of the CloudFormation stack has already been created with the settings needed for AWS DMS to work in CDC mode on the database. These are:

  • binlog_format=ROW
  • binlog_checksum=NONE

Also, automatic backups are enabled on the RDS DB instance. This is a required attribute for AWS DMS to do CDC. For more information, see Using a MySQL-compatible database as a source for AWS DMS.
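
If you want to verify these settings yourself, you can check them from any SQL client connected to the instance; for example (the endpoint and user are placeholders):

mysql -h <rds-endpoint> -u <admin-user> -p \
  -e "show global variables where variable_name in ('binlog_format','binlog_checksum');"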

Running the end-to-end data flow

Now that the CloudFormation stack is deployed, we can run our data flow to get the full and incremental data from MySQL into a Hudi dataset in our data lake.

  1. As a best practice, retain your binlogs for at least 24 hours. Log in to your Amazon RDS for MySQL database using your SQL client and run the following command:
    call mysql.rds_set_configuration('binlog retention hours', 24);

  2. Create a table in the dev database:
    create table dev.retail_transactions(
    tran_id INT,
    tran_date DATE,
    store_id INT,
    store_city varchar(50),
    store_state char(2),
    item_code varchar(50),
    quantity INT,
    total FLOAT);

  3. When the table is created, insert some dummy data into the database:
    insert into dev.retail_transactions values(1,'2019-03-17',1,'CHICAGO','IL','XXXXXX',5,106.25);
    insert into dev.retail_transactions values(2,'2019-03-16',2,'NEW YORK','NY','XXXXXX',6,116.25);
    insert into dev.retail_transactions values(3,'2019-03-15',3,'SPRINGFIELD','IL','XXXXXX',7,126.25);
    insert into dev.retail_transactions values(4,'2019-03-17',4,'SAN FRANCISCO','CA','XXXXXX',8,136.25);
    insert into dev.retail_transactions values(5,'2019-03-11',1,'CHICAGO','IL','XXXXXX',9,146.25);
    insert into dev.retail_transactions values(6,'2019-03-18',1,'CHICAGO','IL','XXXXXX',10,156.25);
    insert into dev.retail_transactions values(7,'2019-03-14',2,'NEW YORK','NY','XXXXXX',11,166.25);
    insert into dev.retail_transactions values(8,'2019-03-11',1,'CHICAGO','IL','XXXXXX',12,176.25);
    insert into dev.retail_transactions values(9,'2019-03-10',4,'SAN FRANCISCO','CA','XXXXXX',13,186.25);
    insert into dev.retail_transactions values(10,'2019-03-13',1,'CHICAGO','IL','XXXXXX',14,196.25);
    insert into dev.retail_transactions values(11,'2019-03-14',5,'CHICAGO','IL','XXXXXX',15,106.25);
    insert into dev.retail_transactions values(12,'2019-03-15',6,'CHICAGO','IL','XXXXXX',16,116.25);
    insert into dev.retail_transactions values(13,'2019-03-16',7,'CHICAGO','IL','XXXXXX',17,126.25);
    insert into dev.retail_transactions values(14,'2019-03-16',7,'CHICAGO','IL','XXXXXX',17,126.25);
    

    We now use AWS DMS to start pushing this data to Amazon S3.

  4. On the AWS DMS console, run the task hudiblogload.

This task does a full load of the table to Amazon S3 and then starts writing incremental data.

If you’re prompted to test the AWS DMS endpoints while starting the AWS DMS task for the first time, you should do so. It’s generally a good practice to test the source and target endpoints before starting an AWS DMS task for the first time.

In a few minutes, the status of the task changes to Load complete, replication ongoing, which means that the full load is complete and the ongoing replication has started. You can go to the S3 bucket created by the stack, where you should see a .parquet file under the dmsdata/dev/retail_transactions folder.
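
You can also confirm this from the command line; listing the folder should show the full load file, whose name starts with LOAD (the bucket name is a placeholder):

aws s3 ls s3://<s3-bucket-name>/dmsdata/dev/retail_transactions/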

  5. On the Hardware tab of your EMR cluster, choose the master instance group and note the EC2 instance ID for the master instance.
  6. On the Systems Manager console, choose Session Manager.
  7. Choose Start Session to start a session with the master node of your cluster.

If you face challenges connecting to the master instance of the EMR cluster, see Troubleshooting Session Manager.

  8. Switch the user to Hadoop by running the following command:
    sudo su hadoop

In a real-life use case, the AWS DMS task starts writing incremental files to the same Amazon S3 location when the full load is complete. The way to distinguish full load vs. incremental load files is that the full load files have a name starting with LOAD, whereas CDC filenames have datetimestamps, as you see in a later step. From a processing perspective, we want to process the full load into the Hudi dataset and then start incremental data processing. To do this, we move the full load files to a different S3 folder under the same S3 bucket and process those before we start processing incremental files.

  9. Run the following command on the master node of the EMR cluster (replace <s3-bucket-name> with your actual bucket name):
    aws s3 mv s3://<s3-bucket-name>/dmsdata/dev/retail_transactions/ s3://<s3-bucket-name>/dmsdata/data-full/dev/retail_transactions/  --exclude "*" --include "LOAD*.parquet" --recursive

With the full table dump available in the data-full folder, we now use the HoodieDeltaStreamer utility on the EMR cluster to populate the Hudi dataset on Amazon S3.

  10. Run the following command to populate the Hudi dataset to the hudi folder in the same S3 bucket (replace <s3-bucket-name> with the name of the S3 bucket created by the CloudFormation stack):
    spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer  \
      --packages org.apache.hudi:hudi-utilities-bundle_2.11:0.5.2-incubating,org.apache.spark:spark-avro_2.11:2.4.5 \
      --master yarn --deploy-mode cluster \
    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
    --conf spark.sql.hive.convertMetastoreParquet=false \
    /usr/lib/hudi/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
      --table-type COPY_ON_WRITE \
      --source-ordering-field dms_received_ts \
      --props s3://<s3-bucket-name>/properties/dfs-source-retail-transactions-full.properties \
      --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
      --target-base-path s3://<s3-bucket-name>/hudi/retail_transactions --target-table hudiblogdb.retail_transactions \
      --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
        --payload-class org.apache.hudi.payload.AWSDmsAvroPayload \
    --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
      --enable-hive-sync
    

The preceding command runs a Spark job that runs the HoodieDeltaStreamer utility. For more information about the parameters used in this command, see Writing Hudi Tables.

When the Spark job is complete, you can navigate to the AWS Glue console and find a table called retail_transactions created under the hudiblogdb database. The input format for the table is org.apache.hudi.hadoop.HoodieParquetInputFormat.

Next, we query the data and look at the data in the retail_transactions table in the catalog.

  11. In the Systems Manager session established earlier, run the following command (make sure that you have completed all the prerequisites for the post, including adding IAMAllowedPrincipals as a database creator in Lake Formation):
    spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.hive.convertMetastoreParquet=false" \
    --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.2-incubating,org.apache.spark:spark-avro_2.11:2.4.5 \
    --jars /usr/lib/hudi/hudi-spark-bundle_2.11-0.5.2-incubating.jar,/usr/lib/spark/external/lib/spark-avro.jar
    

  12. Run the following query on the retail_transactions table:
    spark.sql("Select * from hudiblogdb.retail_transactions order by tran_id").show()

You should see the same data in the table as the MySQL database with a few columns added by the HoodieDeltaStreamer process.

We now run some DML statements on our MySQL database and take these changes through to the Hudi dataset.

  13. Run the following DML statements on the MySQL database:
    insert into dev.retail_transactions values(15,'2019-03-16',7,'CHICAGO','IL','XXXXXX',17,126.25);
    update dev.retail_transactions set store_city='SPRINGFIELD' where tran_id=12;
    delete from dev.retail_transactions where tran_id=2;

In a few minutes, you see a new .parquet file created under dmsdata/dev/retail_transactions folder in the S3 bucket.

  14. Run the following command on the EMR cluster to get the incremental data to the Hudi dataset (replace <s3-bucket-name> with the name of the S3 bucket created by the CloudFormation template):
    spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer  \
      --packages org.apache.hudi:hudi-utilities-bundle_2.11:0.5.2-incubating,org.apache.spark:spark-avro_2.11:2.4.5 \
      --master yarn --deploy-mode cluster \
    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
    --conf spark.sql.hive.convertMetastoreParquet=false \
    /usr/lib/hudi/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
      --table-type COPY_ON_WRITE \
      --source-ordering-field dms_received_ts \
      --props s3://<s3-bucket-name>/properties/dfs-source-retail-transactions-incremental.properties \
      --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
      --target-base-path s3://<s3-bucket-name>/hudi/retail_transactions --target-table hudiblogdb.retail_transactions \
      --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
        --payload-class org.apache.hudi.payload.AWSDmsAvroPayload \
    --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
      --enable-hive-sync \
    --checkpoint 0

The key difference between this command and the previous one is in the properties file that was used as an argument to the --props and --checkpoint parameters. For the earlier command that performed the full load, we used dfs-source-retail-transactions-full.properties; for the incremental one, we used dfs-source-retail-transactions-incremental.properties. The differences between these two property files are:

  • The location of source data changes between full and incremental data in Amazon S3.
  • The SQL transformer query included a hard-coded Op field for the full load task, because an AWS DMS first-time full load doesn’t include the Op field for Parquet datasets. The Op field can have values of I, U, and D—for Insert, Update and Delete indicators.
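
As an illustration of the second difference, the transformer property in the two files might differ along the following lines. This is only a sketch of the idea; the property files deployed by the CloudFormation stack are the authoritative versions, and the real query also includes the tran_date cast mentioned later in this post:

# dfs-source-retail-transactions-full.properties (illustrative excerpt)
# The DMS full load has no Op column, so hard-code 'I' for every record:
hoodie.deltastreamer.transformer.sql=SELECT a.*, 'I' AS Op FROM <SRC> a

# dfs-source-retail-transactions-incremental.properties (illustrative excerpt)
# CDC files already carry Op (I/U/D), so pass it through unchanged:
hoodie.deltastreamer.transformer.sql=SELECT a.* FROM <SRC> a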

We cover the details of the --checkpoint parameter in the Considerations when deploying to production section later in this post.

  15. When the job is complete, run the same query in spark-shell.

You should see these updates applied to the Hudi dataset.

You can use the Hudi CLI to administer Hudi datasets to view information about commits, the filesystem, statistics, and more.

  16. To do this, in the Systems Manager session, run the following command:
    /usr/lib/hudi/cli/bin/hudi-cli.sh

  17. Inside the Hudi CLI, run the following command (replace <s3-bucket-name> with the S3 bucket created by the CloudFormation stack):
    connect --path s3://<s3-bucket-name>/hudi/retail_transactions

  18. To inspect commits on your Hudi dataset, run the following command:
    commits show

You can also query incremental data from the Hudi dataset. This is particularly useful when you want to take incremental data for downstream processing, such as aggregations. Hudi provides multiple ways of pulling data incrementally, which are covered in the Hudi documentation. An example of how to use this feature is available in the Hudi Quick Start Guide.

Considerations when deploying to production

The preceding setup showed an example of how to build a CDC pipeline from your relational database to your Amazon S3-based data lake. However, if you want to use this solution for production, you should consider the following:

  • To ensure high availability, you can set up the AWS DMS instance in a Multi-AZ configuration.
  • The CloudFormation stack deployed the required properties files needed by the deltastreamer utility into the S3 bucket at s3://<s3-bucket-name>/properties/. You may need to customize these based on your requirements. For more information, see Configurations. There are a few parameters that may need your attention:
    • deltastreamer.transformer.sql – This property exposes an extremely powerful feature of the deltastreamer utility: it enables you to transform data on the fly as it’s being ingested and persisted in the Hudi dataset. In this post, we have shown a basic transformation that casts the tran_date column to a string, but you can apply any transformation as part of this query.
    • parquet.small.file.limit – This field is in bytes and is a critical storage configuration that specifies how Hudi handles small files on Amazon S3. Small files can occur due to the number of records processed in each insert per partition. Setting this value allows Hudi to continue treating inserts in a particular partition as updates to the existing files, so those files are rewritten and keep growing until they reach this small.file.limit size.
    • parquet.max.file.size – This is the maximum size of a single Parquet file in your Hudi dataset, after which a new file is created to store more data. For Amazon S3 storage and data querying needs, you can keep this around 256 MB–1 GB (256x1024x1024 = 268435456 bytes).
    • [Insert|Upsert|bulkinsert].shuffle.parallelism – In this post, we dealt with a small dataset of only a few records. However, in real-life situations, you might want to bring in hundreds of millions of records in the first load, and incremental CDC can then potentially run to millions of records per day. This parameter is very important when you want predictable control over the number of files in each of your Hudi dataset partitions. It is also needed to make sure you don’t hit the Apache Spark limit of 2 GB for data shuffle blocks when processing large amounts of data. For example, if you plan to load 200 GB of data in the first load and want to keep file sizes of approximately 256 MB, set the shuffle parallelism parameter for this dataset to 800 (200×1024/256). For more information, see the Tuning Guide.
  • In the incremental load deltastreamer command, we used an additional parameter: --checkpoint 0. When deltastreamer writes a Hudi dataset, it persists checkpoint information in the .commit files under the .hoodie folder. It uses this information in subsequent runs and only reads data from Amazon S3 that was created after the checkpoint time. In a production scenario, after you start the AWS DMS task, the task keeps writing incremental data to the target S3 folder as soon as the full load is complete. In the steps we followed, we ran a command on the EMR cluster to manually move the full load files to another folder and process the data from there. When we did that, the timestamps associated with the S3 objects changed to the most recent timestamp. If we ran the incremental load without the checkpoint argument, deltastreamer wouldn’t pick up any incremental data written to Amazon S3 before we manually moved the full load files. To make sure that deltastreamer processes all incremental data the first time, set the checkpoint to 0, which makes it process all incremental data in the folder. However, only use this parameter for the first incremental load, and let deltastreamer use its own checkpointing methodology from that point onwards.
  • For this post, we ran the spark-submit command manually. However, in production, you can run it as a step on the EMR cluster.
  • You can either schedule the incremental data load command to run at a regular interval using a scheduling or orchestration tool, or run it in a continuous fashion at a certain frequency by passing additional parameters to the spark-submit command: --min-sync-interval-seconds XX --continuous, where XX is the number of seconds between each run of the data pull. For example, if you want to run the processing every 5 minutes, replace XX with 300 (see the sketch following this list).
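
The following sketch shows the shape of the continuous variant for a 5-minute cadence; every elided argument is the same as in the incremental load command shown earlier:

spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  ... same packages, Spark configuration, and Hudi arguments as the incremental command ... \
  --min-sync-interval-seconds 300 \
  --continuous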

Cleaning up

When you are done exploring the solution, complete the following steps to clean up the resources deployed by CloudFormation:

  1. Empty the S3 bucket created by the CloudFormation stack.
  2. Delete any Amazon EMR log files generated under s3://<EMR-Logs-S3-Bucket>/HudiBlogEMRLogs/.
  3. Stop the AWS DMS task hudiblogload.
  4. Delete the CloudFormation stack.
  5. Delete any Amazon RDS for MySQL database snapshots retained after the CloudFormation template is deleted.

Conclusion

More and more data lakes are being built on Amazon S3, and these data lakes often need to be hydrated with change data from transactional systems. Handling deletes and upserts of data into the data lake using traditional methods involves a lot of heavy lifting. In this post, we saw how to easily build a solution with AWS DMS and HoodieDeltaStreamer on Amazon EMR. We also looked at how to perform lightweight record-level transformations when integrating data into the data lake, and how to use this data for downstream processes like aggregations. We also discussed the important settings and command line options that were used and how you could modify them to suit your requirements.


About the Authors

Ninad Phatak is a Senior Analytics Specialist Solutions Architect with Amazon Internet Services Private Limited. He specializes in data engineering and data warehousing technologies and helps customers architect their analytics use cases and platforms on AWS.

Raghu Dubey is a Senior Analytics Specialist Solutions Architect with Amazon Internet Services Private Limited. He specializes in Big Data Analytics, Data warehousing and BI and helps customers build scalable data analytics platforms.

Automating EMR workloads using AWS Step Functions

Post Syndicated from Afsar Jahangir original https://aws.amazon.com/blogs/big-data/automating-emr-workloads-using-aws-step-functions/

Amazon EMR allows you to process vast amounts of data quickly and cost-effectively at scale. Using open-source tools such as Apache Spark, Apache Hive, and Presto, and coupled with the scalable storage of Amazon Simple Storage Service (Amazon S3), Amazon EMR gives analytical teams the engines and elasticity to run petabyte-scale analysis for a fraction of the cost of traditional on-premises clusters. Developers and analysts can use Jupyter-based Amazon EMR notebooks for iterative development, collaboration, and access to data stored across AWS data products.

What happens if you have Amazon EMR code that needs to run automatically on a regular basis? Maybe the job only runs for certain events, like new data arriving in Amazon S3. Or maybe you want to run a job every Friday afternoon at 2:00 PM. What if there is a multi-step process?

To run Amazon EMR workloads on a schedule, you can automate everything with AWS Step Functions. This post walks through how to use Step Functions state machines and the callback pattern to automate EMR jobs. You can download the code examples from the GitHub repo.

Prerequisites

To follow along with this walkthrough, you must have the following:

Solution overview

For this use case, I want to run two applications on my EMR cluster. The start of the second application depends on the successful completion and output of the first. At a high level, I want to launch an EMR cluster automatically, run the code, and remove the cluster. Specifically, when the first program successfully completes, I want to run the second program.

At the conclusion of the second application, in some cases I may want to run both programs multiple times (with different dataset sizes, perhaps). I need a way to decide to run the process again with the same cluster. Whether the steps succeed or fail, at the conclusion, I always want to delete the CloudFormation stack that contains my EMR cluster to reduce cost. The following diagram illustrates this high-level overview of the pipeline operation.

Workflow details

I run two programs, and I need the first program to complete before running the second one. I optionally want to repeat those two programs with different datasets to get the final state of the data. To orchestrate the jobs, I can run through the same steps multiple times with the same active EMR cluster.

To facilitate automating the pipeline, I use an inner state machine to check the cluster status and submit EMR job steps. I then wrap that inner state machine in an outer state machine. The outer state machine starts the cluster and submits information to the inner state machine. It waits for all steps to complete, then deletes the EMR cluster.

The following flow chart illustrates the steps and checkpoints that make up the pipeline.

Deploying the pipeline state machines

To simplify pipeline deployment, I use AWS SAM, an open-source framework for building serverless applications. AWS SAM provides a single deployment configuration, extensions to CloudFormation templates, built-in best practices, and local debugging and testing. You can use AWS SAM with a suite of AWS tools for building serverless applications. For more information, see What Is the AWS Serverless Application Model (AWS SAM)?

Initiating the application

Navigate to the path where you want to download the files and initiate the AWS SAM application. I want to run the code from my local machine and have created the following location:

~/Documents/projects/blog/steppipeline/automation-ml-step-data-pipeline

From this directory, I initialize the application using sam init. This connects to the repository and downloads the files for creation of the ML pipeline. See the following code:

sam init -l https://github.com/aws-samples/automation-ml-step-data-pipeline.git

Creating the S3 bucket and moving dependencies

For this post, I orchestrate an existing process from the post Anomaly Detection Using PySpark, Hive, and Hue on Amazon EMR, which runs on Amazon EMR. The pipeline reads code artifacts from Amazon S3, where the EMR cluster has read permission. There are two programs: kmeansandey.py and kmeanswsssey.py.

First, create the bucket from the command line using the aws s3 mb command and upload the code. Your bucket name must be globally unique:

aws s3 mb s3://<your bucket name>

Move the artifacts to your bucket, replacing <your bucket name> with your bucket name:

aws s3 cp sample_ml_code/kmeansandey.py s3://<your bucket name>/testcode/kmeansandey.py
aws s3 cp sample_ml_code/kmeanswsssey.py s3://<your bucket name>/testcode/kmeanswsssey.py
aws s3 cp emr/bootstrapactions.sh s3://<your bucket name>/emr-bootstrap-scripts/bootstrapactions.sh
aws s3 cp emr/emr-cluster-config.json s3://<your bucket name>/emr-cluster-config.json
aws s3 cp emr/emr-cluster-sample.yaml s3://<your bucket name>/emr-cluster-sample.yaml

Deploying the application

Deploy the build artifacts to the AWS Cloud using the following code:

sam deploy --guided

AWS SAM prompts you for the parameters that you need to build and deploy the application. I have provided some default values where possible.

The final output of your deployment process should indicate that all stacks were built:

Successfully created/updated stack - step-pipeline in us-east-1

After deployment, you receive an email to confirm your subscription. Choose the confirmation link in the email to receive pipeline notifications.

Submitting a workload to your Step Functions state machine

To create a cluster and submit EMR jobs, the outer state machine needs a JSON payload. This contains the location of the programs in Amazon S3, the Amazon EMR CloudFormation template, and the parameter files used to launch the EMR cluster.

Creating an Amazon EC2 key pair

To use the same sample programs and EMR cluster template that you used to test your pipeline, you need an Amazon EC2 key pair for SSH credentials. When you create a cluster, you can specify the Amazon Elastic Compute Cloud (Amazon EC2) key pair to use for SSH connections to all cluster instances. The name of the key pair for this cluster is referenced in the emr-cluster-config.json file. See the following code:

<snip>
  {
    "ParameterKey": "Keyname",
    "ParameterValue": "emrcluster-launch"
  },
</snip>

To use the example as-is with the parameters unchanged, create an Amazon EC2 key pair on the AWS Management Console or AWS Command Line Interface (AWS CLI).

  1. On the Amazon EC2 console, under Network & Security, choose Key Pairs.
  2. On the Key Pairs page, choose Create Key Pair.
  3. For Key pair name, enter emrcluster-launch.
  4. Choose Create.
  5. When the console prompts you to save the private key file, save it in a safe place.

This is the only chance for you to save the private key file.

Inputting JSON for launching the pipeline

The simplest way for you to run the pipeline is to use the Start execution feature on the Step Functions console. The console gives you full functionality to initiate the function and submit a payload. In the example test_input.json, update the bucket values, security group, and subnet with the information for your account:

{  
    "ModelName": "PipelineTest_01",  
    "ModelProgram": "s3://<your bucket name>/testcode/kmeansandey.py",  
    "PreProcessingProgram": "s3://<your bucket name>/testcode/kmeanswsssey.py",  
    "EMRCloudFormation": "https://s3.amazonaws.com/<your bucket name>/emr-cluster-sample.yaml",  
    "EMRParameters": "https://s3.amazonaws.com/<your bucket name>/emr-cluster-config.json",  
    "JobInput": "s3://aws-bigdata-blog/artifacts/anomaly-detection-using-pyspark/sensorinputsmall/",  
    "SecurityGroup": "<your security group>",  
    "SubNet": "<your subnet>",  
    "ClusterSize": "4",  
    "ProcessingMode": ["TRAINING"]
}

The payload includes the following information:

  • ModelName – A short descriptive identifier used to identify the transient EMR cluster created during this process. This name shows on the Amazon EMR console for easy identification.
  • ModelProgram – The Amazon S3 URL location of the program that runs when the model initiates on the EMR cluster (step 3).
  • PreProcessingProgram – The Amazon S3 URL location of the program that runs when preprocessing initiates on the EMR cluster (step 2).
  • EMRCloudFormation – The S3 bucket HTTPS location of the CloudFormation template for launching the transient EMR cluster.
  • EMRParameters – The Amazon S3 HTTPS location of the parameter file supporting the Amazon EMR CloudFormation template.
  • JobInput – The Amazon S3 URL location of the input data for the preprocessing program.
  • SecurityGroup – The security group with ingress and egress rules for the launched EMR cluster.
  • SubNet – The subnet identifier where you place your EMR cluster.
  • ClusterSize – Denotes the number of EMR cluster nodes to run the job and can be changed based on the compute need. I use 4 nodes as the input value for the sample program.
  • ProcessingMode – This is an array of values. The pipeline runs steps 2 and 3 for each value in the array. The value is passed into the program unchanged and can be used to internally control how the program runs. For this use case, it runs a single time on the small dataset.
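
If you prefer the AWS CLI over the console, you can also start the pipeline programmatically. The following sketch assumes you saved the payload as test_input.json and substituted your own state machine ARN from the deployment output:

aws stepfunctions start-execution \
  --state-machine-arn arn:aws:states:us-east-1:<account-id>:stateMachine:MLStateMachine \
  --input file://test_input.json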

Opening the Step Functions Executions page

On the Step Functions console, choose MLStateMachine. This is the outer state machine. On the detail page for the outer state machine, choose Start execution.

Entering your payload

On the New execution page, enter the JSON for your pipeline based on the example test_input.json. Choose Start execution.

Reviewing the workflow as it runs

You can see the pipeline running in the visual workflow and review the event history on the detail page. The following diagram shows the state machine definition used:

Diving into the pipeline’s details

There are four processes that run in the outer state machine pipeline:

  1. Step 1 launches an EMR cluster using the CloudFormation template. The AWS Lambda function downloads the template and parameter file from the specified Amazon S3 location and initiates the stack build.
  2. When the EMR cluster is ready, step 2 initiates the first set of code against the newly created EMR cluster, passing in the remaining parameters to the inner state machine. It adds the stack id, EMR cluster id, and status to the payload. These values are obtained from the output of the CloudFormation stack. See the following code:
    "ModelName": "PipelineTest_01",  
    "PreProcessingProgram": "s3://<your bucket name>/testcode/kmeanswsssey.py",  
    "JobInput": "s3://aws-bigdata-blog/artifacts/anomaly-detection-using-pyspark/sensorinputsmall/",  
    "ClusterSize": "4",  
    "ProcessingMode": ["TRAINING"],
    "StackId": "arn:aws:cloudformation:us-east-1:575444587305:stack/PipelineTest01-auto-emr-02142020041612/bc5fd7a0-4ee0-11ea-a395-0e4c53e0aefd",
    "Status": "CREATE_COMPLETE",
    "ClusterId": "j-MF6LWBLJZ88K"

The code contains the following information:

  • ModelName is used in the EMR cluster name to make it easier to identify in the console and AWS CLI output.
  • PreProcessingProgram in our use case points to the first code step (kmeanswsssey.py). The code is passed through the first state machine and submitted to the second state machine and Amazon EMR.
  • JobInput, ClusterSize, ClusterId, StackId, and ProcessingMode are passthrough values that the program needs to run.

The step initiates the Lambda function awsblog-testproject-inner-sm-AddStepLambda-x45123xxxxxx, which engages the inner state machine asynchronously to run a high-level process of checking the cluster, adding a step, checking to see if the step is complete, and exiting back to the outer state machine when complete.

  3. Next, the outer state machine runs the Model program code (step 3) by submitting it to the Lambda function awsblog-testproject-inner-sm-AddStepLambda-x45123xxxxxx to engage the inner state machine for the second set of code (kmeansandey.py). The process is the same as step 2, but the code it runs is from a different file, and the output from the preprocessing step becomes the input for this step. See the following code:
    "ModelProgram": "s3://<your bucket name>/testcode/kmeansandey.py",

When the inner state machine is complete, it moves to a step that removes the first value from the ProcessingMode array. For this use case, there is only one value (TRAINING), which is removed, leaving the array empty. The next step in the state machine looks for remaining values; if there are none, it marks all steps as complete and moves to Delete EMR cluster.

  4. The final step in the outer state machine is to remove the EMR cluster. The Delete EMR cluster step passes the CloudFormation stack ID into lambda/delete_cfn_stack.py, initiating the deletion of the stack and cleaning up all the resources.

The output of the test programs is stored in Amazon S3 in two folders under the pipeline artifacts. The preprocessing folder contains data that is used to drive the output in the model folder. The following screenshot shows the folders in Amazon S3.

Conclusion

The Step Functions workflow in this post is a repeatable two-step pipeline. It starts an EMR cluster, runs a program that outputs data, and initiates a second program that depends on the previous job finishing. It then deletes all resources automatically.

You can adapt the workflow to respond to Amazon S3 events, a message received in a queue, a file checked into a code repository, or a schedule. Any event that can invoke Lambda can initiate the pipeline. For more information, see Invoking AWS Lambda functions.

You can download the example code from the GitHub repo and adapt it for your use case. Let me know in the comments what you built for your environment.


About the Authors

Mohammed “Afsar” Jahangir Ali is a Senior Big Data Consultant with Amazon since January 2018. He is a data enthusiast helping customers shape their data lakes and analytics journeys on AWS. In his spare time, he enjoys taking pictures, listening to music, and spending time with family.

Wendy Neu has worked as a Data Architect with Amazon since January 2015. Prior to joining Amazon, she worked as a consultant in Cincinnati, OH, helping customers integrate and manage their data from different unrelated data sources.

Implementing LDAP authentication for Hive on a multi-tenant Amazon EMR cluster

Post Syndicated from Kiran Erra original https://aws.amazon.com/blogs/big-data/implementing-ldap-authentication-for-hive-on-a-multi-tenant-amazon-emr-cluster/

As Amazon EMR continues its widespread adoption, it’s important to enforce separation of duties using role-based access when submitting Hive jobs on EMR clusters in multi-tenant environments. In this post, we walk through the steps to set up authentication for Hive using Lightweight Directory Access Protocol (LDAP) and Microsoft Active Directory Domain Controller.

Solution overview

In a multi-tenant environment, it’s critical to enforce role-based access when submitting Hive jobs to an EMR cluster. The default way of submitting a Hive job to an EMR cluster is by using the Add Step functionality. Although you may add Hive steps to an existing cluster, such a setup doesn’t enforce role-based access, because Amazon EMR steps are always submitted using the default Hive user. This post outlines the process by which you can enforce EMRFS role mappings when an Active Directory user submits a Hive job after authenticating via LDAP and Microsoft Active Directory Domain Controller. The following diagram illustrates the infrastructure provisioned by AWS CloudFormation.

The following AWS services are used as part of the recommended solution:

  • AWS Secrets Manager – AWS Secrets Manager helps you protect secrets needed to access your applications, services, and IT resources. The service enables you to easily rotate, manage, and retrieve database credentials, API keys, and other secrets throughout their lifecycle.
  • Amazon EMR – Amazon EMR makes it easy to process large amounts of data efficiently. Amazon EMR uses Hadoop processing combined with several AWS products to do tasks such as web indexing, data mining, log file analysis, machine learning, scientific simulation, and data warehousing.
  • Amazon EC2 – Amazon Elastic Compute Cloud (Amazon EC2) provides secure, resizable compute capacity in the cloud. It’s designed to make web-scale cloud computing easier for developers.

In our solution (as we discuss it in this post), the corporate user base is maintained in the Microsoft Active Directory Domain Controller. The EMR cluster is integrated with AD using a bootstrap action so that you can securely submit Hive jobs using beeline by establishing an LDAP connection from an edge node (represented by an EC2 instance). The user credentials are stored in and fetched from Secrets Manager when establishing the beeline connection.
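
As a sketch of what this looks like from the edge node (the secret ID, user name, and host name are placeholders, and we assume the secret stores the plain-text password):

# Fetch the AD user's password from Secrets Manager
PASSWD=$(aws secretsmanager get-secret-value \
  --secret-id <your-secret-id> \
  --query SecretString --output text)

# Open an LDAP-authenticated beeline connection to HiveServer2 on the EMR master node
beeline -u "jdbc:hive2://<emr-master-private-dns>:10000/default" \
  -n <ad-user-name> -p "$PASSWD"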

Prerequisites

Before getting started, you must have the following prerequisites:

  • Microsoft Active Directory Domain Controller needs to be installed and set up. For a quick setup of Microsoft Active Directory Domain Controller and VPC, see the step Launch and configure an Active Directory domain controller in the Deploying each component individually section of the post Implement perimeter security in Amazon EMR using Apache Knox.
  • A valid AWS account with access to AWS services.
  • An Amazon VPC with a public subnet.
  • An AWS Identity and Access Management (IAM) policy for Secrets Manager permissions.

Implementing the solution

We provide the CloudFormation template in this post as a general guide. Please review and customize it as needed. Be aware that some of the resources deployed by this stack incur costs when they remain in use. The CloudFormation template performs the following steps:

  1. Start an EMR cluster with the configuration from the parameters.
  2. Integrate the EMR cluster with AD using a bootstrap action.
  3. Create and launch an EC2 instance to test the integration.
  4. Add an inbound rule to the Amazon EMR primary additional security group to allow port 10000 on the newly launched EC2 instance.

This section describes how to use the CloudFormation template to launch an EMR cluster with the following parameters:

  • ClusterName (default: emr-ldap4hive) – The name of the cluster.
  • CoreInstanceType (default: m4.xlarge) – The instance type of the core nodes.
  • CoreNodeCount (default: 2) – The number of core nodes in the cluster.
  • CreateLogBucket (default: FALSE) – A Boolean flag that controls whether to set up a bucket for logs.
  • KeyPair – The key pair used to log in to the EC2 instance for validation.
  • MasterInstanceType (default: m4.xlarge) – The instance type of the master node.
  • ReleaseLabel (default: emr-6.0.0) – The Amazon EMR version. This template is tested with emr-6.0.0 and emr-5.29.0.
  • RemoteAccessCIDR – The CIDR range allowed to access Amazon EMR. This is usually the same as the IP address of the local machine.
  • VPCID – The VPC ID used in the Amazon EMR configuration. Make sure you select a public VPC.
  • SubnetId – The subnet ID used in the Amazon EMR configuration. Make sure you select a subnet that belongs to the selected VPC.
  • ldapurl – The LDAP URL of the AD domain controller, in the format ldap://<Private IP of AD domain controller>:389. Refer to the first item in the Prerequisites section.
  • passwd4awsadmin (default: [email protected]) – The AD admin password. Must be at least eight characters containing letters, numbers, and symbols.
  • EC2 AMI (default: ami-0ac80df6eff0e70b5) – The AMI used to create the EC2 instance for validation.
  • My IP – The IP address of the local machine.

The following screenshot shows the Specify stack details page when launching your template.

A bootstrap script ldap-bootstrap.sh is invoked during the cluster creation to perform the following actions:

  • Fetch the login credentials for the Active Directory domain admin from Secrets Manager
  • Perform the realm join using the credentials fetched
  • Enable password-based authentication to the cluster
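For reference, the following is a minimal sketch of the realm-join portion of such a bootstrap action; the secret ID, admin user name, and domain are placeholders, not the values the actual ldap-bootstrap.sh script uses:

# Fetch the AD admin password from Secrets Manager (secret ID is a placeholder)
PASSWD=$(aws secretsmanager get-secret-value --secret-id ad-admin-secret \
  --query SecretString --output text)

# Join the node to the AD realm with the fetched credentials
echo "$PASSWD" | sudo realm join -U awsadmin corp.example.com

# Enable password-based SSH logins so AD users can authenticate
sudo sed -i 's/^PasswordAuthentication no/PasswordAuthentication yes/' /etc/ssh/sshd_config
sudo systemctl restart sshd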

To deploy the template into your account, choose Launch Stack:

The following screenshot shows the EMR cluster the CloudFormation stack created.

Validating the solution

To validate the solution, SSH to the Ubuntu EC2 instance using the EC2 key pair, as shown in the following screenshot. Refer to the Outputs tab from your AWS CloudFormation stack.

For this post, we used the Ubuntu Server 18.04 LTS (HVM), SSD Volume Type – ami-07ebfd5b3428b6f4d (64-bit x86) / ami-0400a1104d5b9caa1 (64-bit Arm) AMI.

You should see the Python Hive beeline script in /home/ubuntu:

Run demo-hive-beeline.py as shown in the following screenshot. This Python script fetches the AD credentials from Secrets Manager, establishes a Beeline connection to Hive on Amazon EMR, submits Hive commands to create an external table over the NYC taxi dataset located in your Amazon Simple Storage Service (Amazon S3) bucket, and runs a sample SELECT statement on the table.

The script has the following parameters:

  • -r or --region_name – AWS Region
  • -s or --secret-id – Secret ARN
  • -h or --host-name – Amazon EMR public DNS address
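To check the connection without the script, you can run the equivalent steps from a shell on the edge node, as in the following sketch; the Region, secret ID, and AD user name are placeholders:

# Look up the AD user's password in Secrets Manager
PASSWD=$(aws secretsmanager get-secret-value --region us-east-1 \
  --secret-id ad-user-secret --query SecretString --output text)

# Open an LDAP-authenticated Beeline session against HiveServer2 on the EMR primary node
beeline -u "jdbc:hive2://<emr-public-dns>:10000/default" \
  -n aduser -p "$PASSWD" -e "SHOW DATABASES;"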

Cleaning up

Delete the CloudFormation stack to clean up all the resources created in this post. Also, stop the EC2 Ubuntu instance that you created in the verification step. If you used the nested stack, AWS CloudFormation deletes all resources in one operation. If you deployed the templates individually, delete them in the reverse order of creation, deleting the VPC stack last.

Conclusion

In this post, we went through the setup and validation of LDAP authentication for Hive on an EMR cluster. This approach decouples the authentication mechanism from Hive and Amazon EMR, and uses LDAP and the Active Directory Domain Controller as the system of record for user identities.


About the authors

Kiran Erra is a data architect with AWS. He works with AWS customers to provide guidance and technical assistance about Big Data, AI/ML and Security projects, helping them improve the value of their solutions when using AWS.


Rajarao Vijjapu is a security data architect with AWS. He works with AWS customers and partners to provide guidance and technical assistance about Big Data, Analytics, AI/ML and Security projects, helping them improve the value of their solutions when using AWS.


Amazon EMR supports Apache Hive ACID transactions

Post Syndicated from Suthan Phillips original https://aws.amazon.com/blogs/big-data/amazon-emr-supports-apache-hive-acid-transactions/

Apache Hive is an open-source data warehouse package that runs on top of an Apache Hadoop cluster. You can use Hive for batch processing and large-scale data analysis. Hive uses Hive Query Language (HiveQL), which is similar to SQL.

ACID (atomicity, consistency, isolation, and durability) properties make sure that the transactions in a database are atomic, consistent, isolated, and durable.

Amazon EMR 6.1.0 adds support for Hive ACID transactions so it complies with the ACID properties of a database. With this feature, you can run INSERT, UPDATE, DELETE, and MERGE operations in Hive managed tables with data in Amazon Simple Storage Service (Amazon S3). This is a key feature for use cases like streaming ingestion, data restatement, bulk updates using MERGE, and slowly changing dimensions.

This post demonstrates how to enable Hive ACID transactions in Amazon EMR, how to create a Hive transactional table, how it can achieve atomic and isolated operations, and the concepts, best practices, and limitations of using Hive ACID in Amazon EMR.

Enabling Hive ACID in Amazon EMR

To enable Hive ACID as the default for all Hive managed tables in an EMR 6.1.0 cluster, use the following hive-site configuration:

[
   {
      "classification": "hive-site",
      "properties": {
         "hive.support.concurrency": "true",
         "hive.exec.dynamic.partition.mode": "nonstrict",
         "hive.txn.manager": "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"
      }
   }
]

For the complete list of configuration parameters related to Hive ACID and descriptions of the preceding parameters, see Hive Transactions.
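If you launch clusters from the command line, the following sketch applies the same classification at cluster creation time; the cluster name, sizing, and roles are illustrative only:

# Save the hive-site classification shown above to a local file
cat > hive-acid-config.json <<'EOF'
[
   {
      "classification": "hive-site",
      "properties": {
         "hive.support.concurrency": "true",
         "hive.exec.dynamic.partition.mode": "nonstrict",
         "hive.txn.manager": "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"
      }
   }
]
EOF

# Launch a cluster with Hive ACID enabled as the default for managed tables
aws emr create-cluster \
  --name hive-acid-demo \
  --release-label emr-6.1.0 \
  --applications Name=Hive \
  --configurations file://hive-acid-config.json \
  --instance-type m5.xlarge --instance-count 3 \
  --use-default-roles --ec2-attributes KeyName=<your-key-pair>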

Hive ACID use case

In this section, we explain the Hive ACID transactions with a straightforward use case in Amazon EMR.

Enter the following Hive command in the master node of an EMR cluster (6.1.0 release) and replace <s3-bucket-name> with the bucket name in your account:

hive --hivevar location=<s3-bucket-name> -f s3://aws-bigdata-blog/artifacts/hive-acid-blog/hive_acid_example.hql 

After Hive ACID is enabled on an Amazon EMR cluster, you can run the CREATE TABLE DDLs for Hive transaction tables.

To define a Hive table as transactional, set the table property transactional=true.

The following CREATE TABLE DDL is used in the script that creates a Hive transaction table acid_tbl:

CREATE TABLE acid_tbl (key INT, value STRING, action STRING)
PARTITIONED BY (trans_date DATE)
CLUSTERED BY (key) INTO 3 BUCKETS
STORED AS ORC
LOCATION 's3://${hivevar:location}/acid_tbl' 
TBLPROPERTIES ('transactional'='true');

This script generates three partitions in the provided Amazon S3 path. See the following screenshot.

The first partition, trans_date=2020-08-01, has the data generated as a result of sample INSERT, UPDATE, DELETE, and MERGE statements. We use the second and third partitions when explaining minor and major compactions later in this post.

ACID is achieved in Apache Hive using three types of files: base, delta, and delete_delta. Edits are written in delta and delete_delta files.

The base file is created by the Insert Overwrite Table query or as the result of major compaction over a partition, where all the files are consolidated into a single base_<write id> file, where the write ID is allocated by the Hive transaction manager for every write. This helps achieve isolation of Hive write queries and enables them to run in parallel.

The INSERT operation creates a new delta_<write id>_<write id> directory.

The DELETE operation creates a new delete_delta_<write id>_<write id> directory.

To support deletes, a unique row__id is added to each row on writes. When a DELETE statement runs, the corresponding row__id gets added to the delete_delta_<write id>_<write id> directory, which should be ignored on reads. See the following screenshot.

The UPDATE operation creates a new delta_<write id>_<write id> directory and a delete_delta_<write id>_<write id> directory.

The following screenshot shows the second partition in Amazon S3, trans_date=2020-08-02.

A Hive transaction provides snapshot isolation for reads. When an application or query reads the transaction table, it opens all the files of a partition/bucket and returns the records from the last transaction committed.

Hive compactions

With the previously mentioned logic for Hive writes on a transactional table, many small delta and delete_delta files are created, which could adversely impact read performance over time because each read over a particular partition has to open all the files (including delete_delta) to eliminate the deleted rows.

This brings the need for a compaction logic for Hive transactions. In the following sections, we use the same use case to explain minor and major compactions in Hive.

Minor compaction

A minor compaction merges all the delta and delete_delta files within a partition or bucket to a single delta_<start write id>_<end write id> and delete_delta_<start write id>_<end write id> file.

We can trigger the minor compaction manually for the second partition (trans_date=2020-08-02) in Amazon S3 with the following code:

ALTER TABLE acid_tbl PARTITION (trans_date='2020-08-02') COMPACT 'minor';

If you check the same second partition in Amazon S3, after a minor compaction, it looks like the following screenshot.

You can see all the delta and delete_delta files from write ID 0000005–0000009 merged to single delta and delete_delta files, respectively.

Major compaction

A major compaction merges the base, delta, and delete_delta files within a partition or bucket into a single base_<latest write id> file. At this point, the deleted data is cleaned up.
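You can also trigger a major compaction manually, mirroring the minor compaction command shown earlier; for example:

hive -e "ALTER TABLE acid_tbl PARTITION (trans_date='2020-08-02') COMPACT 'major';"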

A major compaction is automatically triggered in the third partition (trans_date='2020-08-03') because the default Amazon EMR compaction threshold is met, as described in the next section. See the following screenshot.

To check the progress of compactions, enter the following command:

hive> show compactions;

The following screenshot shows the output.

Compaction in Amazon EMR

Compaction is enabled by default in Amazon EMR 6.1.0. The following property determines the number of concurrent compaction tasks:

  • hive.compactor.worker.threads – Number of worker threads to run in the instance. The default is 1 or vCores/8, whichever is greater.

Automatic compaction is triggered in Amazon EMR 6.1.0 based on the following configuration parameters:

  • hive.compactor.check.interval – Time period in seconds to check if any partition requires compaction. The default is 300 seconds.
  • hive.compactor.delta.num.threshold – Triggers minor compaction when the total number of delta files is greater than this value. The default is 10.
  • hive.compactor.delta.pct.threshold – Triggers major compaction when the total size of the delta files is greater than this fraction of the base file size. The default is 0.1, or 10%.
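If you want compaction to run more aggressively than these defaults, you could override them with a hive-site classification at cluster launch, as in the following sketch; the values are illustrative only:

# Example override of the compaction thresholds (illustrative values)
cat > compaction-tuning.json <<'EOF'
[
   {
      "classification": "hive-site",
      "properties": {
         "hive.compactor.check.interval": "120",
         "hive.compactor.delta.num.threshold": "5",
         "hive.compactor.delta.pct.threshold": "0.05"
      }
   }
]
EOF
# Pass it at launch: aws emr create-cluster ... --configurations file://compaction-tuning.json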

Best practices

The following are some best practices when using this feature:

  • Use an external Hive metastore for Hive ACID tables – Our customers use EMR clusters for compute purposes and Amazon S3 as storage for cost-optimization. With this architecture, you can stop the EMR cluster when the Hive jobs are complete. However, if you use a local Hive metastore, the metadata is lost upon stopping the cluster, and the corresponding data in Amazon S3 becomes unusable. To persist the metastore, we strongly recommend using an external Hive metastore like an Amazon RDS for MySQL instance or Amazon Aurora. Also, if you need multiple EMR clusters running ACID transactions (read or write) on the same Hive table, you need to use an external Hive metastore.
  • Use ORC format – Use ORC format to get full ACID support for INSERT, UPDATE, DELETE, and MERGE statements.
  • Partition your data – This technique helps improve performance for large datasets.
  • Enable an EMRFS consistent view if using Amazon S3 as storage – Because you have frequent movement of files in Amazon S3, we recommend using an EMRFS consistent view to mitigate the issues related to the eventual consistency nature of Amazon S3.
  • Use Hive authorization – Because Hive transactional tables are Hive managed tables, to prevent users from deleting data in Amazon S3, we suggest implementing Hive authorization with required privileges for each user.

Limitations

Keep in mind the following limitations of this feature:

  • The AWS Glue Data Catalog doesn’t support Hive ACID transactions.
  • Hive external tables don’t support Hive ACID transactions.
  • Bucketing is optional in Hive 3, but in Amazon EMR 6.1.0 (as of this writing), if the table is partitioned, it needs to be bucketed. You can mitigate this issue in Amazon EMR 6.1.0 using the following bootstrap action:
    --bootstrap-actions '[{"Path":"s3://aws-bigdata-blog/artifacts/hive-acid-blog/make_bucketing_optional_for_hive_acid_EMR_6_1.sh","Name":"Set bucketing as optional for Hive ACID"}]'

Conclusion

This post introduced the Hive ACID feature in EMR 6.1.0 clusters, explained how it works and its concepts with a straightforward use case, described the default behavior of Hive ACID on Amazon EMR, and offered some best practices. Stay tuned for additional updates on new features and further improvements in Apache Hive on Amazon EMR.


About the Authors

Suthan Phillips is a big data architect at AWS. He works with customers to provide them architectural guidance and helps them achieve performance enhancements for complex applications on Amazon EMR. In his spare time, he enjoys hiking and exploring the Pacific Northwest.


Chao Gao is a Software Development Engineer at Amazon EMR. He mainly works on the Apache Hive project at EMR and has in-depth knowledge of distributed databases and database internals. In his spare time, he enjoys making road trips, visiting national parks, and traveling around the world.

Build a self-service environment for each line of business using Amazon EMR and AWS Service Catalog

Post Syndicated from Tanzir Musabbir original https://aws.amazon.com/blogs/big-data/build-a-self-service-environment-for-each-line-of-business-using-amazon-emr-and-aws-service-catalog/

Enterprises often want to centralize governance and compliance requirements, and provide a common set of policies on how Amazon EMR instances should be set up. You can use AWS Service Catalog to centrally manage commonly deployed Amazon EMR cluster configurations, and this helps you achieve consistent governance and meet your compliance requirements, while at the same time enabling your end users to quickly deploy only the approved EMR cluster configurations on a self-service basis.

In this post, we demonstrate how enterprise administrators can use AWS Service Catalog to create and manage catalogs that data engineers and data scientists use to quickly discover and deploy clusters in a self-service environment. With AWS Service Catalog, you can control which EMR release versions are available, cluster configurations, and permission access by individual, group, department, or cost center.

The following are a few key AWS Service Catalog concepts:

  • An AWS Service Catalog product is a blueprint for building the AWS resources that you want available for deployment. You create your products by importing AWS CloudFormation templates.
  • A portfolio is a collection of products. With AWS Service Catalog, you can create a customized portfolio for each type of user in your organization and selectively grant access to the appropriate portfolio.
  • A provisioned product is a collection of resources that result from instantiating an AWS CloudFormation template.

Use cases

You can use AWS Service Catalog to provide Amazon EMR as a self-serve Extract, Transform, Load (ETL) platform at scale while hiding all the security and network configurations from end users.

As an administrator in AWS Service Catalog, you can create one or more Service Catalog products that define different configurations to be used for EMR clusters. In those products, you can define the security and network configurations for the EMR cluster, define automatic scaling rules and instance configurations, choose different purchase options, or preconfigure Amazon EMR to run different EMR step jobs. As a user in Service Catalog, you can browse through the different EMR templates exposed as products and provision the one that matches your requirements. By following this approach, you can make Amazon EMR usage self-serviceable, reduce the Amazon EMR learning curve for your users, and ensure adherence to security standards and best practices.

The following image illustrates how the interactions look between Amazon EMR administrators and end-users when using AWS Service Catalog to provision EMR clusters.

The use cases in this post have three AWS Identity and Access Management (IAM) users with different access permissions:

  • emr-admin: This user is the administrator and has access to all the resources. This user creates EMR clusters for their end-users based on their requirements.
  • emr-data-engineer: The data engineer uses Spark and Hive most of the time. They run different ETL scripts on Hive and Spark to process, transform, and enrich their datasets.
  • emr-data-analyst: This user is very familiar with SQL and mostly uses Hue to submit queries to Hive.

You can solve several Amazon EMR operational use cases using AWS Service Catalog. The following sections discuss three different use cases. Later in this post, you walk through each of the use cases with a solution.

Use case 1: Ensuring least privilege and appropriate access

The administrator wants to enforce a few organizational standards. The first is to avoid using the default EMR_EC2_DefaultRole for any EMR cluster. Instead, the administrator wants a role that has limited access to Amazon Simple Storage Service (Amazon S3) and that is assigned automatically every time an EMR cluster launches. Second, end users sometimes forget to add appropriate tags to their resources, which makes it hard for the administrator to identify those resources and allocate cost appropriately. So the administrator wants a mechanism that assigns tags to EMR clusters automatically when they launch.

Use case 2: Providing Amazon EMR as a self-serve ETL platform with Spark and Hive

Data engineers use Spark and Hive applications, and they prefer to have a platform where they just submit their jobs without spending time creating the cluster. They also want to try out different Amazon EMR versions to see how their jobs run on different Spark or Hive versions. They don’t want to spend time learning AWS or Amazon EMR. Additionally, the administrator doesn’t want to give full Amazon EMR access to all users.

Use case 3: Automatically scaling the Hive cluster for analysts

Data analysts have strong SQL backgrounds, so they typically use Hue to submit their Hive queries. They run queries against a large dataset, so they want to have an EMR cluster that can scale when needed. They also don’t have access to the Amazon EMR console and don’t know how to configure automatic scaling for Amazon EMR.

Solution overview

With AWS Service Catalog, you can self-serve your Amazon EMR users, enforce best practices and compliance, and speed up the adoption process.

At a high level, the solution includes the following steps:

  1. Configuring the AWS environment to run this solution.
  2. Creating a CloudFormation template.
  3. Setting up AWS Service Catalog products and portfolios.
  4. Managing access to AWS Service Catalog and provisioning products.
  5. Demonstrating the self-service Amazon EMR platform for users.
  6. Enforcing best practices and compliance through AWS Service Catalog.
  7. Executing ETL workloads on Amazon EMR using AWS Service Catalog.
  8. Optionally, setting up AWS Service Catalog and launching Amazon EMR products through the AWS Command Line Interface (AWS CLI).

The following section looks at the CloudFormation template, which you use to set up the AWS environment to run this solution.

Setting up the AWS environment

To set up this solution, you need to create a few AWS resources. The CloudFormation template provided in this post creates all the required AWS resources. This template requires you to pass the following parameters during the launch:

  • A password for your test users.
  • An Amazon Elastic Compute Cloud (Amazon EC2) key pair.
  • The latest AMI ID for the EC2 helper instance. This instance configures the environment and sets up the required files and templates for this solution.

This template is designed only to show how you can use Amazon EMR with AWS Service Catalog. This setup isn’t intended for production use without modification.

To launch the CloudFormation stack, choose Launch Stack:

Launching this stack creates several AWS resources. The following resources shown in the AWS CloudFormation output are the ones you need in the next step:

  • ConsoleLoginURL – The URL you use to switch between multiple users
  • EMRSCBlogBucket – The name of the S3 bucket that stores blog-related files
  • UserPassword – The password to use for all the test users
  • DataAdminUsername – The IAM user name for the administrator user
  • DataEngineerUsername – The IAM user name for the data engineer user
  • DataAnalystUsername – The IAM user name for the data analyst user
  • HiveScriptURL – The Amazon S3 path for the Hive script
  • HiveETLInputParameter – The path for the Hive input parameter
  • HiveETLOutputParameter – The path for the Hive output parameter
  • SparkScriptURL – The Amazon S3 path for the Spark script
  • SparkETLInputParameter – The path for the Spark input parameter
  • SparkETLOutputParameter – The path for the Spark output parameter

When the CloudFormation template is complete, record the outputs listed on the Outputs tab on the AWS CloudFormation console. See the following screenshot.

(Optional) Configuring the AWS CLI

The AWS CLI is a unified tool to manage your AWS services. In the optional section later in this post, you use the AWS CLI to create AWS Service Catalog products and portfolios. Installing the AWS CLI isn’t required for this solution. For instructions on configuring the AWS CLI in your environment, see Configuring the AWS CLI.

Provisioning EMR clusters through AWS Service Catalog

You can create AWS Service Catalog products from an existing CloudFormation template and use those products to provision a variety of EMR clusters. Users can create an EMR cluster and consume the cluster’s services without having direct access to the cluster, which eases the Amazon EMR adoption process.

The following CloudFormation template creates an EMR cluster. This template takes two parameters:

  • Cluster size – You select how many core nodes you want to have in the EMR cluster
  • Compute type – Based on the compute type you choose, the template selects the corresponding EC2 instance type

As an account administrator, you can define the internal configuration for the EMR cluster. End users don’t need to know all the security groups, the subnet ID, the key pair, and other information, and they don’t need to access the EMR cluster or spend time setting it up. As an administrator, you define a template for the cluster; enforce compliance, versions, applications, and automatic scaling rules through the CloudFormation template; and expose the template as a product through AWS Service Catalog, along the lines of the following sketch.
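This is a minimal sketch of what such a product template could look like; the instance types, release label, role names, and subnet are illustrative, and a production template would also bake in security groups, tags, and automatic scaling rules:

# Write out a sketch of the product's CloudFormation template
cat > sample-cluster.yaml <<'EOF'
AWSTemplateFormatVersion: '2010-09-09'
Parameters:
  ClusterName:
    Type: String
  ClusterSize:
    Type: Number
    Default: 2
  ComputeRequirements:
    Type: String
    AllowedValues: [CPU, Memory]
Mappings:
  ComputeMap:
    CPU:
      InstanceType: c5.xlarge
    Memory:
      InstanceType: r5.xlarge
Resources:
  Cluster:
    Type: AWS::EMR::Cluster
    Properties:
      Name: !Ref ClusterName
      ReleaseLabel: emr-5.29.0
      Applications: [{Name: Spark}, {Name: Hive}]
      ServiceRole: EMR_DefaultRole
      JobFlowRole: CustomLimitedEMRRole   # least-privilege instance profile created by the admin
      Instances:
        Ec2SubnetId: subnet-xxxxxxxx      # fixed by the administrator, hidden from end users
        MasterInstanceGroup:
          InstanceCount: 1
          InstanceType: !FindInMap [ComputeMap, !Ref ComputeRequirements, InstanceType]
        CoreInstanceGroup:
          InstanceCount: !Ref ClusterSize
          InstanceType: !FindInMap [ComputeMap, !Ref ComputeRequirements, InstanceType]
EOF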

The following section walks you through the solution for each use case.

Use cases walkthrough

The CloudFormation template already configured AWS Service Catalog portfolios and products. You can review these on the AWS Service Catalog console.

  1. Use the ConsoleLoginURL from the AWS CloudFormation console Outputs tab and sign in as an emr-admin user.
  2. On the AWS Service Catalog console, you can see two portfolios for engineers and analysts. In each of those portfolios, you can see two products.

The Data Analysts Stack contains products for the analyst and is assigned to the user emr-data-analyst. The Data Engineering Stack contains products for engineers and is assigned to the emr-data-engineer user. Upon logging in, they can see their respective products and portfolios.

Use case 1: Ensuring least privilege and appropriate access

The cluster administrator creates a least privilege IAM role for their users and associates that role with clusters through the Service Catalog products. Similarly, the administrator also assigns appropriate tags to each product. When data engineers or analysts launch an EMR cluster using any of their assigned products, the cluster has the least privilege access and its resources are tagged automatically. To confirm this access is in place, complete the following steps:

  1. Sign in to the AWS Management Console as either emr-data-engineer user or emr-data-analyst user.

Your console looks slightly different because end users don’t manage the products; they just use them to launch clusters or run jobs on the cluster.

  2. Choose Default EMR and provision this product by choosing Launch Product.
  3. For the name of the provisioned product, enter SampleEMR.

The next screen shows a list of allowed parameters your administrator thinks you may need.

  4. Leave all parameters as default.
  5. For the cluster name, enter Sample EMR.
  6. Review all the information and launch the product.

It takes a few minutes to spin up the cluster. When the cluster is ready, the status changes to Succeeded. The provisioned product page also shows a list of outputs your product owner wants you to see. For example, using output values, your product owner can share the Master DNS Address, Resource Manager URL, and Hue URL, as shown in the following figure.

To verify that this launched EMR cluster has the expected IAM role and tags, sign in as the emr-admin user and go to the Amazon EMR console to review the service role for EC2 instances and the tags.

Use case 2: Providing Amazon EMR as a self-serve ETL platform with Spark and Hive

For this use case, data engineers have two different ETL scripts:

  • A Spark script that reads Amazon reviews stored in Amazon S3 and converts them into Parquet before writing back to Amazon S3
  • A Hive script that reads Amazon reviews data from Amazon S3 and finds out the top toys based on customer ratings.

The administrator creates a product to self-serve these users; the product defines the job type and the job parameters. End users select the job type and pass the script, input, and output locations.

  1. Sign in as emr-data-engineer.
  2. Select the EMR ETL Engine product.
  3. Choose Launch.

The next page shows the available versions when a product has more than one. Because the engineer wants to try out two different Amazon EMR versions, the administrator provided both options through product versions. You can launch the EMR cluster with the required version by selecting your preferred product version.

  4. Enter the name of the product.
  5. For this post, select EMR 5.29.0.
  6. Choose Next.

  7. For JobType, choose Spark.
  8. For JobArtifacts, enter the following value (you can get these values from the AWS CloudFormation output):
s3://blog-emr-sc-<account-id>/scripts/spark_converter.py s3://amazon-reviews-pds/tsv/amazon_reviews_us_Toys_v1_00.tsv.gz s3://blog-emr-sc-<account-id>/spark/
  9. Choose Next.

Based on your configuration, an EMR cluster launches. When the cluster is ready, the Spark job runs.

  10. In a different browser, sign in as emr-admin using the ConsoleLoginURL (from the AWS CloudFormation output).

You can see the cluster status, job status, and output path from the Amazon EMR console.

Now, go to Amazon S3 console to check the output path:

The Parquet files are written inside the Spark folder.

  11. To test the Hive job, go back to the first browser where you already signed in as emr-data-engineer.
  12. Choose Provisioned products list.
  13. Choose the product options menu (right-click) and choose Update provisioned product.

  14. On the next page, you can select a different version or the same version.
  15. In the Parameters section, choose Hive.
  16. In the JobArtifacts field, enter the following Hive parameters:
s3://blog-emr-sc-<account-id>/scripts/hive_converter.sql -d INPUT=s3://amazon-reviews-pds/tsv/ -d OUTPUT=s3://blog-emr-sc-<account-id>/hive/
  17. Choose Update.

If you select the same version, AWS Service Catalog compares the old provisioned product with the updated product and only runs the portion that you changed. For this post, I chose the same Amazon EMR version and only updated the job type and parameters. You can see that the same EMR cluster is still there, but on the Steps tab, a new step is executed for Hive.

  18. On the Amazon S3 console using the second browser, verify that a new folder hive was created with data that represents the top toys based on Amazon reviews.

To recap, you saw how to use AWS Service Catalog to provide a product to run your ETL jobs. Your data engineers can focus on their ETL scripts and your platform can self-serve them to run their ETL jobs on the EMR cluster.

Use case 3: Automatically scaling the Hive cluster for data analysts

To automatically scale the Hive cluster for data analysts, complete the following steps:

  1. Use the console login URL from the AWS CloudFormation output to sign in as emr-data-analyst, and go to the AWS Service Catalog console.

You can see a different set of products for this user.

For this use case, your data analysts want to have an automatically scaling EMR cluster with Hive application. The administrator set up the Auto-scaling EMR product with preconfigured rules.

  2. Choose Auto-scaling EMR.
  3. Enter a provisioned product name.
  4. Select Hive Auto-scaling.
  5. Choose Next.
  6. In the Parameters section, leave the options at their default and enter a cluster name.
  7. Launch the product.

The product owner also provided a client URL (for example, Hue URL) through the product output so business analysts can connect to it.

  8. Sign in as emr-admin and validate whether this new cluster is configured with the expected automatic scaling rules.
  9. On the Amazon EMR console, choose the cluster.

You can see the configuration on the Hardware tab.

In this use case, you learned how to use AWS Service Catalog to provide business analyst users a preconfigured, automatically scaled EMR cluster.

(Optional) Setting up AWS Service Catalog for Amazon EMR using AWS CLI

In the previous section, I demonstrated the solution using the AWS Service Catalog console. In the following section, I show how to use AWS Service Catalog through the AWS CLI: you create AWS Service Catalog products and portfolios, assign IAM principals, and launch products.

  1. Create a portfolio named CLI – Stack for the user emr-admin. See the following command:
aws --region us-east-1 servicecatalog create-portfolio --display-name "CLI - Stack" --provider-name "@emr-admin" --description "Sample stack for pre-defined EMR clusters"

You receive a JSON output.

  2. Record the portfolio ID port-xxxxxxxx from the output to use later.

The emr-admin user is the provider for this portfolio. The user is created with power user access, so they can see the full AWS Service Catalog console and can manage products and portfolios.

You can associate this portfolio with multiple users. By assigning them to a portfolio, they can use the portfolio, browse through its products, and provision new products. For this use case, you associate a portfolio to emr-admin and the AWS CLI user name (the name of the user that you used to configure your AWS CLI). Make sure to update the portfolio and AWS account ID.

  3. Enter the following code:
aws --region us-east-1 servicecatalog associate-principal-with-portfolio --portfolio-id port-xxxxxxxxxx --principal-type IAM --principal-arn arn:aws:iam::xxxxx:user/emr-admin

aws --region us-east-1 servicecatalog associate-principal-with-portfolio --portfolio-id port-xxxxxxxxxx --principal-type IAM --principal-arn arn:aws:iam::xxxxx:user/<aws-cli-user-name>
  4. To verify the portfolio-to-user association, enter the following command with the portfolio ID:
aws --region us-east-1 servicecatalog list-principals-for-portfolio --portfolio-id port-xxxxxxxxx

This command lists the principals associated with the portfolio, as shown in the following figure:

The CloudFormation template already copied the Amazon EMR template into your Amazon S3 account at the path s3://blog-emr-sc-<account-id>/products.

  5. To create the product CLI - Sample EMR using that template from Amazon S3, enter the following command:
aws --region us-east-1 servicecatalog create-product --name "CLI - Sample EMR" --owner "@emr-admin" --description "Sample EMR cluster with default" --product-type CLOUD_FORMATION_TEMPLATE --provisioning-artifact-parameters '{"Name": "Initial revision", "Description": "", "Info":{"LoadTemplateFromURL":"https://s3.amazonaws.com/blog-emr-sc-<account-id>/products/sample-cluster.template"},"Type":"CLOUD_FORMATION_TEMPLATE"}'

  6. Record the product ID and provisioning artifact ID from the JSON output to use later.

You now have a product and a portfolio. A portfolio can have one to many products, and each product can have multiple versions.

  7. To assign the CLI - Sample EMR product to the portfolio you created in Step 1, enter the following command:
aws --region us-east-1 servicecatalog associate-product-with-portfolio --product-id prod-xxxxxx --portfolio-id port-xxxxxx

A launch constraint specifies the IAM role that AWS Service Catalog assumes when an end-user launches a product. With a launch constraint, you can control end-user access to your AWS resources and limit usage.

The CloudFormation template already created the role Blog-SCLaunchRole; create a launch constraint using that IAM role. Use the portfolio and product IDs that you collected from the previous step and your AWS account ID.

  8. To create the launch constraint, enter the following command:
aws --region us-east-1 servicecatalog create-constraint --type LAUNCH --portfolio-id port-xxxxxx --product-id prod-xxxxxx --parameters '{"RoleArn" : "arn:aws:iam::<account-id>:role/Blog-SCLaunchRole"}'

  9. Record the launch constraint ID to use later.

You now have an AWS Service Catalog product that you can use to provision an EMR cluster. The CloudFormation template that you used to create the CLI - Sample EMR product takes three parameters (ClusterName, ComputeRequirements, ClusterSize).

  10. To pass those three parameters as key-value pairs, enter the following command (use the product ID and provisioning artifact ID that you recorded earlier):
aws --region us-east-1 servicecatalog provision-product --product-id prod-xxxxxx --provisioning-artifact-id pa-xxxxx --provisioned-product-name cli-emr --provisioning-parameters Key=ClusterName,Value=cli-emr-cluster Key=ComputeRequirements,Value=CPU Key=ClusterSize,Value=2

  11. Check the provisioned product’s status by using the provisioned product ID:
aws --region us-east-1 servicecatalog describe-provisioned-product --id pp-xxxxx

To recap, in this section you learned how to use AWS Service Catalog CLI to configure AWS Service Catalog products and portfolios, and how to provision an EMR cluster through AWS Service Catalog product.

Cleaning up

To clean up the resources you created, complete the following steps:

  1. Terminate the product that you provisioned in the previous step:
aws --region us-east-1 servicecatalog terminate-provisioned-product --provisioned-product-id pp-xxxxx
  2. Disassociate the product CLI – Sample EMR from the portfolio CLI – Stack:
aws --region us-east-1 servicecatalog disassociate-product-from-portfolio --product-id prod-xxxxx --portfolio-id port-xxxxx
  3. Disassociate IAM principals from the portfolio CLI – Stack:
aws --region us-east-1 servicecatalog disassociate-principal-from-portfolio --portfolio-id port-xxxxx --principal-arn arn:aws:iam::xxxxxx:user/emr-admin

aws --region us-east-1 servicecatalog disassociate-principal-from-portfolio --portfolio-id port-xxxxx --principal-arn arn:aws:iam::xxxxxx:user/<aws-cli-user-name> 
  4. Delete the launch constraint created in the previous step:
aws --region us-east-1 servicecatalog delete-constraint --id cons-xxxxx
  5. Delete the product CLI – Sample EMR:
aws --region us-east-1 servicecatalog delete-product --id prod-xxxxx
  6. Delete the portfolio CLI – Stack:
aws --region us-east-1 servicecatalog delete-portfolio --id port-xxxxx

Cleaning up additional resources

You must also clean up the resources you created with the CloudFormation template.

  1. On the AWS Service Catalog console, choose Provisioned products list.
  2. Terminate each product that you provisioned for these use cases.
  3. Check each of the users and their provisioned products to make sure they’re terminated.
  4. On the Amazon S3 console, empty the bucket blog-emr-sc-<account-id>.
  5. If you are using the AWS CLI, delete the objects in the blog-emr-sc-<account-id> bucket with the following command (make sure you’re running this command on the correct bucket):
aws s3 rm s3://blog-emr-sc-<account-id> --recursive
  6. If you ran the optional AWS CLI section, make sure you follow the cleanup process mentioned in that section.
  7. On the AWS CloudFormation console or AWS CLI, delete the stack named Blog-EMR-Service-Catalog.

Next steps

To enhance this solution, you can explore the following options:

  • In this post, I enforced resource tagging through AWS CloudFormation. You can also use the AWS Service Catalog TagOptions library to provide a consistent taxonomy and tagging of AWS Service Catalog resources. During a product launch (provisioning), AWS Service Catalog aggregates the associated portfolio and product TagOptions and applies them to the provisioned product.
  • This solution demonstrates the usage of launch constraints and how you can provide limited access to your AWS resources to your users. You can also use template constraints to manage parameters. Template constraints make sure that end-users only have options that you allow them when launching products. This can help you maintain your organization’s compliance requirements.
  • You can integrate AWS Budgets with AWS Service Catalog. By associating AWS Budgets with your products and portfolios, you can track your usage and service costs. You can set a custom budget for each of the portfolios and trigger alerts when your costs exceed your threshold.

Summary

In this post, I showed you how to simplify your Amazon EMR provisioning process using AWS Service Catalog, how to make Amazon EMR a self-service platform for your end users, and how to enforce best practices and compliance on your EMR clusters. You also walked through three different use cases and implemented solutions with AWS Service Catalog. Give this solution a try and share your experience with us!



About the Author

Tanzir Musabbir is a Data & Analytics Architect with AWS. At AWS, he works with our customers to provide them architectural guidance for running analytics solutions on Amazon EMR, Amazon Athena & AWS Glue. Tanzir is a big Real Madrid fan and he loves to travel in his free time.

Enhancing customer safety by leveraging the scalable, secure, and cost-optimized Toyota Connected Data Lake

Post Syndicated from Sandeep Kulkarni original https://aws.amazon.com/blogs/big-data/enhancing-customer-safety-by-leveraging-the-scalable-secure-and-cost-optimized-toyota-connected-data-lake/

Toyota Motor Corporation (TMC), a global automotive manufacturer, has made “connected cars” a core priority as part of its broader transformation from an auto company to a mobility company. In recent years, TMC and its affiliate technology and big data company, Toyota Connected, have developed an array of new technologies to provide connected services that enhance customer safety and the vehicle ownership experience. Today, Toyota’s connected cars come standard with an on-board Data Communication Module (DCM) that links to a Controller Area Network (CAN). By using this hardware, Toyota provides various connected services to its customers.

Some of the connected services help drivers to safely enjoy their cars. Telemetry data is available from the car 24×7, and Toyota makes the data available to its dealers (when their customers opt-in for data sharing). For instance, a vehicle’s auxiliary battery voltage declines over time. With this data, dealership staff can proactively contact customers to recommend a charge prior to experiencing any issues. This automotive telemetry can also help fleet management companies monitor vehicle diagnostics, perform preventive maintenance and help avoid breakdowns.

There are other services such as usage-based auto insurance that leverage driving behavior data that can help safe drivers receive discounts on their car insurance. Telemetry plays a vital role in understanding driver behavior. If drivers choose to opt-in, a safety score can be generated based on their driving data and drivers can use their smartphones to check their safe driving scores.

A vehicle generates data every second, which can be bundled into larger packets at one-minute intervals. With millions of connected cars that have data points available every second, the incredible scale required to capture and store that data is immense—there are billions of messages daily generating petabytes of data. To make this vision a reality, Toyota Connected’s Mobility Team embarked on building a real-time “Toyota Connected Data Lake.” Given the scale, we leveraged AWS to build this platform. In this post, we show how we built the data lake and how we provide significant value to our customers.

Overview

The guiding principles for architecture and design that we used are as follows:

  • Serverless: We want to use cloud native technologies and spend minimal time on infrastructure maintenance.
  • Rapid speed to market: We work backwards from customer requirements and iterate frequently to develop minimally viable products (MVPs).
  • Cost-efficient at scale.
  • Low latency: near real time processing.

Our data lake needed to be able to:

  • Capture and store new data (relational and non-relational) at petabyte scale in real time.
  • Provide analytics that go beyond batch reporting and incorporate real time and predictive capabilities.
  • Democratize access to data in a secure and governed way, allowing our team to unleash their creative energy and deliver innovative solutions.

The following diagram shows the high-level architecture.

Walkthrough

We built the serverless data lake with Amazon S3 as the primary data store, given the scalability and high availability of S3. The entire process is automated, which reduces the likelihood of human error, increases efficiency, and ensures consistent configurations over time, as well as reduces the cost of operations.

The key components of a data lake include Ingest, Decode, Transform, Analyze, and Consume:

  • Ingest – Connected vehicles send telemetry data once a minute, which includes speed, acceleration, turns, geolocation, fuel level, and diagnostic error codes. This data is ingested into Amazon Kinesis Data Streams, processed through AWS Lambda to make it readable, and the “raw copy” is saved through Amazon Kinesis Data Firehose into an S3 bucket.
  • Decode – Data arriving into the Kinesis data stream in the Decode pillar is decoded by a serverless Lambda function, which does most of the heavy lifting. Based upon a proprietary specification, this Lambda function does the bit-by-bit decoding of the input message to capture the particular sensor values. The small input payload of 35 KB with data from over 180 sensors is decoded and converted to a JSON message of 3 MB, which is then compressed and written to the decoded S3 bucket.
  • Transform – The aggregation jobs leverage the massively parallel capability of Amazon EMR, decrypt the decoded messages, and convert the data to Apache Parquet. Apache Parquet is a columnar storage file format designed for querying large amounts of data, regardless of the data processing framework or programming language. Parquet allows for better compression, which reduces the amount of storage required. It also reduces I/O, since we can efficiently scan the data. The datasets are now available for analytics purposes, partitioned by masked identification numbers as well as by automotive models and dispatch type. A separate set of jobs transforms the data and stores it in Amazon DynamoDB to be consumed in real time from APIs.
  • Consume – Applications needing to consume the data make API calls through Amazon API Gateway. Authentication for the API calls is based on temporary tokens issued by Amazon Cognito.
  • Analyze – Data analytics can be performed directly on Amazon S3 by leveraging serverless Amazon Athena. Data access is democratized and made available to data science groups, who build and test various models that provide value to our customers.

Additionally, comprehensive monitoring is set up by leveraging Amazon CloudWatch, Amazon ES, and AWS KMS for managing the keys securely.

Scalability

The scalability capabilities of the building blocks in our architecture that allow us to reach this massive scale are:

  • S3: S3 is a massively scalable key-based object store that is well-suited for storing and retrieving large datasets. S3 partitions the index based on key name. To maximize the performance of high-concurrency operations on S3, we introduced randomness into each of the Parquet object keys to increase the likelihood that the keys are distributed across many partitions (see the sketch after this list).
  • Lambda: We can run as many concurrent functions as needed and can raise limits as required with AWS support.
  • Kinesis Data Firehose: It scales elastically based on volume without requiring any human intervention. We batch requests up to 128 MiB or 15 minutes, whichever comes first, to avoid small files. Additional details are available in Srikanth Kodali’s blog post.
  • Kinesis Data Streams: We developed an automated program that adjusts the shards based on incoming volume. This is based on the Kinesis Scaling Utility from AWS Labs, which allows us to scale in a way similar to EC2 Auto Scaling groups.
  • API Gateway: automatically scales to billions of requests and seamlessly handles our API traffic.
  • EMR cluster: We can programmatically scale out to hundreds of nodes based on our volume and scale in after processing is completed.
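The following sketch illustrates the key-randomization idea from the S3 bullet above; the bucket name and key layout are illustrative:

# Prefix each Parquet object key with a short hash so keys spread across S3 index partitions
KEY="telemetry/2020/08/01/part-0001.parquet"
PREFIX=$(echo -n "$KEY" | md5sum | cut -c1-4)
aws s3 cp part-0001.parquet "s3://<your-bucket>/${PREFIX}/${KEY}"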

Our volumes have increased seven-fold since we migrated to AWS and we have only adjusted the number of shards in Kinesis Data Streams and the number of core nodes for EMR processing to scale with the volume.

Security in the AWS cloud

AWS provides a robust suite of security services, allowing us to have a higher level of security in the AWS cloud. Consistent with our security guidelines, data is encrypted both in transit and at rest. Additionally, we use VPC Endpoints, allowing us to keep traffic within the AWS network.

Data protection in transit:

Data protection at rest:

  • S3 server-side encryption handles all encryption, decryption and key management transparently. All user data stored in DynamoDB is fully encrypted at rest, for which we use an AWS-owned customer master key at no additional charge. Server-side encryption for Kinesis Data streams and Kinesis Data Firehose is also enabled to ensure that data is encrypted at rest.

Cost optimization

Given our very large data volumes, we were methodical about optimizing costs across all components of the infrastructure. The ultimate goal was to figure out the cost of the APIs we were exposing. We developed a robust cost model validated with performance testing at production volumes:

  • NAT gateway: When we started this project, one of the significant cost drivers was traffic flowing from Lambda to Kinesis Data Firehose that went over the NAT gateway, since Kinesis Data Firehose did not have a VPC endpoint. Traffic flowing through the NAT gateway costs $0.045/GB, whereas traffic flowing through the VPC endpoint costs $0.01/GB. Based on a product feature request from Toyota, AWS implemented this feature (VPC Endpoint for Firehose) early this year. We implemented this feature, which resulted in a four-and-a-half-fold reduction in our costs for data transfer.
  • Kinesis Data Firehose: Since Kinesis Data Firehose did not initially support encryption of data at rest, we had to use client-side encryption through AWS Key Management Service (AWS KMS); this was the second significant cost driver. We requested a feature for native server-side encryption in Kinesis Data Firehose, which was released earlier this year, and we enabled it on the Kinesis Data Firehose stream. This removed the client-side KMS usage, resulting in another 10% reduction in our total costs.

Since Kinesis Data Firehose charges based on the amount of data ingested ($0.029/GB), our Lambda function compresses the data before writing to Kinesis Data Firehose, which saves on the ingestion cost.

  • S3: We use lifecycle policies to move data from S3 (which costs $0.023/GB) to Amazon S3 Glacier (which costs $0.004/GB) after a specified duration. Glacier provides a six-fold cost reduction over S3. We further plan to move the data from Glacier to Amazon S3 Glacier Deep Archive (which costs $0.00099/GB), which will provide us a four-fold reduction over Glacier costs. Additionally, we have set up automated deletes of certain data sets at periodic intervals.
  • EMR: We were planning to use AWS Glue and keep the architecture serverless, but made the decision to leverage EMR from a cost perspective. We leveraged Spot Instances for transformation jobs in EMR, which can provide up to 60% savings. The hourly jobs complete successfully with Spot Instances; however, the nightly aggregation jobs leveraging r5.4xlarge instances failed frequently because sufficient Spot capacity was not available. We decided to move to On-Demand Instances while we finalize our strategy for Reserved Instances to reduce costs.
  • DynamoDB: Time to Live (TTL) for DynamoDB lets us define when items in a table expire so that they can be automatically deleted from the database. We enabled TTL to expire objects that are not needed after a certain duration. We plan to use reserved capacity for read and write capacity units to reduce costs. We also use DynamoDB auto scaling, which helps us manage capacity efficiently and lower the cost of our workloads, because they have a predictable traffic pattern. In Q2 2019, DynamoDB removed the associated costs of DynamoDB Streams used in replicating data globally, which translated to extra cost savings in global tables.
  • Amazon DynamoDB Accelerator (DAX): Our DynamoDB tables are front-ended by DAX, which improves the response time of our application by dramatically reducing read latency compared to using DynamoDB alone. Using DAX, we also lower the cost of DynamoDB by reducing the amount of provisioned read throughput needed for read-heavy applications.
  • Lambda: We ran benchmarks to arrive at the optimal memory configuration for Lambda functions. Memory allocation in Lambda determines CPU allocation, and for some of our Lambda functions we allocated higher memory, which results in faster execution, thereby reducing the amount of GB-seconds per function execution and saving time and cost. Combining Lambda with DAX provides an additional benefit for serverless applications: lower read latency results in shorter execution times, which means lower costs for Lambda.
  • Kinesis Data Streams: We scale our streams through an automated job, since our traffic patterns are fairly predictable. During peak hours we add additional shards and delete them during off-peak hours, allowing us to reduce costs when shards are not in use.

Enhancing customer safety

The Data Lake presents multiple opportunities to enhance customer safety. Early detection of market defects and pinpointing of target vehicles affected by those defects is made possible through the telemetry data ingested from the vehicles. This early detection leads to early resolution way before the customer is affected. On-board software in the automobiles can be constantly updated over-the-air (OTA), thereby saving time and costs. The automobile can generate a Health Check Report based on the driving style of its drivers, which can create the ideal maintenance plan for drivers for worry-free driving.

The driving data for an individual driver based on speed, sharp turns, rapid acceleration, and sudden braking can be converted into a “driver score” which ranges from 1 to 100 in value. The higher the driver-score, the safer the driver. Drivers can view their scores on mobile devices and monitor the specific locations of harsh driving on the journey map. They can then use this input to self-correct and modify their driving habits to improve their scores, which will not only result in a safer environment but drivers could also get lower insurance rates from insurance companies. This also gives parents an opportunity to monitor the scores for their teenage drivers and coach them appropriately on safe driving habits. Additionally, notifications can be generated if the teenage driver exceeds an agreed-upon speed or leaves a specific area.

Summary

The automated serverless data lake is a robust scalable platform that allows us to analyze data as it becomes available in real time. From an operations perspective, our costs are down significantly. Several aggregation jobs that took 15+ hours to run, now finish in 1/40th of the time. We are impressed with the reliability of the platform that we built. The architectural decision to go serverless has reduced operational burden and will also allow us to have a good handle on our costs going forward. Additionally, we can deploy this pipeline in other geographies with smaller volumes and only pay for what we consume.

Our team accomplished this ambitious development in a short span of six months. They worked in an agile, iterative fashion and continued to deliver robust MVPs to our business partners. Working with the service teams at AWS on product feature requests and seeing them come to fruition in a very short time frame has been a rewarding experience and we look forward to the continued partnership on additional requests.



About the Authors


Sandeep Kulkarni is an enterprise architect at AWS. His passion is to accelerate digital transformation for customers and build highly scalable and cost-effective solutions in the cloud. In his spare time, he loves to do yoga and gardening.


Shravanthi Denthumdas is the director of mobility services at Toyota Connected. Her team is responsible for building the Data Lake and delivering services that allow drivers to safely enjoy their cars. In her spare time, she likes to spend time with her family and children.


Monitor and Optimize Analytic Workloads on Amazon EMR with Prometheus and Grafana

Post Syndicated from Derek Tan original https://aws.amazon.com/blogs/big-data/monitor-and-optimize-analytic-workloads-on-amazon-emr-with-prometheus-and-grafana/

Amazon EMR is a big data service offered by AWS to run Apache Spark and other open-source applications on AWS in a cost-effective manner. Monitoring Amazon EMR clusters is essential to help detect critical issues with the applications or infrastructure in real time and identify root causes quickly. Viewing how the clusters are being used over time helps operations and engineering teams find potential performance bottlenecks and optimization opportunities to scale out or scale in their clusters and plan for capacity. In this post, we show how to integrate Prometheus, an open-source systems monitoring and alerting tool, and Grafana, an open-source visualization and analytics tool, to provide an end-to-end monitoring system for EMR clusters. Furthermore, we share an example that demonstrates how you can use Prometheus and Grafana to easily identify opportunities to optimize your EMR jobs to improve performance and reduce cost.

This post discusses the following:

  • Installing and configuring Prometheus and Grafana on an Amazon Elastic Compute Cloud (Amazon EC2) instance.
  • Configuring an EMR cluster to emit metrics that Prometheus can scrape from the cluster.
  • Using the Grafana dashboards to analyze the metrics for a workload on the EMR cluster and optimize it.
  • How Prometheus can push alerts to the Alertmanager, which is a component in Prometheus. The Alertmanager sends notifications to the alert_sns_forwarder component, which forwards the notifications to Amazon Simple Notification Service (Amazon SNS).
  • Configuring Amazon SNS to send email notifications.
  • A few considerations when bringing this monitoring system to production.

The following diagram illustrates the solution architecture.

Exporters are agents that gather metrics from the systems being monitored and provide endpoints for Prometheus to poll the metrics. Node_exporter and jmx_exporters collect metrics from the operating system and applications such as YARN and HDFS, respectively, on each node in the EMR cluster.

The Prometheus server polls the endpoints exposed by these exporters on each node to gather the metrics. These metrics are then stored locally on the Prometheus server. When a user opens the Grafana dashboards in their browser, the Grafana server queries the Prometheus server to generate the dashboards that are displayed in the browser.
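As an illustration, a Prometheus scrape job for the node_exporter endpoints can look like the following sketch; the target host is a placeholder, and node_exporter listens on port 9100 by default:

# Minimal scrape configuration for node_exporter on a cluster node
cat > prometheus.yml <<'EOF'
scrape_configs:
  - job_name: emr-node-exporter
    scrape_interval: 15s
    static_configs:
      - targets: ['<emr-node-private-dns>:9100']
EOF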

You can set up alerts in Prometheus; when the alert thresholds are breached, the Prometheus server pushes alerts to the Alertmanager. The Alertmanager takes care of deduplicating, grouping, and routing the notifications to the receivers. Receivers that send events to PagerDuty, Slack, Opsgenie, and others, are natively supported by the Alertmanager.

Because there is no native integration of the Alertmanager with Amazon SNS, we use an implementation of the generic webhook receiver, alert_sns_forwarder, to transform and route the notification message to a pre-configured topic in Amazon SNS. You can subscribe to the topic to receive the alerts via email, SMS, HTTP/S, Amazon Simple Queue Service (Amazon SQS), or AWS Lambda.
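On the Alertmanager side, that hand-off is just a generic webhook receiver. A minimal configuration could look like the following sketch, where the local port that alert_sns_forwarder listens on is an assumption:

# Route all alerts to the alert_sns_forwarder webhook (port is an assumption)
cat > alertmanager.yml <<'EOF'
route:
  receiver: sns-forwarder
receivers:
  - name: sns-forwarder
    webhook_configs:
      - url: 'http://localhost:9087/alert'
EOF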

Prerequisites

Before getting started, you must have the following prerequisites:

Deploying the solution

We provide the CloudFormation template in this post as a general guide. Please review and customize it as needed. You should also be aware that some of the resources deployed by this stack incur costs when they remain in use.

For purposes of this post, the resources are installed in a VPC with a public subnet. We recommend installing the resources in a private subnet where possible for production. In addition, we recommend enabling TLS connections and password authentication from Prometheus to node_exporter and jmx_exporter and for Grafana. To make it easier to troubleshoot the setup, the CloudFormation template includes setting up network ingress access to port 9090 so you can access the Prometheus UI remotely. You can remove this access if not needed.

The CloudFormation template contains several nested templates. Together, they do the following:

  1. Choose the VPC and subnet to deploy this solution.
  2. Create an EC2 instance with the instance type of your choosing.
  3. Download, install, and configure Prometheus as a service with the right scrape configuration to connect to the EMR cluster being monitored.
  4. Download, install, and configure alert_sns_forwarder as a service that transforms alert notifications from the AlertManager to Amazon SNS messages and publishes those messages to Amazon SNS.
  5. Download, install, and configure the Alertmanager as a service that forwards alert notifications from Prometheus server to alert_sns_forwarder.
  6. Set up a sample alert to send notification messages to Amazon SNS when disk space usage is over 90% on any of the nodes in the EMR cluster being monitored.
  7. Download, install, and configure Grafana as a service that connects to the Prometheus data source. The following dashboards are pre-installed to visualize various metrics on the EMR cluster being monitored:
    • OS Level Metrics – Select CPU, memory, and disk metrics exposed by the Amazon Linux operating system.
    • HDFS – DataNode Metrics – Select storage and network metrics exposed by the HDFS data node process.
    • HDFS – NameNode Metrics – Select storage and replication metrics exposed by the HDFS name node process.
    • YARN – Resource Manager – Select resource, application, and container metrics exposed by the YARN resource manager process.
    • YARN – Node Manager – Select resource and container metrics exposed by the YARN node manager process.
    • YARN – Queues – Select resource, application, and container metrics filtered by YARN queues.
    • JVM Metrics – Select memory and garbage collection metrics exposed by JVM of HDFS and YARN processes.
    • Log Metrics – Counts of fatal, error, and warning log messages from HDFS and YARN processes.
    • RPC Metrics – Select RPC metrics exposed by HDFS and YARN processes.
  8. Create an Amazon EC2 security group. You can configure network access to inbound TCP ports 22 (SSH), Grafana (3000), and Prometheus UI (9090) with parameters in the CloudFormation template. This allows you to lock down access to the Prometheus and Grafana EC2 instance launched to known CIDR scopes and ports.
  9. Create an IAM instance profile that is used to associate with the EC2 instance with Prometheus and Grafana installed.
  10. Create an SNS topic and subscribe the email address provided as a parameter in the template to receive the notifications from Prometheus Alertmanager.
  11. Launch an EMR cluster with a bootstrap action script that does the following:
    • Download and set up node_exporter, which exposes OS metrics to Prometheus, as a service on all nodes.
    • Download jmx_exporter, which is used by HDFS Name Node, HDFS Data Node, YARN Resource Manager, and YARN Node Manager processes on all nodes to expose application metrics to Prometheus.
    • Configure HDFS Name Node, HDFS Data Node, YARN Resource Manager, and YARN Node Manager processes on the cluster to launch with jmx_exporter as a Java agent.
  12. Create additional master and slave security groups of the EMR cluster to allow network ingress to ports 7001, 7005, and 9100 from the Prometheus server.

Launching the CloudFormation stack

To launch your stack and provision your resources, complete the following steps:

  1. Choose the following Launch Stack link:

This automatically launches AWS CloudFormation in your AWS account with a template. It prompts you to sign in as needed. You can view the template on the AWS CloudFormation console as required. Make sure that you create the stack in your intended Region.

The CloudFormation stack requires a few parameters, as shown in the following screenshot.

The following list describes the parameters:

  • Stack name – A meaningful name for the stack; for example, emrPrometheusGrafana. Default: none.

Network Configuration

  • VPC – The VPC where the EC2 instance and EMR cluster should be launched. Default: none.
  • Subnet – The subnet where the EC2 instance and EMR cluster should be launched. Default: none.

Amazon EMR Configuration

  • EMRClusterName – Name of the EMR cluster. Default: emrPrometheusBlog.
  • MasterInstanceType – Instance type of the master node. Default: m5.xlarge.
  • CoreInstanceType – Instance type of the core node. Default: m5.xlarge.
  • CoreInstanceCount – Number of core instances. Default: 2.
  • EMRSSHIPRange – The IP address range in CIDR notation (for example, <your ip address>/32) for SSHing to the master node of the EMR cluster. If you want to grant access to your local computer’s public IPv4 address, you can go to https://checkip.amazonaws.com/ or run curl https://checkip.amazonaws.com/ in the terminal. Rules with a source of 0.0.0.0/0 allow all IP addresses to access your instance; we recommend setting security group rules to allow access from known IP addresses only. Default: none.
  • EMRKeyName – An existing EC2 key pair to enable SSH access to the master node of the EMR cluster. Default: none.
  • EMRReleaseLabel – The Amazon EMR release version. Default: emr-6.0.0.

Amazon EC2 Configuration

  • InstanceType – The EC2 instance type on which to install the Prometheus, Alertmanager, alert_sns_forwarder, and Grafana services. Because the instance needs to host the Prometheus server, which works as a time series database storing metrics data, an instance with at least 50 GB of disk space would be advisable, depending on the usage. For this post, we choose t3.small. Default: t3.small.
  • KeyName – An existing EC2 key pair to enable SSH access to the instance. Default: none.
  • SSHIPRange – The IP address range in CIDR notation (for example, <your ip address>/32) for SSHing to the EC2 instance; see EMRSSHIPRange for how to find your public IPv4 address. Default: none.

User Interfaces Network Ingress Access

  • GrafanaIPRange – The IP address range in CIDR notation for accessing the Grafana dashboards on port 3000. Default: none.
  • PrometheusUIIPRange – The IP address range in CIDR notation for accessing the Prometheus UI on port 9090. Default: none.

Alerts from Prometheus

  • EmailAddress – The email address that you use to subscribe to alerts from Prometheus. You must confirm this subscription via an email message from Amazon SNS. Default: none.

  1. Enter the parameter values from the preceding list.
  2. Choose Next.
  3. On the next screen, enter any required tags, an IAM role, or any advanced options.
  4. Choose Next.
  5. Review the details on the final screen and select the check boxes confirming that AWS CloudFormation might create IAM resources with custom names or require CAPABILITY_AUTO_EXPAND.
  6. Choose Create.

Stack creation takes a few minutes. After the CloudFormation stack is created, on the Outputs tab, you can find the following three key-value pairs:

    • ClusterId – The ID of the EMR cluster created.
    • MasterPublicDnsName – The public DNS name of the master node of the EMR cluster.
    • WebsiteURL – The URL for the newly created Grafana dashboard. The default login and password are both admin. You are prompted to change the password the first time you log in.

You should also receive an email from Amazon SNS asking you to confirm the subscription.

  1. Choose the Confirm subscription link in your email.

You have now subscribed your email to the SNS topic that alerts from Prometheus are published to.

Workload example

The following use case demonstrates how you can use the insights of Amazon EMR metrics from the Grafana dashboards to tune the performance of a Hadoop job running on the cluster.

The job for this use case is a simple WordCount that counts the number of words in the input files. You can download the source code for the WordCount program from sourcecode.zip. It is a basic MapReduce program. You use the Yelp business review dataset from Yelp Open Dataset. The original data is in JSON format. For this post, we converted the same dataset to GZIP (2.4 GB) and BZIP2 (1.8 GB) formats. You run this WordCount job on the EMR cluster you launched earlier.

Start the job by adding a step to the cluster with the following code (replace <j-*************> with the cluster ID specified on the Outputs tab of the CloudFormation stack you created and also <s3://bucket-name/outputs-folder> with the Amazon Simple Storage Service (Amazon S3) location for the job output):

$ aws emr add-steps --cluster-id <j-*************> \
--steps Type=CUSTOM_JAR,Name=WordCountJarGZIP,ActionOnFailure=CONTINUE,Jar=s3://aws-bigdata-blog/artifacts/aws-blog-emr-prometheus-grafana/demo/jars/Wordcount-1.0-SNAPSHOT-job.jar,Args=com.amazonaws.support.training.emr.Wordcount,s3://aws-bigdata-blog/artifacts/aws-blog-emr-prometheus-grafana/demo/datasets/YelpDataGzip/,<s3://bucket-name/outputs-folder>/outputGzip

You can cancel the step at any time by entering the following code (replace <j-*************> with the cluster ID and <s-*************> with the step ID):

$ aws emr cancel-steps --cluster-id <j-*************> \
--step-ids <s-*************>
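
If you script these runs, you can also poll the step state from code instead of watching the console; the following boto3 sketch is illustrative (the wait_for_step helper is not part of this post's tooling):

import time

import boto3

emr = boto3.client("emr")

def wait_for_step(cluster_id, step_id, poll_seconds=30):
    """Poll an EMR step until it leaves PENDING/RUNNING and return the final state."""
    while True:
        state = emr.describe_step(ClusterId=cluster_id, StepId=step_id)["Step"]["Status"]["State"]
        if state not in ("PENDING", "RUNNING"):
            return state
        time.sleep(poll_seconds)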

When the step is in RUNNING status, go to the Grafana dashboards for metrics insights. The following screenshot shows the YARN – Resource Manager dashboard.

For most of the job’s execution, the Container Stats metrics show that only two containers (one ApplicationMaster container and one mapper container) were allocated, while no containers were pending. This is expected, because the input format for this job is GZIP, which isn’t splittable. Therefore, no matter how big the input file is, the job starts only one mapper container because there is only one InputSplit. Container allocation doesn’t increase until the mapper stage finishes and the reducer stage starts.

VCores Utilization shows the same insight: it stayed below 50% for a long time until multiple reducer containers kicked in after the completion of the single mapper container.

The OS Level Metrics dashboard in the following screenshot also shows the resource utilization for CPU and memory was quite low during the mapper stage of this job, which took most of the process time.

For a cluster with one m5.xlarge master and two m5.xlarge core nodes, the job took 30 minutes to finish. Out of the 30 minutes, 28 minutes were used by the mapper stage.
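
You can verify the splittability difference yourself from a PySpark shell or EMR notebook on the cluster; the paths below are placeholders, and although this post's job is MapReduce, the InputSplit behavior is the same:

# Assumes a SparkSession named spark, as provided by pyspark or an EMR notebook
gz = spark.sparkContext.textFile("s3://my-bucket/yelp/reviews.json.gz")
bz2 = spark.sparkContext.textFile("s3://my-bucket/yelp/reviews.json.bz2")
print(gz.getNumPartitions())   # 1: a single GZIP file is one InputSplit
print(bz2.getNumPartitions())  # many: BZIP2 splits roughly once per block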

Based on these insights from the dashboards, you can do some performance tuning.

For the same dataset, instead of using GZIP, use BZIP2 format for the input file. Submit the step with the following code (replace <j-*************> with the cluster ID specified on the Outputs tab of the CloudFormation stack and <s3://bucket-name/outputs-folder> with Amazon S3 location for the job output):

$ aws emr add-steps --cluster-id <j-*************> \
--steps Type=CUSTOM_JAR,Name=WordCountJarGBZ2,ActionOnFailure=CONTINUE,Jar=s3://aws-bigdata-blog/artifacts/aws-blog-emr-prometheus-grafana/demo/jars/Wordcount-1.0-SNAPSHOT-job.jar,Args=com.amazonaws.support.training.emr.Wordcount,s3://aws-bigdata-blog/artifacts/aws-blog-emr-prometheus-grafana/demo/datasets/YelpDataBZ2/,<s3://bucket-name/outputs-folder>/outputBZ2_1

When the step is in RUNNING status, check the same dashboards.

The following screenshot shows the YARN – Resource Manager dashboard.

Container Stats shows that seven containers got allocated immediately when the job started. Additionally, the number of pending containers is significantly higher, at 46. You changed the input format to BZIP2 this time, which is a splittable compression format. As a result, multiple mapper containers were launched, each processing one InputSplit. This improved the parallelism at the mapper stage.

VCores Utilization also shows that 100% of the VCores were used during the peak of the job process.

The OS Level Metrics dashboard in the following screenshot shows the resource utilization also increased during this run of the job.

The job took 11 minutes to finish, which is a 63% performance improvement compared to the previous run.

Can the job run even faster? Based on what you have found from your dashboards, yes. At some point during the second run, 46 pending containers were waiting to be allocated, and VCores Utilization was 100%, which means the cluster had allocated all of its container resources. You can resize the cluster by adding a task instance group with 10 m5.xlarge task nodes with the following code (replace <j-*************> with the cluster ID):

$ aws emr add-instance-groups --cluster-id <j-*************> \
--instance-groups InstanceCount=10,InstanceGroupType=TASK,InstanceType=m5.xlarge

The add-instance-groups command returns an output similar to the following code:

{
    "ClusterId": "j-*************",
    "InstanceGroupIds": [
        "ig-************"
    ]
}

Record the value of InstanceGroupIds; you use this later to set the nodes in the instance group to 0.

The resizing is reflected on the YARN – Resource Manager dashboard in the following screenshot.

You run the same job with the same BZIP2 formatted input dataset, using the following code:

$ aws emr add-steps --cluster-id <j-*************> \
--steps Type=CUSTOM_JAR,Name=WordCountJarGBZ2,ActionOnFailure=CONTINUE,Jar=s3://aws-bigdata-blog/artifacts/aws-blog-emr-prometheus-grafana/demo/jars/Wordcount-1.0-SNAPSHOT-job.jar,Args=com.amazonaws.support.training.emr.Wordcount,s3://aws-bigdata-blog/artifacts/aws-blog-emr-prometheus-grafana/demo/datasets/YelpDataBZ2/,<s3://bucket-name/outputs-folder>/outputBZ2_2

This time, all the pending containers got allocated immediately when the job started, because the cluster had more total container capacity after you added the task nodes.

The job took 4 minutes to complete. This is a 64% performance improvement compared to the second run and 87% performance improvement compared to the first run.

To save cost, after the job is complete, you can scale the cluster down by reducing the number of task nodes in the task instance group you added earlier to 0, with the following code (replace <ig-************> with the instance group ID you recorded earlier):

$ aws emr modify-instance-groups \
--instance-groups InstanceGroupId=<ig-************>,InstanceCount=0

Alert example

The CloudFormation stack you created set up an alert that monitors the nodes in the EMR cluster: if any disk partition is more than 90% full, an email alert is sent to the email address that you specified.

Follow these steps to write a large file to a disk partition on the master node in the cluster and wait for the system to fire off the email alert:

  1. Connect to the master node in the EMR cluster via SSH with the key:

ssh -i <key-name>.pem hadoop@<master node public DNS name>

The public DNS name of the master node is specified on the Outputs tab of the CloudFormation stack you created.

  2. Show the current disk usage by entering the following code:
$ df -h /emr
Filesystem      Size  Used Avail Use% Mounted on
/dev/xvdb1      5.0G   56M  5.0G   2% /emr
  3. Create a file sized 4.5 GB so that the disk usage goes above 90%:
$ fallocate -l 4.5G /emr/test.img
  4. Check the disk usage again to confirm that it is now above 90%. See the following code:
$ df -h /emr
Filesystem      Size  Used Avail Use% Mounted on
/dev/xvdb1      5.0G  4.6G  459M  92% /emr

You can also view the disk usage information from the Grafana OS Level Metrics dashboard. In the following screenshot, the /emr partition shows 91.1% used on the Disk Space Used panel.

You should now expect an email with the subject “Prometheus Alert” from Amazon SNS in your inbox.

Considerations for production

Prometheus stores the metrics locally in a time series database that is included in the installation. Planning for disk capacity, disk availability, and snapshots for backup is recommended to improve durability.

By default, the metrics are stored for 15 days. You can configure this retention period with the --storage.tsdb.retention.time command line flag. For solutions that provide remote long-term storage and in some cases high availability, see Remote Endpoints and Storage. The Alertmanager supports creating a cluster for high availability. For more information, see High Availability on the GitHub repo.

Another consideration is how to get notified when the Prometheus monitoring system stops running. The Prometheus server and Alertmanager expose their own metrics by default through localhost:9090/metrics and localhost:9093/metrics endpoints, respectively. These metrics can be scraped and published to Amazon CloudWatch, and you can set up CloudWatch alarms to trigger on missing data points for metrics. For more information, see Using Amazon CloudWatch Alarms.
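
As a hedged sketch of that approach, a small watchdog can publish a heartbeat metric to CloudWatch whenever Prometheus answers its health endpoint; the namespace and metric name below are illustrative:

import boto3
import requests

cloudwatch = boto3.client("cloudwatch")

def publish_heartbeat():
    try:
        # Prometheus exposes a health endpoint at /-/healthy
        healthy = requests.get("http://localhost:9090/-/healthy", timeout=5).ok
    except requests.RequestException:
        healthy = False
    cloudwatch.put_metric_data(
        Namespace="EMRMonitoring",  # illustrative namespace
        MetricData=[{
            "MetricName": "PrometheusHealthy",
            "Value": 1.0 if healthy else 0.0,
            "Unit": "Count",
        }],
    )

Run a script like this from cron on a separate host so a CloudWatch alarm on missing data points fires if the watchdog or Prometheus itself stops.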

Cleaning up

To avoid ongoing charges, delete the CloudFormation stack and output files in Amazon S3 that you created during the workload use case.

Conclusion

This post showed how you can set up a monitoring system based on Prometheus and Grafana to monitor an EMR cluster and use Grafana dashboards to view metrics to troubleshoot a performance issue. You can also set up alerts in Prometheus to notify you when critical issues arise, and you can view the dashboards to narrow down the root causes. You can extend this monitoring system to monitor multiple EMR clusters and other applications, which makes it a one-stop system for monitoring metrics across your infrastructure and applications.

About the authors

Derek Tan is a principal big data architect, covering Amazon EMR and Athena. Prior to working as a Big Data Architect, he led engineering teams in Amazon EMR and Amazon Redshift. During his free time, he enjoys traveling, reading, and spending time with his family.


Fei Lang is a senior big data architect at AWS. She is passionate about building the right big data solution for customers. In her spare time, she enjoys the scenery of the Pacific Northwest, going for a swim, and spending time with her family.


Build a distributed big data reconciliation engine using Amazon EMR and Amazon Athena

Post Syndicated from Sara Miller original https://aws.amazon.com/blogs/big-data/build-a-distributed-big-data-reconciliation-engine-using-amazon-emr-and-amazon-athena/

This is a guest post by Sara Miller, Head of Data Management and Data Lake, Direct Energy; and Zhouyi Liu, Senior AWS Developer, Direct Energy.

Enterprise companies like Direct Energy migrate on-premises data warehouses and services to AWS to achieve a fully manageable digital transformation of their organization. Freedom from traditional data warehouse constraints frees up time and resources to focus on business innovation, and for Direct Energy, building data models that allow us to focus on customer satisfaction. These projects typically start by replicating data from source relational database management systems (RDBMS) into a single data repository, also known as a data lake. Before you can use this data to drive business insights, you should test it for quality and integrity. The quality of your data has a ripple effect on the insights you can extract and the trust people have in your insights.

Our post tackles data quality and integrity as we introduce a reconciliation tool that uses Amazon EMR and Amazon Athena to provide you with a cloud-native engine to validate vast amounts of data effectively at scale. The engine provides customized summary reports so you can interactively analyze the quality of your data. We also discuss the clarity that the engine has brought to the performance and quality of source ingestion and extract, transform, and load (ETL) processes.

About Direct Energy

Direct Energy is a North American retail energy and energy services provider and a subsidiary of the UK-based utility company, Centrica. Serving over four million residential and business customers, we strive to give you choice, simplicity, and innovation with simple solutions to track, understand, and control the electricity and natural gas you use.

Teams across Direct Energy came together to ensure a successful transition from on-premises SQL Server data warehouses to AWS data lakes, including the following:

  1. Our Management Information Systems (MIS) team manages core AWS infrastructure, including AWS Identity and Access Management (IAM), Amazon EMR configurations, and more. The team also manages the ingestion of raw data directly from source data warehouses and billing systems housed on SQL Server, Oracle, and MySQL.
  2. Our Data Engineering team, embedded in business teams, picks up the raw source data after it lands in Amazon Simple Storage Service (Amazon S3), casts data types, merges data for select tables, and transforms the data to build out our business data warehouse, data marts, and datasets using PySpark on Amazon EMR. This includes the orchestration of dependencies across these steps.
  3. Completed data marts and datasets are made available to the business through Amazon Redshift and Microsoft Power BI for reporting and analytical modeling processes, such as churn propensity, customer lifetime value, price elasticity, and segmentation.

Due to the various layers and intricacies required to build out our finalized business data marts and datasets, testing is required at each step.

Why Direct Energy needed a reconciliation engine

The Data Engineering team at Direct Energy needed an automated way to check data quality at the row and field levels for data stored in Amazon S3 and those produced by AWS Glue and viewed through Athena against on-premises source systems.

Initially, our MIS team ran daily row counts against the source systems and Athena and was held to a +/-99% SLA, accounting for potential timing issues. We quickly found this process had issues: it only checks that the row count matches between systems and doesn’t adequately profile data or measure comprehensive data quality. Furthermore, the target data on Amazon S3 should match exactly with the source system, but it’s less efficient to run row- and field-level checks on large volumes of data.

Beyond this, through the decommissioning of one of our major customer data warehouses, we rewrote over 350 SQL Server stored procedures in PySpark. Another layer of testing is required here to debug our code and to ensure our ETL processes are producing the same results that they previously produced from our on-premises server. This includes the testing of both staging and final tables.

Because we can’t do manual testing efficiently at this scale, we needed a solution that would enable automated testing of data quality at the row and field levels. We architected and developed a reconciliation engine called Pythagoras to randomly select a sample of records to check cell by cell. The tool runs new samples daily to ensure better coverage. This check validates whether individual values match between tables on Amazon S3 and in our source systems.

Pythagoras architecture

We use a config.yaml file to define the source databases, source tables, how to connect to source systems, reconciliation tasks, and other parameters required to specify the task. A sample ratio for the on-premises server tables is also defined here.

We use table_cols_mapping.json to define the column mapping between our on-premises server tables and the tables we expect to see when querying through Athena.

The reconciliation engine outputs a report, which is saved directly to Amazon S3 and can be viewed with Athena. Here, we can check results in SQL or PySpark or can export to Microsoft Excel.

The following diagram provides a walkthrough of the process. We use Apache Spark running on Amazon EMR to execute the reconciliation engine steps: preprocess, comparison, and postprocess.

Preprocess

The preprocessing component in the preceding architecture prepares data for reconciliation, and the output is used as an input in the comparison component. The preprocess step is responsible for several important functions:

  • Parse the config.yaml file, which defines parameters such as connection strings for data sources, the log path, performance-tuning parameters, the location of reconciliation reports, and more. In addition, we add the reconciliation group and reconciliation task in the file to instruct Pythagoras to perform the data integrity testing. This information tells the PySpark job where to fetch the data and the random sample ratio percentage to use in the reconciliation run. See the following code example:
    data_rec:
    - rec_task_1:
        sample_ratio: 10  #percent
        sourceA: fruit1
        sourceB: fruitA
        sourceA_database: database_sourceA
        sourceB_database: database_sourceB

  • We fetch data from the two data sources in this step.
  • Parse the tablecolmap.json file, which maps the columns to be reconciled from the two data sources. For example, in the following JSON code, the value of column fruitA from data source B needs to be compared with the value of column fruit1 from data source A. The same logic applies to columns animalB and animal2.
    "rec_task_1": {
      "cols_mapping_b_a": {
        "fruitA": "fruit1",
        "animalB": "animal2"
      }
    }

  • If we have a record from data source A, how can we find the corresponding record in target B? The answer is by using a key column. This key should exist in both sources and it should be unique. The component looks up the user-defined key in the joined_keys.py file to join the two datasets as a single data frame via the key and passes it to the comparison component.
  • Normalizing data types for both data sources also happens in this step if needed. For example, if the value of column x from source A is 1 stored as an integer, and the value of column x from source B is 1 stored as a string, these values are treated as different without any data type normalization; however, they are the same if we ignore the data type formatting.

Comparison

After fetching the output data frame from the preprocess pipeline, the comparison component is ready to perform the reconciliation. The following table shows an example schema of an input data frame.

id | sourceA_animal | sourceA_fruit | sourceA_car | animal | fruit  | car
1  | cat            | orange        | jeep        | cat    | orange | jeep
2  | dog            | apple         | toyota      | dog    | pear   | toyota

Columns starting with prefix sourceA_ indicate the data is from data source A. Columns titled animal, fruit, and car come from data source B. In this instance, the separated tabular data is linked and joined on a key id; the key should be uniquely defined in both data sources.

The engine reconciles values from columns in the preceding input data frame. The following table summarizes the output.

id | sourceA_animal | sourceA_fruit | sourceA_car | animal | fruit  | car    | matched_animal | matched_fruit | matched_car | total_matched
1  | cat            | orange        | jeep        | cat    | orange | jeep   | TRUE           | TRUE          | TRUE        | TRUE
2  | dog            | apple         | toyota      | dog    | pear   | toyota | TRUE           | FALSE         | TRUE        | FALSE

Columns with the prefix matched_ indicate the reconciled result of the corresponding cells. For instance, in the second record, column sourceA_fruit is apple and column fruit is pear, so the value of matched_fruit is FALSE.
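
The following hedged PySpark sketch shows how this comparison step can be implemented under the column mapping from the earlier JSON example; df_a, df_b, and the names below are illustrative, not Pythagoras's actual code, and null handling and type normalization are assumed to have happened during preprocessing:

from functools import reduce

from pyspark.sql import functions as F

# Illustrative inputs: df_a / df_b are the data frames fetched during
# preprocessing, joined on the user-defined key column.
cols_mapping_b_a = {"fruitA": "fruit1", "animalB": "animal2"}
key = "id"

# Prefix the source A columns so both sides can live in one data frame
df_a_prefixed = df_a.select(
    F.col(key), *[F.col(c).alias("sourceA_" + c) for c in cols_mapping_b_a.values()]
)
joined = df_a_prefixed.join(df_b, on=key, how="inner")

# One matched_ column per mapped pair, plus a row-level total_matched flag
for col_b, col_a in cols_mapping_b_a.items():
    joined = joined.withColumn("matched_" + col_b, F.col("sourceA_" + col_a) == F.col(col_b))
joined = joined.withColumn(
    "total_matched",
    reduce(lambda x, y: x & y, [F.col("matched_" + c) for c in cols_mapping_b_a]),
)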

Postprocess

Based on the detailed reconciled results, the engine generates various customized reports and saves those reports to a configured location on Amazon S3. The end-user can use SQL to query against Athena to quickly analyze the data, or can download the results to Microsoft Excel. In this section, we describe three of the reports we use daily.

The following table shows the reconciliation summary at the column level.

table_name | col_name       | num_matched_true | num_matched_false
MyFavorite | matched_animal | 2 | 0
MyFavorite | matched_car    | 2 | 0
MyFavorite | matched_fruit  | 1 | 1
MyFavorite | total_matched  | 1 | 1

This report is generated based on the information from the preceding output. Let’s assume the table is called MyFavorite. The engine counts the matched and unmatched cases over each column and provides an aggregated view of the number of records matching for each column.

In a production environment, the Data Engineering team needs to reconcile hundreds of tables in one batch. Pythagoras naturally supports wrapping these into one reconciliation task. In this case, we can generate another report to show the reconciliation summary per reconciliation task.

For example, in addition to the table MyFavorite, the reconciliation task has another table called MySkills, which needs to be reconciled. The following table shows its reconciliation summary at the column level.

table_name | col_name           | num_matched_true | num_matched_false
MySkills   | matched_sport      | 4 | 0
MySkills   | matched_instrument | 3 | 1
MySkills   | total_matched      | 3 | 1

Based on the two preceding summaries, Pythagoras calculates high-level table statistics, as shown in the following table.

table_name | data_quality | total_match_rate | issue_desc
MyFavorite | Red          | 50%              | columns: fruit
MySkills   | Amber        | 75%              | columns: instrument

In the config.yaml file, the user defines the parameter total_match_rate_threshold. For this use case, let’s say we define it as 50%. If the total_match_rate is greater than 50% and less than 100%, we label data_quality as Amber; if it is less than or equal to 50%, we label it as Red. The column issue_desc shows the list of unmatched columns (all columns with any unmatched values).
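
The labeling rule reduces to a small function; the following sketch assumes the threshold has already been read from the config file:

# Sketch of the data_quality labeling rule described above
def data_quality_label(total_match_rate, threshold=0.5):
    if total_match_rate == 1.0:
        return "Green"
    if total_match_rate > threshold:
        return "Amber"
    return "Red"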

These reports help us assess overall data quality for all tables in one reconciliation task and quickly locate problem tables and columns. We use Athena to query the Pythagoras results using SQL. For example, in the following SQL query, we can filter ingested tables with data quality flagged as Red by Pythagoras (in other words, the MyFavorite table is returned):

SELECT table_name FROM "recon_summary" WHERE data_quality = 'Red'

We also use Athena to query Pythagoras results for every step of our ETL process. We review a daily report that enables us to focus on which steps are having issues and the top columns that are experiencing a mismatch.

In the following output table, we would focus on steps 2, 3, and 9 first, because there are glaring issues and other steps may have dependencies on these, then come back to step 5 for some minor cleanup. Otherwise, anything above 99.9% is left alone, accounting for timing issues in our billing systems, for example.

table_name | data_quality | total_match_rate | issue_desc
step1  | green | 0.9996608  | columns: col1: 200, col2: 93, col3: 12, col4: 10, col5: 10
step2  | red   | 0.82664197 | columns: col5: 30, col3: 22, col4: 16, col1: 15, col2: 14
step3  | red   | 0.95370907 | columns: col1: 50, col2: 43, col3: 12, col4: 10, col5: 1
step4  | green | 1          |
step5  | amber | 0.9987953  | columns: col1: 200, col2: 93, col3: 12, col4: 10, col5: 14
step6  | green | 0.99992985 | columns: col1: 25
step7  | green | 0.99950025 | columns: col1: 200, col2: 93
step8  | green | 0.99993417 | columns: col1: 50
step9  | red   | 0.24940514 | columns: col1: 19000, col2: 4293, col3: 1400, col4: 1000, col5: 180
step10 | green | 0.99930817 | columns: col1: 573, col2: 420, col3: 120

We can also perform SQL queries in Athena to drill down further into each step to identify each column’s actual match rate for that particular sample. For example, the following table looks at step2:

table_name | col_name | num_match_true | num_match_false
step2 | col5 | 277 | 30
step2 | col3 | 285 | 22
step2 | col4 | 291 | 16
step2 | col1 | 292 | 15
step2 | col2 | 293 | 14

For this use case, we want to look at col5 first because 30 records in the sample are unmatched, whereas 277 records are matched, then work our way down the list to clean up each column.

Conclusion

In this post, we discussed how Direct Energy uses a data reconciliation tool called Pythagoras to automate and test data quality checks at scale, using Amazon EMR to verify the data quality and Athena to analyze and report the results. Pythagoras brought significant clarity regarding the performance and quality of both Direct Energy source data ingestion and ETL processes, while eliminating the need for manual testing and enabling automated, randomized testing on a much greater scale.

Thus far, the ETL processes for two billing systems have been thoroughly vetted, resulting in 15% and 48% improvements in accuracy. We found that value mismatches are the most common data integrity issue in our data ingestion pipeline. Thanks to Pythagoras, we can quickly and precisely determine these mismatches in large datasets. Data engineering and platform teams then use the data Pythagoras provides to debug our ETL pipelines. After we adjust our pipelines, we run Pythagoras again to ensure the issue is fixed and stays fixed.

The implementation of this tool empowers Direct Energy to decommission widely used data platforms with precision and efficiency, and builds trust in our company’s data quality and integrity across the business.

About the Authors

Sara Miller is the Head of Data Management and Data Lake at Direct Energy, a subsidiary of Centrica. She has been with the organization for more than five years. As a versatile leader proficient in data engineering, mathematics, and statistics, Sara has helped organizations transform their reporting and analytics capabilities and has been instrumental in establishing various data science and analytics teams. She currently manages the end-to-end ETL pipeline for the North America residential portfolio, including the transition from on-premises data warehousing to Amazon Web Services.

Zhouyi Liu is the Senior AWS Developer at Direct Energy, a subsidiary of Centrica. He focuses on big data, machine learning, and AI. He currently works on the end-to-end ETL pipeline for the North America residential portfolio, including the transition from on-premises data warehousing to Amazon Web Services. Outside of work, he enjoys his roles as father and husband and spending time with his family.

Enable fine-grained data access in Zeppelin Notebook with AWS Lake Formation

Post Syndicated from Behram Irani original https://aws.amazon.com/blogs/big-data/enable-fine-grained-data-access-in-zeppelin-notebook-with-aws-lake-formation/

This post explores how you can use AWS Lake Formation integration with Amazon EMR (still in beta) to implement fine-grained column-level access controls while using Spark in a Zeppelin Notebook.

My previous post Extract Salesforce.com data using AWS Glue and analyzing with Amazon Athena showed you a simple use case for extracting any Salesforce object data using AWS Glue and Apache Spark, saving it to Amazon Simple Storage Service (Amazon S3), cataloging the data using the Data Catalog in Glue, and querying it using Amazon Athena.

Preparing your data

For simplicity of setup and to build on the concept of fine-grained access control of data, you use the same data that you extracted from the Salesforce account object in the post Extract Salesforce.com data using AWS Glue and analyzing with Amazon Athena. Follow all the steps from the preceding post to create a table called sfdc_output, which you can query in Athena and see all the fields of the account object.

In the following sections, you see how to restrict access to only a select set of columns in this table for a user who queries this data using Spark SQL in Zeppelin Notebook.

Setting up Lake Formation

Lake Formation aims to simplify and accelerate the creation of data lakes. Amazon EMR integrates with Lake Formation and its security model to allow fine-grained access control on databases, tables, and columns defined in the Data Catalog for data stored in Amazon S3. Users authenticate against third-party identity providers (IdPs) through SAML, and the principal is used to determine if the user has the appropriate access to the columns within a table and partitions in the Data Catalog.

Lake Formation provides its own permissions model that augments the AWS Identity and Access Management (IAM) permissions model. This centrally defined permissions model enables fine-grained access to data stored in data lakes through a simple grant and revoke mechanism.

The following diagram illustrates the workflow.

In the preceding flow, you still authenticate the principal at the IdP, and use the IAM policy to authorize access to AWS resources. Additionally, you use Lake Formation to authorize data access. When a principal attempts to run a query in Amazon EMR against a table set up with Lake Formation, Amazon EMR requests temporary credentials for data access from Lake Formation. Lake Formation returns temporary credentials and allows data access.

For more information about setting up Lake Formation, see Setting Up AWS Lake Formation. For this use case, you want to enable the integration of Amazon EMR with Lake Formation so you can use Zeppelin Notebook to see the fine-grained data access controls in action. For this post, I’ve configured the authentication module using the third-party SAML provider Auth0. You can also use Okta or Active Directory Federation Services (ADFS) to set up authentication with the IdP of your choice. For more information about setting up IdP and to launch Amazon EMR with an AWS CloudFormation stack, see Integration with Amazon EMR.

Granting fine-grained access with Lake Formation

For the purpose of demonstrating fine-grained data access, I created a user called developer in Auth0. Suppose that for your table sfdc_output, you don’t want to give this user access to certain billing-related fields. Complete the following steps:

  1. On the Lake Formation console, choose Data permissions.
  2. For Active Directory users and groups, enter the ARN for the user developer.
  3. For Database, choose default.
  4. For Table, choose sfdc_output.
  5. For Column, choose Include columns.
  6. For Include columns, choose sic, name, accountnumber, and type.

These are the specific columns from this table that you want the developer user to have access to.

  7. For Table permissions, select Select.
  8. Choose Grant.

After you grant specific permissions to the developer user, you have to remove LFPassthrough access by revoking Super access given to the IAMAllowedPrincipals group. For backward compatibility, Lake Formation only passes through IAM permissions for all existing Data Catalog tables. Revoking Super access enables it to apply specific Lake Formation grants and IAM permissions.

  1. On the Lake Formation console, choose Permissions.
  2. Select IAMAllowedPrincipals for sfdc_output.
  3. Choose Revoke.

You also need to register the Amazon S3 location in Lake Formation where the table data resides.

  1. On the Lake Formation console, choose Data lake locations.
  2. Select the Amazon S3 path for your table.
  3. Choose Register location.
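
If you prefer to script these grants, Lake Formation exposes the same operations through its API. The following boto3 sketch mirrors the console steps above; the principal ARN is a placeholder, and you should verify the resource shape against the current GrantPermissions API:

import boto3

lf = boto3.client("lakeformation")

# Grant SELECT on only the four permitted columns; the principal ARN below
# is a placeholder standing in for the developer principal used in this post.
lf.grant_permissions(
    Principal={"DataLakePrincipalIdentifier": "arn:aws:iam::123456789012:user/developer"},
    Resource={
        "TableWithColumns": {
            "DatabaseName": "default",
            "Name": "sfdc_output",
            "ColumnNames": ["sic", "name", "accountnumber", "type"],
        }
    },
    Permissions=["SELECT"],
)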

Running a query

You can now test the restrictions by running a query.

Log in to the Zeppelin console using its URL. To access Zeppelin Notebook, you must first ensure that your cluster’s master security group is configured to allow access to the Proxy Agent (port 8442) from your desktop. Do not open your EMR master node to the public (0.0.0.0/0 or ::/0). The console redirects you to the IdP for login and authentication with the developer user credentials. After authentication is complete, create a new notebook and run a Spark SQL query against the sfdc_output table.
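
For example, in a PySpark paragraph the developer user might run the following; with the Lake Formation grant in place, only the four permitted columns come back, even for SELECT *:

# Run from a Zeppelin PySpark paragraph (the spark session is provided);
# only sic, name, accountnumber, and type are returned for the developer user.
spark.sql("SELECT * FROM default.sfdc_output LIMIT 5").show()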

The following screenshot shows that even though the developer user queried for the full table, only the columns that you granted them access to in Lake Formation are visible.

As an additional exercise, you can create another user in IdP and give a different set of column access in Lake Formation to that user. Query the table again by logging in as that user and observe the corresponding security mechanism being applied.

Querying with Jupyter Notebook

If you want to use Jupyter Notebook, you can spin up an Amazon EMR notebook, attach it to the running cluster, and run the same query in it. The following screenshot shows that the results are the same.

Conclusion

This post showed how Lake Formation provides fine-grained, column-level access to tables in the Data Catalog using Spark. It enables federated single sign-on to Apache Zeppelin or Amazon EMR notebooks from your enterprise identity system that is compatible with SAML 2.0.

You can try this solution for your use cases, and if you have comments or feedback, please leave them below.

About the Authors

Behram Irani is a Senior Data Architect at Amazon Web Services.


Rahul Sonawane is a Senior Consultant, Big Data at Amazon Web Services.


Improving RAPIDS XGBoost performance and reducing costs with Amazon EMR running Amazon EC2 G4 instances

Post Syndicated from Kong Zhao original https://aws.amazon.com/blogs/big-data/improving-rapids-xgboost-performance-and-reducing-costs-with-amazon-emr-running-amazon-ec2-g4-instances/

This is a guest post by Kong Zhao, Solution Architect at NVIDIA Corporation

This post shares how NVIDIA made RAPIDS XGBoost training up to 4.5 times faster and up to 5.4 times cheaper by using Amazon EMR running Amazon Elastic Compute Cloud (Amazon EC2) G4 instances.

Gradient boosting is a powerful machine learning (ML) algorithm used to achieve state-of-the-art accuracy on tasks such as regression, classification, and ranking. If you’re not using deep neural networks, there’s a good chance you use gradient boosting.

Data scientists use open-source XGBoost libraries in industries such as:

  • Financial services – Predicting loan performance and other financial risks
  • Retail – Predicting customer churn
  • Advertising – Predicting click rate

Amazon EMR and NVIDIA GPU instances

Amazon EMR is the industry-leading cloud big data platform for processing vast amounts of data using open-source tools such as Apache Spark, Apache Hive, Apache HBase, Apache Flink, Apache Hudi, and Presto. With Amazon EMR, you can run petabyte-scale analysis at less than half the cost of traditional on-premises solutions and over three times faster than standard Apache Spark. Data scientists use Amazon EMR to run open-source deep learning and ML tools such as TensorFlow and Apache MXNet, and use case-specific tools and libraries. You can quickly and easily create scalable and secure clusters with the latest GPU instances for distributed ML training with a few clicks on the Amazon EMR console.

For more information about getting started with Amazon EMR, see What Is Amazon EMR?

G4 instance on Amazon EMR

Amazon EMR continually evolves GPU offerings and collaborates with partners like NVIDIA to improve the platform’s performance. Our latest development is the support for the EC2 G4 instance type featuring an NVIDIA T4 Tensor Core GPU with 16 GB of GPU memory, offered under the Nitro hypervisor with 1–4 GPUs per node. It also includes up to 1.8 TB of local non-volatile memory express (NVMe) storage and up to 100 Gbps of network bandwidth.

The T4 Tensor Core GPU offering from NVIDIA is a cost-effective and versatile GPU instance for accelerating ML models training and inferencing, video transcoding, and other compute-intensive workloads. G4 instances are offered in different instance sizes with access to one GPU or multiple GPUs with different amounts of vCPU and memory—giving you the flexibility to pick the right instance size for your applications.

Accelerated XGBoost open-source library integrated into Apache Spark by NVIDIA

While ML at scale can deliver powerful, predictive capabilities to millions of users, it hinges on overcoming two key infrastructure challenges to save costs and deliver results faster: speeding up the preprocessing of massive volumes of data and accelerating compute-intensive model training.

To tackle these challenges, NVIDIA is incubating RAPIDS, a set of open-source software libraries, and the RAPIDS team works closely with the Distributed Machine Learning Common (DMLC) XGBoost organization to upstream code and make sure that all components of the GPU-accelerated analytics ecosystem work together smoothly. We used the XGBoost4J-Spark open-source library, which enables training and inferencing of XGBoost models across Apache Spark nodes. With GPUs, you can exploit data parallelism through columnar processing instead of traditional row-based reading designed initially for CPUs. This allows for cost savings and higher performance.
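
The XGBoost4J-Spark API itself is Scala, but the switch that moves training onto the GPU is easy to see in a hedged, single-node Python sketch with synthetic data:

import numpy as np
import xgboost as xgb

# Synthetic stand-in data; the Spark library distributes this same kind of
# training across executors, one GPU per executor.
X = np.random.rand(1000, 20)
y = (X[:, 0] > 0.5).astype(int)
dtrain = xgb.DMatrix(X, label=y)

params = {
    "objective": "binary:logistic",
    "tree_method": "gpu_hist",  # GPU-accelerated histogram algorithm
    "max_depth": 8,
}
model = xgb.train(params, dtrain, num_boost_round=100)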

Higher performance at lower cost with Amazon EMR on GPU instances

We benchmarked the latest RAPIDS-Spark XGBoost4j open-source library on an EMR cluster with EC2 G4 instances running Apache Spark. We ran the benchmark against a 1 TB open-source dataset called Criteo, stored directly on Amazon Simple Storage Service (Amazon S3). Criteo is commonly used for predicting click-through rates on display ads.

The following graphs show our improvements in training time and costs.

An EMR cluster running G4dn instances is 5.4 times cheaper and 4.5 times faster than an EMR cluster running EC2 R5 memory-optimized instances. The EMR cluster with g4dn GPU instances gave us almost the same training time but at half the cost of running the training on an EMR cluster running EC2 P3 instances. The following table summarizes our findings.

Type | Number of Instances | Hardware per Instance | Instance Type | Amazon EC2 Cost per Hour | Amazon EMR Cost per Hour | Training (Minutes) | Training Costs
GPU | 16 | 4x T4   | g4dn.12xlarge | $3.912 | $0.27 | 6  | $6.69
GPU | 6  | 8x V100 | p3.16xlarge   | $24.48 | $0.27 | 5  | $12.38
CPU | 16 | 64 vCPU | r5a.16xlarge  | $4.608 | $0.27 | 33 | $42.93
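
As a cross-check on this table, the training cost column is the number of instances multiplied by the combined EC2 and EMR hourly price and by the training time in hours; for the G4dn row, 16 × ($3.912 + $0.27) × 6/60 ≈ $6.69.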

Solution overview

You can use the following step-by-step walkthrough to run the example mortgage dataset using the open-source XGBoost library on EMR GPU clusters. For more examples, see the GitHub repo.

Implementing this solution includes the following steps:

  1. Create an EMR notebook and launch Amazon EMR with NVIDIA GPU nodes.
  2. Run the open-source XGBoost library and Apache Spark examples on the notebook.
  3. View the training and transform results and benchmark.
  4. Launch example applications with the Apache Spark spark-submit script.

Creating an EMR notebook and launching Amazon EMR with NVIDIA GPU nodes

An EMR notebook is a serverless Jupyter notebook. Unlike a traditional notebook, the contents of an EMR notebook—the equations, visualizations, queries, models, code, and narrative text—are saved in Amazon S3 separately from the cluster that runs the code. This provides an EMR notebook with durable storage, efficient access, and flexibility.

To create your notebook and launch Amazon EMR, complete the following steps.

  1. On the Amazon EMR console, choose the Region you want to launch your cluster in (typically the same Region as your S3 bucket where you store large training datasets).
  2. Choose Notebooks.
  3. Choose Create notebook.
  4. Create a new cluster with notebook instances by setting the GPU node.

For this use case, we add three EC2 g4dn.xlarge GPU nodes to the new cluster.

If you want to customize your cluster with advanced configuration, you can create a GPU cluster separately, then create an EMR notebook and connect to the existing GPU cluster. You can enter the following code in the AWS Command Line Interface (AWS CLI) to launch a GPU cluster with two EC2 G4dn instances as core nodes in one command line:

aws emr create-cluster --termination-protected --applications Name=Hadoop Name=Spark Name=Livy --tags 'name=nvidia-gpu-spark' --ec2-attributes '{"KeyName":"your-key-name","InstanceProfile":"EMR_EC2_DefaultRole","SubnetId":"your-subnet-ID","EmrManagedSlaveSecurityGroup":"your-EMR-worker-security-group-ID","EmrManagedMasterSecurityGroup":"your-EMR-leader-security-group-ID"}' --release-label emr-5.30.0 --log-uri 's3n://your-s3-bucket/elasticmapreduce/' --instance-groups '[{"InstanceCount":2,"InstanceGroupType":"CORE","InstanceType":"g4dn.xlarge","Name":"Core - 2"},{"InstanceCount":1,"EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":32,"VolumeType":"gp2"},"VolumesPerInstance":2}]},"InstanceGroupType":"MASTER","InstanceType":"m5.xlarge","Name":"Master - 1"}]' --configurations '[{"Classification":"spark-defaults","Properties":{"spark.dynamicAllocation.enabled":"false"}}]' --auto-scaling-role EMR_AutoScaling_DefaultRole --ebs-root-volume-size 10 --service-role EMR_DefaultRole --enable-debugging --name 'nvidia-gpu-spark' --scale-down-behavior TERMINATE_AT_TASK_COMPLETION --region us-east-1

Replace the values for KeyName, SubnetId, EmrManagedSlaveSecurityGroup, EmrManagedMasterSecurityGroup, and the S3 bucket for logs with your own values, and set the cluster name and Region as needed.

You can also use the AWS Management Console to configure the EMR cluster. For instructions, see Get Started with XGBoost4J-Spark on AWS EMR on GitHub.

Running the XGBoost library and Apache Spark examples on your EMR notebook

When the cluster is ready, go to the Amazon EMR notebooks, choose the notebook instance, and choose Open in Jupyter. Start the notebook instance if it’s not running.

Download the example notebook EMR_Mortgage_Example_G4dn.ipynb from Rapids/spark-examples on GitHub and upload it to the EMR notebook instance. For more Scala example code, see the GitHub repo.

Open the mortgage example notebook to run the GPU-accelerated Apache Spark code with the open-source XGBoost library against a small mortgage dataset on Amazon S3. If the notebook kernel isn’t set to Apache Spark, choose Kernel, Change Kernel to set Apache Spark as the kernel. The EMR notebook is now talking to the EMR cluster running Apache Spark using Apache Livy. The following diagram illustrates this architecture.

You can customize your Apache Spark job configurations, such as the number of executors, number of cores, and executor memory, based on your GPU cluster. Each GPU maps to one executor.
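
For example, with the three g4dn.xlarge nodes in this cluster (one T4 GPU each), a sparkmagic configuration cell at the top of the notebook might look like the following; the values are illustrative and should be tuned to your cluster:

%%configure -f
{
    "numExecutors": 3,
    "executorCores": 4,
    "executorMemory": "8G"
}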

Viewing the training and transform results and benchmark

In the EMR notebook, you can view the Apache Spark job progress and the benchmark results. You can save the trained model to a local folder or S3 bucket. The following screenshot shows our job progress.

The following code shows the benchmark results:

------ Training ------
==> Benchmark: Elapsed time for [train]: 37.881s
model: ml.dmlc.xgboost4j.scala.spark.XGBoostClassificationModel = xgbc_d5a83fea59b5
 
------ Transforming ------
==> Benchmark: Elapsed time for [transform]: 0.115s
…
------Accuracy of Evaluation------
evaluator: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_daa2cccd95a4
accuracy: Double = 0.9875007589803053
0.9875007589803053
…

Launching example applications with the spark-submit script

Alternatively, you can SSH into the EMR primary node and use the Apache Spark spark-submit script to launch the application directly on the cluster. Follow the walkthrough on the GitHub repo to use Apache Maven to create a jar containing the example code for the mortgage dataset and its dependencies, and launch the application using the spark-submit CLI.

Cleaning up

To avoid ongoing charges for resources you created for this benchmarking, delete all the resources you created. This includes the data on the S3 bucket, EMR cluster, and EMR notebook.

Conclusion

Get started on Amazon EMR today, or reach out to AWS for help if you want to migrate your big data and applications to AWS. You can also learn more about Nvidia’s contribution to RAPIDS.

The content and opinions in this post are those of the third-party author and AWS is not responsible for the content or accuracy of this post.

About the author

Kong Zhao is a Solution Architect at Nvidia. He provides technical thought leadership and architecture guidance, and conducts PoCs for both Nvidia and AWS customers to meet their AI and HPC requirements by developing, optimizing, and deploying GPU-accelerated solutions in the cloud. His core areas of focus are GPU-related cloud architecture, HPC, machine learning, and analytics. Previously, Kong worked as a Senior Solution Architect at AWS, an Architect at Equinix for Cloud Exchange, and a Product Manager at Cisco.

Control data access and permissions with AWS Lake Formation and Amazon EMR

Post Syndicated from Nabil Ezzarhouni original https://aws.amazon.com/blogs/big-data/control-data-access-and-permissions-with-aws-lake-formation-and-amazon-emr/

What if you could control the access to your data lake, centrally? Would it be more convenient to share specific data securely with internal and external customers?

With AWS Lake Formation and its integration with Amazon EMR, you can easily perform these administrative tasks.

This post goes through a use case and reviews the steps to control the data access and permissions of your existing data lake. Before you get started, review the following:

Use case

Let’s assume your company has augmented their on-premises infrastructure with AWS. Because your data center has a fixed capacity for the company’s analytics and machine learning needs, you’re using the cloud for additional compute and storage. AWS Direct Connect links your data center to the closest AWS Region. Because your Active Directory server is still on premises, you use an Active Directory Connector to federate user authentication. For cost optimization and agility, you built a data lake to centralize business data to Amazon Simple Storage Service (Amazon S3) using Lake Formation.

Your organization is looking to improve their data analytics function and has hired external data analysis consultants. Based on the least privilege best practice, you want to only share the relevant data with the external consultants, without any personally identifiable information (PII) such as name, date of birth, and Social Security number.

You’re concerned about third-party data access in the cloud. You want a solution where the data is safe, controlled, audited, encrypted, and secure. You want to restrict access at the column level, so the PII isn’t available to the external consultants.

In addition, you want to restrict the consultants’ access to your cloud resource. They should only access the EMR cluster with a specific AWS Identity and Access Management (IAM) role.

The following diagram illustrates the architecture for this use case.

The external consultants authenticate the AWS resources against the on-premises, SAML-compatible directory service, federated with IAM. You control the access to the cloud resources from your on-premises identity provider (IdP). For more information, see About SAML 2.0-based Federation.

Lake Formation manages data access. The data lake administrator defines the data access at the column level for any principal defined in Lake Formation. The principal can be a user that you federate with the on-premises directory service. For this use case, the principal is a specific role for the external consultants that allows them to only access the EMR cluster.

Because you defined fine-grained Lake Formation permissions to your data, the external consultants can’t access employee first names, last names, and Social Security numbers. They only have access to the non-PII columns. This measure is called pseudonymization, in which you can’t identify the PII without additional data. Pseudonymization has the following benefits:

  • Centralized authentication and user and data access governance
  • Less management overhead and security improvement because there is a canonical source of authentication
  • The consultant uses an IAM role allowing only Lake Formation data access and the instance profile role associated with the EMR cluster

You don’t need to manage access to Amazon S3; access is centralized from Lake Formation. If you want to share the data lake data with more users, you only need to define it one time in Lake Formation.

In the next section, you see how to implement this solution.

Creating a data lake

Before you get started, create a data lake. You can control the access to the data lake with policies and permissions, and define permissions at the database, table, or column level.

When creating the database, complete the following steps to enable fine-grained access control with Lake Formation permissions.

  1. On the Lake Formation console, under Data catalog, choose Databases.
  2. Choose Create database.
  3. For Name, enter a name for your database.
  4. Deselect Use only IAM access control for new tables in this database.

 This step enables fine-grained access control with Lake Formation permissions.

  5. Choose Create database.

Adjusting the permissions

  1. Under Permissions, choose Data permissions.
  2. Choose Grant.

  3. For IAM users and roles, choose which specific IAM users and roles can access the data lake.

These accounts can be federated with AWS through your SAML 2.0-compatible IdP, so you can control access from your on-premises Active Directory.

You can also define Active Directory users and groups directly, but only in the context of Amazon EMR integration with Lake Formation. For more information, see Amazon EMR Integration With AWS Lake Formation Is Now In Beta, Supporting Database, Table, and Column-level access controls for Apache Spark and Amazon EMR Components.

  4. For Database, choose your database.
  5. For Table, choose your table.
  6. For Columns, choose Exclude columns.
  7. For Exclude columns, choose which columns to exclude (for this use case, first name, last name, and ssn).
  8. For Table permissions, select Select.

This feature allows you to control column-level access for an IAM user or role (a scripted equivalent follows these steps).

  9. Choose Grant.
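If you prefer to script this grant instead of using the console, you can make the equivalent call through the AWS API. The following is a minimal sketch using Python and boto3; the role ARN, database name, and table name are placeholders for this use case, not values from the walkthrough.

import boto3

lakeformation = boto3.client("lakeformation")

# Grant SELECT on all columns except the PII columns to the consultants' role
lakeformation.grant_permissions(
    Principal={
        # Placeholder ARN for the external consultants' IAM role
        "DataLakePrincipalIdentifier": "arn:aws:iam::111122223333:role/ExternalConsultantsRole"
    },
    Resource={
        "TableWithColumns": {
            "DatabaseName": "my_database",  # the database created earlier
            "Name": "employees",            # hypothetical table name
            "ColumnWildcard": {
                "ExcludedColumnNames": ["first_name", "last_name", "ssn"]
            },
        }
    },
    Permissions=["SELECT"],
)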

Integrating Lake Formation with Amazon EMR

From Amazon EMR version 5.26 onwards, you can launch an EMR cluster that integrates with Lake Formation. Amazon EMR can only access specific columns or data based on the permissions defined in Lake Formation. For more information, see Architecture of SAML-Enabled Single Sign-On and Fine-Grained Access Control.

A key requirement is to have an external IdP, such as Microsoft Active Directory, Okta, or Auth0, defined for the EMR cluster. The benefit is that you can use your existing enterprise directory, compliance, and audits to govern the data access to Lake Formation. For instructions, see Integrating Amazon EMR with AWS Lake Formation (Beta).

When integration is complete, the consultants can consume the data from Amazon EMR via Zeppelin or Apache Spark, without accessing the PII.

Additional security measures

As with most AWS services, Amazon EMR and Lake Formation use IAM features. With IAM, you can define IAM users or roles to grant access to other AWS services and data.

On top of this security model, AWS CloudTrail tracks all AWS API requests. You can use this audit trail for governance and compliance purposes because you can trace all usage of AWS resources.

To protect the data, you can use in-transit and at-rest encryption. You can also define specific security configurations to apply to your EMR cluster. For more information, see Encryption Options.

For additional security services, you can use Amazon GuardDuty (a threat detection service) and Amazon Macie (data discovery and protection at scale). For more information, see Security, Identity, and Compliance on AWS.

Conclusion

Data usage has grown rapidly in terms of formats and sizes. Managing different technologies (relational databases, NoSQL, graph, flat files, and more) adds management overhead and costs. As data sizes soar along with compute and storage needs, competitive pressure demands ever more agility and speed from organizations.

In addition, you may be sharing business data with more internal and external customers. This increases the complexity of data governance and adds burdens in permission and access management.

In this post, we explained how you can control the access and permission of your data lake. AWS allows you to support your organization’s growth and security because data access is controlled, encrypted, and audited. This is possible in the context of a hybrid infrastructure with an on-premises IdP controlling access to AWS and other resources.

You can start your data lake project and share data with different roles in your organization in a scalable and secure way. It takes minutes instead of months, without mobilizing a large engineering effort.

As a next step, you can use the curated data as the foundation for machine learning projects with AWS Machine Learning tools.

About the Authors

Nabil Ezzarhouni is a Partner Solutions Architect with AWS. His interests are DevOps, machine learning, and his dog Bandit (he’s a born-again pet lover).

Pawan Matta is a Solutions Architect with AWS. Pawan enjoys working with customers and helping them in the areas of storage and migration. During his free time, Pawan loves watching cricket and playing video games with friends.

Introducing Amazon EMR Managed Scaling – Automatically Resize Clusters to Lower Cost

Post Syndicated from Abhishek Sinha original https://aws.amazon.com/blogs/big-data/introducing-amazon-emr-managed-scaling-automatically-resize-clusters-to-lower-cost/

AWS is happy to announce the release of Amazon EMR Managed Scaling—a new feature that automatically resizes your cluster for best performance at the lowest possible cost. With EMR Managed Scaling, you specify the minimum and maximum compute limits for your clusters, and Amazon EMR automatically resizes them for best performance and resource utilization. EMR Managed Scaling continuously samples key metrics associated with the workloads running on clusters. EMR Managed Scaling is supported for Apache Spark, Apache Hive, and YARN-based workloads on Amazon EMR versions 5.30.1 and above.*

Use cases and benefits

Before EMR Managed Scaling, you had to predict workload patterns in advance or write custom automatic scaling rules that depended on an in-depth understanding of the application framework (for example, Apache Spark or Apache Hive). Predicting your workload or writing custom rules can be difficult and error-prone. Incorrect sizing of cluster resources can lead to missed SLAs and unpredictable performance, or to underutilized resources and cost overruns.

EMR Managed Scaling solves this problem by automatically sizing cluster resources based on the workload for best performance and lowest cost. You don’t need to predict your workload patterns or write custom logic to scale your cluster. EMR Managed Scaling constantly monitors key metrics based on workload and optimizes the cluster size for best resource utilization. Amazon EMR can scale the cluster up during peaks and scale it down gracefully during idle periods, reducing your costs and optimizing cluster capacity for best performance. With a few clicks, you can set the compute limits for your cluster and Amazon EMR manages the rest. With EMR Managed Scaling, Amazon EMR also emits high-resolution metrics at 1-minute granularity, allowing you to visualize how Amazon EMR Managed Scaling is reacting to your incoming workload. For more information, see Understanding Managed Scaling Metrics.

To illustrate by example, we configured an EMR cluster with EMR Managed Scaling to scale between 1 and 20 nodes, with 16 vCPUs per node. We submitted multiple parallel Spark jobs (from the TPC-DS benchmark) to the cluster at 30-minute intervals. We set EMR cluster settings to default and turned on EMR Managed Scaling. The following Amazon CloudWatch dashboard shows how EMR Managed Scaling sized the cluster to the cluster load, scaling it up during peaks and scaling down when idle. Enabling EMR Managed Scaling in this use case lowered costs by 60% compared to a fixed-size cluster.

EMR Managed Scaling vs. Auto Scaling

Amazon EMR offers two ways to scale your clusters: you can either use Amazon EMR’s support for Auto Scaling released in 2016, or EMR Managed Scaling. If you’re running Apache Spark, Apache Hive, or YARN-based applications and want a completely managed experience, we recommend using EMR Managed Scaling. If you need to define custom rules involving custom metrics for applications running in the cluster, you should use Auto Scaling. The following table summarizes the differences between these methods.

| Feature | EMR Managed Scaling | Auto Scaling |
|---|---|---|
| Scaling rules management | Amazon EMR managed algorithm that constantly monitors key metrics based on the workloads and optimizes the cluster size for best resource utilization. | You can choose custom metrics and apply scaling rules. |
| Cluster types supported | Instance groups and instance fleets | Instance groups only |
| Configuration granularity | Cluster-level minimum/maximum constraints | Instance group-level configuration |
| Minimum Amazon EMR release version | 5.30+ | 4.0+ |
| Metric collection frequency to aid scaling decisions | Every 1–5 seconds | Every 5 minutes |
| Evaluation frequency | Every 5–10 seconds | Every 5 minutes |
| Scaling algorithm | No configuration required. Amazon EMR follows a dynamic scaling strategy, computes the cluster’s actual resource requirement, and reaches the correct scale directly, for both scale-up and scale-down. Amazon EMR automatically detects the need to scale up or down without specific cooldown periods. | Auto Scaling allows you to define a fixed count of instances to add or remove when a condition is breached. |
| Cooldowns between resizes | Not required; handled automatically | You can choose to define your own cooldown periods between consecutive resizes. |
| Scaling based on custom metrics | Not supported | You can choose to define custom application or infrastructure metrics, as well as custom scaling actions and thresholds. |

EMR Managed Scaling now supports EMR instance fleets

Spot Instances are spare Amazon Elastic Compute Cloud (Amazon EC2) compute capacity in the AWS Cloud, available to you at savings of up to 90% compared to On-Demand prices. Amazon EC2 can interrupt Spot Instances with 2 minutes of notification when it needs the capacity back. A common use case is to run data processing workloads with Amazon EMR using Spot Instances, because of the fault-tolerant nature of Spark and other YARN-based workloads. For more information, see Best Practices for running Apache Spark applications using Amazon EC2 Spot Instances with Amazon EMR.

EMR Managed Scaling also introduces support for Amazon EMR instance fleets. You can seamlessly scale Spot Instances, On-Demand Instances, and instances that are part of a Savings Plan all within the same cluster.

Combining capacity from multiple Spot capacity pools (a combination of EC2 instance type and Availability Zone) across multiple instance families and sizes decreases the impact on your workload in case Spot capacity is interrupted or unavailable. We call this Spot diversification. Amazon EMR automates this strategy by allowing you to configure instance fleets. If you’re running large-scale data processing workloads with Amazon EMR and your workload is fault-tolerant, the recommended way to use Spot Instances is to scale using the task fleet with Spot Instances, and use On-Demand Instances in the core fleet for the non-fault-tolerant part of the cluster, such as the Spark driver or HDFS nodes.

With EMR Managed Scaling, this configuration pattern is supported automatically: when Amazon EMR detects the need for a scale-out activity, it chooses to scale the task fleet, effectively choosing the cheaper option for increasing compute capacity in the cluster. With instance fleets, you specify target capacities for On-Demand Instances and Spot Instances within the cluster. You can specify up to five EC2 instance types per fleet for Amazon EMR to use when fulfilling the targets. You can also select multiple subnets for different Availability Zones. When the cluster launches, Amazon EMR provisions instances until the targets are fulfilled; it looks across the provided subnets, Availability Zones, instance families, and sizes to provision the cluster capacity that has the lowest chance of getting interrupted, for the lowest cost. With EMR Managed Scaling, you can also resize instance fleets and set On-Demand and Spot limits for each instance fleet. For more information, see Configure Instance Fleets.

Configuring EMR Managed Scaling

Configuring EMR Managed Scaling is simple: enable it and set the minimum and maximum limits on the number of instances or vCPUs (for instance groups) or capacity units (for instance fleets) for the cluster nodes. You can enable Managed Scaling on a running cluster or when provisioning the cluster. For more information, see Using EMR Managed Scaling in Amazon EMR.

Node allocation strategy

EMR Managed Scaling lets you control the minimum and maximum capacity that the cluster can scale up to. The parameters that let you control these are:

  • MinimumCapacityUnits – Lower boundary of the size of the cluster
  • MaximumCapacityUnits – Upper boundary of the size of the cluster
  • MaximumCoreCapacityUnits – Upper boundary of core node group
  • MaximumOnDemandCapacityUnits – Upper boundary of capacity to be provisioned from the On-Demand market

The last two parameters let you choose whether to scale core nodes or task nodes (MaximumCoreCapacityUnits) and whether to use instances from the On-Demand or Spot market (MaximumOnDemandCapacityUnits). You can use these parameters to split capacity between core and task nodes, and between On-Demand and Spot. A simple way to think about them is as sliders on a line.
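To make this concrete, the following is a minimal sketch of attaching such a policy to a running cluster using Python and boto3; the cluster ID and capacity numbers are placeholders, not recommendations.

import boto3

emr = boto3.client("emr")

# Attach a managed scaling policy to an existing cluster (placeholder values)
emr.put_managed_scaling_policy(
    ClusterId="j-XXXXXXXXXXXXX",  # placeholder cluster ID
    ManagedScalingPolicy={
        "ComputeLimits": {
            "UnitType": "Instances",             # or "VCPU" / "InstanceFleetUnits"
            "MinimumCapacityUnits": 5,           # lower boundary of the cluster size
            "MaximumCapacityUnits": 100,         # upper boundary of the cluster size
            "MaximumCoreCapacityUnits": 100,     # how far core nodes may grow
            "MaximumOnDemandCapacityUnits": 100, # how much capacity may be On-Demand
        }
    },
)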

Scaling only core nodes

In this use case, your cluster has a minimum of 5 nodes and maximum of 100 nodes. By setting maximum core nodes to maximum capacity and maximum On-Demand nodes to maximum, you’re instructing EMR Managed Scaling to only scale core nodes and only scale them On-Demand. The following diagram illustrates this configuration.

Scaling only task nodes (On-Demand)

If you change the maximum core capacity to the minimum value, the cluster scales only by adding task nodes. In this use case, the cluster scales only task nodes, and only from the On-Demand market. The following diagram illustrates this configuration.

Scaling only task nodes (Spot Instances)

If you change the maximum On-Demand capacity to minimum, you can scale the cluster between the minimum and maximum capacity using Spot nodes. Because the maximum core is also set to the minimum size of the cluster, the scaling only happens using task nodes. The following diagram illustrates this configuration.

Best Practice: Scaling core nodes On-Demand and task nodes on Spot

As a best practice, it’s recommended to run core nodes On-Demand (because they host HDFS) and task nodes on Spot Instances. Because core nodes contain HDFS, the sudden loss of a node can lead to degraded job performance or data loss. The following diagram shows the cluster scaling core nodes up to the maximum core capacity using the On-Demand market, and scaling the rest using task nodes on the Spot market.
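Expressed with the same placeholder API sketch as earlier, this best practice might translate into the following ComputeLimits, where core nodes grow On-Demand up to 20 units and any capacity above that is added as task nodes from the Spot market (illustrative numbers only):

# Hypothetical best-practice split: On-Demand core up to 20, Spot task nodes beyond
compute_limits = {
    "UnitType": "Instances",
    "MinimumCapacityUnits": 5,           # cluster never shrinks below 5 nodes
    "MaximumCapacityUnits": 100,         # cluster never grows past 100 nodes
    "MaximumCoreCapacityUnits": 20,      # core (HDFS) nodes stop growing here
    "MaximumOnDemandCapacityUnits": 20,  # capacity above 20 comes from Spot task nodes
}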

Conclusion

This post discussed EMR Managed Scaling, which automatically resizes your cluster for best performance at the lowest possible cost. For more information, see Using EMR Managed Scaling in Amazon EMR, or view the EMR Managed Scaling demo:

*We do not support EMR Managed Scaling on EMR 6.0, but will support it in subsequent releases (6.1+).

About the Authors

Abhishek Sinha is a Principal Product Manager at Amazon Web Services.

Joseph Marques is a principal engineer for EMR at Amazon Web Services.

Srinivas Addanki is a Software Development Manager at Amazon Web Services.

Vishal Vyas is a Software Development Engineer at Amazon Web Services.

Access web interfaces securely on Amazon EMR launched in a private subnet using an Application Load Balancer

Post Syndicated from Hitesh Parikh original https://aws.amazon.com/blogs/big-data/access-web-interfaces-securely-on-amazon-emr-launched-in-a-private-subnet-using-an-application-load-balancer/

Amazon EMR web interfaces are hosted on the master node of an EMR cluster. When you launch an EMR cluster in a private subnet, the EMR master node doesn’t have a public DNS record. The web interfaces hosted in a private subnet aren’t easily accessible outside the subnet. You can use an Application Load Balancer (ALB) as an HTTPS proxy to access EMR web interfaces over the internet without requiring SSH tunneling through a bastion host. This approach greatly simplifies accessing EMR web interfaces.

This post outlines how to use an ALB to securely access EMR web interfaces over the internet for an EMR cluster launched in a private subnet.

Solution overview

Nodes that are launched within a VPC subnet can’t communicate outside of the subnet unless one of the following exists:

  • A network route from the subnet to other subnets in its VPC
  • A route to subnets in other VPCs using VPC peering
  • A route through AWS Direct Connect to the subnet
  • A route to an internet gateway
  • A route to the subnet from a VPN connection

If you want the highest level of security for an EMR cluster, place the cluster in a subnet with a minimal number of routes to it. This makes it more difficult to access web interfaces running on the master node of an EMR cluster launched in a private subnet.

This solution uses an internet-facing ALB that acts as an HTTPS proxy to web interface endpoints on the EMR master node. The ALB listens on HTTPS ports for incoming web interface access requests and routes requests to the configured ALB targets that point to the web interface endpoints on the EMR master node.

The following diagram shows the network flow from the client to the EMR master node through Amazon Route 53 and ALB to access the web interfaces running on the EMR master node in a private subnet.

Securing your endpoints

The solution outlined in this post restricts access to EMR web interfaces for a range of client IP addresses using an ingress security group on the ALB. You should further secure the endpoints that are reachable through the ALB by having a user authentication mechanism like LDAP or SSO. For more information about Jupyter authentication methods, see Adding Jupyter Notebook Users and Administrators. For more information about Hue, see Configure Hue for LDAP Users. For more information about Hive, see User and Group Filter Support with LDAP Atn Provider in HiveServer2.

Additionally, it may be a good idea to enable access logs through the ALB. For more information about ALB access logs, see Access Logs for Your Application Load Balancer.

Solution walkthrough

When a client accesses an EMR web interface, the process includes the following sequence of steps:

  • A client submits an EMR web interface request from a web browser (for example, YARN Node Manager).
  • Route 53 resolves the HTTPS request using the record set name sample-emr-web in the hosted zone example.com for the registered domain example.com. Route 53 resolves the request URL to the IP address of the ALB, and routes the request to the ALB.
  • The ALB receives the EMR web interface request on its HTTPS listener and forwards it to the web interface endpoint configured in the load balancer target group. There are multiple HTTPS listener and load balancer target group pairs created, one pair for each EMR web interface endpoint.
  • The ALB ingress security group controls what other VPCs or corporate networks can access the ALB.
  • The EMR ingress security group on the master node allows inbound traffic from the ALB to the EMR master node.

The AWS CloudFormation template for this solution creates the following AWS objects in the solution stack:

  • An ALB.
  • HTTPS listener and target pairs; one pair for each EMR web application. It supports Ganglia, YARN Resource Manager, JupyterHub, Livy, and Hue EMR web applications. You can modify the CloudFormation stack to add ALB HTTPS listeners and targets for any other EMR web applications. The following AWS CloudFormation code example shows the code for the ALB, HTTPS listener, and load balancer target:
    # EMR ALB Resources
      # ALB, Target Groups, Listeners and R53 RecordSet
      SampleEmrApplicationLoadBalancer:
        Type: AWS::ElasticLoadBalancingV2::LoadBalancer
        Properties:
          IpAddressType: ipv4
          Name: sample-emr-alb
          Scheme: internet-facing
          SecurityGroups:
            - !Ref AlbIngressSecurityGroup
    
          Subnets:
            - !Ref ElbSubnet1
            - !Ref ElbSubnet2
          LoadBalancerAttributes:
            -
              Key: deletion_protection.enabled
              Value: false
          Tags:
            -
              Key: businessunit
              Value: heloc
            -
              Key: environment
              Value: !Ref EnvironmentName
            -
              Key: name
              Value: sample-emr-alb
    
      ALBHttpGangliaTargetGroup:
        Type: 'AWS::ElasticLoadBalancingV2::TargetGroup'
        Properties:
          HealthCheckIntervalSeconds: 30
          HealthCheckTimeoutSeconds: 5
          HealthyThresholdCount: 3
          UnhealthyThresholdCount: 5
          HealthCheckPath: '/ganglia'
          Matcher:
            HttpCode: 200-399
          Name: sample-emr-ganglia-tgt
          Port: 80
          Protocol: HTTP
          VpcId: !Ref VpcID
          TargetType: instance
          Targets:
           - Id: !Ref EMRMasterEC2NodeId
             Port: 80
          Tags:
            -
              Key: Name
              Value: sample-emr-ganglia-tgt
            -
              Key: LoadBalancer
              Value: !Ref SampleEmrApplicationLoadBalancer
    
      ALBHttpsGangliaListener:
        Type: 'AWS::ElasticLoadBalancingV2::Listener'
        Properties:
          DefaultActions:
            - Type: forward
              TargetGroupArn: !Ref ALBHttpGangliaTargetGroup
          LoadBalancerArn: !Ref SampleEmrApplicationLoadBalancer
          Certificates:
            - CertificateArn: !Ref SSLCertificateArn
          Port: 443
          Protocol: HTTPS

  • The AWS::Route53::RecordSet object (sample-emr-web) in the hosted zone (example.com) for a given registered domain (example.com). The hosted zone and record set name are parameters on the CloudFormation template.
  • An Ingress Security Group attached to the ALB that controls what CIDR blocks can access the ALB. You can modify the template to customize the security group to meet your requirements.

For more information and to download the CloudFormation stack, see the GitHub repo.

Prerequisites

To follow along with this walkthrough, you need the following:

  • An AWS account.
  • A VPC with private and public subnets. An ALB requires at least two Availability Zones, with one public subnet in each Availability Zone. For the sample code to create a basic VPC with private and public subnets, see the GitHub repo.
  • An EMR cluster launched in a private subnet.
  • Web applications such as Ganglia, Livy, Jupyter, and Hue installed on the EMR cluster when the cluster is launched.
  • A hosted zone entry in Route 53 for your domain. If you don’t have a domain, you can register a new one in Route 53. There is a non-refundable cost associated with registering a new domain. For more information, see Amazon Route 53 Pricing.
  • A public certificate to access HTTPS endpoints in the domain. You can request a public certificate if you don’t have one.

Creating an ALB as an HTTPS proxy

To create an ALB as an HTTPS proxy in front of an EMR cluster, you first launch the CloudFormation stack.

  1. Log in to your AWS account.
  2. Select the Region where you’re running your EMR cluster.
  3. To launch your CloudFormation stack, choose Launch Stack.

  4. Enter your parameter values and follow the screen prompts to create the stack.

The following screenshot shows examples of stack parameters.

Next, modify the EMR master node security group to allow ingress traffic from the ALB: create a custom TCP rule with port range 80–65535, with the security group attached to the ALB as the source. In the following steps, you add this inbound rule to the security group (a scripted alternative appears after the steps).

  5. Choose the EMR master node security group.
  6. Choose Security group for master on the EMR cluster Summary tab to open the security group.

  7. Choose Edit inbound rules.

  8. Choose Add Rule.

  9. Add the port range and select the ALB security group as the source security group.
  10. Choose Save rules.
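If you prefer to script this inbound rule rather than use the console, a boto3 sketch like the following should be equivalent; the security group IDs are placeholders.

import boto3

ec2 = boto3.client("ec2")

# Allow the ALB security group to reach the EMR master node on ports 80-65535
ec2.authorize_security_group_ingress(
    GroupId="sg-emr-master-placeholder",  # EMR master node security group
    IpPermissions=[
        {
            "IpProtocol": "tcp",
            "FromPort": 80,
            "ToPort": 65535,
            "UserIdGroupPairs": [
                {"GroupId": "sg-alb-placeholder"}  # ALB ingress security group
            ],
        }
    ],
)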

  11. Test the following EMR web interfaces in your browser:
    1. Ganglia: https://sample-emr-web.[web domain]/ganglia/
    2. YARN Resource Manager: https://sample-emr-web.[web domain]:8088/cluster
    3. JupyterHub: https://sample-emr-web.[web domain]:9443/hub/login
    4. Hue: https://sample-emr-web.[web domain]:8888/hue/accounts/login
    5. Livy: https://sample-emr-web.[web domain]:8998/ui

If you don’t get a response from web interface endpoints, disconnect from your VPN connection and test it. Some organizations may block outgoing web requests on ports other than 80.

Sometimes propagation of Route 53 DNS record updates to the worldwide network of DNS servers takes longer than usual. If you don’t get a response from the EMR web interfaces, wait a minute or two after the CloudFormation stack is created and test again.

You can add code to support other EMR web interface endpoints in the CloudFormation template. For more information, see View Web Interfaces Hosted on Amazon EMR Clusters.

Locating the public certificate ARN from AWS Certificate Manager

You can find the public certificate ARN from AWS Certificate Manager (ACM) on the ACM console. When you expand the domain for a given certificate, locate the ARN in the Details section.

Creating a hosted zone from Route 53

To create a hosted zone from Route 53, complete the following steps:

  1. On the Route 53 console, choose Hosted zones.
  2. Choose the hosted zone in your domain.
  3. In the Hosted Zone Details section, copy the entry for Domain Name.

  4. Enter the domain name in the R53 Hosted Zone AWS CloudFormation parameter box.

Cost breakdown

The following AWS Cost Explorer report table shows an example total cost and cost breakdown by services for the time it takes to complete this walkthrough. This cost includes the cost for a minimal EMR cluster created without any data stored at the start of the exercise, and other resources that the CloudFormation template creates.

Cleaning up

To avoid incurring future charges, delete the CloudFormation stack to delete all the resources created.

Conclusion

You can now create an ALB as an HTTPS proxy to access EMR web interfaces securely over the internet, without requiring a bastion host for SSH tunneling. This simplifies secure access to EMR web interfaces for clusters launched in a private subnet.

About the Authors

Hitesh Parikh is a Cloud Architect with AWS. Hitesh is passionate about partnering with customers on their cloud adoption journey and building innovative and modern cloud-native digital solutions on AWS. Outside of work, he loves to spend time with his family, travel, watch movies, and do community service.

James Sun is a Senior Solutions Architect with Amazon Web Services. James has over 15 years of experience in information technology. Prior to AWS, he held several senior technical positions at MapR, HP, NetApp, Yahoo, and EMC. He holds a PhD from Stanford University.

Monitor Spark streaming applications on Amazon EMR

Post Syndicated from Amir Shenavandeh original https://aws.amazon.com/blogs/big-data/monitor-spark-streaming-applications-on-amazon-emr/

For applications to be enterprise-ready, you need to consider many aspects of the application before moving to a production environment and have operational visibility of your application. You can get that visibility through metrics that measure your application’s health and performance and feed application dashboards and alarms.

In streaming applications, you need to benchmark different stages and the tasks in each stage. Spark provides interfaces to plug in probes for real-time monitoring and observation of your applications. SparkListeners are a flexible and powerful tool for both streaming and batch applications. You can combine them with Amazon CloudWatch metrics, dashboards, and alarms for visibility, and generate notifications when issues arise or automatically scale clusters and services.

This post demonstrates how to implement a simple SparkListener, monitor and observe Spark streaming applications, and set up some alerts. The post also shows how to use alerts to set up automatic scaling on Amazon EMR clusters, based on your CloudWatch custom metrics.

Monitoring Spark streaming applications

For production use cases, you need to plan ahead to determine the amount of resources your Spark application requires. Real-time applications often have SLAs that they need to meet, such as how long each batch execution can safely run or how long each micro-batch can be delayed. Quite often, in the lifecycle of an application, sudden increases of data in the input stream require more application resources to process and catch up with the influx.

For these use cases, you may be interested in common metrics such as the count of records in each micro-batch, the delay on running scheduled micro-batches, and how long each batch takes to run. For example, in Amazon Kinesis Data Streams, you can monitor the IteratorAge metric. With Apache Kafka as a streaming source, you might monitor consumer lag, such as the delta between the latest offset and the consumer offset. For Kafka, there are various open-source tools for this purpose.

You can react in real time or raise alerts based on environment changes by provisioning more resources or reducing unused resources for cost optimization.

Different methods to monitor Spark streaming applications are already available. A very efficient, out-of-the-box feature of Spark is the Spark metrics system. Additionally, Spark can report metrics to various sinks including HTTP, JMX, and CSV files.

You can also monitor and record application metrics from within the application by emitting logs. This requires running operations such as count().print() or printing metrics inside map functions, which forces reads of the data and can cause delays, add application stages, or perform unwanted shuffles; this may be useful for testing but often proves expensive as a long-term solution.

This post discusses another method: using Spark’s streaming listener interface. The following screenshot shows some available metrics on the Spark UI’s Streaming tab.

Apache Spark listeners

Spark internally relies on SparkListeners for event-based communication between its internal components. The Spark scheduler also emits events for SparkListeners whenever the stage of each task changes. SparkListeners listen to the events coming from Spark’s DAGScheduler, which is the heart of the Spark execution engine. You can use custom Spark listeners to intercept scheduler events so you know when a task or stage starts and finishes.

The Spark developer API provides eight methods in the StreamingListener trait that are called on different Spark events, mainly at the start, stop, failure, completion, or submission of receivers, batches, and output operations. You can execute application logic at each event by implementing these methods. For more information, see StreamingListener.scala on GitHub.

To register your custom Spark listener, set spark.extraListeners when launching the application, or programmatically by calling addSparkListener when setting up SparkContext in your application.

SparkStreaming micro-batches

By default, SparkStreaming has a micro-batch execution model. Spark starts a job in intervals on a continuous stream. Each micro-batch contains stages, and stages have tasks. Stages are based on the DAG and the operation that the application code defines, and the number of tasks in each stage is based on the number of DStream partitions.

At the start of a streaming application, the receivers are assigned to executors, in a round-robin fashion, as long-running tasks.

Receivers create blocks of data based on blockInterval. The received blocks are distributed by the BlockManager of the executors, and the network input tracker running on the driver is informed about the block locations for further processing.

On the driver, an RDD is created for the blocks in each batchInterval. Each block translates to a partition of the RDD and a task is scheduled to process each partition.

The following diagram illustrates this architecture.

Creating a custom SparkListener and sending metrics to CloudWatch

You can rely on CloudWatch custom metrics to react or raise alarms based on the custom Spark metrics you collect from a custom Spark listener.

You can implement your custom streaming listeners by directly implementing the SparkListener trait if writing in Scala, or its equivalent Java interface or PySpark Python wrapper pyspark.streaming.listener.

For this post, you only override onBatchCompleted and onReceiverError because you’re only collecting metrics about micro-batches.

From onBatchCompleted, you submit the following metrics:

  • Heartbeat – A numeric 1 (one) whenever a batch completes so you can sum or average time periods to see how many micro-batches ran
  • Records – The number of records per batch
  • Scheduling delay – The delay from when the batch was scheduled to run until when it actually ran
  • Processing delay – How long the batch execution took
  • Total delay – The sum of the processing delay and scheduling delay

From onReceiverError, you submit a numeric 1 (one) whenever a receiver fails. See the following code:

/**
    * This method executes when a Spark Streaming batch completes.
    *
    * @param batchCompleted Class having information on the completed batch
    */

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    log.info("CloudWatch Streaming Listener, onBatchCompleted:" + appName)

    // write performance metrics to CloudWatch Metrics
    writeBatchStatsToCloudWatch(batchCompleted)

  }
  /**
  * This method executes when a Spark Streaming receiver reports an error.
  *
  * @param receiverError Class having information on the receiver errors
  */

  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = { 
    log.warn("CloudWatch Streaming Listener, onReceiverError:" + appName)

    writeRecieverStatsToCloudWatch(receiverError)
  }
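The writeBatchStatsToCloudWatch and writeRecieverStatsToCloudWatch helpers live in the linked repository and aren’t reproduced here. Conceptually, they publish each value with the CloudWatch PutMetricData API; the following Python sketch illustrates the idea (the namespace and metric names are illustrative, not necessarily the repository’s exact values).

import boto3

cloudwatch = boto3.client("cloudwatch")

def write_batch_stats(app_name, num_records, scheduling_delay_ms, processing_delay_ms):
    # Push one datapoint per metric under an application-specific namespace
    cloudwatch.put_metric_data(
        Namespace="SparkStreaming/" + app_name,  # illustrative namespace
        MetricData=[
            {"MetricName": "heartbeat", "Value": 1, "Unit": "Count"},
            {"MetricName": "records", "Value": num_records, "Unit": "Count"},
            {"MetricName": "schedulingDelay", "Value": scheduling_delay_ms,
             "Unit": "Milliseconds"},
            {"MetricName": "processingDelay", "Value": processing_delay_ms,
             "Unit": "Milliseconds"},
            {"MetricName": "totalDelay",
             "Value": scheduling_delay_ms + processing_delay_ms,
             "Unit": "Milliseconds"},
        ],
    )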

For the full source code of this example for Scala implementation and a sample Spark Kinesis streaming application, see the AWSLabs GitHub repository.

To register your custom listener, make an instance of the custom listener object and pass the object to the streaming context, in the driver code, using the addStreamingListener method. See the following code:

val conf = new SparkConf().setAppName(appName)
val batchInterval = Milliseconds(1000)
val ssc = new StreamingContext(conf, batchInterval)
val cwListener = new CloudWatchSparkListener(appName)

ssc.addStreamingListener(cwListener)

When you run the application, you can find your metrics in CloudWatch in the same account as the one the EMR cluster is running in. See the following screenshot.

Using the sample code

This post provides an AWS CloudFormation template, which demonstrates the code. Download the emrtemplate.json file from the GitHub repo. The template launches an EMR cluster in a public subnet and a Kinesis data stream with three shards, with the required default AWS Identity and Access Management (IAM) roles. The sample Spark Kinesis streaming application is a simple word count that an Amazon EMR step script compiles and packages with the sample custom StreamingListener.

Using application alarms in CloudWatch

The alerts you need to set up mainly depend on the SLA of your application. As a general rule, you don’t want your batches to take longer than the micro-batch intervals, because that causes the scheduled batches to queue and you start falling behind the input stream. Also, if your receivers read from the stream faster than you can process in the batches during a surge, the read records can spill to disk and cause more delays as they shuffle across to other executors. You can set up a CloudWatch alarm to notify you when a processing delay is approaching your application’s batchInterval. For instructions on setting up an alarm, see Using Amazon CloudWatch Alarms.

The CloudFormation template for this post includes two sample alarms to monitor. One is based on an anomaly detection band on the processingDelays metric; the second is based on a threshold on a math expression that calculates the ratio of schedulingDelay to totalDelay, or (schedulingDelay / totalDelay) * 100.
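For reference, a plain threshold alarm on the custom processing delay metric could be created along these lines. This is a hedged boto3 sketch; the namespace, metric name, threshold, and SNS topic are placeholders you would align with what your listener publishes and with your batchInterval.

import boto3

cloudwatch = boto3.client("cloudwatch")

# Alarm when average processing delay approaches a 1,000 ms batch interval
cloudwatch.put_metric_alarm(
    AlarmName="spark-streaming-processing-delay",
    Namespace="SparkStreaming/myApp",  # placeholder namespace
    MetricName="processingDelay",      # placeholder metric name
    Statistic="Average",
    Period=60,
    EvaluationPeriods=3,
    Threshold=800,                     # e.g., 80% of a 1,000 ms batch interval
    ComparisonOperator="GreaterThanThreshold",
    AlarmActions=["arn:aws:sns:us-east-1:111122223333:placeholder-topic"],
)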

Scaling streaming applications

In terms of scaling, as the amount of data grows, you have more DStream partitions, based on the blockIntervals of the streaming application. In addition to the batches that should catch up with the received records and finish within batch intervals, the receivers should also keep up with the influx of records. The source streams should provide enough bandwidth for the receivers to read fast enough from the stream, and there should be enough receivers reading at the right rate to consume the records from the source.

If your DStreams are backed by receivers and WALs, you need to consider the number of receivers in advance. After you launch the application, the number of receivers can’t change without restarting the application.

When a SparkStreaming application starts, by default, the driver schedules the receivers in a round-robin fashion on the available executors unless a preferred location is defined for receivers. When all executors are allocated with receivers, the rest of the required receivers are scheduled on the executors to balance the number of receivers on each executor, and the receivers stay up in the executors as long-running tasks. For more information about scheduling receivers on executors, see ReceiverSchedulingPolicy.scala on GitHub and SPARK-8882 on the Spark issues website.

Sometimes you may want to slow down receivers because you want less data per micro-batch and don’t want to exceed your micro-batch intervals. If your streaming sources can hold on to records when the batches can’t run fast enough to keep up with a surge of records, you can enable the backpressure feature to adapt the input rate from receivers. To do so, set spark.streaming.backpressure.enabled to true.

Another factor you can consider is dynamic allocation for streaming applications. By default, spark.dynamicAllocation is enabled on Amazon EMR, which is mutually exclusive with spark.streaming.dynamicAllocation. If you want the driver to request more executors for your DStream tasks, set spark.dynamicAllocation.enabled to false and spark.streaming.dynamicAllocation.enabled to true. Spark periodically looks at the average batch duration. If it’s above the scale-up ratio, it requests more executors. If it’s below the scale-down ratio, it releases the idle executors, preferably those that aren’t running any receivers. For more information, see ExecutorAllocationManager.scala on GitHub and the Spark Streaming Programming Guide.
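For convenience, these settings can be applied when you build the application’s configuration. The following PySpark-style sketch shows the combination described above; the same keys can be passed to spark-submit --conf or set on a Scala SparkConf.

from pyspark import SparkConf

conf = (
    SparkConf()
    .setAppName("myStreamingApp")  # placeholder application name
    # Let receivers adapt their ingestion rate to what batches can process
    .set("spark.streaming.backpressure.enabled", "true")
    # Hand executor scaling over to streaming dynamic allocation
    .set("spark.dynamicAllocation.enabled", "false")
    .set("spark.streaming.dynamicAllocation.enabled", "true")
)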

The ExecutorAllocationManager is already looking into the batch execution average time and requests more executors based on the scale-up and scale-down ratios. Because of this, you can set up automatic scaling in Amazon EMR, preferably on task instance groups, to add and remove nodes based on the ContainerPendingRatio and assign PreferredLocation for receivers to core nodes. The example code for this post provides a custom KinesisInputDStream, which allows assigning the preferred location for every receiver you request. It’s basically a function that returns a hostname to preferably place the receiver. The GitHub repo also has a sample application that uses the customKinesisInputDStream and customKinesisReciever, which allows requesting a preferredLocation for receivers.

At scale-down, Amazon EMR nominates the nodes with the fewest containers running for decommissioning in the task instance group.

For more information about setting up automatic scaling, see Using Automatic Scaling with a Custom Policy for Instance Groups. The example code contains a threshold on schedulingDelay. As a general rule, you should base the threshold on the batchIntervals and processingDelay. A growth in schedulingDelay usually means a lack of resources to schedule a task.

The following table summarizes the configuration attributes to tune when you launch your Spark streaming job.

| Configuration attribute | Default |
|---|---|
| spark.streaming.backpressure.enabled | false |
| spark.streaming.backpressure.pid.proportional | 1.0 |
| spark.streaming.backpressure.pid.integral | 0.2 |
| spark.streaming.backpressure.pid.derived | 0.0 |
| spark.streaming.backpressure.pid.minRate | 100 |
| spark.dynamicAllocation.enabled | true |
| spark.streaming.dynamicAllocation.enabled | false |
| spark.streaming.dynamicAllocation.scalingInterval | 60 seconds |
| spark.streaming.dynamicAllocation.minExecutors | max(1, numReceivers) |
| spark.streaming.dynamicAllocation.maxExecutors | Integer.MAX_VALUE |

Monitoring structured streaming with a listener

Structured Streaming still processes records in micro-batches and triggers queries when there is data from receivers. You can monitor these queries using another listener interface, StreamingQueryListener. This post provides a sample listener for Structured Streaming on Kafka, with a sample application to run. For more information, see CloudWatchQueryListener.scala on GitHub. The following image is a snapshot of a few CloudWatch custom metrics that the custom StreamingQueryListener collects.

Scaling down your EMR cluster

When you launch a Spark streaming application, Spark evenly schedules the receivers on all available executors at the start of the application. When an EMR cluster is set to scale down, Amazon EMR nominates the nodes running fewer tasks in the instance group with an automatic scaling rule. Although Spark receivers are long-running tasks, Amazon EMR waits for yarn.resourcemanager.decommissioning.timeout, or until the NodeManagers are decommissioned, to gracefully terminate and shrink the nodes. You’re always at risk of losing a running executor with a receiver. You should always configure sufficient Spark block replication and checkpointing for the DStreams, and ideally define a preferredLocation so you don’t risk losing receivers.

Metrics pricing

In general, Amazon EMR metrics don’t incur CloudWatch costs. However, custom metrics incur charges based on CloudWatch metrics pricing. For more information, see Amazon CloudWatch pricing. Additionally, Spark Kinesis Streaming relies on the Kinesis Client Library, and it publishes custom CloudWatch metrics that also incur charges based on CloudWatch metrics pricing. For more information, see Monitoring the Kinesis Client Library with Amazon CloudWatch.

Conclusion

Monitoring and tuning Spark streaming and real-time applications is challenging, and you must react to environment changes in real time. You also need to monitor your source streams and job outputs to get a full picture. Spark is a very flexible and rich framework that provides multiple options for monitoring jobs. This post looked into an efficient way to monitor the performance of Spark streaming micro-batches using SparkListeners and integrate the extracted metrics with CloudWatch metrics.

About the Author

Amir Shenavandeh is a Hadoop systems engineer with AWS. He helps customers with architectural guidance and technical support for open-source applications, develops and advances applications in the Hadoop ecosystem, and works with the open-source community.

How Drop used the Amazon EMR runtime for Apache Spark to halve costs and get results 5.4 times faster

Post Syndicated from Michael Chau original https://aws.amazon.com/blogs/big-data/how-drop-used-the-amazon-emr-runtime-for-apache-spark-to-halve-costs-and-get-results-5-4-times-faster/

This is a guest post by Michael Chau, software engineer with Drop, and Leonardo Gomez, AWS big data specialist solutions architect. In their own words, “Drop is on a mission to level up consumer lives, one reward at a time. Through our personalized commerce platform, we intelligently surface the right brands, at the right time, to make our members’ everyday better than it was before. Powered by machine learning, we match consumers with more than 200 partner brands to satisfy two main goals: to earn points from their purchases and redeem them for instant rewards. Calling Toronto home but operating under a global mindset, Drop is building the next-level experience for our 3 million+ members across North America. Learn more by visiting www.joindrop.com.”

Overview

At Drop, our data lake infrastructure plays a foundational role in enabling better data-informed product and business decisions. A critical feature is its ability to process vast amounts of raw data and produce reconciled datasets that follow our data lake’s standardized file format and partitioning structure. Our business intelligence, experimentation analytics, and machine learning (ML) systems use these transformed datasets directly.

This post details how we designed and implemented our data lake’s batch ETL pipeline to use Amazon EMR, and the numerous ways we iterated on its architecture to reduce Apache Spark runtimes from hours to minutes and save over 50% on operational costs.

Building the pipeline

Drop’s data lake serves as the center and source of truth for the company’s entire data infrastructure upon which our downstream business intelligence, experimentation analytics, and ML systems critically rely. Our data lake’s goal is to ingest vast amounts of raw data from various sources and generate reliable and reconciled datasets that our downstream systems can access via Amazon Simple Storage Service (Amazon S3). To accomplish this, we architected our data lake’s batch ETL pipeline to follow the Lambda architecture processing model and used a combination of Apache Spark and Amazon EMR to transform the raw ingested data that lands into our Amazon S3 lake into reconciled columnar datasets. When designing and implementing this pipeline, we adopted the following core guiding principles and considerations:

  • Keep our tech stack simple
  • Use infrastructure as code
  • Work with transient resources

Keeping our tech stack simple

We aimed to keep our tech stack simple by using existing and proven AWS technologies and only adopting services that would drive substantial impact. Drop is primarily an AWS shop, so continuing to use AWS technologies made sense due to our existing experience, the ability to prototype new features quickly, and the inherent integration benefits of using other services within Amazon’s ecosystem.

Another effort to keep our tech stack simple was to limit the overhead and complexity of newly adopted open-source Apache Hadoop technologies. Our engineering team initially had limited experience working with these technologies, so we made a conscious effort to mitigate additional technical overhead to our stack by using proven fully-managed services. We integrated Amazon EMR as part of our idempotent data pipelines because we could use the service when our pipeline operations required it, which eliminated the need to maintain the service when no longer required. This allowed us to reduce the technical overhead of constantly maintaining production clusters.

Using infrastructure as code

We use Apache Airflow to manage and schedule our data lake pipeline operations. Using Airflow enables us to build our entire workflows and infrastructure as code via Airflow Directed Acyclic Graphs (DAGs). This key decision also simplified our engineering development and deployment processes, while providing version control for all aspects of our data infrastructure.

Working with transient resources

To reduce operational costs, we made a key decision to build our data processing pipelines using transient resources. By designing our pipelines to spin up EMR clusters only upon operational demand and terminate upon job completion, we can use Amazon Elastic Compute Cloud (Amazon EC2) Spot and On-Demand Instances without paying for idle resources. This approach has enabled a dramatic reduction in costs associated with idle clusters.

Batch ETL pipeline overview

The following diagram illustrates our batch ETL pipeline architecture.

The pipeline includes the following steps:

  1. A core requirement for the Lambda architecture data model is to have access to both batch and stream data sources of a dataset. The batch ETL pipeline primarily ingests data in batch and stream formats from our Amazon Relational Database Service (Amazon RDS) Postgres database using AWS Database Migration Service (AWS DMS). The pipeline initiates full-migration AWS DMS tasks for comprehensive batch snapshots using Airflow, and ingests stream data using ongoing replication AWS DMS tasks for 1-minute latency change data capture (CDC) files. Data from both batch and stream formats are landed into our Amazon S3 lake, and cataloged in our AWS Glue Data Catalog using AWS Glue crawlers.
  2. The batch ETL pipeline Apache Airflow DAG runs a series of tasks, which begins with uploading our Lambda architecture Spark application to Amazon S3, spinning up an EMR cluster, and ultimately running the Spark application as an Amazon EMR step. Depending on the characteristics of the datasets, the necessary Amazon EMR resources are calibrated to produce the reconciled dataset. To produce the resultant datasets in Apache Parquet format, we must allocate sufficient CPU and memory to our clusters.
  3. Upon completion of all of the Amazon EMR steps, the cluster is terminated, and the newly produced dataset is crawled using AWS Glue crawlers to update the dataset’s metadata within the Data Catalog. The output dataset is now ready for consumer systems to access via Amazon S3 or query using Amazon Athena or Amazon Redshift Spectrum.

Evolving the EMR pipeline

Our engineering team is constantly iterating on the architecture of our batch ETL pipeline in an effort to reduce its runtime duration and operational costs. The following iterations and notable feature enhancements have generated the largest impact to the downstream systems, as well as the end-users that rely on this pipeline.

Migrating from AWS Glue to Amazon EMR

The first iteration of our batch ETL pipeline used AWS Glue to process our Spark applications rather than Amazon EMR, due to our limited in-house Hadoop experience in the initial stages. AWS Glue was an appealing first solution due to its “ETL as a service” features and simplified resource allocation. The AWS Glue solution successfully delivered desired results; however, as we gained experience with Hadoop technologies, we recognized a significant opportunity to use Amazon EMR to improve pipeline performance and reduce operational costs.

The migration from AWS Glue to Amazon EMR was seamless and only required EMR cluster configurations and minor modifications to our Spark application that used AWS Glue libraries. Thanks to this, we achieved the following operational benefits:

  • Faster cluster bootstrapping and resource provisioning durations. We found that AWS Glue clusters have a cold start time of 10–12 minutes, whereas EMR clusters have a cold start time of 7–8 minutes.
  • An 80% reduction in cost while using equivalent resources. We swapped the standard AWS Glue worker type, at a cost of $0.44 per DPU-hour, for the resource-equivalent m5.xlarge Amazon EMR instance type, which has a Spot Instance price of approximately $0.085 per instance per hour.

File committers

Our original partitioning strategy attempted to use Spark’s dynamic write partitioning feature to reduce the number of written files per run. See the following code:

sparkSession.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

This strategy didn’t translate into good pipeline performance; we quickly ran into the limitations and considerations of working with cloud object stores. By pivoting our Spark application’s file-writing strategy to completely overwrite an existing directory and using the Amazon EMR EMRFS S3-optimized committer, we realized critical performance gains. In scenarios where datasets were nearly a terabyte, deploying this optimized file committer reduced runtime from hours to less than half an hour! It’s worth noting that Amazon EMR 5.30.0 includes an optimization that should help with dynamic partitionOverwriteMode.
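On Amazon EMR, the committer is controlled by a Spark property that defaults to true on release 5.20.0 and later. If you build clusters programmatically, you can pin it explicitly through a spark-defaults classification, as in this sketch (shown as the Configurations structure a boto3 run_job_flow call would accept):

# Passed as the Configurations parameter of boto3 run_job_flow (illustrative)
configurations = [
    {
        "Classification": "spark-defaults",
        "Properties": {
            # EMRFS S3-optimized committer (default true on EMR 5.20.0+)
            "spark.sql.parquet.fs.optimized.committer.optimization-enabled": "true"
        },
    }
]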

Upgrading Amazon EMR versions to 5.28+

Our datasets often exceed billions of rows, which necessitated comparing and processing hundreds of thousands of stream files against large batch files. Executing these Spark operations against such input data sources makes querying and processing the data costly.

A huge improvement in our pipeline’s overall performance came from using the Amazon EMR runtime for Apache Spark feature introduced in Amazon EMR version 5.28. We saw immediate performance gains by upgrading from Amazon EMR 5.27 to 5.29, without having to make any additional changes to our existing pipeline. Our Spark application total runtime and subsequent Amazon EMR cost was reduced by over 35% using identical resource configurations. These improvements were benchmarked against two of our datasets and averaged against three production runs.

The following table summarizes the dataset and EMR cluster properties.

| Dataset | Table Rows | Total Batch Files Size | Total Stream Files Size | Count of Stream Files | EC2 Instance Type | Count of EC2 Instances |
|---|---|---|---|---|---|---|
| Dataset A | ~3.5M | ~0.5 GB | ~0.2 GB | ~100k | m5.xlarge | 10 |
| Dataset B | ~3,500M | ~500 GB | ~120 GB | ~250k | r5.2xlarge | 30 |

The following diagrams summarize the Amazon EMR upgrade performance benchmarks and metrics. We calculated these cost metrics with Amazon EMR bootstrapping and resource provisioning time included.

Amazon EMR step concurrency

Early iterations of our pipeline architecture involved creating a new batch ETL pipeline per dataset, as well as a dedicated EMR cluster for that dataset. Cloning new pipelines was a quick and simple way to scale our processing capabilities because our infrastructure was written as code and the operations and resources were self-contained. Although this let us generate pipelines quickly for our most important datasets, there was ample opportunity for operational improvement.

The following screenshot shows Drop’s batch ETL processing DAG. All of the clusters are named after the Drop engineering team’s pets.

The evolution of the pipeline architecture involved grouping datasets based on their Amazon EMR resource requirements and running them concurrently as Spark application steps on a common EMR cluster, using Amazon EMR step concurrency. Re-architecting our batch ETL pipelines in this manner allowed us to do the following:

  • Remove the EMR cluster bootstrapping and provisioning duration associated within individual EMR clusters per dataset
  • Reduce overall Spark runtimes in aggregate
  • Simplify our Amazon EMR resource configurations with fewer EMR clusters

On average, our clusters required 8–10 minutes to bootstrap and source the Spot Instances requested. By migrating multiple Spark applications to a common EMR cluster, we removed this bottleneck, and ultimately reduced overall runtime and Amazon EMR costs. Amazon EMR step concurrency also allowed us to run multiple applications at the same time against a dramatically reduced set of resources. For our smaller datasets (under 15 million rows), we learned that running Spark applications concurrently with reduced resources didn’t have a linear effect on overall runtime, and we could achieve shorter runtimes with fewer resources compared to the previous architecture in aggregate. However, our larger datasets (over 1 billion rows) didn’t exhibit the same performance behaviors or gains as the smaller tables when running Amazon EMR steps concurrently. Therefore, EMR clusters for larger tables required additional resources and fewer steps; however, the overall result is still marginally better in terms of cost and overall runtime in aggregate compared to the previous architecture.
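Step concurrency can be set when the cluster is created or adjusted later. The following is a minimal boto3 sketch of raising it on a running cluster; the cluster ID and concurrency level are placeholders.

import boto3

emr = boto3.client("emr")

# Raise the number of steps an existing cluster runs in parallel (placeholder values)
emr.modify_cluster(
    ClusterId="j-XXXXXXXXXXXXX",
    StepConcurrencyLevel=5,  # run up to 5 Spark application steps concurrently
)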

Amazon EMR instance fleets

Working with Amazon EMR and Amazon EC2 Spot Instances has allowed us to realize tremendous cost savings, but it can come at the expense of EMR cluster reliability. We have experienced Spot Instance availability issues due to Spot Instance type supply constraints on the available market and losing EC2 instances due to competitive bidding. Both issues directly contribute to overall pipeline performance degradation in the form of longer EMR cluster resource provisioning and longer Spark runtimes due to lost nodes.

To improve our pipeline reliability and protect against these risks, we began to use Amazon EMR instance fleets. Instance fleets addressed both pain points: they work around limited supply of a specific EC2 Spot Instance type by sourcing an alternative instance type, and they can automatically switch to On-Demand Instances if Spot Instance provisioning exceeds a specified threshold duration. Prior to using instance fleets, about 15% of our Amazon EMR production runs were affected by limitations related to Spot Instance supply or price bidding. Since implementing instance fleets, we haven’t had a cluster fail or experienced resource provisioning prolonged past programmed thresholds.
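
For illustration, the following sketch shows the shape of such a configuration; the instance types, capacities, subnet ID, and 20-minute timeout are placeholder assumptions, not Drop’s actual settings:

$ aws emr create-cluster \
    --name "fleet-example" \
    --release-label emr-5.29.0 \
    --applications Name=Spark \
    --use-default-roles \
    --ec2-attributes SubnetId=subnet-0123456789abcdef0 \
    --instance-fleets '[
      {"InstanceFleetType": "MASTER", "TargetOnDemandCapacity": 1,
       "InstanceTypeConfigs": [{"InstanceType": "m5.xlarge"}]},
      {"InstanceFleetType": "CORE", "TargetSpotCapacity": 8,
       "InstanceTypeConfigs": [{"InstanceType": "r5.2xlarge"},
                               {"InstanceType": "r4.2xlarge"}],
       "LaunchSpecifications": {"SpotSpecification": {
         "TimeoutDurationMinutes": 20,
         "TimeoutAction": "SWITCH_TO_ON_DEMAND"}}}
    ]'

The SWITCH_TO_ON_DEMAND timeout action is what provides the automatic fallback to On-Demand Instances described above.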

Conclusion

Amazon EMR has played a critical role in Drop’s ability to use data to make better-informed product and business decisions. We have had tremendous success capitalizing on Amazon EMR features to improve our data processing pipeline’s overall performance and cost efficiency, and we will continue to explore new ways to improve our pipeline. One of the easiest ways to learn about these new opportunities is to stay current with the latest AWS technologies and Amazon EMR features.


About the Authors

Michael Chau is a Software Engineer at Drop. He has experience moving data from A to B and sometimes transforming it along the way.

Leonardo Gómez is a Big Data Specialist Solutions Architect at AWS. Based in Toronto, Canada, he works with customers across Canada to design and build big data architectures.

Build an AWS Well-Architected environment with the Analytics Lens

Post Syndicated from Nikki Rouda original https://aws.amazon.com/blogs/big-data/build-an-aws-well-architected-environment-with-the-analytics-lens/

Building a modern data platform on AWS enables you to collect data of all types, store it in a central, secure repository, and analyze it with purpose-built tools. Yet you may be unsure of how to get started and the impact of certain design decisions. To address the need for advice tailored to specific technology and application domains, AWS added the concept of well-architected lenses in 2017. AWS is now happy to announce the Analytics Lens for the AWS Well-Architected Framework. This post provides an introduction to its purpose, topics covered, common scenarios, and services included.

The new Analytics Lens offers comprehensive guidance to make sure that your analytics applications are designed in accordance with AWS best practices. The goal is to give you a consistent way to design and evaluate cloud architectures, based on the following five pillars:

  • Operational excellence
  • Security
  • Reliability
  • Performance efficiency
  • Cost optimization

The tool can help you assess the analytics workloads you have deployed in AWS by identifying potential risks and offering suggestions for improvements.

Using the Analytics Lens to address common requirements

The Analytics Lens models both the data architecture at the core of the analytics applications and the application behavior itself. These models are organized into the following six areas, which encompass the vast majority of analytics workloads deployed on AWS:

  1. Data ingestion
  2. Security and governance
  3. Catalog and search
  4. Central storage
  5. Processing and analytics
  6. User access

The following diagram illustrates these areas and their related AWS services.

There are a number of common scenarios where the Analytics Lens applies, such as the following:

  • Building a data lake as the foundation for your data and analytics initiatives
  • Efficient batch data processing at scale
  • Building a platform for streaming ingest and real-time event processing
  • Handling big data processing and streaming
  • Data-preparation operations

Whichever of these scenarios fits your needs, building to the principles of the Analytics Lens in the AWS Well-Architected Framework can help you implement best practices for success.

The Analytics Lens explains when and how to use the core services in the AWS analytics portfolio. These include Amazon Kinesis, Amazon Redshift, Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation. It also explains how Amazon Simple Storage Service (Amazon S3) can serve as the storage for your data lake and how to integrate with relevant AWS security services. With reference architectures, best practices advice, and answers to common questions, the Analytics Lens can help you make the right design decisions.

Conclusion

Applying the lens to your existing architectures can validate the stability and efficiency of your design (or provide recommendations to address the gaps that are identified). AWS is committed to the Analytics Lens as a living tool; as the analytics landscape evolves and new AWS services come online, we’ll update the Analytics Lens appropriately. Our mission will always be to help you design and deploy well-architected applications.

For more information about building your own Well-Architected environment using the Analytics Lens, see the Analytics Lens whitepaper.

Special thanks to the following individuals who contributed to building this resource, among many others who helped with review and implementation: Radhika Ravirala, Laith Al-Saadoon, Wallace Printz, Ujjwal Ratan, and Neil Mukerje.

Are there questions you’d like to see answered in the tool? Share your thoughts and questions in the comments.

About the Authors

Nikki Rouda is the principal product marketing manager for data lakes and big data at Amazon Web Services. Nikki has spent 20+ years helping enterprises in 40+ countries develop and implement solutions to their analytics and IT infrastructure challenges. Nikki holds an MBA from the University of Cambridge and an ScB in geophysics and math from Brown University.

Radhika Ravirala is a specialist solutions architect at Amazon Web Services, where she helps customers craft distributed analytics applications on the AWS platform. Prior to her cloud journey, she worked as a software engineer and designer for technology companies in Silicon Valley.

AWS Architecture Monthly Magazine: Education

Post Syndicated from Annik Stahl original https://aws.amazon.com/blogs/architecture/aws-architecture-monthly-magazine-education/

One of the missions of the education industry is to educate the next generation of the industry-ready workforce. Whether K-12, higher education, or continuing education, enabling teachers and professors to effectively deliver curriculum and improve student performance is a goal of Education Technology (EdTech) and learning companies. Two trends for AWS use cases in education are 1) accessible remote learning and 2) remote collaboration. For brevity, our “Ask an Expert” interview doesn’t cover other innovation trend areas in education, despite their importance. Use cases around learning accessibility, student performance, and campus experience have taken advantage of Amazon Alexa, Amazon Lex, and a variety of AWS technology areas including artificial intelligence (AI) and machine learning, data lakes, analytics, and mobile development. To dive deep into a wider range of education use cases, we invite everyone to look at our AWS Education blog.

In this month’s issue

For May’s Education issue, we asked our expert, Yuriko Horvath, about general architecture patterns in the education space as well as what education customers need to think about and ask themselves before considering AWS.

  • Ask an Expert: Yuriko Horvath, AWS Manager of Education for Solutions Architecture
  • Blog: How to Build a Chatbot for Your School in Less Than an Hour (with step-by-step video instructions)
  • Case Study: Virginia Tech: Building Modern Analytics on Amazon Web Services
  • Solution: Video on Demand on AWS
  • Whitepaper: Teaching Big Data Skills with Amazon EMR

How to access the magazine

We hope you’re enjoying Architecture Monthly, and we’d like to hear from you. Leave us a star rating and a comment on the Amazon Kindle Newsstand page, or contact us anytime at [email protected].

Build an automatic data profiling and reporting solution with Amazon EMR, AWS Glue, and Amazon QuickSight

Post Syndicated from Francesco Marelli original https://aws.amazon.com/blogs/big-data/build-an-automatic-data-profiling-and-reporting-solution-with-amazon-emr-aws-glue-and-amazon-quicksight/

In typical analytics pipelines, one of the first tasks you perform after importing data into your data lake is data profiling and high-level data quality analysis to check the content of the datasets. In this way, you can enrich the basic metadata, which contains information such as table and column names and their types.

The results of data profiling help you determine whether the datasets contain the expected information and how to use them downstream in your analytics pipeline. Moreover, you can use these results as one of the inputs to an optional data semantics analysis stage.

The great quantity and variety of data in modern data lakes make unstructured, manual data profiling and data semantics analysis impractical and time-consuming. This post shows how to implement a process for the automatic creation of a data profiling repository, as an extension of AWS Glue Data Catalog metadata, and a reporting system that can help you design your analytics pipeline and provide a reliable tool for further analysis.

This post describes the application Data Profiler for AWS Glue Data Catalog in detail and provides step-by-step instructions for an example implementation.

Overview and architecture

The following diagram illustrates the architecture of this solution.

Data Profiler for AWS Glue Data Catalog is an Apache Spark Scala application that profiles all the tables defined in a database in the Data Catalog using the profiling capabilities of the Amazon Deequ library and saves the results in the Data Catalog and an Amazon S3 bucket in a partitioned Parquet format. You can use other analytics services such as Amazon Athena and Amazon QuickSight to query and visualize the data.

For more information about the Amazon Deequ data library, see Test data quality at scale with Deequ or the source code on the GitHub repo.

Metadata can be defined as data about data. Metadata for a table contains information like the table name and other attributes, column names and types, and the physical location of the files that contain the data. The Data Catalog is the metadata repository in AWS, and you can use it with other AWS services like Athena, Amazon EMR, and Amazon Redshift.

After you create or update the metadata for tables in a database (for example, adding new data to the table), either with an AWS Glue crawler or manually, you can run the application to profile each table. The results are stored as new versions of the tables’ metadata in the Data Catalog, which you can view interactively via the AWS Lake Formation console or query programmatically via the AWS CLI for AWS Glue.

For more information about the Data Profiler, see the GitHub repo.

The Deequ library does not support tables with nested data (such as JSON). If you want to run the application on a table with nested data, this must be un-nested/flattened or relationalized before profiling. For more information about useful transforms for this task, see AWS Glue Scala DynamicFrame APIs or AWS Glue PySpark Transforms Reference.

The following table shows the profiling metrics the application computes for column data types Text and Numeric. The computation of some profiling metrics for Text columns can be costly and is disabled by default. You can enable it by setting the compExp input parameter to true (see next section).

Metric | Description | Data Type
ApproxCountDistinct | Approximate number of distinct values, computed with HyperLogLogPlusPlus sketches. | Text / Numeric
Completeness | Fraction of non-null values in a column. | Text / Numeric
Distinctness | Fraction of distinct values of a column over the number of all values of a column. Distinct values occur at least one time. For example, [a, a, b] contains two distinct values a and b, so distinctness is 2/3. | Text / Numeric
MaxLength | Maximum length of the column. | Text
MinLength | Minimum length of the column. | Text
CountDistinct | Exact number of distinct values. | Text
Entropy | Entropy is a measure of the level of information contained in an event (value in a column) when considering all possible events (values in a column). It is measured in nats (natural units of information). Entropy is estimated using observed value counts as the negative sum of (value_count/total_count) * log(value_count/total_count). For example, [a, b, b, c, c] has three distinct values with counts [1, 2, 2]. Entropy is then (-1/5*log(1/5)-2/5*log(2/5)-2/5*log(2/5)) = 1.055. | Text
Histogram | The summary of values in a column of a table. Groups the given column’s values and calculates the number of rows with that specific value and the fraction of this value. | Text
UniqueValueRatio | Fraction of unique values over the number of all distinct values of a column. Unique values occur exactly one time; distinct values occur at least one time. Example: [a, a, b] contains one unique value b, and two distinct values a and b, so the unique value ratio is 1/2. | Text
Uniqueness | Fraction of unique values over the number of all values of a column. Unique values occur exactly one time. Example: [a, a, b] contains one unique value b, so uniqueness is 1/3. | Text
ApproxQuantiles | Approximate quantiles of a distribution. | Numeric
Maximum | Maximum value of the column. | Numeric
Mean | Mean value of the column. | Numeric
Minimum | Minimum value of the column. | Numeric
StandardDeviation | Standard deviation value of the column. | Numeric
Sum | Sum of the column. | Numeric

Application description

You can run the application via spark-submit on a transient or permanent EMR cluster (see the “Creating an EMR cluster” section in this post for minimum version specification) with Spark installed and configured with the Data Catalog settings option Use for Spark table metadata enabled.

The following example code executes the application:

 $ spark-submit \
  --class awsdataprofiler.DataProfilerForAWSGlueDataCatalog \
  --master yarn \
  --deploy-mode cluster \
  --name data-profiler-for-aws-glue-data-catalog \
  /home/hadoop/data-profiler-for-aws-glue-data-catalog-assembly-1.0.jar \
  --dbName nyctlcdb \
  --region eu-west-1 \
  --compExp true \
  --statsPrefix DQP \
  --s3BucketPrefix deequ-profiler/deequ-profiler-metrics \
  --profileUnsupportedTypes true \
  --noOfBins 30 \
  --quantiles 10

The following table summarizes the input parameters that the application accepts.

Name | Type | Required | Default | Description
--dbName (-d) | String | Yes | N/A | Data Catalog database name. The database must be defined in the Catalog owned by the same account where the application is executed.
--region (-r) | String | Yes | N/A | AWS Region endpoint where the Data Catalog database is defined, for example us-west-1 or us-east-1. For more information, see Regional Endpoints.
--compExp (-c) | Boolean | No | false | If true, the application also executes “expensive” profiling analyzers on Text columns. These are CountDistinct, Entropy, Histogram, UniqueValueRatio, and Uniqueness. If false, only the following default analyzers are executed: ApproxCountDistinct, Completeness, Distinctness, MaxLength, MinLength. All analyzers for Numeric columns are always executed.
--statsPrefix (-p) | String | No | DQP | String prepended to the statistics names in the Data Catalog. The application also adds two underscores (__). This is useful to identify metrics calculated by the application.
--s3BucketPrefix (-s) | String | No | blank | Format must be s3BucketName/prefix. If specified, the application writes Parquet files with metrics in the prefixes db_name=…/table_name=….
--profileUnsupportedTypes (-u) | Boolean | No | false | By default, the Amazon Deequ library only supports Text and Numeric columns. If this parameter is set to true, the application also profiles columns of type Boolean and Date.
--noOfBins (-b) | Integer | No | 10 | When --compExp (-c) is true, sets the number of maximum values to create for the Histogram analyzer for String columns.
--quantiles (-q) | Integer | No | 10 | Sets the number of quantiles to calculate for the ApproxQuantiles analyzer for numeric columns.

Setting up your environment

The following walkthrough demonstrates how to create and populate a Data Catalog database with three tables, which simulates a process with monthly updates. For this post, you simulate three monthly runs: February 2, 2019, March 2, 2019, and April 2, 2019.

After table creation and after each monthly table update, you run the application to generate and update the profiling information for each table in the Data Catalog and Amazon S3 repository. The post also provides examples of how to query the data using the AWS CLI and Athena, and build a simple Amazon QuickSight dashboard.

This post uses the New York City Taxi and Limousine Commission (TLC) Trip Record Data on the Registry of Open Data on AWS. In particular, you use the tables yellow_tripdata, fhv_tripdata, and green_tripdata.

The following steps explain how to set up the environment.

Creating an EMR cluster

The first step is to create an EMR cluster; you then connect to the cluster’s master node and execute the code via spark-submit. Make sure that the cluster release is at least emr-5.28.0, with at least Hadoop and Spark installed, and that the cluster uses the Data Catalog as table metadata for Spark.

The master node should also be accessible via SSH. For instructions, see Connect to the Master Node Using SSH.
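
As a sketch, a matching cluster can be created with the AWS CLI as follows; the cluster name, sizing, and key pair are placeholder assumptions, and the spark-hive-site classification is what enables the Data Catalog as the Spark table metastore:

$ aws emr create-cluster \
    --name "data-profiler-cluster" \
    --release-label emr-5.29.0 \
    --applications Name=Hadoop Name=Spark \
    --instance-type m5.xlarge \
    --instance-count 3 \
    --use-default-roles \
    --ec2-attributes KeyName=your-key-pair \
    --configurations '[{"Classification": "spark-hive-site", "Properties": {"hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"}}]'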

Downloading the application

You can download the source code from the application GitHub repo and build it as an uber JAR with all dependencies using the Scala Build Tool (sbt) and the following command (adjust the memory values according to your needs):

$ export SBT_OPTS="-Xms1G -Xmx3G -Xss2M -XX:MaxMetaspaceSize=3G" && sbt assembly

By default, the .jar file is created in the following path, relative to the project root directory:

./target/scala-2.11/data-profiler-for-aws-glue-data-catalog-assembly-1.0.jar

When the .jar file is available, copy it to the master node of the EMR cluster. One way to copy the file to the master node is to copy it to Amazon S3 from the client where the file was created and download it from Amazon S3 to the master node.

For this post, copy the file in the /home/hadoop directory of the master node. The full path is /home/hadoop/data-profiler-for-aws-glue-data-catalog-assembly-1.0.jar.
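
For example, the round trip through Amazon S3 might look like the following; the artifact bucket name is a placeholder:

# On the machine where you built the .jar (the bucket name is hypothetical)
$ aws s3 cp ./target/scala-2.11/data-profiler-for-aws-glue-data-catalog-assembly-1.0.jar \
    s3://your-artifact-bucket/jars/

# On the EMR master node
$ aws s3 cp s3://your-artifact-bucket/jars/data-profiler-for-aws-glue-data-catalog-assembly-1.0.jar \
    /home/hadoop/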

Setting up S3 buckets and copying initial data

You use an S3 bucket to store the data that you profile. For this post, the bucket name is

s3://aws-big-data-blog-samples/

You need to create a bucket with a unique name in your account and store the data there. When the bucket is created and available, use the AWS CLI to copy the first set of files for January 2019 (therefore simulating the February 2, 2019, run) from the s3://nyc-tlc/ bucket. See the following code:

$ DEST_BUCKET=aws-big-data-blog-samples
$ MONTH=2019-01
 
$ aws s3 cp "s3://nyc-tlc/trip data/yellow_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-yellow/yellow_tripdata_${MONTH}.csv"
 
$ aws s3 cp "s3://nyc-tlc/trip data/fhv_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-fhv/fhv_tripdata_${MONTH}.csv"
 
$ aws s3 cp "s3://nyc-tlc/trip data/green_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-green/green_tripdata_${MONTH}.csv"

After you copy the data to your destination bucket, create a second bucket to store the files with the profiling metrics created by the application. This step is optional because you can write the metrics to a prefix in an existing bucket. See the following code:

$ aws s3 mb s3://deequ-profiler/

You are now ready to create the database and tables metadata in the Data Catalog.

Creating metadata in the Data Catalog

The first step is to create a database in AWS Glue. You can create the database using the AWS CLI. See the following code:

$ aws glue create-database \
    --database-input '{"Name": "nyctlcdb"}'

Alternatively, on the AWS Glue console, choose Databases, Add database.

After you create the database, create a new AWS Glue Crawler to infer the schema of the data in the files you copied in the previous step. Complete the following steps:

  1. On the AWS Glue console, choose Crawlers.
  2. Choose Add crawler.
  3. For Crawler name, enter nyc-tlc-db-raw.
  4. Choose Next.
  5. For Choose a data store, choose S3.
  6. For Crawl data in, choose Specified path in my account.
  7. For Include path, enter the S3 bucket and prefix where you copied the data earlier.
  8. Choose Next.
  9. In the Choose an IAM role section, select Choose an existing IAM role.
  10. Choose an IAM role that provides access to the S3 bucket and allows writing to the Data Catalog, or create a new one while creating this crawler.
  11. Choose Next.
  12. Choose the database you created earlier to store the tables’ metadata.
  13. Review the crawler properties and choose Finish.
    You can run the crawler when it’s ready. It creates three new tables in the database. The following screenshot shows the notification that the crawler is complete.
  14. You can now use the Lake Formation console to check that the tables are correct. See the following screenshot of the Tables page. If you select one of the tables, the table version is now 0. See the following screenshot. You can also perform the same check using the AWS CLI. See the following code:
    $ aws glue get-table-versions \
    --database-name nyctlcdb \
    --table-name trip_data_yellow \
    --query 'TableVersions[*].VersionId' 

    [
        "0"
    ]

  15. Check the parameters in the table metadata to verify which values the crawler generated. See the following code:
    $ aws glue get-table \
    	--database-name nyctlcdb \
    	--name trip_data_yellow \
       	--query 'Table.Parameters' 

    {
        "CrawlerSchemaDeserializerVersion": "1.0",
        "CrawlerSchemaSerializerVersion": "1.0",
        "UPDATED_BY_CRAWLER": "nyc-tlc-db-raw",
        "areColumnsQuoted": "false",
        "averageRecordSize": "144",
        "classification": "csv",
        "columnsOrdered": "true",
        "compressionType": "none",
        "delimiter": ",",
        "objectCount": "1",
        "recordCount": "4771445",
        "sizeKey": "687088084",
        "skip.header.line.count": "1",
        "typeOfData": "file"
    }

  16. Check the metadata attributes for three columns in the same table. This post chooses the following columns because they have different data types, though any other column is suitable for this check. In the following code, the only attributes currently available for the columns are “Name” and “Type”:
    $ aws glue get-table \
    --database-name nyctlcdb \
    --name trip_data_yellow \
    --query 'Table.StorageDescriptor.Columns[?Name==`store_and_fwd_flag`]'

    [
        {
            "Name": "store_and_fwd_flag",
            "Type": "string"
        }
    ]

    $ aws glue get-table \
    --database-name nyctlcdb \
    --name trip_data_yellow \
    --query 'Table.StorageDescriptor.Columns[?Name==`fare_amount`]'

    [
        {
            "Name": "fare_amount",
            "Type": "double"
        }
    ]

    $ aws glue get-table \
    --database-name nyctlcdb \
    --name trip_data_yellow \
    --query 'Table.StorageDescriptor.Columns[?Name==`passenger_count`]'

    [
        {
            "Name": "passenger_count",
            "Type": "bigint"
        }
    ]

You can display the same information via the Lake Formation console. See the following screenshot.

You are now ready to execute the application.

First application execution

Connect to the EMR cluster master node via SSH and run the application with the following code (change the input parameters as needed, especially the value for the s3BucketPrefix parameter):

$ spark-submit \
    --class awsdataprofiler.DataProfilerForAWSGlueDataCatalog \
    --master yarn \
    --deploy-mode cluster \
    --name data-profiler-for-aws-glue-data-catalog \
    /home/hadoop/data-profiler-for-aws-glue-data-catalog-assembly-1.0.jar \
    --dbName nyctlcdb \
    --region eu-west-1 \
    --compExp true \
    --statsPrefix DQP \
    --s3BucketPrefix deequ-profiler/deequ-profiler-metrics \
    --profileUnsupportedTypes true \
    --noOfBins 30 \
    --quantiles 10

Profiling information in the metadata in the Data Catalog

When the application is complete, you can recheck the metadata via the Lake Formation console for the tables and verify that a new table version was created. See the following screenshot.

You can verify the same information via the AWS CLI. See the following code:

$ aws glue get-table-versions \
    --database-name nyctlcdb \
    --table-name trip_data_yellow \
    --query 'TableVersions[*].VersionId'
[
    "1",
    "0"
]

Check the metadata for the table and verify that the profiling information the application generated was successfully stored. In the following code, the parameter “DQP__Size” was generated, which contains the number of records in the table as calculated by the Deequ library:

$ aws glue get-table \ 
    --database-name nyctlcdb \
    --name trip_data_yellow \
    --query 'Table.Parameters'
{
    "CrawlerSchemaDeserializerVersion": "1.0-",
    "CrawlerSchemaSerializerVersion": "1.0",
    "DQP__Size": "7667793.0",
    "UPDATED_BY_CRAWLER": "nyc-tlc-db-raw",
    "areColumnsQuoted": "false",
    "averageRecordSize": "144",
    "classification": "csv",
    "columnsOrdered": "true",
    "compressionType": "none",
    "delimiter": ",",
    "objectCount": "1",
    "recordCount": "4771445",
    "sizeKey": "687088084",
    "skip.header.line.count": "1",
    "typeOfData": "file"
}

Similarly, you can verify that the metadata for the columns you checked previously contains the profiling information the application generated. This is stored in the “Parameters” object for each column. Each new attribute starts with the string “DQP” as specified in the statsPrefix input parameter. See the following code:

$ aws glue get-table \
    --database-name nyctlcdb \
    --name trip_data_yellow \
    --query 'Table.StorageDescriptor.Columns[?Name==`store_and_fwd_flag`]' 
[
    {
        "Name": "store_and_fwd_flag",
        "Type": "string",
        "Parameters": {
            "DQP__ApproxCountDistinct": "3.0",
            "DQP__Completeness": "1.0",
            "DQP__CountDistinct": "3.0",
            "DQP__Distinctness": "3.912468685578758E-7",
            "DQP__Entropy": "0.03100483390393341",
            "DQP__Histogram.abs.N": "7630142.0",
            "DQP__Histogram.abs.Y": "37650.0",
            "DQP__Histogram.abs.store_and_fwd_flag": "1.0",
            "DQP__Histogram.bins": "3.0",
            "DQP__Histogram.ratio.N": "0.9950897213839758",
            "DQP__Histogram.ratio.Y": "0.004910148200401341",
            "DQP__Histogram.ratio.store_and_fwd_flag": "1.3041562285262527E-7",
            "DQP__MaxLength": "18.0",
            "DQP__MinLength": "1.0",
            "DQP__UniqueValueRatio": "0.3333333333333333",
            "DQP__Uniqueness": "1.3041562285262527E-7"
        }
    }
]
$ aws glue get-table \
    --database-name nyctlcdb \
    --name trip_data_yellow \
    --query 'Table.StorageDescriptor.Columns[?Name==`fare_amount`]'
[
    {
        "Name": "fare_amount",
        "Type": "double",
        "Parameters": {
            "DQP__ApproxCountDistinct": "6125.0",
            "DQP__Completeness": "0.9999998695843771",
            "DQP__Distinctness": "8.187492802687814E-4",
            "DQP__Maximum": "623259.86",
            "DQP__Mean": "12.40940884025023",
            "DQP__Minimum": "-362.0",
            "DQP__StandardDeviation": "262.0720412055651",
            "DQP__Sum": "9.515276582999998E7",
            "DQP__name-0.1": "5.0",
            "DQP__name-0.2": "6.0",
            "DQP__name-0.3": "7.0",
            "DQP__name-0.4": "8.0",
            "DQP__name-0.5": "9.0",
            "DQP__name-0.6": "10.5",
            "DQP__name-0.7": "12.5",
            "DQP__name-0.8": "15.5",
            "DQP__name-0.9": "23.5",
            "DQP__name-1.0": "623259.86"
        }
    }
]

The parameters named “DQP__name-x.x” are the results of the ApproxQuantiles Deequ analyzer for numeric columns; the number of quantiles is set via the --quantiles (-q) input parameter of the application. See the following code:

$ aws glue get-table \
    --database-name nyctlcdb \
    --name trip_data_yellow \
    --query 'Table.StorageDescriptor.Columns[?Name==`passenger_count`]'
[
    {
        "Name": "passenger_count",
        "Type": "bigint",
        "Parameters": {
            "DQP__ApproxCountDistinct": "10.0",
            "DQP__Completeness": "0.9999998695843771",
            "DQP__Distinctness": "1.3041562285262527E-6",
            "DQP__Maximum": "9.0",
            "DQP__Mean": "1.5670782410373156",
            "DQP__Minimum": "0.0",
            "DQP__StandardDeviation": "1.2244305354114957",
            "DQP__Sum": "1.201603E7",
            "DQP__name-0.1": "1.0",
            "DQP__name-0.2": "1.0",
            "DQP__name-0.3": "1.0",
            "DQP__name-0.4": "1.0",
            "DQP__name-0.5": "1.0",
            "DQP__name-0.6": "1.0",
            "DQP__name-0.7": "1.0",
            "DQP__name-0.8": "2.0",
            "DQP__name-0.9": "3.0",
            "DQP__name-1.0": "9.0"
        }
    }
]

Profiling information in Amazon S3

You can now also verify that the profiling information was saved in Parquet format in the S3 bucket you specified in the s3BucketPrefix application input parameter. The following screenshot shows the buckets via the Amazon S3 console.

The data is stored using prefixes that are compatible with Apache Hive partitions. This is useful to optimize performance and costs when you use analytics services like Athena. The partitions are defined on db_name and table_name. The following screenshot shows the details of table_name=trip_data_yellow.

Each execution of the application generates one Parquet file appending data to the metrics table for each physical table.

Second execution after monthly table updates

To run the application after monthly table updates, complete the following steps:

  1. Copy the new files for February 2019 to simulate the March 2 monthly update of the system. See the following code:
    $ DEST_BUCKET=aws-big-data-blog-samples
    $ MONTH=2019-02
     
    $ aws s3 cp "s3://nyc-tlc/trip data/yellow_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-yellow/yellow_tripdata_${MONTH}.csv"
     
    $ aws s3 cp "s3://nyc-tlc/trip data/fhv_tripdata_${MONTH}" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-fhv/fhv_tripdata_${MONTH}.csv"
     
    $ aws s3 cp "s3://nyc-tlc/trip data/green_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-green/green_tripdata_${MONTH}.csv"

  2. Run the nyc-tlc-db-raw crawler to update the table metadata to include the new files. The following screenshot shows that the three tables were updated successfully.
  3. Check that the crawler created a third version of the table. See the following code:
    $ aws glue get-table-versions \
        --database-name nyctlcdb \
        --table-name trip_data_yellow \
        --query 'TableVersions[*].VersionId'

    [
        "2",
        "1",
        "0"
    ]

  4. Rerun the application to generate the new profiling metadata, entering the same code as before. To keep the information clean, before storing new profiling information in the metadata, the application removes all custom attributes starting with the string specified in the statsPrefix parameter. See the following code:
    $ spark-submit \
        --class awsdataprofiler.DataProfilerForAWSGlueDataCatalog \
        --master yarn \
        --deploy-mode cluster \
        --name data-profiler-for-aws-glue-data-catalog \
        /home/hadoop/data-profiler-for-aws-glue-data-catalog-assembly-1.0.jar \
        --dbName nyctlcdb \
        --region eu-west-1 \
        --compExp true \
        --statsPrefix DQP \
        --s3BucketPrefix deequ-profiler/deequ-profiler-metrics \
        --profileUnsupportedTypes true \
        --noOfBins 30 \
        --quantiles 10

    Following a successful execution, a new version of the table was created. See the following code:

    $ aws glue get-table-versions \
        --database-name nyctlcdb \
        --table-name trip_data_yellow \
        --query 'TableVersions[*].VersionId'

    [
        "3",
        "2",
        "1",
        "0"
    ]

  5. Check the value of the DQP__Size attribute; its value has changed. See the following code:
    $ aws glue get-table \
        --database-name nyctlcdb \
        --name trip_data_yellow \
        --query 'Table.Parameters.{DQP__Size: DQP__Size}'

    {
        "DQP__Size": "1.4687169E7"
    }

  6. Check one of the columns you saw earlier to see the updated profiling property values. See the following code:
    $ aws glue get-table \
        --database-name nyctlcdb \
        --name trip_data_yellow \
        --query 'Table.StorageDescriptor.Columns[?Name==`store_and_fwd_flag`]'

    [
        {
            "Name": "store_and_fwd_flag",
            "Type": "string",
            "Parameters": {
                "DQP__ApproxCountDistinct": "3.0",
                "DQP__Completeness": "1.0",
                "DQP__CountDistinct": "3.0",
                "DQP__Distinctness": "2.042599223853147E-7",
                "DQP__Entropy": "0.0317381414905775",
                "DQP__Histogram.abs.N": "1.4613018E7",
                "DQP__Histogram.abs.Y": "74149.0",
                "DQP__Histogram.abs.store_and_fwd_flag": "2.0",
                "DQP__Histogram.bins": "3.0",
                "DQP__Histogram.ratio.N": "0.9949513074984022",
                "DQP__Histogram.ratio.Y": "0.005048556328316233",
                "DQP__Histogram.ratio.store_and_fwd_flag": "1.361732815902098E-7",
                "DQP__MaxLength": "18.0",
                "DQP__MinLength": "1.0",
                "DQP__UniqueValueRatio": "0.0",
                "DQP__Uniqueness": "0.0"
            }
        }
    ]

Third execution after monthly tables updates

To run the application a third time, complete the following steps:

  1. Copy the new files for March 2019 to simulate the April 2 monthly update of the system. See the following code:
    $ DEST_BUCKET=aws-big-data-blog-samples
    $ MONTH=2019-03
     
    $ aws s3 cp "s3://nyc-tlc/trip data/yellow_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-yellow/yellow_tripdata_${MONTH}.csv"
     
    $ aws s3 cp "s3://nyc-tlc/trip data/fhv_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-fhv/fhv_tripdata_${MONTH}.csv"
     
    $ aws s3 cp "s3://nyc-tlc/trip data/green_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-green/green_tripdata_${MONTH}.csv"

  2. Run the nyc-tlc-db-raw crawler to update the table metadata to include the new files. You now have five versions of the table metadata. See the following code:
    $ aws glue get-table-versions \
        --database-name nyctlcdb \
        --table-name trip_data_yellow \
        --query 'TableVersions[*].VersionId'

    [
        "4",
        "3",
        "2",
        "1",
        "0"
    ]

  3. Rerun the application to update the profiling information. See the following code:
    $ spark-submit \
        --class awsdataprofiler.DataProfilerForAWSGlueDataCatalog \
        --master yarn \
        --deploy-mode cluster \
        --name data-profiler-for-aws-glue-data-catalog \
        /home/hadoop/data-profiler-for-aws-glue-data-catalog-assembly-1.0.jar \
        --dbName nyctlcdb \
        --region eu-west-1 \
        --compExp true \
        --statsPrefix DQP \
        --s3BucketPrefix deequ-profiler/deequ-profiler-metrics \
        --profileUnsupportedTypes true \
        --noOfBins 30 \
        --quantiles 10

  4. Check the DQP__Size parameter to see its new updated value. See the following code:
    $ aws glue get-table \
        --database-name nyctlcdb \
        --name trip_data_yellow \
        --query 'Table.Parameters.{DQP__Size: DQP__Size}'

    {
        "DQP__Size": "2.2519715E7"
    }

  5. Check one of the columns you saw earlier to see the updated profiling property values. See the following code:
    $ aws glue get-table \
        --database-name nyctlcdb \
        --name trip_data_yellow \
        --query 'Table.StorageDescriptor.Columns[?Name==`store_and_fwd_flag`]' 

    [
        {
            "Name": "store_and_fwd_flag",
            "Type": "string",
            "Parameters": {
                "DQP__ApproxCountDistinct": "3.0",
                "DQP__Completeness": "1.0",
                "DQP__CountDistinct": "3.0",
                "DQP__Distinctness": "1.3321660598280218E-7",
                "DQP__Entropy": "0.030948463301702846",
                "DQP__Histogram.abs.N": "2.2409376E7",
                "DQP__Histogram.abs.Y": "110336.0",
                "DQP__Histogram.abs.store_and_fwd_flag": "3.0",
                "DQP__Histogram.bins": "3.0",
                "DQP__Histogram.ratio.N": "0.9951003376374878",
                "DQP__Histogram.ratio.Y": "0.004899529145906154",
                "DQP__Histogram.ratio.store_and_fwd_flag": "1.3321660598280218E-7",
                "DQP__MaxLength": "18.0",
                "DQP__MinLength": "1.0",
                "DQP__UniqueValueRatio": "0.0",
                "DQP__Uniqueness": "0.0"
            }
        }
    ]

You can view and manage the same values via the Lake Formation console. See the following screenshot of the Edit column section.

Data profiling reporting with Athena and Amazon QuickSight

As demonstrated earlier, the application can save profiling information in Parquet format to an S3 bucket and prefix into db_name and table_name partitions. See the following code:

$ aws s3 ls s3://deequ-profiler/deequ-profiler-metrics/ --recursive
2020-01-28 09:30:12          0 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_fhv/_SUCCESS
2020-01-28 09:17:15       6506 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_fhv/part-00000-760dafb1-fc37-4700-a506-a9dc71b7a745-c000.snappy.parquet
2020-01-28 09:01:19       6498 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_fhv/part-00000-78dd2c4a-83c2-44c4-aa71-30e7a9fb0089-c000.snappy.parquet
2020-01-28 09:30:11       6505 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_fhv/part-00000-cff4f2de-64b4-4338-a0f6-a50ed34a378f-c000.snappy.parquet
2020-01-28 09:30:08          0 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_green/_SUCCESS
2020-01-28 09:01:15       6355 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_green/part-00000-0d5969c9-70a7-4cd4-ac64-8f16e35e23b5-c000.snappy.parquet
2020-01-28 09:17:11       6353 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_green/part-00000-12a7b0b0-6a2a-45d5-a241-645148af41d7-c000.snappy.parquet
2020-01-28 09:30:08       6415 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_green/part-00000-adecccd6-a884-403f-aa80-c574647a10f9-c000.snappy.parquet
2020-01-28 09:29:56          0 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_yellow/_SUCCESS
2020-01-28 09:16:59       6408 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_yellow/part-00000-2e5e3280-29db-41b9-be67-a68ef8cb9777-c000.snappy.parquet
2020-01-28 09:01:02       6424 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_yellow/part-00000-c4972037-7d3c-4279-8b77-361741133816-c000.snappy.parquet
2020-01-28 09:29:55       6398 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_yellow/part-00000-f2d6076e-7019-4b03-97ba-a6aab8a677b8-c000.snappy.parquet

The application generates one Parquet file per execution.

Preparing metadata for profiler metrics data

To prepare the metadata for profiler metrics data, complete the following steps:

  1. On the Lake Formation console, create a new database with the name deequprofilerdb to contain the metadata.
  2. On the AWS Glue console, create a new crawler with the name deequ-profiler-metrics to infer the schema of the profiling information stored in Amazon S3.

The following screenshot shows the properties of the new crawler.
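
If you prefer to script this step, a roughly equivalent crawler definition with the AWS CLI might look like the following sketch; the IAM role name is a placeholder:

$ aws glue create-crawler \
    --name deequ-profiler-metrics \
    --role AWSGlueServiceRole-deequ-profiler \
    --database-name deequprofilerdb \
    --targets '{"S3Targets": [{"Path": "s3://deequ-profiler/deequ-profiler-metrics/"}]}'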

After you run the crawler, a table with the name deequ_profiler_metrics is created in the database. The table has the following columns.

Name | Data Type | Partition | Description
instance | string | | Column name the statistic in column “name” refers to. Set to “*” if entity is “Dataset”.
entity | string | | Entity the statistic refers to. Valid values are “Column” and “Dataset”.
name | string | | Metric name, derived from the Deequ analyzer used for the calculation.
value | double | | Value of the metric.
type | string | | Data type of the column if entity is “Column”, blank otherwise.
db_name_embed | string | | Database name, same values as in partition “db_name”.
table_name_embed | string | | Table name, same values as in partition “table_name”.
profiler_run_dt | date | | Date the profiler application was run.
profiler_run_ts | timestamp | | Date/time the profiler application was run; it can also be used as an execution identifier.
db_name | string | 1 | Database name.
table_name | string | 2 | Table name.

Reporting with Athena

You can use Athena to run a query that checks the statistics for a column in the database for the execution you ran in March 2019. See the following code:

SELECT db_name, 
    profiler_run_dt,
    profiler_run_ts,
    table_name, 
    entity,
    instance,
    name,
    value
FROM "deequprofilerdb"."deequ_profiler_metrics" 
WHERE db_name = 'nyctlcdb' AND
    table_name = 'trip_data_yellow' AND 
    entity = 'Column' AND
    instance = 'extra' AND
    profiler_run_dt = date_parse('2019-03-02','%Y-%m-%d')
ORDER BY name
;

The following screenshot shows the query results.

Reporting with Amazon QuickSight

To create a dashboard in Amazon QuickSight based on the profiling metrics data the application generated, complete the following steps:

  1. Create a new QuickSight dataset called deequ_profiler_metrics with Athena as the data source.
  2. In the Choose your table section, select the profiling metrics table that you created earlier.
  3. Import the data into SPICE.

After you create the dataset, you can view it and edit its properties. For this post, leave the properties unchanged.

You are now ready to build visualizations and dashboards.

The following images in this section show a simple analysis with controls that allow for the selection of the Database, Table profiled, Entity, Column, and Profiling Metric.

Control Name | Mapped Column
Database | db_name
Table | table_name
Entity | entity
Column | instance
Profiling Metric | name

For more information about adding controls, see Create Amazon QuickSight dashboards that have impact with parameters, on-screen controls, and URL actions.

For example, you can select the Size metric of a specific table to see how many records are available in the table after each monthly load. See the following screenshot.

Similarly, you can use the same analysis to see how a specific metric changes over time for a column. The following screenshot shows that the mean of the fare_amount column changes after each monthly load.

You can select any metric calculated on any column, which makes for a very flexible profiling data reporting system.

Conclusion

This post demonstrated how to extend the metadata contained in the Data Catalog with profiling information calculated with an Apache Spark application based on the Amazon Deequ library running on an EMR cluster.

You can query the Data Catalog using the AWS CLI. You can also build a reporting system with Athena and Amazon QuickSight to query and visualize the data stored in Amazon S3.

Special thanks go to Sebastian Schelter at Amazon Search and Sven Hansen and Vincent Gromakowski at AWS for their help and support.

About the Author

Francesco Marelli is a senior solutions architect at Amazon Web Services. He lived and worked in London for 10 years and has since worked in Italy, Switzerland, and other countries in EMEA. He specializes in the design and implementation of analytics, data management, and big data systems, mainly for enterprise and FSI customers. Francesco also has strong experience in systems integration and in the design and implementation of web applications. He loves sharing his professional knowledge, collecting vinyl records, and playing bass.

Deploy an Amazon EMR edge node with RStudio using AWS Systems Manager

Post Syndicated from Randy DeFauw original https://aws.amazon.com/blogs/big-data/deploy-an-amazon-emr-edge-node-with-rstudio-using-aws-systems-manager/

RStudio is an integrated development environment (IDE) for R, a language and environment for statistical computing and graphics. As a data scientist, you may integrate R and Spark (a big data processing framework) to analyze large datasets. You can use an R package called sparklyr to offload filtering and aggregation of large datasets from your R script to Spark and use R’s native strength to further analyze and visualize the results from Spark.

An R script running in RStudio uses sparklyr to submit Spark jobs to the cluster. Typically, an R script (along with sparklyr) runs in an RStudio environment that is installed on a machine that’s separate from the cluster of machines (in Amazon EMR) that runs Spark. To enable sparklyr to submit Spark jobs, you need to establish network connectivity between the RStudio machine and the cluster running Spark. One way to do that is to run RStudio on an edge node, which is a machine that is part of the cluster’s private network and runs client applications like RStudio. Edge nodes let you run client applications separately from the nodes that run the core Hadoop services. Edge nodes also offer convenient access to local Spark and Hive shells.

However, edge nodes are not easy to deploy. They must have the same versions of Hadoop, Spark, Java, and other tools as the Hadoop cluster, and require the same Hadoop configuration as nodes in the cluster.

This post demonstrates an automated way to create an edge node with RStudio installed using AWS Systems Manager.

Deploying an edge node for an EMR cluster

One method to deploy an edge node involves creating an Amazon EC2 AMI directly from the EMR master node. For more information, see Launch an edge node for Amazon EMR to run RStudio. This post offers an SSM automation document that simplifies on-demand edge node deployment. Systems Manager gives you visibility and control of your AWS infrastructure, and Systems Manager Automation lets you safely automate common and repetitive tasks, like creating edge nodes on demand.

This post walks you through the process of installing the SSM document and how to use the document to create an edge node. For more information about the code, see the GitHub repo.

Creating the automation document

First, you use Terraform to create the automation document. You can download Terraform from the Terraform website. Alternatively, AWS CloudFormation works equally well.

After you install Terraform, go to the directory where you cloned the repo and edit the file vars.tf. For more information, see the GitHub repo. This file defines several input parameters, and the comments in the file should be self-explanatory. You can provide default values in vars.tf or override them using one of the other supported techniques. For more information, see Input Variables on the Terraform website.
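
When you later run terraform apply (see the following code), you can also override defaults on the command line with -var. Assuming the variable names used in the plan shown later in this post (region, bucket, environment), a sketch with placeholder values looks like this:

$ terraform apply \
    -var="region=us-east-1" \
    -var="bucket=my-ssm-artifacts" \
    -var="environment=dev"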

Next, enter the following code:

terraform init # one time only
terraform apply

The code runs a Terraform plan to create the document. Your environment should already be configured to access your AWS account with privileges to do the following:

  • Create and manage the IAM roles, policies, and instance profiles defined in the plan
  • Create and update Systems Manager documents
  • Upload objects to the S3 bucket referenced in vars.tf

To make updates to the Terraform plan going forward, use Terraform’s shared state feature. For more information, see Remote State on the Terraform website.

The Terraform plan loads your automation document from a local template file and registers it with Systems Manager. See the following code:

# Load our document from a template and substitute some variables.
data "template_file" "ssm_doc_edge_node" {
  template = "${file("${path.module}/ssm_doc_edge_node.tpl")}"

  vars = {
    SSMRoleArn = "${aws_iam_role.ssm_automation_role.arn}"
    InstanceProfileArn = "${aws_iam_instance_profile.edge_node_profile.arn}"
    PlaybookUrl = "s3://${var.bucket}/init.yaml"
    Environment = "${var.environment}"
    Project = "${var.ProjectTag}"
    region = "${var.region}"
  }
}

# Register the document content with SSM
resource "aws_ssm_document" "create_edge_node" {
  name          = "create_edge_node"
  document_type = "Automation"
  document_format = "YAML"
  tags = {
    Name = "create_edge_node"
    Project = "${var.ProjectTag}"
    Environment = "${var.environment}"
  }

  content = "${data.template_file.ssm_doc_edge_node.rendered}"
}

The rest of the Terraform plan does the following:

  • Uploads an Ansible template for the SSM document to use
  • Sets up IAM roles and policies that let Systems Manager and a new edge node assume the correct privileges

What’s in the automation document?

The automation document has three main steps. First, it creates and launches a new AMI from the existing EMR master node. See the following code:

- name: create_ami
  action: aws:createImage
  maxAttempts: 1
  timeoutSeconds: 1200
  onFailure: Abort
  inputs:
    InstanceId: "{{MasterNodeId}}"
    ImageName: AMI Created on {{global:DATE_TIME}}
    NoReboot: true
    
- name: launch_ami
  action: aws:runInstances
  maxAttempts: 1
  timeoutSeconds: 1200
  onFailure: Abort
  inputs:
    ImageId: "{{create_ami.ImageId}}"
  ...

Next, it updates the SSM agent and runs an Ansible playbook to install RStudio. You can examine the Ansible playbook in GitHub; it installs RStudio and dependencies and handles some initial configuration. See the following code:

- name: updateSSMAgent
  action: aws:runCommand
  inputs:
    DocumentName: AWS-UpdateSSMAgent
    InstanceIds:
    - "{{launch_ami.iid}}"
- name: installPip
  action: aws:runCommand
  inputs:
    DocumentName: AWS-RunShellScript
    InstanceIds:
    - "{{launch_ami.iid}}"
    Parameters:
        commands: 
        - pip install ansible boto3 botocore
- name: runPlaybook
  action: aws:runCommand
  inputs:
    DocumentName: AWS-RunAnsiblePlaybook
    InstanceIds:
    - "{{launch_ami.iid}}"
    Parameters:
      playbookurl: "${PlaybookUrl}"

Finally, it adds an Amazon CloudWatch alarm to trigger EC2 instance recovery if the edge node fails. See the following code:

- name: add_recovery
  action: aws:executeAwsApi
  inputs:
    Service: cloudwatch
    Api: PutMetricAlarm
    AlarmName: "Recovery for edge node {{ launch_ami.iid }}"
    ActionsEnabled: true
    AlarmActions: 
    - "arn:aws:automate:${region}:ec2:recover"

Using the automation document

To start using the automation document, complete the following steps:

  1. On the Systems Manager console, choose Automation.
  2. Choose Execute automation.
  3. On the Owned by me tab, choose the document create_edge_node.
  4. Choose Next.

    On the next page, you need to fill in three pieces of information. You may want to get some advice from your cloud operations team, or whoever manages your EMR clusters. For instructions on creating a cluster with the latest EMR version and Spark, see Launch Your Sample Amazon EMR Cluster.
  5. In the Input parameters section, provide the following information:
    • For MasterNodeId, enter the EC2 instance ID of the master node of the EMR cluster you want to connect to. In most cases, your operations team can provide this information, but you can also find the instance ID by going to the Hardware tab of your EMR cluster and drilling into the master node group. Your EMR cluster must have Spark installed because you want to use sparklyr with RStudio. The following screenshot shows where to find your EC2 instance ID on the Hardware tab.
    • For SubnetId, enter the subnet that the edge node should live in. Your operations team should provide this information, or you can see it on the Summary tab of the EMR cluster. The edge node must live in the same VPC as the cluster. It does not need to be in a public subnet because you connect via Session Manager. The following screenshot shows where to find your subnet ID on the Summary tab of your cluster.
    • For QuickIdentifier, enter a user-friendly name to help you remember this edge node; for example, Edge Node with RStudio. You can also start the execution from the AWS CLI, as shown in the sketch after this list.
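
A minimal sketch of the equivalent CLI call follows; the instance and subnet IDs are placeholders:

aws ssm start-automation-execution \
    --document-name create_edge_node \
    --parameters '{
        "MasterNodeId": ["i-0123456789abcdef0"],
        "SubnetId": ["subnet-0123456789abcdef0"],
        "QuickIdentifier": ["Edge Node with RStudio"]
    }'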

When the execution is finished, you will see the completed steps, as in the following screenshot.

If you choose the last step in the list (step 8), you see the DNS name and EC2 instance ID for your new edge node. See the following screenshot.

You can now connect to this edge node by using another feature of Systems Manager: Session Manager. Session Manager lets you open a port forwarding tunnel without having to use SSH keys or expose the SSH port to the internet. For instructions on opening a port forwarding session, see Starting a Session (Port Forwarding). You need the Session Manager plugin installed locally. See the following code:

# Use the instance ID from the output of the automation document as the target
aws ssm start-session \
  --target instance-id \
  --document-name AWS-StartPortForwardingSession \
  --parameters '{"portNumber":["8787"], "localPortNumber":["8787"]}'

For more information, see Install the Session Manager Plugin for the AWS CLI.

You can now access RStudio at http://localhost:8787. See the following screenshot.

You can also access the node directly and use the local Hive and Spark shells through the Session Manager console.
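
For example, after you start a standard Session Manager shell session to the node, the local shells are available directly (a sketch):

# In a Session Manager shell session on the edge node
$ spark-shell    # opens the local Spark shell
$ hive           # opens the local Hive shell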

This post sets up the SSM document to create single-user edge nodes. The default user name to log in to RStudio is ruser. You must set the password by changing the password for the ruser account directly in the operating system, because RStudio uses PAM authentication by default. For more information, see What is my username on my RStudio Server? To change the password, open another Session Manager session and enter the following code:

# Use the instance ID from the output of the automation document as the target
aws ssm start-session \
  --target instance-id

$ sudo passwd ruser # Enter a password of your choice and confirm it

You should keep any valuable files like R scripts in a GitHub repo and store any output data in an S3 bucket for long-term persistence.
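
As a small sketch (the bucket name and local path are placeholders), you can push results to S3 from the edge node:

$ aws s3 sync ~/analysis/output s3://your-results-bucket/rstudio-output/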

Configuring security

The Terraform plan sets up three important IAM roles:

  • A role that Systems Manager assumes when running the automation document. This role needs to perform actions in Amazon EC2 (such as creating new AMIs), CloudWatch, and Systems Manager.
  • An EC2 instance profile for the edge nodes. The profile has the permissions necessary for the SSM agent to run and for the edge node to perform tasks typical of an EMR node.
  • A role for CloudWatch to perform instance recovery.

In your environment, you may want to review the IAM roles and policies and tighten their scope based on tags or other conditions.

Conclusion

This post described an automated way to deploy an EMR edge node with RStudio using an SSM document. EMR edge nodes with RStudio give you a familiar working environment with access to large datasets via Spark and sparklyr. For information about deploying a new edge node and installing the necessary Hadoop libraries with an AWS CloudFormation template, see Launch an edge node for Amazon EMR to run RStudio.

About the Author

Randy DeFauw is a principal solutions architect at Amazon Web Services. He works with AWS customers to provide guidance and technical assistance on database projects, helping them improve the value of their solutions when using AWS.