Tag Archives: Amazon Kinesis

Building event-driven architectures with IoT sensor data

Post Syndicated from Raghavarao Sodabathina original https://aws.amazon.com/blogs/architecture/building-event-driven-architectures-with-iot-sensor-data/

The Internet of Things (IoT) brings sensors, cloud computing, analytics, and people together to improve productivity and efficiency. It empowers customers with the intelligence they need to build new services and business models, improve products and services over time, understand their customers’ needs to provide better services, and improve customer experiences. Business operations become more efficient by making intelligent decisions more quickly and over time develop a data-driven discipline leading to revenue growth and greater operational efficiency.

In this post, we showcase how to build an event-driven architecture by using AWS IoT services and AWS purpose-built data services. We also discuss key considerations and best practices while building event-driven application architectures with IoT sensor data.

Deriving insights from IoT sensor data

Organizations create value by making decisions from their IoT sensor data in near real time. Some common use cases and solutions that fit under event-driven architecture using IoT sensor data include:

  • Medical device data collection for personalized patient health monitoring, adverse event prediction, and avoidance.
  • Industrial IoT use cases to monitor equipment quality and determine actions like adjusting machine settings, using different sources of raw materials, or performing additional worker training to improve the quality of the factory output.
  • Connected vehicle use cases, such as voice interaction, navigation, location-based services, remote vehicle diagnostics, predictive maintenance, media streaming, and vehicle safety, that are based on in-vehicle computing and near real-time predictive analytics in the cloud.
  • Sustainability and waste reduction solutions, which provide access to dashboards, monitoring systems, data collection, and summarization tools that use machine learning (ML) algorithms to meet sustainability goals. Meeting sustainability goals is paramount for customers in the travel and hospitality industries.

Event-driven reference architecture with IoT sensor data

Figure 1 illustrates how to architect an event-driven architecture with IoT sensor data for near real-time predictive analytics and recommendations.

Building event-driven architecture with IoT sensor data

Figure 1. Building event-driven architecture with IoT sensor data

Architecture flow:

  1. Data originates in IoT devices such as medical devices, car sensors, industrial IoT sensors.This telemetry data is collected using AWS IoT Greengrass, an open-source IoT edge runtime and cloud service that helps your devices collect and analyze data closer to where the data is generated.When an event arrives, AWS IoT Greengrass reacts autonomously to local events, filters and aggregates device data, then communicates securely with the cloud and other local devices in your network to send the data.
  2. Event data is ingested into the cloud using edge-to-cloud interface services such as AWS IoT Core, a managed cloud platform that connects, manages, and scales devices easily and securely.AWS IoT Core interacts with cloud applications and other devices. You can also use AWS IoT SiteWise, a managed service that helps you collect, model, analyze, and visualize data from industrial equipment at scale.
  3. AWS IoT Core can directly stream ingested data into Amazon Kinesis Data Streams. The ingested data gets transformed and analyzed in near real time using Amazon Kinesis Data Analytics with Apache Flink and Apache Beam frameworks.Stream data can further be enriched using lookup data hosted in a data warehouse such as Amazon Redshift. Amazon Kinesis Data Analytics can persist SQL results to Amazon Redshift after the customer’s integration and stream aggregation (for example, one minute or five minutes).The results in Amazon Redshift can be used for further downstream business intelligence (BI) reporting services, such as Amazon QuickSight.
  4. Amazon Kinesis Data Analytics can also write to an AWS Lambda function, which can invoke Amazon SageMaker models. Amazon SageMaker is a the most complete, end-to-end service for machine learning.
  5. Once the ML model is trained and deployed in SageMaker, inferences are invoked in a micro batch using AWS Lambda. Inferenced data is sent to Amazon OpenSearch Service to create personalized monitoring dashboards using Amazon OpenSearch Service dashboards.The transformed IoT sensor data can be stored in Amazon DynamoDB. Customers can use AWS AppSync to provide near real-time data queries to API services for downstream applications. These enterprise applications can be mobile apps or business applications to track and monitor the IoT sensor data in near real-time.Amazon Kinesis Data Analytics can write to an Amazon Kinesis Data Firehose stream, which is a fully managed service for delivering near real-time streaming data to destinations like Amazon Simple Storage Service (Amazon S3), Amazon Redshift, Amazon OpenSearch Service, Splunk, and any custom HTTP endpoints or endpoints owned by supported third-party service providers, including Datadog, Dynatrace, LogicMonitor, MongoDB, New Relic, and Sumo Logic.

    In this example, data from Amazon Kinesis Data Analytics is written to Amazon Kinesis Data Firehose, which micro-batch streams data into an Amazon S3 data lake. The Amazon S3 data lake stores telemetry data for future batch analytics.

Key considerations and best practices

Keep the following best practices in mind:

  • Define the business value from IoT sensor data through interactive discovery sessions with various stakeholders within your organization.
  • Identify the type of IoT sensor data you want to collect and analyze for predictive analytics.
  • Choose the right tools for the job, depending upon your business use case and your data consumers. Please refer to step 5 earlier in this post, where different purpose-built data services were used based on user personas.
  • Consider the event-driven architecture as three key components: event producers, event routers, and event consumers. A producer publishes an event to the router, which filters and pushes the events to consumers. Producer and consumer services are decoupled, which allows them to be scaled, updated, and deployed independently.
  • In this architecture, IoT sensors are event producers. Amazon IoT Greengrass, Amazon IoT Core, Amazon Kinesis Data Streams, and Amazon Kinesis Data Analytics work together as the router from which multiple consumers can consume IoT sensor-generated data. These consumers include Amazon S3 data lakes for telemetry data analysis, Amazon OpenSearch Service for personalized dashboards, and Amazon DynamoDB or AWS AppSync for the downstream enterprise application’s consumption.


In this post, we demonstrated how to build an event-driven architecture with IoT sensor data using AWS IoT services and AWS purpose-built data services. You can now build your own event-driven applications using this post with your IoT sensor data and integrate with your business applications as needed.

Further reading

Learn more about Apache Flink and Amazon Kinesis Data Analytics with three new videos

Post Syndicated from Deepthi Mohan original https://aws.amazon.com/blogs/big-data/learn-more-about-apache-flink-and-amazon-kinesis-data-analytics-with-three-new-videos/

Amazon Kinesis Data Analytics is a fully managed service for Apache Flink that reduces the complexity of building, managing, and integrating Apache Flink applications with other AWS services. Apache Flink is an open-source framework and engine for stateful processing of data streams. It’s highly available and scalable, delivering high throughput and low latency for the most demanding stream-processing applications.

In this post, we highlight three new videos for you to learn more about Apache Flink and Kinesis Data Analytics, including open-source contributions to Apache Flink, our learnings from running thousands of Flink jobs on a managed service, and how we use Kinesis Data Analytics and Apache Flink to enable machine learning (ML) in Alexa.

In Introducing the new Async Sink, we present the new Async Sink framework, an open-source contribution to make it easier than ever to build sink connectors for Apache Flink. You can learn about the need for the Async Sink framework and how we built it, followed by a demo of building a new sink to Amazon CloudWatch to deliver CloudWatch metrics, in under 20 minutes! The Async Sink framework bootstraps development of Flink sinks, is compatible with Apache Flink 1.15 and above, and has already seen usage by the community beyond building new sinks to AWS services.

The video Practical learnings from running thousands of Flink jobs shares insight from running Kinesis Data Analytics, a managed service for Apache Flink that runs tens of thousands of Flink jobs. You can learn lessons based on our experience of operating Apache Flink at very large scale, touching on issues such as out-of-memory errors, timeouts, and stability challenges. The video also covers improving application performance with memory tuning and configuration changes and the approaches to automating job health monitoring and management of Flink jobs at scale.

“Alexa, be quiet!” End-to-end near-real time model building and evaluation in Amazon Alexa discusses how Alexa has built an automated end-to-end solution for incremental model building or fine-tuning ML models through continuous learning, continual learning, or semi-supervised active learning. Alexa uses Apache Flink to transform and discover metrics in real time. In this video, you learn about how Alexa scales infrastructure to meet the needs of ML teams across Alexa, and explore specific use cases that use Apache Flink and Kinesis Data Analytics to improve Alexa experiences to delight customers.

To learn more about Kinesis Data Analytics for Apache Flink, visit our product page.

About the author

Deepthi Mohan is a Principal Product Manager on the Kinesis Data Analytics team.

Enrich VPC Flow Logs with resource tags and deliver data to Amazon S3 using Amazon Kinesis Data Firehose

Post Syndicated from Chaitanya Shah original https://aws.amazon.com/blogs/big-data/enrich-vpc-flow-logs-with-resource-tags-and-deliver-data-to-amazon-s3-using-amazon-kinesis-data-firehose/

VPC Flow Logs is an AWS feature that captures information about the network traffic flows going to and from network interfaces in Amazon Virtual Private Cloud (Amazon VPC). Visibility to the network traffic flows of your application can help you troubleshoot connectivity issues, architect your application and network for improved performance, and improve security of your application.

Each VPC flow log record contains the source and destination IP address fields for the traffic flows. The records also contain the Amazon Elastic Compute Cloud (Amazon EC2) instance ID that generated the traffic flow, which makes it easier to identify the EC2 instance and its associated VPC, subnet, and Availability Zone from where the traffic originated. However, when you have a large number of EC2 instances running in your environment, it may not be obvious where the traffic is coming from or going to simply based on the EC2 instance IDs or IP addresses contained in the VPC flow log records.

By enriching flow log records with additional metadata such as resource tags associated with the source and destination resources, you can more easily understand and analyze traffic patterns in your environment. For example, customers often tag their resources with resource names and project names. By enriching flow log records with resource tags, you can easily query and view flow log records based on an EC2 instance name, or identify all traffic for a certain project.

In addition, you can add resource context and metadata about the destination resource such as the destination EC2 instance ID and its associated VPC, subnet, and Availability Zone based on the destination IP in the flow logs. This way, you can easily query your flow logs to identify traffic crossing Availability Zones or VPCs.

In this post, you will learn how to enrich flow logs with tags associated with resources from VPC flow logs in a completely serverless model using Amazon Kinesis Data Firehose and the recently launched Amazon VPC IP Address Manager (IPAM), and also analyze and visualize the flow logs using Amazon Athena and Amazon QuickSight.

Solution overview

In this solution, you enable VPC flow logs and stream them to Kinesis Data Firehose. This solution enriches log records using an AWS Lambda function on Kinesis Data Firehose in a completely serverless manner. The Lambda function fetches resource tags for the instance ID. It also looks up the destination resource from the destination IP using the Amazon EC2 API and IPAM, and adds the associated VPC network context and metadata for the destination resource. It then stores the enriched log records in an Amazon Simple Storage Service (Amazon S3) bucket. After you have enriched your flow logs, you can query, view, and analyze them in a wide variety of services, such as AWS Glue, Athena, QuickSight, Amazon OpenSearch Service, as well as solutions from the AWS Partner Network such as Splunk and Datadog.

The following diagram illustrates the solution architecture.


The workflow contains the following steps:

  1. Amazon VPC sends the VPC flow logs to the Kinesis Data Firehose delivery stream.
  2. The delivery stream uses a Lambda function to fetch resource tags for instance IDs from the flow log record and add it to the record. You can also fetch tags for the source and destination IP address and enrich the flow log record.
  3. When the Lambda function finishes processing all the records from the Kinesis Data Firehose buffer with enriched information like resource tags, Kinesis Data Firehose stores the result file in the destination S3 bucket. Any failed records that Kinesis Data Firehose couldn’t process are stored in the destination S3 bucket under the prefix you specify during delivery stream setup.
  4. All the logs for the delivery stream and Lambda function are stored in Amazon CloudWatch log groups.


As a prerequisite, you need to create the target S3 bucket before creating the Kinesis Data Firehose delivery stream.

If using a Windows computer, you need PowerShell; if using a Mac, you need Terminal to run AWS Command Line Interface (AWS CLI) commands. To install the latest version of the AWS CLI, refer to Installing or updating the latest version of the AWS CLI.

Create a Lambda function

You can download the Lambda function code from the GitHub repo used in this solution. The example in this post assumes you are enabling all the available fields in the VPC flow logs. You can use it as is or customize per your needs. For example, if you intend to use the default fields when enabling the VPC flow logs, you need to modify the Lambda function with the respective fields. Creating this function creates an AWS Identity and Access Management (IAM) Lambda execution role.

To create your Lambda function, complete the following steps:

  1. On the Lambda console, choose Functions in the navigation pane.
  2. Choose Create function.
  3. Select Author from scratch.
  4. For Function name, enter a name.
  5. For Runtime, choose Python 3.8.
  6. For Architecture, select x86_64.
  7. For Execution role, select Create a new role with basic Lambda permissions.
  8. Choose Create function.

Create Lambda Function

You can then see code source page, as shown in the following screenshot, with the default code in the lambda_function.py file.

  1. Delete the default code and enter the code from the GitHub Lambda function aws-vpc-flowlogs-enricher.py.
  2. Choose Deploy.

VPC Flow Logs Enricher function

To enrich the flow logs with additional tag information, you need to create an additional IAM policy to give Lambda permission to describe tags on resources from the VPC flow logs.

  1. On the IAM console, choose Policies in the navigation pane.
  2. Choose Create policy.
  3. On the JSON tab, enter the JSON code as shown in the following screenshot.

This policy gives the Lambda function permission to retrieve tags for the source and destination IP and retrieve the VPC ID, subnet ID, and other relevant metadata for the destination IP from your VPC flow log record.

  1. Choose Next: Tags.


  1. Add any tags and choose Next: Review.

  1. For Name, enter vpcfl-describe-tag-policy.
  2. For Description, enter a description.
  3. Choose Create policy.

Create IAM Policy

  1. Navigate to the previously created Lambda function and choose Permissions in the navigation pane.
  2. Choose the role that was created by Lambda function.

A page opens in a new tab.

  1. On the Add permissions menu, choose Attach policies.

Add Permissions

  1. Search for the vpcfl-describe-tag-policy you just created.
  2. Select the vpcfl-describe-tag-policy and choose Attach policies.

Create the Kinesis Data Firehose delivery stream

To create your delivery stream, complete the following steps:

  1. On the Kinesis Data Firehose console, choose Create delivery stream.
  2. For Source, choose Direct PUT.
  3. For Destination, choose Amazon S3.

Kinesis Firehose Stream Source and Destination

After you choose Amazon S3 for Destination, the Transform and convert records section appears.

  1. For Data transformation, select Enable.
  2. Browse and choose the Lambda function you created earlier.
  3. You can customize the buffer size as needed.

This impacts on how many records the delivery stream will buffer before it flushes it to Amazon S3.

  1. You can also customize the buffer interval as needed.

This impacts how long (in seconds) the delivery stream will buffer the incoming records from the VPC.

  1. Optionally, you can enable Record format conversion.

If you want to query from Athena, it’s recommended to convert it to Apache Parquet or ORC and compress the files with available compression algorithms, such as gzip and snappy. For more performance tips, refer to Top 10 Performance Tuning Tips for Amazon Athena. In this post, record format conversion is disabled.

Transform and Conver records

  1. For S3 bucket, choose Browse and choose the S3 bucket you created as a prerequisite to store the flow logs.
  2. Optionally, you can specify the S3 bucket prefix. The following expression creates a Hive-style partition for year, month, and day:


  1. Optionally, you can enable dynamic partitioning.

Dynamic partitioning enables you to create targeted datasets by partitioning streaming S3 data based on partitioning keys. The right partitioning can help you to save costs related to the amount of data that is scanned by analytics services like Athena. For more information, see Kinesis Data Firehose now supports dynamic partitioning to Amazon S3.

Note that you can enable dynamic partitioning only when you create a new delivery stream. You can’t enable dynamic partitioning for an existing delivery stream.

Destination Settings

  1. Expand Buffer hints, compression and encryption.
  2. Set the buffer size to 128 and buffer interval to 900 for best performance.
  3. For Compression for data records, select GZIP.

S3 Buffer settings

Create a VPC flow log subscription

Now you create a VPC flow log subscription for the Kinesis Data Firehose delivery stream you created.

Navigate to AWS CloudShell or Terminal/PowerShell for a Mac or Windows computer and run the following AWS CLI command to enable the subscription. Provide your VPC ID for the parameter --resource-ids and delivery stream ARN for the parameter --log-destination.

aws ec2 create-flow-logs \ 
--resource-type VPC \ 
--resource-ids vpc-0000012345f123400d \ 
--traffic-type ALL \ 
--log-destination-type kinesis-data-firehose \ 
--log-destination arn:aws:firehose:us-east-1:123456789101:deliverystream/PUT-Kinesis-Demo-Stream \ 
--max-aggregation-interval 60 \ 
--log-format '${account-id} ${action} ${az-id} ${bytes} ${dstaddr} ${dstport} ${end} ${flow-direction} ${instance-id} ${interface-id} ${log-status} ${packets} ${pkt-dst-aws-service} ${pkt-dstaddr} ${pkt-src-aws-service} ${pkt-srcaddr} ${protocol} ${region} ${srcaddr} ${srcport} ${start} ${sublocation-id} ${sublocation-type} ${subnet-id} ${tcp-flags} ${traffic-path} ${type} ${version} ${vpc-id}'

If you’re running CloudShell for the first time, it will take a few seconds to prepare the environment to run.

After you successfully enable the subscription for your VPC flow logs, it takes a few minutes depending on the intervals mentioned in the setup to create the log record files in the destination S3 folder.

To view those files, navigate to the Amazon S3 console and choose the bucket storing the flow logs. You should see the compressed interval logs, as shown in the following screenshot.

S3 destination bucket

You can download any file from the destination S3 bucket on your computer. Then extract the gzip file and view it in your favorite text editor.

The following is a sample enriched flow log record, with the new fields in bold providing added context and metadata of the source and destination IP addresses:

{'account-id': '123456789101',
 'action': 'ACCEPT',
 'az-id': 'use1-az2',
 'bytes': '7251',
 'dstaddr': '',
 'dstport': '52942',
 'end': '1661285182',
 'flow-direction': 'ingress',
 'instance-id': 'i-123456789',
 'interface-id': 'eni-0123a456b789d',
 'log-status': 'OK',
 'packets': '25',
 'pkt-dst-aws-service': '-',
 'pkt-dstaddr': '',
 'pkt-src-aws-service': 'AMAZON',
 'pkt-srcaddr': '',
 'protocol': '6',
 'region': 'us-east-1',
 'srcaddr': '',
 'srcport': '443',
 'start': '1661285124',
 'sublocation-id': '-',
 'sublocation-type': '-',
 'subnet-id': 'subnet-01eb23eb4fe5c6bd7',
 'tcp-flags': '19',
 'traffic-path': '-',
 'type': 'IPv4',
 'version': '5',
 'vpc-id': 'vpc-0123a456b789d',
 'src-tag-Name': 'test-traffic-ec2-1', 'src-tag-project': ‘Log Analytics’, 'src-tag-team': 'Engineering', 'dst-tag-Name': 'test-traffic-ec2-1', 'dst-tag-project': ‘Log Analytics’, 'dst-tag-team': 'Engineering', 'dst-vpc-id': 'vpc-0bf974690f763100d', 'dst-az-id': 'us-east-1a', 'dst-subnet-id': 'subnet-01eb23eb4fe5c6bd7', 'dst-interface-id': 'eni-01eb23eb4fe5c6bd7', 'dst-instance-id': 'i-06be6f86af0353293'}

Create an Athena database and AWS Glue crawler

Now that you have enriched the VPC flow logs and stored them in Amazon S3, the next step is to create the Athena database and table to query the data. You first create an AWS Glue crawler to infer the schema from the log files in Amazon S3.

  1. On the AWS Glue console, choose Crawlers in the navigation pane.
  2. Choose Create crawler.

Glue Crawler

  1. For Name¸ enter a name for the crawler.
  2. For Description, enter an optional description.
  3. Choose Next.

Glue Crawler properties

  1. Choose Add a data source.
  2. For Data source¸ choose S3.
  3. For S3 path, provide the path of the flow logs bucket.
  4. Select Crawl all sub-folders.
  5. Choose Add an S3 data source.

Add Data source

  1. Choose Next.

Data source classifiers

  1. Choose Create new IAM role.
  2. Enter a role name.
  3. Choose Next.

Configure security settings

  1. Choose Add database.
  2. For Name, enter a database name.
  3. For Description, enter an optional description.
  4. Choose Create database.

Create Database

  1. On the previous tab for the AWS Glue crawler setup, for Target database, choose the newly created database.
  2. Choose Next.

Set output and scheduling

  1. Review the configuration and choose Create crawler.

Create crawler

  1. On the Crawlers page, select the crawler you created and choose Run.

Run crawler

You can rerun this crawler when new tags are added to your AWS resources, so that they’re available for you to query from the Athena database.

Run Athena queries

Now you’re ready to query the enriched VPC flow logs from Athena.

  1. On the Athena console, open the query editor.
  2. For Database, choose the database you created.
  3. Enter the query as shown in the following screenshot and choose Run.

Athena query

The following code shows some of the sample queries you can run:

Select * from awslogs where "dst-az-id"='us-east-1a'
Select * from awslogs where "src-tag-project"='Log Analytics' or "dst-tag-team"='Engineering' 
Select "srcaddr", "srcport", "dstaddr", "dstport", "region", "az-id", "dst-az-id", "flow-direction" from awslogs where "az-id"='use1-az2' and "dst-az-id"='us-east-1a'

The following screenshot shows an example query result of the source Availability Zone to the destination Availability Zone traffic.

Athena query result

You can also visualize various charts for the flow logs stored in the S3 bucket via QuickSight. For more information, refer to Analyzing VPC Flow Logs using Amazon Athena, and Amazon QuickSight.


For pricing details, refer to Amazon Kinesis Data Firehose pricing.

Clean up

To clean up your resources, complete the following steps:

  1. Delete the Kinesis Data Firehose delivery stream and associated IAM role and policies.
  2. Delete the target S3 bucket.
  3. Delete the VPC flow log subscription.
  4. Delete the Lambda function and associated IAM role and policy.


This post provided a complete serverless solution architecture for enriching VPC flow log records with additional information like resource tags using a Kinesis Data Firehose delivery stream and Lambda function to process logs to enrich with metadata and store in a target S3 file. This solution can help you query, analyze, and visualize VPC flow logs with relevant application metadata because resource tags have been assigned to resources that are available in the logs. This meaningful information associated with each log record wherever the tags are available makes it easy to associate log information to your application.

We encourage you to follow the steps provided in this post to create a delivery stream, integrate with your VPC flow logs, and create a Lambda function to enrich the flow log records with additional metadata to more easily understand and analyze traffic patterns in your environment.

About the Authors

Chaitanya Shah is a Sr. Technical Account Manager with AWS, based out of New York. He has over 22 years of experience working with enterprise customers. He loves to code and actively contributes to AWS solutions labs to help customers solve complex problems. He provides guidance to AWS customers on best practices for their AWS Cloud migrations. He is also specialized in AWS data transfer and in the data and analytics domain.

Vaibhav Katkade is a Senior Product Manager in the Amazon VPC team. He is interested in areas of network security and cloud networking operations. Outside of work, he enjoys cooking and the outdoors.

Ingest VPC flow logs into Splunk using Amazon Kinesis Data Firehose

Post Syndicated from Ranjit Kalidasan original https://aws.amazon.com/blogs/big-data/ingest-vpc-flow-logs-into-splunk-using-amazon-kinesis-data-firehose/

In September 2017, during the annual Splunk.conf, Splunk and AWS jointly announced Amazon Kinesis Data Firehose integration to support Splunk Enterprise and Splunk Cloud as a delivery destination. This native integration between Splunk Enterprise, Splunk Cloud, and Kinesis Data Firehose is designed to make AWS data ingestion setup seamless, while offering a secure and fault-tolerant delivery mechanism. We want to enable you to monitor and analyze machine data from any source and use it to deliver operational intelligence and optimize IT, security, and business performance.

With Kinesis Data Firehose, you can use a fully managed, reliable, and scalable data streaming solution to Splunk. In September 2022, AWS announced a new Amazon Virtual Private Cloud (Amazon VPC) feature that enables you to create VPC flow logs to send the flow log data directly into Kinesis Data Firehose as a destination. Previously, you could send VPC flow logs to either Amazon CloudWatch Logs or Amazon Simple Storage Service (Amazon S3) before it was ingested by other AWS or Partner tools. In this post, we show you how to use this feature to set up VPC flow logs for ingesting into Splunk using Kinesis Data Firehose.

Overview of solution

We deploy the following architecture to ingest data into Splunk.

We create a VPC flow log in an existing VPC to send the flow log data to a Kinesis Data Firehose delivery stream. This delivery stream has an AWS Lambda function enabled for data transformation and has destination settings to point to the Splunk endpoint along with an HTTP Event Collector (HEC) token.


