Tag Archives: Kinesis Data Analytics

Announcing Amazon Managed Service for Apache Flink Renamed from Amazon Kinesis Data Analytics

Post Syndicated from Channy Yun original https://aws.amazon.com/blogs/aws/announcing-amazon-managed-service-for-apache-flink-renamed-from-amazon-kinesis-data-analytics/

Today we are announcing the rename of Amazon Kinesis Data Analytics to Amazon Managed Service for Apache Flink, a fully managed and serverless service for you to build and run real-time streaming applications using Apache Flink.

We continue to deliver the same experience in your Flink applications without any impact on ongoing operations, developments, or business use cases. All your existing running applications in Kinesis Data Analytics will work as is without any changes.

Many customers use Apache Flink for data processing, including support for diverse use cases with a vibrant open-source community. While Apache Flink applications are robust and popular, they can be difficult to manage because they require scaling and coordination of parallel compute or container resources. With the explosion of data volumes, data types, and data sources, customers need an easier way to access, process, secure, and analyze their data to gain faster and deeper insights without compromising on performance and costs.

Using Amazon Managed Service for Apache Flink, you can set up and integrate data sources or destinations with minimal code, process data continuously with sub-second latencies from hundreds of data sources like Amazon Kinesis Data Streams and Amazon Managed Streaming for Apache Kafka (Amazon MSK), and respond to events in real-time. You can also analyze streaming data interactively with notebooks in just a few clicks with Amazon Managed Service for Apache Flink Studio with built-in visualizations powered by Apache Zeppelin.

With Amazon Managed Service for Apache Flink, you can deploy secure, compliant, and highly available applications. There are no servers and clusters to manage, no compute and storage infrastructure to set up, and you only pay for the resources your applications consume.

A History to Support Apache Flink
Since we launched Amazon Kinesis Data Analytics based on a proprietary SQL engine in 2016, we learned that SQL alone was not sufficient to provide the capabilities that customers needed for efficient stateful stream processing. So, we started investing in Apache Flink, a popular open-source framework and engine for processing real-time data streams.

In 2018, we provided support for Amazon Kinesis Data Analytics for Java as a programmable option for customers to build streaming applications using Apache Flink libraries and choose their own integrated development environment (IDE) to build their applications. In 2020, we repositioned Amazon Kinesis Data Analytics for Java to Amazon Kinesis Data Analytics for Apache Flink to emphasize our continued support for Apache Flink. In 2021, we launched Kinesis Data Analytics Studio (now, Amazon Managed Service for Apache Flink Studio) with a simple, familiar notebook interface for rapid development powered by Apache Zeppelin and using Apache Flink as the processing engine.

Since 2019, we have worked more closely with the Apache Flink community, increasing code contributions in the area of AWS connectors for Apache Flink such as those for Kinesis Data Streams and Kinesis Data Firehose, as well as sponsoring annual Flink Forward events. Recently, we contributed Async Sink to the Flink 1.15 release, which improved cloud interoperability and added more sink connectors and formats, among other updates.

Beyond connectors, we continue to work with the Flink community to contribute availability improvements and deployment options. To learn more, see Making it Easier to Build Connectors with Apache Flink: Introducing the Async Sink in the AWS Open Source Blog.

New Features in Amazon Managed Service for Apache Flink
As I mentioned, you can continue to run your existing Flink applications in Kinesis Data Analytics (now Amazon Managed Apache Flink) without making any changes. I want to let you know about a part of the service along with the console change and new feature,  a blueprint where you create an end-to-end data pipeline with just one click.

First, you can use the new console of Amazon Managed Service for Apache Flink directly under the Analytics section in AWS. To get started, you can easily create Streaming applications or Studio notebooks in the new console, with the same experience as before.

To create a streaming application in the new console, choose Create from scratch or Use a blueprint. With a new blueprint option, you can create and set up all the resources that you need to get started in a single step using AWS CloudFormation.

The blueprint is a curated collection of Apache Flink applications. The first of these has demo data being read from a Kinesis Data Stream and written to an Amazon Simple Storage Service (Amazon S3) bucket.

After creating the demo application, you can configure, run, and open the Apache Flink dashboard to monitor your Flink application’s health with the same experiences as before. You can change a code sample in the GitHub repository to perform different operations using the Flink libraries in your own local development environment.

Blueprints are designed to be extensible, and you can leverage them to create more complex applications to solve your business challenges based on Amazon Managed Service for Apache Flink. Learn more about how to use Apache Flink libraries in the AWS documentation.

You can also use a blueprint to create your Studio notebook using Apache Zeppelin as a new setup option. With this new blueprint option, you can also create and set up all the resources that you need to get started in a single step using AWS CloudFormation.

This blueprint includes Apache Flink applications with demo data being sent to an Amazon MSK topic and read in Managed Service for Apache Flink. With an Apache Zeppelin notebook, you can view, query, and analyze your streaming data. Deploying the blueprint and setting up the Studio notebook takes about ten minutes. Go get a cup of coffee while we set it up!

After creating the new Studio notebook, you can open an Apache Zeppelin notebook to run SQL queries in your note with the same experiences as before. You can view a code sample in the GitHub repository to learn more about how to use Apache Flink libraries.

You can run more SQL queries on this demo data such as user-defined functions, tumbling and hopping windows, Top-N queries, and delivering data to an S3 bucket for streaming.

You can also use Java, Python, or Scala to power up your SQL queries and deploy your note as a continuously running application, as shown in the blog posts, how to use the Studio notebook and query your Amazon MSK topics.

To learn more blueprint samples, see GitHub repositories such as reading from MSK Serverless and writing to Amazon S3, reading from MSK Serverless and writing to MSK Serverless, and reading from MSK Serverless and writing to Amazon S3.

Now Available
You can now use Amazon Managed Service for Apache Flink, renamed from Amazon Kinesis Data Analytics. All your existing running applications in Kinesis Data Analytics will work as is without any changes.

To learn more, visit the new product page and developer guide. You can send feedback to AWS re:Post for Amazon Managed Service for Apache Flink, or through your usual AWS Support contacts.


Temporal data lake architecture for benchmark and indices analytics

Post Syndicated from Krishna Gogineni original https://aws.amazon.com/blogs/architecture/temporal-data-lake-architecture-for-benchmark-and-indices-analytics/

Financial trading houses and stock exchanges generate enormous volumes of data in near real-time, making it difficult to perform bi-temporal calculations that yield accurate results. Achieving this requires a processing architecture that can handle large volumes of data during peak bursts, meet strict latency requirements, and scale according to incoming volumes.

In this post, we’ll describe a scenario for an industry leader in the financial services sector and explain how AWS services are used for bi-temporal processing with state management and scale based on variable workloads during the day, all while meeting strict service-level agreement (SLA) requirements.

Problem statement

To design and implement a fully temporal transactional data lake with the repeatable read isolation level for queries is a challenge, particularly with burst events that need the overall architecture to scale accordingly. The data store in the overall architecture needs to record the value history of data at different times, which is especially important for financial data. Financial data can include corporate actions, annual or quarterly reports, or fixed-income securities, like bonds that have variable rates. It’s crucial to be able to correct data inaccuracies during the reporting period.

The example customer seeks a data processing platform architecture to dynamically scale based on the workloads with a capacity of processing 150 million records under 5 minutes. Their platform should be capable of meeting the end-to-end SLA of 15 minutes, from ingestion to reporting, with lowest total cost of ownership. Additionally, managing bi-temporal data requires a database that has critical features, such as ACID (atomicity, consistency, isolation, durability) compliance, time-travel capability, full-schema evolution, partition layout and evolution, rollback to prior versions, and SQL-like query experience.

Solution overview

The solution architecture key building blocks are Amazon Kinesis Data Streams for streaming data, Amazon Kinesis Data Analytics with Apache Flink as processing engine, Flink’s RocksDB for state management, and Apache Iceberg on Amazon Simple Storage Service (Amazon S3) as the storage engine (Figure 1).

End-to-end data-processing architecture

Figure 1. End-to-end data-processing architecture

Data processing

Here’s how it works:

  • A publisher application receives the data from the source systems and publishes data into Kinesis Data Streams using a well-defined JSON format structure.
  • Kinesis Data Streams holds the data for a duration that is configurable so data is not lost and can auto scale based on the data volume ingested.
  • Kinesis Data Analytics runs an Apache Flink application, with state management (RocksDB), to handle bi-temporal calculations. The Apache Flink application consumes data from Kinesis Data Streams and performs the following computations:
    • Transforms the JSON stream into a row-type record, compatible with a SQL table-like structure, resolving nesting and parent–child relationships present within the stream
    • Checks whether the record has already an existing state in in-memory RocksDB or disk attached to Kinesis Data Analytics computational node to avoid read latency from the database, which is critical for meeting the performance requirements
    • Performs bi-temporal calculations and creates the resultant records in an in-memory data structure before invoking the Apache Iceberg sink operator
    • The Apache Flink application sink operator appends the temporal states, expressed as records into existing Apache Iceberg data store. This will comply with key principles of time series data, which is immutable, and the ability to time-travel along with ACID compliance, schema evolution, and partition evolution
  • Kinesis Data Analytics is resilient and provides a no-data-loss capability, with features like periodic checkpoints and savepoints. They are used to store the state management in a secure Amazon S3 location that can be accessed outside of Kinesis Data Analytics. This savepoints mechanism can be used to programmatically to scale the cluster size based on the workloads using time-driven scheduling and AWS Lambda functions.
  • If the time-to-live feature of RocksDB is implemented, old records are stored in Apache Iceberg on Amazon S3. When performing temporal calculations, if the state is not found in memory, data is read from Apache Iceberg into RocksDB and the processing is completed. However, this step is optional and can be circumvented if the Kinesis Data Analytics cluster is initialized with right number of Kinesis processing units to hold the historical information, as per requirements.
  • Because the data is stored in an Apache Iceberg table format in Amazon S3, data is queried using Trino, which supports Apache Iceberg table format.
  • The end user queries data using any SQL tool that supports the Trino query engine.

Apache Iceberg maintenance jobs, such as data compaction, expire snapshot, delete orphan files, can be launched using Amazon Athena to optimize performance out of Apache Iceberg data store. Details of each processing step performed in Apache Flink application are captured using Amazon CloudWatch, which logs all the events.


Amazon EventBridge scheduler invokes a Lambda function to scale the Kinesis Data Analytics. Kinesis Data Analytics has a short outage during rescaling that is proportional to the amount of data stored in RocksDB, which is why a state management strategy is necessary for the proper operation of the system.

Figure 2 shows the scaling process, which depicts:

  • Before peak load: The Kinesis Data Analytics cluster is processing off-peak records with minimum configuration before the peak load. A scheduled event is launched from EventBridge that invokes a Lambda function, which shuts down the cluster using the savepoint mechanism and scales up the Kinesis Data Analytics cluster to required Kinesis processing units.
  • During peak load: When the peak data burst happens, the Kinesis Data Analytics cluster is ready to handle the volume of data from Kinesis Data Stream, and processes it within the SLA of 5 minutes.
  • After peak load: A scheduled event from EventBridge invokes a Lambda function to scale down the Kinesis Data Analytics cluster to the minimum configuration that holds the required state for the entire volume of records.
Cluster scaling before, during, and after peak data volume processing

Figure 2. Cluster scaling before, during, and after peak data volume processing

Performance insights

With the discussed architecture, we want to demonstrate that the we are able to meet the SLAs, in terms of performance and processing times. We have taken a subset of benchmarks and indices data and processed the same with the end-to-end architecture. During the process, we observed some very interesting findings, which we would like to share.

Processing time for Apache Iceberg Upsert vs Append operations: During our tests, we expected Upsert operation to be faster than append. But on the contrary, we noticed that Append operations were faster compared to Upsert even though more computations are performed in the Apache Flink application. In our test with 3,500,000 records, Append operation took 1556 seconds while Upsert took 1675 seconds to process the data (Figure 3).

Processing times for Upsert vs. Append

Figure 3. Processing times for Upsert vs. Append

Compute consumption for Apache Iceberg Upsert vs. Append operations: Comparing the compute consumption for 10,000,000 records, we noticed that Append operation was able to process the data in the same amount of time as Upsert operation but with less compute resources. In our tests, we have noted that Append operation only consumed 64 Kinesis processing units, whereas Upsert consumed 78 Kinesis processing units (Figure 4).

Comparing consumption for Upsert vs. Append

Figure 4. Comparing consumption for Upsert vs. Append

Scalability vs performance: To achieve the desired data processing performance, we need a specific configuration of Kinesis processing units, Kinesis Data Streams, and Iceberg parallelism. In our test with the data that we chose, we started with four Kinesis processing units and four Kinesis data streams for data processing. We observed an 80% performance improvement in data processing with 16 Kinesis data processing units. An additional 6% performance improvement was demonstrated when we scaled to 32 Kinesis processing units. When we increased the Kinesis data streams to 16, we observed an additional 2% performance improvement (Figure 5).

Scalability vs. performance

Figure 5. Scalability vs. performance

Data volume processing times for Upsert vs. Append: For this test, we started with 350,000 records of data. When we increased data volume to 3.5M records, we observed that Append performing better than Upsert, demonstrating a five-fold increase in processing time (Figure 6).

Data volume processing times for Upsert vs. Append

Figure 6. Data volume processing times for Upsert vs. Append


The architecture we explored today scales based on the data-volume requirements of the customer and is capable of meeting the end-to-end SLA of 15 minutes, with a potential lowered total cost of ownership. Additionally, the solution is capable of handling high-volume, bi-temporal computations with ACID compliance, time travel, full-schema evolution, partition layout evolution, rollback to prior versions and SQL-like query experience.

Further reading

Migrate from Amazon Kinesis Data Analytics for SQL Applications to Amazon Kinesis Data Analytics Studio

Post Syndicated from Nicholas Tunney original https://aws.amazon.com/blogs/big-data/migrate-from-amazon-kinesis-data-analytics-for-sql-applications-to-amazon-kinesis-data-analytics-studio/

Amazon Kinesis Data Analytics makes it easy to transform and analyze streaming data in real time.

In this post, we discuss why AWS recommends moving from Kinesis Data Analytics for SQL Applications to Amazon Kinesis Data Analytics for Apache Flink to take advantage of Apache Flink’s advanced streaming capabilities. We also show how to use Kinesis Data Analytics Studio to test and tune your analysis before deploying your migrated applications. If you don’t have any Kinesis Data Analytics for SQL applications, this post still provides a background on many of the use cases you’ll see in your data analytics career and how Amazon Data Analytics services can help you achieve your objectives.

Kinesis Data Analytics for Apache Flink is a fully managed Apache Flink service. You only need to upload your application JAR or executable, and AWS will manage the infrastructure and Flink job orchestration. To make things simpler, Kinesis Data Analytics Studio is a notebook environment that uses Apache Flink and allows you to query data streams and develop SQL queries or proof of concept workloads before scaling your application to production in minutes.

We recommend that you use Kinesis Data Analytics for Apache Flink or Kinesis Data Analytics Studio over Kinesis Data Analytics for SQL. This is because Kinesis Data Analytics for Apache Flink and Kinesis Data Analytics Studio offer advanced data stream processing features, including exactly-once processing semantics, event time windows, extensibility using user-defined functions (UDFs) and custom integrations, imperative language support, durable application state, horizontal scaling, support for multiple data sources, and more. These are critical for ensuring accuracy, completeness, consistency, and reliability of data stream processing and are not available with Kinesis Data Analytics for SQL.

Solution overview

For our use case, we use several AWS services to stream, ingest, transform, and analyze sample automotive sensor data in real time using Kinesis Data Analytics Studio. Kinesis Data Analytics Studio allows us to create a notebook, which is a web-based development environment. With notebooks, you get a simple interactive development experience combined with the advanced capabilities provided by Apache Flink. Kinesis Data Analytics Studio uses Apache Zeppelin as the notebook, and uses Apache Flink as the stream processing engine. Kinesis Data Analytics Studio notebooks seamlessly combine these technologies to make advanced analytics on data streams accessible to developers of all skill sets. Notebooks are provisioned quickly and provide a way for you to instantly view and analyze your streaming data. Apache Zeppelin provides your Studio notebooks with a complete suite of analytics tools, including the following:

  • Data visualization
  • Exporting data to files
  • Controlling the output format for easier analysis
  • Ability to turn the notebook into a scalable, production application

Unlike Kinesis Data Analytics for SQL Applications, Kinesis Data Analytics for Apache Flink adds the following SQL support:

  • Joining stream data between multiple Kinesis data streams, or between a Kinesis data stream and an Amazon Managed Streaming for Apache Kafka (Amazon MSK) topic
  • Real-time visualization of transformed data in a data stream
  • Using Python scripts or Scala programs within the same application
  • Changing offsets of the streaming layer

Another benefit of Kinesis Data Analytics for Apache Flink is the improved scalability of the solution once deployed, because you can scale the underlying resources to meet demand. In Kinesis Data Analytics for SQL Applications, scaling is performed by adding more pumps to persuade the application into adding more resources.

In our solution, we create a notebook to access automotive sensor data, enrich the data, and send the enriched output from the Kinesis Data Analytics Studio notebook to an Amazon Kinesis Data Firehose delivery stream for delivery to an Amazon Simple Storage Service (Amazon S3) data lake. This pipeline could further be used to send data to Amazon OpenSearch Service or other targets for additional processing and visualization.

Kinesis Data Analytics for SQL Applications vs. Kinesis Data Analytics for Apache Flink

In our example, we perform the following actions on the streaming data:

  1. Connect to an Amazon Kinesis Data Streams data stream.
  2. View the stream data.
  3. Transform and enrich the data.
  4. Manipulate the data with Python.
  5. Restream the data to a Firehose delivery stream.

To compare Kinesis Data Analytics for SQL Applications with Kinesis Data Analytics for Apache Flink, let’s first discuss how Kinesis Data Analytics for SQL Applications works.

At the root of a Kinesis Data Analytics for SQL application is the concept of an in-application stream. You can think of the in-application stream as a table that holds the streaming data so you can perform actions on it. The in-application stream is mapped to a streaming source such as a Kinesis data stream. To get data into the in-application stream, first set up a source in the management console for your Kinesis Data Analytics for SQL application. Then, create a pump that reads data from the source stream and places it into the table. The pump query runs continuously and feeds the source data into the in-application stream. You can create multiple pumps from multiple sources to feed the in-application stream. Queries are then run on the in-application stream, and results can be interpreted or sent to other destinations for further processing or storage.

The following SQL demonstrates setting up an in-application stream and pump:

   "column1" BIGINT NOT NULL, 
   "column2" INTEGER, 
   "column3" VARCHAR(64));

SELECT STREAM inputcolumn1, 

Data can be read from the in-application stream using a SQL SELECT query:


When creating the same setup in Kinesis Data Analytics Studio, you use the underlying Apache Flink environment to connect to the streaming source, and create the data stream in one statement using a connector. The following example shows connecting to the same source we used before, but using Apache Flink:

   "column1" BIGINT NOT NULL, 
   "column2" INTEGER, 
   "column3" VARCHAR(64)
) WITH (
   'connector' = 'kinesis',
   'stream' = sample-kinesis-stream',
   'aws.region' = 'aws-kinesis-region',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json'

MY_TABLE is now a data stream that will continually receive the data from our sample Kinesis data stream. It can be queried using a SQL SELECT statement:

SELECT column1, 

Although Kinesis Data Analytics for SQL Applications use a subset of the SQL:2008 standard with extensions to enable operations on streaming data, Apache Flink’s SQL support is based on Apache Calcite, which implements the SQL standard.

It’s also important to mention that Kinesis Data Analytics Studio supports PyFlink and Scala alongside SQL within the same notebook. This allows you to perform complex, programmatic methods on your streaming data that aren’t possible with SQL.


During this exercise, we set up various AWS resources and perform analytics queries. To follow along, you need an AWS account with administrator access. If you don’t already have an AWS account with administrator access, create one now. The services outlined in this post may incur charges to your AWS account. Make sure to follow the cleanup instructions at the end of this post.

Configure streaming data

In the streaming domain, we’re often tasked with exploring, transforming, and enriching data coming from Internet of Things (IoT) sensors. To generate the real-time sensor data, we employ the AWS IoT Device Simulator. This simulator runs within your AWS account and provides a web interface that lets users launch fleets of virtually connected devices from a user-defined template and then simulate them to publish data at regular intervals to AWS IoT Core. This means we can build a virtual fleet of devices to generate sample data for this exercise.

We deploy the IoT Device Simulator using the following Amazon CloudFront template. It handles creating all the necessary resources in your account.

  1. On the Specify stack details page, assign a name to your solution stack.
  2. Under Parameters, review the parameters for this solution template and modify them as necessary.
  3. For User email, enter a valid email to receive a link and password to log in to the IoT Device Simulator UI.
  4. Choose Next.
  5. On the Configure stack options page, choose Next.
  6. On the Review page, review and confirm the settings. Select the check boxes acknowledging that the template creates AWS Identity and Access Management (IAM) resources.
  7. Choose Create stack.

The stack takes about 10 minutes to install.

  1. When you receive your invitation email, choose the CloudFront link and log in to the IoT Device Simulator using the credentials provided in the email.

The solution contains a prebuilt automotive demo that we can use to begin delivering sensor data quickly to AWS.

  1. On the Device Type page, choose Create Device Type.
  2. Choose Automotive Demo.
  3. The payload is auto populated. Enter a name for your device, and enter automotive-topic as the topic.
  4. Choose Save.

Now we create a simulation.

  1. On the Simulations page, choose Create Simulation.
  2. For Simulation type, choose Automotive Demo.
  3. For Select a device type, choose the demo device you created.
  4. For Data transmission interval and Data transmission duration, enter your desired values.

You can enter any values you like, but use at least 10 devices transmitting every 10 seconds. You’ll want to set your data transmission duration to a few minutes, or you’ll need to restart your simulation several times during the lab.

  1. Choose Save.

Now we can run the simulation.

  1. On the Simulations page, select the desired simulation, and choose Start simulations.

Alternatively, choose View next to the simulation you want to run, then choose Start to run the simulation.

  1. To view the simulation, choose View next to the simulation you want to view.

If the simulation is running, you can view a map with the locations of the devices, and up to 100 of the most recent messages sent to the IoT topic.

We can now check to ensure our simulator is sending the sensor data to AWS IoT Core.

  1. Navigate to the AWS IoT Core console.

Make sure you’re in the same Region you deployed your IoT Device Simulator.

  1. In the navigation pane, choose MQTT Test Client.
  2. Enter the topic filter automotive-topic and choose Subscribe.

As long as you have your simulation running, the messages being sent to the IoT topic will be displayed.

Finally, we can set a rule to route the IoT messages to a Kinesis data stream. This stream will provide our source data for the Kinesis Data Analytics Studio notebook.

  1. On the AWS IoT Core console, choose Message Routing and Rules.
  2. Enter a name for the rule, such as automotive_route_kinesis, then choose Next.
  3. Provide the following SQL statement. This SQL will select all message columns from the automotive-topic the IoT Device Simulator is publishing:
SELECT timestamp, trip_id, VIN, brake, steeringWheelAngle, torqueAtTransmission, engineSpeed, vehicleSpeed, acceleration, parkingBrakeStatus, brakePedalStatus, transmissionGearPosition, gearLeverPosition, odometer, ignitionStatus, fuelLevel, fuelConsumedSinceRestart, oilTemp, location 
FROM 'automotive-topic' WHERE 1=1
  1. Choose Next.
  2. Under Rule Actions, select Kinesis Stream as the source.
  3. Choose Create New Kinesis Stream.

This opens a new window.

  1. For Data stream name, enter automotive-data.

We use a provisioned stream for this exercise.

  1. Choose Create Data Stream.

You may now close this window and return to the AWS IoT Core console.

  1. Choose the refresh button next to Stream name, and choose the automotive-data stream.
  2. Choose Create new role and name the role automotive-role.
  3. Choose Next.
  4. Review the rule properties, and choose Create.

The rule begins routing data immediately.

Set up Kinesis Data Analytics Studio

Now that we have our data streaming through AWS IoT Core and into a Kinesis data stream, we can create our Kinesis Data Analytics Studio notebook.

  1. On the Amazon Kinesis console, choose Analytics applications in the navigation pane.
  2. On the Studio tab, choose Create Studio notebook.
  3. Leave Quick create with sample code selected.
  4. Name the notebook automotive-data-notebook.
  5. Choose Create to create a new AWS Glue database in a new window.
  6. Choose Add database.
  7. Name the database automotive-notebook-glue.
  8. Choose Create.
  9. Return to the Create Studio notebook section.
  10. Choose refresh and choose your new AWS Glue database.
  11. Choose Create Studio notebook.
  12. To start the Studio notebook, choose Run and confirm.
  13. Once the notebook is running, choose the notebook and choose Open in Apache Zeppelin.
  14. Choose Import note.
  15. Choose Add from URL.
  16. Enter the following URL: https://aws-blogs-artifacts-public.s3.amazonaws.com/artifacts/BDB-2461/auto-notebook.ipynb.
  17. Choose Import Note.
  18. Open the new note.

Perform stream analysis

In a Kinesis Data Analytics for SQL application, we add our streaming course via the management console, and then define an in-application stream and pump to stream data from our Kinesis data stream. The in-application stream functions as a table to hold the data and make it available for us to query. The pump takes the data from our source and streams it to our in-application stream. Queries may then be run against the in-application stream using SQL, just as we’d query any SQL table. See the following code:

    `trip_id` CHAR(36),
    `VIN` CHAR(17),
    `brake` FLOAT,
    `steeringWheelAngle` FLOAT,
    `torqueAtTransmission` FLOAT,
    `engineSpeed` FLOAT,
    `vehicleSpeed` FLOAT,
    `acceleration` FLOAT,
    `parkingBrakeStatus` BOOLEAN,
    `brakePedalStatus` BOOLEAN,
    `transmissionGearPosition` VARCHAR(10),
    `gearLeverPosition` VARCHAR(10),
    `odometer` FLOAT,
    `ignitionStatus` VARCHAR(4),
    `fuelLevel` FLOAT,
    `fuelConsumedSinceRestart` FLOAT,
    `oilTemp` FLOAT,
    `location` VARCHAR(100),
    `timestamp` TIMESTAMP(3));


To migrate an in-application stream and pump from our Kinesis Data Analytics for SQL application to Kinesis Data Analytics Studio, we convert this into a single CREATE statement by removing the pump definition and defining a kinesis connector. The first paragraph in the Zeppelin notebook sets up a connector that is presented as a table. We can define columns for all items in the incoming message, or a subset.

Run the statement, and a success result is output in your notebook. We can now query this table using SQL, or we can perform programmatic operations with this data using PyFlink or Scala.

Before performing real-time analytics on the streaming data, let’s look at how the data is currently formatted. To do this, we run a simple Flink SQL query on the table we just created. The SQL used in our streaming application is identical to what is used in a SQL application.

Note that if you don’t see records after a few seconds, make sure that your IoT Device Simulator is still running.

If you’re also running the Kinesis Data Analytics for SQL code, you may see a slightly different result set. This is another key differentiator in Kinesis Data Analytics for Apache Flink, because the latter has the concept of exactly once delivery. If this application is deployed to production and is restarted or if scaling actions occur, Kinesis Data Analytics for Apache Flink ensures you only receive each message once, whereas in a Kinesis Data Analytics for SQL application, you need to further process the incoming stream to ensure you ignore repeat messages that could affect your results.

You can stop the current paragraph by choosing the pause icon. You may see an error displayed in your notebook when you stop the query, but it can be ignored. It’s just letting you know that the process was canceled.

Flink SQL implements the SQL standard, and provides an easy way to perform calculations on the stream data just like you would when querying a database table. A common task while enriching data is to create a new field to store a calculation or conversion (such as from Fahrenheit to Celsius), or create new data to provide simpler queries or improved visualizations downstream. Run the next paragraph to see how we can add a Boolean value named accelerating, which we can easily use in our sink to know if an automobile was currently accelerating at the time the sensor was read. The process here doesn’t differ between Kinesis Data Analytics for SQL and Kinesis Data Analytics for Apache Flink.

You can stop the paragraph from running when you have inspected the new column, comparing our new Boolean value to the FLOAT acceleration column.

Data being sent from a sensor is usually compact to improve latency and performance. Being able to enrich the data stream with external data and enrich the stream, such as additional vehicle information or current weather data, can be very useful. In this example, let’s assume we want to bring in data currently stored in a CSV in Amazon S3, and add a column named color that reflects the current engine speed band.

Apache Flink SQL provides several source connectors for AWS services and other sources. Creating a new table like we did in our first paragraph but instead using the filesystem connector permits Flink to directly connect to Amazon S3 and read our source data. Previously in Kinesis Data Analytics for SQL Applications, you couldn’t add new references inline. Instead, you defined S3 reference data and added it to your application configuration, which you could then use as a reference in a SQL JOIN.

NOTE: If you are not using the us-east-1 region, you can download the csv and place the object your own S3 bucket.  Reference the csv file as s3a://<bucket-name>/<key-name>

Building on the last query, the next paragraph performs a SQL JOIN on our current data and the new lookup source table we created.

Now that we have an enriched data stream, we restream this data. In a real-world scenario, we have many choices on what to do with our data, such as sending the data to an S3 data lake, another Kinesis data stream for further analysis, or storing the data in OpenSearch Service for visualization. For simplicity, we send the data to Kinesis Data Firehose, which streams the data into an S3 bucket acting as our data lake.

Kinesis Data Firehose can stream data to Amazon S3, OpenSearch Service, Amazon Redshift data warehouses, and Splunk in just a few clicks.

Create the Kinesis Data Firehose delivery stream

To create our delivery stream, complete the following steps:

  1. On the Kinesis Data Firehose console, choose Create delivery stream.
  2. Choose Direct PUT for the stream source and Amazon S3 as the target.
  3. Name your delivery stream automotive-firehose.
  4. Under Destination settings, create a new bucket or use an existing bucket.
  5. Take note of the S3 bucket URL.
  6. Choose Create delivery stream.

The stream takes a few seconds to create.

  1. Return to the Kinesis Data Analytics console and choose Streaming applications.
  2. On the Studio tab, and choose your Studio notebook.
  3. Choose the link under IAM role.
  4. In the IAM window, choose Add permissions and Attach policies.
  5. Search for and select AmazonKinesisFullAccess and CloudWatchFullAccess, then choose Attach policy.
  6. You may return to your Zeppelin notebook.

Stream data into Kinesis Data Firehose

As of Apache Flink v1.15, creating the connector to the Firehose delivery stream works similar to creating a connector to any Kinesis data stream. Note that there are two differences: the connector is firehose, and the stream attribute becomes delivery-stream.

After the connector is created, we can write to the connector like any SQL table.

