All posts by Veronika Megler

Improve the Operational Efficiency of Amazon Elasticsearch Service Domains with Automated Alarms Using Amazon CloudWatch

Post Syndicated from Veronika Megler original https://aws.amazon.com/blogs/big-data/improve-the-operational-efficiency-of-amazon-elasticsearch-service-domains-with-automated-alarms-using-amazon-cloudwatch/

A customer has been successfully creating and running multiple Amazon Elasticsearch Service (Amazon ES) domains to support their business users’ search needs across products, orders, support documentation, and a growing suite of similar needs. The service has become heavily used across the organization.  This led to some domains running at 100% capacity during peak times, while others began to run low on storage space. Because of this increased usage, the technical teams were in danger of missing their service level agreements.  They contacted me for help.

This post shows how you can set up automated alarms to warn when domains need attention.

Solution overview

Amazon ES is a fully managed service that delivers Elasticsearch’s easy-to-use APIs and real-time analytics capabilities along with the availability, scalability, and security that production workloads require.  The service offers built-in integrations with a number of other components and AWS services, enabling customers to go from raw data to actionable insights quickly and securely.

One of these other integrated services is Amazon CloudWatch. CloudWatch is a monitoring service for AWS Cloud resources and the applications that you run on AWS. You can use CloudWatch to collect and track metrics, collect and monitor log files, set alarms, and automatically react to changes in your AWS resources.

CloudWatch collects metrics for Amazon ES. You can use these metrics to monitor the state of your Amazon ES domains, and set alarms to notify you about high utilization of system resources.  For more information, see Amazon Elasticsearch Service Metrics and Dimensions.

While the metrics are automatically collected, the missing piece is how to set alarms on these metrics at appropriate levels for each of your domains. This post includes sample Python code to evaluate the current state of your Amazon ES environment, and to set up alarms according to AWS recommendations and best practices.

There are two components to the sample solution:

  • es-check-cwalarms.py: This Python script checks the CloudWatch alarms that have been set for all Amazon ES domains in a given account and region.
  • es-create-cwalarms.py: This Python script creates a recommended set of CloudWatch alarms for a single, given domain.

The sample code can also be found in the amazon-es-check-cw-alarms GitHub repo. The scripts are easy to extend or combine, as described in the section “Extensions and Adaptations”.

Assessing the current state

The first script, es-check-cwalarms.py, is used to give an overview of the configurations and alarm settings for all the Amazon ES domains in the given region. The script takes the following parameters:

python es-checkcwalarms.py -h
usage: es-checkcwalarms.py [-h] [-e ESPREFIX] [-n NOTIFY] [-f FREE] [-p PROFILE] [-r REGION]
Checks a set of recommended CloudWatch alarms for Amazon Elasticsearch Service domains (optionally, those beginning with a given prefix).
optional arguments:
  -h, --help   		show this help message and exit
  -e ESPREFIX, --esprefix ESPREFIX	Only check Amazon Elasticsearch Service domains that begin with this prefix.
  -n NOTIFY, --notify NOTIFY    List of CloudWatch alarm actions; e.g. ['arn:aws:sns:xxxx']
  -f FREE, --free FREE  Minimum free storage (MB) on which to alarm
  -p PROFILE, --profile PROFILE     IAM profile name to use
  -r REGION, --region REGION       AWS region for the domain. Default: us-east-1

The script first identifies all the domains in the given region (or, optionally, limits them to the subset that begins with a given prefix). It then starts running a set of checks against each one.
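If you want a feel for the general approach before downloading the full script, the following minimal boto3 sketch lists the domains in a region, pulls each domain’s configuration, and prints any CloudWatch alarms that reference it. It is illustrative only; the helper name check_domain and the output formatting are mine, not taken from the repo.

import boto3

REGION = "us-east-1"
es = boto3.client("es", region_name=REGION)
cloudwatch = boto3.client("cloudwatch", region_name=REGION)

def check_domain(domain_name):
    # Print basic configuration plus any alarms defined on this domain's metrics.
    status = es.describe_elasticsearch_domain(DomainName=domain_name)["DomainStatus"]
    cluster = status["ElasticsearchClusterConfig"]
    print(domain_name, "instances:", cluster["InstanceCount"], "type:", cluster["InstanceType"])

    # Amazon ES metrics live in the AWS/ES namespace, dimensioned by DomainName and ClientId.
    # describe_alarms() is paginated; a production script should follow NextToken.
    for alarm in cloudwatch.describe_alarms()["MetricAlarms"]:
        dims = {d["Name"]: d["Value"] for d in alarm["Dimensions"]}
        if alarm["Namespace"] == "AWS/ES" and dims.get("DomainName") == domain_name:
            print("  alarm:", alarm["AlarmName"], alarm["MetricName"], alarm["Threshold"])

for entry in es.list_domain_names()["DomainNames"]:
    check_domain(entry["DomainName"])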

The script can be run from the command line or set up as a scheduled Lambda function. For example, one customer chose to run the script regularly to confirm that alarms were correctly set for all domains. In addition, because configuration changes (increasing cluster size to accommodate a larger workload is a common one) might require the alarms to be updated, this approach automatically identified alarms that were no longer set appropriately as the domain configurations changed.

The following is the output for one domain in my account.

Starting checks for Elasticsearch domain iotfleet , version is 53
Iotfleet Automated snapshot hour (UTC): 0
Iotfleet Instance configuration: 1 instances; type:m3.medium.elasticsearch
Iotfleet Instance storage definition is: 4 GB; free storage calced to: 819.2 MB
iotfleet Desired free storage set to (in MB): 819.2
iotfleet WARNING: Not using VPC Endpoint
iotfleet WARNING: Does not have Zone Awareness enabled
iotfleet WARNING: Instance count is ODD. Best practice is for an even number of data nodes and zone awareness.
iotfleet WARNING: Does not have Dedicated Masters.
iotfleet WARNING: Neither index nor search slow logs are enabled.
iotfleet WARNING: EBS not in use. Using instance storage only.
iotfleet Alarm ok; definition matches. Test-Elasticsearch-iotfleet-ClusterStatus.yellow-Alarm ClusterStatus.yellow
iotfleet Alarm ok; definition matches. Test-Elasticsearch-iotfleet-ClusterStatus.red-Alarm ClusterStatus.red
iotfleet Alarm ok; definition matches. Test-Elasticsearch-iotfleet-CPUUtilization-Alarm CPUUtilization
iotfleet Alarm ok; definition matches. Test-Elasticsearch-iotfleet-JVMMemoryPressure-Alarm JVMMemoryPressure
iotfleet WARNING: Missing alarm!! ('ClusterIndexWritesBlocked', 'Maximum', 60, 5, 'GreaterThanOrEqualToThreshold', 1.0)
iotfleet Alarm ok; definition matches. Test-Elasticsearch-iotfleet-AutomatedSnapshotFailure-Alarm AutomatedSnapshotFailure
iotfleet Alarm: Threshold does not match: Test-Elasticsearch-iotfleet-FreeStorageSpace-Alarm Should be:  819.2 ; is 3000.0

The output messages fall into the following categories:

  • System overview, Informational: The Amazon ES version and configuration, including instance type and number, storage, automated snapshot hour, etc.
  • Free storage: A calculation for the appropriate amount of free storage, based on the recommended 20% of total storage.
  • Warnings: best practices that are not being followed for this domain. (For more about this, read on.)
  • Alarms: An assessment of the CloudWatch alarms currently set for this domain, against a recommended set.

The script contains an array of recommended CloudWatch alarms, based on best practices for these metrics and statistics. Using the array allows alarm parameters (such as free space) to be updated within the code based on current domain statistics and configurations.
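For illustration, here is one possible shape for such an array, using the tuple format that appears in the warning output above (metric name, statistic, period in seconds, evaluation periods, comparison operator, threshold). Apart from the ClusterIndexWritesBlocked entry, which is copied from that output, the values below are placeholders rather than the settings in the published script.

# Illustrative only: (metric, statistic, period_seconds, evaluation_periods, comparison_operator, threshold)
esAlarms = [
    ("ClusterStatus.yellow",      "Maximum", 60,  5, "GreaterThanOrEqualToThreshold", 1.0),
    ("ClusterStatus.red",         "Maximum", 60,  5, "GreaterThanOrEqualToThreshold", 1.0),
    ("CPUUtilization",            "Average", 300, 3, "GreaterThanOrEqualToThreshold", 80.0),
    ("JVMMemoryPressure",         "Maximum", 300, 3, "GreaterThanOrEqualToThreshold", 85.0),
    ("ClusterIndexWritesBlocked", "Maximum", 60,  5, "GreaterThanOrEqualToThreshold", 1.0),
    ("AutomatedSnapshotFailure",  "Maximum", 60,  1, "GreaterThanOrEqualToThreshold", 1.0),
]

# The FreeStorageSpace threshold is derived from the domain itself: 20% of total
# storage, in MB. For the 4-GB domain above, 4 * 1024 * 0.20 = 819.2 MB.
total_storage_gb = 4
free_storage_mb = total_storage_gb * 1024 * 0.20
esAlarms.append(("FreeStorageSpace", "Minimum", 60, 5, "LessThanOrEqualToThreshold", free_storage_mb))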

For a given domain, the script checks if each alarm has been set. If the alarm is set, it checks whether the values match those in the array esAlarms. In the output above, you can see three different situations being reported:

  • Alarm ok; definition matches. The alarm set for the domain matches the settings in the array.
  • Alarm: Threshold does not match. An alarm exists, but the threshold value at which the alarm is triggered does not match.
  • WARNING: Missing alarm!! The recommended alarm is missing.

All in all, the list above shows that this domain does not have a configuration that adheres to best practices, nor does it have all the recommended alarms.

Setting up alarms

Now that you know that the domains in their current state are missing critical alarms, you can correct the situation.

To demonstrate the script, set up a new domain named “ver” in us-west-2, with one node and a 10-GB EBS volume. Also create an SNS topic in us-west-2 named “sendnotification” that sends you an email.
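If you prefer to create these test resources programmatically instead of through the console, a boto3 sketch such as the following should work; the instance type and subscription email address are placeholders of my choosing, not values from this walkthrough.

import boto3

es = boto3.client("es", region_name="us-west-2")
sns = boto3.client("sns", region_name="us-west-2")

# Test domain "ver": a single data node with a 10-GB gp2 EBS volume.
es.create_elasticsearch_domain(
    DomainName="ver",
    ElasticsearchClusterConfig={"InstanceType": "m4.large.elasticsearch", "InstanceCount": 1},
    EBSOptions={"EBSEnabled": True, "VolumeType": "gp2", "VolumeSize": 10},
)

# SNS topic that emails you when an alarm fires; confirm the subscription from your inbox.
topic_arn = sns.create_topic(Name="sendnotification")["TopicArn"]
sns.subscribe(TopicArn=topic_arn, Protocol="email", Endpoint="you@example.com")
print("Use this ARN as the notify parameter for the alarm scripts:", topic_arn)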

Run the second script, es-create-cwalarms.py, from the command line. This script creates (or updates) the desired CloudWatch alarms for the specified Amazon ES domain, “ver”.

python es-create-cwalarms.py -r us-west-2 -e test -c ver -n "['arn:aws:sns:us-west-2:xxxxxxxxxx:sendnotification']"
EBS enabled: True type: gp2 size (GB): 10 No Iops 10240  total storage (MB)
Desired free storage set to (in MB): 2048.0
Creating  Test-Elasticsearch-ver-ClusterStatus.yellow-Alarm
Creating  Test-Elasticsearch-ver-ClusterStatus.red-Alarm
Creating  Test-Elasticsearch-ver-CPUUtilization-Alarm
Creating  Test-Elasticsearch-ver-JVMMemoryPressure-Alarm
Creating  Test-Elasticsearch-ver-FreeStorageSpace-Alarm
Creating  Test-Elasticsearch-ver-ClusterIndexWritesBlocked-Alarm
Creating  Test-Elasticsearch-ver-AutomatedSnapshotFailure-Alarm
Successfully finished creating alarms!

As with the first script, this script contains an array of recommended CloudWatch alarms, based on best practices for these metrics and statistics. This approach allows you to add or modify alarms based on your use case (more on that below).
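Each alarm the script creates boils down to a single put_metric_alarm call. The following sketch shows how the free-storage alarm for the “ver” domain could be created directly with boto3; the alarm name and threshold mirror the script output above, while the dimensions reflect how Amazon ES publishes its metrics (namespace AWS/ES, dimensioned by DomainName and ClientId, which is your account ID).

import boto3

cloudwatch = boto3.client("cloudwatch", region_name="us-west-2")

cloudwatch.put_metric_alarm(
    AlarmName="Test-Elasticsearch-ver-FreeStorageSpace-Alarm",
    Namespace="AWS/ES",
    MetricName="FreeStorageSpace",
    Dimensions=[
        {"Name": "DomainName", "Value": "ver"},
        {"Name": "ClientId", "Value": "<your_account_id>"},
    ],
    Statistic="Minimum",
    Period=60,
    EvaluationPeriods=5,
    ComparisonOperator="LessThanOrEqualToThreshold",
    Threshold=2048.0,  # 20% of the 10-GB volume, in MB, matching the script output above
    AlarmActions=["arn:aws:sns:us-west-2:<your_account_id>:sendnotification"],
)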

After running the script, navigate to Alarms on the CloudWatch console. You can see the set of alarms set up on your domain.

Because the “ver” domain has only a single node, cluster status is yellow, and that alarm is in an “ALARM” state. It’s already sent a notification that the alarm has been triggered.

What to do when an alarm triggers

After alarms are set up, you need to identify the correct action to take for each alarm, which depends on the alarm triggered. For ideas, guidance, and additional pointers to supporting documentation, see Get Started with Amazon Elasticsearch Service: Set CloudWatch Alarms on Key Metrics. For information about common errors and recovery actions to take, see Handling AWS Service Errors.

In most cases, the alarm triggers due to an increased workload. The likely action is to reconfigure the system to handle the increased workload, rather than reducing the incoming workload. Reconfiguring any backend store—a category of systems that includes Elasticsearch—is best performed when the system is quiescent or lightly loaded. Reconfigurations such as setting zone awareness or modifying the disk type cause Amazon ES to enter a “processing” state, potentially disrupting client access.

Other changes, such as increasing the number of data nodes, may cause Elasticsearch to begin moving shards, potentially impacting search performance on those shards while they are being relocated. These actions should be considered in the context of your production usage. For the same reason, I also do not recommend running a script that resets all domains to match best practices.

Avoid the need to reconfigure during heavy workload by setting alarms at a level that allows a considered approach to making the needed changes. For example, if you identify that each weekly peak is increasing, you can reconfigure during a weekly quiet period.

While Elasticsearch can be reconfigured without being quiesced, it is not a best practice to automatically scale it up and down based on usage patterns. Unlike some other AWS services, I recommend against setting a CloudWatch action that automatically reconfigures the system when alarms are triggered.

There are other situations where the planned reconfiguration approach may not work, such as low or zero free disk space causing the domain to reject writes. If the business is dependent on the domain continuing to accept incoming writes and deleting data is not an option, the team may choose to reconfigure immediately.

Extensions and adaptations

You may wish to modify the best practices encoded in the scripts for your own environment or workloads. It’s always better to avoid situations where alerts are generated but routinely ignored. All alerts should trigger a review and one or more actions, either immediately or at a planned date. The following is a list of common situations where you may wish to set different alarms for different domains:

  • Dev/test vs. production
    You may have a different set of configuration rules and alarms for your dev and test environments than for production. For example, you may require zone awareness and dedicated masters for your production environment, but not for your development domains. Or, you may not have any alarms set in dev. For test environments that mirror your potential peak load, test to ensure that the alarms are appropriately triggered.
  • Differing workloads or SLAs for different domains
    You may have one domain with a requirement for superfast search performance, and another domain with a heavy ingest load that tolerates slower search response. Your reaction to slow response for these two workloads is likely to be different, so perhaps the thresholds for these two domains should be set at a different level. In this case, you might add a “max CPU utilization” alarm at 100% for 1 minute for the fast search domain, while the other domain only triggers an alarm when the average has been higher than 60% for 5 minutes. You might also add a “free space” rule with a higher threshold to reflect the need for more space for the heavy ingest load if there is danger that it could fill the available disk quickly.
  • “Normal” alarms versus “emergency” alarms
    If, for example, free disk space drops to 25% of total capacity, an alarm is triggered that indicates action should be taken as soon as possible, such as cleaning up old indexes or reconfiguring at the next quiet period for this domain. However, if free space drops below a critical level (20% free space), action must be taken immediately in order to prevent Amazon ES from setting the domain to read-only. Similarly, if the “ClusterIndexWritesBlocked” alarm triggers, the domain has already stopped accepting writes, so immediate action is needed. In this case, you may wish to set “laddered” alarms, where one threshold causes an alarm to be triggered to review the current workload for a planned reconfiguration, but a different threshold raises a “DefCon 3” alarm that immediate action is required.
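As an illustration of that laddered approach, using the same tuple format as the earlier sketch (the values here are placeholders, not recommendations from the scripts):

# Two FreeStorageSpace alarms at different severities: a "plan a fix" warning at 25%
# of total storage, and an "act now" alarm at the critical 20% level.
total_storage_mb = 10 * 1024
laddered_alarms = [
    ("FreeStorageSpace", "Minimum", 60, 5, "LessThanOrEqualToThreshold", total_storage_mb * 0.25),
    ("FreeStorageSpace", "Minimum", 60, 1, "LessThanOrEqualToThreshold", total_storage_mb * 0.20),
]

In practice, each entry would also need its own alarm name and, for the critical level, a more urgent notification action.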

The sample scripts provided here are a starting point, intended for you to adapt to your own environment and needs.

Running the scripts one time can identify how far your current state is from your desired state, and can create an initial set of alarms. Re-running them regularly captures changes in your environment over time, letting you adjust your alarms as your environment and configurations change. One customer has set them up to run nightly, automatically creating and updating alarms to match their preferred settings.

Removing unwanted alarms

Each CloudWatch alarm costs approximately $0.10 per month. You can remove unwanted alarms in the CloudWatch console, under Alarms. If you set up the “ver” test domain above, remember to delete the domain and its alarms to avoid ongoing charges.
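You can also delete the alarms programmatically. The following one-off boto3 snippet removes the alarms created for the test domain; the names are reconstructed from the Test-Elasticsearch-<domain>-<metric>-Alarm pattern shown in the script output earlier.

import boto3

cloudwatch = boto3.client("cloudwatch", region_name="us-west-2")

metrics = ["ClusterStatus.yellow", "ClusterStatus.red", "CPUUtilization", "JVMMemoryPressure",
           "FreeStorageSpace", "ClusterIndexWritesBlocked", "AutomatedSnapshotFailure"]
# delete_alarms takes a list of alarm names.
cloudwatch.delete_alarms(AlarmNames=["Test-Elasticsearch-ver-%s-Alarm" % m for m in metrics])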

Conclusion

Setting CloudWatch alarms appropriately for your Amazon ES domains can help you avoid suboptimal performance and allow you to respond to workload growth or configuration issues well before they become urgent. This post gives you a starting point for doing so. The additional sleep you’ll get knowing you don’t need to be concerned about Elasticsearch domain performance will allow you to focus on building creative solutions for your business and solving problems for your customers.

Enjoy!


Additional Reading

If you found this post useful, be sure to check out Analyzing Amazon Elasticsearch Service Slow Logs Using Amazon CloudWatch Logs Streaming and Kibana and Get Started with Amazon Elasticsearch Service: How Many Shards Do I Need?

 


About the Author

Dr. Veronika Megler is a senior consultant at Amazon Web Services. She works with our customers to implement innovative big data, AI and ML projects, helping them accelerate their time-to-value when using AWS.

 

 

 

Anomaly Detection Using PySpark, Hive, and Hue on Amazon EMR

Post Syndicated from Veronika Megler original https://blogs.aws.amazon.com/bigdata/post/Tx2642DKK75JBP8/Anomaly-Detection-Using-PySpark-Hive-and-Hue-on-Amazon-EMR

Veronika Megler, Ph.D., is a Senior Consultant with AWS Professional Services

We are surrounded by more and more sensors – some of which we’re not even consciously aware. As sensors become cheaper and easier to connect, they create an increasing flood of data that’s getting cheaper and easier to store and process.

However, sensor readings are notoriously “noisy” or “dirty”. To produce meaningful analyses, we’d like to identify anomalies in the sensor data and remove them before we perform further analysis. Or we may wish to analyze the anomalies themselves, as they may help us understand how our system really works or how our system is changing. For example, throwing away (more and more) high temperature readings in the Arctic because they are “obviously bad data” would cause us to miss the warming that is happening there.

The use case for this post is from the domain of road traffic: freeway traffic sensors. These sensors report three measures (called “features”): speed, volume, and occupancy, each of which are sampled several times a minute (see “Appendix: Measurement Definitions” at the end of this post for details on measurement definitions). Each reading from the sensors is called an observation. Sensors of different types (radar, in-road, Bluetooth) are often mixed in a single network and may be installed in varied configurations. For in-road sensors, there’s often a separate sensor in each lane; in freeways with a “carpool” lane, that lane will have different traffic characteristics from the others. Different sections of the freeway may have different traffic characteristics, such as rush hour on the inbound vs. outbound side of the freeway.

Thus, anomaly detection is frequently an iterative process where the system, as represented by the data from the sensors, must first be segmented in some way and “normal” characterized for each part of the system, before variations from that “normal” can be detected. After these variations or anomalies are removed, we can perform various analyses of the cleaned data such as trend analysis, model creation, and predictions. This post describes how two popular and powerful open-source technologies, Spark and Hive, were used to detect anomalies in data from a network of traffic sensors. While it’s based on real usage (see "References" at the end of this post), here you’ll work with similar, anonymized data.

The same characteristics and challenges apply to many other sensor networks. Specific examples I’ve worked with include weather stations, such as Weather Underground (www.wunderground.com), that report temperature, air pressure, humidity, wind and rainfall, amongst other things; ocean observatories such as CMOP (http://www.stccmop.org/datamart/observation_network) that collect physical, geochemical and biological observations; and satellite data from NOAA (http://www.nodc.noaa.gov/).

Detecting anomalies

An anomaly in a sensor network may be a single variable with an unreasonable reading (speed = 250 m.p.h.; for a thermometer, air temperature = 200F). However, each traffic sensor reading has several features (speed, volume, occupancy). There can be situations where each reading itself has a reasonable value, but the combination itself is highly unlikely (an anomaly). For traffic sensors, a speed of more than 100 m.p.h. is possible during times of low congestion (that is, low occupancy and low volume) but extremely unlikely during a traffic jam.

Many of these “valid” or “invalid” combinations are situational, as is the case here. Common combinations often have descriptive terms, such as “traffic jam”, “congested traffic”, or “light traffic”. These terms are representative of a commonly seen combination of characteristics, which would be represented in the data as a cluster of observations.

So, to detect anomalies: first, identify the common situations (each represented by a large cluster of similar combinations of features), and then identify observations that are sufficiently different from those clusters. You essentially apply two techniques from basic statistics: cluster the data using the most common algorithm, k-means; then measure the distance from each observation to its closest cluster center and classify those that are “far away” as anomalies. (Note that other anomaly detection techniques exist, some of which could be used against the same data, but they would reflect a different model or understanding of the problem.)

This post walks through the three major steps:

1. Clustering the data.
2. Choosing the number of clusters.
3. Detecting probable anomalies.

For the project, you process the data using Spark, Hive, and Hue on an Amazon EMR cluster, reading input data from an Amazon S3 bucket.

Clustering the data

To perform k-means clustering, you first need to know how many clusters exist in the data. However, in most cases, as is true here, you don’t know the “right” number to use. A common solution is to repeatedly cluster the data, each time using a different number (“k”) of clusters. For each “k”,  calculate a metric: the sum of the squared distance of each point from its closest cluster center, known as the Within Set Sum of Squared Error (WSSSE). (My code extends this sample.) The smaller the WSSSE, the better your clustering is considered to be – within limits, as more clusters will almost always give a smaller WSSSE but having more clusters may distract rather than add to your analysis.
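The full kmeanswsssey.py also gathers per-cluster statistics, but the heart of the loop looks roughly like the sketch below, adapted from the standard Spark MLlib k-means example. The column positions used for the three features and the range of k are assumptions for illustration.

from numpy import array
from pyspark import SparkContext
from pyspark.mllib.clustering import KMeans

sc = SparkContext(appName="kmeans-wssse")

# Parse the CSV input, keeping the three features: volume, speed, occupancy.
raw = sc.textFile("s3://vmegler/traffic/sensorinputsmall/")
features = raw.map(lambda line: line.split(",")) \
              .map(lambda f: array([float(f[6]), float(f[7]), float(f[8])]))

def squared_error(point, model):
    # Squared Euclidean distance from a point to its closest cluster center.
    center = model.clusterCenters[model.predict(point)]
    return float(sum((point - center) ** 2))

for k in range(1, 11):
    model = KMeans.train(features, k, maxIterations=20)
    wssse = features.map(lambda p: squared_error(p, model)).reduce(lambda a, b: a + b)
    print("k=%d, WSSSE=%f" % (k, wssse))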

Here, the input data is a CSV format file stored in an S3 bucket. Each row contains a single observation taken by a specific sensor at a specific time, and consists of 9 numeric values. There are two versions of the input:   

  • s3://vmegler/traffic/sensorinput/, with 24M rows
  • s3://vmegler/traffic/sensorinputsmall/, an extract with 50,000 rows

In this post, I show how to run the programs here with the smaller input. However, the exact same code runs over the 24M row input. Here’s the Hive SQL definition for the input:

CREATE EXTERNAL TABLE sensorinput (
highway int,         -- highway id
sensorloc int,       -- one sensor location may have multiple sensors,
                     -- e.g. for different highway lanes
sensorid int,        -- sensor id
dayofyear bigint,    -- yyyyddd
dayofweek bigint,    -- 0=Sunday, 1=Monday, etc.
time decimal(10,2),  -- seconds since midnight;
                     -- e.g. 185.67 is 00:03:05.67 (3 min 5.67 s after midnight)
volume int,          -- a count
speed int,           -- average, in m.p.h.
occupancy int        -- a count
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION 's3://vmegler/traffic/sensorinput/';

Start an EMR cluster in us-west-2 (where this bucket is located), specifying Spark, Hue, Hive, and Ganglia. (For more information, see Getting Started: Analyzing Big Data with Amazon EMR.) I’ve run the same program on two different clusters: a small cluster with 1 master and 2 core nodes, all m3.xlarge; and a larger cluster with 1 master and 8 core nodes, all m4.xlarge.

Spark has two interfaces that can be used to run a Spark/Python program: an interactive interface, pyspark, and batch submission via spark-submit. I generally begin my projects by reviewing my data and testing my approach interactively in pyspark, while logged on to the cluster master. Then, I run my completed program using spark-submit (see also Submitting User Applications with spark-submit). After the program is ready to operationalize, I start submitting the jobs as steps to a running cluster using the AWS CLI for EMR or from a script such as a Python script using Boto3 to interface to EMR, with appropriate parameterization.

I’ve written two PySpark programs: one to repeatedly cluster the data and calculate the WSSSE using different numbers of clusters (kmeanswsssey.py); and a second one (kmeansandey.py) to calculate the distances of each observation from the closest cluster. The other parts of the anomaly detection—choosing the number of clusters to use, and deciding which observations are the outliers—are performed interactively, using Hue and Hive. I also provide a file (traffic-hive.hql), with the table definitions and sample queries.

For simplicity, I’ll describe how to run the programs using spark-submit while logged on to the master instance console.

To prepare the cluster for executing your programs, install some Python packages:

sudo yum install python-numpy python-scipy -y 

Copy the programs from S3 onto the master node’s local disk; I often run this way while I’m still editing the programs and experimenting with slightly different variations:  

aws s3 cp s3://vmegler/traffic/code/kmeanswsssey.py /home/hadoop
aws s3 cp s3://vmegler/traffic/code/kmeansandey.py /home/hadoop
aws s3 cp s3://vmegler/traffic/code/traffic-hive.hql /home/hadoop

My first PySpark program (kmeanswsssey.py) calculates WSSSE repeatedly, starting with 1 cluster (k=1), then for 2 clusters, and so on, up to some maximum k that you define. It outputs a CSV file; for each k, it appends a set of lines containing the WSSSE and some statistics that describe each of the clusters. The program takes four arguments: the input file location, the maximum k to use, a run identifier that is prepended to the output for this run (useful when testing multiple variations), and the output location, in the order <infile> <maxk> <runId> <outfile>. For example:

spark-submit /home/hadoop/kmeanswsssey.py s3://vmegler/traffic/sensorinputsmall/ 10 run1 s3://<your_bucket>/sensorclusterssmall/

When run on the small cluster with the small input, this program took around 5 minutes. The same program, run on the 24M row input on the larger cluster, took 2.5 hours. Running the large input on the smaller cluster produces correct results, but takes over 24 hours to complete.

Choosing the number of clusters

Next, review the clustering results and choose the number of clusters to use for the actual anomaly detection.

A common and easy way to do that is to graph the WSSSE calculated for each k, and to choose “the knee in the curve”. That is, look for a point where the total distance has dropped sufficiently that increasing the number of clusters does not drop the WSSSE by much. If you’re very lucky, each cluster has characteristics that match your mental model for the problem domain such as low speed, high occupancy, and high volume, matching “congested traffic”.

Here you use Hue and Hive, conveniently selected when you started the cluster, for data exploration and simple graphing. In Hue’s Hive Query Editor, define a table that describes the output file you created in the previous step. Here, I’m pointing to a precomputed version calculated over the larger dataset:

CREATE EXTERNAL TABLE kcalcs (
run string,
wssse decimal(20,3),
k decimal,
clusterid decimal,
clustersize decimal,
volcntr decimal(10,1),
spdcntr decimal(10,1),
occcntr decimal(10,1),
volstddev decimal(10,1),
spdstddev decimal(10,1),
occstddev decimal(10,1)
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION 's3://vmegler/traffic/sensorclusters/'
tblproperties ("skip.header.line.count"="2");

To decide how many clusters to use for the next step, use Hue to plot a line graph of the number of clusters versus WSSSE. First, select the information to display:

SELECT DISTINCT k, wssse FROM kcalcs ORDER BY k;

In the results panel, choose Chart. Choose the icon representing a line graph, choose “k” for X-Axis and “wssse” for Y-Axis, and Hue builds the following chart. Hover your cursor over a particular point, and Hue shows the X and Y values for that point.

Chart built by Hue

For the “best number of clusters”, you’re looking for the “knee in the curve”: the place where going to a higher number of clusters does not significantly reduce the total distance function (WSSSE). For this data, it looks as though around 4 is a good choice, as the gains from going to 5 or 6 clusters look minimal.

You can explore the characteristics of the identified clusters with the following SELECT statement:

SELECT DISTINCT k, clusterid, clustersize, volcntr, spdcntr, occcntr, volstddev, spdstddev, occstddev
FROM kcalcs
ORDER BY k, spdcntr;

By looking at, for example, the lines for three clusters (k=3), you can see a “congestion” cluster (17.1 m.p.h., occupancy 37.7 cars), a “free-flowing heavy-traffic” cluster, and a “light traffic” cluster (65.2 m.p.h., occupancy 5.1). With k=4, you still see the “congestion” and “fast, light traffic” clusters, but the “free-flowing heavy-traffic” cluster from k=3 has been split into two distinct clusters with very different occupancy. Choose to stay with 4 clusters.

Detecting anomalies

Use the following method with these clusters to identify anomalies:

1. Assign each sensor reading to the closest cluster.
2. Calculate the distance (using some measure) from each reading to the assigned cluster center.
3. Filter for the entries whose distance is greater than some chosen threshold.

I like to use Mahalanobis distance as the distance measure, as it compensates for differences in units (speed in m.p.h., while volume and occupancy are counts), averages, and scales of the several features I’m clustering across. 
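As a reference for what that distance looks like in code, here is a minimal NumPy sketch. In the real program the per-cluster mean and covariance matrix are estimated from the observations assigned to each cluster; the numbers below are placeholders.

import numpy as np

# One observation's features: volume, speed (m.p.h.), occupancy.
observation = np.array([10.0, 62.0, 4.0])

# Placeholder statistics for the cluster the observation was assigned to.
center = np.array([12.0, 65.2, 5.1])   # cluster center
stddevs = np.array([3.0, 5.0, 2.0])    # per-feature standard deviations
cov = np.diag(stddevs ** 2)            # stand-in covariance matrix (ignores correlations)

def mahalanobis(x, mu, cov):
    # sqrt((x - mu)^T * inv(cov) * (x - mu)): a unit-free distance that accounts for the
    # differing scales (and, with a full covariance matrix, correlations) of the features.
    diff = x - mu
    return float(np.sqrt(diff.dot(np.linalg.inv(cov)).dot(diff)))

print(mahalanobis(observation, center, cov))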

Run the second PySpark program, kmeansandey.py (you copied this program onto local disk earlier, during setup). Give this program the number of clusters to use, decided in the previous step, and the input data. For each input observation, this program does the following:

1. Identifies the closest cluster center.
2. Calculates the Mahalanobis distance from this observation to the closest center.
3. Creates an output record consisting of the original observation, plus the cluster number, the cluster center, and the distance.

The program takes the following parameters: <infile> <k> <outfile>. The output is a CSV file, placed in an S3 bucket of your choice. To run the program, use spark-submit:

spark-submit /home/hadoop/kmeansandey.py s3://vmegler/traffic/sensorinputsmall/ 4 s3://<your_bucket>/sensoroutputsmall/

On the small cluster with the small input, this job finished in under a minute; on the bigger cluster, the 24M-row dataset took around 17 minutes. In the next step, you review the observations that are “distant” from their closest cluster, as measured by this distance calculation. Because these observations are unlike the majority of the other observations, they are considered outliers, and probable anomalies.

Exploring identified anomalies

Now you’re ready to look at the probable anomalies and decide whether they really should be considered anomalies. In Hive, you define a table that describes the output file created in the previous step. Use an output file from the S3 bucket, which contains the original 7 columns (sensorid through occupancy) plus 5 new ones (clusterid through maldist). The smaller dataset’s output is less interesting to explore as it only contains data from one sensor, so I’ve precomputed the output over the large dataset for this exploration. Here is the modified table definition:

CREATE EXTERNAL TABLE sensoroutput (
highway int,           -- highway id etc. (original columns, as before)

occupancy int,         -- a count
clusterid int,         -- cluster identifier
volcntr decimal(10,2), -- cluster center, volume
spdcntr decimal(10,2), -- cluster center, speed
occcntr decimal(10,2), -- cluster center, occupancy
maldist decimal(10,2)  -- Mahalanobis distance to this cluster
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION 's3://vmegler/traffic/sensoroutput/';

Explore your results. To look at the number of observations assigned to each cluster for each sensor, try the following query:

SELECT sensorid, clusterid, concat(cast(sensorid AS string), '.', cast(clusterid AS string)) AS senclust, count(*) AS howmany, max(maldist) AS dist
FROM sensoroutput
GROUP BY sensorid, clusterid
ORDER BY sensorid, clusterid;

The “concat” statement creates a compound column, senclust, that you can use in Hue’s built-in graphing tool to compare the clusters visually for each sensor. For this chart, choose a bar graph, choose the compound column “senclust” for X-Axis and “howmany” for Y-Axis, and Hue builds the following chart.

You can now easily compare the sizes and the largest and average distances for each cluster across the different sensors. The smaller clusters probably bear investigation; they either represent unusual traffic conditions or a cluster of bad readings. Note that an additional cluster of known bad readings (0 speed, volume, and occupancy) was identified using a similar process during a prior run; these observations are all assigned to a dummy clusterid of “-1” and have a high maldist. The following query summarizes the size and distance statistics for each cluster across all sensors:

SELECT clusterid, volcntr, spdcntr, occcntr, count(*) AS num, max(maldist) AS maxmaldist, avg(maldist) AS avgmaldist,
stddev_pop(maldist) AS stddevmal
FROM sensoroutput
GROUP BY clusterid, volcntr, spdcntr, occcntr
ORDER BY spdcntr;

How do you choose the threshold for defining an observation as an anomaly? This is another black art. I chose 2.5 by a combination of standard practice, discussing the graphs, and looking at how much and which data I’d be throwing away by using that assumption.  To explore the distribution of outliers across the sensors, use a query like the following:

SELECT sensorid, clusterid, count(*) AS num_outliers, avg(spdcntr) AS spdcntr, avg(maldist) AS avgdist
FROM sensoroutput
WHERE maldist > 2.5
GROUP BY sensorid, clusterid
ORDER BY sensorid, clusterid;

The number of outliers varies quite a bit by sensor and cluster. You can explore the top 100 outliers for a single sensor and cluster (here, sensor 44, cluster 0):

SELECT *
FROM sensoroutput
WHERE maldist > 2.5
AND sensorid = 44 AND clusterid = 0
ORDER BY maldist desc
LIMIT 100;

The query results show some entries that look reasonable (volume 6, occupancy 1), and others that look less so (volume of 3 and occupancy of 10). Depending on your intended use, you may decide that the number of observations that might not really be anomalies is small enough that you should just exclude all these entries – but perhaps you want to study these entries further to find a pattern, such as that this is a state that often occurs during transition times from one traffic pattern to another.

After you understand your clusters and the flagged “potential anomalies” sufficiently, you can choose which observations to exclude from further analysis.

Conclusion

This post describes anomaly detection for sensor data, and works through a case of identifying anomalies in traffic sensor data. You’ve dived into some of the complexities that come with deciding whether a given subset of sensor data is dirty, and the tools used to ask those questions. I showed how an iterative approach is often needed, with each analysis leading to further questions and further analyses.

In the real use case (see "References" below), we iteratively clustered subsets of the data: for different highways, days of the week, different sensor types, and so on, to understand the data and anomalies. We’ve seen here some of the challenges in deciding whether or not something is an anomaly in the data, or an anomaly in our approach. We used Amazon EMR, along with Apache Spark, Apache Hive and Hue to implement the approach and explore the results, allowing us to quickly experiment with a number of alternative clusters before settling on the combination that we felt best identified the real anomalies in our data.

Now, you can move forward: providing “clean data” to the business users; combining this data with weather, school holiday calendars, and sporting events to identify the causes of specific traffic patterns and pattern changes; and then using that model to predict future traffic conditions.

Appendix: Measurement Definitions

Volume measures how many vehicles have passed this sensor during the given time period. Occupancy measures the number of vehicles at the sensor at the measurement time. The combination of volume and occupancy gives a view of overall traffic density. For example: if the traffic is completely stopped, a sensor may have very high occupancy – many vehicles sitting at the sensor – but a volume close to 0, as very few vehicles have passed the sensor. This is a common circumstance for sensors at freeway entrances that limit freeway entry, often via lights that only permit one car from one lane to pass every few seconds.

Note that different sensor types may have different capabilities to detect these situations (for example, radar vs. in-road sensors), and different sensor types or models may have different defaults for how they report various situations. For example, “0,0,0” may mean no traffic, or known bad data, or assumed bad data based on hard limits, such as traffic above a specific density (ouch!). Thus sensor type, capability, and context are all important factors in identifying “bad data”. In this study, the analysis of which sensors were “similar enough” for their data to be analyzed together was performed prior to data extraction. The anomaly detection steps described here were performed separately for each set of similar sensors, as defined by that pre-analysis.

References

V. M. Megler, K. A. Tufte, and D. Maier, “Improving Data Quality in Intelligent Transportation Systems (Technical Report),” Portland, OR, Jul. 2015 [Online]. Available: http://arxiv.org/abs/1602.03100

If you have questions or suggestions, please leave a comment below.

—————————–

Related

Large-Scale Machine Learning with Spark on Amazon EMR

 
