All posts by Antonio Vespoli

Real-time time series anomaly detection for streaming applications on Amazon Kinesis Data Analytics

Post Syndicated from Antonio Vespoli original https://aws.amazon.com/blogs/big-data/real-time-time-series-anomaly-detection-for-streaming-applications-on-amazon-kinesis-data-analytics/

Detecting anomalies in real time from high-throughput streams is key for informing on timely decisions in order to adapt and respond to unexpected scenarios. Stream processing frameworks such as Apache Flink empower users to design systems that can ingest and process continuous flows of data at scale. In this post, we present a streaming time series anomaly detection algorithm based on matrix profiles and left-discords, inspired by Lu et al., 2022, with Apache Flink, and provide a working example that will help you get started on a managed Apache Flink solution using Amazon Kinesis Data Analytics.

Challenges of anomaly detection

Anomaly detection plays a key role in a variety of real-world applications, such as fraud detection, sales analysis, cybersecurity, predictive maintenance, and fault detection, among others. The majority of these use cases require actions to be taken in near real-time. For instance, card payment networks must be able to identify and reject potentially fraudulent transactions before processing them. This raises the challenge to design near-real-time anomaly detection systems that are able to scale to ultra-fast arriving data streams.

Another key challenge that anomaly detection systems face is concept drift. The ever-changing nature of some use cases requires models to dynamically adapt to new scenarios. For instance, in a predictive maintenance scenario, you could use several Internet of Things (IoT) devices to monitor the vibrations produced by an electric motor with the objective of detecting anomalies and preventing unrecoverable damage. Sounds emitted by the vibrations of the motor can vary significantly over time due to different environmental conditions such as temperature variations, and this shift in pattern can invalidate the model. This class of scenarios creates the necessity for online learning—the ability of the model to continuously learn from new data.

Time series anomaly detection

Time series are a particular class of data that incorporates time in their structuring. The data points that characterize a time series are recorded in an orderly fashion and are chronological in nature. This class of data is present in every industry and is common at the core of many business requirements or key performance indicators (KPIs). Natural sources of time series data include credit card transactions, sales, sensor measurements, machine logs, and user analytics.

In the time series domain, an anomaly can be defined as a deviation from the expected patterns that characterize the time series. For instance, a time series can be characterized by its expected ranges, trends, seasonal, or cyclic patterns. Any significant alteration of this normal flow of data points is considered an anomaly.

Detecting anomalies can be more or less challenging depending on the domain. For instance, a threshold-based approach might be suitable for time series that are informed of their expected ranges, such as the working temperature of a machine or CPU utilization. On the other hand, applications such as fraud detection, cybersecurity, and predictive maintenance can’t be classified via simple rule-based approaches and require a more fine-grained mechanism to capture unexpected observations. Thanks to their parallelizable and event-driven setup, streaming engines such as Apache Flink provide an excellent environment for scaling real-time anomaly detection to fast-arriving data streams.

Solution overview

Apache Flink is a distributed processing engine for stateful computations over streams. A Flink program can be implemented in Java, Scala, or Python. It supports ingestion, manipulation, and delivery of data to the desired destinations. Kinesis Data Analytics allows you to run Flink applications in a fully managed environment on AWS.

Distance-based anomaly detection is a popular approach where a model is characterized by a number of internally stored data points that are used for comparison against the new incoming data points. At inference time, these methods compute distances and classify new data points according to how dissimilar they are from the past observations. In spite of the plethora of algorithms in literature, there is increasing evidence that distance-based anomaly detection algorithms are still competitive with the state of the art (Nakamura et al., 2020).

In this post, we present a streaming version of a distance-based unsupervised anomaly detection algorithm called time series discords, and explore some of the optimizations introduced by the Discord Aware Matrix Profile (DAMP) algorithm (Lu et al., 2022), which further develops the discords method to scale to trillions of data points.

Understanding the algorithm

A left-discord is a subsequence that is significantly dissimilar from all the subsequences that precede it. In this post, we demonstrate how to use the concept of left-discords to identify time series anomalies in streams using Kinesis Data Analytics for Apache Flink.

Let’s consider an unbounded stream and all its subsequences of length n. The m most recent subsequences will be stored and used for inference. When a new data point arrives, a new subsequence that includes the new event is formed. The algorithm compares this latest subsequence (query) to the m subsequences retained from the model, with the exclusion of the latest n subsequences because they overlap with the query and would therefore characterize a self-match. After computing these distances, the algorithm classifies the query as an anomaly if its distance from its closest non-self-matching subsequence is above a certain moving threshold.

