Tag Archives: AWS Big Data

EMR Notebooks: A managed analytics environment based on Jupyter notebooks

Post Syndicated from Vignesh Rajamani original https://aws.amazon.com/blogs/big-data/emr-notebooks-a-managed-analytics-environment-based-on-jupyter-notebooks/

Notebooks are increasingly becoming the standard tool for interactively developing big data applications. It’s easy to see why. Their flexible architecture allows you to experiment with data in multiple languages, test code interactively, and visualize large datasets. To help scientists and developers easily access notebook tools, we launched Amazon EMR Notebooks, a managed notebook environment that is based on the popular open-source Jupyter notebook application. EMR Notebooks support Sparkmagic kernels, which allow you to submit jobs remotely to your EMR cluster using languages like PySpark, Spark SQL, Spark R, and Scala. The kernels submit your Spark code through Apache Livy, a REST server for Spark running on your cluster.

EMR Notebooks is designed to make it easy for you to experiment and build applications with Apache Spark. In this blog post, I first cover some of the benefits that EMR Notebooks offers. Then I introduce you to some of its capabilities such as detaching and attaching a notebook to different EMR clusters, monitoring Spark activity from within the notebook, using tags to control user permissions, and setting up user-impersonation to track notebook users and their actions. To learn about creating and using EMR Notebooks, you can visit Using Amazon EMR Notebooks or follow along with the AWS Online Tech Talks webinar.

Benefits of EMR Notebooks

One of the useful features of EMR Notebooks is the separation of the notebook environment from your underlying cluster infrastructure. The separation makes it easy for you to execute notebook code against transient clusters without worrying about deploying or configuring your notebook infrastructure every time you bring up a new cluster. You can create multiple serverless notebooks from the AWS Management Console for EMR and access the notebook UI without spending time setting up SSH access or configuring your browser for port-forwarding. Each notebook you create is launched instantly with its own Spark context. This capability enables you to attach multiple notebooks to a single shared cluster and submit parallel jobs without fear of job conflicts in a multi-tenant environment. This way you make efficient use of your clusters.

You can also connect EMR Notebooks to an EMR cluster as small as a single node. This gives you a budget-friendly sandbox environment to develop your Spark applications.

Finally, with EMR Notebooks, you don’t have to spend time manually configuring your notebook to store files persistently. Your notebook files are saved automatically and periodically to an Amazon S3 bucket that you choose, so you don’t have to worry about losing your work if your cluster is shut down. You can retrieve your saved notebooks from the console or download them locally from your S3 bucket in Jupyter (.ipynb) format.

Detaching and attaching EMR Notebooks to different clusters

With EMR Notebooks, you can detach an active notebook from a cluster and attach it to a different cluster and promptly resume your work. This capability can be useful in scenarios where you want to move your notebook from a sandbox development cluster to a production environment or attach to a different cluster with appropriate CPU or memory resources and library packages required to execute your notebook against large datasets. To detach an active notebook:

  1. Select the notebook name, and then choose Stop.

  2. Wait for the notebook status shown next to the notebook name to change from Ready to Stopped, and then choose Change Cluster.

After you stop the notebook, you can choose to attach it to a different cluster in the same VPC or create a new cluster. EMR Notebooks automatically attaches the notebook to the cluster and restarts it.

Monitoring and debugging Spark jobs

EMR Notebooks supports a built-in Jupyter notebook widget called SparkMonitor that allows you to monitor the status of all your Spark jobs launched from the notebook without connecting to the Spark web UI server.

The widget integrates automatically with the cell structure of your notebook and displays detailed status for the job submitted from each cell, providing you with real-time progress of the different stages of the job. For any failed jobs, the widget also offers embedded links to the container logs in Amazon S3, allowing you to get to the relevant logs and debug your jobs.

Additionally, if you have configured your cluster to accept SSH connections, you can access the Spark application web UI and the Hadoop job history server from within the notebook. This allows you to view the event timelines, visualize the Directed Acyclic Graph (DAG) of each job, and view detailed system and runtime information to inspect and debug your code. These web UIs are available automatically the first time you start to run your Spark code from a notebook.

Writing tag-based policies to control user permissions for notebooks and users

EMR Notebooks are by default shared resources that anyone from your organization with access to your AWS account can open, edit, or even delete. If you want more control over your notebooks, you can use tags to label them and write IAM policies that control access for other users. To get you started, when you create a notebook, a default tag with the key string creatorUserId is set to the value of the IAM user ID of the user who created the notebook.

You can use this tag to limit allowed actions on the notebook to only the creator of the notebook. For example, the permissions policy statement below, when attached to a role or user, enables the IAM user to view, start, stop, edit, or delete only those notebooks that they have created. This policy statement uses the default tag that is applied by EMR Notebooks when you create the notebook.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "elasticmapreduce:DescribeEditor",
                "elasticmapreduce:StartEditor",
                "elasticmapreduce:StopEditor",
                "elasticmapreduce:DeleteEditor",
                "elasticmapreduce:OpenEditorInConsole"
            ],
            "Effect": "Allow",
            "Resource": "*",
            "Condition": {
                "StringEquals": {
                    "elasticmapreduce:ResourceTag/creatorUserId": "${aws:userId}"
                }
            }
        }
    ]
}

You can also write policies that enforce tagging when a notebook is created. For example, the policy below requires that the user not change or delete the creatorUserId tag that is added by default. The variable ${aws:userid} specifies the currently active user’s user ID, which is the default value of the tag.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "elasticmapreduce:CreateEditor"
            ],
            "Effect": "Allow",
            "Resource": "*",
            "Condition": {
                "StringEquals": {
                    "elasticmapreduce:RequestTag/creatorUserId": "${aws:userid}"
                }
            }
        }
    ]
}

You can use notebook tags along with EMR cluster tags to control notebook users’ access to your cluster. Tagging your notebooks and clusters, in addition to securing your resources, also allows you to categorize, track, and allocate your EMR cluster costs across your different lines of business. For example, the policy below allows a user to start a notebook only if the notebook has a tag with the key string “department” set to the value “Analytics,” and only if the notebook is attached to an EMR cluster that has a tag with the key string “cost-center” set to the value “12345.”

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "elasticmapreduce:StartEditor"
            ],
            "Effect": "Allow",
            "Resource": "arn:aws:elasticmapreduce:*:123456789012:editor/*",
            "Condition": {
                "StringEquals": {
                    "elasticmapreduce:ResourceTag/department": [
                        "Analytics"
                    ]
                }
            }
        },
        {
            "Action": [
                "elasticmapreduce:StartEditor"
            ],
            "Effect": "Allow",
            "Resource": "arn:aws:elasticmapreduce:*:123456789012:cluster/*",
            "Condition": {
                "StringEquals": {
                    "elasticmapreduce:ResourceTag/cost-center": [
                        "12345"
                    ]
                }
            }
        }
    ]
}

You can learn more about notebook and cluster tags by visiting Using Tags to Control User Permissions and Tag Clusters. Also, visit Using Cost Allocation Tags to learn more about using tags to generate cost allocation reports in the AWS Billing and Cost Management console.

Tracking notebook users by enforcing user-impersonation

EMR Notebooks enables multiple users to execute their notebook code concurrently on a shared EMR cluster, improving cluster utilization. By default, all Spark jobs spawned by these different users from their notebooks run as the same user (the livy user) on your EMR cluster. If your corporate policy requires you to set up an audit trail and track individual notebook user actions on shared clusters, you can use the user-impersonation feature of EMR Notebooks. This capability allows you to distinguish and audit notebook users by associating the jobs they execute from their notebooks with each user’s IAM identity. To use this feature, enable Livy user-impersonation by configuring the core-site and livy-conf configuration classifications when you create and launch an EMR cluster, as follows:

[
    {
        "Classification": "core-site",
        "Properties": {
          "hadoop.proxyuser.livy.groups": "*",
          "hadoop.proxyuser.livy.hosts": "*"
        }
    },
    {
        "Classification": "livy-conf",
        "Properties": {
          "livy.impersonation.enabled": "true"
        }
    }
]

Visit Configuring Applications to learn more about configuring applications. After this feature is enabled, EMR Notebooks creates HDFS user directories on the master node for each user identity. This means all the Spark jobs from the notebook run as the IAM user instead of the indistinct user livy. For example, if user NB_User1 runs code from the notebook editor, then a user directory named user_NB_User1 is created on the master node and all Spark jobs run as user_NB_User1. You can then use a service like AWS CloudTrail to audit the record of actions by the user NB_User1 by creating a trail. To learn more about setting up audit trails, see Logging Amazon EMR API Calls in AWS CloudTrail.

Conclusion

In this post, I highlighted some of the capabilities of EMR Notebooks, such as the ability to change clusters, monitor Spark jobs from each notebook cell, control user permissions and categorize resource costs with notebook and cluster tags, and set up user-impersonation to track notebook user actions.

Best of all, there is no additional charge for using EMR Notebooks; you pay only for the use of the EMR cluster as usual.

If you have questions or suggestions, feel free to leave a comment.

 


About the Authors

Vignesh Rajamani is a senior product manager for EMR at AWS.

Nikki Rouda is the principal product marketing manager for data lakes and big data at AWS. Nikki has spent 20+ years helping enterprises in 40+ countries develop and implement solutions to their analytics and IT infrastructure challenges. Nikki holds an MBA from the University of Cambridge and an ScB in geophysics and math from Brown University.

Test data quality at scale with Deequ

Post Syndicated from Dustin Lange original https://aws.amazon.com/blogs/big-data/test-data-quality-at-scale-with-deequ/

You generally write unit tests for your code, but do you also test your data? Incorrect or malformed data can have a large impact on production systems. Examples of data quality issues are:

  • Missing values can lead to failures in production systems that require non-null values (NullPointerException).
  • Changes in the distribution of data can lead to unexpected outputs of machine learning models.
  • Aggregations of incorrect data can lead to wrong business decisions.

In this blog post, we introduce Deequ, an open source tool developed and used at Amazon. Deequ allows you to calculate data quality metrics on your dataset, define and verify data quality constraints, and be informed about changes in the data distribution. Instead of implementing checks and verification algorithms on your own, you can focus on describing how your data should look. Deequ supports you by suggesting checks for you. Deequ is implemented on top of Apache Spark and is designed to scale with large datasets (think billions of rows) that typically live in a distributed filesystem or a data warehouse.

Deequ at Amazon

Deequ is being used internally at Amazon for verifying the quality of many large production datasets. Dataset producers can add and edit data quality constraints. The system computes data quality metrics on a regular basis (with every new version of a dataset), verifies constraints defined by dataset producers, and publishes datasets to consumers in case of success. In error cases, dataset publication can be stopped, and producers are notified to take action. Data quality issues do not propagate to consumer data pipelines, reducing their blast radius.

Overview of Deequ

To use Deequ, let’s look at its main components (also shown in Figure 1).

  • Metrics Computation — Deequ computes data quality metrics, that is, statistics such as completeness, maximum, or correlation. Deequ uses Spark to read from sources such as Amazon S3, and to compute metrics through an optimized set of aggregation queries. You have direct access to the raw metrics computed on the data.
  • Constraint Verification — As a user, you focus on defining a set of data quality constraints to be verified. Deequ takes care of deriving the required set of metrics to be computed on the data. Deequ generates a data quality report, which contains the result of the constraint verification.
  • Constraint Suggestion — You can choose to define your own custom data quality constraints, or use the automated constraint suggestion methods that profile the data to infer useful constraints.

Figure 1: Overview of Deequ components

Setup: Launch the Spark cluster

This section shows the steps to use Deequ on your own data. First, set up Spark and Deequ on an Amazon EMR cluster. Then, load a sample dataset provided by AWS, run some analysis, and then run data tests.

Deequ is built on top of Apache Spark to support fast, distributed calculations on large datasets. Deequ depends on Spark version 2.2.0 or later. As a first step, create a cluster with Spark on Amazon EMR. Amazon EMR takes care of the configuration of Spark for you. Also, you can use the EMR File System (EMRFS) to directly access data in Amazon S3. For testing, you can also install Spark on a single machine in standalone mode.

Connect to the Amazon EMR master node using SSH. Load the latest Deequ JAR from Maven Repository. To load the JAR of version 1.0.1, use the following:

wget http://repo1.maven.org/maven2/com/amazon/deequ/deequ/1.0.1/deequ-1.0.1.jar

Launch Spark Shell and use the spark.jars argument for referencing the Deequ JAR file:

spark-shell --conf spark.jars=deequ-1.0.1.jar

For more information about how to set up Spark, see the Spark Quick Start guide, and the overview of Spark configuration options.

Load data

As a running example, we use a customer review dataset provided by Amazon on Amazon S3. Let’s load the dataset containing reviews for the category “Electronics” in Spark. Make sure to enter the code in the Spark shell:

val dataset = spark.read.parquet("s3://amazon-reviews-pds/parquet/product_category=Electronics/")

You can see the following selected attributes if you run dataset.printSchema() in the Spark shell:

root
|-- marketplace: string (nullable = true)
|-- customer_id: string (nullable = true)
|-- review_id: string (nullable = true)
|-- product_title: string (nullable = true)
|-- star_rating: integer (nullable = true)
|-- helpful_votes: integer (nullable = true)
|-- total_votes: integer (nullable = true)
|-- vine: string (nullable = true)
|-- year: integer (nullable = true)

Data analysis

Before we define checks on the data, we want to calculate some statistics on the dataset; we call them metrics. Deequ supports the following metrics (they are defined in this Deequ package):

Metric | Description | Usage Example
ApproxCountDistinct | Approximate number of distinct values, computed with HyperLogLogPlusPlus sketches. | ApproxCountDistinct("review_id")
ApproxQuantile | Approximate quantile of a distribution. | ApproxQuantile("star_rating", quantile = 0.5)
ApproxQuantiles | Approximate quantiles of a distribution. | ApproxQuantiles("star_rating", quantiles = Seq(0.1, 0.5, 0.9))
Completeness | Fraction of non-null values in a column. | Completeness("review_id")
Compliance | Fraction of rows that comply with the given column constraint. | Compliance("top star_rating", "star_rating >= 4.0")
Correlation | Pearson correlation coefficient; measures the linear correlation between two columns. The result is in the range [-1, 1], where 1 means positive linear correlation, -1 means negative linear correlation, and 0 means no correlation. | Correlation("total_votes", "star_rating")
CountDistinct | Number of distinct values. | CountDistinct("review_id")
DataType | Distribution of data types such as Boolean, Fractional, Integral, and String. The resulting histogram allows filtering by relative or absolute fractions. | DataType("year")
Distinctness | Fraction of distinct values of a column over the number of all values of a column. Distinct values occur at least once. Example: [a, a, b] contains two distinct values a and b, so distinctness is 2/3. | Distinctness("review_id")
Entropy | Entropy is a measure of the level of information contained in an event (value in a column) when considering all possible events (values in a column). It is measured in nats (natural units of information). Entropy is estimated using observed value counts as the negative sum of (value_count/total_count) * log(value_count/total_count). Example: [a, b, b, c, c] has three distinct values with counts [1, 2, 2]. Entropy is then (-1/5*log(1/5)-2/5*log(2/5)-2/5*log(2/5)) = 1.055. | Entropy("star_rating")
Maximum | Maximum value. | Maximum("star_rating")
Mean | Mean value; null values are excluded. | Mean("star_rating")
Minimum | Minimum value. | Minimum("star_rating")
MutualInformation | Mutual information describes how much information about one column (one random variable) can be inferred from another column (another random variable). If the two columns are independent, mutual information is zero. If one column is a function of the other column, mutual information is the entropy of the column. Mutual information is symmetric and nonnegative. | MutualInformation(Seq("total_votes", "star_rating"))
PatternMatch | Fraction of rows that comply with a given regular expression. | PatternMatch("marketplace", pattern = raw"\w{2}".r)
Size | Number of rows in a DataFrame. | Size()
Sum | Sum of all values of a column. | Sum("total_votes")
UniqueValueRatio | Fraction of unique values over the number of all distinct values of a column. Unique values occur exactly once; distinct values occur at least once. Example: [a, a, b] contains one unique value b and two distinct values a and b, so the unique value ratio is 1/2. | UniqueValueRatio("star_rating")
Uniqueness | Fraction of unique values over the number of all values of a column. Unique values occur exactly once. Example: [a, a, b] contains one unique value b, so uniqueness is 1/3. | Uniqueness("star_rating")

In the following example, we show how to use the AnalysisRunner to define the metrics you are interested in. You can run the following code in the Spark shell by either just pasting it in the shell or by saving it in a local file on the master node and loading it in the Spark shell with the following command:

:load PATH_TO_FILE

import com.amazon.deequ.analyzers.runners.{AnalysisRunner, AnalyzerContext}
import com.amazon.deequ.analyzers.runners.AnalyzerContext.successMetricsAsDataFrame
import com.amazon.deequ.analyzers.{Compliance, Correlation, Size, Completeness, Mean, ApproxCountDistinct}

val analysisResult: AnalyzerContext = { AnalysisRunner
  // data to run the analysis on
  .onData(dataset)
  // define analyzers that compute metrics
  .addAnalyzer(Size())
  .addAnalyzer(Completeness("review_id"))
  .addAnalyzer(ApproxCountDistinct("review_id"))
  .addAnalyzer(Mean("star_rating"))
  .addAnalyzer(Compliance("top star_rating", "star_rating >= 4.0"))
  .addAnalyzer(Correlation("total_votes", "star_rating"))
  .addAnalyzer(Correlation("total_votes", "helpful_votes"))
  // compute metrics
  .run()
}

// retrieve successfully computed metrics as a Spark data frame
val metrics = successMetricsAsDataFrame(spark, analysisResult)

The resulting data frame contains the calculated metrics (call metrics.show() in the Spark shell):

name | instance | value
ApproxCountDistinct | review_id | 3010972
Completeness | review_id | 1
Compliance | top star_rating | 0.74941
Correlation | helpful_votes,total_votes | 0.99365
Correlation | total_votes,star_rating | -0.03451
Mean | star_rating | 4.03614
Size | * | 3120938

We can learn that:

  • review_id has no missing values and approximately 3,010,972 unique values.
  • 74.9 % of reviews have a star_rating of 4 or higher.
  • total_votes and star_rating are not correlated.
  • helpful_votes and total_votes are strongly correlated.
  • The average star_rating is 4.0.
  • The dataset contains 3,120,938 reviews.

Define and Run Tests for Data

After analyzing and understanding the data, we want to verify that the properties we have derived also hold for new versions of the dataset. By defining assertions on the data distribution as part of a data pipeline, we can ensure that every processed dataset is of high quality, and that any application consuming the data can rely on it.

For writing tests on data, we start with the VerificationSuite and add Checks on attributes of the data. In this example, we test for the following properties of our data:

  • There are at least 3 million rows in total.
  • review_id is never NULL.
  • review_id is unique.
  • star_rating has a minimum of 1.0 and a maximum of 5.0.
  • marketplace only contains “US”, “UK”, “DE”, “JP”, or “FR”.
  • year does not contain negative values.

This is the code that reflects the previous statements. For information about all available checks, see this GitHub repository. You can run this directly in the Spark shell as previously explained:

import com.amazon.deequ.{VerificationSuite, VerificationResult}
import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
import com.amazon.deequ.checks.{Check, CheckLevel}

val verificationResult: VerificationResult = { VerificationSuite()
  // data to run the verification on
  .onData(dataset)
  // define a data quality check
  .addCheck(
    Check(CheckLevel.Error, "Review Check") 
      .hasSize(_ >= 3000000) // at least 3 million rows
      .hasMin("star_rating", _ == 1.0) // min is 1.0
      .hasMax("star_rating", _ == 5.0) // max is 5.0
      .isComplete("review_id") // should never be NULL
      .isUnique("review_id") // should not contain duplicates
      .isComplete("marketplace") // should never be NULL
      // contains only the listed values
      .isContainedIn("marketplace", Array("US", "UK", "DE", "JP", "FR"))
      .isNonNegative("year")) // should not contain negative values
  // compute metrics and verify check conditions
  .run()
}

// convert check results to a Spark data frame
val resultDataFrame = checkResultsAsDataFrame(spark, verificationResult)

After calling run, Deequ translates your test description into a series of Spark jobs, which are executed to compute metrics on the data. Afterwards, it invokes your assertion functions (e.g., _ == 1.0 for the minimum star-rating check) on these metrics to see if the constraints hold on the data.

Call resultDataFrame.show(truncate=false) in the Spark shell to inspect the result. The resulting table shows the verification result for every test, for example:

constraint | constraint_status | constraint_message
SizeConstraint(Size(None)) | Success |
MinimumConstraint(Minimum(star_rating,None)) | Success |
MaximumConstraint(Maximum(star_rating,None)) | Success |
CompletenessConstraint(Completeness(review_id,None)) | Success |
UniquenessConstraint(Uniqueness(List(review_id))) | Failure | Value: 0.9926566948782706 does not meet the constraint requirement!
CompletenessConstraint(Completeness(marketplace,None)) | Success |
ComplianceConstraint(Compliance(marketplace contained in US,UK,DE,JP,FR,marketplace IS NULL OR marketplace IN ('US','UK','DE','JP','FR'),None)) | Success |
ComplianceConstraint(Compliance(year is non-negative,COALESCE(year, 0.0) >= 0,None)) | Success |

Interestingly, the review_id column is not unique, which resulted in a failure of the check on uniqueness.

We can also look at all the metrics that Deequ computed for this check:

VerificationResult.successMetricsAsDataFrame(spark, verificationResult).show(truncate=false)

Result:

name | instance | value
Completeness | review_id | 1
Completeness | marketplace | 1
Compliance | marketplace contained in US,UK,DE,JP,FR | 1
Compliance | year is non-negative | 1
Maximum | star_rating | 5
Minimum | star_rating | 1
Size | * | 3120938
Uniqueness | review_id | 0.99266

Automated Constraint Suggestion

If you own a large number of datasets or if your dataset has many columns, it may be challenging for you to manually define appropriate constraints. Deequ can automatically suggest useful constraints based on the data distribution. Deequ first runs a data profiling method and then applies a set of rules on the result. For more information about how to run a data profiling method, see this GitHub repository.

import com.amazon.deequ.suggestions.{ConstraintSuggestionRunner, Rules}
import spark.implicits._ // for toDS method

// We ask deequ to compute constraint suggestions for us on the data
val suggestionResult = { ConstraintSuggestionRunner()
  // data to suggest constraints for
  .onData(dataset)
  // default set of rules for constraint suggestion
  .addConstraintRules(Rules.DEFAULT)
  // run data profiling and constraint suggestion
  .run()
}

// We can now investigate the constraints that Deequ suggested. 
val suggestionDataFrame = suggestionResult.constraintSuggestions.flatMap { 
  case (column, suggestions) => 
    suggestions.map { constraint =>
      (column, constraint.description, constraint.codeForConstraint)
    } 
}.toSeq.toDS()

The result contains a list of constraints with descriptions and Scala code, so that you can directly apply it in your data quality checks. Call suggestionDataFrame.show(truncate=false) in the Spark shell to inspect the suggested constraints; here we show a subset:

column | constraint | scala code
customer_id | ‘customer_id’ is not null | .isComplete("customer_id")
customer_id | ‘customer_id’ has type Integral | .hasDataType("customer_id", ConstrainableDataTypes.Integral)
customer_id | ‘customer_id’ has no negative values | .isNonNegative("customer_id")
helpful_votes | ‘helpful_votes’ is not null | .isComplete("helpful_votes")
helpful_votes | ‘helpful_votes’ has no negative values | .isNonNegative("helpful_votes")
marketplace | ‘marketplace’ has value range ‘US’, ‘UK’, ‘DE’, ‘JP’, ‘FR’ | .isContainedIn("marketplace", Array("US", "UK", "DE", "JP", "FR"))
product_title | ‘product_title’ is not null | .isComplete("product_title")
star_rating | ‘star_rating’ is not null | .isComplete("star_rating")
star_rating | ‘star_rating’ has no negative values | .isNonNegative("star_rating")
vine | ‘vine’ has value range ‘N’, ‘Y’ | .isContainedIn("vine", Array("N", "Y"))

Note that the constraint suggestion is based on heuristic rules and assumes that the data it is shown is correct, which might not be the case. We recommend reviewing the suggestions before applying them in production.
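
If you decide that some of the suggested constraints are useful, you can feed them back into a verification run instead of copying the generated code by hand. The following sketch assumes that each suggestion object exposes its underlying constraint and that VerificationSuite accepts a sequence of checks, as in the examples in the Deequ repository; the exact API may differ slightly between Deequ versions:

import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.checks.{Check, CheckLevel}

// Collect all suggested constraints into a single generated check.
val allConstraints = suggestionResult.constraintSuggestions
  .flatMap { case (_, suggestions) => suggestions.map { _.constraint } }
  .toSeq

val generatedCheck = Check(CheckLevel.Warning, "generated constraints", allConstraints)

// Verify the data against the generated check and inspect the results as before.
val generatedCheckResult = VerificationSuite()
  .onData(dataset)
  .addChecks(Seq(generatedCheck))
  .run()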

More Examples on GitHub

You can find examples of more advanced features at Deequ’s GitHub page:

  • Deequ not only provides data quality checks with fixed thresholds. Learn how to use anomaly detection on data quality metrics to apply tests on metrics that change over time.
  • Deequ offers support for storing and loading metrics. Learn how to use the MetricsRepository for this use case.
  • If your dataset grows over time or is partitioned, you can use Deequ’s incremental metrics computation capability. For each partition, Deequ stores a state for each computed metric. To compute metrics for the union of partitions, Deequ can use these states to efficiently derive overall metrics without reloading the data.

Additional Resources

Learn more about the inner workings of Deequ in our VLDB 2018 paper, “Automating Large-Scale Data Quality Verification.”

Conclusion

This blog post showed you how to use Deequ for calculating data quality metrics, verifying data quality metrics, and profiling data to automate the configuration of data quality checks. Deequ is available for you now to build your own data quality management pipeline.

 


About the Authors

Dustin Lange is an Applied Science Manager at Amazon Search in Berlin. Dustin’s team develops algorithms for improving the search customer experience through machine learning and data quality tracking. He completed his PhD in similarity search in databases in 2013 and started his Amazon career as an Applied Scientist in forecasting the same year.

Sebastian Schelter is a Senior Applied Scientist at Amazon Search, working on problems at the intersection of data management and machine learning. He holds a Ph.D. in large-scale data processing from TU Berlin and is an elected member of the Apache Software Foundation, where he currently serves as a mentor for the Apache Incubator.

Philipp Schmidt is an ML Engineer at Amazon Search. After his graduation from TU Berlin he worked at the University of Potsdam and in several startups in Berlin. At Amazon he is working on enabling data quality tracking for large scale datasets and refining the customer shopping experience through machine learning.

Tammo Rukat is an Applied Scientist at Amazon Search in Berlin. He holds a PhD in statistical machine learning from the University of Oxford. At Amazon he makes use of the abundance and complexity of the company’s large-scale noisy datasets to contribute to a more intelligent customer experience.

Optimize Amazon EMR costs with idle checks and automatic resource termination using advanced Amazon CloudWatch metrics and AWS Lambda

Post Syndicated from Praveen Krishnamoorthy Ravikumar original https://aws.amazon.com/blogs/big-data/optimize-amazon-emr-costs-with-idle-checks-and-automatic-resource-termination-using-advanced-amazon-cloudwatch-metrics-and-aws-lambda/

Many customers use Amazon EMR to run big data workloads, such as Apache Spark and Apache Hive queries, in their development environments. Data analysts and data scientists frequently use these types of clusters, known as analytics EMR clusters. Users often forget to terminate the clusters after their work is done. This leaves the clusters running idle and, in turn, adds unnecessary costs.

To avoid this overhead, you must track the idleness of the EMR cluster and terminate it if it has been idle for a long time. Amazon EMR provides the native IsIdle Amazon CloudWatch metric, which determines the idleness of the cluster by checking whether there’s a YARN job running. However, you should consider additional signals, such as connected SSH users or running Presto jobs, to determine whether the cluster is idle. Also, when you execute Spark jobs in Apache Zeppelin, the IsIdle metric remains active (1) for a long time, even after the job finishes executing. In such cases, the IsIdle metric is not ideal for deciding whether a cluster is inactive.

In this blog post, we propose a solution to cut down this overhead cost. We implemented a bash script that is installed on the master node of the EMR cluster and scheduled to run every 5 minutes. The script monitors the cluster and sends a custom metric, EMR-INUSE (0=inactive; 1=active), to CloudWatch every 5 minutes. If CloudWatch receives 0 (inactive) for a predefined number of data points, it triggers an alarm, which in turn executes an AWS Lambda function that terminates the cluster.
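
The core of the monitoring script is a single CloudWatch call that publishes this activity flag. The following is a minimal bash sketch of that idea; the namespace, dimension, and file path shown here are illustrative assumptions rather than the exact contents of the script in the artifacts bucket:

#!/bin/bash
# Illustrative sketch: publish the cluster activity flag as a custom CloudWatch metric.
# Assumes the script runs on the EMR master node with an instance profile that allows
# cloudwatch:PutMetricData. The job flow ID is read from the metadata file that current
# EMR AMIs place on the master node (adjust the path and key name if needed).
JOB_FLOW_ID=$(sed -n 's/.*"jobFlowId" *: *"\([^"]*\)".*/\1/p' /mnt/var/lib/info/job-flow.json)

IS_IN_USE=0
# ...run the activity checks described later in this post and set IS_IN_USE=1
#    if any of them detects activity...

aws cloudwatch put-metric-data \
  --namespace "EMRShutdown/Cluster-Metric" \
  --metric-name "EMR-INUSE" \
  --dimensions "JobFlowId=${JOB_FLOW_ID}" \
  --value "${IS_IN_USE}"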

Prerequisites

You must have the following before you can create and deploy this framework:

Note: This solution is designed as an additional feature. It can be applied to any existing EMR clusters by executing the scheduler script (explained later in the post) as an EMR step. If you want to implement this solution as a mandatory feature for your future clusters, you can include the EMR step as part of your cluster deployment. You can also apply this solution to EMR clusters that are spun up through AWS CloudFormation, the AWS CLI, and even the AWS Management Console.

Key components

The following are the key components of the solution.

Analytics EMR cluster

Amazon EMR provides a managed Apache Hadoop framework that lets you easily process large amounts of data across dynamically scalable Amazon EC2 instances. Data scientists use analytics EMR clusters for data analysis, machine learning using notebook applications (such as Apache Zeppelin or JupyterHub), and running big data workloads based on Apache Spark, Presto, etc.

Scheduler script

The schedule_script.sh is the shell script to be executed as an Amazon EMR step. When executed, it copies the monitoring script from the Amazon S3 artifacts folder and schedules the monitoring script to run every 5 minutes. The S3 location of the monitoring script should be passed as an argument.
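
A scheduler step of this kind can be as simple as copying the monitoring script from Amazon S3 and registering a cron entry for it. The following is a hypothetical sketch of what such a step might do, not the exact contents of schedule_script.sh:

#!/bin/bash
# Hypothetical scheduler step: fetch the monitoring script passed as the first
# argument and run it from cron every 5 minutes on the master node.
MONITORING_SCRIPT_S3=$1   # for example, s3://<your-artifacts-bucket>/pushShutDownMetrin.sh

aws s3 cp "${MONITORING_SCRIPT_S3}" /home/hadoop/pushShutDownMetrin.sh
chmod +x /home/hadoop/pushShutDownMetrin.sh

# Append a cron entry for the hadoop user, preserving any existing entries.
(crontab -l 2>/dev/null; echo "*/5 * * * * /home/hadoop/pushShutDownMetrin.sh") | crontab -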

Monitoring script

The pushShutDownMetrin.sh script is a monitoring script that is implemented using shell commands. It should be installed in the master node of the EMR cluster as an Amazon EMR step. The script is scheduled to run every 5 minutes and sends the cluster activity status to CloudWatch.  

JupyterHub API token script

The jupyterhub_addAdminToken.sh script is a shell script to be executed as an Amazon EMR step if JupyterHub is enabled on the cluster. In our design, the monitoring script uses REST APIs provided by JupyterHub to check whether the application is in use.

To send the request to JupyterHub, you must pass an API token along with the request. By default, the application does not generate API tokens. This script generates the API token and assigns it to the admin user, which is then picked up by the jupyterhub module in the monitoring script to track the activity of the application.

Custom CloudWatch metric

All Amazon EMR clusters send data for several metrics to CloudWatch. Metrics are updated every 5 minutes, automatically collected, and pushed to CloudWatch. For this use case, we created the Amazon EMR metric EMR-INUSE. This metric represents the active status of the cluster based on the module checks implemented in the monitoring script. The metric is set to 0 when the cluster is inactive and 1 when active.

Amazon CloudWatch

CloudWatch is a monitoring service that you can use to set high-resolution alarms to take automated actions. In this case, CloudWatch triggers an alarm if it receives 0 continuously for the configured number of hours.

AWS Lambda

Lambda is a serverless technology that lets you run code without provisioning or managing servers. With Lambda, you can run code for virtually any type of application or backend service—all with zero administration. You can set up your code to automatically trigger from other AWS services. In this case, the triggered CloudWatch alarm mentioned earlier signals Lambda to terminate the cluster.

Architectural diagram

The following diagram illustrates the sequence of events when the solution is enabled, showing what happens to the EMR cluster that is spun up via AWS CloudFormation.

 

The diagram shows the following steps:

  1. The AWS CloudFormation stack is launched to spin up an EMR cluster.
  2. The Amazon EMR step is executed (installs the pushShutDownMetric.sh and then schedules it as a cron job to run every 5 minutes).
  3. If the EMR cluster is active (executing jobs), the master node sets the EMR-INUSE metric to 1 and sends it to CloudWatch.
  4. If the EMR cluster is inactive, the master node sets the EMR-INUSE metric to 0 and sends it to CloudWatch.
  5. On receiving 0 for a predefined number of data points, CloudWatch triggers a CloudWatch alarm.
  6. The CloudWatch alarm sends notification to AWS Lambda to terminate the cluster.
  7. AWS Lambda executes the Lambda function.
  8. The Lambda function then deletes all the stack resources associated with the cluster.
  9. Finally, the EMR cluster is terminated, and the Stack ID is removed from AWS CloudFormation.

Modules in the monitoring script

Following are the different activity checks that are implemented in the monitoring script (pushShutDownMetric.sh). The script is designed in a modular fashion so that you can easily include new modules without modifying the core functionality.

ActiveSSHCheck

The ActiveSSHCheck module checks whether there are any active SSH connections to the master node. If there is an active SSH connection, and it’s idle for less than 10 minutes, the function sets the EMR-INUSE metric to 1 and pushes it to CloudWatch.
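
A check like this can be approximated with standard Linux tools. The sketch below counts sessions whose idle time reported by who -u is under 10 minutes; it illustrates the idea rather than reproducing the exact module code (the idle column position can vary between who versions):

# Count SSH sessions that are currently active (".") or idle for less than 10 minutes.
ACTIVE_SSH=$(who -u | awk '$5 == "." || $5 ~ /^00:0[0-9]$/' | wc -l)

if [ "${ACTIVE_SSH}" -gt 0 ]; then
  IS_IN_USE=1
fi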

YARNCheck

Apache Hadoop YARN is the resource manager of the EMR Hadoop ecosystem. All Spark submits and Hive queries reach YARN initially, which then schedules and processes these jobs. The YARNCheck module checks whether there are any running jobs in YARN or any jobs completed within the last 5 minutes. If it finds any, the function sets the EMR-INUSE metric to 1 and pushes it to CloudWatch. The checks are performed by calling REST APIs exposed by YARN.

The API to fetch the running jobs is http://localhost:8088/ws/v1/cluster/apps?state=RUNNING.

The API to fetch the completed jobs is http://localhost:8088/ws/v1/cluster/apps?state=FINISHED.
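
Using those endpoints, the check can be sketched in a few lines of bash. The sketch below treats any running application, or any application finished in the last 5 minutes (using the finishedTimeBegin filter of the ResourceManager API), as activity; it is illustrative rather than the exact module code:

# Illustrative YARN activity check against the ResourceManager REST API.
FIVE_MIN_AGO_MS=$((($(date +%s) - 300) * 1000))

RUNNING=$(curl -s "http://localhost:8088/ws/v1/cluster/apps?state=RUNNING" \
  | grep -o '"id":"application_[^"]*"' | wc -l)
RECENTLY_FINISHED=$(curl -s "http://localhost:8088/ws/v1/cluster/apps?state=FINISHED&finishedTimeBegin=${FIVE_MIN_AGO_MS}" \
  | grep -o '"id":"application_[^"]*"' | wc -l)

if [ "${RUNNING}" -gt 0 ] || [ "${RECENTLY_FINISHED}" -gt 0 ]; then
  IS_IN_USE=1
fi

The PRESTOCheck and ZeppelinCheck modules described next follow the same pattern against their respective endpoints.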

PRESTOCheck

Presto is an open-source distributed query engine for running interactive analytic queries. It is included in EMR release version 5.0.0 and later. The PRESTOCheck module checks whether there are any running Presto queries or any queries completed within the last 5 minutes. If it finds any, the function sets the EMR-INUSE metric to 1 and pushes it to CloudWatch. These checks are performed by calling REST APIs exposed by the Presto server.

The API to fetch the Presto jobs is http://localhost:8889/v1/query.

ZeppelinCheck

Amazon EMR users use Apache Zeppelin as a notebook for interactive data exploration. The ZeppelinCheck module checks whether there are any jobs running or if any have been completed within the last 5 minutes. If so, the function sets the EMR-INUSE metric to 1 and pushes it to CloudWatch. These checks are performed by calling the REST APIs exposed by Zeppelin.

The API to fetch the list of notebook IDs is http://localhost:8890/api/notebook.

The API to fetch the status of each cell inside each notebook ID is http://localhost:8890/api/notebook/job/$notebookID.

JupyterHubCheck

Jupyter Notebook is an open-source web application that you can use to create and share documents that contain live code, equations, visualizations, and narrative text. JupyterHub allows you to host multiple instances of a single-user Jupyter notebook server. The JupyterHubCheck module checks whether any Jupyter notebook is currently in use.

The function uses REST APIs exposed by JupyterHub to fetch the list of Jupyter notebook users and gathers the data about individual notebook servers. From the response, it extracts the last activity time of the servers and checks whether any server was used in the last 5 minutes. If so, the function sets the EMR-INUSE metric to 1 and pushes it to CloudWatch. The jupyterhub_addAdminToken.sh script needs to be executed as an EMR step before enabling the scheduler script.

The API to fetch the list of notebook users is https://localhost:9443/hub/api/users -H "Authorization: token $admin_token".

The API to fetch individual server information is https://localhost:9443/hub/api/users/$user -H "Authorization: token $admin_token".
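
For illustration, the user-activity lookup can be sketched with curl; the real module may parse the response differently:

# Sketch: list JupyterHub users and their reported last_activity timestamps.
# $admin_token is the API token generated by jupyterhub_addAdminToken.sh, and -k
# skips certificate verification for the self-signed certificate on port 9443.
curl -sk "https://localhost:9443/hub/api/users" \
  -H "Authorization: token ${admin_token}" \
  | grep -o '"last_activity": *"[^"]*"'

# Each timestamp can then be compared with the current time; if any user was active
# within the last 5 minutes, the script sets IS_IN_USE=1.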

If none of these checks detects activity, the cluster is considered inactive, and the monitoring script sets the EMR-INUSE metric to 0 and pushes it to CloudWatch.

Note:

The scheduler script schedules the monitoring script (pushShutDownMetric.sh) to run every 5 minutes. Internal cron jobs that run for only a few minutes are not considered when calibrating the EMR-INUSE metric.

Deploying each component

Follow the steps in this section to deploy each component of the proposed design.

Step 1. Create the Lambda function and SNS subscription

The Lambda function and the SNS subscription are the core components of the design. You must set up these components initially, and they are common for every cluster. The following are the AWS resources to be created for these components:

  • Execution role for the Lambda function
  • Terminate Idle EMR Lambda function
  • SNS topic and Lambda subscription

For one-step deployment, use this AWS CloudFormation template to launch and configure the resources in a single go.

The following parameters are available in the template.

Parameter | Default | Description
s3Bucket | emr-shutdown-blogartifacts | The name of the S3 bucket that contains the Lambda file
s3Key | EMRTerminate.zip | The Amazon S3 key of the Lambda file

For manual deployment, follow these steps on the AWS Management Console.

Execution role for the Lambda function

  1. Open the AWS Identity and Access Management (IAM) console and choose Policies, Create policy.
  2. Choose the JSON tab, paste the following policy text, and then choose Review policy.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "s3:ListAllMyBuckets",
                "s3:HeadBucket",
                "s3:ListObjects",
                "s3:GetObject",
                "cloudformation:ListStacks",
                "cloudformation:DeleteStack",
                "cloudformation:DescribeStacks",
                "cloudformation:ListStackResources",
                "elasticmapreduce:TerminateJobFlows"
            ],
            "Resource": "*",
            "Effect": "Allow",
            "Sid": "GenericAccess"
        },
        {
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*",
            "Effect": "Allow",
            "Sid": "LogAccess"
        }
    ]
}
  3. For Name, enter TerminateEMRPolicy and choose Create policy.
  4. Choose Roles, Create role.
  5. Under Choose the service that will use this role, choose Lambda, and then choose Next: Permissions.
  6. For Attach permissions policies, choose the arrow next to Filter policies and choose Customer managed in the drop-down list.
  7. Attach the TerminateEMRPolicy policy that you just created, and choose Review.
  8. For Role name, enter TerminateEMRLambdaRole and then choose Create role.

Terminate idle EMR – Lambda function

I created a deployment package to use with this function.

  1. Open the Lambda console and choose Create function.
  2. Choose Author from scratch, and provide the details as shown in the following screenshot:
  • Name: lambdaTerminateEMR
  • Runtime: Python 2.7
  • Role: Choose an existing role
  • Existing role: TerminateEMRLambdaRole

  3. Choose Create function.
  4. In the Function code section, for Code entry type, choose Upload a file from Amazon S3, and for Runtime, choose Python 2.7.

The Lambda function S3 link URL is s3://emr-shutdown-blogartifacts/EMRTerminate.zip.

Link to the function: https://s3.amazonaws.com/emr-shutdown-blogartifacts/EMRTerminate.zip

This Lambda function is triggered by a CloudWatch alarm. It parses the input event, retrieves the JobFlowId, and deletes the AWS CloudFormation stack of the corresponding JobFlowId.
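
The packaged function is available at the link above; the following Python sketch shows the general shape of such a handler. It assumes the CloudWatch alarm carries the JobFlowId as a metric dimension and that the cluster was created by an AWS CloudFormation stack; it is not the exact code in EMRTerminate.zip:

import json
import boto3

cloudformation = boto3.client('cloudformation')

def lambda_handler(event, context):
    # The CloudWatch alarm arrives wrapped in an SNS notification.
    message = json.loads(event['Records'][0]['Sns']['Message'])

    # Assumes the alarm was defined with a JobFlowId dimension on the custom metric.
    dimensions = message['Trigger']['Dimensions']
    job_flow_id = next(d['value'] for d in dimensions if d['name'] == 'JobFlowId')

    # Find the CloudFormation stack that owns this cluster and delete it, which
    # terminates the EMR cluster along with the rest of the stack resources.
    for stack in cloudformation.describe_stacks()['Stacks']:
        resources = cloudformation.list_stack_resources(
            StackName=stack['StackName'])['StackResourceSummaries']
        if any(r.get('PhysicalResourceId') == job_flow_id for r in resources):
            cloudformation.delete_stack(StackName=stack['StackName'])
            return 'Deleted stack {} for cluster {}'.format(
                stack['StackName'], job_flow_id)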

SNS topic and Lambda subscription

For setting the CloudWatch alarm in the further stages, you must create an Amazon SNS topic that notifies the preceding Lambda function to execute. Follow these steps to create an SNS topic and configure the Lambda endpoint.

  1. Navigate to the Amazon SNS console, and choose Create topic.
  2. Enter the Topic name and Display name, and choose Create topic.
  3. The topic is created and displayed in the Topics list.
  4. Select the topic and choose Actions, Subscribe to topic.
  5. In the Create subscription dialog box, choose AWS Lambda as the protocol, choose lambdaTerminateEMR as the endpoint, and choose Create subscription.

Step 2. Execute the JupyterHub API token script as an EMR step

This step is required only when JupyterHub is enabled in the cluster.

Navigate to the EMR cluster to be monitored, and execute the JupyterHub API token script as an EMR step.

Command: s3://emr-shutdown-blogartifacts/jupyterhub_addAdminToken.sh

This script generates an API token and assigns it to the admin user. It is then picked up by the jupyterhub module in the monitoring script to track the activity of the application.

Step 3. Execute the scheduler script as an EMR step

Navigate to the EMR cluster to be monitored and execute the scheduler script as an EMR step.

Note:

Ensure that termination protection is disabled in the cluster. The termination protection flag causes the Lambda function to fail.

Command: s3://emr-shutdown-blogartifacts/schedule_script.sh

Parameter: s3://emr-shutdown-blogartifacts/pushShutDownMetrin.sh

This EMR step copies the monitoring script (pushShutDownMetrin.sh) to the master node and schedules it to run every 5 minutes.

The schedule_script.sh is at https://s3.amazonaws.com/emr-shutdown-blogartifacts/schedule_script.sh.

The pushShutDownMetrin.sh is at https://s3.amazonaws.com/emr-shutdown-blogartifacts/pushShutDownMetrin.sh.

Step 4. Create a CloudWatch alarm

For single-step deployment, use this AWS CloudFormation template to launch and configure the resources in a single go.

The following parameters are available in the template.

Parameter | Default | Description
AlarmName | TerminateIDLE-EMRAlarm | The name for the alarm.
EMRJobFlowID | Requires input | The JobFlowId of the cluster.
EvaluationPeriod | Requires input | The idle timeout value, expressed in data points (1 data point equals 5 minutes). For example, to terminate the cluster if it is idle for 20 minutes, the input should be 4.
SNSSubscribeTopic | Requires input | The Amazon Resource Name (ARN) of the SNS topic to be triggered on the alarm.

 

The AWS CloudFormation CLI command is as follows:

aws cloudformation create-stack --stack-name EMRAlarmStack \
      --template-url https://s3.amazonaws.com/emr-shutdown-blogartifacts/Cloudformation/alarm.json \
      --parameters ParameterKey=AlarmName,ParameterValue=TerminateIDLE-EMRAlarm \
                   ParameterKey=EMRJobFlowID,ParameterValue=<Input> \
                   ParameterKey=EvaluationPeriod,ParameterValue=4 \
                   ParameterKey=SNSSubscribeTopic,ParameterValue=<Input>

For manual deployment, follow these steps to create the alarm.

  1. Open the Amazon CloudWatch console and choose Alarms.
  2. Choose Create Alarm.
  3. On the Select Metric page, under Custom Metrics, choose EMRShutdown/Cluster-Metric.
  4. Choose the isEMRUsed metric of the EMR JobFlowId, and then choose Next.
  5. Define the alarm as required. In this case, the alarm is set to send a notification to the SNS topic shutDownEMRTest when CloudWatch receives the isEMRUsed metric as 0 for every data point in the last 2 hours. (An equivalent AWS CLI command is sketched after these steps.)
  6. Choose Create Alarm.
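
If you prefer the AWS CLI for this step, an alarm with the same shape can be sketched as follows. The namespace, metric name, and dimension must match whatever your monitoring script publishes, and 24 evaluation periods of 5 minutes correspond to the 2 hours used above:

aws cloudwatch put-metric-alarm \
  --alarm-name TerminateIDLE-EMRAlarm \
  --namespace "EMRShutdown/Cluster-Metric" \
  --metric-name isEMRUsed \
  --dimensions Name=JobFlowId,Value=<your JobFlowId> \
  --statistic Maximum \
  --period 300 \
  --evaluation-periods 24 \
  --threshold 0 \
  --comparison-operator LessThanOrEqualToThreshold \
  --alarm-actions <your SNS topic ARN>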

Summary

In this post, we focused on building a framework to cut down the additional cost that you might incur from idle EMR clusters. The modular checks implemented in the shell script track the execution status of Spark jobs and Hive/Presto queries through lightweight REST API calls, which makes this approach an efficient solution.

If you have questions or suggestions, please comment below.

 


About the Author

Praveen Krishnamoorthy Ravikumar is an associate big data consultant with Amazon Web Services.

Query your Amazon Redshift cluster with the new Query Editor

Post Syndicated from Surbhi Dangi original https://aws.amazon.com/blogs/big-data/query-your-amazon-redshift-cluster-with-the-new-query-editor/

Data warehousing is a critical component for analyzing and extracting actionable insights from your data. Amazon Redshift is a fast, scalable data warehouse that makes it cost-effective to analyze all of your data across your data warehouse and data lake.

The Amazon Redshift console recently launched the Query Editor. The Query Editor is an in-browser interface for running SQL queries on Amazon Redshift clusters directly from the AWS Management Console. Using the Query Editor is the most efficient way to run queries on databases hosted by your Amazon Redshift cluster.

After creating your cluster, you can use the Query Editor immediately to run queries on the Amazon Redshift console. It’s a great alternative to connecting to your database with external JDBC/ODBC clients.

In this post, we show how you can run SQL queries for loading data in clusters and monitoring cluster performance directly from the console.

Using the Query Editor instead of your SQL IDE or tool

The Query Editor provides an in-browser interface for running SQL queries on Amazon Redshift clusters. For queries that are run on compute nodes, you can then view the query results and query execution plan next to your queries.

The ability to visualize queries and results in a convenient user interface lets you accomplish many tasks, both as a database administrator and a database developer. The visual Query Editor helps you do the following:

  • Build complex queries.
  • Edit and run queries.
  • Create and edit data.
  • View and export results.
  • Generate EXPLAIN plans on your queries.

With the Query Editor, you can also have multiple SQL tabs open at the same time. Colored syntax, query autocomplete, and single-step query formatting are all an added bonus!

Database administrators typically maintain a repository of commonly used SQL statements that they run regularly. If you have this written in a notepad somewhere, the saved queries feature is for you. This feature lets you save and reuse your commonly run SQL statements in one step. This makes it efficient for you to review, rerun, and modify previously run SQL statements. The Query Editor also has an exporter so that you can export the query results into a CSV format.

The Query Editor lets you perform common tasks, such as creating a schema and table on the cluster and loading data in tables. These common tasks are now possible with a few simple SQL statements that you run directly on the console. You can also do day-to-day administrative tasks from the console. These tasks can include finding long-running queries on the cluster, checking for potential deadlocks with long-running updates on a cluster, and checking for how much space is available in the cluster.

The Query Editor is available in 16 AWS Regions. It’s available on the Amazon Redshift console at no extra cost to you. Standard Amazon Redshift rates apply for your cluster usage and for Amazon Redshift Spectrum. To learn more, see Amazon Redshift pricing.

Let’s get started with the Query Editor

The following sections contain the steps for setting up your Amazon Redshift cluster with a sample dataset from an Amazon S3 bucket using the Query Editor directly from the console. For new users, this is an especially handy alternative to setting up JDBC/ODBC clients to establish a connection to your cluster. If you already have a cluster, you can complete these steps in 10 minutes or less.

In the following example, you use the Query Editor to perform these tasks:

  • Load a sample dataset in your cluster.
  • Run SQL queries on a sample dataset and view results and execution details.
  • Run administration queries on system tables and save frequently used queries.
  • Run SQL queries to join an internal and external table.

Use the following steps to set up your cluster for querying:

  1. On the Amazon Redshift console, create a cluster. For detailed steps, see the procedure described in Launch a Sample Amazon Redshift Cluster in the Amazon Redshift Getting Started Guide. Use any of the following currently supported node types: dc1.8xlarge, dc2.large, dc2.8xlarge, or ds2.8xlarge. For this post, we used the Quick launch cluster button on the Amazon Redshift dashboard to create a single-node, dc2.large cluster called demo-cluster in the us-east-1 Region. As you go through the tutorial, replace this cluster name with the name of the cluster that you launched, and the Region where you launched the cluster.

  2. Add Query Editor-related permissions to the AWS account. To access the Query Editor feature on the console, you need permissions. For detailed steps, see Enabling Access to the Query Editor in the Amazon Redshift Cluster Management Guide.
  3. To load and run queries on a sample dataset (including permissions to load data from S3 or to use the AWS Glue or Amazon Athena Data Catalogs), follow these steps:

a. To load sample data from Amazon S3 using the COPY command, you must provide authentication for your cluster to access Amazon S3 on your behalf. Sample data for this procedure is provided in an Amazon S3 bucket that is owned by Amazon Redshift. The bucket permissions are configured to allow all authenticated AWS users read access to the sample data files. To perform this step:

• Attach the AmazonS3ReadOnlyAccess policy to the IAM role. The AmazonS3ReadOnlyAccess policy grants your cluster read-only access to all Amazon S3 buckets.

• If you’re using the AWS Glue Data Catalog, attach the AWSGlueConsoleFullAccess policy to the IAM role. If you’re using the Athena Data Catalog, attach the AmazonAthenaFullAccess policy to the IAM role.

b. In step 2 of the example, you run the COPY command to load the sample data. The COPY command includes a placeholder for the IAM role Amazon Resource Name (ARN). To load sample data, add the role ARN in the COPY command. The following is a sample COPY command:

COPY myinternalschema.event FROM 's3://aws-redshift-spectrum-sample-data-us-east-1/spectrum/event/allevents_pipe.txt'
iam_role 'REPLACE THIS PLACEHOLDER WITH THE IAM ROLE ARN'
delimiter '|' timeformat 'YYYY-MM-DD HH:MI:SS' region 'us-east-1';

After you complete these steps, your Amazon Redshift cluster is ready. The following sections describe three steps that demonstrate what you can do with the Query Editor:

  • Use the Query Editor for loading data.
  • Perform several day-to-day administration tasks.
  • Run a query on data stored in the Amazon Redshift cluster and Amazon S3 data lake, with no need for loading or other data preparation.

Step 1: Connect to your cluster in the Query Editor

To connect to your cluster:

  1. Using the left navigation pane on the Amazon Redshift console, navigate to the Query Editor.
  2. In the Credentials dialog box, in the Cluster drop-down list, choose the cluster name (demo-cluster). Choose the database and the database user for this cluster.
  3. If you created the cluster by using the service-provided default values, choose dev as your Database selection, and enter awsuser in the Database user box.
  4. Enter the password for the cluster. Commonly, Amazon Redshift database users log on by providing a database user name and password. As an alternative, if you don’t remember your password, you can retrieve it in an encrypted format by choosing Create a temporary password, as shown in the following example. For more information, see Using IAM Authentication to Generate Database User Credentials.

This connects to the cluster if you have Query Editor-related permissions for the AWS account. For more information, see the step to add the Query Editor-related permissions to the AWS account in the previous section.

Step 2: Prepare the cluster with a sample dataset

To prepare the cluster with a sample dataset:

  1. Run the following SQL in the Query Editor. This creates the schema myinternalschema in the Amazon Redshift cluster demo-cluster.
/* Create a schema */
CREATE SCHEMA myinternalschema;

  2. Run the following SQL statement in the Query Editor to create a table for schema myinternalschema.
/* Create table */
CREATE TABLE myinternalschema.event(
	eventid integer not null distkey,
	venueid smallint not null,
	catid smallint not null,
	dateid smallint not null sortkey,
	eventname varchar(200),
	starttime timestamp);
  3. Run the following SQL statement with the COPY command to copy the sample dataset from Amazon S3 to your Amazon Redshift cluster, demo-cluster, in the us-east-1 Region. The Amazon S3 path for the sample dataset is s3://aws-redshift-spectrum-sample-data-us-east-1/spectrum/event/allevents_pipe.txt.

Before choosing Run Query, remember to replace the placeholder in the example with the ARN for the IAM role that is associated with this AWS account. If your cluster is in another AWS Region, replace the Region in the region parameter and the Amazon S3 path, as shown in the following SQL command:

/* Load data */
COPY myinternalschema.event FROM 's3://aws-redshift-spectrum-sample-data-us-east-1/spectrum/event/allevents_pipe.txt'
iam_role 'REPLACE THIS PLACEHOLDER WITH THE IAM ROLE ARN'
delimiter '|' timeformat 'YYYY-MM-DD HH:MI:SS' region 'us-east-1';
  4. To ensure access to the public dataset in Amazon S3, make sure that this AWS account has the correct permissions to access Amazon S3, AWS Glue, and Athena. For more information, see the step to load and run queries on the sample dataset (Amazon S3 and AWS Glue/Amazon Athena Data Catalog permissions) earlier in this post.
  5. To verify the data in the previously created table in the Query Editor, browse through the tables in the schema viewer on the left. Choose the preview icon next to the table name to see the first 10 records from the event table. Choosing this option runs the following query for a preview of the table, displaying 10 rows from the table:
/* View a snippet of the same dataset in myinternalschema */ 
SELECT * FROM myinternalschema.event
LIMIT 10;

You can also enter your own SQL statements. Use Ctrl + Space to autocomplete queries in the Query Editor and verify the data in the table that you created.

Step 3: Helpful cluster management queries

You are all set to try Amazon Redshift! In day-to-day cluster management and monitoring, you can run the following SQL queries in the Query Editor. These frequently used queries let you find and shut down long-running queries, uncover deadlock situations, and check for available disk space on your Amazon Redshift cluster. Save these queries and get convenient access to them by choosing Saved queries in the left navigation on the console, as shown in the following example:

Kill malfunctioning or long-running queries on a cluster

If there is a malfunctioning query that must be shut down, locating the query can often be a multi-step process. Run the following SQL in the Query Editor to find all queries that are currently running on the Amazon Redshift cluster:

/* Queries currently in progress */
SELECT
  userid
  , query
  , pid
  , starttime
  , LEFT(text, 50) AS text
FROM pg_catalog.stv_inflight;

After locating the malfunctioning queries from the query result set, use the cancel <pid> <msg> command to kill a query. Be sure to use the process ID—pid in the previous SQL—and not the query ID. You can supply an optional message that is returned to the issuer of the query and logged.
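If you prefer to run this cleanup from a script instead of the Query Editor, the following minimal sketch shows the same two steps. It assumes the psycopg2 driver and network access to the cluster; the connection values and the process ID are placeholders.

import psycopg2

# Placeholder connection values; replace them with your cluster endpoint and credentials.
conn = psycopg2.connect(
    host="demo-cluster.xxxxxxxxxxxx.us-east-1.redshift.amazonaws.com",
    port=5439, dbname="dev", user="awsuser", password="YourPassword",
)
conn.autocommit = True

with conn.cursor() as cur:
    # List queries currently in progress (same query as above).
    cur.execute("SELECT pid, LEFT(text, 50) FROM pg_catalog.stv_inflight;")
    for pid, sql_text in cur.fetchall():
        print(pid, sql_text)

    # Cancel a specific query by its process ID, with an optional message.
    bad_pid = 12345  # replace with a pid returned by the query above
    cur.execute(f"CANCEL {bad_pid} 'Cancelled by administrator';")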

Monitor disk space being used on a cluster

One of the most frequently used console functions is monitoring the percentage of disk space used by a cluster. Queries fail if there is not enough space in the cluster to create the temp tables used while the query is running. Vacuum operations can also fail if the cluster does not have enough free space to store the intermediate data used for sorting. Monitoring this metric is important for planning ahead before the cluster gets full and you have to resize or add more clusters.

If you suspect that you are experiencing high or full disk usage with Amazon Redshift, run the following SQL in the Query Editor to find disk space available and see individual table sizes on the cluster:

/* Disk space available on your Redshift cluster */
SELECT SUM(used)::float / SUM(capacity) AS pct_full
FROM pg_catalog.stv_partitions;

/* Find individual table sizes */
SELECT t.name, COUNT(tbl) / 1000.0 AS gb
FROM (
  SELECT DISTINCT id, name FROM stv_tbl_perm
) t
JOIN stv_blocklist ON tbl = t.id
GROUP BY t.name
ORDER BY gb DESC;

From here, you can either drop the unnecessary tables or resize your cluster to have more capacity. For more information, see Resizing Clusters in Amazon Redshift.
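You can also trigger the resize programmatically. The following is a minimal sketch using boto3; the node type and node count are illustrative placeholders, not sizing recommendations.

import boto3

redshift = boto3.client("redshift", region_name="us-east-1")

# Resize demo-cluster to four nodes; Classic=False requests an elastic resize where supported.
redshift.resize_cluster(
    ClusterIdentifier="demo-cluster",
    ClusterType="multi-node",
    NodeType="dc2.large",   # illustrative node type
    NumberOfNodes=4,        # illustrative target size
    Classic=False,
)

# Block until the cluster is available again.
waiter = redshift.get_waiter("cluster_available")
waiter.wait(ClusterIdentifier="demo-cluster")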

Watch for deadlock situations with suspiciously long-running updates on the cluster

If a cluster has a suspiciously long-running update, it might be in a deadlocked transaction. The stv_locks table indicates any transactions that have locks, along with the process ID of the relevant sessions. This pid can be passed to pg_terminate_backend(pid) to kill the offending session.

Run a SQL statement in the Query Editor to inspect the locks:

/* Find all transactions that have locks, along with the process ID of the relevant sessions */
SELECT
  table_id,
  last_update,
  last_commit,
  lock_owner_pid,
  lock_status
FROM pg_catalog.stv_locks
ORDER BY last_update ASC;

To shut down the session, run select pg_terminate_backend(lock_owner_pid), using the lock_owner_pid value from stv_locks.

See the rows affected by the most recent vacuums of the cluster

Running a vacuum command on tables in the cluster reclaims any free space left behind by delete and update operations, and it sorts the table data at the same time. The result is a compact and sorted table, which improves the cluster performance.

Run the following SQL statement to see a count of rows that were deleted or re-sorted by the most recent vacuums, from the svv_vacuum_summary table:

/* Deleted or re-sorted rows from the most recent vacuums */
SELECT * FROM svv_vacuum_summary
WHERE table_name = 'events';

Debug connection issues for Amazon Redshift clusters

Joining stv_sessions and stl_connection_log tables returns a list of all sessions (all connects, authenticates, and disconnects on the cluster) and the respective remote host and port information.

To list all connections, run the following SQL statement in the Query Editor:

/* List connections, along with remote host information */
SELECT DISTINCT
  starttime,
  process,
  user_name,
  remotehost,
  remoteport
FROM stv_sessions
LEFT JOIN stl_connection_log ON pid = process
  AND starttime > recordtime - interval '1 second'
ORDER BY starttime DESC;

Use the saved queries feature to save these commonly used SQL statements in your account and run them in the Query Editor with one click.

Bonus step 4: Query with Amazon Redshift Spectrum

With Amazon Redshift Spectrum, you can query data in Amazon S3 without the need to first load it into Amazon Redshift. Amazon Redshift Spectrum queries employ massive parallelism to quickly process large datasets in S3, without ingesting that data into Amazon Redshift. Much of the processing occurs in the Amazon Redshift Spectrum layer. Multiple clusters can concurrently query the same dataset in Amazon S3 without needing to make copies of the data for each cluster.

To get set up with Amazon Redshift Spectrum, run the following SQL statements in the Query Editor for demo-cluster. If your cluster is in another AWS Region, be sure to replace the Region in the region parameter and the Amazon S3 path in the following SQL statement.

To create a new schema from a data catalog to use with Amazon Redshift Spectrum:

/* Create external (Amazon S3) schema */
CREATE EXTERNAL SCHEMA myexternalschema
from data catalog
database 'myexternaldatabase'
region 'us-east-1'
iam_role 'REPLACE THIS PLACEHOLDER WITH THE IAM ROLE ARN' 
create external database if not exists;

To create a table for the Amazon Redshift Spectrum S3 sample dataset:

/* Create external table */
CREATE EXTERNAL TABLE myexternalschema.sales(
salesid integer,
listid integer,
sellerid integer,
buyerid integer,
eventid integer,
dateid smallint,
qtysold smallint,
pricepaid decimal(8,1),
commission decimal(8,1),
saletime timestamp)
row format delimited
fields terminated by '\t'
stored as textfile
location 's3://aws-redshift-spectrum-sample-data-us-east-1/spectrum/sales/' 
table properties ('numRows'='171000');

Start querying!

This section provides an example scenario to start querying data from the external (Amazon S3) sales table and the internal (Amazon Redshift) event table. The join query in this scenario looks for all events (from the sales dataset loaded on the demo-cluster) with the sale price paid > 50 (from the Amazon Redshift Spectrum dataset in Amazon S3, s3://aws-redshift-spectrum-sample-data-us-east-1/spectrum/sales/).

/* Join a table from the sample dataset with a Spectrum table */
/* Join external (Amazon S3) and internal (Amazon Redshift) table */
SELECT
    myexternalschema.sales.eventid,
    sum(myexternalschema.sales.pricepaid)   
from
    myexternalschema.sales,
    myinternalschema.event  
where
    myexternalschema.sales.eventid = myinternalschema.event.eventid       
    and myexternalschema.sales.pricepaid > 50  
group by
    myexternalschema.sales.eventid  
order by
    1 desc;

In the Query results section, choose View execution to see the detailed execution plan. The query plan is available for all queries executed on compute nodes.

Note: Queries that do not reference user tables, such as administration queries that only use catalog tables, do not have an available query plan.

Optionally, download the query results to your local disk for offline use. Queries run for up to three minutes in the Query Editor. After a query is completed, the Query Editor provides two minutes to fetch results. Rerun the query and try again if you hit the two-minute threshold.

Load additional tables from the Amazon Redshift sample dataset by using the following SQL statements and get creative with your queries. Before choosing Run query in the Query Editor, remember to add the ARN for the IAM role that is associated with this AWS account in the placeholder in the following SQL statement. If your cluster is in another AWS Region, replace the Region in the region parameter and the Amazon S3 path in the following SQL statement.

copy users from 's3://awssampledbuswest2/tickit/allusers_pipe.txt' 
credentials 'aws_iam_role=REPLACE THIS PLACEHOLDER WITH THE IAM ROLE ARN' 
delimiter '|' region 'us-west-2';

copy venue from 's3://awssampledbuswest2/tickit/venue_pipe.txt' 
credentials 'aws_iam_role=REPLACE THIS PLACEHOLDER WITH THE IAM ROLE ARN' 
delimiter '|' region 'us-west-2';

copy category from 's3://awssampledbuswest2/tickit/category_pipe.txt' 
credentials 'aws_iam_role=REPLACE THIS PLACEHOLDER WITH THE IAM ROLE ARN' 
delimiter '|' region 'us-west-2';

copy date from 's3://awssampledbuswest2/tickit/date2008_pipe.txt' 
credentials 'aws_iam_role=REPLACE THIS PLACEHOLDER WITH THE IAM ROLE ARN' 
delimiter '|' region 'us-west-2';

copy listing from 's3://awssampledbuswest2/tickit/listings_pipe.txt' 
credentials 'aws_iam_role=REPLACE THIS PLACEHOLDER WITH THE IAM ROLE ARN' 
delimiter '|' region 'us-west-2';

copy sales from 's3://awssampledbuswest2/tickit/sales_tab.txt'
credentials 'aws_iam_role=REPLACE THIS PLACEHOLDER WITH THE IAM ROLE ARN'
delimiter '\t' timeformat 'MM/DD/YYYY HH:MI:SS' region 'us-west-2';

Summary

In this post, we introduced the Query Editor, an in-browser interface for running SQL queries on Amazon Redshift clusters. We showed how you can use it to run SQL queries for loading data in clusters and monitoring cluster performance directly on the console. To learn more about Amazon Redshift and get started with the Query Editor, visit the Amazon Redshift webpage.

If you like this feature, share your feedback by using the Send feedback link on the console, as shown following.

If you have any questions or suggestions, please leave a comment below.

Happy querying!

About the Authors

Surbhi Dangi is a senior product/design manager at AWS. Her work includes building user experiences for Database, Analytics & AI AWS consoles, launching new database and analytics products, working on new feature launches for existing products, and building broadly adopted internal tools for AWS teams. She enjoys traveling to new destinations to discover new cultures, trying new cuisines, and teaching product management 101 to aspiring PMs.


Raja Bhogi is an engineering manager at AWS. He is responsible for building delightful and easy-to-use web experiences for analytics and blockchain products. His work includes launching web experiences for new analytics products, and working on new feature launches for existing products. He is passionate about web technologies, performance insights, and tuning. He is a thrill seeker and enjoys everything from roller coasters to bungy jumping.


Build and automate a serverless data lake using an AWS Glue trigger for the Data Catalog and ETL jobs

Post Syndicated from Saurabh Shrivastava original https://aws.amazon.com/blogs/big-data/build-and-automate-a-serverless-data-lake-using-an-aws-glue-trigger-for-the-data-catalog-and-etl-jobs/

Today, data is flowing from everywhere, whether it is unstructured data from resources like IoT sensors, application logs, and clickstreams, or structured data from transaction applications, relational databases, and spreadsheets. Data has become a crucial part of every business. This has resulted in a need to maintain a single source of truth and automate the entire pipeline—from data ingestion to transformation and analytics—to extract value from the data quickly.

There is a growing concern over the complexity of data analysis as the data volume, velocity, and variety increases. The concern stems from the number and complexity of steps it takes to get data to a state that is usable by business users. Often data engineering teams spend most of their time on building and optimizing extract, transform, and load (ETL) pipelines. Automating the entire process can reduce the time to value and cost of operations. In this post, we describe how to create a fully automated data cataloging and ETL pipeline to transform your data.

Architecture

In this post, you learn how to build and automate the following architecture.

You build your serverless data lake with Amazon Simple Storage Service (Amazon S3) as the primary data store. Given the scalability and high availability of Amazon S3, it is best suited as the single source of truth for your data.

You can use various techniques to ingest and store data in Amazon S3. For example, you can use Amazon Kinesis Data Firehose to ingest streaming data. You can use AWS Database Migration Service (AWS DMS) to ingest relational data from existing databases. And you can use AWS DataSync to ingest files from an on-premises Network File System (NFS).

Ingested data lands in an Amazon S3 bucket that we refer to as the raw zone. To make that data available, you have to catalog its schema in the AWS Glue Data Catalog. You can do this using an AWS Lambda function invoked by an Amazon S3 trigger to start an AWS Glue crawler that catalogs the data. When the crawler is finished creating the table definition, you invoke a second Lambda function using an Amazon CloudWatch Events rule. This step starts an AWS Glue ETL job to process and output the data into another Amazon S3 bucket that we refer to as the processed zone.
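The Lambda function that starts the crawler is provisioned for you by the CloudFormation template described later, so its code is not listed in this post. As a rough sketch of the idea, a function along these lines would work; the environment variable holding the crawler name is an assumption for illustration.

import os
import boto3

glue = boto3.client("glue")

def lambda_handler(event, context):
    # Invoked by the S3 event notification on the raw bucket; starts the Glue crawler.
    crawler_name = os.environ["CRAWLER_NAME"]  # assumed environment variable
    try:
        glue.start_crawler(Name=crawler_name)
    except glue.exceptions.CrawlerRunningException:
        # The crawler is already processing a previous upload; nothing more to do.
        pass
    return {"status": "crawler triggered", "crawler": crawler_name}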

The AWS Glue ETL job converts the data to Apache Parquet format and stores it in the processed S3 bucket. You can modify the ETL job to achieve other objectives, like more granular partitioning, compression, or enriching of the data. Monitoring and notification is an integral part of the automation process. So as soon as the ETL job finishes, another CloudWatch rule sends you an email notification using an Amazon Simple Notification Service (Amazon SNS) topic. This notification indicates that your data was successfully processed.

In summary, this pipeline classifies and transforms your data, sending you an email notification upon completion.

Deploy the automated data pipeline using AWS CloudFormation

First, you use AWS CloudFormation templates to create all of the necessary resources. This removes opportunities for manual error, increases efficiency, and ensures consistent configurations over time.

Launch the AWS CloudFormation template with the following Launch stack button.

Be sure to choose the US East (N. Virginia) Region (us-east-1). Then enter the appropriate stack name, email address, and AWS Glue crawler name to create the Data Catalog. Add the AWS Glue database name to save the metadata tables. Acknowledge the IAM resource creation as shown in the following screenshot, and choose Create.

Note: It is important to enter your valid email address so that you get a notification when the ETL job is finished.

This AWS CloudFormation template creates the following resources in your AWS account:

  • Two Amazon S3 buckets to store both the raw data and processed Parquet data.
  • Two AWS Lambda functions: one to create the AWS Glue Data Catalog and another to publish messages to an Amazon SNS topic.
  • An Amazon Simple Queue Service (Amazon SQS) queue for maintaining the retry logic.
  • An Amazon SNS topic to inform you that your data has been successfully processed.
  • Two CloudWatch Events rules: one rule on the AWS Glue crawler and another on the AWS Glue ETL job.
  • AWS Identity and Access Management (IAM) roles for accessing AWS Glue, Amazon SNS, Amazon SQS, and Amazon S3.

When the AWS CloudFormation stack is ready, check your email and confirm the SNS subscription. Choose the Resources tab and find the details.

Follow these steps to verify your email subscription so that you receive an email alert as soon as your ETL job finishes.

  1. On the Amazon SNS console, in the navigation pane, choose Topics. An SNS topic named SNSProcessedEvent appears in the display.

  2. Choose the ARN. The topic details page appears, listing the email subscription as Pending confirmation. Be sure to confirm the subscription for your email address as provided in the Endpoint column.

If you don’t see an email address, or the link is showing as not valid in the email, choose the corresponding subscription endpoint. Then choose Request confirmation to confirm your subscription. Be sure to check your email junk folder for the request confirmation link.

Configure an Amazon S3 bucket event trigger

In this section, you configure a trigger on a raw S3 bucket. So when new data lands in the bucket, you trigger GlueTriggerLambda, which was created in the AWS CloudFormation deployment.

To configure notifications:

  1. Open the Amazon S3 console.
  2. Choose the source bucket. In this case, the bucket name contains raws3bucket, for example, <stackname>-raws3bucket-1k331rduk5aph.
  3. Go to the Properties tab, and under Advanced settings, choose Events.

  4. Choose Add notification and configure a notification with the following settings:
  • Name – Enter a name of your choice. In this example, it is crawlerlambdaTrigger.
  • Events – Select the All object create events check box to create the AWS Glue Data Catalog when you upload the file.
  • Send to – Choose Lambda function.
  • Lambda – Choose the Lambda function that was created in the deployment section. Your Lambda function should contain the string GlueTriggerLambda.

See the following screenshot for all the settings. When you’re finished, choose Save.

For more details on configuring events, see How Do I Enable and Configure Event Notifications for an S3 Bucket? in the Amazon S3 Console User Guide.

Download the dataset

For this post, you use a publicly available New York green taxi dataset in CSV format. You upload monthly data to your raw zone and perform automated data cataloging using an AWS Glue crawler. After cataloging, an automated AWS Glue ETL job is triggered to transform the monthly green taxi data to Parquet format and store it in the processed zone.

You can download the raw dataset from the NYC Taxi & Limousine Commission trip record data site. Download the monthly green taxi dataset and upload only one month of data. For example, first upload only the green taxi January 2018 data to the raw S3 bucket.
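If you prefer the command line, a short boto3 sketch like the following uploads the file to the raw bucket; the local file name and the bucket name are placeholders for your own values.

import boto3

s3 = boto3.client("s3")

# Upload one month of green taxi data to the raw zone bucket created by the stack.
s3.upload_file(
    Filename="green_tripdata_2018-01.csv",           # local file downloaded from the TLC site
    Bucket="<stackname>-raws3bucket-1k331rduk5aph",   # replace with your raw bucket name
    Key="green_tripdata_2018-01.csv",
)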

Automate the Data Catalog with an AWS Glue crawler

One of the important aspects of a modern data lake is to catalog the available data so that it’s easily discoverable. To run ETL jobs or ad hoc queries against your data lake, you must first determine the schema of the data along with other metadata information like location, format, and size. An AWS Glue crawler makes this process easy.

After you upload the data into the raw zone, the Amazon S3 trigger that you created earlier in the post invokes the GlueTriggerLambda function. This function starts the AWS Glue crawler, which populates the AWS Glue Data Catalog with metadata information inferred from the crawled data.

Open the AWS Glue console. You should see the database, table, and crawler that were created using the AWS CloudFormation template. Your AWS Glue crawler should appear as follows.

Browse to the table using the left navigation, and you will see the table in the database that you created earlier.

Choose the table name, and further explore the metadata discovered by the crawler, as shown following.

You can also view the columns, data types, and other details. In the following screenshot, the AWS Glue crawler has created a schema from the files available in Amazon S3 by determining the column names and their respective data types. You can use this schema to create an external table.

Author ETL jobs with AWS Glue

AWS Glue provides a managed Apache Spark environment to run your ETL job, with a pay-as-you-go model and no infrastructure for you to maintain.

Open the AWS Glue console and choose Jobs under the ETL section to start authoring an AWS Glue ETL job. Give the job a name of your choice, and note the name because you’ll need it later. Choose the already created IAM role with the name containing <stackname>-GlueLabRole, as shown following. Keep the other default options.

AWS Glue generates the required Python or Scala code, which you can customize as per your data transformation needs. In the Advanced properties section, choose Enable in the Job bookmark list to avoid reprocessing old data.

On the next page, choose your raw Amazon S3 bucket as the data source, and choose Next. On the Data target page, choose the processed Amazon S3 bucket as the data target path, and choose Parquet as the Format.

On the next page, you can make schema changes as required, such as changing column names, dropping ones that you’re less interested in, or even changing data types. AWS Glue generates the ETL code accordingly.

Lastly, review your job parameters, and choose Save Job and Edit Script, as shown following.

On the next page, you can modify the script further as per your data transformation requirements. For this post, you can leave the script as is. In the next section, you automate the execution of this ETL job.

Automate ETL job execution

As the frequency of data ingestion increases, you will want to automate the ETL job to transform the data. Automating this process helps reduce operational overhead and free your data engineering team to focus on more critical tasks.

AWS Glue is optimized for processing data in batches. You can configure it to process data in batches on a set time interval. How often you run a job is determined by how recent the end user expects the data to be and the cost of processing. For information about the different methods, see Triggering Jobs in AWS Glue in the AWS Glue Developer Guide.

First, you need to make one-time changes and configure your ETL job name in the Lambda function and the CloudWatch Events rule. On the console, open the ETLJobLambda Lambda function, which was created using the AWS CloudFormation stack.

Choose the Lambda function link that appears, and explore the code. Change the JobName value to the ETL job name that you created in the previous step, and then choose Save.
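The ETLJobLambda code is not reproduced in this post. As a rough sketch, a function that starts the Glue job could look like the following, with the job name constant standing in for the value you just configured.

import boto3

glue = boto3.client("glue")

# Name of the AWS Glue ETL job created in the authoring step (placeholder value).
JOB_NAME = "my-etl-job"

def lambda_handler(event, context):
    # Invoked by CrawlerEventRule when the crawler succeeds; starts the ETL job.
    run = glue.start_job_run(JobName=JOB_NAME)
    return {"jobRunId": run["JobRunId"]}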

As shown in the following screenshot, you will see an AWS CloudWatch Events rule CrawlerEventRule that is associated with an AWS Lambda function. When the CloudWatch Events rule receives a success status, it triggers the ETLJobLambda Lambda function.

Now you are all set to trigger your AWS Glue ETL job as soon as you upload a file in the raw S3 bucket. Before testing your data pipeline, set up the monitoring and alerts.

Monitoring and notification with Amazon CloudWatch Events

Suppose that you want to receive a notification over email when your AWS Glue ETL job is completed. To achieve that, the CloudWatch Events rule OpsEventRule was deployed from the AWS CloudFormation template in the data pipeline deployment section. This CloudWatch Events rule monitors the status of the AWS Glue ETL job and sends an email notification using an SNS topic upon successful completion of the job.

As the following image shows, you configure your AWS Glue job name in the Event pattern section in CloudWatch. The event triggers an SNS topic configured as a target when the AWS Glue job state changes to SUCCEEDED. This SNS topic sends an email notification to the address that you provided in the deployment section.

Let’s make one-time configuration changes in the CloudWatch Events rule OpsEventRule to capture the status of the AWS Glue ETL job.

  1. Open the CloudWatch console.
  2. In the navigation pane, under Events, choose Rules. Choose the rule name that contains OpsEventRule, as shown following.

  3. In the upper-right corner, choose Actions, Edit.

  4. Replace Your-ETL-jobName with the ETL job name that you created in the previous step.

  5. Scroll down and choose Configure details. Then choose Update rule.

Now that you have set up an entire data pipeline in an automated way with the appropriate notifications and alerts, it’s time to test your pipeline. If you upload new monthly data to the raw Amazon S3 bucket (for example, upload the NY green taxi February 2018 CSV), it triggers the GlueTriggerLambda AWS Lambda function. You can navigate to the AWS Glue console, where you can see that the AWS Glue crawler is running.

Upon completion of the crawler, the CloudWatch Events rule CrawlerEventRule triggers your ETLJobLambda Lambda function. You can notice now that the AWS Glue ETL job is running.

When the ETL job is successful, the CloudWatch Events rule OpsEventRule sends an email notification to you using an Amazon SNS topic, as shown following, completing the automation cycle.

Be sure to check your processed Amazon S3 bucket, where you will find transformed data processed by your automated ETL pipeline. Now that the processed data is ready in Amazon S3, you need to run the AWS Glue crawler on this Amazon S3 location. The crawler creates a metadata table with the relevant schema in the AWS Glue Data Catalog.

After the Data Catalog table is created, you can execute standard SQL queries using Amazon Athena and visualize the data using Amazon QuickSight. To learn more, see the blog post Harmonize, Query, and Visualize Data from Various Providers using AWS Glue, Amazon Athena, and Amazon QuickSight.
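You can also submit such a query programmatically through the Athena API. The following is a rough sketch using boto3; the table name, database, and results location are placeholders rather than names created by this walkthrough.

import boto3

athena = boto3.client("athena", region_name="us-east-1")

# Run a simple aggregation over the processed (Parquet) table cataloged by the crawler.
response = athena.start_query_execution(
    QueryString="SELECT COUNT(*) FROM my_processed_table",  # placeholder table name
    QueryExecutionContext={"Database": "default"},
    ResultConfiguration={"OutputLocation": "s3://your-athena-results-bucket/"},  # placeholder
)
print(response["QueryExecutionId"])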

Conclusion

Having an automated serverless data lake architecture lessens the burden of managing data from its source to destination—including discovery, audit, monitoring, and data quality. With an automated data pipeline across organizations, you can identify relevant datasets and extract value much faster than before. The advantage of reducing the time to analysis is that businesses can analyze the data as it becomes available in real time. From the BI tools, queries return results much faster for a single dataset than for multiple databases.

Business analysts can now get their job done faster, and data engineering teams can free themselves from repetitive tasks. You can extend it further by loading your data into a data warehouse like Amazon Redshift or making it available for machine learning via Amazon SageMaker.

Additional resources

See the following resources for more information:

About the Author

Saurabh Shrivastava is a partner solutions architect and big data specialist working with global systems integrators. He works with AWS partners and customers to provide them with architectural guidance for building scalable architecture in hybrid and AWS environments. He enjoys spending time with his family outdoors and traveling to new destinations to discover new cultures.


Luis Lopez Soria is a partner solutions architect and serverless specialist working with global systems integrators. He works with AWS partners and customers to help them with adoption of the cloud operating model at a large scale. He enjoys doing sports in addition to traveling around the world exploring new foods and cultures.


Chirag Oswal is a partner solutions architect and AR/VR specialist working with global systems integrators. He works with AWS partners and customers to help them with adoption of the cloud operating model at a large scale. He enjoys video games and travel.

Amazon Kinesis Data Firehose custom prefixes for Amazon S3 objects

Post Syndicated from Rajeev Chakrabarti original https://aws.amazon.com/blogs/big-data/amazon-kinesis-data-firehose-custom-prefixes-for-amazon-s3-objects/

In February 2019, Amazon Web Services (AWS) announced a new feature in Amazon Kinesis Data Firehose called Custom Prefixes for Amazon S3 Objects. It lets customers specify a custom expression for the Amazon S3 prefix where data records are delivered. Previously, Kinesis Data Firehose allowed only specifying a literal prefix. This prefix was then combined with a static date-formatted prefix to create the output folder in a fixed format. Customers asked for flexibility, so AWS listened and delivered.

Kinesis Data Firehose is most commonly used to consume event data from streaming sources, such as applications or IoT devices.  The data then is typically stored in a data lake, so it can be processed and eventually queried.  When storing data on Amazon S3, it is a best practice to partition or group related data and store it together in the same folder.  This provides the ability to filter the partitioned data and control the amount of data scanned by each query, thus improving performance and reducing cost.

A common way to group data is by date.  Kinesis Data Firehose automatically groups data and stores it into the appropriate folders on Amazon S3 based on the date.  However, the naming of folders in Amazon S3 is not compatible with Apache Hive naming conventions. This makes data more difficult to catalog using AWS Glue crawlers and analyze using big data tools.

This post discusses a new capability that lets us customize how Kinesis Data Firehose names the output folders in Amazon S3. It covers how custom prefixes work, the intended use cases, and includes step-by-step instructions to try the feature in your own account.

The need for custom prefixes for Amazon S3 objects

Previously, Kinesis Data Firehose created a static Universal Coordinated Time (UTC) based folder structure in the format YYYY/MM/DD/HH. It then appended it to the provided prefix before writing objects to Amazon S3. For example, if you provided a prefix “mydatalake/”, the generated folder hierarchy would be “mydatalake/2019/02/09/13”.  However, to be compatible with Hive naming conventions, the folder structure is expected to follow the format “/partitionkey=partitionvalue”.  Using this naming convention, data can be easily cataloged with AWS Glue crawlers, resulting in proper partition names.

Other methods for managing partitions also become possible, such as running MSCK REPAIR TABLE in Amazon Athena or Apache Hive on Amazon EMR, which can add all partitions through a single statement. Furthermore, you can use other date-based partitioning patterns like “/dt=2019-02-09-13/” instead of expanding the date out into folders. This is helpful in reducing the total number of partitions that need to be maintained as the table grows over time. It also simplifies range queries. Providing the ability to specify custom prefixes obviates the need for an additional ETL step to put the data in the right folder structure, improving the time to insight.

How custom prefixes for Amazon S3 objects works

This new capability does not let you use any date or timestamp value from your event data, nor can you use any other arbitrary value in the event. Kinesis Data Firehose uses an internal timestamp field called ApproximateArrivalTimestamp. Each data record includes an ApproximateArrivalTimestamp (in UTC) that is set when a stream successfully receives and stores the record. This is commonly referred to as a server-side timestamp. Kinesis Data Firehose buffers incoming records according to the configured buffering hints and delivers them into Amazon S3 objects for the Amazon S3 destination. The resulting objects in Amazon S3 may contain multiple records, each with a different ApproximateArrivalTimestamp. When evaluating timestamps, Kinesis Data Firehose uses the ApproximateArrivalTimestamp of the oldest record that’s contained in the Amazon S3 object being written.

Kinesis Data Firehose also provides the ability to deliver records to a different error output location when there is a delivery failure, an AWS Lambda transformation failure, or a format conversion failure. Previously, the error output location could not be configured and was determined by the type of delivery failure. With this release, the error output location (ErrorOutputPrefix) can also be configured. One benefit of this new capability is that you can separate failed records into date partitioned folders for easy reprocessing.

So how do you specify the custom Prefix and the ErrorOutputPrefix? You use an expression of the form: !{namespace:value}, where the namespace can be either firehose or timestamp. The value can be either “random-string” or “error-output-type” for the firehose namespace or a date pattern for the timestamp namespace in the Java DateTimeFormatter format. In a single expression, you can use a combination of the two namespaces although the !{firehose: error-output-type} can be used only in the ErrorOutputPrefix. For more information and examples, see Custom Prefixes for Amazon S3 Objects.
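To see where these expressions fit in an API call, the following boto3 sketch shows the relevant fields of an extended S3 destination. The stream name, role ARN, and bucket ARN are placeholders, and many optional settings are omitted; the prefix values are the ones used later in this walkthrough.

import boto3

firehose = boto3.client("firehose", region_name="us-east-1")

firehose.create_delivery_stream(
    DeliveryStreamName="my-delivery-stream",  # placeholder name
    DeliveryStreamType="DirectPut",
    ExtendedS3DestinationConfiguration={
        "RoleARN": "arn:aws:iam::123456789012:role/firehose_delivery_role",  # placeholder
        "BucketARN": "arn:aws:s3:::my-destination-bucket",                   # placeholder
        # Hive-compatible prefix evaluated from the record arrival timestamp.
        "Prefix": "fhbase/year=!{timestamp:YYYY}/month=!{timestamp:MM}/"
                  "day=!{timestamp:dd}/hour=!{timestamp:HH}/",
        # Failed records land under a date-partitioned error location.
        "ErrorOutputPrefix": "fherroroutputbase/!{firehose:random-string}/"
                             "!{firehose:error-output-type}/!{timestamp:yyyy/MM/dd}/",
    },
)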

Writing streaming data into Amazon S3 with Kinesis Data Firehose

This walkthrough describes how streaming data can be written into Amazon S3 with Kinesis Data Firehose using a Hive compatible folder structure.  It then shows how AWS Glue crawlers can infer the schema and extract the proper partition names that we designated in Kinesis Data Firehose, and catalog them in AWS Glue Data Catalog.  Finally, we run sample queries to show that partitions are indeed being recognized.

To demonstrate this, we use Python code to generate sample data. We also use a Lambda transform on Kinesis Data Firehose to forcibly create failures. This demonstrates how data can be saved to the error output location. The code that you need for this walkthrough is included in the GitHub repository.

For this walkthrough, this is the architecture that we are building:

Step 1: Create an Amazon S3 bucket

Create an S3 bucket to be used by Kinesis Data Firehose to deliver event records. We use the AWS Command Line Interface (AWS CLI) to create the Amazon S3 bucket in the US East (N. Virginia) Region. Remember to substitute the bucket name in the example for your own.

aws s3 mb s3://kdfs3customprefixesexample --region us-east-1

Step 2: Lambda Transform (optional)

The incoming events have an ApproximateArrivalTimestamp field in the event payload.  This is sufficient to create a proper folder structure on Amazon S3.  However, when querying the data it may be beneficial to expose this timestamp value as a top level column for easy filtering and validation.  To accomplish this, we create a Lambda function that adds the ApproximateArrivalTimestamp as a top level field in the data payload. The data payload is what Kinesis Data Firehose writes as an object in Amazon S3. Additionally, the Lambda code also artificially generates some processing errors that are delivered to the “ErrorOutputPrefix” location specified for the delivery destination to illustrate the use of expressions in the “ErrorOutputPrefix.”
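The transform used in this walkthrough is in the GitHub repository. As a simplified sketch of the same idea, the following handler copies each record's approximate arrival timestamp into the payload and marks every record as successfully processed; the artificial failures from the repository version are left out.

import base64
import json

def lambda_handler(event, context):
    # Kinesis Data Firehose transformation: promote the arrival timestamp into the payload.
    output = []
    for record in event["records"]:
        payload = json.loads(base64.b64decode(record["data"]))
        # Firehose supplies a server-side timestamp (epoch milliseconds) with each record.
        payload["ApproximateArrivalTimestamp"] = record["approximateArrivalTimestamp"]
        output.append({
            "recordId": record["recordId"],
            "result": "Ok",  # return "ProcessingFailed" to route a record to the error prefix
            "data": base64.b64encode(json.dumps(payload).encode("utf-8")).decode("utf-8"),
        })
    return {"records": output}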

Create an IAM role for the Lambda transform function

First, create a role for the Lambda function called LambdaBasicRole. The TrustPolicyForLambda.json file is included in the GitHub repository.

$ aws iam create-role --role-name KDFLambdaBasicRole --assume-role-policy-document file://TrustPolicyForLambda.json

After the role is created, attach the managed Lambda basic execution policy to it.

$ aws iam attach-role-policy --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole --role-name KDFLambdaBasicRole

Lambda function

To create the Lambda function, start with the Python Kinesis Data Firehose blueprint “General Firehose Processing” and then modify it. For more information about the structure of the records and what must be returned, see Amazon Kinesis Data Firehose Data Transformation.

Zip up the Python file, and then create the Lambda function using the AWS CLI. The CreateLambdaFunctionS3CustomPrefixes.json file is included in the GitHub repository.

aws lambda create-function --zip-file "fileb://lambda_function.zip" --cli-input-json file://CreateLambdaFunctionS3CustomPrefixes.json

Step 3: Delivery stream

Next, create the Kinesis Data Firehose delivery stream. The createdeliverystream.json file is included in the GitHub repository.

 aws firehose create-delivery-stream --cli-input-json file://createdeliverystream.json

In the previous configuration, we defined a Prefix and an ErrorOutputPrefix under the “ExtendedS3DestinationConfiguration” element. We defined the same for the “S3BackupConfiguration” element. Note that when the “ProcessingConfiguration” element is set to “Disabled”, the ErrorOutputPrefix parameter of the “ExtendedS3DestinationConfiguration” element exists only for consistency. It otherwise has no significance.

We’ve chosen a prefix that results in a folder structure compatible with Hive-style partitioning. This is the prefix we used:

“fhbase/year=!{timestamp:YYYY}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/”

Kinesis Data Firehose first creates a base folder called “fhbase” directly under the Amazon S3 bucket. Second, it evaluates the expressions !{timestamp:YYYY}, !{timestamp:MM}, !{timestamp:dd}, and !{timestamp:HH} to year, month, day and hour using the Java DateTimeFormatter format. For example, an ApproximateArrivalTimestamp of 1549754078390 in UNIX epoch time, which is 2019-02-09T16:13:01.000000Z in UTC would evaluate to “year=2019”, “month=02”, “day=09” and “hour=16”.  Therefore, the location in Amazon S3 where data records that are delivered evaluate to “fhbase/year=2019/month=02/day=09/hour=16/”.

Similarly, the ErrorOutputPrefix “fherroroutputbase/!{firehose:random-string}/!{firehose:error-output-type}/!{timestamp:yyyy/MM/dd}/” results in a base folder called “fherroroutputbase” directly under the S3 bucket. The expression !{firehose:random-string} evaluates to an 11 character random string like “ztWxkdg3Thg”.  If you use this more than once in the same expression, every instance evaluates to a new random string. The expression !{firehose:error-output-type} evaluates to one of the following:

  1. “processing-failed” for Lambda transformation delivery failures
  2. “elasticsearch-failed” for Amazon ES destination delivery failures
  3. “splunk-failed” for Splunk destination delivery failures
  4. “format-conversion-failed” for data format conversion failures

So, the location for an Amazon S3 object containing the delivery failed records for a Lambda transformation could evaluate to: fherroroutputbase/ztWxkdg3Thg/processing-failed/2019/02/09/.

You can run aws firehose describe-delivery-stream --delivery-stream-name KDFS3customPrefixesExample to describe the delivery stream created.

Next, enable encryption-at-rest for the delivery stream:

aws firehose start-delivery-stream-encryption --delivery-stream-name KDFS3customPrefixesExample

Or create the delivery stream using the AWS console

  1. Choose the source. For this example, I use Direct PUT.
  2. Choose if you would like to transform the incoming records with a Lambda transformation. I chose Enabled, and chose the name of the Lambda function that I had created earlier.

  3. Choose the destination. I chose the Amazon S3 destination.

  4. Choose the Amazon S3 bucket. I chose the Amazon S3 bucket that I had created earlier in this exercise.

  5. Specify the Amazon S3 Prefix and the Amazon S3 error prefix. This corresponds to the “Prefix” and “ErrorOutputPrefix” explained earlier in the context of the AWS CLI input JSON.

  6. Choose whether you would like to back up the raw (before transformation) records to another Amazon S3 location. I chose Enabled and specified the same bucket (you could choose a different bucket). I also specified a different prefix from the transformed records – the base folder is different but the folder structure below that is the same. This would make it more efficient to crawl this location using an AWS Glue crawler or create external tables in Athena or Redshift Spectrum pointing to this location.

  7. Specify the buffering hints for the Amazon S3 destination. I chose 1 MB and 240 seconds.
  8. Choose the S3 Compression and encryption settings. I chose no compression for the transformed records’ location. I chose to encrypt the Amazon S3 location at rest by using the service-managed AWS KMS customer master key (CMK).
  9. Choose whether you want to enable Error Logging in CloudWatch. I chose Enabled.
  10. Specify the IAM role that you want Kinesis Data Firehose to assume to access resources on your behalf. Choose either Create new or Choose to display a new screen. Choose Create a new IAM role, name the role, and then choose Allow.
  11. Choose Create Delivery Stream.

The delivery stream is now created and active. You can send events to it.

Test with sample data

I used Python code to generate sample data. The structure of the generated data is as follows:

{'sector': 'HEALTHCARE', 'price': 194.07, 'ticker_symbol': 'UFG', u'EventTime': '2019-02-12T07:10:52.649000Z', 'change': 20.56}
{'sector': 'HEALTHCARE', 'price': 124.01, 'ticker_symbol': 'QXZ', u'EventTime': '2019-02-12T07:10:53.745000Z', 'change': 3.32}
{'sector': 'MANUFACTURING', 'price': 26.95, 'ticker_symbol': 'QXZ', u'EventTime': '2019-02-12T07:10:54.864000Z', 'change': 24.53}

Sample code to generate data and push it into Kinesis Data Firehose is included in the GitHub repository.
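The generator in the repository is more elaborate; a stripped-down sketch of the same pattern, using boto3 to put synthetic records into the delivery stream, might look like the following. The delivery stream name matches the one created earlier, and the field values are random placeholders.

import json
import random
import time
from datetime import datetime, timezone

import boto3

firehose = boto3.client("firehose", region_name="us-east-1")

# Continuously push synthetic ticker events into the delivery stream.
while True:
    event = {
        "ticker_symbol": random.choice(["UFG", "QXZ"]),
        "sector": random.choice(["HEALTHCARE", "MANUFACTURING"]),
        "price": round(random.uniform(10, 200), 2),
        "change": round(random.uniform(-25, 25), 2),
        "EventTime": datetime.now(timezone.utc).isoformat(),
    }
    firehose.put_record(
        DeliveryStreamName="KDFS3customPrefixesExample",  # delivery stream created earlier
        Record={"Data": json.dumps(event) + "\n"},
    )
    time.sleep(0.1)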

After you start sending events to the Kinesis Data Firehose delivery stream, objects should start appearing under the specified prefixes in Amazon S3.

I wanted to illustrate Lambda invoke errors and the appearance of files in the ErrorOutputPrefix location for Lambda transform errors. Therefore, I did not give permissions to the “firehose_delivery_role” to invoke my Lambda function. The following file showed up in the location specified by the ErrorOutputPrefix.

aws s3 ls s3://kdfs3customprefixesexample/fherroroutputbase/FxvO2Tf9MQP/processing-failed/2019/02/12/

2019-02-12 16:57:24     260166 KDFS3customPrefixesExample-1-2019-02-12-16-53-20-5262db81-0f3a-48bf-8fc6-2249124923ff

Here is a snippet of the contents of the error file that I previously mentioned.

{"attemptsMade":4,"arrivalTimestamp":1549990400391,"errorCode":"Lambda.InvokeAccessDenied","errorMessage":"Access was denied. Ensure that the access policy allows access to the Lambda function.","attemptEndingTimestamp":1549990478018,"rawData":"eyJzZWN0b3IiOiAiSEVBTFRIQ0FSRSIsICJwcmljZSI6IDE4Ny45NCwgInRpY2tlcl9zeW1ib2wiOiAiVUZHIiwgIkV2ZW50VGltZSI6ICIyMDE5LTAyLTEyVDE2OjUzOjE5Ljk5MzAwMFoiLCAiY2hhbmdlIjogOS4yNn0=","lambdaArn":"arn:aws:lambda:us-east-1:<account-id>:function:KDFS3CustomPrefixesTransform:$LATEST"}

After I gave the “firehose_delivery_role” the appropriate permissions, the data objects showed up in the “Prefix” location specified for the Amazon S3 destination.

aws s3 ls s3://kdfs3customprefixesexample/fhbase/year=2019/month=02/day=12/hour=17/

2019-02-12 17:17:26    1392167 KDFS3customPrefixesExample-1-2019-02-12-17-14-51-fc63e8f6-7421-491d-8417-c5002fca1722

2019-02-12 17:18:39    1391946 KDFS3customPrefixesExample-1-2019-02-12-17-16-43-e080a18a-3e1e-45ad-8f1a-98c7887f5430

Also, because the Lambda code in my Lambda transform set the status failed for 10 percent of the records, those showed up in the ErrorOutputPrefix location for Lambda transform errors.

aws s3 ls s3://kdfs3customprefixesexample/fherroroutputbase/ztWxkdg3Thg/processing-failed/2019/02/12/

2019-02-12 17:25:54     180092 KDFS3customPrefixesExample-1-2019-02-12-17-21-53-3bbfe7c0-f505-47d0-b880-797ce9035f73

Here is a snippet of the content of the error file:

{"attemptsMade":1,"arrivalTimestamp":1549992113419,"errorCode":"Lambda.ProcessingFailedStatus","errorMessage":"ProcessingFailed status set for record","attemptEndingTimestamp":1549992138424,"rawData":"eyJ0aWNrZXJfc3ltYm9sIjogIlFYWiIsICJzZWN0b3IiOiAiSEVBTFRIQ0FSRSIsICJwcmljZSI6IDE3LjUyLCAiY2hhbmdlIjogMTcuNTUsICJFdmVudFRpbWUiOiAiMjAxOS0wMi0xMlQxNzoyMTo1My4zOTY2NDdaIn0=","lambdaArn":"arn:aws:lambda:us-east-1:<account-id>:function:KDFS3CustomPrefixesTransform:$LATEST"}

You’re now ready to create an AWS Glue crawler. For more information about using the AWS Glue Data Catalog, see Populating the AWS Glue Data Catalog.

  1. In the AWS Glue console, go to Crawlers, and choose Add Crawler.

  2. Add information about your crawler, then choose Next.
  3. In the Include Path, specify the Amazon S3 bucket name that you entered under the Amazon S3 destination. Also include the static prefix used when you created the Kinesis Data Firehose delivery stream. Do not include the custom prefix expression.
  4. Choose Next.

  5. Choose Next, No, Next.
  6. Specify the IAM role that AWS Glue would use. I chose to create a new IAM Role. Choose Next.
  7. Specify a schedule to run the crawler. I chose to Run it on Demand. Choose Next.
  8. Specify where the crawler adds the crawled and discovered tables. I chose the default database. Choose Next.

  9. Choose Finish.
  10. The crawler has been created and is ready to be run. Choose Run crawler.

  11. In the AWS Glue console, go to Tables. You can see that a table has been created with the name of the base folder. Choose fhbase.

The crawler has discovered and populated the table and its properties.

You can see the discovered schema. The crawler has identified and created the partitions based on the folder structure specified by the prefix expression.

Open the Amazon Athena console, and select the default database from the drop-down menu. Write the following query in the New query1 window, then choose Run query.

SELECT * FROM "default"."fhbase"

where year = '2019' and day = '12' and hour = '17'

order by approxarrtimestamputcfh desc

Notice that Amazon Athena recognizes the fhbase table as a partitioned table. The query can take advantage of the partitions in the query to filter the results.

Conclusion

As this post illustrates, Custom Prefixes for Amazon S3 objects provides much flexibility to customize the folder structure, where Kinesis Data Firehose delivers the data records and failure records in Amazon S3. Having control over the folder structure and naming in Amazon S3 simplifies data discovery, cataloging, and access. As a result, it helps get insight more expediently and helps you better manage the cost of your queries.

About the Author

Rajeev Chakrabarti is a Kinesis specialist solutions architect.


Build and run streaming applications with Apache Flink and Amazon Kinesis Data Analytics for Java Applications

Post Syndicated from Steffen Hausmann original https://aws.amazon.com/blogs/big-data/build-and-run-streaming-applications-with-apache-flink-and-amazon-kinesis-data-analytics-for-java-applications/

Stream processing facilitates the collection, processing, and analysis of real-time data and enables the continuous generation of insights and quick reactions to emerging situations. This capability is useful when the value of derived insights diminishes over time. Hence, the faster you can react to a detected situation, the more valuable the reaction is going to be. Consider, for instance, a streaming application that analyzes and blocks fraudulent credit card transactions while they occur. Compare that application to a traditional batch-oriented approach that identifies fraudulent transactions at the end of every business day and generates a nice report for you to read the next morning.

It is quite common for the value of insights to diminish over time. Therefore, using stream processing can substantially improve the value of your analytics application. However, building and operating a streaming application that continuously receives and processes data is much more challenging than operating a traditional batch-oriented analytics application.

In this post, we discuss how you can use Apache Flink and Amazon Kinesis Data Analytics for Java Applications to address these challenges. We explore how to build a reliable, scalable, and highly available streaming architecture based on managed services that substantially reduce the operational overhead compared to a self-managed environment. We particularly focus on how to prepare and run Flink applications with Kinesis Data Analytics for Java Applications. To this end, we use an exemplary scenario that includes source code and AWS CloudFormation templates. You can follow along with this example using your own AWS account or adapt the code according to your specific requirements.

Challenges of running streaming applications

When you build a streaming application, the downstream systems naturally rely on a continuous and timely generation of output. Accordingly, there are much higher requirements on the availability of the streaming application. There is also much less time to address operational issues compared to a traditional batch-based approach. In a batch-processing scenario, when a job that runs once at the end of a business day fails, you can often restart the failed job and still complete the computation by the next morning, when the results are needed. In contrast, when a streaming application fails, downstream systems that consume the output might be affected within minutes, or even sooner, when the expected output no longer arrives in time.

Moreover, in case of failure, you can’t simply delete all intermediate results and restart a failed processing job, as it is commonly done in the batch-processing case. The output of a streaming job is continuously consumed by downstream systems. Output that has already been consumed cannot easily be retracted. Therefore, the entire processing pipeline is much more sensitive to duplicates that are introduced by an application that is restarted on failure. Furthermore, the computations of a streaming application often rely on some kind of internal state that can be corrupted or even lost when the application fails.

Last but not least, streaming applications often deal with a varying amount of throughput. Therefore, scaling the application according to the current load is highly desirable. When the load increases, the infrastructure that supports the streaming application must scale to keep the application from becoming overloaded, falling behind, and producing results that are no longer relevant. On the other hand, when the load decreases, the infrastructure should scale in again to remain cost effective by not provisioning more resources than are needed.

A reliable and scalable streaming architecture based on Flink and Kinesis Data Analytics for Java Applications

Apache Flink is an open-source project that is tailored to stateful computations over unbounded and bounded datasets. Flink addresses many of the challenges that are common when analyzing streaming data by supporting different APIs (including Java and SQL), rich time semantics, and state management capabilities. It can also recover from failures while maintaining exactly-once processing semantics. Therefore, Flink is well suited for analyzing streaming data with low latency.

In this post, we illustrate how to deploy, operate, and scale a Flink application with Kinesis Data Analytics for Java Applications. We use a scenario to analyze the telemetry data of a taxi fleet in New York City in near-real time to optimize the fleet operation. In this scenario, every taxi in the fleet is capturing information about completed trips. The tracked information includes the pickup and drop-off locations, number of passengers, and generated revenue. This information is ingested into a Kinesis data stream as a simple JSON blob. From there, the data is processed by a Flink application, which is deployed to Kinesis Data Analytics for Java Applications. This application identifies areas that are currently requesting a high number of taxi rides. The derived insights are finally persisted into Amazon Elasticsearch Service, where they can be accessed and visualized using Kibana.

This scenario leads to the following architecture, which is separated into three stages for the ingestion, processing, and presentation of data.

Separating the different aspects of the infrastructure is a common approach in this domain and has several benefits over a more tightly coupled architecture.

First, the Kinesis data stream serves as a buffer that decouples the producers from the consumers. Taxis can persist the events that they generate into the data stream regardless of the condition of, for instance, the processing layer, which might be currently recovering from a node failure. Likewise, the derived data remains available through Kibana even if the ingestion or processing layer is currently unavailable due to some operational issues. Last but not least, all components can be scaled independently and can use infrastructure that is specifically tailored according to their individual requirements.

This architecture also allows you to experiment and adopt new technologies in the future. Multiple independent applications can concurrently consume the data stored in the Kinesis data stream. You can then test how a new version of an existing application performs with a copy of the production traffic. But you can also introduce a different tool and technology stack to analyze the data, again without affecting the existing production application. For example, it is common to persist the raw event data to Amazon S3 by adding a Kinesis Data Firehose delivery stream as a second consumer to the Kinesis data stream. This facilitates long-term archiving of the data, which you can then use to evaluate ad hoc queries or analyze historic trends.

All in all, separating the different aspects of the architecture into ingestion, processing, and presentation nicely decouples different components, making the architecture more robust. It furthermore allows you to choose different tools for different purposes and gives you a lot of flexibility to change or evolve the architecture over time.

For the rest of this post, we focus on using Apache Flink and Kinesis Data Analytics for Java Applications to identify areas that currently request a high number of taxi rides. We also derive the average trip duration to the New York City airports. But with this architecture, you also have the option to consume the incoming events using other tools, such as Apache Spark Structured Streaming and Kinesis Data Firehose, instead of, or in addition to, what is described here.

Let’s kick the tires!

To see the described architecture in action, execute the following AWS CloudFormation template in your own AWS account. The template first builds the Flink application that analyzes the incoming taxi trips, including the Flink Kinesis Connector that is required to read data from a Kinesis data stream. It then creates the infrastructure and submits the Flink application to Kinesis Data Analytics for Java Applications.

The entire process of building the application and creating the infrastructure takes about 20 minutes. After the AWS CloudFormation stack is created, the Flink application has been deployed as a Kinesis Data Analytics for Java application. It then waits for events in the data stream to arrive. Checkpointing is enabled so that the application can seamlessly recover from failures of the underlying infrastructure while Kinesis Data Analytics for Java Applications manages the checkpoints on your behalf. In addition, automatic scaling is configured so that Kinesis Data Analytics for Java Applications automatically allocates or removes resources and scales the application (that is, it adapts its parallelism) in response to changes in the incoming traffic.

To populate the Kinesis data stream, we use a Java application that replays a public dataset of historic taxi trips made in New York City into the data stream. The Java application has already been downloaded to an Amazon EC2 instance that was provisioned by AWS CloudFormation. You just need to connect to the instance and execute the JAR file to start ingesting events into the stream.

You can obtain all of the following commands, including their correct parameters, from the output section of the AWS CloudFormation template that you executed previously.

$ ssh ec2-user@«Replay instance DNS name»

$ java -jar amazon-kinesis-replay-1.0.jar -stream «Kinesis data stream name» -region «AWS region» -speedup 3600

The speedup parameter determines how much faster the data is ingested into the Kinesis data stream relative to the actual occurrence of the historic events. With the given parameters, the Java application ingests an hour of historic data within one second. This results in a throughput of roughly 13k events and 6 MB of data per second, which completely saturates the Kinesis data stream (more on this later).

You can then go ahead and inspect the derived data through the Kibana dashboard that has been created. Or you can create your own visualizations to explore the data in Kibana.

https://«Elasticsearch endpoint»/_plugin/kibana/app/kibana#/dashboard/nyc-tlc-dashboard

The prepared Kibana dashboard contains a heatmap and a line graph. The heatmap visualizes locations where taxis are currently requested and shows that the highest demand for taxis is in Manhattan. Moreover, the JFK and LaGuardia airports are spots on the map where substantially more rides are requested compared to their immediate surroundings. The line graph visualizes the average trip duration to these two airports. The following image shows how it steadily increases throughout the day until it abruptly drops in the evening.

For this post, the Elasticsearch cluster is configured to accept connections from the IP address range specified as a parameter of the AWS CloudFormation template. For production workloads, it’s much more desirable to further tighten the security of your Elasticsearch domain, for instance, by using Amazon Cognito for Kibana access control.

Scaling the architecture to increase its throughput

For this post, the Kinesis data stream was deliberately underprovisioned so that the Java application completely saturates the data stream. When you closely inspect the output of the Java application, you’ll notice that the “replay lag” is continuously increasing. This means that the producer cannot ingest events as quickly as required by the specified speedup parameter.

You can dive deeper into the metrics of the data stream by accessing them through an Amazon CloudWatch dashboard. You can then see that the WriteProvisionedThroughputExceeded metric is slightly elevated: roughly 0.4 percent of the records are not accepted into the stream because the respective requests are throttled. In other words, the data stream is underprovisioned, especially because the producer pauses the ingestion of new events when too many events are in flight.

To increase the throughput of the data stream, you can simply update the number of shards from 6 to 12, either with a couple of clicks in the console or through a single API call. For production environments, you might even want to automate this procedure. For details on how to automatically scale a Kinesis data stream, see the blog post Scaling Amazon Kinesis Data Streams with AWS Application Auto Scaling.
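
If you prefer to script the resharding rather than use the console, the same operation can be performed with the UpdateShardCount API. The following minimal sketch uses the AWS SDK for Java; the stream name is a placeholder that you would replace with the value from the AWS CloudFormation output.

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.ScalingType;
import com.amazonaws.services.kinesis.model.UpdateShardCountRequest;

public class ScaleStream {
    public static void main(String[] args) {
        AmazonKinesis kinesis = AmazonKinesisClientBuilder.defaultClient();

        // Double the shard count from 6 to 12; UNIFORM_SCALING splits every shard evenly.
        kinesis.updateShardCount(new UpdateShardCountRequest()
                .withStreamName("«Kinesis data stream name»")  // placeholder
                .withTargetShardCount(12)
                .withScalingType(ScalingType.UNIFORM_SCALING));
    }
}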

When the scaling operation of the stream finishes, you can observe how the “replay lag” decreases and more events are ingested into the stream.

However, as a direct result, more events need to be processed. So now the Kinesis Data Analytics for Java application becomes overloaded and can no longer keep up with the increased number of incoming events. You can observe this through the millisBehindLatest metric, which is published to CloudWatch. The metric reports the time difference, in milliseconds, between the oldest record currently read by the Kinesis Data Analytics for Java application and the latest record in the stream according to the ingestion time. So it indicates how far behind the processing is from the tip of the stream.

As these metrics show, 10 minutes after the scaling operation finishes, processing is already more than 3 minutes behind the latest event in the stream. Even worse, it steadily keeps falling behind, continuously widening this gap.

However, in contrast to Kinesis Data Streams, Kinesis Data Analytics for Java Applications natively supports auto scaling. After a couple of minutes, you can see the effect of the scaling activities in the metrics. The millisBehindLatest metric starts to decrease until it reaches zero, when the processing has caught up with the tip of the Kinesis data stream.

However, notice how the millisBehindLatest metric spikes just before it starts to decline. This is caused by the way that scaling a Kinesis Data Analytics for Java application works today. To scale a running application, the internal state of the application is persisted into a so-called savepoint. This savepoint is exposed as a snapshot by Kinesis Data Analytics for Java Applications. Subsequently, the running instance of the application is terminated, and a new instance of the same application with more resources and a higher parallelism is created. The new instance of the application then populates its internal state from the snapshot and resumes the processing from where the now terminated instance left off.

Accordingly, the scaling operation causes a brief interruption of the processing, which explains the spike in the metric. However, this operation is transparent to the producers and consumers. Producers can continue to write to the Kinesis data stream because they are nicely decoupled from the application. Likewise, consumers can still use Kibana to view their dashboards, although they might not see the latest data because it hasn’t yet been processed.

Let’s step back for a moment and review what you just did: You created a fully managed, highly available, scalable streaming architecture. You ingested and analyzed up to 25k events per second. You doubled the throughput of the architecture by scaling the Kinesis data stream and the Kinesis Data Analytics for Java application with a couple of clicks. You did all this while the architecture remained fully functional and kept receiving and processing events, not losing a single event. You also could have scaled the Elasticsearch cluster as seamlessly as the other components. But we’ll leave that as an exercise for the interested reader.

Try to imagine what it would have taken you to build something similar from scratch.

Prepare Flink applications for Kinesis Data Analytics for Java Applications

Now that you have seen the streaming application in action, let’s look at what is required to deploy and run a Flink application with Kinesis Data Analytics for Java Applications.

Similar to other deployment methods, the Flink application is first built and packaged into a fat JAR, which contains all the necessary dependencies for the application to run. The resulting fat JAR is then uploaded to Amazon S3. The location of the fat JAR on S3 and some additional configuration parameters are then used to create an application that can be executed by Kinesis Data Analytics for Java Applications. So instead of logging in to a cluster and directly submitting a job to the Flink runtime, you upload the respective fat JAR to S3. You then create a Kinesis Data Analytics for Java application that you can interact with through API calls, the console, or the AWS CLI.

Adapt the Flink configuration and runtime parameters

To obtain a valid Kinesis Data Analytics for Java application, the fat JAR of the Flink application must include certain dependencies. When you use Apache Maven to build your Flink application, you can simply add another dependency to the pom.xml file of your project.

<!-- pom.xml -->
<project>
    ...
    <dependencies>
        ...
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-kinesisanalytics-runtime</artifactId>
            <version>1.0.1</version>
        </dependency>
    </dependencies>
    ...
</project>

You can then specify parameters that are passed to the resulting Kinesis Data Analytics for Java application when it is created or updated. These parameters are basically key-value pairs that are contained in a property map that is part of a property group.

"ApplicationConfiguration": {
    "EnvironmentProperties": {
        "PropertyGroups": [
            {
                "PropertyGroupId": "FlinkApplicationProperties",
                "PropertyMap": {
                    "InputStreamName": "...",
                    ...
                }
            }
        ]
    },
    ...
}

You can then obtain the values of these parameters in the application code from the Kinesis Data Analytics for Java Applications runtime. For example, the following code snippet gets the name of the Kinesis data stream that the application should connect to from the FlinkApplicationProperties property group.

Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();

Properties flinkProperties = applicationProperties.get("FlinkApplicationProperties");

String kinesisStreamName = flinkProperties.getProperty("InputStreamName");

You use the same mechanism to configure other properties for the Kinesis Data Analytics for Java application (for example, checkpointing and the parallelism of the application) that are usually specified as a parameter or configuration option directly to the Flink runtime.

"ApplicationConfiguration": {
    "FlinkApplicationConfiguration": {
        "CheckpointConfiguration": {
            "ConfigurationType": "DEFAULT"
        },
        "MonitoringConfiguration": {
            "ConfigurationType": "CUSTOM",
            "MetricsLevel": "TASK",
            "LogLevel": "INFO"
        },
        "ParallelismConfiguration": {
            "ConfigurationType": "DEFAULT"
        }
    },
    ...
}

With this configuration, the checkpointing and parallelism settings are left at their default. This enables checkpointing and auto scaling and sets the initial parallelism of the Kinesis Data Analytics for Java application to one. Moreover, the log level is increased to INFO and CloudWatch metrics are collected for every subtask of the application.

Build the Flink Kinesis Connector

When you are building a Flink application that reads data from a Kinesis data stream, you might notice that the Flink Kinesis Connector is not available from Maven central. You actually need to build it yourself. The following steps build the connector for any recent Apache Flink release. However, because Kinesis Data Analytics for Java Applications is based on Flink 1.6.2, you can use this specific version for now.

$ wget -qO- https://github.com/apache/flink/archive/release-1.6.2.zip | bsdtar -xf-

$ cd flink-release-1.6.2

$ mvn clean package -B -DskipTests -Dfast -Pinclude-kinesis -pl flink-connectors/flink-connector-kinesis

Note that the connector has already been built and stored on S3 by the AWS CloudFormation template. You can simply download the JAR file of the connector from there and put it in your local Maven repository using the following Maven command:

$ mvn install:install-file -Dfile=flink-connector-kinesis_2.11-1.6.2.jar -DpomFile=flink-connector-kinesis_2.11-1.6.2.pom.xml

Integrate the Flink Elasticsearch sink with Amazon Elasticsearch Service

Beginning with the 1.6 release, Apache Flink comes with an Elasticsearch connector that supports the Elasticsearch APIs over HTTP. Therefore, it can natively talk to the endpoints that are provided by Amazon Elasticsearch Service.

You just need to decide how to authenticate requests against the public endpoint of the Elasticsearch cluster. You can whitelist individual IPs for access to the cluster. However, the recommended way of authenticating against the Amazon Elasticsearch Service endpoint is to add authentication information to AWS requests using IAM credentials and the Signature Version 4 signing process.

To extend the Flink Elasticsearch connector, which is not aware of the AWS specific signing process, you can use the open-source aws-signing-request-interceptor, which is available from Maven central. You just need to add an interceptor to the Elasticsearch sink that is called just before the request is sent to the Amazon Elasticsearch Service endpoint. The interceptor can then sign the request using the permission of the role that has been configured for the Kinesis Data Analytics for Java application.

final List<HttpHost> httpHosts = Arrays.asList(HttpHost.create("https://..."));

ElasticsearchSink.Builder<T> esSinkBuilder = new ElasticsearchSink.Builder<>(
    httpHosts,
    new ElasticsearchSinkFunction<T>() {
      ...
    }
);

final Supplier<LocalDateTime> clock = () -> LocalDateTime.now(ZoneOffset.UTC);
final AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain();
final AWSSigner awsSigner = new AWSSigner(credentialsProvider, "eu-west-1", "es", clock);

esSinkBuilder.setRestClientFactory(
    restClientBuilder -> restClientBuilder.setHttpClientConfigCallback(
        callback -> callback.addInterceptorLast(new AWSSigningRequestInterceptor(awsSigner))
    )
);

esSinkBuilder.build();

Note that the actual code in the GitHub repository is a bit more sophisticated because you need to obtain a serializable request interceptor. But the basic approach to sign requests remains the same.

Monitor and debug the Flink application

When running a Kinesis Data Analytics for Java application, you don’t get direct access to the cluster that runs Flink. This is because the underlying infrastructure is completely managed by the service. You merely interact with the service through an API. However, you can still obtain metrics and logging information through CloudWatch and CloudWatch Logs, respectively.

The Kinesis Data Analytics for Java application exposes a lot of operational metrics, ranging from metrics for the entire application down to metrics for individual operators of the application. You can control which level of detail is adequate or required for your purposes. In fact, the metrics used in the previous section were all obtained through CloudWatch.

In addition to operational metrics, you can configure the Kinesis Data Analytics for Java application to write messages to CloudWatch Logs. This capability seamlessly integrates with common logging frameworks, such as Apache Log4j and the Simple Logging Facade for Java (SLF4J). So it is useful for debugging and identifying the cause of operational issues.

To enable logging for your Kinesis Data Analytics for Java application, specify an existing CloudWatch log stream as a logging option when you start the application. You can then write log messages through a standard SLF4J logger from anywhere in your application code, as follows:

final Logger LOG = LoggerFactory.getLogger(...);

LOG.info("Starting to consume events from stream {}", flinkProperties.getProperty("InputStreamName"));

After the log messages are persisted into CloudWatch Logs, you can easily query and analyze them through CloudWatch Logs Insights.

Conclusion

In this post, you not only built a reliable, scalable, and highly available streaming application based on Apache Flink and Kinesis Data Analytics for Java Applications. You also scaled the different components while ingesting and analyzing up to 25k events per second in near-real time. In large parts, this scenario was enabled by using managed services, so you didn’t need to spend time on provisioning and configuring the underlying infrastructure.

The sources of the application and the AWS CloudFormation template used in this post are available from GitHub for your reference. You can dive into all the details of the Flink application and the configuration of the underlying services. I’m curious to see what you will build when you can focus on analyzing data in a streaming fashion rather than spending time on managing and operating infrastructure.

 


About the Author

Steffen Hausmann is a specialist solutions architect with AWS.


Improve clinical trial outcomes by using AWS technologies

Post Syndicated from Mayank Thakkar original https://aws.amazon.com/blogs/big-data/improve-clinical-trial-outcomes-by-using-aws-technologies/

We are living in a golden age of innovation, where personalized medicine is making it possible to cure diseases that we never thought curable. Digital medicine is helping people with diseases get healthier, and we are constantly discovering how to use the body’s immune system to target and eradicate cancer cells. According to a report published by ClinicalTrials.gov, the number of registered studies hit 293,000 in 2018, representing a 250x growth since 2000.

However, an internal rate of return (IRR) analysis by Endpoints News, using data from EvaluatePharma, highlights some interesting trends. A flourishing trend in pharma innovation is supported by strong growth in registered studies. However, the IRR shows a rapidly declining trend, from around 17 percent in 2000 to below the cost of capital in 2017 and projected to go to 0 percent by 2020.

This blog post is the first installment in a series that focuses on the end-to-end workflow of collecting, storing, processing, visualizing, and acting on clinical trials data in a compliant and secure manner. The series also discusses the application of artificial intelligence and machine learning technologies to the world of clinical trials. In this post, we highlight common architectural patterns that AWS customers use to modernize their clinical trials. These incorporate mobile technologies for better evidence generation, cost reduction, increasing quality, improving access, and making medicine more personalized for patients.

Improving the outcomes of clinical trials and reducing costs

Biotech and pharma organizations are feeling the pressure to use resources as efficiently as possible. This pressure forces them to search for any opportunity to streamline processes, get faster, and stay more secure, all while decreasing costs. More and more life sciences companies are targeting biologics, CAR-T, and precision medicine therapeutics, with focus shifting towards smaller, geographically distributed patient segments. This shift has resulted in an increasing mandate to capture data from previously unavailable, nontraditional sources. These sources include mobile devices, IoT devices, and in-home and clinical devices. Life sciences companies merge data from these sources with data from traditional clinical trials to build robust evidence around the safety and efficacy of a drug.

Early last year, the Clinical Trials Transformation Initiative (CTTI) provided recommendations about using mobile technologies for capturing holistic, high quality, attributable, real-world data from patients and for submission to the U.S. Food and Drug Administration (FDA). By using mobile technologies, life sciences companies can reduce barriers to trial participation and lower costs associated with conducting clinical trials. Global regulatory groups such as the FDA, Health Canada, and Medicines and Healthcare products Regulatory Agency (MHRA), among others, are also in favor of using mobile technologies. Mobile technologies can make patient recruitment more efficient, reach endpoints faster, and reduce the cost and time required to conduct clinical trials.

Improved data ingestion using mobile technologies can speed up outcomes, reduce costs, and improve the accuracy of clinical trials. This is especially true when mobile data ingestion is supplemented with artificial intelligence and machine learning (AI/ML) technologies.

Together, they can usher in a new age of smart clinical trials.

At the same time, traditional clinical trial processes and technology designed for mass-marketed blockbuster drugs can’t effectively meet emerging industry needs. This leaves life sciences and pharmaceutical companies in need of assistance for evolving their clinical trial operations. These circumstances result in making clinical trials one of the largest areas of investment for bringing a new drug to market.

Using mobile technologies with traditional technologies in clinical trials can improve the outcomes of the trials and simultaneously reduce costs. Some of the use cases that the integration of various technologies enables include these:

  • Identifying and tracking participants in clinical trials
    • Identifying participants for clinical trials recruitment
    • Educating and informing patients participating in clinical trials
    • Implementing standardized protocols and sharing associated information to trial participants
    • Tracking adverse events and safety profiles
  • Integrating genomic and phenotypic data for identifying novel biomarkers
  • Integrating mobile data into clinical trials for better clinical trial management
  • Creation of a patient-control arm based on historical data
  • Stratifying cohorts based on treatment, claims, and registry datasets
  • Building a collaborative, interoperable network for data sharing and knowledge creation
  • Building compliance-ready infrastructure for clinical trial management

The AWS Cloud provides HIPAA eligible services and solutions. As an AWS customer, you can use these to build solutions for global implementation of mobile devices and sensors in trials, secure capture of streaming Internet of Things (IoT) data, and advanced analytics through visualization tools or AI/ML capabilities. Some of the use cases these services and solutions enable are finding and recruiting patients using smart analytics, facilitating global data management, and remote or in-patient site monitoring. Others include predicting lack of adherence, detecting adverse events, and accelerating trial outcomes along with optimizing trial costs.

Clinical Trials 2.0 (CT2.0) at AWS is geared toward facilitating wider adoption of cloud-native services to enable data ingestion from disparate sources, cost-optimized and reliable storage, and holistic analytics. At the same time, CT2.0 provides the granular access control, end-to-end security, and global scalability needed to conduct clinical trials more efficiently.

Reference architecture

One of the typical architectures for managing a clinical trial using mobile technologies is shown following. This architecture focuses on capturing real-time data from mobile sources and providing a way to process it.

* – Additional considerations such as data security, access control, and compliance need to be incorporated into the architecture and are discussed in the remainder of this post.

Managing a trial by using this architecture consists of the following five major steps.

Step 1: Collect data

Mobile devices, personal wearables, instruments, and smart devices are extensively used (or being considered) by global pharmaceutical companies in patient care and clinical trials to provide real-time data for activity tracking, vital signs monitoring, and so on. Devices like infusion pumps and personal-use dialysis machines require tracking and alerting on device consumables and calibration status. Remote settings management is also a major use case for these kinds of devices. The end-user mobile devices used in a clinical trial emit a lot of telemetry data that requires real-time capture, cleansing, transformation, and analysis.

Typically, these devices are connected to an edge node or a smartphone. Such a connection provides sufficient computing resources to stream data to AWS IoT Core. AWS IoT Core can then be configured to write data to Amazon Kinesis Data Firehose in near real time. Kinesis Data Firehose is a fully managed service for delivering real-time streaming data to destinations such as Amazon S3. S3 provides online, flexible, cost-efficient, pay-as-you-go storage that replicates your data across three Availability Zones within an AWS Region. The edge node or smartphone can use the AWS IoT SDKs to publish and subscribe device data to AWS IoT Core with MQTT. These requests can be authenticated with the AWS Signature Version 4 (SigV4) method, X.509 certificate–based authentication, or customer-created token-based authentication (through custom authorizers). This authenticated approach enables you to map your choice of policies to each certificate and remotely control device or application access. You can also use the Kinesis Data Firehose encryption feature to enable server-side data encryption.
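
To make this concrete, the following sketch shows how an edge node might publish a telemetry reading to AWS IoT Core over MQTT using the AWS IoT Device SDK for Java. The endpoint, client ID, keystore path, topic, and payload fields are hypothetical placeholders; an AWS IoT rule would then route messages on this topic to Kinesis Data Firehose.

import java.io.FileInputStream;
import java.security.KeyStore;

import com.amazonaws.services.iot.client.AWSIotMqttClient;
import com.amazonaws.services.iot.client.AWSIotQos;

public class DeviceTelemetryPublisher {
    public static void main(String[] args) throws Exception {
        // X.509 device certificate and private key packaged in a local keystore (placeholder path).
        KeyStore keyStore = KeyStore.getInstance("PKCS12");
        try (FileInputStream in = new FileInputStream("device-cert.p12")) {
            keyStore.load(in, "keyPassword".toCharArray());
        }

        // Account-specific AWS IoT endpoint and device client ID are placeholders.
        AWSIotMqttClient client = new AWSIotMqttClient(
                "xxxxxxxxxxxxxx-ats.iot.us-east-1.amazonaws.com", "trial-device-0001",
                keyStore, "keyPassword");
        client.connect();

        // Publish a vitals reading; an AWS IoT rule can forward it to Kinesis Data Firehose.
        String payload = "{\"patientId\":\"P-123\",\"heartRate\":72,\"timestamp\":1554400000}";
        client.publish("clinicaltrial/trial-device-0001/vitals", AWSIotQos.QOS0, payload);

        client.disconnect();
    }
}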

You can also capture additional data such as Case Report Forms (CRF), Electronic Medical Records (EMR), and medical images using Picture Archiving and Communication Systems (PACS). In addition, you can capture laboratory data (Labs) and other Patient Reported Outcomes data (ePRO). AWS provides multiple tools and services to effectively and securely connect to these data sources, enabling you to ingest data in various volumes, variety, and velocities. For more information about creating a HealthCare Data Hub and ingesting Digital Imaging and Communications in Medicine (DICOM) data, see the AWS Big Data Blog post Create a Healthcare Data Hub with AWS and Mirth Connect.

Step 2: Store data

After data is ingested from the devices and wearables used in the clinical trial, Kinesis Data Firehose is used to store the data on Amazon S3. This stored data serves as a raw copy and can later be used for historical analysis and pattern prediction. Using Amazon S3 lifecycle policies, you can periodically move your data to lower-cost storage such as Amazon S3 Glacier to further optimize storage costs. Amazon S3 Intelligent-Tiering automatically optimizes costs when data access patterns change, without performance impact or operational overhead, by moving data between two access tiers: frequent access and infrequent access. You can also choose to encrypt data at rest and in motion using various encryption options available on S3.

Amazon S3 offers an extremely durable, highly available, and infinitely scalable data storage infrastructure, simplifying most data processing, backup, and replication tasks.

Step 3: Data processing—fast lane

After collecting and storing a raw copy of the data, Amazon S3 is configured to publish events to AWS Lambda and invoke a Lambda function by passing the event data as a parameter. The Lambda function is used to extract the key performance indicators (KPIs) such as adverse event notifications, medication adherence, and treatment schedule management from the incoming data. You can use Lambda to process these KPIs and store them in Amazon DynamoDB, along with encryption at rest, which powers a near-real-time clinical trial status dashboard. This alerts clinical trial coordinators in real time so that appropriate interventions can take place.
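
As an illustration only, the following sketch outlines what such a Lambda function could look like in Java, using the standard S3 event handler interface. The DynamoDB table name and the KPI-extraction helpers are hypothetical; the real parsing logic depends on the telemetry format used in your trial.

import java.util.HashMap;
import java.util.Map;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.PutItemRequest;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.S3Event;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

public class KpiExtractorHandler implements RequestHandler<S3Event, String> {

    private final AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
    private final AmazonDynamoDB dynamoDb = AmazonDynamoDBClientBuilder.defaultClient();

    @Override
    public String handleRequest(S3Event event, Context context) {
        event.getRecords().forEach(record -> {
            String bucket = record.getS3().getBucket().getName();
            String key = record.getS3().getObject().getKey();

            // Read the newly arrived telemetry object from S3.
            String body = s3.getObjectAsString(bucket, key);

            // KPI extraction is application specific; these helpers are hypothetical stubs.
            Map<String, AttributeValue> item = new HashMap<>();
            item.put("PatientId", new AttributeValue(extractPatientId(body)));
            item.put("ObjectKey", new AttributeValue(key));
            item.put("MedicationAdherence", new AttributeValue(extractAdherence(body)));

            // Table name is a placeholder; this table backs the near-real-time dashboard.
            dynamoDb.putItem(new PutItemRequest("ClinicalTrialKpis", item));
        });
        return "ok";
    }

    private String extractPatientId(String body) { return "P-123"; }
    private String extractAdherence(String body) { return "ON_SCHEDULE"; }
}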

In addition to this, using a data warehouse full of medical records, you can train and implement a machine learning model. This model can predict which patients are about to switch medications or might exhibit adherence challenges in the future. Such prediction can enable clinical trial coordinators to narrow in on those patients with mitigation strategies.

Step 4: Data processing—batch

For historical analysis and pattern prediction, the staged data (stored in S3) is processed in batches. A Lambda function triggers the extract, transform, and load (ETL) process every time new data is added to the raw data S3 bucket. This Lambda function starts an ETL job in AWS Glue, a fully managed ETL service that makes it easy for you to prepare and load your data for analytics. This approach helps in mining current and historical data to derive actionable insights, the results of which are stored on Amazon S3.
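
A minimal sketch of such a trigger function follows, using the AWS SDK for Java to start a Glue job run when new objects land in the raw bucket. The job name is a placeholder for an ETL job that you have already defined in AWS Glue.

import com.amazonaws.services.glue.AWSGlue;
import com.amazonaws.services.glue.AWSGlueClientBuilder;
import com.amazonaws.services.glue.model.StartJobRunRequest;
import com.amazonaws.services.glue.model.StartJobRunResult;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.S3Event;

public class StartEtlJobHandler implements RequestHandler<S3Event, String> {

    private final AWSGlue glue = AWSGlueClientBuilder.defaultClient();

    @Override
    public String handleRequest(S3Event event, Context context) {
        // "clinical-trial-raw-to-curated" is a placeholder Glue job name.
        StartJobRunResult result = glue.startJobRun(
                new StartJobRunRequest().withJobName("clinical-trial-raw-to-curated"));

        context.getLogger().log("Started Glue job run " + result.getJobRunId());
        return result.getJobRunId();
    }
}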

From there, data is loaded on to Amazon Redshift, a cost-effective, petabyte-scale data warehouse offering from AWS. You can also use Amazon Redshift Spectrum to extend data warehousing out to exabytes without loading any data to Amazon Redshift, as detailed in the Big Data blog post Amazon Redshift Spectrum Extends Data Warehousing Out to Exabytes—No Loading Required. This enables you to provide an all-encompassing picture of the entire clinical trial to your clinical trial coordinators, enabling you to react and respond faster.

In addition to this, you can train and implement a machine learning model to identify patients who might be at risk for adherence challenges. This enables clinical trial coordinators to reinforce patient education and support.

Step 5: Visualize and act on data

After the data is processed and ready to be consumed, you can use Amazon QuickSight, a cloud-native business intelligence service from AWS that offers native Amazon Redshift connectivity. Amazon QuickSight is serverless and so can be rolled out to your audiences in hours. You can also use a host of third-party reporting tools, which can use AWS-supplied JDBC or ODBC drivers or open-source PostgreSQL drivers to connect with Amazon Redshift. These tools include TIBCO Spotfire Analytics, Tableau Server, Qlik Sense Enterprise, Looker, and others. Real-time data processing (step 3 preceding) combines with historical-view batch processing (step 4). Together, they empower contract research organizations (CROs), study managers, trial coordinators, and other entities involved in the clinical trial journey to make effective and informed decisions at a speed and frequency that was previously unavailable. Using Amazon QuickSight’s unique Pay-per-Session pricing model, you can optimize costs for your bursty usage models by paying only when users access the dashboards.

Using Amazon Simple Notification Service (Amazon SNS), real-time feedback based on incoming data and telemetry is sent to patients by using text messages, mobile push, and emails. In addition, study managers and coordinators can send Amazon SNS notifications to patients. Amazon SNS provides fully managed pub/sub messaging for microservices, distributed systems, and serverless applications. It’s designed for high-throughput, push-based, many-to-many messaging. Alerts and notifications can be based on current data or a combination of current and historical data.
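
For reference, publishing such a notification from application code is a single API call. The following sketch uses the AWS SDK for Java; the topic ARN and message text are placeholders, and patients' endpoints (SMS, mobile push, email) would be subscribed to the topic.

import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
import com.amazonaws.services.sns.model.PublishRequest;
import com.amazonaws.services.sns.model.PublishResult;

public class PatientNotifier {
    public static void main(String[] args) {
        AmazonSNS sns = AmazonSNSClientBuilder.defaultClient();

        // Topic ARN is a placeholder.
        PublishRequest request = new PublishRequest(
                "arn:aws:sns:us-east-1:123456789012:clinical-trial-alerts",
                "Reminder: please take your evening dose and record it in the study app.");

        PublishResult result = sns.publish(request);
        System.out.println("Published message " + result.getMessageId());
    }
}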

To encrypt messages published to Amazon SNS, you can follow the steps listed in the post Encrypting messages published to Amazon SNS with AWS KMS, on the AWS Compute Blog.   

Data security, data privacy, data integrity, and compliance considerations

At AWS, customer trust is our top priority. We deliver services to millions of active customers, including enterprises, educational institutions, and government agencies in over 190 countries. Our customers include financial services providers, healthcare providers, and governmental agencies, who trust us with some of their most sensitive information.

To facilitate this, along with the services mentioned earlier, you should also use the AWS Identity and Access Management (IAM) service. IAM enables you to maintain segregation of access, apply fine-grained access control, and secure end-user mobile and web applications. You can also use AWS Security Token Service (AWS STS) to provide secure, self-expiring, time-boxed, temporary security credentials to third-party administrators and service providers, greatly strengthening your security posture. You can use AWS CloudTrail to log IAM and STS API calls. Additionally, AWS IoT Device Management makes it easy to securely onboard, organize, monitor, and remotely manage IoT devices at scale.
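
As a brief illustration of the AWS STS pattern, the following sketch requests temporary, one-hour credentials for a hypothetical read-only auditor role using the AWS SDK for Java. The role ARN and session name are placeholders.

import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import com.amazonaws.services.securitytoken.model.AssumeRoleRequest;
import com.amazonaws.services.securitytoken.model.Credentials;

public class TemporaryAccessExample {
    public static void main(String[] args) {
        AWSSecurityTokenService sts = AWSSecurityTokenServiceClientBuilder.defaultClient();

        Credentials credentials = sts.assumeRole(new AssumeRoleRequest()
                .withRoleArn("arn:aws:iam::123456789012:role/TrialAuditorReadOnly") // placeholder
                .withRoleSessionName("third-party-auditor")
                .withDurationSeconds(3600)) // credentials expire after one hour
                .getCredentials();

        System.out.println("Temporary credentials expire at " + credentials.getExpiration());
    }
}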

With AWS, you can add an additional layer of security to your data at rest in the cloud. AWS provides scalable and efficient encryption features for services like Amazon EBS, Amazon S3, Amazon Redshift, Amazon SNS, AWS Glue, and many more. Flexible key management options, including AWS Key Management Service, enable you to choose whether to have AWS manage the encryption keys or to keep complete control over your keys. In addition, AWS provides APIs for you to integrate encryption and data protection with any of the services that you develop or deploy in an AWS environment.

As a customer, you maintain ownership of your data, and select which AWS services can process, store, and host the content. Generally speaking, AWS doesn’t access or use customers’ content for any purpose without their consent. AWS never uses customer data to derive information for marketing or advertising.

When evaluating the security of a cloud solution, it’s important that you understand and distinguish between the security of the cloud and security in the cloud. The AWS Shared Responsibility Model details this relationship.

To assist you with your compliance efforts, AWS continues to add more services to the various compliance regulations, attestations, certifications, and programs across the world. To decide which services are suitable for you, see the services in scope page.

You can also use various services like, but not limited to, AWS CloudTrail, AWS Config, Amazon GuardDuty, and AWS Key Management Service (AWS KMS) to enhance your compliance and auditing efforts. Find more details in the AWS Compliance Solutions Guide.

Final thoughts

With the ever-growing interconnectivity and technological advances in the field of medical devices, mobile devices and sensors can improve numerous aspects of clinical trials. They can help in recruitment, informed consent, patient counseling, and patient communication management. They can also improve protocol and medication adherence, clinical endpoints measurement, and the process of alerting participants on adverse events. Smart sensors, smart mobile devices, and robust interconnecting systems can be central in conducting clinical trials.

Every biopharma organization conducting or sponsoring a clinical trial activity faces the conundrum of advancing their approach to trials while maintaining overall trial performance and data consistency. The AWS Cloud enables a new dimension for how data is collected, stored, and used for clinical trials. It thus addresses that conundrum as we march towards a new reality of how drugs are brought to market. The AWS Cloud abstracts away technical challenges such as scaling, security, and establishing a cost-efficient IT infrastructure. In doing so, it allows biopharma organizations to focus on their core mission of improving patient lives through the development of effective, groundbreaking treatments.

 


About the Author

Mayank Thakkar – Global Solutions Architect, AWS HealthCare and Life Sciences


Deven Atnoor, Ph.D. – Industry Specialist, AWS HealthCare and Life Sciences


Best practices for successfully managing memory for Apache Spark applications on Amazon EMR

Post Syndicated from Karunanithi Shanmugam original https://aws.amazon.com/blogs/big-data/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/

In the world of big data, a common use case is performing extract and transform (ET) operations and data analytics on huge amounts of data from a variety of data sources. Often, you then analyze the data to get insights. One of the most popular cloud-based solutions to process such vast amounts of data is Amazon EMR.

Amazon EMR is a managed cluster platform that simplifies running big data frameworks, such as Apache Hadoop and Apache Spark, on AWS. Amazon EMR enables organizations to spin up a cluster with multiple instances in a matter of minutes. It also enables you to process various data engineering and business intelligence workloads through parallel processing, which greatly reduces the processing time, effort, and cost involved in establishing and scaling a cluster.

Apache Spark is a cluster-computing software framework that is open-source, fast, and general-purpose. It is widely used in distributed processing of big data. Apache Spark relies heavily on cluster memory (RAM) as it performs parallel computing in memory across nodes to reduce the I/O and execution times of tasks.

Generally, you perform the following steps when running a Spark application on Amazon EMR:

  1. Upload the Spark application package to Amazon S3.
  2. Configure and launch the Amazon EMR cluster with configured Apache Spark.
  3. Install the application package from Amazon S3 onto the cluster and then run the application.
  4. Terminate the cluster after the application is completed.

It’s important to configure the Spark application appropriately based on data and processing requirements for it to be successful. With default settings, Spark might not use all the available resources of the cluster and might end up with physical or virtual memory issues, or both. Thousands of questions on stackoverflow.com relate to this specific topic.

This blog post is intended to assist you by detailing best practices to prevent memory-related issues with Apache Spark on Amazon EMR.

Common memory issues in Spark applications with default or improper configurations

Listed following are a few sample out-of-memory errors that can occur in a Spark application with default or improper configurations.

Out of Memory Error, Java Heap Space

WARN TaskSetManager: Loss was due to 
java.lang.OutOfMemoryError
java.lang.OutOfMemoryError: Java heap space

Out of Memory Error, Exceeding Physical Memory

Error: ExecutorLostFailure Reason: Container killed by YARN for exceeding limits.
12.4 GB of 12.3 GB physical memory used. 
Consider boosting spark.yarn.executor.memoryOverhead.
Error: ExecutorLostFailure Reason: Container killed by YARN for exceeding limits.
4.5 GB of 3 GB physical memory used.
Consider boosting spark.yarn.executor.memoryOverhead.

Out of Memory Error, Exceeding Virtual Memory

Container killed by YARN for exceeding memory limits.
1.1gb of 1.0gb virtual memory used. Killing container.

Out of Memory Error, Exceeding Executor Memory

Required executor memory (1024+384 MB) is above 
the max threshold (896 MB) of this cluster! Please check the values of 'yarn.scheduler.maximum-allocation-mb'
and/or 'yarn.nodemanager.resource.memory-mb'.

These issues occur for various reasons, some of which are listed following:

  1. When the number of Spark executor instances, the amount of executor memory, the number of cores, or parallelism is not set appropriately to handle large volumes of data.
  2. When the Spark executor’s physical memory exceeds the memory allocated by YARN. In this case, the total of Spark executor instance memory plus memory overhead is not enough to handle memory-intensive operations. Memory-intensive operations include caching, shuffling, and aggregating (using reduceByKey, groupBy, and so on). Or, in some cases, the total of Spark executor instance memory plus memory overhead can be more than what is defined in yarn.scheduler.maximum-allocation-mb.
  3. The memory required to perform system operations such as garbage collection is not available in the Spark executor instance.

In the following sections, I discuss how to properly configure to prevent out-of-memory issues, including but not limited to those preceding.

Configuring for a successful Spark application on Amazon EMR

The following steps can help you configure a successful Spark application on Amazon EMR.

1. Determine the type and number of instances based on application needs

Amazon EMR has three types of nodes:

  1. Master: An EMR cluster has one master, which acts as the resource manager and manages the cluster and tasks.
  2. Core: The core nodes are managed by the master node. Core nodes run YARN NodeManager daemons, Hadoop MapReduce tasks, and Spark executors to manage storage, execute tasks, and send a heartbeat to the master.
  3. Task: The optional task-only nodes perform tasks and don’t store any data, in contrast to core nodes.

Best practice 1: Choose the right type of instance for each of the node types in an Amazon EMR cluster. Doing this is one key to success in running any Spark application on Amazon EMR.

There are numerous instance types offered by AWS with varying ranges of vCPUs, storage, and memory, as described in the Amazon EMR documentation. Based on whether an application is compute-intensive or memory-intensive, you can choose the right instance type with the right compute and memory configuration.

For memory-intensive applications, prefer R type instances over the other instance types. For compute-intensive applications, prefer C type instances. For applications balanced between memory and compute, prefer M type general-purpose instances.

To understand the possible use cases for each instance type offered by AWS, see Amazon EC2 Instance Types on the EC2 service website.

After deciding the instance type, determine the number of instances for each of the node types. You do this based on the size of the input datasets, application execution times, and frequency requirements.

2. Determine the Spark configuration parameters

Before we dive into the details on Spark configuration, let’s get an overview of how the executor container memory is organized using the diagram following.

As the preceding diagram shows, the executor container has multiple memory compartments. Of these, only one (execution memory) is actually used for executing the tasks. These compartments should be properly configured for running the tasks efficiently and without failure.

Calculate and set the following Spark configuration parameters carefully for the Spark application to run successfully:

  • spark.executor.memory – Size of memory to use for each executor that runs the task.
  • spark.executor.cores – Number of virtual cores.
  • spark.driver.memory – Size of memory to use for the driver.
  • spark.driver.cores – Number of virtual cores to use for the driver.
  • spark.executor.instances ­– Number of executors. Set this parameter unless spark.dynamicAllocation.enabled is set to true.
  • spark.default.parallelism – Default number of partitions in resilient distributed datasets (RDDs) returned by transformations like join, reduceByKey, and parallelize when no partition number is set by the user.

Amazon EMR provides high-level information on how it sets the default values for Spark parameters in the release guide. These values are automatically set in the spark-defaults settings based on the core and task instance types in the cluster.

To use all the resources available in a cluster, set the maximizeResourceAllocation parameter to true. This EMR-specific option calculates the maximum compute and memory resources available for an executor on an instance in the core instance group. It then sets these parameters in the spark-defaults settings. Even with this setting, generally the default numbers are low and the application doesn’t use the full strength of the cluster. For example, the default for spark.default.parallelism is only 2 x the number of virtual cores available, though parallelism can be higher for a large cluster.

Spark on YARN can dynamically scale the number of executors used for a Spark application based on the workloads. Using Amazon EMR release version 4.4.0 and later, dynamic allocation is enabled by default (as described in the Spark documentation).

The problem with the spark.dynamicAllocation.enabled property is that it requires you to set subproperties. Some example subproperties are spark.dynamicAllocation.initialExecutors, minExecutors, and maxExecutors. Subproperties are required for most cases to use the right number of executors in a cluster for an application, especially when you need multiple applications to run simultaneously. Setting subproperties requires a lot of trial and error to get the numbers right. If they’re not right, the capacity might be reserved but never actually used. This leads to wastage of resources or memory errors for other applications.

Best practice 2: Set spark.dynamicAllocation.enabled to true only if the numbers are properly determined for spark.dynamicAllocation.initialExecutors/minExecutors/maxExecutors parameters. Otherwise, set spark.dynamicAllocation.enabled to false and control the driver memory, executor memory, and CPU parameters yourself. To do this, calculate and set these properties manually for each application (see the example following).

Let’s assume that we are going to process 200 terabytes of data spread across thousands of file stores in Amazon S3. Further, let’s assume that we do this through an Amazon EMR cluster with 1 r5.12xlarge master node and 19 r5.12xlarge core nodes. Each r5.12xlarge instance has 48 virtual cores (vCPUs) and 384 GB RAM. All these calculations are for the --deploy-mode cluster, which we recommend for production use.

The following list describes how to set some important Spark properties, using the preceding case as an example.

spark.executor.cores

Assigning executors with a large number of virtual cores leads to a low number of executors and reduced parallelism. Assigning a low number of virtual cores leads to a high number of executors, causing a larger amount of I/O operations. Based on historical data, we suggest that you have five virtual cores for each executor to achieve optimal results in any sized cluster.

For the preceding cluster, the property spark.executor.cores should be assigned as follows: spark.executor.cores = 5 (vCPU)

spark.executor.memory

After you decide on the number of virtual cores per executor, calculating this property is much simpler. First, get the number of executors per instance using total number of virtual cores and executor virtual cores. Subtract one virtual core from the total number of virtual cores to reserve it for the Hadoop daemons.

Number of executors per instance = (total number of virtual cores per instance - 1)/ spark.executor.cores

Number of executors per instance = (48 - 1)/ 5 = 47 / 5 = 9 (rounded down)

Then, get the total executor memory by using the total RAM per instance and number of executors per instance. Leave 1 GB for the Hadoop daemons.

Total executor memory = (total RAM per instance - 1 GB) / number of executors per instance
Total executor memory = 383 / 9 = 42 (rounded down)

This total executor memory includes the executor memory and overhead (spark.yarn.executor.memoryOverhead). Assign 10 percent from this total executor memory to the memory overhead and the remaining 90 percent to executor memory.

spark.executor.memory = total executor memory * 0.90
spark.executor.memory = 42 * 0.9 = 37 (rounded down)

spark.yarn.executor.memoryOverhead = total executor memory * 0.10
spark.yarn.executor.memoryOverhead = 42 * 0.1 = 5 (rounded up)

spark.driver.memory

We recommend setting this to equal spark.executor.memory.

spark.driver.memory = spark.executor.memory

spark.driver.cores

We recommend setting this to equal spark.executor.cores.

spark.driver.cores = spark.executor.cores

spark.executor.instances

Calculate this by multiplying the number of executors per instance by the number of core instances. Leave one executor for the driver.

spark.executor.instances = (number of executors per instance * number of core instances) minus 1 for the driver

spark.executor.instances = (9 * 19) - 1 = 170

spark.default.parallelism

Set this property using the following formula.

spark.default.parallelism = spark.executor.instances * spark.executor.cores * 2

spark.default.parallelism = 170 * 5 * 2 = 1,700

Warning: Although this calculation gives partitions of 1,700, we recommend that you estimate the size of each partition and adjust this number accordingly by using coalesce or repartition.

For DataFrames, configure the parameter spark.sql.shuffle.partitions along with spark.default.parallelism.
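
To pull the preceding calculations together, the following sketch shows one way to collect these values in application code with the Spark Java API. This is only an illustration of the numbers derived for the example r5.12xlarge cluster; in practice, executor and driver settings for YARN cluster mode should be supplied through spark-defaults or on spark-submit, as shown in the EMR configuration template later in this post.

import org.apache.spark.sql.SparkSession;

public class MemoryTunedApp {
    public static void main(String[] args) {
        // Values derived earlier for 19 r5.12xlarge core nodes (48 vCPUs, 384 GB RAM each).
        SparkSession spark = SparkSession.builder()
                .appName("memory-tuned-example")
                .config("spark.dynamicAllocation.enabled", "false")
                .config("spark.executor.cores", "5")
                .config("spark.executor.memory", "37g")
                .config("spark.yarn.executor.memoryOverhead", "5g")
                .config("spark.driver.memory", "37g")
                .config("spark.driver.cores", "5")
                .config("spark.executor.instances", "170")
                .config("spark.default.parallelism", "1700")
                .config("spark.sql.shuffle.partitions", "1700")
                .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .getOrCreate();

        // ... application logic ...

        spark.stop();
    }
}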

Though the preceding parameters are critical for any Spark application, the following parameters also help in running the applications smoothly to avoid other timeout and memory-related errors. We advise that you set these in the spark-defaults configuration file.

  • spark.network.timeout – Timeout for all network transactions.
  • spark.executor.heartbeatInterval – Interval between each executor’s heartbeats to the driver. This value should be significantly less than spark.network.timeout.
  • spark.memory.fraction – Fraction of JVM heap space used for Spark execution and storage. The lower this is, the more frequently spills and cached data eviction occur.
  • spark.memory.storageFraction – Expressed as a fraction of the size of the region set aside by spark.memory.fraction. The higher this is, the less working memory might be available to execution. This means that tasks might spill to disk more often.
  • spark.yarn.scheduler.reporterThread.maxFailures – Maximum number of executor failures allowed before YARN can fail the application.
  • spark.rdd.compress – When set to true, this property can save substantial space at the cost of some extra CPU time by compressing the RDDs.
  • spark.shuffle.compress – When set to true, this property compresses the map output to save space.
  • spark.shuffle.spill.compress – When set to true, this property compresses the data spilled during shuffles.
  • spark.sql.shuffle.partitions – Sets the number of partitions for joins and aggregations.
  • spark.serializer – Sets the serializer to serialize or deserialize data. As a serializer, I prefer Kryo (org.apache.spark.serializer.KryoSerializer), which is faster and more compact than the Java default serializer.

To understand more about each of the parameters mentioned preceding, see the Spark documentation.

We recommend you consider these additional programming techniques for efficient Spark processing:

  • coalesce – Reduces the number of partitions to allow for less data movement.
  • repartition – Reduces or increases the number of partitions and performs full shuffle of data as opposed to coalesce.
  • partitionBy – Distributes data horizontally across partitions.
  • bucketBy – Decomposes data into more manageable parts (buckets) based on hashed columns.
  • cache/persist – Pulls datasets into a clusterwide in-memory cache. Doing this is useful when data is accessed repeatedly, such as when querying a small lookup dataset or when running an iterative algorithm.

Best practice 3: Carefully calculate the preceding additional properties based on application requirements. Set these properties appropriately in spark-defaults, when submitting a Spark application (spark-submit), or within a SparkConf object.
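
To illustrate a couple of the techniques listed above, the following sketch repartitions a dataset to match the parallelism computed earlier and caches it with serialized memory-and-disk storage before writing partitioned output. The S3 paths and the event_date column are hypothetical placeholders.

import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PartitionAndCacheExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("partition-and-cache-example")
                .getOrCreate();

        // Input path is a placeholder.
        Dataset<Row> events = spark.read().parquet("s3://my-bucket/raw-events/");

        // Match the partition count to the parallelism computed earlier (1,700) and
        // keep the frequently reused dataset in memory, spilling to disk when needed.
        Dataset<Row> repartitioned = events.repartition(1700)
                .persist(StorageLevels.MEMORY_AND_DISK_SER);

        System.out.println("Row count: " + repartitioned.count());

        // coalesce() reduces the partition count without a full shuffle, for example before writing output.
        repartitioned.coalesce(200)
                .write()
                .partitionBy("event_date")  // assumes an event_date column exists
                .parquet("s3://my-bucket/curated-events/");

        spark.stop();
    }
}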

3. Implement a proper garbage collector to clear memory effectively

Garbage collection can lead to out-of-memory errors in certain cases. These include cases when there are multiple large RDDs in the application. Other cases occur when there is an interference between the task execution memory and RDD cached memory.

You can use multiple garbage collectors to evict the old objects and place the new ones into the memory. However, the latest Garbage First Garbage Collector (G1GC) overcomes the latency and throughput limitations of the older garbage collectors.

Best practice 4: Always set up a garbage collector when handling large volume of data through Spark.

The parameter -XX:+UseG1GC specifies that the G1GC garbage collector should be used. (The default is -XX:+UseParallelGC.) To understand the frequency and execution time of the garbage collection, use the parameters -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps. To initiate garbage collection sooner, set InitiatingHeapOccupancyPercent to 35 (the default is 45). Doing this helps avoid a full garbage collection over the entire heap, which can take a significant amount of time. An example follows.

"spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
"spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",

4. Set the YARN configuration parameters

Even if all the Spark configuration properties are calculated and set correctly, virtual out-of-memory errors can still occur rarely as virtual memory is bumped up aggressively by the OS. To prevent these application failures, set the following flags in the YARN site settings.

Best practice 5: Always set the virtual and physical memory check flag to false.

"yarn.nodemanager.vmem-check-enabled":"false",
"yarn.nodemanager.pmem-check-enabled":"false"

5. Perform debugging and monitoring

To get details on where the Spark configuration options are coming from, you can run spark-submit with the --verbose option. Also, you can use Ganglia and the Spark UI to monitor the application progress, cluster RAM usage, network I/O, and so on.

In the following example, we compare the outcomes between configured and non-configured Spark applications using Ganglia graphs.

When configured following the methods described, a Spark application can successfully process 10 TB of data without any memory issues on an Amazon EMR cluster with the following specs:

  • 1 r5.12xlarge master node
  • 19 r5.12xlarge core nodes
  • 8 TB total RAM
  • 960 total virtual CPUs
  • 170 executor instances
  • 5 virtual CPUs/executor
  • 37 GB memory/executor
  • Parallelism equals 1,700

Following, you can find Ganglia graphs for reference.

If you run the same Spark application with default configurations on the same cluster, it fails with an out-of-physical-memory error. This is because the default configurations (two executor instances, parallelism of 2, one vCPU/executor, 8-GB memory/executor) aren’t enough to process 10 TB of data. Though the cluster had 7.8 TB of memory, the default configurations limited the application to use only 16 GB of memory, leading to the following out-of-memory error.

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 8, executor 7): ExecutorLostFailure (executor 7 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 10.5 GB of 8 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

Also, for large datasets, the default garbage collectors don’t clear the memory efficiently enough for the tasks to run in parallel, causing frequent failures. The following charts help in comparing the RAM usage and garbage collection with the default and G1GC garbage collectors. With G1GC, the RAM used is maintained below 5 TB (see the blue area in the graph).

With the default garbage collector (CMS), the RAM used goes above 5 TB. This can lead to the failure of the Spark job when running many tasks continuously.

Example: EMR instance template with configuration

There are different ways to set the Spark and YARN configuration parameters. One of ways is to pass these when creating the EMR cluster.

To do this, in the Amazon EMR console’s Edit software settings section, you can enter the appropriately updated configuration template (Enter configuration). Or the configuration can be passed from S3 (Load JSON from S3).

Following is a configuration template with sample values. At a minimum, calculate and set the following parameters for a successful Spark application.

{
      "InstanceGroups":[
         {
            "Name":"AmazonEMRMaster",
            "Market":"ON_DEMAND",
            "InstanceRole":"MASTER",
            "InstanceType":"r5.12xlarge",
            "InstanceCount":1,
            "Configurations":[
               {
                 "Classification": "yarn-site",
                 "Properties": {
                   "yarn.nodemanager.vmem-check-enabled": "false",
                   "yarn.nodemanager.pmem-check-enabled": "false"
                 }
               },
               {
                 "Classification": "spark",
                 "Properties": {
                   "maximizeResourceAllocation": "false"
                 }
               },
               {
                 "Classification": "spark-defaults",
                 "Properties": {
                   "spark.network.timeout": "800s",
                   "spark.executor.heartbeatInterval": "60s",
                   "spark.dynamicAllocation.enabled": "false",
                   "spark.driver.memory": "21000M",
                   "spark.executor.memory": "21000M",
                   "spark.executor.cores": "5",
                   "spark.executor.instances": "171",
                   "spark.yarn.executor.memoryOverhead": "21000M",
                   "spark.yarn.driver.memoryOverhead": "21000M",
                   "spark.memory.fraction": "0.80",
                   "spark.memory.storageFraction": "0.30",
                   "spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
                   "spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
                   "spark.yarn.scheduler.reporterThread.maxFailures": "5",
                   "spark.storage.level": "MEMORY_AND_DISK_SER",
                   "spark.rdd.compress": "true",
                   "spark.shuffle.compress": "true",
                   "spark.shuffle.spill.compress": "true",
                   "spark.default.parallelism": "3400"
                 }
               },
               {
                 "Classification": "mapred-site",
                 "Properties": {
                   "mapreduce.map.output.compress": "true"
                 }
               },
               {
                 "Classification": "hadoop-env",
                 "Configurations": [{
                   "Classification": "export",
                   "Configurations": [],
                   "Properties": {
                     "JAVA_HOME": "/usr/lib/jvm/java-1.8.0"
                   }
                 }],
                 "Properties": {}
               },
               {
                 "Classification": "spark-env",
                 "Configurations": [{
                   "Classification": "export",
                   "Properties": {
                     "JAVA_HOME": "/usr/lib/jvm/java-1.8.0"
                   }
                 }],
                 "Properties": {}
               }
            ]
        },
        {
            "Name":"AmazonEMRCore",
            "Market":"ON_DEMAND",
             "InstanceRole":"CORE",
             "InstanceType":"r5.12xlarge",
             "InstanceCount":19,
             "Configurations":[        
        ..............
        ..............
        ..............
        }
      ],
      "Ec2KeyName":"KEY_NAME"
  } 
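If you prefer to create the cluster programmatically rather than through the console, the same template can be passed to the EMR API. The following boto3 sketch assumes the instance-group template above has been completed into valid JSON (including the elided core-node configurations) and saved locally as emr-config.json; the cluster name, region, release label, and IAM roles are placeholders.

import json
import boto3

# Sketch only: assumes emr-config.json holds the completed instance-group
# template from above, with the elided core-node configurations filled in.
with open("emr-config.json") as f:
    template = json.load(f)

emr = boto3.client("emr", region_name="us-east-1")  # placeholder region

response = emr.run_job_flow(
    Name="spark-memory-tuned-cluster",       # placeholder name
    ReleaseLabel="emr-5.20.0",               # placeholder release label
    Applications=[{"Name": "Spark"}, {"Name": "Ganglia"}],
    Instances={
        "InstanceGroups": template["InstanceGroups"],
        "Ec2KeyName": template["Ec2KeyName"],
        "KeepJobFlowAliveWhenNoSteps": True,
    },
    ServiceRole="EMR_DefaultRole",           # default EMR roles assumed to exist
    JobFlowRole="EMR_EC2_DefaultRole",
)
print(response["JobFlowId"])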

Conclusion

In this blog post, I detailed the possible out-of-memory errors, their causes, and a list of best practices to prevent these errors when submitting a Spark application on Amazon EMR.

My colleagues and I formed these best practices after thorough research into various Spark configuration properties and after testing multiple Spark applications. These best practices apply to most out-of-memory scenarios, though there might be some rare scenarios where they don't apply. However, we believe that this blog post provides all the details needed so you can tweak parameters and successfully run a Spark application.

 


About the Author

Karunanithi Shanmugam is a data engineer with AWS Tech and Finance.


Federate Amazon Redshift access with Okta as an identity provider

Post Syndicated from Rajiv Gupta original https://aws.amazon.com/blogs/big-data/federate-amazon-redshift-access-with-okta-as-an-identity-provider/

Managing database users and access can be a daunting and error-prone task. In the past, database administrators had to determine which groups a user belongs to and which objects a user/group is authorized to use. These lists were maintained within the database and could easily get disjointed from the corporate directory.

With federation, you can manage users and groups within the enterprise identity provider (IdP) and pass them to Amazon Redshift at login. In a previous post, Federate Database User Authentication Easily with IAM and Amazon Redshift, I discussed the internals of the federation workflow using Active Directory Federation Service (AD FS) as our identity provider.

In this post, I focus on Okta as the identity provider. I provide step-by-step guidance showing how you can set up a trial Okta.com account, build users and groups within your organization’s directory, and enable single sign-on (SSO) into Amazon Redshift. You can do all of this while also maintaining group-level access controls within your data warehouse.

The steps in this post are structured into the following sections:

  • Identity provider (Okta) configuration – You set up Okta, which contains your users organized into logical groups.
  • AWS configuration – You set up a role that establishes a trust relationship between your identity provider and AWS and a role that Okta uses to access Amazon Redshift.
  • Identity provider (Okta) advanced configuration – You finalize the Okta configuration by inputting the roles that you just created. You also inform Okta about which groups are allowed to be passed to Amazon Redshift.
  • Amazon Redshift server/client setup – You set up groups within the Amazon Redshift database to match the Okta groups. You also authorize these groups to access certain schemas and tables. Finally, you set up your client tools to use your enterprise credentials and sign in to Amazon Redshift.

Identity provider (Okta) configuration

In this first step, you set up Okta, add users, and organize them into logical groups. You then add the Amazon Web Services Redshift Okta application.

Step 1: Create an Okta account

If you don’t already have access to an Okta account, you can start a 30-day free trial: https://www.okta.com/free-trial/.

Step 2: Set up your Okta directory

Sign in to Okta.com using the following URL, where <prefix> is specific to your account and was created at account setup:

https://<prefix>-admin.okta.com/admin/dashboard

Navigate to the Directory page to add people and groups into Okta that match your organization. Be sure to use lowercase group names because Amazon Redshift expects the group names to be lowercase.

In the following example, I added three users and two groups, where one of the users (Jorge) belongs to both the “sales” and “marketing” groups.

First, choose Admin in the upper-right corner.

To add users, choose Add Person. The following example shows the users that were created.

To add groups into Okta, choose Add Group. The following example shows three groups.

Step 3: Add the “Amazon Web Services Redshift” Okta application

Navigate to the Applications page. Choose Add Application, and search for the Amazon Web Services Redshift application. Proceed with the default settings.

Step 4: Download the Okta application metadata

Make sure that you have navigated to the Amazon Web Services Redshift application’s settings page, which appears as follows.

Choose Sign On, and then choose the Identity Provider metadata link to download the metadata file in xml format (for example, metadata.xml).

AWS configuration

Next, you set up a role that establishes a trust relationship between the identity provider and AWS. You also create a role that Okta uses to access Amazon Redshift.

Step 5: Create the SAML IAM identity provider

Switching to AWS Management Console, sign in using your AWS credentials. Then open the AWS Identity and Access Management (IAM) console.

On the IAM console, choose Identity providers, and then choose Create Provider, as shown following.

Provide a name for your IdP, and upload the metadata file that you downloaded in the previous step.

Step 6: Create the IAM SAML 2.0 federation role

On the IAM console, navigate to Roles and create a new SAML 2.0 federation role.  Reference the IdP that you created in the previous step, and choose Allow programmatic and AWS Management Console access.

Step 7: Add other permissions to query Amazon Redshift

Choose Next: Assign Permissions. Then choose Create policy.

Create the following custom policy, replacing the region, account, and cluster parameters. These permissions allow the role to use Amazon Redshift to query data, create users, and allow users to join groups.

{
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Action": [
            "redshift:CreateClusterUser",
            "redshift:JoinGroup",
            "redshift:GetClusterCredentials",
            "redshift:ListSchemas",
            "redshift:ListTables",
            "redshift:ListDatabases",
            "redshift:ExecuteQuery",
            "redshift:FetchResults",
            "redshift:CancelQuery",
            "redshift:DescribeClusters",
            "redshift:DescribeQuery",
            "redshift:DescribeTable"
        ],
        "Resource": "arn:aws:redshift:<region>:<account>:cluster:<cluster>"
    }]
}

There are a few important things to note:

  • The group membership lasts only for the duration of the user session.
  • There is no CreateGroup permission because groups need to be manually created and granted DB privileges.
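To see why the role needs these permissions, it helps to look at what the SQL client's driver does on your behalf after Okta returns the SAML assertion: it calls the Amazon Redshift GetClusterCredentials API with the DbUser and DbGroups values from the assertion. The following boto3 sketch approximates that call; the cluster identifier, user, and group names are placeholders, and in the federated flow the call is made with the temporary credentials of the SAML role rather than your default AWS credentials.

import boto3

# Approximation of the call the Redshift JDBC/ODBC driver makes after SAML
# federation. All values below are placeholders for illustration.
redshift = boto3.client("redshift", region_name="us-west-2")

creds = redshift.get_cluster_credentials(
    ClusterIdentifier="examplecluster",
    DbUser="jorge",                    # from the DBUser SAML assertion
    DbGroups=["sales", "marketing"],   # from the DBGroup SAML assertion
    AutoCreate=True,                   # requires redshift:CreateClusterUser
    DurationSeconds=3600,              # group membership lasts only for the session
)

print(creds["DbUser"], creds["Expiration"])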

The following image shows the summary page for the role.

Identity provider (Okta) advanced configuration

In this section, you finalize the Okta configuration by adding the roles that you just created. You also tell Okta which groups are allowed to be passed to Amazon Redshift.

Step 8: Configure the advanced sign-on settings

Switch back to Okta.com. Navigate to the settings page for the Amazon Web Services Redshift application. In the Sign-On section, scroll to Advanced Sign-On Settings.

Enter the previously created IdP and role ARNs, which are globally unique and ensure that Okta is directed to your AWS account. Allowed DB Groups is a list of allowed groups that will be sent to Amazon Redshift in the DBGroup SAML assertion.

Don't use the asterisk (*) wildcard. Doing so causes the Everyone group to be passed, and Amazon Redshift will complain because it expects the group names to be lowercase. Note that the ${user.username} value is sent in the DBUser SAML assertion.

Step 9: Authorize users

Authorize users to use the Amazon Web Services Redshift application by selecting their respective groups or individual user accounts. In this example, I authorized users by group.

Amazon Redshift server/client setup

Next, you set up groups in the Amazon Redshift database to match the Okta groups. You also authorize these groups to access certain schemas and tables. Finally, you set up your client tools to use your enterprise credentials and sign in to Amazon Redshift.

Step 10: Set up groups

Log in to your Amazon Redshift cluster with an admin account. Create groups that match the IdP group names, and grant the appropriate permissions to tables and schemas.

CREATE GROUP sales;
CREATE GROUP marketing;
ALTER DEFAULT PRIVILEGES IN SCHEMA sales 
GRANT ALL on TABLES to GROUP sales; 
ALTER DEFAULT PRIVILEGES IN SCHEMA marketing 
GRANT ALL on TABLES to GROUP marketing;

Step 11: Configure the JDBC SQL client

Assuming that the Amazon Redshift JDBC driver is installed, set up a new connection to your cluster using your IdP credentials. In the following example, I am using SQLWorkbenchJ. For the URL, be sure to enter “iam” to instruct the driver to authenticate using IAM. For Username and Password, enter the values that you set in Okta.

Enter the extended properties as follows. For app_id and idp_host, refer to the URL for the application in your web browser:

https://<prefix>-admin.okta.com/admin/app/amazon_aws_redshift/instance/<app_id>
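In case you don't have the screenshot handy, the JDBC settings typically take the following shape; the exact property names can vary by driver version, the bracketed values are placeholders you replace with your own, and <app_id> comes from the URL above. The iam scheme in the URL is what instructs the driver to authenticate using IAM.

jdbc:redshift:iam://<cluster-endpoint>:<port>/<database>

plugin_name=com.amazon.redshift.plugin.OktaCredentialsProvider
idp_host=<prefix>.okta.com
app_id=<app_id>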

Step 12: Configure the ODBC SQL client

Assuming that the Amazon Redshift ODBC driver is installed, set up a new connection to your cluster using your IdP credentials. In the following example, I modified the ~/Library/ODBC/odbc.ini file.  See the previous instructions for determining the <app_id> and <prefix> values.

[ODBC Data Sources]
Redshift DSN=Installed

[Redshift DSN]
Driver=/opt/amazon/redshift/lib/libamazonredshiftodbc.dylib
Host=<endpoint>
Port=<port>
Database=<database>
locale=en-US
app_id=<app_id>
plugin_name=okta
idp_host=<prefix>.okta.com
iam=1

Step 13: Test user access

You should now be able to sign in with the users that you created. In our example, [email protected] has access to the tables in the “sales” schema only. The user [email protected] has access to tables in the “marketing” schema only. And [email protected] has access to tables in both schemas. Using the [email protected] user, you get the following results when trying to query data from each of the schemas:

select storeid From sales.stores


storeid	
-------
1234
2345
3456
[…]


select * From marketing.campaign


An error occurred when executing the SQL command:
select * From marketing.campaign

[Amazon](500310) Invalid operation: permission denied for schema marketing;
1 statement failed.

Execution time: 0.16s

Summary

In this post, I provided a step-by-step guide for configuring and using Okta as your Identity Provider (IdP) to enable single sign-on to an Amazon Redshift cluster. I also showed how group membership within your IdP can be passed along, enabling you to manage user access to Amazon Redshift resources from within your IdP.

If you have questions or suggestions, please comment below.

 


About the Author

Rajiv Gupta is a data warehouse specialist solutions architect with Amazon Web Services.


Granting fine-grained access to the Amazon Redshift Management Console

Post Syndicated from Raj Jayaraman original https://aws.amazon.com/blogs/big-data/granting-fine-grained-access-to-the-amazon-redshift-management-console/

As a fully managed service, Amazon Redshift is designed to be easy to set up and use. In this blog post, we demonstrate how to grant access to users in an operations group to perform only specific actions in the Amazon Redshift Management Console. If you implement a custom IAM policy, you can set it up so these users can monitor and terminate running queries. At the same time, you can prevent these users from performing other more privileged operations such as modifying, restarting, or deleting an Amazon Redshift cluster.

An overview of Amazon Redshift access control

Since its release in February 2013, Amazon Redshift has quickly become a popular cloud-based data warehousing platform for thousands of customers worldwide.

Access to Amazon Redshift requires credentials that AWS can use to authenticate your requests. Those credentials must have permissions to access Amazon Redshift resources, such as an Amazon Redshift cluster or a snapshot. For more details on these credentials, see Authentication and Access Control for Amazon Redshift in the Amazon Redshift documentation.

Every AWS resource is owned by an AWS account, and permissions to create or access the resources are governed by AWS Identity and Access Management (IAM) policies. An AWS account administrator can attach permissions policies to IAM identities (users, groups, and roles). In particular, an AWS account administrator can attach an IAM permissions policy to a specific user. Such a policy grants permissions for that user to manage an Amazon Redshift resource, such as a snapshot or an event subscription.

When granting permissions, you can decide who gets the permissions and which Amazon Redshift resources they get permissions for. You can also decide on the specific actions that you want to allow on those resources. Policies attached to an IAM identity are referred to as identity-based IAM policies, and policies attached to a resource are referred to as resource-based policies. Amazon Redshift supports only identity-based IAM policies.

Use case: Setting limited access for a user

Consider the following use case. Suppose that an IAM user who is a member of a customer’s operations group needs to monitor and terminate queries running in an Amazon Redshift cluster. It’s best if they do so through the Amazon Redshift console. This user is not allowed to modify or delete any other Amazon Redshift resources.

To implement this use case, we need a custom IAM policy that ensures this IAM user has read-only access to the Amazon Redshift console. Doing this means that the user can get descriptions of the available clusters and navigate to the Queries tab. Additionally, we want the IAM user to be able to cancel a running query through the Amazon Redshift console. To allow this, we use the redshift:CancelQuerySession IAM action. For descriptions of other allowed Amazon Redshift actions in an IAM policy and what each action means, see Actions Defined by Amazon Redshift in the Amazon Redshift documentation.

To create such a custom IAM policy, follow these instructions:

  1. Sign in to the AWS Management Console and open the IAM console at https://console.aws.amazon.com/iam/.
  2. In the navigation pane on the left, choose Policies.
  3. Choose Create policy.
  4. Choose the JSON tab and input the following policy:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "redshift:Describe*",
                "redshift:CancelQuerySession",
                "redshift:ViewQueriesInConsole",
                "ec2:DescribeAccountAttributes",
                "ec2:DescribeAddresses",
                "ec2:DescribeAvailabilityZones",
                "ec2:DescribeSecurityGroups",
                "ec2:DescribeSubnets",
                "ec2:DescribeVpcs",
                "ec2:DescribeInternetGateways",
                "sns:Get*",
                "sns:List*",
                "cloudwatch:Describe*",
                "cloudwatch:List*",
                "cloudwatch:Get*"
            ],
            "Effect": "Allow",
            "Resource": "*"
        }
    ]
}

  5. On the Review policy page, enter a value for Name and, optionally, for Description for the policy that you are creating. Review the policy Summary to see the permissions that are granted by your policy. Then choose Create policy to save your work.
  6. Attach this policy to an existing or a new IAM user. (A scripted alternative to these console steps follows.)
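If you'd rather script these last steps, the same policy can be created and attached with the IAM API. The following boto3 sketch is illustrative only; it assumes the JSON policy above is saved locally as redshift-console-readonly.json, and the policy name and user name are placeholders.

import boto3

iam = boto3.client("iam")

# Assumes the policy shown above is saved locally as redshift-console-readonly.json.
with open("redshift-console-readonly.json") as f:
    policy_document = f.read()

policy = iam.create_policy(
    PolicyName="RedshiftConsoleMonitorAndTerminate",  # placeholder policy name
    PolicyDocument=policy_document,
)

# Attach the policy to an existing IAM user from the operations group.
iam.attach_user_policy(
    UserName="ops-user-1",                            # placeholder user name
    PolicyArn=policy["Policy"]["Arn"],
)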

With this permission policy, an IAM user can select an Amazon Redshift cluster, list all running queries in the Queries tab, and terminate a query if needed. Apart from canceling queries, all the permissions are read-only. Thus, the user can't create a new Amazon Redshift cluster or modify or delete an existing cluster. However, the user can view available clusters, cluster snapshots, parameter groups, and cluster subnet groups, and view other properties of existing clusters.

Validating the use case

With the above IAM policy in place, after the IAM user logs into the Amazon Redshift Management Console, the user can select and view details about the Amazon Redshift cluster or clusters in the account. After navigating to the Queries tab, the user can see both the running and completed queries.

To cancel or terminate a long running query, the user can select the query from the list and choose Terminate Query. However, this user can’t modify or delete anything else in the Amazon Redshift console. As an example, if the user tries to modify an Amazon Redshift cluster (to change its endpoint), that user encounters the following error.

Conclusion

In this post, we have walked through a detailed customer use case of providing fine-grained access to the Amazon Redshift console. Using a set of carefully tailored IAM policies, a customer’s operations personnel can have read-only access to the Amazon Redshift console. These personnel can cancel or terminate running queries without the ability to modify, add, or delete any other Amazon Redshift resources.

We want to acknowledge our fellow AWS co-workers Ryan Mich, Sulay Shah and Hunter Grider for their many useful comments and suggestions.

If you have any questions or suggestions, leave your feedback in the comment section. If you need any further assistance to optimize your Amazon Redshift implementation, contact your AWS account team or a trusted AWS partner.

 


About the authors

Raj Jayaraman is a cloud support engineer with AWS Support at Amazon Web Services.

 

 

 

 

Po Hong, Ph.D. is a senior data architect within the Global Data & Analytics Specialty Practice at AWS Professional Services.


Build a modern analytics stack optimized for sharing and collaborating with Mode and Amazon Redshift

Post Syndicated from Benn Stancil original https://aws.amazon.com/blogs/big-data/build-a-modern-analytics-stack-optimized-for-sharing-and-collaborating-with-mode-and-amazon-redshift/

Leading technology companies, such as Netflix and Airbnb, are building on AWS to solve problems on the edge of the data ecosystem. While these companies show us what data and analytics make possible, the complexity and scale of their problems aren’t typical. Most of our challenges aren’t figuring out how to process billions of records to provide real-time recommendations to millions of customers. Instead, we struggle with wrangling the data that we have, finding and sharing insights from that data, and then acting on them. This leads even the most savvy data teams that have adopted tools, such as Jupyter notebooks, to either be bottlenecked by slow data pipelines, or resort to a manual process to democratize insights for their stakeholders.

This blog post walks you through the following:

  1. The stages of BI modernization that illustrate what problems arise when organizations adopt modern BI tools without truly modernizing the analytics platform.
  2. What a cloud-first data science platform, with dashboarding and notebooking solutions integrated together with efficient data pipelines, could look like.
  3. How to replicate such a platform in a few steps. This combines a collaborative analytics solution, such as Mode (an Amazon Redshift partner), with a strong analytics foundation built with AWS data warehousing, ETL, and data exploration services.

The stages of BI modernization

There are two primary stages in the evolution of BI – the use of Microsoft Excel for creating reports and the use of dashboarding tools for sharing insights in a consumable format.

Stage 1: The Excel workflow

At AWS and Mode, we talk to thousands of companies who are looking to get more value out of their data. When we ask them how they use data today, the most common answer we get is shown in the following example:

Most companies recognize this workflow as broken. The data pipelines are impossible to manage, and the analysis requires manual effort to reproduce. And in the end, we don’t know if “budget_analysis_v3_final_revised_FINAL.xls” is indeed final.

Stage 2: The dash to the dashboards

When looking for a more effective solution, companies often turn to BI products like Tableau, Amazon QuickSight, PowerBI, and Looker. These products, which were either born in the cloud or are heavily invested in it now, make it efficient to create and share reports and dashboards. KPIs can be delivered through up-to-date URLs rather than emailed files. This helps ensure that everyone has the same view of what’s happening across the business. The BI process is shown in the following example:

While modern BI is a significant step forward, it’s an incomplete solution. Dashboards reveal what’s happening, but businesses that want to use that data for action must understand why things are happening. Before a company can respond to falling sales in one region of the country, for example, it must understand what’s driving the drop. Because dashboards can’t be easily modified, extended, or reused for further analysis, they are often the wrong tool for analysts and data scientists who are charged with answering open-ended exploratory questions. As a result, data infrastructures remain fragmented, and analytics and data science workflows are still built on manual processes.

A cloud-first data science platform

The ideal technology stack for modern data science teams unifies these two stages described in the previous section. Dashboards should serve as the start for exploratory questions for analysts, analysts’ work should be as accessible as company dashboards, and the platform should facilitate a close collaboration between data scientists and business stakeholders.

Pioneering data teams at leading tech companies have developed internal solutions to do exactly this. Uber built a data science workbench for data exploration, data preparation, ad hoc analyses, model exploration, workflow scheduling, dashboarding, and collaboration. Netflix recently unveiled the Netflix Data Platform, which automates the execution and distribution of Jupyter notebooks. Instacart built Blazer for exploring and sharing data.

All of these platforms have three things in common:

  • They combine visualization tools and interactive analysis tools, such as R and Python notebooks, and a collaboration platform.
  • They are powered by a modern data warehouse that can scale to accommodate any size of data and any number of analysts.
  • They have reliable ETL pipelines that provide analysts and data scientists access to the data they need, when they need it.

Building a cloud-first data science platform

Fortunately, AWS and its partners offer solutions that check all these boxes and provide the same power to data science teams that aren't able to build it themselves. Data warehousing services like Amazon Redshift and Athena are fast, scalable, and accessible to anyone who can write standard SQL. ETL partners like Fivetran, Segment, and Matillion provide reliable, push-button ETL services from hundreds of applications into Amazon Redshift and Amazon S3. Finally, a cloud-based analytics platform such as Mode combines visualization tools, fully hosted R and Python notebooks, and a distribution platform.

This modern stack, which is as powerful as the tooling inside Netflix or Airbnb, provides fully automated BI and data science tooling. It can be deployed in a matter of days and at a fraction of the cost of legacy data science tools.

Three steps to building the platform

Implement this data science infrastructure by using the following three steps:

  1. Set up a data warehouse.
  2. Populate your warehouse with data from around your company.
  3. Add a data science solution on top of your warehouse.

These steps do not require a large investment into engineering teams and custom-built software.

There are many ways to customize this stack to fit your company’s needs. However, this section shows how to set up using Amazon Redshift for a warehouse, Fivetran for ETL, and Mode for data science.

Step 1: Setting up Amazon Redshift

For information about setting up an Amazon Redshift warehouse, see Getting Started with Amazon Redshift. While you need an AWS account to set it up, the process requires no code and only takes a few minutes.

Most configuration options, including the size of the cluster, can be adjusted after the initial setup. Therefore, it’s not necessary to get everything exact at first. If a different configuration is more appropriate later, you can go back and change most of the Amazon Redshift settings.

Step 2: Populating Amazon Redshift with data

Your warehouse is only as good as the data in it. Fortunately, a number of ETL tools make it more efficient to continuously stream data from around your business and the applications you use. Application databases, third-party apps like Salesforce and Zendesk, even CSV files – all of these can be easily fed into Amazon Redshift without any engineering effort.

Fivetran, an Amazon Redshift partner, is one such ETL tool (it’s a tool that we’re happy with at Mode). To connect Fivetran to your Amazon Redshift database, first configure your database to allow Fivetran to connect. Fivetran supports a variety of options for connecting, including connecting directly or by using an SSH tunnel. For more information about the steps, see the connection options.

As a final step, create an Amazon Redshift user for Fivetran. We recommend that you use a user other than the master user. To create this user, log in to the Amazon Redshift query editor (or a SQL client of your choice) and run the following commands:

CREATE USER fivetran PASSWORD <password>;
GRANT CREATE ON DATABASE <database> TO fivetran;

After Amazon Redshift is configured:

  1. Create a new Fivetran account.
  2. Select I already have an existing warehouse, then choose Redshift.
  3. Fill out the form with your Amazon Redshift credentials, as shown in the following example, then choose Save.

  1. After Fivetran is connected to Amazon Redshift, connect it with the data sources that you want to pull into Amazon Redshift. This process is now more efficient.
  2. In Fivetran, choose Connectors.
  3. Choose Add connector, then choose the data source that you want to integrate. Though the specifics vary by source, most of them follow the same pattern.
  4. Choose a schema in Amazon Redshift that you want to write your data to, and then follow the authorization flow that Fivetran automatically steps you through.

The following are examples of connection flows:

Connection flow for Salesforce

 

Connection flow for Google Analytics

 

By using similar flows, you can also connect other databases, such as Amazon RDS Postgres or the MySQL database, and directly upload CSVs.

When these connections are set up, data automatically syncs between your data sources and Amazon Redshift. If you want more control, Fivetran lets you choose which data to sync, and how often it’s updated.

Can’t find the data source you’re looking for? Other ETL tools, including Stitch Data, Segment, and ETLeap, provide similar services that are just as easy to set up. We recommend this guide when making a decision about which tool is right for you.

Step 3: Connecting Amazon Redshift to Mode

Finally, by connecting Mode to your Amazon Redshift, you can provide your entire company access to your data in a collaborative analytics environment.

To connect Mode, configure your security groups so that Mode can access Amazon Redshift. If you’re connecting Mode directly to your cluster, follow the security groups documentation linked above to grant access to the following IP addresses:

  • 54.68.30.98/32
  • 54.68.45.3/32
  • 54.164.204.122/32
  • 54.172.100.146/32

Mode also offers alternative ways of connecting if you’re unable to modify your firewall.
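If you manage your cluster's security group programmatically, the same ingress rules can be added with a short script. The following boto3 sketch assumes your cluster listens on the default Amazon Redshift port 5439; the security group ID is a placeholder.

import boto3

ec2 = boto3.client("ec2")

MODE_IPS = ["54.68.30.98/32", "54.68.45.3/32",
            "54.164.204.122/32", "54.172.100.146/32"]

# Placeholder security group ID; 5439 is the default Amazon Redshift port.
ec2.authorize_security_group_ingress(
    GroupId="sg-0123456789abcdef0",
    IpPermissions=[{
        "IpProtocol": "tcp",
        "FromPort": 5439,
        "ToPort": 5439,
        "IpRanges": [{"CidrIp": ip, "Description": "Mode"} for ip in MODE_IPS],
    }],
)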

After you’ve completed these steps, you need only enter your credentials on Mode’s data source connection page, as shown in the following example:

After the connection is made, choose who in your organization can access that connection. Then you can immediately query your data and build the analysis for your team from the Mode Editor, as shown in the following example:

In addition to a SQL environment and visualization builder, Mode also offers integrated Python and R notebooks. Choose New Notebook in the left navigation bar to start a new Python or R instance that’s automatically populated with your query results as DataFrames. This enables data scientists to seamlessly create and share analysis directly with everyone around the company. Ultimately, this approach lets you build the most flexible platform for your analytical needs. Your business analysts and data scientists can now work in the same environment. They can collaborate seamlessly, and access the same data at all times.

Conclusion

This new architecture lets organizations do more with their data, faster. Data teams that use Python and R can go beyond sharing static dashboards and reports; instead, they can also use popular forecasting and machine learning libraries like Prophet and TensorFlow. These libraries help teams find insights that they couldn't have found otherwise. This lets teams deliver regular updates that keep everyone informed, and also answer strategic and high-value questions that drive key decisions. Moreover, Mode makes these analyses accessible to everyone around the business. Because the notebooks are fully managed, data scientists can share their work directly with stakeholders without any extra work from IT departments.

By combining Mode with Amazon Redshift, data teams also remove common bottlenecks in the data integration, cleansing, or ETL processes that load data into Amazon Redshift. With Amazon Redshift Spectrum, they can query data directly in their Amazon S3 data lake from a Mode dashboard or notebook. Moreover, they can combine these queries with data already loaded into the data warehouse.

Try it yourself

We’ve built an experience for you to get a feel for this stack. If you think it could work for your case, you can get started using Mode with Amazon Redshift in a matter of minutes. If you’re not already using Amazon Redshift, you can get started with a 2-month free trial and deploy the solution, as suggested. With Mode connected to Amazon Redshift, you can start exploring your data right away or try using one of the publicly available datasets.

 


About the Authors

Benn Stancil is a co­founder and Chief Analyst at Mode, a company building collaborative tools for data scientists and analysts. Benn is responsible for overseeing Mode’s internal analytics efforts, and is also an active contributor to the data science community. In addition, Benn provides strategic guidance to Mode’s product direction as a member of the product leadership team.

 

 

 

Ayush Jain is a Product Marketer at Amazon Web Services. He loves growing cloud services and helping customers get more value from the cloud deployments. He has several years of experience in Software Development, Product Management and Product Marketing in developer and data services.

 

 

 

Himanshu Raja is a Senior Product Manager for Amazon Redshift. Himanshu loves solving hard problems with data and cherishes moments when data goes against intuition. In his spare time, Himanshu enjoys cooking Indian food and watching action movies.

Amazon QuickSight Announces General Availability of ML Insights

Post Syndicated from Luis Wang original https://aws.amazon.com/blogs/big-data/amazon-quicksight-announces-general-availability-of-ml-insights/

At re:Invent 2018, we announced the preview of ML Insights, a set of out-of-the-box machine learning and natural language features that provide Amazon QuickSight users with business insights beyond visualization. Today, we are announcing the general availability of ML Insights.

As the volume of data that customers generate continues to grow every day, it’s becoming more challenging to harness that data for business insights. This is where machine learning comes into play. Amazon is a pioneer in using machine learning to automate and scale various aspects of business analytics.

With new ML Insights features, Amazon QuickSight can help you discover hidden data trends, identify key business drivers, forecast future results, and summarize your data in easy-to-read, natural language narratives, saving hours of manual analysis and investigation. You can build comprehensive BI solutions that integrate out-of-the-box machine learning with the analytical richness of Amazon QuickSight and distribute interactive dashboards to everyone in your organization. ML Insights makes machine learning easy, allowing anyone, regardless of their technical or ML skill set, to get insights from their data in minutes rather than weeks. ML Insights features include:

  • ML-powered anomaly detection to uncover hidden insights by continuously analyzing billions of data points.
  • ML-powered forecasting to predict growth and business trends with point-and-click simplicity.
  • Auto-narratives to tell customers the story of their dashboard using plain-language narratives.

Check out this video to get a quick overview of ML Insights:

To get you started with ML Insights, this blog post will walk you through new ML-powered capabilities.

Customer use cases

During the past three months of ML Insights preview availability, customers from a broad range of industries, including telecommunications, entertainment, marketing, retail, energy, financial services, and healthcare, have used ML Insights to harness their growing volume of data on AWS and on-premises for business insights. Here are some of the cool things customers are doing with ML Insights:

Expedia Group is the world’s travel platform, and its purpose is to bring the world within reach.

“At Expedia Group two of our key strategic imperatives are to be customer centric and locally relevant on a global basis. This is why tools such as Amazon QuickSight are so helpful in making it easier to measure, report, and act on our business metrics to help our customers find the best matches for their travel searches. Amazon QuickSight’s out-of-the-box machine learning insights help us to continuously monitor our business for anomalies, alert stakeholders when outliers occur, and help our business project future trends, which in turn allows teams to focus on other priorities instead of building out these capabilities from scratch.”

Amit Marwah, Director of Technology, Flights Data & Analytics, Expedia Group

Ricoh Company, Ltd., is a global corporation that provides imaging equipment for offices, production print solutions, document management systems, IT services, and more, for approximately 200 countries and regions throughout the world.

“Machine learning is becoming more important than ever to meet our growing data volume and BI needs. Amazon QuickSight’s ML Insights functionality makes powerful machine learning easy to use in just a few clicks. It allows us to continuously monitor for unexpected usage behaviors in our fleet of smart devices worldwide, forecast usage trends and deliver comprehensive dashboards that incorporate these machine generated insights as auto-narratives to our line of business users. With ML Insights, we can quickly pinpoint and take actions on anomalies down to the specific devices and features, to improve the experience and add value to our customers.”

Naoki Umehara, Group Leader, Ricoh Company, Ltd.

Tata Consultancy Services Limited is an Indian multinational information technology (IT) service and consulting company headquartered in Mumbai, Maharashtra, with international presence in 46 countries.

“Amazon QuickSight allows us to quickly and easily integrate our Amazon Connect contact center metrics with our client ticketing tools in order to deliver interactive and automatic dashboards that our customers love. We revolutionized our staffing, training, and outage reporting with Amazon QuickSight ML Insights, predicting where the call flow is moving and react accordingly in order to prevent call spikes and provide a better service to our customers.”

Marco David Martinez, Cloud Manager, Tata Consultancy Services

Siemens is a global powerhouse focusing on electrification, automation, and digitalization. The company is also a leading supplier of power generation and transmission systems.

“Amazon QuickSight’s out-of-the-box ML Insights and usage-based pricing make it easy and cost effective to deliver robust machine-learning-based anomaly detection for our customers to analyze performance of their manufacturing process, detect faults in the production line, monitor downtime duration across hundreds of machineries, and understand the root cause of the failures — without heavy investment machine learning and custom development. This allows line supervisors and production managers to receive automated alerts on unexpected events and take actions to optimize the manufacturing process and improve performance.”

Massimilliano Ponticelli, Product Manager, Siemens

Daiso Industries Co., Ltd. is a global retailer and franchise of 100-yen shops founded in Japan.

“With Amazon QuickSight, we were able to build out our BI environment that handles the data of 5,000 stores x 70,000 products in 2 months. Precise sales forecast and inventory optimization are the most important challenges in our business. Amazon QuickSight’s ML Insights allow us to easily and quickly identify unexpected trend changes across our products and improve sales forecast and inventory optimization.”

Kenjiro Marumoto, Section Chief, Daiso Industries Co., Ltd.

Getting started

ML Insights is only available on the Enterprise Edition of Amazon QuickSight. If you are using the Standard Edition, you can easily upgrade with 1-click on the Manage QuickSight page.

To get started with ML Insights, you’ll need to connect a data source to Amazon QuickSight. Data sets can be accessed by direct query to the SQL-compatible database source or by using SPICE.

For this walkthrough, your data must have the following properties:

  • At least one date field.
  • At least one metric, such as sales, orders, shipped units, or sign ups.
  • At least one category dimension, such as product, channel, segment, or industry.
  • More than 40 historical data points per metric.

For optimal results, make sure that your data set has enough historical data points. The built-in ML algorithm requires at least 40 historical data points to learn and train the model, and it will use up to the most recent 1,000 data points. For example, if you're analyzing daily sales by geographic region, make sure that you have at least 40 days of data. Three to twelve months of data is preferred, depending on the seasonality of your business.
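If you're not sure whether your own data set meets these requirements, a quick check before importing can save time. The following pandas sketch uses hypothetical file and column names (date, revenue, and category); adjust them to match your data.

import pandas as pd

# Hypothetical file and column names; replace them with your own.
df = pd.read_csv("my-dataset.csv", parse_dates=["date"])

# Count historical data points per category (the split the ML algorithm trains on).
points_per_metric = df.groupby("category")["date"].nunique()

print("Date field present:", "date" in df.columns)
print("Metric field present:", "revenue" in df.columns)
print("Categories with fewer than 40 data points:")
print(points_per_metric[points_per_metric < 40])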

You can use your own data set, or you can download the following sample data set. We’ll use this dataset for the walkthrough:

https://s3.amazonaws.com/quicksight-ml-insights/ML-Insights-Sample-Dataset-V1.csv

Once you have created a data set in Amazon QuickSight, create a new analysis from the data set. For more information about creating data sets and analyses in Amazon QuickSight, see the Amazon QuickSight User Guide.

Suggested insights

ML Insights automatically interprets your data and provides contextual insights called suggested insights. Different visuals may result in different types of insights. For example, if you have a time-series visual, you may get insights such as period-over-period changes, anomalies, and forecasts.

Let’s walk through an example.

1. Create a line chart with a metric and a date, such as revenue over time, aggregated daily.

2. Choose Insights in the top left-hand corner of the visual.

You should then see a list of suggested insights on the left pane. Suggested insights provide you with a quick summary of the data in plain language. As you add visuals to your analyses, you will see additional suggested insights on the left pane, grouped by the visual name.

You can choose a suggested insight such as day over day change to highlight the data point or segment on the visual. Choose it again to deselect it.

ML-powered anomaly detection

With ML Insights, you can run ML-powered anomaly detection on up to a million metrics simultaneously to discover hidden trends and outliers that are often buried in aggregates. To learn more about pricing for anomaly detection, go to Amazon QuickSight pricing and choose ML Insights.

Let’s get started with anomaly detection.

1. Choose Add on the application bar, and then choose Add anomaly to sheet. This creates an insights visual for anomaly detection.

2. Expand the field wells on the top of the page and add at least one category field.

The categories represent the dimensional values by which Amazon QuickSight will split the metric. For example, let’s say you are analyzing anomalies in revenue across all product categories and product SKUs. Assuming there are 10 product categories, each with 10 product SKUs, Amazon QuickSight will split the metric by the 100 unique combinations and run anomaly detection on each of the split metrics.

3. Choose Get Started on the insights visual to configure the anomaly detection job.

4. On the anomaly detection configuration pane, configure the following options:

  • Analyze all combinations of these categories – If you select three categories, Amazon QuickSight will run anomaly detection on the following combinations, hierarchically: A, AB, ABC. If you select this option, Amazon QuickSight will analyze all combinations, including: A, AB, ABC, BC, AC. If your data is not hierarchical, you should select this option.
  • Number of anomalies to show – This setting allows you to control the number of top anomalies you want to display on the insights cards as narratives.
  • Schedule – Set the schedule to run anomaly detection on your data hourly, daily, weekly or monthly, depending on your data. Choose the start time and the time zone of the start time.
  • Contribution analysis – You can select up to four additional dimensions for Amazon QuickSight to analyze for top contributors when an anomaly is detected. For example, Amazon QuickSight can show you the top customers that contributed to a spike in sales in the USA for Home Improvement products. If you have additional dimensions in your data (dimensions not used in the anomaly detection), you can add them here for contribution analysis. For this example, choose the Geo for contribution analysis.

5. Choose OK. Amazon QuickSight will not implement the schedule until you publish the analysis as a dashboard. Within an analysis, you will have the option to run anomaly detection manually without the schedule.

6. After the configuration is set, choose Run Now to run detection manually. You will see an “Analyzing for anomalies… This may take a while…” message. Depending on the size of your data set, analysis may take anywhere from a few minutes to an hour.

Once anomaly detection is complete, you will see the top anomalies for the latest period in your data listed in the insights visual. Amazon QuickSight also computes and displays the expected value so you can better understand the significance of the anomaly.

7. To see all anomalies for this data, choose the selector in the upper right of the visual and choose Explore Anomalies.

On the detailed anomalies page, you can see all of the anomalies detected for the latest period. The title of the visual represents the metric that is applied to the unique combination of the categorical fields. The highlighted data point on the chart—on the far right of the chart—represents the most recent anomaly detected for that time series.

On the left pane, you will see the top contributors to the anomaly based on the dimensions you have predefined. When you hover over the top contributors, Amazon QuickSight displays an explanation of the significance of the contribution.

8. To see anomalies by date, choose Show Anomalies by Date from the top of the visual to expose a date picker. The chart will display the number of anomalies detected for each unit of your anomaly detection configuration. You can choose a particular date to see the anomalies for that date. For example, if you choose Nov. 1st, 2018, from the graph, then the bar chart highlights the anomalies for that date.

Important: Amazon QuickSight uses the first 40 data points in a data set for training; these data points will not be scored by the anomaly detection algorithm. You may not see any anomalies on the first 40 data points.

9. Use the filter controls at the top of the pane to change the anomaly threshold to show anomalies with high, medium or low significance or to show only anomalies that are higher than expected or lower than expected. You can also filter by the categorical values that are present in your data set to look at anomalies only for those categories.

10. To go back to your analysis, choose Back to analysis at the top of the page.

ML-powered forecasting

Using the built-in ML algorithm, you can now forecast business metrics with point-and-click simplicity without having to write code or build a complex spreadsheet.

1. On your time series chart, choose the selector in the upper right corner of the visual, and then choose Add forecast. Amazon QuickSight will analyze the historical data using ML and present a forecast for the next 14 periods.

2. On the Forecast properties pane at the left, you can customize forecast settings. For example, you can change the number of periods to forecast into the future or add “forecast” periods into the past to compare historical actuals against ML-based expectations.

You can adjust the width of the prediction band by changing the prediction interval and manually setting the seasonality (number of periods). Choose Apply to save your changes.

3. Select a forecasted data point on the chart and choose What-if analysis. With What-if analysis, you can set target value for a particular date or date range, and Amazon QuickSight will adjust the forecast gracefully to meet the target.

4. Choose Apply to see the new forecast adjusted for the target along with the original forecast. You can hover over the data points to see details.

With ML-powered forecasting, Amazon QuickSight allows you to forecast complex, real-world scenarios such as data with multiple seasonality. Outliers will be excluded automatically, and missing values will be imputed.

To export the forecasting data in CSV format, choose the selector in the upper right corner of the visual, and then choose Export to CSV.

Auto-narratives

Auto narratives allow you to create natural language summaries of your visuals. You can embed these summaries into your dashboard to highlight key insights that are important for your readers, allowing them to access the data without having to sift through the entire dashboard. When you define a template, narratives update automatically as the data in your data set refreshes, just like a visual. The following steps show you how to get started with auto narratives.

1. In your time series chart, choose Insights again to show the suggested insights.

2. An easy way to add a narrative insight to your analysis is to choose the plus sign (+) next to a suggested insight. For the purpose of this walkthrough, choose the Day Over Day Change insight.

You’ll see an insight visual on your analysis with the predefined template. Notice that the field wells have the date, metric, and category filled in. These settings are populated from the visual that you used to create the insight visual. You can customize the fields as needed.

3. To edit the narrative, choose the insight visual menu and select Customize Narrative. You'll see the Configure narrative pane, where you can edit your insights template. You can format the content with different sizes and colors using the formatting toolbar. You can also insert expressions and conditional statements like IF and FOR statements.

In the left pane, you can add computations to your narrative. Computations are predefined calculations such as period-over-period, period-to-date, growth-rate, max, min, and top movers, that you can reference in your template to describe your data. Currently, Amazon QuickSight supports 13 different types of computations. In this example, PeriodOverPeriod is added by default since we selected the Day Over Day Change insight from the suggested insights pane.

4. To add a new computation, choose Add computation in the bottom left corner of the pane. You’ll be prompted to select from a list of computations. For the purpose of this walkthrough, select the Growth rate computation type, and select Next.

5. You can configure certain aspects of the computation. In the case of growth rate, you can change the number of periods over which that you want to compute growth. After you make your selections, choose OK.

6. Now expand Computations on the left pane. You should see both PeriodOverPeriod and GrowthRate options.

Please note that computation names must be unique. When you create a computation, assign a unique name. You can reference multiple computations of the same type in your template. For example, if you have two metrics, such as $sales and units sold, you can create a GrowthRate computation for each of the metrics, with a different name for each computation. The specific computations can then be referenced by name in the template.

Also be aware that anomaly computation is not compatible with all other computation types. For example, if you have a PeriodOverPeriod or GrowthRate computation, you will not be able to add an Anomaly computation to the same insight visual.

7. To add growth rate to your narrative, enter the phrase Compounded Growth Rate for the last on the narrative template. From the Computations pane, choose GrowthRate, and then choose timePeriods to insert the expression GrowthRate.timePeriods into your narrative. This expression references the number of periods set in the configuration.

8. Complete the sentence by entering days is. Then add another expression from the Computations pane by choosing GrowthRate, then compounded GrowthRate, and then formattedValue. The selection formattedValue returns a phrase formatted according to the format applied to the metric on the field. To see a raw value in integer or decimal format, choose value instead of formattedValue.

Now, let’s try using a conditional statement.

1. To insert an IF statement, place the cursor at the end of your narrative template. From the Insert Code menu, choose Inline IF.

2. You'll be prompted to enter some code. On the left pane, choose GrowthRate, then choose compoundedGrowthRate, and then choose value to insert the expression. After the inserted expression, enter > 3, and choose Save.

3. For the conditional content, enter Great! Select the text and use the format menu to change the color to green.

4. Repeat the previous steps, entering <3 for the growth rate value. For the conditional content, enter Bad! and format the text as red.

5. Choose Apply. You should see the results similar to the following.

The template provides you with a sophisticated tool to customize your narrative. Within the template, you can also reference parameters in your analysis or dashboard and leverage a set of built-in functions to perform more calculations.

The best way to get started with auto narratives and to learn the syntax is to use the existing templates built from suggested insights.  But you can also create insight visuals from scratch by choosing Add and then choosing Add Insight.

Try it yourself! Try creating a narrative that enumerates the top selling products for the last three months.

Conclusion

As you can see from the walkthrough, ML Insights helps you perform large-scale anomaly detection and create business forecasts in a few simple clicks. You can build rich and user-friendly auto narratives within your dashboards in minutes, without any custom development or ML skill set necessary.

 


About the Author

Luis Wang is a principal product manager for Amazon QuickSight. He’s been with AWS for over 6 years, working on various services including Amazon EC2 and then launching Amazon QuickSight. Luis is now focused on the application of machine learning and AI to business intelligence and analytics at QuickSight. He enjoys running, watching sitcoms and spending time with his family.

Best practices for running Apache Spark applications using Amazon EC2 Spot Instances with Amazon EMR

Post Syndicated from Ran Sheinberg original https://aws.amazon.com/blogs/big-data/best-practices-for-running-apache-spark-applications-using-amazon-ec2-spot-instances-with-amazon-emr/

Apache Spark has become one of the most popular tools for running analytics jobs. This popularity is due to its ease of use, fast performance, utilization of memory and disk, and built-in fault tolerance. These features strongly correlate with the concepts of cloud computing, where instances can be disposable and ephemeral.

Amazon EC2 Spot Instances offer spare compute capacity available in the AWS Cloud at steep discounts compared to On-Demand prices. EC2 can interrupt Spot Instances with two minutes of notification when EC2 needs the capacity back. You can use Spot Instances for various fault-tolerant and flexible applications. Some examples are analytics, containerized workloads, high-performance computing (HPC), stateless web servers, rendering, CI/CD, and other test and development workloads.

Amazon EMR provides a managed Hadoop framework that makes it easy, fast, and cost-effective to process vast amounts of data using EC2 instances. When using Amazon EMR, you don’t need to worry about installing, upgrading, and maintaining Spark software (or any other tool from the Hadoop framework). You also don’t need to worry about installing and maintaining underlying hardware or operating systems. Instead, you can focus on your business applications and use Amazon EMR to remove the undifferentiated heavy lifting.

In this blog post, we are going to focus on cost-optimizing and efficiently running Spark applications on Amazon EMR by using Spot Instances. We recommend several best practices to increase the fault tolerance of your Spark applications and use Spot Instances. These work without compromising availability or having a large impact on performance or the length of your jobs.

Use the Spot Instance Advisor to target instance types with suitable interruption rates

As mentioned, Spot Instances can be interrupted if EC2 needs the capacity back. In this blog post, we share best practices on how to increase the fault tolerance of your Spark applications to withstand occasional loss of underlying EC2 instances due to Spot interruptions. However, even then, targeting EC2 Spot Instances with lower interruption rates can help further. This approach helps by decreasing occurrences where your job gets prolonged because Spark needs to redo some of the work when interruptions occur.

Use the Spot Instance Advisor to check the interruption rates and try to create your Amazon EMR cluster using instance types that historically have lower interruption rates. For example, the frequency of interruption for r4.2xlarge in the US East (Ohio) region at the time of writing this post is less than 5 percent. This means that less than 5 percent of all r4.2xlarge Spot Instances launched in the last 30 days were interrupted by EC2.

Run your Spot workloads on a diversified set of instance types

When running workloads (analytics or others) on EC2 instances and using On-Demand or Reserved Instances purchase options, you can generally use a single instance type across your entire cluster. You might do so after benchmarking to find the right instance type to fit the application’s requirement. However, with Spot Instances, using multiple Spot capacity pools (an instance type in an Availability Zone) in a cluster is key. This practice enables you to achieve scale and preserve capacity for running your jobs.

For example, suppose that I run my Spark application using On-Demand r4.xlarge instances (30.5 GiB of memory and four vCPUs). When I start using Spot Instances, I can configure my Amazon EMR cluster's Core or Task Instance Fleets with several instance types that have a similar vCPU-to-memory ratio (roughly 7 GB per vCPU) and let EMR choose the right instance type to run in the cluster. These include r4.2xlarge, r5.xlarge, i3.2xlarge, and i3.4xlarge, as shown in the following table. Taking this approach makes it more likely I'll have sufficient Spot capacity to launch the cluster. It also increases the chance that Amazon EMR can replenish the required capacity for the cluster to continue running (from other capacity pools) in case some of the capacity in the cluster is terminated by EC2 Spot interruptions.

Instance type | Number of vCPUs | RAM (in GB)
r4.xlarge     | 4               | 30.5
r4.2xlarge    | 8               | 61
r5.xlarge     | 4               | 32
i3.2xlarge    | 8               | 61
i3.4xlarge    | 16              | 122

Size your Spark executors to allow using multiple instance types

As described just previously, a key factor for running on Spot instances is using a diversified fleet of instances. This also helps decrease the impact of Spot interruptions on your jobs. This approach dictates the architecture for your Spark applications.

Running with memory intensive executors (over 20 GB of RAM) ties your application to a specific set of instance types. These might not have sufficient Spot capacity for you to stand up your cluster. These also might have high Spot interruption rates, which might have an impact on your running jobs.

For example, for a Spark application with 90 GiB of RAM and 15 cores per executor, only 11 instance types fit the hardware requirements and fall below the 20 percent Spot interruption rate. Suppose that we break down the executors, keeping the ratio of 6 GiB of RAM per core, to two cores per executor. If we do so, we open up to 20 additional instance types that our job can run on (below the 20 percent interruption rate).

A fair approach to resizing an executor is to decide on the minimum number of cores per executor that your application needs. Two is a good start. You then allocate memory using the following calculation:

NUM_CORES * ((EXECUTOR_MEMORY + MEMORY_OVERHEAD) / EXECUTOR_CORES)

In our example, that is 2 * ((90 + 20) / 15) ≈ 15 GB

For more information about the memoryOverhead setting, see the Spark documentation.
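
To make this concrete, here is a minimal Scala sketch of how the resized executor settings from the calculation above might be applied when building a Spark session. The application name and the exact split between heap and overhead are assumptions for illustration, not values from the original job.

import org.apache.spark.sql.SparkSession

// Hypothetical sketch: two cores and roughly 15 GB per executor, split between
// heap memory and overhead, so that many more Spot-friendly instance types can
// host the executors.
val spark = SparkSession.builder()
  .appName("spot-friendly-spark-job")             // assumed application name
  .config("spark.executor.cores", "2")
  .config("spark.executor.memory", "13g")         // executor heap
  .config("spark.executor.memoryOverhead", "2g")  // off-heap overhead (~15 GB total)
  .getOrCreate()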

Avoid large shuffles in Spark

To reduce the amount of data that Spark needs to reprocess if a Spot Instance is interrupted in your Amazon EMR cluster, you should avoid large shuffles.

Wide dependency operations like GroupBy and some types of joins can produce vast amounts of intermediate data. Intermediate data is stored on local disk and then transferred (shuffled) to other executors in your cluster.

Although you can't always do so, we recommend that you either avoid shuffle operations or work toward minimizing the amount of shuffle data. We recommend this for two reasons:

  • This is a general Spark best practice, because shuffle is an expensive operation.
  • In the context of Spot Instances, shuffles also decrease the fault tolerance of the job. This is because losing one node that either contains shuffle data or relies on shuffled data for computations (usually both) requires you to rerun some part of the shuffle process.

There are several patterns that we encounter that produce unnecessary amounts of shuffle data, described following.

The explode to group pattern

From a developer's point of view, using explode on complex data types might be a quick solution to some use cases (exploding an array into multiple rows). Doing so multiplies the number of rows, which later in the job have to be aggregated or joined back together.

For example, suppose that our data contains user IDs and an array of dates that describe visits to a website:

user_id | visit_dates_array
0       | ["28/01/2018", "29/01/2018", "01/01/2019"]
100000  | ["01/11/2017", "01/12/2017"]
999999  | ["01/01/2017", "02/01/2017", "03/01/2017", "04/01/2017", "05/01/2017", "06/01/2017"]

 

Suppose that we run a Spark application that sums the number of visits of users in the website. In this case, an easy solution is to use explode and then aggregate the data, as shown following.

Explode the data:

df.selectExpr("user_id", "explode(visit_dates_array) visit_day").createOrReplaceTempView("visits")

Aggregate back the data:

spark.sql("select count(visit_day), user_id 
                  from visits
                  group by user_id")

Although this method is quick and easy, it bloats our data to three times more than the original data. To accurately sum the visits for each user_id, our data also has to be shipped across the network to other executors.

What are the alternatives to exploding and grouping?

One option is to create a UDF that does the calculations in place, avoiding or minimizing shuffles. The following example is in Scala.

val countVisitsUDF = (array: Seq[String]) => {
    array.length
}

spark.udf.register("countVisits",  countVisitsUDF  )

spark.sql("SELECT user_id, countVisits(arr) 
           FROM tab").show
+-------+--------------------+
|user_id|UDF:countVisits(arr)|
+-------+--------------------+
|  20000|                   3|
| 100000|                   2|
|   9999|                   6|
+-------+--------------------+

Another option that was recently introduced in Spark 2.4 is the aggregate function. This function can also reduce the amount of shuffle data to a bare minimum, just the user_id and the count of their visits:

spark.sql("SELECT user_id, 
           sum(aggregate(arr, 0, (acc, x) -> acc +1)) summary 
           FROM tab 
           GROUP BY user_id").show
+-------+-------+
|user_id|summary|
+-------+-------+
| 100000|      2|
|   9999|      6|
|  20000|      3|
+-------+-------+

Huge data joins (bucketing)

When performing join operations, Spark repartitions (shuffles) the data by the join keys.

If you perform multiple joins on the same table or tables using the same key, you can use bucketing to shuffle the data only once. After the bucketed data is persisted, any subsequent joins on that same key don't require a shuffle because the data is already "pre-shuffled" on Amazon S3.

To bucket your data, you need to decide on the number of buckets to divide your data into and the columns on which the bucketing occurs.

df.write.bucketBy(4,"user_id").saveAsTable("ExampleTable")
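
As a follow-up, here is a minimal Scala sketch of how two tables bucketed on the same key into the same number of buckets can then be joined without another shuffle. The DataFrame names (visitsDf, ordersDf) and table names are assumptions for illustration.

// Hypothetical sketch: write both sides bucketed (and sorted) on user_id
// into the same number of buckets, then join on that key.
visitsDf.write.bucketBy(4, "user_id").sortBy("user_id").saveAsTable("visits_bucketed")
ordersDf.write.bucketBy(4, "user_id").sortBy("user_id").saveAsTable("orders_bucketed")

val joined = spark.table("visits_bucketed")
  .join(spark.table("orders_bucketed"), "user_id")

// The physical plan should show no Exchange (shuffle) step on the bucketed join key.
joined.explain()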

Work with data skew

In some cases, data doesn’t distribute uniformly between partitions. This is an issue for several reasons:

  • Generally, most of the executors finish in a timely manner. However, those that handle the large outliers run for a longer time. This increases your risk of getting your Spot Instances interrupted and having to recompute the whole job. It also has a negative impact on overall performance and prolongs the length of the job or causes resources to be underutilized.
  • Data skew can also be a source for large amounts of shuffle data, which can cause issues as discussed previously.

To handle data skew, we recommend that you try to do the computation that you're interested in locally on the executors and then compute over the partial results. This approach is also known as a combine operation.

A common technique to handle data skew is salting the keys.
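
The following Scala sketch illustrates one way to salt a skewed key. The column names and the salt range of 10 are assumptions; you would tune them to your data.

import org.apache.spark.sql.functions._

// Hypothetical sketch: append a random suffix (0-9) to the skewed key so that rows
// for a "hot" user_id spread across several partitions, aggregate per salted key
// first, then aggregate the partial results per original key.
val salted  = df.withColumn("salt", (rand() * 10).cast("int"))
val partial = salted.groupBy("user_id", "salt").agg(count("*").as("partial_visits"))
val result  = partial.groupBy("user_id").agg(sum("partial_visits").as("visits"))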

Break huge Spark jobs into smaller ones to increase resiliency

One antipattern that we encounter is large applications that perform numerous jobs that can take hours or days to complete.

This kind of job creates an all-or-nothing situation, where a failure at any point during the run can cause the loss of the time and money already invested in the job.

It might sound obvious, but breaking up your jobs to a chain of smaller jobs increases your resiliency to handle failures and Spot interruptions. Breaking up jobs also means that you can remediate any issues preventing the job from finishing successfully. In addition, it decreases the chances of losing the effort already invested in the process.

Work with Amazon EMR Instance fleets

You can use Amazon EMR instance fleets in a couple of ways to work effectively with Spark.

Diversify the EC2 instance types in your cluster

By configuring Amazon EMR instance fleets, you can set up a fleet of up to five EC2 instance types for each Amazon EMR node type (Master, Core, Task). As discussed earlier, being instance-flexible is key to the ability to launch and maintain Spot capacity for your Amazon EMR cluster.

For the Master node group, one instance out of your selection is chosen by Amazon EMR. In the Core and Task node groups, Amazon EMR selects the best instance types to use in the cluster based on capacity availability and low price. Also, you can specify multiple subnets in different Availability Zones. In this case, Amazon EMR selects the AZ that best fits the target capacity to launch the entire cluster in.

Size Amazon EMR instance fleets according to the job’s hardware requirements

Amazon EMR instance fleets enable you to define a pool of resources by specifying which instance types fit your application. You can also specify the weight each instance type carries in the pool toward your target capacity.

By default, instances are given weight equivalent to the number of their vCPUs. However, you can also provide weights according to other instance characteristics, such as memory, which I demonstrate in this section.

Sizing by CPU:

For example, suppose that I have a job that requires four cores per executor and 1 GB RAM per core, so the Spark configuration is as follows:

--executor-cores 4 --executor-memory 4G

We want the job to run with 20 executors, which means that we need 80 cores (20*4):

The screenshot shows 80 Spot Units as a representation of the 80 cores that are needed to run the job. It also shows the selection of different instance types that fit the hardware requirements.

Amazon EMR chooses any combination of these instance types to fulfill my target capacity of 80 Spot units; some of the larger instance types might run more than one executor.
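
If you prefer to define the same fleet programmatically rather than in the console, the following Scala sketch (using the AWS SDK for Java 2.x EMR model classes) shows the general shape of a Task instance fleet that targets 80 Spot units and weights each instance type by its vCPU count. Treat it as an illustration only: the instance types, weights, and the surrounding RunJobFlow call are assumptions.

import software.amazon.awssdk.services.emr.model.{InstanceFleetConfig, InstanceFleetType, InstanceTypeConfig}

// Hypothetical sketch: a Task fleet of 80 Spot units, each instance type weighted
// by its number of vCPUs so EMR can mix types to reach the target capacity.
val taskFleet = InstanceFleetConfig.builder()
  .instanceFleetType(InstanceFleetType.TASK)
  .targetSpotCapacity(80)
  .instanceTypeConfigs(
    InstanceTypeConfig.builder().instanceType("r4.xlarge").weightedCapacity(4).build(),
    InstanceTypeConfig.builder().instanceType("r4.2xlarge").weightedCapacity(8).build(),
    InstanceTypeConfig.builder().instanceType("r5.xlarge").weightedCapacity(4).build(),
    InstanceTypeConfig.builder().instanceType("i3.2xlarge").weightedCapacity(8).build()
  )
  .build()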

 

Sizing by memory

Some Spark application requirements are memory-intensive and require a different weight strategy.

For example, if our job runs with four cores and 6 GB per core (--executor-cores 4 --executor-memory 24G), we first choose instances that have at least 28 GB of RAM:

As you can see in the screenshot, in this configuration the instance type selection is set to accommodate the memory requirement. This leaves about 15–20 percent memory free for other processes running inside the instance operating system.

You then calculate the total units by multiplying the number of units assigned to the smallest eligible instance type by the desired number of executors (25*100).

As in the CPU intensive job, some instance types run only one executor while some run several.

Compensating for performance differences between instance generations

Some workloads can see performance improvements of up to 50 percent just by running on newer instance types. This effect is due to AWS Nitro technology, faster CPU clock speeds, a different CPU architecture (moving from Haswell/Broadwell to Skylake), or a combination of these.

If decreasing application running time is a major requirement, you can offset the performance difference between instance type generations by specifying smaller weights to the older instance generations.

For example, suppose that your job runs for an hour with 10 r5.2xlarge instances and two hours with 10 r4.2xlarge instances. In this case, you might prefer defining your instance fleet as follows:

Select the right purchase option for each node type

Spot Blocks are defined-duration Spot Instances that can run for up to six hours without being interrupted, and they come at a smaller discount compared to regular Spot Instances. You can use Spot Blocks if your jobs can't tolerate Spot interruptions, provided the cluster runtime is forecast to be shorter than six hours.

Master node: Unless your cluster is very short-lived and the runs are cost-driven, avoid running your Master node on a Spot Instance. We suggest this because a Spot interruption on the Master node terminates the entire cluster. As an alternative to On-Demand, you can set up the Master node on a Spot Block. You do so by setting the defined duration of the node and failing over to On-Demand if the Spot Block capacity is unavailable.

Core nodes: Avoid using Spot Instances for Core nodes if the jobs on the cluster use HDFS. That prevents a situation where Spot interruptions cause data loss for data that was written to the HDFS volumes on the instances.

Task nodes: Use Spot Instances for your Task nodes by selecting up to five instance types that match your hardware requirements. Amazon EMR fulfills the most suitable capacity by price and capacity availability.

Get EC2 Spot interruption notifications

When EC2 needs to interrupt Spot Instances, a 2-minute warning is issued for each instance that is going to be interrupted. You can programmatically react to the warning in two ways: from within the instance by polling the instance’s metadata service, and by using Amazon CloudWatch Events. You can find the specifics in the documentation.
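
As a rough illustration of the in-instance option, the Scala sketch below polls the EC2 instance metadata endpoint for a Spot interruption notice. It assumes the instance allows plain (IMDSv1-style) metadata requests; the endpoint returns a 404 until an interruption is scheduled, which the Try absorbs.

import scala.io.Source
import scala.util.Try

// Hypothetical sketch: check the instance metadata service for a pending
// Spot interruption. A 404 (no notice yet) surfaces as a failed Try.
def interruptionNotice(): Option[String] =
  Try(Source.fromURL("http://169.254.169.254/latest/meta-data/spot/instance-action").mkString).toOption

interruptionNotice() match {
  case Some(action) => println(s"Spot interruption scheduled: $action") // e.g. drain work, copy logs
  case None         => println("No interruption notice yet")
}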

The use of this warning varies between types of workloads. For example, you can opt to detach soon-to-be-interrupted instances from an Elastic Load Balancer to drain in-flight connections before the instance gets shut down. Alternatively, you can copy the logs to a centralized location, or gracefully shut down an application.

To learn more about how EMR handles EC2 Spot interruptions, see the AWS Big Data blog post Spark enhancements for elasticity and resiliency on Amazon EMR.

You might still want to track the Spot interruptions, possibly to correlate between Amazon EMR job failures and Spot interruptions or job length. In this case, you can set up a CloudWatch Event to trigger an AWS Lambda function to feed the interruption into a data store. This approach means that you can query the historical interruptions in your account. For smaller scale or even initial testing, you can use Amazon SNS with an email target to simply get the interruption notifications by email.

Tag your Amazon EMR cluster and track your costs

Tagging your resources in the AWS Cloud is a fundamental best practice. You can read more about tagging strategies on this AWS Answers page. In Amazon EMR, after you tag the cluster, your tags propagate to the underlying EC2 instances and the Amazon EBS volumes that are created by the cluster. This enables you to have a holistic view of the costs of running your Amazon EMR clusters, and can be easily visualized with AWS Cost Explorer.

Summary

In this blog post, we list best practices for cost-optimizing your Spark applications on Amazon EMR by using Spot Instances. We hope that you find these useful and that you test these best practices with your Spark applications to cost-optimize your workloads.

 


About the authors

Ran Sheinberg is a specialist solutions architect for EC2 Spot Instances with Amazon Web Services. He works with AWS customers on cost optimizing their compute spend by utilizing Spot Instances across different types of workloads: stateless web applications, queue workers, containerized workloads, analytics, HPC and others.

 

 

 

Daniel Haviv is a specialist solutions architect for Analytics with Amazon Web Services.

 

 

 

 

How to enable cross-account Amazon Redshift COPY and Redshift Spectrum query for AWS KMS–encrypted data in Amazon S3

Post Syndicated from Asim Kumar Sasmal original https://aws.amazon.com/blogs/big-data/how-to-enable-cross-account-amazon-redshift-copy-and-redshift-spectrum-query-for-aws-kms-encrypted-data-in-amazon-s3/

This post shows a step-by-step walkthrough of how to set up a cross-account Amazon Redshift COPY and Spectrum query using a sample dataset in Amazon S3. The sample dataset is encrypted at rest using AWS KMS-managed keys (SSE-KMS).

About AWS Key Management Service (AWS KMS)

With AWS Key Management Service (AWS KMS), you can have centralized control over the encryption keys used to protect your data at rest. You can create, import, rotate, disable, delete, define usage policies, and audit the use of encryption keys used to encrypt your data. AWS KMS uses FIPS 140-2 validated cryptographic modules to protect the confidentiality and integrity of your master keys.

AWS KMS is seamlessly integrated with most AWS services. This integration means that you can easily use customer master keys (CMKs) to control the encryption of the data you store within these services. When deciding to encrypt data in a service such as Amazon Redshift, you can choose to use an AWS-managed CMK that Amazon Redshift automatically creates in KMS. You can track the usage of the key, but it’s managed by the service on your behalf. In some cases, you might need direct control over the lifecycle of a CMK or want to allow other accounts to use it. In these cases, you can create and manage your own CMK that AWS services such as Amazon Redshift can use on your behalf. These customer-managed CMKs enable you to have full control over the access permissions that determine who can use the key and under which conditions. AWS KMS is integrated with AWS CloudTrail, a service that provides a record of actions performed by a user, role, or AWS service in AWS KMS.

About Amazon Redshift and Redshift Spectrum

Amazon Redshift is a petabyte scale, fully managed data warehouse service on AWS. It uses a distributed, massively parallel processing (MPP), shared-nothing architecture that scales horizontally to meet usage requirements.

Amazon Redshift Spectrum is a feature of Amazon Redshift that extends the analytic power of Amazon Redshift beyond the data that is stored on local disks in the data warehouse. In other words, Amazon Redshift Spectrum enables you to use the same ANSI SQL syntax of Amazon Redshift on the data that is stored in an Amazon S3 data lake. You do so using external tables, without having to ingest the data into Amazon Redshift first. A common pattern is to run queries that span both the frequently accessed “hot” data stored locally in Amazon Redshift and the “warm/cold” data stored cost-effectively in Amazon S3. That pattern separates compute and storage by enabling independent scaling of both to match the use case. This means you don’t have to pay for unused compute capacity just to add more storage. More importantly, this approach enables seamless interoperability between your data lake and Amazon Redshift.

The Amazon Redshift COPY command supports the following types of Amazon S3 encryption:

  • Server-side encryption with Amazon S3-managed keys (SSE-S3)
  • Server-side encryption with AWS KMS-managed keys (SSE-KMS)
  • Client-side encryption using a client-side symmetric master key

The Amazon Redshift COPY command doesn’t support the following types of Amazon S3 encryption:

  • Server-side encryption with customer-provided keys (SSE-C)
  • Client-side encryption using an AWS KMS–managed customer master key
  • Client-side encryption using a customer-provided asymmetric master key

About the use case

A multiple-account AWS environment is a common pattern across our customers for a variety of reasons. One of the common reasons for data lake customers in AWS is to separate ownership of data assets from different business units in the company. At the same time, business units might need to grant access to some of their data assets to each other for new business insights.

As illustrated in the following drawing, in our example Account A owns an S3 bucket with SSE-KMS encrypted data and Account B owns an Amazon Redshift cluster with Redshift Spectrum enabled. Account B needs access to the same data to load to the Amazon Redshift cluster using the COPY command and also to query using Redshift Spectrum.

Solution walkthrough

Following, we walk through a couple of different options to support this use case.

Prerequisites

The solution assumes that you already have the following set up:

    1. Access to two AWS accounts (we call them Account A and B) in the same AWS Region.*
    2. Grant the AdministratorAccess policy to the IAM users that you use in both AWS accounts (this should be restricted further for production).
    3. Account A has a customer-managed CMK in AWS KMS with the following attributes:
      • Alias as kms_key_account_a
      • Description as Cross Account KMS Key in Account A
      • Administrator set to the current IAM user that you used to sign in to the AWS console and create the KMS key
      • Account B added as External Accounts

      Copy and save the CMK Amazon Resource Name (ARN) to be used shortly

    4. Account A uses the following sample dataset from AWS:
      Customer - s3://awssampledbuswest2/ssbgz/customer0002_part_00.gz

    5. Account A has an S3 bucket called rs-xacct-kms-bucket with bucket encryption option set to AWS KMS using the KMS key kms_key_account_a created earlier.
    6. Use the following AWS CLI command to copy the customer table data from the AWS sample dataset SSB – Sample Schema Benchmark, found in the Amazon Redshift documentation. Note: Because bucket names are global across all AWS customers, you need a unique bucket name for your test run. Be sure to replace rs-xacct-kms-bucket with your own bucket name in the following command:
      aws s3 cp s3://awssampledbuswest2/ssbgz/ s3://rs-xacct-kms-bucket/customer/ --recursive --exclude '*' --include 'customer*'

    7. After the copy is complete, check the KMS key ID for the file from S3 console, as shown following.
    8. Account B has an Amazon Redshift cluster:
      • The cluster name is rstest
      • It’s publicly accessible
      • It has an IAM role attached called redshift_role_account_b with the following two managed IAM policies:
        • AmazonS3ReadOnlyAccess
        • AWSGlueConsoleFullAccess

            Note: Be sure to update redshift_role_account_b with your own IAM role.

            You can set up a database session successfully from a client tool, such as SQL Workbench from your laptop.

* This walkthrough uses a publicly available AWS sample dataset from the US-West-2 (Oregon) Region. Hence, we recommend that you use the US-West-2 (Oregon) Region for your test run to reduce cross-region network latency and cost due to data movement.

Step-by-step walkthrough

Depending on which account’s AWS Glue Data Catalog you want to use for Redshift Spectrum, there are two solution options to choose from:

  1. AWS Glue Data Catalog in Account B
  2. AWS Glue Data Catalog in Account A

Option 1: AWS Glue Data Catalog in Account B

Set up permissions

  1. Sign in to Account A’s AWS console. Then, change the AWS Region to us-west-2 (Oregon). Add the following bucket policy for the rs-xacct-kms-bucket bucket so that Account B (which owns the Amazon Redshift cluster – rstest) can access the bucket.

Note: Replace <Account B> with AWS Account ID for Account B and rs-xacct-kms-bucket with your bucket name.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowS3",
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::<Account B>:root"
            },
            "Action": [
                "s3:ListBucket",
                "s3:GetObject"
            ],
            "Resource": [
                "arn:aws:s3:::rs-xacct-kms-bucket/*",
                "arn:aws:s3:::rs-xacct-kms-bucket"
            ]
        }
    ]
}
    2. Sign in to Account B’s AWS console. Then, change the AWS Region to us-west-2 (Oregon). Create IAM policies and roles as described following:

a) Create the following two IAM permission policies: rs_xacct_bucket_policy to give Account B access to the S3 bucket in Account A, and rs_xacct_kms_policy to give Account B access to the CMK in Account A.

Policy name: rs_xacct_kms_policy

Note: Replace <ARN of kms_key_account_a from Account A> with your KMS key ARN from Account A.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowUseOfTheKey",
            "Effect": "Allow",
            "Action": [
                "kms:Encrypt",
                "kms:Decrypt",
                "kms:ReEncrypt*",
                "kms:GenerateDataKey*",
                "kms:DescribeKey"
            ],
            "Resource": [
                "<ARN of kms_key_account_a from Account A>"
            ]
        },
        {
            "Sid": "AllowAttachmentOfPersistentResources",
            "Effect": "Allow",
            "Action": [
                "kms:CreateGrant",
                "kms:ListGrants",
                "kms:RevokeGrant"
            ],
            "Resource": [
                "<ARN of kms_key_account_a from Account A>"
            ],
            "Condition": {
                "Bool": {
                    "kms:GrantIsForAWSResource": true
                }
            }
        }
    ]
}

Policy name: rs_xacct_bucket_policy

Note: Replace rs-xacct-kms-bucket with your bucket name.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowS3",
            "Effect": "Allow",
            "Action": "s3:*",
            "Resource": [
                "arn:aws:s3:::rs-xacct-kms-bucket/*",
                "arn:aws:s3:::rs-xacct-kms-bucket"
            ]
        }
    ]
}

b) Create a new IAM role called xacct_kms_role_account_b for the Amazon Redshift service with the following IAM policies attached:

rs_xacct_bucket_policy
rs_xacct_kms_policy
AWSGlueConsoleFullAccess

Save the Amazon Resource Name (ARN) of the IAM role. You’ll use it soon.

c) Now let’s set up the IAM role chaining for Amazon Redshift between the two IAM roles, redshift_role_account_b and xacct_kms_role_account_b.

To chain roles, you establish a trust relationship between the roles. A role that assumes another role (for example, Role A) must have a permission policy that allows it to assume the next chained role (for example, Role B). Similarly, the role that passes permissions (Role B) must have a trust policy that allows it to pass its permissions to the previous chained role (Role A).

The first role in the chain must be a role attached to the Amazon Redshift cluster. The first role and each subsequent role that assumes the next role in the chain must have a policy that includes a specific statement. This statement has the Allow effect on the sts:AssumeRole action and the ARN of the next role in a Resource element.

In our example, Role A is redshift_role_account_b, which needs the permission policy rs_xacct_assume_role_policy, which  allows it to assume Role B (which is xacct_kms_role_account_b). Both IAM roles are owned by AWS Account B.

d) Let’s create the IAM permission policy rs_xacct_assume_role_policy and attach the policy to the IAM role redshift_role_account_b.

Policy name: rs_xacct_assume_role_policy

Note: Replace <ARN for IAM role xacct_kms_role_account_b from Account B>.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Stmt1487639602000",
            "Effect": "Allow",
            "Action": [
                "sts:AssumeRole"
            ],
            "Resource": [
"<ARN for IAM role xacct_kms_role_account_b from Account B>"
            ]
        }
    ]
}

e) Change the trust relationship for IAM role xacct_kms_role_account_b by choosing Edit trust relationship and replacing the existing trust policy with the following:

Note: Replace <Account B> with the AWS Account ID for Account B.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "redshift.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    },
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::<Account B>:root"
      },
      "Action": "sts:AssumeRole"
    }
  ]
} 

f) Create an AWS Glue service IAM role called glue_service_role_account_b with the following policies attached:

• AWSGlueServiceRole (AWS managed policy)
• rs_xacct_bucket_policy (managed policy created earlier)
• rs_xacct_kms_policy (managed policy created earlier)

Note: Be sure to update glue_service_role_account_b with your own IAM role.

Perform the Amazon Redshift COPY

  1. Log in to the Amazon Redshift cluster from your query tool and create the customer table using the DDL following.
CREATE TABLE customer 
(
  c_custkey      INTEGER NOT NULL,
  c_name         VARCHAR(25) NOT NULL,
  c_address      VARCHAR(25) NOT NULL,
  c_city         VARCHAR(10) NOT NULL,
  c_nation       VARCHAR(15) NOT NULL,
  c_region       VARCHAR(12) NOT NULL,
  c_phone        VARCHAR(15) NOT NULL,
  c_mktsegment   VARCHAR(10) NOT NULL
);

2. Now you can run the COPY statement following successfully.

copy customer from 's3://rs-xacct-kms-bucket/customer/' 
iam_role '<IAM role ARN of redshift_role_account_b,IAM role ARN of xacct_kms_role_account_b>'
gzip
region 'us-west-2';

Note: Replace the placeholders with the IAM role ARNs from Account B, separated by a comma with no spaces around it.

3. Run the following sample query to verify that the data was loaded successfully.

select * from customer limit 10;

Set up an AWS Glue Data Catalog table for Redshift Spectrum to query

Let’s now create an AWS Glue crawler in Account B to crawl the same customer data and create a table called customer in the AWS Glue Data Catalog database spectrumdb_account_b following these steps:

  1. Navigate to Databases on the AWS Glue console and choose Add database to create an AWS Glue Data Catalog database called spectrumdb_account_b, as shown following.

  2. Navigate to Crawlers on the AWS Glue console and choose Add crawler, as shown following.

  3. Create a crawler customerxacct, as shown following.

Note: The Crawler job name (customerxacct in this case) is not the same as the table name created by the crawler (a common confusion). The table name is picked up automatically from the prefix and folder name from your S3 bucket and folder structure. You also have an option to attach a table name prefix if you want to.

  4. Choose Next to enter Data store details of the customer table, as shown following.

  5. Choose Next to get to the Add another data store page. We leave the default, No, because we don’t have any other data stores to add.

  6. Choose Next to choose the IAM role created earlier, glue_service_role_account_b, for the crawler to use, as shown following.

  7. Choose Next to go to the Schedule page and choose the schedule that you want this crawler job to run on. For this example, we can choose Run on demand.

  8. Choose Next to choose the AWS Glue Data Catalog database spectrumdb_account_b (created earlier on the AWS Glue console) as the crawler output location.

  9. Choose Next to get to the review page.

  10. After reviewing the details, choose Finish to finish creating the crawler.

  11. Now, let’s run the crawler job by selecting it and choosing Run crawler.

  12. Wait and watch for the job to complete. Its status changes from Starting to Stopping to Ready. You can choose the refresh button for the latest status.

  13. If the job fails, the failure is recorded in Amazon CloudWatch logs. To view the logs, choose Logs, shown in the screenshot preceding, which takes you to the CloudWatch logs.

  14. Now, let’s go to the AWS Glue Data Catalog database to make sure that the table exists.

Choose Databases, choose the spectrumdb_account_b database, and then choose View Tables, or choose the hyperlink of the database name. You should see the customer table, as shown following.

  15. Choose the customer hyperlink to get to the external table, details following.

Because the data file didn’t have a header record, the AWS Glue crawler has assigned a default column naming convention, as shown preceding. For the customer table, this naming is column 0 to column 7.

  16. Choose Edit Schema and assign appropriate column names, as per the mapping following.

c0 => c_custkey

c1 => c_name

c2 => c_address

c3 => c_city

c4 => c_nation

c5 => c_region

c6 => c_phone

c7 => c_mktsegment

When you are done, choose Save.

Perform the Redshift Spectrum query

Now that the customer table is created in AWS Glue Data Catalog, let’s query the table using Redshift Spectrum.

  1. Log in to the Amazon Redshift cluster from your query tool.
  2. Run the statements following to create an external schema called spectrumxacct for Redshift Spectrum pointing to the AWS Glue Data Catalog database. This database is spectrumdb_account_b in Account B, already created on the AWS Glue console.
    drop schema if exists spectrumxacct;
    create external schema spectrumxacct
    from data catalog 
    database 'spectrumdb_account_b'
    iam_role '<IAM role ARN of redshift_role_account_b,IAM role ARN of xacct_kms_role_account_b>'
    create external database if not exists;
    

    Note: Replace the placeholders with the IAM role ARNs from Account B, separated by a comma with no spaces around it.

  3. Run the following sample query to verify that Redshift Spectrum can query the data successfully.
    select * from spectrumxacct.customer limit 10;

Note: Redshift Spectrum uses the AWS Glue Data Catalog in Account B, not Account A.

Option 2: AWS Glue Data Catalog in Account A

 

Set up permissions

1. Sign in to the Account A AWS console, then change the AWS Region to us-west-2 (Oregon).

a) Create the following IAM policies:

• rs_xacct_bucket_policy to give access to the S3 bucket in Account A
• rs_xacct_kms_policy to give access to the CMK in Account A

Policy name: rs_xacct_bucket_policy

Note: Replace the bucket name rs-xacct-kms-bucket with your bucket name.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowS3",
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:GetObject"
            ],
            "Resource": [
                "arn:aws:s3:::rs-xacct-kms-bucket/*",
                "arn:aws:s3:::rs-xacct-kms-bucket"
            ]
        }
    ]
}

Policy name: rs_xacct_kms_policy

Note: Replace <ARN of kms_key_account_a from Account A> with your KMS key ARN from Account A.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowUseOfTheKey",
            "Effect": "Allow",
            "Action": [
                "kms:Encrypt",
                "kms:Decrypt",
                "kms:ReEncrypt*",
                "kms:GenerateDataKey*",
                "kms:DescribeKey"
            ],
            "Resource": [
                "<ARN of kms_key_account_a from Account A>"
            ]
        },
        {
            "Sid": "AllowAttachmentOfPersistentResources",
            "Effect": "Allow",
            "Action": [
                "kms:CreateGrant",
                "kms:ListGrants",
                "kms:RevokeGrant"
            ],
            "Resource": [
                "<ARN of kms_key_account_a from Account A>"
            ],
            "Condition": {
                "Bool": {
                    "kms:GrantIsForAWSResource": true
                }
            }
        }
    ]
}

b) Create a new IAM role called xacct_kms_role_account_a for the Amazon Redshift service with the following IAM policies:

rs_xacct_bucket_policy
rs_xacct_kms_policy
AWSGlueConsoleFullAccess (this managed policy provides the required permissions for the AWS Glue Data Catalog)

Save the IAM role ARN to be used shortly.

c) Change the trust relationship for the IAM role xacct_kms_role_account_a by choosing Edit trust relationship and replacing the existing trust policy with the following:

Note: Replace <Account B> with the AWS account ID for Account B.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "redshift.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    },
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::<Account B>:root"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

d) Create an AWS Glue service IAM role called glue_service_role_account_a with the following policies attached:

AWSGlueServiceRole (AWS managed policy)
rs_xacct_bucket_policy (managed policy created earlier)
rs_xacct_kms_policy (managed policy created earlier)

Note: Be sure to update glue_service_role_account_a with your own IAM role

2. Sign in to Account B’s AWS console and change the AWS Region to us-west-2 (Oregon) if it’s not already selected.

a) Modify the existing IAM policy rs_xacct_assume_role_policy and replace the existing JSON policy with the following:

 Note: Replace <ARN for IAM role xacct_kms_role_account_a from Account A>.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Stmt1487639602000",
            "Effect": "Allow",
            "Action": [
                "sts:AssumeRole"
            ],
            "Resource": [
"<ARN for IAM role xacct_kms_role_account_a from Account A>"
            ]
        }
    ]
}

Perform the Amazon Redshift COPY

1. Log in to the Amazon Redshift cluster from your query tool and create the customer table using the DDL following.

CREATE TABLE customer 
(
  c_custkey      INTEGER NOT NULL,
  c_name         VARCHAR(25) NOT NULL,
  c_address      VARCHAR(25) NOT NULL,
  c_city         VARCHAR(10) NOT NULL,
  c_nation       VARCHAR(15) NOT NULL,
  c_region       VARCHAR(12) NOT NULL,
  c_phone        VARCHAR(15) NOT NULL,
  c_mktsegment   VARCHAR(10) NOT NULL
);

2. Now you should be able to run the COPY statement following successfully.

copy customer from 's3://rs-xacct-kms-bucket/customer/' 
iam_role '<ARN for IAM role redshift_role_account_b from Account B>,<ARN for IAM role xacct_kms_role_account_a from Account A>'
gzip
region 'us-west-2';

Note: Replace the placeholders with the IAM role ARNs, separated by a comma with no spaces around it.

3. Run the sample query following to validate that the data was loaded successfully.

select * from customer limit 10;

Set up AWS Glue Data Catalog table for Redshift Spectrum to query

Let’s now create an AWS Glue crawler in Account A to crawl the same customer data and create a table called customer in the AWS Glue Data Catalog database spectrumdb_account_a in Account A following these steps:

Follow the same steps as outlined in Option 1 to create and run a crawler with the following changes:

  1. This time, create the crawler in Account A (as opposed to Account B for Option 1).
  2. Create an AWS Glue Data Catalog database spectrumdb_account_a in Account A (as opposed to spectrumdb_account_b in Account B), and choose that database for the crawler to create the customer table in.
  3. While providing the S3 path, choose the option Specified path in my account (rather than Specified path in another account, which was chosen for Option 1).
  4. Make sure to use glue_service_role_account_a created earlier as the AWS Glue service IAM role.

Perform the Redshift Spectrum query

Now that the customer table is created in the AWS Glue Data Catalog, let’s query the table using Redshift Spectrum.

1. Log in to the Amazon Redshift cluster from your query tool and run the statements following. These create an external schema called spectrumxacct2 for Redshift Spectrum pointing to the AWS Glue Data Catalog database spectrumdb_account_a (created earlier from AWS Glue console) in Account A.

drop schema if exists spectrumxacct2;
create external schema spectrumxacct2
from data catalog 
database 'spectrumdb_account_a' 
iam_role '<ARN for IAM role redshift_role_account_b from Account B>,<ARN for IAM role xacct_kms_role_account_a from Account A>'
create external database if not exists;

Note: Replace the placeholders with the IAM role ARNs, separated by a comma with no spaces around it.

2. Run the following query, which should run successfully.

select * from spectrumxacct2.customer limit 10;

Note: Spectrum uses the AWS Glue Data Catalog in Account A, not Account B.

Summary

This post shows a step-by-step walkthrough of how to set up a cross-account Amazon Redshift COPY and query using Redshift Spectrum for a sample KMS encrypted dataset in Amazon S3. It demonstrates two solution options to choose from depending on which account’s AWS Glue Catalog you want to use for Redshift Spectrum.

If you have questions or suggestions, please leave a comment.

 


About the Author

Asim Kumar Sasmal is a Sr. Data Architect – IoT in the Global Specialty Practice of AWS Professional Services. He helps AWS customers around the globe design and build data-driven solutions by providing expert technical consulting, best practices guidance, and implementation services on the AWS platform. He is passionate about working backwards from the customer ask, helping them think big, and diving deep to solve real business problems by leveraging the power of the AWS platform.

Amazon Kinesis Data Streams Adds Enhanced Fan-Out and HTTP/2 for Faster Streaming

Post Syndicated from Randall Hunt original https://aws.amazon.com/blogs/aws/kds-enhanced-fanout/

A few weeks ago, we launched two significant performance improving features for Amazon Kinesis Data Streams (KDS): enhanced fan-out and an HTTP/2 data retrieval API. Enhanced fan-out allows developers to scale up the number of stream consumers (applications reading data from a stream in real-time) by offering each stream consumer its own read throughput. Meanwhile, the HTTP/2 data retrieval API allows data to be delivered from producers to consumers in 70 milliseconds or better (a 65% improvement) in typical scenarios. These new features enable developers to build faster, more reactive, highly parallel, and latency-sensitive applications on top of Kinesis Data Streams.

Kinesis actually refers to a family of streaming services: Kinesis Video Streams, Kinesis Data Firehose, Kinesis Data Analytics, and the topic of today’s blog post, Kinesis Data Streams (KDS). Kinesis Data Streams allows developers to easily and continuously collect, process, and analyze streaming data in real-time with a fully-managed and massively scalable service. KDS can capture gigabytes of data per second from hundreds of thousands of sources – everything from website clickstreams and social media feeds to financial transactions and location-tracking events.

Kinesis Data Streams are scaled using the concept of a shard. One shard provides an ingest capacity of 1 MB/second or 1,000 records/second and an output capacity of 2 MB/second. It’s not uncommon for customers to have thousands or tens of thousands of shards supporting tens of GB/sec of ingest and egress. Before the enhanced fan-out capability, that 2 MB/second/shard output was shared between all of the applications consuming data from the stream. With enhanced fan-out, developers can register stream consumers that each receive their own 2 MB/second pipe of read throughput per shard, and this throughput automatically scales with the number of shards in a stream. Prior to the launch of enhanced fan-out, customers would frequently fan their data out to multiple streams to support the desired read throughput for their downstream applications. That sounds like undifferentiated heavy lifting to us, and that’s something we decided our customers shouldn’t need to worry about. Customers pay for enhanced fan-out based on the amount of data retrieved from the stream using enhanced fan-out and the number of consumers registered per shard. You can find additional info on the pricing page.

Before we jump into a description of the new API, let’s cover a few quick notes about HTTP/2 and how we use that with the new SubscribeToShard API.

HTTP/2

HTTP/2 is a major revision to the HTTP network protocol that introduces a new method for framing and transporting data between clients and servers. It’s a binary protocol. It enables many new features focused on decreasing latency and increasing throughput. The first gain is the use of HPACK to compress headers. Another useful feature is connection multiplexing which allows us to use a single TCP connection for multiple parallel non-blocking requests. Additionally, instead of the traditional request-response semantics of HTTP, the communication pipe is bidirectional. A server using HTTP/2 can push multiple responses to a client without waiting for the client to request those resources. Kinesis’s SubscribeToShard API takes advantage of this server push feature to receive new records and makes use of another HTTP/2 feature called flow control. Kinesis pushes data to the consumer and keeps track of the number of bytes that have been unacknowledged. The client acknowledges bytes received by sending WINDOW_UPDATE frames to the server. If the client can’t handle the rate of data, then Kinesis will pause the flow of data until a new WINDOW_UPDATE frame is received or until the 5 minute subscription expires.

Now that we have a grasp on SubscribeToShard and HTTP/2 let’s cover how we use this to take advantage of enhanced fan-out!

Using Enhanced Fan-out

The easiest way to make use of enhanced fan-out is to use the updated Kinesis Client Library 2.0 (KCL). KCL will automatically register itself as a consumer of the stream. Then KCL will enumerate the shards and subscribe to them using the new SubscribeToShard API. It will also continuously call SubscribeToShard whenever the underlying connections are terminated. Under the hood, KCL handles checkpointing and state management of a distributed app with an Amazon DynamoDB table it creates in your AWS account. You can see an example of this in the documentation.

The general process for using enhanced fan-out is:

  1. Call RegisterStreamConsumer and provide the StreamARN and ConsumerName (commonly the application name). Save the ConsumerARN returned by this API call. As soon as the consumer is registered, enhanced fan-out is enabled and billing for consumer-shard-hours begins.
  2. Enumerate stream shards and call SubscribeToShard on each of them with the ConsumerARN returned by RegisterStreamConsumer. This establishes an HTTP/2 connection, and KDS will push SubscribeToShardEvents to the listening client. These connections are terminated by KDS every 5 minutes, so the client needs to call SubscribeToShard again if it wants to continue receiving events. Bytes pushed to the client using enhanced fan-out are billed under enhanced fan-out data retrieval rates.
  3. Finally, remember to call DeregisterStreamConsumer when you’re no longer using the consumer since it does have an associated cost.

You can see some example code walking through this process in the documentation.
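
As a hedged sketch of step 1 (not the full SubscribeToShard loop, which KCL manages for you), the following Scala snippet uses the AWS SDK for Java 2.x to register and later deregister a stream consumer. The stream ARN and consumer name are placeholders.

import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
import software.amazon.awssdk.services.kinesis.model.{DeregisterStreamConsumerRequest, RegisterStreamConsumerRequest}

// Hypothetical sketch: register a consumer for enhanced fan-out and keep its ARN
// for the SubscribeToShard calls; deregister it when the application shuts down.
val kinesis = KinesisAsyncClient.create()

val consumerArn = kinesis.registerStreamConsumer(
  RegisterStreamConsumerRequest.builder()
    .streamARN("arn:aws:kinesis:us-east-1:111122223333:stream/example-stream") // placeholder ARN
    .consumerName("example-consumer-app")                                      // placeholder name
    .build()
).join().consumer().consumerARN()

// ... enumerate shards and call SubscribeToShard with consumerArn ...

kinesis.deregisterStreamConsumer(
  DeregisterStreamConsumerRequest.builder().consumerARN(consumerArn).build()
).join()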

You can view Amazon CloudWatch metrics and manage consumer applications in the console, including deregistering them.

Available Now

Enhanced fan-out and the new HTTP/2 SubscribeToShard API are both available now in all regions for new streams and existing streams. There’s a lot more information than what I’ve covered in this blog post in the documentation. There is a per-stream limit of 5 consumer applications (e.g., 5 different KCL applications) reading from all shards, but this can be increased with a support ticket. I’m excited to see customers take advantage of these new features to reduce the complexity of managing multiple stream consumers and to increase the speed and parallelism of their real-time applications.

As always feel free to leave comments below or on Twitter.

Randall

AWS Online Tech Talks – July 2018

Post Syndicated from Sara Rodas original https://aws.amazon.com/blogs/aws/aws-online-tech-talks-july-2018/

Join us this month to learn about AWS services and solutions featuring topics on Amazon EMR, Amazon SageMaker, AWS Lambda, Amazon S3, Amazon WorkSpaces, Amazon EC2 Fleet and more! We also have our third episode of the “How to re:Invent” series, where we’ll dive deep with the AWS Training and Certification team on Bootcamps, Hands-on Labs, and how to get AWS Certified at re:Invent. Register now! We look forward to seeing you. Please note – all sessions are free and in Pacific Time.

 

Tech talks featured this month:

 

Analytics & Big Data

July 23, 2018 | 11:00 AM – 12:00 PM PT – Large Scale Machine Learning with Spark on EMR – Learn how to do large scale machine learning on Amazon EMR.

July 25, 2018 | 01:00 PM – 02:00 PM PT – Introduction to Amazon QuickSight: Business Analytics for Everyone – Get an introduction to Amazon QuickSight, Amazon’s BI service.

July 26, 2018 | 11:00 AM – 12:00 PM PT – Multi-Tenant Analytics on Amazon EMR – Discover how to make an Amazon EMR cluster multi-tenant to have different processing activities on the same data lake.

 

Compute

July 31, 2018 | 11:00 AM – 12:00 PM PT – Accelerate Machine Learning Workloads Using Amazon EC2 P3 Instances – Learn how to use Amazon EC2 P3 instances, the most powerful, cost-effective and versatile GPU compute instances available in the cloud.

August 1, 2018 | 09:00 AM – 10:00 AM PT – Technical Deep Dive on Amazon EC2 Fleet – Learn how to launch workloads across instance types, purchase models, and AZs with EC2 Fleet to achieve the desired scale, performance and cost.

 

Containers

July 25, 2018 | 11:00 AM – 11:45 AM PT – How Harry’s Shaved Off Their Operational Overhead by Moving to AWS Fargate – Learn how Harry’s migrated their messaging workload to Fargate and reduced message processing time by more than 75%.

 

Databases

July 23, 2018 | 01:00 PM – 01:45 PM PT – Purpose-Built Databases: Choose the Right Tool for Each Job – Learn about purpose-built databases and when to use which database for your application.

July 24, 2018 | 11:00 AM – 11:45 AM PT – Migrating IBM Db2 Databases to AWS – Learn how to migrate your IBM Db2 database to the cloud database of your choice.

 

DevOps

July 25, 2018 | 09:00 AM – 09:45 AM PT – Optimize Your Jenkins Build Farm – Learn how to optimize your Jenkins build farm using the plug-in for AWS CodeBuild.

 

Enterprise & Hybrid

July 31, 2018 | 09:00 AM – 09:45 AM PT – Enable Developer Productivity with Amazon WorkSpaces – Learn how your development teams can be more productive with Amazon WorkSpaces.

August 1, 2018 | 11:00 AM – 11:45 AM PT – Enterprise DevOps: Applying ITIL to Rapid Innovation – Innovation doesn’t have to equate to more risk for your organization. Learn how Enterprise DevOps delivers agility while maintaining governance, security and compliance.

 

IoT

July 30, 2018 | 01:00 PM – 01:45 PM PT – Using AWS IoT & Alexa Skills Kit to Voice-Control Connected Home Devices – Hands-on workshop that covers how to build a simple backend service using AWS IoT to support an Alexa Smart Home skill.

 

Machine Learning

July 23, 2018 | 09:00 AM – 09:45 AM PT – Leveraging ML Services to Enhance Content Discovery and Recommendations – See how customers are using computer vision and language AI services to enhance content discovery & recommendations.

July 24, 2018 | 09:00 AM – 09:45 AM PT – Hyperparameter Tuning with Amazon SageMaker’s Automatic Model Tuning – Learn how to use Automatic Model Tuning with Amazon SageMaker to tune hyperparameters and get the best machine learning model for your datasets.

July 26, 2018 | 09:00 AM – 10:00 AM PT – Build Intelligent Applications with Machine Learning on AWS – Learn how to accelerate development of AI applications using machine learning on AWS.

 

re:Invent

July 18, 2018 | 08:00 AM – 08:30 AM PT – Episode 3: Training & Certification Round-Up – Join us as we dive deep with the AWS Training and Certification team on Bootcamps, Hands-on Labs, and how to get AWS Certified at re:Invent.

 

Security, Identity, & Compliance

July 30, 2018 | 11:00 AM – 11:45 AM PT – Get Started with Well-Architected Security Best Practices – Discover and walk through essential best practices for securing your workloads using a number of AWS services.

 

Serverless

July 24, 2018 | 01:00 PM – 02:00 PM PT – Getting Started with Serverless Computing Using AWS Lambda – Get an introduction to serverless and how to start building applications with no server management.

 

Storage

July 30, 2018 | 09:00 AM – 09:45 AM PT – Best Practices for Security in Amazon S3 – Learn about Amazon S3 security fundamentals and lots of new features that help make security simple.

Analyze data in Amazon DynamoDB using Amazon SageMaker for real-time prediction

Post Syndicated from YongSeong Lee original https://aws.amazon.com/blogs/big-data/analyze-data-in-amazon-dynamodb-using-amazon-sagemaker-for-real-time-prediction/

Many companies across the globe use Amazon DynamoDB to store and query historical user-interaction data. DynamoDB is a fast NoSQL database used by applications that need consistent, single-digit millisecond latency.

Often, customers want to turn their valuable data in DynamoDB into insights by analyzing a copy of their table stored in Amazon S3. Doing this separates their analytical queries from their low-latency critical paths. This data can be the primary source for understanding customers’ past behavior, predicting future behavior, and generating downstream business value. Customers often turn to DynamoDB because of its great scalability and high availability. After a successful launch, many customers want to use the data in DynamoDB to predict future behaviors or provide personalized recommendations.

DynamoDB is a good fit for low-latency reads and writes, but it’s not practical to scan all data in a DynamoDB database to train a model. In this post, I demonstrate how you can use DynamoDB table data copied to Amazon S3 by AWS Data Pipeline to predict customer behavior. I also demonstrate how you can use this data to provide personalized recommendations for customers using Amazon SageMaker. You can also run ad hoc queries using Amazon Athena against the data. DynamoDB recently released on-demand backups to create full table backups with no performance impact. However, it’s not suitable for our purposes in this post, so I chose AWS Data Pipeline instead to create managed backups are accessible from other services.

To do this, I describe how to read the DynamoDB backup file format in Data Pipeline. I also describe how to convert the objects in S3 to a CSV format that Amazon SageMaker can read. In addition, I show how to schedule regular exports and transformations using Data Pipeline. The sample data used in this post is from the Bank Marketing Data Set from UCI.

The solution that I describe provides the following benefits:

  • Separates analytical queries from production traffic on your DynamoDB table, preserving your DynamoDB read capacity units (RCUs) for important production requests
  • Automatically updates your model to get real-time predictions
  • Optimizes for performance (so it doesn’t compete with DynamoDB RCUs after the export) and for cost (using data you already have)
  • Makes it easier for developers of all skill levels to use Amazon SageMaker

All code and data set in this post are available in this .zip file.

Solution architecture

The following diagram shows the overall architecture of the solution.

The steps that data follows through the architecture are as follows:

  1. Data Pipeline regularly copies the full contents of a DynamoDB table as JSON into an S3 bucket.
  2. Exported JSON files are converted to comma-separated value (CSV) format to use as a data source for Amazon SageMaker.
  3. Amazon SageMaker renews the model artifact and updates the endpoint.
  4. The converted CSV is available for ad hoc queries with Amazon Athena.
  5. Data Pipeline controls this flow and repeats the cycle based on the schedule defined by customer requirements.

Building the auto-updating model

This section discusses details about how to read the DynamoDB exported data in Data Pipeline and build automated workflows for real-time prediction with a regularly updated model.

Download sample scripts and data

Before you begin, take the following steps:

  1. Download sample scripts in this .zip file.
  2. Unzip the src.zip file.
  3. Find the automation_script.sh file and edit it for your environment. For example, replace 's3://<your bucket>/<datasource path>/' with your own S3 path to the data source for Amazon SageMaker. In the script, replace any text enclosed in angle brackets (< and >) with your own values.
  4. Upload the json-serde-1.3.6-SNAPSHOT-jar-with-dependencies.jar file to your S3 path so that the ADD jar command in Apache Hive can refer to it.

For this solution, the banking.csv file should be imported into a DynamoDB table. If you need a starting point for that import, see the sketch that follows.
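
The following is a minimal boto3 sketch of one way to do the import, not a method prescribed by this post. The table name (banking), the customer_id key, and the numeric conversion are assumptions for illustration; adjust them to your own schema and to the delimiter of your CSV file.

=== Python sketch: import banking.csv into DynamoDB (illustrative only) ===

#!/usr/bin/env python
# Assumptions: a DynamoDB table named "banking" with a string partition key
# "customer_id", and a banking.csv file whose first row is a header.
# Numeric-looking values are stored as DynamoDB numbers (Decimal) so that the
# backup exposes them under the 'n' key, as the Hive script in this post expects.
import csv
from decimal import Decimal, InvalidOperation

import boto3

dynamodb = boto3.resource('dynamodb', region_name='<your region>')
table = dynamodb.Table('banking')

def to_attr(value):
    try:
        return Decimal(value)
    except InvalidOperation:
        return value

with open('banking.csv') as f:
    reader = csv.DictReader(f)   # adjust delimiter= if your file is not comma-separated
    with table.batch_writer() as batch:
        for i, row in enumerate(reader):
            item = {k: to_attr(v) for k, v in row.items() if v != ''}
            item['customer_id'] = str(i)   # synthetic key for illustration
            batch.put_item(Item=item)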

Export a DynamoDB table

To export the DynamoDB table to S3, open the Data Pipeline console and choose the Export DynamoDB table to S3 template. In this template, Data Pipeline creates an Amazon EMR cluster and performs an export in the EMRActivity activity. Set proper intervals for backups according to your business requirements.

One core node (m3.xlarge) provides the default capacity for the EMR cluster and should be suitable for the solution in this post. Leave the option to resize the cluster before running enabled in the TableBackupActivity activity so that Data Pipeline can scale the cluster to match the table size. The process of converting to CSV format and renewing models happens in this EMR cluster.

For a more in-depth look at how to export data from DynamoDB, see Export Data from DynamoDB in the Data Pipeline documentation.
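
The console template handles creating and scheduling the export for you. If you ever need to trigger or monitor the pipeline from code instead, the following boto3 sketch shows one way to do it; the pipeline ID is a placeholder, and polling the @pipelineState field is an assumption based on the Data Pipeline API rather than part of this post's solution.

=== Python sketch: activate and monitor the export pipeline (illustrative only) ===

#!/usr/bin/env python
# Assumption: the pipeline ID below refers to a pipeline created from the
# "Export DynamoDB table to S3" template.
import time

import boto3

datapipeline = boto3.client('datapipeline', region_name='<your region>')
PIPELINE_ID = 'df-XXXXXXXXXXXX'   # placeholder: your pipeline ID

datapipeline.activate_pipeline(pipelineId=PIPELINE_ID)

while True:
    description = datapipeline.describe_pipelines(pipelineIds=[PIPELINE_ID])
    fields = description['pipelineDescriptionList'][0]['fields']
    state = next((f['stringValue'] for f in fields if f['key'] == '@pipelineState'), 'UNKNOWN')
    print('Pipeline state:', state)
    if state in ('FINISHED', 'ERROR'):
        break
    time.sleep(60)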

Add the script to an existing pipeline

After you export your DynamoDB table, add an EMR step to EMRActivity by following these steps:

  1. Open the Data Pipeline console and choose the ID for the pipeline that you want to add the script to.
  2. For Actions, choose Edit.
  3. In the editing console, choose the Activities category and add an EMR step using the custom script downloaded in the previous section, as shown below.

Paste the following command into the new step after the data upload step:

s3://#{myDDBRegion}.elasticmapreduce/libs/script-runner/script-runner.jar,s3://<your bucket name>/automation_script.sh,#{output.directoryPath},#{myDDBRegion}

The element #{output.directoryPath} references the S3 path where Data Pipeline exports the DynamoDB data as JSON. This path is passed to the script as an argument.

The bash script has two goals: converting the data format and renewing the Amazon SageMaker model. The following sections discuss the contents of the automation script.

Automation script: Convert JSON data to CSV with Hive

We use Apache Hive to transform the data into a new format. The Hive QL script to create an external table and transform the data is included in the custom script that you added to the Data Pipeline definition.

When you run the Hive scripts, do so with the -e option. Also, define the Hive table with the 'org.openx.data.jsonserde.JsonSerDe' row format to parse and read JSON format. The SQL creates a Hive EXTERNAL table, and it reads the DynamoDB backup data on the S3 path passed to it by Data Pipeline.

Note: You should create the table with the “EXTERNAL” keyword to avoid the backup data being accidentally deleted from S3 if you drop the table.

The full automation script for converting follows. Add your own bucket name and data source path where the angle-bracket placeholders appear.

#!/bin/bash
hive -e "
ADD jar s3://<your bucket name>/json-serde-1.3.6-SNAPSHOT-jar-with-dependencies.jar ; 
DROP TABLE IF EXISTS blog_backup_data ;
CREATE EXTERNAL TABLE blog_backup_data (
 customer_id map<string,string>,
 age map<string,string>, job map<string,string>, 
 marital map<string,string>,education map<string,string>, 
 default map<string,string>, housing map<string,string>,
 loan map<string,string>, contact map<string,string>, 
 month map<string,string>, day_of_week map<string,string>, 
 duration map<string,string>, campaign map<string,string>,
 pdays map<string,string>, previous map<string,string>, 
 poutcome map<string,string>, emp_var_rate map<string,string>, 
 cons_price_idx map<string,string>, cons_conf_idx map<string,string>,
 euribor3m map<string,string>, nr_employed map<string,string>, 
 y map<string,string> ) 
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' 
LOCATION '$1/';

INSERT OVERWRITE DIRECTORY 's3://<your bucket name>/<datasource path>/' 
SELECT concat( customer_id['s'],',', 
 age['n'],',', job['s'],',', 
 marital['s'],',', education['s'],',', default['s'],',', 
 housing['s'],',', loan['s'],',', contact['s'],',', 
 month['s'],',', day_of_week['s'],',', duration['n'],',', 
 campaign['n'],',',pdays['n'],',',previous['n'],',', 
 poutcome['s'],',', emp_var_rate['n'],',', cons_price_idx['n'],',',
 cons_conf_idx['n'],',', euribor3m['n'],',', nr_employed['n'],',', y['n'] ) 
FROM blog_backup_data
WHERE customer_id['s'] > 0 ;
"

After creating the external table, the script reads the data and uses an INSERT OVERWRITE DIRECTORY ... SELECT statement to write CSV data to the S3 path that you designated as the data source for Amazon SageMaker.

Depending on your requirements, you can drop or transform columns in the SELECT clause in this step to optimize the data for analysis. For example, you might remove columns that have no meaningful correlation with the target value, because keeping the wrong columns can expose your model to overfitting during training. In this post, the customer_id column is removed. Overfitting weakens your model’s predictions on new data. More information about overfitting can be found in the topic Model Fit: Underfitting vs. Overfitting in the Amazon ML documentation.
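
If you prefer to experiment with column selection outside of Hive, the following pandas sketch shows the same idea on a local copy of the converted CSV. The file names and the choice of columns to drop are assumptions for illustration only.

=== Python sketch: prune columns before training (illustrative only) ===

#!/usr/bin/env python
# Assumptions: a local copy of the Hive output (no header row) and the column
# order produced by the SELECT above. File names are placeholders.
import pandas as pd

columns = ['customer_id', 'age', 'job', 'marital', 'education', 'default',
           'housing', 'loan', 'contact', 'month', 'day_of_week', 'duration',
           'campaign', 'pdays', 'previous', 'poutcome', 'emp_var_rate',
           'cons_price_idx', 'cons_conf_idx', 'euribor3m', 'nr_employed', 'y']

df = pd.read_csv('converted.csv', header=None, names=columns)

# Example only: drop columns you judge unhelpful or prone to overfitting.
df = df.drop(columns=['customer_id'])

df.to_csv('pruned.csv', header=False, index=False)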

Automation script: Renew the Amazon SageMaker model

After the CSV data is replaced and ready to use, create a new model artifact for Amazon SageMaker with the updated dataset on S3. To renew the model artifact, you must create a new training job. Training jobs can be run using the AWS SDK (for example, boto3 for Amazon SageMaker), the Amazon SageMaker Python SDK (installable with the pip install sagemaker command), or the AWS CLI for Amazon SageMaker, which is what this post uses.

In addition, consider how to renew your existing model smoothly and without service impact, because your model is called by applications in real time. To do this, create a new endpoint configuration first, and then update the current endpoint with the newly created configuration.

#!/bin/bash
## Define variables
REGION=$2
DTTIME=`date +%Y-%m-%d-%H-%M-%S`
ROLE="<your AmazonSageMaker-ExecutionRole>" 


# Select the container image based on the region.
case "$REGION" in
"us-west-2" )
    IMAGE="174872318107.dkr.ecr.us-west-2.amazonaws.com/linear-learner:latest"
    ;;
"us-east-1" )
    IMAGE="382416733822.dkr.ecr.us-east-1.amazonaws.com/linear-learner:latest" 
    ;;
"us-east-2" )
    IMAGE="404615174143.dkr.ecr.us-east-2.amazonaws.com/linear-learner:latest" 
    ;;
"eu-west-1" )
    IMAGE="438346466558.dkr.ecr.eu-west-1.amazonaws.com/linear-learner:latest" 
    ;;
 *)
    echo "Invalid Region Name"
    exit 1 ;  
esac

# Start training job and creating model artifact 
TRAINING_JOB_NAME=TRAIN-${DTTIME} 
S3OUTPUT="s3://<your bucket name>/model/" 
INSTANCETYPE="ml.m4.xlarge"
INSTANCECOUNT=1
VOLUMESIZE=5 
aws sagemaker create-training-job --training-job-name ${TRAINING_JOB_NAME} --region ${REGION}  --algorithm-specification TrainingImage=${IMAGE},TrainingInputMode=File --role-arn ${ROLE}  --input-data-config '[{ "ChannelName": "train", "DataSource": { "S3DataSource": { "S3DataType": "S3Prefix", "S3Uri": "s3://<your bucket name>/<datasource path>/", "S3DataDistributionType": "FullyReplicated" } }, "ContentType": "text/csv", "CompressionType": "None" , "RecordWrapperType": "None"  }]'  --output-data-config S3OutputPath=${S3OUTPUT} --resource-config  InstanceType=${INSTANCETYPE},InstanceCount=${INSTANCECOUNT},VolumeSizeInGB=${VOLUMESIZE} --stopping-condition MaxRuntimeInSeconds=120 --hyper-parameters feature_dim=20,predictor_type=binary_classifier  

# Wait until job completed 
aws sagemaker wait training-job-completed-or-stopped --training-job-name ${TRAINING_JOB_NAME}  --region ${REGION}

# Get newly created model artifact and create model
MODELARTIFACT=`aws sagemaker describe-training-job --training-job-name ${TRAINING_JOB_NAME} --region ${REGION}  --query 'ModelArtifacts.S3ModelArtifacts' --output text `
MODELNAME=MODEL-${DTTIME}
aws sagemaker create-model --region ${REGION} --model-name ${MODELNAME}  --primary-container Image=${IMAGE},ModelDataUrl=${MODELARTIFACT}  --execution-role-arn ${ROLE}

# create a new endpoint configuration 
CONFIGNAME=CONFIG-${DTTIME}
aws sagemaker  create-endpoint-config --region ${REGION} --endpoint-config-name ${CONFIGNAME}  --production-variants  VariantName=Users,ModelName=${MODELNAME},InitialInstanceCount=1,InstanceType=ml.m4.xlarge

# create or update the endpoint
STATUS=`aws sagemaker describe-endpoint --endpoint-name  ServiceEndpoint --query 'EndpointStatus' --output text --region ${REGION} `
if [[ "$STATUS" != "InService" ]] ;
then
    aws sagemaker  create-endpoint --endpoint-name  ServiceEndpoint  --endpoint-config-name ${CONFIGNAME} --region ${REGION}    
else
    aws sagemaker  update-endpoint --endpoint-name  ServiceEndpoint  --endpoint-config-name ${CONFIGNAME} --region ${REGION}
fi
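
The CLI script above is what Data Pipeline executes. If you would rather drive the same create-or-update endpoint logic from Python, a rough boto3 sketch follows; ServiceEndpoint and the configuration name are the same placeholders that the script uses.

=== Python sketch: create or update the endpoint (illustrative only) ===

#!/usr/bin/env python
# Mirrors the CLI logic above: create the endpoint if it doesn't exist or is
# not in service, otherwise update it with a newly created configuration.
import boto3
from botocore.exceptions import ClientError

sagemaker = boto3.client('sagemaker', region_name='<your region>')

def deploy(endpoint_config_name, endpoint_name='ServiceEndpoint'):
    try:
        status = sagemaker.describe_endpoint(EndpointName=endpoint_name)['EndpointStatus']
    except ClientError:
        status = None   # endpoint does not exist yet

    if status == 'InService':
        sagemaker.update_endpoint(EndpointName=endpoint_name,
                                  EndpointConfigName=endpoint_config_name)
    else:
        sagemaker.create_endpoint(EndpointName=endpoint_name,
                                  EndpointConfigName=endpoint_config_name)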

Grant permission

Before you execute the script, you must grant the proper permissions to Data Pipeline. Data Pipeline uses the DataPipelineDefaultResourceRole role by default. I added the following policy to DataPipelineDefaultResourceRole so that the script can create and update the Amazon SageMaker training jobs, models, and endpoints that it needs.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sagemaker:CreateTrainingJob",
                "sagemaker:DescribeTrainingJob",
                "sagemaker:CreateModel",
                "sagemaker:CreateEndpointConfig",
                "sagemaker:DescribeEndpoint",
                "sagemaker:CreateEndpoint",
                "sagemaker:UpdateEndpoint",
                "iam:PassRole"
            ],
            "Resource": "*"
        }
    ]
}
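
If you manage the role from code rather than the IAM console, one way to attach this as an inline policy is sketched below. The inline policy name is arbitrary, and the document is the same one shown above.

=== Python sketch: attach the inline policy to the role (illustrative only) ===

#!/usr/bin/env python
# Assumption: the policy document below matches the one shown above; the
# inline policy name is arbitrary.
import json

import boto3

iam = boto3.client('iam')

policy_document = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Action": [
            "sagemaker:CreateTrainingJob", "sagemaker:DescribeTrainingJob",
            "sagemaker:CreateModel", "sagemaker:CreateEndpointConfig",
            "sagemaker:DescribeEndpoint", "sagemaker:CreateEndpoint",
            "sagemaker:UpdateEndpoint", "iam:PassRole"
        ],
        "Resource": "*"
    }]
}

iam.put_role_policy(
    RoleName='DataPipelineDefaultResourceRole',
    PolicyName='SageMakerModelRenewalPolicy',   # arbitrary name
    PolicyDocument=json.dumps(policy_document)
)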

Use real-time prediction

After you deploy a model into production using Amazon SageMaker hosting services, your client applications call the runtime InvokeEndpoint API to get inferences from the model hosted at the specified endpoint. This approach is useful for interactive web, mobile, or desktop applications.

The following simple Python example invokes the Amazon SageMaker endpoint by its name (“ServiceEndpoint”) and uses the response for real-time prediction.

=== Python sample for real-time prediction ===

#!/usr/bin/env python
import boto3
import json 

client = boto3.client('sagemaker-runtime', region_name='<your region>')
# Comma-separated feature values for one customer, matching the model's expected input format.
new_customer_info = '34,10,2,4,1,2,1,1,6,3,190,1,3,4,3,-1.7,94.055,-39.8,0.715,4991.6'
response = client.invoke_endpoint(
    EndpointName='ServiceEndpoint',
    Body=new_customer_info, 
    ContentType='text/csv'
)
result = json.loads(response['Body'].read().decode())
print(result)
--- output(response) ---
{u'predictions': [{u'score': 0.7528127431869507, u'predicted_label': 1.0}]}

Solution summary

The solution takes the following steps:

  1. Data Pipeline exports DynamoDB table data into S3. The original JSON data should be kept so that you can recover the table in the rare event that this is needed. Data Pipeline then converts the JSON to CSV so that Amazon SageMaker can read the data. Note: Select only meaningful attributes when you convert to CSV. For example, if you judge that the “campaign” attribute is not correlated with the target, you can eliminate it from the CSV.
  2. Train the Amazon SageMaker model with the new data source.
  3. When a new customer comes to your site, you can judge how likely it is for this customer to subscribe to your new product based on the prediction score returned by Amazon SageMaker.
  4. If the new user subscribes to your new product, your application must update the attribute “y” to the value 1 (for yes), as sketched after this list. This updated data is provided for the next model renewal as a new data source, and it serves to improve the accuracy of your predictions. With each new entry, your application can become smarter and deliver better predictions.
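
A boto3 sketch of step 4 follows. The table name and key value are the placeholders used earlier in this post.

=== Python sketch: record a subscription in DynamoDB (illustrative only) ===

#!/usr/bin/env python
# Assumptions: the "banking" table and customer_id key used earlier; the key
# value is a placeholder.
import boto3

dynamodb = boto3.resource('dynamodb', region_name='<your region>')
table = dynamodb.Table('banking')

table.update_item(
    Key={'customer_id': '12345'},             # placeholder key
    UpdateExpression='SET #y = :val',
    ExpressionAttributeNames={'#y': 'y'},     # alias the attribute name to be safe
    ExpressionAttributeValues={':val': 1}
)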

Running ad hoc queries using Amazon Athena

Amazon Athena is a serverless query service that makes it easy to analyze large amounts of data stored in Amazon S3 using standard SQL. Athena is useful for examining data and collecting statistics or informative summaries about data. You can also use the powerful analytic functions of Presto, as described in the topic Aggregate Functions of Presto in the Presto documentation.

With the Data Pipeline scheduled activity, recent CSV data is always available in S3, so you can run ad hoc queries against it using Amazon Athena. The following example SQL statements show this. For an in-depth description of this process, see the post Interactive SQL Queries for Data in Amazon S3 on the AWS News Blog.

Creating an Amazon Athena table and running queries

You can simply create an EXTERNAL table for the CSV data on S3 in the Amazon Athena console.

=== Table Creation ===
CREATE EXTERNAL TABLE datasource (
 age int, 
 job string, 
 marital string , 
 education string, 
 default string, 
 housing string, 
 loan string, 
 contact string, 
 month string, 
 day_of_week string, 
 duration int, 
 campaign int, 
 pdays int , 
 previous int , 
 poutcome string, 
 emp_var_rate double, 
 cons_price_idx double,
 cons_conf_idx double, 
 euribor3m double, 
 nr_employed double, 
 y int 
)
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY ',' ESCAPED BY '\\' LINES TERMINATED BY '\n' 
LOCATION 's3://<your bucket name>/<datasource path>/';

The following query calculates the correlation coefficient between the target attribute and other attributes using Amazon Athena.

=== Sample Query ===

SELECT corr(age,y) AS correlation_age_and_target, 
 corr(duration,y) AS correlation_duration_and_target, 
 corr(campaign,y) AS correlation_campaign_and_target,
 corr(contact,y) AS correlation_contact_and_target
FROM ( SELECT age , duration , campaign , y , 
 CASE WHEN contact = 'telephone' THEN 1 ELSE 0 END AS contact 
 FROM datasource 
 ) datasource ;
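
If you want to run queries like this on a schedule rather than from the console, a boto3 sketch follows. The database name and the query results location are placeholders, and the query is shortened to a single correlation for brevity.

=== Python sketch: run an Athena query programmatically (illustrative only) ===

#!/usr/bin/env python
# Assumptions: the "datasource" table exists in the "default" database, and the
# results location is a placeholder S3 path that Athena can write to.
import time

import boto3

athena = boto3.client('athena', region_name='<your region>')

execution = athena.start_query_execution(
    QueryString="SELECT corr(age, y) AS correlation_age_and_target FROM datasource",
    QueryExecutionContext={'Database': 'default'},
    ResultConfiguration={'OutputLocation': 's3://<your bucket name>/athena-results/'}
)
query_id = execution['QueryExecutionId']

while True:
    status = athena.get_query_execution(QueryExecutionId=query_id)['QueryExecution']['Status']
    if status['State'] in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
        break
    time.sleep(2)

if status['State'] == 'SUCCEEDED':
    rows = athena.get_query_results(QueryExecutionId=query_id)['ResultSet']['Rows']
    print(rows)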

Conclusion

In this post, I introduced an example of how to analyze DynamoDB data by using a copy of the table in Amazon S3, which preserves your DynamoDB table’s read capacity. You can then use the exported data as a data source to train an Amazon SageMaker model for accurate real-time prediction. In addition, you can run ad hoc queries against the data on S3 using Amazon Athena. I also showed how to automate these procedures by using Data Pipeline.

You can adapt this example to your specific use case, and hopefully this post helps you accelerate your development. You can find more examples and use cases for Amazon SageMaker in the AWS re:Invent 2017 video Introducing Amazon SageMaker on the AWS website.

 


Additional Reading

If you found this post useful, be sure to check out Serving Real-Time Machine Learning Predictions on Amazon EMR and Analyzing Data in S3 using Amazon Athena.

 


About the Author

Yong Seong Lee is a Cloud Support Engineer for AWS Big Data Services. He is interested in every technology related to data/databases and helping customers who have difficulties in using AWS services. His motto is “Enjoy life, be curious and have maximum experience.”