Before you begin, ensure that you have the following prerequisites:

  • AWS account – If you don’t have an AWS account, you can create one. For more information, see Setting Up for Amazon Kinesis Data Firehose.
  • Splunk AWS Add-on – Ensure you install the Splunk AWS Add-on app from Splunkbase in your Splunk deployment. This app provides the required source types and event types mapping to AWS machine data.
  • HEC token – In your Splunk deployment, set up an HEC token with the source type aws:cloudwatchlogs:vpcflow.

Create the transformation Lambda function

Integrating VPC flow logs with Kinesis Data Firehose requires a Lambda function to transform the flow log records. The data that VPC flow logs sends to the delivery stream is encoded as JSON records. However, Splunk expects this as raw flow log data. Therefore, when you create the delivery stream, you enable data transformation and configure a Lambda function to transform the flow log data to raw format. Kinesis Data Firehose then sends the data in raw format to Splunk.

You can deploy this transformation Lambda function as a serverless application from the Lambda serverless app repository on the Lambda console. The name of this application is splunk-firehose-flowlogs-processor.

After it’s deployed, you can see a Lambda function and an AWS Identity and Access Management (IAM) role getting deployed on the console. Note the physical ID of the Lambda function; you use this when you create the Firehose delivery stream in the next step.

Create a Kinesis Data Firehose delivery stream

In this step, you create a Kinesis Data Firehose delivery stream to receive the VPC flow log data and deliver that data to Splunk.

  1. On the Kinesis Data Firehose console, create a new delivery stream.
  2. For Source, choose Direct PUT.
  3. For Destination, choose Splunk.
  4. For Delivery stream name, enter a name (for example, VPCtoSplunkStream).
  5. In the Transform records section, for Data transformation, select Enabled.
  6. For AWS Lambda function, choose Browse.
  7. Select the function you created earlier by looking for the physical ID.
  8. Choose Choose.
  9. In the Destination settings section, for Splunk cluster endpoint, enter your endpoint.If you’re using a Splunk Cloud endpoint, refer to Configure Amazon Kinesis Firehose to send data to the Splunk platform for different Splunk cluster endpoint values.
  10. For Splunk endpoint type, select Raw endpoint.
  11. For Authentication token, enter the value of your Splunk HEC that you created as a prerequisite.
  12. In the Backup settings section, for Source record backup in Amazon S3, select Failed events only so you only save the data that fails to be ingested into Splunk.
  13. For S3 backup bucket, enter the path to an S3 bucket.
  14. Complete creating your delivery stream.

The creation process may take a few minutes to complete.

Create a VPC flow log

In this final step, you create a VPC flow log with Kinesis Data Firehose as destination type.

  1. On the Amazon VPC console, choose Your VPCs.
  2. Select the VPC for which to create the flow log.
  3. On the Actions menu, choose Create flow log.
  4. Provide the required settings for Filter:
    1. If you want to filter the flow logs, select Accept traffic or Reject traffic.
    2. Select All if you need all the information sent to Splunk.
  5. For Maximum aggregation interval, select a suitable interval for your use case.Select the minimum setting of 1 minute interval if you need the flow log data to be available for near-real-time analysis in Splunk.
  6. For Destination, select Send to Kinesis Firehose in the same account if the delivery stream is set up on the same account where you create the VPC flow logs.If you want to send the data to a different account, refer to Publish flow logs to Kinesis Data Firehose.
  7. For Log record format, if you leave it at AWS default format, the flow logs are sent as version 2 format. Alternatively, you can specify which fields you need to be captured and sent to Splunk.For more information on log format and available fields, refer to Flow log records.
  8. Review all the parameters and create the flow log.Within a few minutes, you should be able to see the data in Splunk.
  9. Open your Splunk console and navigate to the Search tab of the Search & Reporting app.
  10. Run the following SPL query to look at sample VPC flow log records:
    index=<index name> sourcetype="aws:cloudwatchlogs:vpcflow"

Clean up

To avoid incurring future charges, delete the resources you created in the following order:

  1. Delete the VPC flow log.
  2. Delete the Kinesis Data Firehose delivery stream.
  3. Delete the serverless application to delete the transformation Lambda function.
  4. If you created a new VPC and new resources in the VPC, then delete the resources and VPC.


You can use VPC flow log data in multiple Splunk solutions, like the Splunk App for AWS Security Dashboards for traffic analysis or Splunk Security Essentials, which uses the data to provide deeper insights into the security posture of your AWS environment. Using Kinesis Data Firehose to send VPC flow log data into Splunk provides many benefits. This managed service can automatically scale to meet the data demand and provide near-real-time data analysis. Try out this new quick and hassle-free way of sending your VPC flow logs to Splunk Enterprise or Splunk Cloud Platform using Kinesis Data Firehose.

You can deploy this solution today on your AWS account by following the Kinesis Data Firehose Immersion Day Lab for Splunk

About the authors

Ranjit Kalidasan is a Senior Solutions Architect with Amazon Web Services based in Boston, Massachusetts. He is Partner Solutions Architect helping security ISV partners to co-build and co-market solutions with AWS. He brings over 20 years of experience in Information technology helping global customers implement complex solutions for Security & Analytics. You can connect with Ranjit in Linkedin.

How Launchmetrics improves fashion brands performance using Amazon EC2 Spot Instances

Post Syndicated from Ivo Pinto original https://aws.amazon.com/blogs/architecture/how-launchmetrics-improves-fashion-brands-performance-using-amazon-ec2-spot-instances/

Launchmetrics offers its Brand Performance Cloud tools and intelligence to help fashion, luxury, and beauty retail executives optimize their global strategy. Launchmetrics initially operated their whole infrastructure on-premises; however, they wanted to scale their data ingestion while simultaneously providing improved and faster insights for their clients. These business needs led them to build their architecture in AWS cloud.

In this blog post, we explain how Launchmetrics’ uses Amazon Web Services (AWS) to crawl the web for online social and print media. Using the data gathered, Launchmetrics is able to provide prescriptive analytics and insights to their clients. As a result, clients can understand their brand’s momentum and interact with their audience, successfully launching their products.

Architecture overview

Launchmetrics’ platform architecture is represented in Figure 1 and composed of three tiers:

  1. Crawl
  2. Data Persistence
  3. Processing
Launchmetrics backend architecture

Figure 1. Launchmetrics backend architecture

The Crawl tier is composed of several Amazon Elastic Compute Cloud (Amazon EC2) Spot Instances launched via Auto Scaling groups. Spot Instances take advantage of unused Amazon EC2 capacity at a discounted rate compared with On-Demand Instances, which are compute instances that are billed per-hour or -second with no long-term commitments. Launchmetrics heavily leverages Spot Instances. The Crawl tier is responsible for retrieving, processing, and storing data from several media sources (represented in Figure 1 with the number 1).

The Data Persistence tier consists of two components: Amazon Kinesis Data Streams and Amazon Simple Queue Service (Amazon SQS). Kinesis Data Streams stores data that the Crawl tier collects, while Amazon SQS stores the metadata of the whole process. In this context, metadata helps Launchmetrics gain insight into when the data is collected and if it has started processing. This is key information if a Spot Instance is interrupted, which we will dive deeper into later.

The third tier, Processing, also makes use of Spot Instances and is responsible for pulling data from the Data Persistence tier (represented in Figure 1 with the number 2). It then applies proprietary algorithms, both analytics and machine learning models, to create consumer insights. These insights are stored in a data layer (not depicted) that consists of an Amazon Aurora cluster and an Amazon OpenSearch Service cluster.

By having this separation of tiers, Launchmetrics is able to use a decoupled architecture, where each component can scale independently and is more reliable. Both the Crawl and the Data Processing tiers use Spot Instances for up to 90% of their capacity.

Data processing using EC2 Spot Instances

When Launchmetrics decided to migrate their workloads to the AWS cloud, Spot Instances were one of the main drivers. As Spot Instances offer large discounts without commitment, Launchmetrics was able to track more than 1200 brands, translating to 1+ billion end users. Daily, this represents tracking upwards of 500k influencer profiles, 8 million documents, and around 70 million social media comments.

Aside from the cost-savings with Spot Instances, Launchmetrics incurred collateral benefits in terms of architecture design: building stateless, decoupled, elastic, and fault-tolerant applications. In turn, their stack architecture became more loosely coupled, as well.

All Launchmetrics Auto Scaling groups have the following configuration:

  • Spot allocation strategy: cost-optimized
  • Capacity rebalance: true
  • Three availability zones
  • A diversified list of instance types

By using Auto Scaling groups, Launchmetrics is able to scale worker instances depending on how many items they have in the SQS queue, increasing the instance efficiency. Data processing workloads like the ones Launchmetrics’ platform have, are an exemplary use of multiple instance types, such as M5, M5a, C5, and C5a. When adopting Spot Instances, Launchmetrics considered other instance types to have access to spare capacity. As a result, Launchmetrics found out that workload’s performance improved, as they use instances with more resources at a lower cost.

By decoupling their data processing workload using SQS queues, processes are stopped when an interruption arrives. As the Auto Scaling group launches a replacement Spot Instance, clients are not impacted and data is not lost. All processes go through a data checkpoint, where a new Spot Instance resumes processing any pending data. Spot Instances have resulted in a reduction of up to 75% of related operational costs.

To increase confidence in their ability to deal with Spot interruptions and service disruptions, Launchmetrics is exploring using AWS Fault Injection Simulator to simulate faults on their architecture, like a Spot interruption. Learn more about how this service works on the AWS Fault Injection Simulator now supports Spot Interruptions launch page.

Reporting data insights

After processing data from different media sources, AWS aided Launchmetrics in producing higher quality data insights, faster: the previous on-premises architecture had a time range of 5-6 minutes to run, whereas the AWS-driven architecture takes less than 1 minute.

This is made possible by elasticity and availability compute capacity that Amazon EC2 provides compared with an on-premises static fleet. Furthermore, offloading some management and operational tasks to AWS by using AWS managed services, such as Amazon Aurora or Amazon OpenSearch Service, Launchmetrics can focus on their core business and improve proprietary solutions rather than use that time in undifferentiated activities.

Building continuous delivery pipelines

Let’s discuss how Launchmetrics makes changes to their software with so many components.

Both of their computing tiers, Crawl and Processing, consist of standalone EC2 instances launched via Auto Scaling groups and EC2 instances that are part of an Amazon Elastic Container Service (Amazon ECS) cluster. Currently, 70% of Launchmetrics workloads are still running with Auto Scaling groups, while 30% are containerized and run on Amazon ECS. This is important because for each of these workload groups, the deployment process is different.

For workloads that run on Auto Scaling groups, they use an AWS CodePipeline to orchestrate the whole process, which includes:

I.  Creating a new Amazon Machine Image (AMI) using AWS CodeBuild
II. Deploying the newly built AMI using Terraform in CodeBuild

For containerized workloads that run on Amazon ECS, Launchmetrics also uses a CodePipeline to orchestrate the process by:

III. Creating a new container image, and storing it in Amazon Elastic Container Registry
IV. Changing the container image in the task definition, and updating the Amazon ECS service using CodeBuild


In this blog post, we explored how Launchmetrics is using EC2 Spot Instances to reduce costs while producing high-quality data insights for their clients. We also demonstrated how decoupling an architecture is important for handling interruptions and why following Spot Instance best practices can grant access to more spare capacity.

Using this architecture, Launchmetrics produced faster, data-driven insights for their clients and increased their capacity to innovate. They are continuing to containerize their applications and are projected to have 100% of their workloads running on Amazon ECS with Spot Instances by the end of 2023.

To learn more about handling EC2 Spot Instance interruptions, visit the AWS Best practices for handling EC2 Spot Instance interruptions blog post. Likewise, if you are interested in learning more about AWS Fault Injection Simulator and how it can benefit your architecture, read Increase your e-commerce website reliability using chaos engineering and AWS Fault Injection Simulator.

Analyze logs with Dynatrace Davis AI Engine using Amazon Kinesis Data Firehose HTTP endpoint delivery

Post Syndicated from Erick Leon original https://aws.amazon.com/blogs/big-data/analyze-logs-with-dynatrace-davis-ai-engine-using-amazon-kinesis-data-firehose-http-endpoint-delivery/

This blog post is co-authored with Erick Leon, Sr. Technical Alliance Manager from Dynatrace.

Amazon Kinesis Data Firehose is the easiest way to reliably load streaming data into data lakes, data stores, and analytics services. With just a few clicks, you can create fully-managed delivery streams that auto scale on demand to match the throughput of your data. Customers already use Kinesis Data Firehose to ingest raw data from various data sources, including logs from AWS services. Kinesis Data Firehose now supports delivering streaming data to Dynatrace. Dynatrace begins analyzing incoming data within minutes of Amazon CloudWatch data generation.

Starting today, you can use Kinesis Data Firehose to send CloudWatch Metrics and Logs directly to the Dynatrace observability platform to perform your explorations and analysis. Dynatrace, an AWS Partner Network (APN) has provided full observability into AWS Services by ingesting CloudWatch metrics that are published by AWS services. Dynatrace ingests this data to perform root-cause analysis using the Dynatrace Davis AI engine.

In this post, we describe the Kinesis Data Firehose and related Dynatrace integration.


For this walkthrough, you should have the following prerequisites:

  • AWS account.
  • Access to the CloudWatch and Kinesis Data Firehose with permissions to manage HTTP endpoints.
  • Dynatrace Intelligent Observability Platform account, or get a free 15 day trial here.
  • Dynatrace version 1.182+.
  • An updated AWS monitoring policy to include the additional AWS services.
    To update the AWS Identity and Access Management (IAM) policy, use the JSON in the link above, containing the monitoring policy (permissions) for all supporting services.
  • Dynatrace API token: create token with the following permission and keep readily available in a notepad.
Dynatrace API Token

Figure 1 – Dynatrace API Token

How it works

Amazon Kinesis Data Firehose HTTP endpoint delivery

Figure 2 – Amazon Kinesis Data Firehose HTTP endpoint delivery

Simply create a log stream for your Amazon services to deliver your context rich logs to the Amazon CloudWatch Logs service. Next, select your Dynatrace HTTP endpoint to enhance your logs streams with the power of the Dynatrace Intelligence Platform. Finally, you can also back up your logs to an Amazon Simple Storage Service (Amazon S3) bucket.

Setup instructions

To add a service to monitoring, follow these steps:

  1. In the Dynatrace menu, go to Settings > Cloud and virtualization, and select AWS.
  2. On the AWS overview page, scroll down and select the desired AWS instance. Select the Edit button.
  3. Scroll down and select Add service. Choose the service name from the drop-down, and select Add service.
  4. Select Save changes.

To process and deliver AWS CloudWatch Metrics to Dynatrace, follow these steps.

  1. Log in to the AWS console and type “Kinesis” in the text search bar. Select Kinesis
AWS Console

Figure 3 – AWS Console

  1. On the Amazon Kinesis services page, select the radio button for Kinesis Data Firehose and select the Create delivery stream button.
Amazon Kinesis

Figure 4 – Amazon Kinesis

  1. Choose the “Direct PUT” from the drop down, and from Destination drop down, choose “Dynatrace”.
Amazon Kinesis Data Firehose

Figure 5 – Amazon Kinesis Data Firehose

  1. Delivery stream name – Give your stream a new name, for example: – “KFH-StreamToDynatrace”

Figure 6 – Delivery stream name

  1. In the section “Destination settings”:
Destination settings

Figure 7 – Destination settings

  1. HTTP endpoint name – “Dynatrace”.
  2. HTTP endpoint URL – From the drop down, select “Dynatrace – US”.
  3. API token – Enter Dynatrace API TOKEN created in the prerequisite section.
  4. API URL – enter the Dynatrace URL for your tenant, for example: https://xxxxx.live.dynatrace.com
  5. Back Up Settings – Either select an existing S3 bucket or create a new one and add details and select the Create delivery stream button.
Backup settings

Figure 8 – Backup settings

Once successful, your AWS Console will look like the following:

Amazon Kinesis Data Firehose

Figure 9 – Amazon Kinesis Data Firehose

The Dynatrace Experience

Once the initial setups are completed in both Dynatrace and the AWS Console, follow these steps to visualize your new KHF stream data in the Dynatrace console.

  1. Log in to the Dynatrace Console, and on the left side menu expand the “infrastructure” section, and select “AWS
  2. From the screen, select the AWS account that you want to add the KFH stream to.
  3. Next, you’ll see a virtualization of your AWS assets for the account selected. Select the box marked “Supporting Services”.
  4. Next, press the “Configure services” button.
  5. Next, select “Add service”.
  6. From the drop down, select “Kinesis Data Firehose”.
  7. Next, select the “Add metric” button, and select the metrics that you want to see for this stream. Dynatrace has a comprehensive list of metrics that can be selected from the UI. The list can be found in this link.


  1. After configuration, load to the new KFH stream no data in the Dynatrace Console.
    1. Check the Error Logs tab check to make sure that the Destination URL is correct for the Dynatrace Tenant.
Destination error logs

Figure 10 – Destination error logs

  1. Invalid or misconfigured Dynatrace API token or scope isn’t properly set.
Destination error logs

Figure 11 – Destination error logs


In this post, we demonstrate the Kinesis Data Firehose and related Dynatrace integration. In addition, engineers can use CloudWatch Metrics to explore their production systems alongside events in Dynatrace. This provides a seamless, current view of your system (from logs to events and traces) in a single data store.

To learn more about CloudWatch Service, see the Amazon CloudWatch home page. If you have any questions, post them on the AWS CloudWatch service forum.

If you haven’t yet signed up for Dynatrace, then you can try out Kinesis Data Firehose with Dynatrace with a free Dynatrace trial.

About the Authors

Erick Leon is a Technical Alliances Sr. Manager at Dynatrace, Observability Practice Architect, and Customer Advocate. He promotes strong technical integrations with a focus on AWS. With over 15 years as a Dynatrace customer, his real-world experiences and lessons learned bring valuable insights into the Dynatrace Intelligent Observability Platform.

Shashiraj Jeripotula (Raj) is a San Francisco-based Sr. Partner Solutions Architect at AWS. He works with various independent software vendors (ISVs), and partners who specialize in cloud management tools and DevOps to develop joint solutions and accelerate cloud adoption on AWS.

Stream change data to Amazon Kinesis Data Streams with AWS DMS

Post Syndicated from Sukhomoy Basak original https://aws.amazon.com/blogs/big-data/stream-change-data-to-amazon-kinesis-data-streams-with-aws-dms/

In this post, we discuss how to use AWS Database Migration Service (AWS DMS) native change data capture (CDC) capabilities to stream changes into Amazon Kinesis Data Streams.

AWS DMS is a cloud service that makes it easy to migrate relational databases, data warehouses, NoSQL databases, and other types of data stores. You can use AWS DMS to migrate your data into the AWS Cloud or between combinations of cloud and on-premises setups. AWS DMS also helps you replicate ongoing changes to keep sources and targets in sync.

CDC refers to the process of identifying and capturing changes made to data in a database and then delivering those changes in real time to a downstream system. Capturing every change from transactions in a source database and moving them to the target in real time keeps the systems synchronized, and helps with real-time analytics use cases and zero-downtime database migrations.

Kinesis Data Streams is a fully managed streaming data service. You can continuously add various types of data such as clickstreams, application logs, and social media to a Kinesis stream from hundreds of thousands of sources. Within seconds, the data will be available for your Kinesis applications to read and process from the stream.

AWS DMS can do both replication and migration. Kinesis Data Streams is most valuable in the replication use case because it lets you react to replicated data changes in other integrated AWS systems.

This post is an update to the post Use the AWS Database Migration Service to Stream Change Data to Amazon Kinesis Data Streams. This new post includes steps required to configure AWS DMS and Kinesis Data Streams for a CDC use case. With Kinesis Data Streams as a target for AWS DMS, we make it easier for you to stream, analyze, and store CDC data. AWS DMS uses best practices to automatically collect changes from a data store and stream them to Kinesis Data Streams.

With the addition of Kinesis Data Streams as a target, we’re helping customers build data lakes and perform real-time processing on change data from your data stores. You can use AWS DMS in your data integration pipelines to replicate data in near-real time directly into Kinesis Data Streams. With this approach, you can build a decoupled and eventually consistent view of your database without having to build applications on top of a database, which is expensive. You can refer to the AWS whitepaper AWS Cloud Data Ingestion Patterns and Practices for more details on data ingestion patters.

AWS DMS sources for real-time change data

The following diagram illustrates that AWS DMS can use many of the most popular database engines as a source for data replication to a Kinesis Data Streams target. The database source can be a self-managed engine running on an Amazon Elastic Compute Cloud (Amazon EC2) instance or an on-premises database, or it can be on Amazon Relational Database Service (Amazon RDS), Amazon Aurora, or Amazon DocumentDB (with MongoDB availability).

Kinesis Data Streams can collect, process, and store data streams at any scale in real time and write to AWS Glue, which is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning, and application development. You can use Amazon EMR for big data processing, Amazon Kinesis Data Analytics to process and analyze streaming data , Amazon Kinesis Data Firehose to run ETL (extract, transform, and load) jobs on streaming data, and AWS Lambda as a serverless compute for further processing, transformation, and delivery of data for consumption.

You can store the data in a data warehouse like Amazon Redshift, which is a cloud-scale data warehouse, and in an Amazon Simple Storage Service (Amazon S3) data lake for consumption. You can use Kinesis Data Firehose to capture the data streams and load the data into S3 buckets for further analytics.

Once the data is available in Kinesis Data Streams targets (as shown in the following diagram), you can visualize it using Amazon QuickSight; run ad hoc queries using Amazon Athena; access, process, and analyze it using an Amazon SageMaker notebook instance; and efficiently query and retrieve structured and semi-structured data from files in Amazon S3 without having to load the data into Amazon Redshift tables using Amazon Redshift Spectrum.

Solution overview

In this post, we describe how to use AWS DMS to load data from a database to Kinesis Data Streams in real time. We use a SQL Server database as example, but other databases like Oracle, Microsoft Azure SQL, PostgreSQL, MySQL, SAP ASE, MongoDB, Amazon DocumentDB, and IBM DB2 also support this configuration.

You can use AWS DMS to capture data changes on the database and then send this data to Kinesis Data Streams. After the streams are ingested in Kinesis Data Streams, they can be consumed by different services like Lambda, Kinesis Data Analytics, Kinesis Data Firehose, and custom consumers using the Kinesis Client Library (KCL) or the AWS SDK.

The following are some use cases that can use AWS DMS and Kinesis Data Streams:

  • Triggering real-time event-driven applications – This use case integrates Lambda and Amazon Simple Notification Service (Amazon SNS).
  • Simplifying and decoupling applications – For example, moving from monolith to microservices. This solution integrates Lambda and Amazon API Gateway.
  • Cache invalidation, and updating or rebuilding indexes – Integrates Amazon OpenSearch Service (successor to Amazon Elasticsearch Service) and Amazon DynamoDB.
  • Data integration across multiple heterogeneous systems – This solution sends data to DynamoDB or another data store.
  • Aggregating data and pushing it to downstream system – This solution uses Kinesis Data Analytics to analyze and integrate different sources and load the results in another data store.

To facilitate the understanding of the integration between AWS DMS, Kinesis Data Streams, and Kinesis Data Firehose, we have defined a business case that you can solve. In this use case, you are the data engineer of an energy company. This company uses Amazon Relational Database Service (Amazon RDS) to store their end customer information, billing information, and also electric meter and gas usage data. Amazon RDS is their core transaction data store.

You run a batch job weekly to collect all the transactional data and send it to the data lake for reporting, forecasting, and even sending billing information to customers. You also have a trigger-based system to send emails and SMS periodically to the customer about their electricity usage and monthly billing information.

Because the company has millions of customers, processing massive amounts of data every day and sending emails or SMS was slowing down the core transactional system. Additionally, running weekly batch jobs for analytics wasn’t giving accurate and latest results for the forecasting you want to do on customer gas and electricity usage. Initially, your team was considering rebuilding the entire platform and avoiding all those issues, but the core application is complex in design, and running in production for many years and rebuilding the entire platform will take years and cost millions.

So, you took a new approach. Instead of running batch jobs on the core transactional database, you started capturing data changes with AWS DMS and sending that data to Kinesis Data Streams. Then you use Lambda to listen to a particular data stream and generate emails or SMS using Amazon SNS to send to the customer (for example, sending monthly billing information or notifying when their electricity or gas usage is higher than normal). You also use Kinesis Data Firehose to send all transaction data to the data lake, so your company can run forecasting immediately and accurately.

The following diagram illustrates the architecture.