For this post, we use a Kinesis data stream to ingest the input data, a Kinesis Data Analytics application to run the Flink anomaly detection program, and another Kinesis data stream to ingest the output produced by your application. For visualization purposes, we consume from the output stream using Kinesis Data Analytics Studio, which provides an Apache Zeppelin Notebook that we use to visualize and interact with the data in real time.

Implementation details

The Java application code for this example is available on GitHub. To download the application code, complete the following steps:

  1. Clone the remote repository using the following command:
    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples

  2. Navigate to the amazon-kinesis-data-analytics-java-examples/AnomalyDetection/LeftDiscords directory:

Let’s walk through the code step by step.

The MPStreamingJob class defines the data flow of the application, and the MPProcessFunction class defines the logic of the function that detects anomalies.

The implementation is best described by three core components:

  • The Kinesis data stream source, used to read from the input stream
  • The anomaly detection process function
  • The Kinesis data stream sink, used to deliver the output into the output stream

The anomaly detection function is implemented as a ProcessFunction<String, String>. Its method MPProcessFunction#processElement is called for every data point:

@Override
public void processElement(String dataPoint, ProcessFunction<String, OutputWithLabel>.Context context,
                            Collector<OutputWithLabel> collector) {

   Double record = Double.parseDouble(dataPoint);

   int currentIndex = timeSeriesData.add(record);

   Double minDistance = 0.0;
   String anomalyTag = "INITIALISING";

   if (timeSeriesData.readyToCompute()) {
       minDistance = timeSeriesData.computeNearestNeighbourDistance();
       threshold.update(minDistance);
   }

   /*
   * Algorithm will wait for initializationPeriods * sequenceLength data points until starting
   * to compute the Matrix Profile (MP).
   */
   if (timeSeriesData.readyToInfer()) {
       anomalyTag = minDistance > threshold.getThreshold() ? "IS_ANOMALY" : "IS_NOT_ANOMALY";
   }

   OutputWithLabel output = new OutputWithLabel(currentIndex, record, minDistance, anomalyTag);

   collector.collect(output);
}

For every incoming data point, the anomaly detection algorithm takes the following actions:

  1. Adds the record to the timeSeriesData.
  2. If it has observed at least 2 * sequenceLength data points, starts computing the matrix profile.
  3. If it has observed at least initializationPeriods * sequenceLength data points, starts outputting anomaly labels.

Following these actions, the MPProcessFunction function outputs an OutputWithLabel object with four attributes:

  • index – The index of the data point in the time series
  • input – The input data without any transformation (identity function)
  • mp – The distance to the closest non-self-matching subsequence for the subsequence ending in index
  • anomalyTag – A binary label that indicates whether the subsequence is an anomaly

In the provided implementation, the threshold is learned online by fitting a normal distribution to the matrix profile data:

/*
 * Computes the threshold as two standard deviations away from the mean (p = 0.02)
 *
 * @return an estimated threshold
 */
public Double getThreshold() {
   Double mean = sum/counter;

   return mean + 2 * Math.sqrt(squaredSum/counter - mean*mean);
}

In this example, the algorithm classifies as anomalies those subsequences whose distance from their nearest neighbor deviates significantly from the average minimum distance (more than two standard deviations away from the mean).

The TimeSeries class implements the data structure that retains the context window, namely, the internally stored records that are used for comparison against the new incoming records. In the provided implementation, the n most recent records are retained, and when the TimeSeries object is at capacity, the oldest records are overridden.

Prerequisites

Before you create a Kinesis Data Analytics application for this exercise, create two Kinesis data streams: InputStream and OutputStream in us-east-1. The Flink application will use these streams as its respective source and destination streams. To create these resources, launch the following AWS CloudFormation stack:

Launch Stack

Alternatively, follow the instructions in Creating and Updating Data Streams.

Create the application