To validate that we’re getting data through the delivery stream, open the Amazon S3 console and confirm you see files being created. Open the file to inspect the new data.

In Kinesis Data Analytics for SQL Applications, we would have created a new destination in the SQL application dashboard. To migrate an existing destination, you add a SQL statement to your notebook that defines the new destination right in the code. You can continue to write to the new destination as you would have with an INSERT while referencing the new table name.

Time data

Another common operation you can perform in Kinesis Data Analytics Studio notebooks is aggregation over a window of time. This sort of data can be used to send to another Kinesis data stream to identify anomalies, send alerts, or be stored for further processing. The next paragraph contains a SQL query that uses a tumbling window and aggregates total fuel consumed for the automotive fleet for 30-second periods. Like our last example, we could connect to another data stream and insert this data for further analysis.

Scala and PyFlink

There are times when a function you’d perform on your data stream is better written in a programming language instead of SQL, for both simplicity and maintenance. Some examples include complex calculations that SQL functions don’t support natively, certain string manipulations, the splitting of data into multiple streams, and interacting with other AWS services (such as text translation or sentiment analysis). Kinesis Data Analytics for Apache Flink has the ability to use multiple Flink interpreters within the Zeppelin notebook, which is not available in Kinesis Data Analytics for SQL Applications.

If you have been paying close attention to our data, you’ll see that the location field is a JSON string. In Kinesis Data Analytics for SQL, we could use string functions and define a SQL function and break apart the JSON string. This is a fragile approach depending on the stability of the message data, but this could be improved with several SQL functions. The syntax for creating a function in Kinesis Data Analytics for SQL follows this pattern:

CREATE FUNCTION ''<function_name>'' ( ''<parameter_list>'' )
    RETURNS ''<data type>''
    [ SPECIFIC ''<specific_function_name>''  | [NOT] DETERMINISTIC ]
  RETURN ''<SQL-defined function body>''

In Kinesis Data Analytics for Apache Flink, AWS recently upgraded the Apache Flink environment to v1.15, which extends Apache Flink SQL’s table SQL to add JSON functions that are similar to JSON Path syntax. This allows us to query the JSON string directly in our SQL. See the following code:

SELECT JSON_STRING(location, ‘$.latitude) AS latitude,
JSON_STRING(location, ‘$.longitude) AS longitude
FROM my_table

Alternatively, and required prior to Apache Flink v1.15, we can use Scala or PyFlink in our notebook to convert the field and restream the data. Both languages provide robust JSON string handling.

The following PyFlink code defines two user-defined functions, which extract the latitude and longitude from the location field of our message. These UDFs can then be invoked from using Flink SQL. We reference the environment variable st_env. PyFlink creates six variables for you in your Zeppelin notebook. Zeppelin also exposes a context for you as the variable z.

Errors can also happen when messages contain unexpected data. Kinesis Data Analytics for SQL Applications provides an in-application error stream. These errors can then be processed separately and restreamed or dropped. With PyFlink in Kinesis Data Analytics Streaming applications, you can write complex error-handling strategies and immediately recover and continue processing the data. When the JSON string is passed into the UDF, it may be malformed, incomplete, or empty. By catching the error in the UDF, Python will always return a value even if an error would have occurred.

The following sample code shows another PyFlink snippet that performs a division calculation on two fields. If a division-by-zero error is encountered, it provides a default value so the stream can continue processing the message.

@udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def DivideByZero(price):    
		price / 0        
		return -1
st_env.register_function("DivideByZero", DivideByZero)

Next steps

Building a pipeline as we’ve done in this post gives us the base for testing additional services in AWS. I encourage you to continue your streaming analytics learning before tearing down the streams you created. Consider the following:

Clean up

To clean up the services created in this exercise, complete the following steps:

  1. Navigate to the CloudFormation Console and delete the IoT Device Simulator stack.
  2. On the AWS IoT Core console, choose Message Routing and Rules, and delete the rule automotive_route_kinesis.
  3. Delete the Kinesis data stream automotive-data in the Kinesis Data Stream console.
  4. Remove the IAM role automotive-role in the IAM Console.
  5. In the AWS Glue console, delete the automotive-notebook-glue database.
  6. Delete the Kinesis Data Analytics Studio notebook automotive-data-notebook.
  7. Delete the Firehose delivery stream automotive-firehose.


Thanks for following along with this tutorial on Kinesis Data Analytics Studio. If you’re currently using a legacy Kinesis Data Analytics Studio SQL application, I recommend you reach out to your AWS technical account manager or Solutions Architect and discuss migrating to Kinesis Data Analytics Studio. You can continue your learning path in our Amazon Kinesis Data Streams Developer Guide, and access our code samples on GitHub.

About the Author

Nicholas Tunney is a Partner Solutions Architect for Worldwide Public Sector at AWS. He works with global SI partners to develop architectures on AWS for clients in the government, nonprofit healthcare, utility, and education sectors.

How Klarna Bank AB built real-time decision-making with Amazon Kinesis Data Analytics for Apache Flink

Post Syndicated from Nir Tsruya original https://aws.amazon.com/blogs/big-data/how-klarna-bank-ab-built-real-time-decision-making-with-amazon-kinesis-data-analytics-for-apache-flink/

This is a joint post co-authored with Nir Tsruya from Klarna Bank AB.

Klarna is a leading global payments and shopping service, providing smarter and more flexible shopping and purchase experiences to 150 million active consumers across more than 500,000 merchants in 45 countries. Klarna offers direct payments, pay after delivery options, and instalment plans in a smooth one-click purchase experience that lets consumers pay when and how they prefer to. The ability to utilize data to make near-real-time decisions is a source of competitive advantage for Klarna.

This post presents a reference architecture for real-time queries and decision-making on AWS using Amazon Kinesis Data Analytics for Apache Flink. In addition, we explain why the Klarna Decision Tooling team selected Kinesis Data Analytics for Apache Flink for their first real-time decision query service. We show how Klarna uses Kinesis Data Analytics for Apache Flink as part of an end-to-end solution including Amazon DynamoDB and Apache Kafka to process real-time decision-making.

AWS offers a rich set of services that you can use to realize real-time insights. These services include Kinesis Data Analytics for Apache Flink, the solution Klarna that uses to underpin automated decision-making in their business today. Kinesis Data Analytics for Apache Flink allows you to easily build stream processing applications for a variety of sources including Amazon Kinesis Data Streams, Amazon Managed Streaming for Apache Kafka (Amazon MSK), and Amazon MQ.

The challenge: Real-time decision-making at scale

Klarna’s customers expect a real-time, frictionless, online experience when shopping and paying online. In the background, Klarna needs to assess risks such as credit risk, fraud attempts, and money laundering for every customer credit request in every operating geography. The outcome of this risk assessment is called a decision. Decisions generate millions of risk assessment transactions a day that must be run in near-real time. The final decision is the record of whether Klarna has approved or rejected the request to extend credit to a consumer. These underwriting decisions are critical artefacts. First, they contain information that must be persisted for legal reasons. Second, they are used to build profiles and models that are fed into underwriting policies to improve the decision process. Under the hood, a decision is the sum of a number of transactions (for example, credit checks), coordinated and persisted via a decision store.

Klarna wanted to build a framework to ensure decisions persist successfully, ensuring timely risk assessment and quick decisions for customers. First, the Klarna team looked to solve the problem of producing and capturing decisions by using a combination of Apache Kafka and AWS Lambda. By publishing decision artefacts directly to a Kafka topic, the Klarna team found that high latency could cause long transaction wait times or transactions to be rejected altogether, leading to delays in getting ratified decisions to customers in a timely fashion and potential lost revenue. This approach also caused operational overhead for the Klarna team, including management of the schema evolution, replaying old events, and native integration of Lambda with their self-managed Apache Kafka clusters.

Design requirements

Klarna was able to set out their requirements for a solution to capture risk assessment artefacts (decisions), acting as a source of truth for all underwriting decisions within Klarna. The key requirements included at-least once reliability and millisecond latency, enabling real-time access to decision-making and the ability to replay past events in case of missing data in downstream systems. Additionally, the team needed a system that could scale to keep pace with Klarna’s rapid [10 times] growth.

Solution overview

The solution consists of two components: a combination of an highly available API with DynamoDB as the data store to store each decision, and Amazon DynamoDB Streams with Kinesis Data Analytics. Kinesis Data Analytics is a fully managed Apache Flink service and used to stream, process, enrich, and standardize the decision in real time and replay past events (if needed).

The following diagram illustrates the overall flow from end-user to the downstream systems.

The flow includes the following steps:

  1. As the end-user makes a purchase, the policy components assess risk and the decision is sent to a decision store via the Decision Store API.
  2. The Decision Store API persists the data in DynamoDB and responds to the requester. Decisions for each transaction are time-ordered and streamed by DynamoDB Streams. Decision Store also enables centralised schema management and handles evolution of event schemas.
  3. The Kinesis Data Analytics for Apache Flink application is the consumer of DynamoDB streams. The application makes sure that the decisions captured are conforming to the expected event schema that is then published to a Kafka topic to be consumed by various downstream systems. Here, Kinesis Data Analytics for Apache Flink plays a vital part in the delivery of those events: aggregating, enriching, and mapping data to adhere to the event schema. This provides a standardized way for consumers to access decisions from their respective producers. The application enables at-least once delivery capability, and Flink’s checkpoint and retry mechanism guarantees that every event is processed and persisted.
  4. The published Kafka events are consumed by the downstream systems and stored in an Amazon Simple Storage Service (Amazon S3) bucket. The events stored in Amazon S3 reflect every decision ever taken by the producing policy components, and can be used by the decision store to backfill and replay any past events. In addition to preserving the history of decision events, events are also stored as a set of variables in the variable store.
  5. Policy components use the variable store to check for similar past decisions to determine if a request can be accepted or denied immediately. The request is then processed as described by the preceding workflow, and the next subsequent request will be answered by the variable store based on the result of the previous decision.

The decision store provides a standardized workflow for processing and producing events for downstream systems and customer support. With all the events captured and safely stored in DynamoDB, the decision store provides an API for support engineers (and other supporting tools like chatbots) to query and access past decisions in near-real time.

Solution impact

The solution provided benefits in three areas.

First, the managed nature of Kinesis Data Analytics allowed the Klarna team to focus on value-adding application development instead of managing infrastructure. The team is able to onboard new use cases in less than a week. They can take full advantage of the auto scaling feature in Kinesis Data Analytics and pre-built sources and destinations.

Second, the team can use Apache Flink to ensure the accuracy, completeness, consistency, and reliability of data. Flink’s native capability of stateful computation and data accuracy through the use of checkpoints and savepoints directly supports Klarna team’s vision to add more logic into the pipelines, allowing the team to expand to different use cases confidently. Additionally, the low latency of the service ensures that enriched decision artefacts are available to consumers and subsequently to the policy agents for future decision-making in near-real time.

Third, the solution enables the Klarna team to take advantage of the Apache Flink open-source community, which provides rich community support and the opportunity to contribute back to the community by bug fixing or adding new features.

This solution has proven to scale with increased adoption of a new use case, translating to a 10-times increase in events over 3 months.

Lessons learned

The Klarna team faced a few challenges with Flink serialization and upgrading Apache Flink versions. Flink serialization is an interesting concept and critical for the application’s performance. Flink uses a different set of serializers in order to serialize data between the operators. It’s up to the team to configure the best and most efficient serializer based on the use case. The Klarna team configured the objects as Flink POJO, which reduced the pipeline runtime by 85%. For more information, refer to Flink Serialization Tuning Vol. 1: Choosing your Serializer — if you can before deploying a Flink application to production.

The other challenge faced by the team was upgrading the Apache Flink version in Kinesis Data Analytics. Presently, the Kinesis Data Analytics for Apache Flink application requires the creation of a new Kinesis Data Analytics for Apache Flink application. Currently, reusing a snapshot (the binary artefact representing the state of the Flink application, used to restore the application to the last checkpoint taken) is not possible between two different applications. For that reason, upgrading the Apache Flink version requires additional steps in order to ensure the application doesn’t lose data.

What’s next for Klarna and Kinesis Data Analytics for Apache Flink?

The team is looking into expanding the usage of Kinesis Data Analytics and Flink in Klarna. Because the team is already highly experienced in the technology, their first ambition will be to own the infrastructure of a Kinesis Data Analytics for Apache Flink deployment, and connect it to different Klarna data sources. The team then will host business logic provided by other departments in Klarna such as Fraud Prevention. This will allow the specialised teams to concentrate on the business logic and fraud detection algorithms, while decision tooling will handle the infrastructure.

What next Overview

Klarna, AWS, and the Flink community

A key part of choosing Kinesis Data Analytics for Apache Flink was the open-source community and support.

Several teams within Klarna created different implementations of a Flink DynamoDB connector, which were used internally by multiple teams. Klarna then identified the opportunity to create a single maintained DynamoDB Flink connector and contribute it to the open-source community. This has initiated a collaboration within Klarna, led by the Klarna Flink experts and accompanied by Flink open-source contributors from AWS.

The main principle for designing the DynamoDB Flink connector was utilizing the different write capacity modes of DynamoDB. DynamoDB supports On-demand and Provisioned capacity modes and each behaves differently when it comes to how it handles incoming throughput. On-demand mode will automatically scale up DynamoDB write capacity and apply itself to the incoming load. However, Provisioned mode is more limiting, and will throttle incoming traffic according to the provisioned write capacity.

To comply with this process, the DynamoDB Flink connector was designed to allow concurrent writes to DynamoDB. The number of concurrent requests can be configured to comply with DynamoDB’s capacity mode. In addition, the DynamoDB Flink connector supports backpressure handling in case the DynamoDB write provisioning is low compared to the incoming load from the Apache Flink application.

At the time of writing, the DynamoDB Flink connector has been open sourced.


Klarna has successfully been running Kinesis Data Analytics for Apache Flink in production since October 2020. It provides several key benefits. The Klarna development team can focus on development, not on cluster and operational management. Their applications can be quickly modified and uploaded. The low latency properties of the service ensure a near-real-time experience for end-users, data consumers, and producers, which underpin risk assessment and the decision-making processes underpinning continuous traffic growth. At the same time, exactly-once processing in combination with Flink checkpoints and savepoints means that critical decision-making and legal data is not lost.

To learn more about Kinesis Data Analytics and getting started, refer to Using a Studio notebook with Kinesis Data Analytics for Apache Flink and More Kinesis Data Analytics Solutions on GitHub.

About the authors

Nir Tsruya is a Lead Engineer in Klarna. He leads 2 engineering teams focusing mainly on real time data processing and analytics at large scale.

Ankit Gupta is a Senior Solutions Architect at Amazon Web Serves based in Stockholm, Sweden, where we helps customers across the Nordics succeed in Cloud. He’s particularly passionate about building strong Networking foundation in Cloud.

Daniel Arenhage is a Solutions Architect at Amazon Web Services based in Gothenburg, Sweden.

Real-time inference using deep learning within Amazon Kinesis Data Analytics for Apache Flink

Post Syndicated from Jeremy Ber original https://aws.amazon.com/blogs/big-data/real-time-inference-using-deep-learning-within-amazon-kinesis-data-analytics-for-apache-flink/

Apache Flink is a framework and distributed processing engine for stateful computations over data streams. Amazon Kinesis Data Analytics for Apache Flink is a fully managed service that enables you to use an Apache Flink application to process streaming data. The Deep Java Library (DJL) is an open-source, high-level, engine-agnostic Java framework for deep learning.

In this blog post, we demonstrate how you can use DJL within Kinesis Data Analytics for Apache Flink for real-time machine learning inference. Real-time inference can be valuable in a variety of applications and industries where it is essential to make predictions or take actions based on new data as quickly as possible with low latencies. We show how to load a pre-trained deep learning model from the DJL model zoo into a Flink job and apply the model to classify data objects in a continuous data stream. The DJL model zoo includes a wide variety of pre-trained models for image classification, semantic segmentation, speech recognition, text embedding generation, question answering, and more. It supports HuggingFace, Pytorch, MXNet, and TensorFlow model frameworks and also helps developers create and publish their own models. We will focus on image classification and use a popular classifier called ResNet-18 to produce predictions in real time. The model has been pre-trained on ImageNet with 1.2 million images belonging to 1,000 class labels.

We provide sample code, architecture diagrams, and an AWS CloudFormation template so you can follow along and employ ResNet-18 as your classifier to make real-time predictions. The solution we provide here is a powerful design pattern for continuously producing ML-based predictions on streaming data within Kinesis Data Analytics for Apache Flink. You can adapt the provided solution for your use case and choose an alternative model from the model zoo or even provide your own model.

Image classification is a classic problem that takes an image and selects the best-fitting class, such as whether the image from an autonomous driving system is that of a bicycle or a pedestrian. Common use cases for real-time inference on streams of images include classifying images from vehicle cameras and license plate recognition systems, and classifying images uploaded to social media and ecommerce websites. The use cases typically need low latency while handling high throughput and potentially bursty streams. For example, in ecommerce websites, real-time classification of uploaded images can help in marking pictures of banned goods or hazardous materials that have been supplied by sellers. Immediate determination through streaming inference is needed to trigger alerts and follow-up actions to prevent these images from being part of the catalog. This enables faster decision-making compared to batch jobs that run on a periodic basis. The data stream pipeline can involve multiple models for different purposes, such as classifying uploaded images into ecommerce categories of electronics, toys, fashion, and so on.

Solution overview

The following diagram illustrates the workflow of our solution.

architecture showcasing a kinesis data analytics for apache flink application reading from Images in an Amazon S3 bucket, classifying those images and then writing out to another S3 bucket called "classifications"

The application performs the following steps:

  1. Read in images from Amazon Simple Storage Service (Amazon S3) using the Apache Flink Filesystem File Source connector.
  2. Window the images into a collection of records.
  3. Classify the batches of images using the DJL image classification class.
  4. Write inference results to Amazon S3 at the path specified.

Images are recommended to be of reasonable size so that they may fit into a Kinesis Processing Unit. Images larger than 50MB in size may result in latency in processing and classification.

The main class for this Apache Flink job is located at src/main/java/com.amazon.embeddedmodelinference/EMI.java. Here you can find the main() method and entry point to our Flink job.


To get started, configure the following prerequisites on your local machine:

Once this is set up, you can clone the code base to access the source code for this solution. The Java application code for this example is available on GitHub. To download the application code, clone the remote repository using the following command:

git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples

Find and navigate to the folder of the image classification example, called image-classification.

An example set of images to stream and test the code is available in the imagenet-sample-images folder.

Let’s walk through the code step by step.

Test on your local machine

If you would like to test this application locally on your machine, ensure you have AWS credentials set up locally on your machine. Additionally, download the Flink S3 Filesystem Hadoop JAR to use with your Apache Flink installation and place it in a folder named plugins/s3 in the root of your project. Then configure the following environment variables either on your IDE or in your machine’s local variable scope:

plugins.dir=<<path-to-flink-s3-fs-hadoop jar>>
s3.access.key=<<aws access key>>
s3.secret.key=<<aws secret key>>

Replace these values with your own.showcasing the environment properties to replace on IntelliJ

After configuring the environment variables and downloading the necessary plugin JAR, let’s look at the code.

In the main method, after setting up our StreamExecutionEnvironment, we define our FileSource to read files from Amazon S3. By default, this source operator reads from a sample bucket. You can replace this bucket name with your own by changing the variable called bucket, or setting the application property on Kinesis Data Analytics for Apache Flink once deployed.

final FileSource<StreamedImage> source =
FileSource.forRecordStreamFormat(new ImageReaderFormat(), new Path(s3SourcePath))

The FileSource is configured to read in files in the ImageReaderFormat, and will check Amazon S3 for new images every 10 seconds. This can be configured as well.

After we have read in our images, we convert our FileSource into a stream that can be processed:

DataStream<StreamedImage> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");

Next, we create a tumbling window of a variable time window duration, specified in the configuration, defaulting to 60 seconds. Every window close creates a batch (list) of images to be classified using a ProcessWindowFunction.

This ProcessWindowFunction calls the classifier predict function on the list of images and returns the best probability of classification from each image. This result is then sent back to the Flink operator, where it’s promptly written out to the S3 bucket path of your configuration.

