Detect anomalies on one million unique entities with Amazon OpenSearch Service

Post Syndicated from Kaituo Li original https://aws.amazon.com/blogs/big-data/detect-anomalies-on-one-million-unique-entities-with-amazon-opensearch-service/

Amazon OpenSearch Service (successor to Amazon Elasticsearch Service) supports a highly performant, integrated anomaly detection engine that enables the real-time identification of anomalies in streaming data. Last year, we released high-cardinality anomaly detection (HCAD) to detect anomalies in individual entities. With the 1.1 release, you can monitor a million entities with steady, predictable performance.

HCAD is easiest to describe in contrast to the non-HCAD single-stream solution. In a single-stream detector, we detect anomalies for an aggregate entity. For example, we can use a single-stream detector to sift through aggregated traffic across all IP addresses so that users can be notified when unusual spikes occur. However, we often need to identify anomalies in individual entities, such as hosts and IP addresses. Each entity may work on a different baseline, which means its time series’ distribution (measured in parameters such as magnitude, trend, and seasonality, to name a few) is different. The different baselines make it inaccurate to detect anomalies using a single monolithic model. HCAD distinguishes itself from single-stream detectors by customizing anomaly detection models to entities.

Example use cases of HCAD include the following:

  • Internet of things – Continuously tracking the temperature of fridges and warning users of temperatures at which food or medicine longevity is at risk, so users can take measures to avoid them. Each entity has specific categorical fields that describe it, and you can think of the categorical fields as characteristics for those entities. A fridge’s serial number is the categorical field that uniquely identifies the fridges. Using a single model generates a lot of false alarms because ambient temperatures can be different. A temperature of 5° C is normal during winter in Seattle, US, but such a temperature in a tropical place during winter is likely anomalous. Also, users may open the door to a fridge several times, triggering a spike in the temperature. The duration and frequency of spikes can vary according to user behavior. HCAD can group temperature data into geographies and users to detect varying local temperatures and user behavior.
  • Security – An intrusion detection system identifying an increase in failed login attempts in authentication logs. The user name and host IP are the categorical fields used to determine the user accessing from the host. Hackers might guess user passwords by brute force, and not all users on the same host IP may be targeted. The number of failed logins on a host varies by user and by time of day. HCAD creates a representative baseline per user on each host and adapts to changes in the baseline.
  • IT operations – Monitoring access traffic by shard in a distributed service. The shard ID is the categorical field, and the entity is the shard. A modern distributed system usually consists of shards linked together. When a shard experiences an outage, the traffic increases significantly for dependent shards due to retry storms. It’s hard to discover the increase because only a limited number of shards are affected. For example, traffic on the related shards might be as much as 64 times that of normal levels, whereas average traffic across all shards might just grow by a small constant factor (less than 2).

Making HCAD real time and performant while achieving completeness and scalability is a formidable challenge:

  • Completeness – Model all or as many entities as possible.
  • Scalability – Horizontal and vertical scaling without changing model fidelity. That is, when scaling the machine up or out, an anomaly detector can add models monotonically. HCAD uses the same model and gives the same answer for an entity’s time series as in single-stream detection.
  • Performance – Low impact to system resource usage and high overall throughput.

The first release of HCAD in Amazon OpenSearch Service traded completeness and scalability for performance: the anomaly detector limited the number of entities to 1,000. You can change the setting plugins.anomaly_detection.max_entities_per_query to increase the number of monitored entities per interval. However, such a change incurs a non-negligible cost, which opens the door to cluster instability. Each entity uses memory to host models, disk I/O to read and write model checkpoints and anomaly results, CPU cycles for metadata maintenance and model training and inference, and garbage collection for deleted models and metadata. The more entities, the more resource usage. Furthermore, HCAD could suffer a combinatorial explosion of entities when supporting multiple categorical fields (a feature released in Amazon OpenSearch Service 1.1). Imagine a detector with only one categorical field geolocation. Geolocation has 1,000 possible values. Adding another categorical field product with 1,000 allowed values gives the detector 1 million entities.

For the next version of HCAD, we devoted much effort to improving completeness and scalability. Our approach relies on sizing the cluster correctly and combines in-memory model hosting with on-disk model loading. Performance metrics show that HCAD doesn’t saturate the cluster or incur substantial cost, and still leaves plenty of room for other tasks. As a result, HCAD can analyze one million entities in 10 minutes and flag anomalies in different patterns. In this post, we explore how HCAD can analyze one million entities and the technical implementations behind the improvements.

How to size domains

Model management is a trade-off: disk-based solutions that reload, use, stop, and store models on every interval save memory but suffer high overhead and are hard to scale. Memory-based solutions offer lower overhead and higher throughput but typically increase memory requirements. We exploit the trade-off with an adaptive mechanism that hosts as many models in memory as allowed (capped via the cluster setting plugins.anomaly_detection.model_max_size_percent) for best performance. When models don’t fit in memory, we process the extra model requests by loading models from disk.

Using memory whenever possible is what makes HCAD scalable. Therefore, it is crucial to size a cluster correctly so that it offers enough memory for HCAD. The main factors to consider when sizing a cluster are:

  • Sum of all detectors’ total entity count – A detector’s total entity count is the cardinality of the categorical fields. If there are multiple categorical fields, the number counts all unique combinations of values of these fields present in the data. You can determine the cardinality via a cardinality aggregation in Amazon OpenSearch Service. If the detector is a single-stream detector, the number of entities is one because there is no defined categorical field.
  • Heap size – Amazon OpenSearch Service sets aside 50% of RAM for heap. To determine the heap size of an instance type, refer to Amazon OpenSearch Service pricing. For example, an r5.2xlarge host has 64 GB RAM. Therefore, the host’s heap size is 32 GB.
  • Anomaly detection (AD) maximum memory percentage – AD can use up to 10% of the heap by default. You can customize the percentage via the cluster setting plugins.anomaly_detection.model_max_size_percent. The following update allows AD to use half of the heap via the aforementioned setting:
PUT /_cluster/settings
{
	"persistent": {
		"plugins.anomaly_detection.model_max_size_percent": "0.5"
	}
}
  • Entity in-memory model size – An entity’s in-memory model size varies according to shingle size, the number of features, and Amazon OpenSearch Service version as we’re constantly improving. All entity models of the same detector configuration in the same software version have the same size. A safe way to obtain the size is to run a profile API on the same detector configuration on an experimental cluster before creating a production cluster. In the following case, each entity model of detector fkzfBX0BHok1ZbMqLMdu is of size 470,491 bytes:

Enter the following profile request:

GET /_plugins/_anomaly_detection/detectors/fkzfBX0BHok1ZbMqLMdu/_profile/models

We get the following response:

{
	...{
		"model_id": "fkzfBX0BHok1ZbMqLMdu_entity_GOIubzeHCXV-k6y_AA4K3Q",
		"entity": [{
				"name": "host",
				"value": "host141"
			},
			{
				"name": "process",
				"value": "process54"
			}
		],
		"model_size_in_bytes": 470491,
		"node_id": "OcxBDJKYRYKwCLDtWUKItQ"
	}
	...
}
  • Storage requirement for result indexes – Real-time detectors store as many detection results as possible, both anomalous and non-anomalous, when the indexing pressure isn’t high. When the indexing pressure is high, we save anomalous results and a random subset of non-anomalous results. OpenSearch Dashboards uses non-anomalous results as the context of abnormal results and plots the results as a function of time. Additionally, AD stores the history of all generated results for a configurable number of days. This result retention period is 30 days by default and is adjustable via the cluster setting plugins.anomaly_detection.ad_result_history_retention_period. To ensure enough disk space is available to store the results, multiply the amount of data generated per day by the retention period. For example, consider a 10-minute interval detector with 1 million entities, which generates 1 million result documents per interval. One document’s size is about 1 KB, so each interval produces about 1 GB. With 144 intervals per day, that’s roughly 144 GB per day, or 4,320 GB over a 30-day retention period. The total disk requirement should also be multiplied by the number of shard copies. Currently, AD chooses one primary shard per node (up to 10) and one replica when called for the first time. Because the number of replicas is 1, every shard has two copies, and the total disk requirement is closer to 8,640 GB for the million entities in our example.
  • Anomaly detection overhead – AD incurs memory overhead for historical analyses and internal operations. We recommend reserving 20% more memory for the overhead to keep running models uninterrupted.
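
The result-storage arithmetic in the list above can be checked with a short Python sketch. The function name and parameters are hypothetical and are not part of the AD plugin; the sketch assumes decimal units (1 GB = 1,000,000 KB) and a fixed document size of about 1 KB, as in the example.

```python
def result_storage_gb(docs_per_interval, interval_minutes, doc_size_kb=1.0,
                      retention_days=30, replicas=1):
    """Estimate disk space (GB) needed for AD result indexes.

    Assumes every interval writes `docs_per_interval` documents of
    `doc_size_kb` each and that 1 GB = 1,000,000 KB.
    """
    intervals_per_day = 24 * 60 / interval_minutes
    gb_per_day = docs_per_interval * intervals_per_day * doc_size_kb / 1_000_000
    # Each primary shard has `replicas` additional copies.
    return gb_per_day * retention_days * (1 + replicas)


# 1 million entities, 10-minute interval, 30-day retention, 1 replica.
print(result_storage_gb(1_000_000, 10))  # 8640.0
```

Setting replicas=0 in the same call gives the 4,320 GB figure for a single copy of the data.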

To derive the required number of data nodes D, we must first derive an expression for the number of entity models N that a node can host in memory. We define S_i to be the entity model size of detector i. If we use an instance type with heap size H where the maximum AD memory percentage is P, then N is equal to the AD memory allowance divided by the maximum entity model size among all detectors:

$$N = \frac{H \times P}{\max_{i} S_i}$$

We consider the required number of data nodes D as a function of N. Let’s denote by C_i the total entity count of detector i. Given n detectors, it follows that:

$$D = \left\lceil \frac{1.2 \times \sum_{i=1}^{n} C_i}{N} \right\rceil$$

The fact that AD needs an extra 20% memory overhead is expressed by multiplying by 1.2 in the formula. The ceil function returns the smallest integer greater than or equal to its argument.

For example, an r5.2xlarge Amazon Elastic Compute Cloud (Amazon EC2) instance has 64 GB RAM, so the heap size is 32 GB. We configure AD to use at most half of the allowed heap size. We have two HCAD detectors, whose model sizes are 471 KB and 403 KB, respectively. To host 500,000 entities for each detector, we need a 36-data-node cluster according to the following calculation:

$$N = \frac{32\ \text{GB} \times 0.5}{471\ \text{KB}} \approx 33{,}970, \qquad D = \left\lceil \frac{1.2 \times (500{,}000 + 500{,}000)}{33{,}970} \right\rceil = 36$$

We also need to ensure there is enough disk space. In the end, we used a 39-node r5.2xlarge cluster (3 primary and 36 data nodes) with 4 TB Amazon Elastic Block Store (EBS) storage on each node.
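
The sizing arithmetic can be sketched in a few lines of Python. This is a minimal sketch, not part of the AD plugin: the function names are hypothetical, and the decimal unit convention (1 GB = 10^9 bytes, 1 KB = 10^3 bytes) is an assumption chosen because it reproduces the 36 data nodes quoted in the example.

```python
import math


def models_per_node(heap_bytes, ad_memory_fraction, model_sizes_bytes):
    """N: entity models one node can host, sized for the largest model."""
    return heap_bytes * ad_memory_fraction / max(model_sizes_bytes)


def required_data_nodes(entity_counts, heap_bytes, ad_memory_fraction,
                        model_sizes_bytes, overhead=1.2):
    """D: data nodes needed, including the 20% AD memory overhead."""
    n = models_per_node(heap_bytes, ad_memory_fraction, model_sizes_bytes)
    return math.ceil(overhead * sum(entity_counts) / n)


# Two detectors with 500,000 entities each on r5.2xlarge nodes (32 GB heap),
# with AD allowed to use half of the heap.
nodes = required_data_nodes(
    entity_counts=[500_000, 500_000],
    heap_bytes=32 * 10**9,
    ad_memory_fraction=0.5,
    model_sizes_bytes=[471_000, 403_000],
)
print(nodes)  # 36
```

Sizing against the largest model is conservative: detectors with smaller models leave some of the memory allowance unused, which acts as extra headroom.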

What if a detector’s entity count is unknown?

Sometimes, it’s hard to know a detector’s entity count. We can check historical data and estimate the cardinality. But it’s impossible to predict the future accurately. A general guideline is to allocate buffer memory during planning. Appropriately used, buffer memory provides room for small changes. If the changes are significant, you can adjust the number of data nodes because HCAD can scale in and out horizontally.

What if the number of active entities is changing?

The total number of entities created can be higher than the number of active entities, as evident from the following two figures. The total number of entities in the HTTP logs dataset is 2 million within 2 months, but each entity only appears seven times on average. The number of active entities within a time-boxed interval is much less than 2 million. The following figure presents an example time series of network size of IP addresses from the HTTP logs dataset.

[Figure: HTTP log data distribution]

The KPI dataset also shows similar behavior, where entities often appear in a short amount of time during bursts of entity activities.

[Figure: KPI data distribution]

AD requires large sample sizes to create a comprehensive picture of the data patterns, making it suitable for dense time series that can be uniformly sampled. AD can still train models and produce predictions if the preceding bursty behavior can last a while and provide at least 400 points. However, training becomes more difficult, and prediction accuracy is lower as data gets more sparse.

It’s wasteful to preallocate memory according to the total number of entities in this case. Instead of the total number of entities, we need to consider the maximal active entities within an interval. You can get an approximate number by using a date_histogram and cardinality aggregation pipeline, and sorting during a representative period. You can run the following query if you’re indexing host-cloudwatch and want to find out the maximal number of active hosts within a 10-minute interval throughout 10 days:

GET /host-cloudwatch/_search?size=0
{
	"query": {
		"range": {
			"@timestamp": {
				"gte": "2021-11-17T22:21:48",
				"lte": "2021-11-27T22:22:48"
			}
		}
	},
	"aggs": {
		"by_10m": {
			"date_histogram": {
				"field": "@timestamp",
				"fixed_interval": "10m"
			},
			"aggs": {
				"dimension": {
					"cardinality": {
						"field": "host"
					}
				},
				"multi_buckets_sort": {
					"bucket_sort": {
						"sort": [{
							"dimension": {
								"order": "desc"
							}
						}],
						"size": 1
					}
				}
			}
		}
	}
}

The query result shows that at most about 1,000 hosts are active during a ten-minute interval:

{
	...
	"aggregations": {
		"by_10m": {
			"buckets": [{
				"key_as_string": "2021-11-17T22:30:00.000Z",
				"key": 1637188200000,
				"doc_count": 1000000,
				"dimension": {
					"value": 1000
				}
			}]
		}
	}
	...
}
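
If you run this check for several indexes or fields, it can help to generate the request body and read the answer programmatically. The following Python sketch builds the same query and extracts the peak count from a parsed response. The helper names are hypothetical, and sending the request is left to whichever client you use; only the aggregation names by_10m, dimension, and multi_buckets_sort come from the query above.

```python
def build_max_active_query(time_field, category_field, start, end,
                           interval="10m"):
    """Build a search body whose single returned bucket is the busiest interval."""
    return {
        "size": 0,
        "query": {"range": {time_field: {"gte": start, "lte": end}}},
        "aggs": {
            "by_10m": {
                "date_histogram": {"field": time_field,
                                   "fixed_interval": interval},
                "aggs": {
                    # Distinct entities seen in each interval.
                    "dimension": {"cardinality": {"field": category_field}},
                    # Keep only the interval with the most entities.
                    "multi_buckets_sort": {
                        "bucket_sort": {
                            "sort": [{"dimension": {"order": "desc"}}],
                            "size": 1,
                        }
                    },
                },
            }
        },
    }


def max_active_entities(response):
    """Read the peak per-interval entity count from a parsed search response."""
    buckets = response["aggregations"]["by_10m"]["buckets"]
    return buckets[0]["dimension"]["value"] if buckets else 0


body = build_max_active_query("@timestamp", "host",
                              "2021-11-17T22:21:48", "2021-11-27T22:22:48")
sample_response = {
    "aggregations": {"by_10m": {"buckets": [{
        "key_as_string": "2021-11-17T22:30:00.000Z",
        "key": 1637188200000,
        "doc_count": 1000000,
        "dimension": {"value": 1000},
    }]}}
}
print(max_active_entities(sample_response))  # 1000
```

Keep in mind that the cardinality aggregation is approximate, so treat the result as an estimate and leave some headroom.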

HCAD has a cache to store models and maintain a timestamp of last access for each model. For each model, an hourly job checks the time of inactivity and invalidates the model if the time of inactivity is longer than 1 hour. Depending on the timing of the hourly check and the cache capacity, the elapsed time a model is cached varies. If the cache capacity isn’t large enough to hold all non-expired models, we have an adapted least frequently used (LFU) cache policy to evict models (more on this in a later section), and the cache time of those invalidated models is less than 1 hour. If the last access time of a model is reset immediately after the hourly check, when the next hourly check happens, the model doesn’t expire. The model can take another hour to expire when the next hourly check comes. So the max cache time is 2 hours.

The upper bound of active entities that detector i can observe is:

$$B_i = A_i \times \left\lceil \frac{120}{\Delta T_i} \right\rceil$$

This equation has the following parameters:

  • A_i is the maximum number of active entities per interval of detector i. We get the number from the preceding query.
  • 120 is the number of minutes in 2 hours. ∆T_i denotes detector i’s interval in minutes. The ceil function returns the smallest integer greater than or equal to its argument. ceil(120÷∆T_i) is the maximum number of intervals for which a model is cached.

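Accordingly, we should account for B_i in the sizing formula by using it in place of the total entity count:

$$D = \left\lceil \frac{1.2 \times \sum_{i=1}^{n} B_i}{N} \right\rceil$$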
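
As a rough illustration, the bound can be computed in Python from the parameters described above: the maximum number of active entities per interval multiplied by ceil(120 ÷ interval in minutes). The function names are hypothetical, and substituting the bound for the total entity count in the node calculation is our reading of the text, not code taken from the plugin.

```python
import math


def max_cached_entities(active_per_interval, interval_minutes,
                        max_cache_minutes=120):
    """B_i: upper bound on entities whose models can be cached at once."""
    return active_per_interval * math.ceil(max_cache_minutes / interval_minutes)


def data_nodes_for_active_entities(bounds, models_per_node, overhead=1.2):
    """D computed from the per-detector bounds instead of total entity counts."""
    return math.ceil(overhead * sum(bounds) / models_per_node)


# About 1,000 active hosts per 10-minute interval, as in the query result.
bound = max_cached_entities(1000, 10)
print(bound)  # 12000
print(data_nodes_for_active_entities([bound], models_per_node=33_970))  # 1
```

The models_per_node value of 33,970 is taken from the earlier r5.2xlarge example and is only there to make the call concrete.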

Sizing calculation flow chart

With the definitions of calculating the number of data nodes in place, we can use the following flow chart to make decisions under different scenarios.

[Figure: Sizing flowchart]

What if the cluster is underscaled?

If the cluster is underscaled, AD prioritizes more frequent and recent entities. AD makes its best effort to accommodate extra entities by loading their models on demand from disk without hosting them in the in-memory cache. Loading the models on demand means reloading, using, stopping, and storing models at every interval, which incurs high overhead. The overhead mostly has to do with network or disk I/O rather than with the cost of model inference. Therefore, AD loads these models in a steady, controlled manner. If the system resource usage isn’t heavy and there is enough time, HCAD may finish processing the extra entities. Otherwise, HCAD doesn’t necessarily find all the anomalies it could otherwise find.

Example: Analysis of 1 million entities

In the following example, you will learn how to set up a detector to analyze one million entities.

Ingest data

We generated 10 billion documents for 1 million entities in our evaluation of scalability and completeness improvement. Each entity has a cosine wave time series with randomly injected anomalies. With help from the tips in this post, we created the index host-cloudwatch and ingested the documents into the cluster. host-cloudwatch records elapsed CPU and JVM garbage collection (GC) time by a process within a host. Index mapping is as follows:

{
	...
	"mappings": {
		"properties": {
			"@timestamp": {
				"type": "date"
			},
			"cpuTime": {
				"type": "double"
			},
			"jvmGcTime": {
				"type": "double"
			},
			"host": {
				"type": "keyword"
			},
			"process": {
				"type": "keyword"
			}
		}
	}
	...
}

Create a detector

Consider the following factors before you create a detector:

  • Indexes to monitor – You can use a group of index names, aliases, or patterns. Here we use the host-cloudwatch index created in the last step.
  • Timestamp field – A detector monitors time series data. Each document in the provided index must be associated with a timestamp. In our example, we use the @timestamp field.
  • Filter – A filter selects data you want to analyze based on some condition. One example filter selects requests with a status code of 400 or higher from HTTP request logs. The 4xx and 5xx classes of HTTP status code indicate that a request is returned with an error. Then you can create an anomaly detector for the number of error requests. In our running example, we analyze all of the data, and thus no filter is used.
  • Category field – Every entity has specific characteristics that describe it. Category fields provide categories of those characteristics. An entity can have up to two category fields as of Amazon OpenSearch Service 1.1. Here we monitor a specific process of a particular host by specifying the process and host field.
  • Detector interval – The detector interval is typically application-defined. We aggregate data within an interval and run models on the aggregated data. As mentioned earlier, AD is suitable for dense time series that can be uniformly sampled. You should at least make sure most intervals have data. Also, different detector intervals require different trade-offs between delay and accuracy. Long intervals smooth out long-term and short-term workload fluctuations and, therefore, may be less prone to noise, but they delay detection. Short intervals lead to quicker detection but may flag anticipated workload fluctuations instead of anomalies. You can plot your time series with various intervals and observe which interval keeps relevant anomalies while reducing noise. For this example, we use the default 10-minute interval.
  • Feature – A feature is an aggregated value extracted from the monitored data. It gets sent to models to measure the degrees of abnormality. Forming a feature can be as simple as picking a field to monitor and the aggregation function that summarizes the field data as metrics. We provide a suite of functions such as min and average. You can also use a runtime field via scripting. We’re interested in the garbage collection time field aggregated via the average function in this example.
  • Window delay – Ingestion delay. If the value isn’t configured correctly, a detector might analyze data before the late data arrives at the cluster. Because we ingested all the data in advance, the window delay is 0 in this case.

Our detector’s configuration aggregates average garbage collection processing time every 10 minutes and analyzes the average at the granularity of processes on different hosts. The API request to create such a detector is as follows. You can also use our streamlined UI to create and start a detector.

POST _plugins/_anomaly_detection/detectors
{
	"name": "detect_gc_time",
	"description": "detect gc processing time anomaly",
	"time_field": "@timestamp",
	"indices": [
		"host-cloudwatch"
	],
	"category_field": ["host", "process"],
	"feature_attributes": [{
		"feature_name": "jvmGcTime average",
		"feature_enabled": true,
		"importance": 1,
		"aggregation_query": {
			"gc_time_average": {
				"avg": {
					"field": "jvmGcTime"
				}
			}
		}
	}],
	"detection_interval": {
		"period": {
			"interval": 10,
			"unit": "MINUTES"
		}
	},
	"schema_version": 2
}

After the initial training is complete, which takes a few hours, the models of all 1 million entities are hosted in memory, and 1 million results are generated every detector interval. To verify the number of active models in the cache, you can run the profile API:

GET /_plugins/_anomaly_detection/detectors/fkzfBX0BHok1ZbMqLMdu/_profile/models

We get the following response:

{
	...
	"model_count": 1000000
}

You can observe how many results are generated every detector interval (in our case 10 minutes) by invoking the result search API:

GET /_plugins/_anomaly_detection/detectors/results/_search
{
	"query": {
		"range": {
			"execution_start_time": {
				"gte": 1636501467000,
				"lte": 1636502067000
			}
		}
	},
	"track_total_hits": true
}

We get the following response:

{
	...
	"hits": {
		"total": {
			"value": 1000000,
			"relation": "eq"
		},
		...
	}
	...
}

OpenSearch Dashboards shows the top entities producing the most severe anomalies or the largest number of anomalies.

[Figure: Anomaly overview]

You can choose a colored cell to review the details of anomalies occurring within that given period.

[Figure: Press anomaly]

You can view anomaly grade, confidence, and the corresponding features in a shaded area.

[Figure: Feature graph]

Create a monitor

You can create an alerting monitor to notify you of anomalies based on the defined anomaly detector, as shown in the following screenshot.

[Figure: Create monitor]

We use anomaly grade and confidence to define a trigger. Both anomaly grade and confidence are values between 0 and 1.

Anomaly grade represents the severity of an anomaly. The closer the grade is to 1, the higher the severity. 0 grade means the corresponding prediction isn’t an anomaly.

Confidence measures whether an entity’s model has observed enough data such that the model contains enough unique, real-world data points. If the confidence value from one model is larger than the confidence value from a different model, then the first model has observed more data.

Because we want to receive high fidelity alerts, we configured the grade threshold to be 0 and the confidence threshold to be 0.99.

[Figure: Edit trigger]

The final step of creating a monitor is to add an action on what to include in the notification. Our example detector finds anomalies at a particular process in a host. The notification message should contain the entity identity. In this example, we use ctx.results.0.hits.hits.0._source.entity to grab the entity identity.

[Figure: Edit action]

A monitor based on a detector extracts the maximum grade anomaly and triggers an alert based on the configured grade and confidence threshold. The following is an example alert message:

Attention

Monitor detect_cpu_gc_time2-Monitor just entered alert status. Please investigate the issue.
- Trigger: detect_cpu_gc_time2-trigger
- Severity: 1
- Period start: 2021-12-08T01:01:15.919Z
- Period end: 2021-12-08T01:21:15.919Z
- Entity: {0={name=host, value=host107}, 1={name=process, value=process622}}

You can customize the extraction query and trigger condition by changing the monitor defining method to Extraction query monitor and modifying the corresponding query and condition. Here is the explanation of all anomaly result index fields you can query.

[Figure: Edit monitor]

Evaluation

In this section, we evaluate HCAD’s precision, recall, and overall performance.

Precision and recall

We evaluated precision and recall over the cosine wave data, as mentioned earlier. Such evaluations aren’t easy in the context of real-time processing because only one point is available per entity during each detector interval (10 minutes in the example). Processing all the points takes a long time. Instead, we simulated real-time processing by fast-forwarding the processing in a script. The results are an average of 100 runs. The standard deviation is around 0.12.

The overall average precision, including the effects of cold start using linear interpolation, for the synthetic data is 0.57. The recall is 0.61. We note that no transformations were applied; it’s possible and likely that transformations improve these numbers. The precision is 0.09, and recall is 0.34 for the first 300 points due to interpolated cold start data for training. The numbers pick up as the model observes more real data. After another 5,000 real data points, the precision and recall improve to 0.57 and 0.63, respectively. We reiterate that the exact numbers vary based on the data characteristics—a different benchmark or detection configuration would have other numbers. Further, if there is no missing data, the fidelity of the HCAD model would be the same as that of a single-stream detector.

Performance

We ran HCAD on an idle cluster without ingestion or search traffic. Metrics such as JVM memory pressure and CPU of each node are well within the safe zone, as shown in the following screenshots. JVM memory pressure varies between 23–39%. CPU is mostly around 1%, with hourly spikes up to 65%. An internal hourly maintenance job accounts for the spikes: it saves hundreds of thousands of model checkpoints, clears unused models, and performs bookkeeping for internal states. Reducing these spikes is a possible future improvement.

[Figure: JVM memory pressure]

[Figure: CPU]

Implementation

We next discuss the specifics of the technical work that is germane to HCAD’s completeness and scalability.

RCF 2.0

In Amazon OpenSearch Service 1.1, we integrated with Random Cut Forest library (RCF) 2.0. RCF is based on partitioning data into different bounding boxes. The previous RCF version maintains bounding boxes in memory. However, a real-time detector only uses the bounding boxes when processing a new data point and leaves them dormant most of the time. RCF 2.0 allows for recreating those bounding boxes when required so that bounding boxes are present in memory when processing the corresponding input. The on-demand recreation reduces memory overhead by a factor of nine, so a node can host nine times as many models. In addition, RCF 2.0 revamps the serialization module. The new module serializes and deserializes a model 26 times faster and uses 20 times less disk space.

Pagination

Regarding feature aggregation, we switched from getting top hits using terms aggregation to pagination via composite aggregation. We evaluated multiple pagination implementations using a generated dataset with 1 million entities. Each entity has two documents. The experiment configurations can vary according to the number of data nodes, primary shards, and categorical fields. We believe composite queries are the right choice because even though they may not be the fastest in all cases, they’re the most stable on average (40 seconds).
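
To illustrate what pagination via composite aggregation looks like, here is a minimal Python sketch that walks through all entities one page at a time. It is not the plugin's implementation: search is a hypothetical callable that sends a request body and returns the parsed response, and the aggregation name entities is made up for the example. The size, sources, and after parameters and the after_key response field belong to the composite aggregation itself.

```python
def iter_entity_pages(search, fields, page_size=1000):
    """Yield one list of composite buckets per page until none remain.

    `search` is a hypothetical callable: it takes a request body (dict)
    and returns the parsed search response (dict).
    """
    composite = {
        "size": page_size,
        "sources": [{f: {"terms": {"field": f}}} for f in fields],
    }
    body = {"size": 0, "aggs": {"entities": {"composite": composite}}}
    while True:
        result = search(body)["aggregations"]["entities"]
        if not result["buckets"]:
            return
        yield result["buckets"]
        after_key = result.get("after_key")
        if after_key is None:
            return
        # Resume the next page right after the last bucket of this page.
        composite["after"] = after_key


# Stand-in for a cluster: five hosts, served in sorted order.
HOSTS = sorted(f"host{i}" for i in range(5))


def fake_search(body):
    comp = body["aggs"]["entities"]["composite"]
    after = comp.get("after")
    keys = [h for h in HOSTS if after is None or h > after["host"]]
    page = keys[:comp["size"]]
    result = {"buckets": [{"key": {"host": h}, "doc_count": 2} for h in page]}
    if page:
        result["after_key"] = {"host": page[-1]}
    return {"aggregations": {"entities": result}}


pages = list(iter_entity_pages(fake_search, ["host"], page_size=2))
print([len(p) for p in pages])  # [2, 2, 1]
```

Because each page carries its own resume key, the work per request stays bounded no matter how many entities exist, which fits the stability observation above.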

Amortize expensive operations

HCAD can face thundering herd traffic, in which many entities make requests like reading checkpoints from disks at approximately the same time. Therefore, we create various queues to buffer pent-up requests. These queues amortize expensive costs by performing a small and bounded amount of work steadily. Therefore, HCAD can offer predictable performance and availability at scale.

In-memory cache

HCAD relies on caching to process entities whose memory requirement is larger than the configured memory size. At first, we tried a least recently used cache but experienced thrashing when running the HTTP logs workload: with 100 1-minute interval detectors and millions of entities for each detector, we saw few cache hits (many hundreds) within 7 hours. We were wasting CPU cycles swapping models in and out of memory all the time. As a general rule, when the hit-to-miss ratio is worse than 3:1, caching isn’t worth considering for quick model access.

Instead, we turned to a modified LFU caching, augmented to include heavy hitters’ approximation. A decayed count is maintained for each model in the cache. The decayed count for a model in the cache is incremented when the model is accessed. The model with the smallest decayed count is the least frequently used model. When the cache reaches its capacity, it invalidates and removes the least frequently used model if the new entity’s frequency is no smaller than the least frequently used entity. This connection between heavy hitter approximation and traditional LFU allows us to make the more frequent and recent models sticky in memory and phase out models with lesser cache hit probabilities.
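
The admission rule described above can be illustrated with a toy Python cache. This is a simplified sketch under stated assumptions, not the plugin's code: the class and method names are hypothetical, counts decay exponentially with an arbitrary half-life, and exact counts are kept for every entity seen, whereas a heavy hitters approximation would bound that bookkeeping.

```python
import math


class DecayedLFUCache:
    """Toy LFU cache whose access counts decay exponentially over time."""

    def __init__(self, capacity, half_life=60.0):
        self.capacity = capacity
        self.decay = math.log(2) / half_life  # assumed decay rate
        self.models = {}    # entity -> model currently held in memory
        self.priority = {}  # entity -> (decayed count, time of last update)

    def _count(self, entity, now):
        count, last = self.priority.get(entity, (0.0, now))
        return count * math.exp(-self.decay * (now - last))

    def touch(self, entity, now):
        """Record one access, whether or not the entity is cached."""
        count = self._count(entity, now) + 1.0
        self.priority[entity] = (count, now)
        return count

    def get(self, entity, now):
        """Record an access and return the cached model, or None on a miss."""
        self.touch(entity, now)
        return self.models.get(entity)

    def offer(self, entity, model, now):
        """Try to admit a model; return True if it is cached afterwards."""
        if entity in self.models or len(self.models) < self.capacity:
            self.models[entity] = model
            return True
        victim = min(self.models, key=lambda e: self._count(e, now))
        # Evict only if the newcomer is at least as frequent as the victim.
        if self._count(entity, now) >= self._count(victim, now):
            del self.models[victim]
            self.models[entity] = model
            return True
        return False


cache = DecayedLFUCache(capacity=2)
for t in (0, 1, 2):
    cache.touch("a", t)
cache.offer("a", "model-a", 2)
cache.touch("b", 3)
cache.offer("b", "model-b", 3)
cache.touch("c", 4)
print(cache.offer("c", "model-c", 4))  # True: c is more recent than b
print(cache.offer("d", "model-d", 5))  # False: d has never been accessed
print(sorted(cache.models))            # ['a', 'c']
```

The decay is what makes the policy favor both frequency and recency: an entity that was busy long ago gradually loses its count and eventually yields its slot to a currently active one.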

Fault tolerance

Unrecoverable memory state is limited, and enough information of models is stored on disk for crash resilience. Models are recovered on a different host after a crash is detected.

High performance

HCAD builds on asynchronous I/O: all I/O requests such as network calls or disk accesses are non-blocking. In addition, model distribution is balanced across the cluster using a consistent hash ring.
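
For readers unfamiliar with consistent hashing, the following Python sketch shows the general idea of mapping model IDs to nodes on a hash ring. It is a generic illustration, not the plugin's implementation: the class name, the use of SHA-256, and the 100 virtual nodes per host are all assumptions.

```python
import bisect
import hashlib


class HashRing:
    """Generic consistent hash ring that maps model IDs to node names."""

    def __init__(self, nodes, vnodes=100):
        # Each node owns `vnodes` points on the ring to even out the load.
        self.ring = sorted(
            (self._hash(f"{node}#{i}"), node)
            for node in nodes
            for i in range(vnodes)
        )
        self.points = [point for point, _ in self.ring]

    @staticmethod
    def _hash(value):
        digest = hashlib.sha256(value.encode("utf-8")).digest()
        return int.from_bytes(digest[:8], "big")

    def node_for(self, model_id):
        """Return the owner of the first ring point after the model's hash."""
        idx = bisect.bisect_right(self.points, self._hash(model_id))
        return self.ring[idx % len(self.ring)][1]


model_ids = [f"detector_entity_{i}" for i in range(1000)]
ring = HashRing(["node-a", "node-b", "node-c"])
before = {m: ring.node_for(m) for m in model_ids}

# Removing a node only moves the models that lived on that node.
smaller = HashRing(["node-a", "node-b"])
moved = [m for m in model_ids
         if before[m] != "node-c" and smaller.node_for(m) != before[m]]
print(len(moved))  # 0
```

This property is what keeps rebalancing cheap when the cluster scales in or out: only the models owned by the affected node need to be reloaded elsewhere.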

Summary

We enhanced HCAD to improve its scalability and completeness without altering the fidelity of the computation. As a result of these improvements, I showed you how to size an OpenSearch domain and use HCAD to monitor 1 million entities in 10 minutes. To learn more about HCAD, see anomaly detection documentation.

If you have feedback about this post, submit comments in the comments section below. If you have questions about this post, start a new thread on the Machine Learning forum.


About the Author

Kaituo Li is an engineer in Amazon OpenSearch Service. He has worked on distributed systems, applied machine learning, monitoring, and database storage in Amazon. Before Amazon, Kaituo was a PhD student in Computer Science at University of Massachusetts, Amherst. He likes reading and sports.

A deep dive into high-cardinality anomaly detection in Elasticsearch

Post Syndicated from Kaituo Li original https://aws.amazon.com/blogs/big-data/a-deep-dive-into-high-cardinality-anomaly-detection-in-elasticsearch/

In May 2020, we announced the general availability of real-time anomaly detection for Elasticsearch. With that release we leveraged the Random Cut Forest (RCF) algorithm to identify anomalous behaviors in the multi-dimensional data streams generated by Elasticsearch queries. We focused on aggregation first, to enable our users to quickly and accurately detect anomalies in their data streams. However, consider the example data in the following table.

timestamp | avg. latency | region
12:00 pm | 3.1 | Seattle
12:00 pm | 4.1 | New York
12:00 pm | 5.9 | Berlin
12:01 pm | 2.6 | Seattle
12:01 pm | 5.3 | New York
12:01 pm | 5.8 | Berlin

The data consists of one data field, avg. latency, and one attribute or categorical field, region. If we want to perform anomaly detection on this data, we could take the following strategy:

  1. Separate the data by the region attribute to create a separate data stream entity for each region.
  2. Construct an anomaly detector for each entity. If the cardinality of the region attribute, that is the number of possible choices of region value, is small, we can create separate anomaly detectors by filtering on each possible value of region.

But what if the cardinality of this attribute is large? Or what if the set of possible values changes over time, such as a source IP address or product ID? The existing anomaly detection tool doesn’t scale well in this situation.

We define the high-cardinality anomaly detection (HCAD) problem as performing anomaly detection on a data stream where individual entities in the stream are defined by a choice of attribute. In this use case, our goal is to perform anomaly detection on each data stream defined by a particular choice of region. That is, the Seattle region produces its own latency data stream, as do the New York and Berlin regions.

In this post, we dive into the motivation, design, and development of the HCAD capability. We begin with an in-depth description of the HCAD problem and its properties. We then share the details of our solution and the challenges and questions we encountered during our research and development. Finally, we describe the system and architecture of our solution, especially the components tackling scalability concerns.

High-cardinality anomaly detection

In this section, we elaborate on the definition of the HCAD problem. As described earlier, we can think of HCAD as a way to produce multiple data streams by defining each data stream by a particular choice of attribute. For each data stream, we want to perform streaming anomaly detection as usual—we want to detect anomalies relative to that individual data stream’s own history.

We define an individual entity by fixing a particular value of one or more attributes and aggregating values over all remaining attributes. In the language of table manipulation in SQL, each entity is defined using GROUPBY on an attribute. Specifically, a group of entities is defined by selecting one or more attributes, where each entity is given by the data fields for each particular value of those attributes. For example, applying GROUPBY to the region attribute in the preceding example data produces three data stream entities: one for Seattle, one for New York, and one for Berlin. Within each of these data streams, we want to find anomalies with respect to that data stream’s history.

This idea of defining a data stream entity by attribute values extends to multiple attribute fields. When multiple attribute fields exist but we group by only one of those attributes, we aggregate each entity over the remaining attribute fields. For example, suppose you have network traffic data consisting of two attribute fields and one data field. The attribute fields are a source_ip address and a dest_ip address. The data field is the number of bytes_transferred in that particular network transaction between the given two IP addresses. The following table gives an example of such a dataset.

timestamp  source_ip     dest_ip        bytes_transferred
12:00 pm   192.168.1.1   192.168.1.20   138
12:00 pm   192.168.1.1   192.168.1.300  21
12:00 pm   192.168.1.20  192.168.1.300  5
12:01 pm   192.168.1.1   192.168.1.20   289
12:01 pm   192.168.1.1   192.168.1.300  10
12:02 pm   192.168.1.1   192.168.1.20   244
12:02 pm   192.168.1.1   192.168.1.300  16
12:02 pm   192.168.1.20  192.168.1.300  8

One way to define an entity is to group by both source IP address and destination IP address combinations. Under this method of defining entities, we end up with the following data streams.

entity: (source_ip, dest_ip)   12:00 pm  12:01 pm  12:02 pm  12:03 pm
(192.168.1.1, 192.168.1.20)    138       289       244
(192.168.1.1, 192.168.1.300)   21        10        16
(192.168.1.20, 192.168.1.300)  5         0         8

On the other hand, if we define an entity only by its source IP address, we aggregate bytes transferred over the possible destination IP addresses.

entity: (source_ip,)  12:00 pm  12:01 pm  12:02 pm  12:03 pm
(192.168.1.1,)        159       299       260
(192.168.1.20,)       5         0         8
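The two ways of defining entities can be reproduced with a few lines of Python. This is a sketch over the example rows from the network traffic table; the helper name entity_streams is hypothetical, and intervals with no rows for an entity are reported as 0, following the tables above.

```python
from collections import defaultdict

# Rows from the example table: (timestamp, source_ip, dest_ip, bytes_transferred)
rows = [
    ("12:00 pm", "192.168.1.1", "192.168.1.20", 138),
    ("12:00 pm", "192.168.1.1", "192.168.1.300", 21),
    ("12:00 pm", "192.168.1.20", "192.168.1.300", 5),
    ("12:01 pm", "192.168.1.1", "192.168.1.20", 289),
    ("12:01 pm", "192.168.1.1", "192.168.1.300", 10),
    ("12:02 pm", "192.168.1.1", "192.168.1.20", 244),
    ("12:02 pm", "192.168.1.1", "192.168.1.300", 16),
    ("12:02 pm", "192.168.1.20", "192.168.1.300", 8),
]
timestamps = ["12:00 pm", "12:01 pm", "12:02 pm"]


def entity_streams(rows, key_columns):
    """Group rows by the chosen attribute columns and sum the data field per timestamp."""
    totals = defaultdict(lambda: defaultdict(int))
    for ts, src, dst, nbytes in rows:
        record = {"source_ip": src, "dest_ip": dst}
        entity = tuple(record[column] for column in key_columns)
        totals[entity][ts] += nbytes
    # Intervals without any rows for an entity are reported as 0.
    return {e: [by_ts.get(t, 0) for t in timestamps] for e, by_ts in totals.items()}


by_pair = entity_streams(rows, ["source_ip", "dest_ip"])  # one stream per IP pair
by_source = entity_streams(rows, ["source_ip"])           # summed over dest_ip
```

Grouping by fewer attributes yields fewer, denser streams, because each stream absorbs the values of the attributes that were left out.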

HCAD is distinct from another anomaly detection technique called population analysis. The goal of population analysis is to discover entire entities with values and patterns distinct from other entities. For example, the bytes transferred data stream associated with the entity (192.168.1.1, 192.168.1.20) is much larger in value than either of the other entities. Assuming many entities exist with values in the range of 1 to 30, this entity is considered a population anomaly. An entity can be a population anomaly even if its data stream contains no anomalies relative to its own history.

Depending on the way we define entities from attributes, the number of data stream entities changes. This is an important consideration with regard to the scale and density of the data streams: grouping by too many attributes may leave you with entities that have too few observations for a meaningful data stream. This is not uncommon in real-world datasets. Even in a dataset with only one attribute, real-world data tends to adhere to a power-law scaling of data density. Simply put, the majority of data stream activity occurs in a minority of entities, and there is likely a long tail of sparse entities. Given this observation, if the stream aggregation window is too small, there are many missing data points in these sparse entities.

Data stream models for HCAD

We described the HCAD problem, but how do we build a machine learning solution? Furthermore, how is this solution different from the currently available non-HCAD single-stream solution? In this section, we explain our process for model selection and why we arrived at using Random Cut Forests for the high-cardinality regime. We then address scalability problems by exploring RCF’s hyperparameter space. Finally, we address certain issues that arise when dealing with sparse data streams.

Model selection

Designing an HCAD solution has several scientific challenges. Whatever algorithmic solution we arrive at must satisfy several systems constraints:

  • The algorithm must work in a streaming context: aggregated feature queries are streaming in Elasticsearch and the anomaly detection models only receive each new feature aggregate one at a time
  • The HCAD solution must respect the limits of the customer’s hardware and should have restricted CPU and memory impact
  • The solution should be scalable with respect to data throughput, number of entities, and number of nodes in the cluster
  • The algorithm must be unsupervised, because the goal is to classify anomalous data in a streaming context without any labeled training set

Our team identified three classes of anomaly detection model based on the relationship between number of entities and number of models:

  • 1:1 model – Each entity is given its own AD model. No data or anomaly information is shared between the models, but because the number of models scales with the number of entities, we must keep the model small to satisfy customer scaling needs.
  • N:1 model – A single AD model is responsible for detecting each entity’s anomalies. Deep learning-based AD models typically fall under this category.
  • N:K model – A subset of entities is assigned to one of several individual models. Typically, some clustering algorithm is used to determine an appropriate partition of entities by identifying common features in the data streams.

Each general class of solution has its own tradeoffs with respect to the ability to distribute across cluster nodes, scale with respect to the number of entities, and detect anomalies on benchmark datasets. After some analysis of these tradeoffs and experimentation, we decided on the 1:1 approach. Within this class of HCAD solution, there are many candidate data stream anomaly detection algorithms. We explored many of these algorithms and tested different lightweight models before deciding on using Random Cut Forests. RCF works particularly well across a wide variety of data stream behaviors. This fit well with our goal of supporting as wide a range of customer use cases as possible.

Scaling Random Cut Forests

To keep memory costs down when using RCFs as our AD model, we started by exploring the algorithm’s hyperparameter space. The model has three main hyperparameters:

  • T – Number of trees
  • S – Sample size per tree
  • D – Shingle dimension

The RCF model size is O(TDS). Sample size per tree is related to expected anomaly rate, and based on our experiments with a wide variety of datasets, it was best to leave this hyperparameter at its default value of 256 from the single-stream solution. The dimensionality is a function of the customer input but also of the model’s shingle size. We discuss the role of shingle size in the next section. Primarily to satisfy the scaling and model size constraints of the HCAD system, we focused on studying the effect of the number of trees on algorithm performance.

Experiments show that 10 trees per forest gives acceptable results on benchmark datasets, compared with the default of 100 trees used in the single-stream solution. In the original plugin, we chose this large number of trees to ensure that the model can keep an accurate sketch of a long enough period of data samples. In doing so, we can recognize long time-scale changes to the data stream. However, we found in our benchmark high-cardinality data streams that a model this large is unnecessary and that 10 trees is often sufficient for summarizing each high-cardinality data stream’s statistics.
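As a rough worked example of the O(TDS) scaling, the following sketch compares relative model sizes. The tree counts (100 and 10) and the sample size of 256 come from the text; the shingle dimensions of 8 and 4 are assumed values for illustration only.

```python
def relative_model_size(num_trees: int, sample_size: int, dimension: int) -> int:
    """Model size in arbitrary units, following the O(TDS) scaling."""
    return num_trees * sample_size * dimension


# T and S are taken from the text; the dimensions 8 and 4 are assumptions.
single_stream = relative_model_size(num_trees=100, sample_size=256, dimension=8)
hcad = relative_model_size(num_trees=10, sample_size=256, dimension=4)

reduction_from_trees = 100 / 10            # trees alone give a factor of 10
total_reduction = single_stream / hcad     # trees and dimension combined
```

Under these assumptions the number of trees accounts for a tenfold reduction, and halving the dimension doubles it. Because the size is a product of the three hyperparameters, savings in each one multiply.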

Our experiments measured the precision and recall on labeled data streams. Labels were of the form of anomaly windows: regions in time where an anomaly is known to occur at some point inside the window. A true positive is the positive identification of such a window by the anomaly detection method. Any positively predicted point outside a window is considered a false positive. For an example labeled dataset, see Using Random Cut Forests for real-time anomaly detection in Amazon Elasticsearch Service.
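The window-based scoring can be sketched as follows. This is one plausible reading of the description above, not the exact evaluation code: a window counts as a true positive once, no matter how many predictions fall inside it, and every prediction outside all windows counts as one false positive. The function name and the sample numbers are hypothetical.

```python
def window_precision_recall(predicted_times, anomaly_windows):
    """predicted_times: timestamps flagged as anomalous.
    anomaly_windows: list of (start, end) labeled windows, both ends inclusive."""

    def containing_window(t):
        for index, (start, end) in enumerate(anomaly_windows):
            if start <= t <= end:
                return index
        return None

    detected_windows = set()
    false_positives = 0
    for t in predicted_times:
        index = containing_window(t)
        if index is None:
            false_positives += 1          # prediction outside every window
        else:
            detected_windows.add(index)   # window identified at least once

    true_positives = len(detected_windows)
    flagged = true_positives + false_positives
    precision = true_positives / flagged if flagged else 0.0
    recall = true_positives / len(anomaly_windows) if anomaly_windows else 0.0
    return precision, recall


precision, recall = window_precision_recall(
    predicted_times=[12, 13, 40, 77],
    anomaly_windows=[(10, 15), (70, 80), (90, 95)],
)
```

In this example two of the three windows are detected and one prediction falls outside every window, so precision and recall are both 2/3.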

Handling sparse data streams

As mentioned earlier, real-world high-cardinality datasets typically exhibit a power-law like distribution in entity activity. That is, a minority of the entities produce the majority of the data. The earlier source and destination IP address use case is an example: for many websites, the majority of traffic comes from a small collection of sources, whereas individual visitors make up a long tail of sparse activity. Under this assumption, the choice of shingle size is important in defining our entity data streams.

Shingling is a standard preprocessing technique for transforming a one-dimensional data stream x_t into a d-dimensional data stream s_t by converting subsequences of length d into d-dimensional vectors: s_t = (x_{t−d+1}, …, x_{t−1}, x_t). The following diagram illustrates the shingling process using a shingle size of four.

These vectors, instead of the raw stream values, are then fed into the RCF model. In anomaly detection, using shingling has several benefits. First, shingling filters out small-scale noise in the data. Second, shingles allow the model to detect breaks in certain local patterns or frequency changes. That is, a shingled RCF model learns some of the local temporal behavior of your data stream.
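A minimal sketch of shingling in Python, with a shingle size of four and made-up stream values:

```python
def shingle(stream, d):
    """Return the d-dimensional shingles s_t = (x_{t-d+1}, ..., x_t) of a 1-D stream."""
    if d < 1:
        raise ValueError("shingle size must be at least 1")
    # The first complete shingle ends at index d - 1.
    return [tuple(stream[t - d + 1 : t + 1]) for t in range(d - 1, len(stream))]


values = [3, 1, 4, 1, 5, 9]
shingles = shingle(values, d=4)
# shingles == [(3, 1, 4, 1), (1, 4, 1, 5), (4, 1, 5, 9)]
```

Each new value produces one new shingle that overlaps the previous one in d − 1 positions, which is how the model sees short-term temporal context.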

From discussions with our customers and analysis of real-world anomalies, we realized that many customers are looking for distributional anomalies: values that are outside the normal range of values of a data stream. This is in contrast to contextual anomalies, where a data point is considered anomalous in the context of just the data stream’s local history. The following figure depicts this distinction. On the left is a plot of a data stream, and on the right is a histogram of the values attained by this stream in the time window shown. The red data point is a distributional anomaly because its value falls within a low-density regime of the value distribution. The orange data point, on the other hand, is a contextual anomaly: its value is commonly occurring within this span of time but the presence of a spike at this particular point in time is unexpected.

The use of a shingle dimension greater than one allows the RCF model to detect these contextual anomalies in addition to the distributional anomalies.

One challenge with using shingles, however, is how to handle missing data. When data is unavailable at a particular time t, the shingles at times t, t+1, …, t+d−1 cannot be constructed. This results in a delay in the model’s ability to report anomalies. We reduce the impact of the occasional missing datum by using interpolation. However, when a data stream is sparse, it’s unlikely that any shingle can be constructed, thus turning interpolation into a prediction problem. Whether or not shingling is appropriate for your data is a function of the aggregation window used in the Elasticsearch query and the entity data density.
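The effect of a single missing value can be checked with a short sketch. Missing values are represented as None, and the gap filling shown here is plain linear interpolation between the two neighbors, which is an assumed strategy for illustration and not necessarily the one the plugin uses.

```python
def constructible_shingle_times(stream, d):
    """Times t whose shingle (x_{t-d+1}, ..., x_t) contains no missing value (None)."""
    return [
        t
        for t in range(d - 1, len(stream))
        if all(x is not None for x in stream[t - d + 1 : t + 1])
    ]


def fill_isolated_gaps(stream):
    """Linearly interpolate a missing value only when both neighbors are present."""
    filled = list(stream)
    for t in range(1, len(stream) - 1):
        if stream[t] is None and stream[t - 1] is not None and stream[t + 1] is not None:
            filled[t] = (stream[t - 1] + stream[t + 1]) / 2
    return filled


stream = [2.0, 2.2, 2.1, None, 2.3, 2.4, 2.2, 2.5, 2.3]  # value at t=3 is missing
before = constructible_shingle_times(stream, d=4)
after = constructible_shingle_times(fill_isolated_gaps(stream), d=4)
```

With d = 4, the gap at t = 3 blocks the shingles at t = 3 through t = 6, so only t = 7 and t = 8 are usable before filling. In a sparse stream most values are missing, few gaps have two present neighbors, and this kind of filling no longer helps.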

Scaling anomaly detection in Elasticsearch

In this section, we deep dive into the engineering challenges encountered in building the HCAD tool, particularly regarding the scalability with respect to the number of entities. We first describe the challenges we faced. Then we explain how our HCAD solution balances scalability and resource usage. Finally, we collect these ideas into a description of the overall HCAD framework.

The challenge

As described earlier, our goal was to support filtering the data by attribute or categorical fields and create a separate model for each attribute or categorical value. After examining several real-world use cases, we needed the HCAD plugin to handle millions of categorical values. Processing this many unique values was a challenging scalability issue that affected several key resources:

  • Storage – At the extreme, with 100 1-minute interval detectors and millions of entities for each detector running on our evaluation workload, we have seen the checkpoint index reach up to 170 GB in 1 day.
  • Memory – Compared to the single-stream detector, we could decrease the model size by approximately 20 times by decreasing shingle size and the number of RCF trees. But the number of entities is unbounded.
  • CPU – A single-stream detector mostly runs serial processing. During an HC detector run, multiple entities compete for CPU cycles for model update and inference. The CPU time grows linearly relative to the number of entities processed in each AD job run.

Designing for scalability and resource control

Based on these scalability issues, we chose to extend the current AD architecture because it already had these attributes:

  • Easy to scale out
  • Powerful enough to handle unpredictable scaling requirements
  • Able to control resource usage

However, meeting these challenges for HCAD required three key changes to our existing AD architecture.

First, we placed embarrassingly parallel computations on multiple nodes instead of a coordinating node. The coordinating node acts as the start of the task workflow. It only fetches features and assigns each node in the cluster a portion of the features that is roughly the same in size for all nodes. Other nodes process the features, train and run local models, and write results. Therefore, increasing the number of nodes by a factor of K asymptotically increases the number of categorical values we can handle by the same factor.

Second, in a single-stream detector, the amount of memory used is proportional to the number of features and is fixed when the detector is defined. However, with the introduction of HCAD, the number of entities is not fixed and the number of active entities is likely to change. Therefore, the size of the required memory may continuously change in the lifetime of a detector. Caching can accommodate such requirements without the need to pre-allocate memory for a detector in a fixed amount. If enough memory exists, we create models for all entities and monitor anomalies. Otherwise, we cache the hottest entities’ models up to the amount that the cache memory can contain. For example, if our memory can host only 100 models and there are millions of entities, the maximum active entities in the cache are the hottest 100 entities. We maintain a time-decayed count of each entity. The cache uses this information to measure an entity’s hotness.
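The caching policy can be sketched as a fixed-capacity cache keyed by a time-decayed count. The exponential decay with a half-life, the class name, and the replacement rule below are assumptions made for illustration; the text only states that hotness is based on a time-decayed count.

```python
import math


class HotEntityCache:
    """Keeps models for the hottest entities, up to a fixed capacity."""

    def __init__(self, capacity, half_life_intervals=60.0):
        self.capacity = capacity
        self.decay = math.log(2) / half_life_intervals  # assumed decay constant
        self.hotness = {}  # entity -> (decayed count, interval of last update)
        self.models = {}   # entity -> model (a placeholder object in this sketch)

    def _decayed(self, entity, now):
        count, last = self.hotness.get(entity, (0.0, now))
        return count * math.exp(-self.decay * (now - last))

    def observe(self, entity, now, make_model):
        """Record one data point for entity at interval `now`; return its model or None."""
        score = self._decayed(entity, now) + 1.0
        self.hotness[entity] = (score, now)
        if entity in self.models:
            return self.models[entity]
        if len(self.models) < self.capacity:
            self.models[entity] = make_model(entity)
            return self.models[entity]
        # Cache is full: evict the coldest cached entity only if the newcomer is hotter.
        coldest = min(self.models, key=lambda e: self._decayed(e, now))
        if self._decayed(coldest, now) < score:
            del self.models[coldest]
            self.models[entity] = make_model(entity)
            return self.models[entity]
        return None


def new_model(entity):
    return {"entity": entity}  # stands in for a real per-entity model


cache = HotEntityCache(capacity=2)
for interval in range(10):
    cache.observe("host-a", interval, new_model)
    cache.observe("host-b", interval, new_model)
rejected = cache.observe("host-c", 9, new_model)  # a single data point is not hot enough
```

An entity that shows up once does not displace entities that report every interval, while an entity that goes quiet gradually loses its place as its count decays.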

Finally, we implemented various strategies for combating the extra overhead of running HC detectors:

  • Rate limiting – We limit concurrent computations and throttle bursty usage. For example, when replacing models in the cache, the cache sends get and search requests to fetch and potentially train models. If there is bursty traffic to replace models, the number of requests might exceed Elasticsearch’s get and search thread pool’s maximum queue size and cause Elasticsearch to reject all get and search requests. We install rate limiting to restrict models’ replacing speed.
  • Active cleanup – This keeps resource usage under a safe level before it’s too late. For example, we keep checkpoints within 3 days. When any of the checkpoint shards is larger than 50 GB (recommended maximum shard size), we start deleting checkpoints more aggressively.
  • Minimizing space usage – For example, in single-stream anomaly detection, we record a model’s running results during each interval. An entity’s model may take time to get ready when there is not enough historical data for training. We don’t need to record such entities’ results because we won’t record anything useful other than that anomaly grade and confidence are both equal to zero. This optimization can reduce the result index size by 4–8 times in one of our experiments.
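As an illustration of the rate limiting idea, here is a token bucket, a common technique for capping request rates while allowing short bursts. The text does not say which algorithm the plugin uses, so this is a stand-in, and the rate and burst values are made up.

```python
class TokenBucket:
    """Allows `rate` requests per second on average, with bursts of up to `burst`."""

    def __init__(self, rate, burst, now=0.0):
        self.rate = rate
        self.burst = burst
        self.tokens = float(burst)
        self.last = now

    def try_acquire(self, now):
        # Refill in proportion to elapsed time, capped at the burst size.
        self.tokens = min(self.burst, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False  # caller should defer this model replacement


bucket = TokenBucket(rate=2, burst=3)
burst_results = [bucket.try_acquire(now=0.0) for _ in range(5)]
later_result = bucket.try_acquire(now=1.0)
# burst_results == [True, True, True, False, False]; later_result is True
```

Deferred replacements can be retried in a later interval, which keeps the number of outstanding get and search requests below the thread pool queue size.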

Architecture

The following figure summarizes the HCAD architecture.

The end-to-end story of HCAD is as follows:

  1. A user wants to get alerts when an anomaly for a particular entity in the whole corpus arises (for example, high CPU usage on a host).
  2. The user creates an HCAD detector to describe the source data (index name), feature (for example, average CPU usage within an interval), and sampling frequency (for example, 1 minute).
  3. Based on the detector configuration, the AD plugin issues a query to fetch feature data for each host regularly (every 1 minute). Users don’t need to know what hosts to query for in the first place.
  4. A coordinating node infers the entities from the query result.
  5. The coordinating node distributes entities’ features to all nodes in the cluster.
  6. On each node, models are trained for the incoming entities, and anomaly grades are inferred, indicating how different the current CPU usage is from the trends that have recently been observed for the same hosts’ CPU usage.
  7. If cache memory is not enough for all incoming entities, the cache admits entities’ models based on the entities’ hotness.

The Kibana workflow

In this section, we show how to use HCAD in Kibana. Let’s imagine that we need to monitor the high or low CPU usage of our hosts. To do that, we create a detector, define its features, and choose a category field.

Creating a detector

To create and configure a detector, complete the following steps:

  1. On the navigation bar, choose Anomaly detection.
  2. Choose Create detector.
  3. Enter a name and description for the detector.
  4. Choose an index or enter an index pattern for the data source.
  5. For Timestamp field, choose a field so the detector can create a time series profile of the data.
  6. If you want the detector to ignore specific data (such as an invalid CPU usage number), you can configure a data filter.
  7. Specify time frames for detection interval and window delay.

Window delay time should be a conservative estimate. Otherwise, the detector may query for documents within an interval that has not been indexed yet. For this post, we want to have an average CPU usage per minute, and we expect the index processing time to be 1 minute at most.
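To see how the two settings interact, the following sketch computes the time range a detector run would query. It assumes the range ends at the run time minus the window delay and spans one detection interval; the function name and the example date are hypothetical.

```python
from datetime import datetime, timedelta


def query_window(run_time, interval, window_delay):
    """Return the [start, end) range of data that a run at run_time should query."""
    end = run_time - window_delay  # skip data that may not be indexed yet
    start = end - interval
    return start, end


start, end = query_window(
    run_time=datetime(2020, 11, 1, 12, 5),
    interval=timedelta(minutes=1),
    window_delay=timedelta(minutes=1),
)
# start is 12:03 and end is 12:04 on the same day
```

A window delay that is too short makes the range overlap with documents still being indexed, so the aggregate is computed on partial data.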

Defining features

In addition to the preceding settings, we need to add features. The detector aggregates a set of values within a time interval (shingle) to compute the single value according to the feature definition.

  1. Choose Configure model.
  2. For Feature name, enter a name.
  3. Specify your aggregation functions and fields.

We provide five built-in single metric aggregations: Min, Max, Sum, Average, and Count. You can add a customized aggregation by choosing Custom expression for Find anomalies based on. For this post, we add a feature that returns the average of CPU usage values.

As mentioned earlier, you can customize the aggregation method as long as it returns a single value. For example, when a DevOps engineer wants to monitor the count of distinct IPs accessing their company’s Amazon Simple Storage Service (Amazon S3) buckets, they can define a cardinality aggregation that counts unique source IPs.
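A custom expression for that use case could look like the following feature definition, written here as a Python dictionary and serialized to JSON. The feature name and the source_ip field are assumptions about the index; cardinality is the Elasticsearch aggregation that returns an approximate count of distinct values.

```python
import json

# Assumed names: "unique_source_ips" (feature) and "source_ip" (document field).
unique_ip_feature = {
    "feature_name": "unique_source_ips",
    "feature_enabled": True,
    "aggregation_query": {
        "unique_source_ips": {"cardinality": {"field": "source_ip"}}
    },
}
feature_json = json.dumps(unique_ip_feature, indent=2)
```

Because the aggregation returns one number per interval, it satisfies the single-value requirement for a feature.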

Choosing a category field

The host-cloudwatch index in our example has CPU usage per host per minute. We can define a single-stream detector to model all of the hosts’ average CPU usage together. But if each host’s CPU values have different distributions, we can split the hosts’ time series and model them separately. Giving each categorical value a separate baseline is the main change that HCAD introduces.
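Outside Kibana, the same configuration can be expressed as a request body for the detector creation API under _opendistro/_anomaly_detection/detectors. The sketch below builds such a body in Python. The detector name and the document field names (@timestamp, cpu_usage, host) are assumptions about the host-cloudwatch index, and the exact schema should be checked against the plugin documentation for your version.

```python
import json

detector = {
    "name": "host-cpu-detector",            # hypothetical detector name
    "description": "Per-host CPU usage anomalies",
    "time_field": "@timestamp",             # assumed timestamp field
    "indices": ["host-cloudwatch"],
    "feature_attributes": [
        {
            "feature_name": "avg_cpu",
            "feature_enabled": True,
            # "cpu_usage" is an assumed field name.
            "aggregation_query": {"avg_cpu": {"avg": {"field": "cpu_usage"}}},
        }
    ],
    "detection_interval": {"period": {"interval": 1, "unit": "Minutes"}},
    "window_delay": {"period": {"interval": 1, "unit": "Minutes"}},
    # Assumed field; each distinct value gets its own model and baseline.
    "category_field": ["host"],
}
request_body = json.dumps(detector, indent=2)
```

Leaving out category_field gives a single-stream detector that models all hosts together.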

Previewing and starting the detector

You might want to try out different choices of detector configurations and feature definitions before finalizing them. You can use Sample anomalies for iterative experiments.

Start the detector by choosing Save and start detector. After confirming, the anomaly detector starts collecting data in real time and performing detection.

The detector starts in an initializing state.

We can use the profile API to check initialization progress (see the following code). A detector is initialized if its hottest entities’ models are fully initialized and ready to emit anomaly grades. Because the hottest entity may change, the initialization progress may go backward.

GET _opendistro/_anomaly_detection/detectors/stf6vnUB0XEggmgl3TCj/_profile/init_progress

{
    "init_progress": {
        "percentage": "92%",
        "estimated_minutes_left": 10,
        "needed_shingles": 10
    }
}
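A script that polls this endpoint could read the progress from the response as follows. This is a sketch that parses the sample response shown above; the helper name is hypothetical, and it assumes the percentage is always a string ending in a percent sign.

```python
import json

# Sample response body, copied from the example above.
response_text = """
{
    "init_progress": {
        "percentage": "92%",
        "estimated_minutes_left": 10,
        "needed_shingles": 10
    }
}
"""


def init_percentage(profile):
    """Return initialization progress as an integer percentage."""
    return int(profile["init_progress"]["percentage"].rstrip("%"))


profile = json.loads(response_text)
progress = init_percentage(profile)
minutes_left = profile["init_progress"]["estimated_minutes_left"]
```

Because progress can go backward when the hottest entity changes, a poller should compare against the latest value only and not assume the number increases monotonically.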

After the detector runs for a while, we can check its result on the detector’s Anomaly results tab. The following heatmap gives an overview of anomalies per entity across a timeline, by showing the hostname along the Y-axis and the timeline along the X-axis. A colored block means there is an anomaly, and a gray block means there is no anomaly.

Choosing one of the blocks shows you a more detailed view of the anomaly grade and confidence and the feature values causing the anomalies. We can observe the detector reports anomalies between 4:30 and 4:50 because the CPU usage is approaching 100%.

The time series of the host-cloudwatch index confirms that host i-WrSNK7zgys has a CPU usage spike between 4:30 and 4:50.

We can set up alerts for the detection results. For instructions, see Anomaly Detection.

Conclusion

General-purpose anomaly detection is challenging. Earlier in 2020, we launched a tool that can find anomalies in your feature queries. In this work, we extended the anomaly detection capabilities to the high-cardinality case. We can now find anomalies within your data when the data contains attribute or categorical fields. Our solution can discover anomalies across many entities defined by these attribute values and also scale with respect to increasing or decreasing number of entities in your data. We look forward to hearing your questions, comments, and feedback.


About the Authors

Kaituo Li is an engineer in Amazon Elasticsearch Service. He has worked on distributed systems, applied machine learning, monitoring, and database storage in Amazon. Before Amazon, Kaituo was a PhD student in Computer Science at the University of Massachusetts Amherst. He likes reading, watching TV, and sports.

 

 

Chris Swierczewski is an applied scientist at AWS. He enjoys reading, weightlifting, painting, and board games.