To create your application, complete the following steps:

  1. Clone the remote repository using the following command:
    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples 

  2. Navigate to the amazon-kinesis-data-analytics-java-examples/AnomalyDetection/LeftDiscords/core directory.
    cd amazon-kinesis-data-analytics-java-examples/AnomalyDetection/LeftDiscords/core

  3. Create your JAR file by running the following Maven command in the core directory, which contains the pom.xml file:
    mvn package -Dflink.version=1.15.4
  4. Create an Amazon Simple Storage Service (Amazon S3) bucket and upload the file target/left-discords-1.0.0.jar.
  5. Create and run a Kinesis Data Analytics application as described in Create and Run the Kinesis Data Analytics Application:
    1. Use the target/left-discords-1.0.0.jar.
    2. Note that the input and output streams are called InputStream and OutputStream, respectively.
    3. The provided example is set up to run in us-east-1.

Populate the input stream

You can populate InputStream by running the script.py file from the cloned repository, using the command python script.py. By editing the last two lines, you can populate the stream with synthetic data or with real data from a CSV dataset.

Visualize data on Kinesis Data Analytics Studio

Kinesis Data Analytics Studio provides the perfect setup for observing data in real time. The following screenshot shows sample visualizations. The first plot shows the incoming time series data, the second plot shows the matrix profile, and the third plot shows which data points have been classified as anomalies.

To visualize the data, complete the following steps:

  1. Create a notebook.
  2. Add the following paragraphs to the Zeppelin note:

Create a table and define the shape of the records generated by the application:

%flink.ssql

CREATE TABLE data (
index INT,
input VARCHAR(6),
mp VARCHAR(6),
anomalyTag VARCHAR(20)
)
PARTITIONED BY (index)
WITH (
'connector' = 'kinesis',
'stream' = 'OutputStream',
'aws.region' = 'us-east-1',
'scan.stream.initpos' = 'LATEST',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
)

Visualize the input data (choose Line Chart from the visualization options):

%flink.ssql(type=update)

SELECT index, input FROM data;

Visualize the output matrix profile data (choose Scatter Chart from the visualization options):

%flink.ssql(type=update)

SELECT index, mp FROM data;

Visualize the labeled data (choose Scatter Chart from the visualization options):

%flink.ssql(type=update)

SELECT index, anomalyTag FROM data;

Clean up

To delete all the resources that you created, follow the instructions in Clean Up AWS Resources.

Future developments

In this section, we discuss future developments for this solution.

Optimize for speed

The online time series discords algorithm is further developed and optimized for speed in Lu et al., 2022. The proposed optimizations include:

  • Early stopping – If the algorithm finds a subsequence that is similar enough (below the threshold), it stops searching and marks the query as non-anomaly.
  • Look-ahead windowing – Look at some amount of data in the future and compare it to the current query to cheaply discover and prune future subsequences that could not be left-discords. Note that this introduces some delay. The reason why disqualifying improves performance is that data points that are close in time are more likely to be similar than data points that are distant in time.
  • Use of MASS – The MASS (Mueen’s Algorithm for Similarity Search) search algorithm is designed for efficiently discovering the most similar subsequence in the past.

Parallelization

The algorithm above operates with parallelism 1, which means that when a single worker is enough to handle the data stream throughput, the above algorithm can be directly used. This design can be enhanced with further distribution logic for handling high throughput scenarios. In order to parallelise this algorithm, you may to design a partitioner operator that ensures that the anomaly detection operators would have at their disposal the relevant past data points. The algorithm can maintain a set of the most recent records to which it compares the query. Efficiency and accuracy trade-offs of approximate solutions are interesting to explore. Since the best solution for parallelising the algorithm depends largely on the nature of the data, we recommend experimenting with various approaches using your domain-specific knowledge.

Conclusion

In this post, we presented a streaming version of an anomaly detection algorithm based on left-discords. By implementing this solution, you learned how to deploy an Apache Flink-based anomaly detection solution on Kinesis Data Analytics, and you explored the potential of Kinesis Data Analytics Studio for visualizing and interacting with streaming data in real time. For more details on how to implement anomaly detection solutions in Apache Flink, refer to the GitHub repository that accompanies this post. To learn more about Kinesis Data Analytics and Apache Flink, explore the Amazon Kinesis Data Analytics Developer Guide.

Give it a try and share your feedback in the comments section.


About the Authors

Antonio Vespoli is a Software Development Engineer in AWS. He works on Amazon Kinesis Data Analytics, the managed offering for running Apache Flink applications on AWS.

Samuel Siebenmann is a Software Development Engineer in AWS. He works on Amazon Kinesis Data Analytics, the managed offering for running Apache Flink applications on AWS.

Nuno Afonso is a Software Development Engineer in AWS. He works on Amazon Kinesis Data Analytics, the managed offering for running Apache Flink applications on AWS.