.process(new ProcessWindowFunction<StreamedImage, String, String, TimeWindow>() {
                    public void process(String s,
                                        ProcessWindowFunction<StreamedImage, String, String, TimeWindow>.Context context,
                                        Iterable<StreamedImage> iterableImages,
                                        Collector<String> out) throws Exception {

                            List<Image> listOfImages = new ArrayList<Image>();
                            iterableImages.forEach(x -> {
                            // batch classify images
                            List<Classifications> list = classifier.predict(listOfImages);
                            for (Classifications classifications : list) {
                                Classifications.Classification cl = classifications.best();
                                String ret = cl.getClassName() + ": " + cl.getProbability();
                        } catch (ModelException | IOException | TranslateException e) {
                            logger.error("Failed predict", e);

In Classifier.java, we read the image and apply crop, transpose, reshape, and finally convert to an N-dimensional array that can be processed by the deep learning model. Then we feed the array to the model and apply a forward pass. During the forward pass, the model computes the neural network layer by layer. At last, the output object contains the probabilities for each image object that the model is being trained on. We map the probabilities with the object name and return to the map function.

Deploy the solution with AWS CloudFormation

To run this code base on Kinesis Data Analytics for Apache Flink, we have a helpful CloudFormation template that will spin up the necessary resources. Simply open AWS CloudShell or your local machine’s terminal and enter the following commands. Complete the following steps to deploy the solution:

  1. If you don’t have the AWS Cloud Development Kit (AWS CDK) bootstrapped in your account, run the following command, providing your account number and current Region:
cdk bootstrap aws://ACCOUNT-NUMBER/REGION

The script will clone a GitHub repo of images to classify and upload them to your source S3 bucket. Then it will launch the CloudFormation stack given your input parameters. video walking through the setup of the cloudformation template. Described in text later

  1. Enter the following code and replace the BUCKET variables with your own source bucket and sink bucket, which will contain the source images and the classifications, respectively:
git clone https://github.com/EliSchwartz/imagenet-sample-images; cd imagenet-sample-images;
aws s3 cp . $SOURCE_BUCKET --recursive --exclude "*/";
aws cloudformation create-stack --stack-name KDAImageClassification --template-url https://aws-blogs-artifacts-public.s3.amazonaws.com/artifacts/BDB-3098/BlogStack.template.json --parameters ParameterKey=inputBucketPath,ParameterValue=$SOURCE_BUCKET ParameterKey=outputBucketPath,ParameterValue=$SINK_BUCKET --capabilities CAPABILITY_IAM;

This CloudFormation stack creates the following resources:

    • A Kinesis Data Analytics application with 1 Kinesis Processing Unit (KPU) preconfigured with some application properties
    • An S3 bucket for your output results
  1. When the stack is complete, navigate to the Kinesis Data Analytics for Apache Flink console.
  2. Find the application called blog-DJL-flink-ImageClassification-application and choose Run.
  3. On the Amazon S3 console, navigate to the bucket you specified in the outputBucketPath variable.

If you have readable images in the source bucket listed, you should see classifications of those images within the checkpoint interval of the running application.

Deploy the solution manually

If you prefer to use your own code base, you can follow the manual steps in this section:

  • After you clone the application locally, create your application JAR by navigating to the directory that contains your pom.xml and running the following command:
mvn clean package

This builds your application JAR in the target/ directory called embedded-model-inference-1.0-SNAPSHOT.jar.

application properties on KDA console

  1. Upload this application JAR to an S3 bucket, either the one created from the CloudFormation template, or another one to store code artifacts.
  2. You can then configure your Kinesis Data Analytics application to point to this newly uploaded S3 JAR file.
  3. This is also a great opportunity to configure your runtime properties, as shown in the following screenshot.
  4. Choose Run to start your application.

You can open the Apache Flink Dashboard to check for application exceptions or to see data flowing through the tasks defined in the code.

image of flink dashboard showing successful running of the application

Validate the results

To validate our results, let’s check the results in Amazon S3 by navigating to the Amazon S3 console and finding our S3 bucket. We can find the output in a folder called output-kda.

image showing folders within amazon s3 partitioned by datetime

When we choose one of the data-partitioned folders, we can see partition files. Ensure that there is no underscore in front of your part file, because this indicates that the results are still being finalized according to the rollover interval defined in Apache Flink’s FileSink connector. After the underscores have disappeared, we can use Amazon S3 Select to view our data.

partition files as they land in Amazon S3

We now have a solution that continuously performs classification on incoming images in real time using Kinesis Data Analytics for Apache Flink. It extracts a pre-trained classification model (ResNet-18) from the DJL model zoo, applies some preprocessing, loads the model into a Flink operator’s memory, and continuously applies the model for online predictions on streaming images.

Although we used ResNet-18 in this post, you can choose another model by modifying the classifier. The DJL model zoo provides many other models, both for classification and other applications, that can be used out of the box. You can also load your custom model by providing an S3 link or URL to the criteria. DJL supports models in a large number of engines such as PyTorch, ONNX, TensorFlow, and MXNet. Using a model in the solution is relatively simple. All of the preprocessing and postprocessing code is encapsulated in the (built-in) translator, so all we have to do is load the model, create a predictor, and call predict(). This is done within the data source operator, which processes the stream of input data and sends the links to the data to the inference operator where the model you selected produces the prediction. Then the sink operator writes the results.

The CloudFormation template in this example focused on a simple 1 KPU application. You could extend the solution to further scale out to large models and high-throughput streams, and support multiple models within the pipeline.

Clean up

To clean up the CloudFormation script you launched, complete the following steps:

  1. Empty the source bucket you specified in the bash script.
  2. On the AWS CloudFormation console, locate the CloudFormation template called KDAImageClassification.
  3. Delete the stack, which will remove all of the remaining resources created for this post.
  4. You may optionally delete the bootstrapping CloudFormation template, CDKToolkit, which you launched earlier as well.


In this post, we presented a solution for real-time classification using the Deep Java Library within Kinesis Data Analytics for Apache Flink. We shared a working example and discussed how you can adapt the solution for your specific ML use case. If you have any feedback or questions, please leave them in the comments section.

About the Authors

Jeremy Ber has been working in the telemetry data space for the past 9 years as a Software Engineer, Machine Learning Engineer, and most recently a Data Engineer. At AWS, he is a Streaming Specialist Solutions Architect, supporting both Amazon Managed Streaming for Apache Kafka (Amazon MSK) and AWS’s managed offering for Apache Flink.

Deepthi Mohan is a Principal Product Manager for Amazon Kinesis Data Analytics, AWS’s managed offering for Apache Flink.

Gaurav Rele is a Data Scientist at the Amazon ML Solution Lab, where he works with AWS customers across different verticals to accelerate their use of machine learning and AWS Cloud services to solve their business challenges.

Real-time time series anomaly detection for streaming applications on Amazon Kinesis Data Analytics

Post Syndicated from Antonio Vespoli original https://aws.amazon.com/blogs/big-data/real-time-time-series-anomaly-detection-for-streaming-applications-on-amazon-kinesis-data-analytics/

Detecting anomalies in real time from high-throughput streams is key for informing on timely decisions in order to adapt and respond to unexpected scenarios. Stream processing frameworks such as Apache Flink empower users to design systems that can ingest and process continuous flows of data at scale. In this post, we present a streaming time series anomaly detection algorithm based on matrix profiles and left-discords, inspired by Lu et al., 2022, with Apache Flink, and provide a working example that will help you get started on a managed Apache Flink solution using Amazon Kinesis Data Analytics.

Challenges of anomaly detection

Anomaly detection plays a key role in a variety of real-world applications, such as fraud detection, sales analysis, cybersecurity, predictive maintenance, and fault detection, among others. The majority of these use cases require actions to be taken in near real-time. For instance, card payment networks must be able to identify and reject potentially fraudulent transactions before processing them. This raises the challenge to design near-real-time anomaly detection systems that are able to scale to ultra-fast arriving data streams.

Another key challenge that anomaly detection systems face is concept drift. The ever-changing nature of some use cases requires models to dynamically adapt to new scenarios. For instance, in a predictive maintenance scenario, you could use several Internet of Things (IoT) devices to monitor the vibrations produced by an electric motor with the objective of detecting anomalies and preventing unrecoverable damage. Sounds emitted by the vibrations of the motor can vary significantly over time due to different environmental conditions such as temperature variations, and this shift in pattern can invalidate the model. This class of scenarios creates the necessity for online learning—the ability of the model to continuously learn from new data.

Time series anomaly detection

Time series are a particular class of data that incorporates time in their structuring. The data points that characterize a time series are recorded in an orderly fashion and are chronological in nature. This class of data is present in every industry and is common at the core of many business requirements or key performance indicators (KPIs). Natural sources of time series data include credit card transactions, sales, sensor measurements, machine logs, and user analytics.

In the time series domain, an anomaly can be defined as a deviation from the expected patterns that characterize the time series. For instance, a time series can be characterized by its expected ranges, trends, seasonal, or cyclic patterns. Any significant alteration of this normal flow of data points is considered an anomaly.

Detecting anomalies can be more or less challenging depending on the domain. For instance, a threshold-based approach might be suitable for time series that are informed of their expected ranges, such as the working temperature of a machine or CPU utilization. On the other hand, applications such as fraud detection, cybersecurity, and predictive maintenance can’t be classified via simple rule-based approaches and require a more fine-grained mechanism to capture unexpected observations. Thanks to their parallelizable and event-driven setup, streaming engines such as Apache Flink provide an excellent environment for scaling real-time anomaly detection to fast-arriving data streams.

Solution overview

Apache Flink is a distributed processing engine for stateful computations over streams. A Flink program can be implemented in Java, Scala, or Python. It supports ingestion, manipulation, and delivery of data to the desired destinations. Kinesis Data Analytics allows you to run Flink applications in a fully managed environment on AWS.

Distance-based anomaly detection is a popular approach where a model is characterized by a number of internally stored data points that are used for comparison against the new incoming data points. At inference time, these methods compute distances and classify new data points according to how dissimilar they are from the past observations. In spite of the plethora of algorithms in literature, there is increasing evidence that distance-based anomaly detection algorithms are still competitive with the state of the art (Nakamura et al., 2020).

In this post, we present a streaming version of a distance-based unsupervised anomaly detection algorithm called time series discords, and explore some of the optimizations introduced by the Discord Aware Matrix Profile (DAMP) algorithm (Lu et al., 2022), which further develops the discords method to scale to trillions of data points.

Understanding the algorithm

A left-discord is a subsequence that is significantly dissimilar from all the subsequences that precede it. In this post, we demonstrate how to use the concept of left-discords to identify time series anomalies in streams using Kinesis Data Analytics for Apache Flink.

Let’s consider an unbounded stream and all its subsequences of length n. The m most recent subsequences will be stored and used for inference. When a new data point arrives, a new subsequence that includes the new event is formed. The algorithm compares this latest subsequence (query) to the m subsequences retained from the model, with the exclusion of the latest n subsequences because they overlap with the query and would therefore characterize a self-match. After computing these distances, the algorithm classifies the query as an anomaly if its distance from its closest non-self-matching subsequence is above a certain moving threshold.

For this post, we use a Kinesis data stream to ingest the input data, a Kinesis Data Analytics application to run the Flink anomaly detection program, and another Kinesis data stream to ingest the output produced by your application. For visualization purposes, we consume from the output stream using Kinesis Data Analytics Studio, which provides an Apache Zeppelin Notebook that we use to visualize and interact with the data in real time.

Implementation details

The Java application code for this example is available on GitHub. To download the application code, complete the following steps:

  1. Clone the remote repository using the following command:
    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples

  2. Navigate to the amazon-kinesis-data-analytics-java-examples/AnomalyDetection/LeftDiscords directory:

Let’s walk through the code step by step.

The MPStreamingJob class defines the data flow of the application, and the MPProcessFunction class defines the logic of the function that detects anomalies.

The implementation is best described by three core components:

  • The Kinesis data stream source, used to read from the input stream
  • The anomaly detection process function
  • The Kinesis data stream sink, used to deliver the output into the output stream

The anomaly detection function is implemented as a ProcessFunction<String, String>. Its method MPProcessFunction#processElement is called for every data point:

public void processElement(String dataPoint, ProcessFunction<String, OutputWithLabel>.Context context,
                            Collector<OutputWithLabel> collector) {

   Double record = Double.parseDouble(dataPoint);

   int currentIndex = timeSeriesData.add(record);

   Double minDistance = 0.0;
   String anomalyTag = "INITIALISING";

   if (timeSeriesData.readyToCompute()) {
       minDistance = timeSeriesData.computeNearestNeighbourDistance();

   * Algorithm will wait for initializationPeriods * sequenceLength data points until starting
   * to compute the Matrix Profile (MP).
   if (timeSeriesData.readyToInfer()) {
       anomalyTag = minDistance > threshold.getThreshold() ? "IS_ANOMALY" : "IS_NOT_ANOMALY";

   OutputWithLabel output = new OutputWithLabel(currentIndex, record, minDistance, anomalyTag);


For every incoming data point, the anomaly detection algorithm takes the following actions:

  1. Adds the record to the timeSeriesData.
  2. If it has observed at least 2 * sequenceLength data points, starts computing the matrix profile.
  3. If it has observed at least initializationPeriods * sequenceLength data points, starts outputting anomaly labels.

Following these actions, the MPProcessFunction function outputs an OutputWithLabel object with four attributes:

  • index – The index of the data point in the time series
  • input – The input data without any transformation (identity function)
  • mp – The distance to the closest non-self-matching subsequence for the subsequence ending in index
  • anomalyTag – A binary label that indicates whether the subsequence is an anomaly

In the provided implementation, the threshold is learned online by fitting a normal distribution to the matrix profile data:

 * Computes the threshold as two standard deviations away from the mean (p = 0.02)
 * @return an estimated threshold
public Double getThreshold() {
   Double mean = sum/counter;

   return mean + 2 * Math.sqrt(squaredSum/counter - mean*mean);

In this example, the algorithm classifies as anomalies those subsequences whose distance from their nearest neighbor deviates significantly from the average minimum distance (more than two standard deviations away from the mean).

The TimeSeries class implements the data structure that retains the context window, namely, the internally stored records that are used for comparison against the new incoming records. In the provided implementation, the n most recent records are retained, and when the TimeSeries object is at capacity, the oldest records are overridden.


Before you create a Kinesis Data Analytics application for this exercise, create two Kinesis data streams: InputStream and OutputStream in us-east-1. The Flink application will use these streams as its respective source and destination streams. To create these resources, launch the following AWS CloudFormation stack:

Launch Stack

Alternatively, follow the instructions in Creating and Updating Data Streams.

Create the application

To create your application, complete the following steps:

  1. Clone the remote repository using the following command:
    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples 

  2. Navigate to the amazon-kinesis-data-analytics-java-examples/AnomalyDetection/LeftDiscords/core directory.
    cd amazon-kinesis-data-analytics-java-examples/AnomalyDetection/LeftDiscords/core

  3. Create your JAR file by running the following Maven command in the core directory, which contains the pom.xml file:
    mvn package -Dflink.version=1.15.4
  4. Create an Amazon Simple Storage Service (Amazon S3) bucket and upload the file target/left-discords-1.0.0.jar.
  5. Create and run a Kinesis Data Analytics application as described in Create and Run the Kinesis Data Analytics Application:
    1. Use the target/left-discords-1.0.0.jar.
    2. Note that the input and output streams are called InputStream and OutputStream, respectively.
    3. The provided example is set up to run in us-east-1.

Populate the input stream

You can populate InputStream by running the script.py file from the cloned repository, using the command python script.py. By editing the last two lines, you can populate the stream with synthetic data or with real data from a CSV dataset.

Visualize data on Kinesis Data Analytics Studio

Kinesis Data Analytics Studio provides the perfect setup for observing data in real time. The following screenshot shows sample visualizations. The first plot shows the incoming time series data, the second plot shows the matrix profile, and the third plot shows which data points have been classified as anomalies.

To visualize the data, complete the following steps:

  1. Create a notebook.
  2. Add the following paragraphs to the Zeppelin note:

Create a table and define the shape of the records generated by the application:


index INT,
input VARCHAR(6),
mp VARCHAR(6),
anomalyTag VARCHAR(20)
'connector' = 'kinesis',
'stream' = 'OutputStream',
'aws.region' = 'us-east-1',
'scan.stream.initpos' = 'LATEST',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'

Visualize the input data (choose Line Chart from the visualization options):


SELECT index, input FROM data;

Visualize the output matrix profile data (choose Scatter Chart from the visualization options):


SELECT index, mp FROM data;

Visualize the labeled data (choose Scatter Chart from the visualization options):


SELECT index, anomalyTag FROM data;

Clean up

To delete all the resources that you created, follow the instructions in Clean Up AWS Resources.

Future developments

In this section, we discuss future developments for this solution.

Optimize for speed

The online time series discords algorithm is further developed and optimized for speed in Lu et al., 2022. The proposed optimizations include:

  • Early stopping – If the algorithm finds a subsequence that is similar enough (below the threshold), it stops searching and marks the query as non-anomaly.
  • Look-ahead windowing – Look at some amount of data in the future and compare it to the current query to cheaply discover and prune future subsequences that could not be left-discords. Note that this introduces some delay. The reason why disqualifying improves performance is that data points that are close in time are more likely to be similar than data points that are distant in time.
  • Use of MASS – The MASS (Mueen’s Algorithm for Similarity Search) search algorithm is designed for efficiently discovering the most similar subsequence in the past.


The algorithm above operates with parallelism 1, which means that when a single worker is enough to handle the data stream throughput, the above algorithm can be directly used. This design can be enhanced with further distribution logic for handling high throughput scenarios. In order to parallelise this algorithm, you may to design a partitioner operator that ensures that the anomaly detection operators would have at their disposal the relevant past data points. The algorithm can maintain a set of the most recent records to which it compares the query. Efficiency and accuracy trade-offs of approximate solutions are interesting to explore. Since the best solution for parallelising the algorithm depends largely on the nature of the data, we recommend experimenting with various approaches using your domain-specific knowledge.


In this post, we presented a streaming version of an anomaly detection algorithm based on left-discords. By implementing this solution, you learned how to deploy an Apache Flink-based anomaly detection solution on Kinesis Data Analytics, and you explored the potential of Kinesis Data Analytics Studio for visualizing and interacting with streaming data in real time. For more details on how to implement anomaly detection solutions in Apache Flink, refer to the GitHub repository that accompanies this post. To learn more about Kinesis Data Analytics and Apache Flink, explore the Amazon Kinesis Data Analytics Developer Guide.

Give it a try and share your feedback in the comments section.

About the Authors

Antonio Vespoli is a Software Development Engineer in AWS. He works on Amazon Kinesis Data Analytics, the managed offering for running Apache Flink applications on AWS.

Samuel Siebenmann is a Software Development Engineer in AWS. He works on Amazon Kinesis Data Analytics, the managed offering for running Apache Flink applications on AWS.

Nuno Afonso is a Software Development Engineer in AWS. He works on Amazon Kinesis Data Analytics, the managed offering for running Apache Flink applications on AWS.

Real-time anomaly detection via Random Cut Forest in Amazon Kinesis Data Analytics

Post Syndicated from Daren Wong original https://aws.amazon.com/blogs/big-data/real-time-anomaly-detection-via-random-cut-forest-in-amazon-kinesis-data-analytics/

Real-time anomaly detection describes a use case to detect and flag unexpected behavior in streaming data as it occurs. Online machine learning (ML) algorithms are popular for this use case because they don’t require any explicit rules and are able to adapt to a changing baseline, which is particularly useful for continuous streams of data where incoming data changes continuously over time.

Random Cut Forest (RCF) is one such algorithm widely used for anomaly detection use cases. In typical setups, we want to be able to run the RCF algorithm on input data with large throughput, and streaming data processing frameworks can help with that. We are excited to share that RCF is possible with Amazon Kinesis Data Analytics for Apache Flink. Apache Flink is a popular open-source framework for real-time, stateful computations over data streams, and can be used to run RCF on input streams with large throughput.

This post demonstrates how we can use Kinesis Data Analytics for Apache Flink to run an online RCF algorithm for anomaly detection.

Solution overview

The following diagram illustrates our architecture, which consists of three components: an input data stream using Amazon Kinesis Data Streams, a Flink job, and an output Kinesis data stream. In terms of data flow, we use a Python script to generate anomalous sine wave data into the input data stream, the data is then processed by RCF in a Flink job, and the resultant anomaly score is delivered to the output data stream.

The following graph shows an example of our expected result, which indicates that the anomaly score peaked when the sine wave data source anomalously dropped to constant -17.

We can implement this solution in three simple steps:

  1. Set up AWS resources via AWS CloudFormation.
  2. Set up a data generator to produce data into the source data stream.
  3. Run the RCF Flink Java code on Kinesis Data Analytics.

Set up AWS resources via AWS CloudFormation

The following CloudFormation stack will create all the AWS resources we need for this tutorial, including two Kinesis data streams, a Kinesis Data Analytics app, and an Amazon Simple Storage Service (Amazon S3) bucket.

Sign in to your AWS account, then choose Launch Stack:


Follow the steps on the AWS CloudFormation console to create the stack.

Set up a data generator

Run the following Python script to populate the input data stream with the anomalous sine wave data:

import json
import boto3
import math 

STREAM_NAME = "ExampleInputStream-RCF"

def get_data(time):
    rad = (time/100)%360
    val = math.sin(rad)*10 + 10

    if rad > 2.4 and rad < 2.6:
        val = -17

    return {'time': time, 'value': val}

def generate(stream_name, kinesis_client):
    time = 0

    while True:
        data = get_data(time)

        time += 1

if __name__ == '__main__':
    generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))

Run the RCF Flink Java code on Kinesis Data Analytics

The CloudFormation stack automatically downloaded and packaged the RCF Flink job JAR file for you. Therefore, you can simply go to the Kinesis Data Analytics console to run your application.

That’s it! We now have a running Flink job that continuously reads in data from an input Kinesis data stream and calculates the anomaly score for each new data point given the previous data points it has seen.

The following sections explain the RCF implementation and Flink job code in more detail.

RCF implementation

Numerous RCF implementations are publicly available. For this tutorial, we use the AWS implementation by wrapping it around a custom wrapper (RandomCutForestOperator) to be used in our Flink job.

RandomCutForestOperator is implemented as an Apache Flink ProcessFunction, which is a function that allows us to write custom logic to process every element in the stream. Our custom logic starts with a data transformation via inputDataMapper.apply, followed by getting the anomaly score by calling the AWS RCF library via rcf.getAnomalyScore. The code implementation of RandomCutForestOperator can be found on GitHub.

RandomCutForestOperatorBuilder requires two main types of parameters:

  • RandomCutForestOperator hyperparameters – We use the following:
    • Dimensions – We set this to 1 because our input data is a 1-dimensional sine wave consisting of the float data type.
    • ShingleSize – We set this to 1, which means our RCF algorithm will take into account the previous and current data points in anomaly score deduction. Note that this can be increased to account for seasonality in data.
    • SampleSize – We set this to 628, which means a maximum of 628 data points is kept in the data sample for each tree.
  • DataMapper parameters for input and output processing – We use the following:
    • InputDataMapper – We use RandomCutForestOperator.SIMPLE_FLOAT_INPUT_DATA_MAPPER to map input data from float to float[].
    • ResultMapper – We use RandomCutForestOperator.SIMPLE_TUPLE_RESULT_DATA_MAPPER, which is a BiFunction that joins the anomaly score with the corresponding sine wave data point into a tuple.

Flink job code

The following code snippet illustrates the core streaming structure of our Apache Flink streaming Java code. It first reads in data from the source Kinesis data stream, then processes it using the RCF algorithm. The computed anomaly score is then written to an output Kinesis data stream.

DataStream<Float> sineWaveSource = createSourceFromStaticConfig(env);

                RandomCutForestOperator.<Float, Tuple2<Float, Double>>builder()
                TupleTypeInfo.getBasicTupleTypeInfo(Float.class, Double.class))

In this example, our baseline input data is a sine wave. As shown in the following screenshot, a low anomaly score is returned when the data is regular. However, when there is an anomaly in the data (when the sine wave input data drops to a constant), a high anomaly score is returned. The anomaly score is delivered into an output Kinesis data stream. You can visualize this result by creating a Kinesis Data Analytics Studio app; for instructions, refer to Interactive analysis of streaming data.

Because this is an unsupervised algorithm, you don’t need to provide any explicit rules or labeled datasets for this operator. In short, only the input data stream, data conversions, and some hyperparameters were provided. The RCF algorithm itself determined the expected baseline based on the input data and identified any unexpected behavior.

Furthermore, this means the model will continuously adapt even if the baseline changes over time. As such, minimal retraining cadence is required. This is powerful for anomaly detection on streaming data because the data will often drift slowly over time due seasonal trends, inflation, equipment calibration drift, and so on.

Clean up

To avoid incurring future charges, complete the following steps:

  1. On the Amazon S3 console, empty the S3 bucket created by the CloudFormation stack.
  2. On the AWS CloudFormation console, delete the CloudFormation stack.


This post demonstrated how to perform anomaly detection on input streaming data with RCF, an online unsupervised ML algorithm using Kinesis Data Analytics. We also showed how this algorithm learns the data baseline on its own, and can adapt to changes in the baseline over time. We hope you consider this solution for your real-time anomaly detection use cases.

About the Authors

Daren Wong is a Software Development Engineer in AWS. He works on Amazon Kinesis Data Analytics, the managed offering for running Apache Flink applications on AWS.

Aleksandr Pilipenko is a Software Development Engineer in AWS. He works on Amazon Kinesis Data Analytics, the managed offering for running Apache Flink applications on AWS.

Hong Liang Teoh is a Software Development Engineer in AWS. He works on Amazon Kinesis Data Analytics, the managed offering for running Apache Flink applications on AWS.

Accelerating revenue growth with real-time analytics: Poshmark’s journey

Post Syndicated from Mahesh Pasupuleti original https://aws.amazon.com/blogs/big-data/accelerating-revenue-growth-with-real-time-analytics-poshmarks-journey/

This post was co-written by Mahesh Pasupuleti and Gaurav Shah from Poshmark.

Poshmark is a leading social marketplace for new and secondhand styles for women, men, kids, pets, home, and more. By combining the human connection of physical shopping with the scale, ease, and selection benefits of Ecommerce, Poshmark makes buying and selling simple, social, and sustainable. Its community of more than 80 million registered users across the US, Canada, Australia, and India is driving a more sustainable future for the fashion industry.

An important goal to achieve for any organization is to grow the top line revenue. Top line revenue refers to the total value of sales of an organization’s services or products. The two main approaches organizations employ to increase revenue are to expand geographically to enter new markets and to increase market share within a market by improving customer experience (CX).

Improving CX is a well-known guideline to attract and retain customers and thereby increase the market share. In this post, we share how Poshmark improved CX and accelerated revenue growth by using a real-time analytics solution. We discuss how to create such a solution using Amazon Kinesis Data Streams, Amazon Managed Streaming for Kafka (Amazon MSK), Amazon Kinesis Data Analytics for Apache Flink; the design decisions that went into the architecture; and the observed business benefits by Poshmark.

High-level challenge: The need for real-time analytics

Previous efforts at Poshmark for improving CX through analytics were based on batch processing of analytics data and using it on a daily basis to improve CX. Although these batch analytics-based efforts were successful to some extent, they saw opportunities to improve the customer experience with real-time personalization and security guidance during the customer’s interaction with the Poshmark app. The customer insights gathered from the batch analytics couldn’t be paired with the current customer activities in real time due to the latencies involved in enriching the current activities with the knowledge gained through batch processes. Therefore, the opportunity to provide tailored offers or showcase products based on customers’ preference and behaviors in near-real time, which contributes to a much better customer experience, was missing. Similarly, the opportunity to catch fraud within a second, before checkout, was also missing.

To improve the customer experience, Poshmark decided to invest in building a real-time analytics platform to enable real-time capabilities, as explained further in this post. Poshmark engineers worked closely with AWS architects through the AWS Data Lab program. The AWS Data Lab offers accelerated, joint engineering engagements between customers and AWS technical resources to create tangible deliverables that accelerate data and analytics modernization initiatives. The Design Lab is one half to two day engagement with customer team offering prescriptive guidance to arrive at the optimal solution architecture design before you embark on building the platform.

Designing the solution architecture through the AWS Data Lab process

The business and technical stakeholders from Poshmark and the AWS Data Lab architects discussed near-to-long-term business requirements along with the functional and non-functional capabilities required to decide on the architecture approach. They reviewed the current state architecture and constraints to understand data flow and technical integration points. The joint team discussed the pros and cons of various AWS services that already exist in Poshmark’s current architecture, as well as other AWS services that can meet the requirements.

Poshmark wanted to address the following business use cases via the real-time analytics platform:

  • Sessionization – Poshmark captures both server-side application events and client-side tracking events. They wanted to use these events to identify and analyze user sessions to track behavior.
  • Illegitimate sign-up and sign-in prevention – Poshmark wanted to detect and ban illegitimate sign-up or sign-in events from bots or non-human traffic in real time on the Poshmark application.
  • IP translation – The IP addresses present in events will be translated to city, state, and zip, and enriched with other information to implement near-real-time, location-aware services encompassing security-related functions as well as personalization functions.
  • Anonymization – Poshmark wanted to anonymize events and make the data available for internal users for querying in near-real time.
  • Personalized recommendations – User behavior based on clickstream events can be captured up to the last second before enriching it for personalization and sending it to the model to predict the recommendations.
  • Other use cases – Additional use cases relating to aggregations and machine learning (ML) inference use cases such as authorization to operate, listing spam detection, and avoiding account takeovers (ATOs), among others.

One common pattern identified for these use cases was the need for a central data enrichment pipeline to enrich incoming raw events before event data can be utilized for actual business processing. In the Design Lab, we focused on design for data enrichment pipelines aimed at enriching events with data from static files, dynamic data stores such as databases, APIs, or within the same event stream for the aforementioned streaming use cases. Later in this post, we cover the salient points discussed during the lab around design and architecture.

Batch analytics solution architecture

The following diagram shows the previous architecture at Poshmark. For brevity, only the flow pertaining to the real-time analytics platform is explained.

User interactions on Poshmark web and mobile applications generate server-side events. These events include add to cart, orders, transactions, and more on application servers, and the page view, clicks, and more on tracking servers. Fluentd with an Amazon Kinesis plugin is set up on both the application and tracking servers to send these events to Amazon Kinesis Data Streams. The Fluentd Kinesis plugin aggregates events before sending to Kinesis Data Streams. A single Kinesis data stream is currently set up to capture these events. A random partition key is configured in Fluentd for the events to allow even distribution of events across shards. The event data format is nested JSON. Poshmark maintains the same schema grammar at the first level of JSON for both server-side and client-side server events. The attributes at nested level can differ between server-side and client-side events.

Poshmark receives around 1 billion events per day (100 million per hour during peak hours, 10 million per hour during non-peak hours). The average size of the event record is 1.2 KB.

The data from the Kinesis data stream is consumed by two applications:

  • A Spark streaming application on Amazon EMR is used to write data from the Kinesis data stream to a data lake hosted on Amazon Simple Storage Service (Amazon S3) in a partitioned way. The data from the S3 data lake is used for batch processing and analytics through Amazon EMR and Amazon Redshift.
  • Druid hosted on Amazon Elastic Compute Cloud (Amazon EC2) integrates with the Kinesis data stream for streaming ingestion and allows users to run slice-and-dice OLAP queries. Operational dashboards are hosted on Grafana integrated with Druid.

Desired enhancements to the initial solution

The use cases discussed during the architecture sessions fall into one or more combinations of the following stream processing requirements:

  • Stateless event processing – For example, near-real-time anonymization.
  • External lookup – Looking up a value from external stores. For example, IP address, city, zip, state, or ID.
  • Stateful data processing – Accessing past events or aggregations or ML inferences.

To meet these requirements, the streaming platform is divided into two layers:

  • Central data enrichment – This layer runs enrichments commonly required by downstream streaming applications. This will help avoid replication of the same enrichment logic in each application and enable better operational maintenance. The enrichment should strive for per-record processing in most cases.
  • Specific streaming applications – This layer will house specific streaming applications with respect to use cases and utilize enriched data from the central data enrichment pipeline.

For central data enrichment, we made the following enhancements to the platform:

  • The total latency including ingestion and data enrichment was super critical and should be in the range of double-digit millisecond latency based on the overall latency budget of Poshmark to achieve real-time ML responses to events. The absolute lowest ingestion latency was achieved by Kafka, and the team decided to go with the managed version of Kafka, Amazon MSK.
  • Similarly, low-latency processing of data is also required, and appropriate framework should be considered accordingly.
  • Exactly-once delivery guarantees were required to avoid data duplication resulting in wrong calculations.
  • The enrichment source could be any source such as static files, databases, and APIs and latencies can vary between them. A number of server-side and client-side events are generated when a user interacts with a Poshmark application. As a result, the same information from the enrichment source is required to enrich each event. This frequently accessed information cached in a centralized cache will optimize fetch time.

Design decisions for the new solution

Poshmark made the following design decisions for central data enrichment:

  • Kafka can support double-digit millisecond latency from producer to consumer with appropriate performance tuning. Kafka can provide exactly-once semantics both at producers and consumer applications. AWS provides Kafka as part of its Amazon MSK offering, eliminating the operational overhead of maintaining and running Kafka cluster infrastructure on AWS, thereby allowing you to focus on developing and running Kafka-based applications. Poshmark decided to use Amazon MSK for their streaming ingestion and storage requirements.
  • We also decided to use Flink for streaming data enrichment applications for the following reasons:
    • Flink can provide low-latency processing even at higher throughput with exactly-once guarantees. Spark Structured Streaming on the other hand can provide low latency with low throughput due to microbatch-based processing. Spark Structured Streaming continuous processing is an experimental feature and provides at-least once guarantees.
    • The enrichment requests call to an external store if modeled in a map function (Spark’s map API or Flink’s MapFunction API) will make synchronous calls to the external store. The call will wait for a response from the external store before processing the next event, adding to delays and reducing overall throughput. The asynchronous interaction will allow sending requests and receiving responses concurrently from external stores. This will reduce wait time and improve overall throughput. Flink supports async I/O operators natively, allowing users to use asynchronous request clients with data streams. The API handles the integration with data streams, well as handling order, event time, fault tolerance, and more. Spark Structured Streaming doesn’t provide any such support natively and leaves it to users for custom implementation.
    • Poshmark selected Kinesis Data Analytics for Apache Flink to run the data enrichment application. Kinesis Data Analytics for Apache Flink provides the underlying infrastructure for your Apache Flink applications. It handles core capabilities like provisioning compute resources, parallel computation, automatic scaling, and application backups (implemented as checkpoints and snapshots).
  • An enrichment microservice accompanying Amazon ElastiCache for Redis was set up to abstract access from data enrichment applications. The AsyncFunction in the Flink async I/O operator isn’t multi-threaded and won’t work in a truly asynchronous way if the call is blocked or waiting for a response. The enrichment microservice handles requests and responses asynchronously coming from Flink async I/O operators. The data is also cached in ElastiCache for Redis to improve the latency of the microservice.
  • The Poshmark ML applications are the consumers of this enriched data. The team has built and deployed different ML models over time. These models include a learning to rank algorithm, fraud detection, personalization and recommendations, and online spam filtering. Previously, for deploying each model into production, the Poshmark team had to go through a series of infrastructure setup steps that involved data extraction from real-time sources, building real-time aggregate features from streaming data, storing these features in a low-latency database (Redis) for sub-millisecond inferences, and finally performing inferences via Amazon SageMaker hosted endpoints.
  • We also designed an ML feature storage pipeline that consumes data from the enriched streaming sources (Kinesis or Kafka), generate single-level and aggregated-level features, and ingest these generated features into a feature store repository with a very low latency of less than 80 milliseconds.
  • The ML models are now able to extract the needed features with latency less than 10 milliseconds from the feature repository and perform real-time model inferencing.

Real-time analytics solution architecture

The following diagram illustrates the solution architecture for real-time analytics with Amazon MSK and Kinesis Data Analytics for Apache Flink.

The workflow is as follows:

  1. Users interact on Poshmark’s web or mobile application.
  2. Server-side events are captured on application servers and client-side events are captured on tracking servers. These events are written in the downstream MSK cluster.
  3. The raw events will be ingested into the MSK cluster using the Fluentd plugin to produce data for Kafka.
  4. The enrichment microservice consists of reactive (asynchronous) enrichment lookup APIs fetching data from persistent data stores. ElastiCache for Redis caches frequently accessed data, reducing fetch time for enrichment lookup APIs.
  5. The Flink application running on Kinesis Data Analytics for Apache Flink consumes raw events from Amazon MSK and runs data enrichment on a per-record basis. The Flink data enrichment application uses Flink’s async I/O to read external data from the enrichment lookup store for enriching stream events.
  6. Enriched events are written in the MSK cluster under different enriched events topics.
  7. The existing Spark streaming application consumes from the enriched events topic (or raw events topic) in Amazon MSK and writes the data into an S3 data lake.
  8. Druid streaming ingestion now reads from the enriched events topic or raw events topic in Amazon MSK depending on the requirements.

Enrichment of the captured event data

In this section, we discuss the different steps to enrich the captured event data.

Enrichment processing

Kinesis Data Analytics for Apache Flink provides the underlying infrastructure for the Apache Flink applications. It handles core capabilities like provisioning compute resources, parallel computation, automatic scaling, and application backups (implemented as checkpoints and snapshots). You can use the high-level Flink programming features (such as operators, functions, sources, and sinks) in the same way that you use them when hosting the Flink infrastructure yourself.

Flink on Amazon EMR gives the flexibility to choose your Flink version, installation, configuration, instances, and storage. However, you also have to take care of cluster management and operational requirements such as scaling, application backup, and provisioning.

Enrichment lookup store

The AsyncFunction in the Flink async I/O operator isn’t multi-threaded and won’t work in a truly asynchronous way if the call is blocked or waiting for a response. The enrichment lookup API should handle requests and responses asynchronously coming from Flink async I/O operators. The enrichment lookup API can be hosted on Amazon EC2 or containers such as Amazon Elastic Container Service (Amazon ECS) or Amazon Elastic Kubernetes Service (Amazon EKS).

A number of server-side and client-side events are generated when a user interacts with a Poshmark application. As a result, the same information is required to enrich each event. This frequently accessed information cached in a centralized cache can optimize fetch time. The latency to the centralized cache can be further reduced by hosting the client (enrichment lookup API) and cache server in the same Availability Zone.

Reconciliation in case of pipeline errors

The event enrichment can fail in data enrichment applications for various reasons, such as the external store timing out or missing information in the store. The enriched fields may or may not be critical for downstream streaming applications. You should build your downstream streaming applications considering that these failures can occur and implement a fallback mechanism, for example retrying on-demand enrichment from the application. The failure handling will also be governed by latency tolerance of the application.

The processing of data is based on event time. In some situations, data can arrive late in the platform. Both Flink and Spark allow lateness and watermarks for users to handle late-arriving data by defining thresholds. Late-arriving data beyond the threshold is discarded from processing. It’s possible to get this discarded too-late data in Flink using a side output. There is no such provision in Spark Structured Streaming.

A few streaming applications require their batch counterpart to reconcile data hourly or daily to handle data mismatch or data discrepancy due to late-arriving data or missing data.

Improved customer experience

The new real-time architecture offered the following benefits for an improved customer experience:

  • Anonymization – Poshmark is now able to provide and utilize real-time anonymized data for multiple functions both internally and externally because anonymization happens in real time.
  • Fraud mitigation – Poshmark was previously able to detect and prevent 45% of ATOs with the batch-based solution. With the real-time system, Poshmark is able to prevent 80% of ATOs.
  • Personalization – By providing personalized search results, Poshmark achieved an 8% improvement on clickthrough rates for search. This is a significant increase in the top of the funnel, increasing overall search conversions.

Improvement in these three factors helped end-customers gain confidence in the Poshmark app and website, which in turn enabled customers to increase their interaction with the app and helped accelerate customer engagement and growth.


In this post, we discussed the ingestion of real-time clickstream and log event data into Amazon MSK. We showed how enrichment of the captured data can be performed through Kinesis Data Analytics for Apache Flink. We broke up the enrichment processing into multiple components, such as Kinesis Data Analytics for Apache Flink, the enrichment microservices and the enrichment lookup store, and an enrichment cache. We discussed the downstream applications that used this enriched customer information to perform real-time security checks and offer personalized recommendations to end-users. We also discussed some of the areas that may need attention in case there are failures in the pipeline. Lastly, we showed how Poshmark improved their customer experience and gained market share by implementing this real-time analytics pipeline.

About the authors

Mahesh Pasupuleti is a VP of Data & Machine Learning Engineering at Poshmark. He has helped several startups succeed in different domains, including media streaming, healthcare, the financial sector, and marketplaces. He loves software engineering, building high performance teams, and strategy, and enjoys gardening and playing badminton in his free time.

Gaurav Shah is Director of Data Engineering and ML at Poshmark. He and his team help build data-driven solutions to drive growth at Poshmark.

Raghu Mannam is a Sr. Solutions Architect at AWS in San Francisco. He works closely with late-stage startups, many of which have had recent IPOs. His focus is end-to-end solutioning including security, DevOps automation, resilience, analytics, machine learning, and workload optimization in the cloud.

Deepesh Malviya is Solutions Architect Manager on the AWS Data Lab team. He and his team help customers architect and build data, analytics, and machine learning solutions to accelerate their key initiatives as part of the AWS Data Lab.

Automate deployment and version updates for Amazon Kinesis Data Analytics applications with AWS CodePipeline

Post Syndicated from Anand Shah original https://aws.amazon.com/blogs/big-data/automate-deployment-and-version-updates-for-amazon-kinesis-data-analytics-applications-with-aws-codepipeline/

Amazon Kinesis Data Analytics is the easiest way to transform and analyze streaming data in real time using Apache Flink. Customers are already using Kinesis Data Analytics to perform real-time analytics on fast-moving data generated from data sources like IoT sensors, change data capture (CDC) events, gaming, social media, and many others. Apache Flink is a popular open-source framework and distributed processing engine for stateful computations over unbounded and bounded data streams.

Although building Apache Flink applications is typically the responsibility of a data engineering team, automating the deployment and provisioning infrastructure as code (IaC) is usually owned by the platform (or DevOps) team.

The following are typical responsibilities of the data engineering role:

  • Write code for real-time analytics Apache Flink applications
  • Roll out new application versions or roll them back (for example, in the case of a critical bug)

The following are typical responsibilities of the platform role:

  • Write code for IaC
  • Provision the required resources in the cloud and manage their access

In this post, we show how you can automate deployment and version updates for Kinesis Data Analytics applications and allow both Platform and engineering teams to effectively collaborate and co-own the final solution using AWS CodePipeline with the AWS Cloud Development Kit (AWS CDK).

Solution overview

To demonstrate the automated deployment and version update of a Kinesis Data Analytics application, we use the following example real-time data analytics architecture for this post.

Real-time data analytics architecture

The workflow includes the following steps:

  1. An AWS Lambda function (acting as data source) is the event producer pushing events on demand to Amazon Kinesis Data Streams when invoked.
  2. The Kinesis data stream receives and stores real-time events.
  3. The Kinesis Data Analytics application reads events from the data stream and performs real-time analytics on it.

Generic architecture

You can refer to the following generic architecture to adapt this example to your preferred CI/CD tool (for example, Jenkins). The overall deployment process is divided into three high-level parts:

  1. Infrastructure CI/CD – This portion is highlighted in orange. The infrastructure CI/CD pipeline is responsible for deploying all the real-time streaming architecture components, including the Kinesis Data Analytics application and any connected resources typically deployed using AWS CloudFormation.
  2. ApplicationStack – This portion is highlighted in gray. The application stack is deployed by the infrastructure CI/CD component using AWS CloudFormation.
  3. Application CI/CD – This portion is highlighted in green. The application CI/CD pipeline updates the Kinesis Data Analytics application in three steps:
    1. The pipeline builds the Java or Python source code of the Kinesis Data Analytics application and produces the application as a binary file.
    2. The pipeline pushes the latest binary file to the Amazon Simple Storage Service (Amazon S3) artifact bucket after a successful build as Kinesis Data Analytics application binary files are referenced from S3.
    3. The S3 bucket file put event triggers a Lambda function, which updates the version of the Kinesis Data Analytics application by deploying the latest binary.

The following diagram illustrates this workflow.

Workflow illustrated

CI/CD architecture with CodePipeline

In this post, we implement the generic architecture using CodePipeline. The following diagram illustrates our updated architecture.

Updated architecture illustrated

The final solution includes the following steps:

  1. The platform (DevOps) team and data engineering team push their source code to their respective code repositories.
  2. CodePipeline deploys the whole infrastructure as three stacks:
    1. InfraPipelineStack – Contains a pipeline to deploy the overall infrastructure.
    2. ApplicationPipelineStack – Contains a pipeline to build and deploy Kinesis Data Analytics application binaries. In this post, we build a Java source using the JavaBuildPipeline AWS CDK construct. You can use the PythonBuildPipeline AWS CDK construct to build a Python source.
    3. ApplicationStack – Contains real-time data analytics pipeline resources including Lambda (data source), Kinesis Data Streams (storage), and Kinesis Data Analytics (Apache Flink application).

Deploy resources using AWS CDK

The following GitHub repository contains the AWS CDK code to create all the necessary resources for the data pipeline. This removes opportunities for manual error, increases efficiency, and ensures consistent configurations over time. To deploy the resources, complete the following steps:

  1. Clone the GitHub repository to your local computer using the following command:
git clone https://github.com/aws-samples/automate-deployment-and-version-update-of-kda-application
  1. Download and install the latest Node.js.
  2. Run the following command to install the latest version of AWS CDK:
npm install -g aws-cdk
  1. Run cdk bootstrap to initialize the AWS CDK environment in your AWS account. Replace your AWS account ID and Region before running the following command.
cdk bootstrap aws://123456789012/us-east-1

To learn more about the bootstrapping process, refer to Bootstrapping.

Part 1: Data engineering and platform teams push source code to their code repositories

The data engineering and platform teams begin work in their respective code repositories, as illustrated in the following figure.

The data engineering and platform teams begin work in their respective code repositories, as illustrated in the following figure.

In this post, we use two folders instead of two GitHub repositories, which you can find under the root folder of the cloned repository:

  • kinesis-analytics-application – This folder contains example source code of the Kinesis Data Analytics application. This represents your Kinesis Data Analytics application source code developed by your data engineering team.
  • infrastructure-cdk – This folder contains example AWS CDK source code of the final solution used for provisioning all the required resources and CodePipeline. You can reuse this code for your Kinesis Data Analytics application deployment.

Application development teams usually stores the application source code in git repositories. For the demonstration purpose, we will use source code as zip file downloaded from Github instead of connecting CodePipeline to the Github repository. You may want to directly connect source repository with CodePipeline. To learn more about how to connect, refer to Create a connection to GitHub.

Part 2: The platform team deploys the application pipeline

The following figure illustrates the next step in the workflow.

Next step in the workflow illustrated

In this step, you deploy the first pipeline to build the Java source code from kinesis-analytics-application. Complete the following steps to deploy ApplicationPipelineStack:

  1. Open your terminal, bash, or command window depending on your OS.
  2. Switch the current path to the folder infrastructure-cdk.
  3. Run npm install to download all dependencies.
  4. Run cdk deploy ApplicationPipelineStack to deploy the application pipeline.

This process should take about 5 minutes to complete and deploys the following resources to your AWS account, highlighted in green in the preceding diagram:

  • CodePipeline, containing stages for AWS CodeBuild and AWS CodeDeploy
  • An S3 bucket to store binaries
  • A Lambda function to update the Kinesis Data Analytics application JAR after manual approval

Trigger an automatic build for the application pipeline

After the cdk deploy command is successful, complete the following steps to automatically run the pipeline:

  1. Download the source code .zip file.
  2. On the AWS CloudFormation console, choose Stacks in the navigation pane.
  3. Choose the stack ApplicationPipelineStack.Choose the stack ApplicationPipelineStack.
  4. On the Outputs tab, choose the link for the key ArtifactBucketLink.On the Outputs tab, choose the link for the key ArtifactBucketLink.

You’re redirected to the S3 artifact bucket.

  1. Choose Upload.
  2. Upload the source code .zip file you downloaded.

The first pipeline run (shown as Auto Build in the following diagram) starts automatically and takes about 5 minutes to reach the manual approval stage. The pipeline automatically downloads the source code from the artifact bucket, builds the Java project kinesis-analytics-application using Maven, and publishes the output binary JAR file back to the artifact bucket under the directory jars.

The pipeline automatically downloads the source code from the artifact bucket, builds the Java project kinesis-analytics-application using Maven, and publishes the output binary JAR file back to the artifact bucket under the directory jars.

View the application pipeline run

Complete the following steps to view the application pipeline run:

  1. On the AWS CloudFormation console, navigate to the stack ApplicationPipelineStack.
  2. On the Outputs tab, choose the link for the key ApplicationCodePipelineLink.On the Outputs tab, choose the link for the key ApplicationCodePipelineLink.

You’re redirected to the pipeline details page. You can see a detailed view of the pipeline, including the state of each action in each stage and the state of the transitions.

Do not approve the build for the manual approval stage yet; this is done later.

Part 3: The platform team deploys the infrastructure pipeline

The application pipeline run publishes a JAR file named kinesis-analytics-application-final.jar to the artifact bucket. Next, we deploy the Kinesis Data Analytics architecture. Complete the following steps to deploy the example flow:

  1. Open a terminal, bash, or command window depending on your OS.
  2. Switch the current path to the folder infrastructure-cdk.
  3. Run cdk deploy InfraPipelineStack to deploy the infrastructure pipeline.

This process should take about 5 minutes to complete and deploys a pipeline containing stages for CodeBuild and CodeDeploy to your AWS account, as highlighted in green in the following diagram.

This process should take about 5 minutes to complete and deploys a pipeline containing stages for CodeBuild and CodeDeploy to your AWS account, as highlighted in green in the following diagram.

When the cdk deploy is complete, the infrastructure pipeline run starts automatically (shown as Auto Build 1 in the following diagram) and takes about 10 minutes to download the source code from the artifact bucket, build the AWS CDK project infrastructure-stack, and deploy ApplicationStack automatically to your AWS account. When the infrastructure pipeline run is complete, the following resources are deployed to your account (shown in green in following diagram):

  • A CloudFormation template named app-ApplicationStack
  • A Lambda function acting as a data source
  • A Kinesis data stream acting as the stream storage
  • A Kinesis Data Analytics application with the first version of kinesis-analytics-application-final.jarWhen the infrastructure pipeline run is complete, the following resources are deployed to your account (shown in green in following diagram):

View the infrastructure pipeline run

Complete the following steps to view the application pipeline run:

  1. On the AWS CloudFormation console, navigate to the stack InfraPipelineStack.On the AWS CloudFormation console, navigate to the stack InfraPipelineStack.
  2. On the Outputs tab, choose the link for the key InfraCodePipelineLink.On the Outputs tab, choose the link for the key InfraCodePipelineLink.

You’re redirected to the pipeline details page. You can see a detailed view of the pipeline, including the state of each action in each stage and the state of the transitions.

Step 4: The data engineering team deploys the application

Now your account has everything in place for the data engineering team to work independently and roll out new versions of the Kinesis Data Analytics application. You can approve the respective application build from the application pipeline to deploy new versions of the application. The following diagram illustrates the full workflow.

Diagram illustrates the full workflow.

The build process starts automatically when it detects changes in the source code. You can test a version update by re-uploading the source code .zip file to the S3 artifact bucket. In a real-world use case, you update the main branch either via a pull request or by merging your changes, and this action triggers a new pipeline run automatically.

View the current application version

To view the current version of the Kinesis Data Analytics application, complete the following steps:

  1. On the AWS CloudFormation console, navigate to the stack InfraPipelineStack.
  2. On the Outputs tab, choose the link for the key KDAApplicationLink.On the Outputs tab, choose the link for the key KDAApplicationLink.

You’re redirected to the Kinesis Data Analytics application details page. You can find the current application version by looking at Version ID.

Find the current application version by looking at Version ID

Approve the application deployment

Complete the following steps to approve the deployment (or version update) of the Kinesis Data Analytics application:

  1. On the AWS CloudFormation console, navigate to the stack ApplicationPipelineStack.
  2. On the Outputs tab, choose the link for the key ApplicationCodePipelineLink.
  3. Choose Review from the pipeline approval stage.Choose Review from the pipeline approval stage
  4. When prompted, choose Approve to provide approval (optionally adding any comments) for the Kinesis Data Analytics application deployment or version update.Choose Approve to provide approval
  5. Repeat the steps mentioned earlier to view the current application version.

You should see the application version as defined in Version ID increased by one, as shown in the following screenshot.

Application version as defined in Version ID increased by one

Deploying a new version of the Kinesis Data Analytics application will cause a downtime of around 5 minutes because the Lambda function responsible for the version update makes the API call UpdateApplication, which restarts the application after updating the version. However, the application resumes stream processing where it left off after the restart.

Clean up

Complete the following steps to delete your resources and stop incurring costs:

  1. On the AWS CloudFormation console, select the stack InfraPipelineStack and choose Delete.
  2. Select the stack app-ApplicationStack and choose Delete.
  3. Select stack ApplicationPipelineStack and choose Delete.
  4. On the Amazon S3 console, select the bucket with the name starting with javaappCodePipeline and choose Empty.
  5. Enter permanently delete to confirm the choice.
  6. Select the bucket again and choose Delete.
  7. Confirm the action by entering the bucket name when prompted.
  8. Repeat these steps to delete the bucket with the name starting with infrapipelinestack-pipelineartifactsbucket.


This post demonstrated how to automate deployment and version updates for your Kinesis Data Analytics applications using CodePipeline and AWS CDK.

For more information, see Continuous integration and delivery (CI/CD) using CDK Pipelines and CodePipeline tutorials.

About the Author

About the AuthorAnand Shah is a Big Data Prototyping Solutions Architect at AWS. He works with AWS customers and their engineering teams to build prototypes using AWS analytics services and purpose-built databases. Anand helps customers solve the most challenging problems using the art of the possible technology. He enjoys beaches in his leisure time.

Learn more about Apache Flink and Amazon Kinesis Data Analytics with three new videos

Post Syndicated from Deepthi Mohan original https://aws.amazon.com/blogs/big-data/learn-more-about-apache-flink-and-amazon-kinesis-data-analytics-with-three-new-videos/

Amazon Kinesis Data Analytics is a fully managed service for Apache Flink that reduces the complexity of building, managing, and integrating Apache Flink applications with other AWS services. Apache Flink is an open-source framework and engine for stateful processing of data streams. It’s highly available and scalable, delivering high throughput and low latency for the most demanding stream-processing applications.

In this post, we highlight three new videos for you to learn more about Apache Flink and Kinesis Data Analytics, including open-source contributions to Apache Flink, our learnings from running thousands of Flink jobs on a managed service, and how we use Kinesis Data Analytics and Apache Flink to enable machine learning (ML) in Alexa.

In Introducing the new Async Sink, we present the new Async Sink framework, an open-source contribution to make it easier than ever to build sink connectors for Apache Flink. You can learn about the need for the Async Sink framework and how we built it, followed by a demo of building a new sink to Amazon CloudWatch to deliver CloudWatch metrics, in under 20 minutes! The Async Sink framework bootstraps development of Flink sinks, is compatible with Apache Flink 1.15 and above, and has already seen usage by the community beyond building new sinks to AWS services.

The video Practical learnings from running thousands of Flink jobs shares insight from running Kinesis Data Analytics, a managed service for Apache Flink that runs tens of thousands of Flink jobs. You can learn lessons based on our experience of operating Apache Flink at very large scale, touching on issues such as out-of-memory errors, timeouts, and stability challenges. The video also covers improving application performance with memory tuning and configuration changes and the approaches to automating job health monitoring and management of Flink jobs at scale.

“Alexa, be quiet!” End-to-end near-real time model building and evaluation in Amazon Alexa discusses how Alexa has built an automated end-to-end solution for incremental model building or fine-tuning ML models through continuous learning, continual learning, or semi-supervised active learning. Alexa uses Apache Flink to transform and discover metrics in real time. In this video, you learn about how Alexa scales infrastructure to meet the needs of ML teams across Alexa, and explore specific use cases that use Apache Flink and Kinesis Data Analytics to improve Alexa experiences to delight customers.

To learn more about Kinesis Data Analytics for Apache Flink, visit our product page.

About the author

Deepthi Mohan is a Principal Product Manager on the Kinesis Data Analytics team.

Common streaming data enrichment patterns in Amazon Kinesis Data Analytics for Apache Flink

Post Syndicated from Ali Alemi original https://aws.amazon.com/blogs/big-data/common-streaming-data-enrichment-patterns-in-amazon-kinesis-data-analytics-for-apache-flink/

Stream data processing allows you to act on data in real time. Real-time data analytics can help you have on-time and optimized responses while improving overall customer experience.

Apache Flink is a distributed computation framework that allows for stateful real-time data processing. It provides a single set of APIs for building batch and streaming jobs, making it easy for developers to work with bounded and unbounded data. Apache Flink provides different levels of abstraction to cover a variety of event processing use cases.

Amazon Kinesis Data Analytics is an AWS service that provides a serverless infrastructure for running Apache Flink applications. This makes it easy for developers to build highly available, fault tolerant, and scalable Apache Flink applications without needing to become an expert in building, configuring, and maintaining Apache Flink clusters on AWS.

Data streaming workloads often require data in the stream to be enriched via external sources (such as databases or other data streams). For example, assume you are receiving coordinates data from a GPS device and need to understand how these coordinates map with physical geographic locations; you need to enrich it with geolocation data. You can use several approaches to enrich your real-time data in Kinesis Data Analytics depending on your use case and Apache Flink abstraction level. Each method has different effects on the throughput, network traffic, and CPU (or memory) utilization. In this post, we cover these approaches and discuss their benefits and drawbacks.

Data enrichment patterns

Data enrichment is a process that appends additional context and enhances the collected data. The additional data often is collected from a variety of sources. The format and the frequency of the data updates could range from once in a month to many times in a second. The following table shows a few examples of different sources, formats, and update frequency.

Data Format Update Frequency
IP address ranges by country CSV Once a month
Company organization chart JSON Twice a year
Machine names by ID CSV Once a day
Employee information Table (Relational database) A few times a day
Customer information Table (Non-relational database) A few times an hour
Customer orders Table (Relational database) Many times a second

Based on the use case, your data enrichment application may have different requirements in terms of latency, throughput, or other factors. The remainder of the post dives deeper into different patterns of data enrichment in Kinesis Data Analytics, which are listed in the following table with their key characteristics. You can choose the best pattern based on the trade-off of these characteristics.

Enrichment Pattern Latency Throughput Accuracy if Reference Data Changes Memory Utilization Complexity
Pre-load reference data in Apache Flink Task Manager memory Low High Low High Low
Partitioned pre-loading of reference data in Apache Flink state Low High Low Low Low
Periodic Partitioned pre-loading of reference data in Apache Flink state Low High Medium Low Medium
Per-record asynchronous lookup with unordered map Medium Medium High Low Low
Per-record asynchronous lookup from an external cache system Low or Medium (Depending on Cache storage and implementation) Medium High Low Medium
Enriching streams using the Table API Low High High Low – Medium (depending on the selected join operator) Low

Enrich streaming data by pre-loading the reference data

When the reference data is small in size and static in nature (for example, country data including country code and country name), it’s recommended to enrich your streaming data by pre-loading the reference data, which you can do in several ways.

To see the code implementation for pre-loading reference data in various ways, refer to the GitHub repo. Follow the instructions in the GitHub repository to run the code and understand the data model.

Pre-loading of reference data in Apache Flink Task Manager memory

The simplest and also fastest enrichment method is to load the enrichment data into each of the Apache Flink task managers’ on-heap memory. To implement this method, you create a new class by extending the RichFlatMapFunction abstract class. You define a global static variable in your class definition. The variable could be of any type, the only limitation is that it should extend java.io.Serializable—for example, java.util.HashMap. Within the open() method, you define a logic that loads the static data into your defined variable. The open() method is always called first, during the initialization of each task in Apache Flink’s task managers, which makes sure the whole reference data is loaded before the processing begins. You implement your processing logic by overriding the processElement() method. You implement your processing logic and access the reference data by its key from the defined global variable.

The following architecture diagram shows the full reference data load in each task slot of the task manager.

diagram shows the full reference data load in each task slot of the task manager.

This method has the following benefits:

  • Easy to implement
  • Low latency
  • Can support high throughput

However, it has the following disadvantages:

  • If the reference data is large in size, the Apache Flink task manager may run out of memory.
  • Reference data can become stale over a period of time.
  • Multiple copies of the same reference data are loaded in each task slot of the task manager.
  • Reference data should be small to fit in the memory allocated to a single task slot. In Kinesis Data Analytics, each Kinesis Processing Unit (KPU) has 4 GB of memory, out of which 3 GB can be used for heap memory. If ParallelismPerKPU in Kinesis Data Analytics is set to 1, one task slot runs in each task manager, and the task slot can use the whole 3 GB of heap memory. If ParallelismPerKPU is set to a value greater than 1, the 3 GB of heap memory is distributed across multiple task slots in the task manager. If you’re deploying Apache Flink in Amazon EMR or in a self-managed mode, you can tune taskmanager.memory.task.heap.size to increase the heap memory of a task manager.

Partitioned pre-loading of reference data in Apache Flink State

In this approach, the reference data is loaded and kept in the Apache Flink state store at the start of the Apache Flink application. To optimize the memory utilization, first the main data stream is divided by a specified field via the keyBy() operator across all task slots. Furthermore, only the portion of the reference data that corresponds to each task slot is loaded in the state store.

This is achieved in Apache Flink by creating the class PartitionPreLoadEnrichmentData, extending the RichFlatMapFunction abstract class. Within the open method, you override the ValueStateDescriptor method to create a state handle. In the referenced example, the descriptor is named locationRefData, the state key type is String, and the value type is Location. In this code, we use ValueState compared to MapState because we only hold the location reference data for a particular key. For example, when we query Amazon S3 to get the location reference data, we query for the specific role and get a particular location as a value.

In Apache Flink, ValueState is used to hold a specific value for a key, whereas MapState is used to hold a combination of key-value pairs.

This technique is useful when you have a large static dataset that is difficult to fit in memory as a whole for each partition.

The following architecture diagram shows the load of reference data for the specific key for each partition of the stream.

diagram shows the load of reference data for the specific key for each partition of the stream.

For example, our reference data in the sample GitHub code has roles which are mapped to each building. Because the stream is partitioned by roles, only the specific building information per role is required to be loaded for each partition as the reference data.

This method has the following benefits:

  • Low latency.
  • Can support high throughput.
  • Reference data for specific partition is loaded in the keyed state.
  • In Kinesis Data Analytics, the default state store configured is RocksDB. RocksDB can utilize a significant portion of 1 GB of managed memory and 50 GB of disk space provided by each KPU. This provides enough room for the reference data to grow.

However, it has the following disadvantages:

  • Reference data can become stale over a period of time

Periodic partitioned pre-loading of reference data in Apache Flink State

This approach is a fine-tune of the previous technique, where each partitioned reference data is reloaded on a periodic basis to refresh the reference data. This is useful if your reference data changes occasionally.

The following architecture diagram shows the periodic load of reference data for the specific key for each partition of the stream.

diagram shows the periodic load of reference data for the specific key for each partition of the stream.

In this approach, the class PeriodicPerPartitionLoadEnrichmentData is created, extending the KeyedProcessFunction class. Similar to the previous pattern, in the context of the GitHub example, ValueState is recommended here because each partition only loads a single value for the key. In the same way as mentioned earlier, in the open method, you define the ValueStateDescriptor to handle the value state and define a runtime context to access the state.

Within the processElement method, load the value state and attach the reference data (in the referenced GitHub example, buildingNo to the customer data). Also register a timer service to be invoked when the processing time passes the given time. In the sample code, the timer service is scheduled to be invoked periodically (for example, every 60 seconds). In the onTimer method, update the state by making a call to reload the reference data for the specific role.

This method has the following benefits:

  • Low latency.
  • Can support high throughput.
  • Reference data for specific partitions is loaded in the keyed state.
  • Reference data is refreshed periodically.
  • In Kinesis Data Analytics, the default state store configured is RocksDB. Also, 50 GB of disk space provided by each KPU. This provides enough room for the reference data to grow.

However, it has the following disadvantages:

  • If the reference data changes frequently, the application still has stale data depending on how frequently the state is reloaded
  • The application can face load spikes during reload of reference data

Enrich streaming data using per-record lookup

Although pre-loading of reference data provides low latency and high throughput, it may not be suitable for certain types of workloads, such as the following:

  • Reference data updates with high frequency
  • Apache Flink needs to make an external call to compute the business logic
  • Accuracy of the output is important and the application shouldn’t use stale data

Normally, for these types of use cases, developers trade-off high throughput and low latency for data accuracy. In this section, you learn about a few of common implementations for per-record data enrichment and their benefits and disadvantages.

Per-record asynchronous lookup with unordered map

In a synchronous per-record lookup implementation, the Apache Flink application has to wait until it receives the response after sending every request. This causes the processor to stay idle for a significant period of processing time. Instead, the application can send a request for other elements in the stream while it waits for the response for the first element. This way, the wait time is amortized across multiple requests and therefore it increases the process throughput. Apache Flink provides asynchronous I/O for external data access. While using this pattern, you have to decide between unorderedWait (where it emits the result to the next operator as soon as the response is received, disregarding the order of the element on the stream) and orderedWait (where it waits until all inflight I/O operations complete, then sends the results to the next operator in the same order as original elements were placed on the stream). Usually, when downstream consumers disregard the order of the elements in the stream, unorderedWait provides better throughput and less idle time. Visit Enrich your data stream asynchronously using Kinesis Data Analytics for Apache Flink to learn more about this pattern.

The following architecture diagram shows how an Apache Flink application on Kinesis Data Analytics does asynchronous calls to an external database engine (for example Amazon DynamoDB) for every event in the main stream.

diagram shows how an Apache Flink application on Kinesis Data Analytics does asynchronous calls to an external database engine (for example Amazon DynamoDB) for every event in the main stream.

This method has the following benefits:

  • Still reasonably simple and easy to implement
  • Reads the most up-to-date reference data

However, it has the following disadvantages:

  • It generates a heavy read load for the external system (for example, a database engine or an external API) that hosts the reference data
  • Overall, it might not be suitable for systems that require high throughput with low latency

Per-record asynchronous lookup from an external cache system

A way to enhance the previous pattern is to use a cache system to enhance the read time for every lookup I/O call. You can use Amazon ElastiCache for caching, which accelerates application and database performance, or as a primary data store for use cases that don’t require durability like session stores, gaming leaderboards, streaming, and analytics. ElastiCache is compatible with Redis and Memcached.

For this pattern to work, you must implement a caching pattern for populating data in the cache storage. You can choose between a proactive or reactive approach depending your application objectives and latency requirements. For more information, refer to Caching patterns.

The following architecture diagram shows how an Apache Flink application calls to read the reference data from an external cache storage (for example, Amazon ElastiCache for Redis). Data changes must be replicated from the main database (for example, Amazon Aurora) to the cache storage by implementing one of the caching patterns.

diagram shows how an Apache Flink application calls to read the reference data from an external cache storage (for example, Amazon ElastiCache for Redis). Data changes must be replicated from the main database (for example, Amazon Aurora) to the cache storage by implementing one of the caching patterns.

Implementation for this data enrichment pattern is similar to the per-record asynchronous lookup pattern; the only difference is that the Apache Flink application makes a connection to the cache storage, instead of connecting to the primary database.

This method has the following benefits:

  • Better throughput because caching can accelerate application and database performance
  • Protects the primary data source from the read traffic created by the stream processing application
  • Can provide lower read latency for every lookup call
  • Overall, might not be suitable for medium to high throughput systems that want to improve data freshness

However, it has the following disadvantages:

  • Additional complexity of implementing a cache pattern for populating and syncing the data between the primary database and the cache storage
  • There is a chance for the Apache Flink stream processing application to read stale reference data depending on what caching pattern is implemented
  • Depending on the chosen cache pattern (proactive or reactive), the response time for each enrichment I/O may differ, therefore the overall processing time of the stream could be unpredictable

Alternatively, you can avoid these complexities by using the Apache Flink JDBC connector for Flink SQL APIs. We discuss enrichment stream data via Flink SQL APIs in more detail later in this post.

Enrich stream data via another stream

In this pattern, the data in the main stream is enriched with the reference data in another data stream. This pattern is good for use cases in which the reference data is updated frequently and it’s possible to perform change data capture (CDC) and publish the events to a data streaming service such as Apache Kafka or Amazon Kinesis Data Streams. This pattern is useful in the following use cases, for example:

  • Customer purchase orders are published to a Kinesis data stream, and then join with customer billing information in a DynamoDB stream
  • Data events captured from IoT devices should enrich with reference data in a table in Amazon Relational Database Service (Amazon RDS)
  • Network log events should enrich with the machine name on the source (and the destination) IP addresses

The following architecture diagram shows how an Apache Flink application on Kinesis Data Analytics joins data in the main stream with the CDC data in a DynamoDB stream.

diagram shows how an Apache Flink application on Kinesis Data Analytics joins data in the main stream with the CDC data in a DynamoDB stream.

To enrich streaming data from another stream, we use a common stream to stream join patterns, which we explain in the following sections.

Enrich streams using the Table API

Apache Flink Table APIs provide higher abstraction for working with data events. With Table APIs, you can define your data stream as a table and attach the data schema to it.

In this pattern, you define tables for each data stream and then join those tables to achieve the data enrichment goals. Apache Flink Table APIs support different types of join conditions, like inner join and outer join. However, you want to avoid those if you’re dealing with unbounded streams because those are resource intensive. To limit the resource utilization and run joins effectively, you should use either interval or temporal joins. An interval join requires one equi-join predicate and a join condition that bounds the time on both sides. To better understand how to implement an interval join, refer to Get started with Apache Flink SQL APIs in Kinesis Data Analytics Studio.

Compared to interval joins, temporal table joins don’t work with a time period within which different versions of a record are kept. Records from the main stream are always joined with the corresponding version of the reference data at the time specified by the watermark. Therefore, fewer versions of the reference data remain in the state.

Note that the reference data may or may not have a time element associated with it. If it doesn’t, you may need to add a processing time element for the join with the time-based stream.

In the following example code snippet, the update_time column is added to the currency_rates reference table from the change data capture metadata such as Debezium. Furthermore, it’s used to define a watermark strategy for the table.

CREATE TABLE currency_rates (
    currency STRING,
    conversion_rate DECIMAL(32, 2),
    update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,
        WATERMARK FOR update_time AS update_time,
) WITH (
   'connector' = 'kafka',
   'value.format' = 'debezium-json',
   /* ... */

This method has the following benefits:
  • Easy to implement
  • Low latency
  • Can support high throughput when reference data is a data stream

SQL APIs provide higher abstractions over how the data is processed. For more complex logic around how the join operator should process, we recommend you always start with SQL APIs first and use DataStream APIs if you really need to.


In this post, we demonstrated different data enrichment patterns in Kinesis Data Analytics. You can use these patterns and find the one that addresses your needs and quickly develop a stream processing application.

For further reading on Kinesis Data Analytics, visit the official product page.

About the Authors

About the author Ali AlemiAli Alemi is a Streaming Specialist Solutions Architect at AWS. Ali advises AWS customers with architectural best practices and helps them design real-time analytics data systems that are reliable, secure, efficient, and cost-effective. He works backward from customers’ use cases and designs data solutions to solve their business problems. Prior to joining AWS, Ali supported several public sector customers and AWS consulting partners in their application modernization journey and migration to the cloud.

About the author Subham RakshitSubham Rakshit is a Streaming Specialist Solutions Architect for Analytics at AWS based in the UK. He works with customers to design and build search and streaming data platforms that help them achieve their business objective. Outside of work, he enjoys spending time solving jigsaw puzzles with his daughter.

About the author Dr. Sam MokhtariDr. Sam Mokhtari is a Senior Solutions Architect in AWS. His main area of depth is data and analytics, and he has published more than 30 influential articles in this field. He is also a respected data and analytics advisor who led several large-scale implementation projects across different industries, including energy, health, telecom, and transport.

Sink Amazon Kinesis Data Analytics Apache Flink output to Amazon Keyspaces using Apache Cassandra Connector

Post Syndicated from Pratik Patel original https://aws.amazon.com/blogs/big-data/sink-amazon-kinesis-data-analytics-apache-flink-output-to-amazon-keyspaces-using-apache-cassandra-connector/

Amazon Keyspaces (for Apache Cassandra) is a scalable, highly available, and managed Apache Cassandra–compatible database service. With Amazon Keyspaces you don’t have to provision, patch, or manage servers, and you don’t have to install, maintain, or operate software. Amazon Keyspaces is serverless, so you only pay for the resources that you use and the service can automatically scale tables up and down in response to application traffic. You can use Amazon Keyspaces to store large volumes of data, such as entries in a log file or the message history for a chat application as Amazon Keyspaces offers virtually unlimited throughput and storage. You can also use Amazon Keyspaces to store information about devices for Internet of Things (IoT) applications or player profiles for games.

A popular use case in the wind energy sector is to protect wind turbines from wind speed. Engineers and analysts often want to see real-time aggregated wind turbine speed data to analyze the current situation out in the field. Furthermore, they need access to historical aggregated wind turbine speed data to build machine learning (ML) models which can help them take preventative actions on wind turbines. Customers often ingest high-velocity IoT data into Amazon Kinesis Data Streams and use Amazon Kinesis Data Analytics, AWS Lambda, or Amazon Kinesis Client Library (KCL) applications to aggregate IoT data in real-time and store it in Amazon Keyspaces, Amazon DynamoDB, or Amazon Timestream.

In this post, we demonstrate how to aggregate sensor data using Amazon Kinesis Data Analytics and persist aggregated sensor data in to Amazon Keyspaces using Apache Flink’s Apache Cassandra Connector.



In the architecture diagram above, Lambda simulates wind speed sensor data and ingests sensor data into Amazon Kinesis Data Stream. Amazon Kinesis Data Analytics Apache Flink application reads wind speed sensor data from Amazon Kinesis Data Stream in real-time and aggregates wind speed sensor data using a five minutes tumbling window and storing aggregated wind speed sensor data into Amazon Keyspaces table. Aggregated wind speed sensor data stored in Amazon Keyspaces can be used by engineers and analysts to review real-time dashboards or to perform historical analysis on specific wind turbine.

Deploying resources using AWS CloudFormation

After you sign in to your AWS account, launch the AWS CloudFormation template by choosing Launch Stack:


The CloudFormation template configures the following resources in your account:

  • One Lambda function which simulates wind turbine data
  • One Amazon Kinesis Data Stream
  • One Amazon Kinesis Data Analytics Apache Flink application
  • An AWS Identity and Access Management (IAM) role (service execution role) for Amazon Kinesis Data Analytics Apache Flink application
  • One Amazon Keyspaces Table: turbine_aggregated_sensor_data

After you complete the setup, sign in to the Kinesis Data Analytics console. On the Kinesis Data Analytics applications page, choose the Streaming applications tab, where you can see the Streaming application in the ready status. Select the Streaming application, choose Run, and wait until the Streaming application is in running status. It can take a couple of minutes for the Streaming application to get into running status.

Now that we have deployed all of the resources using CloudFormation template, let’s review deployed resources and how they function.

Format of wind speed sensor data

Lambda simulates wind turbine speed data every one minute and ingests it into Amazon Kinesis Data Stream. Each wind turbine sensor data message consists of two attributes: turbineId and speed.

  "turbineId": "turbine-0001",
  "speed": 60

Schema of destination Amazon Keyspaces table

We’ll store aggregated sensor data in to destination turbine_aggregated_sensor_data Amazon Keyspaces table. turbine_aggregated_sensor_data table has on-demand capacity mode enabled. Amazon Keyspaces (for Apache Cassandra) on-demand capacity mode is a flexible billing option capable of serving thousands of requests per second without capacity planning. This option offers pay-per-request pricing for read and write requests so that you pay only for what you use. When you choose on-demand mode, Amazon Keyspaces can scale the throughput capacity for your table up to any previously reached traffic level instantly, and then back down when application traffic decreases. If a workload’s traffic level hits a new peak, then the service adapts rapidly to increase throughput capacity for your table.

BDB-2063-keyspaces-table BDB-2063-keyspaces-table-def-1 BDB-2063-keyspaces-table-def-2

Apache Flink code to aggregate and persist data in Amazon Keyspaces Table

Apache Flink source code used by this post can be found on the KeyspacesSink section of Kinesis Data Analytics Java Examples public git repository.

The following code snippet demonstrates how incoming wind turbine messages are getting aggregated using a five-minute tumbling window and produces a DataStream of TurbineAggregatedRecord records.

DataStream<TurbineAggregatedRecord> result = input
.map(new WindTurbineInputMap())
.keyBy(t -> t.turbineId)
.reduce(new AggregateReducer())
.map(new AggregateMap());

The following code snippet demonstrates how Amazon Keyspaces table name and column names are annotated on the TurbineAggregatedRecord class.

@Table(keyspace = "sensor_data", name = "turbine_aggregated_sensor_data", readConsistency = "LOCAL_QUORUM", writeConsistency = "LOCAL_QUORUM")
public class TurbineAggregatedRecord {

@Column(name = "turbineid")
private String turbineid = "";

@Column(name = "reported_time")
private long reported_time = 0;

@Column(name = "max_speed")
private long max_speed = 0;

@Column(name = "min_speed")
private long min_speed = 0;

@Column(name = "avg_speed")
private long avg_speed = 0;

The following code snippet demonstrates the implementation of Apache Cassandra Connector to sink aggregated wind speed sensor data TurbineAggregatedRecord into Amazon Keyspaces table. We’re using SigV4AuthProvider with Apache Cassandra Connector. The SigV4 authentication plugin lets you use IAM credentials for users or roles when connecting to Amazon Keyspaces. Instead of requiring a user name and password, this plugin signs API requests using access keys.

                        new ClusterBuilder() {

                            private static final long serialVersionUID = 2793938419775311824L;

                            public Cluster buildCluster(Cluster.Builder builder) {
                                return builder
                                        .addContactPoint("cassandra."+ region +".amazonaws.com")
                                        .withAuthProvider(new SigV4AuthProvider(region))
                .setMapperOptions(() -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)})

Review output in Amazon Keyspaces Table

Once Amazon Kinesis Data Analytics Apache Flink application aggregates wind turbine sensor data and persists aggregated data in Amazon Keyspaces table, we can query and review aggregated data using Amazon Keyspaces CQL editor as illustrated in the following.

select * from sensor_data.turbine_aggregated_sensor_data

BDB-2063-cql-editor BDB-2063-cql-editor-result

Clean up

To avoid incurring future charges, complete the following steps:

  1. Empty Amazon S3 bucket created by AWS CloudFormation stack.
  2. Delete AWS CloudFormation stack.


As you’ve learned in this post, you can build Amazon Kinesis Data Analytics Apache Flink application to read sensor data from Amazon Kinesis Data Streams, perform aggregations, and persist aggregated sensor data in Amazon Keyspaces using Apache Cassandra Connector. There are several use cases in IoT and Application development to move data quickly through the analytics pipeline and persist data in Amazon Keyspaces.

We look forward to hearing from you about your experience. If you have questions or suggestions, please leave a comment.

About the Author

Pratik Patel is a Sr Technical Account Manager and streaming analytics specialist. He works with AWS customers and provides ongoing support and technical guidance to help plan and build solutions using best practices and proactively helps in keeping customer’s AWS environments operationally healthy.

Query your data streams interactively using Kinesis Data Analytics Studio and Python

Post Syndicated from Jeremy Ber original https://aws.amazon.com/blogs/big-data/query-your-data-streams-interactively-using-kinesis-data-analytics-studio-and-python/

Amazon Kinesis Data Analytics Studio makes it easy for customers to analyze streaming data in real time, as well as build stream processing applications powered by Apache Flink using standard SQL, Python, and Scala. Just a few clicks in the AWS Management console lets customers launch a serverless notebook to query data streams and get results in seconds. Kinesis Data Analytics reduces the complexity of building and managing Apache Flink applications. Apache Flink is an open-source framework and engine for processing data streams. It’s highly available and scalable, and it delivers high throughput and low latency for stream processing applications.

Customers running Apache Flink workloads face the non-trivial challenge of developing their distributed stream processing applications without having true visibility into the steps conducted by their application for data processing. Kinesis Data Analytics Studio combines the ease-of-use of Apache Zeppelin notebooks, with the power of the Apache Flink processing engine, to provide advanced streaming analytics capabilities in a fully-managed offering. Furthermore, it accelerates developing and running stream processing applications that continuously generate real-time insights.

In this post, we will introduce you to Kinesis Data Analytics Studio and get started querying data interactively from an Amazon Kinesis Data Stream using the Python API for Apache Flink (Pyflink). We will use a Kinesis Data Stream for this example, as it is the quickest way to begin. Kinesis Data Analytics Studio is also compatible with Amazon Managed Streaming for Apache Kafka (Amazon MSK), Amazon Simple Storage Service (Amazon S3), and various other data sources supported by Apache Flink.


  • Kinesis Data Stream
  • Data Generator

To follow this guide and interact with your streaming data, you will need a data stream with data flowing through.

Create a Kinesis Data Stream

You can create these streams using either the Amazon Kinesis console or the following AWS Command Line Interface (AWS CLI) command. For console instructions, see Creating and Updating Data Streams in the Kinesis Data Streams Developer Guide.

To create the data stream, use the following Kinesis create-stream AWS CLI command. Your data stream will be named input-stream.

$ aws kinesis create-stream \
--stream-name input-stream \
--shard-count 1 \
--region us-east-1

Creating a Kinesis Data Analytics Studio notebook

You can start interacting with your data stream by following these steps:

  1. Open the AWS Management Console and navigate to Amazon Kinesis Data Analytics for Apache Flink
  2. Select the Studio tab on the main page, and select Create Studio Notebook.
  3. Enter the name of your Studio notebook, and let Kinesis Data Analytics Studio create an AWS Identity and Access Management (IAM) role for this. You can create a custom role for specific use cases using the IAM Console.
  4. Choose an AWS Glue Database to store the metadata around your sources and destinations used by Kinesis Data Analytics Studio.
  5. Select Create Studio Notebook.

We will keep the default settings for the application, and we can scale up as needed.

Once the application has been created, select Start to start the Apache Flink application. This will take a few minutes to complete, at which point you can Open in Apache Zeppelin.

Write Sample Records to the Data Stream

In this section, you can create a Python script within the Apache Zeppelin notebook to write sample records to the stream for the application to process.

Select Create a new note in Apache Zeppelin, and name the new notebook stock-producer with the following contents:

import datetime
import json
import random
import boto3

STREAM_NAME = "input-stream"
REGION = "us-east-1"

def get_data():
    return {
        'event_time': datetime.datetime.now().isoformat(),
        'ticker': random.choice(["BTC","ETH","BNB", "XRP", "DOGE"]),
        'price': round(random.random() * 100, 2)}

def generate(stream_name, kinesis_client):
    while True:
        data = get_data()

if __name__ == '__main__':
    generate(STREAM_NAME, boto3.client('kinesis', region_name=REGION))

You can run the stock-producer paragraph to begin publishing messages to your Kinesis Data Stream either by pressing SHIFT + ENTER on the paragraph, or by selecting the Play button in the top-right of the paragraph.

Feel free to close or navigate away from this notebook for now, as it will continue publishing events indefinitely.

Note that this will continue publishing events until the notebook is paused or the Apache Flink cluster is shut down.

Example Applications

Apache Zeppelin supports the Apache Flink interpreter and allows for the direct use of Apache Flink within a notebook for interactive data analysis. Within the Flink Interpreter, three languages are supported at this time—Scala, Python (PyFlink), and SQL. The notebook requires a specification to one of these languages at the top of each paragraph to interpret the language properly.

%flink          - Scala environment 
%flink.pyflink  - Python Environment
%flink.ipyflink - ipython Environment
%flink.ssql     - Streaming SQL Environment
%flink.bsql     - Batch SQL Environment 

There are several other predefined variables per interpreter, such as the senv variable in Scala for a StreamExecutionEnvironment, and st_env in python for the same. A full list of these entry point variables can be found here. Now we will showcase the capabilities of Apache Flink in Python (Pyflink) by providing code samples for the most common use cases.

How to follow along

If you would like to follow along with this walkthrough, we have provided the Kinesis Data Analytics Studio notebook here with comments and context. Once you have created your Kinesis Data Analytics application, you can download the file and upload it to Kinesis Data Analytics studio.

Once you have imported the notebook, you should be able to follow along with the remainder of the post as you try it out!

Create a source table for Kinesis

Using the %flink.pyflink header to signify that this code block will be interpreted via the Python Flink interpreter, we’re creating a table called stock_table with a ticker, price, and event_time column that signifies the time at which the price was recorded for the ticker. The WATERMARK clause defines the watermark strategy for generating watermarks according to the event_time (row_time) column. The event_time column must be defined as Timestamp(3) and be a top-level column to be used in conjunction with watermarks. The syntax following the WATERMARK definition—FOR event_time AS event_time - INTERVAL '5' SECOND declares that watermarks will be emitted according to a bounded out of orderness watermark strategy that allows for a five second delay in event_time data.

To learn more about event time and watermarks, read about the techniques implemented by Apache Flink here.

The table defined below uses the Kinesis connector to read from a kinesis data stream called input-stream in the us-east-1 region from the latest stream position.

In this example, we are utilizing the Python interpreter’s built-in streaming table environment variable, st_env, to execute a SQL DDL statement. The streaming table environment provides access to the Table API within pyflink and uses the blink planner to optimize the job graph. This planner translates queries into a DataStream program regardless of whether the input is batch or streaming.

If the table already exists in the AWS Glue Data Catalog, then this statement will issue an error stating that the table already exists.

CREATE TABLE stock_table (
                ticker VARCHAR(6),
                price DOUBLE,
                event_time TIMESTAMP(3),
                WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND

              PARTITIONED BY (ticker)
              WITH (
                'connector' = 'kinesis',
                'stream' = 'input-stream',
                'aws.region' = 'us-east-1',
                'scan.stream.initpos' = 'LATEST',
                'format' = 'json',
                'json.timestamp-format.standard' = 'ISO-8601'
              ) """)

The screenshot above showcases the successful execution of this paragraph. We can verify the results by checking in the AWS Glue Data Catalog for the accompanying table.

To find this, navigate back to the AWS Management Console, and then search for Glue. Once here, locate the Glue database that you chose for our Kinesis Data Analytics application, and select it. You should see a link toward the bottom of the Databases view that lets you view the Tables in your database. Furthermore, you can directly select Tables in the left-hand side. Locate the table that we created in the previous step, called stock_table.

Here we can see that the table was not only created in Kinesis Data Analytics studio, but also durably persisted in a Glue Data Catalog table for reference from other applications or between runs of your application.

Tumbling windows

Performing a tumbling window in the Python Table API first requires the definition of an in-memory reference to the table created in Step 1. We use the st_env variable to define this table using the from_path function and referencing the table name. Once this is created, then we can create a windowed aggregation over one minute of data, according to the event_time column.

Note that you could also perform this transformation entirely in Flink SQL, as described in this blog post. We’re simply showcasing the features of the Pyflink API. The blog post linked above also showcases many different window operators that you might perform, such as sliding windows, group windows, over windows, session windows, etc. The windowing choice is entirely use-case dependent.

from pyflink.table.expressions import col, lit

stock_table = st_env.from_path("stock_table")

 # tumble over 1 minute, then group by that window and sum the number of trades over that time
count_table = stock_table.window(
                     Tumble.over(lit(1).minute).on(stock_table.event_time).alias("one_minute_window")) \
                           .group_by(col("one_minute_window"), col("ticker")) \
                           .select(col("ticker"), col("price").sum.alias("sum_price"), col("one_minute_window").end.alias("minute_window"))

Use the ZeppelinContext to visualize the Python Table aggregation within the notebook.


z.show(count_table, stream_type="update")

This image shows the count_table we defined previously displayed as a pie chart within the Apache Zeppelin notebook.

User-defined functions

To use and reuse common business logic into an operator, it can be useful to reference a User-defined function to transform your Data stream. This can be done either within the Kinesis Data Analytics notebook, or as an externally referenced application jar file. Utilizing User-defined functions can simplify the transformations or data enrichments that you might perform over streaming data.

In our notebook, we will be referencing a simple Java application jar that computes an integer hash of our ticker symbol. You can also write Python or Scala UDFs for use within the notebook. We chose a Java application jar to highlight the functionality of importing an application jar into a Pyflink notebook.

package com.aws.kda.udf;

import org.apache.flink.table.functions.ScalarFunction;

// The Java class must have a public no-argument constructor and can be founded in current Java classloader.
public class HashFunction extends ScalarFunction {
    private int factor = 12;

    public int eval(String s) {
        return s.hashCode() * factor;

You can find the application jar here.

  1. Create and package this jar, or download the link above.
  2. Next, upload this application jar to an Amazon S3 bucket to be referenced by our Kinesis Data Analytics Studio notebook.
  3. Head back to the Kinesis Data Analytics studio notebook, and under Configuration locate the User-defined functions box. From here, select Add user-defined function, and use the add wizard to locate your uploaded Java jar to reference it.

Once you save changes, the application will take a few minutes to update before you can open it again.

Open the notebook once it has been restarted so that we can reference our UDF.

st_env.create_java_temporary_function("hash", "com.aws.kda.udf.HashFunction")

hash_ticker = stock_table.select("ticker, hash(ticker) as secret_ticker_key, event_time")

Now we can view this newly transformed data from the hash_ticker table context.

st_env.create_java_temporary_function("hash", "com.aws.kda.udf.HashFunction")

hash_ticker = stock_table.select("ticker, hash(ticker) as secret_ticker_key, event_time")

The screenshot above showcases data being displayed in a tabular format from our hashed results set.

The screenshot above showcases data being displayed in a tabular format from our hashed results set.

Enable checkpointing

To utilize the fault-tolerant features of the Streaming File Sink (writing data to Amazon S3), we must enable checkpointing within our Apache Flink application. This setting isn’t enabled by default on any Kinesis Data Analytics Studio notebook. However, it can be enabled by simply accessing the streaming environment variable’s configuration and setting the proper string accordingly:

z.show(hash_ticker, stream_type="update")

Writing results out to Amazon S3

In the same way that we ingested data into Kinesis Data Analytics Studio, we will create another table, called a sink, that will be responsible for taking data within Kinesis Data Analytics Studio and writing it out to Amazon S3 using the Apache Flink Filesystem connector. This connector does require checkpoints to commit data to a Filesystem, hence the previous step.

First, let’s create the table.


table_name = "output_table"
bucket_name = "kda-python-sink-bucket"

st_env.execute_sql("""CREATE TABLE {0} (
                ticker VARCHAR(6),
                price DOUBLE,
                event_time TIMESTAMP(3)
              PARTITIONED BY (ticker)
              WITH (
                  'sink.partition-commit.delay' = '1 min'
        table_name, bucket_name))

Next, we can perform the insert by calling the streaming table environment’s execute_sql function.

table_result = st_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}".format("output_table", "hash_ticker"))

The return value table_result is a pyflink table TableResult object. This lets you query and interact with the Flink job that is operating in the background.

Since we’ve set our checkpointing interval to one minute, wait at least one minute with data flowing to see data in your Amazon S3 bucket.

To stop the Amazon S3 sink process, run the following cell:



A Studio notebook application consists of one or more tasks. You can split an application task into several parallel instances for execution, where each parallel instance processes a subset of the task’s data. The number of parallel instances of a task is called its parallelism, and adjusting that helps execute your tasks more efficiently.

Upon creation, Studio notebooks are given four parallel Kinesis Processing Units (KPU) which make up the application parallelism. To increase that parallelism, navigate to the Kinesis Data Analytics Studio Management Console, select your application name, and select the Configuration tab.

The screenshot above shows the Kinesis Data Analytics Studio console configuration page, where we can note the runtime environment, IAM Role, and modify things like the number of KPU’s the application is allocated.

  1. From this page, under the Scaling section, select Edit and modify the Parallelism entry. We don’t recommend increasing the Parallelism Per KPU setting higher than 1 unless your application is I/O bound.
  2. Select Save Changes to increase/decrease your application’s parallelism.


When you have thoroughly tested and iterated on your application code within a Kinesis Data Analytics Studio notebook, you may choose to promote your notebook to a Kinesis Data Analytics for Apache Flink application with durable state. The benefits of doing this include having full fault tolerance with stateful operations, such as checkpointing, snapshotting, and autoscaling based on CPU usage.

To promote your Kinesis Data Analytics Studio notebook to a Kinesis Data Analytics for Apache Flink application:

  1. Navigate to the top-right of your notebook and select Actions for <<notebook name>>.
  2. First, select Build <<notebook name>> and export to Amazon S3.
  3. Once this process finishes, select Deploy <<notebook name>> as Kinesis Analytics Application. This will open a modal.
  4. Then, select Deploy using AWS Console.
  5. On the next screen, you can enter the following
    1. An optional description
    2. The same IAM role that you used for your Kinesis Data Analytics Studio notebooks.
  6. Then, select Create streaming application. Once the process finishes, you will see a Streaming Application preconfigured with the code supplied by your Kinesis Data Analytics studio notebook.
  7. Select Run to start your application.

Make sure that you have stopped all paragraphs in your Kinesis Data Analytics studio notebook so as not to contend for resources with your Kinesis Data Stream.

When the application has started, you should begin to see new data flowing into your Amazon S3 bucket in an entirely fault-tolerant and stateful manner.

Congratulations! You’ve just promoted a Kinesis Data Analytics studio notebook to Kinesis Data Analytics for Apache Flink!


Kinesis Data Analytics Studio makes developing stream processing applications using Apache Flink much faster. Moreover, all of this is done with rich visualizations, a scalable and user-friendly interface to develop and collaborate on pipelines, and the flexibility of language choice to make any streaming workload performant and powerful. Users can run paragraphs from within the notebook as described in this post, or choose to promote their Studio notebook to a Kinesis Data Analytics for Apache Flink application with durable state.

For more information, please see the following documentation:

About the Author

Jeremy Ber has been working in the telemetry data space for the past five years as a Software Engineer, Machine Learning Engineer, and most recently a Data Engineer. In the past, Jeremy has supported and built systems that stream in terabytes of data-per-day, and process complex Machine Learning Algorithms in real-time. At AWS, he is a Solutions Architect Streaming Specialist supporting both Managed Streaming for Kafka (Amazon MSK) and Amazon Kinesis services.

Use Amazon CodeGuru Profiler to monitor and optimize performance in Amazon Kinesis Data Analytics applications for Apache Flink

Post Syndicated from Praveen Panati original https://aws.amazon.com/blogs/big-data/use-amazon-codeguru-profiler-to-monitor-and-optimize-performance-in-amazon-kinesis-data-analytics-applications-for-apache-flink/

Amazon Kinesis Data Analytics makes it easy to transform and analyze streaming data and gain actionable insights in real time with Apache Flink. Apache Flink is an open-source framework and engine for processing data streams in real time. Kinesis Data Analytics reduces the complexity of building and managing Apache Flink applications using open-source libraries and integrating with other AWS services.

Kinesis Data Analytics is a fully managed service that takes care of everything required to run real-time streaming applications continuously and scale automatically to match the volume and throughput of your incoming data.

As you start building and deploying business-critical, highly scalable, real-time streaming applications, it’s important that you continuously monitor applications for health and performance, and optimize the application to meet the demands of your business.

With Amazon CodeGuru Profiler, developers and operations teams can monitor the following:

You can use CodeGuru Profiler to analyze the application’s performance characteristics and bottlenecks in the application code by capturing metrics such as CPU and memory utilization. You can use these metrics and insights to identify the most expensive lines of code; optimize for performance; improve stability, latency, and throughput; and reduce operational cost.

In this post, we discuss some of the challenges of running streaming applications and how you can use Amazon Kinesis Data Analytics for Apache Flink to build reliable, scalable, and highly available streaming applications. We also demonstrate how to set up and use CodeGuru Profiler to monitor an application’s health and capture important metrics to optimize the performance of Kinesis Data Analytics for Apache Flink applications.


Streaming applications are particularly complex in nature. The data is continuously generated from a variety of sources with varying amounts of throughput. It’s critical that the application infrastructure scales up and down according to these varying demands without becoming overloaded, and not run into operational issues that might result in downtime.

As such, it’s crucial to constantly monitor the application for health, and identify and troubleshoot the bottlenecks in the application configuration and application code to optimize the application and the underlying infrastructure to meet the demands while also reducing the operational costs.

What Kinesis Data Analytics for Apache Flink and CodeGuru Profiler do for you

With Kinesis Data Analytics for Apache Flink, you can use Java, Scala, and Python to process and analyze real-time streaming data using open-source libraries based on Apache Flink. Kinesis Data Analytics provides the underlying infrastructure for your Apache Flink applications. It handles core capabilities such as provisioning compute resources, parallel computation, automatic scaling, and application backups (implemented as checkpoints and snapshots) to rapidly create, test, deploy, and scale real-time data streaming applications using best practices. This allows developers to focus more on application development and less on Apache Flink infrastructure management.

With CodeGuru Profiler, you can quickly and easily monitor Kinesis Data Analytics for Apache Flink applications to:

  • Identify and troubleshoot CPU and memory issues using CPU and memory (heap summary) utilization metrics
  • Identify bottlenecks and the application’s most expensive lines of code
  • Optimize application performance (latency, throughput) and reduce infrastructure and operational costs

Solution overview

In this post, we use a sample Java application deployed as a Kinesis Data Analytics application for Apache Flink, which consumes the records from Amazon Kinesis Data Streams and uses Apache Flink operators to generate real-time actionable insights. We use this sample to understand and demonstrate how to integrate with CodeGuru Profiler to monitor the health and performance of your Kinesis Data Analytics applications.

The following diagram shows the solution components.

At a high level, the solution covers the following steps:

  1. Set up, configure, and deploy a sample Apache Flink Java application on Kinesis Data Analytics.
  2. Set up CodeGuru Profiler.
  3. Integrate the sample Apache Flink Java application with CodeGuru Profiler.
  4. Use CodeGuru Profiler to analyze, monitor, and optimize application performance.

Set up a sample Apache Flink Java application on Kinesis Data Analytics

Follow the instructions in the GitHub repo and deploy the sample application that includes source code as well as AWS CloudFormation templates to deploy the Kinesis Data Analytics for Apache Flink application.

For this post, I deploy the stack in the us-east-1 Region.

After you deploy the sample application, you can test the application by running the following commands, and providing the correct parameters for the Kinesis data stream and Region.

The Java application has already been downloaded to an EC2 instance that has been provisioned by AWS CloudFormation; you just need to connect to the instance and run the JAR file to start ingesting events into the stream.

$ ssh ec2-user@«Replay instance DNS name»

$ java -jar amazon-kinesis-replay-*.jar -streamName «Kinesis data stream name» -streamRegion «AWS region» -speedup 3600

Set up CodeGuru Profiler

Set up and configure CodeGuru Profiler using the AWS Management Console. For instructions, see Set up in the CodeGuru Profiler console.

For this post, I create a profiling group called flinkappdemo in the us-east-1 Region.

In the next section, I demonstrate how to integrate the sample Kinesis Data Analytics application with the profiling group.

Integrate the sample Apache Flink Java application with CodeGuru Profiler

Download the source code that you deployed earlier and complete the following steps to integrate CodeGuru Profiler to the Java application:

  1. Include the CodeGuru Profiler agent in your application by adding the following dependencies to your pom.xml file:
    <project xmlns="http://maven.apache.org/POM/4.0.0" 
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  2. Add the CodeGuru Profiler agent configuration code to the Apache Flink Operators (functions), as shown in the following code.

Because multiple operators and operator instances can run on the same TaskManager JVM, and because one instance of the profiler can capture all events in a JVM, you just need to enable the profiler on an operator that is guaranteed to be present on all TaskManager JVMs. For this, you can pick the operator with the highest parallelism. In addition, you could instantiate the profiler as a singleton such that there is one instance per JVM.

public class CountByGeoHash implements WindowFunction<TripGeoHash, PickupCount, String, TimeWindow> {

  static {
    new Profiler.Builder()
            .withHeapSummary(false) // optional - to start without heap profiling set to false or remove line
public class TripDurationToAverageTripDuration implements WindowFunction<TripDuration, AverageTripDuration, Tuple2<String, String>, TimeWindow> {

  static {
    new Profiler.Builder()
            .withHeapSummary(false) // optional - to start without heap profiling set to false or remove line
  1. Build the application using the following command:
    mvn clean package

The preceding command packages the application into a JAR file.

  1. Copy and replace the JAR file in the Amazon Simple Storage Service (Amazon S3) bucket that was created as part of the CloudFormation stack.
  2. Choose Save changes to update the application.

This step allows the application to use the latest JAR file that contains the CodeGuru Profiler code to start profiling the application.

Use CodeGuru Profiler to analyze, monitor, and optimize application performance

Now that the application has been configured to use CodeGuru Profiler, you can use the metrics and visualizations to explore profiling data collected from the application.

Run the following commands from when you set up your application to start ingesting data into the Kinesis data stream and enable CodeGuru Profiler to profile the application and gather metrics:

$ ssh ec2-user@«Replay instance DNS name»

$ java -jar amazon-kinesis-replay-*.jar -streamName «Kinesis data stream name» -streamRegion «AWS region» -speedup 3600

On the CodeGuru console, navigate to flinkappdemo on the Profiling groups page.

The summary page displays the status of your profiling group as well as the relevant metrics gathered while profiling the application.

In the following sections, we discuss the metrics and reports on this page in more detail.

CPU summary

Use this summary and the associated metrics CPU utilization and Time spent executing code to understand how much of the instance’s CPU resources are consumed by the application and how frequently the application’s JVM threads were in the RUNNABLE state. This helps you measure the application’s time spent running operations on the CPU so you can tune your application code and configuration.

With the CPU utilization metric, a low value (such as less than 10%) indicates your application doesn’t consume a large amount of the system CPU capacity. This means there could be an opportunity to scale in the application parallelism to reduce cost. A high value (over 90%) indicates your application is consuming a large amount of system CPU capacity. This means there is likely value in looking at your CPU profiles and recommendations for areas of optimization.

When examining the time spent running code, a high percentage (over 90%) indicates most of your application’s time is spent running operations on the CPU. A very low percentage (under 1%) indicates that most of your application was spent in other thread states (such as BLOCKED or WAITING) and there may be more value in looking at the latency visualization, which displays all non-idle thread states, instead of the CPU visualization.

For more information on understanding the CPU summary, see CPU summary.

Latency summary

Use this summary and the metrics Time spent blocked and Time spent waiting to understand what sections of the code are causing threads to block and threads that are waiting to tune your application code and configuration. For more information, see Latency summary.

The CPU summary and latency visualization can help you analyze the thread blocking and wait operations to further identify bottlenecks and tune your application’s performance and configuration.

Heap usage

Use this summary and the metrics Average heap usage and Peak heap usage to understand how much of your application’s maximum heap capacity is consumed by your application and to spot memory leaks. If the graph grows continuously over time, that could be an indication of a memory leak.

With the average heap usage metric, a high percentage (over 90%) could indicate that your application is close to running out of memory most of the time. If you wish to optimize this, the heap summary visualization shows you the object types consuming the most space on the heap. A low percentage (less than 10%) may indicate that your JVM is being provided much more memory than it actually requires and cost savings may be available by scaling in the application parallelism, although you should check the peak usage too.

Peak heap usage shows the highest percentage of memory consumed by your application seen by the CodeGuru Profiler agent. This is based on the same dataset as seen in the heap summary visualization. A high percentage (over 90%) could indicate that your application has high spikes of memory usage, especially if your average heap usage is low.

For more information on the heap summary, see Understanding the heap summary.

Anomalies and recommendation reports

CodeGuru Profiler uses machine learning to detect and alert on anomalies in your application profile and code. Use this to identify parts of the code for performance optimization and potential savings.

The issues identified during analysis are included in the recommendations report. Use this report to identify potential outages, latency, and other performance issues. For more information on how to work with anomalies and recommendations, see Working with anomalies and recommendation reports.


You can use visualizations associated with the preceding metrics to drill down further to identify what parts of the application configuration and application code are impacting the performance, and use these insights to improve and optimize application performance.

CodeGuru Profiler supports three types of visualizations and a heap summary to display profiling data collected from applications:

Let’s explore the profiling data collected from the preceding steps to observe and monitor application performance.

CPU utilization

The following screenshot shows the snapshot of the application’s profiling data in a flame graph visualization. This view provides a bottom-up view of the application’s profiling data, with the X-axis showing the stack profile and the Y-axis showing the stack depth. Each rectangle represents a stack frame. This visualization can help you identify specific call stacks that lead to inefficient code by looking at the top block function on CPU. This may indicate an opportunity to optimize.

Recommendation report with opportunities to optimize the application

Use the recommendation report to identify and correlate the sections of the application code that can be improved to optimize the application performance. In our example, we can improve the application code by using StringBuilder instead of String.format and by reusing the loggers rather than reinitializing them repetitively, and also by selectively applying the debug/trace logging, as recommended in the following report.

Hotspot visualization

The hotspot visualization shows a top-down view of the application’s profiling data. The functions that consume the most CPU time are at the top of the visualization and have the widest block. You can use this view to investigate functions that are computationally expensive.

Latency visualization

In this mode, you can visualize frames with different thread states, which can help you identify functions that spent a lot of time being blocked on shared resources, or waiting for I/O or sleeping. You can use this view to identify threads that are waiting or dependent on other threads and use it to improve latency on all or parts of your application.

You can inspect a visualization to further analyze any frame by selecting a frame and then choosing (right-click) the frame and choosing Inspect.

Heap summary

This summary view shows how much heap space your application requires to store all objects required in memory after a garbage collection cycle. If this value continuously grows over time until it reaches total capacity, that could be an indication of a memory leak. If this value is very low compared to total capacity, you may be able to save money by reducing your system’s memory.

For more information on how to work and explore data with visualizations, refer to Working with visualizations and Exploring visualization data.

Clean up

To avoid ongoing charges, delete the resources you created from the previous steps.

  1. On the CodeGuru console, choose Profiling groups in the navigation pane.
  2. Select the flinkappdemo profiling group.
  3. On the Actions meu, choose Delete profiling group.
  4. On the AWS CloudFormation console, choose Stacks in the navigation pane.
  5. Select the stack you deployed (kinesis-analytics-taxi-consumer) and choose Delete.


This post explained how to configure, build, deploy, and monitor real-time streaming Java applications using Kinesis Data Analytics applications for Apache Flink and CodeGuru. We also explained how you can use CodeGuru Profiler to collect runtime performance data and metrics that can help you monitor application health and optimize your application performance.

For more information, see Build and run streaming applications with Apache Flink and Amazon Kinesis Data Analytics for Java Applications and the Amazon Kinesis Data Analytics Developer Guide.

Several customers are now using CodeGuru Profiler to monitor and improve application performance, and you too can start monitoring your applications by following the instructions in the product documentation. Head over to the CodeGuru console to get started today!

About the Author

Praveen Panati is a Senior Solutions Architect at Amazon Web Services. He is passionate about cloud computing and works with AWS enterprise customers to architect, build, and scale cloud-based applications to achieve their business goals. Praveen’s area of expertise includes cloud computing, big data, streaming analytics, and software engineering.

Now Available: Updated guidance on the Data Analytics Lens for AWS Well-Architected Framework

Post Syndicated from Wallace Printz original https://aws.amazon.com/blogs/big-data/now-available-updated-guidance-on-the-data-analytics-lens-for-aws-well-architected-framework/

Nearly all businesses today require some form of data analytics processing, from auditing user access to generating sales reports. For all your analytics needs, the Data Analytics Lens for AWS Well-Architected Framework provides prescriptive guidance to help you assess your workloads and identify best practices aligned to the AWS Well-Architected Pillars: Operational Excellence, Security, Reliability, Performance Efficiency, and Cost Optimization. Today, we’re pleased to announce a completely revised and updated version of the Data Analytics Lens whitepaper.

Self-assess with Well-Architected design principles

The updated version of the Data Analytics Lens whitepaper has been revised to provide guidance to CxOs as well as all data personas. Within each of the five Well-Architected Pillars, we provide top-level design principles for CxOs to quickly identify areas for teams and fundamental rules that analytics workloads designers should follow. Each design principle is followed by a series of questions and best practices that architects and system designers can use to perform self-assessments. Additionally, the Data Analytics Lens includes suggestions that prescriptively explain steps to implement best practices useful for implementation teams.

For example, the Security Pillar design principle “Control data access” works with the best practice to build user identity solutions that uniquely identify people and systems. The associated suggestion for this best practice is to centralize workforce identities, which details how to use this principle and includes links to more documentation on the suggestion.

“Building Data Analytics platform or workloads is one of the complex architecture patterns. It involves multi-layered approach such as Data Ingestion, Data Landing, Transformation Layer, Analytical/Insight and Reporting. Choices of technology and service for each of these layers are wide. The AWS Well-Architected Analytics Lens helps us to design and validate with great confidence against each of the pillars. Now Cognizant Architects can perform assessments using the Data Analytics Lens to validate and help build secure, scalable and innovative data solutions for customers.”

– Supriyo Chakraborty, Principal Architect & Head of Data Engineering Guild, Cognizant Germany
– Somasundaram Janavikulam, Cloud Enterprise Architect & Well Architected Partner Program Lead, Cognizant

In addition to performing your own assessment, AWS can provide a guided experience through reviewing your workload with a Well-Architected Framework Review engagement. For customers building data analytics workloads with AWS Professional Services, our teams of Data Architects can perform assessments using the Data Analytics Lens during the project engagements. This provides you with an objective assessment of your workloads and guidance on future improvements. The integration is available now for customers of the AWS Data Lake launch offering, with additional Data Analytics offerings coming in 2022. Reach out to your AWS Account Team if you’d like to know more about these guided Reviews.

Updated architectural patterns and scenarios

In this version of the Data Analytics Lens, we have also revised the discussion of data analytics patterns and scenarios to keep up with the industry and modern data analytics practices. Each scenario includes sections on characteristics that help you plan when developing systems for that scenario, a reference architecture to visualize and explain how the components work together, and configuration notes to help you properly configure your solution.

This version covers the following topics:

  • Building a modern data architecture (formerly Lake House Architecture)
  • Organize around data domains by delivering data as a product using a data mesh
  • Efficiently and securely provide batch data processing
  • Use streaming ingest and stream processing for real-time workloads
  • Build operational analytics systems to improve business processes and performance
  • Provide data visualization securely and cost-effectively at scale

Changed from the first release, the machine learning and tenant analytics scenarios have been migrated to a separate Machine Learning Lens whitepaper and SaaS Lens whitepaper.


We expect this updated version will provide better guidance to validate your existing architectures, as well as provide recommendations for any gaps that identified.

For more information about building your own Well-Architected systems using the Data Analytics Lens, see the Data Analytics Lens whitepaper.

Special thanks to everyone across the AWS Solution Architecture and Data Analytics communities who contributed. These contributions encompassed diverse perspectives, expertise, and experiences in developing the new AWS Well-Architected Data Analytics Lens.

About the Authors

Wallace Printz is a Senior Solutions Architect based in Austin, Texas. He helps customers across Texas transform their businesses in the cloud. He has a background in semiconductors, R&D, and machine learning.

Indira Balakrishnan is a Senior Solutions Architect in the AWS Analytics Specialist SA Team. She is passionate about helping customers build cloud-based analytics solutions to solve their business problems using data-driven decisions. Outside of work, she volunteers at her kids’ activities and spends time with her family.

Query your Amazon MSK topics interactively using Amazon Kinesis Data Analytics Studio

Post Syndicated from Chinmayi Narasimhadevara original https://aws.amazon.com/blogs/big-data/query-your-amazon-msk-topics-interactively-using-amazon-kinesis-data-analytics-studio/

Amazon Kinesis Data Analytics Studio makes it easy to analyze streaming data in real time and build stream processing applications powered by Apache Flink using standard SQL, Python, and Scala. With a few clicks on the AWS Management Console, you can launch a serverless notebook to query data streams and get results in seconds. Kinesis Data Analytics reduces the complexity of building and managing Apache Flink applications. Apache Flink is an open-source framework and engine for processing data streams. It’s highly available and scalable, delivering high throughput and low latency for stream processing applications.

If you’re running Apache Flink workloads, you may experience the non-trivial challenge of developing your distributed stream processing applications without having true visibility into the steps your application performs for data processing. Kinesis Data Analytics Studio combines the ease of use of Apache Zeppelin notebooks with the power of the Apache Flink processing engine to provide advanced streaming analytics capabilities in a fully managed offering. This accelerates developing and running stream processing applications that continuously generate real-time insights.

In this post, we introduce you to Kinesis Data Analytics Studio and how to get started querying data interactively from an Amazon Managed Streaming for Kafka (Amazon MSK) cluster using SQL, Python, and Scala. We also demonstrate how to query data across different topics using Kinesis Data Analytics Studio. Kinesis Data Analytics Studio is also compatible with Amazon Kinesis Data Streams, Amazon Simple Storage Service (Amazon S3), and a variety of other data sources supported by Apache Flink.


To get started, you must have the following prerequisites:

  • An MSK cluster
  • A data generator for populating data into the MSK cluster

To follow this guide and interact with your streaming data, you need a data stream with data flowing through.

Create and set up a Kafka cluster

You can create your Kafka cluster either using the Amazon MSK console or the following AWS Command Line Interface (AWS CLI) command. For console instructions, see Getting Started Using Amazon MSK and creating Studio notebook with MSK

You can either create topics and messages or use existing topics in the MSK cluster.

For this post, we have two topics in the MSK cluster, impressions and clicks, and they have the following fields in JSON format:

  • impressionsbid_id, campaign_id, country_code, creative_details, i_timestamp
  • clickscorrelation_id, tracker, c_timestamp

The correlation_id is the click correlation ID for a bid_id, so the field has common values across topics that we use for the join.

For the data in the MSK topic, we use the Amazon MSK Data Generator. Refer to the GitHub repo for setup and usage details. (We will be using the adtech.json sample for this blog)

The following are sample JSON records generated for the impressions topic:

   "country_code": "KN",
   "creative_details": "orchid",
   "i_timestamp": "Sat Jul 10 05:34:56 GMT 2021",
   "campaign_id": "1443403873",
   "bid_id": "0868262269"
   "country_code": "BO",
   "creative_details": "mint green",
   "i_timestamp": "Sat Jul 10 05:34:56 GMT 2021",
   "campaign_id": "1788762118",
   "bid_id": "1025543335"

The following are sample JSON records generated for the clicks topic:

   "c_timestamp": "Sat Jul 10 05:34:55 GMT 2021",
   "correlation_id": "0868262269",
   "tracker": "8q4rcfkbjnmicgo4rbw48xajokcm4xhcft7025ea1mt0htrfcvsgl1rusg8e8ez30p7orsmjx76vtrha2fi9qb3iaw8htd9uri9jauz64zdq8ldz7b0o8vzlkxs640hnwxgikpfvy5nno15c9etgrh79niku8hhtnxg94n03f2zci5ztv05jixu1r3p5yeehgm9kfd7szle9kikgo2xy5mlx09mmtlo9ndwqdznwjyj3yk02ufcwui1yvzveqfn"
   "c_timestamp": "Sat Jul 10 05:35:01 GMT 2021",
   "correlation_id": "0868262269",
   "tracker": "gfwq09yk0jwirg9mw60rrpu88h98tkd9xr645jsdoo7dwu24f8usha14uimtsfltvjmhl4i5rq24lz0aucqn6ji4da4xbo6db7lfezus7twhkw238dqw0pzdt98rn5lk8vf4tk6smkyyq38rhjaeh2ezsmlcg4v7im39u7knj10ofiint4fny0xcgqwta0uwq426oc21b1t8m446tmc6fyy7ops80xonzbzfc4a1xjd4x56x81uyg80dxyu2g7v"

Create a Kinesis Data Analytics Studio notebook

You can start interacting with your data stream by following these simple steps:

  1. On the Amazon MSK console, choose Process data in real time.
  2. Choose Apache Flink – Studio Notebook.
  3. Enter the name of your Kinesis Data Analytics Studio notebook and allow the notebook to create an AWS Identity and Access Management (IAM) role.

You can create a custom role for specific use cases on the IAM console.

  1. Choose an AWS Glue database to store the metadata around your sources and destinations, which the notebook uses.
  2. Choose Create Studio notebook.

We keep the default settings for the application and can scale up as needed.

  1. After you create the application, choose Start to start the Apache Flink application.
  2. When it’s complete (after a few minutes), choose Open in Apache Zeppelin.

To connect to an MSK cluster, you must specify the same VPC, subnets, and security groups for the Kinesis Data Analytics Studio notebook as were used to create the MSK cluster. If you chose Process data in real time during your setup, this is already set for you.

The Studio notebook is created with an IAM role for the notebook that grants the necessary access for the AWS Glue Data Catalog and tables.

Example applications

Apache Zeppelin supports the Apache Flink interpreter and allows for the use of Apache Flink directly within Zeppelin for interactive data analysis. Within the Flink interpreter, three languages are supported as of this writing: Scala, Python (PyFlink), and SQL. The notebook requires a specification to one of these languages at the top of each paragraph in order to interpret the language properly:

%flink          - Scala environment 
%flink.pyflink  - Python Environment
%flink.ipyflink - ipython Environment
%flink.ssql     - Streaming SQL Environment
%flink.bsql     - Batch SQL Environment 

There are several other predefined variables per interpreter, such as the senv variable in Scala for a StreamExecutionEnvironment, or st_env in Python for the same. You can review the full list of these entry point variables.

In this section, we show the same example code in all three languages to highlight the flexibility Zeppelin affords you for development.


We use the %flink.ssql(type=update) header to signify to the notebook that this paragraph will be interpreted as Flink SQL. We create two tables from the Kafka topics:

  • impressions – With bid_id, campaign_id, creative_details, country_code, and i_timestamp columns providing details of impressions in the system
  • clicks – With correlation_id, tracker, and c_timestamp providing details of the clicks for an impression.

The tables use the Kafka connector to read from a Kafka topic called impressions and clicks in the us-east-1 Region from the latest offset.

As soon as this statement runs within a Zeppelin notebook, AWS Glue Data Catalog tables are created according to the declaration specified in the create statement, and the tables are available immediately for queries from the MSK cluster.

You don’t need to complete this step if your AWS Glue Data Catalog already contains the tables.

CREATE TABLE impressions (
bid_id VARCHAR,
creative_details VARCHAR(10),
campaign_id VARCHAR,
country_code VARCHAR(5),
i_timestamp VARCHAR,
serve_time as TO_TIMESTAMP (`i_timestamp`, 'EEE MMM dd HH:mm:ss z yyyy'),
 WATERMARK FOR serve_time AS serve_time -INTERVAL '5' SECOND
'connector'= 'kafka',
'topic' = 'impressions',
'properties.bootstrap.servers' = '<bootstrap servers shown in the MSK client
info dialog>',
'format' = 'json',
'properties.group.id' = 'testGroup1',
'scan.startup.mode'= 'earliest-offset',
'json.timestamp-format.standard'= 'ISO-8601'

correlation_id VARCHAR,
tracker VARCHAR(100),
c_timestamp VARCHAR,
click_time as TO_TIMESTAMP (`c_timestamp`, 'EEE MMM dd HH:mm:ss z yyyy'),
 WATERMARK FOR click_time AS click_time -INTERVAL '5' SECOND
PARTITIONED BY (correlation_id)
'connector'= 'kafka',
'topic' = 'clicks',
'properties.bootstrap.servers' = '<bootstrap servers shown in the MSK client info dialog>',
'format' = 'json',
'properties.group.id' = 'testGroup1',
'scan.startup.mode'= 'earliest-offset',
'json.timestamp-format.standard'= 'ISO-8601'

The following screenshot is the AWS Glue Data Catalog view, which shows the tables that represent MSK topics.

In the preceding tables, WATERMARK FOR serve_time AS serve_time - INTERVAL '5' SECOND means that we can tolerate out-of-order delivery of events in the timeframe of 5 seconds and still produce correct results.

After you create the tables, run a query that calculates the number of impressions within a tumbling window of 60 seconds broken down by campaign_id and creative_details:

   AS window_end, COUNT(*) AS c
FROM impressions
  TUMBLE(serve_time, INTERVAL '60' SECOND), 
ORDER BY window_end, c DESC;

The results from this query appear as soon as results are available.

Additionally, we want to see the clickthrough rate of the impressions:

  CAST(serve_time AS TIMESTAMP) AS serveTime, 
  CAST(click_time AS TIMESTAMP) AS clickTime,
     WHEN `click_time` IS NULL THEN FALSE
     WHEN `click_time` IS NOT NULL THEN TRUE
  END AS clicked
FROM  impressions 
  ON bid_id = correlation_id AND
  click_time BETWEEN  serve_time AND 
  serve_time + INTERVAL '2' MINUTE ;

This query produces one row for each impression and matches it with a click (if any) that was observed within 2 minutes after serving the ad. This is essentially performing a join operation across the topics to get this information.

You can insert this data back into an existing Kafka topic using the following code:

INSERT INTO clickthroughrate 
     WHEN `click_time` IS NULL THEN FALSE
     WHEN `click_time` IS NOT NULL THEN TRUE
  END AS clicked
FROM  impressions 
  ON bid_id = correlation_id AND
  click_time BETWEEN  serve_time AND 
  serve_time + INTERVAL '2' MINUTE ;

Create the corresponding table for the Kafka topic in the Data Catalog if it doesn’t exist already. After you run the preceding query, you can see data in your Amazon MSK topic (see the following sample below):

1095810839,1911670336,KH,"mint green","2021-06-15 01:08:00","ainhpsm6vxgs4gvyl52v13s173gntd7jyitlq328qmam37rpbs2tj1il049dlyb2vgwx89dbvwezl2vkcynqvlqfql7pxp8blg6807yxy1y54eedwff2nuhrbqhce36j00mbxdh72fpjmztymobq79y1g3xoyr6f09rgwqna1kbejkjw4nfddmm0d56g3mkd8obrrzo81z0ktu934a00b04e9q0h1krapotnon76rk0pmw6gr8c24wydp0b2yls","2021-06-15 01:08:07",true
0946058105,1913684520,GP,magenta,"2021-06-15 01:07:56","7mlkc1qm9ntazr7znfn9msew75xs9tf2af96ys8638l745t2hxwnmekaft735xdcuq4xtynpxr68orw5gmbrhr9zyevhawjwfbvzhlmziao3qs1grsb5rdzysvr5663qg2eqi5p7braruyb6rhyxkf4x3q5djo7e1jd5t91ybop0cxu4zqmwkq7x8l7c4y33kd4gwd4g0jmm1hy1df443gdq5tnj8m1qaymr0q9gatqt7jg61cznql0z6ix8pyr","2021-06-15 01:08:07",true
0920672086,0888784120,CK,silver,"2021-06-15 01:08:03","gqr76xyhu2dmtwpv9k3gxihvmn7rluqblh39gcrfyejt0w8jwwliq24okxkho1zuyxdw9mp4vzwi0nd4s5enhvm2d74eydtqnmf7fm4jsyuhauhh3d32esc8gzpbwkgs8yymlp22ih6kodrpjj2bayh4bjebcoeb42buzb43ii1e0zv19bxb8suwg17ut2mdhj4vmf8g9jl02p2tthe9w3rpv7w9w16d14bstiiviy4wcf86adfpz378a49f36q","2021-06-15 01:08:16",true

This is the CSV data from the preceding query, which shows the ClickThroughRate for the impressions. You can use this mechanism to store data back persistently into Kafka from Flink directly.


We use the %flink header to signify that this code block will be interpreted via the Scala Flink interpreter, and create a table identical to the one from the SQL example. However, in this example, we use the Scala interpreter’s built-in streaming table environment variable, stenv, to run a SQL DDL statement. If the table already exists in the AWS Glue Data Catalog, this statement issues an error stating that the table already exists.

stenv.executeSql("""CREATE TABLE impressions (
  bid_id VARCHAR,
  creative_details VARCHAR(10),
  campaign_id VARCHAR,
  country_code VARCHAR(5),
  i_timestamp VARCHAR,
  serve_time as TO_TIMESTAMP (`i_timestamp`, 'EEE MMM dd HH:mm:ss z yyyy'),
  WATERMARK FOR serve_time AS serve_time -INTERVAL '5' SECOND
  WITH (
  'connector'= 'kafka',
  'topic' = 'impressions',
  'properties.bootstrap.servers' = '< Bootstrap Servers shown in the MSK client info dialog >',
  'format' = 'json',
  'properties.group.id' = 'testGroup1',
  'scan.startup.mode'= 'earliest-offset',
  'json.timestamp-format.standard'= 'ISO-8601'

 CREATE TABLE clicks (
 correlation_id VARCHAR,
 tracker VARCHAR(100),
 c_timestamp VARCHAR,
 click_time as TO_TIMESTAMP (`c_timestamp`, 'EEE MMM dd HH:mm:ss z yyyy'),
 WATERMARK FOR click_time AS click_time -INTERVAL '5' SECOND
 'connector'= 'kafka',
 'topic' = 'clicks',
 'properties.bootstrap.servers' = '< Bootstrap Servers shown in the MSK client info dialog >',
 'format' = 'json',
 'properties.group.id' = 'testGroup1',
 'scan.startup.mode'= 'earliest-offset',
 'json.timestamp-format.standard'= 'ISO-8601'

Performing a tumbling window in the Scala table API first requires the definition of an in-memory reference to the table we created. We use the stenv variable to define this table using the from function and referencing the table name. After this is created, we can create a windowed aggregation over 1 minute of data, serve_time column. See the following code:

val inputTable: Table = stenv.from("impressions")
val tumblingWindowTable = inputTable.window(Tumble over 1.minute on $"serve_time" as $"oneMinuteWindow")
.groupBy( $"oneMinuteWindow", $"campaign_id",$"creative_details")
.select($"campaign_id", $"creative_details", $"oneMinuteWindow".rowtime as "window_end",$"creative_details".count as "c")

Use the ZeppelinContext to visualize the Scala table aggregation within the notebook:

z.show(tumblingWindowTable, streamType="update")

The following screenshot shows our results.

Additionally, we want to see the clickthrough rate of the impressions by joining with the clicks:

val left:Table = stenv.from("impressions").select("bid_id,campaign_id,country_code,creative_details,serve_time")
val right:Table = stenv.from("clicks").select("correlation_id,tracker,click_time")
val result:Table = left.leftOuterJoin(right).where($"bid_id" === $"correlation_id" && $"click_time" < ( $"serve_time" + 2.minutes) && $"click_time" > $"serve_time").select($"bid_id", $"campaign_id", $"country_code",$"creative_details",$"tracker",$"serve_time".cast(Types.SQL_TIMESTAMP) as "s_time", $"click_time".cast(Types.SQL_TIMESTAMP) as "c_time" , $"click_time".isNull.?("false","true") as "clicked" )

Use the ZeppelinContext to visualize the Scala table aggregation within the notebook.

z.show(result, streamType="update")

The following screenshot shows our results.


We use the %flink.pyflink header to signify that this code block will be interpreted via the Python Flink interpreter, and create a table identical to the one from the SQL and Scala examples. In this example, we use the Python interpreter’s built-in streaming table environment variable, st_env, to run a SQL DDL statement. If the table already exists in the AWS Glue Data Catalog, this statement issues an error stating that the table already exists.

 CREATE TABLE impressions (
 bid_id VARCHAR,
 creative_details VARCHAR(10),
 campaign_id VARCHAR,
 country_code VARCHAR(5),
 i_timestamp VARCHAR,
 serve_time as TO_TIMESTAMP (`i_timestamp`, 'EEE MMM dd HH:mm:ss z yyyy'),
 WATERMARK FOR serve_time AS serve_time -INTERVAL '5' SECOND
 'connector'= 'kafka',
 'topic' = 'impressions',
 'properties.bootstrap.servers' = '< Bootstrap Servers shown in the MSK client info dialog >',
 'format' = 'json',
 'properties.group.id' = 'testGroup1',
 'scan.startup.mode'= 'earliest-offset',
 'json.timestamp-format.standard'= 'ISO-8601'
 CREATE TABLE clicks (
 correlation_id VARCHAR,
 tracker VARCHAR(100),
 c_timestamp VARCHAR,
 click_time as TO_TIMESTAMP (`c_timestamp`, 'EEE MMM dd HH:mm:ss z yyyy'),
 WATERMARK FOR click_time AS click_time -INTERVAL '5' SECOND
 'connector'= 'kafka',
 'topic' = 'clicks',
 'properties.bootstrap.servers' = '< Bootstrap Servers shown in the MSK client info dialog >',
 'format' = 'json',
 'properties.group.id' = 'testGroup1',
 'scan.startup.mode'= 'earliest-offset',
 'json.timestamp-format.standard'= 'ISO-8601'

Performing a sliding (hopping) window in the Python table API first requires the definition of an in-memory reference to the table we created. We use the st_env variable to define this table using the from_path function and referencing the table name. After this is created, we can create a windowed aggregation over 1 minute of data, emitting results every 5 seconds according to the event_time column. See the following code:


input_table = st_env.from_path("impressions")
tumbling_window_table =(input_table.window(Tumble.over("1.minute").on("serve_time").alias("one_minute_window"))
.group_by( "one_minute_window, campaign_id, creative_details")
.select("campaign_id, creative_details, one_minute_window.end as window_end, creative_details.count as c"))

Use the ZeppelinContext to visualize the Python table aggregation within the notebook:


z.show(tumbling_window_table, stream_type="update")

The following screenshot shows our results.

Additionally, we want to see the clickthrough rate of the impressions by joining with the clicks:

impressions = st_env.from_path("impressions").select("bid_id,campaign_id,country_code,creative_details,serve_time")
clicks = st_env.from_path("clicks").select("correlation_id,tracker,click_time")
results = impressions.left_outer_join(clicks).where("bid_id == correlation_id && click_time < (serve_time + 2.minutes) && click_time > serve_time").select("bid_id, campaign_id, country_code, creative_details, tracker, serve_time.cast(STRING) as s_time, click_time.cast(STRING) as c_time, (click_time.isNull).?('false','true') as clicked")


A Studio notebook consists of one or more tasks. You can split a Studio notebook task into several parallel instances to run, where each parallel instance processes a subset of the task’s data. The number of parallel instances of a task is called its parallelism, and adjusting that helps run your tasks more efficiently.

On creation, Studio notebooks are given four parallel Kinesis Processing Units (KPUs), which make up the application parallelism. To increase that parallelism, navigate to the Kinesis Data Analytics console, choose your application name, and choose the Configuration tab.

From this page, in the Scaling section, choose Edit and modify the Parallelism entry. We don’t recommend increasing the Parallelism Per KPU setting higher than 1 unless your application is I/O bound.

Choose Save changes to increase or decrease your application’s parallelism.

Clean up

You may want to clean up the demo environment when you are done, To do so, stop the Studio notebook and delete the resources created for the Data Generator and the Amazon MSK cluster ( if you created a new cluster).


Kinesis Data Analytics Studio makes developing stream processing applications using Apache Flink much faster, with rich visualizations, a scalable and user-friendly interface to develop pipelines, and the flexibility of language choice to make any streaming workload performant and powerful. You can run paragraphs from within the notebook or promote your Studio notebook to a Kinesis Data Analytics for Apache Flink application with a durable state, as shown in the SQL example in this post.

For more information, see the following resources:

About the Author

Chinmayi Narasimhadevara is a Solutions Architect focused on Big Data and Analytics at Amazon Web Services. Chinmayi has over 15 years of experience in information technology. She helps AWS customers build advanced, highly scalable and performant solutions

How NortonLifelock built a serverless architecture for real-time analysis of their VPN usage metrics

Post Syndicated from Madhu Nunna original https://aws.amazon.com/blogs/big-data/how-nortonlifelock-built-a-serverless-architecture-for-real-time-analysis-of-their-vpn-usage-metrics/

This post presents a reference architecture and optimization strategies for building serverless data analytics solutions on AWS using Amazon Kinesis Data Analytics. In addition, this post shows the design approach that the engineering team at NortonLifeLock took to build out an operational analytics platform that processes usage data for their VPN services, consuming petabytes of data across the globe on a daily basis.

NortonLifeLock is a global cybersecurity and internet privacy company that offers services to millions of customers for device security, and identity and online privacy for home and family. NortonLifeLock believes the digital world is only truly empowering when people are confident in their online security. NortonLifeLock has been an AWS customer since 2014.

For any organization, the value of operational data and metrics decreases with time. This lost value can equate to lost revenue and wasted resources. Real-time streaming analytics helps capture this value and provide new insights that can create new business opportunities.

AWS offers a rich set of services that you can use to provide real-time insights and historical trends. These services include managed Hadoop infrastructure services on Amazon EMR as well as serverless options such as Kinesis Data Analytics and AWS Glue.

Amazon EMR also supports multiple programming options for capturing business logic, such as Spark Streaming, Apache Flink, and SQL.

As a customer, it’s important to understand organizational capabilities, project timelines, business requirements, and AWS service best practices in order to define an optimal architecture from performance, cost, security, reliability, and operational excellence perspectives (the five pillars of the AWS Well-Architected Framework).

NortonLifeLock is taking a methodical approach to real-time analytics on AWS while using serverless technology to deliver on key business drivers such as time to market and total cost of ownership. In addition to NortonLifeLock’s implementation, this post provides key lessons learned and best practices for rapid development of real-time analytics workloads.

Business problem

NortonLifeLock offers a VPN product as a freemium service to users. Therefore, they need to enforce usage limits in real time to stop freemium users from using the service when their usage is over the limit. The challenge for NortonLifeLock is to do this in a reliable and affordable fashion.

NortonLifeLock runs its VPN infrastructure in almost all AWS Regions. Migrating to AWS from smaller hosting vendors has greatly improved user experience and VPN edge server performance, including a reduction in connection latency, time to connect and connection errors, faster upload and download speed, and more stability and uptime for VPN edge servers.

VPN usage data is collected by VPN edge servers and uploaded to backend stats servers every minute and persisted in backend databases. The usage information serves multiple purposes:

  • Displaying how much data a device has consumed for the past 30 days.
  • Enforcing usage limits on freemium accounts. When a user exhausts their free quota, that user is unable to connect through VPN until the next free cycle.
  • Analyzing usage data by the internal business intelligence (BI) team based on time, marketing campaigns, and account types, and using this data to predict future growth, ability to retain users, and more.

Design challenge

NortonLifeLock had the following design challenges:

  • The solution must be able to simultaneously satisfy both real-time and batch analysis.
  • The solution must be economical. NortonLifeLock VPN has hundreds of thousands of concurrent users, and if a user’s usage information is persisted as it comes in, it results in tens of thousands of reads and writes per second and tens of thousands of dollars a month in database costs.

Solution overview

NortonLifeLock decided to split storage into two parts by storing usage data in Amazon DynamoDB for real-time access and in Amazon Simple Storage Service (Amazon S3) for analysis, which addresses real-time enforcement and BI needs. Kinesis Data Analytics aggregates and loads data to Amazon S3 and DynamoDB. With Amazon Kinesis Data Streams and AWS Lambda as consumers of Kinesis Data Analytics, the implementation of user and device-level aggregations was simplified.

To keep costs down, user usage data was aggregated by the hour and persisted in DynamoDB. This spread hundreds of thousands of writes over an hour and reduced DynamoDB cost by 30 times.

Although increasing aggregation might not be an option for other problem domains, it’s acceptable in this case because it’s not necessary to be precise to the minute for user usage, and it’s acceptable to calculate and enforce the usage limit every hour.

The following diagram illustrates the high-level architecture. The solution is broken into three logical parts:

  • End-users – Real-time queries from devices to display current usage information (how much data is used daily)
  • Business analysts – Query historical usage information through Amazon Athena to extract business insights
  • Usage limit enforcement – Usage data ingestion and aggregation in real time

The solution has the following workflow:

  1. Usage data is collected by a VPN edge server and sends it to the backend service through Application Load Balancer.
  2. A single usage data record sent by the VPN edge server contains usage data for many users. A stats splitter splits the message into individual usage stats per user and forwards the message to Kinesis Data Streams.
  3. Usage data is consumed by both the legacy stats processor and the new Apache Flink application developed and deployed on Kinesis Data Analytics.
  4. The Apache Flink application carries out the following tasks:
    1. Aggregate device usage data hourly and send the aggregated result to Amazon S3 and the outgoing Kinesis data stream, which is picked up by a Lambda function that persists the usage data in DynamoDB.
    2. Aggregate device usage data daily and send the aggregated result to Amazon S3.
    3. Aggregate account usage data hourly and forward the aggregated results to the outgoing data stream, which is picked up by a Lambda function that checks if account usage is over the limit for that account. If account usage is over the limit, the function forwards the account information to another Lambda function, via Amazon Simple Queue Service (Amazon SQS), to cut off access on that account.

Design journey

NortonLifeLock needed a solution that was capable of real-time streaming and batch analytics. Kinesis Data Analysis fits this requirement because of the following key features:

  • Real-time streaming and batch analytics for data aggregation
  • Fully managed with a pay-as-you-go model
  • Auto scaling

NortonLifeLock needed Kinesis Data Analytics to do the following:

  • Aggregate customer usage data per device hourly and send results to Kinesis Data Streams (ultimately to DynamoDB) and the data lake (Amazon S3)
  • Aggregate customer usage data per account hourly and send results to Kinesis Data Streams (ultimately to DynamoDB and Lambda, which enforces usage limit)
  • Aggregate customer usage data per device daily and send results to the data lake (Amazon S3)

The legacy system processes usage data from an incoming Kinesis data stream, and they plan to use Kinesis Data Analytics to consume and process production data from the same stream. As such, NortonLifeLock started with SQL applications on Kinesis Data Analytics.

First attempt: Kinesis Data Analytics for SQL

Kinesis Data Analytics with SQL provides a high-level SQL-based abstraction for real-time stream processing and analytics. It’s configuration driven and very simple to get started. NortonLifeLock was able to create a prototype from scratch, get to production, and process the production load in less than 2 weeks. The solution met 90% of the requirements, and there were alternates for the remaining 10%.

However, they started to receive “read limit exceeded” alerts from the source data stream, and the legacy application was read throttled. With Amazon Support’s help, they traced the issues to the drastic reversal of the Kinesis Data Analytics MillisBehindLatest metric in Kinesis record processing. This was correlated to the Kinesis Data Analytics auto scaling events and application restarts, as illustrated by the following diagram. The highlighted areas show the correlation between spikes due to autoscaling and reversal of MillisBehindLatest metrics.

Here’s what happened:

  • Kinesis Data Analytics for SQL scaled up KPU due to load automatically, and the Kinesis Data Analytics application was restarted (part of scaling up).
  • Kinesis Data Analytics for SQL supports the at least once delivery model and uses checkpoints to ensure no data loss. But it doesn’t support taking a snapshot and restoring from the snapshot after a restart. For more details, see Delivery Model for Persisting Application Output to an External Destination.
  • When the Kinesis Data Analytics for SQL application was restarted, it needed to reprocess data from the beginning of the aggregation window, resulting in a very large number of duplicate records, which led to a dramatic increase in the Kinesis Data Analytics MillisBehindLatest metric.
  • To catch up with incoming data, Kinesis Data Analytics started re-reading from the Kinesis data stream, which led to over-consumption of read throughput and the legacy application being throttled.

In summary, Kinesis Data Analytics for SQL’s duplicates record processing on restarts, no other means to eliminate duplicates, and limited ability to control auto scaling led to this issue.

Although they found Kinesis Data Analytics for SQL easy to get started, these limitations demanded other alternatives. NortonLifeLock reached out to the Kinesis Data Analytics team and discussed the following options:

  • Option 1 – AWS was planning to release a new service, Kinesis Data Analytics Studio for SQL, Python, and Scala, which addresses these limitations. But this service was still a few months away (this service is now available, launched May 27, 2021).
  • Option 2 – The alternative was to switch to Kinesis Data Analytics for Apache Flink, which also provides the necessary tools to address all their requirements.

Second attempt: Kinesis Data Analytics for Apache Flink

Apache Flink has a comparatively steep learning curve (we used Java for streaming analytics instead of SQL), and it took about 4 weeks to build the same prototype, deploy it to Kinesis Data Analytics, and test the application in production. NortonLifeLock had to overcome a few hurdles, which we document in this section along with the lessons learned.

Challenge 1: Too many writes to outgoing Kinesis data stream

The first thing they noticed was that the write threshold on the outgoing Kinesis data stream was greatly exceeded. Kinesis Data Analytics was attempting to write 10 times the amount of expected data to the data stream, with 95% of data throttled.

After a lengthy investigation, it turned out that having too much parallelism in the Kinesis Data Analytics application led to this issue. They had followed default recommendations and set parallelism to 12 and it scaled up to 16. This means that every hour, 16 separate threads were attempting to write to the destination data stream simultaneously, leading to massive contention and writes throttled. These threads attempted to retry continuously, until all records were written to the data stream. This resulted in 10 times the amount of data processing attempted, even though only one tenth of the writes eventually succeeded.

The solution was to reduce parallelism to 4 and disable auto scaling. In the preceding diagram, the percentage of throttled records dropped to 0 from 95% after they reduced parallelism to 4 in the Kinesis Data Analytics application. This also greatly improved KPU utilization and reduced Kinesis Data Analytics cost from $50 a day to $8 a day.

Challenge 2: Use Kinesis Data Analytics sink aggregation

After tuning parallelism, they still noticed occasional throttling by Kinesis Data Streams because of the number of records being written, not record size. To overcome this, they turned on Kinesis Data Analytics sink aggregation to reduce the number of records being written to the data stream, and the result was dramatic. They were able to reduce the number of writes by 1,000 times.

Challenge 3: Handle Kinesis Data Analytics Flink restarts and the resulting duplicate records

Kinesis Data Analytics applications restart because of auto scaling or recovery from application or task manager crashes. When this happens, Kinesis Data Analytics saves a snapshot before shutdown and automatically reloads the latest snapshot and picks up where the work was left off. Kinesis Data Analytics also saves a checkpoint every minute so no data is lost, guaranteeing exactly-once processing.

However, when the Kinesis Data Analytics application shut down in the middle of sending results to Kinesis Data Streams, it doesn’t guarantee exactly-once data delivery. In fact, Flink only guarantees at least once delivery to Kinesis Data Analytics sink, meaning that Kinesis Data Analytics guarantees to send a record at least once, which leads to duplicate records sent when Kinesis Data Analytics is restarted.

How were duplicate records handled in the outgoing data stream?

Because duplicate records aren’t handled by Kinesis Data Analytics when sinks do not have exactly-once semantics, the downstream application must deal with the duplicate records. The first question you should ask is whether it’s necessary to deal with the duplicate records. Maybe it’s acceptable to tolerate duplicate records in your application? This, however, is not an option for NortonLifeLock, because no user wants to have their available usage taken twice within the same hour. So, logic had to be built in the application to handle duplicate usage records.

To deal with duplicate records, you can employ a strategy in which the application saves an update timestamp along with the user’s latest usage. When a record comes in, the application reads existing daily usage and compares the update timestamp against the current time. If the difference is less than a configured window (50 minutes if the aggregation window is 60 minutes), the application ignores the new record because it’s a duplicate. It’s acceptable for the application to potentially undercount vs. overcount user usage.

How were duplicate records handled in the outgoing S3 bucket?

Kinesis Data Analytics writes temporary files in Amazon S3 before finalizing and removing them. When Kinesis Data Analytics restarts, it attempts to write new S3 files, and potentially leaves behind temporary S3 files because of restart. Because Athena ignores all temporary S3 files, no further is action needed. If your BI tools take temporary S3 files into consideration, you have to configure the Amazon S3 lifecycle policy to clean up temporary S3 files after a certain time.


NortonLifelock has been successfully running a Kinesis Data Analytics application in production since May 2021. It provides several key benefits. VPN users can now keep track of their usage in near-real time. BI analysts can get timely insights that are used for targeted sales and marketing campaigns, and upselling features and services. VPN usage limits are enforced in near-real time, thereby optimizing the network resources. NortonLifelock is saving tens of thousands of dollars each month with this real-time streaming analytics solution. And this telemetry solution is able to keep up with petabytes of data flowing through their global VPN service, which is seeing double-digit monthly growth.

To learn more about Kinesis Data Analytics and getting started with serverless streaming solutions on AWS, please see Developer Guide for Studio, the easiest way to build Apache Flink applications in SQL, Python, Scala in a notebook interface.

About the Authors

Lei Gu has 25 years of software development experience and the architect for three key Norton products, Norton Secure Backup, VPN and Norton Family. He is passionate about cloud transformation and most recently spoke about moving from Cassandra to Amazon DynamoDB at AWS re:Invent 2019. Check out his Linkedin profile at https://www.linkedin.com/in/leigu/.

Madhu Nunna is a Sr. Solutions Architect at AWS, with over 20 years of experience in networks and cloud, with the last two years focused on AWS Cloud. He is passionate about Analytics and AI/ML. Outside of work, he enjoys hiking and reading books on philosophy, economics, history, astronomy and biology.

Get started with Flink SQL APIs in Amazon Kinesis Data Analytics Studio

Post Syndicated from Sam Mokhtari original https://aws.amazon.com/blogs/big-data/get-started-with-flink-sql-apis-in-amazon-kinesis-data-analytics-studio/

Before the release of Amazon Kinesis Data Analytics Studio, customers relied on Amazon Kinesis Data Analytics for SQL on Amazon Kinesis Data Streams. With the release of Kinesis Data Analytics Studio, data engineers and analysts can use an Apache Zeppelin notebook within Studio to query streaming data interactively from a variety of sources, like Kinesis Data Streams, Amazon Managed Streaming for Apache Kafka (Amazon MSK), Amazon Simple Storage Service (Amazon S3), and other sources using custom connectors.

In this post, we cover some of the most common query patterns to run on streaming data using Apache Flink relational APIs. Out of the two relational API types supported by Apache Flink, SQL and Table APIs, our focus is on SQL APIs. We expect readers to have knowledge of Kinesis Data Streams, AWS Glue, and AWS Identity and Access Management (IAM). In this post, we use a sales transaction use case to walk you through the examples of tumbling, sliding, session and windows, group by, and joins query operations. We expect readers to have a basic knowledge of SQL queries and streaming window concepts.

Solution architecture

To show the working solution of interactive analytics on streaming data, we use a Kinesis Data Generator UI application to generate the stream of data, which continuously writes to Kinesis Data Streams. For the interactive analytics on Kinesis Data Streams, we use Kinesis Data Analytics Studio that uses Apache Flink as the processing engine, and notebooks powered by Apache Zeppelin. These notebooks come with preconfigured Apache Flink, which allows you to query data from Kinesis Data Streams interactively using SQL APIs. To use SQL queries in the Apache Zeppelin notebook, we configure an AWS Glue Data Catalog table, which is configured to use Kinesis Data Streams as a source. This configuration allows you to query the data stream by referring to the AWS Glue table in SQL queries.

We use an AWS CloudFormation template to create the AWS resources shown in the following diagram.

Set up the environment

After you sign in to your AWS account, launch the CloudFormation template by choosing Launch Stack:

The CloudFormation template configures the following resources in your account:

  • Two Kinesis data streams, one for sales transactions and one for card data
  • A Kinesis Data Analytics Studio application
  • An IAM role (service execution role) for Kinesis Data Analytics Studio
  • Two AWS Glue Data Catalog tables: sales and card

After you complete the setup, sign in to the Kinesis Data Analytics console. On the Kinesis Data Analytics applications page, choose the Studio tab, where you can see the Studio notebook in ready status. Select the Studio notebook, choose Run, and wait until the notebook is in running status. It can take a couple of minutes for the notebook to get into running status.

To run the analysis on streaming data, select the Apache Zeppelin notebook environment and open it. You have the option to create a new note in the notebook.

Run stream analytics in an interactive application

Before you start running interactive analytics with a Studio notebook, you need to start streaming data into your Kinesis data stream, which you created earlier using the CloudFormation stack. To generate streaming data into the data stream, we use a hosted Kinesis Data Generator UI application.

  1. Create an Amazon Cognito user pool in your account and user in that pool. For instructions, see the GitHub repo.
  2. Log in to the Kinesis Data Generator application.
  3. Choose the Region where the CloudFormation template was run to create the Kinesis data stream.
  4. Choose the data stream from the drop-down menu and select the data stream for sales.
  5. Set records per second to 10.
  6. Use the following code for the record template:
    "customer_card_id": {{random.number({
    "customer_id": {{random.number({
    "price": {{random.number(
    "product_id": "{{random.arrayElement(
  1. Choose Send Data.

To run the table join queries in the example section, you need to stream sample card data to a separate data stream.

  1. Choose the Region where you created the data stream.
  2. Choose the data stream from the drop-down menu.
  3. Select the data stream for card.
  4. Set records per second to 5.
  5. Use the following code for the record template:
    "card_id": {{random.number({
    "card_number": {{random.number({
    "card_zip": "{{random.arrayElement(
    "card_name": "{{random.arrayElement(
        ["Laura Perez","Peter Han","Karla Johnson"]
  1. Choose Send Data.
  2. Go back to the notebook note and specify the language Studio uses to run the application.

You need to specify Flink interpreter supported by Apache Zeppelin notebook, like Python, IPython, stream SQL, or batch SQL. Because we use Python Flink streaming SQL APIs in this post, we use the stream SQL interpreter ssql as the first statement:


Common query patterns with Flink SQL

In this section, we walk you through examples of common query patterns using Flink SQL APIs. In all the examples, we refer to the sales table, which is the AWS Glue table created by the CloudFormation template that has Kinesis Data Streams as a source. It’s the same data stream where you publish the sales data using the Kinesis Data Generator application.

Windows and aggregation

In this section, we cover examples of windowed and aggregate queries: tumbling, sliding, and session window operations.

Tumbling window

In the following example, we use SUM aggregation on a tumbling window. The query emits the total spend for every customer every 30-second window interval.

The following table shows our input data.

proctime customer_id customer_card_id product_id price
2021-04-20 21:31:01.10 75 101 4E5750DC2A1D 110
2021-04-20 21:31:01.115 78 118 B552B4B940D0 80
2021-04-20 21:31:01.328 75 101 E6DA5387367B 60
2021-04-20 21:31:01.504 78 101 4E5750DC2A1D 110
2021-04-20 21:31:01.678 75 148 4E5750DC2A1D 110
2021-04-20 21:31:01.960 78 118 B552B4B940D0 80

We use the following code for our query:

SELECT TUMBLE_END(proctime, INTERVAL '30' SECOND) as window_end_time, customer_id
, SUM(price) as tumbling_30_seconds_sum
FROM sales
GROUP BY TUMBLE(proctime, INTERVAL '30' SECOND), customer_id

The following table shows our results.

windown_end_time customer_id tumbling_30_seconds_sum
2021-04-20 21:31:01.0 75 170
2021-04-20 21:31:01.0 78 80
2021-04-20 21:31:30.0 75 110
2021-04-20 21:31:30.0 78 190

Sliding window

In this sliding window example, we run a SUM aggregate query that emits the total spend for every customer every 10 seconds for the 30-second window.

The following table shows our input data.

proctime customer_id customer_card_id product_id price
2021-04-20 21:31:01.10 75 101 4E5750DC2A1D 110
2021-04-20 21:31:01.20 78 118 B552B4B940D0 80
2021-04-20 21:31:01.28 75 101 E6DA5387367B 60
2021-04-20 21:31:01.30 78 101 4E5750DC2A1D 110
2021-04-20 21:31:01.36 75 148 4E5750DC2A1D 110
2021-04-20 21:31:01.40 78 118 B552B4B940D0 80

We use the following code for our query:

SELECT HOP_END(proctime, INTERVAL '10' SECOND, INTERVAL '30' SECOND) AS window_end_time
, customer_id, SUM(price) AS sliding_30_seconds_sum
FROM sales
GROUP BY HOP(proctime, INTERVAL '10' SECOND, INTERVAL '30' SECOND), customer_id

The following table shows our results.

window_end_time customer_id sliding_30_seconds_sum
2021-04-20 21:31:01.10 75 110
2021-04-20 21:31:01.20 75 110
2021-04-20 21:31:01.20 78 80
2021-04-20 21:31:30.30 75 170
2021-04-20 21:31:30.30 78 190
2021-04-20 21:31:30.40 75 280
2021-04-20 21:31:30.40 78 270

Session window

The following example of a session window query finds the total spend per session for a 1-minute gap of inactivity. To generate the result, we stream the data from the Kinesis Data Generator application and stop streaming for more than a minute to create a 1-minute gap of inactivity.

The following table shows our input data.

proctime customer_id customer_card_id product_id price
2021-04-20 21:31:01.10 75 101 4E5750DC2A1D 110
2021-04-20 21:31:01.20 78 118 B552B4B940D0 80
2021-04-20 21:31:01.28 75 101 E6DA5387367B 60
2021-04-20 21:32:50.30 78 101 4E5750DC2A1D 110
2021-04-20 21:32:50.36 75 148 4E5750DC2A1D 110

We use the following code for our query:

SELECT customer_id, SESSION_START(proctime, INTERVAL '1' MINUTE) AS session_start_time
, SESSION_PROCTIME(proctime, INTERVAL '1' MINUTE) AS session_end_time, SUM(price) AS total_spend
FROM sales
GROUP BY SESSION(proctime, INTERVAL '1' MINUTE), customer_id

The following table shows our results.

session_start_time session_end_time total_spend
2021-04-20 21:31:01.10 2021-04-20 21:32:01.28 250
2021-04-20 21:32:50.30 2021-04-20 21:32:50.36 220

Data filter and consolidation

To show an example of a filter and union operation, we create two separate datasets using the filter condition and combine them using the UNION operation.

The following table shows our input data.

proctime customer_id customer_card_id product_id price
2021-04-20 21:31:01.10 75 101 4E5750DC2A1D 110
2021-04-20 21:31:01.20 78 118 B552B4B940D0 80
2021-04-20 21:31:01.28 75 101 E6DA5387367B 60
2021-04-20 21:32:50.30 78 101 4E5750DC2A1D 110
2021-04-20 21:32:50.36 75 148 4E5750DC2A1D 110

We use the following code for our query:

(SELECT customer_id, product_id, price FROM sales WHERE price > 100 AND  product_id <> '4E5750DC2A1D')
(SELECT customer_id, product_id, price FROM sales WHERE product_id = '4E5750DC2A1D' AND price > 250)

The following table shows our results.

customer_id product_id price
78 4E5750DC2A1D 300
75 B552B4B940D0 170
78 B552B4B940D0 110
75 4E5750DC2A1D 260

Table joins

Flink SQL APIs support different types of join conditions, like inner join, outer join, and interval join. You want to limit the resource utilization from growing indefinitely, and run joins effectively. For that reason, in our example, we use table joins using an interval join. An interval join requires one equi-join predicate and a join condition that bounds the time on both sides. In this example, we join the dataset of two Kinesis Data Streams tables based on the card ID, which is a common field between the two stream datasets. The filter condition in the query is based on a time constraint, which restricts resource utilization from growing.

The following table shows our sales input data.

proctime customer_id customer_card_id product_id price
2021-04-20 21:31:01.10 75 101 4E5750DC2A1D 110
2021-04-20 21:31:01.20 78 118 B552B4B940D0 80
2021-04-20 21:31:01.28 75 101 E6DA5387367B 60
2021-04-20 21:32:50.30 78 101 4E5750DC2A1D 110
2021-04-20 21:32:50.36 75 148 4E5750DC2A1D 110

The following table shows our cards input data.

card_id card_number card_zip card_name
101 23274397 23738 Laura Perez
118 54093472 7422 Karla Johnson
101 23274397 23738 Laura Perez
101 23274397 23738 Laura Perez
148 91368810 7422 Peter Han

We use the following code for our query:

SELECT sales.proctime, customer_card_id, card_zip, product_id, price
FROM card INNER JOIN sales ON card.card_id = sales.customer_card_id
WHERE sales.proctime BETWEEN card.proctime - INTERVAL '5' MINUTE AND card.proctime;

The following table shows our results.

proctime customer_card_id card_zip product_id price
2021-04-20 21:31:01.10 101 23738 4E5750DC2A1D 110
2021-04-20 21:31:01.20 118 7422 B552B4B940D0 80
2021-04-20 21:31:01.28 101 23738 E6DA5387367B 60
2021-04-20 21:32:50.30 101 23738 4E5750DC2A1D 110
2021-04-20 21:32:50.36 148 7422 4E5750DC2A1D 110

 Data partitioning and ranking

To show the example of Top-N records, we use the same input dataset as in the previous join example. In this example, we run a query to find the top sales records by sales price in each zip code. We use the OVER window clause to rank sales in each zip code using a PARTITION BY clause. Next, we order the records in each zip code with an ORDER BY clause on the price field in descending order. The result of this operation is a ranking of each record based on the OVER clause condition. We use the external block of the query to filter the result on ranking so that we get the top sales in each zip code.

We use the following code for our query:

SELECT card_zip, customer_card_id, product_id, price FROM (
ROW_NUMBER() OVER (PARTITION BY card_zip ORDER BY price DESC) as row_num
FROM card INNER JOIN sales ON card.card_id = sales.customer_card_id
WHERE sales.proctime BETWEEN card.proctime - INTERVAL '5' MINUTE AND card.proctime
WHERE row_num = 1

The following table shows our results.

card_zip customer_card_id product_id price
23738 101 4E5750DC2A1D 110
7422 148 4E5750DC2A1D 110

Data transformation

There are times when you want to transform incoming data. The Flink SQL API has many built-in functions to support a wide range of data transformation requirements, including string functions, date functions, arithmetic functions, and so on. For the complete list, see System (Built-in) Functions.

Extract a portion of a string

In this example, we use the SUBSTR string function to subtract the first four digits and only return the last four digits of the card number.

The following table shows our sales input data.

proctime customer_id customer_card_id product_id price
2021-04-20 21:31:01.10 75 101 4E5750DC2A1D 110
2021-04-20 21:31:01.20 78 118 B552B4B940D0 80
2021-04-20 21:31:01.28 75 101 E6DA5387367B 60
2021-04-20 21:32:50.30 78 101 4E5750DC2A1D 110
2021-04-20 21:32:50.36 75 148 4E5750DC2A1D 110

The following table shows our cards input data.

card_id card_number card_zip card_name
101 23274397 23738 Laura Perez
118 54093472 7422 Karla Johnson
101 23274397 23738 Laura Perez
101 23274397 23738 Laura Perez
148 91368810 7422 Peter Han

We use the following code for our query:

SELECT proctime, SUBSTR(card_number,5) AS partial_card_number,    card_zip, product_id, price
FROM card INNER JOIN sales ON card.card_id = sales.customer_card_id

The following table shows our results.

proctime partial_card_number card_zip product_id price
2021-04-20 21:31:01.10 4397 23738 4E5750DC2A1D 110
2021-04-20 21:31:01.20 3472 7422 B552B4B940D0 80
2021-04-20 21:31:01.28 4397 23738 E6DA5387367B 60
2021-04-20 21:32:50.30 4397 23738 4E5750DC2A1D 110
2021-04-20 21:32:50.36 8810 7422 4E5750DC2A1D 110

Replace a substring

In this example, we use the REGEXP_REPLACE string function to remove all the characters after the space from the card_name field. Assuming that the first name and last name are separated by a space, the query returns the first name only.

The following table shows our cards input data.

card_id card_number card_zip card_name
101 23274397 23738 Laura Perez
118 54093472 7422 Karla Johnson
101 23274397 23738 Laura Perez
101 23274397 23738 Laura Perez
148 91368810 7422 Peter Han

We use the following code for our query:

SELECT card_id, REGEXP_REPLACE(card_name,' .*','') card_name
FROM card

The following table shows our results.

card_id card_name
101 Laura
118 Karla
101 Laura
101 Laura
148 Jason

Split the string field into multiple fields

In this example, we use the SPLIT_INDEX string function to split the card_name field into first_name and last_name, assuming the card_name field is a full name separated by space.

The following table shows our cards input data.

card_id card_number card_zip card_name
101 23274397 23738 Laura Perez
118 54093472 7422 Karla Johnson
101 23274397 23738 Laura Perez
101 23274397 23738 Laura Perez
148 91368810 7422 Peter Han

We use the following code for our query:

SELECT card_id, SPLIT_INDEX(card_name,' ',0) first_name, SPLIT_INDEX(card_name,' ',1) last_name
FROM card

The following table shows our results.

card_id first_name last_name
101 Laura Perez
118 Karla Johnson
101 Laura Perez
101 Laura Perez
148 Peter Han

Transform data using a CASE statement

There are times when you want to transform the result value and apply labels to get insights. For our example, we label the risk level as high, medium, or low for every customer (who is purchasing in the window) based on the number of purchases in the last 5-minute sliding window that emits results every 30 seconds.

The following table shows our input data.

proctime customer_id customer_card_id product_id price
2021-04-20 21:31:30.10 75 101 4E5750DC2A1D 110
2021-04-20 21:31:38.20 78 118 B552B4B940D0 80
2021-04-20 21:31:42.28 75 101 E6DA5387367B 60
2021-04-20 21:31:50.30 78 101 4E5750DC2A1D 110
2021-04-20 21:31:50.36 75 148 4E5750DC2A1D 110

We use the following code for our query:

SELECT customer_id, CASE
WHEN total_purchases BETWEEN 1 AND 2 THEN 'LOW'
WHEN total_purchases BETWEEN 3 AND 10 THEN 'MEDIUM'
END as risk
, customer_id, COUNT(1) AS total_purchases
FROM sales
GROUP BY HOP(proctime, INTERVAL '30' SECOND, INTERVAL '5' MINUTE), customer_id

The following table shows our results.

customer_id risk
78 LOW

DateTime data transformation

The Flink SQL API has a wide range of built-in functions to operate on the date timestamp field, like extracting the day, month, week, hour, minute, day of the month, and so on. There are functions to convert the date timestamp field. In this example, we use the MINUTE and HOUR functions to extract the minute of an hour and the hour from the timestamp field.

The following table shows our sales input data.

proctime customer_id customer_card_id product_id price
2021-04-20 21:31:01.10 75 101 4E5750DC2A1D 110
2021-04-20 21:31:01.20 78 118 B552B4B940D0 80
2021-04-20 21:31:01.28 75 101 E6DA5387367B 60
2021-04-20 21:32:50.30 78 101 4E5750DC2A1D 110
2021-04-20 21:32:50.36 75 148 4E5750DC2A1D 110

We use the following code for our query:

SELECT HOUR(TIMESTAMP proctime) AS transaction_hour, MINUTE(TIMESTAMP proctime) AS transaction_min,customer_id, product_id, price
FROM sales

The following table shows our results.

transaction_hour transaction_min customer_id product_id price
21 31 75 4E5750DC2A1D 110
21 31 78 B552B4B940D0 80
21 31 75 E6DA5387367B 60
21 32 78 4E5750DC2A1D 110
21 32 75 4E5750DC2A1D 110


In this post, we used sales and card examples to demonstrate different query patterns to get insight from streaming data using Apache Flink SQL APIs. We walked you through examples of Flink SQL queries that you can run within Kinesis Data Analytics Studio. In just a few minutes, you can start running interactive analytics with the examples in this post.

You can quickly start developing a stream processing application using Studio from the supported languages like SQL, Python, and Scala. If you want to generate continuous actionable insights, you can easily build and deploy your code as an Apache Flink application with durable state from the notebook within Studio. For more information, see Deploying as an application with durable state.

For further reading on Flink SQL queries that you can use in Kinesis Data Analytics Studio, visit the official page at Apache Flink 1.11 SQL Queries.

About the Authors

Dr. Sam Mokhtari is a Senior Solutions Architect at AWS. His main area of depth is “Data & Analytics” and he published more than 30 influential articles in this field. He is also a respected data & analytics advisor who led several large-scale implementation projects across different industries including energy, health, telecom and transport.



Mitesh Patel is a Senior Solutions Architect at AWS. He works with customers in SMB to help them develop scalable, secure and cost effective solutions in AWS. He enjoys helping customers in modernizing applications using microservices and implementing serverless analytics platform.

Top 10 Flink SQL queries to try in Amazon Kinesis Data Analytics Studio

Post Syndicated from Jeremy Ber original https://aws.amazon.com/blogs/big-data/top-10-flink-sql-queries-to-try-in-amazon-kinesis-data-analytics-studio/

Amazon Kinesis Data Analytics Studio makes it easy to analyze streaming data in real time and build stream processing applications using standard SQL, Python, and Scala. With a few clicks on the AWS Management Console, you can launch a serverless notebook to query data streams and get results in seconds. Kinesis Data Analytics reduces the complexity of building and managing Apache Flink applications. Apache Flink is an open-source framework and engine for processing data streams. It’s highly available and scalable, delivering high throughput and low latency for stream processing applications.

Apache Flink’s SQL support uses Apache Calcite, which implements the SQL standard, allowing you to write simple SQL statements to create, transform, and insert data into streaming tables defined in Apache Flink. In this post, we discuss some of the Flink SQL queries you can run in Kinesis Data Analytics Studio.

The Flink SQL interface works seamlessly with both the Apache Flink Table API and the Apache Flink DataStream and Dataset APIs. Often, a streaming workload interchanges these levels of abstraction in order to process streaming data in a way that works best for the current operation. A simple filter pattern might call for a Flink SQL statement, whereas a more complex aggregation involving object-oriented state control could require the DataStream API. A workload could extract patterns from a data stream using the DataStream API, then later use the Flink SQL API to analyze, scan, filter, and aggregate them.

For more information about the Flink SQL and Table APIs, see Concepts & Common API, specifically the sections about the different planners that the interpreters use and how to structure an Apache Flink SQL or Table API program.

Write an Apache Flink SQL application in Kinesis Data Analytics Studio

With Kinesis Data Analytics Studio, you can query streams of millions of records per second, scaling the notebook accordingly. With the power of Kinesis Data Analytics for Apache Flink, with a few simple SQL statements, you can have a truly powerful Apache Flink application or analytical dashboard.

Need help getting started? It’s easy to get started with Amazon Kinesis Data Analytics Studio. In the next sections, we cover a variety of ways to interact with your incoming data stream—querying, aggregating, sinking, and processing data in a Kinesis Data Analytics Studio notebook. First, let’s create an in-memory table for our data stream.

Create an in-memory table for incoming data

Start by registering your in-memory table using a CREATE statement. You can configure these statements to connect to Amazon Kinesis Data Streams, Amazon Managed Streaming for Apache Kafka (Amazon MSK) clusters, or any other currently supported connector within Apache Flink, such as Amazon Simple Storage Service (Amazon S3).

You need to specify at the top of your paragraph that you’re using the Flink SQL interpreter denoted by the Zeppelin magic % followed by flink.ssql and the type of paragraph. In most cases, this is an update paragraph, in which the output is updated continuously. You can also use type=single if the result of a query is one row, or type=append if the output of the query is appended to the existing results. See the following code:


CREATE TABLE stock_table (
ticker VARCHAR(6),
price DOUBLE,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
'connector' = 'kinesis',
'stream' = 'input-stream',
'aws.region' = 'us-east-1',
'scan.stream.initpos' = 'LATEST',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601')

This example showcases creating a table called stock_table with a ticker, price, and event_time column, which signifies the time at which the price is recorded for the ticker. The WATERMARK clause defines the watermark strategy for generating watermarks according to the event_time (row_time) column. The event_time column is defined as Timestamp(3) and is a top-level column used in conjunction with watermarks. The syntax following the WATERMARK definition—FOR event_time AS event_time - INTERVAL '5' SECOND—declares that watermarks are emitted according to a bounded-out-of-orderness watermark strategy, allowing for a 5-second delay in event_time data. The table uses the Kinesis connector to read from a Kinesis data stream called input-stream in the us-east-1 Region from the latest stream position.

As soon as this statement runs within a Zeppelin notebook, an AWS Glue Data Catalog table is created according to the declaration specified in the CREATE statement, and the table is available immediately for queries from Kinesis Data Streams.

You don’t need to complete this step if your Data Catalog already contains the table. You can either create a table as described, or use an existing Data Catalog table.

The following screenshots shows the table created in the Glue Data Catalog.

Query your data stream with live updates

After you create the table, you can perform simple queries of the data stream by writing a SELECT statement, which allows the visualization of data in tabular form, as well as bar charts, pie charts, and more:

SELECT * FROM stock_table;

Choosing a different visualization amongst the different charts is as simple as selecting the option from the top left of the result set.

To drop or recreate this table, you can delete it manually from the Data Catalog by navigating to the table on the AWS Glue console, but you can also explicitly drop the table from the Kinesis Data Analytics Studio notebook:

DROP TABLE stock_table;

Filter functions

You can perform simple FILTER operations on the data stream using the keyword WHERE. In the following code example, the stream is filtered for all stock ticker records starting with AM:


SELECT * FROM stock_table WHERE ticker LIKE 'AM%'

The following screenshot shows our results.

User-defined functions

You can register user-defined functions (UDFs) within the notebook to be used within our Flink SQL queries. These must be registered in the table environment to be used by Flink SQL within the Kinesis Data Analytics Studio application. UDFs are functions that can be defined outside of the scope of Flink SQL that use custom logic or frequent transformations that would otherwise be impossible to express in SQL.

UDFs are implemented in Scala within the Kinesis Data Analytics Studio, with Python UDF support coming soon. UDFs can use arbitrary libraries to act upon the data.

Let’s define a UDF that converts the ticker symbol to lowercase, and another that converts the event_time into epoch seconds:


import java.time.LocalDateTime
import java.time.format.DateTimeFormatter._
import java.time.ZoneOffset

class DateTimeToEpoch extends ScalarFunction {
def eval(datetime: LocalDateTime) = datetime.toEpochSecond(ZoneOffset.UTC)
stenv.registerFunction("dt_to_epoch", new DateTimeToEpoch())

class ScalaLowerCase extends ScalarFunction {
def eval(str: String) = str.toLowerCase
stenv.registerFunction("to_lower", new ScalaLowerCase())

At the bottom of each UDF definition, the stenv (StreamingTableEnvironment) within Scala is used to register the function with a given name.

After it’s registered, you can simply call the UDF within the Flink SQL paragraph to transform our data:

SELECT to_lower(ticker) as lowercase_ticker, price, dt_to_epoch(event_time) as epoch_time from stock_table;

The following screenshot shows our results.

Enrichment from an external data source (joins)

You may need to enrich streaming data with static or reference data stored outside of the data stream. For example, a company address and metadata might be stored external to the stock transactions flowing into a data stream in a relational database or flat file on Amazon S3. To enrich a data stream with this, Flink SQL allows you to join reference data to a streaming source. This enrichment static data may or may not have a time element associated with it. If it doesn’t have time elements associated, you may need to add a processing time element to the data read in from externally in order to join it up with the time-based stream. This is to avoid getting stale data, and is something to take note of in your enrichments.

Let’s define an enrichment file to source our data from, which is located in Amazon S3. The bucket contains a single CSV file containing the stock ticker and the associated company metadata—full name, city, and state:


CREATE TABLE company_details_table (
  ticker_symbol VARCHAR(6),
  company_name VARCHAR,
  company_city VARCHAR,
  company_state_abbrev VARCHAR
)  WITH (
  'connector' = 'filesystem',          
  'path' = 's3a://interactive-applications/data-mapping-stock-enrichment.csv', 
  'format' = 'csv'                   

This CSV file is read in at once and the task is marked as finished. You can now join this with the existing stock_table:

SELECT ticker, price, company_name, event_time, company_city, company_state_abbrev FROM (SELECT CAST(event_time AS TIMESTAMP) as event_time, ticker, price from stock_table)
JOIN company_details_table cd
ON ticker=ticker_symbol;

As of this writing, Flink has a limitation in which it can’t distinguish between interval joins (requiring timestamps in both tables) and regular joins. Because of this, you need to explicitly cast the rowtime column (event_time) to a regular timestamp so that it’s not incorporated into the regular join. If both tables have a timestamp, the ideal case is to include them in the WHERE clause of a join statement. The following screenshot shows our results.

Tumbling windows

Tumbling windows can be thought of as mini-batches of aggregations over a non-overlapping window of time. For example, computing the max price over 30 seconds, or the ticker count over 10 seconds. To perform this functionality with Apache Flink SQL, use the following code:


SELECT ticker_symbol, COUNT(ticker_symbol) AS ticker_symbol_count
FROM stock_ticker_table
GROUP BY TUMBLE(processing_time, INTERVAL '10' second), ticker_symbol;

The following screenshot shows our output.

Sliding windows

Sliding windows (also called hopping windows) are virtually identical to tumbling windows, save for the fact that these windows can be overlapping. Data can be emitted from a sliding window every X seconds over a Y-second window. For example, with the preceding use case, you can have a 10-second count of data that is emitted every 5 seconds:


SELECT ticker_symbol, COUNT(ticker_symbol) AS ticker_symbol_count
FROM stock_ticker_table
GROUP BY HOP(processing_time, INTERVAL '5' second, INTERVAL '10' second), ticker_symbol;

The following screenshot shows our results.

Sliding window with a filtered alarm

To filter records from a data stream to trigger some sort of alarm or use them downstream, the following example shows a filtered sliding window being inserted into an aggregated count table that is configured to write out to a data stream. This could later be actioned upon to alert of a high transaction rate or other metric by using Amazon CloudWatch or another triggering mechanism.

The following CREATE TABLE statement is connected to a Kinesis data stream, and the insert statement directly after it filters all ticker records starting with AM, where there are 750 records in a 1-minute interval:


CREATE TABLE stock_ticker_count_table (
    ticker_symbol VARCHAR(4),
    ticker_symbol_count INTEGER
'connector' = 'kinesis',
'stream' = 'output-stream',
'aws.region' = 'us-east-1',
'scan.stream.initpos' = 'LATEST',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601');

INSERT INTO  stock_ticker_count_table
    (SELECT ticker_symbol, CAST(COUNT(ticker_symbol) AS INTEGER) AS ticker_symbol_count
    FROM stock_ticker_table
    WHERE ticker_symbol like 'AM%'
    GROUP BY HOP(processing_time, INTERVAL '30' second, INTERVAL '1' minute), ticker_symbol)
WHERE ticker_symbol_count > 750;

Event time

If the incoming data contains timestamp information, your data pipeline will better reflect reality by using event time instead of processing time. The difference is that event time reflects the time the record was generated rather than the time Kinesis Data Analytics for Apache Flink received the record.

To specify event time in your Flink SQL create statement, the element being used for event time must be of type TIMESTAMP(3), and must be accompanied by a watermark strategy expression. The event time column can also be computed if it’s not of type TIMESTAMP(3). Defining the watermark strategy expression marks the event time field as the event time attribute, and explains how to handle late-arriving data.

The watermark strategy expression defines the watermark strategy. The watermark generation is computed for every record, and handles the order of data accordingly.

Late data in streaming workloads is quite common and for the most part unavoidable. This late-arriving data could be a result of network lag, data buffering or slow processing, and anything in-between. For ascending timestamp workloads that may introduce late data, you can use the following watermark strategy:

WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND

This code emits a watermark of the max observed timestamp minus one record. Rows with timestamps earlier or equal to the max timestamp aren’t considered late.

Bounded-out-of-orderness timestamps

To emit watermarks that are the maximum observed timestamp minus a specified delay, the bounded-of-orderness definition lets you define the allowed lateness of records in a data stream:

WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '3' SECOND

The preceding code emits a 3-second delayed watermark. The example can be found in the intro of this post. The watermark instructs the stream as to how to handle late-arriving data. Consider the scenario where a stock ticker updates a real-time dashboard every 5 seconds with real-time data. If data arrives to the stream 10 seconds late (according to event time), we want to discard that data so that it’s not reflected onto the dashboard. The watermark tells Apache Flink how to handle that late-arriving data.


A common pattern in streaming data is the ability to detect patterns. Apache Flink features a complex event processing library to detect patterns in data, and the Flink SQL API allows this detection in a relational query syntax.

A MATCH_RECOGNIZE query in Flink SQL allows for the logical partitioning and identification of patterns within a streaming table. The following example manipulates our stock table:


FROM stock_table
        PARTITION BY ticker
        ORDER BY event_time
            A.event_time AS initialPriceTime,
            C.event_time AS dropTime,
            A.price - C.price AS dropDiff,
            A.price as initialPrice,
            C.price as lastPrice
            B AS B.price > A.price - 500

In this query, we’re identifying a drop-in price for a particular stock of $500 over 10 minutes. Let’s break down the MATCH_RECOGNIZE query into its components.

The following code queries our already existing stock_table:

SELECT * FROM stock_table

The MATCH_RECOGNIZE keyword begins the pattern matching clause of the query. This signifies that we’re identifying a pattern within the table.

The following code defines the logical partitioning of the table, similar to a GROUP BY expression:


The following code defines how the incoming data should be ordered. All MATCH_RECOGNIZE patterns require both a partitioning and an ordering scheme in order to identify patterns.

ORDER BY event_time

MEASURES defines the output of the query. You can think of this as the SELECT statement, because this is what ultimately comes out of the pattern.

In the following code, we select the rows out of the pattern identification to output:

A.event_time AS initialPriceTime,
C.event_time AS dropTime,
A.price - C.price AS dropDiff,
A.price as initialPrice,
C.price as lastPrice

We use the following parameters:

  • A.event_time – The first time recorded in the pattern, from which there was a decrease in price of $500
  • C.event_time – The last time recorded in the pattern, which was at least $500 less than A.price
  • A.price – C.price – The difference in price between the first and last record in the pattern
  • A.price – The first price recorded in the pattern, from which there was a decrease in price of $500
  • C.price – The last price recorded in the pattern, which was at least $500 less than A.price

ONE ROW PER MATCH defines the output mode—how many rows should be emitted for every found match. As of Apache Flink 1.12, this is the only supported output mode. For alternatives that aren’t currently supported, see Output Mode.

The following code defines the after match strategy:


This code tells Flink SQL how to start a new matching procedure after the match was found. This particular definition skips all rows in the current pattern and goes to the next row in the stream. This makes sure there are no overlaps in pattern events. For alternative AFTER MATCH SKIP strategies, see After Match Strategy. This strategy can be thought of as a tumbling window type aggregation, because the results of the pattern don’t overlap with each other.

In the following code, we define the pattern A B* C, which states that we will have a sequence of concatenated records:


We use the following sequence:

  • A – The first record in the sequence
  • B* – zero or more records matching the constraint defined in the DEFINE clause
  • C – The last record in the sequence

The names of these variables are defined within the PATTERN clause, and follow a regex-like syntax. For more information, see Defining a Pattern.

In the following code, we define the B pattern variable as a record’s price, so long as that price is greater than the first record in the pattern minus 500:

    B AS B.price > A.price - 500

For example, suppose we had the following pattern.

row ticker price event_time
1 AMZN 800 10:00 am
2 AMZN 400 10:01 am
3 AMZN 500 10:02 am
4 AMZN 350 10:03 am
5 AMZN 200 10:04 am

We define the following:

  • A – Row 1
  • B – Rows 2–4, which all match the condition in the DEFINE clause
  • C – Row 5, which breaks the pattern of matching the B condition, so it’s the last row in the pattern

The following screenshot shows our full example.


Top-N queries identify the N smallest or largest values ordered by columns. This query is useful in cases in which you need to identify the top 10 items in a stream, or the bottom 10 items in a stream, for example.

Flink can use the combination of an OVER window clause and a filter expression to generate a Top-N query. An OVER / PARTITION BY clause can also support a per-group Top-N. See the following code:

    SELECT *, ROW_NUMBER() OVER (PARTITION BY ticker_symbol ORDER BY price DESC) as row_num
    FROM stock_table)
WHERE row_num <= 10;


If the data being generated into your data stream can incur duplicate entries, you have several strategies for eliminating these. The simplest way to achieve this is through deduplication, in which you remove rows in a window, keeping only the first or last element according to the timestamp.

Flink can use ROW_NUMBER to remove duplicates in the same way it does in the Top-N example. Simply write your OVER / PARTITION BY query, and in the WHERE clause, specify the first row number:

    SELECT *, ROW_NUMBER OVER (PARTITION BY ticker_symbol ORDER BY price DESC) as row_num
    FROM stock_table)
WHERE row_num = 1;

Best practices

As with any streaming workload, you need both a testing and a monitoring strategy in order to understand how your workloads are progressing.

The following are key areas to monitor:

  • Sources – Ensure that your source stream has enough throughput and that you aren’t receiving ThroughputExceededExceptions in the case of Kinesis, or any sort of high memory or CPU utilization on the source system.
  • Sinks – Like sources, make sure the output of your Flink SQL application doesn’t overwhelm the downstream system. Ensure you’re not receiving any ThroughputExceededExceptions in the case of Kinesis. If this is the case, you should either add shards or more evenly distribute your data. Otherwise, this can cause backpressure on your pipeline.
  • Scaling – Make sure that your data pipeline has enough Kinesis Processing Units when allocating and scaling your Kinesis Data Analytics Studio application. You can enable autoscaling, which is a CPU-based autoscaling feature, or implement a custom autoscaler to scale your application with the influx of data flowing in.
  • Testing – Test things out on a small scale before deploying your new data pipeline on your production scale data. If possible, use real production data to test out your pipeline, or data that mimics the production data to see how your application reacts before deploying it to a production-facing environment.
  • Notebook memory – Because the Zeppelin notebook running your application is limited by the amount of memory available within your browser, don’t emit too many rows to the console—this causes memory in the browser to freeze the notebook. Data and calculations aren’t lost, but the presentation layer becomes unreachable. Instead, try aggregating your data before bringing it to the presentation layer, grabbing a representative sample, or in general limiting the amount of records returned to mitigate the notebook running out of memory.


Within minutes, you can get started querying your data stream and creating data pipelines using Kinesis Data Analytics Studio using Flink SQL. In this post, we discussed many different ways to query your data stream, but there are countless other examples listed in the Apache Flink SQL documentation.

You can take these samples into your own Kinesis Data Analytics Studio notebook to try them on your own streaming data! Be sure to let AWS know your experience with this new feature, and we look forward to seeing users use Kinesis Data Analytics Studio to generate insights from their data.

About the authors

Jeremy Ber has been working in the telemetry data space for the past 5 years as a Software Engineer, Machine Learning Engineer, and most recently a Data Engineer. In the past, Jeremy has supported and built systems that stream in terabytes of data per day, and process complex machine learning algorithms in real time. At AWS, he is a Solutions Architect Streaming Specialist supporting both Managed Streaming for Kafka (Amazon MSK) and Amazon Kinesis.

Introducing Amazon Kinesis Data Analytics Studio – Quickly Interact with Streaming Data Using SQL, Python, or Scala

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/introducing-amazon-kinesis-data-analytics-studio-quickly-interact-with-streaming-data-using-sql-python-or-scala/

The best way to get timely insights and react quickly to new information you receive from your business and your applications is to analyze streaming data. This is data that must usually be processed sequentially and incrementally on a record-by-record basis or over sliding time windows, and can be used for a variety of analytics including correlations, aggregations, filtering, and sampling.

To make it easier to analyze streaming data, today we are pleased to introduce Amazon Kinesis Data Analytics Studio.

Now, from the Amazon Kinesis console you can select a Kinesis data stream and with a single click start a Kinesis Data Analytics Studio notebook powered by Apache Zeppelin and Apache Flink to interactively analyze data in the stream. Similarly, you can select a cluster in the Amazon Managed Streaming for Apache Kafka console to start a notebook to analyze data in Apache Kafka streams. You can also start a notebook from the Kinesis Data Analytics Studio console and connect to custom sources.

Architectural diagram.

In the notebook, you can interact with streaming data and get results in seconds using SQL queries and Python or Scala programs. When you are satisfied with your results, with a few clicks you can promote your code to a production stream processing application that runs reliably at scale with no additional development effort.

For new projects, we recommend that you use the new Kinesis Data Analytics Studio over Kinesis Data Analytics for SQL Applications. Kinesis Data Analytics Studio combines ease of use with advanced analytical capabilities, which makes it possible to build sophisticated stream processing applications in minutes. Let’s see how that works in practice.

Using Kinesis Data Analytics Studio to Analyze Streaming Data
I want to get a better understanding of the data sent by some sensors to a Kinesis data stream.

To simulate the workload, I use this random_data_generator.py Python script. You don’t need to know Python to use Kinesis Data Analytics Studio. In fact, I am going to use SQL in the following steps. Also, you can avoid any coding and use the Amazon Kinesis Data Generator user interface (UI) to send test data to Kinesis Data Streams or Kinesis Data Firehose. I am using a Python script to have finer control over the data that is being sent.

import datetime
import json
import random
import boto3

STREAM_NAME = "my-input-stream"

def get_random_data():
    current_temperature = round(10 + random.random() * 170, 2)
    if current_temperature > 160:
        status = "ERROR"
    elif current_temperature > 140 or random.randrange(1, 100) > 80:
        status = random.choice(["WARNING","ERROR"])
        status = "OK"
    return {
        'sensor_id': random.randrange(1, 100),
        'current_temperature': current_temperature,
        'status': status,
        'event_time': datetime.datetime.now().isoformat()

def send_data(stream_name, kinesis_client):
    while True:
        data = get_random_data()
        partition_key = str(data["sensor_id"])

if __name__ == '__main__':
    kinesis_client = boto3.client('kinesis')
    send_data(STREAM_NAME, kinesis_client)

This script sends random records to my Kinesis data stream using JSON syntax. For example:

{'sensor_id': 77, 'current_temperature': 93.11, 'status': 'OK', 'event_time': '2021-05-19T11:20:00.978328'}
{'sensor_id': 47, 'current_temperature': 168.32, 'status': 'ERROR', 'event_time': '2021-05-19T11:20:01.110236'}
{'sensor_id': 9, 'current_temperature': 140.93, 'status': 'WARNING', 'event_time': '2021-05-19T11:20:01.243881'}
{'sensor_id': 27, 'current_temperature': 130.41, 'status': 'OK', 'event_time': '2021-05-19T11:20:01.371191'}

From the Kinesis console, I select a Kinesis data stream (my-input-stream) and choose Process data in real time from the Process drop-down. In this way, the stream is configured as a source for the notebook.

Console screenshot.

Then, in the following dialog box, I create an Apache Flink – Studio notebook.

I enter a name (my-notebook) and a description for the notebook. The AWS Identity and Access Management (IAM) permissions to read from the Kinesis data stream I selected earlier (my-input-stream) are automatically attached to the IAM role assumed by the notebook.

Console screenshot.

I choose Create to open the AWS Glue console and create an empty database. Back in the Kinesis Data Analytics Studio console, I refresh the list and select the new database. It will define the metadata for my sources and destinations. From here, I can also review the default Studio notebook settings. Then, I choose Create Studio notebook.

Console screenshot.

Now that the notebook has been created, I choose Run.

Console screenshot.

When the notebook is running, I choose Open in Apache Zeppelin to get access to the notebook and write code in SQL, Python, or Scala to interact with my streaming data and get insights in real time.

In the notebook, I create a new note and call it Sensors. Then, I create a sensor_data table describing the format of the data in the stream:


CREATE TABLE sensor_data (
    sensor_id INTEGER,
    current_temperature DOUBLE,
    status VARCHAR(6),
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
PARTITIONED BY (sensor_id)
    'connector' = 'kinesis',
    'stream' = 'my-input-stream',
    'aws.region' = 'us-east-1',
    'scan.stream.initpos' = 'LATEST',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'

The first line in the previous command tells to Apache Zeppelin to provide a stream SQL environment (%flink.ssql) for the Apache Flink interpreter. I can also interact with the streaming data using a batch SQL environment (%flink.bsql), or Python (%flink.pyflink) or Scala (%flink) code.

The first part of the CREATE TABLE statement is familiar to anyone who has used SQL with a database. A table is created to store the sensor data in the stream. The WATERMARK option is used to measure progress in the event time, as described in the Event Time and Watermarks section of the Apache Flink documentation.

The second part of the CREATE TABLE statement describes the connector used to receive data in the table (for example, kinesis or kafka), the name of the stream, the AWS Region, the overall data format of the stream (such as json or csv), and the syntax used for timestamps (in this case, ISO 8601). I can also choose the starting position to process the stream, I am using LATEST to read the most recent data first.

When the table is ready, I find it in the AWS Glue Data Catalog database I selected when I created the notebook:

Console screenshot.

Now I can run SQL queries on the sensor_data table and use sliding or tumbling windows to get a better understanding of what is happening with my sensors.

For an overview of the data in the stream, I start with a simple SELECT to get all the content of the sensor_data table:


SELECT * FROM sensor_data;

This time the first line of the command has a parameter (type=update) so that the output of the SELECT, which is more than one row, is continuously updated when new data arrives.

On the terminal of my laptop, I start the random_data_generator.py script:

$ python3 random_data_generator.py

At first I see a table that contains the data as it comes. To get a better understanding, I select a bar graph view. Then, I group the results by status to see their average current_temperature, as shown here:

Notebook screenshot.

As expected by the way I am generating these results, I have different average temperatures depending on the status (OK, WARNING, or ERROR). The higher the temperature, the greater the probability that something is not working correctly with my sensors.

I can run the aggregated query explicitly using a SQL syntax. This time, I want the result computed on a sliding window of 1 minute with results updated every 10 seconds. To do so, I am using the HOP function in the GROUP BY section of the SELECT statement. To add the time to the output of the select, I use the HOP_ROWTIME function. For more information, see how group window aggregations work in the Apache Flink documentation.


SELECT sensor_data.status,
       COUNT(*) AS num,
       AVG(sensor_data.current_temperature) AS avg_current_temperature,
       HOP_ROWTIME(event_time, INTERVAL '10' second, INTERVAL '1' minute) as hop_time
  FROM sensor_data
 GROUP BY HOP(event_time, INTERVAL '10' second, INTERVAL '1' minute), sensor_data.status;

This time, I look at the results in table format:

Notebook screenshot.

To send the result of the query to a destination stream, I create a table and connect the table to the stream. First, I need to give permissions to the notebook to write into the stream.

In the Kinesis Data Analytics Studio console, I select my-notebook. Then, in the Studio notebooks details section, I choose Edit IAM permissions. Here, I can configure the sources and destinations used by the notebook and the IAM role permissions are updated automatically.

Console screenshot.

In the Included destinations in IAM policy section, I choose the destination and select my-output-stream. I save changes and wait for the notebook to be updated. I am now ready to use the destination stream.

In the notebook, I create a sensor_state table connected to my-output-stream.


CREATE TABLE sensor_state (
    status VARCHAR(6),
    num INTEGER,
    avg_current_temperature DOUBLE,
    hop_time TIMESTAMP(3)
'connector' = 'kinesis',
'stream' = 'my-output-stream',
'aws.region' = 'us-east-1',
'scan.stream.initpos' = 'LATEST',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601');

I now use this INSERT INTO statement to continuously insert the result of the select into the sensor_state table.


INSERT INTO sensor_state
SELECT sensor_data.status,
    COUNT(*) AS num,
    AVG(sensor_data.current_temperature) AS avg_current_temperature,
    HOP_ROWTIME(event_time, INTERVAL '10' second, INTERVAL '1' minute) as hop_time
FROM sensor_data
GROUP BY HOP(event_time, INTERVAL '10' second, INTERVAL '1' minute), sensor_data.status;

The data is also sent to the destination Kinesis data stream (my-output-stream) so that it can be used by other applications. For example, the data in the destination stream can be used to update a real-time dashboard, or to monitor the behavior of my sensors after a software update.

I am satisfied with the result. I want to deploy this query and its output as a Kinesis Analytics application. To do so, I need to provide an S3 location to store the application executable.

In the configuration section of the console, I edit the Deploy as application configuration settings. There, I choose a destination bucket in the same region and save changes.

Console screenshot.

I wait for the notebook to be ready after the update. Then, I create a SensorsApp note in my notebook and copy the statements that I want to execute as part of the application. The tables have already been created, so I just copy the INSERT INTO statement above.

From the menu at the top right of my notebook, I choose Build SensorsApp and export to Amazon S3 and confirm the application name.

Notebook screenshot.

When the export is ready, I choose Deploy SensorsApp as Kinesis Analytics application in the same menu. After that, I fine-tune the configuration of the application. I set parallelism to 1 because I have only one shard in my input Kinesis data stream and not a lot of traffic. Then, I run the application, without having to write any code.

From the Kinesis Data Analytics applications console, I choose Open Apache Flink dashboard to get more information about the execution of my application.

Apache Flink console screenshot.

Availability and Pricing
You can use Amazon Kinesis Data Analytics Studio today in all AWS Regions where Kinesis Data Analytics is generally available. For more information, see the AWS Regional Services List.

In Kinesis Data Analytics Studio, we run the open-source versions of Apache Zeppelin and Apache Flink, and we contribute changes upstream. For example, we have contributed bug fixes for Apache Zeppelin, and we have contributed to AWS connectors for Apache Flink, such as those for Kinesis Data Streams and Kinesis Data Firehose. Also, we are working with the Apache Flink community to contribute availability improvements, including automatic classification of errors at runtime to understand whether errors are in user code or in application infrastructure.

With Kinesis Data Analytics Studio, you pay based on the average number of Kinesis Processing Units (KPU) per hour, including those used by your running notebooks. One KPU comprises 1 vCPU of compute, 4 GB of memory, and associated networking. You also pay for running application storage and durable application storage. For more information, see the Kinesis Data Analytics pricing page.

Start using Kinesis Data Analytics Studio today to get better insights from your streaming data.
