Tag Archives: AWS Big Data

Separating queries and managing costs using Amazon Athena workgroups

Post Syndicated from Roy Hasson original https://aws.amazon.com/blogs/big-data/separating-queries-and-managing-costs-using-amazon-athena-workgroups/

Amazon Athena is a serverless query engine for data on Amazon S3. Many customers use Athena to query application and service logs, schedule automated reports, and integrate with their applications, enabling new analytics-based capabilities.

Different types of users rely on Athena, including business analysts, data scientists, security, and operations engineers. But how do you separate and manage these workloads so that users get the best experience while minimizing costs?

In this post, I show you how to use workgroups to do the following:

  • Separate workloads.
  • Control user access.
  • Manage query usage and costs.

Separate workloads

By default, all Athena queries execute in the primary workgroup.  As an administrator, you can create new workgroups to separate different types of workloads.  Administrators commonly turn to workgroups to separate analysts running ad hoc queries from automated reports.  Here’s how to build out that separation.

First create two workgroups, one for ad hoc users (ad-hoc-users) and another for automated reports (reporting).

Next, select a specific output location. All queries executed inside this workgroup save their results to this output location. Routing results to a single secure location helps make sure users only access data they are permitted to see. You can also enforce encryption of query results in S3 by selecting the appropriate encryption configuration.

Workgroups also help you simplify the onboarding of new users to Athena. By selecting override client-side settings, you enforce a predefined configuration on all queries within a workgroup. Users no longer have to configure a query results output location or S3 encryption keys. These settings default to the parameters defined for the workgroup where those queries execute. Additionally, each workgroup maintains a unique query history and saved query inventory, making queries easier for you to track down.

Finally, when creating a workgroup, you can add up to 50 key-value pair tags to help identify your workgroup resources. Tags are also useful when attempting to allocate Athena costs between groups of users. Create Name and Dept tags for the ad-hoc-users and reporting workgroups with their name and department association.
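If you prefer to script this setup instead of using the console, the settings described above map onto the request for the Athena CreateWorkGroup API action. The following is a minimal sketch, not a definitive implementation: the function name buildWorkGroupParams, the Dept values, and the reporting output path are assumptions made for illustration.

```javascript
// Build the request for the Athena CreateWorkGroup API action.
// buildWorkGroupParams is a hypothetical helper, not part of the AWS SDK.
function buildWorkGroupParams(name, dept, outputLocation) {
  return {
    Name: name,
    Configuration: {
      ResultConfiguration: {
        // All queries in the workgroup save their results here.
        OutputLocation: outputLocation,
        // Encrypt query results in S3 with S3-managed keys.
        EncryptionConfiguration: { EncryptionOption: 'SSE_S3' }
      },
      // API equivalent of "override client-side settings".
      EnforceWorkGroupConfiguration: true,
      PublishCloudWatchMetricsEnabled: true
    },
    // Tags used later for cost allocation.
    Tags: [
      { Key: 'Name', Value: name },
      { Key: 'Dept', Value: dept }
    ]
  };
}

// Department names and the reporting path are placeholders.
const workGroupRequests = [
  buildWorkGroupParams('ad-hoc-users', 'analytics', 's3://demo/workgroups/adhocusers/'),
  buildWorkGroupParams('reporting', 'operations', 's3://demo/workgroups/reporting/')
];
console.log(JSON.stringify(workGroupRequests, null, 2));

// With the AWS SDK for JavaScript (v2), each request would be sent with:
//   await athena.createWorkGroup(request).promise();
```

Keeping the request construction separate from the SDK call makes the settings easy to review before anything is created in your account.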

Control user access to workgroups

Now that you have two workgroups defined, ad-hoc-users and reporting, you must control who can use and update them.  Remember that workgroups are IAM resources and therefore have an ARN. You can use this ARN in the IAM policy that you associate with your users.  In this example, create a single IAM user representing the team of ad hoc users and add the individual to an IAM group. The group contains a policy that enforces what actions these users can perform.

Start by reviewing IAM Policies for Accessing Workgroups and Workgroup Example Policies to familiarize yourself with policy options. Use the following IAM policy to set up permissions for your analyst user. Grant this user only the permissions required for working in the ad-hoc-users workgroup. Make sure that you tweak this policy to match your exact needs:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "athena:ListWorkGroups"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "athena:StartQueryExecution",
                "athena:GetQueryResults",
                "athena:DeleteNamedQuery",
                "athena:GetNamedQuery",
                "athena:ListQueryExecutions",
                "athena:StopQueryExecution",
                "athena:GetQueryResultsStream",
                "athena:GetQueryExecutions",
                "athena:ListNamedQueries",
                "athena:CreateNamedQuery",
                "athena:GetQueryExecution",
                "athena:BatchGetNamedQuery",
                "athena:BatchGetQueryExecution",
                "athena:GetWorkGroup",
                "athena:ListTagsForResource"
            ],
            "Resource": "arn:aws:athena:us-east-1:112233445566:workgroup/ad-hoc-users"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObjectAcl",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListMultipartUploadParts"
            ],
            "Resource": "arn:aws:s3:::demo/workgroups/adhocusers/*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "glue:Get*"
            ],
            "Resource": [
                "arn:aws:glue:us-east-1:112233445566:catalog",
                "arn:aws:glue:us-east-1:112233445566:database/amazon",
                "arn:aws:glue:us-east-1:112233445566:table/amazon/*"
            ]
        }
    ]
}

Now your analyst user can execute queries only in the ad-hoc-users workgroup. The analyst user can switch to other workgroups, but they are denied access when they try to perform any action there. They are further restricted to list and query only those tables that belong to the amazon database. For more information about controlling access to AWS Glue resources such as databases and tables, see AWS Glue Resource Policies for Access Control.
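The only workgroup-specific part of the policy above is the workgroup ARN, so the same statement can be generated for any workgroup. Here is a small sketch of that idea. The helper names workGroupArn and workGroupStatement are hypothetical, and the action list is a shortened subset of the policy above, chosen only to keep the example brief.

```javascript
// Compose the ARN of an Athena workgroup.
function workGroupArn(region, accountId, workGroup) {
  return `arn:aws:athena:${region}:${accountId}:workgroup/${workGroup}`;
}

// Build an IAM statement that allows query actions in one workgroup only.
// The action list is a reduced subset for illustration; extend it to
// match the full policy shown above.
function workGroupStatement(region, accountId, workGroup) {
  return {
    Effect: 'Allow',
    Action: [
      'athena:StartQueryExecution',
      'athena:StopQueryExecution',
      'athena:GetQueryExecution',
      'athena:GetQueryResults',
      'athena:GetWorkGroup'
    ],
    Resource: workGroupArn(region, accountId, workGroup)
  };
}

const reportingStatement = workGroupStatement('us-east-1', '112233445566', 'reporting');
console.log(reportingStatement.Resource);
// prints "arn:aws:athena:us-east-1:112233445566:workgroup/reporting"
```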

The following screenshot shows what the analyst user sees in the Athena console:

I’ve created a simple Node.js tool that executes SQL queries stored as files in a given directory. You can find my Athena test runner code in the athena_test_runner GitHub repo. You can use this code to simulate a reporting tool, after configuring it to use a workgroup. To do that, create an IAM role with permissions like those previously defined for the analyst user. This time, restrict access to the reporting workgroup.

The following JavaScript code example shows how to select a workgroup programmatically when executing queries:

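const AWS = require('aws-sdk')

// Athena client; the Region matches the workgroup ARN used in the IAM policy.
const athena = new AWS.Athena({region: 'us-east-1'})

function executeQueries(files) {
    // `files` is unused in this excerpt; a fixed query stands in for the SQL read from each file.
    const params =
    {
      "QueryString": "SELECT * FROM amazon.final_parquet LIMIT 10",
      "ResultConfiguration": {
        // Left empty: the workgroup's output location applies when it overrides client-side settings.
        "OutputLocation": ""
      },
      "QueryExecutionContext": {
        "Database": "default"
      },
      // Run the query in the reporting workgroup.
      "WorkGroup":"reporting"
    }

    return new Promise((resolve, reject) => {
        athena.startQueryExecution(params, (err, results) => {
            if (err) {
                reject(err.message)
            } else {
                resolve(results)
            }
        })
    })
}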

Run sample automated reports under the reporting workgroup, with the following command:

node index.js testsuite/

Query histories remain isolated between workgroups. A user logging into the Athena console as an analyst using the ad-hoc-users workgroup doesn’t see any automated reports that you ran under the reporting workgroup.

Managing query usage and cost

You have two workgroups configured: one for ad hoc users and another for automated reports. Now, you must safeguard against bad queries. In this use case, two potential situations for query usage should be monitored and controlled:

  • Make sure that users don’t run queries that scan more data than allowed by their budget.
  • Safeguard against automated script bugs that could cause indefinite query retries.

First, configure data usage controls for your ad-hoc-users workgroup. There are two types of data usage controls: per-query and per-workgroup.

Set the per-query control for analysts to be 1 GB. This control cancels any query run in the ad-hoc-users workgroup that tries to scan more than 1 GB.

To observe this limit in action, choose Update, return to the query editor, and run a query that would scan more than 1 GB. This query triggers the error message, “Query cancelled! : Bytes scanned limit was exceeded”. Remember that you incur charges for data the query scanned up to the point of cancellation. In this case, you incur charges for 1 GB of data.
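The same per-query limit can also be set through the UpdateWorkGroup API action, which takes the limit in bytes. The sketch below only builds the request. The helper name perQueryLimitParams is hypothetical, and treating 1 GB as 1024^3 bytes is an assumption; adjust the constant if you count decimal gigabytes.

```javascript
// Assumption: binary gigabytes (1 GB = 1024^3 bytes).
const BYTES_PER_GB = 1024 ** 3;

// Build an UpdateWorkGroup request that sets the per-query scan limit.
function perQueryLimitParams(workGroup, limitGb) {
  return {
    WorkGroup: workGroup,
    ConfigurationUpdates: {
      // Queries that try to scan more than this many bytes are cancelled.
      BytesScannedCutoffPerQuery: Math.round(limitGb * BYTES_PER_GB)
    }
  };
}

const limitRequest = perQueryLimitParams('ad-hoc-users', 1);
console.log(limitRequest.ConfigurationUpdates.BytesScannedCutoffPerQuery);
// prints 1073741824

// With the AWS SDK for JavaScript (v2):
//   await athena.updateWorkGroup(limitRequest).promise();
```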

Now, switch to your reporting workgroup. For this workload, you’re not worried about individual queries scanning too much data. However, you want to control the aggregate amount of data scanned by all queries in this workgroup.

Create a per-workgroup data usage control for the reporting workgroup. You can configure the maximum amount of data scanned by all queries in the workgroup during a specific period.

For the automated reporting workload, you probably have a good idea of how long the process should take and the total amount of data that queries scan during this time. You only have a few reports to run, so you can expect them to run in a few minutes, only scanning a few megabytes of data. Begin by setting up a low watermark alarm to notify you when your queries have scanned more data than you would expect in five minutes. The following example is for demo purposes only. In most cases, this period would be longer. I configured the alarm to send a notification to an Amazon SNS topic that I created.

To validate the alarm, I made a minor change to my test queries, causing them to scan more data. This change triggered the SNS alarm, shown in the following Amazon CloudWatch dashboard:

Next, create a high watermark alarm that is triggered when the queries in your reporting workgroup scan more than 1 GB of data over 15 minutes. In this case, the alarm triggers an AWS Lambda function that disables the workgroup, making sure that no additional queries execute in it. This alarm protects you from incurring costs caused by faulty automation code or runaway queries.
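The Athena console creates this alarm for you, but it can help to see roughly what such an alarm looks like as a CloudWatch PutMetricAlarm request. Treat the following as a sketch under stated assumptions: the helper name, the alarm name, and the SNS topic ARN are hypothetical, and the ProcessedBytes metric with a WorkGroup dimension is my assumption about which AWS/Athena metric backs the data usage control.

```javascript
// Build a CloudWatch PutMetricAlarm request for a workgroup-wide scan limit.
// highWatermarkAlarmParams is a hypothetical helper for illustration.
function highWatermarkAlarmParams(workGroup, thresholdBytes, periodSeconds, topicArn) {
  return {
    AlarmName: `${workGroup}-data-scanned-high-watermark`, // hypothetical name
    Namespace: 'AWS/Athena',
    MetricName: 'ProcessedBytes',        // assumed metric name
    Dimensions: [{ Name: 'WorkGroup', Value: workGroup }],
    Statistic: 'Sum',                    // total bytes scanned in the period
    Period: periodSeconds,
    EvaluationPeriods: 1,
    Threshold: thresholdBytes,
    ComparisonOperator: 'GreaterThanThreshold',
    AlarmActions: [topicArn]             // SNS topic that invokes the Lambda function
  };
}

const alarmRequest = highWatermarkAlarmParams(
  'reporting',
  1024 ** 3,   // 1 GB, assuming binary gigabytes
  15 * 60,     // 15 minutes
  'arn:aws:sns:us-east-1:112233445566:athena-reporting-alarm' // hypothetical topic
);
console.log(alarmRequest.Period, alarmRequest.Threshold);
// prints 900 1073741824
```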

Before creating the data usage control, create a Node.js Lambda function to disable the workgroup. Paste in the following code:

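const AWS = require('aws-sdk')
const athena = new AWS.Athena({region: 'us-east-1'})

exports.handler = async (event) => {
    // The CloudWatch alarm notification arrives as a JSON string in the SNS message body.
    const msg = JSON.parse(event.Records[0].Sns.Message)
    // The alarm's WorkGroup dimension carries the name of the workgroup to disable.
    const wgname = msg.Trigger.Dimensions.filter((i) => i.name === 'WorkGroup')[0].value

    // Wait for the update to finish; otherwise the handler can return before the call completes.
    await athena.updateWorkGroup({WorkGroup: wgname, State: 'DISABLED'}).promise()

    const response = {
        statusCode: 200,
        body: JSON.stringify(`Workgroup ${wgname} has been disabled`),
    };
    return response;
}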

This code grabs the workgroup name from the SNS message body and calls the UpdateWorkGroup API action with the name and the state of DISABLED. The Athena API requires the most recent version of the AWS SDK. When you create the Lambda bundle, include the latest AWS SDK version in that bundle.
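Before deploying the function, you may want to check the message parsing locally. The sketch below isolates that step in a helper and runs it against a trimmed, hand-written sample event. The helper name workGroupFromSnsEvent and the sample alarm name are assumptions; a real alarm notification contains many more fields.

```javascript
// Extract the workgroup name from an SNS-wrapped CloudWatch alarm message.
function workGroupFromSnsEvent(event) {
  const msg = JSON.parse(event.Records[0].Sns.Message);
  const dim = msg.Trigger.Dimensions.find((d) => d.name === 'WorkGroup');
  if (!dim) {
    throw new Error('Alarm message has no WorkGroup dimension');
  }
  return dim.value;
}

// Hand-written sample, reduced to the fields the parser reads.
const sampleEvent = {
  Records: [{
    Sns: {
      Message: JSON.stringify({
        AlarmName: 'reporting-high-watermark', // hypothetical alarm name
        Trigger: {
          Namespace: 'AWS/Athena',
          Dimensions: [{ name: 'WorkGroup', value: 'reporting' }]
        }
      })
    }
  }]
};

console.log(workGroupFromSnsEvent(sampleEvent)); // prints "reporting"
```

Failing loudly when the dimension is missing avoids calling UpdateWorkGroup with an undefined name.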

Next, create a new SNS topic and a subscription. For Protocol, select AWS Lambda. Then, select the Lambda function that you created in the previous step.

In the Athena console, create the second alarm, 1 GB for 15 min., and point it to the SNS topic that you created earlier. When triggered, this SNS topic calls the Lambda function that disables the reporting workgroup. No more queries can execute in this workgroup. You see this error message in the console when a workgroup is disabled:

Athena exposes other aggregated metrics per workgroup under the AWS/Athena namespace in CloudWatch, such as the query status and the query type (DDL or DML) per workgroup. To learn more, see Monitoring Athena Queries with CloudWatch Metrics.

Cost allocation tags

When you created your ad-hoc-users and reporting workgroups, you added Name and Dept tags. These tags can be used in your Billing and Cost Management console to determine the usage per workgroup.

Summary

In this post, you learned how to use workgroups in Athena to isolate different query workloads, manage access, and define data usage controls to protect yourself from runaway queries. Metrics exposed to CloudWatch help you monitor query performance and make sure that your users are getting the best experience possible. For more details, see Using Workgroups to Control Query Access.

About the Author

Roy Hasson is a Global Business Development Manager for AWS Analytics. He works with customers around the globe to design solutions to meet their data processing, analytics and business intelligence needs. Roy is a big Manchester United fan, cheering his team on and hanging out with his family.

 

 

 

Modifying your cluster on the fly with Amazon EMR reconfiguration

Post Syndicated from Brandon Scheller original https://aws.amazon.com/blogs/big-data/modifying-your-cluster-on-the-fly-with-amazon-emr-reconfiguration/

If you are a developer or data scientist using long-running Amazon EMR clusters, you face fast-changing workloads. These changes often require different application configurations to run optimally on your cluster.

With the reconfiguration feature, you can now change configurations on running EMR clusters. Starting with EMR release emr-5.21.0, this feature allows you to modify configurations without creating a new cluster or manually connecting by SSH into each node.

In this post, I go over the following topics:

  • Using reconfiguration
  • Instance group states, configuration versions, and events
  • Reconfiguration example use cases
  • Reconfiguration benefits

Using reconfiguration

The following tasks are updated in EMR release emr-5.21.0:

  • Submitting a reconfiguration
  • Modifying configurations
  • Defining configuration levels

Submitting a reconfiguration

You can submit a reconfiguration through the EMR console, SDK, or AWS CLI. For more information, see submitting a reconfiguration and additional information.

Modifying configurations

When submitting a reconfiguration, you must include all of the configurations you want to apply to the cluster. The update only applies those items, removing all others. As you modify configurations, the EMR console also tracks your previous cluster configurations for you.

Defining configuration levels

Define cluster-level and instance-group-level configurations for your applications. Supply cluster-level configurations as you create a cluster. These configurations are then automatically applied to all your instance groups, even those added after the cluster is up and running. After the cluster starts, you can’t modify your cluster-level configurations. But you can supplement or override those configurations on the instance-group level through reconfiguration requests. Whenever you submit a reconfiguration request for an instance group, these new instance-group-level configurations take precedence over inherited cluster-level configurations.

To better understand how cluster-level and instance-group-level configurations work together on an instance group, look at a simple demonstration in the EMR console:

Under the Configuration tab, select an instance group in the Filter drop-down list. Navigate to the desired instance group’s configuration table. The Source column of the configuration table indicates the level of your configurations.

This cluster starts with the cluster-level configuration set:

[
  {
    "Classification": "core-site",
    "Properties": {
      "Key-A": "Value-1",
      "Key-B": "Value-2"
    }
  }
]

As you can see in the console, the instance group ig-Y4E3MN8C4YBP automatically inherited the cluster-level configuration set. Now, reconfigure the instance group as follows:

[
  {
    "Classification": "core-site",
    "Properties": {
      "Key-A": "Value-a",
      "Key-C": "Value-3"
    }
  }
]

Once the request goes through, the value of configuration “Key-A” gets overridden by the instance-group-level configuration and changes from “Value-1” to “Value-a.”  In contrast, the value of configuration “Key-B” remains unchanged. Meanwhile, your request introduces the new, supplemental configuration, “Key-C.” The configuration table in your console always displays these kinds of subtle changes.
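The override-and-supplement behavior described above can be modeled as a per-classification merge in which instance-group-level properties win. The following sketch reproduces the Key-A, Key-B, and Key-C outcome from this example. It illustrates the result you see in the console, not how EMR implements it, and the function name effectiveProperties is hypothetical.

```javascript
// Merge cluster-level and instance-group-level configurations.
// Later sources override earlier ones, one classification at a time.
function effectiveProperties(clusterLevel, instanceGroupLevel) {
  const merged = {};
  for (const source of [clusterLevel, instanceGroupLevel]) {
    for (const { Classification, Properties } of source) {
      merged[Classification] = { ...(merged[Classification] || {}), ...Properties };
    }
  }
  return merged;
}

const clusterLevel = [
  { Classification: 'core-site', Properties: { 'Key-A': 'Value-1', 'Key-B': 'Value-2' } }
];
const instanceGroupLevel = [
  { Classification: 'core-site', Properties: { 'Key-A': 'Value-a', 'Key-C': 'Value-3' } }
];

console.log(effectiveProperties(clusterLevel, instanceGroupLevel));
// prints { 'core-site': { 'Key-A': 'Value-a', 'Key-B': 'Value-2', 'Key-C': 'Value-3' } }
```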

For more information about how to customize cluster-level and instance-group-level configurations, see Supplying a Configuration when Creating a Cluster.

Instance group states, configuration versions, and events

The states of your reconfiguration requests appear in instance group state transitions, configuration version increases, and CloudWatch events. Understand how each works to keep from losing track of any reconfiguration request:

  • Instance group states: After an instance group receives a reconfiguration request, it transitions from the RUNNING state to the RECONFIGURING state. The RECONFIGURING state indicates the start of the reconfiguration process. After the process completes and the new configurations have taken effect, the instance group returns to the RUNNING state. Then, you can verify your configurations either via your application’s Web UI or application-specific commands.
  • Configuration versions: Every reconfiguration request you submit establishes a new configuration set, distinguished by a new version number. Configuration versions start from 0 and increase by 1 for each new configuration set that you submit. Each instance group keeps its respective configuration version number. Version numbers increase depending on the number of times that you reconfigure the different instance groups.
  • Events: EMR posts a state for each reconfiguration request as an Amazon CloudWatch event. These events list the exact times when the request is submitted, when the reconfiguration operation starts, and when it completes. For easy tracking, each request is posted together with its associated configuration version. For example, the following event flow shows how EMR executes a typical reconfiguration request in an instance group:

For a complete list of EMR events and instance group state transitions for reconfiguration operation, see the EMR Management Guide.

Reconfiguration example use cases

Here are some use case examples of reconfiguration operations:

  • Reconfiguring HDFS blocksize
  • Configuring capacity-scheduler queues

Reconfiguring HDFS blocksize

You may deal with fluctuating workloads. These changes can call for new application configurations throughout the lifetime of a cluster.

For example, suppose that you’ve recently seen growth in the workload and filesize for your long-running cluster. You’d like to account for this growth without replacing your current cluster.

To increase your HDFS block size for better performance, take advantage of the new reconfiguration feature. The HDFS NameNode tracks each data block in your cluster. Increasing the block size could improve HDFS performance by reducing the number of blocks that the NameNode tracks. In addition, a larger block size can improve job performance by reducing the number of required mappers.
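A quick back-of-the-envelope calculation shows the effect of block size on the number of blocks. The sketch below assumes a hypothetical 10 GB file and compares block sizes of 128 MB and 256 MB. Treating the block count as a stand-in for the number of mappers is a simplification, because the actual number of input splits depends on the job and input format.

```javascript
const MB = 1024 * 1024;

// Number of HDFS blocks needed to store a file of the given size.
function blockCount(fileSizeBytes, blockSizeBytes) {
  return Math.ceil(fileSizeBytes / blockSizeBytes);
}

const fileSize = 10 * 1024 * MB; // hypothetical 10 GB file

console.log(blockCount(fileSize, 128 * MB)); // prints 80
console.log(blockCount(fileSize, 256 * MB)); // prints 40
```

Doubling the block size halves the number of blocks that the NameNode has to track for the same data.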

To increase the HDFS block size from the default of 128 MB to 256 MB, submit a reconfiguration request to the master instance group, which runs the NameNode:

$ aws emr modify-instance-groups --cli-input-json file://reconfiguration.json

Here’s the example reconfiguration.json file.

reconfiguration.json:
{
  "ClusterId": "j-MyClusterID",
  "InstanceGroups": [
    {
      "InstanceGroupId": "ig-MyMasterId",
      "Configurations": [
        {
          "Classification": "hdfs-site",
          "Properties": {
            "dfs.blocksize": "256m"
          },
          "Configurations": []
        }
      ]
    }
  ]
}

The EMR reconfiguration process then modifies the “dfs.blocksize” parameter to the provided “256m” value within the hdfs-site.xml file. The reconfiguration process also automatically restarts NameNode, to pick up the new configuration. Any new blocks added to the cluster automatically use the new default blocksize of 256 MB. If you’d like any existing blocks to pick up this default, follow these steps:

  1. Copy the blocks to a new location.
  2. Delete the originals.
  3. Copy the blocks back to their original location.

The restored blocks pick up the new default block size. NameNode is inactive during the short restart period.

Configuring capacity-scheduler queues

Do you want to change cluster resources sharing strategies among different Hadoop jobs? Modify YARN CapacityScheduler configurations on a running cluster? Add new queues on a large shared cluster that you manage with another organization? Alter the capacity allocation between different queues to meet your changing workloads?

Using the EMR reconfiguration feature, you can make changes by submitting a reconfiguration request to the master node. New configurations take effect on your queues in a few minutes. You don’t have to go through the hassle of logging into the master node, directly updating the configuration file, or manually refreshing queues.

EMR clusters come with a single queue by default. Suppose that you want to create two additional queues, alpha and beta, and allocate 30% of your cluster’s total resource capacity to each of them. Here’s a sample command that submits a reconfiguration request to accomplish the desired change:

$ aws emr modify-instance-groups --cli-input-json file://reconfiguration.json

Here’s the example reconfiguration.json file.

reconfiguration.json:
{
   "ClusterId":"j-MyClusterID",
   "InstanceGroups":[
      {
         "InstanceGroupId":"ig-MyMasterId",
         "Configurations":[
            {
               "Classification":"capacity-scheduler",
               "Properties":{
                  "yarn.scheduler.capacity.root.queues":"default,alpha,beta",
                  "yarn.scheduler.capacity.root.default.capacity":"40",
                  "yarn.scheduler.capacity.root.default.accessible-node-labels.CORE.capacity":"40",
                  "yarn.scheduler.capacity.root.alpha.capacity":"30",
                  "yarn.scheduler.capacity.root.alpha.accessible-node-labels":"*",
                  "yarn.scheduler.capacity.root.alpha.accessible-node-labels.CORE.capacity":"30",
                  "yarn.scheduler.capacity.root.beta.capacity":"30",
                  "yarn.scheduler.capacity.root.beta.accessible-node-labels":"*",
                  "yarn.scheduler.capacity.root.beta.accessible-node-labels.CORE.capacity":"30"
               },
               "Configurations":[]
            }
         ]
      }
   ]
}

Access to the “*” label was given to both queues so that each can access labeled core nodes. Additionally, the sum of capacities for all queues must be equal to 100. The capacity of the default queue decreases to 40%.

Finally, the capacity for each queue’s access to the core label matches the capacity of the queue itself. That means that the core partition splits between queues at the same ratio as the rest of the cluster.
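Because the queue capacities must add up to 100, it can be worth checking the properties before you submit the request. The following sketch reads the queue list and per-queue capacities from a capacity-scheduler properties object and verifies the total. The helper names queueCapacities and capacitiesSumTo100 are hypothetical, and only the properties needed for the check are included.

```javascript
const ROOT = 'yarn.scheduler.capacity.root.';

// Read the capacity of each queue listed under root.queues.
function queueCapacities(properties) {
  const result = {};
  for (const queue of properties[ROOT + 'queues'].split(',')) {
    result[queue] = Number(properties[`${ROOT}${queue}.capacity`]);
  }
  return result;
}

// Check that the capacities of all queues add up to 100.
function capacitiesSumTo100(properties) {
  const total = Object.values(queueCapacities(properties))
    .reduce((sum, capacity) => sum + capacity, 0);
  return total === 100;
}

const schedulerProperties = {
  'yarn.scheduler.capacity.root.queues': 'default,alpha,beta',
  'yarn.scheduler.capacity.root.default.capacity': '40',
  'yarn.scheduler.capacity.root.alpha.capacity': '30',
  'yarn.scheduler.capacity.root.beta.capacity': '30'
};

console.log(queueCapacities(schedulerProperties)); // prints { default: 40, alpha: 30, beta: 30 }
console.log(capacitiesSumTo100(schedulerProperties)); // prints true
```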

After completing this step, go to the YARN ResourceManager Web UI to verify that your modifications have taken place.

EMR reconfiguration benefits

The following are EMR reconfiguration benefits:

  • Rolling reconfiguration process
  • Reconfiguration failure and reversion

Rolling reconfiguration process

One key benefit of EMR reconfiguration is a rolling reconfiguration process. From the documentation:

“Amazon EMR follows a ‘rolling’ process to reconfigure the instances in the Task and Core instance groups. Only 10 percent of the instances in an instance group are modified and restarted at a time. This process takes longer to finish but reduces the chance of potential application failure in a running cluster.”
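To get a feel for how long a rolling reconfiguration might take, you can estimate the number of restart batches for an instance group. This is a rough sketch only. The documentation quoted above states the 10 percent figure but not how EMR rounds it, so the rounding rule here (floor, with at least one instance per batch) is my assumption, and the function name rollingBatches is hypothetical.

```javascript
// Estimate batch size and batch count for a rolling reconfiguration.
// Assumption: 10 percent is rounded down, with a minimum of one instance.
function rollingBatches(instanceCount) {
  const batchSize = Math.max(1, Math.floor(instanceCount / 10));
  return { batchSize, batches: Math.ceil(instanceCount / batchSize) };
}

console.log(rollingBatches(20)); // prints { batchSize: 2, batches: 10 }
console.log(rollingBatches(5));  // prints { batchSize: 1, batches: 5 }
```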

Rolling reconfiguration helps protect against HDFS downtime by keeping about 90% of core nodes running during reconfiguration. YARN on EMR additionally has NodeManager recovery enabled. NodeManager recovers running containers after the reconfiguration restart.

Because containers stay active, some MapReduce jobs can continue to run successfully during the reconfiguration process. However, not all applications can recover after a restart. For example, Spark on YARN (the EMR default) may encounter executor issues and job failure after NodeManager restart.

Test applications with the type of reconfiguration that you plan to do in a safe environment before reconfiguring in production.

Finally, the rolling reconfiguration process might result in a temporary mismatch of your instance group’s configuration state. While mismatched, some instances may have old configurations while others may have the newly requested ones. When reconfiguring your cluster, consider any possible side effects of this situation.

Reconfiguration failure and reversion

EMR can also recover your instance group from a reconfiguration failure.

To make your new configuration take effect, EMR restarts your reconfigured applications and ensures that they are running before declaring the reconfiguration operation complete.

However, if any application fails to restart successfully on any node, the reconfiguration operation fails and the instance group remains in the RECONFIGURING state. Such failures might result from problematic configuration values. For example, an invalid address for `yarn.resourcemanager.scheduler.address` can cause the YARN ResourceManager to fail to restart.

In such situations, EMR automatically triggers a configuration reversion. Reversion re-applies the previous working configuration set on the instance group. Reversion brings the instance group state back to the RUNNING state as soon as the reversion completes. Your instance group thus returns to a functioning state and maintains the availability of your applications on the cluster. Rolling reconfiguration continues throughout the process.

If applications still fail to start after the previous working configurations have been re-applied, EMR places the instance group in the ARRESTED state rather than make further reconfiguration attempts. To release the instance group from the ARRESTED state, submit a new reconfiguration request.
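The success, reversion, and failure paths described in this section can be summarized as a small state machine. The sketch below uses the state names from this post. The event names are hypothetical labels that I chose for illustration, and the assumption that a new request moves an ARRESTED instance group back to RECONFIGURING is inferred from the text rather than confirmed.

```javascript
// Instance group state transitions during reconfiguration.
// Event names are hypothetical; state names come from the post.
const TRANSITIONS = {
  RUNNING: { reconfigurationRequested: 'RECONFIGURING' },
  RECONFIGURING: {
    succeeded: 'RUNNING',          // new configuration took effect
    reversionSucceeded: 'RUNNING', // previous configuration re-applied
    reversionFailed: 'ARRESTED'    // applications still fail to start
  },
  // Assumption: a new request restarts the reconfiguration process.
  ARRESTED: { reconfigurationRequested: 'RECONFIGURING' }
};

function nextState(state, event) {
  const next = (TRANSITIONS[state] || {})[event];
  if (!next) {
    throw new Error(`No transition from ${state} on ${event}`);
  }
  return next;
}

let state = 'RUNNING';
for (const event of ['reconfigurationRequested', 'reversionFailed', 'reconfigurationRequested', 'succeeded']) {
  state = nextState(state, event);
  console.log(event, '->', state);
}
```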

Summary

In this post, I showed you the basics of how to configure instance groups on running clusters using the new EMR cluster reconfiguration feature. I walked through the extra semantics of submitting reconfiguration requests, important configuration-level concepts, and methods for tracking reconfigurations. I provided some real-world reconfiguration examples and covered two useful features of reconfiguration.

Try the new cluster reconfiguration feature and share your experience with us in the comments below!

 


About the Authors

Brandon Scheller is a software development engineer for Amazon EMR. His passion lies in developing and advancing the applications of the Hadoop ecosystem and working with the open source community. He enjoys mountaineering in the Cascades in his free time.

 

 

 

Junyang Li is a software development engineer for Amazon EMR. She works on cutting-edge features of EMR and is also involved in open source projects. Besides work, she enjoys arts and crafts, exercising and traveling.

 

 

 

 

Detect fraudulent calls using Amazon QuickSight ML insights

Post Syndicated from Guy Ben Baruch original https://aws.amazon.com/blogs/big-data/detect-fraudulent-calls-using-amazon-quicksight-ml-insights/

The financial impact of fraud in any industry is massive. According to the Financial Times article Fraud Costs Telecoms Industry $17bn a Year (paid subscription required), fraud costs the telecommunications industry $17 billion in lost revenues every year.

Fraudsters constantly look for new technologies and devise new techniques. This changes fraud patterns and makes detection difficult. Companies commonly combat this with a rules-based fraud detection system. However, once the fraudsters realize their current techniques or tools are being identified, they quickly find a way around it. Also, rules-based detection systems tend to struggle and slow down with a lot of data. This makes it difficult to detect fraud and act quickly, resulting in loss of revenue.

Overview

There are several AWS services that implement anomaly detection and could be used to combat fraud, but let’s focus on the following three:

When trying to detect fraud, there are two high-level challenges:

  • Scale – The amount of data to be analyzed. For example, each call generates a call detail record (CDR) event. These CDRs include many pieces of information, such as the originating and terminating phone numbers and the duration of the call. Multiply these CDR events by the number of telephone calls placed each day and you can get an idea of the scale that operators must manage.
  • Machine learning knowledge and skill – The right set of skills to help solve business problems with machine learning. Developing these skills or hiring qualified data scientists with adequate domain knowledge is not simple.

Introducing Amazon QuickSight ML Insights

Amazon QuickSight is a fast, cloud-powered BI service that makes it easy for everyone in an organization to get business insights from their data through rich, interactive dashboards. With pay-per-session pricing and a dashboard that can be embedded into your applications, BI is now even more cost-effective and accessible to everyone.

However, as the volume of data that customers generate grows daily, it’s becoming more challenging to harness their data for business insights. This is where machine learning comes in. Amazon is a pioneer in using machine learning to automate and scale various aspects of business analytics in the supply chain, marketing, retail, and finance.

ML Insights integrates proven Amazon technologies into Amazon QuickSight to provide customers with ML-powered insights beyond visualizations.

  • ML-powered anomaly detection to help customers uncover hidden insights by continuously analyzing across billions of data points.
  • ML-powered forecasting and what-if analysis to predict key business metrics with point-and-click simplicity.
  • Auto-narratives to help customers tell the story of their dashboard in a plain-language narrative.

In this post, I demonstrate how a Telecom provider with little to no ML expertise can use Amazon QuickSight ML capabilities to detect fraudulent calls.

Prerequisites

To implement this solution, you need the following resources:

  • Amazon S3 to stage a ‘ribbon’ call detail record sample in a CSV format.
  • AWS Glue running an ETL job in PySpark.
  • AWS Glue crawlers to discover the schema of the tables and update the AWS Glue Data Catalog.
  • Amazon Athena to query the Amazon QuickSight dataset.
  • Amazon QuickSight to build visualizations and perform anomaly detection using ML Insights.

Diagram of fraudulent call-detecting architecture, using a PySpark script to prepare the data and transform it into Parquet and an AWS Glue crawler to build the AWS Glue Data Catalog.

The dataset

For this post, I use a synthetic dataset, thanks to Ribbon Communications. The data was generated by call test generators, and is not customer or sensitive data.

Inspecting the data

The example below is a typical CDR. The STOP CDR shown below is generated after a call has been terminated.


As you can see, there are a lot of values here. Most of them are not relevant for fraud identification or prevention.

Revenue shared fraud

Revenue shared fraud is one of the most common fraud schemes threatening the telecom industry today. It involves using fraudulent or stolen numbers to repeatedly call a premium rate B-number, whose owner then shares the cash generated with the fraudster.

Say that you’d like to detect national and international revenue share fraud using Amazon QuickSight ML. Consider the typical traits of a revenue share fraud phone call. The pattern for revenue share fraud is multiple A-numbers calling the same B-number or a range of B-numbers with the same prefix. The call duration is usually higher than average and could be up to two hours, which is the maximum length of time international switches allow. Generally, the calls originate from one cell or a group of cells.

One SIM may make short test calls to a variety of B-numbers as a precursor to the fraud itself, which most often happens when the risk of detection is lowest, for example, Friday night, weekends, or holidays. Conference calling may be used to make several concurrent calls from one A-number.

Often, SIMs used for this type of fraud are sold or activated in bulk from the same distributor or group of distributors. SIMs could be topped up using fraudulent online or IVR payments, such as using stolen credit card numbers. Both PAYG credit and bundles may be used.

Based on the above use case, the following pieces of information are most relevant to detecting fraud.

  • Call duration
  • Calling number (A number)
  • Called number (B number)
  • Start time of the call
  • Accounting ID
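
To make the pattern concrete, the fields listed above are enough to flag B-numbers that receive long calls from many different A-numbers. The following is a minimal sketch in plain Python, not the method used later in this post; the function name, the thresholds, and the sample records are all hypothetical.

```python
from collections import defaultdict


def suspicious_b_numbers(cdrs, min_callers=3, min_avg_duration=3600):
    """Return B-numbers called by many distinct A-numbers with long average calls.

    cdrs: iterable of (calling_number, called_number, duration_seconds).
    Both thresholds are hypothetical and only illustrate the pattern.
    """
    callers = defaultdict(set)     # B-number -> distinct A-numbers
    durations = defaultdict(list)  # B-number -> call durations in seconds
    for a_number, b_number, duration in cdrs:
        callers[b_number].add(a_number)
        durations[b_number].append(duration)

    flagged = []
    for b_number, a_numbers in callers.items():
        average = sum(durations[b_number]) / len(durations[b_number])
        if len(a_numbers) >= min_callers and average >= min_avg_duration:
            flagged.append(b_number)
    return sorted(flagged)


# Hypothetical sample records: (A-number, B-number, duration in seconds)
sample = [
    ("1001", "9000", 7000),
    ("1002", "9000", 6800),
    ("1003", "9000", 7200),
    ("1001", "5000", 120),
    ("1004", "5000", 90),
]
print(suspicious_b_numbers(sample))
```

Fixed thresholds like these are brittle, which is one reason to prefer the anomaly detection approach described later.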

You can use this reference to help identify those fields in a CDR.

Figure 2: Decoded CDR data, highlighting the relevant fields.

I identified the columns that I need out of 235 columns in the CDR.

Inspecting the raw sample data, I quickly see that it’s missing a header.

To make life easier, I converted the raw CSV data, added the column names, and converted to Parquet.

Discovering the data

In the AWS Glue console, set up a crawler and name it CDR_CRAWLER.

Point the crawler to s3://telco-dest-bucket/blog where the Parquet CDR data resides.

Next, create a new IAM role to be used by the AWS Glue crawler.

For Frequency, leave the default definition of Run on Demand.

Next, choose Add database and define the name of the database. This database contains the table discovered by the AWS Glue crawler.

Choose Next and review the crawler settings. When you’re satisfied, choose Finish.

Next, choose Crawlers, select the crawler that you just created (CDR_CRAWLER), and choose Run crawler.

The AWS Glue crawler starts crawling the data store. This can take one minute or more to complete.
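
If you prefer scripting to the console, the same crawler can probably be set up with boto3. The sketch below is an assumption-laden equivalent of the console steps: the IAM role name `GlueCrawlerRole` is hypothetical, and the second function needs boto3 and AWS credentials, so it is defined but not called here.

```python
def crawler_request(name, role, database, s3_path):
    """Build keyword arguments for the AWS Glue CreateCrawler API call.

    No Schedule is set, so the crawler runs on demand.
    """
    return {
        "Name": name,
        "Role": role,
        "DatabaseName": database,
        "Targets": {"S3Targets": [{"Path": s3_path}]},
    }


def create_and_run_crawler(request):
    """Create the crawler and start it (requires boto3 and AWS credentials)."""
    import boto3  # imported here so the builder above works without boto3

    glue = boto3.client("glue")
    glue.create_crawler(**request)
    glue.start_crawler(Name=request["Name"])


request = crawler_request(
    name="CDR_CRAWLER",
    role="GlueCrawlerRole",  # hypothetical IAM role name
    database="blog",
    s3_path="s3://telco-dest-bucket/blog",
)
print(request)
```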

When it’s complete, under Data catalog, choose Databases.  You should be able to see the new database created by the AWS Glue crawler. In this case, the name of the database is blog.

To view the tables created under this database, select the relevant database and choose Tables. The crawler’s table also points to the location of the Parquet format CDRs.

To see the table’s schema, select the table created by the crawler.

Data preparation

You have defined the relevant dimensions to use in the ML model to detect fraud. Now, you can use a PySpark script that I built earlier using an Amazon SageMaker notebook and an AWS Glue endpoint. The script covers the following tasks:

  • Reduce the dataset and focus only on the relevant columns.
  • Create a timestamp column, which you need for creating an analysis using Amazon QuickSight.
  • Transform files from CSV to Parquet for improved performance.

You can run the PySpark script on the raw CSV format of the CDRs that you are using. Here is the location of the raw CSV format:

s3://telco-source-bucket/machine-learning-for-all/v1.0.0/data/cdr-stop/cdr_stop.csv

Here is the PySpark script that I created.

import sys    
from awsglue.transforms import *    
from awsglue.utils import getResolvedOptions    
from pyspark.context import SparkContext    
from awsglue.context import GlueContext    
from awsglue.job import Job    
import pyspark.sql.functions as fn    
from awsglue.dynamicframe import DynamicFrame    
    
    
sc = SparkContext.getOrCreate()    
glueContext = GlueContext(sc)    
spark = glueContext.spark_session    
    
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "demo_ml", table_name = "cdr_stop_csv", transformation_ctx = "datasource0")    
#apply mapping from source table to destination , we pick only the relevant columns     
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("col2", "string", "Accounting_ID", "string"), ("col13", "long", "Call_service_duration", "long"), ("col5", "string", "Start_Time_(MM/DD/YYYY)", "string"), ("col6", "string", "Start_Time_(HH/MM/SSs)", "string"), ("col19", "long", "Calling number", "string"), ("col20", "long", "Called number", "string")], transformation_ctx = "applymapping1")    
    
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")    
resolvechoice2.printSchema()    
    
resolvechoice3 = ResolveChoice.apply(frame = resolvechoice2, choice = "MATCH_CATALOG", database = "demo_ml", table_name = "cdr_stop_csv", transformation_ctx = "resolvechoice3")    
resolvechoice3.printSchema()    
    
customDF = resolvechoice3.toDF()    
#create timestamp column    
customDF = customDF.withColumn('timestamp', fn.concat(fn.col("Start_Time_(MM/DD/YYYY)"),fn.lit(" "),fn.col("Start_Time_(HH/MM/SSs)")))    
    
#create timestamp2 column which is a substring of timestamp column    
customDF = customDF.withColumn('timestamp2',fn.substring(fn.col("timestamp"),1,19))    
#create Date column     
customDF =customDF.withColumn("Date",fn.unix_timestamp(fn.col("timestamp2"),"MM/dd/yyyy HH:mm:ss").cast("timestamp"))    
    
#remove temporary fields     
customDF = customDF.drop('timestamp','timestamp2')    
    
customDynamicFrame = DynamicFrame.fromDF(customDF, glueContext, "customDF_df")    
#transform to parquet format and land in S3 path    
datasink4 = glueContext.write_dynamic_frame.from_options(frame = customDynamicFrame, connection_type = "s3", connection_options = {"path": "s3://telco-dest-bucket/blog/"}, format = "parquet", transformation_ctx = "datasink4")    
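
The timestamp handling in the script is easier to follow on a single value. The following plain-Python sketch mirrors the three steps (concatenate, keep the first 19 characters, parse) without Spark; the sample date and time strings are hypothetical and assume the time field carries a fractional-second suffix.

```python
from datetime import datetime


def to_timestamp(start_date, start_time):
    """Mirror the script's concat, substring(1, 19), and parse steps for one record."""
    combined = f"{start_date} {start_time}"  # same as fn.concat(date, " ", time)
    trimmed = combined[:19]                  # same as fn.substring(col, 1, 19)
    # Python equivalent of the Spark pattern "MM/dd/yyyy HH:mm:ss"
    return datetime.strptime(trimmed, "%m/%d/%Y %H:%M:%S")


# Hypothetical field values; the trailing ".3" is dropped by the substring step
print(to_timestamp("08/29/2018", "14:05:09.3"))
```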

The dataset has been cataloged in AWS Glue Data Catalog and is queryable using Athena.

Amazon QuickSight and anomaly detection

Next, build out anomaly detection using Amazon QuickSight. To get started, follow these steps.

  1. In the Amazon QuickSight console, choose New analysis.
  2. Choose Create new data set.
  3. Choose Athena.
  4. Enter a data source name.
  5. Choose Create data source.
  6. From the drop-down list, select the relevant database and table that were created by the AWS Glue crawlers, and choose Select.
  7. Choose Directly query your data, and then choose Visualize.

Visualizing the data using Amazon QuickSight

  1. Under visual types, choose Line chart.
  2. Drag call_service_duration to the Value field well.
  3. Drag timestamp_new to the X axis field well.

Amazon QuickSight generates a dashboard, as in the following screenshot.

The x-axis is the timestamp. By default, it’s based on the aggregates of one day. This can be changed by choosing a different value.

Because the timestamp is currently aggregated by day, the call duration shown is the sum of all call durations from all call records within a day. I can begin the search by looking for days where the total call duration is high.
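
The one-day aggregation amounts to summing call durations per calendar day. Here is a minimal sketch of that calculation in plain Python, assuming each record is a (start time, duration in seconds) pair; the sample records are hypothetical.

```python
from collections import defaultdict
from datetime import datetime


def total_duration_per_day(records):
    """Sum call durations (seconds) for each calendar day."""
    totals = defaultdict(int)
    for started_at, duration in records:
        totals[started_at.date()] += duration
    return dict(totals)


def busiest_day(records):
    """Return the day with the highest total call duration."""
    totals = total_duration_per_day(records)
    return max(totals, key=totals.get)


# Hypothetical call records
records = [
    (datetime(2018, 8, 28, 9, 0), 120),
    (datetime(2018, 8, 28, 23, 30), 300),
    (datetime(2018, 8, 29, 1, 15), 7200),
]
print(total_duration_per_day(records))
print(busiest_day(records))
```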

Anomaly detection

Now look at how to start using the ML insights anomaly detection feature.

  1. On the top of the Insights panel, choose Add anomaly to sheet. This creates an insights visual for anomaly detection.
  2. On the top of the screen, choose Field Wells and add at least one field to the Categories, as in the following example. I added the calling/called number, as those become relevant for fraud use cases; for example, one A-number calling multiple B-numbers or multiple A-numbers calling B-numbers.
    The categories represent the dimensional values by which Amazon QuickSight splits the metric. For example, you can analyze anomalies on sales across all product categories and product SKUs, assuming there are 10 product categories, each with 10 product SKUs. Amazon QuickSight splits the metric by the 100 unique combinations and runs anomaly detection on each split metric.
  3. To configure the anomaly detection job, choose Get Started.
  4. On the anomaly detection configuration screen, set up the following options:
    • Analyze all combinations of these categories: By default, if you have selected three categories, Amazon QuickSight runs anomaly detection on the following combinations hierarchically: A, AB, ABC. If you select this option, QuickSight analyzes all combinations: A, AB, ABC, BC, AC. If your data is not hierarchical, check this option.
    • Schedule: Set this option to run anomaly detection on your data hourly, daily, weekly, or monthly, depending on your data and needs. For Start schedule on and Timezone, enter values and choose OK.
      Important: The schedule does not take effect until you publish the analysis as a dashboard. Within the analysis, you have the option to run the anomaly detection manually (without the schedule).
    • Contribution analysis on anomaly: You can select up to four additional dimensions for Amazon QuickSight to analyze the top contributors when an anomaly is detected. For example, Amazon QuickSight can show you the top customers that contributed to a spike in sales. In my current example, I added one additional dimension: the accounting ID. If you think about a telecom fraud case, you can also consider fields like charging time or cell ID as additional dimensions.
  5. After setting the configuration, choose Run Now to execute the job manually. The “Detecting anomalies… This may take a while…” message appears while the job runs. Depending on the size of your dataset, this may take a few minutes or up to an hour.
  6. When the anomaly detection job is complete, any anomalies are called out in the insights visual. By default, only the top anomalies for the latest time period in the data are shown in the insights visuals.

    Anomaly detection reveals several B numbers being called from multiple A numbers with a high call service duration on August 29, 2018. That looks interesting!
  7. To explore all anomalies for this insight, select the menu on the top-right corner of the visual and choose Explore Anomalies.
  8. On the Anomalies detailed page, you can see all the anomalies for the latest period.
    In the view, you can see that two anomalies were detected, showing two time series. The title of the visuals represents the metric that is run on the unique combination of the categorical fields. In this case:
    • [All] | 9645000024
    • 3512000024 | [ALL]
    So the system detected anomalies for multiple A-numbers calling 9645000024, and 3512000024 calling multiple B-numbers. In both cases, it observed a high call duration. The labeled data point on the chart represents the most recent anomaly that is detected for that time series.
  9. To expose a date picker, choose Show anomalies by date at the top-right corner. This chart shows the number of anomalies that were detected for each day (or hour, depending on your anomaly detection configuration). You can select a particular day to see the anomalies detected for that day.
    For example, selecting August 10, 2018 on the top chart shows the anomalies for that day:

    Important: The first 32 points in the dataset are used for training and are not scored by the anomaly detection algorithm. You may not see any anomalies on the first 32 data points.
    You can expand the filter controls on the top of the screen. With the filter controls, you can change the anomaly threshold to show high, medium, or low significance anomalies. You can choose to show only anomalies that are higher than expected or lower than expected. You can also filter by the categorical values that are present in your dataset to look at anomalies only for those categories.
  10. Look at the contributors columns. When you configured the anomaly detection, you defined the accounting ID as another dimension. If this were real call traffic instead of practice data, you would be able to single out specific accounting IDs that contribute to the anomaly.
  11. When you’re done, choose Back to analysis.
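
The category split described in the steps above is simple counting. The sketch below reproduces the 10 x 10 = 100 combinations from the sales example and the default hierarchical combinations (A, AB, ABC) for three categories. It only illustrates the arithmetic and is not how Amazon QuickSight is implemented; all names are hypothetical.

```python
def metric_splits(categories, skus_per_category):
    """List every (category, SKU) pair the metric would be split by."""
    return [
        (category, f"{category}/sku-{n}")
        for category in categories
        for n in range(1, skus_per_category + 1)
    ]


def hierarchical_combinations(fields):
    """Return the prefixes of an ordered field list: A, AB, ABC, ..."""
    return [tuple(fields[: i + 1]) for i in range(len(fields))]


categories = [f"category-{i}" for i in range(1, 11)]  # 10 hypothetical categories
splits = metric_splits(categories, skus_per_category=10)
print(len(splits))
print(hierarchical_combinations(["A", "B", "C"]))
```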

Summary

In this post, I explored a common fraud pattern called revenue share fraud. I looked at how to extract the relevant data for training the anomaly detection model in Amazon QuickSight. I then used this data to detect anomalies based on call duration, calling party, and called party, looking at additional contributors like accounting ID. The entire process used serverless technologies and required little to no machine learning experience.

For more information about options and strategies, see Amazon QuickSight Announces General Availability of ML Insights.

If you have questions or suggestions, please comment below.

 


About the Author

Guy Ben Baruch is a solutions architect with Amazon Web Services.

Performance updates to Apache Spark in Amazon EMR 5.24 – Up to 13x better performance compared to Amazon EMR 5.16

Post Syndicated from Paul Codding original https://aws.amazon.com/blogs/big-data/performance-updates-to-apache-spark-in-amazon-emr-5-24-up-to-13x-better-performance-compared-to-amazon-emr-5-16/

Amazon EMR release 5.24.0 includes several optimizations in Spark that improve query performance. To evaluate the performance improvements, we used TPC-DS benchmark queries with 3-TB scale and ran them on a 6-node c4.8xlarge EMR cluster with data in Amazon S3. We observed up to 13X better query performance on EMR 5.24 compared to EMR 5.16 when operating with a similar configuration.

Customers use Spark for a wide array of analytics use cases ranging from large-scale transformations to streaming, data science, and machine learning. They choose to run Spark on EMR because EMR provides the latest, stable open source community innovations, performant storage with Amazon S3, and the unique cost savings capabilities of Spot Instances and Auto Scaling.

Each monthly EMR release offers the latest open source packages, alongside new features such as multiple master nodes and cluster reconfiguration. The team also adds performance improvements with each release.

Each of those optimizations helps you run faster and reduce costs. With EMR 5.24, we have made several new optimizations and are detailing three critical ones in this post.

Setup

To get started with EMR, sign into the console, launch a cluster, and process data.

To replicate the setup for the benchmarking queries, use the following configuration:

  • Applications installed on the cluster: Ganglia, Hive, Spark, Hadoop (installed by default).
  • EMR release: EMR 5.24.0
  • Cluster configuration
    • Master instance group: 1 c4.8xlarge instance with 512 GiB of GP2 EBS storage (4 volumes of 128 GiB each)
    • Core instance group: 5 c4.8xlarge instances with 512 GiB of GP2 EBS storage (4 volumes of 128 GiB each)

Classification    Properties
yarn-site         yarn.nodemanager.resource.memory-mb : 53248
                  yarn.scheduler.maximum-allocation-vcores : 36
spark-defaults    spark.executor.memory : 4743m
                  spark.driver.memory : 2g
                  spark.sql.optimizer.distinctBeforeIntersect.enabled : true
                  spark.sql.dynamicPartitionPruning.enabled : true
                  spark.sql.optimizer.flattenScalarSubqueriesWithAggregates.enabled : true
                  spark.executor.cores : 4
                  spark.executor.memoryOverhead : 890m
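
When you create a cluster, EMR accepts these settings as a JSON list of objects with Classification and Properties keys. The following sketch builds that list in Python from the values above and prints the JSON; how you pass it to EMR (console, CLI, or SDK) is left open.

```python
import json

# Values copied from the benchmark configuration above; EMR expects strings.
configurations = [
    {
        "Classification": "yarn-site",
        "Properties": {
            "yarn.nodemanager.resource.memory-mb": "53248",
            "yarn.scheduler.maximum-allocation-vcores": "36",
        },
    },
    {
        "Classification": "spark-defaults",
        "Properties": {
            "spark.executor.memory": "4743m",
            "spark.driver.memory": "2g",
            "spark.sql.optimizer.distinctBeforeIntersect.enabled": "true",
            "spark.sql.dynamicPartitionPruning.enabled": "true",
            "spark.sql.optimizer.flattenScalarSubqueriesWithAggregates.enabled": "true",
            "spark.executor.cores": "4",
            "spark.executor.memoryOverhead": "890m",
        },
    },
]

print(json.dumps(configurations, indent=2))
```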

Results observed using TPC-DS benchmarks

The following two graphs compare the total aggregate runtime and geometric mean for all queries in the TPC-DS 3TB query dataset between the EMR releases.

The per-query runtime improvement between EMR 5.16 and EMR 5.24 is also illustrated in the following chart. The horizontal axis shows each of the queries in the TPC-DS 3 TB benchmark. The vertical axis shows the orders of magnitude of performance improvement seen in EMR 5.24.0 relative to EMR 5.16.0 as measured by query execution time. The largest performance improvements can be seen in 26 of the queries. In each of these queries, the performance was at least 2X better than EMR 5.16.
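
For readers who want to run a similar comparison on their own workloads, the two summary statistics are easy to compute. The sketch below derives per-query speedups and their geometric mean. The query names and runtimes are hypothetical placeholders, not the benchmark results.

```python
import math


def speedups(baseline, improved):
    """Per-query speedup: baseline runtime divided by improved runtime."""
    return {query: baseline[query] / improved[query] for query in baseline}


def geometric_mean(values):
    """Geometric mean, computed via logarithms for numerical stability."""
    values = list(values)
    return math.exp(sum(math.log(v) for v in values) / len(values))


# Hypothetical runtimes in seconds, for illustration only
baseline_runtime = {"q1": 100.0, "q2": 40.0, "q3": 90.0}
improved_runtime = {"q1": 25.0, "q2": 40.0, "q3": 30.0}

ratios = speedups(baseline_runtime, improved_runtime)
at_least_2x = sorted(q for q, r in ratios.items() if r >= 2.0)
print(ratios, at_least_2x, geometric_mean(ratios.values()))
```

The geometric mean is less sensitive than the total runtime to a few very long queries, which is why benchmark reports often show both.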

Performance optimizations in EMR 5.24

While AWS made several incremental performance improvements aggregating to the overall speedup, this post describes three major improvements in EMR 5.24 that affect the most common customer workloads:

  • Dynamic partition pruning
  • Flatten scalar subqueries
  • DISTINCT before INTERSECT

Dynamic partition pruning

Dynamic partition pruning improves job performance by selecting specific partitions within a table that must be read and processed for a query. By reducing the amount of data read and processed, queries run faster. The open source version of Spark (2.4.2) only supports pushing down static predicates that can be resolved at plan time. Examples of static predicate push down include the following:

partition_col = 5

partition_col IN (1,3,5)

partition_col BETWEEN 1 AND 3

partition_col = 1 + 3

With dynamic partition pruning turned on, Spark on EMR infers the partitions that must be read at runtime. Dynamic partition pruning is disabled by default, and can be enabled by setting the Spark property spark.sql.dynamicPartitionPruning.enabled from within Spark or when creating clusters. For more information, see Configure Spark.

Here’s an example that joins two tables and relies on dynamic partition pruning to improve performance. The store_sales table contains total sales data partitioned by region, and store_regions table contains a mapping of regions for each country. In this representative query, you want to only get data from a specific country.

SELECT ss.quarter, ss.region, ss.store, ss.total_sales 
FROM store_sales ss, store_regions sr
WHERE ss.region = sr.region AND sr.country = 'North America'

Without dynamic partition pruning, this query reads all regions before filtering out the subset of regions that match the country filter on store_regions. With dynamic partition pruning, only the partitions for the regions that pass that filter are read and processed. This saves time and resources by both reading less data from storage and processing fewer records.
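
The effect can be imitated in a few lines of plain Python. The sketch below is a toy model, not Spark: partitions are dictionary entries keyed by region, and the function counts how many partitions are read with and without pruning. The region names and rows are hypothetical.

```python
# Toy partitioned table: region (partition key) -> rows
store_sales = {
    "us-east": [("Q1", "us-east", "store-1", 100)],
    "us-west": [("Q1", "us-west", "store-2", 200)],
    "emea-north": [("Q1", "emea-north", "store-3", 300)],
    "apac-south": [("Q1", "apac-south", "store-4", 400)],
}
# Small dimension table: (region, country)
store_regions = [
    ("us-east", "North America"),
    ("us-west", "North America"),
    ("emea-north", "Europe"),
    ("apac-south", "Asia Pacific"),
]


def run_query(country, dynamic_pruning):
    """Return (matching rows, number of store_sales partitions read)."""
    # Evaluate the filter on the small table first.
    wanted = {region for region, c in store_regions if c == country}
    if dynamic_pruning:
        partitions = [region for region in store_sales if region in wanted]
    else:
        partitions = list(store_sales)  # read everything, filter afterwards
    rows = [
        row
        for region in partitions
        for row in store_sales[region]
        if region in wanted
    ]
    return sorted(rows), len(partitions)


print(run_query("North America", dynamic_pruning=False))
print(run_query("North America", dynamic_pruning=True))
```

Both calls return the same rows; only the number of partitions read differs.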

The following graph shows the performance improvements to Queries 72, 80, 17, and 25 from the TPC-DS suite that we tested with 3-TB data.

Flatten scalar subqueries

This optimization can improve query performance where multiple conditions must be applied to rows from a specific table. Without it, the table is read once for each condition. The optimization detects such cases and rewrites the query to read the table only one time.

Flatten scalar subqueries is disabled by default and can be enabled by setting the Spark property spark.sql.optimizer.flattenScalarSubqueriesWithAggregates.enabled from within Spark or when creating clusters.

To give an example of how this works, use the same store_sales table from the previous optimization. In this example, you want to compute the average of total sales for stores whose total sales fall within specific ranges.

SELECT (SELECT avg(total_sales) FROM store_sales 
WHERE total_sales BETWEEN 5000000 AND 10000000) AS group1, 
(SELECT avg(total_sales) FROM store_sales 
WHERE total_sales BETWEEN 10000000 AND 15000000) AS group2, 
(SELECT avg(total_sales) FROM store_sales 
WHERE total_sales BETWEEN 15000000 AND 20000000) AS group3  

With this optimization disabled, the store_sales table is read for each subquery. With the optimization enabled, the query is rewritten as follows to apply each of the conditions to the rows returned by reading the table only one time.

SELECT c1 AS group1, c2 AS group2, c3 AS group3 
FROM (SELECT avg (IF(total_sales BETWEEN 5000000 AND 10000000, total_sales, null)) AS c1, 
avg (IF(total_sales BETWEEN 10000000 AND 15000000, total_sales, null)) AS c2, 
avg (IF(total_sales BETWEEN 15000000 AND 20000000, total_sales, null)) AS c3 FROM store_sales);  

This optimization saves time and resources by both reading less data from storage, and processing fewer records.
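
You can check that the two query shapes return the same answer on a small table. The sketch below uses Python's built-in sqlite3 module rather than Spark, so it says nothing about performance, and it writes the conditional as CASE WHEN instead of IF because that form is portable. The table contents are hypothetical.

```python
import sqlite3

ORIGINAL = """
SELECT (SELECT avg(total_sales) FROM store_sales
        WHERE total_sales BETWEEN 5000000 AND 10000000) AS group1,
       (SELECT avg(total_sales) FROM store_sales
        WHERE total_sales BETWEEN 10000000 AND 15000000) AS group2,
       (SELECT avg(total_sales) FROM store_sales
        WHERE total_sales BETWEEN 15000000 AND 20000000) AS group3
"""

# Single scan: avg() ignores the NULLs produced for non-matching rows.
FLATTENED = """
SELECT avg(CASE WHEN total_sales BETWEEN 5000000 AND 10000000 THEN total_sales END) AS group1,
       avg(CASE WHEN total_sales BETWEEN 10000000 AND 15000000 THEN total_sales END) AS group2,
       avg(CASE WHEN total_sales BETWEEN 15000000 AND 20000000 THEN total_sales END) AS group3
FROM store_sales
"""


def run_both(rows):
    """Run the original and the flattened query against an in-memory table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE store_sales (store TEXT, total_sales INTEGER)")
    conn.executemany("INSERT INTO store_sales VALUES (?, ?)", rows)
    original = conn.execute(ORIGINAL).fetchone()
    flattened = conn.execute(FLATTENED).fetchone()
    conn.close()
    return original, flattened


# Hypothetical stores and sales totals
sample_rows = [
    ("a", 6000000),
    ("b", 8000000),
    ("c", 12000000),
    ("d", 18000000),
    ("e", 1000000),
]
print(run_both(sample_rows))
```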

To illustrate, take the example of Q9 from the TPC-DS suite. The query runs 2.9x faster in version 5.24 compared to 5.16, when the relevant Spark property is switched on.

DISTINCT before INTERSECT

When producing the intersection of two collections, the result of that intersection is a set of unique values found in both collections. When dealing with large collections, many duplicate records must be both processed and shuffled between hosts to finally calculate the intersection. This optimization eliminates duplicate values in each collection before computing the intersection, improving performance by reducing the amount of data shuffled between hosts.

This optimization is disabled by default and can be enabled by setting the Spark property spark.sql.optimizer.distinctBeforeIntersect.enabled from within Spark or when creating clusters.

For example (simplified from TPC-DS query14), you want to find all of the brands that are sold both in store and catalog sale channels. In this example, the store_sales table contains sales made through the store channel, the catalog_sales table contains sales made through the catalog, and the item table contains each unique product’s details (for example, brand and manufacturer).

(SELECT item.brand ss_brand FROM store_sales, item
WHERE store_sales.item_id = item.item_id)
INTERSECT
(SELECT item.brand cs_brand FROM catalog_sales, item 
WHERE catalog_sales.item_id = item.item_id) 

With this optimization disabled, the first SELECT statement produces 2,600,000 records (same number of records as store_sales) with only 1,200 unique brands. The second SELECT statement produces 1,500,000 records (same number of records as catalog_sales) with 300 unique brands. This results in all 4,100,000 rows being fed into the intersect operation to produce the 200 brands that exist in both results.

With the optimization enabled, a distinct operation is performed on each collection before being fed into the intersect operator, resulting in only 1,200 + 300 = 1,500 records being fed into the intersect operator. This optimization saves time and resources by shuffling less data between hosts.
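
The record counts in this example can be reproduced with Python sets. The sketch below generates synthetic brand IDs with the same cardinalities as the example (the ID ranges are hypothetical and chosen so that 200 brands overlap) and compares the input size of the intersect step with and without the distinct step.

```python
def store_brands(n_rows=2_600_000, n_brands=1_200):
    """Yield one synthetic brand ID per store_sales row (IDs 0..1199)."""
    return (i % n_brands for i in range(n_rows))


def catalog_brands(n_rows=1_500_000, n_brands=300, offset=1_000):
    """Yield one synthetic brand ID per catalog_sales row (IDs 1000..1299)."""
    return (offset + i % n_brands for i in range(n_rows))


def distinct_with_count(rows):
    """Return (number of input rows, set of distinct values) in one pass."""
    seen = set()
    count = 0
    for value in rows:
        count += 1
        seen.add(value)
    return count, seen


def intersect_input_sizes():
    """Return (rows fed to INTERSECT without the optimization,
    rows fed with DISTINCT applied first, size of the result)."""
    left_rows, left = distinct_with_count(store_brands())
    right_rows, right = distinct_with_count(catalog_brands())
    return left_rows + right_rows, len(left) + len(right), len(left & right)


print(intersect_input_sizes())
```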

Summary

With each of these performance optimizations to Apache Spark, you benefit from better query performance on EMR 5.24 compared to EMR 5.16. We look forward to feedback on how these optimizations benefit your real world workloads.

Stay tuned as we roll out additional updates to improve Apache Spark performance in EMR. To keep up-to-date, subscribe to the Big Data blog’s RSS feed to learn about more great Apache Spark optimizations, configuration best practices, and tuning advice. Be sure not to miss other great optimizations like using S3 Select with Spark, and the EMRFS S3-Optimized Committer from previous EMR releases.

 


About the Author

Paul Codding is a senior product manager for EMR at Amazon Web Services.

Peter Gvozdjak is a senior engineering manager for EMR at Amazon Web Services.

Joseph Marques is a principal engineer for EMR at Amazon Web Services.

Yuzhou Sun is a software development engineer for EMR at Amazon Web Services.

Atul Payapilly is a software development engineer for EMR at Amazon Web Services.

Surya Vadan Akivikolanu is a software development engineer for EMR at Amazon Web Services.

Introducing Amazon QuickSight fine-grained access control over Amazon S3 and Amazon Athena

Post Syndicated from Jose Kunnackal original https://aws.amazon.com/blogs/big-data/introducing-amazon-quicksight-fine-grained-access-control-over-amazon-s3-and-amazon-athena/

Today, AWS is excited to announce the availability of fine-grained access control for AWS Identity and Access Management (IAM)-permissioned resources in Amazon QuickSight. Fine-grained access control allows Amazon QuickSight account administrators to control authors’ default access to connected AWS resources. Fine-grained access control enables administrators to use IAM policies to scope down access permissions, limiting specific authors’ access to specific items within the AWS resources. Administrators can now apply this new level of access control to Amazon S3, Amazon Athena, and Amazon RDS/Redshift database discovery.

Fine-Grained Access Control Setup

Here’s how fine-grained access control works in Amazon QuickSight:

Imagine an AWS customer, Acme. Acme’s account contains three S3 buckets, called b1, b2, and b3. Amazon QuickSight is configured to read data via Athena. Assume that Acme’s administrators have configured Amazon QuickSight service role permissions with access to all three buckets. The new default access setting introduced today enables administrators to limit access to Acme’s data for all users by default. Administrators grant access to specific buckets (b1, b2, b3) to individual users or groups within Acme via IAM policies.

In the following example, the policies assigned to Acme users A and B and to Group X grant them access to buckets 1, 2, and 3, respectively. Group Y has no policy assigned, as shown in the following diagram.

When User A attempts to read data via Athena from bucket 1, AWS evaluates the IAM policy associated with the user. Since the policy assigned to User A grants access to bucket 1, the query succeeds. User A can access data from bucket 1. Similarly, User B and users in group X can access data in buckets 2 and 3, respectively.

However, when a user from group Y tries to access bucket 2, QuickSight doesn’t allow any access to data. Remember, group Y has no user-level assignments. Users from group Y are denied access to bucket 2 because Amazon QuickSight requires explicit permissions to access data, as shown in the following diagram.
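
The Acme scenario can be modeled as a lookup with a deny-by-default rule. The sketch below is a simplified mental model, not the IAM policy evaluation logic: assignments map a user or group to the buckets its policy grants, and a user with no assignment gets no access. The group members (users C and D) are hypothetical.

```python
# Policy assignments from the example: identity -> buckets granted
assignments = {
    "user:A": {"b1"},
    "user:B": {"b2"},
    "group:X": {"b3"},
}
# Hypothetical group membership; group Y has no policy assignment
group_members = {
    "group:X": {"user:C"},
    "group:Y": {"user:D"},
}


def can_read(user, bucket):
    """Allow access only if an assignment for the user or one of its groups grants it."""
    identities = [user] + [
        group for group, members in group_members.items() if user in members
    ]
    return any(bucket in assignments.get(identity, set()) for identity in identities)


print(can_read("user:A", "b1"))  # user A is assigned bucket 1
print(can_read("user:D", "b2"))  # group Y has no assignment
```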

In an S3 data lake, Amazon QuickSight enables administrators to restrict each author’s data access using IAM policies. (Other AWS mechanisms, outside of Amazon QuickSight, provide for the modification of the policies themselves.) This fine-grained access control relies on the underlying IAM policy evaluation infrastructure. Currently, there is a ten-policy limit on policies that may be linked together at runtime to evaluate the user’s permissions.

Amazon QuickSight also offers an updated UI for AWS permissions management. Administrators can now access an account’s default resource setting, as shown in the following screenshot:

You can set Default resource access to Allow or Deny, based on the administration model. Choose the appropriate option and press Update. (Or, if you decide not to proceed, press Cancel.)

As before, you can specify AWS resource permissions for the Amazon QuickSight account through the new UI. Select resources from the checkboxes on the right side of the screen, as shown in the following screenshot:

And, as shown in the next screenshot, S3 access can be granted to buckets within the AWS account or other AWS accounts:

To control access for specific users or groups, use the newly introduced fine-grained access control feature, illustrated in this screenshot:

The IAM policy assignments button leads you to a page displaying all assignments in the account and lets you create a new assignment, as shown in the following screenshot.

Creating a new policy assignment involves only two steps:

  1. Pick an IAM policy from the list of policies in the AWS account.
  2. Assign it to specific users or groups.
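
A policy picked in step 1 might look like the document built below. This is a minimal sketch that grants read access to a single bucket, using bucket b1 from the earlier example. A real Athena setup would likely need additional permissions (for example, on the query results location), so treat the action list as an assumption.

```python
import json


def bucket_read_policy(bucket):
    """Build an IAM policy document granting list and read access to one bucket."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": f"arn:aws:s3:::{bucket}",
            },
            {
                "Effect": "Allow",
                "Action": ["s3:GetObject"],
                "Resource": f"arn:aws:s3:::{bucket}/*",
            },
        ],
    }


print(json.dumps(bucket_read_policy("b1"), indent=2))
```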

If you haven’t yet configured groups in Amazon QuickSight, you can do so using AWS APIs for accounts using SSO or Amazon QuickSight-native credentials. Groups are also natively available in an AD-connected account.

Fine-grained access control in Amazon QuickSight, when combined with data in S3 and Athena, allows you to set up a secure environment for data exploration across the organization. This feature is available in Amazon QuickSight Enterprise Edition in all supported AWS Regions starting today.

 


About the Author

Jose Kunnackal is a principal product manager for Amazon QuickSight.

Why 3M Health Information Systems chose Amazon Redshift to build a healthcare analytics platform

Post Syndicated from Dhanraj Shriyan original https://aws.amazon.com/blogs/big-data/why-3m-health-information-systems-chose-amazon-redshift-to-build-a-healthcare-analytics-platform/

3M Health Information Systems (HIS), a business of 3M Health Care, works with providers, payers, and government agencies to anticipate and navigate a changing healthcare landscape. 3M provides healthcare performance measurement and management solutions, analysis and strategic services that help clients move from volume to value-based health care, resulting in millions of dollars in savings, improved provider performance and higher quality care. 3M’s innovative software is designed to raise the bar for computer-assisted coding, clinical documentation improvement, performance monitoring, quality outcomes reporting, and terminology management.

There was an ongoing initiative at 3M HIS to migrate applications installed on-premises or on other cloud hosting providers to Amazon Web Services (AWS). 3M HIS began migration to AWS to take advantage of compute, storage, and network elasticity. We wanted to build on a solid foundation that would help us focus more of our efforts on delivering customer value while also scaling to support business growth that we expected over the next several years. 3M HIS was already processing healthcare data for many customers. This data is complex in nature and requires many transformations to get it into a format useful for analytics or machine learning.

After reviewing many solutions, 3M HIS chose Amazon Redshift as the appropriate data warehouse solution. We concluded that Amazon Redshift met our needs: it is a fast, fully managed, petabyte-scale data warehouse solution that uses columnar storage to minimize I/O, provides high data compression rates, and offers fast performance. We quickly spun up a cluster in our development environment, built out the dimensional model, loaded data, and made it available to perform benchmarking and testing of the user data. An extract, transform, load (ETL) tool was used to process and load the data from various sources into Amazon Redshift.

3M legacy implementation

3M HIS processes a large volume of data through this data warehouse. We ingest healthcare data from clients, representing millions of procedure codes, diagnosis codes, and all the associated metadata for each of those codes. The legacy process loaded this data into the data warehouse once every two weeks.

For reporting, we published 25 static reports and 6 static HTML reports for over 1000 customers every week. To provide business intelligence reports, we built analytical cubes on a legacy relational database and provided reports from those cubes.

With the ever-increasing amounts of data to be processed, meeting the SLAs was a challenge. It was time to replace our SQL database with a modern architecture and tooling that would auto scale based on the data to be processed and be performant.

How 3M modernized the data warehouse with Amazon Redshift

When choosing a new solution, first we had to ensure that we were able to load data in near real-time. Second, we had to ensure that the solution was scalable, because the amount of data stored in the data warehouse would be 10x larger than in the existing solution. Third, the solution needed to be able to provide reports for massive queries in a reasonable amount of time without impacting the ETL processing that would be running 24/7. Last, we needed a cost-effective data warehouse that could integrate with other analytics services that were part of the overall solution.

We evaluated data warehouses such as Amazon Redshift and Snowflake. We chose Amazon Redshift because it fulfilled the preceding criteria and aligned with our preference for native AWS managed services. We also chose it because Amazon Redshift would be a solution for the future that could keep pace with the growth of the business in an economically sustainable way.

To build the reporting tool, we migrated our multi-terabyte data warehouse to Amazon Redshift. The data was processed through the ETL workflow into an Amazon S3 bucket and then bulk-copied into Amazon Redshift. The GitHub repository provided by AWS, a collection of scripts and utilities, helped us set up the cluster and get the best performance possible from Amazon Redshift.

Top lessons that we learned during the implementation

During the initial development, we encountered challenges loading the data into the Amazon Redshift tables because we were trying to load the data from an Amazon RDS staging instance into Amazon Redshift. After some research, we identified that a bulk load from an Amazon S3 bucket was the best practice for getting large amounts of data into the Amazon Redshift tables.
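
As a rough illustration of the bulk-load pattern, the following Python sketch builds a Redshift COPY statement that loads gzip-compressed CSV files from an S3 prefix. The table name, bucket, and IAM role ARN are hypothetical placeholders, not values from our environment, and the file format is an assumption.

```python
def build_copy_statement(table, s3_prefix, iam_role_arn, gzip=True):
    """Return a Redshift COPY statement that bulk-loads CSV files from S3."""
    options = ["FORMAT AS CSV"]
    if gzip:
        # Tell COPY that the source files are gzip-compressed
        options.append("GZIP")
    return (
        f"COPY {table} "
        f"FROM '{s3_prefix}' "
        f"IAM_ROLE '{iam_role_arn}' "
        + " ".join(options) + ";"
    )


# Hypothetical names, for illustration only
print(build_copy_statement(
    "claims_staging",
    "s3://example-etl-bucket/claims/2019-06-01/",
    "arn:aws:iam::123456789012:role/example-redshift-copy-role",
))
```

The generated statement would then be run through whatever SQL client the ETL workflow already uses.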

The second challenge was that the Amazon Redshift VACUUM and ANALYZE operations were holding up our ETL pipeline, because these operations had been baked into the ETL process. We performed frequent data loads into the Amazon Redshift tables and a lot of DELETE operations as part of the ETL processing. These two concerns meant that the VACUUM and ANALYZE operations had to be performed frequently, resulting in the tables being locked for the operations’ duration and conflicting with the ETL process. Triggering these operations once, after all the loads were complete, helped eliminate the performance issues we were encountering. VACUUM and ANALYZE have recently been automated, which we expect to prevent such issues from arising in the future.
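
The reordering can be sketched as follows. This is a minimal Python illustration of the idea, not our ETL code: the function name and the sample statements are hypothetical, and it only shows that maintenance is appended once after every load rather than interleaved with the loads.

```python
def plan_etl_batch(load_statements, tables):
    """Order a batch so VACUUM and ANALYZE run once, after every load."""
    plan = list(load_statements)  # all loads first, in their original order
    for table in tables:
        plan.append(f"VACUUM {table};")
        plan.append(f"ANALYZE {table};")
    return plan


# Hypothetical statements, for illustration only
for statement in plan_etl_batch(
    ["COPY claims FROM '...';", "DELETE FROM claims WHERE is_stale;"],
    ["claims"],
):
    print(statement)
```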

The final challenge was to find a way to reproduce the windowing functions that previously resided in the Analysis Services cube layer, a role that Amazon Redshift now fulfilled. Most of the windowing functions that we need are built into Amazon Redshift, which made it easy to port the existing functionality and produce the same results.

During the port, we used Amazon Redshift’s comprehensive best practices guide and tuning techniques. We found these helped us set up the Amazon Redshift cluster for optimal performance.

Flow diagram of the new implementation

Benefits of the updated implementation

With the legacy solution, our implementation had grown complex, and we found it difficult to support the growing volume of new data that we needed to incorporate into the database and then report on in near real-time. Reports executing on the data were slowly drifting away from the initial SLA requirements. With Amazon Redshift, we’re working to solve those problems with less manual maintenance and less care and feeding of the solution. First, it has the potential to allow us to store a larger quantity of data for a longer time. Second, adding nodes to the cluster when needed is simple and we can do it in minutes with the Elastic Resize feature. Likewise, we can scale back nodes when cost sensitivity is an issue. Third, Amazon Redshift also gives better support for computing analytics over large sets of grouped data than our previous solution. Often, we want to look at how recent data compares to historical data. In some cases, we want more than a year or two of historical data to rule out seasonality, and we have found that Amazon Redshift is a more scalable solution for this type of operation.

Conclusion

At 3M HIS, we’re transforming health care from a system that treats disease to one that improves health and wellness with accurate health and clinical information from the start. 3M’s nearly 40 years of clinical knowledge, from data management and nosology to reimbursement and risk adjustment, opens the door for our provider and payer clients to find innovative solutions that improve outcomes across the continuum of care. We help our clients ensure accurate and compliant reimbursement, as well as leverage 3M’s analytical capabilities, powered by AWS, to improve health system and health plan performance while lowering costs.

About the Author

Dhanraj Shriyan is an Enterprise Data Architect at 3M Health Information Systems with a Masters in Predictive Analytics from Northwestern University, Chicago. He loves helping customers explore their data, gain valuable insights, and implement scalable solutions using the right database technology for their needs. He has several years of experience building large-scale data warehouse and business intelligence solutions in the cloud as well as on premises. He is currently exploring graph technologies and AWS Lake Formation.

Amazon EMR Migration Guide

Post Syndicated from Nikki Rouda original https://aws.amazon.com/blogs/big-data/amazon-emr-migration-guide/

Businesses worldwide are discovering the power of new big data processing and analytics frameworks like Apache Hadoop and Apache Spark, but they are also discovering some of the challenges of operating these technologies in on-premises data lake environments. They may also have concerns about the future of their current distribution vendor.

To address this, we’ve introduced the Amazon EMR Migration Guide (first published June 2019). This paper is a comprehensive guide that offers sound technical advice to help customers plan their move from on-premises big data deployments to EMR.

Common problems of on-premises big data environments include a lack of agility, excessive costs, and administrative headaches, as IT organizations wrestle with the effort of provisioning resources, handling uneven workloads at large scale, and keeping up with the pace of rapidly changing, community-driven, open-source software innovation. Many big data initiatives suffer from the delay and burden of evaluating, selecting, purchasing, receiving, deploying, integrating, provisioning, patching, maintaining, upgrading, and supporting the underlying hardware and software infrastructure.

A subtler, if equally critical, problem is the way companies’ data center deployments of Apache Hadoop and Apache Spark directly tie together the compute and storage resources in the same servers, creating an inflexible model where they must scale in lock step. This means that almost any on-premises environment pays for high amounts of under-used disk capacity, processing power, or system memory, as each workload has different requirements for these components. Typical workloads run on different types of clusters, at differing frequencies and times of day. These big data workloads should be freed to run whenever and however is most efficient, while still accessing the same shared underlying storage or data lake. See Figure 1 below for an illustration.

How can smart businesses find success with their big data initiatives? Migrating big data (and machine learning) to the cloud offers many advantages. Cloud infrastructure service providers, such as AWS, offer a broad choice of on-demand and elastic compute resources, resilient and inexpensive persistent storage, and managed services that provide up-to-date, familiar environments to develop and operate big data applications. Data engineers, developers, data scientists, and IT personnel can focus their efforts on preparing data and extracting valuable insights.

Services like Amazon EMR, AWS Glue, and Amazon S3 enable you to decouple and scale your compute and storage independently, while providing an integrated, well-managed, highly resilient environment, immediately reducing so many of the problems of on-premises approaches. This approach leads to faster, more agile, easier to use, and more cost-efficient big data and data lake initiatives.

However, the conventional wisdom of traditional on-premises Apache Hadoop and Apache Spark isn’t always the best strategy in cloud-based deployments. A simple lift and shift approach to running cluster nodes in the cloud is conceptually easy but suboptimal in practice. Different design decisions go a long way towards maximizing your gains as you migrate big data to a cloud architecture.

This guide provides the best practices for:

  • Migrating data, applications, and catalogs
  • Using persistent and transient resources
  • Configuring security policies, access controls, and audit logs
  • Estimating and minimizing costs, while maximizing value
  • Leveraging the AWS Cloud for high availability (HA) and disaster recovery (DR)
  • Automating common administrative tasks

Although not intended as a replacement for professional services, this guide covers a wide range of common questions and scenarios that arise as you migrate your big data and data lake initiatives to the cloud.

When starting your journey to migrate your big data platform to the cloud, you must first decide how to approach the migration. One approach is to re-architect your platform to maximize the benefits of the cloud. Another approach, known as lift and shift, is to take your existing architecture and complete a straight migration to the cloud. A final option is a hybrid approach, where you blend a lift and shift with re-architecture. This decision is not straightforward, because each approach has advantages and disadvantages.

A lift and shift approach is usually simpler, with less ambiguity and risk. Additionally, this approach is better when you are working against tight deadlines, such as when the lease on a data center is expiring. However, the disadvantage of a lift and shift is that it is not always the most cost-effective option, and the existing architecture may not readily map to a solution in the cloud.

A re-architecture unlocks many advantages, including optimization of costs and efficiencies. With re-architecture, you move to the latest and greatest software, have better integration with native cloud tools, and lower operational burden by leveraging native cloud products and services.

This paper provides advantages and disadvantages of each migration approach from the perspective of the Apache Spark and Hadoop ecosystems. To read the paper, download the Amazon EMR Migration Guide now.

For a more general resource on deciding which approach is ideal for your workflow, see An E-Book of Cloud Best Practices for Your Enterprise, which outlines the best practices for performing migrations to the cloud at a higher level.

 


About the Author

Nikki Rouda is the principal product marketing manager for data lakes and big data at AWS. Nikki has spent 20+ years helping enterprises in 40+ countries develop and implement solutions to their analytics and IT infrastructure challenges. Nikki holds an MBA from the University of Cambridge and an ScB in geophysics and math from Brown University.

 

 

 

Amazon QuickSight updates: Multiple sheets in dashboards, axis label orientation options, and more

Post Syndicated from Sahitya Pandiri original https://aws.amazon.com/blogs/big-data/amazon-quicksight-updates-multiple-sheets-in-dashboards-axis-label-orientation-options-and-more/

Today, we are pleased to announce a set of updates to Amazon QuickSight:

  • Richer dashboards with multiple sheets in your regular and embedded dashboards
  • Multiple axis label orientation options for better readability of dashboards
  • More calculations such as standard deviation, variance and conditional string functions on SPICE
  • Enhanced URL actions for supporting a broader set of interaction scenarios
  • One-click duplication of visuals for faster authoring

Multiple sheets in Dashboards

First, let’s look at the update to dashboards that allows multiple sheets, accessed via a new tab control. This allows better organization of information within the dashboard, where visuals related to specific subject areas or topics can be organized in separate sheets and distinctly identified through the tab name. As a reader, you can then navigate to a single dashboard to get a comprehensive view of all the insights related to a topic.

Each sheet within a dashboard has a distinct URL, which allows you to bookmark the specific sheet, or share a link to the specific sheet with others. Additionally, dashboards can be set up to filter content at a dashboard level, or at a per-sheet level, with the filters being passed either via on-screen controls or using the URL.
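
To make the URL option concrete, here is a minimal Python sketch that appends parameter values to a dashboard link using the `#p.<parameter>=<value>` fragment form that QuickSight uses for URL parameters. The dashboard ID, the parameter names, and the helper function itself are hypothetical; check the names against the parameters defined in your own dashboard.

```python
from urllib.parse import quote


def dashboard_url(base_url, parameters):
    """Append QuickSight-style URL parameters (#p.name=value) to a dashboard link."""
    pairs = []
    for name, values in parameters.items():
        # A multi-value parameter repeats the same p.<name> key once per value
        for value in values if isinstance(values, (list, tuple)) else [values]:
            pairs.append(f"p.{name}={quote(str(value))}")
    return base_url + ("#" + "&".join(pairs) if pairs else "")


# Hypothetical dashboard ID and parameter names
print(dashboard_url(
    "https://us-east-1.quicksight.aws.amazon.com/sn/dashboards/abc123",
    {"region": "EMEA", "year": [2018, 2019]},
))
```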

Filtering through on-screen controls

Navigating to another dashboard and passing filters via URL

If you are authoring a dashboard, you will find that every dashboard now starts off as a single sheet. You can use the “add sheet” icon in the tabs section to add a new sheet to your dashboard. Note that the tab control only shows up on a dashboard if there is more than one sheet. When authoring, you will also be able to rename a sheet, change the order of sheets or delete a sheet. The first sheet will always be the one presented to your readers by default, so this would be where you want to provide a summary of the dashboard with KPIs, ML-Insights, or high level trends. Email reports will send the first sheet of the dashboard to all your viewers; we are working on additional rendering options for email reports.

Previously, on the Big Data Blog, we covered ways to add parameters and on-screen controls to dashboards. Parameter values are global across a dashboard, which allows you to maintain user inputs and filter content according to user selections across the dashboard. On-screen controls are specific to each sheet, since the context of these controls is often confined to the sheet itself. If you want to discreetly filter data based on user or group-level attributes, you may also use row-level security to do so.

Multi-sheet dashboards can be shared through the regular website, or can be embedded in your custom application. To embed a dashboard in your application, visit Embed interactive dashboards in your application with Amazon QuickSight.

Axis label orientation

When authoring a dashboard, you will see that Amazon QuickSight now automatically selects a horizontal or angled layout to provide better readability. You can also manually navigate to a visual’s options menu and choose “Format visual”, and then choose the axis label orientation that is most suited to your dashboard.

Duplicate visual option

When authoring a dashboard, there are often situations where you want to quickly clone a visual and make changes. With the new “Duplicate visual” option, Amazon QuickSight allows you to achieve this via a one-click action from the visual’s options menu. This new option creates a copy of the visual, along with its filters and other settings such as visual size and format, within the same sheet or a different sheet within the dashboard, thus removing the overhead of creating visuals from scratch each time.

URL action open options

URL actions allow you to configure actions outside of QuickSight that can be triggered by a user viewing a QuickSight dashboard. With today’s release, you can choose how these URLs are opened: in the viewer’s current QuickSight browser tab, in a new tab, or in a new browser window altogether.

In addition to these features that improve author and reader experience, QuickSight now supports additional statistical capabilities through new functions in calculated fields. These statistical functions include standard deviation and variance calculations and conditional functions on string fields. With standard deviation and variance, you can now determine dispersion of a metric relative to the mean. You can do this across your entire data set or partition by a select dimension. For more information, see Functions by Category in the Amazon QuickSight User Guide.
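
For readers who want to see what these dispersion measures compute, the following Python sketch calculates variance and standard deviation per value of a chosen dimension, which mirrors the idea of partitioning by a dimension. It uses Python's standard library rather than QuickSight functions, it uses the population forms of both measures, and the field names and sample rows are made up for illustration.

```python
from collections import defaultdict
from statistics import pstdev, pvariance


def dispersion_by(rows, dimension, metric):
    """Return population variance and standard deviation of a metric per dimension value."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[dimension]].append(row[metric])
    return {
        key: {"variance": pvariance(values), "stdev": pstdev(values)}
        for key, values in groups.items()
    }


# Made-up sample data
sales = [{"region": "east", "amount": a} for a in (2, 4, 4, 4, 5, 5, 7, 9)]
sales += [{"region": "west", "amount": a} for a in (10, 10, 10)]
print(dispersion_by(sales, "region", "amount"))
```

A partition whose values are all identical, like the second one here, has zero variance, which is a quick sanity check when validating a calculated field.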

With conditional string functions on SPICE, you can now use coalesce, ifelse, isNotNull, isNull, and nullif while building reports on SPICE datasets. For more information, see Calculated Field Function and Operator Reference for Amazon QuickSight in the Amazon QuickSight User Guide.

All of these new features are now available in both Standard and Enterprise Editions in all supported Amazon QuickSight regions – US East (N. Virginia and Ohio), US West (Oregon), EU (Ireland), and Asia Pacific (Singapore, Sydney and Tokyo).


About the Author

Sahitya Pandiri is a technical program manager with Amazon Web Services. Sahitya has worked in product and program management for 5 years, and has built multiple products in the retail, healthcare, and analytics spaces. She enjoys problem solving and leveraging technology to simplify processes.

Optimizing downstream data processing with Amazon Kinesis Data Firehose and Amazon EMR running Apache Spark

Post Syndicated from Srikanth Kodali original https://aws.amazon.com/blogs/big-data/optimizing-downstream-data-processing-with-amazon-kinesis-data-firehose-and-amazon-emr-running-apache-spark/

For most organizations, working with ever-increasing volumes of data and incorporating new data sources can be a challenge.  Often, AWS customers have messages coming from various connected devices and sensors that must be efficiently ingested and processed before further analysis.  Amazon S3 is a natural landing spot for data of all types.  However, the way data is stored in Amazon S3 can make a significant difference in the efficiency and cost of downstream data processing.  Specifically, Apache Spark can be over-burdened with file operations if it is processing a large number of small files versus fewer larger files.  Each of these files has its own overhead of a few milliseconds for opening, reading metadata information, and closing. This overhead of file operations on these large numbers of files results in slow processing. This blog post shows how to use Amazon Kinesis Data Firehose to merge many small messages into larger messages for delivery to Amazon S3.  This results in faster processing with Amazon EMR running Spark.
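
A back-of-the-envelope calculation shows why file count matters. The Python sketch below estimates the cumulative per-file overhead for the same dataset stored as small or large files. The 5 ms overhead and the 100 GB dataset are assumed numbers chosen for illustration, and the total is an aggregate that would in practice be spread across Spark tasks.

```python
import math


def file_overhead(total_mb, file_size_mb, per_file_overhead_ms):
    """Return (file count, cumulative open/metadata/close overhead in seconds)."""
    files = math.ceil(total_mb / file_size_mb)
    return files, files * per_file_overhead_ms / 1000.0


dataset_mb = 100 * 1024  # assumed 100 GB dataset
for size_mb in (1, 128):
    files, seconds = file_overhead(dataset_mb, size_mb, per_file_overhead_ms=5)
    print(f"{size_mb} MB files: {files} files, about {seconds:.0f} s of file overhead")
```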

Like Amazon Kinesis Data Streams, Kinesis Data Firehose accepts a maximum incoming message size of 1 MB.  If a single message is greater than 1 MB, it can be compressed before placing it on the stream.  However, at large volumes, a message or file size of 1 MB or less is usually too small.  Although there is no right answer for file size, 1 MB for many datasets would just yield too many files and file operations.

This post also shows how to use Apache Spark to read compressed files in Amazon S3 that do not have a proper file name extension, and how to store the results back in Amazon S3 in parquet format.

Solution overview

The steps we follow in this blog post are:

  1. Create a virtual private cloud (VPC) and an Amazon S3 bucket.
  2. Provision a Kinesis data stream, and an AWS Lambda function to process the messages from the Kinesis data stream.
  3. Provision Kinesis Data Firehose to deliver messages to Amazon S3 sent from the Lambda function in step 2. This step also provisions an Amazon EMR cluster to process the data in Amazon S3.
  4. Generate test data with custom code running on an Amazon EC2 instance.
  5. Run a sample Spark program from the Amazon EMR cluster’s master instance to read the files from Amazon S3, convert them into parquet format and write back to an Amazon S3 destination.

The following diagram explains how the services work together:

The AWS Lambda function in the diagram reads the messages, appends additional data to them, and compresses them with gzip before sending them to Amazon Kinesis Data Firehose. We include this step because most customers need to enrich the data before it arrives in Amazon S3.
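
The enrich-then-compress step can be sketched in a few lines. The function in this post is written in Java; the Python version below is only an illustration of the idea, and the added fields (`processed_at`, `source`) and the JSON message format are assumptions.

```python
import gzip
import json
import time


def enrich_and_compress(raw_message, source="sensor-gateway"):
    """Add metadata to a JSON message and gzip it before it is sent downstream."""
    record = json.loads(raw_message)
    record["processed_at"] = int(time.time())  # hypothetical enrichment field
    record["source"] = source                  # hypothetical enrichment field
    # One JSON document per line keeps records separable after buffering
    line = json.dumps(record) + "\n"
    return gzip.compress(line.encode("utf-8"))


compressed = enrich_and_compress('{"device_id": 42, "reading": 17.5}')
print(len(compressed), "bytes after compression")
```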

Amazon Kinesis Data Firehose can buffer incoming messages into larger records before delivering them to your Amazon S3 bucket. It does so according to two conditions, buffer size (up to 128 MB) and buffer interval (up to 900 seconds). Record delivery is triggered once either of these conditions has been satisfied.
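
The "whichever condition is met first" behavior can be modeled with a small Python class. This is a toy model for intuition only, not how the service is implemented: it checks the thresholds only when a record arrives, whereas the real service also delivers on its own timer. The class and method names are hypothetical.

```python
class DeliveryBuffer:
    """Toy model of size-or-interval buffering; not the Firehose implementation."""

    def __init__(self, max_bytes=128 * 1024 * 1024, max_seconds=900):
        self.max_bytes = max_bytes
        self.max_seconds = max_seconds
        self.records = []
        self.size = 0
        self.started_at = None

    def add(self, record, now):
        """Buffer one record; return the batch if a threshold was reached, else None."""
        if not self.records:
            self.started_at = now  # the interval clock starts with the first record
        self.records.append(record)
        self.size += len(record)
        if self.size >= self.max_bytes or now - self.started_at >= self.max_seconds:
            batch, self.records, self.size = self.records, [], 0
            return batch
        return None


buffer = DeliveryBuffer(max_bytes=10, max_seconds=60)
print(buffer.add(b"1234", now=0))    # below both thresholds
print(buffer.add(b"567890", now=1))  # size threshold reached, batch is delivered
```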

An Apache Spark job reads the messages from Amazon S3, and stores them in parquet format. With parquet, data is stored in a columnar format that provides more efficient scanning and enables ad hoc querying or further processing by services like Amazon Athena.

Considerations

The maximum size of a record sent to Kinesis Data Firehose is 1,000 KB. If your message size is greater than this value, compressing the message before it is sent to Kinesis Data Firehose is the best approach. Kinesis Data Firehose also offers compression of messages after they are written to the Kinesis Data Firehose delivery stream. Unfortunately, this does not overcome the message size limitation, because this compression happens after the message is written. When Kinesis Data Firehose delivers a previously compressed message to Amazon S3, it is written as an object without a file extension. For example, if a message is compressed with gzip before it is written to Kinesis Data Firehose, it is delivered to Amazon S3 without the .gz extension. This is problematic if you are using Apache Spark for downstream processing, because Spark relies on the “.gz” extension to recognize a file as gzip-compressed.

We will see how to overcome this issue by reading the files using the Amazon S3 API operations later in this blog.
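
The underlying idea is to decide how to decode an object from its content rather than from its name. The Python sketch below shows that idea on raw bytes by checking for the gzip magic number; it is not the Spark program used in this post, and it assumes newline-delimited UTF-8 text inside each compressed message. Because several gzip-compressed messages end up concatenated in one delivered object, the example also decompresses a multi-member stream.

```python
import gzip

GZIP_MAGIC = b"\x1f\x8b"  # first two bytes of every gzip stream


def read_lines(payload):
    """Decode object bytes into text lines, decompressing if the content is gzip."""
    if payload[:2] == GZIP_MAGIC:
        # gzip.decompress also handles several gzip members concatenated together
        payload = gzip.decompress(payload)
    return payload.decode("utf-8").splitlines()


# Two separately compressed messages concatenated, as in a buffered delivery
delivered = gzip.compress(b'{"id": 1}\n') + gzip.compress(b'{"id": 2}\n')
print(read_lines(delivered))
```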

Prerequisites and assumptions

To follow the steps outlined in this blog post, you need the following:

  • An AWS account that provides access to AWS services.
  • An AWS Identity and Access Management (IAM) user with an access key and secret access key to configure the AWS CLI.
  • The templates and code are intended to work in the US East (N. Virginia) Region only.

Additionally, be aware of the following:

  • We configure all services in the same VPC to simplify networking considerations.
  • Important: The AWS CloudFormation templates and the sample code that we provide use hardcoded user names and passwords and open security groups. These are for testing purposes only. They aren’t intended for production use without modification.

Implementing the solution

You can use this downloadable template for single-click deployment. This template is launched in the US East (N. Virginia) Region by default. Do not change to a different Region. The template is designed to work only in the US East (N. Virginia) Region. To launch directly through the console, choose the Launch Stack button.

This template takes the following parameters. Some of the parameters have default values that you can’t edit, because these predefined names are hardcoded in the code. For the other parameters, you must provide the values. The following table provides additional details.

For this parameter | Provide this
StackName | Provide the stack name.
ClientIP | The IP address range of the client that is allowed to connect to the cluster using SSH.
FirehoseDeliveryStreamName | The name of the Amazon Firehose delivery stream. Default value is set to “AWSBlogs-LambdaToFireHose”.
InstanceType | The EC2 instance type.
KeyName | The name of an existing EC2 key pair to enable access to login.
KinesisStreamName | The name of the Amazon Kinesis Stream. Default value is set to “AWS-Blog-BaseKinesisStream”.
Region | AWS Region. By default it is us-east-1, US East (N. Virginia). Do not change this as the scripts are developed to work in this Region only.
EMRClusterName | A name for the EMR cluster.
S3BucketName | The name of the bucket that is created in your account. Provide some unique name to this bucket. This bucket is used for storing the messages and output from the Spark code.

After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, select the check boxes for I acknowledge that AWS CloudFormation might create IAM resources with custom names and for I acknowledge that AWS CloudFormation might require the following capability: CAPABILITY_AUTO_EXPAND. Then choose Create.

If you use this one-step solution, you can skip to step 7, Generate the test dataset and load into Kinesis Data Streams.

To create each component individually, use the following steps.

1. Use the AWS CloudFormation template to configure Amazon VPC and create an Amazon S3 bucket

In this step, we set up a VPC, public subnet, internet gateway, route table, and a security group. The security group has two inbound access rules. The first inbound rule allows access to TCP port 22 (SSH) from the provided client IP CIDR range, and the second inbound rule allows access to any TCP port from any host within the same security group. We use this VPC and subnet for all other services that are created in the next steps. In addition to these resources, we also create a standard Amazon S3 bucket with a provided bucket name to store the incoming data and processed data. You can use this downloadable AWS CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.

This template takes the following parameters. The following table provides additional details.

For this parameter | Do this
StackName | Provide the stack name.
S3BucketName | Provide a unique Amazon S3 bucket name. This bucket is created in your account.
ClientIp | Provide a CIDR IP address range that is added to the inbound rule of the security group. You can get your current IP address from https://checkip.amazonaws.com.

After you specify the template details, choose Next. On the Review page, choose Create.

When the stack launch is complete, it should return outputs similar to the following.

Key | Value
StackName | Name
VPCID | vpc-xxxxxxx
SubnetID | subnet-xxxxxxxx
SecurityGroup | sg-xxxxxxxxxx
S3BucketDomain | <S3_BUCKET_NAME>.s3.amazonaws.com
S3BucketARN | arn:aws:s3:::<S3_BUCKET_NAME>

Make a note of the output, because you use this information in the next step. You can view the stack outputs on the AWS Management Console or by using the following AWS CLI command:

$ aws cloudformation describe-stacks --stack-name <stack_name> --region us-east-1 --query 'Stacks[0].Outputs'
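
Because later steps reuse these values, it can help to turn the command's JSON output into a simple lookup. The following Python sketch assumes the output of the command above has been captured as a string; the helper name and the sample values are hypothetical.

```python
import json


def outputs_to_dict(describe_stacks_json):
    """Convert the JSON list of stack outputs into a {key: value} dictionary."""
    return {
        output["OutputKey"]: output["OutputValue"]
        for output in json.loads(describe_stacks_json)
    }


# Sample shaped like the result of --query 'Stacks[0].Outputs' (values are made up)
sample = """[
  {"OutputKey": "SubnetID", "OutputValue": "subnet-xxxxxxxx"},
  {"OutputKey": "SecurityGroup", "OutputValue": "sg-xxxxxxxxxx"}
]"""
print(outputs_to_dict(sample)["SubnetID"])
```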

2.  Use the AWS CloudFormation template to create necessary IAM Roles

In this step, we set up two AWS IAM roles. One of the IAM roles will be used by an AWS Lambda function to allow access to Amazon S3 service, Amazon Kinesis Data Firehose, Amazon CloudWatch Logs, and Amazon EC2 instances.  The second IAM role is used by the Amazon Kinesis Data Firehose service to access Amazon S3 service. You can use this downloadable CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.

This template takes the following parameters. The following table provides additional details.

For this parameter | Do this
StackName | Provide the stack name.

After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, select the check box for I acknowledge that AWS CloudFormation might create IAM resources with custom names. Choose Create.

When the stack launch is complete, it should return outputs similar to the following.

Key | Value
LambdaRoleArn | arn:aws:iam::<ACCOUNT_NUMBER>:role/small-files-lamdarole
FirehoseRoleArn | arn:aws:iam::<ACCOUNT_NUMBER>:role/small-files-firehoserole

Make a note of the output, because you use this information in the next step. You can view the stack outputs on the AWS Management Console or by using the following AWS CLI command:

$ aws cloudformation describe-stacks --stack-name <stack_name> --region us-east-1 --query 'Stacks[0].Outputs'

3. Use an AWS CloudFormation template to configure the Amazon Kinesis Data Firehose delivery stream

In this step, we set up Amazon Kinesis Data Firehose with Amazon S3 as the destination for the incoming messages. We select Uncompressed as the compression format, and set the buffering options to a size of 128 MB and an interval of 300 seconds. You can use this downloadable AWS CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.
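
For reference, the same settings can be written as the S3 destination configuration that the Firehose API accepts, for example through the `ExtendedS3DestinationConfiguration` argument of boto3's `create_delivery_stream`. The Python sketch below only builds the dictionary and does not call AWS. It is not taken from the template, and the helper name and ARNs are placeholders.

```python
def s3_destination_config(role_arn, bucket_arn):
    """Build an S3 destination configuration matching the settings described above."""
    return {
        "RoleARN": role_arn,
        "BucketARN": bucket_arn,
        # Messages are already gzip-compressed upstream, so Firehose leaves them as-is
        "CompressionFormat": "UNCOMPRESSED",
        "BufferingHints": {"SizeInMBs": 128, "IntervalInSeconds": 300},
    }


# Placeholder ARNs, for illustration only
config = s3_destination_config(
    "arn:aws:iam::123456789012:role/example-firehose-role",
    "arn:aws:s3:::example-bucket",
)
print(config["BufferingHints"])
```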

This template takes the following parameters. The following table provides additional details.

For this parameter | Do this
StackName | Provide the stack name.
FirehoseDeliveryStreamName | Provide the name of the Amazon Kinesis Data Firehose delivery stream. The default value is set to “AWSBlogs-LambdaToFirehose”.
Role | Provide the Kinesis Data Firehose IAM role ARN that was created as part of step 2.
S3BucketARN | Select the S3BucketARN. You can get this from the step 1 AWS CloudFormation output.

After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, choose Create.

4. Use an AWS CloudFormation template to create a Kinesis data stream and a Lambda function

In this step, we set up a Kinesis data stream and an AWS Lambda function. We use the AWS Lambda function to process incoming messages in the Kinesis data stream. An event source mapping is also created as part of this template. This adds a trigger to the AWS Lambda function for the Kinesis data stream source. For more information about creating event source mapping, see Creating an Event Source Mapping. This Kinesis data stream is created with 10 shards and the Lambda function is created with a Java 8 runtime. We allocate a memory size of 1,920 MB and a timeout of 300 seconds. You can use this downloadable AWS CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.
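
When the event source mapping invokes the function, each Kinesis record arrives with its payload base64-encoded under `Records[].kinesis.data`. The function in this post uses the Java 8 runtime; the Python sketch below only illustrates the shape of the event and how the payloads are recovered, and the helper name is hypothetical.

```python
import base64


def decode_kinesis_records(event):
    """Return the raw payload bytes of every record in a Kinesis-triggered event."""
    return [
        base64.b64decode(record["kinesis"]["data"])
        for record in event["Records"]
    ]


# Minimal event with a single record, trimmed to the fields used here
sample_event = {
    "Records": [
        {"kinesis": {"data": base64.b64encode(b'{"device_id": 42}').decode("ascii")}}
    ]
}
print(decode_kinesis_records(sample_event))
```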

This template takes the following parameters. The following table provides details.

For this parameter | Do this
StackName | Provide the stack name.
KinesisStreamName | Provide the name of the Amazon Kinesis stream. The default value is set to “AWS-Blog-BaseKinesisStream”.
Role | Provide the IAM role created for the Lambda function as part of the second AWS CloudFormation template. Get the value from the output of the second AWS CloudFormation template.
S3Bucket | Provide the existing Amazon S3 bucket name that was created using the first AWS CloudFormation template. Do not use the domain name. Provide the bucket name only.
Region | Select the AWS Region. By default it is us-east-1, US East (N. Virginia).

After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, choose Create.

5. Use an AWS CloudFormation template to configure the Amazon EMR cluster

In this step, we set up an Amazon EMR 5.16.0 cluster with the “Spark”, “Ganglia”, and “Hive” applications. We create this cluster with one master and two core nodes, and use an r4.xlarge instance type. The template uses an AWS Glue metastore for the Amazon EMR Hive metastore. This Amazon EMR cluster is used to process the messages in the Amazon S3 bucket that are created by the Amazon Kinesis Data Firehose delivery stream. You can use this downloadable AWS CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.

This template takes the following parameters. The following table provides additional details.

For this parameter | Do this
EMRClusterName | Provide the name for the EMR cluster.
ClusterSecurityGroup | Select the security group ID that was created as part of the first AWS CloudFormation template.
ClusterSubnetID | Select the subnet ID that was created as part of the first AWS CloudFormation template.
AllowedCIDR | Provide the IP address range of the client that is allowed to connect to the cluster.
KeyName | Provide the name of an existing EC2 key pair to access the Amazon EMR cluster.

After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, choose Create.

When the stack launch is complete, it should return outputs similar to the following.

Key | Value
EMRClusterMaster | ssh hadoop@<Master-Public-DNS> -i <KEY_PAIR_NAME>.pem

Make a note of the output, because you use this information in the next step. You can view the stack outputs on the AWS Management Console or by using the following AWS CLI command:

$ aws cloudformation describe-stacks --stack-name <stack_name> --region us-east-1 --query 'Stacks[0].Outputs'

6. Use an AWS CloudFormation template to create an Amazon EC2 Instance to generate test data

In this step, we set up an Amazon EC2 instance. The AWS CloudFormation script that creates this EC2 instance runs two additional steps. First, it downloads and installs OpenJDK version 1.8. Second, it downloads a Java program jar file to the ec2-user home directory on the EC2 instance. We use this Java program to generate test data messages of approximately 900 KB each. We then send them to the Kinesis data stream that was created as part of the previous steps. The Java jar file name is “sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar”.
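
To give a feel for what such a generator does, here is a minimal Python sketch that produces a JSON message of a target size and checks it against the 1 MB record limit mentioned earlier in this post. It is not the Java producer from the jar file; the message layout, field names, and function name are assumptions.

```python
import json
import random
import string

KINESIS_MAX_RECORD_BYTES = 1024 * 1024  # 1 MB limit, as stated in this post


def make_test_message(target_bytes=900 * 1024, seed=0):
    """Build a JSON test message padded with random letters to exactly target_bytes."""
    rng = random.Random(seed)
    envelope = {"id": rng.randrange(10**9), "payload": ""}
    # Size of the JSON wrapper alone, so the filler brings the total to the target
    overhead = len(json.dumps(envelope).encode("utf-8"))
    envelope["payload"] = "".join(
        rng.choices(string.ascii_letters, k=target_bytes - overhead)
    )
    data = json.dumps(envelope).encode("utf-8")
    if len(data) > KINESIS_MAX_RECORD_BYTES:
        raise ValueError("message exceeds the 1 MB record limit")
    return data


message = make_test_message()
print(len(message), "bytes")
```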

You can use this downloadable AWS CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.

This template takes the following parameters. The following table provides additional details.

For this parameter | Do this
EC2SecurityGroup | Select the security group ID that was created from the first AWS CloudFormation template.
EC2Subnet | Select the subnet that was created from the first AWS CloudFormation template.
InstanceType | Select the provided instance type. By default, it selects the r4.4xlarge instance type.
KeyName | Name of an existing EC2 key pair to enable SSH access to the EC2 instance.

After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, select the “I acknowledge that AWS CloudFormation might create IAM resources with custom names” option, and then choose Create.

When the stack launch is complete, it should return outputs similar to the following.

Key | Value
EC2Instance | ssh [email protected]<Public-IP> -i <KEY_PAIR_NAME>.pem

Make a note of the output, because you use this information in the next step. You can view the stack outputs on the AWS Management Console or by using the following AWS CLI command:

$ aws cloudformation describe-stacks --stack-name <stack_name> --region us-east-1 --query 'Stacks[0].Outputs'

7. Generate the test dataset and load into Kinesis Data Streams

After all of the previous AWS CloudFormation stacks are created successfully, log in to the EC2 instance that was created in step 6. Use the “ssh” command shown in the CloudFormation stack outputs. The template copies the “sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar” file to the instance. We use this file to generate the test data and send it to Amazon Kinesis Data Streams. You can find the code corresponding to this sample Kinesis producer in this Git repository.

Make sure your EC2 instance’s security group allows ssh port 22 (Inbound) from your IP address. If not, update your security group inbound access.

Run the following commands to generate some test data.

$ cd

$ ls -ltra sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar

-rwxr-xr-x 1 ec2-user ec2-user 27536802 Oct 29 21:19 sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar

$ java -Xms1024m -Xmx25600m -XX:+UseG1GC -cp sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar com.optimize.downstream.entry.Main 10000

This Java program uses the PutRecords API operation, which lets you send many records in a single HTTP request. For more information, see this AWS blog. After you run the Java program, you see output like the following, which shows that messages are being sent to the Kinesis data stream.

“Starting producer and consumer.....
Inserting a message into blocking queue before sending to Kinesis Firehose and Message number is : 0
Producer Thread # 9 is going to sleep mode for 500 ms.
Inserting a message into blocking queue before sending to Kinesis Firehose and Message number is : 1
Inserting a message into blocking queue before sending to Kinesis Firehose and Message number is : 2
Inserting a message into blocking queue before sending to Kinesis Firehose and Message number is : 3
::
::
Record sent to Kinesis Stream. Record size is ::: 5042850 KB
Sending a record to Kinesis Stream with 5 messages grouped together.
Record sent to Kinesis Stream. Record size is ::: 5042726 KB
Sending a record to Kinesis Stream with 5 messages grouped together.
Record sent to Kinesis Stream. Record size is ::: 5042729 KB”

When running the sample Kinesis producer jar, notice that the number of messages is 10,000. This program generates test data messages and is not a replacement for your load testing tool. We created it only to demonstrate the use case presented in this post.

After all of the messages are generated and sent to Amazon Kinesis Data Streams, the program exits gracefully.

The sample JSON input message format is shown as follows:

{
   "processedDate":"2018/10/30 19:05:19",
   "currentDate":"2018/10/30 19:05:07",
   "hashDeviceId":"0c2745e4-c2d6-4d43-8339-9c2401e80e92",
   "deviceId":"94581b5f-a117-484a-8e3c-4fcc2dbd53b7",
   "accelerometerSensorList":[  
      {  
         "accelerometer_Y":8,
         "gravitySensor_X":5,
         "accelerometer_X":9,
         "gravitySensor_Z":4,
         "accelerometer_Z":1,
         "gravitySensor_Y":5,
         "linearAccelerationSensor_Z":3,
         "linearAccelerationSensor_Y":9,
         "linearAccelerationSensor_X":9
      },
      {  
         "accelerometer_Y":1,
         "gravitySensor_X":3,
         "accelerometer_X":5,
         "gravitySensor_Z":5,
         "accelerometer_Z":7,
         "gravitySensor_Y":9,
         "linearAccelerationSensor_Z":6,
         "linearAccelerationSensor_Y":5,
         "linearAccelerationSensor_X":3
      },
 {
   …
 },
 {
   …
 },
 :
 :
   ],
   "tempSensorList":[  
      {  
         "kelvin":585.4928040286752,
         "celsius":43.329574923775425,
         "fahrenheit":50.13864584530086
      },
      {  
         "kelvin":349.95625855125814,
         "celsius":95.68423052685313,
         "fahrenheit":7.854854574219985
      },
 {
   …
 },
 {
   …
 },
 :
 :
 
   ],
   "illuminancesSensorList":[  
      {  
         "illuminance":44.65135784368194
      },
      {  
         "illuminance":98.15404017082403
      },
 {
   …
 },
 {
   …
 },
 :
 :
   ],
   "gpsSensorList":[  
      {  
         "altitude":4.38273213294682,
         "heading":7.416314616289915,
         "latitude":5.759723677991661,
         "longitude":1.4732885894731842
      },
      {  
         "altitude":9.816473807569487,
         "heading":5.118919157684835,
         "latitude":3.581361614110458,
         "longitude":1.3699272610616127
      },
 {
   …
 },
 {
   …
 },
 :
 :
   ]
}

 

Log in to the Kinesis Data Streams console, then choose the Kinesis data stream that was created in step 4. Choose the Monitor tab to see the graphs. Run the data generation utility for at least 15 minutes to generate enough data.

8. Processing Kinesis Data Streams messages using AWS Lambda

As part of the previously described setup, we also use an AWS Lambda function (named LambdaForProcessingKinesisRecords) to process the messages from the Kinesis data stream. This Lambda function reads the content of each message and appends “additional data” to it, which pushes the message size above 1 MB. Several customers have a similar use case, in which they enrich incoming messages with additional information. After the Lambda function appends the additional data, it sends the messages to Amazon Kinesis Data Firehose. Because Kinesis Data Firehose accepts only messages that are less than 1 MB, we must compress each message before sending it. The Lambda function compresses each message with gzip. To keep the messages separable downstream, it also appends a line terminator (“\r\n” in the code shown later in this step) to each message before compressing it.

When creating the Kinesis Data Firehose delivery stream, we set the buffer size to 128 MB and the buffer interval to 900 seconds. These settings let Kinesis Data Firehose merge the incoming compressed messages into larger objects before it writes them to the provided Amazon S3 bucket.
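The two buffer settings act together: data is delivered when whichever threshold is reached first. The following is a simplified model of that decision using the values from this post. It is an illustration only, not how Kinesis Data Firehose is implemented, and the function name is hypothetical.

```python
BUFFER_SIZE_BYTES = 128 * 1024 * 1024  # 128 MB, as configured in this post
BUFFER_INTERVAL_SECONDS = 900          # 900 seconds, as configured in this post


def should_flush(buffered_bytes, seconds_since_last_flush):
    """Return True when either buffering threshold has been reached."""
    return (buffered_bytes >= BUFFER_SIZE_BYTES
            or seconds_since_last_flush >= BUFFER_INTERVAL_SECONDS)


print(should_flush(10 * 1024 * 1024, 60))    # small buffer, recent flush
print(should_flush(130 * 1024 * 1024, 60))   # size threshold reached
print(should_flush(10 * 1024 * 1024, 900))   # interval threshold reached
```

With a slow producer, the 900-second interval decides when objects are written, so the resulting files can be much smaller than 128 MB.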

The AWS Lambda function appends the following content to the original message in Kinesis Data Streams after reading it.

"testAdditonalDataList": [
  {
    "dimesnion_X": 9,
    "dimesnion_Y": 2,
    "dimesnion_Z": 2
  },
  {
    "dimesnion_X": 3,
    "dimesnion_Y": 10,
    "dimesnion_Z": 5
  },
  {
    …
  },
  {
    …
  },
  :
  :
]

If we do not compress the message before sending it to Kinesis Data Firehose, the request fails and an error message appears in Amazon CloudWatch Logs.

Here is the code snippet where we are compressing the message in the AWS Lambda function. The complete code can be found in this Git repository.

private String sendToFireHose(String mergedJsonString)
{
    PutRecordResult res = null;
    try {
        //To Firehose -
        System.out.println("MESSAGE SIZE BEFORE COMPRESSION IS : " + mergedJsonString.toString().getBytes(charset).length);
        System.out.println("MESSAGE SIZE AFTER GZIP COMPRESSION IS : " + compressMessage(mergedJsonString.toString().getBytes(charset)).length);
        PutRecordRequest req = new PutRecordRequest()
                .withDeliveryStreamName(firehoseStreamName);

        // Without compression - Send to Firehose
        //Record record = new Record().withData(ByteBuffer.wrap((mergedJsonString.toString() + "\r\n").getBytes()));

        // With compression - send to Firehose
        Record record = new Record().withData(ByteBuffer.wrap(compressMessage((mergedJsonString.toString() + "\r\n").getBytes())));
        req.setRecord(record);
        res = kinesisFirehoseClient.putRecord(req);
    }
    catch (IOException ie) {
        ie.printStackTrace();
    }
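    // res stays null if compression threw an IOException, so guard the dereference.
    return (res != null) ? res.getRecordId() : null;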
}
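To see why compression makes an oversized message acceptable, the same compress-then-send idea can be sketched in Python with only the standard library. This is an illustration, not the Lambda function's code: the payload is synthetic, the function name is hypothetical, and the size limit constant is an assumed value chosen to stand in for the "less than 1 MB" rule stated above.

```python
import gzip
import json

# Assumed record limit for illustration; the post states only "less than 1 MB".
MAX_RECORD_BYTES = 1000 * 1024


def compress_message(message):
    """Serialize a message, append a line terminator, and gzip the result."""
    raw = (json.dumps(message) + "\r\n").encode("utf-8")
    return raw, gzip.compress(raw)


# Synthetic, repetitive sensor-style payload that is larger than the limit.
message = {
    "deviceId": "example-device",
    "testAdditonalDataList": [
        {"dimesnion_X": i % 10, "dimesnion_Y": i % 7, "dimesnion_Z": i % 3}
        for i in range(30000)
    ],
}

raw, compressed = compress_message(message)
print("before:", len(raw), "bytes, after:", len(compressed), "bytes")
```

Repetitive JSON like this compresses very well. Real payloads with more varied values compress less, so it is worth checking the compressed size before sending.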

You can check the provided bucket to see if the messages are flowing into the bucket. The Amazon S3 bucket should show something similar to the following example:

The files generated by Kinesis Data Firehose do not have any extension. By default, Kinesis Data Firehose does not add an extension to the files that it writes to the Amazon S3 bucket unless you select a compression option. In our use case, the uncompressed input message is larger than 1 MB, so we compress it before sending it to Kinesis Data Firehose. Because the message is already compressed, we do not select a compression option in Kinesis Data Firehose. Doing so would compress the message a second time, and the downstream Spark application cannot process double-compressed data.

9. Reading and converting the data into Parquet format using an Apache Spark program with Amazon EMR

As noted in the previous step, Kinesis Data Firehose by default does not add a file extension to the files that it writes to the Amazon S3 bucket. This creates a problem when reading the files with Apache Spark. By default, Apache Spark relies on the file name extension to detect whether a file is compressed. For gzip compression, it expects <filename>.gz to read the file successfully.

To overcome this issue, we can use Amazon S3 API operations, particularly the AmazonS3Client class, to list all the Amazon S3 keys, and then use Spark’s parallelize method to read the contents of the files. After reading the file content, we can decompress it using the GZIPInputStream class. You can find the code snippet below. The complete code can be found in the Git repository.

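val allLinesRDD = spark.sparkContext.parallelize(s3ObjectKeys).flatMap { key =>
  // The objects have no .gz extension, so decompress each one explicitly
  // and emit its lines (one JSON message per line).
  Source.fromInputStream(
    new GZIPInputStream(s3Client.getObject(bucketName, key).getObjectContent: InputStream)
  ).getLines
}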

var finalDF = spark.read.json(allLinesRDD).toDF()
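The decompression step can also be tried outside Spark. The following Python sketch works on bytes that have already been downloaded, so it makes no Amazon S3 calls. It assumes that each object is a concatenation of separately gzipped, newline-terminated JSON messages, which is what the Lambda function described earlier would produce. The function name and the sample records are hypothetical.

```python
import gzip
import json


def read_records(object_bytes):
    """Decompress an extension-less gzip object and parse one JSON document per line."""
    text = gzip.decompress(object_bytes).decode("utf-8")
    return [json.loads(line) for line in text.splitlines() if line.strip()]


# Simulate an object made of two separately gzipped, newline-terminated records.
records = [{"deviceId": "a"}, {"deviceId": "b"}]
object_bytes = b"".join(
    gzip.compress((json.dumps(r) + "\r\n").encode("utf-8")) for r in records
)

print(read_records(object_bytes))
```

Python's gzip.decompress reads all concatenated gzip members in turn, which is why a single call recovers both records.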

After the Amazon EMR cluster is created successfully, log in to the Amazon EMR master node using the following command. You can get the “ssh” login command from the “EMRClusterMaster” output of the AWS CloudFormation stack created in step 5.

  • ssh [email protected] -i <KEYPAIR_NAME>.pem
  • Make sure that port 22 is open in the cluster’s security group so that you can connect to the Amazon EMR master node.

Run the Spark program using the following Spark submit command.

spark-submit --class com.optimize.downstream.process.ProcessFilesFromS3AndConvertToParquet --master yarn --deploy-mode client s3://aws-bigdata-blog/artifacts/aws-blog-optimize-downstream-data-processing/appjars/spark-process-1.0-SNAPSHOT-jar-with-dependencies.jar <S3_BUCKET_NAME> fromfirehose/<YEAR>/ output-emr-parquet/

Replace the S3_BUCKET_NAME and YEAR placeholders in the preceding Spark command with your own values.

Argument # | Property | Value
1 | --class | com.optimize.downstream.process.ProcessFilesFromS3AndConvertToParquet
2 | --master | yarn
3 | --deploy-mode | client
4 | | s3://aws-bigdata-blog/artifacts/aws-blog-avoid-small-files/appjars/spark-process-1.0-SNAPSHOT-jar-with-dependencies.jar
5 | S3_BUCKET_NAME | The Amazon S3 bucket name that was created as part of the AWS CloudFormation template. The source files are created in this bucket.
6 | <INPUT S3 LOCATION> | “fromfirehose/<YYYY>/”. The input files are created in this Amazon S3 key location under the bucket that was created. “YYYY” represents the current year. For example, “fromfirehose/2018/”
7 | <OUTPUT S3 LOCATION> | Provide an output directory name that will be created under the above provided Amazon S3 bucket. For example: “output-emr-parquet/”

 

When the program finishes running, you can check the Amazon S3 output location to see the files that are written in Parquet format.

Cleaning up after the migration

After completing and testing this solution, clean up the resources by stopping your tasks and deleting the AWS CloudFormation stacks. The stack deletion fails if the created Amazon S3 bucket contains any files. Make sure that you empty the Amazon S3 bucket before deleting the AWS CloudFormation stacks.

Conclusion

In this post, we described how to avoid creating small files in Amazon S3 by sending the incoming messages to Amazon Kinesis Data Firehose. We also went through the process of reading and storing the data in Parquet format using Apache Spark with an Amazon EMR cluster.

 


About the Author

Srikanth Kodali is a Sr. IoT Data Analytics Architect at Amazon Web Services. He works with AWS customers to provide guidance and technical assistance on building IoT data and analytics solutions, helping them improve the value of their solutions when using AWS.

 

 

How to export an Amazon DynamoDB table to Amazon S3 using AWS Step Functions and AWS Glue

Post Syndicated from Joe Feeney original https://aws.amazon.com/blogs/big-data/how-to-export-an-amazon-dynamodb-table-to-amazon-s3-using-aws-step-functions-and-aws-glue/

In typical AWS fashion, not a week had gone by after I published How Goodreads offloads Amazon DynamoDB tables to Amazon S3 and queries them using Amazon Athena on the AWS Big Data blog when the AWS Glue team released the ability for AWS Glue crawlers and AWS Glue ETL jobs to read from DynamoDB tables natively. I was actually pretty excited about this. Less code means fewer bugs. The original architecture had been around for at least 18 months and could be simplified significantly with a little bit of work.

Refactoring the data pipeline

The AWS Data Pipeline architecture outlined in my previous blog post is just under two years old now. We had used data pipelines as a way to back up Amazon DynamoDB data to Amazon S3 in case of a catastrophic developer error. However, with DynamoDB point-in-time recovery we have a better, native mechanism for disaster recovery. Additionally, with data pipelines we still own the operations associated with the clusters themselves, even if they are transient. A common challenge is keeping our clusters up with recent releases of Amazon EMR to help mitigate any outstanding bugs. Another is the inefficiency of needing to spin up an EMR cluster for each DynamoDB table.

I decided to take a step back and list the capabilities I wanted to have in the next iteration:

  • Export tables using AWS Glue instead of EMR.
    • AWS Glue provides a serverless ETL environment where I don’t have to worry about the underlying infrastructure. This minimizes operational tasks like keeping up with the EMR release tags.
  • Use a workflow solution that works across services like AWS Glue and Amazon Athena.
    • In the first iteration, the workflow was spread across various services. Unless you had the entire pipeline in your head, it was difficult to get a bird’s-eye view of how the pipeline was progressing.
  • Ability to select different formats.
    • For data engineering, I prefer Apache Parquet. However, customers might prefer a different format.
  • Add exported data to Athena.
    • I find that the easier it is for the data to be queried, the more likely it’s used.

Architecture overview

At a high level, this is the architecture:

  • We’re using AWS Step Functions as the workflow engine.
    • Each step is either a built-in Step Functions state, a service integration, or a simple Python AWS Lambda function. For example, GlueStartJobRun uses the synchronous job run service integration, as discussed in the documentation.
    • We get a visual representation of the entire pipeline.
    • It’s quick to onboard new developers.
  • An event in Amazon CloudWatch Events, which is disabled to start, triggers a Step Functions state machine with a JSON payload that contains the following:
    • AWS Glue job name
    • Export destination
    • DynamoDB table name
    • Desired read percentage
    • AWS Glue crawler name
  • AWS Glue exports a DynamoDB table in your preferred format to S3 as snapshots_your_table_name. The data is partitioned by snapshot_timestamp.
  • An AWS Glue crawler adds or updates your data’s schema and partitions in the AWS Glue Data Catalog.
  • Finally, we create an Athena view that only has data from the latest export snapshot.

A simple AWS Glue ETL job

The script that I created accepts AWS Glue ETL job arguments for the table name, read throughput, output, and format. Behind the scenes, AWS Glue scans the DynamoDB table. AWS Glue makes sure that every top-level attribute makes it into the schema, no matter how sparse your attributes are (as discussed in the DynamoDB documentation).

Here’s the script:

import sys
import datetime
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext

ARG_TABLE_NAME = "table_name"
ARG_READ_PERCENT = "read_percentage"
ARG_OUTPUT = "output_prefix"
ARG_FORMAT = "output_format"

PARTITION = "snapshot_timestamp"

args = getResolvedOptions(sys.argv,
  [
    'JOB_NAME',
    ARG_TABLE_NAME,
    ARG_READ_PERCENT,
    ARG_OUTPUT,
    ARG_FORMAT
  ]
)

table_name = args[ARG_TABLE_NAME]
read = args[ARG_READ_PERCENT]
output_prefix = args[ARG_OUTPUT]
fmt = args[ARG_FORMAT]

print("Table name:", table_name)
print("Read percentage:", read)
print("Output prefix:", output_prefix)
print("Format:", fmt)

date_str = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M')
output = "%s/%s=%s" % (output_prefix, PARTITION, date_str)

sc = SparkContext()
glueContext = GlueContext(sc)

table = glueContext.create_dynamic_frame.from_options(
  "dynamodb",
  connection_options={
    "dynamodb.input.tableName": table_name,
    "dynamodb.throughput.read.percent": read
  }
)

glueContext.write_dynamic_frame.from_options(
  frame=table,
  connection_type="s3",
  connection_options={
    "path": output
  },
  format=fmt,
  transformation_ctx="datasink"
)

There’s not a lot here. We’re creating a DynamicFrameReader of connection type dynamodb and passing in the table name and desired maximum read throughput consumption. We pass the resulting dynamic frame to a DynamicFrameWriter that writes the table to S3 in the specified format.
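The one piece of logic in the script that is easy to overlook is how the output path is built. It uses the key=value folder convention, which is what lets the crawler register snapshot_timestamp as a partition. The sketch below isolates that step so that it can be run without AWS Glue. The function name, bucket, and prefix are hypothetical, and the timestamp is the one used in the example later in this post.

```python
import datetime

PARTITION = "snapshot_timestamp"


def snapshot_path(output_prefix, when):
    """Build the key=value partition path used as the export destination."""
    date_str = when.strftime("%Y-%m-%dT%H:%M")
    return "%s/%s=%s" % (output_prefix, PARTITION, date_str)


# Hypothetical bucket and prefix, for illustration only.
print(snapshot_path("s3://example-bucket/snapshots_reviews",
                    datetime.datetime(2019, 1, 11, 23, 26)))
```

Because the timestamp is truncated to the minute, two exports of the same table that start within the same minute would write to the same path.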

Athena views

Most teams at Amazon own applications that have multiple DynamoDB tables, including my own team. Our current application uses five primary tables. Ideally, at the end of an export workflow you can write simple, obvious queries across a consistent view of your tables. However, each exported table is partitioned by the timestamp from when the table was exported. This makes querying across one or more tables very cumbersome, because you have to add a WHERE snapshot_timestamp = clause to every table reference in your query. Additionally, each table might have a different snapshot_timestamp value for any given day!

The final step in this export workflow creates an Athena view that adds that WHERE clause for you. This means that you can interact with your DynamoDB exports as if they were one sane view of your exported DynamoDB tables.
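One way such a view could be defined is sketched below. This is an assumption about the shape of the statement, not the workflow's actual code: it selects the newest snapshot with a subquery, whereas the real workflow might substitute a specific timestamp instead. The function name is hypothetical, and the table and view names follow the Reviews example used in this post.

```python
def latest_snapshot_view_sql(view_name, snapshots_table,
                             partition="snapshot_timestamp"):
    """Return a CREATE OR REPLACE VIEW statement pinned to the newest snapshot."""
    return (
        "CREATE OR REPLACE VIEW {view} AS "
        "SELECT * FROM {table} "
        "WHERE {part} = (SELECT max({part}) FROM {table})"
    ).format(view=view_name, table=snapshots_table, part=partition)


print(latest_snapshot_view_sql("reviews", "snapshots_reviews"))
```

Because the timestamps are formatted as year-month-day followed by the time, their string order matches their chronological order, so max() picks the most recent export.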

Setting up the infrastructure

I split the AWS CloudFormation infrastructure into two stacks. The common stack contains shared infrastructure, and you need only one of these per AWS Region. The table stacks are designed so that you can create one per table-format combination in any given AWS Region. Each table stack contains the CloudWatch event logic and AWS Glue components needed to export and transform a DynamoDB table.

Creating the common stack

The common stack contains the majority of the infrastructure. That includes the Step Functions state machine and Lambda functions to trigger and check the state of asynchronous jobs. It also includes IAM roles that the export stacks use, and the S3 bucket to store the exports.

To create the common stack, do the following:

  1. Choose this Launch Stack button.
  2. Choose I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  3. Choose Create Stack.

Creating the table export stack

If you don’t have a DynamoDB table to export, follow the original blog post. Start with the Working with the Reviews stack section and continue until you’ve added the two Items to the table. Otherwise, feel free to point this CloudFormation stack at your favorite DynamoDB table that is using provisioned throughput. Tables that use on-demand throughput are not currently supported.

Because so much of this architecture is shareable, there’s not much in the table export stack. This stack defines the CloudWatch event used to trigger the Step Functions state machine with a JSON payload containing all the necessary metadata. Additionally, it contains the AWS Glue ETL job that exports the table and the AWS Glue Crawler that updates metadata in the AWS Glue Data Catalog.

Technically, you can define the AWS Glue ETL job in the common stack because it’s already parameterized. However, the default limit for concurrent runs for an AWS Glue job is three. This is a soft limit, but with this architecture you have headroom to export up to 25 tables before asking for a limit increase.

To create the table export stack, do the following:

  1. Choose this Launch Stack button.
  2. Choose an output format from the list. All the available formats are supported by Athena natively.
  3. Enter your DynamoDB table name.
  4. Enter the percentage of Read Capacity Units (RCUs) that the job should consume from your table’s currently provisioned throughput. This percentage is expressed as a float between 0.1 and 1.0 inclusive. The default is 0.25 (25 percent).

As an example: Suppose that your table’s RCUs are set to 100 and you use the default of 0.25 (25 percent). Then the AWS Glue job consumes 25 RCUs while running.

  5. Choose Create.
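The arithmetic in the example above, together with the allowed range for the percentage, can be written as a small helper. The function name is hypothetical, and the range check simply mirrors the 0.1 to 1.0 bounds stated in step 4.

```python
def consumed_rcus(provisioned_rcus, read_percentage=0.25):
    """Return the RCUs the export job may consume at the given read percentage."""
    if not 0.1 <= read_percentage <= 1.0:
        raise ValueError("read_percentage must be between 0.1 and 1.0 inclusive")
    return provisioned_rcus * read_percentage


print(consumed_rcus(100))        # uses the default of 0.25
print(consumed_rcus(100, 0.5))
```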

Kicking off a state machine execution

To demonstrate how this works, we run the DynamoDB export state machine manually by passing it the JSON payload that the CloudWatch event would pass to Step Functions.

Getting the JSON payload from CloudWatch Events

To get the JSON payload, do the following:

  1. Open CloudWatch in the AWS Management Console.
  2. In the left column under Events, choose Rules.
  3. Choose your rule from the list. It is prefixed by AWSBigDataBlog-.
  4. For Actions, choose Edit.
  5. Copy the JSON payload from the Configure input section of Targets.
  6. Choose Cancel to exit edit mode.

Starting a state machine execution

To start an execution of the state machine, take the following steps:

  1. Open Step Functions in the console.
  2. Choose the DynamoDBExportAndAthenaLoad state machine.
  3. Choose Start execution.
  4. Paste the JSON payload into the Input field.
  5. Choose Start execution.

There are a few ways to follow along with the execution. As steps are entered and exited, entries are added to the Execution event history list. This is a great way to see what state (event in Lambda speak) is passed to each step, in case you need to debug.

You can also expand the Visual workflow. It’s a great high-level view to see how the workflow is progressing.

After the workflow is finished, you see two new tables under the dynamodb_exports database in your AWS Glue Data Catalog. Your DynamoDB snapshots table name is prefixed with snapshots_. The schema is formatted for the AWS Glue Data Catalog (lowercase and hyphens transformed to underscores). You also have a view table with the same table name formatted for AWS Glue Data Catalog but without the snapshots_ prefix.
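The naming rules described above can be sketched as follows. The helper applies only the two transformations mentioned here (lowercasing, and hyphens to underscores); the actual crawler may normalize other characters as well. The function name and the second table name are hypothetical.

```python
def catalog_names(dynamodb_table_name):
    """Return (snapshots_table, view_name) as described for the Data Catalog."""
    base = dynamodb_table_name.lower().replace("-", "_")
    return "snapshots_" + base, base


print(catalog_names("Reviews"))
print(catalog_names("My-Orders-Table"))  # hypothetical table name
```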

Querying your data

To showcase how having a separate view table of the most recent snapshot of a table is useful, I use the Reviews table from the previous blog post. The table has two items. I have also run the export workflow twice. As you can see when you preview the table, there are four items total. That’s because each snapshot contains two items.

From the items, the latest snapshot_timestamp is 2019-01-11T23:26. When I run the same preview query against the view table reviews, we see that there are only two items, which is what we expect. The view takes care of specifying the where snapshot_timestamp=… clause so you don’t have to.

Wrapping up

In this post, I showed you how to use AWS Glue’s DynamoDB integration and AWS Step Functions to create a workflow to export your DynamoDB tables to S3 in Parquet. I also showed you how to create an Athena view for each table’s latest snapshot, giving you a consistent view of your DynamoDB table exports.


About the Author

Joe Feeney is a Software Engineer at Amazon Go, where he does secret stuff and he’s quite chuffed with that. He enjoys embarrassing his family by taking Mario Kart entirely too seriously.

 

 

 

Trigger cross-region replication of pre-existing objects using Amazon S3 inventory, Amazon EMR, and Amazon Athena

Post Syndicated from Michael Sambol original https://aws.amazon.com/blogs/big-data/trigger-cross-region-replication-of-pre-existing-objects-using-amazon-s3-inventory-amazon-emr-and-amazon-athena/

In Amazon Simple Storage Service (Amazon S3), you can use cross-region replication (CRR) to copy objects automatically and asynchronously across buckets in different AWS Regions. CRR is a bucket-level configuration, and it can help you meet compliance requirements and minimize latency by keeping copies of your data in different Regions. CRR replicates all objects in the source bucket, or optionally a subset, controlled by prefix and tags.

Objects that exist before you enable CRR (pre-existing objects) are not replicated. Similarly, objects might fail to replicate (failed objects) if permissions aren’t in place, either on the IAM role used for replication or the bucket policy (if the buckets are in different AWS accounts).

In our work with customers, we have seen situations where large numbers of objects aren’t replicated for the previously mentioned reasons. In this post, we show you how to trigger cross-region replication for pre-existing and failed objects.

Methodology

At a high level, our strategy is to perform a copy-in-place operation on pre-existing and failed objects. This operation uses the Amazon S3 API to copy the objects over the top of themselves, preserving tags, access control lists (ACLs), metadata, and encryption keys. The operation also resets the Replication_Status flag on the objects. This triggers cross-region replication, which then copies the objects to the destination bucket.
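To make the copy-in-place idea concrete, the following sketch builds the parameters for a single self-copy request. It is a simplified illustration, not the code used by the Spark job: it covers user metadata only and leaves out tags, ACLs, and encryption settings, which the full operation also preserves. The function name, object key, and metadata are hypothetical.

```python
def copy_in_place_request(bucket, key, metadata):
    """Build keyword arguments for an S3 CopyObject call that copies an object onto itself."""
    return {
        "Bucket": bucket,
        "Key": key,
        "CopySource": {"Bucket": bucket, "Key": key},
        # S3 rejects a self-copy that changes nothing, so the existing
        # metadata is re-supplied with the REPLACE directive.
        "Metadata": dict(metadata),
        "MetadataDirective": "REPLACE",
    }


# Hypothetical object key and metadata, for illustration only.
request = copy_in_place_request(
    "crr-preexisting-demo-source", "preexisting-1.txt", {"owner": "demo"}
)
print(request)
```

With boto3, a dictionary like this could be passed as s3_client.copy_object(**request), where the metadata would first be read from the existing object.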

To accomplish this, we use the following:

  • Amazon S3 inventory to identify objects to copy in place. These objects don’t have a replication status, or they have a status of FAILED.
  • Amazon Athena and AWS Glue to expose the S3 inventory files as a table.
  • Amazon EMR to execute an Apache Spark job that queries the AWS Glue table and performs the copy-in-place operation.

Object filtering

To reduce the size of the problem (we’ve seen buckets with billions of objects!) and eliminate S3 List operations, we use Amazon S3 inventory. S3 inventory is enabled at the bucket level, and it provides a report of S3 objects. The inventory files contain the objects’ replication status: PENDING, COMPLETED, FAILED, or REPLICA. Pre-existing objects do not have a replication status in the inventory.
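The selection rule described above (no status, or a status of FAILED) can be sketched on a small CSV sample. The three-column layout of bucket, key, and replication status is an assumption that matches the inventory configuration used later in this post, and the rows and function name are hypothetical.

```python
import csv
import io

# Statuses that mark an object for a copy in place.
NEEDS_COPY = {"", "FAILED"}


def keys_to_copy(inventory_csv):
    """Return keys whose replication status is missing or FAILED."""
    reader = csv.reader(io.StringIO(inventory_csv))
    return [key for _bucket, key, status in reader if status in NEEDS_COPY]


# Hypothetical inventory rows: bucket, key, replication status.
sample = (
    '"crr-preexisting-demo-source","preexisting-1.txt",""\n'
    '"crr-preexisting-demo-source","failed-1.txt","FAILED"\n'
    '"crr-preexisting-demo-source","replicated-1.txt","COMPLETED"\n'
)

print(keys_to_copy(sample))
```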

Interactive analysis

To simplify working with the files that are created by S3 inventory, we create a table in the AWS Glue Data Catalog. You can query this table using Amazon Athena and analyze the objects.  You can also use this table in the Spark job running on Amazon EMR to identify the objects to copy in place.

Copy-in-place execution

We use a Spark job running on Amazon EMR to perform concurrent copy-in-place operations of the S3 objects. This step allows the number of simultaneous copy operations to be scaled up. This improves performance on a large number of objects compared to doing the copy operations consecutively with a single-threaded application.

Account setup

For the purpose of this example, we created three S3 buckets. The buckets are specific to our demonstration. If you’re following along, you need to create your own buckets (with different names).

We’re using a source bucket named crr-preexisting-demo-source and a destination bucket named crr-preexisting-demo-destination. The source bucket contains the pre-existing objects and the objects with the replication status of FAILED. We store the S3 inventory files in a third bucket named crr-preexisting-demo-inventory.

The following diagram illustrates the basic setup.

You can use any bucket to store the inventory, but the bucket policy must include the following statement (change Resource and aws:SourceAccount to match yours).

{
    "Version": "2012-10-17",
    "Id": "S3InventoryPolicy",
    "Statement": [
        {
            "Sid": "S3InventoryStatement",
            "Effect": "Allow",
            "Principal": {
                "Service": "s3.amazonaws.com"
            },
            "Action": "s3:PutObject",
            "Resource": "arn:aws:s3:::crr-preexisting-demo-inventory/*",
            "Condition": {
                "StringEquals": {
                    "s3:x-amz-acl": "bucket-owner-full-control",
                    "aws:SourceAccount": "111111111111"
                }
            }
        }
    ]
}
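Because only the Resource and aws:SourceAccount values change between accounts, the policy can be generated from two inputs. The following is a minimal sketch that reproduces the statement above. The function name is hypothetical, and the account ID is the placeholder from the example.

```python
import json


def inventory_bucket_policy(inventory_bucket, source_account_id):
    """Return the inventory-delivery bucket policy for the given bucket and account."""
    return {
        "Version": "2012-10-17",
        "Id": "S3InventoryPolicy",
        "Statement": [{
            "Sid": "S3InventoryStatement",
            "Effect": "Allow",
            "Principal": {"Service": "s3.amazonaws.com"},
            "Action": "s3:PutObject",
            "Resource": "arn:aws:s3:::%s/*" % inventory_bucket,
            "Condition": {"StringEquals": {
                "s3:x-amz-acl": "bucket-owner-full-control",
                "aws:SourceAccount": source_account_id,
            }},
        }],
    }


policy = inventory_bucket_policy("crr-preexisting-demo-inventory", "111111111111")
print(json.dumps(policy, indent=4))
```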

In our example, we uploaded six objects to crr-preexisting-demo-source. We added three objects (preexisting-*.txt) before CRR was enabled. We also added three objects (failed-*.txt) after permissions were removed from the CRR IAM role, causing CRR to fail.

Enable S3 inventory

You need to enable S3 inventory on the source bucket. You can do this on the Amazon S3 console as follows:

On the Management tab for the source bucket, choose Inventory.

Choose Add new, and complete the settings as shown, choosing the CSV format and selecting the Replication status check box. For detailed instructions for creating an inventory, see How Do I Configure Amazon S3 Inventory? in the Amazon S3 Console User Guide.

After enabling S3 inventory, you need to wait for the inventory files to be delivered. It can take up to 48 hours to deliver the first report. If you’re following the demo, ensure that the inventory report is delivered before proceeding.

Here’s what our example inventory file looks like:

You can also look on the S3 console on the objects’ Overview tab. The pre-existing objects do not have a replication status, but the failed objects show the following:

Register the table in the AWS Glue Data Catalog using Amazon Athena

To be able to query the inventory files using SQL, first you need to create an external table in the AWS Glue Data Catalog. Open the Amazon Athena console at https://console.aws.amazon.com/athena/home.

On the Query Editor tab, run the following SQL statement. This statement registers the external table in the AWS Glue Data Catalog.

CREATE EXTERNAL TABLE IF NOT EXISTS
crr_preexisting_demo (
    `bucket` string,
    key string,
    replication_status string
)
PARTITIONED BY (dt string)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    ESCAPED BY '\\'
    LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
LOCATION 's3://crr-preexisting-demo-inventory/crr-preexisting-demo-source/crr-preexisting-demo/hive';

After creating the table, you need to make the AWS Glue Data Catalog aware of any existing data and partitions by adding partition metadata to the table. To do this, you use the Metastore Consistency Check utility to scan for and add partition metadata to the AWS Glue Data Catalog.

MSCK REPAIR TABLE crr_preexisting_demo;

To learn more about why this is required, see the documentation on MSCK REPAIR TABLE and data partitioning in the Amazon Athena User Guide.

Now that the table and partitions are registered in the Data Catalog, you can query the inventory files with Amazon Athena.

SELECT * FROM crr_preexisting_demo where dt='2019-02-24-04-00';

The results of the query are as follows.

The query returns all rows in the S3 inventory for a specific delivery date. You’re now ready to launch an EMR cluster to copy in place the pre-existing and failed objects.

Note: If your goal is to fix FAILED objects, make sure that you correct what caused the failure (IAM permissions or S3 bucket policies) before proceeding to the next step.

Create an EMR cluster to copy objects

To parallelize the copy-in-place operations, run a Spark job on Amazon EMR. To facilitate EMR cluster creation and EMR step submission, we wrote a bash script (available in this GitHub repository).

To run the script, clone the GitHub repo. Then launch the EMR cluster as follows:

$ git clone https://github.com/aws-samples/amazon-s3-crr-preexisting-objects
$ cd amazon-s3-crr-preexisting-objects
$ ./launch_emr.sh

Note: Running the bash script results in AWS charges. By default, it creates two Amazon EC2 instances, one m4.xlarge and one m4.2xlarge. Auto-termination is enabled so when the cluster is finished with the in-place copies, it terminates.

The script performs the following tasks:

  1. Creates the default EMR roles (EMR_EC2_DefaultRole and EMR_DefaultRole).
  2. Uploads the files used for bootstrap actions and steps to Amazon S3 (we use crr-preexisting-demo-inventory to store these files).
  3. Creates an EMR cluster with Apache Spark installed using the create-cluster command.

After the cluster is provisioned:

  1. A bootstrap action installs boto3 and awscli.
  2. Two steps execute, copying the Spark application to the master node and then running the application.

The following are highlights from the Spark application. You can find the complete code for this example in the amazon-s3-crr-preexisting-objects repo on GitHub.

Here we select records from the table registered with the AWS Glue Data Catalog, filtering for objects with a replication_status of "FAILED" or "".

query = """
        SELECT bucket, key
        FROM {}
        WHERE dt = '{}'
        AND (replication_status = '""'
        OR replication_status = '"FAILED"')
        """.format(inventory_table, inventory_date)

print('Query: {}'.format(query))

crr_failed = spark.sql(query)
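The quoted literals in the query ('""' and '"FAILED"') most likely reflect that the delimited table keeps the double quotes that wrap each field in the inventory CSV. As a rough local illustration of the same filter, here is a sketch that parses a few inventory-style rows with Python's csv module, which strips the quotes. The sample rows are made up for the example (ok-1.txt is hypothetical), and this is not part of the Spark application.

```python
import csv
import io

# Replication statuses that mean the object still needs a copy in place:
# empty for pre-existing objects, FAILED for failed replications.
NEEDS_COPY = {"", "FAILED"}


def keys_needing_copy(inventory_csv_text):
    """Yield (bucket, key) for rows whose replication status is empty or FAILED."""
    reader = csv.reader(io.StringIO(inventory_csv_text))
    for bucket, key, replication_status in reader:
        if replication_status in NEEDS_COPY:
            yield bucket, key


# Hypothetical sample in the three-column layout of the demo table.
sample = (
    '"crr-preexisting-demo-source","preexisting-1.txt",""\n'
    '"crr-preexisting-demo-source","failed-1.txt","FAILED"\n'
    '"crr-preexisting-demo-source","ok-1.txt","COMPLETED"\n'
)
result = list(keys_needing_copy(sample))
```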

We call the copy_object function for each key returned by the previous query.

def copy_object(self, bucket, key, copy_acls):
        dest_bucket = self._s3.Bucket(bucket)
        dest_obj = dest_bucket.Object(key)

        src_bucket = self._s3.Bucket(bucket)
        src_obj = src_bucket.Object(key)

        # Get the S3 Object's Storage Class, Metadata, 
        # and Server Side Encryption
        storage_class, metadata, sse_type, last_modified = \
            self._get_object_attributes(src_obj)

        # Update the Metadata so the copy will work
        metadata['forcedreplication'] = runtime

        # Get and copy the current ACL
        if copy_acls:
            src_acl = src_obj.Acl()
            src_acl.load()
            dest_acl = {
                'Grants': src_acl.grants,
                'Owner': src_acl.owner
            }

        params = {
            'CopySource': {
                'Bucket': bucket,
                'Key': key
            },
            'MetadataDirective': 'REPLACE',
            'TaggingDirective': 'COPY',
            'Metadata': metadata,
            'StorageClass': storage_class
        }

        # Set Server Side Encryption
        if sse_type == 'AES256':
            params['ServerSideEncryption'] = 'AES256'
        elif sse_type == 'aws:kms':
            kms_key = src_obj.ssekms_key_id
            params['ServerSideEncryption'] = 'aws:kms'
            params['SSEKMSKeyId'] = kms_key

        # Copy the S3 Object over the top of itself, 
        # with the Storage Class, updated Metadata, 
        # and Server Side Encryption
        result = dest_obj.copy_from(**params)

        # Put the ACL back on the Object
        if copy_acls:
            dest_obj.Acl().put(AccessControlPolicy=dest_acl)

        return {
            'CopyInPlace': 'TRUE',
            'LastModified': str(result['CopyObjectResult']['LastModified'])
        }

Note: The Spark application adds a forcedreplication key to the objects’ metadata. It does this because Amazon S3 doesn’t allow you to copy in place without changing the object or its metadata.
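To make the metadata change easier to test without touching S3, the parameter construction can be pulled into a pure function. This is a sketch that mirrors the excerpt above; the function name build_copy_params and the sample metadata are hypothetical and do not come from the repository.

```python
from datetime import datetime, timezone


def build_copy_params(bucket, key, metadata, storage_class,
                      sse_type=None, kms_key_id=None, runtime=None):
    """Return keyword arguments for an in-place copy_from call."""
    # A changing value forces S3 to accept a copy of an object onto itself.
    runtime = runtime or datetime.now(timezone.utc).isoformat()
    new_metadata = dict(metadata)  # do not mutate the caller's dict
    new_metadata["forcedreplication"] = runtime

    params = {
        "CopySource": {"Bucket": bucket, "Key": key},
        "MetadataDirective": "REPLACE",  # required to write the new metadata
        "TaggingDirective": "COPY",      # keep existing object tags
        "Metadata": new_metadata,
        "StorageClass": storage_class,
    }
    # Preserve the object's server-side encryption settings.
    if sse_type == "AES256":
        params["ServerSideEncryption"] = "AES256"
    elif sse_type == "aws:kms":
        params["ServerSideEncryption"] = "aws:kms"
        params["SSEKMSKeyId"] = kms_key_id
    return params


params = build_copy_params(
    "crr-preexisting-demo-source", "preexisting-1.txt",
    {"owner": "demo"}, "STANDARD", runtime="2019-02-24T04:00:00Z",
)
```

Because MetadataDirective is REPLACE, the existing metadata has to be passed back in full, which is why the function copies it before adding the new key.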

Verify the success of the EMR job by running a query in Amazon Athena

The Spark application outputs its results to S3. You can create another external table with Amazon Athena and register it with the AWS Glue Data Catalog. You can then query the table with Athena to ensure that the copy-in-place operation was successful.

CREATE EXTERNAL TABLE IF NOT EXISTS
crr_preexisting_demo_results (
  `bucket` string,
  key string,
  replication_status string,
  last_modified string
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ','
  LINES TERMINATED BY '\n'
  STORED AS TEXTFILE
LOCATION 's3://crr-preexisting-demo-inventory/results';

SELECT * FROM crr_preexisting_demo_results;

The results appear as follows on the console.

Although this shows that the copy-in-place operation was successful, CRR still needs to replicate the objects. Subsequent inventory files show the objects’ replication status as COMPLETED. You can also verify on the console that preexisting-*.txt and failed-*.txt are COMPLETED.

It is worth noting that because CRR requires versioned buckets, the copy-in-place operation produces another version of the objects. You can use S3 lifecycle policies to manage noncurrent versions.
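As one possible way to manage those extra versions, a lifecycle rule can expire noncurrent versions after a fixed number of days. The sketch below builds such a rule for the boto3 put_bucket_lifecycle_configuration call. The 30-day retention and the rule ID are assumptions chosen for illustration.

```python
def noncurrent_version_expiration_rule(days, prefix=""):
    """Build a lifecycle configuration that expires noncurrent versions."""
    return {
        "Rules": [
            {
                "ID": "expire-noncurrent-after-{}-days".format(days),
                "Status": "Enabled",
                # An empty prefix applies the rule to the whole bucket.
                "Filter": {"Prefix": prefix},
                "NoncurrentVersionExpiration": {"NoncurrentDays": days},
            }
        ]
    }


def apply_lifecycle(bucket, lifecycle):
    """Apply the configuration. Needs boto3 and AWS credentials."""
    import boto3  # imported lazily so the builder above has no AWS dependency

    boto3.client("s3").put_bucket_lifecycle_configuration(
        Bucket=bucket, LifecycleConfiguration=lifecycle
    )


lifecycle = noncurrent_version_expiration_rule(30)
# apply_lifecycle("crr-preexisting-demo-source", lifecycle)  # uncomment to apply
```

Note that this call replaces any lifecycle configuration already on the bucket, so merge existing rules first if there are any.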

Conclusion

In this post, we showed how to use Amazon S3 inventory, Amazon Athena, the AWS Glue Data Catalog, and Amazon EMR to perform copy-in-place operations on pre-existing and failed objects at scale.

Note: Amazon S3 batch operations is an alternative for copying objects. The difference is that S3 batch operations will not check each object’s existing properties and set object ACLs, storage class, and encryption on an object-by-object basis. For more information, see Introduction to Amazon S3 Batch Operations in the Amazon S3 Console User Guide.

About the Authors

Michael Sambol is a senior consultant at AWS. He holds an MS in computer science from Georgia Tech. Michael enjoys working out, playing tennis, traveling, and watching Western movies.

Chauncy McCaughey is a senior data architect at AWS. His current side project is using statistical analysis of driving habits and traffic patterns to understand how he always ends up in the slow lane.

Easily query AWS service logs using Amazon Athena

Post Syndicated from Damon Cortesi original https://aws.amazon.com/blogs/big-data/easily-query-aws-service-logs-using-amazon-athena/

Many organizations use Amazon Simple Storage Service (Amazon S3) as a primary storage destination for a wide variety of logs including AWS service logs. One of the benefits of storing log data in Amazon S3 is that you can access it in any number of ways. One popular option is to query it using Amazon Athena, a serverless query engine for data on S3. Common use cases for querying logs are service and application troubleshooting, performance analysis, and security audits. To get the best performance and reduce query costs in Athena, we recommend following common best practices, as outlined in Top 10 Performance Tuning Tips for Amazon Athena on the AWS Big Data Blog. These best practices include converting the data to a columnar format like Apache Parquet and partitioning the resulting data in S3.

In this post, we’re open-sourcing a Python library known as Athena Glue Service Logs (AGSlogger). This library has predefined templates for parsing and optimizing the most popular log formats. The library provides a mechanism for defining schemas, managing partitions, and transforming data within an extract, transform, load (ETL) job in AWS Glue. AWS Glue is a serverless data transformation and cataloging service. You can use this library in conjunction with AWS Glue ETL jobs to enable a common framework for processing log data.

Using Python libraries with AWS Glue ETL

One of the features of AWS Glue ETL is the ability to import Python libraries into a job (as described in the documentation). We take advantage of this feature in our approach. With this capability, you first provide a link to a .zip file in Amazon S3 containing selected Python modules to AWS Glue. Then AWS Glue imports them at runtime.

We want our AWS Glue jobs to be as simple as possible while enabling the ability to easily roll out new versions of the library. To accomplish this, all of the setup, configuration, and transformation logic is contained in the library and AWS Glue simply executes the job. As new log formats are added or updated in the library, a new version of the .zip file can be deployed to S3. It’s then automatically imported by the relevant AWS Glue job. Here is an example ETL script:

from athena_glue_service_logs.job import JobRunner
 
job_run = JobRunner(service_name='s3_access')
job_run.convert_and_partition()

About the AGSlogger library

The library is available on GitHub in the athena-glue-service-logs repository. It’s designed to do an initial conversion of AWS Service logs and also perform ongoing conversion as new logs are delivered to S3. The following log types are supported:

  • Application Load Balancer
  • Classic Load Balancer
  • AWS CloudTrail
  • Amazon CloudFront
  • S3 Access
  • Amazon VPC Flow

To convert additional logs, update the service_name variable in the script, and also the different job parameters that point to your desired table names and Amazon S3 locations.

There are some limitations of the script:

  • The script has not been tested with large volumes of log data (greater than 100 GiB).
  • If you have a large number of log files, you might need to increase your Apache Spark executor settings. Edit the AWS Glue job and add the following job parameter:

key: --conf
value: spark.yarn.executor.memoryOverhead=1G

  • If you do not have any recent logs (less than 30 days old) for certain log types like S3 Access, the script may not be able to properly populate the optimized table.
  • Several CloudTrail fields such as requestParameters and responseElements are left as JSON strings – you can use Athena to extract data from this JSON at the time of query.

Before you begin

There are a few prerequisites before you get started:

  1. Create an IAM role to use with AWS Glue. For more information, see Create an IAM Role for AWS Glue in the AWS Glue documentation.
  2. Ensure that you have access to Athena from your account.
  3. We use Amazon S3 server access logs as our example for this script, so enable access logging on an Amazon S3 bucket. For more information, see How to Enable Server Access Logging in the S3 documentation.
  4. Download and store the Python library in an Amazon S3 bucket in the same AWS Region in which you run the AWS Glue ETL job. Download the latest release from https://github.com/awslabs/athena-glue-service-logs/releases. Then, copy the .zip file to your Amazon S3 bucket, as follows:

aws s3 cp athena_glue_converter_v5.3.0.zip s3://<bucket>/glue_scripts/

Now, you are ready to create the AWS Glue ETL job.

Create an AWS Glue ETL job using the library

For this post, we focus on Amazon S3 server access logs (described in the documentation). By default, these logs are delivered to a single location in Amazon S3. Converting to Parquet and partitioning these logs can significantly improve query performance and decrease query costs.

If you’ve cloned the repository associated with this release, you can use a “make” command to automate the job creation. We also walk through the job creation process in the AWS Glue console. There are a few specific settings on the Job properties page we need to set.

To create the AWS Glue ETL job

  1. In the AWS Glue console, choose Jobs under ETL on the navigation pane, and then choose Add Job. Follow the job creation wizard. Ensure that “A new script to be authored by you” is selected. We provide the code for it later. Our ETL language is Python. Under advanced properties, enable the Job bookmark. Job metrics can also be useful when monitoring your job, but they are not required.
  2. Under Script libraries in the Python library path section, put the full path to the .zip file that you uploaded to your Amazon S3 bucket as shown previously:
    s3://<bucket>/glue_scripts/athena_glue_converter_v5.3.0.zip

    You can adjust the DPUs if you think you need more or less processing power. For our purposes, you can leave it at 10.
  3. Specify a few different types of parameters, described in detail in the following list:
  • The source of your Amazon S3 Server Access Logs.
  • The destination where to save the converted logs.

AWS service logs can be stored in a number of different locations, as discussed in Service Log Specifics. For storing Amazon S3 server access logs, specify the bucket and prefix matching those that you configured on the S3 bucket where you enabled access logging.

  • The names of the databases and tables that are created in the AWS Glue Data Catalog.

By default, the converted logs are partitioned by date. The script creates the necessary tables and keeps the partitions up-to-date on subsequent runs of the job. You don’t need to use AWS Glue crawlers, although they can provide similar functionality. Here are the different properties you need to configure:

Key                          Value
--raw_database_name          source_logs
--raw_table_name             s3_access
--converted_database_name    aws_service_logs
--converted_table_name       s3_access
--s3_converted_target        s3://<bucket>/converted/s3_access
--s3_source_location         s3://<bucket>/s3_access

  4. Continue with the rest of the wizard, finishing the job creation flow. The script editor opens. Replace all the code in the script editor, even the import lines, with these lines:
    from athena_glue_service_logs.job import JobRunner
     
    job_run = JobRunner(service_name='s3_access')
    job_run.convert_and_partition()

  5. Save the script and choose Run Job. When the job begins, you see log output from the job scrolling under the script.

The script you just created is saved to S3 in a standard bucket. You can also use the AWS Command Line Interface to create the AWS Glue ETL job. Copy the preceding script to S3 first and provide its location as the ScriptLocation parameter.

aws glue create-job --name S3AccessLogConvertor \
--description "Convert and partition S3 Access logs" \
--role AWSGlueServiceRoleDefault \
--command Name=glueetl,ScriptLocation=s3://<bucket>/glue_scripts/s3_access_job.py \
--default-arguments '{
  "--extra-py-files":"s3://<bucket>/glue_scripts/athena_glue_converter_v5.3.0.zip",
  "--job-bookmark-option":"job-bookmark-enable",
  "--raw_database_name":"source_logs",
  "--raw_table_name":"s3_access",
  "--converted_database_name":"aws_service_logs",
  "--converted_table_name":"s3_access",
  "--TempDir":"s3://<bucket>/tmp",
  "--s3_converted_target":"s3://<bucket>/converted/s3_access",
  "--s3_source_location":"s3://<bucket>/s3_access/"
}'
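The same job definition can be expressed with the boto3 Glue client's create_job call. The sketch below only translates the CLI arguments shown above into keyword arguments. The helper names are hypothetical, and the bucket name passed in is a placeholder you would replace with your own.

```python
def build_create_job_args(bucket, library_zip="athena_glue_converter_v5.3.0.zip"):
    """Translate the CLI example into keyword arguments for glue.create_job."""
    base = "s3://{}".format(bucket)
    return {
        "Name": "S3AccessLogConvertor",
        "Description": "Convert and partition S3 Access logs",
        "Role": "AWSGlueServiceRoleDefault",
        "Command": {
            "Name": "glueetl",
            "ScriptLocation": base + "/glue_scripts/s3_access_job.py",
        },
        "DefaultArguments": {
            "--extra-py-files": base + "/glue_scripts/" + library_zip,
            "--job-bookmark-option": "job-bookmark-enable",
            "--raw_database_name": "source_logs",
            "--raw_table_name": "s3_access",
            "--converted_database_name": "aws_service_logs",
            "--converted_table_name": "s3_access",
            "--TempDir": base + "/tmp",
            "--s3_converted_target": base + "/converted/s3_access",
            "--s3_source_location": base + "/s3_access/",
        },
    }


def create_job(args):
    """Create the job. Needs boto3 and AWS credentials."""
    import boto3  # imported lazily so the builder above has no AWS dependency

    return boto3.client("glue").create_job(**args)


job_args = build_create_job_args("my-bucket")  # "my-bucket" is a placeholder
# create_job(job_args)  # uncomment to create the job
```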

Scheduling future runs

By default, this job is configured to run on a manual basis. To run it on a regular basis, set up a new schedule trigger in AWS Glue to run the job at your desired frequency. We recommend scheduling it hourly to make it easier to locate recent logs for your optimized queries.
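A schedule trigger can also be created programmatically. The following sketch builds the arguments for the boto3 Glue create_trigger call with an hourly cron expression. The trigger name is hypothetical, and the job name assumes the job was created with the name used in the CLI example above.

```python
def build_hourly_trigger_args(job_name, trigger_name="s3-access-hourly"):
    """Build keyword arguments for glue.create_trigger with an hourly schedule."""
    return {
        "Name": trigger_name,  # hypothetical trigger name
        "Type": "SCHEDULED",
        # AWS cron fields: minutes hours day-of-month month day-of-week year.
        # This expression fires at minute 0 of every hour.
        "Schedule": "cron(0 * * * ? *)",
        "Actions": [{"JobName": job_name}],
        "StartOnCreation": True,
    }


def create_trigger(args):
    """Create the trigger. Needs boto3 and AWS credentials."""
    import boto3  # imported lazily so the builder above has no AWS dependency

    return boto3.client("glue").create_trigger(**args)


trigger_args = build_hourly_trigger_args("S3AccessLogConvertor")
# create_trigger(trigger_args)  # uncomment to create the trigger
```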

On every run of the job, the script looks for the new log data and converts it to Parquet format. The script then adds any new partitions that might have been added as a result of the conversion. The script uses AWS Glue job bookmarks to ensure that it processes only newly delivered data. For more information about bookmarks, see Tracking Processed Data Using Job Bookmarks in the AWS Glue documentation.

Querying your optimized data in Athena: examples

Now that you’ve converted your data from row-based log files to columnar-based Parquet, you can write queries against this data using Athena. After the first run of the script, the tables specified in the AWS Glue ETL job properties are created for you. Here are several sample queries to get you started.

Example 1: Most requested S3 keys

SELECT key, COUNT(*) AS count
FROM "aws_service_logs"."s3_access"
WHERE operation IN ('REST.GET.OBJECT', 'REST.COPY.OBJECT', 'REST.COPY.OBJECT_GET')
GROUP BY 1
ORDER BY 2 DESC
limit 100;

Example 2: Top IP addresses that accessed the bucket yesterday

SELECT remote_ip, COUNT(*) FROM "aws_service_logs"."s3_access"
WHERE year=date_format(current_date - interval '1' day, '%Y') AND month=date_format(current_date - interval '1' day, '%m') AND day=date_format(current_date - interval '1' day, '%d')
GROUP BY 1
ORDER BY 2 DESC
limit 100;

Note the use of column position numbers instead of column names in the GROUP BY and ORDER BY clauses. This is one of the optimizations for Athena queries. For other optimizations, be sure to check out the Top 10 Performance Tuning Tips blog post.

In addition, we use the year, month, and day partition columns to limit the amount of data scanned and decrease the cost of the query.
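If you build these queries from application code, computing the partition values on the client keeps the predicate explicit and handles month and year boundaries. The sketch below assumes the year, month, and day partition columns hold zero-padded strings, as the date_format comparisons in Example 2 suggest. The function name is hypothetical.

```python
from datetime import date, timedelta


def yesterday_partition_predicate(today):
    """Return a WHERE fragment selecting the partition for the day before `today`."""
    y = today - timedelta(days=1)
    # All three values come from the same date, so the first day of a month
    # correctly resolves to the last day of the previous month.
    return "year='{:%Y}' AND month='{:%m}' AND day='{:%d}'".format(y, y, y)


predicate = yesterday_partition_predicate(date(2019, 3, 1))
query = (
    'SELECT remote_ip, COUNT(*) FROM "aws_service_logs"."s3_access" '
    "WHERE {} GROUP BY 1 ORDER BY 2 DESC LIMIT 100".format(predicate)
)
```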

Summary

This post introduces a new open-source library that you can use to efficiently process various types of AWS service logs using AWS Glue. The library automates the application of common best practices to allow high-performing and cost-effective querying of the data using Amazon Athena and Amazon Redshift. We hope this library comes in handy, and we’re open to pull requests. If you want to add a new log type, check out the code in the AWS Labs athena-glue-service-logs repository!

About the Author

Damon Cortesi is a big data architect with Amazon Web Services.

EMR Notebooks: A managed analytics environment based on Jupyter notebooks

Post Syndicated from Vignesh Rajamani original https://aws.amazon.com/blogs/big-data/emr-notebooks-a-managed-analytics-environment-based-on-jupyter-notebooks/

Notebooks are increasingly becoming the standard tool for interactively developing big data applications. It’s easy to see why. Their flexible architecture allows you to experiment with data in multiple languages, test code interactively, and visualize large datasets. To help scientists and developers easily access notebook tools, we launched Amazon EMR Notebooks, a managed notebook environment that is based on the popular open-source Jupyter notebook application. EMR Notebooks supports Sparkmagic kernels, which allow you to submit jobs remotely on your EMR cluster using languages like PySpark, Spark SQL, SparkR, and Scala. The kernels submit your Spark code through Apache Livy, which is a REST server for Spark running on your cluster.

EMR Notebooks is designed to make it easy for you to experiment and build applications with Apache Spark. In this blog post, I first cover some of the benefits that EMR Notebooks offers. Then I introduce you to some of its capabilities such as detaching and attaching a notebook to different EMR clusters, monitoring Spark activity from within the notebook, using tags to control user permissions, and setting up user-impersonation to track notebook users and their actions. To learn about creating and using EMR Notebooks, you can visit Using Amazon EMR Notebooks or follow along with the AWS Online Tech Talks webinar.

Benefits of EMR Notebooks

One of the useful features of EMR Notebooks is the separation of the notebook environment from your underlying cluster infrastructure. The separation makes it easy for you to execute notebook code against transient clusters without worrying about deploying or configuring your notebook infrastructure every time you bring up a new cluster. You can create multiple serverless notebooks from the AWS Management Console for EMR and access the notebook UI without spending time setting up SSH access or configuring your browser for port-forwarding. Each notebook you create is launched instantly with its own Spark context. This capability enables you to attach multiple notebooks to a single shared cluster and submit parallel jobs without fear of job conflicts in a multi-tenant environment. This way you make efficient use of your clusters.

You can also connect EMR Notebooks to an EMR cluster as small as one node. This gives you a budget-friendly sandbox environment to develop your Spark application.

Finally, with EMR Notebooks, you don’t have to spend time manually configuring your notebook to store files persistently. Your notebook files are saved automatically and periodically to a chosen Amazon S3 bucket, so you don’t have to worry about losing your work if your cluster is shut down. You can retrieve your saved notebooks from the console or download them locally from your S3 bucket in Jupyter “ipynb” format.

Detaching and attaching EMR Notebooks to different clusters

With EMR Notebooks, you can detach an active notebook from a cluster and attach it to a different cluster and promptly resume your work. This capability can be useful in scenarios where you want to move your notebook from a sandbox development cluster to a production environment or attach to a different cluster with appropriate CPU or memory resources and library packages required to execute your notebook against large datasets. To detach an active notebook:

First select the notebook name and then choose Stop.

Wait for the notebook status shown next to the notebook name to change from Ready to Stopped and then choose Change Cluster.

After you stop the notebook, you can choose to attach it to a different cluster in the same VPC or create a new cluster. EMR Notebooks automatically attaches the notebook to the cluster and restarts the notebook.

Monitoring and debugging Spark jobs

EMR Notebooks supports a built-in Jupyter notebook widget called SparkMonitor that allows you to monitor the status of all your Spark jobs launched from the notebook without connecting to the Spark web UI server.

A widget appears and automatically integrates within the cell structure of your notebook and displays detailed status about the job submitted from each cell of your notebook, providing you with real-time progress of the different stages of the job. For any failed jobs, this widget also offers embedded links to the container logs in Amazon S3, allowing you to get to the relevant logs and debug your jobs.

Additionally, if you have configured your cluster to accept SSH connections, then you can access the Spark application web UI and Hadoop jobs history server from within the notebook. This allows you to view the event timelines, visualize Directed Acyclic Graphs (DAG) of each job, and view the detailed system and runtime information to inspect and debug your code. These web UIs are available automatically the first time you start to run your Spark code from a notebook.

Writing tag-based policies to control user permissions for notebooks and users

EMR Notebooks are by default shared resources that anyone from your organization with access to your AWS account can open, edit, or even delete. If you want more control over your notebook, you can use tags to label your notebook and write IAM policies that control access for other users. To get you started, when you create a notebook, a default tag with a key string, creatorUserId, is set to the value of the IAM User ID of the user who created the notebook.

You can use this tag to limit allowed actions on the notebook to only the creator of the notebook. For example, the permissions policy statement below, when attached to a role or user, enables the IAM user to view, start, stop, edit, or delete only those notebooks that they have created. This policy statement uses the default tag that is applied by EMR Notebooks when you create the notebook.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "elasticmapreduce:DescribeEditor",
                "elasticmapreduce:StartEditor",
                "elasticmapreduce:StopEditor",
                "elasticmapreduce:DeleteEditor",
                "elasticmapreduce:OpenEditorInConsole"
            ],
            "Effect": "Allow",
            "Resource": "*",
            "Condition": {
                "StringEquals": {
                    "elasticmapreduce:ResourceTag/creatorUserId": "${aws:userId}"
                }
            }
        }
    ]
}

You can write policies that enforce the creation of tags before starting a notebook. For example, the policy below requires that the user not change or delete the creatorUserId tag that is added by default. The variable ${aws:userId} specifies the currently active user’s User ID, which is the default value of the tag.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "elasticmapreduce:CreateEditor"
            ],
            "Effect": "Allow",
            "Resource": "*",
            "Condition": {
                "StringEquals": {
                    "elasticmapreduce:RequestTag/creatorUserId": "${aws:userid}"
                }
            }
        }
    ]
}

You can use the notebook tags along with EMR cluster tags to control notebook user access to your cluster. Tagging your notebook and clusters, in addition to securing your resources, also allows you to categorize, track, and allocate your EMR cluster costs across your different lines of business. For example, the policy below allows a user to create a notebook only if the notebook has a tag with a key string “department” with its value set to “Analytics” and only if the notebook is attached to the EMR cluster that has a tag with the key string “cost-center” and value set to “12345.”

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "elasticmapreduce:StartEditor"
            ],
            "Effect": "Allow",
            "Resource": "arn:aws:elasticmapreduce:*:123456789012:editor/*",
            "Condition": {
                "StringEquals": {
                    "elasticmapreduce:ResourceTag/department": [
                        "Analytics"
                    ]
                }
            }
        },
        {
            "Action": [
                "elasticmapreduce:StartEditor"
            ],
            "Effect": "Allow",
            "Resource": "arn:aws:elasticmapreduce:*:123456789012:cluster/*",
            "Condition": {
                "StringEquals": {
                    "elasticmapreduce:ResourceTag/cost-center": [
                        "12345"
                    ]
                }
            }
        }
    ]
}
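If you maintain several such policies for different departments or cost centers, generating the JSON from a small helper can reduce copy-and-paste errors. The sketch below reproduces the two-statement policy above from parameters. The function name is hypothetical, and the output is plain JSON that you would still attach through IAM yourself.

```python
import json


def tag_scoped_start_editor_policy(account_id, notebook_tag, cluster_tag):
    """Build a policy allowing StartEditor only for matching notebook and cluster tags."""

    def statement(resource_type, tag):
        key, value = tag
        return {
            "Action": ["elasticmapreduce:StartEditor"],
            "Effect": "Allow",
            "Resource": "arn:aws:elasticmapreduce:*:{}:{}/*".format(
                account_id, resource_type
            ),
            "Condition": {
                "StringEquals": {
                    "elasticmapreduce:ResourceTag/{}".format(key): [value]
                }
            },
        }

    return {
        "Version": "2012-10-17",
        "Statement": [
            statement("editor", notebook_tag),   # the notebook itself
            statement("cluster", cluster_tag),   # the cluster it attaches to
        ],
    }


policy = tag_scoped_start_editor_policy(
    "123456789012", ("department", "Analytics"), ("cost-center", "12345")
)
policy_json = json.dumps(policy, indent=4)
```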

You can learn more about notebook and cluster tags by visiting Using Tags to Control User Permissions and Tag Clusters. Also, visit Using Cost Allocation Tags to learn more about using tags to generate cost allocation reports in the AWS Billing and Cost Management Console.

Tracking notebook users by enforcing user-impersonation

EMR Notebooks enables multiple users to execute their notebooks’ code concurrently in a shared EMR cluster, improving cluster utilization. By default, all Spark jobs spawned by these different users from their notebooks run as the same user (the livy user) on your EMR cluster. If your corporate policy requires you to set up an audit trail and track individual notebook user actions on shared clusters, you can use the user-impersonation feature of EMR Notebooks. This capability allows you to distinguish and audit all notebook users by associating the jobs they executed from their notebook with the user’s IAM identity. To use this feature, enable Livy user-impersonation by configuring the core-site and livy-conf configuration classifications when you create and launch an EMR cluster as follows:

[
    {
        "Classification": "core-site",
        "Properties": {
          "hadoop.proxyuser.livy.groups": "*",
          "hadoop.proxyuser.livy.hosts": "*"
        }
    },
    {
        "Classification": "livy-conf",
        "Properties": {
          "livy.impersonation.enabled": "true"
        }
    }
]

Visit Configuring Applications to learn more about configuring applications. After this feature is enabled, EMR Notebooks creates HDFS user directories on the master node for each user identity. This means all the Spark jobs from the notebook run as the IAM user instead of the indistinct user livy. For example, if user NB_User1 runs code from the notebook editor, then a user directory named user_NB_User1 is created on the master node and all Spark jobs run as user_NB_User1. You can then use a service like AWS CloudTrail to audit the record of actions by the user NB_User1 by creating a trail. To learn more about setting up audit trails, see Logging Amazon EMR API Calls in AWS CloudTrail.
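When clusters are launched from code, the same classifications can be kept as a Python structure and written to a file or passed as the Configurations argument of a cluster-creation call. The sketch below only builds and serializes the settings shown earlier. The function name and the output file name are hypothetical.

```python
import json


def livy_impersonation_configurations():
    """Return the configuration classifications that enable Livy user-impersonation."""
    return [
        {
            "Classification": "core-site",
            "Properties": {
                # Allow the livy user to impersonate users from any group and host.
                "hadoop.proxyuser.livy.groups": "*",
                "hadoop.proxyuser.livy.hosts": "*",
            },
        },
        {
            "Classification": "livy-conf",
            "Properties": {"livy.impersonation.enabled": "true"},
        },
    ]


configurations = livy_impersonation_configurations()
configurations_json = json.dumps(configurations, indent=4)
# To reuse the settings from a file (hypothetical file name):
# with open("livy-impersonation.json", "w") as f:
#     f.write(configurations_json)
```

Note that the property values are strings ("true", "*"), matching the JSON above, rather than Python booleans.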

Conclusion

In this post, I highlighted some of the capabilities of EMR Notebooks such as the ability to change clusters, monitor Spark jobs from each notebook cell, control user permissions, and categorize resource costs. You can do this by using notebook and cluster tags and setting up user-impersonation to track notebook user actions.

Oh, by the way, there is no additional charge for using EMR Notebooks and you only pay for the use of the EMR cluster as usual!

If you have questions or suggestions, feel free to leave a comment.

About the Authors

Vignesh Rajamani is a senior product manager for EMR at AWS.

Nikki Rouda is the principal product marketing manager for data lakes and big data at AWS. Nikki has spent 20+ years helping enterprises in 40+ countries develop and implement solutions to their analytics and IT infrastructure challenges. Nikki holds an MBA from the University of Cambridge and an ScB in geophysics and math from Brown University.

Test data quality at scale with Deequ

Post Syndicated from Dustin Lange original https://aws.amazon.com/blogs/big-data/test-data-quality-at-scale-with-deequ/

You generally write unit tests for your code, but do you also test your data? Incorrect or malformed data can have a large impact on production systems. Examples of data quality issues are:

  • Missing values can lead to failures in production systems that require non-null values (NullPointerException).
  • Changes in the distribution of data can lead to unexpected outputs of machine learning models.
  • Aggregations of incorrect data can lead to wrong business decisions.

In this blog post, we introduce Deequ, an open source tool developed and used at Amazon. Deequ allows you to calculate data quality metrics on your dataset, define and verify data quality constraints, and be informed about changes in the data distribution. Instead of implementing checks and verification algorithms on your own, you can focus on describing how your data should look. Deequ also supports you by suggesting checks. Deequ is implemented on top of Apache Spark and is designed to scale with large datasets (think billions of rows) that typically live in a distributed filesystem or a data warehouse.

Deequ at Amazon

Deequ is being used internally at Amazon for verifying the quality of many large production datasets. Dataset producers can add and edit data quality constraints. The system computes data quality metrics on a regular basis (with every new version of a dataset), verifies constraints defined by dataset producers, and publishes datasets to consumers in case of success. In error cases, dataset publication can be stopped, and producers are notified to take action. Data quality issues do not propagate to consumer data pipelines, reducing their blast radius.

Overview of Deequ

To use Deequ, let’s look at its main components (also shown in Figure 1).

  • Metrics Computation — Deequ computes data quality metrics, that is, statistics such as completeness, maximum, or correlation. Deequ uses Spark to read from sources such as Amazon S3, and to compute metrics through an optimized set of aggregation queries. You have direct access to the raw metrics computed on the data.
  • Constraint Verification — As a user, you focus on defining a set of data quality constraints to be verified. Deequ takes care of deriving the required set of metrics to be computed on the data. Deequ generates a data quality report, which contains the result of the constraint verification.
  • Constraint Suggestion — You can choose to define your own custom data quality constraints, or use the automated constraint suggestion methods that profile the data to infer useful constraints.

Figure 1: Overview of Deequ components

Setup: Launch the Spark cluster

This section shows the steps to use Deequ on your own data. First, set up Spark and Deequ on an Amazon EMR cluster. Then, load a sample dataset provided by AWS, run some analysis, and then run data tests.

Deequ is built on top of Apache Spark to support fast, distributed calculations on large datasets. Deequ depends on Spark version 2.2.0 or later. As a first step, create a cluster with Spark on Amazon EMR. Amazon EMR takes care of the configuration of Spark for you. Also, you can use the EMR File System (EMRFS) to directly access data in Amazon S3. For testing, you can also install Spark on a single machine in standalone mode.

Connect to the Amazon EMR master node using SSH. Load the latest Deequ JAR from Maven Repository. To load the JAR of version 1.0.1, use the following:

wget https://repo1.maven.org/maven2/com/amazon/deequ/deequ/1.0.1/deequ-1.0.1.jar

Launch Spark Shell and use the spark.jars argument for referencing the Deequ JAR file:

spark-shell --conf spark.jars=deequ-1.0.1.jar

For more information about how to set up Spark, see the Spark Quick Start guide, and the overview of Spark configuration options.

Load data

As a running example, we use a customer review dataset provided by Amazon on Amazon S3. Let’s load the dataset containing reviews for the category “Electronics” in Spark. Make sure to enter the code in the Spark shell:

val dataset = spark.read.parquet("s3://amazon-reviews-pds/parquet/product_category=Electronics/")

You can see the following selected attributes if you run dataset.printSchema() in the Spark shell:

root
|-- marketplace: string (nullable = true)
|-- customer_id: string (nullable = true)
|-- review_id: string (nullable = true)
|-- product_title: string (nullable = true)
|-- star_rating: integer (nullable = true)
|-- helpful_votes: integer (nullable = true)
|-- total_votes: integer (nullable = true)
|-- vine: string (nullable = true)
|-- year: integer (nullable = true)

Data analysis

Before we define checks on the data, we want to calculate some statistics on the dataset; we call them metrics. Deequ supports the following metrics (they are defined in this Deequ package):

Metric | Description | Usage Example
ApproxCountDistinct | Approximate number of distinct values, computed with HyperLogLogPlusPlus sketches. | ApproxCountDistinct("review_id")
ApproxQuantile | Approximate quantile of a distribution. | ApproxQuantile("star_rating", quantile = 0.5)
ApproxQuantiles | Approximate quantiles of a distribution. | ApproxQuantiles("star_rating", quantiles = Seq(0.1, 0.5, 0.9))
Completeness | Fraction of non-null values in a column. | Completeness("review_id")
Compliance | Fraction of rows that comply with the given column constraint. | Compliance("top star_rating", "star_rating >= 4.0")
Correlation | Pearson correlation coefficient, measures the linear correlation between two columns. The result is in the range [-1, 1], where 1 means positive linear correlation, -1 means negative linear correlation, and 0 means no correlation. | Correlation("total_votes", "star_rating")
CountDistinct | Number of distinct values. | CountDistinct("review_id")
DataType | Distribution of data types such as Boolean, Fractional, Integral, and String. The resulting histogram allows filtering by relative or absolute fractions. | DataType("year")
Distinctness | Fraction of distinct values of a column over the number of all values of a column. Distinct values occur at least once. Example: [a, a, b] contains two distinct values a and b, so distinctness is 2/3. | Distinctness("review_id")
Entropy | Entropy is a measure of the level of information contained in an event (value in a column) when considering all possible events (values in a column). It is measured in nats (natural units of information). Entropy is estimated using observed value counts as the negative sum of (value_count/total_count) * log(value_count/total_count). Example: [a, b, b, c, c] has three distinct values with counts [1, 2, 2]. Entropy is then (-1/5*log(1/5)-2/5*log(2/5)-2/5*log(2/5)) = 1.055. | Entropy("star_rating")
Maximum | Maximum value. | Maximum("star_rating")
Mean | Mean value; null values are excluded. | Mean("star_rating")
Minimum | Minimum value. | Minimum("star_rating")
MutualInformation | Mutual information describes how much information about one column (one random variable) can be inferred from another column (another random variable). If the two columns are independent, mutual information is zero. If one column is a function of the other column, mutual information is the entropy of the column. Mutual information is symmetric and nonnegative. | MutualInformation(Seq("total_votes", "star_rating"))
PatternMatch | Fraction of rows that comply with a given regular expression. | PatternMatch("marketplace", pattern = raw"\w{2}".r)
Size | Number of rows in a DataFrame. | Size()
Sum | Sum of all values of a column. | Sum("total_votes")
UniqueValueRatio | Fraction of unique values over the number of all distinct values of a column. Unique values occur exactly once; distinct values occur at least once. Example: [a, a, b] contains one unique value b, and two distinct values a and b, so the unique value ratio is 1/2. | UniqueValueRatio("star_rating")
Uniqueness | Fraction of unique values over the number of all values of a column. Unique values occur exactly once. Example: [a, a, b] contains one unique value b, so uniqueness is 1/3. | Uniqueness("star_rating")
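
The ratio-style metrics above are easy to mix up, so the following plain-Python sketch recomputes the examples given in the metric descriptions. It only illustrates the definitions on small in-memory lists; it is not how Deequ computes them on Spark, and the function names are made up for this example.

```python
import math
from collections import Counter


def distinctness(values):
    """Distinct values divided by all values."""
    return len(set(values)) / len(values)


def uniqueness(values):
    """Values occurring exactly once, divided by all values."""
    counts = Counter(values)
    return sum(1 for c in counts.values() if c == 1) / len(values)


def unique_value_ratio(values):
    """Values occurring exactly once, divided by distinct values."""
    counts = Counter(values)
    return sum(1 for c in counts.values() if c == 1) / len(counts)


def entropy(values):
    """Entropy in nats, estimated from observed value counts."""
    total = len(values)
    return -sum((c / total) * math.log(c / total) for c in Counter(values).values())


sample = ["a", "a", "b"]
print(distinctness(sample))        # 2/3
print(uniqueness(sample))          # 1/3
print(unique_value_ratio(sample))  # 1/2
print(round(entropy(["a", "b", "b", "c", "c"]), 3))  # 1.055
```

Note that uniqueness and unique value ratio share a numerator and differ only in the denominator (all values versus distinct values).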

In the following example, we show how to use the AnalysisRunner to define the metrics you are interested in. You can run the following code in the Spark shell by either just pasting it in the shell or by saving it in a local file on the master node and loading it in the Spark shell with the following command:

:load PATH_TO_FILE

import com.amazon.deequ.analyzers.runners.{AnalysisRunner, AnalyzerContext}
import com.amazon.deequ.analyzers.runners.AnalyzerContext.successMetricsAsDataFrame
import com.amazon.deequ.analyzers.{Compliance, Correlation, Size, Completeness, Mean, ApproxCountDistinct}

val analysisResult: AnalyzerContext = { AnalysisRunner
  // data to run the analysis on
  .onData(dataset)
  // define analyzers that compute metrics
  .addAnalyzer(Size())
  .addAnalyzer(Completeness("review_id"))
  .addAnalyzer(ApproxCountDistinct("review_id"))
  .addAnalyzer(Mean("star_rating"))
  .addAnalyzer(Compliance("top star_rating", "star_rating >= 4.0"))
  .addAnalyzer(Correlation("total_votes", "star_rating"))
  .addAnalyzer(Correlation("total_votes", "helpful_votes"))
  // compute metrics
  .run()
}

// retrieve successfully computed metrics as a Spark data frame
val metrics = successMetricsAsDataFrame(spark, analysisResult)

The resulting data frame contains the calculated metrics (call metrics.show() in the Spark shell):

name | instance | value
ApproxCountDistinct | review_id | 3010972
Completeness | review_id | 1
Compliance | top star_rating | 0.74941
Correlation | helpful_votes,total_votes | 0.99365
Correlation | total_votes,star_rating | -0.03451
Mean | star_rating | 4.03614
Size | * | 3120938

We can learn that:

  • review_id has no missing values and approximately 3,010,972 distinct values.
  • 74.9% of reviews have a star_rating of 4 or higher.
  • total_votes and star_rating are not correlated.
  • helpful_votes and total_votes are strongly correlated.
  • The average star_rating is 4.0.
  • The dataset contains 3,120,938 reviews.

Define and Run Tests for Data

After analyzing and understanding the data, we want to verify that the properties we have derived also hold for new versions of the dataset. By defining assertions on the data distribution as part of a data pipeline, we can ensure that every processed dataset is of high quality, and that any application consuming the data can rely on it.

For writing tests on data, we start with the VerificationSuite and add Checks on attributes of the data. In this example, we test for the following properties of our data:

  • There are at least 3 million rows in total.
  • review_id is never NULL.
  • review_id is unique.
  • star_rating has a minimum of 1.0 and a maximum of 5.0.
  • marketplace only contains “US”, “UK”, “DE”, “JP”, or “FR”.
  • year does not contain negative values.

This is the code that reflects the previous statements. For information about all available checks, see this GitHub repository. You can run this directly in the Spark shell as previously explained:

import com.amazon.deequ.{VerificationSuite, VerificationResult}
import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
import com.amazon.deequ.checks.{Check, CheckLevel}

val verificationResult: VerificationResult = { VerificationSuite()
  // data to run the verification on
  .onData(dataset)
  // define a data quality check
  .addCheck(
    Check(CheckLevel.Error, "Review Check") 
      .hasSize(_ >= 3000000) // at least 3 million rows
      .hasMin("star_rating", _ == 1.0) // min is 1.0
      .hasMax("star_rating", _ == 5.0) // max is 5.0
      .isComplete("review_id") // should never be NULL
      .isUnique("review_id") // should not contain duplicates
      .isComplete("marketplace") // should never be NULL
      // contains only the listed values
      .isContainedIn("marketplace", Array("US", "UK", "DE", "JP", "FR"))
      .isNonNegative("year")) // should not contain negative values
  // compute metrics and verify check conditions
  .run()
}

// convert check results to a Spark data frame
val resultDataFrame = checkResultsAsDataFrame(spark, verificationResult)

After calling run, Deequ translates your test description into a series of Spark jobs, which are executed to compute metrics on the data. Afterwards, it invokes your assertion functions (e.g., _ == 1.0 for the minimum star-rating check) on these metrics to see if the constraints hold on the data.
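
The two-phase flow described above (first compute metrics, then apply the user's assertion functions to them) can be sketched in a few lines of plain Python. This is a simplified illustration on in-memory rows, not Deequ's implementation; `compute_metrics`, `verify`, and the metric names are hypothetical.

```python
def compute_metrics(rows):
    """Phase 1: compute the raw metrics that the assertions below need."""
    ratings = [r["star_rating"] for r in rows if r["star_rating"] is not None]
    ids = [r["review_id"] for r in rows]
    return {
        "size": len(rows),
        "min_star_rating": min(ratings),
        "max_star_rating": max(ratings),
        "completeness_review_id": sum(i is not None for i in ids) / len(rows),
    }


def verify(metrics, assertions):
    """Phase 2: apply each assertion function to its metric and record the status."""
    return {
        name: "Success" if check(metrics[name]) else "Failure"
        for name, check in assertions.items()
    }


rows = [
    {"review_id": "R1", "star_rating": 1},
    {"review_id": "R2", "star_rating": 5},
    {"review_id": None, "star_rating": 4},  # missing ID breaks completeness
]
assertions = {
    "size": lambda v: v >= 3,
    "min_star_rating": lambda v: v == 1.0,
    "max_star_rating": lambda v: v == 5.0,
    "completeness_review_id": lambda v: v == 1.0,
}
report = verify(compute_metrics(rows), assertions)
print(report)
```

Keeping the metrics separate from the assertions is what lets the same computed values be reported, stored, or checked against several conditions without rescanning the data.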

Call resultDataFrame.show(truncate=false) in the Spark shell to inspect the result. The resulting table shows the verification result for every test, for example:

constraint | constraint_status | constraint_message
SizeConstraint(Size(None)) | Success |
MinimumConstraint(Minimum(star_rating,None)) | Success |
MaximumConstraint(Maximum(star_rating,None)) | Success |
CompletenessConstraint(Completeness(review_id,None)) | Success |
UniquenessConstraint(Uniqueness(List(review_id))) | Failure | Value: 0.9926566948782706 does not meet the constraint requirement!
CompletenessConstraint(Completeness(marketplace,None)) | Success |
ComplianceConstraint(Compliance(marketplace contained in US,UK,DE,JP,FR,marketplace IS NULL OR marketplace IN ('US','UK','DE','JP','FR'),None)) | Success |
ComplianceConstraint(Compliance(year is non-negative,COALESCE(year, 0.0) >= 0,None)) | Success |

Interestingly, the review_id column is not unique, which resulted in a failure of the check on uniqueness.

We can also look at all the metrics that Deequ computed for this check:

VerificationResult.successMetricsAsDataFrame(spark, verificationResult).show(truncate=false)

Result:

name | instance | value
Completeness | review_id | 1
Completeness | marketplace | 1
Compliance | marketplace contained in US,UK,DE,JP,FR | 1
Compliance | year is non-negative | 1
Maximum | star_rating | 5
Minimum | star_rating | 1
Size | * | 3120938
Uniqueness | review_id | 0.99266

Automated Constraint Suggestion

If you own a large number of datasets or if your dataset has many columns, it may be challenging for you to manually define appropriate constraints. Deequ can automatically suggest useful constraints based on the data distribution. Deequ first runs a data profiling method and then applies a set of rules on the result. For more information about how to run a data profiling method, see this GitHub repository.

import com.amazon.deequ.suggestions.{ConstraintSuggestionRunner, Rules}
import spark.implicits._ // for toDS method

// We ask deequ to compute constraint suggestions for us on the data
val suggestionResult = { ConstraintSuggestionRunner()
  // data to suggest constraints for
  .onData(dataset)
  // default set of rules for constraint suggestion
  .addConstraintRules(Rules.DEFAULT)
  // run data profiling and constraint suggestion
  .run()
}

// We can now investigate the constraints that Deequ suggested. 
val suggestionDataFrame = suggestionResult.constraintSuggestions.flatMap { 
  case (column, suggestions) => 
    suggestions.map { constraint =>
      (column, constraint.description, constraint.codeForConstraint)
    } 
}.toSeq.toDS()

The result contains a list of constraints with descriptions and Scala code, so that you can directly apply it in your data quality checks. Call suggestionDataFrame.show(truncate=false) in the Spark shell to inspect the suggested constraints; here we show a subset:

column | constraint | scala code
customer_id | 'customer_id' is not null | .isComplete("customer_id")
customer_id | 'customer_id' has type Integral | .hasDataType("customer_id", ConstrainableDataTypes.Integral)
customer_id | 'customer_id' has no negative values | .isNonNegative("customer_id")
helpful_votes | 'helpful_votes' is not null | .isComplete("helpful_votes")
helpful_votes | 'helpful_votes' has no negative values | .isNonNegative("helpful_votes")
marketplace | 'marketplace' has value range 'US', 'UK', 'DE', 'JP', 'FR' | .isContainedIn("marketplace", Array("US", "UK", "DE", "JP", "FR"))
product_title | 'product_title' is not null | .isComplete("product_title")
star_rating | 'star_rating' is not null | .isComplete("star_rating")
star_rating | 'star_rating' has no negative values | .isNonNegative("star_rating")
vine | 'vine' has value range 'N', 'Y' | .isContainedIn("vine", Array("N", "Y"))

Note that the constraint suggestion is based on heuristic rules and assumes that the data it is shown is correct, which might not be the case. We recommend reviewing the suggestions before applying them in production.

More Examples on GitHub

You can find examples of more advanced features at Deequ’s GitHub page:

  • Deequ not only provides data quality checks with fixed thresholds. Learn how to use anomaly detection on data quality metrics to apply tests on metrics that change over time.
  • Deequ offers support for storing and loading metrics. Learn how to use the MetricsRepository for this use case.
  • If your dataset grows over time or is partitioned, you can use Deequ’s incremental metrics computation capability. For each partition, Deequ stores a state for each computed metric. To compute metrics for the union of partitions, Deequ can use these states to efficiently derive overall metrics without reloading the data.
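
The idea behind incremental computation in the last bullet can be sketched with a mean: each partition is scanned once and reduced to a small state, and states are merged to obtain the metric for the union of partitions. The following plain-Python sketch is an illustration under that assumption only; `MeanState` and `state_for_partition` are hypothetical names, not Deequ classes.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class MeanState:
    """Sufficient statistics for a mean: sum and count of non-null values."""
    total: float
    count: int

    def merge(self, other):
        """Combine two partition states without touching the underlying data."""
        return MeanState(self.total + other.total, self.count + other.count)

    def metric(self):
        return self.total / self.count


def state_for_partition(values):
    """Scan one partition once and keep only its small state."""
    present = [v for v in values if v is not None]
    return MeanState(float(sum(present)), len(present))


# Two partitions of star ratings, for example two daily loads.
monday = state_for_partition([5, 4, None, 3])
tuesday = state_for_partition([1, 5])

overall = monday.merge(tuesday)
print(overall.metric())  # mean over both partitions
```

When a new partition arrives, only that partition needs to be scanned; the stored states of earlier partitions are reused as-is.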

Additional Resources

Learn more about the inner workings of Deequ in our VLDB 2018 paper “Automating large-scale data quality verification.”

Conclusion

This blog post showed you how to use Deequ for calculating data quality metrics, verifying data quality constraints, and profiling data to automate the configuration of data quality checks. Deequ is available for you now to build your own data quality management pipeline.

 


About the Authors

Dustin Lange is an Applied Science Manager at Amazon Search in Berlin. Dustin’s team develops algorithms for improving the search customer experience through machine learning and data quality tracking. He completed his PhD in similarity search in databases in 2013 and started his Amazon career as an Applied Scientist in forecasting the same year.

 

 

Sebastian Schelter is a Senior Applied Scientist at Amazon Search, working on problems at the intersection of data management and machine learning. He holds a Ph.D. in large-scale data processing from TU Berlin and is an elected member of the Apache Software Foundation, where he currently serves as a mentor for the Apache Incubator.

 

 

Philipp Schmidt is an ML Engineer at Amazon Search. After his graduation from TU Berlin he worked at the University of Potsdam and in several startups in Berlin. At Amazon he is working on enabling data quality tracking for large scale datasets and refining the customer shopping experience through machine learning.

 

 

Tammo Rukat is an Applied Scientist at Amazon Search in Berlin. He holds a PhD in statistical machine learning from the University of Oxford. At Amazon he makes use of the abundance and complexity of the company’s large-scale noisy datasets to contribute to a more intelligent customer experience.

 

 

 

 

Optimize Amazon EMR costs with idle checks and automatic resource termination using advanced Amazon CloudWatch metrics and AWS Lambda

Post Syndicated from Praveen Krishnamoorthy Ravikumar original https://aws.amazon.com/blogs/big-data/optimize-amazon-emr-costs-with-idle-checks-and-automatic-resource-termination-using-advanced-amazon-cloudwatch-metrics-and-aws-lambda/

Many customers use Amazon EMR to run big data workloads, such as Apache Spark and Apache Hive queries, in their development environment. Data analysts and data scientists frequently use these types of clusters, known as analytics EMR clusters. Users often forget to terminate the clusters after their work is done. The clusters then sit idle and add unnecessary costs.

To avoid this overhead, you must track the idleness of the EMR cluster and terminate it if it has been idle for a long time. Amazon EMR provides the native IsIdle Amazon CloudWatch metric, which determines the idleness of the cluster by checking whether there’s a YARN job running. However, you should consider additional metrics, such as SSH users connected or Presto jobs running, to determine whether the cluster is idle. Also, when you execute any Spark jobs in Apache Zeppelin, the IsIdle metric remains active (1) for long hours, even after the job is finished executing. In such cases, the IsIdle metric is not a reliable indicator of cluster inactivity.

In this blog post, we propose a solution to cut down this overhead cost. We implemented a bash script to be installed in the master node of the EMR cluster, and the script is scheduled to run every 5 minutes. The script monitors the clusters and sends a CUSTOM metric EMR-INUSE (0=inactive; 1=active) to CloudWatch every 5 minutes. If CloudWatch receives 0 (inactive) for some predefined set of data points, it triggers an alarm, which in turn executes an AWS Lambda function that terminates the cluster.

Prerequisites

You must have the following before you can create and deploy this framework:

Note: This solution is designed as an additional feature. It can be applied to any existing EMR clusters by executing the scheduler script (explained later in the post) as an EMR step. If you want to implement this solution as a mandatory feature for your future clusters, you can include the EMR step as part of your cluster deployment. You can also apply this solution to EMR clusters that are spun up through AWS CloudFormation, the AWS CLI, and even the AWS Management Console.

Key components

The following are the key components of the solution.

Analytics EMR cluster

Amazon EMR provides a managed Apache Hadoop framework that lets you easily process large amounts of data across dynamically scalable Amazon EC2 instances. Data scientists use analytics EMR clusters for data analysis, machine learning using notebook applications (such as Apache Zeppelin or JupyterHub), and running big data workloads based on Apache Spark, Presto, etc.

Scheduler script

The schedule_script.sh is the shell script to be executed as an Amazon EMR step. When executed, it copies the monitoring script from the Amazon S3 artifacts folder and schedules the monitoring script to run every 5 minutes. The S3 location of the monitoring script should be passed as an argument.

Monitoring script

The pushShutDownMetrin.sh script is a monitoring script that is implemented using shell commands. It should be installed in the master node of the EMR cluster as an Amazon EMR step. The script is scheduled to run every 5 minutes and sends the cluster activity status to CloudWatch.  

JupyterHub API token script

The jupyterhub_addAdminToken.sh script is a shell script to be executed as an Amazon EMR step if JupyterHub is enabled on the cluster. In our design, the monitoring script uses REST APIs provided by JupyterHub to check whether the application is in use.

To send the request to JupyterHub, you must pass an API token along with the request. By default, the application does not generate API tokens. This script generates the API token and assigns it to the admin user, which is then picked up by the jupyterhub module in the monitoring script to track the activity of the application.

Custom CloudWatch metric

All Amazon EMR clusters send data for several metrics to CloudWatch. These metrics are collected automatically and updated every 5 minutes. For this use case, we created the custom metric EMR-INUSE. This metric represents the active status of the cluster based on the module checks implemented in the monitoring script. The metric is set to 0 when the cluster is inactive and 1 when active.

Amazon CloudWatch

CloudWatch is a monitoring service that you can use to set high-resolution alarms to take automated actions. In this case, CloudWatch triggers an alarm if it receives 0 continuously for the configured number of hours.

AWS Lambda

Lambda is a serverless technology that lets you run code without provisioning or managing servers. With Lambda, you can run code for virtually any type of application or backend service—all with zero administration. You can set up your code to automatically trigger from other AWS services. In this case, the triggered CloudWatch alarm mentioned earlier signals Lambda to terminate the cluster.

Architectural diagram

The following diagram illustrates the sequence of events when the solution is enabled, showing what happens to the EMR cluster that is spun up via AWS CloudFormation.

 

The diagram shows the following steps:

  1. The AWS CloudFormation stack is launched to spin up an EMR cluster.
  2. The Amazon EMR step is executed (installs the pushShutDownMetric.sh and then schedules it as a cron job to run every 5 minutes).
  3. If the EMR cluster is active (executing jobs), the master node sets the EMR-INUSE metric to 1 and sends it to CloudWatch.
  4. If the EMR cluster is inactive, the master node sets the EMR-INUSE metric to 0 and sends it to CloudWatch.
  5. On receiving 0 for a predefined number of data points, CloudWatch triggers a CloudWatch alarm.
  6. The CloudWatch alarm sends notification to AWS Lambda to terminate the cluster.
  7. AWS Lambda executes the Lambda function.
  8. The Lambda function then deletes all the stack resources associated with the cluster.
  9. Finally, the EMR cluster is terminated, and the Stack ID is removed from AWS CloudFormation.
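
Steps 3 through 5 boil down to a simple rule: the alarm fires only when the most recent data points are all 0. The following Python sketch simulates that rule on a list of EMR-INUSE values. It is a simplified model for illustration, not CloudWatch's actual alarm evaluation (which also has rules for missing data), and `should_terminate` is a hypothetical name.

```python
def should_terminate(datapoints, evaluation_periods):
    """Return True when the most recent `evaluation_periods` data points are all 0."""
    if len(datapoints) < evaluation_periods:
        return False  # not enough data yet to call the cluster idle
    return all(v == 0 for v in datapoints[-evaluation_periods:])


# One EMR-INUSE value per 5-minute run of the monitoring script.
history = [1, 1, 0, 0, 1, 0, 0, 0, 0]

print(should_terminate(history[:7], 4))  # False: a 1 falls inside the last 4 points
print(should_terminate(history, 4))      # True: the last 4 points are all 0
```

A single active data point resets the idle window, so short bursts of activity keep the cluster alive.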

Modules in the monitoring script

Following are the different activity checks that are implemented in the monitoring script (pushShutDownMetric.sh). The script is designed in a modular fashion so that you can easily include new modules without modifying the core functionality.

ActiveSSHCheck

The ActiveSSHCheck module checks whether there are any active SSH connections to the master node. If there is an active SSH connection, and it’s idle for less than 10 minutes, the function sets the EMR-INUSE metric to 1 and pushes it to CloudWatch.

YARNCheck

Apache Hadoop YARN is the resource manager of the EMR Hadoop ecosystem. All the Spark Submits and Hive queries reach YARN initially. It then schedules and processes these jobs. The YARNCheck module checks whether there are any running jobs in YARN or jobs completed within the last 5 minutes. If it finds any, the function sets the EMR-INUSE metric to 1 and pushes it to CloudWatch. The checks are performed by calling REST APIs exposed by YARN.

The API to fetch the running jobs is http://localhost:8088/ws/v1/cluster/apps?state=RUNNING.

The API to fetch the completed jobs is

http://localhost:8088/ws/v1/cluster/apps?state=FINISHED.
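
The actual check is implemented with shell commands in the monitoring script. As an illustration of the same logic, here is a Python sketch that takes the already-parsed JSON bodies of the two endpoints above and decides whether YARN counts as in use. It assumes the standard response shape of the YARN ResourceManager REST API (an "apps" object holding an "app" list, with finishedTime in epoch milliseconds); `yarn_in_use` is a hypothetical name.

```python
import time

FIVE_MINUTES_MS = 5 * 60 * 1000


def yarn_in_use(running_response, finished_response, now_ms=None):
    """Return 1 if YARN has running apps or apps that finished in the last 5 minutes."""
    now_ms = int(time.time() * 1000) if now_ms is None else now_ms

    def apps(response):
        # YARN returns {"apps": null} when no application matches the filter.
        return (response.get("apps") or {}).get("app", [])

    if apps(running_response):
        return 1
    recently_finished = any(
        now_ms - app["finishedTime"] <= FIVE_MINUTES_MS
        for app in apps(finished_response)
    )
    return 1 if recently_finished else 0


now = 1_700_000_000_000  # fixed "current time" in milliseconds for the example
no_apps = {"apps": None}
finished_recently = {"apps": {"app": [{"id": "app_1", "finishedTime": now - 60_000}]}}
finished_long_ago = {"apps": {"app": [{"id": "app_2", "finishedTime": now - 3_600_000}]}}

print(yarn_in_use(no_apps, finished_recently, now))  # 1
print(yarn_in_use(no_apps, finished_long_ago, now))  # 0
```

Passing the current time as a parameter keeps the function easy to test without a live cluster.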

PRESTOCheck

Presto is an open-source distributed query engine for running interactive analytic queries. It is included in EMR release version 5.0.0 and later. The PRESTOCheck module checks whether there are any running Presto queries or if any queries have been completed within the last 5 minutes. If there are some, the function sets the EMR-INUSE metric to 1 and pushes it to CloudWatch. These checks are performed by calling REST APIs exposed by the Presto server.

The API to fetch the Presto jobs is http://localhost:8889/v1/query.

ZeppelinCheck

Amazon EMR users use Apache Zeppelin as a notebook for interactive data exploration. The ZeppelinCheck module checks whether there are any jobs running or if any have been completed within the last 5 minutes. If so, the function sets the EMR-INUSE metric to 1 and pushes it to CloudWatch. These checks are performed by calling the REST APIs exposed by Zeppelin.

The API to fetch the list of notebook IDs is http://localhost:8890/api/notebook.

The API to fetch the status of each cell inside each notebook ID is http://localhost:8890/api/notebook/job/$notebookID.

JupyterHubCheck

Jupyter Notebook is an open-source web application that you can use to create and share documents that contain live code, equations, visualizations, and narrative text. JupyterHub allows you to host multiple instances of a single-user Jupyter notebook server. The JupyterHubCheck module checks whether any Jupyter notebook is currently in use.

The function uses REST APIs exposed by JupyterHub to fetch the list of Jupyter notebook users and gathers the data about individual notebook servers. From the response, it extracts the last activity time of the servers and checks whether any server was used in the last 5 minutes. If so, the function sets the EMR-INUSE metric to 1 and pushes it to CloudWatch. The jupyterhub_addAdminToken.sh script needs to be executed as an EMR step before enabling the scheduler script.

The API to fetch the list of notebook users is https://localhost:9443/hub/api/users -H "Authorization: token $admin_token".

The API to fetch individual server information is https://localhost:9443/hub/api/users/$user -H "Authorization: token $admin_token".
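
To illustrate the last-activity comparison, the following Python sketch works on an already-parsed user list. It assumes each user entry carries a last_activity field as a UTC timestamp in the form 2019-05-14T12:07:30.000000Z; the field layout used by the real monitoring script (which reads per-server information) may differ, and `jupyterhub_in_use` is a hypothetical name.

```python
from datetime import datetime, timedelta


def jupyterhub_in_use(users, now, window=timedelta(minutes=5)):
    """Return 1 if any user was active within `window` before `now`, else 0."""
    for user in users:
        stamp = user.get("last_activity")
        if not stamp:
            continue  # user has never been active
        # Assumed timestamp format; adjust if the API returns another layout.
        last = datetime.strptime(stamp, "%Y-%m-%dT%H:%M:%S.%fZ")
        if now - last <= window:
            return 1
    return 0


now = datetime(2019, 5, 14, 12, 10, 0)
users = [
    {"name": "alice", "last_activity": "2019-05-14T11:00:00.000000Z"},
    {"name": "bob", "last_activity": None},
    {"name": "carol", "last_activity": "2019-05-14T12:07:30.000000Z"},
]

print(jupyterhub_in_use(users, now))      # 1: carol was active 2.5 minutes ago
print(jupyterhub_in_use(users[:2], now))  # 0: nobody active in the last 5 minutes
```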

If none of these checks detects activity, the cluster is considered to be inactive, and the monitoring script sets the EMR-INUSE metric to 0 and pushes it to CloudWatch.

Note:

The scheduler script schedules the monitoring script (pushShutDownMetric.sh) to run every 5 minutes. Internal cron jobs that run for only a few minutes are not taken into account when setting the EMR-INUSE metric.

Deploying each component

Follow the steps in this section to deploy each component of the proposed design.

Step 1. Create the Lambda function and SNS subscription

The Lambda function and the SNS subscription are the core components of the design. You must set up these components initially, and they are common for every cluster. The following are the AWS resources to be created for these components:

  • Execution role for the Lambda function
  • Terminate Idle EMR Lambda function
  • SNS topic and Lambda subscription

For one-step deployment, use this AWS CloudFormation template to launch and configure the resources in a single go.

The following parameters are available in the template.

Parameter | Default | Description
s3Bucket | emr-shutdown-blogartifacts | The name of the S3 bucket that contains the Lambda file
s3Key | EMRTerminate.zip | The Amazon S3 key of the Lambda file

For manual deployment, follow these steps on the AWS Management Console.

Execution role for the Lambda function

  1. Open the AWS Identity and Access Management (IAM) console and choose Policies, Create policy.
  2. Choose the JSON tab, paste the following policy text, and then choose Review policy.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "s3:ListAllMyBuckets",
                "s3:HeadBucket",
                "s3:ListObjects",
                "s3:GetObject",
                "cloudformation:ListStacks",
                "cloudformation:DeleteStack",
                "cloudformation:DescribeStacks",
                "cloudformation:ListStackResources",
                "elasticmapreduce:TerminateJobFlows"
            ],
            "Resource": "*",
            "Effect": "Allow",
            "Sid": "GenericAccess"
        },
        {
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*",
            "Effect": "Allow",
            "Sid": "LogAccess"
        }
    ]
}
  3. For Name, enter TerminateEMRPolicy and choose Create policy.
  4. Choose Roles, Create role.
  5. Under Choose the service that will use this role, choose Lambda, and then choose Next: Permissions.
  6. For Attach permissions policies, choose the arrow next to Filter policies and choose Customer managed in the drop-down list.
  7. Attach the TerminateEMRPolicy policy that you just created, and choose Review.
  8. For Role name, enter TerminateEMRLambdaRole and then choose Create role.

Terminate idle EMR – Lambda function

I created a deployment package to use with this function.

  1. Open the Lambda console and choose Create function.
  2. Choose Author from scratch, and provide the details as shown in the following screenshot:
  • Name: lambdaTerminateEMR
  • Runtime: Python 2.7
  • Role: Choose an existing role
  • Existing role: TerminateEMRLambdaRole

  3. Choose Create function.
  4. In the Function code section, for Code entry type, choose Upload a file from Amazon S3, and for Runtime, choose Python 2.7.

The Lambda function S3 link URL is

s3://emr-shutdown-blogartifacts/EMRTerminate.zip.

Link to the function: https://s3.amazonaws.com/emr-shutdown-blogartifacts/EMRTerminate.zip

This Lambda function is triggered by a CloudWatch alarm. It parses the input event, retrieves the JobFlowId, and deletes the AWS CloudFormation stack of the corresponding JobFlowId.
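
The packaged function is not reproduced here. As a rough sketch of the parsing step only, the following Python shows how a JobFlowId could be pulled out of an SNS-wrapped CloudWatch alarm notification. It assumes the alarm's metric carries the cluster ID in a dimension named JobFlowId; that dimension name, the helper name `extract_job_flow_id`, and the sample ID are assumptions for illustration, not taken from EMRTerminate.zip.

```python
import json


def extract_job_flow_id(event, dimension_name="JobFlowId"):
    """Pull the cluster ID out of an SNS-wrapped CloudWatch alarm notification."""
    # SNS delivers the alarm as a JSON string inside the first record.
    message = json.loads(event["Records"][0]["Sns"]["Message"])
    for dimension in message["Trigger"]["Dimensions"]:
        if dimension["name"] == dimension_name:
            return dimension["value"]
    raise KeyError("alarm has no %s dimension" % dimension_name)


# Hypothetical event, trimmed to the fields the function reads.
sample_event = {
    "Records": [
        {
            "Sns": {
                "Message": json.dumps(
                    {
                        "AlarmName": "TerminateIDLE-EMRAlarm",
                        "NewStateValue": "ALARM",
                        "Trigger": {
                            "Dimensions": [
                                {"name": "JobFlowId", "value": "j-EXAMPLE12345"}
                            ]
                        },
                    }
                )
            }
        }
    ]
}

print(extract_job_flow_id(sample_event))  # j-EXAMPLE12345
```

With the JobFlowId in hand, the function can look up the matching AWS CloudFormation stack and delete it, as described above.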

SNS topic and Lambda subscription

To set up the CloudWatch alarm in a later step, you must first create an Amazon SNS topic that notifies the preceding Lambda function to execute. Follow these steps to create an SNS topic and configure the Lambda endpoint.

  1. Navigate to the Amazon SNS console, and choose Create topic.
  2. Enter the Topic name and Display name, and choose Create topic.
  3. The topic is created and displayed on the Topics page.
  4. Select the topic and choose Actions, Subscribe to topic.
  5. In Create subscription, choose AWS Lambda as the protocol, choose lambdaTerminateEMR as the endpoint, and choose Create subscription.

Step 2. Execute the JupyterHub API token script as an EMR step

This step is required only when JupyterHub is enabled in the cluster.

Navigate to the EMR cluster to be monitored, and execute the JupyterHub API token script as an EMR step.

Command: s3://emr-shutdown-blogartifacts/jupyterhub_addAdminToken.sh

This script generates an API token and assigns it to the admin user. It is then picked up by the jupyterhub module in the monitoring script to track the activity of the application.

Step 3. Execute the scheduler script as an EMR step

Navigate to the EMR cluster to be monitored and execute the scheduler script as an EMR step.

Note:

Ensure that termination protection is disabled in the cluster. The termination protection flag causes the Lambda function to fail.

Command: s3://emr-shutdown-blogartifacts/schedule_script.sh

Parameter: s3://emr-shutdown-blogartifacts/pushShutDownMetrin.sh

This step copies the pushShutDownMetrin.sh script to the master node and schedules it to run every 5 minutes.

The schedule_script.sh is at https://s3.amazonaws.com/emr-shutdown-blogartifacts/schedule_script.sh.

The pushShutDownMetrin.sh is at https://s3.amazonaws.com/emr-shutdown-blogartifacts/pushShutDownMetrin.sh.

Step 4. Create a CloudWatch alarm

For single-step deployment, use this AWS CloudFormation template to launch and configure the resources in a single go.

The following parameters are available in the template.

Parameter | Default | Description
AlarmName | TerminateIDLE-EMRAlarm | The name for the alarm.
EMRJobFlowID | Requires input | The JobFlowId of the cluster.
EvaluationPeriod | Requires input | The idle timeout value, in data points (1 data point equals 5 minutes). For example, to terminate the cluster if it is idle for 20 minutes, the input should be 4.
SNSSubscribeTopic | Requires input | The Amazon Resource Name (ARN) of the SNS topic to be triggered on the alarm.
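The EvaluationPeriod arithmetic can be sketched as a small helper. The function name is hypothetical, and rounding up for timeouts that are not a multiple of 5 minutes is a choice made here, not something the template prescribes.

```python
import math


def evaluation_periods(idle_minutes, minutes_per_data_point=5):
    """Convert an idle timeout in minutes to a number of data points."""
    if idle_minutes <= 0:
        raise ValueError("idle_minutes must be positive")
    # Round up so the cluster is never terminated earlier than requested.
    return int(math.ceil(idle_minutes / float(minutes_per_data_point)))


print(evaluation_periods(20))   # 4, matching the example in the table
print(evaluation_periods(120))  # 24 data points for a 2-hour timeout
```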

 

The AWS CloudFormation CLI command is as follows:

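aws cloudformation create-stack --stack-name EMRAlarmStack \
      --template-url https://s3.amazonaws.com/emr-shutdown-blogartifacts/Cloudformation/alarm.json \
      --parameters ParameterKey=AlarmName,ParameterValue=TerminateIDLE-EMRAlarm \
                   ParameterKey=EMRJobFlowID,ParameterValue=<Input> \
                   ParameterKey=EvaluationPeriod,ParameterValue=4 \
                   ParameterKey=SNSSubscribeTopic,ParameterValue=<Input>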

For manual deployment, follow these steps to create the alarm.

  1. Open the Amazon CloudWatch console and choose Alarms.
  2. Choose Create Alarm.
  3. On the Select Metric page, under Custom Metrics, choose EMRShutdown/Cluster-Metric.
  4. Choose the isEMRUsed metric of the EMR JobFlowId, and then choose Next.
  5. Define the alarm as required. In this case, the alarm is set to send a notification to the SNS topic shutDownEMRTest when CloudWatch receives the IsEMRUsed metric as 0 for every data point in the last 2 hours.
  6. Choose Create Alarm.
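For readers who prefer to script the manual steps, here is a minimal sketch that assembles the arguments for the CloudWatch PutMetricAlarm API. The JobFlowId dimension name, the Maximum statistic, and the exact casing of the metric name are assumptions, so check them against the metric that your cluster actually publishes.

```python
def build_idle_alarm(job_flow_id, sns_topic_arn, idle_minutes=120,
                     alarm_name="TerminateIDLE-EMRAlarm"):
    """Build keyword arguments for the CloudWatch PutMetricAlarm API."""
    period_seconds = 300  # the metric is pushed every 5 minutes
    return {
        "AlarmName": alarm_name,
        "Namespace": "EMRShutdown/Cluster-Metric",
        "MetricName": "isEMRUsed",
        # Assumption: the metric is published with a JobFlowId dimension.
        "Dimensions": [{"Name": "JobFlowId", "Value": job_flow_id}],
        # Assumption: Maximum, so one busy sample keeps the period above 0.
        "Statistic": "Maximum",
        "Period": period_seconds,
        "EvaluationPeriods": (idle_minutes * 60) // period_seconds,
        # Alarm only when every data point in the window is 0.
        "Threshold": 0.0,
        "ComparisonOperator": "LessThanOrEqualToThreshold",
        "AlarmActions": [sns_topic_arn],
    }


# Hypothetical cluster ID and topic ARN.
params = build_idle_alarm(
    "j-EXAMPLE12345", "arn:aws:sns:us-east-1:123456789012:shutDownEMRTest")
print(params["EvaluationPeriods"])  # 24 data points cover 2 hours
```

The resulting dictionary can be passed to the boto3 CloudWatch client as put_metric_alarm(**params).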

Summary

In this post, we focused on building a framework to cut down the additional cost that you might incur due to the idle running of an EMR cluster. The modules implemented in the shell script, the tracking of the execution status of the Spark scripts, and the Hive/Presto queries using the lightweight REST API calls make this approach an efficient solution.

If you have questions or suggestions, please comment below.

 


About the Author

Praveen Krishnamoorthy Ravikumar is an associate big data consultant with Amazon Web Services.

 

 

 

 

Query your Amazon Redshift cluster with the new Query Editor

Post Syndicated from Surbhi Dangi original https://aws.amazon.com/blogs/big-data/query-your-amazon-redshift-cluster-with-the-new-query-editor/

Data warehousing is a critical component for analyzing and extracting actionable insights from your data. Amazon Redshift is a fast, scalable data warehouse that makes it cost-effective to analyze all of your data across your data warehouse and data lake.

The Amazon Redshift console recently launched the Query Editor. The Query Editor is an in-browser interface for running SQL queries on Amazon Redshift clusters directly from the AWS Management Console. Using the Query Editor is the most efficient way to run queries on databases hosted by your Amazon Redshift cluster.

After creating your cluster, you can use the Query Editor immediately to run queries on the Amazon Redshift console. It’s a great alternative to connecting to your database with external JDBC/ODBC clients.

In this post, we show how you can run SQL queries for loading data in clusters and monitoring cluster performance directly from the console.

Using the Query Editor instead of your SQL IDE or tool

The Query Editor provides an in-browser interface for running SQL queries on Amazon Redshift clusters. For queries that are run on compute nodes, you can then view the query results and query execution plan next to your queries.

The ability to visualize queries and results in a convenient user interface lets you accomplish many tasks, both as a database administrator and a database developer. The visual Query Editor helps you do the following:

  • Build complex queries.
  • Edit and run queries.
  • Create and edit data.
  • View and export results.
  • Generate EXPLAIN plans on your queries.

With the Query Editor, you can also have multiple SQL tabs open at the same time. Colored syntax, query autocomplete, and single-step query formatting are all an added bonus!

Database administrators typically maintain a repository of commonly used SQL statements that they run regularly. If you have this written in a notepad somewhere, the saved queries feature is for you. This feature lets you save and reuse your commonly run SQL statements in one step. This makes it efficient for you to review, rerun, and modify previously run SQL statements. The Query Editor also has an exporter so that you can export the query results into a CSV format.

The Query Editor lets you perform common tasks, such as creating a schema and table on the cluster and loading data in tables. These common tasks are now possible with a few simple SQL statements that you run directly on the console. You can also do day-to-day administrative tasks from the console. These tasks can include finding long-running queries on the cluster, checking for potential deadlocks with long-running updates on a cluster, and checking for how much space is available in the cluster.

The Query Editor is available in 16 AWS Regions. It’s available on the Amazon Redshift console at no extra cost to you. Standard Amazon Redshift rates apply for your cluster usage and for Amazon Redshift Spectrum. To learn more, see Amazon Redshift pricing.

Let’s get started with the Query Editor

The following sections contain the steps for setting up your Amazon Redshift cluster with a sample dataset from an Amazon S3 bucket using the Query Editor directly from the console. For new users, this is an especially handy alternative to setting up JDBC/ODBC clients to establish a connection to your cluster. If you already have a cluster, you can complete these steps in 10 minutes or less.

In the following example, you use the Query Editor to perform these tasks:

  • Load a sample dataset in your cluster.
  • Run SQL queries on a sample dataset and view results and execution details.
  • Run administration queries on system tables and save frequently used queries.
  • Run SQL queries to join an internal and external table.

Use the following steps to set up your cluster for querying:

  1. On the Amazon Redshift console, create a cluster. For detailed steps, see the procedure described in Launch a Sample Amazon Redshift Cluster in the Amazon Redshift Getting Started Guide. Use any of the following currently supported node types: dc1.8xlarge, dc2.large, dc2.8xlarge, or ds2.8xlarge.

     For this post, we used the Quick launch cluster button on the Amazon Redshift dashboard to create a single-node, dc2.large cluster called demo-cluster in the us-east-1 Region. As you go through the tutorial, replace this cluster name and Region with the name of the cluster that you launched and the Region where you launched it.

  2. Add Query Editor-related permissions to the AWS account. To access the Query Editor feature on the console, you need permissions. For detailed steps, see Enabling Access to the Query Editor in the Amazon Redshift Cluster Management Guide.

  3. To load and run queries on a sample dataset (including permissions to load data from S3 or to use the AWS Glue or Amazon Athena Data Catalogs), follow these steps:

     a. To load sample data from Amazon S3 using the COPY command, you must provide authentication for your cluster to access Amazon S3 on your behalf. Sample data for this procedure is provided in an Amazon S3 bucket that is owned by Amazon Redshift. The bucket permissions are configured to allow all authenticated AWS users read access to the sample data files. To perform this step:

        • Attach the AmazonS3ReadOnlyAccess policy to the IAM role. The AmazonS3ReadOnlyAccess policy grants your cluster read-only access to all Amazon S3 buckets.

        • If you’re using the AWS Glue Data Catalog, attach the AWSGlueConsoleFullAccess policy to the IAM role. If you’re using the Athena Data Catalog, attach the AmazonAthenaFullAccess policy to the IAM role.

     b. In step 2 of the example, you run the COPY command to load the sample data. The COPY command includes a placeholder for the IAM role Amazon Resource Name (ARN). To load sample data, add the role ARN in the COPY command. The following is a sample COPY command:

COPY myinternalschema.event FROM 's3://aws-redshift-spectrum-sample-data-us-east-1/spectrum/event/allevents_pipe.txt'
iam_role 'REPLACE THIS PLACEHOLDER WITH THE IAM ROLE ARN'
delimiter '|' timeformat 'YYYY-MM-DD HH:MI:SS' region 'us-east-1';
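If you generate the COPY command from a script, a small helper can fill in the placeholder and reject values that are obviously not role ARNs. This is a sketch only: the function name and the loose ARN check are illustrative, and the assumption that sample buckets in other Regions follow the same naming pattern as the us-east-1 bucket should be verified before you rely on it.

```python
import re

# Loose check for an IAM role ARN: partition, 12-digit account, role name.
ROLE_ARN_PATTERN = re.compile(r"^arn:aws[a-z-]*:iam::\d{12}:role/[\w+=,.@/-]+$")
REGION_PATTERN = re.compile(r"^[a-z0-9-]+$")


def build_copy_statement(role_arn, region="us-east-1"):
    """Render the sample COPY command with the placeholder filled in."""
    if not ROLE_ARN_PATTERN.match(role_arn):
        raise ValueError("not an IAM role ARN: %r" % role_arn)
    if not REGION_PATTERN.match(region):
        raise ValueError("unexpected Region name: %r" % region)
    # Assumption: other Regions use the same bucket naming pattern.
    bucket = "aws-redshift-spectrum-sample-data-" + region
    return (
        "COPY myinternalschema.event "
        "FROM 's3://%s/spectrum/event/allevents_pipe.txt'\n"
        "iam_role '%s'\n"
        "delimiter '|' timeformat 'YYYY-MM-DD HH:MI:SS' region '%s';"
        % (bucket, role_arn, region)
    )


# Hypothetical account ID and role name.
print(build_copy_statement("arn:aws:iam::123456789012:role/myRedshiftRole"))
```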

After you complete these steps, your Amazon Redshift cluster is ready. The following sections describe three steps that demonstrate what you can do with the Query Editor:

  • Use the Query Editor for loading data.
  • Perform several day-to-day administration tasks.
  • Run a query on data stored in the Amazon Redshift cluster and Amazon S3 data lake, with no need for loading or other data preparation.

Step 1: Connect to your cluster in the Query Editor

To connect to your cluster:

  1. Using the left navigation pane on the Amazon Redshift console, navigate to the Query Editor.
  2. In the Credentials dialog box, in the Cluster drop-down list, choose the cluster name (demo-cluster). Choose the database and the database user for this cluster.
  3. If you created the cluster by using the service-provided default values, choose dev as your Database selection, and enter awsuser in the Database user box.
  4. Enter the password for the cluster. Commonly, Amazon Redshift database users log on by providing a database user name and password. As an alternative, if you don’t remember your password, you can retrieve it in an encrypted format by choosing Create a temporary password, as shown in the following example. For more information, see Using IAM Authentication to Generate Database User Credentials.

This connects to the cluster if you have Query Editor-related permissions for the AWS account. For more information, see the step to add the Query Editor-related permissions to the AWS account in the previous section.

Step 2: Prepare the cluster with a sample dataset

To prepare the cluster with a sample dataset:

  1. Run the following SQL in the Query Editor. This creates the schema myinternalschema in the Amazon Redshift cluster demo-cluster.
/* Create a schema */
CREATE SCHEMA myinternalschema;

  2. Run the following SQL statement in the Query Editor to create a table in the schema myinternalschema.
/* Create table */
CREATE TABLE myinternalschema.event(
	eventid integer not null distkey,
	venueid smallint not null,
	catid smallint not null,
	dateid smallint not null sortkey,
	eventname varchar(200),
	starttime timestamp);
  3. Run the following SQL statement with the COPY command to copy the sample dataset from Amazon S3 to your Amazon Redshift cluster, demo-cluster, in the us-east-1 Region. The Amazon S3 path for the sample dataset is s3://aws-redshift-spectrum-sample-data-us-east-1/spectrum/event/allevents_pipe.txt.

Before choosing Run Query, remember to replace the placeholder in the example with the ARN for the IAM role that is associated with this AWS account. If your cluster is in another AWS Region, replace the Region in the region parameter and the Amazon S3 path, as shown in the following SQL command:

/* Load data */
COPY myinternalschema.event FROM 's3://aws-redshift-spectrum-sample-data-us-east-1/spectrum/event/allevents_pipe.txt'
iam_role 'REPLACE THIS PLACEHOLDER WITH THE IAM ROLE ARN'
delimiter '|' timeformat 'YYYY-MM-DD HH:MI:SS' region 'us-east-1';
  4. To ensure access to the public dataset in Amazon S3, make sure that this AWS account has the correct permissions to access Amazon S3, AWS Glue, and Athena. For more information, see the step to load and run queries on the sample dataset (Amazon S3 and AWS Glue/Amazon Athena Data Catalog permissions) earlier in this post.
  5. To verify the data in the previously created table in the Query Editor, browse through the tables in the schema viewer on the left. Choose the preview icon next to the table name to see the first 10 records from the event table. Choosing this option runs the following query for a preview of the table, displaying 10 rows from the table:
/* View a snippet of the same dataset in myinternalschema */ 
SELECT * FROM myinternalschema.event
LIMIT 10;

You can also enter your own SQL statements to verify the data in the table that you created. Use Ctrl + Space to autocomplete queries in the Query Editor.

Step 3: Helpful cluster management queries

You are all set to try Amazon Redshift! In day-to-day cluster management and monitoring, you can run the following SQL queries in the Query Editor. These frequently used queries let you find and shut down long-running queries, uncover deadlock situations, and check for available disk space on your Amazon Redshift cluster. Save these queries and get convenient access to them by choosing Saved queries in the left navigation on the console, as shown in the following example:

Kill malfunctioning or long-running queries on a cluster

If there is a malfunctioning query that must be shut down, locating the query can often be a multi-step process. Run the following SQL in the Query Editor to find all queries that are running on an Amazon Redshift cluster with a SQL statement:

/* Queries are currently in progress */ 
SELECT
userid
 , query
 , pid
 , starttime
 , left(text, 50) as text
FROM pg_catalog.stv_inflight

After locating the malfunctioning queries from the query result set, use the cancel <pid> <msg> command to kill a query. Be sure to use the process ID—pid in the previous SQL—and not the query ID. You can supply an optional message that is returned to the issuer of the query and logged.
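To show how the pid column feeds the cancel command, the following sketch turns result rows from the preceding query into CANCEL statements for anything running longer than a threshold. The 30-minute threshold, the function name, and the sample rows are assumptions for illustration. Review the generated statements before running them.

```python
from datetime import datetime, timedelta


def build_cancel_statements(rows, now, max_runtime=timedelta(minutes=30),
                            message="Long-running query"):
    """Return CANCEL statements for queries running longer than max_runtime.

    rows: dicts with the pid and starttime columns from stv_inflight.
    """
    statements = []
    safe_message = message.replace("'", "''")  # escape single quotes
    for row in rows:
        if now - row["starttime"] > max_runtime:
            # CANCEL takes the process ID (pid), not the query ID.
            statements.append("CANCEL %d '%s';" % (row["pid"], safe_message))
    return statements


# Hypothetical result rows for illustration.
rows = [
    {"query": 1001, "pid": 18764, "starttime": datetime(2019, 1, 1, 9, 0)},
    {"query": 1002, "pid": 18770, "starttime": datetime(2019, 1, 1, 9, 55)},
]
for statement in build_cancel_statements(rows, now=datetime(2019, 1, 1, 10, 0)):
    print(statement)
```

Only the first row exceeds the threshold here, so a single statement is printed for pid 18764.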

Monitor disk space being used on a cluster

One of the most frequently used console functions is monitoring the percentage of disk space used by a cluster. Queries fail if there is limited space in the cluster to create temp tables used while the query is running. Vacuums can also fail if the cluster does not have free space to store intermediate data in the cluster restore process. Monitoring this metric is important for planning ahead before the cluster gets full and you have to resize or add more clusters.

If you suspect that you are experiencing high or full disk usage with Amazon Redshift, run the following SQL in the Query Editor to find disk space available and see individual table sizes on the cluster:

/* Disk space available on your Redshift cluster */
SELECT SUM(used)::float / SUM(capacity) as pct_full
FROM pg_catalog.stv_partitions;

/* Find individual table sizes */
SELECT t.name, COUNT(tbl) / 1000.0 as gb
FROM (
SELECT DISTINCT id, name FROM stv_tbl_perm
) t
JOIN stv_blocklist ON tbl=t.id
GROUP BY t.name ORDER BY gb DESC;

From here, you can either drop the unnecessary tables or resize your cluster to have more capacity. For more information, see Resizing Clusters in Amazon Redshift.
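The arithmetic behind the two queries can be sketched as follows. Amazon Redshift stores data in 1 MB blocks, which is why dividing a block count by 1000 gives an approximate size in GB. The function names and sample numbers are illustrative only. Note that the SQL returns a fraction between 0 and 1, while this helper returns a percentage.

```python
def percent_full(partitions):
    """partitions: list of (used, capacity) pairs, counted in 1 MB blocks."""
    used = sum(u for u, _ in partitions)
    capacity = sum(c for _, c in partitions)
    if capacity == 0:
        raise ValueError("no capacity reported")
    return 100.0 * used / capacity


def blocks_to_gb(block_count):
    """Approximate size in GB for a count of 1 MB blocks."""
    return block_count / 1000.0


# Hypothetical values for two disk partitions and one table.
print(percent_full([(250, 1000), (750, 1000)]))  # 50.0
print(blocks_to_gb(2500))                        # 2.5
```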

Watch for deadlock situations with suspiciously long-running updates on the cluster

If a cluster has a suspiciously long-running update, it might be in a deadlocked transaction. The stv_locks table indicates any transactions that have locks, along with the process ID of the relevant sessions. This pid can be passed to pg_terminate_backend(pid) to kill the offending session.

Run a SQL statement in the Query Editor to inspect the locks:

/* Find all transactions that have locks along with the process id of the relevant sessions */
select 
  table_id, 
  last_update, 
  last_commit, 
  lock_owner_pid, 
  lock_status 
FROM pg_catalog.stv_locks 
ORDER BY last_update asc

To shut down the session, run select pg_terminate_backend(lock_owner_pid), using the lock_owner_pid value from stv_locks.

See the rows affected by the most recent vacuums of the cluster

By running a vacuum command on tables in the cluster, any free space because of delete and update operations is reclaimed. At the same time, the data of the table gets sorted. The result is a compact and sorted table, which improves the cluster performance.

Run the following SQL statement to see a count of rows that were deleted or resorted from the most recent vacuums from the svv_vacuum_summary table:

/* Deleted or resorted rows from most recent vacuums */
select * from svv_vacuum_summary
where table_name = 'event';

Debug connection issues for Amazon Redshift clusters

Joining stv_sessions and stl_connection_log tables returns a list of all sessions (all connects, authenticates, and disconnects on the cluster) and the respective remote host and port information.

To list all connections, run the following SQL statement in the Query Editor:

/* List connections, along with remote host information */ 
SELECT DISTINCT
 starttime,
 process,
 user_name,
 remotehost,
 remoteport
FROM stv_sessions
LEFT JOIN stl_connection_log ON pid = process
  AND starttime > recordtime - interval '1 second'
ORDER BY starttime DESC

Use the saved queries feature to save these commonly used SQL statements in your account and run them in the Query Editor with one click.

Bonus step 4: Query with Amazon Redshift Spectrum

With Amazon Redshift Spectrum, you can query data in Amazon S3 without the need to first load it into Amazon Redshift. Amazon Redshift Spectrum queries employ massive parallelism to quickly process large datasets in S3, without ingesting that data into Amazon Redshift. Much of the processing occurs in the Amazon Redshift Spectrum layer. Multiple clusters can concurrently query the same dataset in Amazon S3 without needing to make copies of the data for each cluster.

To get set up with Amazon Redshift Spectrum, run the following SQL statements in the Query Editor for demo-cluster. If your cluster is in another AWS Region, be sure to replace the Region in the region parameter and the Amazon S3 path in the following SQL statement.

To create a new schema from a data catalog to use with Amazon Redshift Spectrum:

/* Create external (Amazon S3) schema */
CREATE EXTERNAL SCHEMA myexternalschema
from data catalog
database 'myexternaldatabase'
region 'us-east-1'
iam_role 'REPLACE THIS PLACEHOLDER WITH THE IAM ROLE ARN' 
create external database if not exists;

To create a table for the Amazon Redshift Spectrum S3 sample dataset:

/* Create external table */
CREATE EXTERNAL TABLE myexternalschema.sales(
salesid integer,
listid integer,
sellerid integer,
buyerid integer,
eventid integer,
dateid smallint,
qtysold smallint,
pricepaid decimal(8,1),
commission decimal(8,1),
saletime timestamp)
row format delimited
fields terminated by '\t'
stored as textfile
location 's3://aws-redshift-spectrum-sample-data-us-east-1/spectrum/sales/' 
table properties ('numRows'='171000');

Start querying!

This section provides an example scenario to start querying data from the external (Amazon S3) sales table and the internal (Amazon Redshift) event table. The join query in this scenario looks for all events (from the event table loaded on demo-cluster) that have sales with a price paid greater than 50 (from the Amazon Redshift Spectrum dataset in Amazon S3, s3://aws-redshift-spectrum-sample-data-us-east-1/spectrum/sales/), and totals the price paid for each event.

/* Join a table from the sample dataset with a Spectrum table */
/* Join external (Amazon S3) and internal (Amazon Redshift) table */
SELECT
    myexternalschema.sales.eventid,
    sum(myexternalschema.sales.pricepaid)   
from
    myexternalschema.sales,
    myinternalschema.event  
where
    myexternalschema.sales.eventid = myinternalschema.event.eventid       
    and myexternalschema.sales.pricepaid > 50  
group by
    myexternalschema.sales.eventid  
order by
    1 desc;

In the Query results section, choose View execution to see the detailed execution plan. The query plan is available for all queries executed on compute nodes.

Note: Queries that do not reference user tables, such as administration queries that only use catalog tables, do not have an available query plan.

Optionally, download the query results to your local disk for offline use. Queries run for up to three minutes in the Query Editor. After a query is completed, the Query Editor provides two minutes to fetch results. Rerun the query and try again if you hit the two-minute threshold.

Load additional tables from the Amazon Redshift sample dataset by using the following SQL statements and get creative with your queries. Before choosing Run query in the Query Editor, remember to add the ARN for the IAM role that is associated with this AWS account in the placeholder in the following SQL statement. If your cluster is in another AWS Region, replace the Region in the region parameter and the Amazon S3 path in the following SQL statement.

copy users from 's3://awssampledbuswest2/tickit/allusers_pipe.txt' 
credentials 'aws_iam_role=REPLACE THIS PLACEHOLDER WITH THE IAM ROLE ARN' 
delimiter '|' region 'us-west-2';

copy venue from 's3://awssampledbuswest2/tickit/venue_pipe.txt' 
credentials 'aws_iam_role=REPLACE THIS PLACEHOLDER WITH THE IAM ROLE ARN' 
delimiter '|' region 'us-west-2';

copy category from 's3://awssampledbuswest2/tickit/category_pipe.txt' 
credentials 'aws_iam_role=REPLACE THIS PLACEHOLDER WITH THE IAM ROLE ARN' 
delimiter '|' region 'us-west-2';

copy date from 's3://awssampledbuswest2/tickit/date2008_pipe.txt' 
credentials 'aws_iam_role=REPLACE THIS PLACEHOLDER WITH THE IAM ROLE ARN' 
delimiter '|' region 'us-west-2';

copy listing from 's3://awssampledbuswest2/tickit/listings_pipe.txt' 
credentials 'aws_iam_role=REPLACE THIS PLACEHOLDER WITH THE IAM ROLE ARN' 
delimiter '|' region 'us-west-2';

copy sales from 's3://awssampledbuswest2/tickit/sales_tab.txt'
credentials 'aws_iam_role=REPLACE THIS PLACEHOLDER WITH THE IAM ROLE ARN'
delimiter '\t' timeformat 'MM/DD/YYYY HH:MI:SS' region 'us-west-2';

Summary

In this post, we introduced the Query Editor, an in-browser interface for running SQL queries on Amazon Redshift clusters. We showed how you can use it to run SQL queries for loading data in clusters and monitoring cluster performance directly on the console. To learn more about Amazon Redshift and start with Query Editor, visit the Amazon Redshift webpage.

If you like this feature, share your feedback by using the Send feedback link on the console, as shown following.

If you have any questions or suggestions, please leave a comment below.

Happy querying!

 


About the Authors

Surbhi Dangi is a senior product/design manager at AWS. Her work includes building user experiences for Database, Analytics & AI AWS consoles, launching new database and analytics products, working on new feature launches for existing products, and building broadly adopted internal tools for AWS teams. She enjoys traveling to new destinations to discover new cultures, trying new cuisines, and teaching product management 101 to aspiring PMs.

 

 

Raja Bhogi is an engineering manager at AWS. He is responsible for building delightful and easy-to-use web experiences for analytics and blockchain products. His work includes launching web experiences for new analytics products, and working on new feature launches for existing products. He is passionate about web technologies, performance insights, and tuning. He is a thrill seeker and enjoys everything from roller coasters to bungy jumping.

 

 

 

Build and automate a serverless data lake using an AWS Glue trigger for the Data Catalog and ETL jobs

Post Syndicated from Saurabh Shrivastava original https://aws.amazon.com/blogs/big-data/build-and-automate-a-serverless-data-lake-using-an-aws-glue-trigger-for-the-data-catalog-and-etl-jobs/

Today, data is flowing from everywhere, whether it is unstructured data from resources like IoT sensors, application logs, and clickstreams, or structured data from transaction applications, relational databases, and spreadsheets. Data has become a crucial part of every business. This has resulted in a need to maintain a single source of truth and automate the entire pipeline, from data ingestion to transformation and analytics, to extract value from the data quickly.

There is a growing concern over the complexity of data analysis as data volume, velocity, and variety increase. The concern stems from the number and complexity of steps it takes to get data to a state that is usable by business users. Often data engineering teams spend most of their time on building and optimizing extract, transform, and load (ETL) pipelines. Automating the entire process can reduce the time to value and cost of operations. In this post, we describe how to create a fully automated data cataloging and ETL pipeline to transform your data.

Architecture

In this post, you learn how to build and automate the following architecture.

You build your serverless data lake with Amazon Simple Storage Service (Amazon S3) as the primary data store. Given the scalability and high availability of Amazon S3, it is best suited as the single source of truth for your data.

You can use various techniques to ingest and store data in Amazon S3. For example, you can use Amazon Kinesis Data Firehose to ingest streaming data. You can use AWS Database Migration Service (AWS DMS) to ingest relational data from existing databases. And you can use AWS DataSync to ingest files from an on-premises Network File System (NFS).

Ingested data lands in an Amazon S3 bucket that we refer to as the raw zone. To make that data available, you have to catalog its schema in the AWS Glue Data Catalog. You can do this using an AWS Lambda function invoked by an Amazon S3 trigger to start an AWS Glue crawler that catalogs the data. When the crawler is finished creating the table definition, you invoke a second Lambda function using an Amazon CloudWatch Events rule. This step starts an AWS Glue ETL job to process and output the data into another Amazon S3 bucket that we refer to as the processed zone.

The AWS Glue ETL job converts the data to Apache Parquet format and stores it in the processed S3 bucket. You can modify the ETL job to achieve other objectives, like more granular partitioning, compression, or enriching of the data. Monitoring and notification is an integral part of the automation process. So as soon as the ETL job finishes, another CloudWatch rule sends you an email notification using an Amazon Simple Notification Service (Amazon SNS) topic. This notification indicates that your data was successfully processed.

In summary, this pipeline classifies and transforms your data, sending you an email notification upon completion.
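The two CloudWatch Events rules in this architecture match AWS Glue state-change events. As a rough sketch, the event patterns for such rules could be built as shown here. The helper name and the crawler and job names are hypothetical, and the patterns in the AWS CloudFormation template may differ in detail.

```python
import json


def glue_state_change_pattern(detail_type, name_field, name, states):
    """Build a CloudWatch Events pattern for an AWS Glue state change."""
    return {
        "source": ["aws.glue"],
        "detail-type": [detail_type],
        "detail": {name_field: [name], "state": list(states)},
    }


# Hypothetical crawler and job names.
crawler_rule = glue_state_change_pattern(
    "Glue Crawler State Change", "crawlerName", "my-crawler", ["Succeeded"])
job_rule = glue_state_change_pattern(
    "Glue Job State Change", "jobName", "my-etl-job", ["SUCCEEDED"])
print(json.dumps(crawler_rule, indent=2))
```

The first pattern would trigger the Lambda function that starts the ETL job, and the second would trigger the SNS notification when the job succeeds.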

Deploy the automated data pipeline using AWS CloudFormation

First, you use AWS CloudFormation templates to create all of the necessary resources. This removes opportunities for manual error, increases efficiency, and ensures consistent configurations over time.

Launch the AWS CloudFormation template with the following Launch stack button.

Be sure to choose the US East (N. Virginia) Region (us-east-1). Then enter the appropriate stack name, email address, and AWS Glue crawler name to create the Data Catalog. Add the AWS Glue database name to save the metadata tables. Acknowledge the IAM resource creation as shown in the following screenshot, and choose Create.

Note: It is important to enter your valid email address so that you get a notification when the ETL job is finished.

This AWS CloudFormation template creates the following resources in your AWS account:

  • Two Amazon S3 buckets to store both the raw data and processed Parquet data.
  • Two AWS Lambda functions: one to create the AWS Glue Data Catalog and another function to publish topics to Amazon SNS.
  • An Amazon Simple Queue Service (Amazon SQS) queue for maintaining the retry logic.
  • An Amazon SNS topic to inform you that your data has been successfully processed.
  • Two CloudWatch Events rules: one rule on the AWS Glue crawler and another on the AWS Glue ETL job.
  • AWS Identity and Access Management (IAM) roles for accessing AWS Glue, Amazon SNS, Amazon SQS, and Amazon S3.

When the AWS CloudFormation stack is ready, check your email and confirm the SNS subscription. Choose the Resources tab and find the details.

Follow these steps to verify your email subscription so that you receive an email alert as soon as your ETL job finishes.

  1. On the Amazon SNS console, in the navigation pane, choose Topics. An SNS topic named SNSProcessedEvent appears in the display.

  2. Choose the ARN. The topic details page appears, listing the email subscription as Pending confirmation. Be sure to confirm the subscription for your email address as provided in the Endpoint column.

If you don’t see an email address, or the link is showing as not valid in the email, choose the corresponding subscription endpoint. Then choose Request confirmation to confirm your subscription. Be sure to check your email junk folder for the request confirmation link.

Configure an Amazon S3 bucket event trigger

In this section, you configure a trigger on the raw S3 bucket so that new data landing in the bucket invokes GlueTriggerLambda, which was created in the AWS CloudFormation deployment.

To configure notifications:

  1. Open the Amazon S3 console.
  2. Choose the source bucket. In this case, the bucket name contains raws3bucket, for example, <stackname>-raws3bucket-1k331rduk5aph.
  3. Go to the Properties tab, and under Advanced settings, choose Events.

  4. Choose Add notification and configure a notification with the following settings:
  • Name: Enter a name of your choice. In this example, it is crawlerlambdaTrigger.
  • Events: Select the All object create events check box to create the AWS Glue Data Catalog when you upload the file.
  • Send to: Choose Lambda function.
  • Lambda: Choose the Lambda function that was created in the deployment section. The name of your Lambda function should contain the string GlueTriggerLambda.

See the following screenshot for all the settings. When you’re finished, choose Save.

For more details on configuring events, see How Do I Enable and Configure Event Notifications for an S3 Bucket? in the Amazon S3 Console User Guide.
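The console settings above correspond to a bucket notification configuration. A minimal sketch of that structure follows; the helper name and the example ARN are hypothetical.

```python
def lambda_notification_config(lambda_arn, notification_id="crawlerlambdaTrigger"):
    """Build a NotificationConfiguration that invokes a Lambda function."""
    return {
        "LambdaFunctionConfigurations": [{
            "Id": notification_id,
            "LambdaFunctionArn": lambda_arn,
            # Equivalent to the "All object create events" check box.
            "Events": ["s3:ObjectCreated:*"],
        }]
    }


# Hypothetical function ARN.
config = lambda_notification_config(
    "arn:aws:lambda:us-east-1:123456789012:function:example-GlueTriggerLambda")
print(config)
```

This dictionary can be passed to the boto3 S3 client as put_bucket_notification_configuration(Bucket=..., NotificationConfiguration=config). When you configure the notification through the API instead of the console, the Lambda function must already have a resource policy that allows Amazon S3 to invoke it.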

Download the dataset

For this post, you use a publicly available New York green taxi dataset in CSV format. You upload monthly data to your raw zone and perform automated data cataloging using an AWS Glue crawler. After cataloging, an automated AWS Glue ETL job triggers to transform the monthly green taxi data to Parquet format and store it in the processed zone.

You can download the raw dataset from the NYC Taxi & Limousine Commission trip record data site. Download the monthly green taxi dataset and upload only one month of data. For example, first upload only the green taxi January 2018 data to the raw S3 bucket.

Automate the Data Catalog with an AWS Glue crawler

One of the important aspects of a modern data lake is to catalog the available data so that it’s easily discoverable. To run ETL jobs or ad hoc queries against your data lake, you must first determine the schema of the data along with other metadata information like location, format, and size. An AWS Glue crawler makes this process easy.

After you upload the data into the raw zone, the Amazon S3 trigger that you created earlier in the post invokes the GlueTriggerLambda function. This function starts the AWS Glue crawler, which populates the AWS Glue Data Catalog with metadata inferred from the crawled data.
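The following sketch shows what a handler for the S3 trigger might look like. It is not the GlueTriggerLambda code deployed by the template: the function names, bucket name, and crawler name are assumptions, and a stand-in client replaces boto3.client("glue") so that the example runs offline.

```python
from urllib.parse import unquote_plus


def uploaded_objects(event):
    """Yield (bucket, key) for each record in an S3 event notification."""
    for record in event.get("Records", []):
        s3 = record["s3"]
        # Object keys arrive URL-encoded in S3 event notifications.
        yield s3["bucket"]["name"], unquote_plus(s3["object"]["key"])


def handle_upload(event, glue_client, crawler_name):
    """Start the crawler once if the event reports at least one object."""
    objects = list(uploaded_objects(event))
    if objects:
        glue_client.start_crawler(Name=crawler_name)
    return objects


class RecordingGlue:
    """Stand-in for boto3.client("glue") that records calls."""

    def __init__(self):
        self.started = []

    def start_crawler(self, Name):
        self.started.append(Name)


# Hypothetical event, trimmed to the fields the handler reads.
sample_event = {"Records": [{"s3": {
    "bucket": {"name": "example-raws3bucket"},
    "object": {"key": "green+taxi/green_tripdata_2018-01.csv"},
}}]}
glue = RecordingGlue()
print(handle_upload(sample_event, glue, "example-crawler"))
print(glue.started)
```

A production handler also has to deal with a crawler that is already running. The SQS queue created by the stack is described as maintaining retry logic, which is omitted here.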

Open the AWS Glue console. You should see the database, table, and crawler that were created using the AWS CloudFormation template. Your AWS Glue crawler should appear as follows.

Browse to the table using the left navigation, and you will see the table in the database that you created earlier.

Choose the table name, and further explore the metadata discovered by the crawler, as shown following.

You can also view the columns, data types, and other details. In the following screenshot, the AWS Glue crawler has created a schema from the files available in Amazon S3 by determining the column names and their respective data types. You can use this schema to create an external table.

Author ETL jobs with AWS Glue

AWS Glue provides a managed Apache Spark environment with a pay-as-you-go model, so you can run your ETL jobs without maintaining any infrastructure.

Open the AWS Glue console and choose Jobs under the ETL section to start authoring an AWS Glue ETL job. Give the job a name of your choice, and note the name because you’ll need it later. Choose the already created IAM role with the name containing <stackname>– GlueLabRole, as shown following. Keep the other default options.

AWS Glue generates the required Python or Scala code, which you can customize as per your data transformation needs. In the Advanced properties section, choose Enable in the Job bookmark list to avoid reprocessing old data.

On the next page, choose your raw Amazon S3 bucket as the data source, and choose Next. On the Data target page, choose the processed Amazon S3 bucket as the data target path, and choose Parquet as the Format.

On the next page, you can make schema changes as required, such as changing column names, dropping ones that you’re less interested in, or even changing data types. AWS Glue generates the ETL code accordingly.

Lastly, review your job parameters, and choose Save Job and Edit Script, as shown following.

On the next page, you can modify the script further as per your data transformation requirements. For this post, you can leave the script as is. In the next section, you automate the execution of this ETL job.

Automate ETL job execution

As the frequency of data ingestion increases, you will want to automate the ETL job to transform the data. Automating this process helps reduce operational overhead and frees your data engineering team to focus on more critical tasks.

AWS Glue is optimized for processing data in batches. You can configure it to process data in batches on a set time interval. How often you run a job is determined by how recent the end user expects the data to be and the cost of processing. For information about the different methods, see Triggering Jobs in AWS Glue in the AWS Glue Developer Guide.

First, you need to make one-time changes and configure your ETL job name in the Lambda function and the CloudWatch Events rule. On the console, open the ETLJobLambda Lambda function, which was created using the AWS CloudFormation stack.

Choose the Lambda function link that appears, and explore the code. Change the JobName value to the ETL job name that you created in the previous step, and then choose Save.
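For orientation, a function that starts an AWS Glue job can look like the following minimal sketch. It is not the code deployed by the stack: the JOB_NAME constant stands in for the JobName value that you edit, and the client parameter exists only so that the handler can be tested offline.

```python
JOB_NAME = "Your-ETL-jobName"  # placeholder: replace with your AWS Glue job name


def lambda_handler(event, context, glue_client=None):
    """Start the AWS Glue ETL job and return the ID of the new job run."""
    if glue_client is None:
        import boto3  # imported lazily so the sketch can be tested offline
        glue_client = boto3.client("glue")

    response = glue_client.start_job_run(JobName=JOB_NAME)
    return response["JobRunId"]
```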

As shown in the following screenshot, you will see an AWS CloudWatch Events rule CrawlerEventRule that is associated with an AWS Lambda function. When the CloudWatch Events rule receives a success status, it triggers the ETLJobLambda Lambda function.

Now you are all set to trigger your AWS Glue ETL job as soon as you upload a file in the raw S3 bucket. Before testing your data pipeline, set up the monitoring and alerts.

Monitoring and notification with Amazon CloudWatch Events

Suppose that you want to receive a notification over email when your AWS Glue ETL job is completed. To achieve that, the CloudWatch Events rule OpsEventRule was deployed from the AWS CloudFormation template in the data pipeline deployment section. This CloudWatch Events rule monitors the status of the AWS Glue ETL job and sends an email notification using an SNS topic upon successful completion of the job.

As the following image shows, you configure your AWS Glue job name in the Event pattern section in CloudWatch. The event triggers an SNS topic configured as a target when the AWS Glue job state changes to SUCCEEDED. This SNS topic sends an email notification to the email address that you provided in the deployment section to receive notification.

Let’s make one-time configuration changes in the CloudWatch Events rule OpsEventRule to capture the status of the AWS Glue ETL job.

  1. Open the CloudWatch console.
  2. In the navigation pane, under Events, choose Rules. Choose the rule name that contains OpsEventRule, as shown following.
  3. In the upper-right corner, choose Actions, Edit.
  4. Replace Your-ETL-jobName with the ETL job name that you created in the previous step.
  5. Scroll down and choose Configure details. Then choose Update rule.
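The exact pattern in the template is not reproduced in this post, so the following sketch is an assumption based on the fields that AWS Glue publishes in its job state change events. The matches helper is a deliberately simplified stand-in for CloudWatch Events matching (exact values only) and is included just to show which events the pattern selects.

```python
def glue_job_succeeded_pattern(job_name):
    """Event pattern matching a successful run of one AWS Glue job."""
    return {
        "source": ["aws.glue"],
        "detail-type": ["Glue Job State Change"],
        "detail": {"jobName": [job_name], "state": ["SUCCEEDED"]},
    }


def matches(pattern, event):
    """Simplified matcher: every pattern leaf lists the allowed event values."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        if isinstance(expected, dict):
            # Nested pattern: the event must have a matching nested object.
            if not isinstance(event[key], dict) or not matches(expected, event[key]):
                return False
        elif event[key] not in expected:
            return False
    return True
```

With this pattern, a run of the same job that ends in any other state does not reach the SNS topic.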

Now that you have set up an entire data pipeline in an automated way with the appropriate notifications and alerts, it’s time to test your pipeline. If you upload new monthly data to the raw Amazon S3 bucket (for example, upload the NY green taxi February 2018 CSV), it triggers the GlueTriggerLambda AWS Lambda function. You can navigate to the AWS Glue console, where you can see that the AWS Glue crawler is running.

Upon completion of the crawler, the CloudWatch Events rule CrawlerEventRule triggers your ETLJobLambda Lambda function. You can now see that the AWS Glue ETL job is running.

When the ETL job is successful, the CloudWatch Events rule OpsEventRule sends an email notification to you using an Amazon SNS topic, as shown following, completing the automation cycle.

Be sure to check your processed Amazon S3 bucket, where you will find transformed data processed by your automated ETL pipeline. Now that the processed data is ready in Amazon S3, you need to run the AWS Glue crawler on this Amazon S3 location. The crawler creates a metadata table with the relevant schema in the AWS Glue Data Catalog.

After the Data Catalog table is created, you can execute standard SQL queries using Amazon Athena and visualize the data using Amazon QuickSight. To learn more, see the blog post Harmonize, Query, and Visualize Data from Various Providers using AWS Glue, Amazon Athena, and Amazon QuickSight.

Conclusion

Having an automated serverless data lake architecture lessens the burden of managing data from its source to destination—including discovery, audit, monitoring, and data quality. With an automated data pipeline across organizations, you can identify relevant datasets and extract value much faster than before. The advantage of reducing the time to analysis is that businesses can analyze the data as it becomes available in real time. From the BI tools, queries return results much faster for a single dataset than for multiple databases.

Business analysts can now get their job done faster, and data engineering teams can free themselves from repetitive tasks. You can extend it further by loading your data into a data warehouse like Amazon Redshift or making it available for machine learning via Amazon SageMaker.

About the Authors

Saurabh Shrivastava is a partner solutions architect and big data specialist working with global systems integrators. He works with AWS partners and customers to provide them with architectural guidance for building scalable architecture in hybrid and AWS environments. He enjoys spending time with his family outdoors and traveling to new destinations to discover new cultures.

Luis Lopez Soria is a partner solutions architect and serverless specialist working with global systems integrators. He works with AWS partners and customers to help them with adoption of the cloud operating model at a large scale. He enjoys doing sports in addition to traveling around the world exploring new foods and cultures.

Chirag Oswal is a partner solutions architect and AR/VR specialist working with global systems integrators. He works with AWS partners and customers to help them with adoption of the cloud operating model at a large scale. He enjoys video games and travel.

Amazon Kinesis Data Firehose custom prefixes for Amazon S3 objects

Post Syndicated from Rajeev Chakrabarti original https://aws.amazon.com/blogs/big-data/amazon-kinesis-data-firehose-custom-prefixes-for-amazon-s3-objects/

In February 2019, Amazon Web Services (AWS) announced a new feature in Amazon Kinesis Data Firehose called Custom Prefixes for Amazon S3 Objects. It lets customers specify a custom expression for the Amazon S3 prefix where data records are delivered. Previously, Kinesis Data Firehose let you specify only a literal prefix. This prefix was then combined with a static date-formatted prefix to create the output folder in a fixed format. Customers asked for flexibility, so AWS listened and delivered.

Kinesis Data Firehose is most commonly used to consume event data from streaming sources, such as applications or IoT devices.  The data then is typically stored in a data lake, so it can be processed and eventually queried.  When storing data on Amazon S3, it is a best practice to partition or group related data and store it together in the same folder.  This provides the ability to filter the partitioned data and control the amount of data scanned by each query, thus improving performance and reducing cost.

A common way to group data is by date.  Kinesis Data Firehose automatically groups data and stores it into the appropriate folders on Amazon S3 based on the date.  However, the naming of folders in Amazon S3 is not compatible with Apache Hive naming conventions. This makes data more difficult to catalog using AWS Glue crawlers and analyze using big data tools.

This post discusses a new capability that lets us customize how Kinesis Data Firehose names the output folders in Amazon S3. It covers how custom prefixes work, the intended use cases, and includes step-by-step instructions to try the feature in your own account.

The need for custom prefixes for Amazon S3 objects

Previously, Kinesis Data Firehose created a static Coordinated Universal Time (UTC) based folder structure in the format YYYY/MM/DD/HH. It then appended it to the provided prefix before writing objects to Amazon S3. For example, if you provided a prefix “mydatalake/”, the generated folder hierarchy would be “mydatalake/2019/02/09/13”. However, to be compatible with Hive naming conventions, the folder structure is expected to follow the format “/partitionkey=partitionvalue”. Using this naming convention, data can be easily cataloged with AWS Glue crawlers, resulting in proper partition names.
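To see why the key=value layout matters, consider how a tool can recover partition columns from an object key. The following is an illustrative sketch only, not how AWS Glue crawlers are implemented, and the object name part-0000 is hypothetical.

```python
def hive_partitions(key):
    """Return the partition columns encoded in a Hive-style object key."""
    partitions = {}
    for segment in key.split("/"):
        name, sep, value = segment.partition("=")
        # Only segments of the form name=value describe a partition.
        if sep and name and value:
            partitions[name] = value
    return partitions
```

A key such as mydatalake/year=2019/month=02/day=09/hour=13/part-0000 yields named columns (year, month, day, hour), whereas the older mydatalake/2019/02/09/13/ layout yields none, so a cataloging tool has no partition names to work with.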

Other methods for managing partitions also become possible, such as running MSCK REPAIR TABLE in Amazon Athena or Apache Hive on Amazon EMR, which can add all partitions through a single statement. Furthermore, you can use other date-based partitioning patterns like “/dt=2019-02-09-13/” instead of expanding the date out into folders. This is helpful in reducing the total number of partitions that need to be maintained as the table grows over time. It also simplifies range queries. Providing the ability to specify custom prefixes obviates the need for an additional ETL step to put the data in the right folder structure, improving the time to insight.

How custom prefixes for Amazon S3 objects works

This new capability does not let you use any date or timestamp value from your event data, nor can you use any other arbitrary value in the event. Kinesis Data Firehose uses an internal timestamp field called ApproximateArrivalTimestamp. Each data record includes an ApproximateArrivalTimestamp (in UTC) that is set when a stream successfully receives and stores the record. This is commonly referred to as a server-side timestamp. Kinesis Data Firehose buffers incoming records according to the configured buffering hints and delivers them into Amazon S3 objects for the Amazon S3 destination. The resulting objects in Amazon S3 may contain multiple records, each with a different ApproximateArrivalTimestamp. When evaluating timestamps, Kinesis Data Firehose uses the ApproximateArrivalTimestamp of the oldest record that’s contained in the Amazon S3 object being written.

Kinesis Data Firehose also provides the ability to deliver records to a different error output location when there is a delivery, AWS Lambda transformation or format conversion failure. Previously, the error output location could not be configured and was determined by the type of delivery failure. With this release, the error output location (ErrorOutputPrefix) can also be configured. One benefit of this new capability is that you can separate failed records into date partitioned folders for easy reprocessing.

So how do you specify the custom Prefix and the ErrorOutputPrefix? You use an expression of the form !{namespace:value}, where the namespace can be either firehose or timestamp. The value can be either “random-string” or “error-output-type” for the firehose namespace, or a date pattern in the Java DateTimeFormatter format for the timestamp namespace. In a single expression, you can use a combination of the two namespaces, although !{firehose:error-output-type} can be used only in the ErrorOutputPrefix. For more information and examples, see Custom Prefixes for Amazon S3 Objects.

Writing streaming data into Amazon S3 with Kinesis Data Firehose

This walkthrough describes how streaming data can be written into Amazon S3 with Kinesis Data Firehose using a Hive compatible folder structure.  It then shows how AWS Glue crawlers can infer the schema and extract the proper partition names that we designated in Kinesis Data Firehose, and catalog them in AWS Glue Data Catalog.  Finally, we run sample queries to show that partitions are indeed being recognized.

To demonstrate this, we use Python code to generate sample data. We also use a Lambda transform on Kinesis Data Firehose to forcibly create failures. This demonstrates how data can be saved to the error output location. The code that you need for this walkthrough is included here in GitHub.

For this walkthrough, this is the architecture that we are building:

Step 1: Create an Amazon S3 bucket

Create an S3 bucket to be used by Kinesis Data Firehose to deliver event records. We use the AWS Command Line Interface (AWS CLI) to create the Amazon S3 bucket in the US East (N. Virginia) Region. Remember to substitute the bucket name in the example for your own.

aws s3 mb s3://kdfs3customprefixesexample --region us-east-1

Step 2: Lambda Transform (optional)

The incoming events have an ApproximateArrivalTimestamp field in the event payload.  This is sufficient to create a proper folder structure on Amazon S3.  However, when querying the data it may be beneficial to expose this timestamp value as a top level column for easy filtering and validation.  To accomplish this, we create a Lambda function that adds the ApproximateArrivalTimestamp as a top level field in the data payload. The data payload is what Kinesis Data Firehose writes as an object in Amazon S3. Additionally, the Lambda code also artificially generates some processing errors that are delivered to the “ErrorOutputPrefix” location specified for the delivery destination to illustrate the use of expressions in the “ErrorOutputPrefix.”

Create an IAM role for the Lambda transform function

First, create a role for the Lambda function called KDFLambdaBasicRole. The TrustPolicyForLambda.json file is included in the GitHub repository.

$ aws iam create-role --role-name KDFLambdaBasicRole --assume-role-policy-document file://TrustPolicyForLambda.json

After the role is created, attach the managed Lambda basic execution policy to it.

$ aws iam attach-role-policy --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole --role-name KDFLambdaBasicRole

Lambda function

To create the Lambda function, start with the Python Kinesis Data Firehose blueprint “General Firehose Processing” and then modify it. For more information about the structure of the records and what must be returned, see Amazon Kinesis Data Firehose Data Transformation.
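The actual function is in the GitHub repository. As a rough sketch of its shape, the handler below copies the arrival timestamp into each payload and marks a share of records as failed. The field name ApproxArrTimestamp, the fail_rate parameter, and the injectable rng are assumptions made for illustration; the record fields (recordId, data, approximateArrivalTimestamp) and the result values follow the Kinesis Data Firehose data transformation contract.

```python
import base64
import json
import random


def lambda_handler(event, context, fail_rate=0.1, rng=random.random):
    """Copy the arrival timestamp into each payload; fail a share of records."""
    output = []
    for record in event["records"]:
        if rng() < fail_rate:
            # Failed records are returned with their original data untouched.
            output.append({
                "recordId": record["recordId"],
                "result": "ProcessingFailed",
                "data": record["data"],
            })
            continue

        payload = json.loads(base64.b64decode(record["data"]))
        # Hypothetical column name; the value is in epoch milliseconds.
        payload["ApproxArrTimestamp"] = record["approximateArrivalTimestamp"]
        encoded = base64.b64encode((json.dumps(payload) + "\n").encode("utf-8"))
        output.append({
            "recordId": record["recordId"],
            "result": "Ok",
            "data": encoded.decode("utf-8"),
        })
    return {"records": output}
```

The trailing newline keeps one JSON document per line in the delivered object, which makes the data easier to crawl and query.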

Zip up the Python file, and then create the Lambda function using the AWS CLI. The CreateLambdaFunctionS3CustomPrefixes.json file is included in the GitHub repository.

aws lambda create-function --zip-file "fileb://lambda_function.zip" --cli-input-json file://CreateLambdaFunctionS3CustomPrefixes.json

Step 3: Delivery Stream

Next, create the Kinesis Data Firehose delivery stream. The createdeliverystream.json file is included in the GitHub repository.

aws firehose create-delivery-stream --cli-input-json file://createdeliverystream.json

In the previous configuration, we defined a Prefix and an ErrorOutputPrefix under the “ExtendedS3DestinationConfiguration” element. We defined the same for the “S3BackupConfiguration” element. Note that when the “ProcessingConfiguration” element is set to “Disabled”, the ErrorOutputPrefix parameter of the “ExtendedS3DestinationConfiguration” element exists only for consistency. It otherwise has no significance.
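The contents of createdeliverystream.json are not reproduced in this post. The sketch below shows what the core of an equivalent request could look like when built in Python. The field names are those of the CreateDeliveryStream API, while the function name, the buffering values (taken from the console walkthrough in this post), and the omission of the processing and backup configuration are choices made for brevity. The year pattern is written as yyyy, matching the ErrorOutputPrefix in this post.

```python
def delivery_stream_request(bucket_arn, role_arn):
    """Build keyword arguments for the Firehose CreateDeliveryStream API."""
    partitions = (
        "year=!{timestamp:yyyy}/month=!{timestamp:MM}/"
        "day=!{timestamp:dd}/hour=!{timestamp:HH}/"
    )
    return {
        "DeliveryStreamName": "KDFS3customPrefixesExample",
        "DeliveryStreamType": "DirectPut",
        "ExtendedS3DestinationConfiguration": {
            "RoleARN": role_arn,
            "BucketARN": bucket_arn,
            # Hive-style folders for delivered records.
            "Prefix": "fhbase/" + partitions,
            # Date-partitioned folders for failed records.
            "ErrorOutputPrefix": (
                "fherroroutputbase/!{firehose:random-string}/"
                "!{firehose:error-output-type}/!{timestamp:yyyy/MM/dd}/"
            ),
            "BufferingHints": {"SizeInMBs": 1, "IntervalInSeconds": 240},
            "CompressionFormat": "UNCOMPRESSED",
        },
    }
```

With boto3 installed and credentials configured, the result could be passed to boto3.client("firehose").create_delivery_stream(**request).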

We’ve chosen a prefix that will result in a folder structure compatible with hive-style partitioning. This is the prefix we used:

“fhbase/year=!{timestamp:YYYY}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/”

Kinesis Data Firehose first creates a base folder called “fhbase” directly under the Amazon S3 bucket. Second, it evaluates the expressions !{timestamp:YYYY}, !{timestamp:MM}, !{timestamp:dd}, and !{timestamp:HH} to year, month, day, and hour using the Java DateTimeFormatter format. For example, an ApproximateArrivalTimestamp of 1549728781000 in UNIX epoch time (milliseconds), which is 2019-02-09T16:13:01.000000Z in UTC, would evaluate to “year=2019”, “month=02”, “day=09”, and “hour=16”. Therefore, the location in Amazon S3 where data records are delivered evaluates to “fhbase/year=2019/month=02/day=09/hour=16/”.

Similarly, the ErrorOutputPrefix “fherroroutputbase/!{firehose:random-string}/!{firehose:error-output-type}/!{timestamp:yyyy/MM/dd}/” results in a base folder called “fherroroutputbase” directly under the S3 bucket. The expression !{firehose:random-string} evaluates to an 11-character random string like “ztWxkdg3Thg”. If you use this more than once in the same expression, every instance evaluates to a new random string. The expression !{firehose:error-output-type} evaluates to one of the following:

  1. “processing-failed” for Lambda transformation delivery failures
  2. “elasticsearch-failed” for Amazon ES destination delivery failures
  3. “splunk-failed” for Splunk destination delivery failures
  4. “format-conversion-failed” for data format conversion failures

So, the location for an Amazon S3 object containing the delivery failed records for a Lambda transformation could evaluate to: fherroroutputbase/ztWxkdg3Thg/processing-failed/2019/02/09/.
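The evaluation described above can be imitated locally. The following sketch is not the Kinesis Data Firehose implementation: it supports only the four date letters used in this post, and it takes the random string as an argument so that the output is reproducible, whereas the service generates a new one each time. It accepts the year only as yyyy. In the Java DateTimeFormatter, lowercase yyyy is the calendar year and uppercase YYYY is the week-based year; the two agree except for a few days around New Year.

```python
import re
from datetime import datetime, timezone

# Java DateTimeFormatter letters used in this post, mapped to strftime codes.
_JAVA_TO_STRFTIME = {"yyyy": "%Y", "MM": "%m", "dd": "%d", "HH": "%H"}
_TOKEN = re.compile("yyyy|MM|dd|HH")
_EXPRESSION = re.compile(r"!\{(firehose|timestamp):([^}]+)\}")


def evaluate_prefix(prefix, arrival, error_type=None, random_string="ztWxkdg3Thg"):
    """Expand !{namespace:value} expressions in a prefix for one timestamp."""
    def expand(match):
        namespace, value = match.group(1), match.group(2)
        if namespace == "timestamp":
            fmt = _TOKEN.sub(lambda m: _JAVA_TO_STRFTIME[m.group(0)], value)
            return arrival.strftime(fmt)
        if value == "random-string":
            return random_string  # fixed here so the output is reproducible
        if value == "error-output-type" and error_type is not None:
            return error_type
        raise ValueError("unsupported expression: " + match.group(0))

    return _EXPRESSION.sub(expand, prefix)


arrival = datetime(2019, 2, 9, 16, 13, 1, tzinfo=timezone.utc)
print(evaluate_prefix("fhbase/year=!{timestamp:yyyy}/hour=!{timestamp:HH}/", arrival))
# prints fhbase/year=2019/hour=16/
```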

You can run aws firehose describe-delivery-stream --delivery-stream-name KDFS3customPrefixesExample to describe the delivery stream created.

Next, enable encryption-at-rest for the delivery stream:

aws firehose start-delivery-stream-encryption --delivery-stream-name KDFS3customPrefixesExample

Or create the delivery stream using the AWS Console

  1. Choose the source. For this example, I use Direct PUT.
  2. Choose if you would like to transform the incoming records with a Lambda transformation. I chose Enabled, and chose the name of the Lambda function that I had created earlier.
  3. Choose the destination. I chose the Amazon S3 destination.
  4. Choose the Amazon S3 bucket. I chose the Amazon S3 bucket that I had created earlier in this exercise.
  5. Specify the Amazon S3 Prefix and the Amazon S3 error prefix. These correspond to the “Prefix” and “ErrorOutputPrefix” explained earlier in the context of the AWS CLI input JSON.
  6. Choose whether you would like to back up the raw (before transformation) records to another Amazon S3 location. I chose Enabled and specified the same bucket (you could choose a different bucket). I also specified a different prefix from the transformed records: the base folder is different, but the folder structure below it is the same. This makes it more efficient to crawl this location using an AWS Glue crawler or create external tables in Athena or Redshift Spectrum pointing to this location.
  7. Specify the buffering hints for the Amazon S3 destination. I chose 1 MB and 240 seconds.
  8. Choose the S3 Compression and encryption settings. I chose no compression for the transformed records’ location. I chose to encrypt the Amazon S3 location at rest by using the service-managed AWS KMS customer master key (CMK).
  9. Choose whether you want to enable Error Logging in CloudWatch. I chose Enabled.
  10. Specify the IAM role that you want Kinesis Data Firehose to assume to access resources on your behalf. Choose either Create new or Choose to display a new screen. Choose Create a new IAM role, name the role, and then choose Allow.
  11. Choose Create Delivery Stream.

The delivery stream is now created and active. You can send events to it.

Test with sample data

I used Python code to generate sample data. The structure of the generated data is as follows:

{'sector': 'HEALTHCARE', 'price': 194.07, 'ticker_symbol': 'UFG', u'EventTime': '2019-02-12T07:10:52.649000Z', 'change': 20.56}
{'sector': 'HEALTHCARE', 'price': 124.01, 'ticker_symbol': 'QXZ', u'EventTime': '2019-02-12T07:10:53.745000Z', 'change': 3.32}
{'sector': 'MANUFACTURING', 'price': 26.95, 'ticker_symbol': 'QXZ', u'EventTime': '2019-02-12T07:10:54.864000Z', 'change': 24.53}

Sample code to generate data and push it into Kinesis Data Firehose is included in the GitHub repository.
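For readers who want a feel for that generator without opening the repository, here is a minimal sketch that produces records of the same shape and sends them with the Firehose PutRecord API. It is not the repository code: the value ranges are invented, and the lists contain only the sectors and ticker symbols that appear in the sample above.

```python
import json
import random
from datetime import datetime, timezone

SECTORS = ["HEALTHCARE", "MANUFACTURING"]
TICKERS = ["UFG", "QXZ"]


def make_event(rng=random):
    """Build one sample stock event with the fields shown above."""
    now = datetime.now(timezone.utc)
    return {
        "sector": rng.choice(SECTORS),
        "price": round(rng.uniform(1, 200), 2),
        "ticker_symbol": rng.choice(TICKERS),
        "EventTime": now.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
        "change": round(rng.uniform(0, 25), 2),
    }


def send_events(firehose_client, stream_name, count):
    """Send count events to the delivery stream, one JSON document per line."""
    for _ in range(count):
        data = (json.dumps(make_event()) + "\n").encode("utf-8")
        firehose_client.put_record(
            DeliveryStreamName=stream_name, Record={"Data": data}
        )
```

With boto3 installed and credentials configured, you could call send_events(boto3.client("firehose"), "KDFS3customPrefixesExample", 100).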

After you start sending events to the Kinesis Data Firehose delivery stream, objects should start appearing under the specified prefixes in Amazon S3.

I wanted to illustrate Lambda invoke errors and the appearance of files in the ErrorOutputPrefix location for Lambda transform errors. Therefore, I did not give permissions to the “firehose_delivery_role” to invoke my Lambda function. The following file showed up in the location specified by the ErrorOutputPrefix.

aws s3 ls s3://kdfs3customprefixesexample/fherroroutputbase/FxvO2Tf9MQP/processing-failed/2019/02/12/

2019-02-12 16:57:24     260166 KDFS3customPrefixesExample-1-2019-02-12-16-53-20-5262db81-0f3a-48bf-8fc6-2249124923ff

Here is a snippet of the contents of the error file that I previously mentioned.

{"attemptsMade":4,"arrivalTimestamp":1549990400391,"errorCode":"Lambda.InvokeAccessDenied","errorMessage":"Access was denied. Ensure that the access policy allows access to the Lambda function.","attemptEndingTimestamp":1549990478018,"rawData":"eyJzZWN0b3IiOiAiSEVBTFRIQ0FSRSIsICJwcmljZSI6IDE4Ny45NCwgInRpY2tlcl9zeW1ib2wiOiAiVUZHIiwgIkV2ZW50VGltZSI6ICIyMDE5LTAyLTEyVDE2OjUzOjE5Ljk5MzAwMFoiLCAiY2hhbmdlIjogOS4yNn0=","lambdaArn":"arn:aws:lambda:us-east-1:<account-id>:function:KDFS3CustomPrefixesTransform:$LATEST"}

After I gave the “firehose_delivery_role” the appropriate permissions, the data objects showed up in the “Prefix” location specified for the Amazon S3 destination.

aws s3 ls s3://kdfs3customprefixesexample/fhbase/year=2019/month=02/day=12/hour=17/

2019-02-12 17:17:26    1392167 KDFS3customPrefixesExample-1-2019-02-12-17-14-51-fc63e8f6-7421-491d-8417-c5002fca1722

2019-02-12 17:18:39    1391946 KDFS3customPrefixesExample-1-2019-02-12-17-16-43-e080a18a-3e1e-45ad-8f1a-98c7887f5430

Also, because the code in my Lambda transform set the status to ProcessingFailed for 10 percent of the records, those records showed up in the ErrorOutputPrefix location for Lambda transform errors.

aws s3 ls s3://kdfs3customprefixesexample/fherroroutputbase/ztWxkdg3Thg/processing-failed/2019/02/12/

2019-02-12 17:25:54     180092 KDFS3customPrefixesExample-1-2019-02-12-17-21-53-3bbfe7c0-f505-47d0-b880-797ce9035f73

Here is a snippet of the content of the error file:

{"attemptsMade":1,"arrivalTimestamp":1549992113419,"errorCode":"Lambda.ProcessingFailedStatus","errorMessage":"ProcessingFailed status set for record","attemptEndingTimestamp":1549992138424,"rawData":"eyJ0aWNrZXJfc3ltYm9sIjogIlFYWiIsICJzZWN0b3IiOiAiSEVBTFRIQ0FSRSIsICJwcmljZSI6IDE3LjUyLCAiY2hhbmdlIjogMTcuNTUsICJFdmVudFRpbWUiOiAiMjAxOS0wMi0xMlQxNzoyMTo1My4zOTY2NDdaIn0=","lambdaArn":"arn:aws:lambda:us-east-1:<account-id>:function:KDFS3CustomPrefixesTransform:$LATEST"}
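The rawData field in these error files holds the original record, base64 encoded, which is what makes reprocessing possible. A minimal sketch for recovering it follows; the function name is hypothetical, and it assumes one JSON error document per line with a JSON payload, as in this walkthrough.

```python
import base64
import json


def decode_raw_data(error_line):
    """Return (error code, original payload) from one line of an error file."""
    error = json.loads(error_line)
    payload = json.loads(base64.b64decode(error["rawData"]))
    return error["errorCode"], payload
```

Applied to each line of an object under the ErrorOutputPrefix, this returns the error code together with the event that the producer originally sent.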

You’re now ready to create an AWS Glue crawler. For more information about using the AWS Glue Data Catalog, see Populating the AWS Glue Data Catalog.

  1. In the AWS Glue console, go to Crawlers, and choose Add Crawler.
  2. Add information about your crawler, then choose Next.
  3. In the Include Path, specify the Amazon S3 bucket name that you entered under the Amazon S3 destination. Also include the static prefix used when you created the Kinesis Data Firehose delivery stream. Do not include the custom prefix expression.
  4. Choose Next.
  5. Choose Next, No, Next.
  6. Specify the IAM role that AWS Glue would use. I chose to create a new IAM Role. Choose Next.
  7. Specify a schedule to run the crawler. I chose to Run it on Demand. Choose Next.
  8. Specify where the crawler adds the crawled and discovered tables. I chose the default database. Choose Next.
  9. Choose Finish.
  10. The crawler has been created and is ready to be run. Choose Run crawler.
  11. In the AWS Glue console, go to Tables. You can see that a table has been created with the name of the base folder. Choose fhbase.

The crawler has discovered and populated the table and its properties.

You can see the discovered schema. The crawler has identified and created the partitions based on the folder structure specified by the prefix expression.

Open the Amazon Athena console, and select the default database from the drop-down menu. Write the following query in the New query1 window, then choose Run query.

SELECT * FROM "default"."fhbase"
WHERE year = '2019' AND day = '12' AND hour = '17'
ORDER BY approxarrtimestamputcfh DESC

Notice that Amazon Athena recognizes the fhbase table as a partitioned table. The query can take advantage of the partitions in the query to filter the results.
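The same query can be submitted programmatically. The sketch below builds the statement from partition values and starts it with the Athena StartQueryExecution API. The function names and the output location argument are assumptions, and the partition values are interpolated directly into the SQL, so pass only trusted values.

```python
def partition_query(year, day, hour, table='"default"."fhbase"'):
    """Build the walkthrough's query for one hour of data."""
    return (
        "SELECT * FROM {table} "
        "WHERE year = '{year}' AND day = '{day}' AND hour = '{hour}' "
        "ORDER BY approxarrtimestamputcfh DESC"
    ).format(table=table, year=year, day=day, hour=hour)


def run_query(athena_client, sql, output_location):
    """Submit the query; Athena writes the results to output_location."""
    response = athena_client.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": "default"},
        ResultConfiguration={"OutputLocation": output_location},
    )
    return response["QueryExecutionId"]
```

Because year, day, and hour are partition columns, filtering on them limits the Amazon S3 folders that Athena has to scan.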

Conclusion

As this post illustrates, Custom Prefixes for Amazon S3 Objects gives you the flexibility to customize the folder structure in which Kinesis Data Firehose delivers data records and failure records in Amazon S3. Having control over the folder structure and naming in Amazon S3 simplifies data discovery, cataloging, and access. As a result, it helps you gain insight faster and better manage the cost of your queries.

About the Author

Rajeev Chakrabarti is a Kinesis specialist solutions architect.

Build and run streaming applications with Apache Flink and Amazon Kinesis Data Analytics for Java Applications

Post Syndicated from Steffen Hausmann original https://aws.amazon.com/blogs/big-data/build-and-run-streaming-applications-with-apache-flink-and-amazon-kinesis-data-analytics-for-java-applications/

Stream processing facilitates the collection, processing, and analysis of real-time data and enables the continuous generation of insights and quick reactions to emerging situations. This capability is useful when the value of derived insights diminishes over time. Hence, the faster you can react to a detected situation, the more valuable the reaction is going to be. Consider, for instance, a streaming application that analyzes and blocks fraudulent credit card transactions while they occur. Compare that application to a traditional batch-oriented approach that identifies fraudulent transactions at the end of every business day and generates a nice report for you to read the next morning.

It is quite common for the value of insights to diminish over time. Therefore, using stream processing can substantially improve the value of your analytics application. However, building and operating a streaming application that continuously receives and processes data is much more challenging than operating a traditional batch-oriented analytics application.

In this post, we discuss how you can use Apache Flink and Amazon Kinesis Data Analytics for Java Applications to address these challenges. We explore how to build a reliable, scalable, and highly available streaming architecture based on managed services that substantially reduce the operational overhead compared to a self-managed environment. We particularly focus on how to prepare and run Flink applications with Kinesis Data Analytics for Java Applications. To this end, we use an example scenario that includes source code and AWS CloudFormation templates. You can follow along with this example using your own AWS account or adapt the code according to your specific requirements.

Challenges of running streaming applications

When you build a streaming application, the downstream systems naturally rely on a continuous and timely generation of output. Accordingly, there are much higher requirements on the availability of the streaming application. There is also much less time to address operational issues compared to a traditional batch-based approach. In a batch-processing scenario, when a job that runs once at the end of a business day fails, you can often restart the failed job and still complete the computation by the next morning, when the results are needed. In contrast, when a streaming application fails, downstream systems that consume the output might be affected within minutes, or even sooner, when the expected output no longer arrives in time.

Moreover, in case of failure, you can’t simply delete all intermediate results and restart a failed processing job, as is commonly done in the batch-processing case. The output of a streaming job is continuously consumed by downstream systems. Output that has already been consumed cannot easily be retracted. Therefore, the entire processing pipeline is much more sensitive to duplicates that are introduced by an application that is restarted on failure. Furthermore, the computations of a streaming application often rely on some kind of internal state that can be corrupted or even lost when the application fails.

Last but not least, streaming applications often deal with a varying amount of throughput. Therefore, scaling the application according to the current load is highly desirable. When the load increases, the infrastructure that supports the streaming application must scale to keep the application from becoming overloaded, falling behind, and producing results that are no longer relevant. On the other hand, when the load decreases, the infrastructure should scale in again to remain cost effective by not provisioning more resources than are needed.

A reliable and scalable streaming architecture based on Flink and Kinesis Data Analytics for Java Applications

Apache Flink is an open-source project that is tailored to stateful computations over unbounded and bounded datasets. Flink addresses many of the challenges that are common when analyzing streaming data by supporting different APIs (including Java and SQL), rich time semantics, and state management capabilities. It can also recover from failures while maintaining exactly-once processing semantics. Therefore, Flink is well suited for analyzing streaming data with low latency.

In this post, we illustrate how to deploy, operate, and scale a Flink application with Kinesis Data Analytics for Java Applications. We use a scenario to analyze the telemetry data of a taxi fleet in New York City in near-real time to optimize the fleet operation. In this scenario, every taxi in the fleet is capturing information about completed trips. The tracked information includes the pickup and drop-off locations, number of passengers, and generated revenue. This information is ingested into a Kinesis data stream as a simple JSON blob. From there, the data is processed by a Flink application, which is deployed to Kinesis Data Analytics for Java Applications. This application identifies areas that are currently requesting a high number of taxi rides. The derived insights are finally persisted into Amazon Elasticsearch Service, where they can be accessed and visualized using Kibana.
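To make the ingestion step concrete, the following sketch shows how one completed trip could be serialized and written to the Kinesis data stream with the PutRecord API. The field names and the choice of the trip ID as the partition key are assumptions made for illustration; the scenario's actual event schema may differ.

```python
import json


def trip_record(trip_id, pickup, dropoff, passengers, revenue):
    """Serialize one completed trip as a JSON blob for a Kinesis data stream."""
    event = {
        "trip_id": trip_id,
        "pickup": {"lat": pickup[0], "lon": pickup[1]},
        "dropoff": {"lat": dropoff[0], "lon": dropoff[1]},
        "passenger_count": passengers,
        "total_amount": revenue,
    }
    return {
        "Data": json.dumps(event).encode("utf-8"),
        # Records with the same partition key go to the same shard.
        "PartitionKey": str(trip_id),
    }


def send_trip(kinesis_client, stream_name, record):
    """Write the record to the stream with the Kinesis PutRecord API."""
    return kinesis_client.put_record(StreamName=stream_name, **record)
```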

This scenario leads to the following architecture, which is separated into three stages for the ingestion, processing, and presentation of data.

Separating the different aspects of the infrastructure is a common approach in this domain and has several benefits over a more tightly coupled architecture.

First, the Kinesis data stream serves as a buffer that decouples the producers from the consumers. Taxis can persist the events that they generate into the data stream regardless of the condition of, for instance, the processing layer, which might be currently recovering from a node failure. Likewise, the derived data remains available through Kibana even if the ingestion or processing layer is currently unavailable due to some operational issues. Last but not least, all components can be scaled independently and can use infrastructure that is specifically tailored according to their individual requirements.

This architecture also allows you to experiment with and adopt new technologies in the future. Multiple independent applications can concurrently consume the data stored in the Kinesis data stream. You can then test how a new version of an existing application performs with a copy of the production traffic. You can also introduce a different tool and technology stack to analyze the data, again without affecting the existing production application. For example, it is common to persist the raw event data to Amazon S3 by adding a Kinesis Data Firehose delivery stream as a second consumer to the Kinesis data stream. This facilitates long-term archiving of the data, which you can then use to evaluate ad hoc queries or analyze historic trends.

All in all, separating the different aspects of the architecture into ingestion, processing, and presentation nicely decouples different components, making the architecture more robust. It furthermore allows you to choose different tools for different purposes and gives you a lot of flexibility to change or evolve the architecture over time.

For the rest of this post, we focus on using Apache Flink and Kinesis Data Analytics for Java Applications to identify areas that currently request a high number of taxi rides. We also derive the average trip duration to the New York City airports. But with this architecture, you also have the option to consume the incoming events using other tools, such as Apache Spark Structured Streaming and Kinesis Data Firehose, instead of, or in addition to, what is described here.
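To make the idea of "areas that currently request a high number of taxi rides" more concrete, here is a minimal sketch in plain Java, not the Flink job from this post. It assumes that pickup locations are bucketed into a coarse grid and counted per window. The class name, the cell size, and the sample coordinates are all hypothetical, and the real application may group locations differently.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of counting pickups per map cell; not the Flink job from this post. */
final class PickupGridSketch {

    /** Assumed cell size in degrees; the real application may bucket locations differently. */
    static final double CELL_SIZE_DEGREES = 0.01;

    /** Maps a coordinate pair to a coarse grid cell identifier. */
    static String cellOf(double latitude, double longitude) {
        long row = (long) Math.floor(latitude / CELL_SIZE_DEGREES);
        long col = (long) Math.floor(longitude / CELL_SIZE_DEGREES);
        return row + ":" + col;
    }

    /** Counts pickups per cell for one window of events, given as {latitude, longitude} pairs. */
    static Map<String, Integer> countPickups(double[][] pickups) {
        Map<String, Integer> counts = new HashMap<String, Integer>();
        for (double[] pickup : pickups) {
            counts.merge(cellOf(pickup[0], pickup[1]), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Made-up sample coordinates for one window of events.
        double[][] window = {
            {40.7581, -73.9851}, {40.7583, -73.9857}, {40.6413, -73.7781}
        };
        System.out.println(countPickups(window));
    }
}
```

In a streaming job, a count like this would be computed per time window and per key rather than over an in-memory array.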

Let’s kick the tires!

To see the described architecture in action, execute the following AWS CloudFormation template in your own AWS account. The template first builds the Flink application that analyzes the incoming taxi trips, including the Flink Kinesis Connector that is required to read data from a Kinesis data stream. It then creates the infrastructure and submits the Flink application to Kinesis Data Analytics for Java Applications.

The entire process of building the application and creating the infrastructure takes about 20 minutes. After the AWS CloudFormation stack is created, the Flink application has been deployed as a Kinesis Data Analytics for Java application. It then waits for events in the data stream to arrive. Checkpointing is enabled so that the application can seamlessly recover from failures of the underlying infrastructure while Kinesis Data Analytics for Java Applications manages the checkpoints on your behalf. In addition, automatic scaling is configured so that Kinesis Data Analytics for Java Applications automatically allocates or removes resources and scales the application (that is, it adapts its parallelism) in response to changes in the incoming traffic.

To populate the Kinesis data stream, we use a Java application that replays a public dataset of historic taxi trips made in New York City into the data stream. The Java application has already been downloaded to an Amazon EC2 instance that was provisioned by AWS CloudFormation. You just need to connect to the instance and execute the JAR file to start ingesting events into the stream.

You can obtain all of the following commands, including their correct parameters, from the output section of the AWS CloudFormation template that you executed previously.

$ ssh ec2-user@«Replay instance DNS name»

$ java -jar amazon-kinesis-replay-1.0.jar -stream «Kinesis data stream name» -region «AWS region» -speedup 3600

The speedup parameter determines how much faster the data is ingested into the Kinesis data stream relative to the actual occurrence of the historic events. With the given parameters, the Java application ingests an hour of historic data within one second. This results in a throughput of roughly 13k events and 6 MB of data per second, which completely saturates the Kinesis data stream (more on this later).
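The arithmetic behind the speedup parameter can be sketched in a few lines of Java. This is an illustration of the idea only, not the code of the replay application: the class and method names are hypothetical, and the sketch assumes that the gap between two historic events is simply divided by the speedup factor.

```java
/** Illustrative pacing arithmetic; names are hypothetical and not taken from the replay tool. */
final class ReplaySpeedupSketch {

    /** Wall-clock milliseconds to wait for a gap of historic time, given a speedup factor. */
    static long replayDelayMillis(long historicGapMillis, double speedup) {
        if (speedup <= 0) {
            throw new IllegalArgumentException("speedup must be positive");
        }
        return Math.round(historicGapMillis / speedup);
    }

    public static void main(String[] args) {
        long oneHourMillis = 60L * 60L * 1000L;
        // With a speedup of 3600, one hour of historic data is replayed in about one second.
        System.out.println(replayDelayMillis(oneHourMillis, 3600));
    }
}
```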

You can then go ahead and inspect the derived data through the Kibana dashboard that has been created. Or you can create your own visualizations to explore the data in Kibana.

https://«Elasticsearch endpoint»/_plugin/kibana/app/kibana#/dashboard/nyc-tlc-dashboard

The prepared Kibana dashboard contains a heatmap and a line graph. The heatmap visualizes locations where taxis are currently requested, and it shows that the highest demand for taxis is in Manhattan. Moreover, the JFK and LaGuardia airports are also spots on the map where substantially more rides are requested compared to their direct neighborhoods. The line graph visualizes the average trip duration to these two airports. The following image shows how it steadily increases throughout the day until it abruptly drops in the evening.

For this post, the Elasticsearch cluster is configured to accept connections from the IP address range specified as a parameter of the AWS CloudFormation template. For production workloads, it’s much more desirable to further tighten the security of your Elasticsearch domain, for instance, by using Amazon Cognito for Kibana access control.

Scaling the architecture to increase its throughput

For this post, the Kinesis data stream was deliberately underprovisioned so that the Java application completely saturates the data stream. When you closely inspect the output of the Java application, you’ll notice that the “replay lag” is continuously increasing. This means that the producer cannot ingest events as quickly as the specified speedup parameter requires.

You can dive deeper into the metrics of the data stream by accessing it through an Amazon CloudWatch dashboard. You can then see that the WriteProvisionedThroughputExceeded metric is slightly elevated: roughly 0.4 percent of the records are not accepted into the stream because the respective requests are throttled. In other words, the data stream is underprovisioned. The share of throttled records stays small only because the producer pauses the ingestion of new events when too many events are in flight.

To increase the throughput of the data stream, you can simply update the number of shards from 6 to 12 with a couple of clicks on the console or through an API call. For production environments, you might even want to automate this procedure. For details on how to automatically scale a Kinesis data stream, see the blog post Scaling Amazon Kinesis Data Streams with AWS Application Auto Scaling.
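As a rough back-of-the-envelope check for choosing a shard count, the following Java sketch estimates how many shards a given write rate needs. It assumes a write limit of 1 MB per second per shard (taken here as 1,000,000 bytes) and deliberately ignores the separate per-shard limit on records per second, on the assumption that the producer batches several events into each record. The class and method names are hypothetical.

```java
/** Back-of-the-envelope shard estimate based on byte throughput only. */
final class ShardEstimateSketch {

    /** Assumed per-shard write limit in bytes per second (1 MB per second per shard). */
    static final long SHARD_WRITE_BYTES_PER_SECOND = 1_000_000L;

    /** Smallest shard count whose combined byte limit covers the given write rate. */
    static int shardsForWriteRate(long bytesPerSecond) {
        if (bytesPerSecond <= 0) {
            return 1; // a stream always has at least one shard
        }
        // Integer ceiling division.
        long shards = (bytesPerSecond + SHARD_WRITE_BYTES_PER_SECOND - 1) / SHARD_WRITE_BYTES_PER_SECOND;
        return (int) shards;
    }

    public static void main(String[] args) {
        System.out.println(shardsForWriteRate(6_000_000L));  // rate that saturates the original stream
        System.out.println(shardsForWriteRate(12_000_000L)); // twice that rate
    }
}
```

Under these assumptions, doubling the write rate from about 6 MB per second doubles the estimate, which matches the move from 6 to 12 shards described above.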

When the scaling operation of the stream finishes, you can observe how the “replay lag” decreases and more events are ingested into the stream.

However, as a direct result, more events need to be processed. So now the Kinesis Data Analytics for Java application becomes overloaded and can no longer keep up with the increased number of incoming events. You can observe this through the millisBehindLatest metric, which is published to CloudWatch. The metric reports the time difference, in milliseconds and based on ingestion time, between the oldest record currently read by the Kinesis Data Analytics for Java application and the latest record in the stream. So it indicates how far the processing is behind the tip of the stream.

As these metrics show, 10 minutes after the scaling operation finishes, processing is already more than 3 minutes behind the latest event in the stream. Even worse, it steadily keeps falling behind, continuously widening this gap.
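If you export a series of millisBehindLatest samples, a simple check like the following Java sketch can tell you whether the gap keeps widening. This is only an illustration over an in-memory array: the class name, the method name, and the sample values are made up, and reading the metric from CloudWatch is out of scope here.

```java
/** Illustrative trend check over exported metric samples; not part of the application. */
final class LagTrendSketch {

    /** Returns true when each sample is strictly larger than the one before it. */
    static boolean isFallingBehind(long[] millisBehindLatestSamples) {
        if (millisBehindLatestSamples.length < 2) {
            return false; // not enough data points to see a trend
        }
        for (int i = 1; i < millisBehindLatestSamples.length; i++) {
            if (millisBehindLatestSamples[i] <= millisBehindLatestSamples[i - 1]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long[] samples = {60_000L, 120_000L, 185_000L}; // made-up values in milliseconds
        System.out.println(isFallingBehind(samples));
    }
}
```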

However, in contrast to Kinesis Data Streams, Kinesis Data Analytics for Java Applications natively supports auto scaling. After a couple of minutes, you can see the effect of the scaling activities in the metrics. The millisBehindLatest metric starts to decrease until it reaches zero, when the processing has caught up with the tip of the Kinesis data stream.

However, notice how the millisBehindLatest metric spikes just before it starts to decline. This is caused by the way that scaling a Kinesis Data Analytics for Java application works today. To scale a running application, the internal state of the application is persisted into a so-called savepoint. This savepoint is exposed as a snapshot by Kinesis Data Analytics for Java Applications. Subsequently, the running instance of the application is terminated, and a new instance of the same application with more resources and a higher parallelism is created. The new instance of the application then populates its internal state from the snapshot and resumes the processing from where the now terminated instance left off.
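The sequence of snapshot, terminate, and restore with a higher parallelism can be illustrated with a toy model in plain Java. This is a sketch of the idea only, not how Flink or Kinesis Data Analytics for Java Applications implements savepoints: it assumes that keyed state is a simple map per subtask and that keys are assigned to subtasks by hash. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of "snapshot, stop, restart with more parallelism"; not Flink's actual mechanism. */
final class RescaleSketch {

    /** Picks the subtask that owns a key for a given parallelism. */
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    /** Merges the keyed state of all running subtasks into one snapshot. */
    static Map<String, Long> snapshot(List<Map<String, Long>> subtasks) {
        Map<String, Long> snapshot = new HashMap<String, Long>();
        for (Map<String, Long> state : subtasks) {
            snapshot.putAll(state);
        }
        return snapshot;
    }

    /** Creates a new set of subtasks and repopulates their state from the snapshot. */
    static List<Map<String, Long>> restore(Map<String, Long> snapshot, int newParallelism) {
        List<Map<String, Long>> subtasks = new ArrayList<Map<String, Long>>();
        for (int i = 0; i < newParallelism; i++) {
            subtasks.add(new HashMap<String, Long>());
        }
        for (Map.Entry<String, Long> entry : snapshot.entrySet()) {
            int owner = subtaskFor(entry.getKey(), newParallelism);
            subtasks.get(owner).put(entry.getKey(), entry.getValue());
        }
        return subtasks;
    }
}
```

No state is lost in this model, but nothing is processed between the snapshot and the restore, which corresponds to the brief interruption that shows up in the metric.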

Accordingly, the scaling operation causes a brief interruption of the processing, which explains the spike in the metric. However, this operation is transparent to the producers and consumers. Producers can continue to write to the Kinesis data stream because they are nicely decoupled from the application. Likewise, consumers can still use Kibana to view their dashboards, although they might not see the latest data because it hasn’t yet been processed.

Let’s step back for a moment and review what you just did: You created a fully managed, highly available, scalable streaming architecture. You ingested and analyzed up to 25k events per second. You doubled the throughput of the architecture by scaling the Kinesis data stream and the Kinesis Data Analytics for Java application with a couple of clicks. You did all this while the architecture remained fully functional and kept receiving and processing events, not losing a single event. You also could have scaled the Elasticsearch cluster as seamlessly as the other components. But we’ll leave that as an exercise for the interested reader.

Try to imagine what it would have taken you to build something similar from scratch.

Prepare Flink applications for Kinesis Data Analytics for Java Applications

Now that you have seen the streaming application in action, let’s look at what is required to deploy and run a Flink application with Kinesis Data Analytics for Java Applications.

Similar to other deployment methods, the Flink application is first built and packaged into a fat JAR, which contains all the necessary dependencies for the application to run. The resulting fat JAR is then uploaded to Amazon S3. The location of the fat JAR on S3 and some additional configuration parameters are then used to create an application that can be executed by Kinesis Data Analytics for Java Applications. So instead of logging in to a cluster and directly submitting a job to the Flink runtime, you upload the respective fat JAR to S3. You then create a Kinesis Data Analytics for Java application that you can interact with using API calls, the console, or the AWS CLI.

Adapt the Flink configuration and runtime parameters

To obtain a valid Kinesis Data Analytics for Java application, the fat JAR of the Flink application must include certain dependencies. When you use Apache Maven to build your Flink application, you can simply add another dependency to the pom.xml file of your project.

<!-- pom.xml -->
<project>
    ...
    <dependencies>
        ...
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-kinesisanalytics-runtime</artifactId>
            <version>1.0.1</version>
        </dependency>
    </dependencies>
    ...
</project>

You can then specify parameters that are passed to the resulting Kinesis Data Analytics for Java application when it is created or updated. These parameters are basically key-value pairs that are contained in a property map that is part of a property group.

"ApplicationConfiguration": {
    "EnvironmentProperties": {
        "PropertyGroups": [
            {
                "PropertyGroupId": "FlinkApplicationProperties",
                "PropertyMap": {
                    "InputStreamName": "...",
                    ...
                }
            }
        ]
    },
    ...
}

You can then obtain the values of these parameters in the application code from the Kinesis Data Analytics for Java Applications runtime. For example, the following code snippet gets the name of the Kinesis data stream that the application should connect to from the FlinkApplicationProperties property group.

Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();

Properties flinkProperties = applicationProperties.get("FlinkApplicationProperties");

String kinesisStreamName = flinkProperties.getProperty("InputStreamName");
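Because a property group or an individual key can be missing from the configuration, it can help to wrap the lookup in a small helper with a fallback value. The following Java sketch shows one way to do that. The helper class and its method are hypothetical and not part of the Kinesis Data Analytics runtime library. In the application, the map would come from KinesisAnalyticsRuntime.getApplicationProperties() as shown above, whereas the sketch builds it by hand.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/** Hypothetical helper for null-safe property lookups; not part of the runtime library. */
final class PropertyLookupSketch {

    /** Returns the property from the named group, or the fallback if the group or key is missing. */
    static String getOrDefault(Map<String, Properties> applicationProperties,
                               String groupId, String key, String fallback) {
        Properties group = applicationProperties.get(groupId);
        if (group == null) {
            return fallback;
        }
        return group.getProperty(key, fallback);
    }

    public static void main(String[] args) {
        // Stand-in for the map returned by the runtime; the values are made up.
        Properties flinkProperties = new Properties();
        flinkProperties.setProperty("InputStreamName", "example-stream");
        Map<String, Properties> applicationProperties = new HashMap<String, Properties>();
        applicationProperties.put("FlinkApplicationProperties", flinkProperties);

        System.out.println(getOrDefault(
            applicationProperties, "FlinkApplicationProperties", "InputStreamName", "fallback-stream"));
    }
}
```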

You use the same mechanism to configure other properties for the Kinesis Data Analytics for Java application (for example, checkpointing and the parallelism of the application) that are usually specified as a parameter or configuration option directly to the Flink runtime.

"ApplicationConfiguration": {
    "FlinkApplicationConfiguration": {
        "CheckpointConfiguration": {
            "ConfigurationType": "DEFAULT"
        },
        "MonitoringConfiguration": {
            "ConfigurationType": "CUSTOM",
            "MetricsLevel": "TASK",
            "LogLevel": "INFO"
        },
        "ParallelismConfiguration": {
            "ConfigurationType": "DEFAULT"
        }
    },
    ...
}

With this configuration, the checkpointing and parallelism settings are left at their default. This enables checkpointing and auto scaling and sets the initial parallelism of the Kinesis Data Analytics for Java application to one. Moreover, the log level is increased to INFO and CloudWatch metrics are collected for every subtask of the application.

Build the Flink Kinesis Connector

When you are building a Flink application that reads data from a Kinesis data stream, you might notice that the Flink Kinesis Connector is not available from Maven Central. You actually need to build it yourself. The following steps build the connector for any recent Apache Flink release. However, because Kinesis Data Analytics for Java Applications is based on Flink 1.6.2, you can use this specific version for now.

$ wget -qO- https://github.com/apache/flink/archive/release-1.6.2.zip | bsdtar -xf-

$ cd flink-release-1.6.2

$ mvn clean package -B -DskipTests -Dfast -Pinclude-kinesis -pl flink-connectors/flink-connector-kinesis

Note that the connector has already been built and stored on S3 by the AWS CloudFormation template. You can simply download the JAR file of the connector from there and put it in your local Maven repository using the following Maven command:

$ mvn install:install-file -Dfile=flink-connector-kinesis_2.11-1.6.2.jar -DpomFile=flink-connector-kinesis_2.11-1.6.2.pom.xml

Integrate the Flink Elasticsearch sink with Amazon Elasticsearch Service

Beginning with the 1.6 release, Apache Flink comes with an Elasticsearch connector that supports the Elasticsearch APIs over HTTP. Therefore, it can natively talk to the endpoints that are provided by Amazon Elasticsearch Service.

You just need to decide how to authenticate requests against the public endpoint of the Elasticsearch cluster. You can whitelist individual IPs for access to the cluster. However, the recommended way of authenticating against the Amazon Elasticsearch Service endpoint is to add authentication information to AWS requests using IAM credentials and the Signature Version 4 signing process.

To extend the Flink Elasticsearch connector, which is not aware of the AWS-specific signing process, you can use the open-source aws-signing-request-interceptor, which is available from Maven Central. You just need to add an interceptor to the Elasticsearch sink that is called just before the request is sent to the Amazon Elasticsearch Service endpoint. The interceptor can then sign the request using the permissions of the role that has been configured for the Kinesis Data Analytics for Java application.

final List<HttpHost> httpHosts = Arrays.asList(HttpHost.create("https://..."));

ElasticsearchSink.Builder<T> esSinkBuilder = new ElasticsearchSink.Builder<>(
    httpHosts,
    new ElasticsearchSinkFunction<T>() {
      ...
    }
);

final Supplier<LocalDateTime> clock = () -> LocalDateTime.now(ZoneOffset.UTC);
final AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain();
final AWSSigner awsSigner = new AWSSigner(credentialsProvider, "eu-west-1", "es", clock);

esSinkBuilder.setRestClientFactory(
    restClientBuilder -> restClientBuilder.setHttpClientConfigCallback(
        callback -> callback.addInterceptorLast(new AWSSigningRequestInterceptor(awsSigner))
    )
);

esSinkBuilder.build();

Note that the actual code in the GitHub repository is a bit more sophisticated because you need to obtain a serializable request interceptor. But the basic approach to sign requests remains the same.
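To give a feeling for what the interceptor does on your behalf, the following Java sketch shows one step of the Signature Version 4 process: deriving the signing key from a secret access key, a date, a Region, and a service name by chaining HMAC-SHA256 operations. It is an illustration only and not the code of the aws-signing-request-interceptor. The class name and the sample inputs are hypothetical, and the remaining steps (building the canonical request and the string to sign) are omitted. In practice, you should rely on the interceptor or the AWS SDK rather than signing requests by hand.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

/** Sketch of the SigV4 signing key derivation only; the full signing process has more steps. */
final class SigningKeySketch {

    /** Computes HMAC-SHA256 of the data with the given key. */
    static byte[] hmacSha256(byte[] key, String data) {
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(key, "HmacSHA256"));
            return mac.doFinal(data.getBytes(StandardCharsets.UTF_8));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("HmacSHA256 is not available", e);
        }
    }

    /** Derives the signing key for one date (yyyyMMdd), Region, and service. */
    static byte[] signingKey(String secretKey, String date, String region, String service) {
        byte[] kSecret = ("AWS4" + secretKey).getBytes(StandardCharsets.UTF_8);
        byte[] kDate = hmacSha256(kSecret, date);
        byte[] kRegion = hmacSha256(kDate, region);
        byte[] kService = hmacSha256(kRegion, service);
        return hmacSha256(kService, "aws4_request");
    }

    public static void main(String[] args) {
        // Made-up secret and date; "es" is the service name used in the snippet above.
        byte[] key = signingKey("example-secret-key", "20190101", "eu-west-1", "es");
        System.out.println(key.length + " bytes");
    }
}
```

Because the date, Region, and service are part of the derivation, a key derived for one Region or day cannot be used to sign requests for another.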

Monitor and debug the Flink application

When running a Kinesis Data Analytics for Java application, you don’t get direct access to the cluster that runs Flink. This is because the underlying infrastructure is completely managed by the service. You merely interact with the service through an API. However, you can still obtain metrics and logging information through CloudWatch and CloudWatch Logs, respectively.

The Kinesis Data Analytics for Java application exposes a lot of operational metrics, ranging from metrics for the entire application down to metrics for individual processes of operators of the application. You can control which level of detail is adequate or required for your purposes. In fact, the metrics used in the previous section were all obtained through CloudWatch.

In addition to operational metrics, you can configure the Kinesis Data Analytics for Java application to write messages to CloudWatch Logs. This capability seamlessly integrates with common logging frameworks, such as Apache Log4j and the Simple Logging Facade for Java (SLF4J). So it is useful for debugging and identifying the cause of operational issues.

To enable logging for your Kinesis Data Analytics for Java application, just specify an existing CloudWatch log stream as a logging option when you start the application. In the application code, you can then write log messages through the logging framework, as follows:

final Logger LOG = LoggerFactory.getLogger(...);

LOG.info("Starting to consume events from stream {}", flinkProperties.getProperty("InputStreamName"));

After the log messages are persisted into CloudWatch Logs, you can easily query and analyze them through CloudWatch Logs Insights.

Conclusion

In this post, you built a reliable, scalable, and highly available streaming application based on Apache Flink and Kinesis Data Analytics for Java Applications. You also scaled the different components while ingesting and analyzing up to 25k events per second in near-real time. In large part, this scenario was enabled by using managed services, so you didn’t need to spend time on provisioning and configuring the underlying infrastructure.

The sources of the application and the AWS CloudFormation template used in this post are available from GitHub for your reference. You can dive into all the details of the Flink application and the configuration of the underlying services. I’m curious to see what you will build when you can focus on analyzing data in a streaming fashion rather than spending time on managing and operating infrastructure.

About the Author

Steffen Hausmann is a specialist solutions architect with AWS.

Improve clinical trial outcomes by using AWS technologies

Post Syndicated from Mayank Thakkar original https://aws.amazon.com/blogs/big-data/improve-clinical-trial-outcomes-by-using-aws-technologies/

We are living in a golden age of innovation, where personalized medicine is making it possible to cure diseases that we never thought curable. Digital medicine is helping people with diseases get healthier, and we are constantly discovering how to use the body’s immune system to target and eradicate cancer cells. According to a report published by ClinicalTrials.gov, the number of registered studies hit 293,000 in 2018, representing a 250x growth since 2000.

However, an internal rate of return (IRR) analysis by Endpoints News, using data from EvaluatePharma, highlights some interesting trends. A flourishing trend in pharma innovation is supported by strong growth in registered studies. However, the IRR shows a rapidly declining trend, from around 17 percent in 2000 to below the cost of capital in 2017 and projected to go to 0 percent by 2020.

This blog post is the first installment in a series that focuses on the end-to-end workflow of collecting, storing, processing, visualizing, and acting on clinical trials data in a compliant and secure manner. The series also discusses the application of artificial intelligence and machine learning technologies to the world of clinical trials. In this post, we highlight common architectural patterns that AWS customers use to modernize their clinical trials. These incorporate mobile technologies for better evidence generation, cost reduction, increasing quality, improving access, and making medicine more personalized for patients.

Improving the outcomes of clinical trials and reducing costs

Biotech and pharma organizations are feeling the pressure to use resources as efficiently as possible. This pressure forces them to search for any opportunity to streamline processes, get faster, and stay more secure, all while decreasing costs. More and more life sciences companies are targeting biologics, CAR-T, and precision medicine therapeutics, with focus shifting towards smaller, geographically distributed patient segments. This shift has resulted in an increasing mandate to capture data from previously unavailable, nontraditional sources. These sources include mobile devices, IoT devices, and in-home and clinical devices. Life sciences companies merge data from these sources with data from traditional clinical trials to build robust evidence around the safety and efficacy of a drug.

Early last year, the Clinical Trials Transformation Initiative (CTTI) provided recommendations about using mobile technologies for capturing holistic, high quality, attributable, real-world data from patients and for submission to the U.S. Food and Drug Administration (FDA). By using mobile technologies, life sciences companies can reduce barriers to trial participation and lower costs associated with conducting clinical trials. Global regulatory groups such as the FDA, Health Canada, and Medicines and Healthcare products Regulatory Agency (MHRA), among others, are also in favor of using mobile technologies. Mobile technologies can make patient recruitment more efficient, reach endpoints faster, and reduce the cost and time required to conduct clinical trials.

Improved data ingestion using mobile technologies can speed up outcomes, reduce costs, and improve the accuracy of clinical trials. This is especially true when mobile data ingestion is supplemented with artificial intelligence and machine learning (AI/ML) technologies.

Together, they can usher in a new age of smart clinical trials.

At the same time, traditional clinical trial processes and technology designed for mass-marketed blockbuster drugs can’t effectively meet emerging industry needs. This leaves life sciences and pharmaceutical companies in need of assistance for evolving their clinical trial operations. As a result, clinical trials are one of the largest areas of investment for bringing a new drug to market.

Using mobile technologies with traditional technologies in clinical trials can improve the outcomes of the trials and simultaneously reduce costs. Some of the use cases that the integration of various technologies enables include these:

  • Identifying and tracking participants in clinical trials
    • Identifying participants for clinical trials recruitment
    • Educating and informing patients participating in clinical trials
    • Implementing standardized protocols and sharing associated information with trial participants
    • Tracking adverse events and safety profiles
  • Integrating genomic and phenotypic data for identifying novel biomarkers
  • Integrating mobile data into clinical trials for better clinical trial management
  • Creating a patient-control arm based on historical data
  • Stratifying cohorts based on treatment, claims, and registry datasets
  • Building a collaborative, interoperable network for data sharing and knowledge creation
  • Building compliance-ready infrastructure for clinical trial management

The AWS Cloud provides HIPAA-eligible services and solutions. As an AWS customer, you can use these to build solutions for global implementation of mobile devices and sensors in trials, secure capture of streaming Internet of Things (IoT) data, and advanced analytics through visualization tools or AI/ML capabilities. Some of the use cases these services and solutions enable are finding and recruiting patients using smart analytics, facilitating global data management, and remote or in-patient site monitoring. Others include predicting lack of adherence, detecting adverse events, and accelerating trial outcomes along with optimizing trial costs.

Clinical Trials 2.0 (CT2.0) at AWS is geared toward facilitating wider adoption of cloud-native services to enable data ingestion from disparate sources, cost-optimized and reliable storage, and holistic analytics. At the same time, CT2.0 provides the granular access control, end-to-end security, and global scalability needed to conduct clinical trials more efficiently.

Reference architecture

One of the typical architectures for managing a clinical trial using mobile technologies is shown following. This architecture focuses on capturing real-time data from mobile sources and providing a way to process it.

* – Additional considerations such as data security, access control, and compliance need to be incorporated into the architecture and are discussed in the remainder of this post.

Managing a trial by using this architecture consists of the following five major steps.

Step 1: Collect data

Global pharmaceutical companies are using (or considering) mobile devices, personal wearables, instruments, and smart devices extensively in patient care and clinical trials to provide real-time data for activity tracking, vital signs monitoring, and so on. Devices such as infusion pumps and personal-use dialysis machines require tracking and alerting of device consumables and calibration status. Remote settings management is also a major use case for these kinds of devices. The end-user mobile devices used in the clinical trial emit a lot of telemetry data that requires real-time data capture, data cleansing, transformation, and analysis.

Typically, these devices are connected to an edge node or a smartphone. Such a connection provides sufficient computing resources to stream data to AWS IoT Core. AWS IoT Core can then be configured to write data to Amazon Kinesis Data Firehose in near real time. Kinesis Data Firehose is a fully managed service for delivering real-time streaming data to destinations such as Amazon S3. S3 provides online, flexible, cost-efficient, pay-as-you-go storage that can replicate your data across three Availability Zones within an AWS Region. The edge node or smartphone can use the AWS IoT SDKs to publish device data to, and subscribe to data from, AWS IoT Core with MQTT. AWS IoT Core supports the AWS method of authentication (called ‘SigV4’), X.509 certificate-based authentication, and customer-created token-based authentication (through custom authorizers). This authenticated approach enables you to map your choice of policies to each certificate and remotely control device or application access. You can also use the Kinesis Data Firehose encryption feature to enable server-side data encryption.

You can also capture additional data such as Case Report Forms (CRF), Electronic Medical Records (EMR), and medical images using Picture Archiving and Communication Systems (PACS). In addition, you can capture laboratory data (Labs) and other Patient Reported Outcomes data (ePRO). AWS provides multiple tools and services to effectively and securely connect to these data sources, enabling you to ingest data of varying volume, variety, and velocity. For more information about creating a HealthCare Data Hub and ingesting Digital Imaging and Communications in Medicine (DICOM) data, see the AWS Big Data Blog post Create a Healthcare Data Hub with AWS and Mirth Connect.

Step 2: Store data

After data is ingested from the devices and wearables used in the clinical trial, Kinesis Data Firehose is used to store the data on Amazon S3. This stored data serves as a raw copy and can later be used for historical analysis and pattern prediction. Using Amazon S3 lifecycle policies, you can periodically move your data to lower-cost storage such as Amazon S3 Glacier to further optimize your storage costs. Amazon S3 Intelligent-Tiering can automatically optimize costs when data access patterns change, without performance impact or operational overhead, by moving data between two access tiers: frequent access and infrequent access. You can also choose to encrypt data at rest and in motion using the various encryption options available on S3.

Amazon S3 offers an extremely durable, highly available, and infinitely scalable data storage infrastructure, simplifying most data processing, backup, and replication tasks.

Step 3: Data processing—fast lane

After a raw copy of the data is collected and stored, Amazon S3 is configured to publish events to AWS Lambda and invoke a Lambda function by passing the event data as a parameter. The Lambda function extracts key performance indicators (KPIs) such as adverse event notifications, medication adherence, and treatment schedule management from the incoming data. You can use Lambda to process these KPIs and store them, encrypted at rest, in Amazon DynamoDB. The DynamoDB table powers a near-real-time clinical trial status dashboard, which alerts clinical trial coordinators in real time so that appropriate interventions can take place.
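As an example of the kind of KPI logic that such a function might contain, the following Java sketch computes a medication adherence rate and flags participants who fall below a threshold. It is a minimal illustration under stated assumptions: adherence is defined here as doses taken divided by doses scheduled, the threshold is chosen by the caller, and the class and method names are hypothetical. The Lambda handler, the S3 event parsing, and the DynamoDB write are intentionally left out.

```java
/** Hypothetical KPI calculation; the definition of adherence used here is an assumption. */
final class AdherenceKpiSketch {

    /** Share of scheduled doses that were reported as taken, between 0.0 and 1.0. */
    static double adherenceRate(int dosesTaken, int dosesScheduled) {
        if (dosesScheduled <= 0) {
            throw new IllegalArgumentException("dosesScheduled must be positive");
        }
        if (dosesTaken < 0) {
            throw new IllegalArgumentException("dosesTaken must not be negative");
        }
        // Cap at 1.0 in case more doses are reported than were scheduled.
        return Math.min(1.0, (double) dosesTaken / dosesScheduled);
    }

    /** Flags a participant for follow-up when adherence drops below the threshold. */
    static boolean needsFollowUp(int dosesTaken, int dosesScheduled, double threshold) {
        return adherenceRate(dosesTaken, dosesScheduled) < threshold;
    }

    public static void main(String[] args) {
        // Made-up numbers: 7 of 10 scheduled doses taken, checked against an example threshold of 0.8.
        System.out.println(needsFollowUp(7, 10, 0.8));
    }
}
```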

In addition to this, using a data warehouse full of medical records, you can train and implement a machine learning model. This model can predict which patients are about to switch medications or might exhibit adherence challenges in the future. Such prediction can enable clinical trial coordinators to narrow in on those patients with mitigation strategies.

Step 4: Data processing—batch

For historical analysis and pattern prediction, the staged data (stored in S3) is processed in batches. A Lambda function is used to trigger the extract, transform, and load (ETL) process every time new data is added to the raw data S3 bucket. This Lambda function triggers an ETL process using AWS Glue, a fully managed ETL service that makes it easy for you to prepare and load your data for analytics. This approach helps in mining current and historical data to derive actionable insights, which are stored on Amazon S3.

From there, data is loaded into Amazon Redshift, a cost-effective, petabyte-scale data warehouse offering from AWS. You can also use Amazon Redshift Spectrum to extend data warehousing out to exabytes without loading any data to Amazon Redshift, as detailed in the Big Data blog post Amazon Redshift Spectrum Extends Data Warehousing Out to Exabytes—No Loading Required. This enables you to provide an all-encompassing picture of the entire clinical trial to your clinical trial coordinators so that they can react and respond faster.

In addition to this, you can train and implement a machine learning model to identify patients who might be at risk for adherence challenges. This enables clinical trial coordinators to reinforce patient education and support.

Step 5: Visualize and act on data

After the data is processed and ready to be consumed, you can use Amazon QuickSight, a cloud-native business intelligence service from AWS that offers native Amazon Redshift connectivity. Amazon QuickSight is serverless and so can be rolled out to your audiences in hours. You can also use a host of third-party reporting tools, which can use AWS-supplied JDBC or ODBC drivers or open-source PostgreSQL drivers to connect with Amazon Redshift. These tools include TIBCO Spotfire Analytics, Tableau Server, Qlik Sense Enterprise, Looker, and others. Real-time data processing (step 3 preceding) combines with historical-view batch processing (step 4). Together, they empower contract research organizations (CROs), study managers, trial coordinators, and other entities involved in the clinical trial journey to make effective and informed decisions at a speed and frequency that was previously unavailable. Using Amazon QuickSight’s unique Pay-per-Session pricing model, you can optimize costs for your bursty usage models by paying only when users access the dashboards.

Using Amazon Simple Notification Service (Amazon SNS), real-time feedback based on incoming data and telemetry is sent to patients through text messages, mobile push, and email. In addition, study managers and coordinators can send Amazon SNS notifications to patients. Amazon SNS provides fully managed pub/sub messaging for microservices, distributed systems, and serverless applications. It’s designed for high-throughput, push-based, many-to-many messaging. Alerts and notifications can be based on current data or a combination of current and historical data.
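The following sketch shows one way an adherence alert might be published. The 80 percent threshold, the message wording, and the function names are assumptions for illustration. The publish call with TopicArn, Subject, and Message is the standard boto3 SNS client call, and the client is passed in so the logic can be tested with a stand-in.

```python
def build_adherence_alert(participant_id, adherence, threshold=0.8):
    """Return an alert payload, or None when adherence meets the target."""
    if adherence >= threshold:
        return None
    return {
        "Subject": "Medication adherence reminder",
        "Message": (
            f"Participant {participant_id}: adherence is {adherence:.0%}, "
            f"below the {threshold:.0%} target."
        ),
    }


def publish_alert(sns_client, topic_arn, alert):
    """Publish the alert to an SNS topic and return the SNS message ID."""
    response = sns_client.publish(
        TopicArn=topic_arn,
        Subject=alert["Subject"],
        Message=alert["Message"],
    )
    return response["MessageId"]
```

In practice you would create the client with boto3.client("sns") and use a pseudonymous participant identifier rather than anything that identifies the patient directly.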

To encrypt messages published to Amazon SNS, you can follow the steps listed in the post Encrypting messages published to Amazon SNS with AWS KMS, on the AWS Compute Blog.

Data security, data privacy, data integrity, and compliance considerations

At AWS, customer trust is our top priority. We deliver services to millions of active customers, including enterprises, educational institutions, and government agencies in over 190 countries. Our customers include financial services providers, healthcare providers, and governmental agencies, who trust us with some of their most sensitive information.

To help protect that information, along with the services mentioned earlier, you should also use AWS Identity and Access Management (IAM). IAM enables you to maintain segregation of access, apply fine-grained access control, and secure end-user mobile and web applications. You can also use AWS Security Token Service (AWS STS) to provide secure, self-expiring, time-boxed, temporary security credentials to third-party administrators and service providers, greatly strengthening your security posture. You can use AWS CloudTrail to log IAM and STS API calls. Additionally, AWS IoT Device Management makes it easy to securely onboard, organize, monitor, and remotely manage IoT devices at scale.
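As a sketch of the time-boxed credentials described above, the helper below requests temporary credentials through the STS AssumeRole API. The role ARN and session name in the usage example are placeholders, and the helper name is hypothetical. The 900-second default reflects the minimum session duration that AssumeRole accepts.

```python
def temporary_credentials(sts_client, role_arn, session_name, duration_seconds=900):
    """Request short-lived credentials for a third party by assuming a role."""
    if duration_seconds < 900:
        # AssumeRole does not accept sessions shorter than 15 minutes.
        raise ValueError("duration_seconds must be at least 900")
    response = sts_client.assume_role(
        RoleArn=role_arn,
        RoleSessionName=session_name,
        DurationSeconds=duration_seconds,
    )
    credentials = response["Credentials"]
    return {
        "access_key_id": credentials["AccessKeyId"],
        "secret_access_key": credentials["SecretAccessKey"],
        "session_token": credentials["SessionToken"],
        "expiration": credentials["Expiration"],
    }
```

A call such as temporary_credentials(boto3.client("sts"), "arn:aws:iam::123456789012:role/VendorReadOnly", "vendor-audit") returns credentials that stop working at the reported expiration time, and the AssumeRole call itself is recorded by CloudTrail.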

With AWS, you can add an additional layer of security to your data at rest in the cloud. AWS provides scalable and efficient encryption features for services like Amazon EBS, Amazon S3, Amazon Redshift, Amazon SNS, AWS Glue, and many more. Flexible key management options, including AWS Key Management Service, enable you to choose whether to have AWS manage the encryption keys or to keep complete control over your keys. In addition, AWS provides APIs for you to integrate encryption and data protection with any of the services that you develop or deploy in an AWS environment.

As a customer, you maintain ownership of your data, and select which AWS services can process, store, and host the content. Generally speaking, AWS doesn’t access or use customers’ content for any purpose without their consent. AWS never uses customer data to derive information for marketing or advertising.

When evaluating the security of a cloud solution, it’s important that you understand and distinguish between the security of the cloud and security in the cloud. The AWS Shared Responsibility Model details this relationship.

To assist you with your compliance efforts, AWS continues to add more services to the various compliance regulations, attestations, certifications, and programs across the world. To decide which services are suitable for you, see the services in scope page.

You can also use various services like, but not limited to, AWS CloudTrail, AWS Config, Amazon GuardDuty, and AWS Key Management Service (AWS KMS) to enhance your compliance and auditing efforts. Find more details in the AWS Compliance Solutions Guide.

Final thoughts

With the ever-growing interconnectivity and technological advances in the field of medical devices, mobile devices and sensors can improve numerous aspects of clinical trials. They can help in recruitment, informed consent, patient counseling, and patient communication management. They can also improve protocol and medication adherence, clinical endpoints measurement, and the process of alerting participants on adverse events. Smart sensors, smart mobile devices, and robust interconnecting systems can be central in conducting clinical trials.

Every biopharma organization conducting or sponsoring a clinical trial activity faces the conundrum of advancing their approach to trials while maintaining overall trial performance and data consistency. The AWS Cloud enables a new dimension for how data is collected, stored, and used for clinical trials. It thus addresses that conundrum as we march towards a new reality of how drugs are brought to market. The AWS Cloud abstracts away technical challenges such as scaling, security, and establishing a cost-efficient IT infrastructure. In doing so, it allows biopharma organizations to focus on their core mission of improving patient lives through the development of effective, groundbreaking treatments.

 


About the Author

Mayank Thakkar – Global Solutions Architect, AWS HealthCare and Life Sciences

 

 

 

 

Deven Atnoor, Ph.D. – Industry Specialist, AWS HealthCare and Life Sciences