Tag Archives: Amazon Kinesis

Build a real-time streaming generative AI application using Amazon Bedrock, Amazon Managed Service for Apache Flink, and Amazon Kinesis Data Streams

Post Syndicated from Felix John original https://aws.amazon.com/blogs/big-data/build-a-real-time-streaming-generative-ai-application-using-amazon-bedrock-amazon-managed-service-for-apache-flink-and-amazon-kinesis-data-streams/

Generative artificial intelligence (AI) has gained a lot of traction in 2024, especially around large language models (LLMs) that enable intelligent chatbot solutions. Amazon Bedrock is a fully managed service that offers a choice of high-performing foundation models (FMs) from leading AI companies such as AI21 Labs, Anthropic, Cohere, Meta, Mistral AI, Stability AI, and Amazon through a single API, along with a broad set of capabilities to help you build generative AI applications with security, privacy, and responsible AI. Use cases around generative AI are vast and go well beyond chatbot applications; for instance, generative AI can be used for analysis of input data such as sentiment analysis of reviews.

Most businesses generate data continuously in real time. Internet of Things (IoT) sensor data, application log data, and clickstream data generated by users of your website are only some examples of continuously generated data. In many situations, the ability to process this data quickly (in real time or near real time) helps businesses increase the value of the insights they get from their data.

One option to process data in real-time is using stream processing frameworks such as Apache Flink. Flink is a framework and distributed processing engine for processing data streams. AWS provides a fully managed service for Apache Flink through Amazon Managed Service for Apache Flink, which enables you to build and deploy sophisticated streaming applications without setting up infrastructure and managing resources.

Data streaming enables generative AI to take advantage of real-time data and provide businesses with rapid insights. This post looks at how to integrate generative AI capabilities when implementing a streaming architecture on AWS using managed services such as Managed Service for Apache Flink and Amazon Kinesis Data Streams for processing streaming data and Amazon Bedrock to utilize generative AI capabilities. We focus on the use case of deriving review sentiment in real-time from customer reviews in online shops. We include a reference architecture and a step-by-step guide on infrastructure setup and sample code for implementing the solution with the AWS Cloud Development Kit (AWS CDK). You can find the code to try it out yourself on the GitHub repo.

Solution overview

The following diagram illustrates the solution architecture. The architecture diagram depicts the real-time streaming pipeline in the upper half and the details on how you gain access to the Amazon OpenSearch Service dashboard in the lower half.

Architecture Overview

The real-time streaming pipeline consists of a producer that is simulated by running a Python script locally that is sending reviews to a Kinesis Data Stream. The reviews are from the Large Movie Review Dataset and contain positive or negative sentiment. The next step is the ingestion to the Managed Service for Apache Flink application. From within Flink, we are asynchronously calling Amazon Bedrock (using Anthropic Claude 3 Haiku) to process the review data. The results are then ingested into an OpenSearch Service cluster for visualization with OpenSearch Dashboards. We directly call the PutRecords API of Kinesis Data Streams within the Python script for the sake of simplicity and to cost-effectively run this example. You should consider using an Amazon API Gateway REST API as a proxy in front of Kinesis Data Streams when using a similar architecture in production, as described in Streaming Data Solution for Amazon Kinesis.
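The producer in this walkthrough is a Python script, but to illustrate the PutRecords call itself, the following is a minimal sketch using the AWS SDK for Java v2. The stream name, partition key choice, and payload are placeholders for illustration, not the repository's code.

import java.util.List;
import java.util.UUID;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;

public class ReviewProducerSketch {
    public static void main(String[] args) {
        try (KinesisClient kinesis = KinesisClient.create()) {
            // One entry per review; a random partition key spreads records across shards
            PutRecordsRequestEntry entry = PutRecordsRequestEntry.builder()
                    .partitionKey(UUID.randomUUID().toString())
                    .data(SdkBytes.fromUtf8String("{\"reviewId\": 1, \"text\": \"Great movie!\"}")) // placeholder payload
                    .build();

            // PutRecords accepts up to 500 entries per call, so reviews can be sent in batches
            PutRecordsResponse response = kinesis.putRecords(PutRecordsRequest.builder()
                    .streamName("review-stream") // assumed stream name
                    .records(List.of(entry))
                    .build());

            System.out.println("Failed records: " + response.failedRecordCount());
        }
    }
}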

To gain access to the OpenSearch dashboard, we use a bastion host that is deployed in the same private subnet within your virtual private cloud (VPC) as your OpenSearch Service cluster. We connect to the bastion host with Session Manager, a capability of AWS Systems Manager, which allows us to connect securely without opening inbound ports, and we use it to port forward the OpenSearch dashboard to our localhost.

The walkthrough consists of the following high-level steps:

  1. Create the Flink application by building the JAR file.
  2. Deploy the AWS CDK stack.
  3. Set up and connect to OpenSearch Dashboards.
  4. Set up the streaming producer.

Prerequisites

For this walkthrough, you should have the following prerequisites:

Implementation details

This section focuses on the Flink application code of this solution. You can find the code on GitHub. The StreamingJob.java file inside the flink-async-bedrock directory serves as the entry point to the application. The application uses the FlinkKinesisConsumer, a connector for reading streaming data from a Kinesis data stream. It applies a map transformation to convert each input string into an instance of the Review class, resulting in a DataStream<Review> that is easier to process.
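For illustration only, a minimal sketch of such a source setup might look like the following. The Region, stream name, and the use of Jackson for deserialization are assumptions; the Review class stands in for the POJO from the repository.

import java.util.Properties;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class KinesisSourceSketch {

    // Shared, thread-safe JSON mapper; static so it is not serialized with the lambda
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static DataStream<Review> createInputReviewStream(StreamExecutionEnvironment env) {
        Properties consumerConfig = new Properties();
        consumerConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); // assumed Region
        consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

        // Read raw JSON strings from the Kinesis data stream
        FlinkKinesisConsumer<String> source = new FlinkKinesisConsumer<>(
                "review-stream", new SimpleStringSchema(), consumerConfig); // assumed stream name

        // Convert each JSON string into a Review POJO for easier downstream processing
        return env.addSource(source)
                .map(json -> MAPPER.readValue(json, Review.class))
                .returns(Review.class);
    }
}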

The Flink application uses Flink's AsyncDataStream helper class in the StreamingJob.java file to incorporate an asynchronous, external operation into the pipeline. More specifically, the following code creates an asynchronous data stream by applying the AsyncBedrockRequest function to each element in the inputReviewStream. The application uses unorderedWait to increase throughput and reduce idle time because event ordering is not required. The timeout is set to 25,000 milliseconds to give the Amazon Bedrock API enough time to process long reviews. The maximum concurrency, or capacity, is limited to 1,000 in-flight requests at a time. See the following code:

DataStream<ProcessedReview> processedReviewStream = AsyncDataStream.unorderedWait(inputReviewStream, new AsyncBedrockRequest(applicationProperties), 25000, TimeUnit.MILLISECONDS, 1000).uid("processedReviewStream");

The Flink application initiates asynchronous calls to the Amazon Bedrock API, invoking the Anthropic Claude 3 Haiku foundation model for each incoming event. We use Anthropic Claude 3 Haiku on Amazon Bedrock because it is Anthropic’s fastest and most compact model for near-instant responsiveness. The following code snippet is part of the AsyncBedrockRequest.java file and illustrates how we set up the required configuration to call Anthropic’s Claude Messages API to invoke the model:

@Override
public void asyncInvoke(Review review, final ResultFuture<ProcessedReview> resultFuture) throws Exception {

    // [..]

    JSONObject user_message = new JSONObject()
        .put("role", "user")
        .put("content", "<review>" + reviewText + "</review>");

    JSONObject assistant_message = new JSONObject()
        .put("role", "assistant")
        .put("content", "{");

    JSONArray messages = new JSONArray()
            .put(user_message)
            .put(assistant_message);

    String payload = new JSONObject()
            .put("system", systemPrompt)
            .put("anthropic_version", "bedrock-2023-05-31")
            .put("temperature", 0.0)
            .put("max_tokens", 4096)
            .put("messages", messages)
            .toString();

    InvokeModelRequest request = InvokeModelRequest.builder()
            .body(SdkBytes.fromUtf8String(payload))
            .modelId("anthropic.claude-3-haiku-20240307-v1:0")
            .build();

    CompletableFuture<InvokeModelResponse> completableFuture = client.invokeModel(request)
            .whenComplete((response, exception) -> {
                if (exception != null) {
                    LOG.error("Model invocation failed: " + exception);
                }
            })
            .orTimeout(250000, TimeUnit.MILLISECONDS);

Prompt engineering

The application uses advanced prompt engineering techniques to guide the generative AI model’s responses and provide consistent responses. The following prompt is designed to extract a summary as well as a sentiment from a single review:

String systemPrompt = 
     "Summarize the review within the <review> tags "
     + "into a single and concise sentence alongside the sentiment "
     + "that is either positive or negative. Return a valid JSON object with "
     + "following keys: summary, sentiment. "
     + "<example> {\"summary\": \"The reviewer strongly dislikes the movie, "
     + "finding it unrealistic, preachy, and extremely boring to watch.\", "
     + "\"sentiment\": \"negative\"} "
     + "</example>";

The prompt instructs the Anthropic Claude model to return the extracted sentiment and summary in JSON format. To maintain consistent and well-structured output, the prompt uses several prompt engineering techniques. For example, it uses XML tags to give Anthropic Claude a clearer structure, and it contains an example to guide the model toward the desired output. In addition, the prompt pre-fills Anthropic Claude’s response by seeding the Assistant message with an opening brace, which helps enforce a consistent output format. See the following code:

JSONObject assistant_message = new JSONObject()
    .put("role", "assistant")
    .put("content", "{");

Build the Flink application

The first step is to download the repository and build the JAR file of the Flink application. Complete the following steps:

  1. Clone the repository to your desired workspace:
    git clone https://github.com/aws-samples/aws-streaming-generative-ai-application.git

  2. Move to the correct directory inside the downloaded repository and build the Flink application:
    cd flink-async-bedrock && mvn clean package

Building Jar File

Maven will compile the Java source code and package it as a distributable JAR file named flink-async-bedrock-0.1.jar in the flink-async-bedrock/target/ directory. After you deploy your AWS CDK stack, the JAR file will be uploaded to Amazon Simple Storage Service (Amazon S3) to create your Managed Service for Apache Flink application.

Deploy the AWS CDK stack

After you build the Flink application, you can deploy your AWS CDK stack and create the required resources:

  1. Move to the cdk directory and deploy the stack:
    cd cdk && npm install && cdk deploy

This will create the required resources in your AWS account, including the Managed Service for Apache Flink application, Kinesis data stream, OpenSearch Service cluster, and a bastion host (deployed in a private subnet within your VPC) to quickly connect to OpenSearch Dashboards.

  2. Take note of the output values. The output will look similar to the following:
 ✅  StreamingGenerativeAIStack

✨  Deployment time: 1414.26s

Outputs:
StreamingGenerativeAIStack.BastionHostBastionHostIdC743CBD6 = i-0970816fa778f9821
StreamingGenerativeAIStack.accessOpenSearchClusterOutput = aws ssm start-session --target i-0970816fa778f9821 --document-name AWS-StartPortForwardingSessionToRemoteHost --parameters '{"portNumber":["443"],"localPortNumber":["8157"], "host":["vpc-generative-ai-opensearch-qfssmne2lwpzpzheoue7rkylmi.us-east-1.es.amazonaws.com"]}'
StreamingGenerativeAIStack.bastionHostIdOutput = i-0970816fa778f9821
StreamingGenerativeAIStack.domainEndpoint = vpc-generative-ai-opensearch-qfssmne2lwpzpzheoue7rkylmi.us-east-1.es.amazonaws.com
StreamingGenerativeAIStack.regionOutput = us-east-1
Stack ARN:
arn:aws:cloudformation:us-east-1:<AWS Account ID>:stack/StreamingGenerativeAIStack/3dec75f0-cc9e-11ee-9b16-12348a4fbf87

✨  Total time: 1418.61s

Set up and connect to OpenSearch Dashboards

Next, you can set up and connect to OpenSearch Dashboards. This is where the Flink application will write the extracted sentiment as well as the summary from the processed review stream. Complete the following steps:

  1. In a separate terminal window, run the following command to establish a connection to OpenSearch from your local workspace. You can find this command in the stack output named accessOpenSearchClusterOutput.
    • For Mac/Linux, use the following command:
aws ssm start-session --target <BastionHostId> --document-name AWS-StartPortForwardingSessionToRemoteHost --parameters '{"portNumber":["443"],"localPortNumber":["8157"], "host":["<OpenSearchDomainHost>"]}'
    • For Windows, use the following command:
aws ssm start-session ^
    --target <BastionHostId> ^
    --document-name AWS-StartPortForwardingSessionToRemoteHost ^
    --parameters host="<OpenSearchDomainHost>",portNumber="443",localPortNumber="8157"

It should look similar to the following output:

Session Manager CLI

  2. Create the required index in OpenSearch by issuing the following command:
    • For Mac/Linux, use the following command:
curl --location -k --request PUT https://localhost:8157/processed_reviews \
--header 'Content-Type: application/json' \
--data-raw '{
  "mappings": {
    "properties": {
        "reviewId": {"type": "integer"},
        "userId": {"type": "keyword"},
        "summary": {"type": "keyword"},
        "sentiment": {"type": "keyword"},
        "dateTime": {"type": "date"}}}}}'
    • For Windows, use the following command:
$url = "https://localhost:8157/processed_reviews"
$headers = @{
    "Content-Type" = "application/json"
}
$body = @{
    "mappings" = @{
        "properties" = @{
            "reviewId" = @{ "type" = "integer" }
            "userId" = @{ "type" = "keyword" }
            "summary" = @{ "type" = "keyword" }
            "sentiment" = @{ "type" = "keyword" }
            "dateTime" = @{ "type" = "date" }
        }
    }
} | ConvertTo-Json -Depth 3
Invoke-RestMethod -Method Put -Uri $url -Headers $headers -Body $body -SkipCertificateCheck
  3. After the session is established, you can open your browser and navigate to https://localhost:8157/_dashboards. Your browser might consider the URL not secure. You can ignore this warning.
  4. Choose Dashboards Management under Management in the navigation pane.
  5. Choose Saved objects in the sidebar.
  6. Import export.ndjson, which can be found in the resources folder within the downloaded repository.

OpenSearch Dashboards Upload

  7. After you import the saved objects, you can navigate to Dashboards under My Dashboard in the navigation pane.

At the moment, the dashboard appears blank because you haven’t uploaded any review data to OpenSearch yet.

Set up the streaming producer

Finally, you can set up the producer that will be streaming review data to the Kinesis Data Stream and ultimately to the OpenSearch Dashboards. The Large Movie Review Dataset was originally published in 2011 in the paper “Learning Word Vectors for Sentiment Analysis” by Andrew L. Maas, Raymond E. Daly, Peter T. Pham, Dan Huang, Andrew Y. Ng, and Christopher Potts. Complete the following steps:

  1. Download the Large Movie Review Dataset here.
  2. After the download is complete, extract the .tar.gz file to retrieve the folder named aclImdb 3 or similar that contains the review data. Rename the review data folder to aclImdb.
  3. Move the extracted dataset to data/ inside the repository that you previously downloaded.

Your repository should look like the following screenshot.

Folder Overview

  4. Modify the DATA_DIR path in producer/producer.py if the review data folder is named differently.
  5. Move to the producer directory using the following command:
    cd producer

  6. Install the required dependencies and start generating the data:
    pip install -r requirements.txt && python produce.py

The OpenSearch dashboard should be populated after you start generating streaming data and writing it to the Kinesis Data Stream. Refresh the dashboard to view the latest data. The dashboard shows the total number of processed reviews, the sentiment distribution of the processed reviews in a pie chart, and the summary and sentiment for the latest reviews that have been processed.

When you take a closer look at the Flink application, you will notice that it marks the sentiment field with the value error whenever the asynchronous call to the Amazon Bedrock API fails. The Flink application then filters out these failed records and writes only the correctly processed reviews to OpenSearch.

For robust error handling, you should write any incorrectly processed reviews to a separate output stream and not discard them completely. This separation allows you to handle failed reviews differently than successful ones for simpler reprocessing, analysis, and troubleshooting.
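As a rough sketch of that approach (not the repository's code; it assumes a getSentiment() accessor on ProcessedReview and reuses the processedReviewStream created earlier), a Flink side output can carry the failed records to their own sink:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// ...

final OutputTag<ProcessedReview> failedReviewsTag =
        new OutputTag<ProcessedReview>("failed-reviews") {};

SingleOutputStreamOperator<ProcessedReview> validReviews = processedReviewStream
        .process(new ProcessFunction<ProcessedReview, ProcessedReview>() {
            @Override
            public void processElement(ProcessedReview review, Context ctx,
                                       Collector<ProcessedReview> out) {
                if ("error".equals(review.getSentiment())) {
                    // Route failed records to a side output for reprocessing and analysis
                    ctx.output(failedReviewsTag, review);
                } else {
                    // Only correctly processed reviews continue to the OpenSearch sink
                    out.collect(review);
                }
            }
        });

// Write the failed records to a separate sink, for example another Kinesis data stream
DataStream<ProcessedReview> failedReviews = validReviews.getSideOutput(failedReviewsTag);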

Clean up

When you’re done with the resources you created, complete the following steps:

  1. Stop the Python producer using Ctrl/Command + C.
  2. Destroy your AWS CDK stack by returning to the root folder and running the following command in your terminal:
    cd cdk && cdk destroy

  3. When asked to confirm the deletion of the stack, enter yes.

Conclusion

In this post, you learned how to incorporate generative AI capabilities in your streaming architecture using Amazon Bedrock and Managed Service for Apache Flink using asynchronous requests. We also gave guidance on prompt engineering to derive the sentiment from text data using generative AI. You can build this architecture by deploying the sample code from the GitHub repository.

For more information on how to get started with Managed Service for Apache Flink, refer to Getting started with Amazon Managed Service for Apache Flink (DataStream API). For details on how to set up Amazon Bedrock, refer to Set up Amazon Bedrock. For other posts on Managed Service for Apache Flink, browse through the AWS Big Data Blog.


About the Authors

Felix John is a Solutions Architect and data streaming expert at AWS, based in Germany. He focuses on supporting small and medium businesses on their cloud journey. Outside of his professional life, Felix enjoys playing Floorball and hiking in the mountains.

Michelle Mei-Li Pfister is a Solutions Architect at AWS. She supports customers in the retail and consumer packaged goods (CPG) industry on their cloud journey. She is passionate about topics around data and machine learning.

Build Spark Structured Streaming applications with the open source connector for Amazon Kinesis Data Streams

Post Syndicated from Idan Maizlits original https://aws.amazon.com/blogs/big-data/build-spark-structured-streaming-applications-with-the-open-source-connector-for-amazon-kinesis-data-streams/

Apache Spark is a powerful big data engine used for large-scale data analytics. Its in-memory computing makes it great for iterative algorithms and interactive queries. You can use Apache Spark to process streaming data from a variety of streaming sources, including Amazon Kinesis Data Streams for use cases like clickstream analysis, fraud detection, and more. Kinesis Data Streams is a serverless streaming data service that makes it straightforward to capture, process, and store data streams at any scale.

With the new open source Amazon Kinesis Data Streams Connector for Spark Structured Streaming, you can use the newer Spark Data Sources API. It also supports enhanced fan-out for dedicated read throughput and faster stream processing. In this post, we deep dive into the internal details of the connector and show you how to use it to consume and produce records from and to Kinesis Data Streams using Amazon EMR.

Introducing the Kinesis Data Streams connector for Spark Structured Streaming

The Kinesis Data Streams connector for Spark Structured Streaming is an open source connector that supports both provisioned and On-Demand capacity modes offered by Kinesis Data Streams. The connector is built using the latest Spark Data Sources API V2, which uses Spark optimizations. Starting with Amazon EMR 7.1, the connector comes pre-packaged on Amazon EMR on Amazon EKS, Amazon EMR on Amazon EC2, and Amazon EMR Serverless, so you don’t need to build or download any packages. For using it with other Apache Spark platforms, the connector is available as a public JAR file that can be directly referred to while submitting a Spark Structured Streaming job. Additionally, you can download and build the connector from the GitHub repo.

Kinesis Data Streams supports two types of consumers: shared throughput and dedicated throughput. With shared throughput, 2 MB/s of read throughput per shard is shared across consumers. With dedicated throughput, also known as enhanced fan-out, 2 MB/s of read throughput per shard is dedicated to each consumer. This new connector supports both consumer types out of the box without any additional coding, providing you the flexibility to consume records from your streams based on your requirements. By default, this connector uses a shared throughput consumer, but you can configure it to use enhanced fan-out in the configuration properties.

You can also use the connector as a sink connector to produce records to a Kinesis data stream. The configuration parameters for using the connector as a source and sink differ—for more information, see Kinesis Source Configuration. The connector also supports multiple storage options, including Amazon DynamoDB, Amazon Simple Storage Service (Amazon S3), and HDFS, to store checkpoints and provide continuity.

For scenarios where a Kinesis data stream is deployed in an AWS producer account and the Spark Structured Streaming application is in a different AWS consumer account, you can use the connector to do cross-account processing. This requires additional Identity and Access Management (IAM) trust policies to allow the Spark Structured Streaming application in the consumer account to assume the role in the producer account.

You should also consider reviewing the security configuration with your security teams based on your data security requirements.

How the connector works

Consuming records from Kinesis Data Streams using the connector involves multiple steps. The following architecture diagram shows the internal details of how the connector works. A Spark Structured Streaming application consumes records from a Kinesis data stream source and produces records to another Kinesis data stream.

A Kinesis data stream is composed of a set of shards. A shard is a uniquely identified sequence of data records in a stream and provides a fixed unit of capacity. The total capacity of the stream is the sum of the capacities of all of its shards.

A Spark application consists of a driver and a set of executor processes. The Spark driver acts as a coordinator, and the tasks running in executors are responsible for producing and consuming records to and from shards.

The solution workflow includes the following steps:

  1. Internally, by default, Structured Streaming queries are processed using a micro-batch processing engine, which processes data streams as a series of small batch jobs. At the beginning of a micro-batch run, the driver uses the Kinesis Data Streams ListShards API to determine the latest description of all available shards. The connector exposes a parameter (kinesis.describeShardInterval) to configure the interval between two successive ListShards API calls.
  2. The driver then determines the starting position in each shard. If the application is a new job, the starting position of each shard is determined by kinesis.startingPosition. If it’s a restart of an existing job, the position is read from the last record metadata checkpoint in the offset storage (for this post, DynamoDB), and kinesis.startingPosition is ignored.
  3. Each shard is mapped to one task in an executor, which is responsible for reading data. The Spark application automatically creates an equal number of tasks based on the number of shards and distributes them across the executors.
  4. The tasks in an executor use either polling mode (shared) or push mode (enhanced fan-out) to get data records from the starting position for a shard.
  5. Spark tasks running in the executors write the processed data to the data sink. In this architecture, we use the Kinesis Data Streams sink to illustrate how the connector writes back to the stream. Executors can write to more than one Kinesis Data Streams output shard.
  6. At the end of each task, the corresponding executor process saves the metadata (checkpoint) about the last record read for each shard in the offset storage (for this post, DynamoDB). This information is used by the driver in the construction of the next micro-batch.

Solution overview

The following diagram shows an example architecture of how to use the connector to read from one Kinesis data stream and write to another.

In this architecture, we use the Amazon Kinesis Data Generator (KDG) to generate sample streaming data (random events per country) to a Kinesis Data Streams source. We start an interactive Spark Structured Streaming session and consume data from the Kinesis data stream, and then write to another Kinesis data stream.

We use Spark Structured Streaming to count the events for each country per micro-batch window as they are consumed from Kinesis Data Streams, and then view the results of the count.

Prerequisites

To get started, follow the instructions in the GitHub repo. You need the following prerequisites:

After you deploy the solution using the AWS CDK, you will have the following resources:

  • An EMR cluster with the Kinesis Spark connector installed
  • A Kinesis Data Streams source
  • A Kinesis Data Streams sink

Create your Spark Structured Streaming application

After the deployment is complete, you can access the EMR primary node to start a Spark application and write your Spark Structured Streaming logic.

As we mentioned earlier, you use the new open source Kinesis Spark connector to consume data from Amazon EMR. You can find the connector code on the GitHub repo along with examples on how to build and set up the connector in Spark.

In this post, we use Amazon EMR 7.1, where the connector is natively available. If you’re using an Amazon EMR release earlier than 7.1, you can use the connector by running the following code:

cd /usr/lib/spark/jars 
sudo wget https://awslabs-code-us-east-1.s3.amazonaws.com/spark-sql-kinesis-connector/spark-streaming-sql-kinesis-connector_2.12-1.2.1.jar
sudo chmod 755 spark-streaming-sql-kinesis-connector_2.12-1.2.1.jar

Complete the following steps:

  1. On the Amazon EMR console, navigate to the emr-spark-kinesis cluster.
  2. On the Instances tab, select the primary instance and choose the Amazon Elastic Compute Cloud (Amazon EC2) instance ID.

You’re redirected to the Amazon EC2 console.

  3. On the Amazon EC2 console, select the primary instance and choose Connect.
  4. Use Session Manager, a capability of AWS Systems Manager, to connect to the instance.
  5. Because the user that is used to connect is the ssm-user, we need to switch to the Hadoop user:
    sudo su hadoop

  6. Start a Spark shell either using Scala or Python to interactively build a Spark Structured Streaming application to consume data from a Kinesis data stream.

For this post, we use Python for writing to a stream using a PySpark shell in Amazon EMR.

  7. Start the PySpark shell by entering the command pyspark.

Because you already have the connector installed in the EMR cluster, you can now create the Kinesis source.

  8. Create the Kinesis source with the following code:
    kinesis = spark.readStream.format("aws-kinesis") \
        .option("kinesis.region", "<aws-region>") \
        .option("kinesis.streamName", "kinesis-source") \
        .option("kinesis.consumerType", "GetRecords") \
        .option("kinesis.endpointUrl", "https://kinesis.<aws-region>.amazonaws.com") \
        .option("kinesis.startingposition", "LATEST") \
        .load()

For creating the Kinesis source, the following parameters are required:

  • Name of the connector – We use the connector name aws-kinesis
  • kinesis.region – The AWS Region of the Kinesis data stream you are consuming
  • kinesis.consumerType – Use GetRecords (standard consumer) or SubscribeToShard (enhanced fan-out consumer)
  • kinesis.endpointURL – The Regional Kinesis endpoint (for more details, see Service endpoints)
  • kinesis.startingposition – Choose LATEST, TRIM_HORIZON, or AT_TIMESTAMP (refer to ShardIteratorType)

To use an enhanced fan-out consumer, additional parameters are needed, such as the consumer name, as shown in the following example. The full set of configuration options can be found in the connector’s GitHub repo.

kinesis_efo = spark \
.readStream \
.format("aws-kinesis") \
.option("kinesis.region", "<aws-region>") \
.option("kinesis.streamName", "kinesis-source") \
.option("kinesis.consumerType", "SubscribeToShard") \
.option("kinesis.consumerName", "efo-consumer") \
.option("kinesis.endpointUrl", "https://kinesis.<aws-region>.amazonaws.com") \
.option("kinesis.startingposition", "LATEST") \
.load()

Deploy the Kinesis Data Generator

Complete the following steps to deploy the KDG and start generating data:

  1. Choose Launch Stack to deploy the KDG with AWS CloudFormation.

You might need to change your Region when deploying. Make sure that the KDG is launched in the same Region as where you deployed the solution.

  2. For the parameters Username and Password, enter the values of your choice. Note these values to use later when you log in to the KDG.
  3. When the template has finished deploying, go to the Outputs tab of the stack and locate the KDG URL.
  4. Log in to the KDG, using the credentials you set when launching the CloudFormation template.
  5. Specify your Region and data stream name, and use the following template to generate test data:
    {
        "id": {{random.number(100)}},
        "data": "{{random.arrayElement(
            ["Spain","Portugal","Finland","France"]
        )}}",
        "date": "{{date.now("YYYY-MM-DD hh:mm:ss")}}"
    }

  6. Return to Systems Manager to continue working with the Spark application.
  7. To be able to apply transformations based on the fields of the events, you first need to define the schema for the events:
    from pyspark.sql.types import *
    
    pythonSchema = StructType() \
     .add("id", LongType()) \
     .add("data", StringType()) \
     .add("date", TimestampType())

  8. Run the following command to consume data from Kinesis Data Streams:
    from pyspark.sql.functions import *
    
    events= kinesis \
      .selectExpr("cast (data as STRING) jsonData") \
      .select(from_json("jsonData", pythonSchema).alias("events")) \
      .select("events.*")

  9. Use the following code for the Kinesis Spark connector sink:
    events \
        .selectExpr("CAST(id AS STRING) as partitionKey","data","date") \
        .writeStream \
        .format("aws-kinesis") \
        .option("kinesis.region", "<aws-region>") \
        .outputMode("append") \
        .option("kinesis.streamName", "kinesis-sink") \
        .option("kinesis.endpointUrl", "https://kinesis.<aws-region>.amazonaws.com") \
        .option("checkpointLocation", "/kinesisCheckpoint") \
        .start() \
        .awaitTermination()

You can view the data in the Kinesis Data Streams console.

  10. On the Kinesis Data Streams console, navigate to kinesis-sink.
  11. On the Data viewer tab, choose a shard and a starting position (for this post, we use Latest) and choose Get records.

You can see the data sent, as shown in the following screenshot. Kinesis Data Streams uses base64 encoding by default, so you might see text with unreadable characters.

Clean up

Delete the following CloudFormation stacks created during this deployment to delete all the provisioned resources:

  • EmrSparkKinesisStack
  • Kinesis-Data-Generator-Cognito-User-SparkEFO-Blog

If you created any additional resources during this deployment, delete them manually.

Conclusion

In this post, we discussed the open source Kinesis Data Streams connector for Spark Structured Streaming. It supports the newer Data Sources API V2 and Spark Structured Streaming for building streaming applications. The connector also enables high-throughput consumption from Kinesis Data Streams with enhanced fan-out by providing dedicated throughput of up to 2 MB/s per shard per consumer. With this connector, you can now effortlessly build high-throughput streaming applications with Spark Structured Streaming.

The Kinesis Spark connector is open source under the Apache 2.0 license on GitHub. To get started, visit the GitHub repo.


About the Authors


Idan Maizlits is a Senior Product Manager on the Amazon Kinesis Data Streams team at Amazon Web Services. Idan loves engaging with customers to learn about their challenges with real-time data and to help them achieve their business goals. Outside of work, he enjoys spending time with his family exploring the outdoors and cooking.


Subham Rakshit is a Streaming Specialist Solutions Architect for Analytics at AWS based in the UK. He works with customers to design and build search and streaming data platforms that help them achieve their business objective. Outside of work, he enjoys spending time solving jigsaw puzzles with his daughter.

Francisco Morillo is a Streaming Solutions Architect at AWS. Francisco works with AWS customers helping them design real-time analytics architectures using AWS services, supporting Amazon MSK and AWS’s managed offering for Apache Flink.

Umesh Chaudhari is a Streaming Solutions Architect at AWS. He works with customers to design and build real-time data processing systems. He has extensive working experience in software engineering, including architecting, designing, and developing data analytics systems. Outside of work, he enjoys traveling, reading, and watching movies.

Uplevel your data architecture with real-time streaming using Amazon Data Firehose and Snowflake

Post Syndicated from Swapna Bandla original https://aws.amazon.com/blogs/big-data/uplevel-your-data-architecture-with-real-time-streaming-using-amazon-data-firehose-and-snowflake/

Today’s fast-paced world demands timely insights and decisions, which is driving the importance of streaming data. Streaming data refers to data that is continuously generated from a variety of sources. The sources of this data, such as clickstream events, change data capture (CDC), application and service logs, and Internet of Things (IoT) data streams are proliferating. Snowflake offers two options to bring streaming data into its platform: Snowpipe and Snowflake Snowpipe Streaming. Snowpipe is suitable for file ingestion (batching) use cases, such as loading large files from Amazon Simple Storage Service (Amazon S3) to Snowflake. Snowpipe Streaming, a newer feature released in March 2023, is suitable for rowset ingestion (streaming) use cases, such as loading a continuous stream of data from Amazon Kinesis Data Streams or Amazon Managed Streaming for Apache Kafka (Amazon MSK).

Before Snowpipe Streaming, AWS customers used Snowpipe for both use cases: file ingestion and rowset ingestion. First, you ingested streaming data to Kinesis Data Streams or Amazon MSK, then used Amazon Data Firehose to aggregate and write streams to Amazon S3, followed by using Snowpipe to load the data into Snowflake. However, this multi-step process can result in delays of up to an hour before data is available for analysis in Snowflake. Moreover, it’s expensive, especially when you have small files that Snowpipe has to upload to the Snowflake customer cluster.

To solve this issue, Amazon Data Firehose now integrates with Snowpipe Streaming, enabling you to capture, transform, and deliver data streams from Kinesis Data Streams, Amazon MSK, and Firehose Direct PUT to Snowflake in seconds at a low cost. With a few clicks on the Amazon Data Firehose console, you can set up a Firehose stream to deliver data to Snowflake. There are no commitments or upfront investments to use Amazon Data Firehose, and you only pay for the amount of data streamed.

Some key features of Amazon Data Firehose include:

  • Fully managed serverless service – You don’t need to manage resources, and Amazon Data Firehose automatically scales to match the throughput of your data source without ongoing administration.
  • Straightforward to use with no code – You don’t need to write applications.
  • Real-time data delivery – You can get data to your destinations quickly and efficiently in seconds.
  • Integration with over 20 AWS services – Seamless integration is available for many AWS services, such as Kinesis Data Streams, Amazon MSK, Amazon VPC Flow Logs, AWS WAF logs, Amazon CloudWatch Logs, Amazon EventBridge, AWS IoT Core, and more.
  • Pay-as-you-go model – You only pay for the data volume that Amazon Data Firehose processes.
  • Connectivity – Amazon Data Firehose can connect to public or private subnets in your VPC.

This post explains how you can bring streaming data from AWS into Snowflake within seconds to perform advanced analytics. We explore common architectures and illustrate how to set up a low-code, serverless, cost-effective solution for low-latency data streaming.

Overview of solution

The following are the steps to implement the solution to stream data from AWS to Snowflake:

  1. Create a Snowflake database, schema, and table.
  2. Create a Kinesis data stream.
  3. Create a Firehose delivery stream with Kinesis Data Streams as the source and Snowflake as its destination using a secure private link.
  4. To test the setup, generate sample stream data from the Amazon Kinesis Data Generator (KDG) with the Firehose delivery stream as the destination.
  5. Query the Snowflake table to validate the data loaded into Snowflake.

The solution is depicted in the following architecture diagram.

Prerequisites

You should have the following prerequisites:

Create a Snowflake database, schema, and table

Complete the following steps to set up your data in Snowflake:

  • Log in to your Snowflake account and create the database:
    create database adf_snf;

  • Create a schema in the new database:
    create schema adf_snf.kds_blog;

  • Create a table in the new schema:
    create or replace table iot_sensors
    (sensorId number,
    sensorType varchar,
    internetIP varchar,
    connectionTime timestamp_ntz,
    currentTemperature number
    );

Create a Kinesis data stream

Complete the following steps to create your data stream:

  • On the Kinesis Data Streams console, choose Data streams in the navigation pane.
  • Choose Create data stream.
  • For Data stream name, enter a name (for example, KDS-Demo-Stream).
  • Leave the remaining settings as default.
  • Choose Create data stream.

Create a Firehose delivery stream

Complete the following steps to create a Firehose delivery stream with Kinesis Data Streams as the source and Snowflake as its destination:

  • On the Amazon Data Firehose console, choose Create Firehose stream.
  • For Source, choose Amazon Kinesis Data Streams.
  • For Destination, choose Snowflake.
  • For Kinesis data stream, browse to the data stream you created earlier.
  • For Firehose stream name, leave the default generated name or enter a name of your preference.
  • Under Connection settings, provide the following information to connect Amazon Data Firehose to Snowflake:
    • For Snowflake account URL, enter your Snowflake account URL.
    • For User, enter the user name generated in the prerequisites.
    • For Private key, enter the private key generated in the prerequisites. Make sure the private key is in PKCS8 format. Do not include the PEM BEGIN header and END footer lines as part of the private key. If the key is split across multiple lines, remove the line breaks.
    • For Role, select Use custom Snowflake role and enter the Snowflake role that has access to write to the database table.

You can connect to Snowflake using public or private connectivity. If you don’t provide a VPC endpoint, the default connectivity mode is public. To add the Firehose IPs to the allow list in your Snowflake network policy, refer to Choose Snowflake for Your Destination. If you’re using a private link URL, provide the VPCE ID using SYSTEM$GET_PRIVATELINK_CONFIG:

select SYSTEM$GET_PRIVATELINK_CONFIG();

This function returns a JSON representation of the Snowflake account information necessary to facilitate the self-service configuration of private connectivity to the Snowflake service, as shown in the following screenshot.

  • For this post, we’re using a private link, so for VPCE ID, enter the VPCE ID.
  • Under Database configuration settings, enter your Snowflake database, schema, and table names.
  • In the Backup settings section, for S3 backup bucket, enter the bucket you created as part of the prerequisites.
  • Choose Create Firehose stream.

Alternatively, you can use an AWS CloudFormation template to create the Firehose delivery stream with Snowflake as the destination rather than using the Amazon Data Firehose console.

To use the CloudFormation stack, choose BDB-4100-CFN-Launch-Stack.

Generate sample stream data

Generate sample stream data from the KDG with the Kinesis data stream you created:

{ 
"sensorId": {{random.number(999999999)}}, 
"sensorType": "{{random.arrayElement( ["Thermostat","SmartWaterHeater","HVACTemperatureSensor","WaterPurifier"] )}}", 
"internetIP": "{{internet.ip}}", 
"connectionTime": "{{date.now("YYYY-MM-DDTHH:m:ss")}}", 
"currentTemperature": {{random.number({"min":10,"max":150})}} 
}

Query the Snowflake table

Query the Snowflake table:

select * from adf_snf.kds_blog.iot_sensors;

You can confirm that the data generated by the KDG that was sent to Kinesis Data Streams is loaded into the Snowflake table through Amazon Data Firehose.

Troubleshooting

If data is not loaded into Kinesis Data Streams after the KDG starts sending it, refresh the page and make sure you are logged in to the KDG.

If you made any changes to the Snowflake destination table definition, recreate the Firehose delivery stream.

Clean up

To avoid incurring future charges, delete the resources you created as part of this exercise if you are not planning to use them further.

Conclusion

Amazon Data Firehose provides a straightforward way to deliver data to Snowpipe Streaming, enabling you to save costs and reduce latency to seconds. To try Amazon Data Firehose with Snowflake, refer to the Amazon Data Firehose with Snowflake as destination lab.


About the Authors

Swapna Bandla is a Senior Solutions Architect in the AWS Analytics Specialist SA Team. Swapna has a passion for understanding customers’ data and analytics needs and empowering them to develop cloud-based well-architected solutions. Outside of work, she enjoys spending time with her family.

Mostafa Mansour is a Principal Product Manager – Tech at Amazon Web Services where he works on Amazon Kinesis Data Firehose. He specializes in developing intuitive product experiences that solve complex challenges for customers at scale. When he’s not hard at work on Amazon Kinesis Data Firehose, you’ll likely find Mostafa on the squash court, where he loves to take on challengers and perfect his dropshots.

Bosco Albuquerque is a Sr. Partner Solutions Architect at AWS and has over 20 years of experience working with database and analytics products from enterprise database vendors and cloud providers. He has helped technology companies design and implement data analytics solutions and products.

Deliver decompressed Amazon CloudWatch Logs to Amazon S3 and Splunk using Amazon Data Firehose

Post Syndicated from Ranjit Kalidasan original https://aws.amazon.com/blogs/big-data/deliver-decompressed-amazon-cloudwatch-logs-to-amazon-s3-and-splunk-using-amazon-data-firehose/

You can use Amazon Data Firehose to aggregate and deliver log events from your applications and services captured in Amazon CloudWatch Logs to your Amazon Simple Storage Service (Amazon S3) bucket and Splunk destinations, for use cases such as data analytics, security analysis, application troubleshooting, and more. By default, CloudWatch Logs are delivered as gzip-compressed objects. You might want the data to be decompressed, or want logs to be delivered to Splunk, which requires decompressed data input, for application monitoring and auditing.

AWS released a feature to support decompression of CloudWatch Logs in Firehose. With this new feature, you can specify an option in Firehose to decompress CloudWatch Logs. You no longer have to perform additional processing using AWS Lambda or post-processing to get decompressed logs, and can deliver decompressed data to Splunk. Additionally, you can use optional Firehose features such as record format conversion to convert CloudWatch Logs to Parquet or ORC, and dynamic partitioning to automatically group streaming records based on keys in the data (for example, by month) and deliver the grouped records to corresponding Amazon S3 prefixes.

In this post, we look at how to enable the decompression feature for Splunk and Amazon S3 destinations. We start with Splunk and then Amazon S3 for new streams, then we address migration steps to take advantage of this feature and simplify your existing pipeline.

Decompress CloudWatch Logs for Splunk

You can use a subscription filter on CloudWatch log groups to ingest data into Firehose either directly or through Amazon Kinesis Data Streams.
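For reference, creating such a subscription filter programmatically could look like the following sketch using the AWS SDK for Java v2. The log group name, Firehose stream ARN, and IAM role (which must allow CloudWatch Logs to put records into the stream) are placeholders.

import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
import software.amazon.awssdk.services.cloudwatchlogs.model.PutSubscriptionFilterRequest;

public class SubscriptionFilterSketch {
    public static void main(String[] args) {
        try (CloudWatchLogsClient logs = CloudWatchLogsClient.create()) {
            logs.putSubscriptionFilter(PutSubscriptionFilterRequest.builder()
                    .logGroupName("/aws/cloudtrail/example")   // placeholder log group
                    .filterName("firehose-delivery")
                    .filterPattern("")                         // empty pattern forwards all log events
                    .destinationArn("arn:aws:firehose:us-east-1:111122223333:deliverystream/cw-logs-to-splunk") // placeholder
                    .roleArn("arn:aws:iam::111122223333:role/CWLtoFirehoseRole")                                // placeholder
                    .build());
        }
    }
}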

Note: For the CloudWatch Logs decompression feature, you need an HTTP Event Collector (HEC) data input created in Splunk, with indexer acknowledgement enabled and the source type specified. This is required to map the decompressed logs to the right source type. When creating the HEC input, include the source type mapping (for example, aws:cloudtrail).

To create a Firehose delivery stream for the decompression feature, complete the following steps:

  1. Provide your destination settings and select Raw endpoint as endpoint type.

You can use a raw endpoint for the decompression feature to ingest both raw and JSON-formatted event data to Splunk. For example, VPC Flow Logs data is raw data, and AWS CloudTrail data is in JSON format.

  2. Enter the HEC token for Authentication token.
  3. To enable the decompression feature, deselect Transform source records with AWS Lambda under Transform records.
  4. Select Turn on decompression and Turn on message extraction for Decompress source records from Amazon CloudWatch Logs.
  5. Select Turn on message extraction for the Splunk destination.

Message extraction feature

After decompression, CloudWatch Logs are in JSON format, as shown in the following figure. The decompressed data has metadata information such as logGroup, logStream, and subscriptionFilters, and the actual data is included within the message field under logEvents (the following figure shows an example of CloudTrail events in CloudWatch Logs).

When you enable message extraction, Firehose extracts just the contents of the message fields and concatenates them with a new line between them, as shown in the following figure. With the CloudWatch Logs metadata filtered out by this feature, Splunk can successfully parse the actual log data and map it to the source type configured in the HEC token.

Additionally, if you want to deliver these CloudWatch events to your Splunk destination in real time, you can use zero buffering, a new feature that was launched recently in Firehose. You can use this feature to set the buffer interval to 0 seconds, or to any interval between 0–60 seconds, to deliver data to the Splunk destination in real time within seconds.

With these settings, you can now seamlessly ingest decompressed CloudWatch log data into Splunk using Firehose.

Decompress CloudWatch Logs for Amazon S3

The CloudWatch Logs decompression feature for an Amazon S3 destination works similarly to the Splunk destination: you can turn off data transformation using Lambda and turn on the decompression and message extraction options. You can use the decompression feature to write the log data as a text file to the Amazon S3 destination, or use it with other Amazon S3 destination features like record format conversion to Parquet or ORC, or dynamic partitioning to partition the data.

Dynamic partitioning with decompression

For Amazon S3 destination, Firehose supports dynamic partitioning, which enables you to continuously partition streaming data by using keys within data, and then deliver the data grouped by these keys into corresponding Amazon S3 prefixes. This enables you to run high-performance, cost-efficient analytics on streaming data in Amazon S3 using services such as Amazon Athena, Amazon EMR, Amazon Redshift Spectrum, and Amazon QuickSight. Partitioning your data minimizes the amount of data scanned, optimizes performance, and reduces costs of your analytics queries on Amazon S3.

With the new decompression feature, you can perform dynamic partitioning without any Lambda function for mapping the partitioning keys on CloudWatch Logs. You can enable the Inline parsing for JSON option, scan the decompressed log data, and select the partitioning keys. The following screenshot shows an example where inline parsing is enabled for CloudTrail log data with a partitioning schema selected for account ID and AWS Region in the CloudTrail record.

Record format conversion with decompression

For CloudWatch Logs data, you can use the record format conversion feature on decompressed data for Amazon S3 destination. Firehose can convert the input data format from JSON to Apache Parquet or Apache ORC before storing the data in Amazon S3. Parquet and ORC are columnar data formats that save space and enable faster queries compared to row-oriented formats like JSON. You can use the features for record format conversion under the Transform and convert records settings to convert the CloudWatch log data to Parquet or ORC format. The following screenshot shows an example of record format conversion settings for Parquet format using an AWS Glue schema and table for CloudTrail log data. When the dynamic partitioning settings are configured, record format conversion works along with dynamic partitioning to create the files in the output format with a partition folder structure in the target S3 bucket.

Migrate existing delivery streams for decompression

If you want to migrate an existing Firehose stream that uses Lambda for decompression to this new decompression feature of Firehose, refer to the steps outlined in Enabling and disabling decompression.

Pricing

The Firehose decompression feature decompresses the data, and you are charged per GB of decompressed data. To understand decompression pricing, refer to Amazon Data Firehose pricing.

Clean up

To avoid incurring future charges, delete the resources you created in the following order:

  1. Delete the CloudWatch Logs subscription filter.
  2. Delete the Firehose delivery stream.
  3. Delete the S3 buckets.

Conclusion

The decompression and message extraction feature of Firehose simplifies delivery of CloudWatch Logs to Amazon S3 and Splunk destinations without requiring any code development or additional processing. For an Amazon S3 destination, you can use Parquet or ORC conversion and dynamic partitioning capabilities on decompressed data.

For more information, refer to the following resources:


About the Authors

Ranjit Kalidasan is a Senior Solutions Architect with Amazon Web Services based in Boston, Massachusetts. He is a Partner Solutions Architect helping security ISV partners co-build and co-market solutions with AWS. He brings over 25 years of experience in information technology helping global customers implement complex solutions for security and analytics. You can connect with Ranjit on LinkedIn.

Phaneendra Vuliyaragoli is a Product Management Lead for Amazon Data Firehose at AWS. In this role, Phaneendra leads the product and go-to-market strategy for Amazon Data Firehose.

Krones real-time production line monitoring with Amazon Managed Service for Apache Flink

Post Syndicated from Florian Mair original https://aws.amazon.com/blogs/big-data/krones-real-time-production-line-monitoring-with-amazon-managed-service-for-apache-flink/

Krones provides breweries, beverage bottlers, and food producers all over the world with individual machines and complete production lines. Every day, millions of glass bottles, cans, and PET containers run through a Krones line. Production lines are complex systems with lots of possible errors that could stall the line and decrease the production yield. Krones wants to detect failures as early as possible (sometimes even before they happen) and notify production line operators to increase reliability and output. So how do you detect a failure? Krones equips their lines with sensors for data collection, which can then be evaluated against rules. Both Krones, as the line manufacturer, and the line operator can create monitoring rules for machines. Therefore, beverage bottlers and other operators can define their own margin of error for the line. In the past, Krones used a system based on a time series database. The main challenges were that this system was hard to debug and that queries represented only the current state of machines, not the state transitions.

This post shows how Krones built a streaming solution to monitor their lines, based on Amazon Kinesis and Amazon Managed Service for Apache Flink. These fully managed services reduce the complexity of building streaming applications with Apache Flink. Managed Service for Apache Flink manages the underlying Apache Flink components that provide durable application state, metrics, logs, and more, and Kinesis enables you to cost-effectively process streaming data at any scale. If you want to get started with your own Apache Flink application, check out the GitHub repository for samples using the Java, Python, or SQL APIs of Flink.

Overview of solution

Krones’s line monitoring is part of the Krones Shopfloor Guidance system. It provides support in the organization, prioritization, management, and documentation of all activities in the company. It allows Krones to notify an operator if a machine is stopped or materials are required, regardless of where the operator is in the line. Proven condition monitoring rules are already built in, but rules can also be user defined via the user interface. For example, if a certain data point that is monitored violates a threshold, there can be a text message or a trigger for a maintenance order on the line.

The condition monitoring and rule evaluation system is built on AWS, using AWS analytics services. The following diagram illustrates the architecture.

Architecture Diagram for Krones Production Line Monitoring

Almost every data streaming application consists of five layers: data source, stream ingestion, stream storage, stream processing, and one or more destinations. In the following sections, we dive deeper into each layer and how the line monitoring solution, built by Krones, works in detail.

Data source

The data is gathered by a service running on an edge device reading several protocols like Siemens S7 or OPC/UA. Raw data is preprocessed to create a unified JSON structure, which makes it easier to process later on in the rule engine. A sample payload converted to JSON might look like the following:

{
  "version": 1,
  "timestamp": 1234,
  "equipmentId": "84068f2f-3f39-4b9c-a995-d2a84d878689",
  "tag": "water_temperature",
  "value": 13.45,
  "quality": "Ok",
  "meta": {      
    "sequenceNumber": 123,
    "flags": ["Fst", "Lst", "Wmk", "Syn", "Ats"],
    "createdAt": 12345690,
    "sourceId": "filling_machine"
  }
}

Stream ingestion

AWS IoT Greengrass is an open source Internet of Things (IoT) edge runtime and cloud service. It allows you to act on data locally and to aggregate and filter device data. AWS IoT Greengrass provides prebuilt components that can be deployed to the edge. The production line solution uses the stream manager component, which can process data and transfer it to AWS destinations such as AWS IoT Analytics, Amazon Simple Storage Service (Amazon S3), and Kinesis. The stream manager buffers and aggregates records, then sends them to a Kinesis data stream.

Stream storage

The job of the stream storage layer is to buffer messages in a fault-tolerant way and make them available for consumption by one or more consumer applications. To achieve this on AWS, the most common technologies are Kinesis and Amazon Managed Streaming for Apache Kafka (Amazon MSK). For storing the sensor data from production lines, Krones chose Kinesis. Kinesis is a serverless streaming data service that works at any scale with low latency. Shards within a Kinesis data stream are a uniquely identified sequence of data records; a stream is composed of one or more shards. Each shard has 2 MB/s of read capacity and 1 MB/s of write capacity (with a maximum of 1,000 records per second). To avoid hitting those limits, data should be distributed among shards as evenly as possible. Every record that is sent to Kinesis has a partition key, which is used to group data into a shard. Therefore, you want to have a large number of partition keys to distribute the load evenly. The stream manager running on AWS IoT Greengrass supports random partition key assignment, which means that all records end up in a random shard and the load is distributed evenly. A disadvantage of random partition key assignment is that records aren’t stored in order in Kinesis. We explain how to solve this in the next section, where we talk about watermarks.

Watermarks

A watermark is a mechanism used to track and measure the progress of event time in a data stream. The event time is the timestamp from when the event was created at the source. The watermark indicates the timely progress of the stream processing application: all events with an earlier or equal timestamp are considered processed. This information is essential for Flink to advance event time and trigger relevant computations, such as window evaluations. The allowed lag between event time and watermark can be configured to determine how long to wait for late data before considering a window complete and advancing the watermark.

Krones has systems all around the globe and needed to handle late arrivals due to connection losses or other network constraints. They started out by monitoring late arrivals and setting the default Flink late handling to the maximum value they saw in this metric. They experienced issues with time synchronization from the edge devices, which led them to a more sophisticated way of watermarking. They built a global watermark across all the senders and used the lowest value as the watermark. The timestamps of all incoming events are stored in a HashMap. When the watermarks are emitted periodically, the smallest value in this HashMap is used. To avoid stalling watermarks because of missing data, they configured an idleTimeOut parameter, which ignores timestamps that are older than a certain threshold. This increases latency but gives strong data consistency.

public class BucketWatermarkGenerator implements WatermarkGenerator<DataPointEvent> {
    // Last observed timestamp (and watermark) per sender, used to derive the global minimum watermark
    private HashMap<String, WatermarkAndTimestamp> lastTimestamps;
    // Senders idle for longer than this threshold are ignored when emitting watermarks
    private Long idleTimeOut;
    // Allowed lag between event time and the emitted watermark
    private long maxOutOfOrderness;
}
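The published snippet shows only the generator’s fields. As a simplified, self-contained sketch of the underlying idea (not Krones’s actual implementation), the following generator tracks the latest event time per sender and periodically emits the minimum across all non-idle senders; a getSourceId() accessor on DataPointEvent and the concrete timeout values are assumptions.

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

// Simplified illustration: tracks the latest event time per sender and
// emits the minimum across all non-idle senders as the global watermark
public class GlobalMinWatermarkGenerator implements WatermarkGenerator<DataPointEvent> {

    private final Map<String, Long> lastTimestamps = new HashMap<>();
    private final long idleTimeOut = 60_000L;        // ignore senders idle for more than 60 seconds
    private final long maxOutOfOrderness = 5_000L;   // allowed lateness in milliseconds

    @Override
    public void onEvent(DataPointEvent event, long eventTimestamp, WatermarkOutput output) {
        // Remember the latest event time seen for this sender
        lastTimestamps.put(event.getSourceId(), eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        long now = System.currentTimeMillis();
        lastTimestamps.values().stream()
                .filter(ts -> now - ts <= idleTimeOut)   // drop senders considered idle
                .min(Long::compare)
                .ifPresent(min -> output.emitWatermark(new Watermark(min - maxOutOfOrderness)));
    }
}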

Stream processing

After the data is collected from sensors and ingested into Kinesis, it needs to be evaluated by a rule engine. A rule in this system represents the state of a single metric (such as temperature) or a collection of metrics. To interpret a metric, more than one data point is used, which is a stateful calculation. In this section, we dive deeper into the keyed state and broadcast state in Apache Flink and how they’re used to build the Krones rule engine.

Control stream and broadcast state pattern

In Apache Flink, state refers to the ability of the system to store and manage information persistently across time and operations, enabling the processing of streaming data with support for stateful computations.

The broadcast state pattern allows the distribution of a state to all parallel instances of an operator. All operators therefore share the same state and can process data using it. This read-only state is distributed and updated by means of a control stream. A control stream is a regular data stream, but usually with a much lower data rate. This pattern lets you dynamically update the state on all operators, so the user can change the state and behavior of the application without a redeploy. By adding a new record to the control stream, all operators receive the update and use the new state when processing new messages.
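The following is a minimal sketch of the broadcast state pattern in Apache Flink, meant to illustrate the mechanics rather than the actual Krones rule engine. It assumes that ruleControlStream and dataPointStream already exist as DataStream<Rule> and DataStream<DataPointEvent>, and that Rule and DataPointEvent are simple classes with the accessors shown (all hypothetical names).

// Descriptor for the broadcast (rule) state shared by all parallel operator instances
MapStateDescriptor<String, Rule> ruleStateDescriptor =
        new MapStateDescriptor<>("rules", String.class, Rule.class);

// The low-rate control stream carrying rule updates is broadcast to every operator
BroadcastStream<Rule> ruleBroadcast = ruleControlStream.broadcast(ruleStateDescriptor);

DataStream<String> alerts = dataPointStream
        .connect(ruleBroadcast)
        .process(new BroadcastProcessFunction<DataPointEvent, Rule, String>() {

            @Override
            public void processElement(DataPointEvent event, ReadOnlyContext ctx, Collector<String> out)
                    throws Exception {
                // Evaluate the event against every currently known rule
                for (Map.Entry<String, Rule> entry : ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {
                    if (entry.getValue().matches(event)) {
                        out.collect("Rule " + entry.getKey() + " triggered by " + event.getTag());
                    }
                }
            }

            @Override
            public void processBroadcastElement(Rule rule, Context ctx, Collector<String> out)
                    throws Exception {
                // A new or updated rule arrives on the control stream; no redeploy needed
                ctx.getBroadcastState(ruleStateDescriptor).put(rule.getId(), rule);
            }
        });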

This allows users of the Krones application to ingest new rules into the Flink application without restarting it. This avoids downtime and gives a great user experience, because changes happen in real time. A rule covers a scenario in order to detect a process deviation. Sometimes, the machine data is not as easy to interpret as it might look at first glance. If a temperature sensor is sending high values, this might indicate an error, but it might also be the effect of an ongoing maintenance procedure. It’s important to put metrics in context and filter some values. This is achieved by a concept called grouping.

Grouping of metrics

The grouping of data and metrics allows you to define the relevance of incoming data and produce accurate results. Let’s walk through the example in the following figure.

Grouping of metrics

In Step 1, we define two condition groups. Group 1 collects the machine state and which product is going through the line. Group 2 uses the value of the temperature and pressure sensors. A condition group can have different states depending on the values it receives. In this example, group 1 receives data that the machine is running, and the one-liter bottle is selected as the product; this gives this group the state ACTIVE. Group 2 has metrics for temperature and pressure; both metrics are above their thresholds for more than 5 minutes. This results in group 2 being in a WARNING state. This means group 1 reports that everything is fine and group 2 does not. In Step 2, weights are added to the groups. This is needed in some situations, because groups might report conflicting information. In this scenario, group 1 reports ACTIVE and group 2 reports WARNING, so it’s not clear to the system what the state of the line is. After adding the weights, the states can be ranked, as shown in Step 3. Lastly, the highest ranked state is chosen as the winning one, as shown in Step 4.
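As a simplified illustration of Steps 2–4, choosing the winning state boils down to picking the state reported by the highest-weighted group. The GroupResult class, group names, and weights below are hypothetical and only sketch the ranking idea.

import java.util.Comparator;
import java.util.List;

public class GroupRanking {
    // Hypothetical result of evaluating one condition group
    record GroupResult(String groupName, String state, int weight) {}

    public static void main(String[] args) {
        List<GroupResult> results = List.of(
                new GroupResult("machine_state", "ACTIVE", 1),
                new GroupResult("sensor_limits", "WARNING", 5));

        // The winning line state is the one reported by the highest-weighted group
        String lineState = results.stream()
                .max(Comparator.comparingInt(GroupResult::weight))
                .map(GroupResult::state)
                .orElse("UNKNOWN");

        System.out.println(lineState); // prints WARNING in this example
    }
}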

After the rules are evaluated and the final machine state is defined, the results will be further processed. The action taken depends on the rule configuration; this can be a notification to the line operator to restock materials, do some maintenance, or just a visual update on the dashboard. This part of the system, which evaluates metrics and rules and takes actions based on the results, is referred to as a rule engine.

Scaling the rule engine

By letting users build their own rules, the rule engine can end up with a large number of rules to evaluate, and some rules might use the same sensor data as other rules. Flink is a distributed system that scales very well horizontally. To distribute a data stream to several tasks, you can use the keyBy() method. This allows you to partition a data stream in a logical way and send parts of the data to different task managers. This is often done by choosing an arbitrary key so you get an evenly distributed load. In this case, Krones added a ruleId to the data point and used it as the key, so that all data points a rule needs are processed by the same task. Within each rule, the keyed state can then be used just like a regular variable.
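A minimal sketch of this partitioning is shown below; enrichedDataPoints is assumed to be an existing DataStream<DataPointEvent>, and the getRuleId() accessor, RuleEvaluator process function, and RuleResult type are hypothetical names.

// Partition the stream by ruleId so that every data point a rule needs
// lands on the same task and can share that rule's keyed state
DataStream<RuleResult> ruleResults = enrichedDataPoints
        .keyBy(DataPointEvent::getRuleId)
        .process(new RuleEvaluator());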

Destinations

When a rule changes its state, the information is sent to a Kinesis data stream and then via Amazon EventBridge to consumers. One of the consumers creates a notification from the event that is transmitted to the production line and alerts the personnel to act. To be able to analyze the rule state changes, another service writes the data to an Amazon DynamoDB table for fast access, and a TTL is in place to offload long-term history to Amazon S3 for further reporting.

Conclusion

In this post, we showed you how Krones built a real-time production line monitoring system on AWS. Managed Service for Apache Flink allowed the Krones team to get started quickly by focusing on application development rather than infrastructure. The real-time capabilities of Flink enabled Krones to reduce machine downtime by 10% and increase efficiency up to 5%.

If you want to build your own streaming applications, check out the available samples on the GitHub repository. If you want to extend your Flink application with custom connectors, see Making it Easier to Build Connectors with Apache Flink: Introducing the Async Sink. The Async Sink is available in Apache Flink version 1.15.1 and later.


About the Authors

Florian Mair is a Senior Solutions Architect and data streaming expert at AWS. He is a technologist that helps customers in Europe succeed and innovate by solving business challenges using AWS Cloud services. Besides working as a Solutions Architect, Florian is a passionate mountaineer, and has climbed some of the highest mountains across Europe.

Emil Dietl is a Senior Tech Lead at Krones specializing in data engineering, with a key field in Apache Flink and microservices. His work often involves the development and maintenance of mission-critical software. Outside of his professional life, he deeply values spending quality time with his family.

Simon Peyer is a Solutions Architect at AWS based in Switzerland. He is a practical doer and is passionate about connecting technology and people using AWS Cloud services. A special focus for him is data streaming and automations. Besides work, Simon enjoys his family, the outdoors, and hiking in the mountains.

Exploring real-time streaming for generative AI Applications

Post Syndicated from Ali Alemi original https://aws.amazon.com/blogs/big-data/exploring-real-time-streaming-for-generative-ai-applications/

Foundation models (FMs) are large machine learning (ML) models trained on a broad spectrum of unlabeled and generalized datasets. FMs, as the name suggests, provide the foundation to build more specialized downstream applications, and are unique in their adaptability. They can perform a wide range of different tasks, such as natural language processing, classifying images, forecasting trends, analyzing sentiment, and answering questions. This scale and general-purpose adaptability are what makes FMs different from traditional ML models. FMs are multimodal; they work with different data types such as text, video, audio, and images. Large language models (LLMs) are a type of FM and are pre-trained on vast amounts of text data and typically have application uses such as text generation, intelligent chatbots, or summarization.

Streaming data facilitates the constant flow of diverse and up-to-date information, enhancing the models’ ability to adapt and generate more accurate, contextually relevant outputs. This dynamic integration of streaming data enables generative AI applications to respond promptly to changing conditions, improving their adaptability and overall performance in various tasks.

To better understand this, imagine a chatbot that helps travelers book their travel. In this scenario, the chatbot needs real-time access to airline inventory, flight status, hotel inventory, latest price changes, and more. This data usually comes from third parties, and developers need to find a way to ingest this data and process the data changes as they happen.

Batch processing is not the best fit in this scenario. When data changes rapidly, processing it in a batch may result in stale data being used by the chatbot, providing inaccurate information to the customer, which impacts the overall customer experience. Stream processing, however, can enable the chatbot to access real-time data and adapt to changes in availability and price, providing the best guidance to the customer and enhancing the customer experience.

Another example is an AI-driven observability and monitoring solution where FMs monitor real-time internal metrics of a system and produce alerts. When the model finds an anomaly or abnormal metric value, it should immediately produce an alert and notify the operator. However, the value of such important data diminishes significantly over time. These notifications should ideally be received within seconds or even while the issue is still occurring. If operators receive these notifications minutes or hours after they happened, such an insight is not actionable and has potentially lost its value. You can find similar use cases in other industries such as retail, car manufacturing, energy, and the financial industry.

In this post, we discuss why data streaming is a crucial component of generative AI applications due to its real-time nature. We discuss the value of AWS data streaming services such as Amazon Managed Streaming for Apache Kafka (Amazon MSK), Amazon Kinesis Data Streams, Amazon Managed Service for Apache Flink, and Amazon Kinesis Data Firehose in building generative AI applications.

In-context learning

LLMs are trained with point-in-time data and have no inherent ability to access fresh data at inference time. As new data appears, you have to continuously fine-tune or further train the model. This is not only an expensive operation, but also very limiting in practice, because the rate of new data generation far outpaces the speed of fine-tuning. Additionally, LLMs lack contextual understanding and rely solely on their training data, and are therefore prone to hallucinations. This means they can generate a fluent, coherent, and syntactically sound but factually incorrect response. They are also devoid of relevance, personalization, and context.

LLMs, however, have the capacity to learn from the data they receive from the context to more accurately respond without modifying the model weights. This is called in-context learning, and can be used to produce personalized answers or provide an accurate response in the context of organization policies.

For example, in a chatbot, data events could pertain to an inventory of flights and hotels or price changes that are constantly ingested to a streaming storage engine. Furthermore, data events are filtered, enriched, and transformed to a consumable format using a stream processor. The result is made available to the application by querying the latest snapshot. The snapshot constantly updates through stream processing; therefore, the up-to-date data is provided in the context of a user prompt to the model. This allows the model to adapt to the latest changes in price and availability. The following diagram illustrates a basic in-context learning workflow.

A commonly used in-context learning approach is to use a technique called Retrieval Augmented Generation (RAG). In RAG, you provide the relevant information such as most relevant policy and customer records along with the user question to the prompt. This way, the LLM generates an answer to the user question using additional information provided as context. To learn more about RAG, refer to Question answering using Retrieval Augmented Generation with foundation models in Amazon SageMaker JumpStart.

A RAG-based generative AI application can only produce generic responses based on its training data and the relevant documents in the knowledge base. This solution falls short when a near-real-time personalized response is expected from the application. For example, a travel chatbot is expected to consider the user’s current bookings, available hotel and flight inventory, and more. Moreover, the relevant customer personal data (commonly known as the unified customer profile) is usually subject to change. If a batch process is employed to update the generative AI’s user profile database, the customer may receive dissatisfying responses based on old data.

In this post, we discuss the application of stream processing to enhance a RAG solution used for building question answering agents with context from real-time access to unified customer profiles and organizational knowledge base.

Near-real-time customer profile updates

Customer records are typically distributed across data stores within an organization. For your generative AI application to provide a relevant, accurate, and up-to-date customer profile, it is vital to build streaming data pipelines that can perform identity resolution and profile aggregation across the distributed data stores. Streaming jobs constantly ingest new data to synchronize across systems and can perform enrichment, transformations, joins, and aggregations across windows of time more efficiently. Change data capture (CDC) events contain information about the source record, updates, and metadata such as time, source, classification (insert, update, or delete), and the initiator of the change.

The following diagram illustrates an example workflow for CDC streaming ingestion and processing for unified customer profiles.

In this section, we discuss the main components of a CDC streaming pattern required to support RAG-based generative AI applications.

CDC streaming ingestion

A CDC replicator is a process that collects data changes from a source system (usually by reading transaction logs or binlogs) and writes CDC events, in the exact order they occurred, to a streaming data stream or topic. This involves log-based capture with tools such as AWS Database Migration Service (AWS DMS) or open source connectors such as Debezium for Apache Kafka Connect. Apache Kafka Connect is part of the Apache Kafka environment, allowing data to be ingested from various sources and delivered to a variety of destinations. You can run your Apache Kafka connector on Amazon MSK Connect within minutes without worrying about configuration, setup, and operating an Apache Kafka cluster. You only need to upload your connector’s compiled code to Amazon Simple Storage Service (Amazon S3) and set up your connector with your workload’s specific configuration.

There are also other methods for capturing data changes. For example, Amazon DynamoDB provides a feature for streaming CDC data to Amazon DynamoDB Streams or Kinesis Data Streams. Amazon S3 provides a trigger to invoke an AWS Lambda function when a new document is stored.

Streaming storage

Streaming storage functions as an intermediate buffer to store CDC events before they get processed. Streaming storage provides reliable storage for streaming data. By design, it is highly available and resilient to hardware or node failures and maintains the order of the events as they are written. Streaming storage can store data events either permanently or for a set period of time. This allows stream processors to read from part of the stream if there is a failure or a need for re-processing. Kinesis Data Streams is a serverless streaming data service that makes it straightforward to capture, process, and store data streams at scale. Amazon MSK is a fully managed, highly available, and secure service provided by AWS for running Apache Kafka.

Stream processing

Stream processing systems should be designed for parallelism to handle high data throughput. They should partition the input stream between multiple tasks running on multiple compute nodes. Tasks should be able to send the result of one operation to the next one over the network, making it possible to process data in parallel while performing operations such as joins, filtering, enrichment, and aggregations. Stream processing applications should be able to process events with regard to event time for use cases where events could arrive late or where correct computation relies on the time events occur rather than the system time. For more information, refer to Notions of Time: Event Time and Processing Time.
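For example, in Apache Flink, event-time processing with tolerance for late events can be configured with a watermark strategy on the source stream. In this sketch, the ClickEvent type, its getEventTimeMillis() accessor, and the five-second bound are illustrative assumptions.

// Use the timestamp embedded in each event and allow events
// to arrive up to 5 seconds out of order before the watermark advances
DataStream<ClickEvent> events = rawEvents.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<ClickEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, recordTimestamp) -> event.getEventTimeMillis()));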

Stream processors continuously produce results in the form of data events that need to be output to a target system. A target system could be any system that integrates directly with the processor or via streaming storage as an intermediary. Depending on the framework you choose for stream processing, you will have different options for target systems depending on the available sink connectors. If you decide to write the results to an intermediary streaming storage, you can build a separate process that reads events and applies changes to the target system, such as running an Apache Kafka sink connector. Regardless of which option you choose, CDC data needs extra handling due to its nature. Because CDC events carry information about updates or deletes, it’s important that they are merged in the target system in the right order. If changes are applied in the wrong order, the target system will be out of sync with its source.

Apache Flink is a powerful stream processing framework known for its low latency and high throughput capabilities. It supports event time processing, exactly-once processing semantics, and high fault tolerance. Additionally, it provides native support for CDC data via a special structure called dynamic tables. Dynamic tables mimic the source database tables and provide a columnar representation of the streaming data. The data in dynamic tables changes with every event that is processed. New records can be appended, updated, or deleted at any time. Dynamic tables abstract away the extra logic you need to implement for each record operation (insert, update, delete) separately. For more information, refer to Dynamic Tables.

With Amazon Managed Service for Apache Flink, you can run Apache Flink jobs and integrate with other AWS services. There are no servers and clusters to manage, and there is no compute and storage infrastructure to set up.

AWS Glue is a fully managed extract, transform, and load (ETL) service, which means AWS handles the infrastructure provisioning, scaling, and maintenance for you. Although it’s primarily known for its ETL capabilities, AWS Glue can also be used for Spark streaming applications. AWS Glue can interact with streaming data services such as Kinesis Data Streams and Amazon MSK for processing and transforming CDC data. AWS Glue can also seamlessly integrate with other AWS services such as Lambda, AWS Step Functions, and DynamoDB, providing you with a comprehensive ecosystem for building and managing data processing pipelines.

Unified customer profile

Unifying the customer profile across a variety of source systems requires the development of robust data pipelines. You need data pipelines that can bring and synchronize all records into one data store. This data store provides your organization with the holistic view of customer records that is needed for operational efficiency of RAG-based generative AI applications. For building such a data store, an unstructured data store works best.

An identity graph is a useful structure for creating a unified customer profile because it consolidates and integrates customer data from various sources, ensures data accuracy and deduplication, offers real-time updates, connects cross-systems insights, enables personalization, enhances customer experience, and supports regulatory compliance. This unified customer profile empowers the generative AI application to understand and engage with customers effectively, and adhere to data privacy regulations, ultimately enhancing customer experiences and driving business growth. You can build your identity graph solution using Amazon Neptune, a fast, reliable, fully managed graph database service.

AWS provides a few other managed and serverless NoSQL storage service offerings for unstructured key-value objects. Amazon DocumentDB (with MongoDB compatibility) is a fast, scalable, highly available, and fully managed enterprise document database service that supports native JSON workloads. DynamoDB is a fully managed NoSQL database service that provides fast and predictable performance with seamless scalability.

Near-real-time organizational knowledge base updates

Similar to customer records, internal knowledge repositories such as company policies and organizational documents are siloed across storage systems. This is typically unstructured data and is updated in a non-incremental fashion. Unstructured data can be made usable for AI applications through vector embeddings, a technique that represents high-dimensional data such as text files, images, and audio files as multi-dimensional numeric vectors.

AWS provides several vector engine services, such as Amazon OpenSearch Serverless, Amazon Kendra, and Amazon Aurora PostgreSQL-Compatible Edition with the pgvector extension for storing vector embeddings. Generative AI applications can enhance the user experience by transforming the user prompt into a vector and use it to query the vector engine to retrieve contextually relevant information. Both the prompt and the vector data retrieved are then passed to the LLM to receive a more precise and personalized response.

The following diagram illustrates an example stream-processing workflow for vector embeddings.

Knowledge base contents need to be converted to vector embeddings before being written to the vector data store. Amazon Bedrock or Amazon SageMaker can help you access the model of your choice and expose a private endpoint for this conversion. Furthermore, you can use libraries such as LangChain to integrate with these endpoints. Building a batch process can help you convert your knowledge base content to vector data and store it in a vector database initially. However, you need to rely on an interval to reprocess the documents to synchronize your vector database with changes in your knowledge base content. With a large number of documents, this process can be inefficient. Between these intervals, your generative AI application users will receive answers according to the old content, or will receive an inaccurate answer because the new content is not vectorized yet.

Stream processing is an ideal solution for these challenges. It initially produces events for the existing documents, and it then monitors the source system and creates a document change event as soon as one occurs. These events can be stored in streaming storage and wait to be processed by a streaming job. A streaming job reads these events, loads the content of the document, and transforms the content into an array of related tokens of words. Each token is further transformed into vector data via an API call to an embedding FM. Results are sent for storage to the vector storage via a sink operator.

If you’re using Amazon S3 for storing your documents, you can build an event-source architecture based on S3 object change triggers for Lambda. A Lambda function can create an event in the desired format and write that to your streaming storage.

You can also use Apache Flink to run as a streaming job. Apache Flink provides the native FileSystem source connector, which can discover existing files and read their contents initially. After that, it can continuously monitor your file system for new files and capture their content. The connector supports reading a set of files from distributed file systems such as Amazon S3 or HDFS with a format of plain text, Avro, CSV, Parquet, and more, and produces a streaming record. As a fully managed service, Managed Service for Apache Flink removes the operational overhead of deploying and maintaining Flink jobs, allowing you to focus on building and scaling your streaming applications. With seamless integration into the AWS streaming services such as Amazon MSK or Kinesis Data Streams, it provides features like automatic scaling, security, and resiliency, providing reliable and efficient Flink applications for handling real-time streaming data.
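The following is a sketch of how such a continuously monitoring file source might be wired up in a Flink job (Flink 1.15 or later is assumed); the bucket path, polling interval, and job name are placeholders.

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DocumentSourceJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read existing documents once, then keep polling the bucket for new files
        FileSource<String> source = FileSource
                .forRecordStreamFormat(new TextLineInputFormat(), new Path("s3://my-knowledge-base/docs/"))
                .monitorContinuously(Duration.ofSeconds(30))
                .build();

        DataStream<String> documentLines =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "knowledge-base-documents");

        documentLines.print();  // downstream: chunk, embed, and sink to the vector store
        env.execute("knowledge-base-ingestion");
    }
}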

Based on your DevOps preference, you can choose between Kinesis Data Streams or Amazon MSK for storing the streaming records. Kinesis Data Streams simplifies the complexities of building and managing custom streaming data applications, allowing you to focus on deriving insights from your data rather than infrastructure maintenance. Customers using Apache Kafka often opt for Amazon MSK due to its straightforwardness, scalability, and dependability in overseeing Apache Kafka clusters within the AWS environment. As a fully managed service, Amazon MSK takes on the operational complexities associated with deploying and maintaining Apache Kafka clusters, enabling you to concentrate on constructing and expanding your streaming applications.

Because a RESTful API integration suits the nature of this process, you need a framework that supports a stateful enrichment pattern via RESTful API calls, with the ability to track failures and retry failed requests. Apache Flink, again, is a framework that can perform stateful operations at in-memory speed. To understand the best ways to make API calls via Apache Flink, refer to Common streaming data enrichment patterns in Amazon Kinesis Data Analytics for Apache Flink.
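A minimal sketch of this pattern using Flink’s async I/O is shown below; documentChunks is assumed to be an existing DataStream<String>, and callEmbeddingApi (a non-blocking client call returning a CompletableFuture) and the EmbeddedChunk type are purely hypothetical stand-ins for whichever embedding endpoint and result type you use.

// Enrich each document chunk with its vector embedding via asynchronous API calls
DataStream<EmbeddedChunk> embedded = AsyncDataStream.unorderedWait(
        documentChunks,
        new RichAsyncFunction<String, EmbeddedChunk>() {
            @Override
            public void asyncInvoke(String chunk, ResultFuture<EmbeddedChunk> resultFuture) {
                // callEmbeddingApi is a hypothetical helper returning a CompletableFuture<float[]>
                callEmbeddingApi(chunk).whenComplete((vector, error) -> {
                    if (error != null) {
                        resultFuture.completeExceptionally(error); // triggers Flink's failure handling
                    } else {
                        resultFuture.complete(Collections.singleton(new EmbeddedChunk(chunk, vector)));
                    }
                });
            }
        },
        30, TimeUnit.SECONDS,   // per-request timeout
        100);                   // maximum in-flight requests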

Apache Flink provides native sink connectors for writing data to vector datastores such as Amazon Aurora for PostgreSQL with pgvector or Amazon OpenSearch Service with VectorDB. Alternatively, you can stage the Flink job’s output (vectorized data) in an MSK topic or a Kinesis data stream. OpenSearch Service provides support for native ingestion from Kinesis data streams or MSK topics. For more information, refer to Introducing Amazon MSK as a source for Amazon OpenSearch Ingestion and Loading streaming data from Amazon Kinesis Data Streams.

Feedback analytics and fine-tuning

It’s important for data operation managers and AI/ML developers to get insight into the performance of the generative AI application and the FMs in use. To achieve that, you need to build data pipelines that calculate important key performance indicator (KPI) data based on user feedback and a variety of application logs and metrics. This information is useful for stakeholders to gain real-time insight into the performance of the FM, the application, and overall user satisfaction with the quality of support they receive from your application. You also need to collect and store the conversation history for further fine-tuning of your FMs to improve their ability to perform domain-specific tasks.

This use case fits very well in the streaming analytics domain. Your application should store each conversation in streaming storage. Your application can prompt users to rate each answer’s accuracy and their overall satisfaction. This data can be in the form of a binary choice or free-form text. This data can be stored in a Kinesis data stream or MSK topic, and get processed to generate KPIs in real time. You can put FMs to work on user sentiment analysis. FMs can analyze each answer and assign a category of user satisfaction.

Apache Flink’s architecture allows for complex data aggregation over windows of time. It also provides support for SQL querying over streams of data events. Therefore, by using Apache Flink, you can quickly analyze raw user inputs and generate KPIs in real time by writing familiar SQL queries. For more information, refer to Table API & SQL.

With Amazon Managed Service for Apache Flink Studio, you can build and run Apache Flink stream processing applications using standard SQL, Python, and Scala in an interactive notebook. Studio notebooks are powered by Apache Zeppelin and use Apache Flink as the stream processing engine. Studio notebooks seamlessly combine these technologies to make advanced analytics on data streams accessible to developers of all skill sets. With support for user-defined functions (UDFs), Apache Flink allows for building custom operators to integrate with external resources such as FMs for performing complex tasks such as sentiment analysis. You can use UDFs to compute various metrics or enrich user feedback raw data with additional insights such as user sentiment. To learn more about this pattern, refer to Proactively addressing customer concern in real-time with GenAI, Flink, Apache Kafka, and Kinesis.
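For instance, a user-defined function that labels the sentiment of free-text feedback could be registered and then used directly in SQL. In the sketch below, the scoring logic is a trivial stand-in for a real call to an FM, and feedback_stream is assumed to be a table you have already defined over your Kinesis data stream or MSK topic.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

public class FeedbackSentiment {

    // Stand-in UDF; in practice this is where you would call an FM for sentiment analysis
    public static class SentimentLabel extends ScalarFunction {
        public String eval(String feedback) {
            if (feedback == null) {
                return "UNKNOWN";
            }
            return feedback.toLowerCase().contains("great") ? "POSITIVE" : "NEGATIVE";
        }
    }

    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        tableEnv.createTemporarySystemFunction("SENTIMENT_LABEL", SentimentLabel.class);

        // feedback_stream is a hypothetical table backed by a Kinesis data stream or MSK topic
        tableEnv.executeSql(
                "SELECT SENTIMENT_LABEL(feedback_text) AS sentiment, COUNT(*) AS total "
                        + "FROM feedback_stream GROUP BY SENTIMENT_LABEL(feedback_text)");
    }
}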

With Managed Service for Apache Flink Studio, you can deploy your Studio notebook as a streaming job with one click. You can use native sink connectors provided by Apache Flink to send the output to your storage of choice or stage it in a Kinesis data stream or MSK topic. Amazon Redshift and OpenSearch Service are both ideal for storing analytical data. Both engines provide native ingestion support from Kinesis Data Streams and Amazon MSK via a separate streaming pipeline to a data lake or data warehouse for analysis.

Amazon Redshift uses SQL to analyze structured and semi-structured data across data warehouses and data lakes, using AWS-designed hardware and machine learning to deliver the best price-performance at scale. OpenSearch Service offers visualization capabilities powered by OpenSearch Dashboards and Kibana (1.5 to 7.10 versions).

You can use the outcome of such analysis combined with user prompt data for fine-tuning the FM when needed. SageMaker is the most straightforward way to fine-tune your FMs. Using Amazon S3 with SageMaker provides a powerful and seamless integration for fine-tuning your models. Amazon S3 serves as a scalable and durable object storage solution, enabling straightforward storage and retrieval of large datasets, training data, and model artifacts. SageMaker is a fully managed ML service that simplifies the entire ML lifecycle. By using Amazon S3 as the storage backend for SageMaker, you can benefit from the scalability, reliability, and cost-effectiveness of Amazon S3, while seamlessly integrating it with SageMaker training and deployment capabilities. This combination enables efficient data management, facilitates collaborative model development, and makes sure that ML workflows are streamlined and scalable, ultimately enhancing the overall agility and performance of the ML process. For more information, refer to Fine-tune Falcon 7B and other LLMs on Amazon SageMaker with @remote decorator.

With a file system sink connector, Apache Flink jobs can deliver data to Amazon S3 in open format (such as JSON, Avro, Parquet, and more) files as data objects. If you prefer to manage your data lake using a transactional data lake framework (such as Apache Hudi, Apache Iceberg, or Delta Lake), all of these frameworks provide a custom connector for Apache Flink. For more details, refer to Create a low-latency source-to-data lake pipeline using Amazon MSK Connect, Apache Flink, and Apache Hudi.

Summary

For a generative AI application based on a RAG model, you need to consider building two data storage systems, and you need to build data operations that keep them up to date with all the source systems. Traditional batch jobs are not sufficient to process the size and diversity of the data you need to integrate with your generative AI application. Delays in processing the changes in source systems result in an inaccurate response and reduce the efficiency of your generative AI application. Data streaming enables you to ingest data from a variety of databases across various systems. It also allows you to transform, enrich, join, and aggregate data across many sources efficiently in near-real time. Data streaming provides a simplified data architecture to collect and transform users’ real-time reactions or comments on the application responses, helping you deliver and store the results in a data lake for model fine-tuning. Data streaming also helps you optimize data pipelines by processing only the change events, allowing you to respond to data changes more quickly and efficiently.

Learn more about AWS data streaming services and get started building your own data streaming solution.


About the Authors

Ali Alemi is a Streaming Specialist Solutions Architect at AWS. Ali advises AWS customers with architectural best practices and helps them design real-time analytics data systems which are reliable, secure, efficient, and cost-effective. He works backward from customer’s use cases and designs data solutions to solve their business problems. Prior to joining AWS, Ali supported several public sector customers and AWS consulting partners in their application modernization journey and migration to the Cloud.

Imtiaz (Taz) Sayed is the World-Wide Tech Leader for Analytics at AWS. He enjoys engaging with the community on all things data and analytics. He can be reached via LinkedIn.

Invoke AWS Lambda functions from cross-account Amazon Kinesis Data Streams

Post Syndicated from Amar Surjit original https://aws.amazon.com/blogs/big-data/invoke-aws-lambda-functions-from-cross-account-amazon-kinesis-data-streams/

A multi-account architecture on AWS is essential for enhancing security, compliance, and resource management by isolating workloads, enabling granular cost allocation, and facilitating collaboration across distinct environments. It also mitigates risks, improves scalability, and allows for advanced networking configurations.

In a streaming architecture, you may have event producers, stream storage, and event consumers in a single account or spread across different accounts depending on your business and IT requirements. For example, your company may want to centralize its clickstream data or log data from multiple different producers across different accounts. Data consumers from marketing, product engineering, or analytics require access to the same streaming data across accounts, which requires the ability to deliver a multi-account streaming architecture.

To build a multi-account streaming architecture, you can use Amazon Kinesis Data Streams as the stream storage and AWS Lambda as the event consumer. Amazon Kinesis Data Streams enables real-time processing of streaming data at scale. When integrated with Lambda, it allows for serverless data processing, enabling you to analyze and react to data streams in real time without managing infrastructure. This integration supports various use cases, including real-time analytics, log processing, Internet of Things (IoT) data ingestion, and more, making it valuable for businesses requiring timely insights from their streaming data. In this post, we demonstrate how you can process data ingested into a stream in one account with a Lambda function in another account.

The recent launch of Kinesis Data Streams support for resource-based policies enables invoking a Lambda from another account. With a resource-based policy, you can specify AWS accounts, AWS Identity and Access Management (IAM) users, or IAM roles and the exact Kinesis Data Streams actions for which you want to grant access. After access is granted, you can configure a Lambda function in another account to start processing the data stream belonging to your account. This reduces cost and simplifies the data processing pipeline, because you no longer have to copy streaming data using Lambda functions in both accounts. Sharing access to your data streams or registered consumers does not incur additional charges to your account. Cross-account usage of Kinesis Data Streams resources will continue to be billed to the resource owners.

In this post, we use Kinesis Data Streams with the enhanced fan-out feature, which empowers consumers with dedicated read throughput tailored to their applications. By default, Kinesis Data Streams offers shared read throughput of 2 MB/sec per shard across consumers, but with enhanced fan-out, each consumer gets dedicated throughput of 2 MB/sec per shard. This flexibility allows you to seamlessly adapt Kinesis Data Streams to your specific requirements, choosing between enhanced fan-out for dedicated throughput or shared throughput according to your needs.

Solution overview

For our solution, we deploy Kinesis Data Streams in Account 1 and Lambda as the consumer in Account 2 to receive data from the data stream. The following diagram illustrates the high-level architecture.

Amazon KDS-Lambda cross acct solution architecture

The setup requires the following key elements:

  • Kinesis data stream in Account 1 and Lambda function in Account 2
  • Kinesis Data Streams resource policies in Account 1, allowing a cross-account Lambda execution role to perform operations on the Kinesis data stream
  • A Lambda execution role in Account 2 and an enhanced fan-out consumer resource policy in Account 1, allowing the cross-account Lambda execution role to perform operations on the Kinesis data stream

For the setup, you use three AWS CloudFormation templates to create the key resources:

  • CloudFormation template 1 creates the following key resources in Account 1:
    • Kinesis data stream
    • Kinesis data stream enhanced fan-out consumer
  • CloudFormation template 2 creates the following key resources in Account 2:
    • Consumer Lambda function
    • Consumer Lambda function execution role
  • CloudFormation template 3 creates the following resource in Account 2:
    • Consumer Lambda function event source mapping

The solution supports single-Region deployment, and the CloudFormation templates must be deployed in the same Region across different AWS accounts. In this solution, we use Kinesis Data Streams enhanced fan-out, which is a best practice for deploying architectures requiring large throughput across multiple consumers. Complete the steps in the following sections to deploy this solution.

Prerequisites

You should have two AWS accounts and the required permissions to run a CloudFormation template to create the services mentioned in the solution architecture. You also need the AWS Command Line Interface (AWS CLI) installed, version 2.15 or later.

Launch CloudFormation template 1

Complete the following steps to launch the first CloudFormation template:

  1. Sign in to the AWS Management Console as Account 1 and select the appropriate AWS Region.
  2. Download and launch CloudFormation template 1 where you want to deploy your Kinesis data stream.
  3. For LambdaConsumerAccountId, enter your Lambda consumer account ID and choose Submit. The CloudFormation template deployment will take a few minutes to complete.
  4. When the stack is complete, on the AWS CloudFormation console, navigate to the stack Outputs tab and copy the values of following parameters:
    • KinesisStreamArn
    • KinesisStreamEFOConsumerArn
    • KMSKeyArn

You will need these values in later steps.

Launch CloudFormation template 2

Complete the following steps to launch the second CloudFormation template:

  1. Sign in to the console as Account 2 and select the appropriate Region.
  2. Download and launch CloudFormation template 2 where you want to host the Lambda consumer.
  3. Provide the following input parameters captured from the previous step:
    • KinesisStreamArn
    • KinesisStreamEFOConsumerArn
    • KMSKeyArn

The CloudFormation template creates the following key resources:

  • Lambda consumer
  • Lambda execution role

The Lambda function’s execution role is an IAM role that grants the function permission to access AWS services and resources. Here, you create a Lambda execution role that has the required Kinesis Data Streams and Lambda invocation permissions.
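The consumer function deployed by the template may be implemented differently, but a minimal Java sketch of such a handler (using the aws-lambda-java-events library) that decodes and logs each record could look like the following.

import java.nio.charset.StandardCharsets;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;

// Minimal cross-account Kinesis consumer: decode each record and log it
public class CrossAccountStreamConsumer implements RequestHandler<KinesisEvent, Void> {

    @Override
    public Void handleRequest(KinesisEvent event, Context context) {
        for (KinesisEvent.KinesisEventRecord record : event.getRecords()) {
            String payload = StandardCharsets.UTF_8
                    .decode(record.getKinesis().getData())
                    .toString();
            context.getLogger().log("partitionKey=" + record.getKinesis().getPartitionKey()
                    + " data=" + payload);
        }
        return null;
    }
}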

The CloudFormation template deployment will take a few minutes to complete.

  1. When the stack is complete, on the AWS CloudFormation console, navigate to the stack Outputs tab and copy the values of following parameters:
    • KinesisStreamCreateResourcePolicyCommand
    • KinesisStreamEFOConsumerCreateResourcePolicyCommand
  2. Run the following AWS CLI commands in Account 1 using AWS CloudShell. We recommend using CloudShell because it has the latest version of the AWS CLI, which helps avoid failures caused by outdated versions.
    • KinesisStreamCreateResourcePolicyCommand – This creates the resource policy in Account 1 for the Kinesis data stream. The following is a sample resource policy:
      {
        "Version": "2012-10-17",
        "Statement": [
          {
            "Sid": "StreamEFOReadStatementID",
            "Effect": "Allow",
            "Principal": {
              "AWS": [
                "arn:aws:iam::<AWS Lambda - Consumer account id>:role/kds-cross-account-stream-consumer-lambda-execution-role"
              ]
            },
            "Action": [
              "kinesis:DescribeStreamSummary",
              "kinesis:ListShards",
              "kinesis:DescribeStream",
              "kinesis:GetRecords",
              "kinesis:GetShardIterator"
            ],
            "Resource": "arn:aws:kinesis:<region id>:<Account 1 - Amazon KDS account id>:stream/kds-cross-account-stream"
          }
        ]
      }

    • KinesisStreamEFOConsumerCreateResourcePolicyCommand – This creates the resource policy for the enhanced fan-out consumer of the Kinesis data stream in Account 1. The following is a sample resource policy:
      {
        "Version": "2012-10-17",
        "Statement": [
          {
            "Sid": "ConsumerEFOReadStatementID",
            "Effect": "Allow",
            "Principal": {
              "AWS": [
                "arn:aws:iam::<AWS Lambda - Consumer account id>:role/kds-cross-account-stream-consumer-lambda-execution-role"
              ]
            },
            "Action": [
              "kinesis:DescribeStreamConsumer",
              "kinesis:SubscribeToShard"
            ],
            "Resource": "arn:aws:kinesis:<region id>:<Account 1 - Amazon KDS account id>:stream/kds-cross-account-stream/consumer/kds-cross-account-stream-efo-consumer:1706616477"
          }
        ]
      }

You can also access this policy on the Kinesis Data Streams console, under Enhanced fan-out, Consumer name, and Consumer sharing resource-based policy.

Launch CloudFormation template 3

Now that you have created resource policies in Account 1 for the Kinesis data stream and its enhanced fan-out consumer, you can create Lambda event source mapping for the consumer Lambda function in Account 2. Complete the following steps:

  1. Sign in to the console as Account 2 and select the appropriate Region.
  2. Download and launch CloudFormation template 3 to update the stack you created using CloudFormation template 2.

The CloudFormation template creates the Lambda event source mapping.

Validate the solution

At this point, the deployment is complete. A Kinesis data stream is available to consume the messages and a Lambda function receives these messages in the destination account. To send sample messages to the data stream in Account 1, run the following AWS CLI command using CloudShell:

aws kinesis put-record --stream-name kds-cross-account-stream --data sampledatarecord --partition-key samplepartitionkey3 --region <region id>

The Lambda function in Account 2 is able to receive the messages, and you can verify this using Amazon CloudWatch Logs:

  1. On the CloudWatch console, choose Log groups in the navigation pane.
  2. Locate the log group /aws/lambda/kds-cross-account-stream-efo-consumer.
  3. Choose Search log group to view the relevant log messages. The following is an example message:
    "Records": [
    {
    "kinesis": {
    "kinesisSchemaVersion": "1.0",
    "partitionKey": "samplepartitionkey3",
    "sequenceNumber": "49648798411111169765201534322676841348246990356337393698",
    "data": "sampledatarecord",
    "approximateArrivalTimestamp": 1706623274.658
    },

Clean up

It’s always a good practice to clean up all the resources you created as part of this post to avoid any additional cost.

To clean up your resources, delete the respective CloudFormation stacks from Accounts 1 and 2, and stop the producer from pushing events to the Kinesis data stream. This makes sure that you are not charged unnecessarily.

Summary

In this post, we demonstrated how to configure a cross-account Lambda integration with Kinesis Data Streams using AWS resource-based policies. This enables processing of data ingested into a stream within one AWS account through a Lambda function located in another account. To support customers who use a Kinesis data stream in their central account and have multiple consumers reading data from it, we have used the Kinesis Data Streams enhanced fan-out feature.

To get started, open the Kinesis Data Streams console or use the new API PutResourcePolicy to attach a resource policy to your data stream or consumer.


About the authors

Pratik Patel is Sr. Technical Account Manager and streaming analytics specialist. He works with AWS customers and provides ongoing support and technical guidance to help plan and build solutions using best practices and proactively keep customers’ AWS environments operationally healthy.

Amar is a Senior Solutions Architect at Amazon AWS in the UK. He works across power, utilities, manufacturing and automotive customers on strategic implementations, specializing in using AWS Streaming and advanced data analytics solutions, to drive optimal business outcomes.

Reference guide to analyze transactional data in near-real time on AWS

Post Syndicated from Jason Dalba original https://aws.amazon.com/blogs/big-data/reference-guide-to-analyze-transactional-data-in-near-real-time-on-aws/

Business leaders and data analysts use near-real-time transaction data to understand buyer behavior to help evolve products. The primary challenge businesses face with near-real-time analytics is getting the data prepared for analytics in a timely manner, which can often take days. Companies commonly maintain entire teams to facilitate the flow of data from ingestion to analysis.

The consequence of delays in your organization’s analytics workflow can be costly. As online transactions have gained popularity with consumers, the volume and velocity of data ingestion has led to challenges in data processing. Consumers expect more fluid changes to service and products. Organizations that can’t quickly adapt their business strategy to align with consumer behavior may experience loss of opportunity and revenue in competitive markets.

To overcome these challenges, businesses need a solution that can provide near-real-time analytics on transactional data with services that don’t lead to latent processing and bloat from managing the pipeline. With a properly deployed architecture using the latest technologies in artificial intelligence (AI), data storage, streaming ingestions, and cloud computing, data will become more accurate, timely, and actionable. With such a solution, businesses can make actionable decisions in near-real time, allowing leaders to change strategic direction as soon as the market changes.

In this post, we discuss how to architect a near-real-time analytics solution with AWS managed analytics, AI and machine learning (ML), and database services.

Solution overview

The most common workloads, agnostic of industry, involve transactional data. Transactional data volumes and velocity have continued to rapidly expand as workloads have been pushed online. Near-real-time data is data stored, processed, and analyzed on a continual basis. It generates information that is available for use almost immediately after being generated. With the power of near-real-time analytics, business units across an organization, including sales, marketing, and operations, can make agile, strategic decisions. Without the proper architecture to support near real-time analytics, organizations will be dependent on delayed data and will not be able to capitalize on emerging opportunities. Missed opportunities could impact operational efficiency, customer satisfaction, or product innovation.

Managed AWS Analytics and Database services allow for each component of the solution, from ingestion to analysis, to be optimized for speed, with little management overhead. It is crucial for critical business solutions to follow the six pillars of the AWS Well-Architected Framework. The framework helps cloud architects build the most secure, high performing, resilient, and efficient infrastructure for critical workloads.

The following diagram illustrates the solution architecture.

Solution architecture

By combining the appropriate AWS services, your organization can run near-real-time analytics off a transactional data store. In the following sections, we discuss the key components of the solution.

Transactional data storage

In this solution, we use Amazon DynamoDB as our transactional data store. DynamoDB is a managed NoSQL database solution that acts as a key-value store for transactional data. As a NoSQL solution, DynamoDB is optimized for compute (as opposed to storage) and therefore the data needs to be modeled and served up to the application based on how the application needs it. This makes DynamoDB good for applications with known access patterns, which is a property of many transactional workloads.

In DynamoDB, you can create, read, update, or delete items in a table through a partition key. For example, if you want to keep track of how many fitness quests a user has completed in your application, you can query the partition key of the user ID to find the item with an attribute that holds data related to completed quests, then update the relevant attribute to reflect a specific quest’s completion. There are also some added benefits of DynamoDB by design, such as the ability to scale to support massive global internet-scale applications while maintaining consistent single-digit millisecond latency performance, because the data will be horizontally partitioned across the underlying storage nodes by the service itself through the partition keys. Modeling your data here is very important so DynamoDB can horizontally scale based on a partition key, which is again why it’s a good fit for a transactional store. In transactional workloads, when you know what the access patterns are, it will be easier to optimize a data model around those patterns as opposed to creating a data model to accept ad hoc requests. All that being said, DynamoDB doesn’t perform scans across many items as efficiently, so for this solution, we integrate DynamoDB with other services to help meet the data analysis requirements.
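To make the fitness-quest example concrete, the following is a sketch using the AWS SDK for Java 2.x; the table name, key, and attribute names are hypothetical.

import java.util.Map;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;

public class QuestTracker {
    public static void main(String[] args) {
        try (DynamoDbClient dynamoDb = DynamoDbClient.create()) {
            // Locate the item by its partition key and atomically increment the quest counter
            UpdateItemRequest request = UpdateItemRequest.builder()
                    .tableName("UserActivity")                     // hypothetical table name
                    .key(Map.of("userId", AttributeValue.builder().s("user-123").build()))
                    .updateExpression("ADD completedQuests :inc")
                    .expressionAttributeValues(Map.of(":inc", AttributeValue.builder().n("1").build()))
                    .build();
            dynamoDb.updateItem(request);
        }
    }
}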

Data streaming

Now that we have stored our workload’s transactional data in DynamoDB, we need to move that data to another service that is better suited for analysis of said data. The time to insights on this data matters, so rather than send data off in batches, we stream the data into an analytics service, which gives us the near-real-time aspect of this solution.

We use Amazon Kinesis Data Streams to stream the data from DynamoDB to Amazon Redshift for this specific solution. Kinesis Data Streams captures item-level modifications in DynamoDB tables and replicates them to a Kinesis data stream. Your applications can access this stream and view item-level changes in near-real time. You can continuously capture and store terabytes of data per hour. Additionally, with the enhanced fan-out capability, you can simultaneously reach two or more downstream applications. Kinesis Data Streams also provides durability and elasticity. The delay between the time a record is put into the stream and the time it can be retrieved (put-to-get delay) is typically less than 1 second. In other words, a Kinesis Data Streams application can start consuming the data from the stream almost immediately after the data is added. The managed service aspect of Kinesis Data Streams relieves you of the operational burden of creating and running a data intake pipeline. The elasticity of Kinesis Data Streams enables you to scale the stream up or down, so you never lose data records before they expire.

Analytical data storage

The next service in this solution is Amazon Redshift, a fully managed, petabyte-scale data warehouse service in the cloud. As opposed to DynamoDB, which is meant to update, delete, or read more specific pieces of data, Amazon Redshift is better suited for analytic queries where you are retrieving, comparing, and evaluating large amounts of data in multi-stage operations to produce a final result. Amazon Redshift achieves efficient storage and optimum query performance through a combination of massively parallel processing, columnar data storage, and very efficient, targeted data compression encoding schemes.

Beyond just the fact that Amazon Redshift is built for analytical queries, it can natively integrate with Amazon streaming engines. Amazon Redshift Streaming Ingestion ingests hundreds of megabytes of data per second, so you can query data in near-real time and drive your business forward with analytics. With this zero-ETL approach, Amazon Redshift Streaming Ingestion enables you to connect to multiple Kinesis data streams or Amazon Managed Streaming for Apache Kafka (Amazon MSK) data streams and pull data directly to Amazon Redshift without staging data in Amazon Simple Storage Service (Amazon S3). You can define a schema or choose to ingest semi-structured data with the SUPER data type. With streaming ingestion, a materialized view is the landing area for the data read from the Kinesis data stream, and the data is processed as it arrives. When the view is refreshed, Redshift compute nodes allocate each data shard to a compute slice. We recommend you enable auto refresh for this materialized view so that your data is continuously updated.

Data analysis and visualization

After the data pipeline is set up, the last piece is data analysis with Amazon QuickSight to visualize the changes in consumer behavior. QuickSight is a cloud-scale business intelligence (BI) service that you can use to deliver easy-to-understand insights to the people who you work with, wherever they are.

QuickSight connects to your data in the cloud and combines data from many different sources. In a single data dashboard, QuickSight can include AWS data, third-party data, big data, spreadsheet data, SaaS data, B2B data, and more. As a fully managed cloud-based service, QuickSight provides enterprise-grade security, global availability, and built-in redundancy. It also provides the user-management tools that you need to scale from 10 users to 10,000, all with no infrastructure to deploy or manage.

QuickSight gives decision-makers the opportunity to explore and interpret information in an interactive visual environment. They have secure access to dashboards from any device on your network and from mobile devices. Connecting QuickSight to the rest of our solution will complete the flow of data from being initially ingested into DynamoDB to being streamed into Amazon Redshift. QuickSight can create a visual analysis of the data in near-real time because that data is relatively up to date, so this solution can support use cases for making quick decisions on transactional data.

Using AWS for data services allows for each component of the solution, from ingestion to storage to analysis, to be optimized for speed and with little management overhead. With these AWS services, business leaders and analysts can get near-real-time insights to drive immediate change based on customer behavior, enabling organizational agility and ultimately leading to customer satisfaction.

Next steps

The next step to building a solution to analyze transactional data in near-real time on AWS would be to go through the workshop Enable near real-time analytics on data stored in Amazon DynamoDB using Amazon Redshift. In the workshop, you will get hands-on with AWS managed analytics, AI/ML, and database services to dive deep into an end-to-end solution delivering near-real-time analytics on transactional data. By the end of the workshop, you will have gone through the configuration and deployment of the critical pieces that will enable users to perform analytics on transactional workloads.

Conclusion

Developing an architecture that serves transactional data for near-real-time analytics on AWS can help businesses become more agile in making critical decisions. By ingesting and processing transactional data delivered directly from the application on AWS, businesses can optimize their inventory levels, reduce holding costs, increase revenue, and enhance customer satisfaction.

The end-to-end solution is designed for individuals in various roles, such as business users, data engineers, data scientists, and data analysts, who are responsible for comprehending, creating, and overseeing processes related to retail inventory forecasting. Overall, being able to analyze near-real-time transactional data on AWS provides businesses with timely insights, allowing for quicker decision-making in fast-paced industries.


About the Authors

Jason D’Alba is an AWS Solutions Architect leader focused on database and enterprise applications, helping customers architect highly available and scalable database solutions.

Veerendra Nayak is a Principal Database Solutions Architect based in the Bay Area, California. He works with customers to share best practices on database migrations, resiliency, and integrating operational data with analytics and AI services.

Evan Day is a Database Solutions Architect at AWS, where he helps customers define technical solutions for business problems using the breadth of managed database services on AWS. He also focuses on building solutions that are reliable, performant, and cost efficient.

Amazon Kinesis Data Streams: celebrating a decade of real-time data innovation

Post Syndicated from Roy Wang original https://aws.amazon.com/blogs/big-data/amazon-kinesis-data-streams-celebrating-a-decade-of-real-time-data-innovation/

Data is a key strategic asset for every organization, and every company is a data business at its core. However, in many organizations, data is typically spread across a number of different systems such as software as a service (SaaS) applications, operational databases, and data warehouses. Such data silos make it difficult to get unified views of the data in an organization and act in real time to derive the most value.

Ten years ago, we launched Amazon Kinesis Data Streams, the first cloud-native serverless streaming data service, to serve as the backbone for companies to move data across system boundaries and break down data silos. With data streaming, you can power data lakes running on Amazon Simple Storage Service (Amazon S3), enrich customer experiences via personalization, improve operational efficiency with predictive maintenance of machinery in your factories, and achieve better insights with more accurate machine learning (ML) models. Amazon Kinesis Data Streams is a foundational data strategy pillar for tens of thousands of customers. As streams of raw data come together, they unlock capabilities to continuously transform, enrich, and query data in real time via seamless integration with stream processing engines such as Amazon Managed Service for Apache Flink.

As an example, the National Hockey League (NHL) reimagined the fan experience by streaming live NHL EDGE game data and stats, offering hockey fans valuable insights that keep them at the edge of their seats. NHL EDGE technology in the puck and players’ sweaters (jerseys) generates thousands of data points every second for the NHL, which can be analyzed by AWS to predict likely outcomes for key events like face-offs. To process and analyze thousands of signals, the NHL built a real-time streaming data foundation with Kinesis Data Streams and Amazon Managed Service for Apache Flink to stream, prepare, and feed data into ML models, helping inform face-off predictions in seconds and expanding new ways to engage viewers.

Building on such streaming data foundations, many customers are currently thinking about how to deliver transformative new products and services with generative AI. Streaming allows companies to connect the data available within data stores to large language models (LLMs) securely and in real time. Although LLMs are capable of working with billions of parameters, in order to deliver an engaging experience that is tailored to a company’s customers, LLMs require personalization data for the company’s users and proprietary knowledge stores within the company’s data stores. A data strategy that incorporates streaming is necessary to deliver personalization and proprietary data that is available for querying in real time.

Customers with a real-time streaming data strategy are at the cutting edge of providing innovative products with generative AI. One customer adopted Kinesis Data Streams for their data strategy, and they stream billions of events from their digital products to derive real-time insights. With a combination of low-latency data streaming and analytics, they are able to understand and personalize the user experience via a seamlessly integrated, self-reliant system for experimentation and automated feedback. Earlier this year, building on their already strong data foundation, they launched an innovative digital media generative AI product. The same data foundation built on Kinesis Data Streams is used to continuously analyze how users interact with the generated content and helps the product team fine-tune the application.

“Real-time streaming data technologies are essential for digital transformation. These services help customers bring data to their applications and models, making them smarter. Real-time data gives companies an advantage in data-driven decisions, predictions, and insights by using the data at the very moment it is generated, providing an unparalleled edge in a world where timing is the key to success. Bring the data in once, use it across your organization, and act before the value of that data diminishes.”

– Mindy Ferguson, VP of AWS Streaming and Messaging.

As we celebrate the tenth anniversary of Kinesis Data Streams, customers have shared four key reasons they continue to value this revolutionary service. They love how they can easily stream data with no underlying servers to provision or manage, operate at massive scale with consistent performance, achieve high resiliency and durability, and benefit from broad integration with a wide range of sources and sinks for ingesting and processing data.

Ease of use

Getting started with Kinesis Data Streams is straightforward: developers can create a data stream with a few clicks on the Kinesis Data Streams console or with a single API call. Changing the size or configuration is also a single API call, and each data stream comes with a default 24-hour data retention period. Developers don’t have to worry about clusters, version upgrades, or storage capacity planning. They just turn on a data stream and start ingesting data.
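For example, the following sketch shows those two calls with Boto3; the stream name and shard counts are arbitrary placeholders.

import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")  # Region is an assumption

# One call creates a provisioned stream with two shards...
kinesis.create_stream(StreamName="my-demo-stream", ShardCount=2)
kinesis.get_waiter("stream_exists").wait(StreamName="my-demo-stream")

# ...and one call resizes it later
kinesis.update_shard_count(
    StreamName="my-demo-stream",
    TargetShardCount=4,
    ScalingType="UNIFORM_SCALING",
)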

The needs of our customers have evolved in the past 10 years. As more events get captured and streamed, customers want their data streams to scale elastically without any operational overhead. In response, we launched On-Demand streams in 2021 to provide a simple and automatic scaling experience. With On-Demand streams, you let the service handle scaling up a stream’s capacity proactively, and you’re only charged for the actual data ingested, retrieved, and stored. As our customers continued to ask for more capabilities, we increased the ingestion throughput limit of each On-Demand stream from 200MB/s to 1GB/s in March 2023, and then to 2GB/s in October 2023, to accommodate higher throughput workloads. To continue innovating to be the easiest streaming data service to use, we actively listen to our customer use cases.
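For instance, switching an existing stream to on-demand capacity mode is a single UpdateStreamMode call, sketched below with placeholder names.

import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")  # Region is an assumption

stream_arn = kinesis.describe_stream_summary(StreamName="my-demo-stream")[
    "StreamDescriptionSummary"
]["StreamARN"]

# After this call, the service scales the stream's capacity automatically
kinesis.update_stream_mode(
    StreamARN=stream_arn,
    StreamModeDetails={"StreamMode": "ON_DEMAND"},
)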

Canva is an online design and visual communication platform. As it has rapidly grown from 30 million to 135 million monthly users, it has built a streaming data platform at scale that is effortless to operate for driving product innovations and personalizing the user experience.

“Amazon Kinesis Data Streams and AWS Lambda are used throughout Canva’s logging platform, ingesting and processing over 60 billion log events per day. The combination of Kinesis Data Streams and Lambda has abstracted plenty of work that’s often required in managing a massive data pipeline, such as deploying and managing a fleet of servers, whilst also providing a highly scalable and reliable service. It has allowed us to focus on delivering a world-class product by building highly requested features rather than spending time on operational work.”

– Phoebe Zhou, Software Engineer at Canva.

Operate at massive scale with consistent performance

A fundamental requirement of a streaming data strategy is ingesting and processing large volumes of data with low latency. Kinesis Data Streams processes trillions of records per day across tens of thousands of customers. Customers run more than 3.5 million unique streams and process over 45 PB of data per day. Our largest customers ingest more than 15 GB per second of real-time data with individual streams. That’s equivalent to streaming multiple data points for every person on earth, every second! Even at this scale, all our customers still retrieve data within milliseconds of availability.

Customers also want to process the same data with multiple applications, with each deriving a different value, without worrying about one application impacting the read throughput of another. Enhanced Fan-out offers dedicated read throughput and low latency for each data consumer. This has enabled enterprise platform teams to provide real-time data to more teams and applications.
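As a rough sketch, registering an enhanced fan-out consumer looks like the following; the stream and consumer names are placeholders, and in practice most applications consume through the Kinesis Client Library 2.x, which manages the subscription for you.

import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")  # Region is an assumption

stream_arn = kinesis.describe_stream_summary(StreamName="DemoStream")[
    "StreamDescriptionSummary"
]["StreamARN"]

# Each registered consumer receives its own dedicated 2 MB/s of read throughput per shard
consumer = kinesis.register_stream_consumer(
    StreamARN=stream_arn,
    ConsumerName="analytics-app",
)
print(consumer["Consumer"]["ConsumerARN"])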

VMware Carbon Black uses Kinesis Data Streams to ingest petabytes of data every day to secure millions of customer endpoints. The team focuses on its expertise while AWS manages data streaming to meet growing customer traffic and needs in real time.

“When an individual customer’s data increases or decreases, we can use the elasticity of Amazon Kinesis Data Streams to scale compute up or down to process data reliably while effectively managing our cost. This is why Kinesis Data Streams is a good fit. The biggest advantage is the managed nature of our solution on AWS. This has shaped our architecture and helped us shift complexity elsewhere.”

– Stoyan Dimkov, Staff Engineer and Software Architect at VMware Carbon Black.

Learn more about the case study.

Provide resiliency and durability for data streaming

With burgeoning data, customers want more flexibility in processing and reprocessing data. For example, if an application that is consuming data goes offline for a period, teams want to ensure that they resume processing at a later time without data loss. Kinesis Data Streams provides a default 24-hour retention period, enabling you to select a specific timestamp from which to start processing records. With the extended retention feature, you can configure the data retention period to be up to 7 days.

Some industries like financial services and healthcare have stricter compliance requirements, so customers asked for even longer data retention periods to support these requirements. Therefore, we followed up with long-term storage that supports data retention for up to 1 year. Now, thousands of Kinesis Data Streams customers use these features to make their streaming applications more resilient and durable.
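To make this concrete, the following sketch extends a stream's retention period and then reads records starting from a chosen timestamp; the stream name, Region, and 3-day offset are illustrative assumptions.

import boto3
from datetime import datetime, timedelta, timezone

kinesis = boto3.client("kinesis", region_name="us-east-1")  # Region is an assumption

# Extend retention on an existing stream up to 8,760 hours (1 year)
kinesis.increase_stream_retention_period(
    StreamName="DemoStream",
    RetentionPeriodHours=8760,
)

# Reprocess records starting from a specific point in time
shard_id = kinesis.list_shards(StreamName="DemoStream")["Shards"][0]["ShardId"]
shard_iterator = kinesis.get_shard_iterator(
    StreamName="DemoStream",
    ShardId=shard_id,
    ShardIteratorType="AT_TIMESTAMP",
    Timestamp=datetime.now(timezone.utc) - timedelta(days=3),
)["ShardIterator"]
records = kinesis.get_records(ShardIterator=shard_iterator, Limit=100)["Records"]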

Mercado Libre, a leading ecommerce and payments platform in Latin America, relies on Kinesis Data Streams to power its streaming data strategy around payment processing, customer experience, and operations.

“With Amazon Kinesis Data Streams at the core, we process approximately 70 billion daily messages distributed across thousands of data producers. By leveraging Kinesis Data Streams and Amazon DynamoDB Streams, we’ve embraced an event-driven architecture and are able to swiftly respond to data changes.”

– Joaquin Fernandez, Senior Software Expert at Mercado Libre.

Access your data no matter where it lives

Our customers use a wide variety of tools and applications, and an organization’s data often resides in many places. Therefore, the ability to easily integrate data across an organization is crucial to derive timely insights. Developers use the Kinesis Producer Library, Kinesis Client Library, and AWS SDK to quickly build custom data producer and data consumer applications. Customers have expanded their data producers ranging from microservices to smart TVs and even cars. We have over 40 integrations with AWS services and third-party applications like Adobe Experience Platform and Databricks. As detailed in our whitepaper on building a modern data streaming architecture on AWS, Kinesis Data Streams serves as the backbone to serverless and real-time use cases such as personalization, real-time insights, Internet of Things (IoT), and event-driven architecture. Our recent integration with Amazon Redshift enables you to ingest hundreds of megabytes of data from Kinesis Data Streams into data warehouses in seconds. To learn more about how to use this integration to detect fraud in near-real time, refer to Near-real-time fraud detection using Amazon Redshift Streaming Ingestion with Amazon Kinesis Data Streams and Amazon Redshift ML.

Another integration launched in 2023 is with Amazon Monitron to power predictive maintenance management. You can now stream measurement data and the corresponding inference results to Kinesis Data Streams, coordinate predictive maintenance, and build an IoT data lake. For more details, refer to Generate actionable insights for predictive maintenance management with Amazon Monitron and Amazon Kinesis.

Next, let’s go back to the NHL use case where they combine IoT, data streaming, and machine learning.

The NHL Edge IQ powered by AWS is helping bring fans closer to the action with advanced analytics and new ML stats such as Face-off Probability and Opportunity Analysis.

“We use Amazon Kinesis Data Streams to process NHL EDGE data on puck and Player positions, face-off location, and the current game situation to decouple data producers from consuming applications. Amazon Managed Service for Apache Flink is used to run Flink applications and consumes data from Kinesis Data Streams to call the prediction model in Amazon SageMaker to deliver the real-time Face-off Probability metric. The probability results are also stored in Amazon S3 to continuously retrain the model in SageMaker. The success of this project led us to build the next metric, Opportunity Analysis, which delivers over 25 insights into the quality of the scoring opportunity presented by each shot on goal. Kinesis Data Streams and Amazon Managed Service for Apache Flink applications were critical to making live, in-game predictions, enabling the system to perform opportunity analysis calculations for up to 16 live NHL games simultaneously.”

– Eric Schneider, SVP, Software Engineering at National Hockey League.

Learn more about the case study.

The future of data is real time

The fusion of real-time data streaming and generative AI promises to be the cornerstone of our digitally connected world. Generative AI, empowered by a constant influx of real-time information from IoT devices, sensors, social media, and beyond, is becoming ubiquitous. From autonomous vehicles navigating dynamically changing traffic conditions to smart cities optimizing energy consumption based on real-time demand, the combination of AI and real-time data will underpin efficiency and innovation across industries. Ubiquitous, adaptive, and deeply integrated into our lives, these AI-driven applications will enhance convenience and address critical challenges such as climate change, healthcare, and disaster response by using the wealth of real-time insights at their disposal. With Kinesis Data Streams, organizations can build a solid data foundation that positions them to adopt new technologies quickly and unlock new opportunities sooner, and we anticipate those opportunities will be enormous.

Learn more about what our customers are doing with data streaming. If you would like a quick exploration of Kinesis Data Streams concepts and use cases, check out our Amazon Kinesis Data Streams 101 playlist. To get started with building your data streams, visit the Amazon Kinesis Data Streams Developer Guide.


About the author

Roy (KDS) Wang is a Senior Product Manager with Amazon Kinesis Data Streams. He is passionate about learning from and collaborating with customers to help organizations run faster and smarter. Outside of work, Roy strives to be a good dad to his new son and builds plastic model kits.

Processing large records with Amazon Kinesis Data Streams

Post Syndicated from Masudur Rahaman Sayem original https://aws.amazon.com/blogs/big-data/processing-large-records-with-amazon-kinesis-data-streams/

In today’s digital era, data is abundant and constantly flowing. Businesses across industries are seeking ways to harness this wealth of information to gain valuable insights and make real-time decisions. To meet this need, AWS offers Amazon Kinesis Data Streams, a powerful and scalable real-time data streaming service. With Kinesis Data Streams, you can effortlessly collect, process, and analyze streaming data in real time at any scale. This service seamlessly integrates into your data architecture, allowing you to tap into the full potential of your data for informed decision-making.

Data streaming technologies like Kinesis Data Streams are designed to efficiently process and manage continuous streams of data in real time at large scale. The individual pieces of data within these streams are often referred to as records. In scenarios like large file processing or performing image, audio, or video analytics, your record may exceed 1 MB. You may struggle to ingest such a large record with Kinesis Data Streams because, as of this writing, the service has a 1 MB upper limit for maximum data record size.

In this post, we show you some different options for handling large records within Kinesis Data Streams and the benefits and disadvantages of each approach. We provide some sample code for each option to help you get started with any of these approaches with your own workloads.

Understanding the default behavior of Kinesis Data Streams

You can send records to Kinesis Data Streams using the PutRecord or PutRecords API calls. These APIs include a mandatory field known as PartitionKey, where you must provide a specific value. The service uses this partition key to map records with the same partition key to the same shard, which ensures ordering and locality for consumption. Locality means that the same consumer processes all records for a given partition key; keeping data with the same partition key together within the same shard maintains data order.

Each shard, which holds your data, can handle writing up to 1 MB per second. Let’s consider a scenario where you define a partition key and attempt to send a data record that exceeds 1 MB in size. Based on the explanation so far, the service will reject this request because the record size is over 1 MB. To help you understand better, we experimented by sending a 1.5 MB record to a stream with the following code, and the outcome was the following exception message:

import boto3

client = boto3.client('kinesis', region_name='ap-southeast-2')

def lambda_handler(event, context):
    try:
        response = client.put_record(
            StreamName='test',
            # Placeholder for a payload larger than 1 MB (1.5 MB in this experiment)
            Data=b'Sample 1 MB....',
            PartitionKey='string'
            #StreamARN='string'
        )
    except Exception as e:
        print(e)

START RequestId: 84b3ab0c-3f30-4267-aec1-549c2d59dfdb Version: $LATEST An error occurred (ValidationException) when calling the PutRecord operation: 1 validation error detected: Value at 'data' failed to satisfy constraint: Member must have length less than or equal to 1048576 END RequestId: 84b3ab0c-3f30-4267-aec1-549c2d59dfdb

Strategies for handling large records

Now that we understand the behavior of the PutRecord and PutRecords APIs, let’s discuss strategies you can use to overcome this situation. One thing to keep in mind is that there is no single best solution; in the following sections, we discuss some of the approaches that you can evaluate based on your use case:

  • Store large records in Amazon Simple Storage Service (Amazon S3) with a reference in Kinesis Data Streams
  • Split one large record into multiple records
  • Compress your large records

Let’s discuss these points one by one.

Store large records in Amazon S3 with a reference in Kinesis Data Streams

A useful approach for storing large records involves utilizing an alternative storage solution while employing a reference within Kinesis Data Streams. In this context, Amazon S3 stands out as an excellent choice due to its exceptional durability and cost-effectiveness. The procedure involves uploading the record as an object to an S3 bucket and subsequently writing a reference entry in Kinesis Data Streams. This entry incorporates an attribute that serves as a pointer, indicating the location of the object within Amazon S3.

With this approach, you can generate a pre-signed URL associated with the S3 object’s location. This link can be shared with the requester, offering them direct access to the object without the need for intermediary server-side data transfers.

The following diagram illustrates the architecture of this solution.

The following is the sample code to write data to Kinesis Data Streams using this approach:

import json
import boto3
import random

def lambda_handler(event, context):
    try:
        s3 = boto3.client('s3', region_name='ap-southeast-2')
        kds = boto3.client('kinesis', region_name='ap-southeast-2')
        expiration = 3600
        pk = str(random.randint(100, 100000000))
        bucket_name = 'MY_BUCKET'
        object_key = 'air/' + pk + '.txt'
        file_content = b'LARGE OBJECT'

        # Upload the large payload to Amazon S3
        response = s3.put_object(Bucket=bucket_name, Key=object_key, Body=file_content)

        # Generate a pre-signed URL so consumers can retrieve the object directly
        presigned_url = s3.generate_presigned_url(
            'get_object',
            Params={'Bucket': bucket_name, 'Key': object_key},
            ExpiresIn=expiration
        )

        # Write only the reference (the pre-signed URL) to Kinesis Data Streams
        kdata = {'message': presigned_url}
        response = kds.put_record(
            StreamName='test',
            Data=json.dumps(kdata),
            PartitionKey=pk
        )
        print(response)
    except Exception as e:
        print(e)

If you are using an AWS Lambda consumer to process this data, you can now decode the record to get the S3 pre-signed URL to efficiently retrieve the object from Amazon S3. Then you can implement your business logic to effectively process the data. The following is sample code for reference:

import base64
import json

def lambda_handler(event, context):
    # Base64-decode each Kinesis record and parse the JSON payload
    decoded_record_data = [base64.b64decode(record['kinesis']['data']).decode().replace('\n','') for record in event['Records']]
    deserialized_data = [json.loads(decoded_record) for decoded_record in decoded_record_data]

    for item in deserialized_data:
        # The message attribute holds the pre-signed URL pointing to the large object in Amazon S3
        LOB = item['message']
        #process LOB implementing your business logic

An inherent benefit of adopting this technique is the capability to store data in Amazon S3, accommodating an extensive range of sizes per individual object. This method helps you reduce the costs of using Kinesis Data Streams because it uses less storage space and requires less read and write throughput for item access. This optimization is achieved by storing just the URL within Kinesis Data Streams. However, it’s important to acknowledge that accessing the sizable object necessitates an additional call to Amazon S3, thereby introducing higher latency for clients as they manage the additional request.

Split one large record into multiple records

Splitting large records into smaller ones in Kinesis Data Streams brings advantages like faster processing, improved throughput, efficient resource use, and more straightforward error handling. Let’s say you have a large record that you want to split into smaller chunks before sending them to a Kinesis data stream. First, you need to set up a Kinesis producer. Suppose you have a large record as a string. You can split it into smaller chunks of a predefined size. For this example, let’s say you’re splitting the record into chunks of 100 characters each. After you split that, loop through the record chunks and send each chunk as a separate message to a Kinesis data stream. The following is the sample code:

import boto3

kinesis = boto3.client('kinesis', region_name='ap-southeast-2')

def split_record(record, chunk_size):
    # Split the record into chunks of chunk_size characters
    chunks = [record[i:i + chunk_size] for i in range(0, len(record), chunk_size)]
    return chunks

def send_to_kinesis(stream_name, record):
    # A fixed partition key keeps all chunks of the message on the same shard, preserving their order
    response = kinesis.put_record(
        StreamName=stream_name,
        Data=record,
        PartitionKey='100'
    )
    return response

def main():
    stream_name = 'test'
    large_record = 'Your large record'  # Replace with your actual record
    chunk_size = 100

    record_chunks = split_record(large_record, chunk_size)

    # Send each chunk as a separate Kinesis record
    for chunk in record_chunks:
        response = send_to_kinesis(stream_name, chunk)
        print(f"Record sent: {response['SequenceNumber']}")

if __name__ == "__main__":
    main()

Ensure that all chunks of a given message are directed to the same partition so that their order is preserved. In the final chunk, include metadata that marks the end of the message so that consumers can identify the last chunk and reassemble the message seamlessly. The drawback of this method is that it adds complexity to the client-side tasks of splitting and reassembling records, so these functions need thorough testing to prevent any loss of data.
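The following is a minimal sketch of the consumer side, assuming the producer is extended to wrap each chunk in a JSON envelope with hypothetical message_id, sequence, is_final, and payload fields (the producer example above sends plain text chunks, so this envelope is an assumption, not part of the original code).

import base64
import json

# In-memory buffers keyed by message_id; a production consumer needs durable state
# (for example, DynamoDB) because chunks of one message can arrive across invocations
buffers = {}

def process(message):
    # Hypothetical downstream business logic
    print(f"Reassembled message of {len(message)} characters")

def lambda_handler(event, context):
    for record in event["Records"]:
        chunk = json.loads(base64.b64decode(record["kinesis"]["data"]))
        message_id = chunk["message_id"]
        buffers.setdefault(message_id, []).append(chunk)

        if chunk["is_final"]:
            # Reorder by sequence number and stitch the payloads back together
            parts = sorted(buffers.pop(message_id), key=lambda c: c["sequence"])
            process("".join(part["payload"] for part in parts))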

Compress your large records

Applying data compression prior to transmitting it to Kinesis Data Streams has numerous advantages. This approach not only reduces the data’s size, enabling swifter travel and more efficient utilization of network resources, but also leads to cost savings in terms of storage expenses while optimizing overall resource consumption. Additionally, this practice simplifies storage and data retention. By using compression algorithms such as GZIP, Snappy, or LZ4, you can achieve substantial reduction in the size of large records. Compression brings the benefit of simplicity because it’s implemented seamlessly without requiring the caller to make changes to the item or use extra AWS services to support storage. However, compression introduces additional CPU overhead and latency on the producer side, and its impact on the compression ratio and efficiency can vary depending on the data type and format. Also, compression can enhance consumer throughput at the expense of some decompression overhead.
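The following is a minimal sketch of this approach using GZIP from the Python standard library; the stream name and Region mirror the earlier examples and are assumptions.

import gzip
import json
import boto3

kinesis = boto3.client('kinesis', region_name='ap-southeast-2')

def send_compressed(stream_name, payload, partition_key):
    # Serialize and GZIP-compress the record before sending it to the stream
    raw = json.dumps(payload).encode('utf-8')
    return kinesis.put_record(
        StreamName=stream_name,
        Data=gzip.compress(raw),
        PartitionKey=partition_key
    )

def decode_compressed(data_bytes):
    # Consumer side: reverse the process after Base64-decoding the Kinesis payload
    return json.loads(gzip.decompress(data_bytes).decode('utf-8'))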

Conclusion

For real-time data streaming use cases, it’s essential to carefully consider the handling of large records when using Kinesis Data Streams. In this post, we discussed the challenges associated with managing large records and explored strategies such as utilizing Amazon S3 references, record splitting, and compression. Each approach has its own set of benefits and drawbacks, so it’s crucial to evaluate the nature of your data and the tasks you need to perform. Select the most suitable approach based on your data’s characteristics and your processing task requirements.

We encourage you to try out the approaches discussed in this post and share your thoughts in the comments section.


About the author

Masudur Rahaman Sayem is a Streaming Data Architect at AWS. He works with AWS customers globally to design and build data streaming architectures to solve real-world business problems. He specializes in optimizing solutions that use streaming data services and NoSQL. Sayem is very passionate about distributed computing.

Modernize a legacy real-time analytics application with Amazon Managed Service for Apache Flink

Post Syndicated from Bhupesh Sharma original https://aws.amazon.com/blogs/big-data/modernize-a-legacy-real-time-analytics-application-with-amazon-managed-service-for-apache-flink/

Organizations with legacy, on-premises, near-real-time analytics solutions typically rely on self-managed relational databases as their data store for analytics workloads. To reap the benefits of cloud computing, like increased agility and just-in-time provisioning of resources, organizations are migrating their legacy analytics applications to AWS. The lift and shift migration approach is limited in its ability to transform businesses because it relies on outdated, legacy technologies and architectures that limit flexibility and slow down productivity. In this post, we discuss ways to modernize your legacy, on-premises, real-time analytics architecture to build serverless data analytics solutions on AWS using Amazon Managed Service for Apache Flink.

Near-real-time streaming analytics captures the value of operational data and metrics to provide new insights to create business opportunities. In this post, we discuss challenges with relational databases when used for real-time analytics and ways to mitigate them by modernizing the architecture with serverless AWS solutions. We introduce you to Amazon Managed Service for Apache Flink Studio and get started querying streaming data interactively using Amazon Kinesis Data Streams.

Solution overview

In this post, we walk through a call center analytics solution that provides insights into the call center’s performance in near-real time through metrics that determine agent efficiency in handling calls in the queue. Key performance indicators (KPIs) of interest for a call center, such as the number of calls waiting in the queue, can be surfaced on a near-real-time performance dashboard within a few seconds of data ingestion from call center streams. These metrics help agents improve their call handle time and also help reallocate agents across organizations to handle pending calls in the queue.

Traditionally, such a legacy call center analytics platform would be built on a relational database that stores data from streaming sources. Data transformations through stored procedures and use of materialized views to curate datasets and generate insights is a known pattern with relational databases. However, as data loses its relevance with time, transformations in a near-real-time analytics platform need only the latest data from the streams to generate insights. This may require frequent truncation in certain tables to retain only the latest stream of events. Also, the need to derive near-real-time insights within seconds requires frequent materialized view refreshes in this traditional relational database approach. Frequent materialized view refreshes on top of constantly changing base tables due to streamed data can lead to snapshot isolation errors. Also, a data model that allows table truncations at a regular frequency (for example, every 15 seconds) to store only relevant data in tables can cause locking and performance issues.

The following diagram provides the high-level architecture of a legacy call center analytics platform. In this traditional architecture, a relational database is used to store data from streaming data sources. Datasets used for generating insights are curated using materialized views inside the database and published for business intelligence (BI) reporting.

Modernizing this traditional database-driven architecture in the AWS Cloud allows you to use sophisticated streaming technologies like Amazon Managed Service for Apache Flink, which are built to transform and analyze streaming data in real time. With Amazon Managed Service for Apache Flink, you can gain actionable insights from streaming data with serverless, fully managed Apache Flink. You can use Amazon Managed Service for Apache Flink to quickly build end-to-end stream processing applications and process data continuously, getting insights in seconds or minutes. With Amazon Managed Service for Apache Flink, you can use Apache Flink code or Flink SQL to continuously generate time-series analytics over time windows and perform sophisticated joins across streams.

The following architecture diagram illustrates how a legacy call center analytics platform running on databases can be modernized to run on the AWS Cloud using Amazon Managed Service for Apache Flink. It shows a call center streaming data source that sends the latest call center feed every 15 seconds. The second streaming data source contains metadata about the call center organization and agents, which gets refreshed throughout the day. You can perform sophisticated joins over these streaming datasets and create views on top of them using Amazon Managed Service for Apache Flink to generate the KPIs the business requires, which are then stored in Amazon OpenSearch Service. You can analyze streaming data interactively using managed Apache Zeppelin notebooks with Amazon Managed Service for Apache Flink Studio in near-real time. The near-real-time insights can then be visualized as a performance dashboard using OpenSearch Dashboards.

In this post, you perform the following high-level implementation steps:

  1. Ingest data from streaming data sources to Kinesis Data Streams.
  2. Use managed Apache Zeppelin notebooks with Amazon Managed Service for Apache Flink Studio to transform the stream data within seconds of data ingestion.
  3. Visualize KPIs of call center performance in near-real time through OpenSearch Dashboards.

Prerequisites

This post requires you to set up the Amazon Kinesis Data Generator (KDG) to send data to a Kinesis data stream using an AWS CloudFormation template. For the template and setup information, refer to Test Your Streaming Data Solution with the New Amazon Kinesis Data Generator.

We use two datasets in this post. The first dataset is fact data, which contains call center organization data. The KDG generates a fact data feed every 15 seconds that contains the following information:

  • AgentId – Agents work in a call center setting surrounded by other call center employees answering customers’ questions and referring them to the necessary resources to solve their problems.
  • OrgId – A call center contains different organizations and departments, such as Care Hub, IT Hub, Allied Hub, Prem Hub, Help Hub, and more.
  • QueueId – Call queues provide an effective way to route calls using simple or sophisticated patterns to ensure that all calls are getting into the correct hands quickly.
  • WorkMode – An agent work mode determines your current state and availability to receive incoming calls from the Automatic Call Distribution (ACD) and Direct Agent Call (DAC) queues. Call Center Elite does not route ACD and DAC calls to your phone when you are in an Aux mode or ACW mode.
  • WorkSkill – Working as a call center agent requires several soft skills to see the best results, like problem-solving, bilingualism, channel experience, aptitude with data, and more.
  • HandleTime – This customer service metric measures the length of a customer’s call.
  • ServiceLevel – The call center service level is defined as the percentage of calls answered within a predefined amount of time—the target time threshold. It can be measured over any period of time (such as 30 minutes, 1 hour, 1 day, or 1 week) and for each agent, team, department, or the company as a whole.
  • WorkStates – This specifies what state an agent is in. For example, an agent in an available state is available to handle calls from an ACD queue. An agent can have several states with respect to different ACD devices, or they can use a single state to describe their relationship to all ACD devices. Agent states are reported in agent-state events.
  • WaitingTime – This is the average time an inbound call spends waiting in the queue or waiting for a callback if that feature is active in your IVR system.
  • EventTime – This is the time when the call center stream is sent (via the KDG in this post).

The following fact payload is used in the KDG to generate sample fact data:

{
  "AgentId": {{random.number({"min":2001,"max":2005})}},
  "OrgID": {{random.number({"min":101,"max":105})}},
  "QueueId": {{random.number({"min":1,"max":5})}},
  "WorkMode": "{{random.arrayElement(["TACW","ACW","AUX"])}}",
  "WorkSkill": "{{random.arrayElement(["Problem Solving","Bilingualism","Channel experience","Aptitude with data"])}}",
  "HandleTime": {{random.number({"min":1,"max":150})}},
  "ServiceLevel": "{{random.arrayElement(["Sev1","Sev2","Sev3"])}}",
  "WorkStates": "{{random.arrayElement(["Unavailable","Available","On a call"])}}",
  "WaitingTime": {{random.number({"min":10,"max":150})}},
  "EventTime": "{{date.utc("YYYY-MM-DDTHH:mm:ss")}}"
}

The following screenshot shows the output of the sample fact data in an Amazon Managed Service for Apache Flink notebook.

The second dataset is dimension data. This data contains metadata information like organization names for their respective organization IDs, agent names, and more. The frequency of the dimension dataset is twice a day, whereas the fact dataset gets loaded every 15 seconds. In this post, we use Amazon Simple Storage Service (Amazon S3) as a data storage layer to store metadata information (Amazon DynamoDB can be used to store metadata information as well). We use AWS Lambda to load metadata from Amazon S3 to another Kinesis data stream that stores metadata information. The following JSON file stored in Amazon S3 has metadata mappings to be loaded into the Kinesis data stream:

[{"OrgID": 101,"OrgName" : "Care Hub","Region" : "APAC"},
{"OrgID" : 102,"OrgName" : "Prem Hub","Region" : "AMER"},
{"OrgID" : 103,"OrgName" : "IT Hub","Region" : "EMEA"},
{"OrgID" : 104,"OrgName" : "Help Hub","Region" : "EMEA"},
{"OrgID" : 105,"OrgName" : "Allied Hub","Region" : "LATAM"}]

Ingest data from streaming data sources to Kinesis Data Streams

To start ingesting your data, complete the following steps:

  1. Create two Kinesis data streams for the fact and dimension datasets, as shown in the following screenshot. For instructions, refer to Creating a Stream via the AWS Management Console.

  2. Create a Lambda function on the Lambda console to load metadata files from Amazon S3 to Kinesis Data Streams. Use the following code:
    import boto3
    import json
    
    # Create S3 and Kinesis clients
    s3_client = boto3.client("s3")
    S3_BUCKET = '<S3 Bucket Name>'
    kinesis_client = boto3.client("kinesis")
    stream_name = '<Kinesis Stream Name>'
        
    def lambda_handler(event, context):
      
      # Read the metadata file from Amazon S3
      object_key = "<S3 File Name.json>"
      file_content = s3_client.get_object(
          Bucket=S3_BUCKET, Key=object_key)["Body"].read()
      
      # Decode the S3 object and parse it into a list of metadata records
      decoded_data = file_content.decode("utf-8").replace("'", '"')
      json_data = json.loads(decoded_data)
      
      # Upload each metadata record to the Kinesis data stream, keyed by OrgID
      for record in json_data:
        response = kinesis_client.put_record(
          StreamName=stream_name,
          Data=json.dumps(record),
          PartitionKey=str(record["OrgID"]))

Use managed Apache Zeppelin notebooks with Amazon Managed Service for Apache Flink Studio to transform the streaming data

The next step is to create tables in Amazon Managed Service for Apache Flink Studio for further transformations (joins, aggregations, and so on). To set up and query Kinesis Data Streams using Amazon Managed Service for Apache Flink Studio, refer to Query your data streams interactively using Amazon Managed Service for Apache Flink Studio and Python and create an Amazon Managed Service for Apache Flink notebook. Then complete the following steps:

  1. In the Amazon Managed Service for Apache Flink Studio notebook, create a fact table from the facts data stream you created earlier, using the following query.

The event time attribute is defined using a WATERMARK statement in the CREATE table DDL. A WATERMARK statement defines a watermark generation expression on an existing event time field, which marks the event time field as the event time attribute.

The event time refers to the processing of streaming data based on timestamps that are attached to each row. The timestamps can encode when an event happened. Processing time (PROCTime) refers to the machine’s system time that is running the respective operation.

%flink.ssql
CREATE TABLE <Fact Table Name> (
AgentId INT,
OrgID INT,
QueueId BIGINT,
WorkMode VARCHAR,
WorkSkill VARCHAR,
HandleTime INT,
ServiceLevel VARCHAR,
WorkStates VARCHAR,
WaitingTime INT,
EventTime TIMESTAMP(3),
WATERMARK FOR EventTime AS EventTime - INTERVAL '4' SECOND
)
WITH (
'connector' = 'kinesis',
'stream' = '<fact stream name>',
'aws.region' = '<AWS region ex. us-east-1>',
'scan.stream.initpos' = 'LATEST',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
);

  2. Create a dimension table in the Amazon Managed Service for Apache Flink Studio notebook that uses the metadata Kinesis data stream:
    %flink.ssql
    CREATE TABLE <Metadata Table Name> (
    AgentId INT,
    AgentName VARCHAR,
    OrgID INT,
    OrgName VARCHAR,
    update_time as CURRENT_TIMESTAMP,
    WATERMARK FOR update_time AS update_time
    )
    WITH (
    'connector' = 'kinesis',
    'stream' = '<Metadata Stream Name>',
    'aws.region' = '<AWS region ex. us-east-1> ',
    'scan.stream.initpos' = 'LATEST',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
    );

  3. Create a versioned view to extract the latest version of metadata table values to be joined with the facts table:
    %flink.ssql(type=update)
    CREATE VIEW versioned_metadata AS 
    SELECT OrgID,OrgName
      FROM (
          SELECT *,
          ROW_NUMBER() OVER (PARTITION BY OrgID
             ORDER BY update_time DESC) AS rownum 
          FROM <Metadata Table Name>)
    WHERE rownum = 1;

  4. Join the facts and versioned metadata tables on OrgID and create a view that provides the total calls in the queue for each organization over 5-second intervals, for further reporting. Additionally, create a tumble window of 5 seconds to emit the final output every 5 seconds. See the following code:
    %flink.ssql(type=update)
    
    CREATE VIEW joined_view AS
    SELECT streamtable.window_start, streamtable.window_end,metadata.OrgName,streamtable.CallsInQueue
    FROM
    (SELECT window_start, window_end, OrgID, count(QueueId) as CallsInQueue
      FROM TABLE(
        TUMBLE( TABLE <Fact Table Name>, DESCRIPTOR(EventTime), INTERVAL '5' SECOND))
      GROUP BY window_start, window_end, OrgID) as streamtable
    JOIN
        versioned_metadata metadata
        ON metadata.OrgID = streamtable.OrgID

  5. Now you can run the following query from the view you created and see the results in the notebook:
%flink.ssql(type=update)

SELECT jv.window_end, jv.CallsInQueue, jv.window_start, jv.OrgName
FROM joined_view jv WHERE jv.OrgName = 'Prem Hub';

Visualize KPIs of call center performance in near-real time through OpenSearch Dashboards

You can publish the metrics generated within Amazon Managed Service for Apache Flink Studio to OpenSearch Service and visualize metrics in near-real time by creating a call center performance dashboard, as shown in the following example. Refer to Stream the data and validate output to configure OpenSearch Dashboards with Amazon Managed Service for Apache Flink. After you configure the connector, you can run the following command from the notebook to create an index in an OpenSearch Service cluster.

%flink.ssql(type=update)

drop table if exists active_call_queue;
CREATE TABLE active_call_queue (
window_start TIMESTAMP,
window_end TIMESTAMP,
OrgID int,
OrgName varchar,
CallsInQueue BIGINT,
Agent_cnt bigint,
max_handle_time bigint,
min_handle_time bigint,
max_wait_time bigint,
min_wait_time bigint
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = '<Amazon OpenSearch host name>',
'index' = 'active_call_queue',
'username' = '<username>',
'password' = '<password>'
);

The active_call_queue index is created in OpenSearch Service. The following screenshot shows an index pattern from OpenSearch Dashboards.

Now you can create visualizations in OpenSearch Dashboards. The following screenshot shows an example.

Conclusion

In this post, we discussed ways to modernize a legacy, on-premises, real-time analytics architecture and build a serverless data analytics solution on AWS using Amazon Managed Service for Apache Flink. We also discussed challenges with relational databases when used for real-time analytics and ways to mitigate them by modernizing the architecture with serverless AWS solutions.

If you have any questions or suggestions, please leave us a comment.


About the Authors

Bhupesh Sharma is a Senior Data Engineer with AWS. His role is helping customers architect highly-available, high-performance, and cost-effective data analytics solutions to empower customers with data-driven decision-making. In his free time, he enjoys playing musical instruments, road biking, and swimming.

Devika Singh is a Senior Data Engineer at Amazon, with deep understanding of AWS services, architecture, and cloud-based best practices. With her expertise in data analytics and AWS cloud migrations, she provides technical guidance to the community, on architecting and building a cost-effective, secure and scalable solution, that is driven by business needs. Beyond her professional pursuits, she is passionate about classical music and travel.

Perform Amazon Kinesis load testing with Locust

Post Syndicated from Luis Morales original https://aws.amazon.com/blogs/big-data/perform-amazon-kinesis-load-testing-with-locust/

Building a streaming data solution requires thorough testing at the scale it will operate in a production environment. Streaming applications operating at scale often handle data volumes of up to gigabytes per second, and it’s challenging for developers to simulate high-traffic Amazon Kinesis-based applications to generate such load easily.

Amazon Kinesis Data Streams and Amazon Kinesis Data Firehose are capable of capturing and storing terabytes of data per hour from numerous sources. Creating Kinesis data streams or Firehose delivery streams is straightforward through the AWS Management Console, AWS Command Line Interface (AWS CLI), or Kinesis API. However, generating a continuous stream of test data requires a custom process or script to run continuously. Although the Amazon Kinesis Data Generator (KDG) provides a user-friendly UI for this purpose, it has some limitations, such as bandwidth constraints and increased round trip latency. (For more information on the KDG, refer to Test Your Streaming Data Solution with the New Amazon Kinesis Data Generator.)

To overcome these limitations, this post describes how to use Locust, a modern load testing framework, to conduct large-scale load testing for a more comprehensive evaluation of the streaming data solution.

Overview

This project emits temperature sensor readings via Locust to Kinesis. We set up the Amazon Elastic Compute Cloud (Amazon EC2) Locust instance via the AWS Cloud Development Kit (AWS CDK) to load test Kinesis-based applications. You can access the Locust dashboard to perform and observe the load test and connect via Session Manager, a capability of AWS Systems Manager, for configuration changes. The following diagram illustrates this architecture.

Architecture overview

In our testing with the largest recommended instance (c7g.16xlarge), the setup was capable of emitting over 1 million events per second to Kinesis data streams in on-demand capacity mode, with a batch size (simulated users per Locust user) of 500. You can find more details on what this means and how to configure the load test later in this post.

Locust overview

Locust is an open-source, scriptable, and scalable performance testing tool that allows you to define user behavior using Python code. It offers an easy-to-use interface, making it developer-friendly and highly expandable. With its distributed and scalable design, Locust can simulate millions of simultaneous users to mimic real user behavior during a performance test.

Each Locust user represents a scenario or a specific set of actions that a real user might perform on your system. When you run a performance test with Locust, you can specify the number of concurrent Locust users you want to simulate, and Locust will create an instance for each user, allowing you to assess the performance and behavior of your system under different user loads.
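To give a sense of what a user definition can look like, the following is a simplified sketch of a Locust user that writes one sensor reading per task to a Kinesis data stream and reports the call to Locust’s statistics. The project’s actual implementation uses a KinesisBotoUser base class with batching, so the class name, fields, and Region here are illustrative assumptions.

import json
import random
import time

import boto3
from locust import User, constant, task

class KinesisSensorUser(User):
    wait_time = constant(1)

    def on_start(self):
        # Region is an assumption; the Locust "Host" field carries the target stream name
        self.kinesis = boto3.client("kinesis", region_name="us-east-1")

    @task
    def put_record(self):
        payload = json.dumps({
            "sensorId": str(random.randint(1, 10000)),
            "temperature": round(10 + random.random() * 170, 2),
            "timestamp": round(time.time() * 1000),
        })
        start = time.perf_counter()
        exception = None
        try:
            self.kinesis.put_record(
                StreamName=self.host,
                Data=payload,
                PartitionKey=str(random.randint(1, 10000)),
            )
        except Exception as err:
            exception = err
        # Fire a request event so the call shows up in the Locust dashboard statistics
        self.environment.events.request.fire(
            request_type="kinesis",
            name="put_record",
            response_time=(time.perf_counter() - start) * 1000,
            response_length=len(payload),
            exception=exception,
        )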

For more information on Locust, refer to the Locust documentation.

Prerequisites

To get started, clone or download the code from the GitHub repository.

Test locally

To test Locust locally before deploying it to the cloud, you have to install the necessary Python dependencies. If you’re new to Python, refer to the README for more information on getting started.

Navigate to the load-test directory and run the following code:

pip install -r requirements.txt

To send events to a Kinesis data stream from your local machine, you will need to have AWS credentials. For more information, refer to Configuration and credential file settings.

To perform the test locally, stay in the load-test directory and run the following code:

locust -f locust-load-test.py

You can now access the Locust dashboard via http://0.0.0.0:8089/. Enter the number of Locust users, the spawn rate (users added per second), and the target Amazon Kinesis data stream name for Host. By default, it deploys the Kinesis data stream DemoStream that you can use for testing.

Locust Dashboard - Enter details

To see the generated events logged, run the following command, which filters only Locust and root logs (for example, no Botocore logs):

locust -f locust-load-test.py --loglevel DEBUG 2>&1 | grep -E "(locust|root)"

Set up resources with the AWS CDK

The GitHub repository contains the AWS CDK code to create all the necessary resources for the load test. This removes opportunities for manual error, increases efficiency, and ensures consistent configurations over time. To deploy the resources, complete the following steps:

  1. If not already downloaded, clone the GitHub repository to your local computer using the following command:
git clone https://github.com/aws-samples/amazon-kinesis-load-testing-with-locust
  2. Download and install the latest Node.js.
  3. Navigate to the root folder of the project and run the following command to install the latest version of AWS CDK:
npm install -g aws-cdk
  4. Install the necessary dependencies:
npm install
  5. Run cdk bootstrap to initialize the AWS CDK environment in your AWS account. Replace your AWS account ID and Region before running the following command:
cdk bootstrap

To learn more about the bootstrapping process, refer to Bootstrapping.

  6. After the dependencies are installed, you can run the following command to deploy the stack of the AWS CDK template, which sets up the infrastructure within 5 minutes:
cdk deploy

The template sets up the Locust EC2 test instance, which by default is a c7g.xlarge instance that, at the time of publishing, costs approximately $0.145 per hour in us-east-1. To find the most accurate pricing information, see Amazon EC2 On-Demand Pricing. You can find more details on how to change your instance size according to your scale of load testing later in this post.

Keep in mind that the expenses incurred during load testing are not limited to EC2 instance costs; data transfer costs can also contribute significantly.

Access the Locust dashboard

You can access the dashboard by using the AWS CDK output KinesisLocustLoadTestingStack.locustdashboardurl to open the dashboard, for example http://1.2.3.4:8089.

The Locust dashboard is password protected. By default, it’s set to user name locust-user and password locust-dashboard-pwd.

With the default configuration, you can achieve up to 15,000 emitted events per second. Enter the number of Locust users (times the batch size), the spawn rate (users added per second), and the target Kinesis data stream name for Host.

Locust Dashboard - Enter details

After you have started the load test, you can look at the load test on the Charts tab.

Locust Dashboard - Charts

You can also monitor the load test on the Kinesis Data Streams console by navigating to the stream that you are load testing. If you used the default settings, navigate to DemoStream. On the detail page, choose the Monitoring tab to see the ingested load.

Kinesis Data Streams - Monitoring

Adapt workloads

By default, this project generates random temperature sensor readings for every sensor with the following format:

{
    "sensorId": "bfbae19c-2f0f-41c2-952b-5d5bc6e001f1_1",
    "temperature": 147.24,
    "status": "OK",
    "timestamp": 1675686126310
}

The project comes packaged with Faker, which you can use to adapt the payload to your needs. You just have to update the generate_sensor_reading function in the locust-load-test.py file:

class SensorAPIUser(KinesisBotoUser):
    # ...

    def generate_sensor_reading(self, sensor_id, sensor_reading):
        current_temperature = round(10 + random.random() * 170, 2)

        if current_temperature > 160:
            status = "ERROR"
        elif current_temperature > 140 or random.randrange(1, 100) > 80:
            status = random.choice(["WARNING", "ERROR"])
        else:
            status = "OK"

        return {
            'sensorId': f"{sensor_id}_{sensor_reading}",
            'temperature': current_temperature,
            'status': status,
            'timestamp': round(time.time()*1000)
        }

    # ...

Change configurations

After the initial deployment of the load testing tool, you can change configuration in two ways:

  1. Connect to the EC2 instance, make any configuration and code changes, and restart the Locust process
  2. Change the configuration and load testing code locally and redeploy it via cdk deploy

The first option helps you iterate more quickly on the remote instance without a need to redeploy. The latter uses the infrastructure as code (IaC) approach and makes sure that your configuration changes can be committed to your source control system. For a fast development cycle, it’s recommended to test your load test configuration locally first, connect to your instance to apply the changes, and after successful implementation, codify it as part of your IaC repository and then redeploy.

Locust is created on the EC2 instance as a systemd service and can therefore be controlled with systemctl. If you want to change the configuration of Locust as needed without redeploying the stack, you can connect to the instance via Systems Manager, navigate to the project directory on /usr/local/load-test, change the locust.env file, and restart the service by running sudo systemctl restart locust.

Large-scale load testing

To achieve peak performance with Locust and Kinesis, keep the following in mind:

  • Instance size – Your performance is bound by the underlying EC2 instance, so refer to EC2 instance type for more information about scaling. You can configure the instance size in the file kinesis-locust-load-testing.ts.
  • Number of secondaries – Locust benefits from a distributed setup. Therefore, the setup spins up a primary, which does the coordination, and multiple secondaries, which do the actual work. To take full advantage of the cores, you should specify one secondary per core. You can configure the number in the locust.env file.
  • Batch size – The number of Kinesis data stream events you can send per Locust user is limited due to the resource overhead of switching Locust users and threads. To overcome this, you can configure a batch size that defines how many records are simulated per Locust user. These are sent as a single Kinesis Data Streams put_records call (see the sketch following this list). You can configure the number in the locust.env file.
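
The following is a minimal sketch of the batching idea using boto3 directly, outside of Locust. The stream name, batch size, and payload fields are illustrative and don’t reflect the project’s exact implementation, which you configure through locust.env.

import json
import time
import uuid

import boto3

kinesis = boto3.client("kinesis")
BATCH_SIZE = 500  # illustrative; PutRecords accepts up to 500 records per call

def send_batch(stream_name, sensor_id):
    # Build BATCH_SIZE readings and send them in a single PutRecords request
    records = []
    for reading in range(BATCH_SIZE):
        payload = {
            "sensorId": f"{sensor_id}_{reading}",
            "temperature": 25.0,
            "status": "OK",
            "timestamp": round(time.time() * 1000),
        }
        records.append({
            "Data": json.dumps(payload).encode("utf-8"),
            "PartitionKey": payload["sensorId"],
        })
    return kinesis.put_records(StreamName=stream_name, Records=records)

send_batch("DemoStream", str(uuid.uuid4()))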

This setup is capable of emitting over 1 million events per second to the Kinesis data stream, with a batch size of 500 and 64 secondaries on a c7g.16xlarge instance.

Locust Dashboard - Large Scale Load Test Charts

You can observe this on the Monitoring tab for the Kinesis data stream as well.

Kinesis Data Stream - Large Scale Load Test Monitoring

Clean up

In order to not incur any unnecessary costs, delete the stack by running the following code:

cdk destroy

Summary

Kinesis is already popular for its ease of use among users building streaming applications. With this load testing capability using Locust, you can now test your workloads in a more straightforward and faster way. Visit the GitHub repo to embark on your testing journey.

The project is licensed under the Apache 2.0 license, providing the freedom to clone and modify it according to your needs. Furthermore, you can contribute to the project by submitting issues or pull requests via GitHub, fostering collaboration and improvement in the testing ecosystem.


About the author

Luis Morales works as Senior Solutions Architect with digital native businesses to support them in constantly reinventing themselves in the cloud. He is passionate about software engineering, cloud-native distributed systems, test-driven development, and all things code and security.

Create an Apache Hudi-based near-real-time transactional data lake using AWS DMS, Amazon Kinesis, AWS Glue streaming ETL, and data visualization using Amazon QuickSight

Post Syndicated from Raj Ramasubbu original https://aws.amazon.com/blogs/big-data/create-an-apache-hudi-based-near-real-time-transactional-data-lake-using-aws-dms-amazon-kinesis-aws-glue-streaming-etl-and-data-visualization-using-amazon-quicksight/

With the rapid growth of technology, an ever-increasing volume of data is arriving in many different formats—structured, semi-structured, and unstructured. Data analytics on operational data in near-real time is becoming a common need. Due to the exponential growth of data volume, it has become common practice to replace read replicas with data lakes to get better scalability and performance. In most real-world use cases, it’s important to replicate the data from the relational database source to the target in real time. Change data capture (CDC) is one of the most common design patterns to capture the changes made in the source database and reflect them to other data stores.

We recently announced support for streaming extract, transform, and load (ETL) jobs in AWS Glue version 4.0, a new version of AWS Glue that accelerates data integration workloads in AWS. AWS Glue streaming ETL jobs continuously consume data from streaming sources, clean and transform the data in-flight, and make it available for analysis in seconds. AWS also offers a broad selection of services to support your needs. A database replication service such as AWS Database Migration Service (AWS DMS) can replicate the data from your source systems to Amazon Simple Storage Service (Amazon S3), which commonly hosts the storage layer of the data lake. Although it’s straightforward to apply updates on a relational database management system (RDBMS) that backs an online source application, it’s difficult to apply this CDC process on your data lakes. Apache Hudi, an open-source data management framework used to simplify incremental data processing and data pipeline development, is a good option to solve this problem.

This post demonstrates how to apply CDC changes from Amazon Relational Database Service (Amazon RDS) or other relational databases to an S3 data lake, with flexibility to denormalize, transform, and enrich the data in near-real time.

Solution overview

We use an AWS DMS task to capture near-real-time changes in the source RDS instance, and use Amazon Kinesis Data Streams as a destination of the AWS DMS task CDC replication. An AWS Glue streaming job reads and enriches changed records from Kinesis Data Streams and performs an upsert into the S3 data lake in Apache Hudi format. Then we can query the data with Amazon Athena and visualize it in Amazon QuickSight. AWS Glue natively supports continuous write operations for streaming data to Apache Hudi-based tables.
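
To make the streaming step more concrete, the following is a minimal, hypothetical sketch of an AWS Glue 4.0 streaming job (PySpark) that consumes records from Kinesis Data Streams and upserts them into a Hudi table on Amazon S3. It is not the job deployed by the CloudFormation template in this post; the stream ARN, S3 paths, and Hudi options are placeholders, and the job is assumed to run with the --datalake-formats hudi job parameter.

import sys

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glue_context = GlueContext(SparkContext.getOrCreate())
job = Job(glue_context)
job.init(args["JOB_NAME"], args)

# Read the CDC records that AWS DMS publishes to the Kinesis data stream
kinesis_df = glue_context.create_data_frame.from_options(
    connection_type="kinesis",
    connection_options={
        "streamARN": "arn:aws:kinesis:us-east-1:123456789012:stream/hudi-cdc-stream",
        "startingPosition": "TRIM_HORIZON",
        "inferSchema": "true",
        "classification": "json",
    },
    transformation_ctx="kinesis_source",
)

hudi_options = {
    "hoodie.table.name": "ticket_activity",
    "hoodie.datasource.write.recordkey.field": "ticketactivity_id",
    "hoodie.datasource.write.precombine.field": "updated_at",
    "hoodie.datasource.write.operation": "upsert",
}

def upsert_batch(batch_df, batch_id):
    # Each micro-batch is upserted into the Hudi table on Amazon S3
    if not batch_df.rdd.isEmpty():
        batch_df.write.format("hudi").options(**hudi_options).mode("append").save(
            "s3://my-hudi-data-lake/hudi_cdc_demo/ticket_activity/"
        )

glue_context.forEachBatch(
    frame=kinesis_df,
    batch_function=upsert_batch,
    options={
        "windowSize": "60 seconds",
        "checkpointLocation": "s3://my-hudi-data-lake/checkpoints/ticket_activity/",
    },
)
job.commit()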

The following diagram illustrates the architecture used for this post, which is deployed through an AWS CloudFormation template.

Prerequisites

Before you get started, make sure you have the following prerequisites:

Source data overview

To illustrate our use case, we assume a data analyst persona who is interested in analyzing near-real-time data for sport events using the table ticket_activity. An example of this table is shown in the following screenshot.

Apache Hudi connector for AWS Glue

For this post, we use AWS Glue 4.0, which already has native support for the Hudi framework. Hudi, an open-source data lake framework, simplifies incremental data processing in data lakes built on Amazon S3. It enables capabilities including time travel queries, ACID (Atomicity, Consistency, Isolation, Durability) transactions, streaming ingestion, CDC, upserts, and deletes.

Set up resources with AWS CloudFormation

This post includes a CloudFormation template for a quick setup. You can review and customize it to suit your needs.

The CloudFormation template generates the following resources:

  • An RDS database instance (source).
  • An AWS DMS replication instance, used to replicate the data from the source table to Kinesis Data Streams.
  • A Kinesis data stream.
  • Four AWS Glue Python shell jobs:
    • rds-ingest-rds-setup-<CloudFormation Stack name> – Creates one source table called ticket_activity on Amazon RDS.
    • rds-ingest-data-initial-<CloudFormation Stack name> – Sample data is automatically generated at random by the Faker library and loaded to the ticket_activity table.
    • rds-ingest-data-incremental-<CloudFormation Stack name> – Ingests new ticket activity data into the source table ticket_activity continuously. This job simulates customer activity.
    • rds-upsert-data-<CloudFormation Stack name> – Upserts specific records in the source table ticket_activity. This job simulates administrator activity.
  • AWS Identity and Access Management (IAM) users and policies.
  • An Amazon VPC, a public subnet, two private subnets, internet gateway, NAT gateway, and route tables.
    • We use private subnets for the RDS database instance and AWS DMS replication instance.
    • We use the NAT gateway to have reachability to pypi.org to use the MySQL connector for Python from the AWS Glue Python shell jobs. It also provides reachability to Kinesis Data Streams and an Amazon S3 API endpoint

To set up these resources, you must have the following prerequisites:

The following diagram illustrates the architecture of our provisioned resources.

To launch the CloudFormation stack, complete the following steps:

  1. Sign in to the AWS CloudFormation console.
  2. Choose Launch Stack
  3. Choose Next.
  4. For S3BucketName, enter the name of your new S3 bucket.
  5. For VPCCIDR, enter a CIDR IP address range that doesn’t conflict with your existing networks.
  6. For PublicSubnetCIDR, enter the CIDR IP address range within the CIDR you gave for VPCCIDR.
  7. For PrivateSubnetACIDR and PrivateSubnetBCIDR, enter the CIDR IP address range within the CIDR you gave for VPCCIDR.
  8. For SubnetAzA and SubnetAzB, choose the subnets you want to use.
  9. For DatabaseUserName, enter your database user name.
  10. For DatabaseUserPassword, enter your database user password.
  11. Choose Next.
  12. On the next page, choose Next.
  13. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  14. Choose Create stack.

Stack creation can take about 20 minutes.
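
If you prefer to launch the same stack with the AWS SDK instead of the console, the following is a hypothetical boto3 sketch. The template URL, Availability Zones, and parameter values are placeholders; the parameter keys match the console steps listed earlier.

import boto3

cfn = boto3.client("cloudformation")

cfn.create_stack(
    StackName="hudi-cdc-demo",
    TemplateURL="https://example-bucket.s3.amazonaws.com/hudi-cdc-demo.yaml",  # placeholder
    Capabilities=["CAPABILITY_NAMED_IAM"],
    Parameters=[
        {"ParameterKey": "S3BucketName", "ParameterValue": "my-new-hudi-bucket"},
        {"ParameterKey": "VPCCIDR", "ParameterValue": "10.10.0.0/16"},
        {"ParameterKey": "PublicSubnetCIDR", "ParameterValue": "10.10.0.0/24"},
        {"ParameterKey": "PrivateSubnetACIDR", "ParameterValue": "10.10.1.0/24"},
        {"ParameterKey": "PrivateSubnetBCIDR", "ParameterValue": "10.10.2.0/24"},
        {"ParameterKey": "SubnetAzA", "ParameterValue": "us-east-1a"},
        {"ParameterKey": "SubnetAzB", "ParameterValue": "us-east-1b"},
        {"ParameterKey": "DatabaseUserName", "ParameterValue": "admin"},
        {"ParameterKey": "DatabaseUserPassword", "ParameterValue": "<your-password>"},
    ],
)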

Set up an initial source table

The AWS Glue job rds-ingest-rds-setup-<CloudFormation stack name> creates a source table called ticket_activity on the RDS database instance. To set up the initial source table in Amazon RDS, complete the following steps:

  1. On the AWS Glue console, choose Jobs in the navigation pane.
  2. Choose rds-ingest-rds-setup-<CloudFormation stack name> to open the job.
  3. Choose Run.
  4. Navigate to the Runs tab and wait for Run status to show as SUCCEEDED.

This job creates only one table, ticket_activity, in the MySQL instance. See the following DDL code:

CREATE TABLE ticket_activity (
ticketactivity_id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
sport_type VARCHAR(256) NOT NULL,
start_date DATETIME NOT NULL,
location VARCHAR(256) NOT NULL,
seat_level VARCHAR(256) NOT NULL,
seat_location VARCHAR(256) NOT NULL,
ticket_price INT NOT NULL,
customer_name VARCHAR(256) NOT NULL,
email_address VARCHAR(256) NOT NULL,
created_at DATETIME NOT NULL,
updated_at DATETIME NOT NULL )

Ingest new records

In this section, we detail the steps to ingest new records. Implement the following steps to start the execution of the jobs.

Start data ingestion to Kinesis Data Streams using AWS DMS

To start data ingestion from Amazon RDS to Kinesis Data Streams, complete the following steps:

  1. On the AWS DMS console, choose Database migration tasks in the navigation pane.
  2. Select the task rds-to-kinesis-<CloudFormation stack name>.
  3. On the Actions menu, choose Restart/Resume.
  4. Wait for the status to show as Load complete and Replication ongoing.

The AWS DMS replication task ingests data from Amazon RDS to Kinesis Data Streams continuously.

Start data ingestion to Amazon S3

Next, to start data ingestion from Kinesis Data Streams to Amazon S3, complete the following steps:

  1. On the AWS Glue console, choose Jobs in the navigation pane.
  2. Choose streaming-cdc-kinesis2hudi-<CloudFormation stack name> to open the job.
  3. Choose Run.

Do not stop this job; you can check the run status on the Runs tab and wait for it to show as Running.

Start the data load to the source table on Amazon RDS

To start data ingestion to the source table on Amazon RDS, complete the following steps:

  1. On the AWS Glue console, choose Jobs in the navigation pane.
  2. Choose rds-ingest-data-initial-<CloudFormation stack name> to open the job.
  3. Choose Run.
  4. Navigate to the Runs tab and wait for Run status to show as SUCCEEDED.

Validate the ingested data

About 2 minutes after starting the job, the data should be ingested into Amazon S3. To validate the ingested data in Athena, complete the following steps:

  1. On the Athena console, complete the following steps if you’re running an Athena query for the first time:
    • On the Settings tab, choose Manage.
    • Specify the stage directory and the S3 path where Athena saves the query results.
    • Choose Save.

  2. On the Editor tab, run the following query against the table to check the data:
SELECT * FROM "database_<account_number>_hudi_cdc_demo"."ticket_activity" limit 10;

Note that AWS CloudFormation creates the database name with your account number, as database_<your-account-number>_hudi_cdc_demo.

Update existing records

Before you update the existing records, note down the ticketactivity_id value of a record from the ticket_activity table. Run the following SQL using Athena. For this post, we use ticketactivity_id = 46 as an example:

SELECT * FROM "database_<account_number>_hudi_cdc_demo"."ticket_activity" limit 10;

To simulate a real-time use case, update the data in the source table ticket_activity on the RDS database instance to see that the updated records are replicated to Amazon S3. Complete the following steps:

  1. On the AWS Glue console, choose Jobs in the navigation pane.
  2. Choose rds-ingest-data-incremental-<CloudFormation stack name> to open the job.
  3. Choose Run.
  4. Choose the Runs tab and wait for Run status to show as SUCCEEDED.

To upsert the records in the source table, complete the following steps:

  1. On the AWS Glue console, choose Jobs in the navigation pane.
  2. Choose the job rds-upsert-data-<CloudFormation stack name>.
  3. On the Job details tab, under Advanced properties, for Job parameters, update the following parameters:
    • For Key, enter --ticketactivity_id.
    • For Value, replace 1 with one of the ticket IDs you noted above (for this post, 46).

  4. Choose Save.
  5. Choose Run and wait for the Run status to show as SUCCEEDED.

This AWS Glue Python shell job simulates a customer activity to buy a ticket. It updates a record in the source table ticket_activity on the RDS database instance using the ticket ID passed in the job argument --ticketactivity_id. It will update ticket_price=500 and updated_at with the current timestamp.
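
As an illustration only, the effective change that this job applies to the source table resembles the following sketch. It is not the actual Glue job code; the connection details are placeholders, and PyMySQL is used here purely for readability (the job itself uses the MySQL connector installed from pypi.org).

import datetime

import pymysql

conn = pymysql.connect(host="<rds-endpoint>", user="<db-user>", password="<db-password>", database="<db-name>")
try:
    with conn.cursor() as cur:
        # Update the ticket price and timestamp for the ticket ID passed via --ticketactivity_id
        cur.execute(
            "UPDATE ticket_activity SET ticket_price = %s, updated_at = %s WHERE ticketactivity_id = %s",
            (500, datetime.datetime.utcnow(), 46),
        )
    conn.commit()
finally:
    conn.close()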

To validate the ingested data in Amazon S3, run the same query from Athena for the ticketactivity_id you noted earlier and check the ticket_price and updated_at fields:

SELECT * FROM "database_<account_number>_hudi_cdc_demo"."ticket_activity" where ticketactivity_id = 46 ;

Visualize the data in QuickSight

After you have the output file generated by the AWS Glue streaming job in the S3 bucket, you can use QuickSight to visualize the Hudi data files. QuickSight is a scalable, serverless, embeddable, ML-powered business intelligence (BI) service built for the cloud. QuickSight lets you easily create and publish interactive BI dashboards that include ML-powered insights. QuickSight dashboards can be accessed from any device and seamlessly embedded into your applications, portals, and websites.

Build a QuickSight dashboard

To build a QuickSight dashboard, complete the following steps:

  1. Open the QuickSight console.

You’re presented with the QuickSight welcome page. If you haven’t signed up for QuickSight, you may have to complete the signup wizard. For more information, refer to Signing up for an Amazon QuickSight subscription.

After you have signed up, QuickSight presents a “Welcome wizard.” You can view the short tutorial, or you can close it.

  1. On the QuickSight console, choose your user name and choose Manage QuickSight.
  2. Choose Security & permissions, then choose Manage.
  3. Select Amazon S3 and select the buckets that you created earlier with AWS CloudFormation.
  4. Select Amazon Athena.
  5. Choose Save.
  6. If you changed your Region during the first step of this process, change it back to the Region that you used earlier during the AWS Glue jobs.

Create a dataset

Now that you have QuickSight up and running, you can create your dataset. Complete the following steps:

  1. On the QuickSight console, choose Datasets in the navigation pane.
  2. Choose New dataset.
  3. Choose Athena.
  4. For Data source name, enter a name (for example, hudi-blog).
  5. Choose Validate.
  6. After the validation is successful, choose Create data source.
  7. For Database, choose database_<your-account-number>_hudi_cdc_demo.
  8. For Tables, select ticket_activity.
  9. Choose Select.
  10. Choose Visualize.
  11. Choose hour and then ticketactivity_id to get the count of ticketactivity_id by hour.

Clean up

To clean up your resources, complete the following steps:

  1. Stop the AWS DMS replication task rds-to-kinesis-<CloudFormation stack name>.
  2. Navigate to the RDS database and choose Modify.
  3. Deselect Enable deletion protection, then choose Continue.
  4. Stop the AWS Glue streaming job streaming-cdc-kinesis2hudi-<CloudFormation stack name>.
  5. Delete the CloudFormation stack.
  6. On the QuickSight dashboard, choose your user name, then choose Manage QuickSight.
  7. Choose Account settings, then choose Delete account.
  8. Choose Delete account to confirm.
  9. Enter confirm and choose Delete account.

Conclusion

In this post, we demonstrated how you can stream data—not only new records, but also updated records from relational databases—to Amazon S3 using an AWS Glue streaming job to create an Apache Hudi-based near-real-time transactional data lake. With this approach, you can easily achieve upsert use cases on Amazon S3. We also showcased how to visualize the Apache Hudi table using QuickSight and Athena. As a next step, refer to the Apache Hudi performance tuning guide for a high-volume dataset. To learn more about authoring dashboards in QuickSight, check out the QuickSight Author Workshop.


About the Authors

Raj Ramasubbu is a Sr. Analytics Specialist Solutions Architect focused on big data and analytics and AI/ML with Amazon Web Services. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS. Raj provided technical expertise and leadership in building data engineering, big data analytics, business intelligence, and data science solutions for over 18 years prior to joining AWS. He helped customers in various industry verticals like healthcare, medical devices, life science, retail, asset management, car insurance, residential REIT, agriculture, title insurance, supply chain, document management, and real estate.

Rahul Sonawane is a Principal Analytics Solutions Architect at AWS with AI/ML and Analytics as his area of specialty.

Sundeep Kumar is a Sr. Data Architect, Data Lake at AWS, helping customers build data lake and analytics platforms and solutions. When not building and designing data lakes, Sundeep enjoys listening to music and playing guitar.

A side-by-side comparison of Apache Spark and Apache Flink for common streaming use cases

Post Syndicated from Deepthi Mohan original https://aws.amazon.com/blogs/big-data/a-side-by-side-comparison-of-apache-spark-and-apache-flink-for-common-streaming-use-cases/

Apache Flink and Apache Spark are both open-source, distributed data processing frameworks used widely for big data processing and analytics. Spark is known for its ease of use, high-level APIs, and the ability to process large amounts of data. Flink shines in its ability to handle processing of data streams in real-time and low-latency stateful computations. Both support a variety of programming languages, scalable solutions for handling large amounts of data, and a wide range of connectors. Historically, Spark started out as a batch-first framework and Flink began as a streaming-first framework.

In this post, we share a comparative study of streaming patterns that are commonly used to build stream processing applications, how they can be solved using Spark (primarily Spark Structured Streaming) and Flink, and the minor variations in their approach. Examples cover code snippets in Python and SQL for both frameworks across three major themes: data preparation, data processing, and data enrichment. If you are a Spark user looking to solve your stream processing use cases using Flink, this post is for you. We do not intend to cover the choice of technology between Spark and Flink because it’s important to evaluate both frameworks for your specific workload and how the choice fits in your architecture; rather, this post highlights key differences for use cases that both these technologies are commonly considered for.

Apache Flink offers layered APIs that provide different levels of expressiveness and control and are designed to target different types of use cases. The three layers of API are Process Functions (also known as the Stateful Stream Processing API), DataStream, and Table and SQL. The Stateful Stream Processing API requires writing verbose code but offers the most control over time and state, which are core concepts in stateful stream processing. The DataStream API supports Java, Scala, and Python and offers primitives for many common stream processing operations, as well as a balance between code verbosity or expressiveness and control. The Table and SQL APIs are relational APIs that offer support for Java, Scala, Python, and SQL. They offer the highest abstraction and intuitive, SQL-like declarative control over data streams. Flink also allows seamless transition and switching across these APIs. To learn more about Flink’s layered APIs, refer to layered APIs.

Apache Spark Structured Streaming offers the Dataset and DataFrames APIs, which provide high-level declarative streaming APIs to represent static, bounded data as well as streaming, unbounded data. Operations are supported in Scala, Java, Python, and R. Spark has a rich function set and syntax with simple constructs for selection, aggregation, windowing, joins, and more. You can also use the Streaming Table API to read tables as streaming DataFrames as an extension to the DataFrames API. Although it’s hard to draw direct parallels between Flink and Spark across all stream processing constructs, at a very high level, we could say Spark Structured Streaming APIs are equivalent to Flink’s Table and SQL APIs. Spark Structured Streaming, however, does not yet (at the time of this writing) offer an equivalent to the lower-level APIs in Flink that offer granular control of time and state.

Both Flink and Spark Structured Streaming (referenced as Spark henceforth) are evolving projects. The following table provides a simple comparison of Flink and Spark capabilities for common streaming primitives (as of this writing).

Capability | Flink | Spark
Row-based processing | Yes | Yes
User-defined functions | Yes | Yes
Fine-grained access to state | Yes, via DataStream and low-level APIs | No
Control when state eviction occurs | Yes, via DataStream and low-level APIs | No
Flexible data structures for state storage and querying | Yes, via DataStream and low-level APIs | No
Timers for processing and stateful operations | Yes, via low-level APIs | No

In the following sections, we cover the greatest common factors so that we can showcase how Spark users can relate to Flink and vice versa. To learn more about Flink’s low-level APIs, refer to Process Function. For the sake of simplicity, we cover the four use cases in this post using the Flink Table API. We use a combination of Python and SQL for an apples-to-apples comparison with Spark.
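
As a starting point for the Flink snippets that follow, the following minimal PyFlink sketch shows how the SQL statements in this post can be submitted from Python. The datagen table here is only a hypothetical stand-in for the Kafka-backed stock_ticker table defined in the next section.

from pyflink.table import EnvironmentSettings, TableEnvironment

# Streaming TableEnvironment; each Flink SQL snippet can be submitted with execute_sql()
table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

table_env.execute_sql("""
    CREATE TABLE stock_ticker (
      symbol STRING,
      price INT
    ) WITH (
      'connector' = 'datagen',
      'number-of-rows' = '10'
    )
""")

table_env.execute_sql("SELECT symbol, price FROM stock_ticker").print()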

Data preparation

In this section, we compare data preparation methods for Spark and Flink.

Reading data

We first look at the simplest ways to read data from a data stream. The following sections assume the following schema for messages:

symbol: string,
price: int,
timestamp: timestamp,
company_info:
{
    name: string,
    employees_count: int
}

Reading data from a source in Spark Structured Streaming

In Spark Structured Streaming, we use a streaming DataFrame in Python that directly reads the data in JSON format:

from pyspark.sql.functions import col, from_json

spark = ...  # spark session

# specify schema
stock_ticker_schema = ...

# Create a streaming DataFrame
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "mybroker1:port") \
    .option("subscribe", "stock_ticker") \
    .load() \
    .select(from_json(col("value").cast("string"), stock_ticker_schema).alias("ticker_data")) \
    .select(col("ticker_data.*"))

Note that we have to supply a schema object that captures our stock ticker schema (stock_ticker_schema). Compare this to the approach for Flink in the next section.

Reading data from a source using Flink Table API

For Flink, we use the SQL DDL statement CREATE TABLE. You can specify the schema of the stream just like you would any SQL table. The WITH clause allows us to specify the connector to the data stream (Kafka in this case), the associated properties for the connector, and data format specifications. See the following code:

# Create table using DDL

CREATE TABLE stock_ticker (
  symbol string,
  price INT,
  timestamp TIMESTAMP(3),
  company_info STRING,
  WATERMARK FOR timestamp AS timestamp - INTERVAL '3' MINUTE
) WITH (
 'connector' = 'kafka',
 'topic' = 'stock_ticker',
 'properties.bootstrap.servers' = 'mybroker1:port',
 'properties.group.id' = 'testGroup',
 'format' = 'json',
 'json.fail-on-missing-field' = 'false',
 'json.ignore-parse-errors' = 'true'
)

JSON flattening

JSON flattening is the process of converting a nested or hierarchical JSON object into a flat, single-level structure. This converts multiple levels of nesting into an object where all the keys and values are at the same level. Keys are combined using a delimiter such as a period (.) or underscore (_) to denote the original hierarchy. JSON flattening is useful when you need to work with a more simplified format. In both Spark and Flink, nested JSONs can be complicated to work with and may need additional processing or user-defined functions to manipulate. Flattened JSONs can simplify processing and improve performance due to reduced computational overhead, especially with operations like complex joins, aggregations, and windowing. In addition, flattened JSONs can help in easier debugging and troubleshooting data processing pipelines because there are fewer levels of nesting to navigate.

JSON flattening in Spark Structured Streaming

JSON flattening in Spark Structured Streaming requires you to use the select method and specify the nested field names that you’d like surfaced to the top-level list of fields. In the following example, company_info is a nested field, and within company_info there’s a field called name. With the following query, we’re flattening company_info.name to a top-level column called company_name:

from pyspark.sql.functions import col

stock_ticker_df = ...  # Streaming DataFrame w/ schema shown above

stock_ticker_df.select("symbol", "timestamp", "price", col("company_info.name").alias("company_name"))

JSON flattening in Flink

In Flink SQL, you can use the JSON_VALUE function. Note that you can use this function only in Flink versions equal to or greater than 1.14. See the following code:

SELECT
   symbol,
   timestamp,
   price,
   JSON_VALUE(company_info, 'lax $.name' DEFAULT NULL ON EMPTY) AS company_name
FROM
   stock_ticker

The term lax in the preceding query has to do with JSON path expression handling in Flink SQL. For more information, refer to System (Built-in) Functions.

Data processing

Now that you have read the data, we can look at a few common data processing patterns.

Deduplication

Data deduplication in stream processing is crucial for maintaining data quality and ensuring consistency. It enhances efficiency by reducing the strain on the processing from duplicate data and helps with cost savings on storage and processing.

Spark Streaming deduplication query

The following code snippet is related to a Spark Streaming DataFrame named stock_ticker. The code performs an operation to drop duplicate rows based on the symbol column. The dropDuplicates method is used to eliminate duplicate rows in a DataFrame based on one or more columns.

stock_ticker = ...  # Streaming DataFrame w/ schema shown above

stock_ticker.dropDuplicates(["symbol"])

Flink deduplication query

The following code shows the Flink SQL equivalent to deduplicate data based on the symbol column. The query retrieves the first row for each distinct value in the symbol column from the stock_ticker stream, based on the ascending order of proctime:

SELECT symbol, timestamp, price
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY symbol ORDER BY proctime ASC) AS row_num
  FROM stock_ticker)
WHERE row_num = 1

Windowing

Windowing in streaming data is a fundamental construct to process data within specifications. Windows commonly have time bounds, number of records, or other criteria. These time bounds bucketize continuous unbounded data streams into manageable chunks called windows for processing. Windows help in analyzing data and gaining insights in real time while maintaining processing efficiency. Analyses or operations are performed on constantly updating streaming data within a window.

There are two common time-based windows used both in Spark Streaming and Flink that we will detail in this post: tumbling and sliding windows. A tumbling window is a time-based window that is a fixed size and doesn’t have any overlapping intervals. A sliding window is a time-based window that is a fixed size and moves forward in fixed intervals that can be overlapping.

Spark Streaming tumbling window query

The following is a Spark Streaming tumbling window query with a window size of 10 minutes:

from pyspark.sql.functions import window

stock_ticker = ...  # Streaming DataFrame w/ schema shown above

# Get max stock price in tumbling window
# of size 10 minutes
maxPriceByWindowAndSymbol = stock_ticker \
   .withWatermark("timestamp", "3 minutes") \
   .groupBy(
      window(stock_ticker.timestamp, "10 minutes"),
      stock_ticker.symbol) \
   .max("price")

Flink Streaming tumbling window query

The following is an equivalent tumbling window query in Flink with a window size of 10 minutes:

SELECT symbol, MAX(price)
  FROM TABLE(
    TUMBLE(TABLE stock_ticker, DESCRIPTOR(timestamp), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end, symbol;

Spark Streaming sliding window query

The following is a Spark Streaming sliding window query with a window size of 10 minutes and slide interval of 5 minutes:

from pyspark.sql.functions import window

stock_ticker = ...  # Streaming DataFrame w/ schema shown above

# Get max stock price in sliding window
# of size 10 minutes and slide interval of size
# 5 minutes

maxPriceByWindowAndSymbol = stock_ticker \
   .withWatermark("timestamp", "3 minutes") \
   .groupBy(
      window(stock_ticker.timestamp, "10 minutes", "5 minutes"),
      stock_ticker.symbol) \
   .max("price")

Flink Streaming sliding window query

The following is a Flink sliding window query with a window size of 10 minutes and slide interval of 5 minutes:

SELECT symbol, MAX(price)
  FROM TABLE(
    HOP(TABLE stock_ticker, DESCRIPTOR(timestamp), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end, symbol;

Handling late data

Both Spark Structured Streaming and Flink support event time processing, where a field within the payload can be used for defining time windows as distinct from the wall clock time of the machines doing the processing. Both Flink and Spark use watermarking for this purpose.

Watermarking is used in stream processing engines to handle late-arriving data. A watermark is like a timer that sets how long the system waits for late events. If an event arrives within the set time (the watermark), the system uses it to update the corresponding result. If it arrives later than the watermark, the system ignores it.

In the preceding windowing queries, you specify the lateness threshold in Spark using the following code:

.withWatermark("timestamp", "3 minutes")

This means that any records that are 3 minutes late as tracked by the event time clock will be discarded.

In contrast, with the Flink Table API, you can specify an analogous lateness threshold directly in the DDL:

WATERMARK FOR timestamp AS timestamp - INTERVAL '3' MINUTE

Note that Flink provides additional constructs for specifying lateness across its various APIs.

Data enrichment

In this section, we compare data enrichment methods with Spark and Flink.

Calling an external API

Calling external APIs from user-defined functions (UDFs) is similar in Spark and Flink. Note that your UDF will be called for every record processed, which can result in the API getting called at a very high request rate. In addition, in production scenarios, your UDF code often gets run in parallel across multiple nodes, further amplifying the request rate.

For the following code snippets, let’s assume that the external API call entails calling the function:

response = my_external_api(request)

External API call in Spark UDF

The following code uses Spark:

from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

# Wrap the external API call in a Spark UDF
@udf(returnType=StringType())
def call_external_api_udf(symbol):
    return my_external_api(symbol)

enriched_df = stock_ticker.select(
    col("symbol"),
    call_external_api_udf(col("symbol")).alias("enriched_symbol"))

External API call in Flink UDF

For Flink, assume we define a PyFlink ScalarFunction class CallExternalAPIUDF, which takes as input the ticker symbol symbol and returns enriched information about the symbol via a REST endpoint. We can then register it as the UDF callExternalAPIUDF and call it as follows:

# CallExternalAPIUDF is the ScalarFunction class described above; its eval method calls the REST endpoint
table_env.create_temporary_function(
    "callExternalAPIUDF", udf(CallExternalAPIUDF(), result_type=DataTypes.STRING()))

SELECT
    symbol,
    callExternalAPIUDF(symbol) as enriched_symbol
FROM stock_ticker;

Flink UDFs provide an initialization method that gets run one time (as opposed to one time per record processed).

Note that you should use UDFs judiciously as an improperly implemented UDF can cause your job to slow down, cause backpressure, and eventually stall your stream processing application. It’s advisable to use UDFs asynchronously to maintain high throughput, especially for I/O-bound use cases or when dealing with external resources like databases or REST APIs. To learn more about how you can use asynchronous I/O with Apache Flink, refer to Enrich your data stream asynchronously using Amazon Kinesis Data Analytics for Apache Flink.

Conclusion

Apache Flink and Apache Spark are both rapidly evolving projects and provide a fast and efficient way to process big data. This post focused on the top use cases we commonly encountered when customers wanted to see parallels between the two technologies for building real-time stream processing applications. We’ve included samples that were most frequently requested at the time of this writing. Let us know if you’d like more examples in the comments section.


About the author

Deepthi Mohan is a Principal Product Manager on the Amazon Kinesis Data Analytics team.

Karthi Thyagarajan was a Principal Solutions Architect on the Amazon Kinesis team.

Temporal data lake architecture for benchmark and indices analytics

Post Syndicated from Krishna Gogineni original https://aws.amazon.com/blogs/architecture/temporal-data-lake-architecture-for-benchmark-and-indices-analytics/

Financial trading houses and stock exchanges generate enormous volumes of data in near real-time, making it difficult to perform bi-temporal calculations that yield accurate results. Achieving this requires a processing architecture that can handle large volumes of data during peak bursts, meet strict latency requirements, and scale according to incoming volumes.

In this post, we’ll describe a scenario for an industry leader in the financial services sector and explain how AWS services are used for bi-temporal processing with state management and scale based on variable workloads during the day, all while meeting strict service-level agreement (SLA) requirements.

Problem statement

To design and implement a fully temporal transactional data lake with the repeatable read isolation level for queries is a challenge, particularly with burst events that need the overall architecture to scale accordingly. The data store in the overall architecture needs to record the value history of data at different times, which is especially important for financial data. Financial data can include corporate actions, annual or quarterly reports, or fixed-income securities, like bonds that have variable rates. It’s crucial to be able to correct data inaccuracies during the reporting period.

The example customer seeks a data processing platform architecture to dynamically scale based on the workloads with a capacity of processing 150 million records under 5 minutes. Their platform should be capable of meeting the end-to-end SLA of 15 minutes, from ingestion to reporting, with lowest total cost of ownership. Additionally, managing bi-temporal data requires a database that has critical features, such as ACID (atomicity, consistency, isolation, durability) compliance, time-travel capability, full-schema evolution, partition layout and evolution, rollback to prior versions, and SQL-like query experience.

Solution overview

The solution architecture key building blocks are Amazon Kinesis Data Streams for streaming data, Amazon Kinesis Data Analytics with Apache Flink as processing engine, Flink’s RocksDB for state management, and Apache Iceberg on Amazon Simple Storage Service (Amazon S3) as the storage engine (Figure 1).


Figure 1. End-to-end data-processing architecture

Data processing

Here’s how it works:

  • A publisher application receives the data from the source systems and publishes data into Kinesis Data Streams using a well-defined JSON format structure.
  • Kinesis Data Streams holds the data for a duration that is configurable so data is not lost and can auto scale based on the data volume ingested.
  • Kinesis Data Analytics runs an Apache Flink application, with state management (RocksDB), to handle bi-temporal calculations. The Apache Flink application consumes data from Kinesis Data Streams and performs the following computations:
    • Transforms the JSON stream into a row-type record, compatible with a SQL table-like structure, resolving nesting and parent–child relationships present within the stream
    • Checks whether the record already has an existing state in in-memory RocksDB or on the disk attached to the Kinesis Data Analytics computational node, to avoid read latency from the database, which is critical for meeting the performance requirements
    • Performs bi-temporal calculations and creates the resultant records in an in-memory data structure before invoking the Apache Iceberg sink operator
    • The Apache Flink application sink operator appends the temporal states, expressed as records, to the existing Apache Iceberg data store. This complies with key principles of time series data, which is immutable, and provides the ability to time travel along with ACID compliance, schema evolution, and partition evolution (a minimal sketch of this stream-to-Iceberg flow follows this list).
  • Kinesis Data Analytics is resilient and provides a no-data-loss capability, with features like periodic checkpoints and savepoints. They are used to store the state management in a secure Amazon S3 location that can be accessed outside of Kinesis Data Analytics. This savepoint mechanism can be used programmatically to scale the cluster size based on the workloads using time-driven scheduling and AWS Lambda functions.
  • If the time-to-live feature of RocksDB is implemented, old records are stored in Apache Iceberg on Amazon S3. When performing temporal calculations, if the state is not found in memory, data is read from Apache Iceberg into RocksDB and the processing is completed. However, this step is optional and can be circumvented if the Kinesis Data Analytics cluster is initialized with the right number of Kinesis processing units to hold the historical information, as per requirements.
  • Because the data is stored in an Apache Iceberg table format in Amazon S3, data is queried using Trino, which supports Apache Iceberg table format.
  • The end user queries data using any SQL tool that supports the Trino query engine.
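
As a minimal illustration of the pipeline described in the preceding list, the following hypothetical PyFlink sketch reads a JSON stream from Kinesis Data Streams and appends records to an Apache Iceberg table on Amazon S3. It omits the bi-temporal calculations and RocksDB state handling, assumes the Flink Kinesis and Iceberg connector dependencies are available, and uses placeholder names, Regions, and paths rather than the customer’s actual configuration.

from pyflink.table import EnvironmentSettings, TableEnvironment

table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Source: JSON records published to Kinesis Data Streams
table_env.execute_sql("""
    CREATE TABLE benchmark_source (
      instrument_id STRING,
      benchmark_value DOUBLE,
      effective_time TIMESTAMP(3)
    ) WITH (
      'connector' = 'kinesis',
      'stream' = 'benchmark-input-stream',
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST',
      'format' = 'json'
    )
""")

# Sink: an Apache Iceberg table stored on Amazon S3 (Hadoop catalog for simplicity)
table_env.execute_sql("""
    CREATE CATALOG iceberg_catalog WITH (
      'type' = 'iceberg',
      'catalog-type' = 'hadoop',
      'warehouse' = 's3://my-temporal-data-lake/warehouse'
    )
""")
table_env.execute_sql("CREATE DATABASE IF NOT EXISTS iceberg_catalog.analytics")
table_env.execute_sql("""
    CREATE TABLE IF NOT EXISTS iceberg_catalog.analytics.benchmark_values (
      instrument_id STRING,
      benchmark_value DOUBLE,
      effective_time TIMESTAMP(3)
    )
""")

# Continuously append the computed temporal states to the Iceberg table
table_env.execute_sql("""
    INSERT INTO iceberg_catalog.analytics.benchmark_values
    SELECT instrument_id, benchmark_value, effective_time FROM benchmark_source
""")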

Apache Iceberg maintenance jobs, such as data compaction, snapshot expiration, and orphan file deletion, can be launched using Amazon Athena to optimize performance of the Apache Iceberg data store. Details of each processing step performed in the Apache Flink application are captured using Amazon CloudWatch, which logs all the events.

Scalability

Amazon EventBridge Scheduler invokes a Lambda function to scale the Kinesis Data Analytics application. Kinesis Data Analytics has a short outage during rescaling that is proportional to the amount of data stored in RocksDB, which is why a state management strategy is necessary for the proper operation of the system.

Figure 2 shows the scaling process, which depicts:

  • Before peak load: The Kinesis Data Analytics cluster is processing off-peak records with minimum configuration before the peak load. A scheduled event is launched from EventBridge that invokes a Lambda function, which shuts down the cluster using the savepoint mechanism and scales up the Kinesis Data Analytics cluster to required Kinesis processing units.
  • During peak load: When the peak data burst happens, the Kinesis Data Analytics cluster is ready to handle the volume of data from Kinesis Data Stream, and processes it within the SLA of 5 minutes.
  • After peak load: A scheduled event from EventBridge invokes a Lambda function to scale down the Kinesis Data Analytics cluster to the minimum configuration that holds the required state for the entire volume of records.

Figure 2. Cluster scaling before, during, and after peak data volume processing

Performance insights

With the discussed architecture, we want to demonstrate that we are able to meet the SLAs in terms of performance and processing times. We took a subset of benchmark and indices data and processed it with the end-to-end architecture. During the process, we observed some very interesting findings, which we would like to share.

Processing time for Apache Iceberg Upsert vs. Append operations: During our tests, we expected the Upsert operation to be faster than Append. But on the contrary, we noticed that Append operations were faster compared to Upsert, even though more computations are performed in the Apache Flink application. In our test with 3,500,000 records, the Append operation took 1,556 seconds while Upsert took 1,675 seconds to process the data (Figure 3).


Figure 3. Processing times for Upsert vs. Append

Compute consumption for Apache Iceberg Upsert vs. Append operations: Comparing the compute consumption for 10,000,000 records, we noticed that Append operation was able to process the data in the same amount of time as Upsert operation but with less compute resources. In our tests, we have noted that Append operation only consumed 64 Kinesis processing units, whereas Upsert consumed 78 Kinesis processing units (Figure 4).


Figure 4. Comparing consumption for Upsert vs. Append

Scalability vs performance: To achieve the desired data processing performance, we need a specific configuration of Kinesis processing units, Kinesis Data Streams, and Iceberg parallelism. In our test with the data that we chose, we started with four Kinesis processing units and four Kinesis data streams for data processing. We observed an 80% performance improvement in data processing with 16 Kinesis data processing units. An additional 6% performance improvement was demonstrated when we scaled to 32 Kinesis processing units. When we increased the Kinesis data streams to 16, we observed an additional 2% performance improvement (Figure 5).


Figure 5. Scalability vs. performance

Data volume processing times for Upsert vs. Append: For this test, we started with 350,000 records of data. When we increased the data volume to 3.5 million records, we observed that Append performed better than Upsert, demonstrating a five-fold increase in processing time (Figure 6).


Figure 6. Data volume processing times for Upsert vs. Append

Conclusion

The architecture we explored today scales based on the data-volume requirements of the customer and is capable of meeting the end-to-end SLA of 15 minutes, with a potentially lower total cost of ownership. Additionally, the solution is capable of handling high-volume, bi-temporal computations with ACID compliance, time travel, full-schema evolution, partition layout evolution, rollback to prior versions, and an SQL-like query experience.

Further reading

New Solution – Clickstream Analytics on AWS for Mobile and Web Applications

Post Syndicated from Sébastien Stormacq original https://aws.amazon.com/blogs/aws/new-solution-clickstream-analytics-on-aws-for-mobile-and-web-applications/

Starting today, you can deploy on your AWS account an end-to-end solution to capture, ingest, store, analyze, and visualize your customers’ clickstreams inside your web and mobile applications (both for Android and iOS). The solution is built on top of standard AWS services.

This new solution Clickstream Analytics on AWS allows you to keep your data in the security and compliance perimeter of your AWS account and customize the processing and analytics as you require, giving you the full flexibility to extract value for your business. For example, many business line owners want to combine clickstream analytics data with business system data to gain more comprehensive insights. Storing clickstream analysis data in your AWS account allows you to cross reference the data with your existing business system, which is complex to implement when you use a third-party analytics solution that creates an artificial data silo.

Clickstream Analytics on AWS is available from the AWS Solutions Library at no cost, except for the services it deploys on your account.

Why Analyze Your Applications Clickstreams?
Organizations today are in search of vetted solutions and architectural guidance to rapidly solve business challenges. Whether you prefer off-the-shelf deployments or customizable architectures, the AWS Solutions Library carries solutions built by AWS and AWS Partners for a broad range of industry and technology use cases.

When I talk with mobile and web application developers or product owners, you often tell me that you want to use a clickstream analysis solution to understand your customers’ behavior inside your application. Click stream analysis solutions help you to identify popular and frequently visited screens, analyze navigation patterns, identify bottlenecks and drop-off points, or perform A/B testing of functionalities such as the pay wall, but you face two challenges to adopt or build a click stream analysis solution.

Either you use a third-party library and analytics solution that sends all your application and customer data to an external provider, which causes security and compliance risks and makes it more difficult to reference your existing business data to enrich the analysis, or you dedicate time and resources to build your own solution based on AWS services, such as Amazon Kinesis (for data ingestion), Amazon EMR (for processing), Amazon Redshift (for storage), and Amazon QuickSight (for visualization). Doing so ensures your application and customer data stay in the security perimeter of your AWS account, which is already approved and vetted by your information and security team. Often, building such a solution is an undifferentiated task that drives resources and budget away from developing the core business of your application.

Introducing Clickstream Analytics on AWS
The new solution Clickstream Analytics on AWS provides you with a backend for data ingestion, processing, and visualization of click stream data. It’s shipped as an AWS CloudFormation template that you can easily deploy into the AWS account of your choice.

In addition to the backend component, the solution provides you with purpose-built Java and Swift SDKs to integrate into your mobile applications (for both Android and iOS). The SDKs automatically collect data and provide developers with an easy-to-use API to collect application-specific data. They manage the low-level tasks of buffering the data locally, sending it to the backend, managing retries in case of communication errors, and more.

The following diagram shows you the high-level architecture of the solution.

Clickstream analysis - architecture

The solution comes with an easy-to-use console to configure your solution. For example, it allows you to choose between three AWS services to ingest the application clickstream data: Amazon Managed Streaming for Apache Kafka, Amazon Kinesis Data Streams, or Amazon Simple Storage Service (Amazon S3). You can create multiple data pipelines for multiple applications or teams, each using a different configuration. This allows you to adjust the backend to the application user base and requirements.

You can use plugins to transform the data during the processing phase. The solution comes with two plugins preinstalled: User-Agent enrichment and IP address enrichment to add additional data that’s related to the User-Agent and the geolocation of the IP address used by the client applications.

By default, it provides an Amazon Redshift Serverless configuration to minimize costs, but you can select a provisioned Amazon Redshift configuration to meet your performance and budget requirements.

Finally, the solution provides you with a set of pre-assembled visualization dashboards to report on user acquisition, user activity, and user engagement. The dashboard consumes the data available in Amazon Redshift. You’re free to develop other analytics and other dashboards using the tools and services of your choice.

Let’s See It in Action
The best way to learn how to deploy and to configure Clickstream Analytics on AWS is to follow the tutorial steps provided by the Clickstream Analytics on AWS workshop.

The workshop goes into great detail about each step. Here are the main steps I did to deploy the solution:

1. I create the control plane (the management console) of the solution using this CloudFormation template. The output of the template contains the URL to the management console. I later receive an email with a temporary password for the initial connection.

2. On the Clickstream Analytics console, I create my first project and define various network parameters such as the VPC, subnets, and security groups. I also select the service to use for data ingestion and my choice of configuration for Amazon Redshift.

Clickstream analysis - Create project

Clickstream analysis - data sink

3. When I enter all configuration data, the console creates the data plane for my application.

AWS services and solutions are usually built around a control plane and one or multiple data planes. In the context of Clickstream Analytics, the control plane is the console that I use to define my data acquisition and analysis project. The data plane is the infrastructure to receive, analyze, and visualize my application data. Now that I define my project, the console generates and launches another CloudFormation template to create and manage the data plane.

4. The Clickstream Analytics console generates a JSON configuration file to include in my application and shares the Java or Swift code to include in my Android or iOS application. The console provides instructions to add the clickstream analysis as a dependency to my application. I also update my application code to insert the suggested code and start to deploy.

Clickstream analysis - code for your applications

5. After my customers start to use the mobile app, I access the Clickstream Analytics dashboard to visualize the data collected.

The Dashboards
Clickstream Analytics dashboards are designed to provide a holistic view of the user lifecycle: the acquisition, the engagement, the activity, and the retention. In addition, it adds visibility into user devices and geographies. The solution automatically generates visualizations in these six categories: Acquisition, Engagement, Activity, Retention, Devices, and Navigation path. Here are a couple of examples.

The Acquisition dashboard reports the total number of users, the number of registered users (the ones who signed in), and the number of users by traffic source. It also computes trends for new users and registered users.

Clickstream analysis - acquisition dashboard

The Engagement dashboard reports the user engagement level (the number of user sessions versus the time users spent on my application). Specifically, I have access to the number of engaged sessions (sessions that last more than 10 seconds or have at least two screen views), the engagement rate (the percentage of engaged sessions from the total number of sessions), and the average engagement time.

Clickstream analysis - engagement dashboard

The Activity dashboard shows the event and actions taken by my customers in my application. It reports data, such as the number of events and number of views (or screens) shown, with the top events and views shown for a given amount of time.

Clickstream analysis - activity dashboard

The Retention tab shows user retention over time: the user stickiness for your daily, weekly, and monthly active users. It also shows the rate of returning users versus new users.

Clickstream analysis - retention

The Device tab shows data about your customer’s devices: operating systems, versions, screen sizes, and language.

Clickstream analysis - devices dashboard

And finally, the Path explorer dashboard shows your customers’ navigation path into the screens of your applications.

Clickstream analysis - path explorer dashboard

As I mentioned earlier, all the data are available in Amazon Redshift, so you’re free to build other analytics and dashboards.

Pricing and Availability
The Clickstream Analytics solution is available free of charge. You pay for the AWS services provisioned for you, including Kinesis or Amazon Redshift. Cost estimates depend on the configuration that you select. For example, the size of the Kinesis data stream and the Amazon Redshift configuration you select for your data ingestion and analytics needs, and the volume of data your applications send to the pipeline, both affect the monthly cost of the solution.

To learn how to get started with this solution, take the Clickstream Analytics workshop today and stop sharing your customer and application clickstream data with third-party solutions.

— seb

Stream VPC Flow Logs to Datadog via Amazon Kinesis Data Firehose

Post Syndicated from Chaitanya Shah original https://aws.amazon.com/blogs/big-data/stream-vpc-flow-logs-to-datadog-via-amazon-kinesis-data-firehose/

It’s common to store the logs generated by customer’s applications and services in various tools. These logs are important for compliance, audits, troubleshooting, security incident responses, meeting security policies, and many other purposes. You can perform log analysis on these logs to understand users’ application behavior and patterns to make informed decisions.

When running workloads on Amazon Web Services (AWS), you need to analyze Amazon Virtual Private Cloud (Amazon VPC) Flow Logs to track the IP traffic going to and from the network interfaces for the workloads in your VPC. Analyzing VPC flow logs helps you understand how your applications are communicating over the VPC network and acts as a main source of information about the network in your VPC.

You can easily deliver data to supported destinations using the Amazon Kinesis Data Firehose integration with VPC flow logs. Kinesis Data Firehose is a fully managed service for delivering near-real-time streaming data to various destinations for storage and performing near-real-time analytics. With its extensible data transformation capabilities, you can also streamline log processing and log delivery pipelines into a single Kinesis Data Firehose delivery stream. You can perform analytics on VPC flow logs delivered from your VPC using the Kinesis Data Firehose integration with Datadog as a destination.

Datadog is a monitoring and security platform and AWS Partner Network (APN) Advanced Technology Partner with AWS Competencies in AWS Cloud Operations, DevOps, Migration, Security, Networking, Containers, and Microsoft Workloads, along with many others.

Datadog enables you to easily explore and analyze logs to gain deeper insights into the state of your applications and AWS infrastructure. You can analyze all your AWS service logs while storing only the ones you need, and generate metrics from aggregated logs to uncover and alert on trends in your AWS services.

In this post, you learn how to integrate VPC flow logs with Kinesis Data Firehose and deliver it to Datadog.

Solution overview

This solution uses native integration of VPC flow logs streaming to Kinesis Data Firehose. We use a Kinesis Data Firehose delivery stream to buffer the streamed VPC flow logs to a Datadog destination endpoint in your Datadog account. You can use these logs with Datadog Log Management and Datadog Cloud SIEM to analyze the health, performance, and security of your cloud resources.

The following diagram illustrates the solution architecture.

We walk you through the following high-level steps:

  1. Link your AWS account with your Datadog account.
  2. Create the Kinesis Data Firehose stream where VPC service streams the flow logs.
  3. Create the VPC flow log subscription to Kinesis Data Firehose.
  4. Visualize VPC flow logs in the Datadog dashboard.

The account ID 123456781234 used in this post is a dummy account. It is used only for demonstration purposes.

Prerequisites

You should have the following prerequisites:

Link your AWS account with your Datadog account for AWS integration

Follow the instructions provided on the Datadog website for AWS Integration. To configure log archiving and enrich the log data sent from your AWS account with useful context, link the accounts. When you complete the linking setup, proceed to the following step.

Create a Kinesis Data Firehose stream

Now that your Datadog integration with AWS is complete, you can create the Kinesis Data Firehose delivery stream to which the VPC flow logs are streamed. Complete the following steps:

  1. On the Amazon Kinesis console, choose Kinesis Data Firehose in the navigation pane.
  2. Choose Create delivery stream.
  3. Choose Direct PUT as the source.
  4. Set Destination as Datadog.
    Create delivery stream
  5. For Delivery stream name, enter PUT-DATADOG-DEMO.
  6. Keep Data transformation set to Disabled under Transform records.
  7. In Destination settings, for HTTP endpoint URL, choose the Datadog logs HTTP endpoint that matches your Region and Datadog account configuration.
    Kinesis delivery stream configuration
  8. For API key, enter your Datadog API key.

This allows your delivery stream to publish VPC flow logs to the Datadog endpoint. API keys are unique to your organization and are required to submit metrics, events, and logs to Datadog.

  9. Set Content encoding to GZIP to reduce the size of data transferred.
  10. Set the Retry duration to 60. You can change the Retry duration value if you need to; it depends on the request handling capacity of the Datadog endpoint.
    Kinesis destination settings
    Under Buffer hints, Buffer size and Buffer interval are set with default values for the Datadog integration.
    Kinesis buffer settings
  11. Under Backup settings, as mentioned in the prerequisites, choose the S3 bucket that you created to store failed logs and backups, with a specific prefix.
  12. Under the S3 buffer hints section, set Buffer size to 5 and Buffer interval to 300.

You can change the S3 buffer size and interval based on your requirements.

  13. Under S3 compression and encryption, select GZIP for Compression for data records, or another compression method of your choice.

Compressing data reduces the required storage space.

  14. Select Disabled for Encryption of the data records. You can enable encryption of the data records to secure access to your logs.
    Kinesis stream backup settings
  15. Optionally, in Advanced settings, select Enable server-side encryption for source records in delivery stream.
    You can use AWS managed keys or a customer managed key (CMK) for the encryption type.
  16. Enable CloudWatch error logging.
  17. Choose Create or update IAM role, which is created by Kinesis Data Firehose as part of this stream.
    Kinesis stream Advanced settings
  18. Choose Next.
  19. Review your settings.
  20. Choose Create delivery stream.

Create a VPC flow logs subscription

Create a VPC flow logs subscription for the Kinesis Data Firehose delivery stream you created in the previous step:

  1. On the Amazon VPC console, choose Your VPCs.
  2. Select the VPC that you want to create the flow log for.
  3. On the Actions menu, choose Create flow log.
    AWS VPCs
  4. Select All to send all flow log records to the Firehose destination.

If you want to filter the flow logs, you could alternatively select Accept or Reject.

  5. For Maximum aggregation interval, select 10 minutes, or the minimum setting of 1 minute if you need the flow log data to be available for near-real-time analysis in Datadog.
  6. For Destination, select Send to Kinesis Data Firehose in the same account if the delivery stream is set up in the same account where you create the VPC flow logs.

If you want to send the data to a different account, refer to Publish flow logs to Kinesis Data Firehose.

  7. Choose an option for Log record format:
    • If you leave Log record format set to the AWS default format, the flow logs are sent in version 2 format.
    • Alternatively, you can specify custom fields for the flow logs to capture and send to Datadog.

For more information on log format and available fields, refer to Flow log records.

  8. Choose Create flow log.
    Create VPC Flow log
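
If you prefer to script this step rather than use the console, the equivalent call with the AWS SDK for Java v2 looks roughly like the following sketch. The VPC ID is a placeholder, the delivery stream ARN reuses the dummy account ID from earlier, and you should verify the builder method names against the SDK version you use:

import software.amazon.awssdk.services.ec2.Ec2Client;
import software.amazon.awssdk.services.ec2.model.CreateFlowLogsRequest;
import software.amazon.awssdk.services.ec2.model.CreateFlowLogsResponse;

public class CreateVpcFlowLogToFirehose {
    public static void main(String[] args) {
        try (Ec2Client ec2 = Ec2Client.create()) {
            CreateFlowLogsRequest request = CreateFlowLogsRequest.builder()
                    .resourceType("VPC")
                    .resourceIds("vpc-0123456789abcdef0")          // placeholder VPC ID
                    .trafficType("ALL")                            // or ACCEPT / REJECT to filter
                    .logDestinationType("kinesis-data-firehose")
                    .logDestination("arn:aws:firehose:us-east-1:123456781234:deliverystream/PUT-DATADOG-DEMO")
                    .maxAggregationInterval(60)                    // seconds (1-minute aggregation)
                    .build();

            CreateFlowLogsResponse response = ec2.createFlowLogs(request);
            System.out.println("Created flow logs: " + response.flowLogIds());
        }
    }
}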

Now let’s explore the VPC flow logs in Datadog.

Visualize VPC flow logs in the Datadog dashboard

In the Logs Search option in the navigation pane, filter on source:vpc. The VPC flow logs from your VPC appear in the Datadog Log Explorer and are automatically parsed, so you can analyze your logs by source, destination, action, or other attributes.

Datadog Logs Dashboard

Clean up

After you test this solution, delete all the resources you created to avoid incurring future charges. Refer to the following links for instructions on deleting the resources:

Conclusion

In this post, we walked through a solution that integrates VPC flow logs with a Kinesis Data Firehose delivery stream, delivers them to a Datadog destination with no code, and visualizes them in a Datadog dashboard. With Datadog, you can easily explore and analyze logs to gain deeper insights into the state of your applications and AWS infrastructure.

Try this new, quick, and hassle-free way of sending your VPC flow logs to a Datadog destination using Kinesis Data Firehose.


About the Author

Chaitanya Shah - AWS Chaitanya Shah is a Sr. Technical Account Manager (TAM) with AWS, based out of New York. He has over 22 years of experience working with enterprise customers. He loves to code and actively contributes to the AWS solutions labs to help customers solve complex problems. He provides guidance to AWS customers on best practices for their AWS Cloud migrations. He also specializes in AWS data transfer and the data and analytics domain.

Real-time inference using deep learning within Amazon Kinesis Data Analytics for Apache Flink

Post Syndicated from Jeremy Ber original https://aws.amazon.com/blogs/big-data/real-time-inference-using-deep-learning-within-amazon-kinesis-data-analytics-for-apache-flink/

Apache Flink is a framework and distributed processing engine for stateful computations over data streams. Amazon Kinesis Data Analytics for Apache Flink is a fully managed service that enables you to use an Apache Flink application to process streaming data. The Deep Java Library (DJL) is an open-source, high-level, engine-agnostic Java framework for deep learning.

In this blog post, we demonstrate how you can use DJL within Kinesis Data Analytics for Apache Flink for real-time machine learning inference. Real-time inference can be valuable in a variety of applications and industries where it is essential to make predictions or take actions based on new data as quickly as possible with low latencies. We show how to load a pre-trained deep learning model from the DJL model zoo into a Flink job and apply the model to classify data objects in a continuous data stream. The DJL model zoo includes a wide variety of pre-trained models for image classification, semantic segmentation, speech recognition, text embedding generation, question answering, and more. It supports Hugging Face, PyTorch, MXNet, and TensorFlow model frameworks and also helps developers create and publish their own models. We will focus on image classification and use a popular classifier called ResNet-18 to produce predictions in real time. The model has been pre-trained on ImageNet with 1.2 million images belonging to 1,000 class labels.

We provide sample code, architecture diagrams, and an AWS CloudFormation template so you can follow along and employ ResNet-18 as your classifier to make real-time predictions. The solution we provide here is a powerful design pattern for continuously producing ML-based predictions on streaming data within Kinesis Data Analytics for Apache Flink. You can adapt the provided solution for your use case and choose an alternative model from the model zoo or even provide your own model.

Image classification is a classic problem that takes an image and selects the best-fitting class, such as whether the image from an autonomous driving system is that of a bicycle or a pedestrian. Common use cases for real-time inference on streams of images include classifying images from vehicle cameras and license plate recognition systems, and classifying images uploaded to social media and ecommerce websites. The use cases typically need low latency while handling high throughput and potentially bursty streams. For example, in ecommerce websites, real-time classification of uploaded images can help in marking pictures of banned goods or hazardous materials that have been supplied by sellers. Immediate determination through streaming inference is needed to trigger alerts and follow-up actions to prevent these images from being part of the catalog. This enables faster decision-making compared to batch jobs that run on a periodic basis. The data stream pipeline can involve multiple models for different purposes, such as classifying uploaded images into ecommerce categories of electronics, toys, fashion, and so on.

Solution overview

The following diagram illustrates the workflow of our solution.

Architecture diagram: a Kinesis Data Analytics for Apache Flink application reads images from an Amazon S3 bucket, classifies them, and writes the results to another S3 bucket called "classifications"

The application performs the following steps:

  1. Read in images from Amazon Simple Storage Service (Amazon S3) using the Apache Flink Filesystem File Source connector.
  2. Window the images into a collection of records.
  3. Classify the batches of images using the DJL image classification class.
  4. Write inference results to Amazon S3 at the path specified.

We recommend using images of reasonable size so that they fit into a Kinesis Processing Unit (KPU). Images larger than 50 MB may result in latency in processing and classification.

The main class for this Apache Flink job is located at src/main/java/com.amazon.embeddedmodelinference/EMI.java. Here you can find the main() method and entry point to our Flink job.

Prerequisites

To get started, configure the following prerequisites on your local machine:

Once this is set up, you can clone the code base to access the source code for this solution. The Java application code for this example is available on GitHub. To download the application code, clone the remote repository using the following command:

git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples

Find and navigate to the folder of the image classification example, called image-classification.

An example set of images to stream and test the code is available in the imagenet-sample-images folder.

Let’s walk through the code step by step.

Test on your local machine

If you would like to test this application locally, ensure you have AWS credentials set up on your machine. Additionally, download the Flink S3 Filesystem Hadoop JAR to use with your Apache Flink installation and place it in a folder named plugins/s3 in the root of your project. Then configure the following environment variables either in your IDE or in your machine’s local variable scope:

IS_LOCAL=true;
plugins.dir=<<path-to-flink-s3-fs-hadoop jar>>
s3.access.key=<<aws access key>>
s3.secret.key=<<aws secret key>>

Replace these values with your own.
Environment properties to replace in IntelliJ

After configuring the environment variables and downloading the necessary plugin JAR, let’s look at the code.

In the main method, after setting up our StreamExecutionEnvironment, we define our FileSource to read files from Amazon S3. By default, this source operator reads from a sample bucket. You can replace this bucket name with your own by changing the variable called bucket, or setting the application property on Kinesis Data Analytics for Apache Flink once deployed.

final FileSource<StreamedImage> source =
FileSource.forRecordStreamFormat(new ImageReaderFormat(), new Path(s3SourcePath))
               .monitorContinuously(Duration.ofSeconds(10))
               .build();

The FileSource is configured to read files in the ImageReaderFormat and checks Amazon S3 for new images every 10 seconds. This interval is also configurable.

After we have read in our images, we convert our FileSource into a stream that can be processed:

DataStream<StreamedImage> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");

Next, we create a tumbling window with a configurable duration, defaulting to 60 seconds. Each window close produces a batch (list) of images to be classified using a ProcessWindowFunction.

This ProcessWindowFunction calls the classifier’s predict function on the list of images and returns the best classification probability for each image. The results are then emitted downstream, where they are written out to the S3 bucket path specified in your configuration.

.process(new ProcessWindowFunction<StreamedImage, String, String, TimeWindow>() {
    @Override
    public void process(String s,
                        ProcessWindowFunction<StreamedImage, String, String, TimeWindow>.Context context,
                        Iterable<StreamedImage> iterableImages,
                        Collector<String> out) throws Exception {

        List<Image> listOfImages = new ArrayList<Image>();
        iterableImages.forEach(x -> listOfImages.add(x.getImage()));
        try {
            // batch classify images
            List<Classifications> list = classifier.predict(listOfImages);
            for (Classifications classifications : list) {
                Classifications.Classification cl = classifications.best();
                String ret = cl.getClassName() + ": " + cl.getProbability();
                out.collect(ret);
            }
        } catch (ModelException | IOException | TranslateException e) {
            logger.error("Failed predict", e);
        }
    }
});
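
For reference, the windowing that feeds this function is wired up roughly as follows. This is a sketch, not the repository's exact code: the key selector, the hypothetical ClassificationWindowFunction name (standing in for the anonymous function above), and the 60-second duration are illustrative, and the actual application reads the window duration from its runtime properties.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

DataStream<String> classifications = stream
        .keyBy(image -> "all-images")                                 // single logical key so one window collects all images
        .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))   // tumbling window; duration is configurable
        .process(new ClassificationWindowFunction());                 // the ProcessWindowFunction shown above

Keying everything to a single constant key keeps the example simple but limits the classification step to a parallelism of one; a real deployment could key by an image attribute to spread the work across subtasks.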

In Classifier.java, we read the image and apply crop, transpose, and reshape operations, and finally convert it to an N-dimensional array that the deep learning model can process. Then we feed the array to the model and apply a forward pass. During the forward pass, the model computes the neural network layer by layer. Finally, the output object contains the probabilities for each class that the model was trained on. We map the probabilities to their class names and return the result.
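
To make the DJL side of this concrete outside of Flink, loading a pre-trained image classification model from the model zoo and running a single prediction typically looks like the following sketch. The "layers" filter and the placeholder image URL are assumptions; check the DJL model zoo documentation for the exact criteria used by the Classifier class in the repository.

import ai.djl.Application;
import ai.djl.inference.Predictor;
import ai.djl.modality.Classifications;
import ai.djl.modality.cv.Image;
import ai.djl.modality.cv.ImageFactory;
import ai.djl.repository.zoo.Criteria;
import ai.djl.repository.zoo.ZooModel;

public class DjlQuickCheck {
    public static void main(String[] args) throws Exception {
        // Describe the model we want: an image classification model with 18 layers (ResNet-18)
        Criteria<Image, Classifications> criteria = Criteria.builder()
                .optApplication(Application.CV.IMAGE_CLASSIFICATION)
                .setTypes(Image.class, Classifications.class)
                .optFilter("layers", "18")
                .build();

        try (ZooModel<Image, Classifications> model = criteria.loadModel();
             Predictor<Image, Classifications> predictor = model.newPredictor()) {
            // Replace with a URL or local path to your own test image
            Image img = ImageFactory.getInstance().fromUrl("https://example.com/test-image.jpg");
            Classifications result = predictor.predict(img);
            System.out.println(result.best().getClassName() + ": " + result.best().getProbability());
        }
    }
}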

Deploy the solution with AWS CloudFormation

To run this code base on Kinesis Data Analytics for Apache Flink, we provide a CloudFormation template that spins up the necessary resources. Open AWS CloudShell or a terminal on your local machine and complete the following steps to deploy the solution:

  1. If you don’t have the AWS Cloud Development Kit (AWS CDK) bootstrapped in your account, run the following command, providing your account number and current Region:
cdk bootstrap aws://ACCOUNT-NUMBER/REGION

The commands in the next step clone a GitHub repo of sample images to classify and upload them to your source S3 bucket, and then launch the CloudFormation stack with your input parameters.

  2. Enter the following code and replace the BUCKET variables with your own source bucket and sink bucket, which will contain the source images and the classifications, respectively:
export SOURCE_BUCKET=s3://SAMPLE-BUCKET/PATH;
export SINK_BUCKET=s3://SAMPLE_BUCKET/PATH;
git clone https://github.com/EliSchwartz/imagenet-sample-images; cd imagenet-sample-images;
aws s3 cp . $SOURCE_BUCKET --recursive --exclude "*/";
aws cloudformation create-stack --stack-name KDAImageClassification --template-url https://aws-blogs-artifacts-public.s3.amazonaws.com/artifacts/BDB-3098/BlogStack.template.json --parameters ParameterKey=inputBucketPath,ParameterValue=$SOURCE_BUCKET ParameterKey=outputBucketPath,ParameterValue=$SINK_BUCKET --capabilities CAPABILITY_IAM;

This CloudFormation stack creates the following resources:

    • A Kinesis Data Analytics application with 1 Kinesis Processing Unit (KPU) preconfigured with some application properties
    • An S3 bucket for your output results
  3. When the stack is complete, navigate to the Kinesis Data Analytics for Apache Flink console.
  4. Find the application called blog-DJL-flink-ImageClassification-application and choose Run.
  5. On the Amazon S3 console, navigate to the bucket you specified in the outputBucketPath variable.

If the source bucket contains readable images, you should see classifications of those images within the checkpoint interval of the running application.

Deploy the solution manually

If you prefer to use your own code base, you can follow the manual steps in this section:

  1. After you clone the application locally, create your application JAR by navigating to the directory that contains your pom.xml and running the following command:
mvn clean package

This builds your application JAR in the target/ directory, named embedded-model-inference-1.0-SNAPSHOT.jar.

  2. Upload this application JAR to an S3 bucket, either the one created by the CloudFormation template or another one used to store code artifacts.
  3. Configure your Kinesis Data Analytics application to point to this newly uploaded S3 JAR file.
  4. This is also a great opportunity to configure your runtime properties, as shown in the following screenshot.
    application properties on KDA console
  5. Choose Run to start your application.

You can open the Apache Flink Dashboard to check for application exceptions or to see data flowing through the tasks defined in the code.

image of flink dashboard showing successful running of the application

Validate the results

To validate our results, let’s check the results in Amazon S3 by navigating to the Amazon S3 console and finding our S3 bucket. We can find the output in a folder called output-kda.

image showing folders within amazon s3 partitioned by datetime

When we choose one of the data-partitioned folders, we can see partition files. Ensure that there is no underscore in front of your part file, because this indicates that the results are still being finalized according to the rollover interval defined in Apache Flink’s FileSink connector. After the underscores have disappeared, we can use Amazon S3 Select to view our data.

partition files as they land in Amazon S3

We now have a solution that continuously performs classification on incoming images in real time using Kinesis Data Analytics for Apache Flink. It extracts a pre-trained classification model (ResNet-18) from the DJL model zoo, applies some preprocessing, loads the model into a Flink operator’s memory, and continuously applies the model for online predictions on streaming images.

Although we used ResNet-18 in this post, you can choose another model by modifying the classifier. The DJL model zoo provides many other models, both for classification and other applications, that can be used out of the box. You can also load your custom model by providing an S3 link or URL to the criteria. DJL supports models in a large number of engines such as PyTorch, ONNX, TensorFlow, and MXNet. Using a model in the solution is relatively simple. All of the preprocessing and postprocessing code is encapsulated in the (built-in) translator, so all we have to do is load the model, create a predictor, and call predict(). This is done within the data source operator, which processes the stream of input data and sends the links to the data to the inference operator where the model you selected produces the prediction. Then the sink operator writes the results.

The CloudFormation template in this example focused on a simple 1 KPU application. You could extend the solution to further scale out to large models and high-throughput streams, and support multiple models within the pipeline.

Clean up

To clean up the CloudFormation script you launched, complete the following steps:

  1. Empty the source bucket you specified in the bash script.
  2. On the AWS CloudFormation console, locate the CloudFormation template called KDAImageClassification.
  3. Delete the stack, which will remove all of the remaining resources created for this post.
  4. You may optionally delete the bootstrapping CloudFormation template, CDKToolkit, which you launched earlier as well.

Conclusion

In this post, we presented a solution for real-time classification using the Deep Java Library within Kinesis Data Analytics for Apache Flink. We shared a working example and discussed how you can adapt the solution for your specific ML use case. If you have any feedback or questions, please leave them in the comments section.


About the Authors

Jeremy Ber has been working in the telemetry data space for the past 9 years as a Software Engineer, Machine Learning Engineer, and most recently a Data Engineer. At AWS, he is a Streaming Specialist Solutions Architect, supporting both Amazon Managed Streaming for Apache Kafka (Amazon MSK) and AWS’s managed offering for Apache Flink.

Deepthi Mohan is a Principal Product Manager for Amazon Kinesis Data Analytics, AWS’s managed offering for Apache Flink.

Gaurav Rele is a Data Scientist at the Amazon ML Solution Lab, where he works with AWS customers across different verticals to accelerate their use of machine learning and AWS Cloud services to solve their business challenges.

Real-time time series anomaly detection for streaming applications on Amazon Kinesis Data Analytics

Post Syndicated from Antonio Vespoli original https://aws.amazon.com/blogs/big-data/real-time-time-series-anomaly-detection-for-streaming-applications-on-amazon-kinesis-data-analytics/

Detecting anomalies in real time from high-throughput streams is key for making timely decisions in order to adapt and respond to unexpected scenarios. Stream processing frameworks such as Apache Flink empower users to design systems that can ingest and process continuous flows of data at scale. In this post, we present a streaming time series anomaly detection algorithm for Apache Flink based on matrix profiles and left-discords, inspired by Lu et al., 2022, and provide a working example that will help you get started on a managed Apache Flink solution using Amazon Kinesis Data Analytics.

Challenges of anomaly detection

Anomaly detection plays a key role in a variety of real-world applications, such as fraud detection, sales analysis, cybersecurity, predictive maintenance, and fault detection, among others. The majority of these use cases require actions to be taken in near real-time. For instance, card payment networks must be able to identify and reject potentially fraudulent transactions before processing them. This raises the challenge of designing near-real-time anomaly detection systems that can scale to ultra-fast-arriving data streams.

Another key challenge that anomaly detection systems face is concept drift. The ever-changing nature of some use cases requires models to dynamically adapt to new scenarios. For instance, in a predictive maintenance scenario, you could use several Internet of Things (IoT) devices to monitor the vibrations produced by an electric motor with the objective of detecting anomalies and preventing unrecoverable damage. Sounds emitted by the vibrations of the motor can vary significantly over time due to different environmental conditions such as temperature variations, and this shift in pattern can invalidate the model. This class of scenarios creates the necessity for online learning—the ability of the model to continuously learn from new data.

Time series anomaly detection

Time series are a particular class of data that incorporates time in their structuring. The data points that characterize a time series are recorded in an orderly fashion and are chronological in nature. This class of data is present in every industry and is common at the core of many business requirements or key performance indicators (KPIs). Natural sources of time series data include credit card transactions, sales, sensor measurements, machine logs, and user analytics.

In the time series domain, an anomaly can be defined as a deviation from the expected patterns that characterize the time series. For instance, a time series can be characterized by its expected ranges, trends, and seasonal or cyclic patterns. Any significant alteration of this normal flow of data points is considered an anomaly.

Detecting anomalies can be more or less challenging depending on the domain. For instance, a threshold-based approach might be suitable for time series with known expected ranges, such as the working temperature of a machine or CPU utilization. On the other hand, applications such as fraud detection, cybersecurity, and predictive maintenance can’t be handled via simple rule-based approaches and require a more fine-grained mechanism to capture unexpected observations. Thanks to their parallelizable and event-driven setup, streaming engines such as Apache Flink provide an excellent environment for scaling real-time anomaly detection to fast-arriving data streams.

Solution overview

Apache Flink is a distributed processing engine for stateful computations over streams. A Flink program can be implemented in Java, Scala, or Python. It supports ingestion, manipulation, and delivery of data to the desired destinations. Kinesis Data Analytics allows you to run Flink applications in a fully managed environment on AWS.

Distance-based anomaly detection is a popular approach where a model is characterized by a number of internally stored data points that are used for comparison against the new incoming data points. At inference time, these methods compute distances and classify new data points according to how dissimilar they are from the past observations. In spite of the plethora of algorithms in literature, there is increasing evidence that distance-based anomaly detection algorithms are still competitive with the state of the art (Nakamura et al., 2020).

In this post, we present a streaming version of a distance-based unsupervised anomaly detection algorithm called time series discords, and explore some of the optimizations introduced by the Discord Aware Matrix Profile (DAMP) algorithm (Lu et al., 2022), which further develops the discords method to scale to trillions of data points.

Understanding the algorithm

A left-discord is a subsequence that is significantly dissimilar from all the subsequences that precede it. In this post, we demonstrate how to use the concept of left-discords to identify time series anomalies in streams using Kinesis Data Analytics for Apache Flink.

Let’s consider an unbounded stream and all its subsequences of length n. The m most recent subsequences will be stored and used for inference. When a new data point arrives, a new subsequence that includes the new event is formed. The algorithm compares this latest subsequence (query) to the m subsequences retained from the model, with the exclusion of the latest n subsequences because they overlap with the query and would therefore characterize a self-match. After computing these distances, the algorithm classifies the query as an anomaly if its distance from its closest non-self-matching subsequence is above a certain moving threshold.
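
To make these mechanics concrete, the following is a minimal, Flink-independent sketch of the core check. Variable and method names are illustrative and do not match the repository, and distances here are plain Euclidean, whereas the actual implementation may normalize subsequences first.

/**
 * Returns the distance from the query to its nearest non-self-matching neighbor.
 * pastSubsequences holds the m most recent subsequences, oldest first;
 * the latest n of them overlap the query (self-matches) and are skipped.
 */
static double nearestNeighbourDistance(double[][] pastSubsequences, double[] query, int sequenceLength) {
    int lastComparable = pastSubsequences.length - sequenceLength;
    double minDistance = Double.MAX_VALUE;
    for (int i = 0; i < lastComparable; i++) {
        double sum = 0.0;
        for (int j = 0; j < sequenceLength; j++) {
            double diff = pastSubsequences[i][j] - query[j];
            sum += diff * diff;
        }
        minDistance = Math.min(minDistance, Math.sqrt(sum));
    }
    return minDistance;
}

// The query is flagged as a left-discord (anomaly) when even its nearest
// neighbor is farther away than the current moving threshold:
// boolean isAnomaly = nearestNeighbourDistance(past, query, n) > threshold;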

For this post, we use a Kinesis data stream to ingest the input data, a Kinesis Data Analytics application to run the Flink anomaly detection program, and another Kinesis data stream to ingest the output produced by your application. For visualization purposes, we consume from the output stream using Kinesis Data Analytics Studio, which provides an Apache Zeppelin Notebook that we use to visualize and interact with the data in real time.

Implementation details

The Java application code for this example is available on GitHub. To download the application code, complete the following steps:

  1. Clone the remote repository using the following command:
    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples

  2. Navigate to the amazon-kinesis-data-analytics-java-examples/AnomalyDetection/LeftDiscords directory:

Let’s walk through the code step by step.

The MPStreamingJob class defines the data flow of the application, and the MPProcessFunction class defines the logic of the function that detects anomalies.

The implementation is best described by three core components:

  • The Kinesis data stream source, used to read from the input stream
  • The anomaly detection process function
  • The Kinesis data stream sink, used to deliver the output into the output stream

The anomaly detection function is implemented as a ProcessFunction<String, String>. Its method MPProcessFunction#processElement is called for every data point:

@Override
public void processElement(String dataPoint, ProcessFunction<String, OutputWithLabel>.Context context,
                            Collector<OutputWithLabel> collector) {

   Double record = Double.parseDouble(dataPoint);

   int currentIndex = timeSeriesData.add(record);

   Double minDistance = 0.0;
   String anomalyTag = "INITIALISING";

   if (timeSeriesData.readyToCompute()) {
       minDistance = timeSeriesData.computeNearestNeighbourDistance();
       threshold.update(minDistance);
   }

   /*
   * Algorithm will wait for initializationPeriods * sequenceLength data points until starting
   * to compute the Matrix Profile (MP).
   */
   if (timeSeriesData.readyToInfer()) {
       anomalyTag = minDistance > threshold.getThreshold() ? "IS_ANOMALY" : "IS_NOT_ANOMALY";
   }

   OutputWithLabel output = new OutputWithLabel(currentIndex, record, minDistance, anomalyTag);

   collector.collect(output);
}

For every incoming data point, the anomaly detection algorithm takes the following actions:

  1. Adds the record to the timeSeriesData.
  2. If it has observed at least 2 * sequenceLength data points, starts computing the matrix profile.
  3. If it has observed at least initializationPeriods * sequenceLength data points, starts outputting anomaly labels.

Following these actions, the MPProcessFunction outputs an OutputWithLabel object with four attributes:

  • index – The index of the data point in the time series
  • input – The input data without any transformation (identity function)
  • mp – The distance to the closest non-self-matching subsequence for the subsequence ending in index
  • anomalyTag – A binary label that indicates whether the subsequence is an anomaly

In the provided implementation, the threshold is learned online by fitting a normal distribution to the matrix profile data:

/*
 * Computes the threshold as two standard deviations away from the mean (p = 0.02)
 *
 * @return an estimated threshold
 */
public Double getThreshold() {
   Double mean = sum/counter;

   return mean + 2 * Math.sqrt(squaredSum/counter - mean*mean);
}

In this example, the algorithm classifies as anomalies those subsequences whose distance from their nearest neighbor deviates significantly from the average minimum distance (more than two standard deviations away from the mean).
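
The update side of this online estimator isn’t shown above. A minimal version that maintains the running statistics getThreshold() relies on could look like the following sketch (field names chosen to match the snippet above; the repository’s implementation may differ):

// Accumulate running sums so that getThreshold() can estimate mean + 2 * stddev online.
public void update(Double minDistance) {
    sum += minDistance;
    squaredSum += minDistance * minDistance;
    counter += 1;
}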

The TimeSeries class implements the data structure that retains the context window, namely, the internally stored records that are used for comparison against the new incoming records. In the provided implementation, the n most recent records are retained, and when the TimeSeries object is at capacity, the oldest records are overwritten.
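
Conceptually, that retention behavior is a fixed-capacity ring buffer. A stripped-down sketch (not the repository’s TimeSeries class) looks like this:

// Fixed-capacity buffer over the most recent records: once full,
// each new record overwrites the oldest one.
public class ContextWindow {
    private final double[] values;
    private long count = 0;

    public ContextWindow(int capacity) {
        this.values = new double[capacity];
    }

    /** Adds a record and returns its global index in the stream. */
    public long add(double record) {
        values[(int) (count % values.length)] = record;
        return count++;
    }

    /** True once enough records have been seen to fill the window. */
    public boolean atCapacity() {
        return count >= values.length;
    }
}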

Prerequisites

Before you create a Kinesis Data Analytics application for this exercise, create two Kinesis data streams: InputStream and OutputStream in us-east-1. The Flink application will use these streams as its respective source and destination streams. To create these resources, launch the following AWS CloudFormation stack:

Launch Stack

Alternatively, follow the instructions in Creating and Updating Data Streams.

Create the application

To create your application, complete the following steps:

  1. Clone the remote repository using the following command:
    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples 

  2. Navigate to the amazon-kinesis-data-analytics-java-examples/AnomalyDetection/LeftDiscords/core directory.
    cd amazon-kinesis-data-analytics-java-examples/AnomalyDetection/LeftDiscords/core

  3. Create your JAR file by running the following Maven command in the core directory, which contains the pom.xml file:
    mvn package -Dflink.version=1.15.4
  4. Create an Amazon Simple Storage Service (Amazon S3) bucket and upload the file target/left-discords-1.0.0.jar.
  5. Create and run a Kinesis Data Analytics application as described in Create and Run the Kinesis Data Analytics Application:
    1. Use the target/left-discords-1.0.0.jar.
    2. Note that the input and output streams are called InputStream and OutputStream, respectively.
    3. The provided example is set up to run in us-east-1.

Populate the input stream

You can populate InputStream by running the script.py file from the cloned repository, using the command python script.py. By editing the last two lines, you can populate the stream with synthetic data or with real data from a CSV dataset.

Visualize data on Kinesis Data Analytics Studio

Kinesis Data Analytics Studio provides the perfect setup for observing data in real time. The following screenshot shows sample visualizations. The first plot shows the incoming time series data, the second plot shows the matrix profile, and the third plot shows which data points have been classified as anomalies.

To visualize the data, complete the following steps:

  1. Create a notebook.
  2. Add the following paragraphs to the Zeppelin note:

Create a table and define the shape of the records generated by the application:

%flink.ssql

CREATE TABLE data (
index INT,
input VARCHAR(6),
mp VARCHAR(6),
anomalyTag VARCHAR(20)
)
PARTITIONED BY (index)
WITH (
'connector' = 'kinesis',
'stream' = 'OutputStream',
'aws.region' = 'us-east-1',
'scan.stream.initpos' = 'LATEST',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
)

Visualize the input data (choose Line Chart from the visualization options):

%flink.ssql(type=update)

SELECT index, input FROM data;

Visualize the output matrix profile data (choose Scatter Chart from the visualization options):

%flink.ssql(type=update)

SELECT index, mp FROM data;

Visualize the labeled data (choose Scatter Chart from the visualization options):

%flink.ssql(type=update)

SELECT index, anomalyTag FROM data;

Clean up

To delete all the resources that you created, follow the instructions in Clean Up AWS Resources.

Future developments

In this section, we discuss future developments for this solution.

Optimize for speed

The online time series discords algorithm is further developed and optimized for speed in Lu et al., 2022. The proposed optimizations include:

  • Early stopping – If the algorithm finds a subsequence that is similar enough (below the threshold), it stops searching and marks the query as a non-anomaly.
  • Look-ahead windowing – Look at some amount of data in the future and compare it to the current query to cheaply discover and prune future subsequences that could not be left-discords. Note that this introduces some delay. The reason this disqualification improves performance is that data points that are close in time are more likely to be similar than data points that are distant in time.
  • Use of MASS – The MASS (Mueen’s Algorithm for Similarity Search) search algorithm is designed for efficiently discovering the most similar subsequence in the past.

Parallelization

The algorithm above operates with parallelism 1, which means that when a single worker is enough to handle the data stream throughput, the algorithm can be used directly. This design can be enhanced with further distribution logic for handling high-throughput scenarios. To parallelize this algorithm, you may need to design a partitioner operator that ensures the anomaly detection operators have the relevant past data points at their disposal. The algorithm can maintain a set of the most recent records to which it compares the query. Efficiency and accuracy trade-offs of approximate solutions are interesting to explore. Because the best solution for parallelizing the algorithm depends largely on the nature of the data, we recommend experimenting with various approaches using your domain-specific knowledge.

Conclusion

In this post, we presented a streaming version of an anomaly detection algorithm based on left-discords. By implementing this solution, you learned how to deploy an Apache Flink-based anomaly detection solution on Kinesis Data Analytics, and you explored the potential of Kinesis Data Analytics Studio for visualizing and interacting with streaming data in real time. For more details on how to implement anomaly detection solutions in Apache Flink, refer to the GitHub repository that accompanies this post. To learn more about Kinesis Data Analytics and Apache Flink, explore the Amazon Kinesis Data Analytics Developer Guide.

Give it a try and share your feedback in the comments section.


About the Authors

Antonio Vespoli is a Software Development Engineer in AWS. He works on Amazon Kinesis Data Analytics, the managed offering for running Apache Flink applications on AWS.

Samuel Siebenmann is a Software Development Engineer in AWS. He works on Amazon Kinesis Data Analytics, the managed offering for running Apache Flink applications on AWS.

Nuno Afonso is a Software Development Engineer in AWS. He works on Amazon Kinesis Data Analytics, the managed offering for running Apache Flink applications on AWS.

Real-time anomaly detection via Random Cut Forest in Amazon Kinesis Data Analytics

Post Syndicated from Daren Wong original https://aws.amazon.com/blogs/big-data/real-time-anomaly-detection-via-random-cut-forest-in-amazon-kinesis-data-analytics/

Real-time anomaly detection describes a use case to detect and flag unexpected behavior in streaming data as it occurs. Online machine learning (ML) algorithms are popular for this use case because they don’t require any explicit rules and are able to adapt to a changing baseline, which is particularly useful for continuous streams of data where incoming data changes continuously over time.

Random Cut Forest (RCF) is one such algorithm widely used for anomaly detection use cases. In typical setups, we want to be able to run the RCF algorithm on input data with large throughput, and streaming data processing frameworks can help with that. We are excited to share that RCF is possible with Amazon Kinesis Data Analytics for Apache Flink. Apache Flink is a popular open-source framework for real-time, stateful computations over data streams, and can be used to run RCF on input streams with large throughput.

This post demonstrates how we can use Kinesis Data Analytics for Apache Flink to run an online RCF algorithm for anomaly detection.

Solution overview

The following diagram illustrates our architecture, which consists of three components: an input data stream using Amazon Kinesis Data Streams, a Flink job, and an output Kinesis data stream. In terms of data flow, we use a Python script to generate anomalous sine wave data into the input data stream, the data is then processed by RCF in a Flink job, and the resultant anomaly score is delivered to the output data stream.

The following graph shows an example of our expected result, which indicates that the anomaly score peaked when the sine wave data source anomalously dropped to a constant -17.

We can implement this solution in three simple steps:

  1. Set up AWS resources via AWS CloudFormation.
  2. Set up a data generator to produce data into the source data stream.
  3. Run the RCF Flink Java code on Kinesis Data Analytics.

Set up AWS resources via AWS CloudFormation

The following CloudFormation stack will create all the AWS resources we need for this tutorial, including two Kinesis data streams, a Kinesis Data Analytics app, and an Amazon Simple Storage Service (Amazon S3) bucket.

Sign in to your AWS account, then choose Launch Stack:

BDB-2063-launch-cloudformation-stack

Follow the steps on the AWS CloudFormation console to create the stack.

Set up a data generator

Run the following Python script to populate the input data stream with the anomalous sine wave data:

import json
import boto3
import math 

STREAM_NAME = "ExampleInputStream-RCF"


def get_data(time):
    # Base signal: a sine wave that oscillates between 0 and 20
    rad = (time/100)%360
    val = math.sin(rad)*10 + 10

    # Inject an anomaly: flatten the signal to a constant -17 while rad is between 2.4 and 2.6
    if rad > 2.4 and rad < 2.6:
        val = -17

    return {'time': time, 'value': val}

def generate(stream_name, kinesis_client):
    time = 0

    while True:
        data = get_data(time)
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data),
            PartitionKey="partitionkey")

        time += 1


if __name__ == '__main__':
    generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))

Run the RCF Flink Java code on Kinesis Data Analytics

The CloudFormation stack automatically downloaded and packaged the RCF Flink job JAR file for you. Therefore, you can simply go to the Kinesis Data Analytics console to run your application.

That’s it! We now have a running Flink job that continuously reads in data from an input Kinesis data stream and calculates the anomaly score for each new data point given the previous data points it has seen.

The following sections explain the RCF implementation and Flink job code in more detail.

RCF implementation

Numerous RCF implementations are publicly available. For this tutorial, we use the AWS implementation and wrap it in a custom operator (RandomCutForestOperator) to be used in our Flink job.

RandomCutForestOperator is implemented as an Apache Flink ProcessFunction, which is a function that allows us to write custom logic to process every element in the stream. Our custom logic starts with a data transformation via inputDataMapper.apply, followed by getting the anomaly score by calling the AWS RCF library via rcf.getAnomalyScore. The code implementation of RandomCutForestOperator can be found on GitHub.
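
Outside of Flink, using the open-source AWS Random Cut Forest library directly follows a score-then-update loop. The following is a minimal sketch using the same hyperparameter values discussed below; consult the library’s README for the authoritative builder options.

import com.amazon.randomcutforest.RandomCutForest;

public class RcfQuickCheck {
    public static void main(String[] args) {
        RandomCutForest forest = RandomCutForest.builder()
                .dimensions(1)      // 1-dimensional input (our sine wave values)
                .shingleSize(1)
                .sampleSize(628)
                .build();

        for (int t = 0; t < 1000; t++) {
            double value = Math.sin(t / 100.0) * 10 + 10;       // regular signal
            double[] point = new double[] {value};
            double score = forest.getAnomalyScore(point);       // score against what the forest has seen so far
            forest.update(point);                               // then fold the point into the model
            System.out.println(t + "," + score);
        }
    }
}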

RandomCutForestOperatorBuilder requires two main types of parameters:

  • RandomCutForestOperator hyperparameters – We use the following:
    • Dimensions – We set this to 1 because our input data is a 1-dimensional sine wave consisting of the float data type.
    • ShingleSize – We set this to 1, which means our RCF algorithm will take into account the previous and current data points in anomaly score computation. Note that this can be increased to account for seasonality in data.
    • SampleSize – We set this to 628, which means a maximum of 628 data points is kept in the data sample for each tree.
  • DataMapper parameters for input and output processing – We use the following:
    • InputDataMapper – We use RandomCutForestOperator.SIMPLE_FLOAT_INPUT_DATA_MAPPER to map input data from float to float[].
    • ResultMapper – We use RandomCutForestOperator.SIMPLE_TUPLE_RESULT_DATA_MAPPER, which is a BiFunction that joins the anomaly score with the corresponding sine wave data point into a tuple.

Flink job code

The following code snippet illustrates the core streaming structure of our Apache Flink streaming Java code. It first reads in data from the source Kinesis data stream, then processes it using the RCF algorithm. The computed anomaly score is then written to an output Kinesis data stream.

DataStream<Float> sineWaveSource = createSourceFromStaticConfig(env);

sineWaveSource
        .process(
                RandomCutForestOperator.<Float, Tuple2<Float, Double>>builder()
                        .setDimensions(1)
                        .setShingleSize(1)
                        .setSampleSize(628)
                        .setInputDataMapper(RandomCutForestOperator.SIMPLE_FLOAT_INPUT_DATA_MAPPER)
                        .setResultMapper(RandomCutForestOperator.SIMPLE_TUPLE_RESULT_DATA_MAPPER)
                        .build(),
                TupleTypeInfo.getBasicTupleTypeInfo(Float.class, Double.class))
       .addSink(createSinkFromStaticConfig());

In this example, our baseline input data is a sine wave. As shown in the following screenshot, a low anomaly score is returned when the data is regular. However, when there is an anomaly in the data (when the sine wave input data drops to a constant), a high anomaly score is returned. The anomaly score is delivered into an output Kinesis data stream. You can visualize this result by creating a Kinesis Data Analytics Studio app; for instructions, refer to Interactive analysis of streaming data.

Because this is an unsupervised algorithm, you don’t need to provide any explicit rules or labeled datasets for this operator. In short, only the input data stream, data conversions, and some hyperparameters were provided. The RCF algorithm itself determined the expected baseline based on the input data and identified any unexpected behavior.

Furthermore, this means the model will continuously adapt even if the baseline changes over time. As such, minimal retraining cadence is required. This is powerful for anomaly detection on streaming data because the data will often drift slowly over time due to seasonal trends, inflation, equipment calibration drift, and so on.

Clean up

To avoid incurring future charges, complete the following steps:

  1. On the Amazon S3 console, empty the S3 bucket created by the CloudFormation stack.
  2. On the AWS CloudFormation console, delete the CloudFormation stack.

Conclusion

This post demonstrated how to perform anomaly detection on input streaming data with RCF, an online unsupervised ML algorithm using Kinesis Data Analytics. We also showed how this algorithm learns the data baseline on its own, and can adapt to changes in the baseline over time. We hope you consider this solution for your real-time anomaly detection use cases.


About the Authors

Daren Wong is a Software Development Engineer in AWS. He works on Amazon Kinesis Data Analytics, the managed offering for running Apache Flink applications on AWS.

Aleksandr Pilipenko is a Software Development Engineer in AWS. He works on Amazon Kinesis Data Analytics, the managed offering for running Apache Flink applications on AWS.

Hong Liang Teoh is a Software Development Engineer in AWS. He works on Amazon Kinesis Data Analytics, the managed offering for running Apache Flink applications on AWS.