In the following steps, you configure your database to replicate changes to Kinesis Data Streams, using AWS DMS. Additionally, you configure Kinesis Data Firehose to load data from Kinesis Data Streams to Amazon S3.

It’s simple to set up Kinesis Data Streams as a change data target in AWS DMS and start streaming data. For more information, see Using Amazon Kinesis Data Streams as a target for AWS Database Migration Service.

To get started, you first create a Kinesis data stream in Kinesis Data Streams, then an AWS Identity and Access Management (IAM) role with minimal access as described in Prerequisites for using a Kinesis data stream as a target for AWS Database Migration Service. After you define your IAM policy and role, you set up your source and target endpoints and replication instance in AWS DMS. Your source is the database that you want to move data from, and the target is the database that you’re moving data to. In our case, the source database is a SQL Server database on Amazon RDS, and the target is the Kinesis data stream. The replication instance processes the migration tasks and requires access to the source and target endpoints inside your VPC.

A Kinesis delivery stream (created in Kinesis Data Firehose) is used to load the records from the database to the data lake hosted on Amazon S3. Kinesis Data Firehose can load data also to Amazon Redshift, Amazon OpenSearch Service, an HTTP endpoint, Datadog, Dynatrace, LogicMonitor, MongoDB Cloud, New Relic, Splunk, and Sumo Logic.

Configure the source database

For testing purposes, we use the database democustomer, which is hosted on a SQL Server on Amazon RDS. Use the following command and script to create the database and table, and insert 10 records:

create database democustomer

use democustomer

create table invoices (
	invoice_id INT,
	customer_id INT,
	billing_date DATE,
	due_date DATE,
	balance INT,
	monthly_kwh_use INT,
	total_amount_due VARCHAR(50)
insert into invoices (invoice_id, customer_id, billing_date, due_date, balance, monthly_kwh_use, total_amount_due) values (1, 1219578, '4/15/2022', '4/30/2022', 25, 6, 28);
insert into invoices (invoice_id, customer_id, billing_date, due_date, balance, monthly_kwh_use, total_amount_due) values (2, 1365142, '4/15/2022', '4/28/2022', null, 41, 20.5);
insert into invoices (invoice_id, customer_id, billing_date, due_date, balance, monthly_kwh_use, total_amount_due) values (3, 1368834, '4/15/2022', '5/5/2022', null, 31, 15.5);
insert into invoices (invoice_id, customer_id, billing_date, due_date, balance, monthly_kwh_use, total_amount_due) values (4, 1226431, '4/15/2022', '4/28/2022', null, 47, 23.5);
insert into invoices (invoice_id, customer_id, billing_date, due_date, balance, monthly_kwh_use, total_amount_due) values (5, 1499194, '4/15/2022', '5/1/2022', null, 39, 19.5);
insert into invoices (invoice_id, customer_id, billing_date, due_date, balance, monthly_kwh_use, total_amount_due) values (6, 1221240, '4/15/2022', '5/2/2022', null, 38, 19);
insert into invoices (invoice_id, customer_id, billing_date, due_date, balance, monthly_kwh_use, total_amount_due) values (7, 1235442, '4/15/2022', '4/27/2022', null, 50, 25);
insert into invoices (invoice_id, customer_id, billing_date, due_date, balance, monthly_kwh_use, total_amount_due) values (8, 1306894, '4/15/2022', '5/2/2022', null, 16, 8);
insert into invoices (invoice_id, customer_id, billing_date, due_date, balance, monthly_kwh_use, total_amount_due) values (9, 1343570, '4/15/2022', '5/3/2022', null, 39, 19.5);
insert into invoices (invoice_id, customer_id, billing_date, due_date, balance, monthly_kwh_use, total_amount_due) values (10, 1465198, '4/15/2022', '5/4/2022', null, 47, 23.5);

To capture the new records added to the table, enable MS-CDC (Microsoft Change Data Capture) using the following commands at the database level (replace SchemaName and TableName). This is required if ongoing replication is configured on the task migration in AWS DMS.

EXEC msdb.dbo.rds_cdc_enable_db 'democustomer';
EXECUTE sys.sp_cdc_enable_table @source_schema = N'SchemaName', @source_name =N'TableName', @role_name = NULL;
EXEC sys.sp_cdc_change_job @job_type = 'capture' ,@pollinginterval = 3599;

You can use ongoing replication (CDC) for a self-managed SQL Server database on premises or on Amazon Elastic Compute Cloud (Amazon EC2), or a cloud database such as Amazon RDS or an Azure SQL managed instance. SQL Server must be configured for full backups, and you must perform a backup before beginning to replicate data.

For more information, see Using a Microsoft SQL Server database as a source for AWS DMS.

Configure the Kinesis data stream

Next, we configure our Kinesis data stream. For full instructions, see Creating a Stream via the AWS Management Console. Complete the following steps:

  1. On the Kinesis Data Streams console, choose Create data stream.
  2. For Data stream name¸ enter a name.
  3. For Capacity mode, select On-demand.When you choose on-demand capacity mode, Kinesis Data Streams instantly accommodates your workloads as they ramp up or down. For more information, refer to Choosing the Data Stream Capacity Mode.
  4. Choose Create data stream.
  5. When the data stream is active, copy the ARN.

Configure the IAM policy and role

Next, you configure your IAM policy and role.

  1. On the IAM console, choose Policies in the navigation pane.
  2. Choose Create policy.
  3. Select JSON and use the following policy as a template, replacing the data stream ARN:
        "Version": "2012-10-17",
        "Statement": [
                "Effect": "Allow",
                "Action": [
                "Resource": "<streamArn>"

  4. In the navigation pane, choose Roles.
  5. Choose Create role.
  6. Select AWS DMS, then choose Next: Permissions.
  7. Select the policy you created.
  8. Assign a role name and then choose Create role.

Configure the Kinesis delivery stream

We use a Kinesis delivery stream to load the information from the Kinesis data stream to Amazon S3. To configure the delivery stream, complete the following steps:

  1. On the Kinesis console, choose Delivery streams.
  2. Choose Create delivery stream.
  3. For Source, choose Amazon Kinesis Data Streams.
  4. For Destination, choose Amazon S3.
  5. For Kinesis data stream, enter the ARN of the data stream.
  6. For Delivery stream name, enter a name.
  7. Leave the transform and convert options at their defaults.
  8. Provide the destination bucket and specify the bucket prefixes for the events and errors.
  9. Under Buffer hints, compression and encryption, change the buffer size to 1 MB and buffer interval to 60 seconds.
  10. Leave the other configurations at their defaults.

Configure AWS DMS

We use an AWS DMS instance to connect to the SQL Server database and then replicate the table and future transactions to a Kinesis data stream. In this section, we create a replication instance, source endpoint, target endpoint, and migration task. For more information about endpoints, refer to Creating source and target endpoints.

  1. Create a replication instance in a VPC with connectivity to the SQL Server database and associate a security group with enough permissions to access to the database.
  2. On the AWS DMS console, choose Endpoints in the navigation pane.
  3. Choose Create endpoint.
  4. Select Source endpoint.
  5. For Endpoint identifier, enter a label for the endpoint.
  6. For Source engine, choose Microsoft SQL Server.
  7. For Access to endpoint database, select Provide access information manually.
  8. Enter the endpoint database information.
  9. Test the connectivity to the source endpoint.
    Now we create the target endpoint.
  10. On the AWS DMS console, choose Endpoints in the navigation pane.
  11. Choose Create endpoint.
  12. Select Target endpoint.
  13. For Endpoint identifier, enter a label for the endpoint.
  14. For Target engine, choose Amazon Kinesis.
  15. Provide the AWS DMS service role ARN and the data stream ARN.
  16. Test the connectivity to the target endpoint.

    The final step is to create a database migration task. This task replicates the existing data from the SQL Server table to the data stream and replicates the ongoing changes. For more information, see Creating a task.
  17. On the AWS DMS console, choose Database migration tasks.
  18. Choose Create task.
  19. For Task identifier, enter a name for your task.
  20. For Replication instance, choose your instance.
  21. Choose the source and target database endpoints you created.
  22. For Migration type, choose Migrate existing data and replicate ongoing changes.
  23. In Task settings, use the default settings.
  24. In Table mappings, add a new selection rule and specify the schema and table name of the SQL Server database. In this case, our schema name is dbo and the table name is invoices.
  25. For Action, choose Include.

When the task is ready, the migration starts.

After the data has been loaded, the table statistics are updated and you can see the 10 records created initially.

As the Kinesis delivery stream reads the data from Kinesis Data Streams and loads it in Amazon S3, the records are available in the bucket you defined previously.

To check that AWS DMS ongoing replication and CDC are working, use this script to add 1,000 records to the table.

You can see 1,000 inserts on the Table statistics tab for the database migration task.

After about 1 minute, you can see the records in the S3 bucket.

At this point the replication has been activated, and a Lambda function can start consuming the data streams to send emails SMS to the customers through Amazon SNS. More information, refer to Using AWS Lambda with Amazon Kinesis.


With Kinesis Data Streams as an AWS DMS target, you now have a powerful way to stream change data from a database directly into a Kinesis data stream. You can use this method to stream change data from any sources supported by AWS DMS to perform real-time data processing. Happy streaming!

If you have any questions or suggestions, please leave a comment.

About the Authors

Luis Eduardo Torres is a Solutions Architect at AWS based in Bogotá, Colombia. He helps companies to build their business using the AWS cloud platform. He has a great interest in Analytics and has been leading the Analytics track of AWS Podcast in Spanish.

Sukhomoy Basak is a Solutions Architect at Amazon Web Services, with a passion for Data and Analytics solutions. Sukhomoy works with enterprise customers to help them architect, build, and scale applications to achieve their business outcomes.

A serverless operational data lake for retail with AWS Glue, Amazon Kinesis Data Streams, Amazon DynamoDB, and Amazon QuickSight

Post Syndicated from Gandhi Raketla original https://aws.amazon.com/blogs/big-data/a-serverless-operational-data-lake-for-retail-with-aws-glue-amazon-kinesis-data-streams-amazon-dynamodb-and-amazon-quicksight/

Do you want to reduce stockouts at stores? Do you want to improve order delivery timelines? Do you want to provide your customers with accurate product availability, down to the millisecond? A retail operational data lake can help you transform the customer experience by providing deeper insights into a variety of operational aspects of your supply chain.

In this post, we demonstrate how to create a serverless operational data lake using AWS services, including AWS Glue, Amazon Kinesis Data Streams, Amazon DynamoDB, Amazon Athena, and Amazon QuickSight.

Retail operations is a critical functional area that gives retailers a competitive edge. An efficient retail operation can optimize the supply chain for a better customer experience and cost reduction. An optimized retail operation can reduce frequent stockouts and delayed shipments, and provide accurate inventory and order details. Today, a retailer’s channels aren’t just store and web—they include mobile apps, chatbots, connected devices, and social media channels. The data is both structured and unstructured. This coupled with multiple fulfillment options like buy online and pick up at store, ship from store, or ship from distribution centers, which increases the complexity of retail operations.

Most retailers use a centralized order management system (OMS) for managing orders, inventory, shipments, payments, and other operational aspects. These legacy OMSs are unable to scale in response to the rapid changes in retail business models. The enterprise applications that are key for efficient and smooth retail operations rely on a central OMS. Applications for ecommerce, warehouse management, call centers, and mobile all require an OMS to get order status, inventory positions of different items, shipment status, and more. Another challenge with legacy OMSs is they’re not designed to handle unstructured data like weather data and IoT data that could impact inventory and order fulfillment. A legacy OMS that can’t scale prohibits you from implementing new business models that could transform your customer experience.

A data lake is a centralized repository that allows you to store all your structured and unstructured data at any scale. An operational data lake addresses this challenge by providing easy access to structured and unstructured operational data in real time from various enterprise systems. You can store your data as is, without having to first structure the data, and run different types of analytics—from dashboards and visualizations to big data processing, real-time analytics, and machine learning (ML)—to guide better decisions. This can ease the burden on OMSs that can instead focus on order orchestration and management.

Solution overview

In this post, we create an end-to-end pipeline to ingest, store, process, analyze, and visualize operational data like orders, inventory, and shipment updates. We use the following AWS services as key components:

  • Kinesis Data Streams to ingest all operational data in real time from various systems
  • DynamoDB, Amazon Aurora, and Amazon Simple Storage Service (Amazon S3) to store the data
  • AWS Glue DataBrew to clean and transform the data
  • AWS Glue crawlers to catalog the data
  • Athena to query the processed data
  • A QuickSight dashboard that provides insights into various operational metrics

The following diagram illustrates the solution architecture.

The data pipeline consists of stages to ingest, store, process, analyze, and finally visualize the data, which we discuss in more detail in the following sections.

Data ingestion

Orders and inventory data is ingested in real time from multiple sources like web applications, mobile apps, and connected devices into Kinesis Data Streams. Kinesis Data Streams is a massively scalable and durable real-time data streaming service. Kinesis Data Streams can continuously capture gigabytes of data per second from hundreds of thousands of sources, such as web applications, database events, inventory transactions, and payment transactions. Frontend systems like ecommerce applications and mobile apps ingest the order data as soon as items are added to a cart or an order is created. The OMS ingests orders when the order status changes. OMSs, stores, and third-party suppliers ingest inventory updates into the data stream.

To simulate orders, an AWS Lambda function is triggered by a scheduled Amazon CloudWatch event every minute to ingest orders to a data stream. This function simulates the typical order management system lifecycle (order created, scheduled, released, shipped, and delivered). Similarly, a second Lambda function is triggered by a CloudWatch event to generate inventory updates. This function simulates different inventory updates such as purchase orders created from systems like the OMS or third-party suppliers. In a production environment, this data would come from frontend applications and a centralized order management system.

Data storage

There are two types of data: hot and cold data. Hot data is consumed by frontend applications like web applications, mobile apps, and connected devices. The following are some example use cases for hot data:

  • When a customer is browsing products, the real-time availability of the item must be displayed
  • Customers interacting with Alexa to know the status of the order
  • A call center agent interacting with a customer needs to know the status of the customer order or its shipment details

The systems, APIs, and devices that consume this data need the data within seconds or milliseconds of the transactions.

Cold data is used for long-term analytics like orders over a period of time, orders by channel, top 10 items by number of orders, or planned vs. available inventory by item, warehouse, or store.

For this solution, we store orders hot data in DynamoDB. DynamoDB is a fully managed NoSQL database that delivers single-digit millisecond performance at any scale. A Lambda function processes records in the Kinesis data stream and stores it in a DynamoDB table.

Inventory hot data is stored in an Amazon Aurora MySQL-Compatible Edition database. Inventory is transactional data that requires high consistency so that customers aren’t over-promised or under-promised when they place orders. Aurora MySQL is fully managed database that is up to five times faster than standard MySQL databases and three times faster than standard PostgreSQL databases. It provides the security, availability, and reliability of commercial databases at a tenth of the cost.

Amazon S3 is object storage built to store and retrieve any amount of data from anywhere. It’s a simple storage service that offers industry-leading durability, availability, performance, security, and virtually unlimited scalability at very low cost. Order and inventory cold data is stored in Amazon S3.

Amazon Kinesis Data Firehose reads the data from the Kinesis data stream and stores it in Amazon S3. Kinesis Data Firehose is the easiest way to load streaming data into data stores and analytics tools. It can capture, transform, and load streaming data into Amazon S3, Amazon Redshift, Amazon OpenSearch Service, and Splunk, enabling near-real-time analytics.

Data processing

The data processing stage involves cleaning, preparing, and transforming the data to help downstream analytics applications easily query the data. Each frontend system might have a different data format. In the data processing stage, data is cleaned and converted into a common canonical form.

For this solution, we use DataBrew to clean and convert orders into a common canonical form. DataBrew is a visual data preparation tool that makes it easy for data analysts and data scientists to prepare data with an interactive, point-and-click visual interface without writing code. DataBrew provides over 250 built-in transformations to combine, pivot, and transpose the data without writing code. The cleaning and transformation steps in DataBrew are called recipes. A scheduled DataBrew job applies the recipes to the data in an S3 bucket and stores the output in a different bucket.

AWS Glue crawlers can access data stores, extract metadata, and create table definitions in the AWS Glue Data Catalog. You can schedule a crawler to crawl the transformed data and create or update the Data Catalog. The AWS Glue Data Catalog is your persistent metadata store. It’s a managed service that lets you store, annotate, and share metadata in the AWS Cloud in the same way you would in an Apache Hive metastore. We use crawlers to populate the Data Catalog with tables.

Data analysis

We can query orders and inventory data from S3 buckets using Athena. Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run. Views are created in Athena that can be consumed by business intelligence (BI) services like QuickSight.

Data visualization

We generate dashboards using QuickSight. QuickSight is a scalable, serverless, embeddable BI service powered by ML and built for the cloud. QuickSight lets you easily create and publish interactive BI dashboards that include ML-powered insights.

QuickSight also has features to forecast orders, detect anomalies in the order, and provide ML-powered insights. We can create analyses such as orders over a period of time, orders split by channel, top 10 locations for orders, or order fulfillment timelines (the time it took from order creation to order delivery).

Walkthrough overview

To implement this solution, you complete the following high-level steps:

  1. Create solution resources using AWS CloudFormation.
  2. Connect to the inventory database.
  3. Load the inventory database with tables.
  4. Create a VPC endpoint using Amazon Virtual Private Cloud (Amazon VPC).
  5. Create gateway endpoints for Amazon S3 on the default VPC.
  6. Enable CloudWatch rules via Amazon EventBridge to ingest the data.
  7. Transform the data using AWS Glue.
  8. Visualize the data with QuickSight.


Complete the following prerequisite steps:

  1. Create AWS account if you don’t have done already.
  2. Sign up for QuickSight if you’ve never used QuickSight in this account before. To use the forecast ability in QuickSight, sign up for the Enterprise Edition.

Create resources with AWS CloudFormation

To launch the provided CloudFormation template, complete the following steps:

  1. Choose Launch Stack:
  2. Choose Next.
  3. For Stack name, enter a name.
  4. Provide the following parameters:
    1. The name of the S3 bucket that holds all the data for the data lake.
    2. The name of the database that holds the inventory tables.
    3. The database user name.
    4. The database password.
  5. Enter any tags you want to assign to the stack and choose Next.
  6. Select the acknowledgement check boxes and choose Create stack.

The stack takes 5–10 minutes to complete.

On the AWS CloudFormation console, you can navigate to the stack’s Outputs tab to review the resources you created.

If you open the S3 bucket you created, you can observe its folder structure. The stack creates sample order data for the last 7 days.

Connect to the inventory database

To connect to your database in the query editor, complete the following steps:

  1. On the Amazon RDS console, choose the Region you deployed the stack in.
  2. In the navigation pane, choose Query Editor.

    If you haven’t connected to this database before, the Connect to database page opens.
  3. For Database instance or cluster, choose your database.
  4. For Database username, choose Connect with a Secrets Manager ARN.
    The database user name and password provided during stack creation are stored in AWS Secrets Manager. Alternatively, you can choose Add new database credentials and enter the database user name and password you provided when creating the stack.
  5. For Secrets Manager ARN, enter the value for the key InventorySecretManager from the CloudFormation stack outputs.
  6. Optionally, enter the name of your database.
  7. Choose Connect to database.

Load the inventory database with tables

Enter the following DDL statement in the query editor and choose Run:

    ItemID varchar(25) NOT NULL,
    ShipNode varchar(25) NOT NULL,
    SupplyType varchar(25) NOT NULL,
    SupplyDemandType varchar(25) NOT NULL,
    ItemName varchar(25),
    UOM varchar(10),
    Quantity int(11) NOT NULL,
    ETA varchar(25)	 ,
    UpdatedDate DATE,
    PRIMARY KEY (ItemID,ShipNode,SupplyType)

Create a VPC endpoint

To create your VPC endpoint, complete the following steps:

  1. On the Amazon VPC console, choose VPC Dashboard.
  2. Choose Endpoints in the navigation pane.
  3. Choose Create Endpoint.
  4. For Service category, select AWS services.
  5. For Service name, search for rds and choose the service name ending with rds-data.
  6. For VPC, choose the default VPC.
  7. Leave the remaining settings at their default and choose Create endpoint.

Create a gateway endpoint for Amazon S3

To create your gateway endpoint, complete the following steps:

  1. On the Amazon VPC console, choose VPC Dashboard.
  2. Choose Endpoints in the navigation pane.
  3. Choose Create Endpoint.
  4. For Service category, select AWS services.
  5. For Service name, search for S3 and choose the service name with type Gateway.
  6. For VPC, choose the default VPC.
  7. For Configure route tables, select the default route table.
  8. Leave the remaining settings at their default and choose Create endpoint.

Wait for both the gateway endpoint and VPC endpoint status to change to Available.

Enable CloudWatch rules to ingest the data

We created two CloudWatch rules via the CloudFormation template to ingest the order and inventory data to Kinesis Data Streams. To enable the rules via EventBridge, complete the following steps:

  1. On the CloudWatch console, under Events in the navigation pane, choose Rules.
  2. Make sure you’re in the Region where you created the stack.
  3. Choose Go to Amazon EventBridge.
  4. Select the rule Ingest-Inventory-Update-Schedule-Rule and choose Enable.
  5. Select the rule Ingest-Order-Schedule-Rule and choose Enable.

After 5–10 minutes, the Lambda functions start ingesting orders and inventory updates to their respective streams. You can check the S3 buckets orders-landing-zone and inventory-landing-zone to confirm that the data is being populated.

Perform data transformation

Our CloudFormation stack included a DataBrew project, a DataBrew job that runs every 5 minutes, and two AWS Glue crawlers. To perform data transformation using our AWS Glue resources, complete the following steps:

  1. On the DataBrew console, choose Projects in the navigation pane.
  2. Choose the project OrderDataTransform.

    You can review the project and its recipe on this page.
  3. In the navigation pane, choose Jobs.
  4. Review the job status to confirm it’s complete.
  5. On the AWS Glue console, choose Crawlers in the navigation pane.
    The crawlers crawl the transformed data and update the Data Catalog.
  6. Review the status of the two crawlers, which run every 15 minutes.
  7. Choose Tables in the navigation pane to view the two tables the crawlers created.
    If you don’t see these tables, you can run the crawlers manually to create them.

    You can query the data in the tables with Athena.
  8. On the Athena console, choose Query editor.
    If you haven’t created a query result location, you’re prompted to do that first.
  9. Choose View settings or choose the Settings tab.
  10. Choose Manage.
  11. Select the S3 bucket to store the results and choose Choose.
  12. Choose Query editor in the navigation pane.
  13. Choose either table (right-click) and choose Preview Table to view the table contents.

Visualize the data

If you have never used QuickSight in this account before, complete the prerequisite step to sign up for QuickSight. To use the ML capabilities of QuickSight (such as forecasting) sign up for the Enterprise Edition using the steps in this documentation.

While signing up for QuickSight, make sure to use the same region where you created the CloudFormation stack.

Grant QuickSight permissions

To visualize your data, you must first grant relevant permissions to QuickSight to access your data.

  1. On the QuickSight console, on the Admin drop-down menu, choose Manage QuickSight.
  2. In the navigation pane, choose Security & permissions.
  3. Under QuickSight access to AWS services, choose Manage.
  4. Select Amazon Athena.
  5. Select Amazon S3 to edit QuickSight access to your S3 buckets.
  6. Select the bucket you specified during stack creation (for this post, operational-datalake).
  7. Choose Finish.
  8. Choose Save.

Prepare the datasets

To prepare your datasets, complete the following steps:

  1. On the QuickSight console, choose Datasets in the navigation pane.
  2. Choose New dataset.
  3. Choose Athena.
  4. For Data source name, enter retail-analysis.
  5. Choose Validate connection.
  6. After your connection is validated, choose Create data source.
  7. For Database, choose orderdatalake.
  8. For Tables, select orders_clean.
  9. Choose Edit/Preview data.
  10. For Query mode, select SPICE.
    SPICE (Super-fast, Parallel, In-memory Calculation Engine) is the robust in-memory engine that QuickSight uses.
  11. Choose the orderdatetime field (right-click), choose Change data type, and choose Date.
  12. Enter the date format as MM/dd/yyyy HH:mm:ss.
  13. Choose Validate and Update.
  14. Change the data types of the following fields to QuickSight geospatial data types:
    1. billingaddress.zipcode – Postcode
    2. billingaddress.city – City
    3. billingaddress.country – Country
    4. billingaddress.state – State
    5. shippingaddress.zipcode – Postcode
    6. shippingaddress.city – City
    7. shippingaddress.country – Country
    8. shippingaddress.state – State
  15. Choose Save & publish.
  16. Choose Cancel to exit this page.

    Let’s create another dataset for the Athena table inventory_landing_zone.
  17. Follow steps 1–7 to create a new dataset. For Table selection, choose inventory_landing_zone.
  18. Choose Edit/Preview data.
  19. For Query mode, select SPICE.
  20. Choose Save & publish.
  21. Choose Cancel to exit this page.

    Both datasets should now be listed on the Datasets page.
  22. Choose each dataset and choose Refresh now.
  23. Select Full refresh and choose Refresh.

To set up a scheduled refresh, choose Schedule a refresh and provide your schedule details.

Create an analysis

To create an analysis in QuickSight, complete the following steps:

  1. On the QuickSight console, choose Analyses in the navigation pane.
  2. Choose New analysis.
  3. Choose the orders_clean dataset.
  4. Choose Create analysis.
  5. To adjust the theme, choose Themes in the navigation pane, choose your preferred theme, and choose Apply.
  6. Name the analysis retail-analysis.

Add visualizations to the analysis

Let’s start creating visualizations. The first visualization shows orders created over time.

  1. Choose the empty graph on the dashboard and for Visual type¸ choose the line chart.
    For more information about visual types, see Visual types in Amazon QuickSight.
  2. Under Field wells, drag orderdatetime to X axis and ordernumber to Value.
  3. Set ordernumber to Aggregate: Count distinct.

    Now we can filter these orders by Created status.
  4. Choose Filter in the navigation pane and choose Create one.
  5. Search for and choose status.
  6. Choose the status filter you just created.
  7. Select Created from the filter list and choose Apply.
  8. Choose the graph (right-click) and choose Add forecast.
    The forecasting ability is only available in the Enterprise Edition. QuickSight uses a built-in version of the Random Cut Forest (RCF) algorithm. For more information, refer to Understanding the ML algorithm used by Amazon QuickSight.
  9. Leave the settings as default and choose Apply.
  10. Rename the visualization to “Orders Created Over Time.”

If the forecast is applied successfully, the visualization shows the expected number of orders as well as upper and lower bounds.

If you get the following error message, allow for the data to accumulate for a few days before adding the forecast.

Let’s create a visualization on orders by location.

  1. On the Add menu, choose Add visual.
  2. Choose the points on map visual type.
  3. Under Field wells, drag shippingaddress.zipcode to Geospatial and ordernumber to Size.
  4. Change ordernumber to Aggregate: Count distinct.

    You should now see a map indicating the orders by location.
  5. Rename the visualization accordingly.

    Next, we create a drill-down visualization on the inventory count.
  6. Choose the pencil icon.
  7. Choose Add dataset.
  8. Select the inventory_landing_zone dataset and choose Select.
  9. Choose the inventory_landing_zone dataset.
  10. Add the vertical bar chart visual type.
  11. Under Field wells, drag itemname, shipnode, and invtype to X axis, and quantity to Value.
  12. Make sure that quantity is set to Sum.

    The following screenshot shows an example visualization of order inventory.
  13. To determine how many face masks were shipped out from each ship node, choose Face Masks (right-click) and choose Drill down to shipnode.
  14. You can drill down even further to invtype to see how many face masks in a specific ship node are in which status.

The following screenshot shows this drilled-down inventory count.

As a next step, you can create a QuickSight dashboard from the analysis you created. For instructions, refer to Tutorial: Create an Amazon QuickSight dashboard.

Clean up

To avoid any ongoing charges, on the AWS CloudFormation console, select the stack you created and choose Delete. This deletes all the created resources. On the stack’s Events tab, you can track the progress of the deletion, and wait for the stack status to change to DELETE_COMPLETE.

The Amazon EventBridge rules generate orders and inventory data every 15 minutes, to avoid generating huge amount of data, please ensure to delete the stack after testing the blog.

If the deletion of any resources fails, ensure that you delete them manually. For deleting Amazon QuickSight datasets, you can follow these instructions. You can delete the QuickSight Analysis using these steps. For deleting the QuickSight subscription and closing the account, you can follow these instructions.


In this post, we showed you how to use AWS analytics and storage services to build a serverless operational data lake. Kinesis Data Streams lets you ingest large volumes of data, and DataBrew lets you cleanse and transform the data visually. We also showed you how to analyze and visualize the order and inventory data using AWS Glue, Athena, and QuickSight. For more information and resources for data lakes on AWS, visit Analytics on AWS.

About the Authors

Gandhi Raketla is a Senior Solutions Architect for AWS. He works with AWS customers and partners on cloud adoption, as well as architecting solutions that help customers foster agility and innovation. He specializes in the AWS data analytics domain.

Sindhura Palakodety is a Solutions Architect at AWS. She is passionate about helping customers build enterprise-scale Well-Architected solutions on the AWS Cloud and specializes in the containers and data analytics domains.

Build a big data Lambda architecture for batch and real-time analytics using Amazon Redshift

Post Syndicated from Jagadish Kumar original https://aws.amazon.com/blogs/big-data/build-a-big-data-lambda-architecture-for-batch-and-real-time-analytics-using-amazon-redshift/

With real-time information about customers, products, and applications in hand, organizations can take action as events happen in their business application. For example, you can prevent financial fraud, deliver personalized offers, and identify and prevent failures before they occur in near real time. Although batch analytics provides abilities to analyze trends and process data at scale that allow processing data in time intervals (such as daily sales aggregations by individual store), real-time analytics is optimized for low-latency analytics, ensuring that data is available for querying in seconds. Both paradigms of data processing operate in silos, which results in data redundancy and operational overhead to maintain them. A big data Lambda architecture is a reference architecture pattern that allows for the seamless coexistence of the batch and near-real-time paradigms for large-scale data for analytics.

Amazon Redshift allows you to easily analyze all data types across your data warehouse, operational database, and data lake using standard SQL. In this post, we collect, process, and analyze data streams in real time. With data sharing, you can share live data across Amazon Redshift clusters for read purposes with relative security and ease out of the box. In this post, we discuss how we can harness the data sharing ability of Amazon Redshift to set up a big data Lambda architecture to allow both batch and near-real-time analytics.

Solution overview

Example Corp. is a leading electric automotive company that revolutionized the automotive industry. Example Corp. operationalizes the connected vehicle data and improves the effectiveness of various connected vehicle and fleet use cases, including predictive maintenance, in-vehicle service monetization, usage-based insurance. and delivering exceptional driver experiences. In this post, we explore the real-time and trend analytics using the connected vehicle data to illustrate the following use cases:

  • Usage-based insurance – Usage-based insurance (UBI) relies on analysis of near-real-time data from the driver’s vehicle to access the risk profile of the driver. In addition, it also relies on the historical analysis (batch) of metrics (such as the number of miles driven in a year). The better the driver, the lower the premium.
  • Fleet performance trends – The performance of a fleet (such as a taxi fleet) relies on the analysis of historical trends of data across the fleet (batch) as well as the ability to drill down to a single vehicle within the fleet for near-real-time analysis of metrics like fuel consumption or driver distraction.

Architecture overview

In this section, we discuss the overall architectural setup for the Lambda architecture solution.

The following diagram shows the implementation architecture and the different computational layers:

  • Data ingestion from AWS IoT Core
  • Batch layer
  • Speed layer
  • Serving layer

Data ingestion

Vehicle telemetry data is ingested into the cloud through AWS IoT Core and routed to Amazon Kinesis Data Streams. The Kinesis Data Streams layer acts as a separation layer for the speed layer and batch layer, where the incoming telemetry is consumed by the speed layer’s Amazon Redshift cluster and Amazon Kinesis Data Firehose, respectively.

Batch layer

Amazon Kinesis Data Firehose is a fully managed service that can batch, compress, transform, and encrypt your data streams before loading them into your Amazon Simple Storage Service (Amazon S3) data lake. Kinesis Data Firehose also allows you to specify a custom expression for the Amazon S3 prefix where data records are delivered. This provides the ability to filter the partitioned data and control the amount of data scanned by each query, thereby improving performance and reducing cost.

The batch layer persists data in Amazon S3 and is accessed directly by an Amazon Redshift Serverless endpoint (serving layer). With Amazon Redshift Serverless, you can efficiently query and retrieve structured and semistructured data from files in Amazon S3 without having to load the data into Amazon Redshift tables.

The batch layer can also optionally precompute results as batch views from the immutable Amazon S3 data lake and persist them as either native tables or materialized views for very high-performant use cases. You can create these precomputed batch views using AWS Glue, Amazon Redshift stored procedures, Amazon Redshift materialized views, or other options.

The batch views can be calculated as:

batch view = function (all data)

In this solution, we build a batch layer for Example Corp. for two types of queries:

  • rapid_acceleration_by_year – The number of rapid accelerations by each driver aggregated per year
  • total_miles_driven_by_year – The total number of miles driven by the fleet aggregated per year

For demonstration purposes, we use Amazon Redshift stored procedures to create the batch views as Amazon Redshift native tables from external tables using Amazon Redshift Spectrum.

Speed layer

The speed layer processes data streams in real time and aims to minimize latency by providing real-time views into the most recent data.

Amazon Redshift Streaming Ingestion uses SQL to connect with one or more Kinesis data streams simultaneously. The native streaming ingestion feature in Amazon Redshift lets you ingest data directly from Kinesis Data Streams and enables you to ingest hundreds of megabytes of data per second and query it at exceptionally low latency—in many cases only 10 seconds after entering the data stream.

The speed cluster uses materialized views to materialize a point-in-time view of a Kinesis data stream, as accumulated up to the time it is queried. The real-time views are computed using this layer, which provide a near-real-time view of the incoming telemetry stream.

The speed views can be calculated as a function of recent data unaccounted for in the batch views:

speed view = function (recent data)

We calculate the speed views for these batch views as follows:

  • rapid_acceleration_realtime – The number of rapid accelerations by each driver for recent data not accounted for in the batch view rapid_acceleration_by_month
  • miles_driven_realtime – The number of miles driven by each driver for recent data not in miles_driven_by_month

Serving layer

The serving layer comprises an Amazon Redshift Serverless endpoint and any consumption services such as Amazon QuickSight or Amazon SageMaker.

Amazon Redshift Serverless (preview) is a serverless option of Amazon Redshift that makes it easy to run and scale analytics in seconds without the need to set up and manage data warehouse infrastructure. With Amazon Redshift Serverless, any user—including data analysts, developers, business professionals, and data scientists—can get insights from data by simply loading and querying data in the data warehouse.

Amazon Redshift data sharing enables instant, granular, and fast data access across Amazon Redshift clusters without the need to maintain redundant copies of data.

The speed cluster provides outbound data shares of the real-time materialized views to the Amazon Redshift Serverless endpoint (serving cluster).

The serving cluster joins data from the batch layer and speed layer to get near-real-time and historical data for a particular function with minimal latency. The consumption layer (such as Amazon API Gateway or QuickSight) is only aware of the serving cluster, and all the batch and stream processing is abstracted from the consumption layer.

We can view the queries to the speed layer from data consumption layer as follows:

query = function (batch views, speed views)

Deploy the CloudFormation template

We have provided an AWS CloudFormation template to demonstrate the solution. You can download and use this template to easily deploy the required AWS resources. This template has been tested in the us-east-1 Region.

The template requires you to provide the following parameters:

  • DatabaseName – The name of the first database to be created for speed cluster
  • NumberOfNodes – The number of compute nodes in the cluster.
  • NodeType – The type of node to be provisioned
  • MasterUserName – The user name that is associated with the master user account for the cluster that is being created
  • MasterUserPassword – The password that is associated with the master user account
  • InboundTraffic – The CIDR range to allow inbound traffic to the cluster
  • PortNumber – The port number on which the cluster accepts incoming connections
  • SQLForData – The source query to extract from AWS IOT Core topic


When setting up this solution and using your own application data to push to Kinesis Data Streams, you can skip setting up the IoT Device Simulator and start creating your Amazon Redshift Serverless endpoint. This post uses the simulator to create related database objects and assumes use of the simulator in the solution walkthrough.

Set up the IoT Device Simulator

We use the IoT Device simulator to generate and simulate vehicle IoT data. The solution allows you to create and simulate hundreds of connected devices, without having to configure and manage physical devices or develop time-consuming scripts.

Use the following CloudFormation template to create the IoT Device Simulator in your account for trying out this solution.

Configure devices and simulations

To configure your devices and simulations, complete the following steps:

  1. Use the login information you received in the email you provided to log in to the IoT Device Simulator.
  2. Choose Device Types and Add Device Type.
  3. Choose Automotive Demo.
  4. For Device type name, enter testVehicles.
  5. For Topic, enter the topic where the sensor data is sent to AWS IoT Core.
  6. Save your settings.
  7. Choose Simulations and Add simulation.
  8. For Simulation name, enter testSimulation.
  9. For Simulation type¸ choose Automotive Demo.
  10. For Select a device type¸ choose the device type you created (testVehicles).
  11. For Number of devices, enter 15.

You can choose up to 100 devices per simulation. You can configure a higher number of devices to simulate large data.

  1. For Data transmission interval, enter 1.
  2. For Data transmission duration, enter 300.

This configuration runs the simulation for 5 minutes.

  1. Choose Save.

Now you’re ready to simulate vehicle telemetry data to AWS IoT Core.

Create an Amazon Redshift Serverless endpoint

The solution uses an Amazon Redshift Serverless endpoint as the serving layer cluster. You can set up Amazon Redshift Serverless in your account.

Set up Amazon Redshift Query Editor V2

To query data, you can use Amazon Redshift Query Editor V2. For more information, refer to Introducing Amazon Redshift Query Editor V2, a Free Web-based Query Authoring Tool for Data Analysts.

Get namespaces for the provisioned speed layer cluster and Amazon Redshift Serverless

Connect to speed-cluster-iot (the speed layer cluster) through Query Editor V2 and run the following SQL:

select current_namespace; -- (Save as <producer_namespace>)

Similarly, connect to the Amazon Redshift Serverless endpoint and get the namespace:

select current_namespace; -- (Save as <consumer_namespace>)

You can also get this information via the Amazon Redshift console.

Now that we have all the prerequisites set up, let’s go through the solution walkthrough.

Implement the solution

The workflow includes the following steps:

  1. Start the IoT simulation created in the previous section.

The vehicle IoT is simulated and ingested through IoT Device Simulator for the configured number of vehicles. The raw telemetry payload is sent to AWS IoT Core, which routes the data to Kinesis Data Streams.

At the batch layer, data is directly put from Kinesis Data Streams to Kinesis Data Firehose, which converts the data to parquet and delivers to Amazon with the prefix s3://<Bucketname>/vehicle_telematics_raw/year=<>/month=<>/day=<>/.

  1. When the simulation is complete, run the pre-created AWS Glue crawler vehicle_iot_crawler on the AWS Glue console.

The serving layer Amazon Redshift Serverless endpoint can directly access data from the Amazon S3 data lake through Redshift Spectrum external tables. In this demo, we compute batch views through Redshift Spectrum and store them as Amazon Redshift tables using Amazon Redshift stored procedures.

  1. Connect to the Amazon Redshift Serverless endpoint through Query Editor V2 and create the stored procedures using the following SQL script.
  2. Run the two stored procedures to create the batch views:
call rapid_acceleration_by_year_sp();
call total_miles_driven_by_year_sp();

The two stored procedures create batch views as Amazon Redshift native tables:

    • batchlayer_rapid_acceleration_by_year
    • batchlayer_total_miles_by_year

You can also schedule these stored procedures as batch jobs. For more information, refer to Scheduling SQL queries on your Amazon Redshift data warehouse.

At the speed layer, the incoming data stream is read and materialized by the speed layer Amazon Redshift cluster in the materialized view vehicleiotstream_mv.

  1. Connect to the provisioned speed-cluster-iot and run the following SQL script to create the required objects.

Two real-time views are created from this materialized view:

    • batchlayer_rapid_acceleration_by_year
    • batchlayer_total_miles_by_year
  1. Refresh the materialized view vehicleiotstream_mv at the required interval, which triggers Amazon Redshift to read from the stream and load data into the materialized view.
    REFRESH MATERIALIZED VIEW vehicleiotstream_mv;

Refreshes are currently manual, but can be automated using the query scheduler.

The real-time views are shared as an outbound data share by the speed cluster to the serving cluster.

  1. Connect to speed-cluster-iot and create an outbound data share (producer) with the following SQL:
    -- Create Datashare from Primary (Producer) to Serverless (Consumer)
    ALTER DATASHARE speedlayer_datashare ADD SCHEMA public;
    ALTER DATASHARE speedlayer_datashare ADD ALL TABLES IN SCHEMA public;
    GRANT USAGE ON DATASHARE speedlayer_datashare TO NAMESPACE '<consumer_namespace>'; -- (replace with consumer namespace created in prerequisites 5)

  2. Connect to speed-cluster-iot and create an inbound data share (consumer) with the following SQL:
    CREATE DATABASE vehicleiot_shareddb FROM DATASHARE speedlayer_datashare OF NAMESPACE '< producer_namespace >'; -- (replace with producer namespace created in prerequisites 5)

Now that the real-time views are available for the Amazon Redshift Serverless endpoint, we can run queries to get real-time metrics or historical trends with up-to-date data by accessing the batch and speed layers and joining them using the following queries.

For example, to calculate total rapid acceleration by year with up-to-the-minute data, you can run the following query:

-- Rapid Acceleration By Year

select SUM(rapid_acceleration) rapid_acceleration, vin, year from 
select rapid_acceleration, vin,year
  from public.batchlayer_rapid_acceleration_by_year batch
union all
select rapid_acceleration, vin,year
from speedlayer_shareddb.public.speedlayer_rapid_acceleration_by_year speed)
group by VIN, year;

Similarly, to calculate total miles driven by year with up-to-the-minute data, run the following query:

-- Total Miles Driven By Year

select SUM(total_miles) total_miles_driven , year from 
select total_miles, year
  from public.batchlayer_total_miles_by_year batch
union all
select total_miles, year
from speedlayer_shareddb.public.speedlayer_total_miles_by_year speed)
group by year;

For only access to real-time data to power daily dashboards, you can run queries against real-time views shared to your Amazon Redshift Serverless cluster.

For example, to calculate the average speed per trip of your fleet, you can run the following SQL:

select CAST(measuretime as DATE) "date",
from speedlayer_shareddb.public.vehicleiotstream_mv 
group by vin, date, trip_id;

Because this demo uses the same data as a quick start, there are duplicates in this demonstration. In actual implementations, the serving cluster manages the data redundancy and duplication by creating views with date predicates that consume non-overlapping data from batch and real-time views and provide overall metrics to the consumption layer.

You can consume the data with QuickSight for dashboards, with API Gateway for API-based access, or via the Amazon Redshift Data API or SageMaker for AI and machine learning (ML) workloads. This is not included as part of the provided CloudFormation template.

Best practices

In this section, we discuss some best practices and lessons learned when using this solution.

Provisioned vs. serverless

The speed layer is a continuous ingestion layer reading data from the IoT streams often running 24/7 workloads. There is less idle time and variability in the workloads and it is advantageous to have a provisioned cluster supporting persistent workloads that can scale elastically.

The serving layer can be provisioned (in case of 24/7 workloads) or Amazon Redshift Serverless in case of sporadic or ad hoc workloads. In this post, we assumed sporadic workloads, so serverless is the best fit. In addition, the serving layer can house multiple Amazon Redshift clusters, each consuming their data share and serving downstream applications.

RA3 instances for data sharing

Amazon Redshift RA3 instances enable data sharing to allow you to securely and easily share live data across Amazon Redshift clusters for reads. You can combine the data that is ingested in near-real time with the historical data using the data share to provide personalized driving characteristics to determine the insurance recommendation.

You can also grant fine-grained access control to the underlying data in the producer to the consumer cluster as needed. Amazon Redshift offers comprehensive auditing capabilities using system tables and AWS CloudTrail to allow you to monitor the data sharing permissions and usage across all the consumers and revoke access instantly when necessary. The permissions are granted by the superusers from both the producer and the consumer clusters to define who gets access to what objects, similar to the grant commands used in the earlier section. You can use the following commands to audit the usage and activities for the data share.

Track all changes to the data share and the shared database imported from the data share with the following code:

Select username, share_name, recordtime, action, 
         share_object_type, share_object_name 
  from svl_datashare_change_log
   order by recordtime desc;

Track data share access activity (usage), which is relevant only on the producer, with the following code:

Select * from svl_datashare_usage;

Pause and Resume

You can pause the producer cluster when batch processing is complete to save costs. The pause and resume actions on Amazon Redshift allow you to easily pause and resume clusters that may not be in operation at all times. It allows you to create a regularly-scheduled time to initiate the pause and resume actions at specific times or you can manually initiate a pause and later a resume. Flexible on-demand pricing and per-second billing gives you greater control of costs of your Redshift compute clusters while maintaining your data in a way that is simple to manage.

Materialized views for fast access to data

Materialized views allow pre-composed results from complex queries on large tables for faster access. The producer cluster exposes data as materialized views to simplify access for the consumer cluster. This also allows flexibility at the producer cluster to update the underlying table structure to address new business use cases, without affecting consumer-dependent queries and enabling a loose coupling.


In this post, we demonstrated how to process and analyze large-scale data from streaming and batch sources using Amazon Redshift as the core of the platform guided by the Lambda architecture principles.

You started by collecting real-time data from connected vehicles, and storing the streaming data in an Amazon S3 data lake through Kinesis Data Firehose. The solution simultaneously processes the data for near-real-time analysis through Amazon Redshift streaming ingestion.

Through the data sharing feature, you were able to share live, up-to-date data to an Amazon Redshift Serverless endpoint (serving cluster), which merges the data from the speed layer (near-real time) and batch layer (batch analysis) to provide low-latency access to data from near-real-time analysis to historical trends.

Click here to get started with this solution today and let us know how you implemented this solution in your organization through the comments section.

About the Authors

Jagadish Kumar is a Sr Analytics Specialist Solutions Architect at AWS. He is deeply passionate about Data Architecture and helps customers build analytics solutions at scale on AWS. He is an avid college football fan and enjoys reading, watching sports and riding motorcycle.

Thiyagarajan Arumugam is a Big Data Solutions Architect at Amazon Web Services and designs customer architectures to process data at scale. Prior to AWS, he built data warehouse solutions at Amazon.com. In his free time, he enjoys all outdoor sports and practices the Indian classical drum mridangam.

Eesha Kumar is an Analytics Solutions Architect with AWS. He works with customers to realize business value of data by helping them building solutions leveraging AWS platform and tools.

Query your data streams interactively using Kinesis Data Analytics Studio and Python

Post Syndicated from Jeremy Ber original https://aws.amazon.com/blogs/big-data/query-your-data-streams-interactively-using-kinesis-data-analytics-studio-and-python/

Amazon Kinesis Data Analytics Studio makes it easy for customers to analyze streaming data in real time, as well as build stream processing applications powered by Apache Flink using standard SQL, Python, and Scala. Just a few clicks in the AWS Management console lets customers launch a serverless notebook to query data streams and get results in seconds. Kinesis Data Analytics reduces the complexity of building and managing Apache Flink applications. Apache Flink is an open-source framework and engine for processing data streams. It’s highly available and scalable, and it delivers high throughput and low latency for stream processing applications.

Customers running Apache Flink workloads face the non-trivial challenge of developing their distributed stream processing applications without having true visibility into the steps conducted by their application for data processing. Kinesis Data Analytics Studio combines the ease-of-use of Apache Zeppelin notebooks, with the power of the Apache Flink processing engine, to provide advanced streaming analytics capabilities in a fully-managed offering. Furthermore, it accelerates developing and running stream processing applications that continuously generate real-time insights.

In this post, we will introduce you to Kinesis Data Analytics Studio and get started querying data interactively from an Amazon Kinesis Data Stream using the Python API for Apache Flink (Pyflink). We will use a Kinesis Data Stream for this example, as it is the quickest way to begin. Kinesis Data Analytics Studio is also compatible with Amazon Managed Streaming for Apache Kafka (Amazon MSK), Amazon Simple Storage Service (Amazon S3), and various other data sources supported by Apache Flink.


  • Kinesis Data Stream
  • Data Generator

To follow this guide and interact with your streaming data, you will need a data stream with data flowing through.

Create a Kinesis Data Stream

You can create these streams using either the Amazon Kinesis console or the following AWS Command Line Interface (AWS CLI) command. For console instructions, see Creating and Updating Data Streams in the Kinesis Data Streams Developer Guide.

To create the data stream, use the following Kinesis create-stream AWS CLI command. Your data stream will be named input-stream.

$ aws kinesis create-stream \
--stream-name input-stream \
--shard-count 1 \
--region us-east-1

Creating a Kinesis Data Analytics Studio notebook

You can start interacting with your data stream by following these steps:

  1. Open the AWS Management Console and navigate to Amazon Kinesis Data Analytics for Apache Flink
  2. Select the Studio tab on the main page, and select Create Studio Notebook.
  3. Enter the name of your Studio notebook, and let Kinesis Data Analytics Studio create an AWS Identity and Access Management (IAM) role for this. You can create a custom role for specific use cases using the IAM Console.
  4. Choose an AWS Glue Database to store the metadata around your sources and destinations used by Kinesis Data Analytics Studio.
  5. Select Create Studio Notebook.

We will keep the default settings for the application, and we can scale up as needed.

Once the application has been created, select Start to start the Apache Flink application. This will take a few minutes to complete, at which point you can Open in Apache Zeppelin.

Write Sample Records to the Data Stream

In this section, you can create a Python script within the Apache Zeppelin notebook to write sample records to the stream for the application to process.

Select Create a new note in Apache Zeppelin, and name the new notebook stock-producer with the following contents:

import datetime
import json
import random
import boto3

STREAM_NAME = "input-stream"
REGION = "us-east-1"

def get_data():
    return {
        'event_time': datetime.datetime.now().isoformat(),
        'ticker': random.choice(["BTC","ETH","BNB", "XRP", "DOGE"]),
        'price': round(random.random() * 100, 2)}

def generate(stream_name, kinesis_client):
    while True:
        data = get_data()

if __name__ == '__main__':
    generate(STREAM_NAME, boto3.client('kinesis', region_name=REGION))

You can run the stock-producer paragraph to begin publishing messages to your Kinesis Data Stream either by pressing SHIFT + ENTER on the paragraph, or by selecting the Play button in the top-right of the paragraph.

Feel free to close or navigate away from this notebook for now, as it will continue publishing events indefinitely.

Note that this will continue publishing events until the notebook is paused or the Apache Flink cluster is shut down.

Example Applications

Apache Zeppelin supports the Apache Flink interpreter and allows for the direct use of Apache Flink within a notebook for interactive data analysis. Within the Flink Interpreter, three languages are supported at this time—Scala, Python (PyFlink), and SQL. The notebook requires a specification to one of these languages at the top of each paragraph to interpret the language properly.

%flink          - Scala environment 
%flink.pyflink  - Python Environment
%flink.ipyflink - ipython Environment
%flink.ssql     - Streaming SQL Environment
%flink.bsql     - Batch SQL Environment 

There are several other predefined variables per interpreter, such as the senv variable in Scala for a StreamExecutionEnvironment, and st_env in python for the same. A full list of these entry point variables can be found here. Now we will showcase the capabilities of Apache Flink in Python (Pyflink) by providing code samples for the most common use cases.

How to follow along

If you would like to follow along with this walkthrough, we have provided the Kinesis Data Analytics Studio notebook here with comments and context. Once you have created your Kinesis Data Analytics application, you can download the file and upload it to Kinesis Data Analytics studio.

Once you have imported the notebook, you should be able to follow along with the remainder of the post as you try it out!

Create a source table for Kinesis

Using the %flink.pyflink header to signify that this code block will be interpreted via the Python Flink interpreter, we’re creating a table called stock_table with a ticker, price, and event_time column that signifies the time at which the price was recorded for the ticker. The WATERMARK clause defines the watermark strategy for generating watermarks according to the event_time (row_time) column. The event_time column must be defined as Timestamp(3) and be a top-level column to be used in conjunction with watermarks. The syntax following the WATERMARK definition—FOR event_time AS event_time - INTERVAL '5' SECOND declares that watermarks will be emitted according to a bounded out of orderness watermark strategy that allows for a five second delay in event_time data.

To learn more about event time and watermarks, read about the techniques implemented by Apache Flink here.

The table defined below uses the Kinesis connector to read from a kinesis data stream called input-stream in the us-east-1 region from the latest stream position.

In this example, we are utilizing the Python interpreter’s built-in streaming table environment variable, st_env, to execute a SQL DDL statement. The streaming table environment provides access to the Table API within pyflink and uses the blink planner to optimize the job graph. This planner translates queries into a DataStream program regardless of whether the input is batch or streaming.

If the table already exists in the AWS Glue Data Catalog, then this statement will issue an error stating that the table already exists.

CREATE TABLE stock_table (
                ticker VARCHAR(6),
                price DOUBLE,
                event_time TIMESTAMP(3),
                WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND

              PARTITIONED BY (ticker)
              WITH (
                'connector' = 'kinesis',
                'stream' = 'input-stream',
                'aws.region' = 'us-east-1',
                'scan.stream.initpos' = 'LATEST',
                'format' = 'json',
                'json.timestamp-format.standard' = 'ISO-8601'
              ) """)

The screenshot above showcases the successful execution of this paragraph. We can verify the results by checking in the AWS Glue Data Catalog for the accompanying table.

To find this, navigate back to the AWS Management Console, and then search for Glue. Once here, locate the Glue database that you chose for our Kinesis Data Analytics application, and select it. You should see a link toward the bottom of the Databases view that lets you view the Tables in your database. Furthermore, you can directly select Tables in the left-hand side. Locate the table that we created in the previous step, called stock_table.

Here we can see that the table was not only created in Kinesis Data Analytics studio, but also durably persisted in a Glue Data Catalog table for reference from other applications or between runs of your application.

Tumbling windows

Performing a tumbling window in the Python Table API first requires the definition of an in-memory reference to the table created in Step 1. We use the st_env variable to define this table using the from_path function and referencing the table name. Once this is created, then we can create a windowed aggregation over one minute of data, according to the event_time column.

Note that you could also perform this transformation entirely in Flink SQL, as described in this blog post. We’re simply showcasing the features of the Pyflink API. The blog post linked above also showcases many different window operators that you might perform, such as sliding windows, group windows, over windows, session windows, etc. The windowing choice is entirely use-case dependent.

from pyflink.table.expressions import col, lit

stock_table = st_env.from_path("stock_table")

 # tumble over 1 minute, then group by that window and sum the number of trades over that time
count_table = stock_table.window(
                     Tumble.over(lit(1).minute).on(stock_table.event_time).alias("one_minute_window")) \
                           .group_by(col("one_minute_window"), col("ticker")) \
                           .select(col("ticker"), col("price").sum.alias("sum_price"), col("one_minute_window").end.alias("minute_window"))

Use the ZeppelinContext to visualize the Python Table aggregation within the notebook.


z.show(count_table, stream_type="update")

This image shows the count_table we defined previously displayed as a pie chart within the Apache Zeppelin notebook.

User-defined functions

To use and reuse common business logic into an operator, it can be useful to reference a User-defined function to transform your Data stream. This can be done either within the Kinesis Data Analytics notebook, or as an externally referenced application jar file. Utilizing User-defined functions can simplify the transformations or data enrichments that you might perform over streaming data.

In our notebook, we will be referencing a simple Java application jar that computes an integer hash of our ticker symbol. You can also write Python or Scala UDFs for use within the notebook. We chose a Java application jar to highlight the functionality of importing an application jar into a Pyflink notebook.

package com.aws.kda.udf;

import org.apache.flink.table.functions.ScalarFunction;

// The Java class must have a public no-argument constructor and can be founded in current Java classloader.
public class HashFunction extends ScalarFunction {
    private int factor = 12;

    public int eval(String s) {
        return s.hashCode() * factor;

You can find the application jar here.

  1. Create and package this jar, or download the link above.
  2. Next, upload this application jar to an Amazon S3 bucket to be referenced by our Kinesis Data Analytics Studio notebook.
  3. Head back to the Kinesis Data Analytics studio notebook, and under Configuration locate the User-defined functions box. From here, select Add user-defined function, and use the add wizard to locate your uploaded Java jar to reference it.

Once you save changes, the application will take a few minutes to update before you can open it again.

Open the notebook once it has been restarted so that we can reference our UDF.

st_env.create_java_temporary_function("hash", "com.aws.kda.udf.HashFunction")

hash_ticker = stock_table.select("ticker, hash(ticker) as secret_ticker_key, event_time")

Now we can view this newly transformed data from the hash_ticker table context.

st_env.create_java_temporary_function("hash", "com.aws.kda.udf.HashFunction")

hash_ticker = stock_table.select("ticker, hash(ticker) as secret_ticker_key, event_time")

The screenshot above showcases data being displayed in a tabular format from our hashed results set.

The screenshot above showcases data being displayed in a tabular format from our hashed results set.

Enable checkpointing

To utilize the fault-tolerant features of the Streaming File Sink (writing data to Amazon S3), we must enable checkpointing within our Apache Flink application. This setting isn’t enabled by default on any Kinesis Data Analytics Studio notebook. However, it can be enabled by simply accessing the streaming environment variable’s configuration and setting the proper string accordingly:

z.show(hash_ticker, stream_type="update")

Writing results out to Amazon S3

In the same way that we ingested data into Kinesis Data Analytics Studio, we will create another table, called a sink, that will be responsible for taking data within Kinesis Data Analytics Studio and writing it out to Amazon S3 using the Apache Flink Filesystem connector. This connector does require checkpoints to commit data to a Filesystem, hence the previous step.

First, let’s create the table.


table_name = "output_table"
bucket_name = "kda-python-sink-bucket"

st_env.execute_sql("""CREATE TABLE {0} (
                ticker VARCHAR(6),
                price DOUBLE,
                event_time TIMESTAMP(3)
              PARTITIONED BY (ticker)
              WITH (
                  'sink.partition-commit.delay' = '1 min'
        table_name, bucket_name))

Next, we can perform the insert by calling the streaming table environment’s execute_sql function.

table_result = st_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}".format("output_table", "hash_ticker"))

The return value table_result is a pyflink table TableResult object. This lets you query and interact with the Flink job that is operating in the background.

Since we’ve set our checkpointing interval to one minute, wait at least one minute with data flowing to see data in your Amazon S3 bucket.

To stop the Amazon S3 sink process, run the following cell:



A Studio notebook application consists of one or more tasks. You can split an application task into several parallel instances for execution, where each parallel instance processes a subset of the task’s data. The number of parallel instances of a task is called its parallelism, and adjusting that helps execute your tasks more efficiently.

Upon creation, Studio notebooks are given four parallel Kinesis Processing Units (KPU) which make up the application parallelism. To increase that parallelism, navigate to the Kinesis Data Analytics Studio Management Console, select your application name, and select the Configuration tab.

The screenshot above shows the Kinesis Data Analytics Studio console configuration page, where we can note the runtime environment, IAM Role, and modify things like the number of KPU’s the application is allocated.

  1. From this page, under the Scaling section, select Edit and modify the Parallelism entry. We don’t recommend increasing the Parallelism Per KPU setting higher than 1 unless your application is I/O bound.
  2. Select Save Changes to increase/decrease your application’s parallelism.


When you have thoroughly tested and iterated on your application code within a Kinesis Data Analytics Studio notebook, you may choose to promote your notebook to a Kinesis Data Analytics for Apache Flink application with durable state. The benefits of doing this include having full fault tolerance with stateful operations, such as checkpointing, snapshotting, and autoscaling based on CPU usage.

To promote your Kinesis Data Analytics Studio notebook to a Kinesis Data Analytics for Apache Flink application:

  1. Navigate to the top-right of your notebook and select Actions for <<notebook name>>.
  2. First, select Build <<notebook name>> and export to Amazon S3.
  3. Once this process finishes, select Deploy <<notebook name>> as Kinesis Analytics Application. This will open a modal.
  4. Then, select Deploy using AWS Console.
  5. On the next screen, you can enter the following
    1. An optional description
    2. The same IAM role that you used for your Kinesis Data Analytics Studio notebooks.
  6. Then, select Create streaming application. Once the process finishes, you will see a Streaming Application preconfigured with the code supplied by your Kinesis Data Analytics studio notebook.
  7. Select Run to start your application.

Make sure that you have stopped all paragraphs in your Kinesis Data Analytics studio notebook so as not to contend for resources with your Kinesis Data Stream.

When the application has started, you should begin to see new data flowing into your Amazon S3 bucket in an entirely fault-tolerant and stateful manner.

Congratulations! You’ve just promoted a Kinesis Data Analytics studio notebook to Kinesis Data Analytics for Apache Flink!


Kinesis Data Analytics Studio makes developing stream processing applications using Apache Flink much faster. Moreover, all of this is done with rich visualizations, a scalable and user-friendly interface to develop and collaborate on pipelines, and the flexibility of language choice to make any streaming workload performant and powerful. Users can run paragraphs from within the notebook as described in this post, or choose to promote their Studio notebook to a Kinesis Data Analytics for Apache Flink application with durable state.

For more information, please see the following documentation:

About the Author

Jeremy Ber has been working in the telemetry data space for the past five years as a Software Engineer, Machine Learning Engineer, and most recently a Data Engineer. In the past, Jeremy has supported and built systems that stream in terabytes of data-per-day, and process complex Machine Learning Algorithms in real-time. At AWS, he is a Solutions Architect Streaming Specialist supporting both Managed Streaming for Kafka (Amazon MSK) and Amazon Kinesis services.

Use Amazon CodeGuru Profiler to monitor and optimize performance in Amazon Kinesis Data Analytics applications for Apache Flink

Post Syndicated from Praveen Panati original https://aws.amazon.com/blogs/big-data/use-amazon-codeguru-profiler-to-monitor-and-optimize-performance-in-amazon-kinesis-data-analytics-applications-for-apache-flink/

Amazon Kinesis Data Analytics makes it easy to transform and analyze streaming data and gain actionable insights in real time with Apache Flink. Apache Flink is an open-source framework and engine for processing data streams in real time. Kinesis Data Analytics reduces the complexity of building and managing Apache Flink applications using open-source libraries and integrating with other AWS services.

Kinesis Data Analytics is a fully managed service that takes care of everything required to run real-time streaming applications continuously and scale automatically to match the volume and throughput of your incoming data.

As you start building and deploying business-critical, highly scalable, real-time streaming applications, it’s important that you continuously monitor applications for health and performance, and optimize the application to meet the demands of your business.

With Amazon CodeGuru Profiler, developers and operations teams can monitor the following:

You can use CodeGuru Profiler to analyze the application’s performance characteristics and bottlenecks in the application code by capturing metrics such as CPU and memory utilization. You can use these metrics and insights to identify the most expensive lines of code; optimize for performance; improve stability, latency, and throughput; and reduce operational cost.

In this post, we discuss some of the challenges of running streaming applications and how you can use Amazon Kinesis Data Analytics for Apache Flink to build reliable, scalable, and highly available streaming applications. We also demonstrate how to set up and use CodeGuru Profiler to monitor an application’s health and capture important metrics to optimize the performance of Kinesis Data Analytics for Apache Flink applications.


Streaming applications are particularly complex in nature. The data is continuously generated from a variety of sources with varying amounts of throughput. It’s critical that the application infrastructure scales up and down according to these varying demands without becoming overloaded, and not run into operational issues that might result in downtime.

As such, it’s crucial to constantly monitor the application for health, and identify and troubleshoot the bottlenecks in the application configuration and application code to optimize the application and the underlying infrastructure to meet the demands while also reducing the operational costs.

What Kinesis Data Analytics for Apache Flink and CodeGuru Profiler do for you

With Kinesis Data Analytics for Apache Flink, you can use Java, Scala, and Python to process and analyze real-time streaming data using open-source libraries based on Apache Flink. Kinesis Data Analytics provides the underlying infrastructure for your Apache Flink applications. It handles core capabilities such as provisioning compute resources, parallel computation, automatic scaling, and application backups (implemented as checkpoints and snapshots) to rapidly create, test, deploy, and scale real-time data streaming applications using best practices. This allows developers to focus more on application development and less on Apache Flink infrastructure management.

With CodeGuru Profiler, you can quickly and easily monitor Kinesis Data Analytics for Apache Flink applications to:

  • Identify and troubleshoot CPU and memory issues using CPU and memory (heap summary) utilization metrics
  • Identify bottlenecks and the application’s most expensive lines of code
  • Optimize application performance (latency, throughput) and reduce infrastructure and operational costs

Solution overview

In this post, we use a sample Java application deployed as a Kinesis Data Analytics application for Apache Flink, which consumes the records from Amazon Kinesis Data Streams and uses Apache Flink operators to generate real-time actionable insights. We use this sample to understand and demonstrate how to integrate with CodeGuru Profiler to monitor the health and performance of your Kinesis Data Analytics applications.

The following diagram shows the solution components.

At a high level, the solution covers the following steps:

  1. Set up, configure, and deploy a sample Apache Flink Java application on Kinesis Data Analytics.
  2. Set up CodeGuru Profiler.
  3. Integrate the sample Apache Flink Java application with CodeGuru Profiler.
  4. Use CodeGuru Profiler to analyze, monitor, and optimize application performance.

Set up a sample Apache Flink Java application on Kinesis Data Analytics

Follow the instructions in the GitHub repo and deploy the sample application that includes source code as well as AWS CloudFormation templates to deploy the Kinesis Data Analytics for Apache Flink application.

For this post, I deploy the stack in the us-east-1 Region.

After you deploy the sample application, you can test the application by running the following commands, and providing the correct parameters for the Kinesis data stream and Region.

The Java application has already been downloaded to an EC2 instance that has been provisioned by AWS CloudFormation; you just need to connect to the instance and run the JAR file to start ingesting events into the stream.

$ ssh [email protected]«Replay instance DNS name»

$ java -jar amazon-kinesis-replay-*.jar -streamName «Kinesis data stream name» -streamRegion «AWS region» -speedup 3600

Set up CodeGuru Profiler

Set up and configure CodeGuru Profiler using the AWS Management Console. For instructions, see Set up in the CodeGuru Profiler console.

For this post, I create a profiling group called flinkappdemo in the us-east-1 Region.

In the next section, I demonstrate how to integrate the sample Kinesis Data Analytics application with the profiling group.

Integrate the sample Apache Flink Java application with CodeGuru Profiler

Download the source code that you deployed earlier and complete the following steps to integrate CodeGuru Profiler to the Java application:

  1. Include the CodeGuru Profiler agent in your application by adding the following dependencies to your pom.xml file:
    <project xmlns="http://maven.apache.org/POM/4.0.0" 
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  2. Add the CodeGuru Profiler agent configuration code to the Apache Flink Operators (functions), as shown in the following code.

Because multiple operators and operator instances can run on the same TaskManager JVM, and because one instance of the profiler can capture all events in a JVM, you just need to enable the profiler on an operator that is guaranteed to be present on all TaskManager JVMs. For this, you can pick the operator with the highest parallelism. In addition, you could instantiate the profiler as a singleton such that there is one instance per JVM.

public class CountByGeoHash implements WindowFunction<TripGeoHash, PickupCount, String, TimeWindow> {

  static {
    new Profiler.Builder()
            .withHeapSummary(false) // optional - to start without heap profiling set to false or remove line
public class TripDurationToAverageTripDuration implements WindowFunction<TripDuration, AverageTripDuration, Tuple2<String, String>, TimeWindow> {

  static {
    new Profiler.Builder()
            .withHeapSummary(false) // optional - to start without heap profiling set to false or remove line
  1. Build the application using the following command:
    mvn clean package

The preceding command packages the application into a JAR file.

  1. Copy and replace the JAR file in the Amazon Simple Storage Service (Amazon S3) bucket that was created as part of the CloudFormation stack.
  2. Choose Save changes to update the application.

This step allows the application to use the latest JAR file that contains the CodeGuru Profiler code to start profiling the application.

Use CodeGuru Profiler to analyze, monitor, and optimize application performance

Now that the application has been configured to use CodeGuru Profiler, you can use the metrics and visualizations to explore profiling data collected from the application.

Run the following commands from when you set up your application to start ingesting data into the Kinesis data stream and enable CodeGuru Profiler to profile the application and gather metrics:

$ ssh [email protected]«Replay instance DNS name»

$ java -jar amazon-kinesis-replay-*.jar -streamName «Kinesis data stream name» -streamRegion «AWS region» -speedup 3600

On the CodeGuru console, navigate to flinkappdemo on the Profiling groups page.

The summary page displays the status of your profiling group as well as the relevant metrics gathered while profiling the application.

In the following sections, we discuss the metrics and reports on this page in more detail.

CPU summary

Use this summary and the associated metrics CPU utilization and Time spent executing code to understand how much of the instance’s CPU resources are consumed by the application and how frequently the application’s JVM threads were in the RUNNABLE state. This helps you measure the application’s time spent running operations on the CPU so you can tune your application code and configuration.

With the CPU utilization metric, a low value (such as less than 10%) indicates your application doesn’t consume a large amount of the system CPU capacity. This means there could be an opportunity to scale in the application parallelism to reduce cost. A high value (over 90%) indicates your application is consuming a large amount of system CPU capacity. This means there is likely value in looking at your CPU profiles and recommendations for areas of optimization.

When examining the time spent running code, a high percentage (over 90%) indicates most of your application’s time is spent running operations on the CPU. A very low percentage (under 1%) indicates that most of your application was spent in other thread states (such as BLOCKED or WAITING) and there may be more value in looking at the latency visualization, which displays all non-idle thread states, instead of the CPU visualization.

For more information on understanding the CPU summary, see CPU summary.

Latency summary

Use this summary and the metrics Time spent blocked and Time spent waiting to understand what sections of the code are causing threads to block and threads that are waiting to tune your application code and configuration. For more information, see Latency summary.

The CPU summary and latency visualization can help you analyze the thread blocking and wait operations to further identify bottlenecks and tune your application’s performance and configuration.

Heap usage

Use this summary and the metrics Average heap usage and Peak heap usage to understand how much of your application’s maximum heap capacity is consumed by your application and to spot memory leaks. If the graph grows continuously over time, that could be an indication of a memory leak.

With the average heap usage metric, a high percentage (over 90%) could indicate that your application is close to running out of memory most of the time. If you wish to optimize this, the heap summary visualization shows you the object types consuming the most space on the heap. A low percentage (less than 10%) may indicate that your JVM is being provided much more memory than it actually requires and cost savings may be available by scaling in the application parallelism, although you should check the peak usage too.

Peak heap usage shows the highest percentage of memory consumed by your application seen by the CodeGuru Profiler agent. This is based on the same dataset as seen in the heap summary visualization. A high percentage (over 90%) could indicate that your application has high spikes of memory usage, especially if your average heap usage is low.

For more information on the heap summary, see Understanding the heap summary.

Anomalies and recommendation reports

CodeGuru Profiler uses machine learning to detect and alert on anomalies in your application profile and code. Use this to identify parts of the code for performance optimization and potential savings.

The issues identified during analysis are included in the recommendations report. Use this report to identify potential outages, latency, and other performance issues. For more information on how to work with anomalies and recommendations, see Working with anomalies and recommendation reports.


You can use visualizations associated with the preceding metrics to drill down further to identify what parts of the application configuration and application code are impacting the performance, and use these insights to improve and optimize application performance.

CodeGuru Profiler supports three types of visualizations and a heap summary to display profiling data collected from applications:

Let’s explore the profiling data collected from the preceding steps to observe and monitor application performance.

CPU utilization

The following screenshot shows the snapshot of the application’s profiling data in a flame graph visualization. This view provides a bottom-up view of the application’s profiling data, with the X-axis showing the stack profile and the Y-axis showing the stack depth. Each rectangle represents a stack frame. This visualization can help you identify specific call stacks that lead to inefficient code by looking at the top block function on CPU. This may indicate an opportunity to optimize.

Recommendation report with opportunities to optimize the application

Use the recommendation report to identify and correlate the sections of the application code that can be improved to optimize the application performance. In our example, we can improve the application code by using StringBuilder instead of String.format and by reusing the loggers rather than reinitializing them repetitively, and also by selectively applying the debug/trace logging, as recommended in the following report.

Hotspot visualization

The hotspot visualization shows a top-down view of the application’s profiling data. The functions that consume the most CPU time are at the top of the visualization and have the widest block. You can use this view to investigate functions that are computationally expensive.

Latency visualization

In this mode, you can visualize frames with different thread states, which can help you identify functions that spent a lot of time being blocked on shared resources, or waiting for I/O or sleeping. You can use this view to identify threads that are waiting or dependent on other threads and use it to improve latency on all or parts of your application.

You can inspect a visualization to further analyze any frame by selecting a frame and then choosing (right-click) the frame and choosing Inspect.

Heap summary

This summary view shows how much heap space your application requires to store all objects required in memory after a garbage collection cycle. If this value continuously grows over time until it reaches total capacity, that could be an indication of a memory leak. If this value is very low compared to total capacity, you may be able to save money by reducing your system’s memory.

For more information on how to work and explore data with visualizations, refer to Working with visualizations and Exploring visualization data.

Clean up

To avoid ongoing charges, delete the resources you created from the previous steps.

  1. On the CodeGuru console, choose Profiling groups in the navigation pane.
  2. Select the flinkappdemo profiling group.
  3. On the Actions meu, choose Delete profiling group.
  4. On the AWS CloudFormation console, choose Stacks in the navigation pane.
  5. Select the stack you deployed (kinesis-analytics-taxi-consumer) and choose Delete.


This post explained how to configure, build, deploy, and monitor real-time streaming Java applications using Kinesis Data Analytics applications for Apache Flink and CodeGuru. We also explained how you can use CodeGuru Profiler to collect runtime performance data and metrics that can help you monitor application health and optimize your application performance.

For more information, see Build and run streaming applications with Apache Flink and Amazon Kinesis Data Analytics for Java Applications and the Amazon Kinesis Data Analytics Developer Guide.

Several customers are now using CodeGuru Profiler to monitor and improve application performance, and you too can start monitoring your applications by following the instructions in the product documentation. Head over to the CodeGuru console to get started today!

About the Author

Praveen Panati is a Senior Solutions Architect at Amazon Web Services. He is passionate about cloud computing and works with AWS enterprise customers to architect, build, and scale cloud-based applications to achieve their business goals. Praveen’s area of expertise includes cloud computing, big data, streaming analytics, and software engineering.

How Net at Work built an email threat report system on AWS

Post Syndicated from Florian Mair original https://aws.amazon.com/blogs/architecture/how-net-at-work-built-an-email-threat-report-system-on-aws/

Emails are often used as an entry point for malicious software like trojan horses, rootkits, or encryption-based ransomware. The NoSpamProxy offering developed by Net at Work tackles this threat, providing secure and confidential email communication.

A subservice of NoSpamProxy called 32guards is responsible for threat reports of inbound and outbound emails. With the increasing number of NoSpamProxy customers, 32guards was found to have several limitations. 32guards was previously built on a relational database. But with the growth in traffic, this database was not able to keep up with storage demands and expected query performance. Further, the relational database schema was limiting the possibilities of complex pattern detections, due to performance limitations. The NoSpamProxy team decided to rearchitect the service based on the Lake House approach.

The goal was to move away from a one-size-fits-all approach for data analytics and integrate a data lake with purpose-built data stores, unified governance, and smooth data movement.

This post shows how Net at Work modernized their 32guards service, from a relational database to a fully serverless analytics solution. With adoption of the Well-Architected Analytics Lens best practices and the use of fully managed services, the 32guards team was able to build a production-ready application within six weeks.

Architecture for email threat reports and analytics

This section gives a walkthrough of the solution’s architecture, as illustrated in Figure 1.

Figure 1. 32guards threat reports architecture

Figure 1. 32guards threat reports architecture

1. The entry point is an Amazon API Gateway, which receives email metadata in JSON format from the NoSpamProxy fleet. The message contains information about the email in general, email attachments, and URLs in the email. As an example, a subset of the data is presented in JSON as follows:

  "Attachments": [
      "Sha256Hash": "69FB43BD7CCFD79E162B638596402AD1144DD5D762DEC7433111FC88EDD483FE",
      "Classification": 0,
      "Filename": "test.ods.tar.gz",
      "DetectedMimeType": "application/tar+gzip",
      "Size": 5895
  "Urls": [
      "Url": "http://www.aarhhie.work/",
      "Classification": 0,
    },        {
      "Url": "http://www.netatwork.de/",
      "Classification": 0,
      "Url": "http://aws.amazon.com/",
      "Classification": 0,

2. This JSON message is forwarded to an AWS Lambda function (called “frontend”), which takes care of the further downstream processing. There are two activities the Lambda function initiates:

  • Forwarding the record for real-time analysis/storage
  • Generating a threat report based on the information derived from the data stored in the indicators of compromises (IOCs) Amazon DynamoDB table

IOCs are patterns within the email metadata that are used to determine if emails are safe or not. For example, this could be for a suspicious file attachment or domain.

Threat report for suspicious emails

In the preceding JSON message, the attachments and URLs have been classified with “0” by the email service itself, which indicates that none of them look suspicious. The frontend Lambda function uses the vast number of IOCs stored in the DynamoDB table and heuristics to determine any potential threats within the email. The use of DynamoDB enables fast lookup times to generate a threat report. For the example, the response to the API Gateway in step 2 looks like this:

  "ReportedOnUtc": "2021-10-14T14:33:34.5070945Z",
  "Reason": "realtimeSuspiciousOrganisationalDomain",
  "Identifier": "aarhhie.work",

This threat report shows that the top-level domain “aarhiie.work” has been detected as suspicious. The report is used to determine further actions for the email, such as blocking.

Real-time data processing

3. In the real-time analytics flow, the frontend Lambda function ingests email metadata into a data stream using Amazon Kinesis Data Streams. This is a massively scalable, serverless, and durable real-time data streaming service. Compared to a queue, streaming storage permits more than one consumer of the same data.

4. The first consumer is an Apache Flink application running in Amazon Kinesis Data Analytics. This application generates statistical metrics (for example, occurrences of the top-level domain “.work”). The output is stored in Apache Parquet format on Amazon S3. Parquet is a columnar storage format for row-based files like csv.

The second consumer of the streaming data is Amazon Kinesis Data Firehose. Kinesis Data Firehose is a fully managed solution to reliably load streaming data into data lakes, data stores, and analytics services. Within the 32guards service, Kinesis Data Firehose is used to store all email metadata into Amazon S3. The data is stored in Apache Parquet format, which makes queries more time and cost efficient.

IOC detection

Now that we have shown how data is ingested and threat reports are generated to respond quickly to requests, let’s look at how the IOCs are updated. These IOCs are used for generating the threat report within the “frontend” Lambda function. As attack vectors are changing over time, quickly analyzing the data for new threats, is crucial to provide high-quality reports to the NoSpamProxy service.

The incoming email metadata is stored every few minutes in Amazon S3 by Kinesis Data Firehose. To query data directly in Amazon S3, Amazon Athena is used. Athena is a serverless query service that analyzes data stored in Amazon S3, by using standard SQL syntax.

5. To be able to query data in S3, Amazon Athena uses the AWS Glue Data Catalog, which contains the structure of the email metadata stored in the data lake. The data structure is derived from the data itself using AWS Glue Crawlers. Other external downstream processing services like business intelligence applications, also use Amazon Athena to consume the data.

6. Athena queries are initiated on a predefined schedule to update or generate new IOCs. The results of these queries are stored in the DynamoDB table to enable fast lookup times for the “frontend” Lambda.


In this blog post, we showed how Net at Work modernized their 32guards service within their NoSpamProxy product. The previous architecture used a relational database to ingest and store email metadata. This database was running into performance and storage issues, and must be redesigned into a more performant and scalable architecture.

Amazon S3 is used as the storage layer, which can scale up to exabytes of data. With Amazon Athena as the query engine, there is no need to operate a high-performance database cluster, as compute and storage is separated. By using Amazon Kinesis Data Streams and Amazon Kinesis Data Analytics, valuable insight can be generated in real time, and acted upon more quickly.

As a serverless, fully managed solution, the 32guards service has a lower-cost footprint of as much as 50% and requires less maintenance. By moving away from a relational database model, the query runtimes decrease significantly. You can now conduct analyses that have not been feasible before.

Interested in the NoSpamProxy? Read more about NoSpamProxy or sign up for a free trial.

Looking for more architecture content? AWS Architecture Center provides reference architecture diagrams, vetted architecture solutions, Well-Architected best practices, patterns, icons, and more!

How Cynamics built a high-scale, near-real-time, streaming AI inference system using AWS

Post Syndicated from Aviv Yehezkel original https://aws.amazon.com/blogs/big-data/how-cynamics-built-a-high-scale-near-real-time-streaming-ai-inference-system-using-aws/

This post is co-authored by Dr. Yehezkel Aviv, Co-Founder and CTO of Cynamics and Sapir Kraus, Head of Engineering at Cynamics.

Cynamics provides a new paradigm of cybersecurity — predicting attacks long before they hit by collecting small network samples (less than 1%), inferring from them how the full network (100%) behaves, and predicting threats using unique AI breakthroughs. The sample approach allows Cynamics to be generic, agnostic, and work for any client’s network architecture, no matter how messy the mix between legacy, private, and public clouds. Furthermore, the solution is scalable and provides full cover to the client’s network, no matter how large it is in volume and size. Moreover, because any network gateway (physical or virtual, legacy or cloud) supports one of the standard sampling protocols and APIs, Cynamics doesn’t require any installation of appliances nor agents, as well as no network changes and modifications, and the onboarding usually takes less than an hour.

In the crowded cybersecurity market, Cynamics is the first-ever solution based on small network samples, which has been considered a hard and unsolved challenge in academia (our academic paper “Network anomaly detection using transfer learning based on auto-encoders loss normalization” was recently presented in ACM CCS AISec 2021) and industry to this day.

The problem Cynamics faced

Early in the process, with the growth of our customer base, we were required to seamlessly support the increased scale and network throughput by our unique AI algorithms. We faced a few different challenges:

  • How can we perform near-real-time analysis on our streaming clients’ incoming data into our AI inference system to predict threats and attacks?
  • How can we seamlessly auto scale our solution to be cost-efficient with no impact on the platform ingestion rate?
  • Because many of our customers are from the public sector, how can we do this while supporting both AWS commercial and government environments (GovCloud)?

This post shows how we used AWS managed services and in particular Amazon Kinesis Data Streams and Amazon EMR to build a near-real-time streaming AI inference system serving hundreds of production customers in both AWS commercial and government environments, while seamlessly auto scaling.

Overview of solution

The following diagram illustrates our solution architecture:

To provide a cost-efficient, highly available solution that scales easily with user growth, while having no impact on near-real-time performance, we turned to Amazon EMR.

We currently process over 50 million records per day, which translates to just over 5 billion flows, and keeps growing on a daily basis. Using Amazon EMR along with Kinesis Data Streams provided the scalability we needed to achieve inference times of just a few seconds.

Although this technology was new to us, we minimized our learning curve by turning to the available guides from AWS for best practices on scale, partitioning, and resource management.


Our workflow contains the following steps:

  1. Flow samples are sent by the client’s network devices directly to the Cynamics cloud. A network flow (or connection) is a set of packets with the same five-tuple ID: source-IP-address, destination-IP-address, source-port, destination-port, and protocol.
  2. The samples are analyzed by Network Load Balancers, which forward them into an auto scaling group of stateless flow transformers running on Graviton-powered Amazon Elastic Compute Cloud (Amazon EC2) instances. With Graviton-based processors in the flow transformers, we reduced our operational costs by over 30%.
  3. The flows are transformed to the Cynamics data format and enriched with additional information from Cynamics’ databases and in-house sources such as IP resolutions, intelligence, and reputation.

The following figures show the network scale for a single flow transformer machine over a week. The first figure illustrates incoming network packets for a single flow transformer machine.

The following shows outcoming network packets for a single flow transformer machine.

The following shows incoming network bytes for a single flow transformer machine.

The following shows outcoming network bytes for a single flow transformer machine.

  1. The flows are sent using Kinesis Data Streams to the real-time analysis engine.
  2. The Amazon EMR-based real-time engine consumes records in a few seconds batches using Yarn/Spark. The sampling rate of each client is dynamically tuned according to its throughput to ensure a fixed incoming data rate for all clients. We achieved this using Amazon EMR Managed Scaling with a custom policy (available with Amazon EMR versions 5.30.1 and later), which allows us to scale EMR nodes in or out based on Amazon CloudWatch metrics, with two different rules for scale-out and scale-in. The metric we created is based on the Amazon EMR running time, because our real-time AI threat detection runs on a sliding window interval of a few seconds.
    1. The scale-out policy tracks the average running time over a period of 10 minutes, and scales the EMR nodes if it’s longer than 95% of the required interval. This allows us to prevent processing delays.
    2. Similarly, the scale-in policy uses the same metric but measures the average over a 30-minute period, and scales the cluster accordingly. This enables us to optimize cluster costs and reduce the number of EMR nodes in off-hours.
  3. To optimize and seamlessly scale our AI inference calls, these were made available through an ALB and another auto scaling group of servers (AI model-service).
  4. We use Amazon DynamoDB as a fast and highly available states table.

The following figure shows the number of records processed by the Kinesis data stream over a single day.

The following shows the Kinesis data streams records rate per minute.

AI predictions and threat detections are sent to continued processing and alerting, and are saved in Amazon DocumentDB (with MongoDB compatibility).


With the approach described in this post, Cynamics has been providing threat prediction based on near-real-time analysis of its unique AI algorithms for a constantly growing customer base in a seamless and automatically scalable way. Since first implementing the solution, we’ve managed to easily and linearly scale our architecture, and were able to further optimize our costs by transitioning to Graviton-based processors in the flow transformers, which reduced over 30% of our flow transformers costs.

We’re considering the following next steps:

  • An automatic machine learning lifecycle using an Amazon SageMaker Studio pipeline, which includes the following steps:
  • Additional cost reduction by moving the EMR instances to be Graviton-based as well, which should yield an additional 20% reduction.

About the Authors

Dr. Yehezkel Aviv is the co-founder and CTO of Cynamics, leading the company innovation and technology. Aviv holds a PhD in Computer Science from the Technion, specializing in cybersecurity, AI, and ML.

Sapir Kraus is Head of Engineering at Cynamics, where his core focus is managing the software development lifecycle. His responsibilities also include software architecture and providing technical guidance to team members. Outside of work, he enjoys roasting coffee and barbecuing.

Omer Haim is a Startup Solutions Architect at Amazon Web Services. He helps startups with their cloud journey, and is passionate about containers and ML. In his spare time, Omer likes to travel, and occasionally game with his son.

Automating Anomaly Detection in Ecommerce Traffic Patterns

Post Syndicated from Aditya Pendyala original https://aws.amazon.com/blogs/architecture/automating-anomaly-detection-in-ecommerce-traffic-patterns/

Many organizations with large ecommerce presences have procedures to detect major anomalies in their user traffic. Often, these processes use static alerts or manual monitoring. However, the ability to detect minor anomalies in traffic patterns near real-time can be challenging. Early detection of these minor anomalies in ecommerce traffic (such as website page visits and order completions) helps organizations take corrective actions to address issues. This decreases negative impacts to business key performance indicators (KPIs).

In this blog post, we will demonstrate an artificial intelligence/machine learning (AI/ML) solution using AWS services. We’ll show how Amazon Kinesis and Amazon Lookout for Metrics can be used to detect major and minor anomalies near-real time, based on historical and current traffic trends.

The inconsistency of ecommerce traffic

The ecommerce traffic (and number of orders placed) varies based on season, month, date, and time of day. For example, ecommerce websites experience high traffic during weekday evening hours, compared to morning hours. Similarly, there is a spike in web traffic on weekends, compared to weekdays. However, the ecommerce traffic on holiday events (for example, Black Friday, Cyber Monday) does not follow this trend. Due to such dynamic and varying patterns, detecting minor anomalies in user traffic near-real time becomes difficult.

We need a smart solution that can detect the smallest deviation in user traffic based on historical data (date and time). As you can imagine, programming these trends based on static rules is time-intensive. In the next section, we discuss a solution that can help organizations automate and detect minor (and major) anomalies while still accounting for varying traffic trends.

The components of our anomaly detection solution

The architecture consists of three functional components:

  • The ecommerce application that customers use for interaction
  • The data ingesting, transforming, and storage platform
  • Anomaly detection and notification

This solution automates data ingestion and anomaly detection, and provides a graphical user interface to interact, tweak, and filter anomalies based on severity.

Figure 1 illustrates the architecture of this solution:

Figure 1. Architecture diagram of an anomaly detection solution for ecommerce traffic

Figure 1. Architecture diagram of an anomaly detection solution for ecommerce traffic

Let’s look at the individual components of this architecture before reviewing the overall solution.

The ecommerce application that customers use for interaction 

A customer’s journey of purchasing a product online involves user actions that include:

  • Searching for and viewing the product on the “Product Display Page” (PDP)
  • Adding to the “cart”
  • Completing the purchase on the “checkout“ page

The traffic on these pages is broken down into chunks based on time intervals. These serve as the data points that we can use to understand traffic patterns.

The data ingesting, transforming, and storage platform

Ecommerce applications generate data in multiple formats and in different volumes. This data must be fed into a streaming platform that can ingest and collect data continuously. Typically, the data must be transformed and stored for analysis and machine learning purposes. To satisfy these requirements, we will use Amazon Kinesis Data Streams as a streaming platform for data ingestion. Amazon Kinesis Data Firehose with AWS Lambda can transform the data. And we’ll store the data in Amazon Simple Storage Service (S3).

Anomaly detection and notification in near-real time

Once our data is ready, we must analyze it near-real time to identify anomalies. We must notify the concerned team about this anomaly so that they can take necessary corrective actions, if needed. We will use Lookout for Metrics and Amazon Simple Notification Service (SNS) to satisfy these requirements.

Lookout for Metrics can detect and diagnose anomalies in traffic patterns using ML. Amazon Lookout for Metrics accepts feedback on detected anomalies and tunes the results to improve accuracy over time. Lookout for Metrics is also capable of integrating with Amazon SNS, which can send notifications via SMS, mobile push, and emails.

Monitoring ecommerce traffic with Lookout for Metrics

As shown in Figure 1, data from user traffic and user interactions with the ecommerce application is captured as a function of time, and ingested into Kinesis Data Streams. Using Kinesis Data Firehose and Lambda, data is transformed and stored in an S3 bucket. We then create a detector in Lookout for Metrics and use the S3 bucket as the data source. Because of seamless integration between S3 and Lookout for Metrics, data from S3 bucket is automatically ingested into the detector we created.

Once the detector is activated, Lookout for Metrics will start monitoring the data for anomalies, and start identifying the anomalies near-real time. Lookout for Metrics also provides a mechanism to adjust severity threshold on a scale of 0-100, which will help decrease false positives as much as desired. In addition, it integrates with SNS, and can publish notifications to an SNS Topic. An email/ SMS or mobile push subscription can be created on this topic, which will notify users about any current anomalies.


In this post, we discussed how minor anomalies are hard to detect near-real time in ecommerce traffic of organizations. We also discussed the services that can be used to monitor these anomalies, such as Lookout for Metrics. Use this architecture to help you monitor, detect anomalies in near-real time, and reduce any negative impact to your business KPIs.

For further reading:

Automate Amazon Connect Data Streaming using AWS CDK

Post Syndicated from Tarik Makota original https://aws.amazon.com/blogs/architecture/automate-amazon-connect-data-streaming-using-aws-cdk/

Many customers want to provision Amazon Web Services (AWS) cloud resources quickly and consistently with lifecycle management, by treating infrastructure as code (IaC). Commonly used services are AWS CloudFormation and HashiCorp Terraform. Currently, customers set up Amazon Connect data streaming manually, as the service is not available under CloudFormation resource types. Customers may want to extend it to retrieve real-time contact and agent data. Integration is done manually and can result in issues with IaC.

Amazon Connect contact trace records (CTRs) capture the events associated with a contact in the contact center. Amazon Connect agent event streams are Amazon Kinesis Data Streams that provide near real-time reporting of agent activity within the Amazon Connect instance. The events published to the stream include these contact control panel (CCP) events:

  • Agent login
  • Agent logout
  • Agent connects with a contact
  • Agent status change, such as to available to handle contacts, or on break, or at training.

In this blog post, we will show you how to automate Amazon Connect data streaming using AWS Cloud Development Kit (AWS CDK). AWS CDK is an open source software development framework to define your cloud application resources using familiar programming languages. We will create a custom CDK resource, which in turn uses Amazon Connect API. This can be used as a template to automate other parts of Amazon Connect, or for other AWS services that don’t expose its full functionality through CloudFormation.

Overview of Amazon Connect automation solution

Amazon Connect is an omnichannel cloud contact center that helps you provide superior customer service. We will stream Amazon Connect agent activity and contact trace records to Amazon Kinesis. We will assume that data will then be used by other services or third-party integrations for processing. Here are the high-level steps and AWS services that we are going use, see Figure 1:

  1. Amazon Connect: We will create an instance and enable data streaming
  2. Cloud Deployment Toolkit: We will create custom resource and orchestrate automation
  3. Amazon Kinesis Data Streams and Amazon Kinesis Data Firehose: To stream data out of Connect
  4. AWS Identity and Access Management (IAM): To govern access and permissible actions across all AWS services
  5. Third-party tool or Amazon S3: Used as a destination of Connect data via Amazon Kinesis data
Figure 1. Connect data streaming automation workflow

Figure 1. Connect data streaming automation workflow

Walkthrough and deployment tasks

Sample code for this solution is provided in this GitHub repo. The code is packaged as a CDK application, so the solution can be deployed in minutes. The deployment tasks are as follows:

  • Deploy the CDK app
  • Update Amazon Connect instance settings
  • Import the demo flow and data

Custom Resources enables you to write custom logic in your CloudFormation deployment. You implement the creation, update, and deletion logic to define the custom resource deployment.

CDK implements the AWSCustomResource, which is an AWS Lambda backed custom resource that uses the AWS SDK to provision your resources. This means that the CDK stack deploys a provisioning Lambda. Upon deployment, it calls the AWS SDK API operations that you defined for the resource lifecycle (create, update, and delete).


For this walkthrough, you need the following prerequisites:

Deploy and verify

1. Deploy the CDK application.

The resources required for this demo are packaged as a CDK app. Before proceeding, confirm you have command line interface (CLI) access to the AWS account where you would like to deploy your solution.

  • Open a terminal window and clone the GitHub repository in a directory of your choice:
    git clone [email protected]:aws-samples/connect-cdk-blog
  • Navigate to the cdk-app directory and follow the deployment instructions. The default Region is usually us-east-1. If you would like to deploy in another Region, you can run:
    export AWS_DEFAULT_REGION=eu-central-1

2. Create the CloudFormation stack by initiating the following commands.

source .env/bin/activate
pip install -r requirements.txt
cdk synth
cdk bootstrap
cdk deploy  --parametersinstanceId={YOUR-AMAZON-CONNECT-INSTANCE-ID}

--parameters ctrStreamName={CTRStream}

--parameters agentStreamName={AgentStream}

Note: By default, the stack will create contact trace records stream [ctrStreamName] as a Kinesis Data Stream. If you want to use an Amazon Kinesis Data Firehose delivery stream instead, you can modify this behavior by going to cdk.json and adding “ctr_stream_type”: “KINESIS_FIREHOSE” as a parameter under “context.”

Once the status of CloudFormation stack is updated to CREATE_COMPLETE, the following resources are created:

  • Kinesis Data Stream
  • IAM roles
  • Lambda

3. Verify the integration.

  • Kinesis Data Streams are added to the Amazon Connect instance
Figure 2. Screenshot of Amazon Connect with Data Streaming enabled

Figure 2. Screenshot of Amazon Connect with Data Streaming enabled

Cleaning up

You can remove all resources provisioned for the CDK app by running the following command under connect-app directory:

cdk destroy

This will not remove your Amazon Connect instance. You can remove it by navigating to the AWS Management Console -> Services -> Amazon Connect. Find your Connect instance and click Delete.


In this blog, we demonstrated how to maintain Amazon Connect as Infrastructure as Code (IaC). Using a custom resource of AWS CDK, we have shown how to automate setting Amazon Kinesis Data Streams to Data Streaming in Amazon Connect. The same approach can be extended to automate setting other Amazon Connect properties such as Amazon Lex, AWS Lambda, Amazon Polly, and Customer Profiles. This approach will help you to integrate Amazon Connect with your Workflow Management Application in a faster and consistent manner, and reduce manual configuration.

For more information, refer to Enable Data Streaming for your instance.

Evolve JSON Schemas in Amazon MSK and Amazon Kinesis Data Streams with the AWS Glue Schema Registry

Post Syndicated from Aditya Challa original https://aws.amazon.com/blogs/big-data/evolve-json-schemas-in-amazon-msk-and-amazon-kinesis-data-streams-with-the-aws-glue-schema-registry/

Data is being produced, streamed, and consumed at an immense rate, and that rate is projected to grow exponentially in the future. In particular, JSON is the most widely used data format across streaming technologies and workloads. As applications, websites, and machines increasingly adopt data streaming technologies such as Apache Kafka and Amazon Kinesis Data Streams, which serve as a highly available transport layer that decouples the data producers from data consumers, it can become progressively more challenging for teams to coordinate and evolve JSON Schemas. Adding or removing a field or changing the data type on one or more existing fields could introduce data quality issues and downstream application failures without careful data handling. Teams rely on custom tools, complex code, tedious processes, or unreliable documentation to protect against these schema changes. This puts heavy dependency on human oversight, which can make the change management processes error-prone. A common solution is a schema registry that enables data producers and consumers to perform validation of schema changes in a coordinated fashion. This allows for risk-free evolution as business demands change over time.

The AWS Glue Schema Registry, a serverless feature of AWS Glue, now enables you to validate and reliably evolve streaming data against JSON Schemas. The Schema Registry is a free feature that can significantly improve data quality and developer productivity. With it, you can eliminate defensive coding and cross-team coordination, reduce downstream application failures, and use a registry that is integrated across multiple AWS services. Each schema can be versioned within the guardrails of a compatibility mode, providing developers the flexibility to reliably evolve JSON Schemas. Additionally, the Schema Registry can serialize data into a compressed format, which helps you save on data transfer and storage costs.

This post shows you how to use the Schema Registry for JSON Schemas and provides examples of how to use it with both Kinesis Data Streams and Apache Kafka or Amazon Managed Streaming for Apache Kafka (Amazon MSK).

Overview of the solution

In this post, we walk you through a solution to store, validate, and evolve a JSON Schema in the AWS Glue Schema Registry. The schema is used by Apache Kafka and Kinesis Data Streams applications while producing and consuming JSON objects. We also show you what happens when a new version of the schema is created with a new field.

The following diagram illustrates our solution workflow:

The steps to implement this solution are as follows:

  1. Create a new registry and register a schema using an AWS CloudFormation template.
  2. Create a new version of the schema using the AWS Glue console that is backward-compatible with the previous version.
  3. Build a producer application to do the following:
    1. Generate JSON objects that adhere to one of the schema versions.
    2. Serialize the JSON objects into an array of bytes.
    3. Obtain the corresponding schema version ID from the Schema Registry and encode the byte array with the same.
    4. Send the encoded byte array through a Kinesis data stream or Apache Kafka topic.
  4. Build a consumer application to do the following:
    1. Receive the encoded byte array through a Kinesis data stream or Apache Kafka topic.
    2. Decode the schema version ID and obtain the corresponding schema from the Schema Registry.
    3. Deserialize the array of bytes into the original JSON object.
    4. Consume the JSON object as needed.

Description of the schema used

For this post, we start with the following schema. The schema is of a weather report object that contains three main pieces of data: location, temperature, and timestamp. All three are required fields, but the schema does allow additional fields (indicated by the additionalProperties flag) such as windSpeed or precipitation if the producer wants to include them. The location field is an object with two string fields: city and state. Both are required fields and the schema doesn’t allow any additional fields within this object.

    "$id": "https://example.com/weather-report.schema.json",
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "WeatherReport",
    "type": "object",
    "properties": {
        "location": {
            "type": "object",
            "properties": {
                "city": {
                    "type": "string",
                    "description": "Name of the city where the weather is being reported."
                "state": {
                    "type": "string",
                    "description": "Name of the state where the weather is being reported."
            "additionalProperties": false,
            "required": [
        "temperature": {
            "type": "integer",
            "description": "Temperature in Farenheit."
        "timestamp": {
            "description": "Timestamp in epoch format at which the weather was noted.",
            "type": "integer"
    "additionalProperties": true,
    "required": [

Using the above schema, a valid JSON object would look like this:

    "location": {
        "city": "Phoenix",
        "state": "Arizona"
    "temperature": 115,
    "windSpeed": 50,
    "timestamp": 1627335205

Deploy with AWS CloudFormation

For a quick start, you can deploy the provided CloudFormation stack. The CloudFormation template generates the following resources in your account:

  • Registry – A registry is a container of schemas. Registries allow you to organize your schemas, as well as manage access control for your applications. A registry has an Amazon Resource Name (ARN) to allow you to organize and set different access permissions to schema operations within the registry.
  • Schema – A schema defines the structure and format of a data record. A schema is a versioned specification for reliable data publication, consumption, or storage. Each schema can have multiple versions. Versioning is governed by a compatibility rule that is applied on a schema. Requests to register new schema versions are checked against this rule by the Schema Registry before they can succeed.

To manually create these resources without using AWS CloudFormation, refer to Creating a Registry and Creating a Schema.


Make sure to complete the following steps as prerequisites:

  1. Create an AWS account. For this post, you configure the required AWS resources in the us-east-1 or us-west-2 Region. If you haven’t signed up, complete the following tasks:
    1. Create an account. For instructions, see Sign Up for AWS.
    2. Create an AWS Identity and Access Management (IAM) user. For instructions, see Creating an IAM User in your AWS account.
  2. Choose Launch Stack to launch the CloudFormation stack:

Review the newly registered schema

Let’s review the registry and the schema on the AWS Glue console.

  1. Sign in to the AWS Glue console and choose the appropriate Region.
  2. Under Data Catalog, choose Schema registries.
  3. Choose the GsrBlogRegistry schema registry.
  4. Choose the GsrBlogSchema schema.
  5. Choose Version 1.

We can see the JSON Schema version details and its definition. Note that the compatibility mode chosen is Backward compatibility. We see the purpose of that in the next section.

Evolve the schema by creating a new backward-compatible version

In this section, we take what is created so far and add a new schema version to demonstrate how we can evolve our schema while keeping the integrity intact.

To add a new schema version, complete the following steps, continuing from the previous section:

  1. On the Schema version details page, choose Register new version.
  2. Inside the properties object within the location object (after the state field), add a new country field as follows:
    "country": {
              "type": "string",
              "description": "Name of the country where the weather is being reported."

Because the compatibility mode chosen for the schema is backward compatibility, it’s important that we don’t make this new field a required field. If we do that, the Schema Registry fail this new version.

  1. Choose Register version.

We now have a new version of the schema that allows the producers to include an optional country field within the location object if they choose to.

Use the AWS Glue Schema Registry

In this section, we walk through the steps to use the Schema Registry with Kinesis Data Streams or Apache Kafka.


Make sure to complete the following steps as prerequisites:

  1. Configure your AWS credentials in your local machine.
  2. Install Maven on the local machine.
  3. Download the application code from the GitHub repo.
  4. Build the package:
    mvn clean package

Use the Schema Registry with Kinesis Data Streams

Run the Kinesis producer code to produce JSON messages that are associated with a schema ID assigned by the Schema Registry:

mvn exec:java -Dexec.mainClass="com.amazonaws.services.gsr.samples.json.kinesis.RunKinesisProducer" -Dexec.args="<<KINESIS_DATA_STREAM_NAME>>"

This command returns the following output:

Putting 1 record into <<KINESIS_DATA_STREAM_NAME>>
Sent message 0
Putting 1 record into <<KINESIS_DATA_STREAM_NAME>>
Sent message 1
Putting 1 record into <<KINESIS_DATA_STREAM_NAME>>
Sent message 2
Successfully produced 3 messages to a stream called <<KINESIS_DATA_STREAM_NAME>>

Run the Kinesis consumer code to receive JSON messages with the schema ID, obtain the schema from the Schema Registry, and validate:

mvn exec:java -Dexec.mainClass="com.amazonaws.services.gsr.samples.json.kinesis.RunKinesisConsumer" -Dexec.args="<<KINESIS_DATA_STREAM_NAME>>"

This command returns the following output with the JSON records received and decoded:

Number of Records received: 1
[JsonDataWithSchema(schema={"$schema":"http://json-schema.org/draft-07/schema#","additionalProperties":true,"title":"WeatherReport","type":"object","properties":{"temperature":{"description":"Temperature in Farenheit.","type":"integer"},"location":{"additionalProperties":false,"type":"object","properties":{"city":{"description":"Name of the city where the weather is being reported.","type":"string"},"state":{"description":"Name of the state where the weather is being reported.","type":"string"}},"required":["city","state"]},"timestamp":{"description":"Timestamp in epoch format at which the weather was noted.","type":"integer"}},"required":["location","temperature","timestamp"],"$id":"https://example.com/weather-report.schema.json"}, payload={"temperature":89,"location":{"city":"Orlando","state":"Florida"},"timestamp":1627335205})]

Use the Schema Registry with Apache Kafka

In the root of the downloaded GitHub repo folder, create a config file with the connection parameters for the Kafka cluster:

# Kafka

Run the Kafka producer code to produce JSON messages that are associated with a schema ID assigned by the Schema Registry:

mvn exec:java -Dexec.mainClass="com.amazonaws.services.gsr.samples.json.kafka.RunKafkaProducer" -Dexec.args="<<CONFIG_FILE_NAME>><< TOPIC_NAME>>"

This command returns the following output:

Sent message 0
Sent message 1
Sent message 2
Successfully produced 3 messages to a topic called <<TOPIC_NAME>>

Run the Kafka consumer code to consume JSON messages with the schema ID, obtain the schema from the Schema Registry, and validate:

mvn exec:java -Dexec.mainClass="com.amazonaws.services.gsr.samples.json.kafka.RunKafkaConsumer" -Dexec.args="<<CONFIG_FILE_NAME>> <<TOPIC_NAME>>"

This command returns the following output with the JSON records received and decoded:

Received message: key = message-0, value = JsonDataWithSchema(schema={"$schema":"http://json-schema.org/draft-07/schema#","additionalProperties":true,"title":"WeatherReport","type":"object","properties":{"temperature":{"description":"Temperature in Farenheit.","type":"integer"},"location":{"additionalProperties":false,"type":"object","properties":{"city":{"description":"Name of the city where the weather is being reported.","type":"string"},"state":{"description":"Name of the state where the weather is being reported.","type":"string"}},"required":["city","state"]},"timestamp":{"description":"Timestamp in epoch format at which the weather was noted.","type":"integer"}},"required":["location","temperature","timestamp"],"$id":"https://example.com/weather-report.schema.json"}, payload={"temperature":115,"location":{"city":"Phoenix","state":"Arizona"},"windSpeed":50,"timestamp":1627335205})

Clean up

Now to the final step, cleaning up the resources. Delete the CloudFormation stack to remove any resources you created as part of this walkthrough.

Schema Registry features

Let’s discuss the features the Schema Registry has to offer:

  • Schema discovery – When a producer registers a schema change, metadata can be applied as a key-value pair to provide searchable information for administrators or developers. This metadata can indicate the original source of the data (source=MSK_west), the team’s name to contact (owner=DataEngineering), or AWS tags (environment=Production). You could potentially encrypt a field in your data on the producing client and use metadata to specify to potential consumer clients which public key fingerprint to use for decryption.
  • Schema compatibility – The versioning of each schema is governed by a compatibility mode. If a new version of a schema is requested to be registered that breaks the specified compatibility mode, the request fails, and an exception is thrown. Compatibility checks enable developers building downstream applications to have a bounded set of scenarios to build applications against, which helps prepare for the changes without issue. Commonly used modes are FORWARD, BACKWARD, and FULL. For more information about mode definitions, see Schema Versioning and Compatibility.
  • Schema validation – Schema Registry serializers work to validate that the data produced is compatible with the assigned schema. If it isn’t, the data producer receives an exception from the serializer. This ensures that potentially breaking changes are found earlier in development cycles and can also help prevent unintentional schema changes due to human error.
  • Auto-registration of schemas – If configured to do so, the data producer can auto-register schema changes as they flow in the data stream. This is especially helpful for use cases where the source of the data is generated by a change data capture process (CDC) from the database.
  • IAM support – Due to integrated IAM support, only authorized producers can change certain schemas. Furthermore, only those consumers authorized to read the schema can do so. Schema changes are typically performed deliberately and with care, so it’s important to use IAM to control who performs these changes. Additionally, access control to schemas is important in situations where you might have sensitive information included in the schema definition itself. In the previous examples, IAM roles are inferred via the AWS SDK for Java, so they are inherited from the Amazon Elastic Compute Cloud (Amazon EC2) instance’s role that the application runs on, if using Amazon EC2. You can also apply IAM roles to any other AWS service that could contain this code, such as containers or AWS Lambda functions.
  • Secondary deserializer – If you have already registered schemas in another schema registry, there’s an option for specifying a secondary deserializer when performing schema lookups. This allows for migrations from other schema registries without having to start all over again. Any schema ID that is unknown to the Schema Registry is looked up in the registry tied to the secondary deserializer.
  • Compression – Using a schema registry can reduce data payload by no longer needing to send and receive schemas with each message. Schema Registry libraries also provide an option for zlib compression, which can reduce data requirements even further by compressing the payload of the message. This varies by use case, but compression can reduce the size of the message significantly.
  • Multiple data formats – The Schema Registry currently supports AVRO (v1.10.2) data format, JSON data format with JSON Schema format for the schema (specifications Draft-04, Draft-06, and Draft-07), and Java language support, with other data formats and languages to come.


In this post, we discussed the benefits of using the AWS Glue Schema Registry to register, validate, and evolve JSON Schemas for data streams as business needs change. We also provided examples of how to use the Schema Registry.

Learn more about Integrating with AWS Glue Schema Registry.

About the Author

Aditya Challa is a Senior Solutions Architect at Amazon Web Services. Aditya loves helping customers through their AWS journeys because he knows that journeys are always better when there’s company. He’s a big fan of travel, history, engineering marvels, and learning something new every day.

Gain insights into your Amazon Kinesis Data Firehose delivery stream using Amazon CloudWatch

Post Syndicated from Alon Gendler original https://aws.amazon.com/blogs/big-data/gain-insights-into-your-amazon-kinesis-data-firehose-delivery-stream-using-amazon-cloudwatch/

The volume of data being generated globally is growing at an ever-increasing pace. Data is generated to support an increasing number of use cases, such as IoT, advertisement, gaming, security monitoring, machine learning (ML), and more. The growth of these use cases drives both volume and velocity of streaming data and requires companies to capture, processes, transform, analyze, and load the data into various data stores in near-real time.

Amazon Kinesis Data Firehose is the easiest way to reliably load streaming data into data lakes, data stores, and analytics services. As the volume of the data you stream into Kinesis Data Firehose grows, you should gain insights and monitor the health of your data ingestion, transformation, and delivery.

In this post, we review the capabilities of using Firehose delivery stream metrics and the Amazon CloudWatch dashboard located on your Kinesis Data Firehose console. These capabilities allow you to create alerts when, for example, if the destination you configured in Kinesis Data Firehose has missing privileges, misconfigurations, or other issues, then Firehose will be able to detect it for you and report it as a failure. Other errors that might also occur are if you configured data transformation using Lambda and your Lambda function invocation failed, or if you have reached the Kinesis Firehose quota limits associated with your AWS account. In these cases, the data delivery from Kinesis Data Firehose to its destination may delay or fail. The CloudWatch alerts described in this post should help identify such cases in a timely manner.

This post also covers the different proactive actions that you can take when alarms are being triggered, such as submitting a request to increase quota or adding exponential backoff to your data producers.

Monitoring the delivery streams and taking these actions makes sure that data is delivered to your destinations without interruptions, enabling your business to gain insights in near-real time.

Monitor data ingestion to Kinesis Data Firehose

You can deliver data from your data producers to Kinesis Data Firehose through Amazon Kinesis Data Streams (as described later in this post), using Kinesis Agent, or directly using the Kinesis Data Firehose API operations PutRecord and PutRecordBatch. When you use Kinesis Data Streams as a data source, Kinesis Data Firehose scales automatically as your Kinesis Data Stream scales. When using the API operations for direct ingestion, you need to check the quota limits associated with your AWS account to avoid API requests throttling. Depending on your data producer behavior, this throttling can cause your data producers to retry the operation, which results in a delay of the data delivery to your destination. This throttling can also result in data loss if your data producers don’t implement a retry mechanism.

To gain deeper insights into Firehose delivery stream usage, we provide additional CloudWatch metrics that help you monitor and proactively scale quota limits: ThrottledRecords, RecordsPerSecondLimit, BytesPerSecondLimit, and PutRequestsPerSecondLimit. You can use the CloudWatch metrics dashboard (on the Monitoring tab on your Kinesis Data Firehose console) to easily visualize current usage and the quota limits.

When ingesting data directly to your delivery stream using PutRecord or PutRecordBatch, you should monitor the ThrottledRecords metric. This metric represents the number of records that were actually throttled because data ingestion exceeded one of the delivery stream limits. Kinesis Data Firehose calculates the throttling rates during the ingestion at a 1-second granularity, but the data ingestion metrics we mentioned are aggregated and emitted to CloudWatch every 5 minutes. Because of that, you can get throttled within that 5-minute window even if the data ingestion metrics don’t show that you reached the limit.

To receive alerts before your data producers are actually throttled, you can use additional CloudWatch metrics to alert you when you’re about to reach one of the delivery stream limits. You can achieve this by using the CloudWatch metrics IncomingRecords, IncomingBytes, and IncomingPutRequests. To check the limits of these metrics, refer to Amazon Kinesis Data Firehose Quota.

You can use the following ingestion metrics and their corresponding limit metrics to create a CloudWatch alarm:

  • RecordsPerSecondLimit – The maximum number of records that can be ingested in a second (IncomingRecords)
  • BytesPerSecondLimit – The maximum volume of data that can be ingested in a second (IncomingBytes)
  • PutRequestsPerSecondLimit – The maximum number of successful PutRecord and PutRecordBatch API requests that can be performed in a second (IncomingPutRequests)

To set up an alarm that alerts you when your ingestion rates are close to a quota, you should look for a percentage relationship between the ingestion rate and its corresponding limit. Because Kinesis Data Firehose emits metrics to CloudWatch every 5 minutes, you need to divide your metric with the 5-minute aggregation period, expressed as seconds (300). For example, to generate an alert when the incoming records per second rate is breaching 80% of your API operations quota, your CloudWatch alarm should be defined as follows:

This gives you a way to proactively understand how close your ingestion rates are to your delivery stream limits, and the flexibility to modify the percentage levels based on your use case. To prevent a throttling bottleneck, you should separately monitor the three delivery stream ingestion rate metrics we discussed.

Define alerts using CloudWatch alarms

You can define CloudWatch alarms manually through the AWS Management Console or by using AWS CloudFormation. In this post we cover both methods, starting with the CloudFormation template.

The following template creates your CloudWatch alarms, which you can review and customize to suit your needs.

During the stack creation process, you provide the Firehose delivery stream name that you want to monitor, and the quota percentage where you want to be notified when it’s being breached, such as 80%. After the stack creation is successful, you have four CloudWatch alarms ready.

To create your CloudWatch alarms manually through the console, complete the following steps:

  1. On the Kinesis Data Firehose console, find your delivery stream.
  2. On the Monitoring tab, choose the more options icon of the metric you want to monitor (for this example, we monitor incoming records per second).
  3. On the options menu, choose View in metrics.

On the CloudWatch console, you can see a graph that represents your current API operations (blue line) and the quota limit (red line).

  1. To create an alarm, choose Math expression.
  2. Select Common and choose Percentage.
  3. For the metric name, enter Percentage of records per second quota.
  4. We use the metric expression 100*(e1/m2), which represents the formula 100*(BytesPerSecond/BytesPerSecondLimit) that was described earlier and reflects how close you are to your maximum in percentage.
  5. Change the expression of the metric e1 from METRICS("m1")/300 to m1/300.

You can also change the Y axis label.

  1. On the Graph options tab, under Left Y Axis, for Label, enter Percentage.
  2. Now that you have the expression to use for the alarm, deselect every other expression and metric on the page.

The only expression selected should be the one you just created. You should now see the desired percentage, as in the following screenshot.

Create a CloudWatch alarm

You have now created an expression on your IncomingRecords and RecordsPerSecond quota, which you can use as a base for the alarm. With this, you can configure the tolerance level that your business use case requires.

  1. Choose the alarm icon next to your expression.
  2. In the Specify metric and conditions section, choose to receive an alert when the alarms breach the 75% limit.
  3. In the Configure actions section, specify how to forward this alarm.

You can forward this alarm to your monitoring systems or to an email address through an Amazon Simple Notification Service (Amazon SNS) topic. For this post, we create a new SNS topic and subscribe [email protected] to it.

Actions you can take when approaching the limits

When you’re getting close to your limits, you can take several different actions, which we describe in this section.

Request a service quota increase

One action you can take when seeing an alert is to request an increase in quota using the Amazon Kinesis Data Firehose Limits form. The three quotas scale proportionally, for example, if you increase the throughput quota in US East (N. Virginia), US West (Oregon), or Europe (Ireland) from 5 MiB/second to 10 MiB/second, the other two quotas increase from 2,000 requests/second to 4,000 requests/second and from 500,000 records/second to 1 million records/second. For more information about the service quota limits by AWS Region, see Amazon Kinesis Data Firehose Quota.

Use the PutRecordBatch API

If you use the API call PutRecord to deliver events to a Firehose data stream and you’re reaching the request/second quota limit, consider using the PutRecordBatch API operation. PutRecordBatch writes multiple data records into a delivery stream in a single call to achieve higher throughput per producer than writing single records, and reduces the amount of requests per second to your delivery stream.

Implement exponential backoff

As we mentioned before, even when you’re monitoring your delivery stream, you can still have bursts in your data stream. This could be caused by sudden spikes in usage of your system or external events like high trading activity in financial markets. To protect the producers from multiple throttled records, you should implement an exponential backoff. Exponential backoff is a commonly used algorithm that you can use to decrease the rate of submitting records to Kinesis Data Firehose when being throttled, so that the producer can slowly retry in order to successfully send the records.

The following are the Kinesis Data Firehose API responses when records are throttled:

  • If you’re using the API operation PutRecord, the returned error from the service is ServiceUnavailableException with HTTP status code 500.
  • If you’re using PutRecordBatch, you should iterate through the RequestResponses array and look for individual PutRecordBatchResponseEntry with ErrorCode 500 and ErrorMessage ServiceUnavailableException. Also make sure to check the value of FailedPutCount in the response even when the API call succeeds.

In both cases, you should use exponential backoff and retry the operation. For more information about implementing exponential backoff, see Error retries and exponential backoff in AWS.

Use Kinesis Data Streams with Kinesis Data Firehose

Kinesis Data Streams is a massively scalable and durable real-time data streaming service. Your data producers can produce data directly to Kinesis Data Streams, and you can configure Kinesis Data Firehose to consume the data from Kinesis Data Streams and deliver it to your destination. When you use Kinesis Data Streams as the source for the Firehose delivery stream, the throughput limits mentioned before don’t apply. You don’t need to worry about throughput limits because Kinesis Data Firehose scales automatically to match the number of shards your Kinesis data stream has.

If you’re attaching a Firehose delivery stream as a consumer to your Kinesis data stream, and you have multiple consumer applications that read data from your Kinesis data stream such as AWS Lambda (see Using AWS Lambda with Amazon Kinesis), make sure that the total consumer applications aren’t breaching the shard’s 2 MB total read rate. This can cause the Kinesis data stream to throttle your consumer applications’ reading throughput, including Kinesis Data Firehose.

If more read capacity is required, some application consumers such as Lambda (see AWS Lambda supports Kinesis Data Streams Enhanced Fan-Out and HTTP/2 for faster streaming) or custom consumers that were developed with the Kinesis Consumer Library can support dedicated throughput from Kinesis Data Streams using enhanced fan-out, which currently isn’t supported by Kinesis Data Firehose. This feature provides these consumer applications isolated connection to the stream with 2 MB/second outbound throughput, so they don’t impact other consumer applications that are reading from the shards.

If you need more ingest capacity, you can easily scale up the number of shards in the stream using the console or the UpdateShardCount API.

Monitor data delivery of Kinesis Data Firehose

In case of network timeouts, missing privileges, or misconfigurations of your delivery stream such as incorrect destination configuration or AWS Key Management Service (AWS KMS) key ARN, the data delivery of your data from Kinesis Data Firehose to its destination may delay or fail. Errors might also occur if you configured data transformation using Lambda and your Lambda function invocation failed.

When Kinesis Data Firehose encounters delivery or processing errors, it retries until the configured retry duration expires. If the retry duration ends and the data hasn’t delivered successfully, Kinesis Data Firehose retains the data internally up to a maximum period of 24 hours. If the issue continues beyond the 24-hour maximum retention period, then Kinesis Data Firehose discards the data, resulting in a data loss.

When such data delivery issues persist, the data freshness metric, which is the age of the oldest record in Kinesis Data Firehose that hasn’t been delivered yet, constantly increases. To be alerted in such cases, you should create a CloudWatch alarm for when the data freshness metric exceeds the threshold of 4 hours. We also recommend setting an alarm to observe the historical p90 of the data freshness metric value. For example, set a certain tolerance level (such as 50% above the observed value) as an alarm threshold to detect data freshness variations.

You should monitor the data freshness metric that is relevant to your Kinesis Data Firehose destination, such as DeliveryToS3.DataFreshness, DeliveryToAmazonOpenSearchService.DataFreshness, DeliveryToSplunk.DataFreshness, or DeliveryToHttpEndpoint.DataFreshness. For more information, see Monitoring Kinesis Data Firehose Using CloudWatch Metrics.

If this alarm is triggered, you should take action to understand the root cause of the data freshness variation. A reason for such a variation could be a change in your Lambda transformation logic or configuration change of Lambda concurrency when using Kinesis Data Firehose data transformation. It could also be a result of change in the configuration parameters, format conversion schema, or ingested record type. For more information, see Data Freshness Metric Increasing or Not Emitted or you can submit a technical support request if needed.

When data delivery fails because of data transformation or an issue at the destination, in some cases you can find detailed failure logs in CloudWatch Logs, which can help you troubleshot the problem.

We also recommend monitoring the data delivery byte rate to your destination (for example, DeliveryToS3.Byte), which must match or exceed your data ingestion byte rate (IncomingBytes) on a sustained average basis to avoid increase of the data freshness metric and possible eventual data loss. If the observed delivery data rates are lower than the ingestion rates, consider tuning bottlenecks such as Lambda concurrency levels or your Lambda transformation logic if used with Kinesis Data Firehose data transformation.

To gain additional insights on the delivery of your data to its destination, we provide CloudWatch metrics you can monitor. For example, you can monitor the number of records delivered to keep track of data ingested into your destinations from Kinesis Data Firehose. For more information and additional metrics per destination, see Monitoring Kinesis Data Firehose Using CloudWatch Metrics.


In this post, we discussed the capabilities of using the Firehose delivery stream metrics and the CloudWatch dashboard located on your Kinesis Data Firehose console. This allows you to gain operational insights into the data ingestion and data delivery of your Firehose deliv­­ery stream, and also create CloudWatch alerts to be notified when one of your thresholds is breached. We also covered the different actions that you can take when these alarms are triggered, such as submitting a request to increase your quota or adding exponential backoff to your data producers.

Monitor your delivery streams and take these actions to make sure that your business data is delivered to your destinations without interruptions, enabling your business to gain insights in near-real time.

About the Author

Alon Gendler is a Startup Solutions Architect Manager at Amazon Web Services. He works with AWS customers to help them solve complex problems and architect secure, resilient, scalable and high performance applications in the cloud. Alon is passionate about Data and helping customers get the most out of it.

Backtest trading strategies with Amazon Kinesis Data Streams long-term retention and Amazon SageMaker

Post Syndicated from Sachin Thakkar original https://aws.amazon.com/blogs/big-data/backtest-trading-strategies-with-amazon-kinesis-data-streams-long-term-retention-and-amazon-sagemaker/

Real-time insight is critical when it comes to building trading strategies. Any delay in data insight can cost lot of money to the traders. Often, you need to look at historical market trends to predict future trading pattern and make the right bid. More the historical data you analyze, better trading prediction you get. Back tracking streaming data can be tricky as it requires sophisticated storage and analysis mechanisms.

Amazon Kinesis Data Streams allows our customer to store streaming data for up to one year. Amazon Kinesis Data Streams Long Term Retention (LTR) of streaming data enables to use the same platform for both real-time and older data retained in Amazon Kinesis Data Streams. For example, one can train machine learning algorithms for financial trading, marketing personalization, and recommendation models without moving the data into a different data store or writing a new application. Customers can also satisfy certain data retention regulations, including under HIPAA and FedRAMP, using long term retention. This thus simplifies the data ingestion architecture for our trading use case we will discuss in this post.

In the post Building algorithmic trading strategies with Amazon SageMaker, we demonstrated how to backtest trading strategies with Amazon SageMaker with historical stock price data stored in Amazon Simple Storage Service (Amazon S3). In this post, we expand this solution for streaming data and describe how to use Amazon Kinesis.

In addition, we want to use Amazon SageMaker automatic tuning to find the optimal configuration for a moving average crossover strategy. In this strategy, two moving averages for a slow and a fast time period are calculated, and a trade is executed when a crossover happens. If the fast moving average crosses above the slow moving average, the strategy places a long trade, otherwise the strategy goes short. We find the optimal period length for these moving averages by running multiple backtests with different lengths over a historical dataset.

Finally, we run the optimal configuration for this moving average crossover strategy over a different test dataset and analyze the performance results. If the performance measured in profit and loss (P&L) is positive for the test period, we can consider this trading strategy for a forward test.

To show you how easy and quick it is to get started on AWS, we provide a one-click deployment for an extensible trading backtesting solution that uses Kinesis long-term retention for streaming data.

Solution overview

We use Kinesis Data Streams to store real-time streaming as well as historical market data. We use Jupyter notebooks as our central interface for exploring and backtesting new trading strategies. SageMaker allows you to set up Jupyter notebooks and integrate them with AWS CodeCommit to store different versions of strategies and share them with other team members.

We use Amazon S3 to store model artifacts and backtesting results.

For our trading strategies, we create Docker containers that contain the required libraries for backtesting and the strategy itself. These containers follow the SageMaker Docker container structure in order to run them inside of SageMaker. For more information about the structure of SageMaker containers, see Using the SageMaker Training and Inference Toolkits.

The following diagram illustrates this architecture.

We run the data preparation step from a SageMaker notebook. This copies the historical market data to the S3 bucket.

We use AWS Data Migration Service (AWS DMS) to load the market data to the data stream. The

SageMaker notebook connects with Kinesis Data Streams and runs the trading strategy algorithm via a SageMaker training job. The algorithm uses part of the data for training to find the optimal strategy configuration.

Finally, we run the trading strategy using the previously determined configuration on a test dataset.


Before getting started, we set up our resources. In this post, we use the us-east-2 Region as an example.

  1. Deploy the AWS resources using the provided AWS CloudFormation template.
  2. For Stack name, enter a name for your stack.
  3. Provide an existing S3 bucket name to store the historical market data.

The data is loaded in Kinesis Data Streams from this S3 bucket. Your bucket needs to be in same Region where your stack is set up.

  1. Accept all the defaults and choose Next.
  2. Acknowledge that AWS CloudFormation might create AWS Identity and Access Management (IAM) resources with custom names.
  3. Choose Create stack.

This creates all the required resources.

Data load to Kinesis Data Streams

To perform the data load, complete the following steps:

  1. On the SageMaker console, under Notebook in the navigation pane, choose Notebook instances.
  2. Locate the notebook instance AlgorithmicTradingInstance-*.
  3. Choose Open Jupyter for this instance.
  4. Go to the algorithmic-trading->4_Kinesis folder and choose Strategy_Kinesis_EMA_HPO.pynb.

You now run the data preparation step in the notebook.

  1. Load the dataset.

Specify the existing bucket where test data is stored. Make sure that the test bucket is in the same Region where you set up the stack.

  1. Run all the steps in the notebook until Step 2 Data Preparation.
  2. On the AWS DMS console, choose Database migration tasks.
  3. Select the AWS DMS task dmsreplicationtask-*.
  4. On the Actions menu, choose Restart/Resume.

This starts the data load from the S3 bucket to the data stream.

Wait until the replication task shows the status Load complete.

  1. Continue the steps in the Jupyter notebook.

Read data from Kinesis long-term retention

We read daily open, high, low, close price, and volume data from the stream’s long-term retention with the AWS SDK for Python (Boto3).

Although we don’t use enhanced fan-out (EFO) in this post, it may be advisable to do so an existing application is already reading from the stream. That way this backtesting app doesn’t interfere with the existing application.

You can visualize your data, as shown in the following screenshot.

Define your trading strategy

In this step, we define our moving average crossover trading strategy.

Build a Docker image

We build our backtesting job as a Docker image and push it to Amazon ECR.

Hyperparameter optimization with SageMaker on training data

For the moving average crossover trading strategy, we want to find the optimal fast period and slow period of this strategy, and we provide a range of days to search for.

We use profit and loss (P&L) of the strategy as the metric to find optimized hyperparameters.

You can see the tuning job recommended a value of 7 and 21 days for the fast and slow period for this trading strategy given the training dataset.

Run the strategy with optimal hyperparameters on test data

We now run this strategy with optimal hyperparameters on the test data.

When the job is complete, the performance results are stored in Amazon S3, and you can review the performance metrics in a chart and analyze the buy and sell orders for your strategy.


In this post, we described how to use the Kinesis Data Streams long-term retention feature to store the historical stock price data and how to use streaming data for backtesting of a trading strategy with SageMaker.

Long-term retention of streaming data enables you to use the same platform for both real-time and older data retained in Kinesis Data Streams. This allows you to use this data stream for financial use cases like backtesting or for machine learning without moving the data into a different data store or writing a new application. You can also satisfy certain data retention regulations, including under HIPAA and FedRAMP, using long-term retention. For more information, see Amazon Kinesis Data Streams enables data stream retention up to one year.

Risk disclaimer

This post is for educational purposes only and past trading performance does not guarantee future performance.

About the Authors

Sachin Thakkar is a Senior Solutions Architect at Amazon Web Services, working with a leading Global System Integrator (GSI). He brings over 22 years of experience as an IT Architect and as Technology Consultant for large institutions. His focus area is on Data & Analytics. Sachin provides architectural guidance and supports the GSI partner in building strategic industry solutions on AWS

Amogh Gaikwad is a Solutions Developer in the Prototyping Team. He specializes in machine learning and analytics and has extensive experience developing ML models in real-world environments and integrating AI/ML and other AWS services into large-scale production applications. Before joining Amazon, he worked as a software developer, developing enterprise applications focusing on Enterprise Resource Planning (ERP) and Supply Chain Management (SCM). Amogh has received his master’s in Computer Science specializing in Big Data Analytics and Machine Learning.

Dhiraj Thakur is a Solutions Architect with Amazon Web Services. He works with AWS customers and partners to provide guidance on enterprise cloud adoption, migration and strategy. He is passionate about technology and enjoys building and experimenting in Analytics and AI/ML space.

Oliver Steffmann is an Enterprise Solutions Architect at AWS based in New York. He brings over 18 years of experience as IT Architect, Software Development Manager, and as Management Consultant for international financial institutions. During his time as a consultant, he leveraged his extensive knowledge of Big Data, Machine Learning and cloud technologies to help his customers get their digital transformation off the ground. Before that he was the head of municipal trading technology at a tier-one investment bank in New York and started his career in his own startup in Germany.

Load CDC data by table and shape using Amazon Kinesis Data Firehose Dynamic Partitioning

Post Syndicated from Anand Shah original https://aws.amazon.com/blogs/big-data/load-cdc-data-by-table-and-shape-using-amazon-kinesis-data-firehose-dynamic-partitioning/

Amazon Kinesis Data Firehose is the easiest way to reliably load streaming data into data lakes, data stores, and analytics services. Customers already use Amazon Kinesis Data Firehose to ingest raw data from various data sources using direct API call or by integrating Kinesis Data Firehose with Amazon Kinesis Data Streams including “change data capture” (CDC) use case.

Customers typically use single Kinesis Data Stream per business domain to ingest CDC data. For example, related fact and dimension tables change data is sent to the same stream. Once the data is loaded to Amazon S3, customers use ETL tools to split the data by tables, shape, and desired partitions as the first step in the data enrichment process.

This post demonstrates how customers can use Amazon Kinesis Firehose Dynamic Partitioning to split the data by table, shape (by message schema/version), and by desired partitions on the fly to do this first step of data enrichment while ingesting data.

Solution Overview

In this post, we provide a working example of a CDC pipeline where fake customer, order, and transaction table data is pushed from the source and registered as tables to the AWS Glue Data Catalog. The following architecture diagram illustrates this overall flow. We are using AWS Lambda to generate test CDC data for this post. However, in the real world you would use AWS Data Migration Service (DMS) or a similar tool to push change data to the Amazon Kinesis Data Stream.

The workflow includes the following steps:

  1. An Amazon EventBridge event triggers an AWS Lambda function every minute.
  2. The Lambda function generates test transactions, customers and order CDC data, as well as sends the data to Amazon Kinesis Data Stream.
  3. Amazon Kinesis Data Firehose reads data from Amazon Kinesis Data Stream.
  4. Amazon Kinesis Data Firehose
    1. Applies Dynamic Partitioning configuration defined in the Firehose configuration
    2. Invokes AWS Lambda transform to derive custom Dynamic Partitioning.
  5. Amazon Kinesis Data Firehose saves data to Amazon Simple Storage Service (S3) bucket.
  6. The user runs queries on Amazon S3 bucket data using Amazon Athena, which internally uses the AWS Glue Data Catalog to supply meta data.

Deploying using AWS CloudFormation

You use CloudFormation templates to create all of the necessary resources for the data pipeline. This removes opportunities for manual error, increases efficiency, and ensures consistent configurations over time.

Steps to follow:

  1. Click here to Launch Stack:
  2. Acknowledge that the template may create AWS Identity and Access Management (IAM) resources.
  3. Choose Create stack.

This CloudFormation template takes about five minutes to complete and creates the following resources in your AWS account:

  • An S3 bucket to store ingested data
  • Lambda function to publish test data
  • Kinesis Data Stream connected to Kinesis Data Firehose
  • A Lambda function to compute custom dynamic partition for Kinesis Data Firehose transform
  • AWS Glue Data Catalog tables and Athena named queries for you to query data processed by this example

Once the AWS CloudFormation stack creation is successful, you should be able to see data automatically arriving to Amazon S3 in about five more minutes.

Data sources input

The Lambda function automatically publishes four types of messages to the Kinesis Data Stream at regular intervals with random data when invoked in the following format. In this example, we use three tables:

  • Customers: Has basic customer details.
  • Orders: Mimics orders placed by customers on the shopping website or mobile app.
  • Transactions: Mimics payment transaction done for the order. The transaction table showcases possible message schema evolution that can happen over time from message schema v1 to v2. It also shows how you can split messages by schema version if you don’t want to merge them into a universal schema.

Customer table sample message

   "version": 1,
   "table": "Customer",
   "data": {
        "id": 1,
        "name": "John",
        "country": "US"

Orders table sample message

   "version": 1,
   "table": "Order",
   "data": {
        "id": 1,
        "customerId": 1,
        "qty": 1,
        "product": {
            "name": "Book 54",
            "price": 12.6265

Transactions in old message format (v1)

    "version": 1, 
    "txid": "52", 
    "amount": 32.6516

Transactions in new message format (v2 – latest)

This message example demonstrates message evolution over time. txid from old message format is now renamed to transactionId, and new information like source is added to the original old transaction message in the new message version v2.

   "version": 2,
   "transactionId": "52",
   "amount": 32.6516,
   "source": "Web"

Dynamic Partitioning Logic

Amazon Kinesis Data Firehose dynamic partitioning configuration is defined using jq style syntax. We will use the table field for the first partition and the version field for the second level partition. We can derive the table partition using dynamic partitioning jq syntax “.version”. As you can see, the version field is available in all of the messages. Therefore, we can use it directly in partitioning. However, the table field is not available for old and new transaction messages. Therefore, we derive the table field using custom transform Lambda function.

We check the existence of the table field from the incoming message and populate it with the static value “Transaction” if table field is not present. Lambda function also returns PartitionKeys for Kinesis Data Firehose to use as dynamic partition. The Lambda function also derives the year, month, and day from the current time.

for firehose_record_input in firehose_records_input['records']:
    # Get user payload
    payload = base64.b64decode(firehose_record_input['data'])
    json_value = json.loads(payload)

    # Create output Firehose record and add modified payload and record ID to it.
    firehose_record_output = {}

    table = "Transaction"
    if "table" in json_value:
        table = json_value["table"]

    now = datetime.datetime.now()
    partition_keys = {"table": table, "year": str(now.year), "month": str(now.month), "day": str(now.day)}

The Kinesis Data Firehose S3 destination Prefix is set to table=!{partitionKeyFromLambda:table}/version=!{partitionKeyFromQuery:version}/year=!{partitionKeyFromLambda:year}/month=!{partitionKeyFromLambda:month}/day=!{partitionKeyFromLambda:day}/

  • table partition key is coming from the Lambda function based on custom logic.
  • version partition key is extracted using jq expression using Kinesis Data Firehose dynamic partition configuration. Here, the version refers to the shape of the message and not the version of the data. For example, Updates to Customer record with same ID is not merged into one.
  • year, month, and day partition keys are coming from the Lambda function based on current time

You can follow the respective links from the CloudFormation stack Output tab to deep dive into the Kinesis Data Firehose configuration, record transformer Lambda function source code, and see output files in the Amazon S3 curated bucket. The entire code is also available in the GitHub repository.

Ingested data output

Kinesis Data Firehose processes all the messages and outputs result in the following S3 hive style partitioned paths:

# AWS Glue Data Catalog table transactions_v1
# AWS Glue Data Catalog table transactions
# AWS Glue Data Catalog table customers
# Glue catalog table orders

Query output data stored in Amazon S3

Kinesis Data Firehose loads new data every minute to the Amazon S3 bucket, and the associated tables are already created by CloudFormation for you in the AWS Glue Data Catalog. You can directly query Amazon S3 bucket data using the following steps:

  1. Go to Amazon Athena service and select the database with the same name as the CloudFormation stack name without dashes.
  2. Select the three dots next to each table name to open the table menu and select Load Partitions. This will add a new partition to the AWS Glue Data Catalog.
  3. Go to the CloudFormation stack Output tab.
  4. Select the link mentioned next to the key AthenaQueries.
  5. This will take you to the Amazon Athena saved query console. Type the word Blog to search named queries created by this blog.
  6. Select the query called “Blog – Query Customer Orders”. This will open the query in the Athena query console. Select Run query to see the results.
  7. Select the Saved queries menu from the top bar to go back to the Amazon Athena saved query console. Repeat the steps for other Blog queries to see results from the “new and old transactions” queries.

Clean up

Complete the following steps to delete your resources and stop incurring costs:

  1. Go to the CloudFormation stack Output tab.
  2. Select the link mentioned next to the key PauseDataSource. This will take you to the Amazon EventBridge event rules console.
  3. Select the Actions button from the top right menu bar and select Disable.
  4. Confirm the choice by clicking the Disable button again on the prompt. This will disable Amazon EventBridge event trigger that invokes the data generator Lambda function. This lets us make sure that no new data is sent to the Kinesis data stream by Lambda from now onward.
  5. Wait for at least two minutes for all of the buffered events to reach to the S3 from the Kinesis Data Firehose.
  6. Go back to the CloudFormation stack Output tab.
  7. Select the link mentioned next to the key S3BucketCleanup.

You’re redirected to the Amazon S3 console.

  1. Enter permanently delete to delete all of the objects in your S3 bucket.
  2. Choose Empty.
  3. On the AWS CloudFormation console, select the stack you created and choose Delete.


This post demonstrates how to use the Kinesis Data Firehose Dynamic Partitioning feature to load CDC data on the fly in near real-time. It also shows how we can split CDC data by table and message schema version for backward compatibility and quick query capability. To learn more about dynamic partitioning, you can refer to this blog and this documentation. Provide us with any feedback you have about the new feature.

About the Author

Anand Shah is a Big Data Prototyping Solution Architect at AWS. He works with AWS customers and their engineering teams to build prototypes using AWS Analytics services and purpose-built databases. Anand helps customers solve the most challenging problems using the art of the possible technology. He enjoys beaches in his leisure time.

Amazon Kinesis Data Streams On-Demand – Stream Data at Scale Without Managing Capacity

Post Syndicated from Marcia Villalba original https://aws.amazon.com/blogs/aws/amazon-kinesis-data-streams-on-demand-stream-data-at-scale-without-managing-capacity/

Today we are launching Amazon Kinesis Data Streams On-demand, a new capacity mode. This capacity mode eliminates capacity provisioning and management for streaming workloads.

Kinesis Data Streams is a fully-managed, serverless service for real-time processing of streamed data at a massive scale. Kinesis Data Streams can take any amount of data, from any number of sources, and scale up and down as needed. Creating a new data stream is easy, since we announced Kinesis Data Streams in November 2013. To get started, you only need to specify the number of shards with which you must provision your stream.

Shards are the way to define capacity in Kinesis Data Streams. Each shard can ingest 1 MB/s and 1,000 records/second and egress up to 2 MB/s. You can add or remove shards of the stream using Kinesis Data Streams APIs to adjust the stream capacity according to the throughout needs of their workloads. This lets you make sure that producer and consumer applications don’t experience any throttling.

As customers adopt data streaming broadly, workloads with data traffic that can increase by millions of events in a few minutes are becoming more common. For these volatile traffic patterns, customers carefully plan capacity, monitor throughput, and in some cases develop processes that automatically change the Kinesis Data Streams stream capacity.

Kinesis Data Streams On-Demand Mode
That is why today we are announcing Kinesis Data Streams On-demand. This new capacity mode eliminates the need for provisioning and managing the capacity for streaming data. Using Kinesis Data Streams On-demand automatically scales the capacity in response to varying data traffic. Customers are charged per gigabyte of data written, read, and stored in the stream, in a pay-per-throughput fashion.

Data streams in the on-demand mode have the same high durability, high availability, low latency, security, and deep AWS integrations that Kinesis Data Streams already provides. Moreover, there are no new APIs to write or read data. All existing Kinesis Data Streams integrations work in the on-demand mode.

Kinesis Data Streams uses the partition key to distribute data across shards. That is why when using Kinesis Data Streams On-demand, you still must specify a partition key for each record to write data into a data stream, as you do today in Kinesis Data Streams using the provisioned mode. In Kinesis Data Streams On-demand, the data stream automatically adapts to handle uneven data distribution patterns. But you must be careful that no partition key exceeds a shard’s limits. If this happens, then you will receive write throttles, and then you can retry these requests.

When a new data stream is created using Kinesis Data Streams On-demand, it gets created with the default capacity of 4 MB/s and 4,000 records per second for writes. Kinesis Data Streams On-demand can automatically scale up to 200 MB/s and 200,000 records per second for writes.

Kinesis Data Streams On-demand accommodates up to double its previous peak write throughput observed in the last 30 days. As your data stream’s write throughput hits a new peak, Kinesis Data Streams automatically scales the stream’s capacity.

For example, if your data stream has a write throughput that varies between 10 MB/s and 40 MB/s, Kinesis Data Streams will make sure that you can easily burst to double the peak—80 MB/s. And, if later on that same data stream reaches a new peak of 50 MB/s, then Kinesis Data Streams will make sure that there is enough capacity to ingest 100 MB/s. However, write throttling can occur if your traffic grows more than double the previous peak in less than 15 minutes.

When to Use Kinesis Data Streams On-demand
On-demand mode is great for customers that have an unknown or variable workload, or who simply don’t want to deal with capacity management. On-demand mode works best for workloads that have even partition key distribution. For example, you run a mobile game that has variable traffic through the week or day, as customers play mostly on nights or weekends. Or, you run a streaming platform that hosts live shows, and you see a sudden increase in demand depending on the guests you have.

In addition, you can switch between on-demand and provision mode twice a day. For example, you run an e-commerce site with predictable traffic. But, starting next month, there will be many marketing campaigns launched globally. You don’t know the impact that those will have on the site traffic. Switch your Kinesis Data Streams to on-demand mode, and now you can enjoy the automated capacity planning and management for your data streams.

Get Started with Kinesis Data Streams On-demand
Create a new data stream with Kinesis Data Streams On-demand from the AWS console, AWS SDKs, AWS Command Line Interface (CLI), and AWS CloudFormation.

To create one from the console, visit the Kinesis console and Create data stream. When selecting the capacity mode, select On-demand.

Creating a data stream

At the end of the page, all of the settings for the new data stream are presented. These settings can be changed after the data stream has been created.

Data stream settings

Let’s See This in Action!
For this demo, I want to show you how the new Kinesis Data Streams capability works. This situation is best described if you at look at the following Amazon CloudWatch graphs. The green line represents the bytes ingested successfully into the stream, and the red line shows the percentage of traffic that is throttled.

First, we will start with a stream provisioned with five shards. For the first three minutes, we are sending a load of 4 MB/s. You can see that the stream can handle the load.

At the time stamp 21:19, we increase the load to 12 MB/s. Now the stream cannot handle the load, and the throttles start (the red line starts climbing up to 60 percent of request being throttled).

Increase the load on a provisioned stream

At the time stamp 21:23, we change the stream capacity from provisioned to on-demand. You can do that on-the-fly without affecting the stream. See that it takes a very short time for the stream to handle the load when converting from one capacity mode to the other.

In a few minutes (time stamp 21:24) the throttles start to drop as the stream starts scaling up. The stream capacity doubles to 10 shards first (time stamp 21:26), and the stream keeps scaling up until each shard has a load of less than 0.5 MB/s. In this way, if the stream suddenly receives double the amount of load, then it has the capacity ready to handle it.

Change to on-demand mode

At the time stamp 21:26, the load in the stream is increased to 18 MB/s. You can see the green line climbing to 350,000 records – there are no throttles, and the stream ends this demo with 40 open shards. This means that if suddenly the stream receives a load of 40 MB/s, then it could handle it with no problem.

Increase the load

Available Now!
The Amazon Kinesis Data Streams On-demand is available globally in all commercial Regions.

You can learn more about the capacity modes in the Amazon Kinesis Data Streams Developer Guide.