Tag Archives: Amazon Kinesis

ICYMI: Serverless Q3 2020

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/icymi-serverless-q3-2020/

Welcome to the 11th edition of the AWS Serverless ICYMI (in case you missed it) quarterly recap. Every quarter, we share all of the most recent product launches, feature enhancements, blog posts, webinars, Twitch live streams, and other interesting things that you might have missed!

Q3 Calendar

In case you missed our last ICYMI, check out what happened last quarter here.

AWS Lambda

MSK trigger in Lambda

In August, we launched support for using Amazon Managed Streaming for Apache Kafka (Amazon MSK) as an event source for Lambda functions. Lambda has existing support for processing streams from Kinesis and DynamoDB. Now you can process data streams from Amazon MSK and easily integrate with downstream serverless workflows. This integration allows you to process batches of records, one per partition at a time, and scale concurrency by increasing the number of partitions in a topic.

We also announced support for Java 8 (Corretto) in Lambda, and you can now use Amazon Linux 2 for custom runtimes. Amazon Linux 2 is the latest generation of Amazon Linux and provides an application environment with access to the latest innovations in the Linux ecosystem.

Amazon API Gateway

API integrations

API Gateway continued to launch new features for HTTP APIs, including new integrations for five AWS services. HTTP APIs can now route requests to AWS AppConfig, Amazon EventBridge, Amazon Kinesis Data Streams, Amazon SQS, and AWS Step Functions. This makes it easy to create webhooks for business logic hosted in these services. The service also expanded the authorization capabilities, adding Lambda and IAM authorizers, and enabled wildcards in custom domain names. Over time, we will continue to improve and migrate features from REST APIs to HTTP APIs.

In September, we launched mutual TLS for both regional REST APIs and HTTP APIs. This is a new method for client-to-server authentication that enhances the security of your API. It can protect your data from exploits such as client spoofing or man-in-the-middle attacks. Mutual TLS (mTLS) enforces two-way TLS, which enables certificate-based authentication in both directions: the client verifies the server, and the server verifies the client.

Enhanced observability variables now make it easier to troubleshoot each phase of an API request. Each phase from AWS WAF through to integration adds latency to a request, returns a status code, or raises an error. Developers can use these variables to identify the cause of latency within the API request. You can configure these variables in AWS SAM templates – see the demo application to see how you can use these variables in your own application.

AWS Step Functions

X-Ray tracing in Step Functions

We added X-Ray tracing support for Step Functions workflows, giving you full visibility across state machine executions and making it easier to analyze and debug distributed applications. Using the service map view, you can visually identify errors in resources and view error rates across workflow executions. You can then drill into the root cause of an error. You can enable X-Ray in existing workflows with a single click in the console. Additionally, you can now visualize Step Functions workflows directly in the Lambda console. To see this new feature, open the Step Functions state machines page in the Lambda console.

Step Functions also increased the payload size to 256 KB and added support for string manipulation, new comparison operators, and improved output processing. These updates were made to the Amazon States Language (ASL), which is a JSON-based language for defining state machines. The new operators support detecting the existence of a field, wildcard matching, and comparing two input fields.

AWS Serverless Application Model (AWS SAM)


AWS SAM is an open source framework for building serverless applications that converts a shorthand syntax into CloudFormation resources.

In July, the AWS SAM CLI became generally available (GA). This tool operates on SAM templates and provides developers with local tooling for building serverless applications. The AWS SAM CLI offers a rich set of tools that enable developers to build serverless applications quickly.


X-Ray Insights

X-Ray launched a public preview of X-Ray Insights, which can help produce actionable insights for anomalies within your applications. Designed to make it easier to analyze and debug distributed applications, it can proactively identify issues caused by increases in faults. Using the incident timeline, you can visualize when the issue started and how it developed. The service identifies a probable root cause along with any anomalous services. There is no additional instrumentation needed to use X-Ray Insights – you can enable this feature within X-Ray Groups.

Amazon Kinesis

In July, Kinesis announced support for data delivery to generic HTTP endpoints, and service providers like Datadog, New Relic, MongoDB, and Splunk. Use the Amazon Kinesis console to configure your data producers to send data to Amazon Kinesis Data Firehose and specify one of these new delivery targets. Additionally, Amazon Kinesis Data Firehose is now available in the Europe (Milan) and Africa (Cape Town) AWS Regions.

Serverless Posts

Our team is always working to build and write content to help our customers better understand all our serverless offerings. Here is a list of the latest posts published to the AWS Compute Blog this quarter.




Tech Talks & Events

We hold several AWS Online Tech Talks covering serverless topics throughout the year, so look out for them in the Serverless section of the AWS Online Tech Talks page. We also regularly deliver talks at conferences and events around the globe, join podcasts, and record short videos that you can use to learn in quick, byte-sized chunks.

Here are some from Q3:

Learning Paths

Ask Around Me

Learn How to Build and Deploy a Web App Backend that Supports Authentication, Geohashing, and Real-Time Messaging

Ask Around Me is an example web app that shows how to build authentication, geohashing, and real-time messaging into your serverless applications. This learning path includes videos and learning resources to help walk you through the application.

Build a Serverless Web App for a Theme Park

This five-video learning path walks you through the Innovator Island workshop and provides learning resources for building real-time serverless web applications.

Live streams




There are also a number of other helpful video series covering serverless available on the Serverless Land YouTube channel.

New AWS Serverless Heroes

Serverless Heroes Q3 2020

We’re pleased to welcome Angela Timofte, Luca Bianchi, Matthieu Napoli, Peter Hanssens, Sheen Brisals, and Tom McLaughlin to the growing list of AWS Serverless Heroes.

The AWS Hero program is a selection of worldwide experts who have been recognized for their positive impact within the community. They share helpful knowledge and organize events and user groups. They’re also contributors to numerous open-source projects in and around serverless technologies.

New! The Serverless Land website

Serverless Land

To help developers find serverless learning resources, we have curated a list of serverless blogs, videos, events and training programs at a new site, Serverless Land. This is regularly updated with new information – you can subscribe to the RSS feed for automatic updates, follow the LinkedIn page or subscribe to the YouTube channel.

Still looking for more?

The Serverless landing page has lots of information. The Lambda resources page contains case studies, webinars, whitepapers, customer stories, reference architectures, and even more Getting Started tutorials.

You can also follow all of us on Twitter to see the latest news, follow conversations, and interact with the team.

Unified serverless streaming ETL architecture with Amazon Kinesis Data Analytics

Post Syndicated from Ram Vittal original https://aws.amazon.com/blogs/big-data/unified-serverless-streaming-etl-architecture-with-amazon-kinesis-data-analytics/

Businesses across the world are seeing a massive influx of data at an enormous pace through multiple channels. With the advent of cloud computing, many companies are realizing the benefits of getting their data into the cloud to gain meaningful insights and save costs on data processing and storage. As businesses embark on their journey towards cloud solutions, they often come across challenges involving building serverless, streaming, real-time ETL (extract, transform, load) architecture that enables them to extract events from multiple streaming sources, correlate those streaming events, perform enrichments, run streaming analytics, and build data lakes from streaming events.

In this post, we discuss the concept of unified streaming ETL architecture using a generic serverless streaming architecture with Amazon Kinesis Data Analytics at the heart of the architecture for event correlation and enrichments. This solution can address a variety of streaming use cases with various input sources and output destinations. We then walk through a specific implementation of the generic serverless unified streaming architecture that you can deploy into your own AWS account for experimenting and evolving this architecture to address your business challenges.

Overview of solution

As data sources grow in volume, variety, and velocity, the management of data and event correlation become more challenging. Most of the challenges stem from data silos, in which different teams and applications manage data and events using their own tools and processes.

Modern businesses need a single, unified view of the data environment to get meaningful insights through streaming multi-joins, such as the correlation of sensory events and time-series data. Event correlation plays a vital role in automatically reducing noise and allowing the team to focus on those issues that really matter to the business objectives.

To realize this outcome, the solution proposes creating a three-stage architecture:

  • Ingestion
  • Processing
  • Analysis and visualization

The source can be a varied set of inputs comprising structured datasets like databases or raw data feeds like sensor data that can be ingested as single or multiple parallel streams. The solution envisions multiple hybrid data sources as well. After it’s ingested, the data is divided into single or multiple data streams depending on the use case and passed through a preprocessor (via an AWS Lambda function). This highly customizable processor transforms and cleanses the data so that the analytics application can process it. Furthermore, the architecture allows you to enrich data or validate it against standard sets of reference data, for example validating address data received from the source against postal codes to verify its accuracy. After the data is processed, it’s sent to various sink platforms depending on your preferences, which could range from storage solutions to visualization solutions, or even stored as a dataset in a high-performance database.

The solution is designed with flexibility as a key tenet to address multiple, real-world use cases. The following diagram illustrates the solution architecture.

The architecture has the following workflow:

  1. We use AWS Database Migration Service (AWS DMS) to push records from the data source into AWS in real time or batch. For our use case, we use AWS DMS to fetch records from an on-premises relational database.
  2. AWS DMS writes records to Amazon Kinesis Data Streams. The data is split into multiple streams as the source channels require.
  3. A Lambda function picks up the data stream records and preprocesses them (adding the record type). This is an optional step, depending on your use case.
  4. Processed records are sent to the Kinesis Data Analytics application for querying and correlating in-application streams, taking into account Amazon Simple Storage Service (Amazon S3) reference data for enrichment.
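
The optional preprocessing in step 3 can be sketched in plain Java. This is a minimal illustration, not the code from the post's repository: the class name RecordPreprocessor, the field name recordType, and the table names used to derive the type are all assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical preprocessor: tags each CDC record with a record type so that
// downstream queries can tell orders and items apart.
class RecordPreprocessor {

    // Returns a copy of the record with an added "recordType" field.
    // The table-name-to-type mapping is illustrative only.
    static Map<String, Object> addRecordType(Map<String, Object> record, String tableName) {
        Map<String, Object> tagged = new HashMap<>(record);
        String type;
        if ("orders".equals(tableName)) {
            type = "order";
        } else if ("order_items".equals(tableName)) {
            type = "item";
        } else {
            type = "unknown";
        }
        tagged.put("recordType", type);
        return tagged;
    }
}
```

Copying the record instead of mutating it keeps the function free of side effects, which makes it easier to retry a failed batch.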

Solution walkthrough

For this post, we demonstrate an implementation of the unified streaming ETL architecture using Amazon RDS for MySQL as the data source and Amazon DynamoDB as the target. We use a simple order service data model that comprises orders, items, and products, where an order can have multiple items and the product is linked to an item in a reference relationship that provides detail about the item, such as description and price.

We implement a streaming serverless data pipeline that ingests orders and items as they are recorded in the source system into Kinesis Data Streams via AWS DMS. We build a Kinesis Data Analytics application that correlates orders and items along with reference product information and creates a unified and enriched record. Kinesis Data Analytics outputs this unified and enriched data to Kinesis Data Streams. A Lambda function consumer processes the data stream and writes the unified and enriched data to DynamoDB.
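
To make the correlation step concrete, here is a small in-memory sketch of the join that the pipeline performs. In the post this work is done by SQL in the Kinesis Data Analytics application. The class OrderEnricher and the field names orderId and productId are assumptions for illustration, and the sketch ignores streaming concerns such as windows and late-arriving records.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory version of the order/item/product correlation.
class OrderEnricher {

    // Joins each item to its order (by orderId) and to its product (by productId).
    // Items whose order or product is missing are skipped, as in an inner join.
    static List<Map<String, Object>> enrich(
            Map<Integer, Map<String, Object>> ordersById,
            List<Map<String, Object>> items,
            Map<Integer, Map<String, Object>> productsById) {
        List<Map<String, Object>> result = new ArrayList<>();
        for (Map<String, Object> item : items) {
            Map<String, Object> order = ordersById.get(item.get("orderId"));
            Map<String, Object> product = productsById.get(item.get("productId"));
            if (order == null || product == null) {
                continue;
            }
            // Build one unified record that carries fields from all three sources.
            Map<String, Object> unified = new HashMap<>(order);
            unified.putAll(item);
            unified.putAll(product);
            result.add(unified);
        }
        return result;
    }
}
```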

To launch this solution in your AWS account, use the GitHub repo.


Before you get started, make sure you have the following prerequisites:

Setting up AWS resources in your account

To set up your resources for this walkthrough, complete the following steps:

  1. Set up the AWS CDK for Java on your local workstation. For instructions, see Getting Started with the AWS CDK.
  2. Install Maven binaries for Java if you don’t have Maven installed already.
  3. If this is the first installation of the AWS CDK, make sure to run cdk bootstrap.
  4. Clone the following GitHub repo.
  5. Navigate to the project root folder and run the following commands to build and deploy:
    1. mvn compile
    2. cdk deploy UnifiedStreamETLCommonStack UnifiedStreamETLDataStack UnifiedStreamETLProcessStack

Setting up the orders data model for CDC

In this next step, you set up the orders data model for change data capture (CDC).

  1. On the Amazon Relational Database Service (Amazon RDS) console, choose Databases.
  2. Choose your database and make sure that you can connect to it securely for testing, using a bastion host or another mechanism (not covered in this post).
  3. Start MySQL Workbench and connect to your database using your DB endpoint and credentials.
  4. To create the data model in your Amazon RDS for MySQL database, run orderdb-setup.sql.
  5. On the AWS DMS console, test the connections to your source and target endpoints.
  6. Choose Database migration tasks.
  7. Choose your AWS DMS task and choose Table statistics.
  8. To update your table statistics, restart the migration task (with full load) for replication.
  9. From your MySQL Workbench session, run orders-data-setup.sql to create orders and items.
  10. Verify that CDC is working by checking the Table statistics.

Setting up your Kinesis Data Analytics application

To set up your Kinesis Data Analytics application, complete the following steps:

  1. Upload the product reference products.json to your S3 bucket with the logical ID prefix unifiedBucketId (which was previously created by cdk deploy).

You can now create a Kinesis Data Analytics application and map the resources to the data fields.

  1. On the Amazon Kinesis console, choose Analytics Application.
  2. Choose Create application.
  3. For Runtime, choose SQL.
  4. Connect the streaming data created using the AWS CDK as a unified order stream.
  5. Choose Discover schema and wait for it to discover the schema for the unified order stream. If discovery fails, update the records on the source Amazon RDS tables and send streaming CDC records.
  6. Save and move to the next step.
  7. Connect the reference S3 bucket you created with the AWS CDK and uploaded with the reference data.
  8. Input the following:
    1. For the path to the S3 object, enter products.json.
    2. For the in-application reference table name, enter Products.
  9. Discover the schema, then save and close.
  10. Choose SQL Editor and start the Kinesis Data Analytics application.
  11. Edit the schema for SOURCE_SQL_STREAM_001 and map the data resources as follows:
Column Name | Column Type | Row Path


  1. Choose Save schema and update stream samples.

When it’s complete, verify for 1 minute that nothing is in the error stream. If an error occurs, check that you defined the schema correctly.

  1. On your Kinesis Data Analytics application, choose your application and choose Real-time analytics.
  2. Go to the SQL results and run kda-orders-setup.sql to create in-application streams.
  3. From the application, choose Connect to destination.
  4. For Kinesis data stream, choose unifiedOrderEnrichedStream.
  5. For In-application stream, choose ORDER_ITEM_ENRICHED_STREAM.
  6. Choose Save and Continue.

Testing the unified streaming ETL architecture

You’re now ready to test your architecture.

  1. Navigate to your Kinesis Data Analytics application.
  2. Choose your app and choose Real-time analytics.
  3. Go to the SQL results and choose Real-time analytics.
  4. Choose the in-application stream ORDER_ITEM_ENRICHED_STREAM to see the results of the real-time join of records from the order and order item streaming Kinesis events.
  5. On the Lambda console, search for UnifiedStreamETLProcess.
  6. Choose the function and choose Monitoring, Recent invocations.
  7. Verify the Lambda function run results.
  8. On the DynamoDB console, choose the OrderEnriched table.
  9. Verify the unified and enriched records that combine order, item, and product records.

The following screenshot shows the OrderEnriched table.

Operational aspects

When you’re ready to operationalize this architecture for your workloads, you need to consider several aspects:

  • Monitoring metrics for Kinesis Data Streams: GetRecords.IteratorAgeMilliseconds, ReadProvisionedThroughputExceeded, and WriteProvisionedThroughputExceeded
  • Monitoring metrics available for the Lambda function, including but not limited to Duration, IteratorAge, Error count and success rate (%), Concurrent executions, and Throttles
  • Monitoring metrics for Kinesis Data Analytics (millisBehindLatest)
  • Monitoring DynamoDB provisioned read and write capacity units
  • Using the DynamoDB automatic scaling feature to automatically manage throughput

We used the solution architecture with the following configuration settings to evaluate the operational performance:

  • Kinesis OrdersStream with two shards and Kinesis OrdersEnrichedStream with two shards
  • The Lambda function code does asynchronous processing with Kinesis OrdersEnrichedStream records in concurrent batches of five, with a batch size of 500
  • DynamoDB provisioned WCU is 3000, RCU is 300
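
A rough way to reason about a configuration like this is to compare the ingest ceiling of the stream with the write ceiling of the table. The sketch below relies on two documented limits: a Kinesis shard accepts up to 1,000 records per second for writes, and one DynamoDB write capacity unit covers one write per second for an item of up to 1 KB. The class name PipelineCapacity and the item size passed in are assumptions, and the sketch ignores the per-shard 1 MB per second limit and Lambda processing time.

```java
// Back-of-the-envelope throughput ceiling for a stream-to-DynamoDB pipeline.
class PipelineCapacity {

    // Kinesis Data Streams write limit per shard, in records per second.
    static final int RECORDS_PER_SECOND_PER_SHARD = 1000;

    // Maximum records per second that the stream can ingest.
    static long streamIngestRate(int shards) {
        return (long) shards * RECORDS_PER_SECOND_PER_SHARD;
    }

    // Writes per second that DynamoDB can absorb, where each item consumes
    // one write capacity unit per started KB of item size.
    static long dynamoWriteRate(int provisionedWcu, double itemSizeKb) {
        long wcuPerItem = Math.max(1L, (long) Math.ceil(itemSizeKb));
        return provisionedWcu / wcuPerItem;
    }

    // The pipeline can sustain no more than its slowest stage.
    static long sustainedRate(int shards, int provisionedWcu, double itemSizeKb) {
        return Math.min(streamIngestRate(shards), dynamoWriteRate(provisionedWcu, itemSizeKb));
    }
}
```

For two shards and 3,000 WCU, and assuming items of at most 1 KB, `PipelineCapacity.sustainedRate(2, 3000, 1.0)` returns 2000, so under these assumptions the stream is the tighter bound.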

We observed the following results:

  • 100,000 order items are enriched with order event data and product reference data and persisted to DynamoDB
  • An average of 900 milliseconds latency from the time of event ingestion to the Kinesis pipeline to when the record landed in DynamoDB

The following screenshot shows the visualizations of these metrics.

Cleaning up

To avoid incurring future charges, delete the resources you created as part of this post (the AWS CDK provisioned AWS CloudFormation stacks).


In this post, we designed a unified streaming architecture that extracts events from multiple streaming sources, correlates and performs enrichments on events, and persists those events to destinations. We then reviewed a use case and walked through the code for ingesting, correlating, and consuming real-time streaming data with Amazon Kinesis, using Amazon RDS for MySQL as the source and DynamoDB as the target.

Managing an ETL pipeline through Kinesis Data Analytics provides a cost-effective, unified solution for real-time and batch database migrations that relies on common technical skills such as SQL querying.

About the Authors

Ram Vittal is an enterprise solutions architect at AWS. His current focus is to help enterprise customers with their cloud adoption and optimization journey to improve their business outcomes. In his spare time, he enjoys tennis, photography, and movies.





Akash Bhatia is a Sr. solutions architect at AWS. His current focus is helping customers achieve their business outcomes through architecting and implementing innovative and resilient solutions at scale.



Using AWS Lambda as a consumer for Amazon Kinesis

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/using-aws-lambda-as-a-consumer-for-amazon-kinesis/

This post is courtesy of Prateek Mehrotra, Software Development Engineer.

AWS Lambda integrates natively with Amazon Kinesis as a consumer to process data ingested through a data stream. The polling, checkpointing, and error handling complexities are abstracted when you use this native integration. This allows the Lambda function code to focus on business logic processing.

This blog post describes how to operate and optimize this integration at high throughput with low system overhead time and processing latencies.

To learn more about Kinesis concepts and terminology, visit the documentation page.


You can attach a Lambda function to a Kinesis stream to process data. Multiple Lambda functions can consume from a single Kinesis stream for different kinds of processing independently. These can be used alongside other consumers such as Amazon Kinesis Data Firehose.

If a Kinesis stream has ‘n’ shards, then at least ‘n’ concurrency is required for a consuming Lambda function to process data without any induced delay. Less than ‘n’ available concurrency results in elevated iterator age in the Kinesis stream and elevated iterator age in the Lambda consumer. In a multi-consumer paradigm, if the Kinesis iterator age spikes then at least one of the stream consumers also reports a corresponding iterator age spike.

Stream poller

When the parallelization factor is greater than 1 for a Lambda consumer, the record processor polls up to ‘parallelization-factor’ partition keys at a time while processing from a single shard. To learn more, read about handling traffic with a parallelization factor.

Kinesis shard level metrics

When using Kinesis streams, it’s best practice to enable enhanced shard level metrics. These metrics can help detect whether data is distributed uniformly across the shards of the stream.

In a single-source, multiple-consumer use case, enhanced shard level metrics can help identify the cause of elevated iterator age. This could be due to a single shard receiving data too quickly, or at least one of the consumers failing to process the data.

To learn more about Kinesis monitoring, visit the documentation page. If per-partition processing is not a requirement, distribute data uniformly across shards. To learn more about Kinesis partition keys, visit the documentation page.

Processing delay caused by consumer misconfiguration

Kinesis reports an iterator age metric. If this value spikes, data processing from the stream is delayed. The metric value is set by the earliest record read from the stream measured over the specified time period.

This delay slows the data processing of the pipeline. This happens when a single shard is receiving data faster than the consumer can process it or the consumer is failing to complete processing due to errors.
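
The idea behind the metric can be sketched as follows: the age of a record is the time between its arrival in the stream and the moment a consumer reads it, and the reported value for a period is driven by the oldest record read in that period. The class IteratorAge and its method names are hypothetical. This is a simplified model, not the way Kinesis or Lambda computes the metric internally.

```java
// Simplified model of iterator age, using epoch milliseconds.
class IteratorAge {

    // Age of one record at the moment it is read from the stream.
    static long ageMillis(long readAtMillis, long arrivalMillis) {
        return Math.max(0L, readAtMillis - arrivalMillis);
    }

    // Largest age observed across the reads in one reporting period.
    static long maxAgeMillis(long[] readAtMillis, long[] arrivalMillis) {
        long max = 0L;
        for (int i = 0; i < readAtMillis.length; i++) {
            max = Math.max(max, ageMillis(readAtMillis[i], arrivalMillis[i]));
        }
        return max;
    }
}
```

A consumer that keeps up reads records shortly after they arrive, so the value stays near zero. A growing value means that records wait longer and longer in the shard before they are read.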

Graph of records iterator age

In a single-source, multiple-consumer use case, at least one of the consumers shows a corresponding iterator age spike. If there are multiple Lambda consumers of the same data stream, then each Lambda consumer will report its own iterator age metric. This helps identify the problematic consumer for further analysis.

Tuning the configuration to optimize for iterator age

There are several tuning options available when the iterator age is increasing for the consumer Lambda function.

1. Increase the batch size

If the Lambda function operates at a low maximum duration, a single invocation may process fewer records than the maximum batch size. Increase the batch size (up to a maximum of 10,000) to read more records from a shard in a single batch. This can help normalize the iterator age.

2. Change the parallelization factor

Increasing the parallelization factor in the Lambda function allows concurrent invocations to read a single shard. Multiple batches of records are created in the shard based on partition keys, resulting in faster data consumption.

Iterator age can still spike even when the batch size is set to 10,000 and the parallelization factor is set to 10. This can happen when data is produced faster than the consumer can process it, backing up the per-shard/per-partition queues. To mitigate this, subdivide the hot partition key into multiple keys. This helps distribute the data for that partition key more evenly across shards.

Partition keys
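
The effect of subdividing a key can be sketched in Java. Kinesis maps a partition key to a shard by taking the MD5 hash of the key as a 128-bit integer and finding the shard whose hash key range contains it. The sketch assumes that the shards split the hash space into equal contiguous ranges, which may not hold after resharding. The class PartitionKeys and the "#" suffix scheme are hypothetical.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Sketch of partition-key-to-shard mapping and of key salting.
class PartitionKeys {

    // Maps a partition key to a shard index, assuming the shards split the
    // 128-bit MD5 hash space into equal contiguous ranges.
    static int shardFor(String partitionKey, int shardCount) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            BigInteger hash = new BigInteger(1, digest); // unsigned 128-bit value
            BigInteger space = BigInteger.ONE.shiftLeft(128);
            return hash.multiply(BigInteger.valueOf(shardCount)).divide(space).intValue();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    // Hypothetical salting scheme: spreads one hot key over `buckets` sub-keys.
    static String saltedKey(String baseKey, long sequence, int buckets) {
        return baseKey + "#" + Math.floorMod(sequence, (long) buckets);
    }
}
```

Each sub-key hashes independently, so records for one hot key can land on different shards. The trade-off is that ordering is then only preserved within each sub-key, not across the original key.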

3. Reduce the batch window

If data is distributed unequally across shards, or there is low write volume from producers, the Lambda poller may wait for an entire batch. You can reduce this wait time by reducing the batch window, which results in faster processing.

To learn more about Lambda poller batch window for Kinesis, visit the documentation page.
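
The interaction between batch size and batch window can be modeled with a small buffer that releases a batch either when it is full or when the window has elapsed. This is a simplified model and not the Lambda poller itself: the class BatchBuffer and its methods are hypothetical, time is passed in explicitly to keep the example deterministic, and the payload size limit that can also trigger an invocation is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a poller that buffers records until a batch is ready.
class BatchBuffer {
    private final int batchSize;
    private final long windowMillis;
    private final List<String> buffer = new ArrayList<>();
    private long firstRecordAt = -1L;

    BatchBuffer(int batchSize, long windowMillis) {
        this.batchSize = batchSize;
        this.windowMillis = windowMillis;
    }

    // Adds a record at time nowMillis; returns a batch when it is full, else null.
    List<String> add(String record, long nowMillis) {
        if (buffer.isEmpty()) {
            firstRecordAt = nowMillis;
        }
        buffer.add(record);
        return buffer.size() >= batchSize ? drain() : null;
    }

    // Called periodically; returns a partial batch once the window has elapsed.
    List<String> poll(long nowMillis) {
        if (!buffer.isEmpty() && nowMillis - firstRecordAt >= windowMillis) {
            return drain();
        }
        return null;
    }

    private List<String> drain() {
        List<String> batch = new ArrayList<>(buffer);
        buffer.clear();
        firstRecordAt = -1L;
        return batch;
    }
}
```

With a low write volume the buffer rarely fills, so the window decides how long a record waits. A shorter window releases partial batches sooner, at the cost of more invocations.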

4. De-scale the Kinesis stream if overprovisioned

If the Kinesis stream metrics indicate that the stream is over-provisioned, de-scaling the stream helps increase data compaction within shards. This results in better throughput per Lambda invocation.

After reducing stream size, reduce the Lambda concurrency to maintain a 1:1 ratio of shard count to Lambda concurrency mapping. As load increases, increase the parallelization factor to keep the shard size constant. With this increase, the Lambda concurrency should be at least shard count * parallelization factor.

To learn more, read about handling traffic with a parallelization factor.
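
The sizing rule above is simple arithmetic, shown here as a small helper. The class name ConsumerConcurrency is hypothetical. The upper bound of 10 for the parallelization factor matches the maximum value mentioned earlier in this post.

```java
// Minimum Lambda concurrency needed to consume a stream without induced delay.
class ConsumerConcurrency {

    static final int MAX_PARALLELIZATION_FACTOR = 10;

    // Returns shard count * parallelization factor after validating the inputs.
    static int required(int shardCount, int parallelizationFactor) {
        if (shardCount < 1) {
            throw new IllegalArgumentException("shardCount must be at least 1");
        }
        if (parallelizationFactor < 1 || parallelizationFactor > MAX_PARALLELIZATION_FACTOR) {
            throw new IllegalArgumentException("parallelizationFactor must be between 1 and 10");
        }
        return shardCount * parallelizationFactor;
    }
}
```

For example, a stream with four shards needs a concurrency of at least 4 at a parallelization factor of 1, and at least 40 at a factor of 10.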

5. Enable enhanced fan-out for consumers

Enhanced fan-out allows developers to scale up the number of stream consumers by offering each stream consumer its own read throughput.

To learn more about Kinesis enhanced fan-out, visit the documentation page.
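
The difference can be put into numbers. Without enhanced fan-out, all consumers of a shard share 2 MB per second of read throughput. With enhanced fan-out, each registered consumer gets its own 2 MB per second per shard. The sketch below assumes that shared consumers split the throughput evenly, which is a simplification. The class name ReadThroughput is hypothetical.

```java
// Per-consumer read throughput with and without enhanced fan-out.
class ReadThroughput {

    static final double MB_PER_SECOND_PER_SHARD = 2.0;

    // Shared throughput: all consumers divide the per-shard read limit.
    static double sharedPerConsumer(int shards, int consumers) {
        return shards * MB_PER_SECOND_PER_SHARD / consumers;
    }

    // Enhanced fan-out: each consumer gets the full per-shard read limit.
    static double enhancedFanOutPerConsumer(int shards) {
        return shards * MB_PER_SECOND_PER_SHARD;
    }
}
```

With two shards and four consumers, each shared consumer has about 1 MB per second available under this even-split assumption, while each enhanced fan-out consumer has 4 MB per second.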


This blog post shows some of the best practices when using Lambda with Kinesis. It covers operational levers for high-throughput, low latency, single source data processing pipelines.

The enhanced Amazon Kinesis shard level metrics help monitor the maximum overhead processing delay per shard. When correlated with the Lambda consumer’s iterator age metrics, this shows each consumer’s performance. The effective combination of batch size, parallelization factor, batch window, and partition key can lead to more efficient stream processing.

To learn more about Amazon Kinesis, visit the Getting Started page.

Enhanced monitoring and automatic scaling for Apache Flink

Post Syndicated from Karthi Thyagarajan original https://aws.amazon.com/blogs/big-data/enhanced-monitoring-and-automatic-scaling-for-apache-flink/

Thousands of developers use Apache Flink to build streaming applications to transform and analyze data in real time. Apache Flink is an open-source framework and engine for processing data streams. It’s highly available and scalable, delivering high throughput and low latency for the most demanding stream-processing applications. Monitoring and scaling your applications is critical to keep your applications running successfully in a production environment.

Amazon Kinesis Data Analytics reduces the complexity of building and managing Apache Flink applications. Amazon Kinesis Data Analytics manages the underlying Apache Flink components that provide durable application state, metrics and logs, and more. Kinesis Data Analytics recently announced new Amazon CloudWatch metrics and the ability to create custom metrics to provide greater visibility into your application.

In this post, we show you how to easily monitor and automatically scale your Apache Flink applications with Amazon Kinesis Data Analytics. We walk through three examples. First, we create a custom metric in the Kinesis Data Analytics for Apache Flink application code. Second, we use application metrics to automatically scale the application. Finally, we share a CloudWatch dashboard for monitoring your application and recommend metrics that you can alarm on.

Custom metrics

Kinesis Data Analytics uses Apache Flink’s metrics system to send custom metrics to CloudWatch from your applications. For more information, see Using Custom Metrics with Amazon Kinesis Data Analytics for Apache Flink.

We use a basic word count program to illustrate the use of custom metrics. The following code shows how to extend RichFlatMapFunction to track the number of words it sees. This word count is then surfaced via the Flink metrics API.

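// Requires imports for RichFlatMapFunction, Tuple2, Configuration, Collector, and org.apache.flink.metrics.Counter.
private static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
    private transient Counter counter;
    @Override
    public void open(Configuration config) {
        // register the counter with the Flink metrics system
        this.counter = getRuntimeContext().getMetricGroup()
                .addGroup("Service", "WordCountApplication")
                .counter("TotalWords"); // metric name is assumed; it is missing from the original listing
    }
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // normalize and split the line
        String[] tokens = value.toLowerCase().split("\\W+");
        // emit the pairs, counting each word that is seen
        for (String token : tokens) {
            if (token.length() > 0) {
                counter.inc();
                out.collect(new Tuple2<>(token, 1));
            }
        }
    }
}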

Custom metrics emitted through the Flink metrics API are forwarded to CloudWatch metrics by Kinesis Data Analytics for Apache Flink. The following screenshot shows the word count metric in CloudWatch.
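
To try the counting logic without a Flink cluster, the same tokenization can be run in plain Java. This is only a stand-in for experimentation: the class WordTokenizer is hypothetical, and an AtomicLong takes the place of the Flink Counter, so nothing is published to CloudWatch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Plain-Java stand-in for the Tokenizer logic, with a local counter.
class WordTokenizer {

    // Stand-in for the Flink Counter; counts every word seen so far.
    final AtomicLong totalWords = new AtomicLong();

    // Lowercases the line, splits it on non-word characters, and counts each word.
    List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                totalWords.incrementAndGet();
                words.add(token);
            }
        }
        return words;
    }
}
```

The length check matters because splitting a line that starts with punctuation or whitespace produces an empty first token, which should not be counted as a word.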

Custom automatic scaling

This section describes how to implement an automatic scaling solution for Kinesis Data Analytics for Apache Flink based on CloudWatch metrics. You can configure Kinesis Data Analytics for Apache Flink to perform CPU-based automatic scaling. However, you can automatically scale your application based on something other than CPU utilization. To perform custom automatic scaling, use Application Auto Scaling with the appropriate metric.

For applications that read from a Kinesis stream source, you can use the metric millisBehindLatest. This captures how far behind your application is from the head of the stream.

A target tracking policy is one of two scaling policy types offered by Application Auto Scaling. You can specify a threshold value around which to vary the degree of parallelism of your Kinesis Data Analytics application. The sample code on GitHub configures Application Auto Scaling to scale out when millisBehindLatest for the consuming application exceeds 1 minute. This increases the parallelism, which increases the number of KPUs.

The following diagram shows how Application Auto Scaling, used with Amazon API Gateway and AWS Lambda, scales a Kinesis Data Analytics application in response to a CloudWatch alarm.

The sample code includes examples for automatic scaling based on the target tracking policy and step scaling policy.
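
The intent of scaling on millisBehindLatest can be illustrated with a simple proportional rule. This sketch is not the algorithm that Application Auto Scaling uses and is not taken from the sample code. The class LagBasedScaling, the proportional formula, and the minimum and maximum bounds are assumptions chosen to show the idea of scaling out once the lag exceeds the 1 minute target.

```java
// Illustrative scale-out rule based on how far the application lags the stream.
class LagBasedScaling {

    // Target lag of 1 minute, as used in the text.
    static final long TARGET_MILLIS_BEHIND = 60_000L;

    // Returns the current parallelism while the lag is at or below the target.
    // Above the target, scales out in proportion to the lag, within the bounds.
    static int desiredParallelism(int current, long millisBehindLatest,
                                  int minParallelism, int maxParallelism) {
        if (millisBehindLatest <= TARGET_MILLIS_BEHIND) {
            return current;
        }
        // Ceiling of current * lag / target, using integer arithmetic.
        long proportional =
                (current * millisBehindLatest + TARGET_MILLIS_BEHIND - 1) / TARGET_MILLIS_BEHIND;
        return (int) Math.max(minParallelism, Math.min(maxParallelism, proportional));
    }
}
```

For example, with a current parallelism of 2 and a lag of 150,000 milliseconds, the rule asks for a parallelism of 5. The upper bound keeps a large backlog from requesting an unbounded number of KPUs.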

Automatic scaling solution components

The following is a list of key components used in the automatic scaling solution. You can find these components in the AWS CloudFormation template in the GitHub repo accompanying this post.

  • Application Auto Scaling scalable target – A scalable target is a resource that Application Auto Scaling can scale in and out. It’s uniquely identified by the combination of resource ID, scalable dimension, and namespace. For more information, see RegisterScalableTarget.
  • Scaling policy – The scaling policy defines how your scalable target should scale. As described in PutScalingPolicy, Application Auto Scaling supports two policy types: TargetTrackingScaling and StepScaling. In addition, you can configure a scheduled scaling action using Application Auto Scaling. If you specify TargetTrackingScaling, Application Auto Scaling also creates corresponding CloudWatch alarms for you.
  • API Gateway – Because the scalable target is a custom resource, we have to specify an API endpoint. Application Auto Scaling invokes this to perform scaling and get information about the current state of our scalable resource. We use an API Gateway and Lambda function to implement this endpoint.
  • Lambda – API Gateway invokes the Lambda function. This is called by Application Auto Scaling to perform the scaling actions. It also fetches information such as current scale value and returns information requested by Application Auto Scaling.
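
To make the Lambda component more concrete, here is a minimal sketch of how a function might translate the output of the Kinesis Data Analytics DescribeApplication operation into the state document that Application Auto Scaling reads from a custom resource endpoint. The response field names follow the custom resource API contract as we understand it, and the status mapping is an assumption; check both against the sample code before relying on them.

```python
# Map application states to scaling states. This mapping is an assumption
# made for the sketch; any other state is reported as "Pending".
STATUS_MAP = {"RUNNING": "Successful", "UPDATING": "InProgress"}


def scaling_response(application_detail, dimension_id, desired_capacity=None):
    """Describe the current scale of a Flink application for Application Auto Scaling.

    application_detail is the ApplicationDetail structure returned by
    DescribeApplication; dimension_id is the ID of the scalable target.
    """
    parallelism = (
        application_detail["ApplicationConfigurationDescription"]
        ["FlinkApplicationConfigurationDescription"]
        ["ParallelismConfigurationDescription"]["Parallelism"]
    )
    if desired_capacity is None:
        desired_capacity = parallelism
    return {
        "actualCapacity": float(parallelism),
        "desiredCapacity": float(desired_capacity),
        "dimensionName": "custom-resource:ResourceType:Property",
        "resourceName": application_detail["ApplicationName"],
        "scalableTargetDimensionId": dimension_id,
        "scalingStatus": STATUS_MAP.get(
            application_detail["ApplicationStatus"], "Pending"
        ),
        "version": str(application_detail["ApplicationVersionId"]),
    }
```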

Additionally, you should be aware of the following:

  • When scaling out or in, this sample only updates the overall parallelism. It doesn’t adjust the parallelism per KPU.
  • When scaling occurs, the Kinesis Data Analytics application experiences downtime.
  • The throughput of a Flink application depends on many factors, such as complexity of processing and destination throughput. The step-scaling example assumes a relationship between incoming record throughput and scaling. The millisBehindLatest metric used for target tracking automatic scaling works the same way.
  • We recommend using the default scaling policy provided by Kinesis Data Analytics for CPU-based scaling, the target tracking auto scaling policy for the millisBehindLatest metric, and a step scaling auto scaling policy for a metric such as numRecordsInPerSecond. However, you can use any automatic scaling policy for the metric you choose.
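
Because the sample changes only the overall parallelism, it can help to see how that value relates to the number of KPUs. The following sketch assumes the KPUs allocated for processing equal the parallelism divided by the parallelism per KPU, rounded up; the function name is ours.

```python
import math


def kpus_for(parallelism, parallelism_per_kpu=1):
    """Estimate the KPUs needed to run a given overall parallelism.

    Assumes KPUs = ceil(parallelism / parallelism per KPU).
    """
    if parallelism < 1 or parallelism_per_kpu < 1:
        raise ValueError("parallelism values must be at least 1")
    return math.ceil(parallelism / parallelism_per_kpu)
```

For example, scaling the parallelism from 4 to 8 with one parallel task per KPU doubles the estimate from 4 to 8 KPUs.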

CloudWatch operational dashboard

Customers often ask us about best practices and the operational aspects of Kinesis Data Analytics for Apache Flink. We created a CloudWatch dashboard that captures the key metrics to monitor. We categorize the most common metrics in this dashboard with the recommended statistics for each metric.

This GitHub repo contains a CloudFormation template to deploy the dashboard for any Kinesis Data Analytics for Apache Flink application. You can also deploy a demo application with the dashboard. The dashboard includes the following:

  • Application health metrics:
    • Use uptime to see how long the job has been running without interruption and downtime to determine if a job failed to run. Non-zero downtime can indicate issues with your application.
    • Higher-than-normal job restarts can indicate an unhealthy application.
    • Checkpoint information, such as size, duration, and number of failed checkpoints, can help you understand application health and progress. Increasing checkpoint durations can signify application health problems like backpressure and the inability to keep up with input data. A checkpoint size that increases over time can point to an infinitely growing state that can lead to out-of-memory errors.
  • Resource utilization metrics:
    • You can check the CPU and heap memory utilization along with the thread count. You can also check the garbage collection time taken across all Flink task managers.
  • Flink application progress metrics:
    • numRecordsInPerSecond and numRecordsOutPerSecond show the number of records accepted and emitted per second.
    • numLateRecordsDropped shows the number of records this operator or task has dropped due to arriving late.
    • Input and output watermarks are valid only when using event time semantics. You can use the difference between these two values to calculate event time latency.
  • Source metrics:
    • The Kinesis Data Streams-specific metric millisBehindLatest shows how far the consumer is behind the head of the stream, in milliseconds. We used this metric to demonstrate Application Auto Scaling earlier in this post.
    • The Kafka-specific metric recordsLagMax shows the maximum lag in terms of number of records for any partition in this window.
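
The application health signals in the preceding list can be expressed as a few simple checks. The following sketch is only an illustration: the rule of three or more strictly increasing checkpoint sizes and the restart baseline are hypothetical thresholds, not recommendations from the dashboard.

```python
def health_warnings(downtime_ms, full_restarts, checkpoint_sizes, normal_restarts=0):
    """Return warnings derived from a few application health metrics.

    checkpoint_sizes is a list of recent checkpoint sizes, oldest first.
    The thresholds used here are illustrative assumptions.
    """
    warnings = []
    if downtime_ms > 0:
        # Non-zero downtime means the job is currently not running.
        warnings.append("job is down")
    if full_restarts > normal_restarts:
        warnings.append("more restarts than normal")
    growing = len(checkpoint_sizes) >= 3 and all(
        later > earlier
        for earlier, later in zip(checkpoint_sizes, checkpoint_sizes[1:])
    )
    if growing:
        # Steadily growing state can eventually cause out-of-memory errors.
        warnings.append("checkpoint size keeps growing")
    return warnings
```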

The dashboard contains useful metrics to gauge the operational health of a Flink application. You can modify the threshold, configure additional alarms, and add other system or custom metrics to customize the dashboard for your use. The following screenshot shows a section of the dashboard.


In this post, we covered how to use the enhanced monitoring features for Kinesis Data Analytics for Apache Flink applications. We created custom metrics for an Apache Flink application within application code and emitted it to CloudWatch. We also used Application Auto Scaling to scale an application. Finally, we shared a CloudWatch dashboard to monitor the operational health of Kinesis Data Analytics for Apache Flink applications. For more information about using Kinesis Data Analytics, see Getting Started with Amazon Kinesis Data Analytics.

About the Authors

Karthi Thyagarajan is a Principal Solutions Architect on the Amazon Kinesis team.





Deepthi Mohan is a Sr. TPM on the Amazon Kinesis Data Analytics team.

Stream CDC into an Amazon S3 data lake in Parquet format with AWS DMS

Post Syndicated from Viral Shah original https://aws.amazon.com/blogs/big-data/stream-cdc-into-an-amazon-s3-data-lake-in-parquet-format-with-aws-dms/

Most organizations generate data in real time and in ever-increasing volumes. Data is captured from a variety of sources, such as transactional and reporting databases, application logs, customer-facing websites, and external feeds. Companies want to capture, transform, and analyze this time-sensitive data to improve customer experiences, increase efficiency, and drive innovations. With increased data volume and velocity, it’s imperative to capture the data from source systems as soon as it is generated and store it on a secure, scalable, and cost-efficient platform.

AWS Database Migration Service (AWS DMS) performs continuous data replication using change data capture (CDC). Using CDC, you can determine and track data that has changed and provide it as a stream of changes that a downstream application can consume and act on. Most database management systems manage a transaction log that records changes made to the database contents and metadata. AWS DMS reads the transaction log by using engine-specific API operations and functions and captures the changes made to the database in a nonintrusive manner.

Amazon Simple Storage Service (Amazon S3) is the largest and most performant object storage service for structured and unstructured data and the storage service of choice to build a data lake. With Amazon S3, you can cost-effectively build and scale a data lake of any size in a secure environment that is designed for 99.999999999% (11 nines) of data durability.

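AWS DMS offers two options to capture data changes from relational databases and store the data in columnar format (Apache Parquet) in Amazon S3:

  • Migrate data directly to Amazon S3 in Parquet format with AWS DMS
  • Stream data changes from AWS DMS to Kinesis Data Streams, and use Kinesis Data Firehose to convert them to Parquet before storing them in Amazon S3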

The second option helps you build a flexible data pipeline that ingests data into an Amazon S3 data lake from several relational and non-relational data sources, whereas the first option supports only relational data sources. Kinesis Data Firehose provides pre-built AWS Lambda blueprints for converting common data sources such as Apache logs and system logs to JSON and CSV formats, or you can write your own custom functions. It can also convert the format of incoming data from JSON to Parquet or Apache ORC before storing the data in Amazon S3. Data stored in columnar format gives you faster and lower-cost queries with downstream analytics services like Amazon Athena.

In this post, we focus on the technical challenges outlined in the second option and how to address them.

As shown in the following reference architecture, data is ingested from a database into Parquet format in Amazon S3 via AWS DMS integrating with Kinesis Data Streams and Kinesis Data Firehose.

Our solution provides flexibility to ingest data from several sources using Kinesis Data Streams and Kinesis Data Firehose with built-in data format conversion and integrated data transformation capabilities before storing data in a data lake. For more information about data ingestion into Kinesis Data Streams, see Writing Data into Amazon Kinesis Data Streams. You can then query Parquet data in Amazon S3 efficiently with Athena.

Implementing the architecture

AWS DMS can migrate data to and from most widely used commercial and open-source databases. You can migrate and replicate data directly to Amazon S3 in CSV and Parquet formats. Parquet is a good choice for data stored in Amazon S3 because it offers efficient compression and encoding schemes. The format allows compression schemes to be specified per column, and is designed so that more encodings can be added as they are invented and implemented.

AWS DMS supports Kinesis Data Streams as a target. Kinesis Data Streams is a massively scalable and durable real-time data streaming service that can collect and process large streams of data records in real time. AWS DMS publishes records to a data stream in JSON format. For more information about configuration details, see Use the AWS Database Migration Service to Stream Change Data to Amazon Kinesis Data Streams.

Kinesis Data Firehose can pull data from Kinesis Data Streams. It’s a fully managed service that delivers real-time streaming data to destinations such as Amazon S3, Amazon Redshift, Amazon Elasticsearch Service (Amazon ES), and Splunk. Kinesis Data Firehose can convert the format of input data from JSON to Parquet or ORC before sending it to Amazon S3. It needs a reference schema to interpret the AWS DMS streaming data in JSON and convert it into Parquet. In this post, we use AWS Glue, a fully managed ETL service, to create a schema in the AWS Glue Data Catalog for Kinesis Data Firehose to reference.

When AWS DMS migrates records, it creates additional fields (metadata) for each migrated record. The metadata provides additional information about the record being migrated, such as source table name, schema name, and type of operation. Most metadata fields contain a hyphen (-) in their field names (for example, record-type, schema-name, table-name, transaction-id). See the following code:

    {
        "data": {
            "MEET_CODE": 5189459,
            "MEET_DATE": "2020-02-21T19:20:04Z",
            "RACE_CODE": 5189459,
            "LAST_MODIFIED_DATE": "2020-02-24T19:20:04Z",
            "RACE_ENTRY_CODE": 11671651,
            "HORSE_CODE": 5042811
        },
        "metadata": {
            "transaction-id": 917505,
            "schema-name": "SH",
            "operation": "insert",
            "table-name": "RACE_ENTRY",
            "record-type": "data",
            "timestamp": "2020-02-26T00:20:07.482592Z",
            "partition-key-type": "schema-table"
        }
    }

The additional metadata added by AWS DMS leads to an error during the data format conversion phase in Kinesis Data Firehose. Kinesis Data Firehose follows Hive-style formatting and therefore doesn’t recognize the - character in the metadata field names during data conversion from JSON into Parquet. It returns an error message such as: expected at the position 30 of 'struct' but '-' is found. For example, see the following code:

    {
        "deliveryStreamARN": "arn:aws:firehose:us-east-1:1234567890:deliverystream/abc-def-KDF",
        "destination": "arn:aws:s3:::abc-streaming-bucket",
        "deliveryStreamVersionId": 13,
        "message": "The schema is invalid. Error parsing the schema: Error: : expected at the position 30 of 'struct<timestamp:string,record-type:string,operation:string,partition-key-type:string,schema-name:string,table-name:string,transaction-id:int>' but '-' is found.",
        "errorCode": "DataFormatConversion.InvalidSchema"
    }

You can resolve the issue by making the following changes: specifying JSON key mappings and creating a reference table in AWS Glue before configuring Kinesis Data Firehose.

Specifying JSON key mappings

In your Kinesis Data Firehose configuration, specify JSON key mappings for fields with - in their names. The mapping replaces - with _ in these metadata field names (for example, record-type changes to record_type).
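
The renaming rule is mechanical, so you can derive the mappings from the metadata field names instead of typing them by hand. The following Python sketch shows one way to do that; the helper name is ours, and it assumes that the only problematic character is the hyphen.

```python
def column_to_json_key_mappings(json_keys):
    """Map Hive-friendly column names to the original hyphenated JSON keys.

    Only keys that contain a hyphen need a mapping; other keys are skipped.
    """
    return {key.replace("-", "_"): key for key in json_keys if "-" in key}


# Metadata field names that AWS DMS adds to each record (from the example above).
metadata_keys = [
    "transaction-id",
    "schema-name",
    "operation",
    "table-name",
    "record-type",
    "timestamp",
    "partition-key-type",
]
mappings = column_to_json_key_mappings(metadata_keys)
```

The resulting dictionary has one entry per hyphenated field, with the column name as the key and the JSON key as the value.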

Use the AWS Command Line Interface (AWS CLI) to create the Kinesis Data Firehose delivery stream with the JSON key mappings. Modify the parameters to meet your specific requirements.

Kinesis Data Firehose configuration mapping is only possible through the AWS CLI or API and not through the AWS Management Console.

The following code configures Kinesis Data Firehose with five columns whose field names contain -, mapped to new field names with _:

    "S3BackupMode": "Disabled",
    "DataFormatConversionConfiguration": {
        "SchemaConfiguration": {
            "RoleARN": "arn:aws:iam::123456789012:role/sample-firehose-delivery-role",
            "DatabaseName": "sample-db",
            "TableName": "sample-table",
            "Region": "us-east-1",
            "VersionId": "LATEST"
        },
        "InputFormatConfiguration": {
            "Deserializer": {
                "OpenXJsonSerDe": {
                    "ColumnToJsonKeyMappings": {
                        "record_type": "record-type",
                        "partition_key_type": "partition-key-type",
                        "schema_name": "schema-name",
                        "table_name": "table-name",
                        "transaction_id": "transaction-id"
                    }
                }
            }
        }
    }

Creating a reference table in AWS Glue

Because Kinesis Data Firehose uses the Data Catalog to reference the schema for Parquet format conversion, you must first create a reference table in AWS Glue before configuring Kinesis Data Firehose. Use Athena to create a Data Catalog table. For instructions, see CREATE TABLE. In the table, make sure that the column names use _ instead of -. If needed, modify them manually in advance through the Edit schema option for the referenced table in AWS Glue.

Use Athena to query the results of data ingested by Kinesis Data Firehose into Amazon S3.

This solution is only applicable in the following use cases:

  • Capturing data changes from your source with AWS DMS
  • Converting data into Parquet with Kinesis Data Firehose

If you want to store data in a non-Parquet format (such as CSV or JSON) or ingest into Kinesis through other routes, then you don’t need to modify your Kinesis Data Firehose configuration.


This post demonstrated how to convert AWS DMS data into Parquet format, and which specific configurations make sure that the metadata follows the format expected by Kinesis Data Streams and Kinesis Data Firehose. We encourage you to try this solution and take advantage of all the benefits of using AWS DMS with Kinesis Data Streams and Kinesis Data Firehose. For more information, see Getting started with AWS Database Migration Service and Setting up Amazon Kinesis Firehose.

If you have questions or suggestions, please leave a comment.


About the Author

Viral Shah is a Data Lab Architect with Amazon Web Services. Viral helps our customers architect and build data and analytics prototypes in just four days in the AWS Data Lab. He has over 20 years of experience working with enterprise customers and startups primarily in the Data and Database space.



Building storage-first serverless applications with HTTP APIs service integrations

Post Syndicated from Eric Johnson original https://aws.amazon.com/blogs/compute/building-storage-first-applications-with-http-apis-service-integrations/

Over the last year, I have been talking about “storage first” serverless patterns. With these patterns, data is stored persistently before any business logic is applied. The advantage of this pattern is increased application resiliency. By persisting the data before processing, the original data is still available, if or when errors occur.

Common pattern for serverless API backend

Using Amazon API Gateway as a proxy to an AWS Lambda function is a common pattern in serverless applications. The Lambda function handles the business logic and communicates with other AWS or third-party services to route, modify, or store the processed data. One option is to place the data in an Amazon Simple Queue Service (SQS) queue for processing downstream. In this pattern, the developer is responsible for handling errors and retry logic within the Lambda function code.

The storage first pattern flips this around. It uses native error handling with retry logic or dead-letter queues (DLQ) at the SQS layer before any code is run. By directly integrating API Gateway to SQS, developers can increase application reliability while reducing lines of code.

Storage first pattern for serverless API backend

Previously, direct integrations required REST APIs with transformation templates written in Velocity Template Language (VTL). However, developers tell us they would like to integrate directly with services in a simpler way without using VTL. As a result, HTTP APIs now offer the ability to directly integrate with five AWS services without needing a transformation template or code layer.

The first five service integrations

This release of HTTP APIs direct integrations includes Amazon EventBridge, Amazon Kinesis Data Streams, Amazon Simple Queue Service (SQS), AWS Systems Manager AppConfig, and AWS Step Functions. With these new integrations, customers can create APIs and webhooks for their business logic hosted in these AWS services. They can also take advantage of HTTP APIs features like authorizers, throttling, and enhanced observability for securing and monitoring these applications.

Amazon EventBridge

HTTP APIs service integration with Amazon EventBridge

The HTTP APIs direct integration for EventBridge uses the PutEvents API to enable client applications to place events on an EventBridge bus. Once the events are on the bus, EventBridge routes them to specific targets based on EventBridge filtering rules.

This integration is a storage first pattern because data is written to the bus before any routing or logic is applied. If the downstream target service has issues, then EventBridge implements a retry strategy with incremental back-off for up to 24 hours. Additionally, the integration helps developers reduce code by filtering events at the bus. It routes to downstream targets without the need for a Lambda function as a transport layer.

Use this direct integration when:

  • Different tasks are required based upon incoming event details
  • Only data ingestion is required
  • Payload size is less than 256 KB
  • Expected requests per second are less than the Region quotas

Amazon Kinesis Data Streams

HTTP APIs service integration with Amazon Kinesis Data Streams

The HTTP APIs direct integration for Kinesis Data Streams offers the PutRecord integration action, enabling client applications to place events on a Kinesis data stream. Kinesis Data Streams is designed to handle up to 1,000 writes per second per shard, with payloads up to 1 MB in size. Developers can increase throughput by increasing the number of shards in the data stream. You can route the incoming data to targets like an Amazon S3 bucket as part of a data lake or a Kinesis Data Analytics application for real-time analytics.

This integration is a storage first option because data is stored on the stream for up to seven days until it is processed and routed elsewhere. When processing stream events with a Lambda function, errors are handled at the Lambda layer through a configurable error handling strategy.

Use this direct integration when:

  • Ingesting large amounts of data
  • Ingesting large payload sizes
  • Order is important
  • Routing the same data to multiple targets
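
To size a stream for this integration, you can estimate the number of shards from the expected write rate. The following sketch uses the 1,000 writes per second per shard limit mentioned above, together with the write throughput limit of 1 MB per second per shard; the function name and the inputs are illustrative.

```python
import math

RECORDS_PER_SECOND_PER_SHARD = 1000
KB_PER_SECOND_PER_SHARD = 1024  # 1 MB per second of writes per shard


def shards_needed(records_per_second, avg_record_kb):
    """Estimate the shards required for a given write rate and record size.

    The stream needs enough shards to satisfy both the record-count limit
    and the byte-throughput limit, so take the larger of the two.
    """
    by_count = math.ceil(records_per_second / RECORDS_PER_SECOND_PER_SHARD)
    by_bytes = math.ceil(records_per_second * avg_record_kb / KB_PER_SECOND_PER_SHARD)
    return max(1, by_count, by_bytes)
```

Small records are usually limited by the record count, while large records are limited by throughput.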

Amazon SQS

HTTP APIs service integration with Amazon SQS

The HTTP APIs direct integration for Amazon SQS offers the SendMessage, ReceiveMessage, DeleteMessage, and PurgeQueue integration actions. This integration differs from the EventBridge and Kinesis integrations in that data flows both ways. Events can be created, read, and deleted from the SQS queue via REST calls through the HTTP API endpoint. Additionally, a full purge of the queue can be managed using the PurgeQueue action.

This pattern is a storage first pattern because the data remains on the queue for four days by default (configurable to 14 days), unless it is processed and removed. When the Lambda service polls the queue, the messages that are returned are hidden in the queue for a set amount of time. Once the calling service has processed these messages, it uses the DeleteMessage API to remove the messages permanently.

When triggering a Lambda function with an SQS queue, the Lambda service manages this process internally. However, HTTP APIs direct integration with SQS enables developers to move this process to client applications without the need for a Lambda function as a transport layer.

Use this direct integration when:

  • Data must be received as well as sent to the service
  • Downstream services need reduced concurrency
  • The queue requires custom management
  • Order is important (FIFO queues)
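
The receive, hide, and delete cycle described above is what a client application takes over when it calls the queue through the HTTP API. The following toy model, written in Python, illustrates the visibility timeout behavior. It is an in-memory simplification, not the SQS API: real SQS returns receipt handles rather than message IDs, and time is passed in explicitly here to keep the example deterministic.

```python
import itertools


class ToyQueue:
    """In-memory model of a queue with a visibility timeout (illustration only)."""

    def __init__(self, visibility_timeout=30):
        self.visibility_timeout = visibility_timeout
        self._messages = {}  # message id -> [body, time at which it is visible]
        self._ids = itertools.count(1)

    def send_message(self, body):
        self._messages[next(self._ids)] = [body, 0.0]

    def receive_message(self, now):
        """Return the first visible message and hide it for the timeout period."""
        for message_id, entry in self._messages.items():
            body, visible_at = entry
            if visible_at <= now:
                entry[1] = now + self.visibility_timeout
                return message_id, body
        return None

    def delete_message(self, message_id):
        """Remove a processed message permanently."""
        self._messages.pop(message_id, None)
```

If the consumer doesn’t delete a message before the timeout expires, the message becomes visible again and can be received a second time.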

AWS AppConfig

HTTP APIs service integration with AWS Systems Manager AppConfig

The HTTP APIs direct integration for AWS AppConfig offers the GetConfiguration integration action and allows applications to check for application configuration updates. By exposing the systems parameter API through an HTTP APIs endpoint, developers can automate configuration changes for their applications. While this integration is not considered a storage first integration, it does enable direct communication from external services to AppConfig without the need for a Lambda function as a transport layer.

Use this direct integration when:

  • Access to AWS AppConfig is required.
  • Managing application configurations.

AWS Step Functions

HTTP APIs service integration with AWS Step Functions

The HTTP APIs direct integration for Step Functions offers the StartExecution and StopExecution integration actions. These actions allow for programmatic control of a Step Functions state machine via an API. When starting a Step Functions workflow, JSON data is passed in the request and mapped to the state machine. Error messages are also mapped to the state machine when stopping the execution.

This pattern provides a storage first integration because Step Functions maintains a persistent state during the life of the orchestrated workflow. Step Functions also supports service integrations that allow the workflows to send and receive data without needing a Lambda function as a transport layer.

Use this direct integration when:

  • Orchestrating multiple actions.
  • Order of action is required.

Building HTTP APIs direct integrations

HTTP APIs service integrations can be built using the AWS CLI, AWS SAM, or through the API Gateway console. The console walks through contextual choices to help you understand what is required for each integration. Each of the integrations also includes an Advanced section to provide additional information for the integration.

Creating an HTTP APIs service integration

Once you build an integration, you can export it as an OpenAPI template that can be used with infrastructure as code (IaC) tools like AWS SAM. The exported template can also include the API Gateway extensions that define the specific integration information.

Exporting the HTTP APIs configuration to OpenAPI

OpenAPI template

An example of a direct integration from HTTP APIs to SQS is located in the Sessions With SAM repository. This example includes the following architecture:

AWS SAM template resource architecture

The AWS SAM template creates the HTTP API, the SQS queue, the Lambda function, and both AWS Identity and Access Management (IAM) roles required. This is all generated in 58 lines of code and looks like this:

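AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: HTTP API direct integrations

Resources:
  MyQueue:
    Type: AWS::SQS::Queue
  MyHttpApi:
    Type: AWS::Serverless::HttpApi
    Properties:
      DefinitionBody:
        'Fn::Transform':
          Name: 'AWS::Include'
          Parameters:
            Location: './api.yaml'
  MyHttpApiRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service: "apigateway.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Policies:
        - PolicyName: ApiDirectWriteToSQS
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              Action:
                - sqs:SendMessage
              Effect: Allow
              Resource:
                - !GetAtt MyQueue.Arn
  MyTriggeredLambda:  # placeholder logical ID; the original name is not shown
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: src/
      Handler: app.lambdaHandler
      Runtime: nodejs12.x
      Policies:
        - SQSPollerPolicy:
            QueueName: !GetAtt MyQueue.QueueName
      Events:
        SQSTrigger:  # placeholder event name
          Type: SQS
          Properties:
            Queue: !GetAtt MyQueue.Arn

Outputs:
  ApiEndpoint:  # placeholder output name
    Description: "HTTP API endpoint URL"
    Value: !Sub "https://${MyHttpApi}.execute-api.${AWS::Region}.amazonaws.com"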

The OpenAPI template handles the route definitions for the HTTP API configuration and configures the service integration. The template looks like this:

openapi: "3.0.1"
info:
  title: "my-sqs-api"
paths:
  /:
    post:
      responses:
        default:
          description: "Default response for POST /"
      x-amazon-apigateway-integration:
        integrationSubtype: "SQS-SendMessage"
        credentials:
          Fn::GetAtt: [MyHttpApiRole, Arn]
        requestParameters:
          MessageBody: "$request.body.MessageBody"
          QueueUrl:
            Ref: MyQueue
        payloadFormatVersion: "1.0"
        type: "aws_proxy"
        connectionType: "INTERNET"
x-amazon-apigateway-importexport-version: "1.0"

Because the OpenAPI template is included in the AWS SAM template via a transform, the API Gateway integration can reference the roles and services created within the AWS SAM template.
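
From the client side, the integration expects a JSON body with a MessageBody field, because the OpenAPI template maps $request.body.MessageBody to the SQS message body. The following Python sketch builds such a request with the standard library. The endpoint URL is a placeholder, and the helper name is ours.

```python
import json
import urllib.request


def build_send_message_request(api_url, message):
    """Build a POST request that the SQS-SendMessage integration can map.

    The JSON body carries the text to enqueue in its MessageBody field.
    """
    body = json.dumps({"MessageBody": message}).encode("utf-8")
    return urllib.request.Request(
        api_url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},
    )


# Placeholder endpoint; use the URL from the stack output instead.
request = build_send_message_request(
    "https://example.execute-api.us-east-1.amazonaws.com/", "hello"
)
```

To send the request, pass it to `urllib.request.urlopen(request)`.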


This post covers the concept of storage first integration patterns and how the new HTTP APIs direct integrations can help. I cover the five current integrations and possible use cases for each. Additionally, I demonstrate how to use AWS SAM to build and manage the integrated applications using infrastructure as code.

Using the storage first pattern with direct integrations can help developers build serverless applications that are more durable with fewer lines of code. A Lambda function is no longer required to transport data from the API endpoint to the desired service. Instead, use Lambda function invocations for differentiating business logic.

To learn more, join us for the HTTP API service integrations session of Sessions With SAM!


Stream, transform, and analyze XML data in real time with Amazon Kinesis, AWS Lambda, and Amazon Redshift

Post Syndicated from Sakti Mishra original https://aws.amazon.com/blogs/big-data/stream-transform-and-analyze-xml-data-in-real-time-with-amazon-kinesis-aws-lambda-and-amazon-redshift/

When we look at enterprise data warehousing systems, we receive data in various formats, such as XML, JSON, or CSV. Most third-party system integrations happen through SOAP or REST web services, where the input and output data format is either XML or JSON. When applications deal with CSV or JSON, it becomes fairly simple to parse because most programming languages and APIs have direct support for CSV or JSON. But for XML files, we need to consider a custom parser, because the format is custom and can be very complex.

When systems interact with each other and process data through different pipelines, they expect real-time processing or availability of data, so that business decisions can be made quickly. In this post, we discuss a use case where XML documents are streamed through a real-time processing system and pass through a custom XML parser that flattens the data for easier business analysis.

To demonstrate the implementation approach, we use AWS cloud services like Amazon Kinesis Data Streams as the message bus, Amazon Kinesis Data Firehose as the delivery stream with an Amazon Redshift data warehouse as the target storage solution, and AWS Lambda as the record transformer of Kinesis Data Firehose, which flattens the nested XML structure with a custom parser script in Python.

AWS services overview

This solution uses AWS services for the following purposes:

  • Kinesis Data Streams is a massively scalable and durable real-time data streaming service. It can continuously capture gigabytes of data per second from hundreds of thousands of sources, such as website click-streams, database event streams, financial transactions, social media feeds, IT logs, and location-tracking events. The data collected is available in milliseconds to enable real-time analytics use cases such as real-time dashboards, real-time anomaly detection, dynamic pricing, and more. We use Kinesis Data Streams because it’s a serverless solution that can scale based on usage.
  • Kinesis Data Firehose is the easiest way to reliably load streaming data into data lakes, data stores, and analytics tools. It can capture, transform, and load streaming data into Amazon Simple Storage Service (Amazon S3), Amazon Redshift, Amazon Elasticsearch Service (Amazon ES), and Splunk, enabling near-real-time analytics with existing business intelligence (BI) tools and dashboards you’re already using today. It’s a fully managed service that automatically scales to match the throughput of your data and requires no ongoing administration. It can also batch, compress, transform, and encrypt the data before loading it, minimizing the amount of storage used at the destination and increasing security. In our use case, our target storage layer is Amazon Redshift, so Kinesis Data Firehose is a good fit that simplifies the solution.
  • Lambda is an event-driven, serverless computing platform provided by AWS. It’s a computing service that runs code in response to events and automatically manages the computing resources required by that code. Lambda supports multiple programming languages, and for our use case, we use Python 3.8. Other options for consuming the stream include Amazon Kinesis Data Analytics with Flink, Amazon EMR with Spark Streaming, Kinesis Data Firehose, or a custom application based on the Kinesis Client Library (KCL). We use Kinesis Data Firehose as the consumer in this use case, with AWS Lambda as the record transformer, because our target storage is Amazon Redshift, which is supported by Kinesis Data Firehose.
  • Amazon S3 is an object storage service that offers industry-leading scalability, data availability, security, and performance. This means customers of all sizes and industries can use it to store and protect any amount of data for a range of use cases, such as websites, mobile applications, backup and restore, archive, enterprise applications, IoT devices, and big data analytics. For our use case, we use Amazon S3 as an intermediate storage before loading to the data warehousing system, so that it’s fault tolerant and provides better performance while loading to Amazon Redshift. By default, Kinesis Data Firehose requests an intermediate S3 bucket path when Amazon Redshift is the target.
  • Amazon Redshift is a fast, fully managed data warehouse that makes it simple and cost-effective to analyze all your data using standard SQL and your existing BI tools. In our use case, we use Amazon Redshift so that BI tools like Amazon QuickSight can easily connect to Amazon Redshift to build real-time dashboards.

Architecture overview

The following diagram illustrates the simple architecture that you can use to implement the solution.

The architecture includes the following components:

  • The Amazon Kinesis Producer Library (KPL) represents the system that pushes data to Kinesis Data Streams. It can be a simple Amazon Elastic Compute Cloud (Amazon EC2) machine or your local Windows command line that runs the Kinesis Data Streams command line interface (CLI) to push messages. Alternatively, it can be a dynamic application that uses Kinesis Data Streams APIs or KPL to push messages dynamically. For our use case, we spin up an EC2 instance through AWS Cloud9 and use Kinesis Data Streams CLI commands to publish messages.
  • Kinesis Data Streams receives messages against a partition key from the publisher and waits for consumers to consume them. By default, the retention period of the messages in Kinesis Data Streams is 24 hours, but you can extend it to 7 days.
  • Kinesis Data Firehose takes a few actions:
    • Consumes data from Kinesis Data Streams and writes the same XML message into a backup S3 bucket.
    • Invokes a Lambda function that acts as a record transformer. Lambda receives input as XML, applies transformations to flatten it to be pipe-delimited content, and returns it to Kinesis Data Firehose.
    • Writes the pipe-delimited content to another S3 bucket, which acts as an intermediate storage bucket before writing into Amazon Redshift.
    • Invokes the Amazon Redshift COPY command, which takes pipe-delimited data from the intermediate S3 bucket and writes it into Amazon Redshift.
  • Data is inserted into the Amazon Redshift table, which you can query for data analysis and reporting.

Solution overview

To implement this solution, you complete the following steps:

  1. Set up the Kinesis data stream as the message bus.
  2. Set up KPL, which publishes sample XML message data to Kinesis Data Streams.
  3. Create an Amazon Redshift cluster, which acts as target storage for the Firehose delivery stream.
  4. Set up the delivery stream, which uses Lambda for record transformation and Amazon Redshift as target storage.
  5. Customize a Lambda function script that converts the nested XML string to a flat pipe-delimited stream.


Before beginning this tutorial, make sure you have permissions to create Kinesis data streams and publish messages to the streams.

Setting up your Kinesis data stream

You can use the AWS Management Console to create a data stream as a one-time activity. You can configure the stream capacity (number of shards) as per your requirement, but start with the minimum and apply auto scaling as the data volume increases. Auto scaling is based on Amazon CloudWatch metrics. For more information, see Scale Amazon Kinesis Data Streams with AWS Application Auto Scaling.
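As a rough sizing aid, the following sketch estimates a starting shard count from the expected write volume. It relies on the per-shard write limits of 1 MB per second and 1,000 records per second; the function name, the treatment of 1 MB as 1,000,000 bytes, and the sample workload are illustrative assumptions rather than part of this walkthrough.

```python
import math

# Per-shard write limits for Kinesis Data Streams.
# Assumption: 1 MB is treated as 1,000,000 bytes.
SHARD_WRITE_BYTES_PER_SEC = 1_000_000
SHARD_WRITE_RECORDS_PER_SEC = 1_000


def estimate_shards(records_per_sec: float, avg_record_bytes: float) -> int:
    """Return the smallest shard count that satisfies both write limits."""
    by_throughput = math.ceil(
        records_per_sec * avg_record_bytes / SHARD_WRITE_BYTES_PER_SEC
    )
    by_record_count = math.ceil(records_per_sec / SHARD_WRITE_RECORDS_PER_SEC)
    # A stream always needs at least one shard.
    return max(1, by_throughput, by_record_count)


# Hypothetical workload: 2,500 small XML messages per second.
print(estimate_shards(records_per_sec=2_500, avg_record_bytes=100))
```

For the hypothetical workload shown, the record-count limit is the binding constraint, so the estimate is three shards even though the byte throughput alone would fit in one.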

Setting up KPL

For this use case, we use the AWS Cloud9 environment IDE, where through the Linux command line, we can execute Kinesis Data Streams CLI commands to publish sample XML messages. The following code shows an example XML of an employee record that has one-level nesting for the all_address element:

aws kinesis put-record --stream-name <Stream-Name> --data "<employees><employee><first_name>FName 1</first_name><last_name>LName 1</last_name><all_address><address><type>primary</type><street_address>Street Address 1</street_address><state>State 1</state><zip>11111</zip></address><address><type>secondary</type><street_address>Street Address 2</street_address><state>State 2</state><zip>11112</zip></address></all_address><phone>111-111-1111</phone></employee><employee><first_name>FName 2</first_name><last_name>LName 2</last_name><all_address><address><type>primary</type><street_address>Street Address 3</street_address><state>State 3</state><zip>11113</zip></address><address><type>secondary</type><street_address>Street Address 4</street_address><state>State 4</state><zip>11114</zip></address></all_address><phone>111-111-1112</phone></employee></employees>" --partition-key <partition-key-name>

You need to change the stream name, XML data, and partition key in the preceding code as per your use case. Also, instead of an AWS Cloud9 environment, you have additional ways to submit messages to the data stream:

  • Use an EC2 instance to execute the Kinesis Data Streams CLI command
  • Use KPL or Kinesis Data Streams APIs in any programming language to submit messages dynamically through your custom application
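If you prefer the API route over the CLI, the following is a minimal sketch using the AWS SDK for Python (boto3) and its PutRecord call. The helper names build_put_record_request and publish are hypothetical, and the stream name and partition key you pass in are your own values.

```python
def build_put_record_request(stream_name: str, xml_payload: str, partition_key: str) -> dict:
    """Build the keyword arguments for the Kinesis PutRecord API call."""
    return {
        "StreamName": stream_name,
        # boto3 expects the record blob as bytes.
        "Data": xml_payload.encode("utf-8"),
        "PartitionKey": partition_key,
    }


def publish(request: dict) -> str:
    """Send one record and return the sequence number assigned by Kinesis."""
    # Imported lazily so the request builder can be exercised without AWS access.
    import boto3

    client = boto3.client("kinesis")
    response = client.put_record(**request)
    return response["SequenceNumber"]
```

Calling publish(build_put_record_request(...)) requires AWS credentials with permission to write to the stream. Keeping the request builder separate makes it easy to inspect what would be sent before any network call happens.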

Creating an Amazon Redshift cluster

In this step, you create an Amazon Redshift cluster that has required permissions and ports open for Kinesis Data Firehose to write to it. For instructions, see Controlling Access with Amazon Kinesis Data Firehose.

Make sure the cluster has the required port and permissions so that Kinesis Data Firehose can push data into it. Also make sure the table schema you create matches the pipe-delimited format that Lambda produces as output, because Kinesis Data Firehose uses that output to write to Amazon Redshift.

Setting up the delivery stream

When you create your Kinesis Data Firehose delivery stream on the console, define the source as Kinesis Data Streams, the target as the Amazon Redshift cluster, and enable record transformation with Lambda.

To complete this step, you need to create an AWS Identity and Access Management (IAM) role with the following permissions for the delivery stream:

  • Read permissions from the data stream
  • Write permissions to the intermediate S3 bucket
  • Write permissions to the defined Amazon Redshift cluster

Define the following configurations for the delivery stream:

  • Enable source record transformation and select your Lambda function.

  • As an optional step, you can enable source record backup, which saves the source XML to the S3 bucket path you define.

  • Define the intermediate S3 bucket, which you use to store transformed pipe-delimited records and later use for the Amazon Redshift copy.

  • In your Amazon Redshift configurations, for COPY options, make sure to specify DELIMITER '|', because the Lambda function output is pipe delimited and Kinesis Data Firehose uses that in the Amazon Redshift copy operation.
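The console settings above map onto the RedshiftDestinationConfiguration structure accepted by the boto3 Firehose create_delivery_stream call. The following sketch only assembles that structure; the helper name and every ARN, URL, and credential you pass in are placeholders you would supply, and a single role is assumed to cover both Amazon Redshift delivery and the intermediate bucket.

```python
def build_redshift_destination(role_arn, jdbc_url, table, username, password,
                               bucket_arn, lambda_arn):
    """Assemble a RedshiftDestinationConfiguration for create_delivery_stream."""
    return {
        "RoleARN": role_arn,
        "ClusterJDBCURL": jdbc_url,
        "CopyCommand": {
            "DataTableName": table,
            # The Lambda output is pipe delimited, so COPY must use the same delimiter.
            "CopyOptions": "DELIMITER '|'",
        },
        "Username": username,
        "Password": password,
        # Intermediate bucket that Firehose stages data in before issuing COPY.
        "S3Configuration": {"RoleARN": role_arn, "BucketARN": bucket_arn},
        # Record transformation with the Lambda function.
        "ProcessingConfiguration": {
            "Enabled": True,
            "Processors": [
                {
                    "Type": "Lambda",
                    "Parameters": [
                        {"ParameterName": "LambdaArn", "ParameterValue": lambda_arn}
                    ],
                }
            ],
        },
    }
```

You would pass the result as RedshiftDestinationConfiguration, together with DeliveryStreamType set to KinesisStreamAsSource and a KinesisStreamSourceConfiguration that points at your data stream.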

Customizing the Lambda function

This function is invoked through Kinesis Data Firehose when the record arrives in Kinesis Data Streams.

Make sure you increase the Lambda execution timeout to more than 1 minute. See the following code:

import base64
import xml.etree.ElementTree as ET

print('Loading function')

def lambda_handler(event, context):
    output = []

    for record in event['records']:
        payload = base64.b64decode(record['data'])
        # Flatten the XML payload into pipe-delimited rows
        parsedRecords = parseXML(payload)
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            # b64encode returns bytes; decode so the response can be serialized as JSON
            'data': base64.b64encode(parsedRecords).decode('utf-8')
        }
        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['records'])))
    return {'records': output}


def parseXML(inputXML):
    xmlstring =  str(inputXML.decode('utf-8'))
    # create element tree object
    root = ET.fromstring(str(xmlstring))
    #print("Root Tag"+root.tag)
    # create empty list for items 
    xmlItems = ""
    # iterate over employee records
    for item in root.findall('employee'):
       #print("child tag name:"+item.tag+" - Child attribute")
       # Form pipe delimited string, by concatenating XML values
       record = item.find('first_name').text + "|" + item.find('last_name').text + "|" + item.find('phone').text
       primaryaddress = ""
       secondaryaddress = ""
       # Get primary address and secondary address separately to be concatenated to the original record in sequence
       for addressitem in item.find('all_address').findall('address'):
           if(addressitem.find('type').text == "primary"):
               primaryaddress = addressitem.find('street_address').text + "|" + addressitem.find('state').text + "|" + addressitem.find('zip').text
           elif(addressitem.find('type').text == "secondary"):
               secondaryaddress = addressitem.find('street_address').text + "|" + addressitem.find('state').text + "|" + addressitem.find('zip').text
       #print("Primary Address:"+primaryaddress)
       #print("Secondary Address:"+secondaryaddress)
       record += "|" + primaryaddress + "|" + secondaryaddress + "\n"
       xmlItems += record
    #print("Final Transformed Output:"+xmlItems)
    return xmlItems.encode('utf-8')

You can customize this example code to embed your own XML parser logic. Keep in mind that the request and response body payload size for synchronous invocations can be up to 6 MB, so it’s important to make sure the return value doesn’t exceed that limit.
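Before deploying a customized transformer, it can help to check its output locally. The following sketch inspects a transformation response for the properties Kinesis Data Firehose expects: matching record IDs, a recognized result value, base64-encoded data, and a JSON-serializable body under the payload limit. The function name is hypothetical, and reading 6 MB as 6 × 1024 × 1024 bytes is an assumption.

```python
import base64
import binascii
import json

MAX_PAYLOAD_BYTES = 6 * 1024 * 1024  # assumption: the 6 MB limit read as 6 MiB
VALID_RESULTS = {"Ok", "Dropped", "ProcessingFailed"}


def check_transform_response(event: dict, response: dict) -> list:
    """Return a list of problems found in a Firehose transformation response."""
    problems = []
    sent = sorted(r["recordId"] for r in event["records"])
    returned = sorted(r["recordId"] for r in response["records"])
    if sent != returned:
        problems.append("recordId mismatch between request and response")

    for rec in response["records"]:
        if rec.get("result") not in VALID_RESULTS:
            problems.append("unexpected result for record " + str(rec["recordId"]))
        elif rec["result"] == "Ok":
            try:
                base64.b64decode(rec.get("data", ""), validate=True)
            except (binascii.Error, ValueError, TypeError):
                problems.append("data is not valid base64 for record " + str(rec["recordId"]))

    try:
        size = len(json.dumps(response).encode("utf-8"))
    except TypeError:
        # Raised, for example, when 'data' is left as bytes instead of str.
        problems.append("response is not JSON serializable")
    else:
        if size > MAX_PAYLOAD_BYTES:
            problems.append("response exceeds the synchronous payload limit")
    return problems
```

An empty list means none of these checks failed. You could call it with a sample event and the value returned by lambda_handler for that event.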

Your Amazon Redshift table (employees) has respective fields to capture the flattened pipe-delimited data. Your query might look like the following code to fetch and read the data:

SELECT first_name, last_name, phone, primary_address_street, primary_address_state, primary_address_zip, secondary_address_street, secondary_address_state, secondary_address_zip
FROM employees
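To see how one line of the Lambda output lines up with these columns, the following sketch splits pipe-delimited text into dictionaries keyed by column name. It assumes the employees table defines its columns in the same order as the SELECT list above, which is also the order the Lambda function writes them; the function name is illustrative.

```python
import csv
import io

# Assumed column order of the employees table (taken from the SELECT list).
COLUMNS = [
    "first_name", "last_name", "phone",
    "primary_address_street", "primary_address_state", "primary_address_zip",
    "secondary_address_street", "secondary_address_state", "secondary_address_zip",
]


def rows_from_pipe_delimited(text: str) -> list:
    """Parse pipe-delimited lines into dicts, rejecting rows with the wrong width."""
    rows = []
    for fields in csv.reader(io.StringIO(text), delimiter="|"):
        if not fields:
            continue  # skip blank lines
        if len(fields) != len(COLUMNS):
            raise ValueError(
                "expected {} fields, got {}".format(len(COLUMNS), len(fields))
            )
        rows.append(dict(zip(COLUMNS, fields)))
    return rows


sample = ("FName 1|LName 1|111-111-1111|Street Address 1|State 1|11111|"
          "Street Address 2|State 2|11112\n")
print(rows_from_pipe_delimited(sample)[0]["primary_address_state"])
```

A width mismatch here is a hint that the Amazon Redshift COPY would also reject the row, because COPY maps fields to table columns by position.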

The following screenshot shows the result of the query in the Amazon Redshift query editor.


While setting up this framework in your development environment, you can debug individual components of the architecture with the following guidelines:

  • Use the Kinesis Data Streams Monitoring tab to validate that it receives messages and read operations are happening through the consumer (Kinesis Data Firehose). You can also use Kinesis Data Streams CLI commands to read from the stream.
  • Use the Kinesis Data Firehose Monitoring tab to check if it receives messages from Kinesis Data Streams and can push them to Amazon Redshift. You can also check for errors on the Error logs tab or directly on the Amazon CloudWatch console.
  • Validate Lambda with a test execution to check that it can transform records to the pipe-delimited format and return them to Kinesis Data Firehose in the expected format (base64 encoded).
  • Confirm that the S3 intermediate storage bucket has the transformed record and doesn’t write into failed processing or error record paths. Also, check if the transformed records are pipe delimited and match the schema of the target Amazon Redshift table.
  • Validate if the backup S3 bucket has the original XML format records. If Lambda or the delivery stream fails, you have an approach to manually reprocess it.
  • Make sure Amazon Redshift has the new data records by running SQL SELECT queries, and check the cluster’s health on the Monitoring tab.


This post showed you how to integrate real-time streaming of XML messages and flatten them to store in a data warehousing system for real-time dashboards.

Although you followed individual steps for each service in your development environment, for a production setup, consider the following automation methods:

  • AWS CloudFormation allows you to embed infrastructure as code that can spin up all required resources for the project, and you can easily migrate or set up your application in production or other AWS accounts.
  • A custom monitoring dashboard can take input from each AWS service you use through its APIs and show the health of each service with the number of records being processed.

Let us know in the comments any thoughts or questions you have about applying this solution to your use cases.


About the Author

Sakti Mishra is a Data Lab Solutions Architect at AWS. He helps customers architect data analytics solutions, which gives them an accelerated path towards modernization initiatives. Outside of work, Sakti enjoys learning new technologies, watching movies, and traveling.



Enhancing customer safety by leveraging the scalable, secure, and cost-optimized Toyota Connected Data Lake

Post Syndicated from Sandeep Kulkarni original https://aws.amazon.com/blogs/big-data/enhancing-customer-safety-by-leveraging-the-scalable-secure-and-cost-optimized-toyota-connected-data-lake/

Toyota Motor Corporation (TMC), a global automotive manufacturer, has made “connected cars” a core priority as part of its broader transformation from an auto company to a mobility company. In recent years, TMC and its affiliate technology and big data company, Toyota Connected, have developed an array of new technologies to provide connected services that enhance customer safety and the vehicle ownership experience. Today, Toyota’s connected cars come standard with an on-board Data Communication Module (DCM) that links to a Controller Area Network (CAN). By using this hardware, Toyota provides various connected services to its customers.

Some of the connected services help drivers to safely enjoy their cars. Telemetry data is available from the car 24×7, and Toyota makes the data available to its dealers (when their customers opt in to data sharing). For instance, a vehicle’s auxiliary battery voltage declines over time. With this data, dealership staff can proactively contact customers to recommend a charge before they experience any issues. This automotive telemetry can also help fleet management companies monitor vehicle diagnostics, perform preventive maintenance, and help avoid breakdowns.

Other services, such as usage-based auto insurance, leverage driving behavior data to help safe drivers receive discounts on their car insurance. Telemetry plays a vital role in understanding driver behavior. If drivers choose to opt in, a safety score can be generated based on their driving data, and drivers can use their smartphones to check their safe driving scores.

A vehicle generates data every second, which can be bundled into larger packets at one-minute intervals. With millions of connected cars producing data points every second, the scale required to capture and store that data is immense: billions of messages arrive daily, generating petabytes of data. To make this vision a reality, Toyota Connected’s Mobility Team embarked on building a real-time “Toyota Connected Data Lake.” Given the scale, we leveraged AWS to build this platform. In this post, we show how we built the data lake and how we provide significant value to our customers.


The guiding principles for architecture and design that we used are as follows:

  • Serverless: We want to use cloud native technologies and spend minimal time on infrastructure maintenance.
  • Rapid speed to market: We work backwards from customer requirements and iterate frequently to develop minimally viable products (MVPs).
  • Cost-efficient at scale.
  • Low latency: near real time processing.

Our data lake needed to be able to:

  • Capture and store new data (relational and non-relational) at petabyte scale in real time.
  • Provide analytics that go beyond batch reporting and incorporate real time and predictive capabilities.
  • Democratize access to data in a secure and governed way, allowing our team to unleash their creative energy and deliver innovative solutions.

The following diagram shows the high-level architecture.


We built the serverless data lake with Amazon S3 as the primary data store, given the scalability and high availability of S3. The entire process is automated, which reduces the likelihood of human error, increases efficiency, and ensures consistent configurations over time, as well as reduces the cost of operations.

The key components of a data lake include Ingest, Decode, Transform, Analyze, and Consume:

  • Ingest: Connected vehicles send telemetry data once a minute, which includes speed, acceleration, turns, geo location, fuel level, and diagnostic error codes. This data is ingested into Amazon Kinesis Data Streams, processed through AWS Lambda to make it readable, and the “raw copy” is saved through Amazon Kinesis Data Firehose into an S3 bucket.
  • Decode: Data arriving into the Kinesis data stream in the ‘Decode’ pillar is decoded by a serverless Lambda function, which does most of the heavy lifting. Based upon a proprietary specification, this Lambda function does the bit-by-bit decoding of the input message to capture the particular sensor values. The small input payload of 35 KB with data from over 180 sensors is now decoded and converted to a JSON message of 3 MB. This is then compressed and written to the ‘Decoded S3 bucket’.
  • Transform: The aggregation jobs leverage the massively parallel capability of Amazon EMR, decrypt the decoded messages, and convert the data to Apache Parquet. Apache Parquet is a columnar storage file format designed for querying large amounts of data, regardless of the data processing framework or programming language. Parquet allows for better compression, which reduces the amount of storage required. It also reduces I/O, since we can efficiently scan the data. The data sets are now available for analytics purposes, partitioned by masked identification numbers as well as by automotive models and dispatch type. A separate set of jobs transforms the data and stores it in Amazon DynamoDB to be consumed in real time from APIs.
  • Consume: Applications needing to consume the data make API calls through Amazon API Gateway. Authentication to the API calls is based on temporary tokens issued by Amazon Cognito.
  • Analyze: Data analytics can be directly performed off Amazon S3 by leveraging serverless Amazon Athena. Data access is democratized and made available to data science groups, who build and test various models that provide value to our customers.
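The decoding specification itself is proprietary and not described here, so the following sketch only illustrates the general technique of the Decode step: reading fixed-width bit fields out of a binary payload, then compressing the resulting JSON. The field names, offsets, and widths in LAYOUT are entirely hypothetical and are not the actual message format.

```python
import gzip
import json

# Hypothetical layout: (field name, bit offset, bit length). Not the real specification.
LAYOUT = [("speed_kmh", 0, 12), ("fuel_pct", 12, 7), ("engine_on", 19, 1)]


def read_bits(data: bytes, offset: int, length: int) -> int:
    """Read `length` bits starting at bit `offset`, most significant bit first."""
    as_int = int.from_bytes(data, "big")
    shift = len(data) * 8 - offset - length
    return (as_int >> shift) & ((1 << length) - 1)


def decode(data: bytes) -> dict:
    """Extract every field in LAYOUT from the binary payload."""
    return {name: read_bits(data, offset, length) for name, offset, length in LAYOUT}


def to_compressed_json(decoded: dict) -> bytes:
    """Serialize the decoded values as JSON and gzip the result."""
    return gzip.compress(json.dumps(decoded).encode("utf-8"))


# Build a 3-byte sample: speed=88, fuel=63, engine_on=1, followed by 4 padding bits.
sample = ((88 << 12) | (63 << 5) | (1 << 4)).to_bytes(3, "big")
print(decode(sample))
```

Because the decoded JSON is much larger than the binary input, compressing it before writing to Amazon S3 keeps storage and transfer costs down.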

Additionally, comprehensive monitoring is set up by leveraging Amazon CloudWatch, Amazon ES, and AWS KMS for managing the keys securely.


The scalability capabilities of the building blocks in our architecture that allow us to reach this massive scale are:

  • S3: S3 is a massively scalable key-based object store that is well-suited for storing and retrieving large datasets. S3 partitions the index based on key name. To maximize performance of high-concurrency operations on S3, we introduced randomness into each of the Parquet object keys to increase the likelihood that the keys are distributed across many partitions.
  • Lambda: We can run as many concurrent functions as needed and can raise limits as required with AWS support.
  • Kinesis Data Firehose: It scales elastically based on volume without requiring any human intervention. We batch requests up to 128 MiB or 15 minutes, whichever comes earlier, to avoid small files. Additional details are available in Srikanth Kodali’s blog post.
  • Kinesis Data Streams: We developed an automated program that adjusts the shards based on incoming volume. This is based on the Kinesis Scaling Utility from AWS Labs, which allows us to scale in a way similar to EC2 Auto Scaling groups.
  • API Gateway: It automatically scales to billions of requests and seamlessly handles our API traffic.
  • EMR cluster: We can programmatically scale out to hundreds of nodes based on our volume and scale in after processing is completed.
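One common way to introduce randomness into object keys, as mentioned in the S3 item above, is to prepend a short hash of the key. The following sketch shows that approach; it is an illustration of the general technique, not necessarily the scheme we used, and the function name, prefix length, and sample key are assumptions.

```python
import hashlib


def randomized_key(base_key: str, prefix_len: int = 4) -> str:
    """Prefix an object key with the first hex characters of its SHA-256 digest.

    The prefix is derived from the key itself, so it can be recomputed later
    to locate the object without storing a separate mapping.
    """
    digest = hashlib.sha256(base_key.encode("utf-8")).hexdigest()
    return digest[:prefix_len] + "/" + base_key


# Hypothetical Parquet object key.
print(randomized_key("model=abc/date=2020-01-01/part-0000.parquet"))
```

A deterministic hash spreads keys across prefixes while still letting readers reconstruct the full key from the logical name.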

Our volumes have increased seven-fold since we migrated to AWS and we have only adjusted the number of shards in Kinesis Data Streams and the number of core nodes for EMR processing to scale with the volume.

Security in the AWS cloud

AWS provides a robust suite of security services, allowing us to have a higher level of security in the AWS cloud. Consistent with our security guidelines, data is encrypted both in transit and at rest. Additionally, we use VPC Endpoints, allowing us to keep traffic within the AWS network.

Data protection in transit:

Data protection at rest:

  • S3 server-side encryption handles all encryption, decryption, and key management transparently. All user data stored in DynamoDB is fully encrypted at rest, for which we use an AWS-owned customer master key at no additional charge. Server-side encryption for Kinesis Data Streams and Kinesis Data Firehose is also enabled to ensure that data is encrypted at rest.

Cost optimization

Given our very large data volumes, we were methodical about optimizing costs across all components of the infrastructure. The ultimate goal was to figure out the cost of the APIs we were exposing. We developed a robust cost model validated with performance testing at production volumes:

  • NAT gateway: When we started this project, one of the significant cost drivers was traffic flowing from Lambda to Kinesis Data Firehose that went over the NAT gateway, since Kinesis Data Firehose did not have a VPC endpoint. Traffic flowing through the NAT gateway costs $0.045/GB, whereas traffic flowing through the VPC endpoint costs $0.01/GB. Based on a product feature request from Toyota, AWS implemented this feature (VPC Endpoint for Firehose) early this year. We implemented this feature, which resulted in a four-and-a-half-fold reduction in our costs for data transfer.
  • Kinesis Data Firehose: Since Kinesis Data Firehose did not support encryption of data at rest initially, we had to use client-side encryption using AWS KMS, which was the second significant cost driver. We requested a feature for native server-side encryption in Kinesis Data Firehose. This was released earlier this year and we enabled server-side encryption on the Kinesis Data Firehose stream. This removed our client-side use of AWS Key Management Service (AWS KMS), resulting in another 10% reduction in our total costs.

Since Kinesis Data Firehose charges based on the amount of data ingested ($0.029/GB), our Lambda function compresses the data before writing to Kinesis Data Firehose, which saves on the ingestion cost.
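The following sketch illustrates the idea of compressing a decoded message before handing it to Kinesis Data Firehose and estimating the ingestion charge from the byte count. The $0.029/GB price is the figure quoted above; the function names, the sample message, and treating 1 GB as 1024³ bytes are assumptions for illustration.

```python
import gzip
import json

FIREHOSE_PRICE_PER_GB = 0.029  # ingestion price quoted in this post
BYTES_PER_GB = 1024 ** 3       # assumption: binary gigabytes


def compress_record(message: dict) -> bytes:
    """Serialize a message as compact JSON and gzip it."""
    raw = json.dumps(message, separators=(",", ":")).encode("utf-8")
    return gzip.compress(raw)


def ingestion_cost(total_bytes: int, price_per_gb: float = FIREHOSE_PRICE_PER_GB) -> float:
    """Estimate the ingestion charge in USD for a given number of bytes."""
    return total_bytes / BYTES_PER_GB * price_per_gb


# Hypothetical decoded message with 180 sensor readings.
message = {"sensor_{}".format(i): 0 for i in range(180)}
raw_size = len(json.dumps(message).encode("utf-8"))
compressed_size = len(compress_record(message))
print(raw_size, compressed_size)
```

The savings depend on how repetitive the payload is; JSON with many similar keys, like the decoded sensor messages, tends to compress well.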

  • S3: We use lifecycle policies to move data from S3 (which costs $0.023/GB) to Amazon S3 Glacier (which costs $0.004/GB) after a specified duration. Glacier provides a roughly six-fold cost reduction over S3. We further plan to move the data from Glacier to Amazon S3 Glacier Deep Archive (which costs $0.00099/GB), which will provide us a roughly four-fold reduction over Glacier costs. Additionally, we have set up automated deletes of certain data sets at periodic intervals.
  • EMR: We were planning to use AWS Glue and keep the architecture serverless, but made the decision to leverage EMR from a cost perspective. We leveraged Spot Instances for transformation jobs in EMR, which can provide up to 60% savings. The hourly jobs complete successfully with Spot Instances; however, the nightly aggregation jobs leveraging r5.4xlarge instances failed frequently because sufficient Spot capacity was not available. We decided to move to On-Demand Instances while we finalize our strategy for Reserved Instances to reduce costs.
  • DynamoDB: Time to Live (TTL) for DynamoDB lets us define when items in a table expire so that they can be automatically deleted from the database. We enabled TTL to expire objects that are not needed after a certain duration. We plan to use reserved capacity for read and write capacity units to reduce costs. We also use DynamoDB auto scaling, which helps us manage capacity efficiently and lower the cost of our workloads because they have a predictable traffic pattern. In Q2 of 2019, DynamoDB removed the associated costs of DynamoDB Streams used in replicating data globally, which translated to extra cost savings in global tables.
  • Amazon DynamoDB Accelerator (DAX): Our DynamoDB tables are front-ended by DAX, which improves the response time of our application by dramatically reducing read latency, as compared to using DynamoDB alone. Using DAX, we also lower the cost of DynamoDB by reducing the amount of provisioned read throughput needed for read-heavy applications.
  • Lambda: We ran benchmarks to arrive at the optimal memory configuration for Lambda functions. Memory allocation in Lambda determines CPU allocation, and for some of our Lambda functions, we allocated higher memory, which results in faster execution, thereby reducing the amount of GB-seconds per function execution, which saves time and cost. Using DynamoDB Accelerator (DAX) from Lambda has several benefits for serverless applications that also use DynamoDB. DAX can improve the response time of your application by dramatically reducing read latency, as compared to using DynamoDB alone. For serverless applications, combining Lambda with DAX provides an additional benefit: lower latency results in shorter execution times, which means lower costs for Lambda.
  • Kinesis Data Streams: We scale our streams through an automated job, since our traffic patterns are fairly predictable. During peak hours we add additional shards and delete them during the off-peak hours, thus allowing us to reduce costs when shards are not in use.
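The reduction factors quoted in this list follow directly from the per-GB prices. The following sketch reproduces the arithmetic using only the prices stated above; the dictionary keys and function name are illustrative.

```python
# Per-GB prices quoted in this post (USD).
PRICES_PER_GB = {
    "nat_gateway": 0.045,
    "vpc_endpoint": 0.01,
    "s3_standard": 0.023,
    "s3_glacier": 0.004,
    "s3_glacier_deep_archive": 0.00099,
}


def reduction_factor(before: str, after: str) -> float:
    """Return how many times cheaper `after` is than `before` per GB."""
    return PRICES_PER_GB[before] / PRICES_PER_GB[after]


for before, after in [
    ("nat_gateway", "vpc_endpoint"),
    ("s3_standard", "s3_glacier"),
    ("s3_glacier", "s3_glacier_deep_archive"),
]:
    print("{} -> {}: {:.2f}x".format(before, after, reduction_factor(before, after)))
```

The ratios come out to about 4.5, 5.75, and 4.04, which correspond to the four-and-a-half-fold, six-fold, and four-fold reductions described above.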

Enhancing customer safety

The Data Lake presents multiple opportunities to enhance customer safety. Early detection of market defects and pinpointing of target vehicles affected by those defects is made possible through the telemetry data ingested from the vehicles. This early detection leads to early resolution well before the customer is affected. On-board software in the automobiles can be constantly updated over-the-air (OTA), thereby saving time and costs. The automobile can generate a Health Check Report based on the driving style of its drivers, which can create the ideal maintenance plan for drivers for worry-free driving.

The driving data for an individual driver based on speed, sharp turns, rapid acceleration, and sudden braking can be converted into a “driver score” which ranges from 1 to 100 in value. The higher the driver score, the safer the driver. Drivers can view their scores on mobile devices and monitor the specific locations of harsh driving on the journey map. They can then use this input to self-correct and modify their driving habits to improve their scores, which not only results in a safer environment but could also earn drivers lower rates from insurance companies. This also gives parents an opportunity to monitor the scores for their teenage drivers and coach them appropriately on safe driving habits. Additionally, notifications can be generated if the teenage driver exceeds an agreed-upon speed or leaves a specific area.


The automated serverless data lake is a robust, scalable platform that allows us to analyze data as it becomes available in real time. From an operations perspective, our costs are down significantly. Several aggregation jobs that took 15+ hours to run now finish in 1/40th of the time. We are impressed with the reliability of the platform that we built. The architectural decision to go serverless has reduced operational burden and will also allow us to have a good handle on our costs going forward. Additionally, we can deploy this pipeline in other geographies with smaller volumes and only pay for what we consume.

Our team accomplished this ambitious development in a short span of six months. They worked in an agile, iterative fashion and continued to deliver robust MVPs to our business partners. Working with the service teams at AWS on product feature requests and seeing them come to fruition in a very short time frame has been a rewarding experience and we look forward to the continued partnership on additional requests.


About the Authors

Sandeep Kulkarni is an enterprise architect at AWS. His passion is to accelerate digital transformation for customers and build highly scalable and cost-effective solutions in the cloud. In his spare time, he loves to do yoga and gardening.





Shravanthi Denthumdas is the director of mobility services at Toyota Connected. Her team is responsible for building the Data Lake and delivering services that allow drivers to safely enjoy their cars. In her spare time, she likes to spend time with her family and children.





Integrating the MongoDB Cloud with Amazon Kinesis Data Firehose

Post Syndicated from Anusha Dharmalingam original https://aws.amazon.com/blogs/big-data/integrating-the-mongodb-cloud-with-amazon-kinesis-data-firehose/

Amazon Kinesis Data Firehose now supports the MongoDB Cloud platform as one of its delivery destinations. This native integration between Kinesis Data Firehose and MongoDB Cloud provides a managed, secure, scalable, and fault-tolerant delivery mechanism for customers into MongoDB Atlas, a global cloud solution for fully managed MongoDB database service for modern applications.

With the release of Kinesis Data Firehose HTTP endpoint delivery, you can now stream your data through Amazon Kinesis Data Streams or directly push data to Kinesis Data Firehose and configure it to deliver data to MongoDB Atlas. You can also configure Kinesis Data Firehose to transform the data before delivering it to its destination. You don’t have to write applications and manage resources to read data and push to MongoDB. It’s all managed by AWS, making it easier to estimate costs for your data based on your data volume.

In this post, we discuss how to integrate Kinesis Data Firehose and MongoDB Cloud and demonstrate how to stream data from your source to MongoDB Atlas.

The following diagram depicts the overall architecture of the solution. We configure Kinesis Data Firehose to push the data to a MongoDB Realm event-driven serverless JavaScript function. MongoDB Realm is an intuitive app development service to accelerate your frontend integration by simplifying your backend. We use a specific type of function called a webhook. The webhook parses the JSON message from Kinesis Data Firehose and inserts parsed records into the MongoDB Atlas database.

Integrating Kinesis Data Firehose and MongoDB Atlas

Kinesis Data Firehose is a fully managed service that automatically scales to match the throughput of your data and requires no ongoing administration. It can also batch, compress, transform, and encrypt the data before loading it, which minimizes the amount of storage used at the destination and increases security.

As part of Kinesis Data Firehose, you can transform your records before delivering them to the destination. In addition, Kinesis Data Firehose enables you to buffer data (based on size or time) before delivering to the final destination. In case of delivery failures, Kinesis Data Firehose can store your failed records in an Amazon Simple Storage Service (Amazon S3) bucket to prevent data loss.

MongoDB Atlas is a platform that can be used across a range of Online Transactional Processing (OLTP) and data analytics applications. MongoDB Atlas allows developers to address popular use cases such as Internet of Things (IoT), Mobile Apps, Payments, Single View, Customer Data Management, and many more. In all of those cases, developers spend a significant amount of time on delivering data to MongoDB Atlas from various data sources. This integration significantly reduces the amount of development effort by leveraging Kinesis Data Firehose HTTP Endpoint integration to ingest data into MongoDB Atlas.

Creating a MongoDB Cloud Realm Application

  1. Log in to your MongoDB Cloud account. If you do not have an account, you can sign up for a free account.
  2. Create an HTTP endpoint on the MongoDB Atlas platform by choosing 3rd Party Services on the Realm tab.
  3. Choose Add a Service.
  4. Choose HTTP.
  5. For Service Name, enter a name. Your service will appear under this name on the UI.
  6. Choose Add Incoming Webhook.
  7. For Authentication, select System.
  8. Leave other options at their default.
  9. In the function editor, enter the following code:
exports = function(payload, response) {
    // Compact base64 decoder for the record payloads
    const decodeBase64 = (s) => {
        var e={},i,b=0,c,x,l=0,a,r='',w=String.fromCharCode,L=s.length
        var A="ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"
        for(i=0;i<64;i++){e[A.charAt(i)]=i}
        for(x=0;x<L;x++){
            c=e[s.charAt(x)];b=(b<<6)+c;l+=6
            while(l>=8){((a=(b>>>(l-=8))&0xff)||(x<(L-2)))&&(r+=w(a))}
        }
        return r
    }

    var fullDocument = JSON.parse(payload.body.text());
    const firehoseAccessKey = payload.headers["X-Amz-Firehose-Access-Key"]

    // Check shared secret is the same to validate Request source
    if (firehoseAccessKey == context.values.get("KDFH_SECRET_KEY")) {
        var collection = context.services.get("Cluster0").db("kdf").collection("kdf-test");
        fullDocument.records.forEach((record) => {
            const document = JSON.parse(decodeBase64(record.data))
            const status = collection.insertOne(document);
            console.log("got status: "+ status)
        })

        // Echo the request ID and a timestamp back to Kinesis Data Firehose
        const s = JSON.stringify({
            requestId: payload.headers['X-Amz-Firehose-Request-Id'][0],
            timestamp: (new Date()).getTime()
        })
        response.setStatusCode(200)
        response.addHeader("Content-Type", "application/json")
        response.setBody(s)
        console.log("response JSON:" + s)
    } else {
        // Shared secret did not match
        response.setStatusCode(401)
        response.addHeader("Content-Type", "application/json")
        response.setBody(JSON.stringify({
            requestId: payload.headers['X-Amz-Firehose-Request-Id'][0],
            timestamp: (new Date()).getTime(),
            errorMessage: "Error authenticating"
        }))
    }
};

The preceding code is a simplified implementation of the webhook. The webhook inserts records one at a time and has abbreviated error handling for readability. For more information about the full implementation, see Using MongoDB Realm WebHooks with Amazon Kinesis Data Firehose.
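To reason about the request and response shapes without deploying anything, the following Python sketch mirrors the webhook logic: it checks the shared access key, base64-decodes each record, and builds the JSON response carrying the request ID and a timestamp. It is a local stand-in only; the function name and signature are hypothetical and it does not use any MongoDB Realm API.

```python
import base64
import hmac
import json
import time


def handle_firehose_request(body: str, request_id: str,
                            access_key: str, expected_key: str):
    """Return (status_code, response_body, documents) for one delivery request."""
    timestamp = int(time.time() * 1000)  # milliseconds since the epoch
    # Constant-time comparison of the shared secret.
    if not hmac.compare_digest(access_key.encode("utf-8"),
                               expected_key.encode("utf-8")):
        error = {"requestId": request_id, "timestamp": timestamp,
                 "errorMessage": "Error authenticating"}
        return 401, json.dumps(error), []

    envelope = json.loads(body)
    # Each record carries one base64-encoded JSON document.
    documents = [json.loads(base64.b64decode(r["data"]))
                 for r in envelope["records"]]
    ok = {"requestId": request_id, "timestamp": timestamp}
    return 200, json.dumps(ok), documents
```

In the real webhook, the decoded documents are what get passed to insertOne; here they are simply returned so that you can inspect them.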

This webhook uses the values and secrets of MongoDB Realm.

  1. On the Realm tab, choose Values & Secrets.

  1. On the Secrets tab, choose Create New Secret/Add a Secret.

  1. Enter the Secret Name and Secret Value and choose Save. The Secret Name entered here is the name used in the webhook code.

  1. On the Values tab, choose Create New Value/Add a Value.

  1. Enter the Value Name.
  2. For Value Type, select Secret.
  3. For Secret Name, choose the secret you created.

  1. Choose Save.

You can now use the secret in your webhook function.

  1. Choose REVIEW & DEPLOY.

Creating a Kinesis Data Firehose delivery stream to MongoDB

  1. Log in to the AWS Management Console and search for Kinesis.
  2. On the Kinesis Data Firehose console, choose Create delivery stream.
  3. For Delivery stream name, enter a name.
  4. For Source, choose Direct PUT or other sources.
  5. Choose Next.

  1. On the Process records page, keep all settings at their default and choose Next.
  2. From the Third-party partner drop-down menu, choose MongoDB Cloud.

  1. For MongoDB Realm Webhooks HTTP Endpoint URL, enter the URL of the Realm app that you created in the MongoDB cloud console.
  2. For API Key, enter the secret value that you stored in the MongoDB Secrets section.
  3. For Content encoding, leave it as Disabled.
  4. For S3 backup mode, select Failed data only.
  5. For S3 bucket, enter the S3 bucket for delivery of log events that exceeded the retry duration. Alternatively, you can create a new bucket by choosing Create new.
  6. Choose Next.
  7. For MongoDB buffer conditions, accept the default MongoDB and Amazon S3 buffer conditions for your stream. Note that the buffer size should be a value between 1 MiB and 16 MiB. Review the limits in the MongoDB Atlas documentation.

  1. In the IAM role section, configure permissions for your delivery stream by choosing Create or update IAM role.
  2. Choose Next.
  3. Review your settings and choose Create delivery stream.

As part of HTTP endpoint integration, Kinesis Data Firehose only supports HTTPS endpoints. The server-side TLS/SSL certificate must be signed by a trusted Certificate Authority (CA) and is used for verification by Kinesis Data Firehose.

The body of the request that is delivered from Kinesis Data Firehose is a JSON document with the following schema:

"$schema": http://json-schema.org/draft-07/schema#

title: FirehoseCustomHttpsEndpointRequest
description: >
  The request body that the Firehose service sends to
  custom HTTPS endpoints.
type: object
properties:
  requestId:
    description: >
      Same as the value in the X-Amz-Firehose-Request-Id header,
      duplicated here for convenience.
    type: string
  timestamp:
    description: >
      The timestamp (milliseconds since epoch) at which the Firehose
      server generated this request.
    type: integer
  records:
    description: >
      The actual records of the Delivery Stream, carrying
      the customer data.
    type: array
    minItems: 1
    maxItems: 10000
    items:
      type: object
      properties:
        data:
          description: >
            The data of this record, in Base64. Note that empty
            records are permitted in Firehose. The maximum allowed
            size of the data, before Base64 encoding, is 1024000
            bytes; the maximum length of this field is therefore
            1365336 chars.
          type: string
          minLength: 0
          maxLength: 1365336

required:
  - requestId
  - records
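
The limits in this schema can be checked before any record is processed. The following is a minimal sketch, not part of the original webhook: the helper name validateFirehoseRequest and its messages are hypothetical. It also shows where the 1365336 figure comes from: Base64 turns every 3 bytes into 4 characters, so 1024000 bytes need 4 × ceil(1024000 / 3) characters.

```javascript
// Hypothetical helper: checks a parsed request body against the limits
// stated in the schema. Returns a list of problems (empty when valid).
const MAX_RECORDS = 10000;
const MAX_RAW_BYTES = 1024000;
// Base64 encodes every 3 bytes as 4 characters, including padding.
const MAX_DATA_CHARS = 4 * Math.ceil(MAX_RAW_BYTES / 3);

function validateFirehoseRequest(body) {
  if (body === null || typeof body !== 'object') {
    return ['body must be an object'];
  }
  const problems = [];
  if (typeof body.requestId !== 'string') {
    problems.push('requestId must be a string');
  }
  if (body.timestamp !== undefined && !Number.isInteger(body.timestamp)) {
    problems.push('timestamp must be an integer');
  }
  if (!Array.isArray(body.records)) {
    problems.push('records must be an array');
    return problems;
  }
  if (body.records.length < 1 || body.records.length > MAX_RECORDS) {
    problems.push('records must contain between 1 and ' + MAX_RECORDS + ' items');
  }
  body.records.forEach((record, index) => {
    const data = record && record.data;
    if (data !== undefined && typeof data !== 'string') {
      problems.push('records[' + index + '].data must be a string');
    } else if (typeof data === 'string' && data.length > MAX_DATA_CHARS) {
      problems.push('records[' + index + '].data exceeds ' + MAX_DATA_CHARS + ' characters');
    }
  });
  return problems;
}

console.log(MAX_DATA_CHARS);
console.log(validateFirehoseRequest({ requestId: 'example-id', records: [{ data: 'e30=' }] }));
```

A webhook could return an error response when the list is not empty, rather than attempting to insert a malformed batch.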

The records are delivered as a collection based on BufferingHints configured on the Firehose delivery stream. The delivery-side service webhook created on MongoDB Realm has to process these records one by one before inserting them into MongoDB collections or use the MongoDB Bulk APIs.
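
As an illustration of the bulk option, the following sketch decodes a whole batch into an array of documents so that it can be written with one call. It assumes each record holds a JSON document, and it uses the Node.js Buffer class for brevity; the helper name decodeFirehoseRecords and the sample values are hypothetical.

```javascript
// Hypothetical helper: turns a Firehose request body into an array of
// documents that can be written with a single bulk call.
function decodeFirehoseRecords(body) {
  return body.records.map((record) => {
    // Each record's data field is Base64 encoded; JSON content is assumed here.
    const json = Buffer.from(record.data, 'base64').toString('utf8');
    return JSON.parse(json);
  });
}

// Sample body in the shape described by the schema (values are made up).
const sampleBody = {
  requestId: 'example-request-id',
  records: [
    { data: Buffer.from(JSON.stringify({ sensor: 'a', value: 1 })).toString('base64') },
    { data: Buffer.from(JSON.stringify({ sensor: 'b', value: 2 })).toString('base64') }
  ]
};

console.log(decodeFirehoseRecords(sampleBody));
```

Inside the webhook, the decoded array could be passed to the collection's insertMany method instead of calling insertOne for each record, which reduces the number of round trips per batch.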

When Kinesis Data Firehose is set up with an HTTP endpoint destination to MongoDB Cloud, you can push data into Kinesis Data Firehose using Kinesis Agent or SDK from your application. Kinesis Data Firehose is also integrated with other AWS data sources such as Kinesis Data Streams, AWS IoT, Amazon CloudWatch Logs, and Amazon CloudWatch Events.
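
When pushing data from an application with the SDK, records are usually sent in batches. The sketch below only builds the request parameters and makes no AWS calls; it assumes the PutRecordBatch limit of 500 records per call, and the function name and stream name are hypothetical.

```javascript
// PutRecordBatch accepts up to 500 records per call.
const MAX_BATCH_SIZE = 500;

// Hypothetical helper: splits application items into PutRecordBatch
// parameter objects, one per batch.
function buildPutRecordBatchParams(streamName, items) {
  const batches = [];
  for (let i = 0; i < items.length; i += MAX_BATCH_SIZE) {
    batches.push({
      DeliveryStreamName: streamName,
      Records: items.slice(i, i + MAX_BATCH_SIZE).map((item) => ({
        Data: JSON.stringify(item)
      }))
    });
  }
  return batches;
}

const exampleItems = Array.from({ length: 1200 }, (_, i) => ({ id: i }));
const exampleBatches = buildPutRecordBatchParams('my-delivery-stream', exampleItems);
console.log(exampleBatches.map((batch) => batch.Records.length));
```

Each parameter object could then be passed to the Firehose client's putRecordBatch call in the AWS SDK for JavaScript.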

To test the integration, use the testing option on the Kinesis Data Firehose console and test with sample data. After the time configured in BufferingHints, log in to your Atlas platform and navigate to your Database/Collection to see the ingested records.


In this post, we showed how easy it is to ingest data into the MongoDB Cloud platform using a Kinesis Data Firehose HTTP endpoint. This integration has many use cases. For example, you can stream Internet of Things (IoT) data directly into the MongoDB Atlas platform with minimal code using the Amazon Kinesis Data Firehose HTTP endpoint integration. Try MongoDB Atlas on AWS here.


About the Authors

Anusha Dharmalingam is a Solutions Architect at Amazon Web Services, with a passion for Application Development and Big Data solutions. Anusha works with enterprise customers to help them architect, build, and scale applications to achieve their business goals.





Igor Alekseev is a Partner Solution Architect at AWS in Data and Analytics. Igor works with strategic partners helping them build complex, AWS-optimized architectures. Prior to joining AWS, as a Data/Solution Architect, he implemented many projects in Big Data, including several data lakes in the Hadoop ecosystem. As a Data Engineer, he was involved in applying AI/ML to fraud detection and office automation. Igor’s projects were in a variety of industries including communications, finance, public safety, manufacturing, and healthcare. Earlier, Igor worked as a full stack engineer/tech lead.

ICYMI: Season one of Sessions with SAM

Post Syndicated from Eric Johnson original https://aws.amazon.com/blogs/compute/icymi-season-one-of-sessions-with-sam/

Developers tell us they want to know how to easily build and manage their serverless applications. In 2017 AWS announced AWS Serverless Application Model (SAM) to help with just that. To help developers learn more about SAM, I created a weekly Twitch series called Sessions with SAM. Each session focuses on a specific serverless task or service. It demonstrates deploying and managing that task using infrastructure as code (IaC) with SAM templates. This post recaps each session of the first season to prepare you for Sessions with SAM season two, starting August 13.

Sessions with SAM

What is SAM

AWS SAM is an open source framework designed for building serverless applications. The framework provides shorthand syntax to quickly declare AWS Lambda functions, Amazon DynamoDB tables and more. Additionally, SAM is not limited to serverless resources and can also declare any standard AWS CloudFormation resource. With around 20 lines of code, a developer can create an application with an API, logic, and database layer with the proper permissions in place.

Example of using SAM templates to generate infrastructure

20 Lines of code

By using infrastructure as code to manage and deploy serverless applications, developers gain several advantages. You can version the templates and roll back when necessary. They can be parameterized for flexibility across multiple environments. They can be shared with development teams for consistency across developer environments.


The code and linked videos are listed with the session. See the YouTube playlist and GitHub repository for the entire season.

Session one: JWT authorizers on Amazon API Gateway

In this session, I cover building an application backend using JWT authorizers with the new Amazon API Gateway HTTP API. We also discussed building an application with multiple routes and the ability to change the authorization requirements per route.

Code: https://github.com/aws-samples/sessions-with-aws-sam/tree/master/http-api

Video: https://youtu.be/klOScYEojzY

Session two: Amazon Cognito authentication

In this session, I cover building an Amazon Cognito template for authentication. This includes the user management component with user pools and user groups in addition to a hosted authentication workflow with an app client.

Building an Amazon Cognito authentication provider

We also discussed using custom pre-token Lambda functions to modify the JWT token issued by Amazon Cognito. This custom token allows you to insert custom scopes based on the Amazon Cognito user groups. These custom scopes are then used to customize the authorization requirements for the individual routes.
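
A minimal sketch of such a pre token generation function follows. It is not the code from the session: the claim name custom_scopes and the "api/" prefix are hypothetical conventions, and the function assumes the event shape that Amazon Cognito passes to this trigger (groups under request.groupConfiguration.groupsToOverride, overrides returned under response.claimsOverrideDetails).

```javascript
// Derives a space-separated scope string from the user's Cognito groups
// and adds it to the token as a custom claim.
function addGroupScopes(event) {
  const groupConfig = event.request.groupConfiguration || {};
  const groups = groupConfig.groupsToOverride || [];
  // Hypothetical convention: group "admin" becomes scope "api/admin".
  const scopes = groups.map((group) => 'api/' + group).join(' ');
  event.response = {
    claimsOverrideDetails: {
      claimsToAddOrOverride: { custom_scopes: scopes }
    }
  };
  return event;
}

// Lambda entry point for the trigger.
const handler = async (event) => addGroupScopes(event);

const exampleEvent = {
  request: { groupConfiguration: { groupsToOverride: ['admin', 'editor'] } },
  response: {}
};
console.log(addGroupScopes(exampleEvent).response.claimsOverrideDetails);
```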

Code: https://github.com/aws-samples/sessions-with-aws-sam/tree/master/cognito

Video: https://youtu.be/nBtWCjKd72M

Session three: Building a translation app with Amazon EventBridge

I covered using AWS SAM to build a basic translation and sentiment app centered around Amazon EventBridge. The SAM template created three Lambda functions, a custom EventBridge bus, and an HTTP API endpoint.

Architecture for serverless translation application

Requests from HTTP API endpoint are put into the custom EventBridge bus via the endpoint Lambda function. Based on the type of request, either the translate function or the sentiment function is invoked. The AWS SAM template manages all the infrastructure in addition to the permissions to invoke the Lambda functions and access Amazon Translate and Amazon Comprehend.
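
The routing described here can be sketched as follows. This is an illustration rather than the session's code: the bus name, source, and detail types are hypothetical. The endpoint function builds an entry for the EventBridge PutEvents call, and a rule on the bus matches on the detail type to invoke the right function.

```javascript
const REQUEST_TYPES = ['translate', 'sentiment'];

// Hypothetical helper: builds one PutEvents entry for the custom bus.
function buildEventEntry(requestType, text) {
  if (!REQUEST_TYPES.includes(requestType)) {
    throw new Error('Unsupported request type: ' + requestType);
  }
  return {
    EventBusName: 'translation-bus', // hypothetical bus name
    Source: 'demo.http-api',         // hypothetical source
    DetailType: requestType,         // rules match on this value
    Detail: JSON.stringify({ text })
  };
}

// Event pattern a rule could use to route translate requests.
const translateRulePattern = { 'detail-type': ['translate'] };

console.log(buildEventEntry('translate', 'Hello world'));
console.log(translateRulePattern);
```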

Code: https://github.com/aws-samples/sessions-with-aws-sam/tree/master/eventbridge

Video: https://youtu.be/73R02KufLac

Session four: Building an Amazon Kinesis Data Firehose for ingesting website access logs

In this session, I covered building an Amazon Kinesis Data Firehose for ingesting large amounts of data. This particular application is designed for access logs generated from API Gateway. The logs are first stored to an Amazon DynamoDB database for immediate processing. Next, the logs are sent through a Kinesis Data Firehose and stored in an Amazon S3 bucket for later processing.

Code: https://github.com/aws-samples/sessions-with-aws-sam/tree/master/kinesis-firehose

Video: https://youtu.be/jdTBtaxs0hA

Session five: Analyzing API Gateway logs with Amazon Kinesis Data Analytics

Continuing from session four, I discussed configuring API Gateway access logs to use the Kinesis Data Firehose built in the previous session. I also demonstrated an Amazon Kinesis Data Analytics application for near-real-time analytics of your access logs.

Example of Kinesis Data Analytics in SAM

Code: https://github.com/aws-samples/sessions-with-aws-sam/tree/master/kinesis-firehose

Video: https://youtu.be/ce0v-q9EVTQ

Session six: Managing Amazon SQS with AWS SAM templates

I demonstrated configuring an Amazon Simple Queue Service (SQS) queue and the queue policy to control access to the queue. We also discuss allowing cross-account and external resources to access the queue. I show how to identify the proper principal resources for building the proper AWS IAM policy templates.

Code: https://github.com/aws-samples/sessions-with-aws-sam/tree/master/SQS

Video: https://youtu.be/q2rbHMyJBDY

Session seven: Creating canary deploys for Lambda functions

In this session, I cover canary and linear deployments for serverless applications. We discuss how canary releases compare to linear releases and how they can be customized. We also spend time discussing pre-traffic and post-traffic tests and how rollbacks are handled when one of these tests fails.

Code: https://github.com/aws-samples/sessions-with-aws-sam/tree/master/safe-deploy

Video: https://youtu.be/RE4r_6edaXc

Session eight: Configuring custom domains for Amazon API Gateway endpoints

In session eight, I configured custom domains for API Gateway REST and HTTP APIs. The demonstration included the option to pass in an Amazon Route 53 zone ID or AWS Certificate Manager (ACM) certificate ARN. If either of these is missing, then the template builds a zone or SSL certificate respectively.

Working with Amazon Route 53 zones

We discussed how to use declarative and imperative methods in our templates. We also discussed how to use a single domain across multiple APIs, regardless of whether they are REST or HTTP APIs.

Code: https://github.com/aws-samples/sessions-with-aws-sam/tree/master/custom-domains

Video: https://youtu.be/4uXEGNKU5NI

Session nine: Managing AWS Step Functions with AWS SAM

In this session I was joined by fellow Senior Developer Advocate, Rob Sutter. Rob and I demonstrated managing and deploying AWS Step Functions using the new Step Functions support built into SAM. We discussed how SAM offers definition substitutions to pass data from the template into the state machine configuration.

Code: https://github.com/aws-samples/sessions-with-aws-sam/tree/master/step-functions

Video: https://youtu.be/BguUgdZwymQ

Session ten: Using Amazon EFS with Lambda functions in SAM

Joined by Senior Developer Advocate, James Beswick, we covered configuring Amazon Elastic File System (EFS) as a storage option for Lambda functions using AWS SAM. We discussed the Amazon VPC requirements in configuring for EFS. James also walked through using the AWS Command Line Interface (CLI) to aid in configuration of the VPC.

Code: https://github.com/aws-samples/aws-lambda-efs-samples

Video: https://youtu.be/up1op216trk

Session eleven: Ask the experts

This session introduced you to some of our SAM experts. Jeff Griffiths, Senior Product Manager, and Alex Woods, Software Development Engineer, joined me in answering live audience questions. We discussed best practices for local development and debugging, Docker networking, CORS configurations, roadmap features, and more.

SAM experts panel

Video: https://youtu.be/2JRa8MugPCY

Session twelve: Managing .NET Lambda functions in AWS SAM and Stackery

In this final session of the season, I was joined by Stackery CTO and serverless hero, Chase Douglas. Chase demonstrated using Stackery and AWS SAM to build and deploy .NET Core Lambda functions. We discussed how Stackery’s editor allows developers to visually design a serverless application and how it uses SAM templates under the hood.

Stackery visual editor

Code only examples

In addition to code examples with each video session, the repo includes developer-requested code examples. In this section, I demonstrate how to build an access log pipeline for HTTP API or use the SAM build command to compile Swift for Lambda functions.


Sessions with SAM helps developers bootstrap their serverless applications with instructional video and ready-made IaC templates. From JWT authorizers to EFS storage solutions, over 15 AWS services are represented in SAM templates. The first season of live videos supplements these templates with best practices explained and real developer questions answered.

Season two of Sessions with SAM starts August 13. The series will continue the pattern of explaining best practices, providing usable starter templates, and having some fun along the way.



Understanding database options for your serverless web applications

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/understanding-database-options-for-your-serverless-web-applications/

Many web developers use relational databases to store and manage data in their web applications. As you migrate to a serverless approach for web development, there are also other options available. These can help improve the scale, performance, and cost-effectiveness of your workloads. In this blog post, I highlight use-cases for different serverless database services, and common patterns useful for web applications.

Using Amazon RDS with serverless web applications

You can access Amazon RDS directly from your AWS Lambda functions. The RDS database, such as Amazon Aurora, is configured within the customer VPC. The Lambda function must be configured with access to the same VPC:

Lambda connecting to RDS

There are special considerations for this design in busy serverless applications. It’s common for popular web applications to experience “spiky” usage, where traffic volumes shift rapidly and unpredictably. Serverless services such as AWS Lambda and Amazon API Gateway are designed to automatically scale to meet these traffic increases.

However, relational databases are connection-based, so they are intended to work with a few long-lived clients, such as web servers. By contrast, Lambda functions are ephemeral and short-lived, so their database connections are numerous and brief. If Lambda scales up to hundreds or thousands of instances, you may overwhelm downstream relational databases with connection requests. This is typically only an issue for moderately busy applications. If you are using a Lambda function for low-volume tasks, such as running daily SQL reports, you do not experience this behavior.

The Amazon RDS Proxy service is built to solve the high-volume use-case. It pools the connections between the Lambda service and the downstream RDS database. This means that a scaling Lambda function is able to reuse connections via the proxy. As a result, the relational database is not overwhelmed with connection requests from individual Lambda functions. This does not require code changes in many cases. You only need to replace the database endpoint with the proxy endpoint in your Lambda function.
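
To illustrate how small that change can be, the following sketch builds a connection configuration from environment variables. The variable names and the default port are assumptions for this example; only the host value differs between a direct connection and a connection through RDS Proxy.

```javascript
// Hypothetical environment variable names. When a proxy endpoint is
// configured, it is used in place of the database endpoint.
function buildDbConfig(env) {
  return {
    host: env.DB_PROXY_ENDPOINT || env.DB_ENDPOINT,
    user: env.DB_USER,
    database: env.DB_NAME,
    port: Number(env.DB_PORT || 3306) // default assumes a MySQL-compatible engine
  };
}

console.log(buildDbConfig({
  DB_ENDPOINT: 'mydb.example.internal',
  DB_PROXY_ENDPOINT: 'myproxy.example.internal',
  DB_USER: 'app',
  DB_NAME: 'sales'
}));
```

The resulting object can be handed to whichever database client library the function already uses, so the rest of the function code stays the same.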

Lambda to RDS Proxy to RDS diagram

As a result, if you need to use a relational database in a high-volume web application, you can use RDS Proxy with minimal changes required.

Using Amazon DynamoDB as a high-performance operational database

Amazon DynamoDB is a high-performance key-value and document database that operates with single-digit millisecond response times at any scale. This is a NoSQL database that is a natural fit for many serverless workloads, especially web applications. It can operate equally well for low and high usage workloads. Unlike relational databases, the performance of a well-architected DynamoDB table is not adversely affected by heavy usage or large amounts of data storage.

For web applications, DynamoDB tables are ideal for storing common user configuration and application data. When integrated with Amazon Cognito, you can restrict row-level access to the current user context. This makes it a frequent choice for multi-tenant web applications that host data for many users.

DynamoDB tables can be useful for lookups of key-based information, in addition to geo-spatial queries in many cases. DynamoDB is not connection-based, so this integration works even if a Lambda function scales up to hundreds or thousands of concurrent executions. You can read and write items directly from Lambda with minimal code. The following example stores an item:

const AWS = require('aws-sdk')
AWS.config.region = process.env.AWS_REGION
const documentClient = new AWS.DynamoDB.DocumentClient()

exports.handler = async (event) => {
  // Construct params
  const params = {
    TableName: 'myDDBtable',
    Item: {
      partitionKey: 'user-123',
      sortKey: Date.now(),
      name: 'Alice',
      cartItems: 3
    }
  }

  // Store in DynamoDB
  const result = await documentClient.put(params).promise()
  return result
}
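
Key-based lookups follow the same pattern. As a sketch, the function below builds the parameters for a DocumentClient query that returns one user's items written since a given time. It reuses the table and key names from the preceding example; the function name is hypothetical, and no AWS call is made here.

```javascript
// Hypothetical helper: builds query parameters for all items that belong
// to one partition key and have a sort key at or after a given timestamp.
function buildRecentItemsQuery(userId, sinceEpochMs) {
  return {
    TableName: 'myDDBtable',
    KeyConditionExpression: 'partitionKey = :pk AND sortKey >= :since',
    ExpressionAttributeValues: {
      ':pk': userId,
      ':since': sinceEpochMs
    }
  };
}

console.log(buildRecentItemsQuery('user-123', 1600000000000));
```

Inside a Lambda function, these parameters could be passed to documentClient.query(params).promise(), in the same way the put call is made in the preceding example.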

Using advanced patterns in DynamoDB, it’s possible to build equivalent features frequently found in relational schemas. For example, one-to-many relationships, many-to-many relationships, and ACID transactions can all be modeled in a single DynamoDB table.

Combining DynamoDB with RDS

While DynamoDB remains highly performant for high volumes of traffic, you need to understand data access patterns for your application before designing the schema. There are times where you need to perform ad hoc queries, or where downstream application users must use SQL-based tools to interact with databases.

In this case, combining both DynamoDB and RDS in your architecture can provide a resilient and flexible solution. For example, for a high-volume transactional web application, you can use DynamoDB to ingest data from your frontend application. For ad hoc SQL-based analytics, you could also use Amazon Aurora.

By using DynamoDB streams, you can process updates to a DynamoDB table using a Lambda function. In a simple case, this function can update tables in RDS, keeping the two databases synchronized. For example, when sales transactions are saved in DynamoDB, a Lambda function can post the sales information to transaction tables in Aurora.
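
A minimal sketch of the first half of that function follows: it converts the records in a DynamoDB stream event into plain row objects that could then be inserted into Aurora. The attribute names (orderId, customer, amount) and column names are hypothetical, and only string and number attributes are handled.

```javascript
// Minimal conversion for the attribute types used in this sketch (S and N).
function fromAttribute(attribute) {
  if (attribute === undefined) return null;
  if ('S' in attribute) return attribute.S;
  if ('N' in attribute) return Number(attribute.N);
  throw new Error('Unsupported attribute type in this sketch');
}

// Keeps inserts and updates, and maps each new item image to a row.
function toSalesRows(event) {
  return event.Records
    .filter((record) => record.eventName === 'INSERT' || record.eventName === 'MODIFY')
    .map((record) => {
      const image = record.dynamodb.NewImage;
      return {
        order_id: fromAttribute(image.orderId),
        customer: fromAttribute(image.customer),
        amount: fromAttribute(image.amount)
      };
    });
}

const exampleStreamEvent = {
  Records: [
    {
      eventName: 'INSERT',
      dynamodb: { NewImage: { orderId: { S: 'o-1' }, customer: { S: 'alice' }, amount: { N: '12.5' } } }
    },
    { eventName: 'REMOVE', dynamodb: { Keys: { orderId: { S: 'o-0' } } } }
  ]
};
console.log(toSalesRows(exampleStreamEvent));
```

In a real function, the AWS SDK's DynamoDB Converter can replace the hand-written conversion, and deletions would need their own handling.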

DynamoDB to RDS architecture

Both the Lambda function and RDS database operate within the customer’s VPC, while DynamoDB is outside the VPC. DynamoDB Streams can invoke Lambda functions configured to access the VPC. In this model, RDS users can then run ad hoc SQL queries without impacting operational data managed by DynamoDB.

High-volume ETL processes between DynamoDB and RDS

For high-volume workloads capturing large numbers of transactions in DynamoDB, Lambda may still scale rapidly and exhaust the RDS connection pool. To process these flows, you may introduce Amazon Kinesis Data Firehose to help with data replication between DynamoDB and RDS.

ETL processing with DynamoDB and RDS

  1. New and updated items in DynamoDB are sent to a DynamoDB stream. The stream invokes a stream processing Lambda function, sending batches of records to Kinesis Data Firehose.
  2. Kinesis Data Firehose buffers incoming messages and performs data transformations using a Lambda function. It then writes the output to Amazon S3, buffering by size (1–128 MB) or interval (60–900 seconds).
  3. The Kinesis Data Firehose transformation uses a custom Lambda function for processing records as needed.
  4. Amazon S3 is a durable store for these batches of transformed records. As objects are written, S3 invokes a Lambda function.
  5. The Lambda function loads the objects from S3, then connects to RDS and imports the data.

This approach supports high transaction volumes, enabling table item transformation before loading into RDS. The RDS concurrent connection pool is optimized by upstream batching and buffering, which reduces the number of concurrent Lambda functions and RDS connections.
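
The transformation step in this flow can be sketched as a Lambda function that follows the Kinesis Data Firehose transformation contract: each incoming record has a recordId and Base64-encoded data, and each returned record carries the same recordId, a result, and the transformed data. The CSV layout and field names below are assumptions for illustration only.

```javascript
// Transforms each record into a CSV line; records that cannot be parsed
// are marked as failed so that Firehose can handle them separately.
function transformRecords(event) {
  const records = event.records.map((record) => {
    try {
      const item = JSON.parse(Buffer.from(record.data, 'base64').toString('utf8'));
      // Hypothetical layout: one CSV line per item for a later import.
      const line = [item.orderId, item.customer, item.amount].join(',') + '\n';
      return {
        recordId: record.recordId,
        result: 'Ok',
        data: Buffer.from(line).toString('base64')
      };
    } catch (err) {
      return { recordId: record.recordId, result: 'ProcessingFailed', data: record.data };
    }
  });
  return { records };
}

// Lambda entry point for the transformation.
const handler = async (event) => transformRecords(event);

const exampleInput = {
  records: [
    {
      recordId: '1',
      data: Buffer.from(JSON.stringify({ orderId: 'o-1', customer: 'alice', amount: 12.5 })).toString('base64')
    },
    { recordId: '2', data: Buffer.from('not json').toString('base64') }
  ]
};
console.log(transformRecords(exampleInput));
```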


Web developers commonly use relational databases in building their applications. When migrating to serverless architectures, a web developer can continue to use databases like RDS, or take advantage of other options available. RDS Proxy enables developers to pool database connections and use connection-based databases with ephemeral functions.

DynamoDB provides high-performance, low-latency NoSQL support, which is ideal for many busy web applications with spiky traffic volumes. However, it’s also possible to use both services to take advantage of the throughput of DynamoDB, together with the flexibility of ad hoc SQL queries in RDS.

For extremely high traffic volumes, you can introduce Kinesis Data Firehose to batch and transform data between DynamoDB and RDS. In this case, you separate the operational database from the analytics database. This solution uses multiple serverless services to handle scaling automatically.

To learn more about AWS serverless database solutions for web developers, visit https://aws.amazon.com/products/databases/.

New Relic drinks straight from the Firehose: Consuming Amazon Kinesis data

Post Syndicated from Mike Neville-O'Neill original https://aws.amazon.com/blogs/big-data/new-relic-drinks-straight-from-the-firehose-consuming-amazon-kinesis-data/

New Relic can now ingest data directly from Amazon Kinesis Data Firehose, expanding the insights New Relic can give you into your cloud stacks so you can deliver more perfect software. Kinesis Data Firehose is a fully managed service for delivering real-time streaming data to AWS services like Amazon Simple Storage Service (Amazon S3), Amazon Redshift, and a wide array of external destinations.

As software teams have been forced to adopt disconnected monitoring tools for their infrastructure, applications, logs, and digital experience, it has created data silos that result in blind spots. Blind spots increase the amount of work to switch between tools to uncover answers, and make it harder to diagnose issues. New Relic One provides a connected real-time view of all your data in one place. New Relic’s open platform is designed to ensure easy ingestion and analysis regardless of source.

With the release of Kinesis Data Firehose HTTP endpoint delivery, you can easily configure data streams to automatically ingest and forward data to New Relic. You can also configure Kinesis Data Firehose to transform your data before delivering it. You don’t need to write applications, manage resources, or create AWS Lambda functions, which makes it easier to manage and estimate costs for your data based on data volume.

In this post, we demonstrate how to stream Amazon CloudWatch Logs data to New Relic using a Firehose delivery stream. We show you how to create and configure a delivery stream to ingest and forward data to New Relic and configure CloudWatch Logs to write to Kinesis Data Firehose.


Before continuing, you need a New Relic account and an Insights Insert API Key. You also need to install and configure the AWS Command Line Interface (AWS CLI) to make the policy and role changes covered by the post. For instructions, see Installing the AWS CLI.

You also need to make sure that your delivery stream has sufficient service-limit quotas to forward all your data. Kinesis Data Firehose has default quotas in place that vary depending on the Region. You can create a case with AWS to request a quota increase.

Creating a delivery stream

To begin, you need to create a delivery stream to ingest logs from CloudWatch Logs. Complete the following steps:

  1. Sign in to the AWS Management Console and navigate to Kinesis.
  2. Under Data Firehose, choose Create delivery stream.
  3. Enter a name for the delivery stream.
  4. For Source, select Direct PUT or other sources.
  5. Choose Next until you’re prompted to Select a destination and choose 3rd party partner.
  6. From the drop-down menu, choose New Relic.
  7. For New Relic HTTP API, add the following endpoint: https://aws-api.newrelic.com/firehose/v1.
  8. Enter your Insights Insert API Key in the API access token field.
  9. Configure common attributes.
    Common attributes are inserted into every log that passes through the delivery stream, and can be queried against in New Relic Logs. As a best practice, we recommend including a logtype attribute to make sure your logs are properly parsed in New Relic Logs. The logtype attribute defined here appears in all your logs after they reach New Relic. If you intend to use the logtype attribute to determine which parsing rules are applied to your logs, we recommend creating a separate delivery stream for each logtype.
  10. Configure and review the remaining settings as desired.

Validating the delivery stream configuration

Optionally, confirm that your delivery stream forwards logs to your New Relic account by completing the following steps:

  1. On the Kinesis dashboard, choose Delivery Streams.
  2. Choose the delivery stream you created in the previous section.
  3. Expand Test with Demo Data and choose Start sending data.
  4. Wait 3–5 minutes for demo data to be written to your delivery stream.
  5. While you’re waiting, copy the Delivery Stream ARN. You need this to configure CloudWatch Logs to write to your delivery stream. If everything has been set up correctly, you should see demo data in your New Relic Logs account.
  6. Choose Stop sending demo data to avoid incurring additional usage charges.

Configuring CloudWatch Logs to write to Kinesis Data Firehose

Your next step is to configure CloudWatch to write logs to Kinesis Data Firehose. For more information, see Subscription Filters with Amazon Kinesis Data Firehose. For this post, we configure our delivery stream to forward logs to New Relic instead of Amazon S3.

We begin by creating an AWS Identity and Access Management (IAM) role that allows CloudWatch Logs to write data to your delivery stream. You can do this using the AWS Command Line Interface (AWS CLI).

  1. Use a text editor to create the following trust policy in a file (for example, ~/TrustPolicyForCWL.json). Make sure to replace us-east-1 with the Region containing your CloudWatch logs:
      {
        "Statement": {
          "Effect": "Allow",
          "Principal": { "Service": "logs.us-east-1.amazonaws.com" },
          "Action": "sts:AssumeRole"
        }
      }

  2. Use the create-role command to create an IAM role using your newly created policy:
aws iam create-role \
      --role-name CWLtoKinesisFirehoseRole \
      --assume-role-policy-document file://~/TrustPolicyForCWL.json

Running the command returns the following output:

{
    "Role": {
        "AssumeRolePolicyDocument": {
            "Statement": {
                "Action": "sts:AssumeRole",
                "Effect": "Allow",
                "Principal": {
                    "Service": "logs.us-east-1.amazonaws.com"
                }
            }
        },
        "RoleId": "AAOIIAH450GAB4HC5F431",
        "CreateDate": "2020-07-14T13:46:29.431Z",
        "RoleName": "CWLtoKinesisFirehoseRole",
        "Path": "/",
        "Arn": "arn:aws:iam::123456789012:role/CWLtoKinesisFirehoseRole"
    }
}

  1. Create a policy that allows CloudWatch to write logs to your delivery stream. As before, use a text editor to create a file (for example, ~/PermissionsForCWL.json) containing the following code:
  1. Make sure to update the Region and AWS account ID placeholders in the preceding code with your account-specific details, and associate the policy with the role created at the beginning of this section. See the following code:
aws iam put-role-policy --role-name CWLtoKinesisFirehoseRole --policy-name Permissions-Policy-For-CWL --policy-document file://~/PermissionsForCWL.json

All required permissions should now be in place.

  1. The last step is to create a CloudWatch Logs subscription filter that determines which logs are written to your delivery stream and forwarded to New Relic. See the following code:
aws logs put-subscription-filter \
    --log-group-name "syslog" \
    --filter-name "Destination" \
    --filter-pattern "ERROR" \
    --destination-arn "arn:aws:firehose:region:123456789012:deliverystream/my-delivery-stream" \
    --role-arn "arn:aws:iam::123456789012:role/CWLtoKinesisFirehoseRole"

In the preceding code, replace the value of destination-arn with the ARN of the delivery stream you created at the beginning of this post. You also need to update the role-arn with the ARN of the CWLtoKinesisFirehoseRole you created earlier.
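
If you later need to inspect what the subscription filter delivers, for example in a Firehose transformation function, note that CloudWatch Logs gzip-compresses each payload before sending it to the delivery stream. The following is a minimal sketch of decoding one such record; the function name and the sample values are hypothetical, and the payload layout (logGroup, logEvents) follows the CloudWatch Logs subscription format.

```javascript
const zlib = require('zlib');

// Decodes one record: Base64 -> gzip -> JSON, then lists its log events.
function decodeLogRecord(base64Data) {
  const json = zlib.gunzipSync(Buffer.from(base64Data, 'base64')).toString('utf8');
  const payload = JSON.parse(json);
  return (payload.logEvents || []).map((logEvent) => ({
    logGroup: payload.logGroup,
    timestamp: logEvent.timestamp,
    message: logEvent.message
  }));
}

// Round trip with a hand-built sample payload (values are made up).
const samplePayload = {
  messageType: 'DATA_MESSAGE',
  logGroup: 'syslog',
  logStream: 'example-stream',
  logEvents: [{ id: '1', timestamp: 1594734389431, message: 'ERROR disk full' }]
};
const encodedSample = zlib.gzipSync(JSON.stringify(samplePayload)).toString('base64');
console.log(decodeLogRecord(encodedSample));
```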

After you complete these steps, you can confirm that data is flowing in New Relic Logs. The following screenshot shows Kinesis Data Firehose delivering data to New Relic Logs.


In this post, we showed you how to automatically ingest and forward CloudWatch Logs data into New Relic using a Kinesis Data Firehose HTTP endpoint. We hope you will use this knowledge to expand the use of data streams in your organization to deliver more perfect software faster.

The content and opinions in this post are those of the third-party author and AWS is not responsible for the content or accuracy of this post.


About the Authors

Mike Neville-O’Neill is a Product Manager on the New Relic Logging team. His professional interests include the ever-changing observability landscape, DevOps, information security, and log-based analytics.






Colin Bookman is an ISV Sr. Solutions Architect at AWS based in Silicon Valley. He works with AWS ISVs and customers to help them build secure, high-performing, resilient, and cost-efficient infrastructure. He brings years of experience including childhood software and hardware projects, a Bachelor of Science in Computer Engineering from Georgia Tech, and past experience architecting and building services that handle millions of QPS. When not working, Colin enjoys taking his two dogs on long walks on the beach.





Analyze logs with Datadog using Amazon Kinesis Data Firehose HTTP endpoint delivery

Post Syndicated from Ilan Rabinovitch original https://aws.amazon.com/blogs/big-data/analyze-logs-with-datadog-using-amazon-kinesis-data-firehose-http-endpoint-delivery/

Amazon Kinesis Data Firehose now provides an easy-to-configure and straightforward process for streaming data to a third-party service for analysis, including logs from AWS services. Due to the varying formats and high volume of this data, it’s a complex challenge to identify and correlate key event details and data points to fix issues and improve your applications’ performance. This new feature enables you to easily and quickly send logs from AWS services directly to Datadog to perform this analysis. Kinesis Data Firehose is fully managed by AWS, so you don’t need to maintain any additional infrastructure or forwarding configurations for streaming logs.

In this post, we describe Kinesis Data Firehose and the related Datadog integration. We also show you how to ingest logs from multiple AWS services into Datadog.

Kinesis Data Firehose delivers real-time streaming data to destinations like Amazon Simple Storage Service (Amazon S3), Amazon Redshift, or Amazon Elasticsearch Service (Amazon ES), and now supports delivering streaming data to Datadog. Kinesis Data Firehose provides built-in, fully managed error handling, transformation, conversion, aggregation, and compression functionality so you don’t need to write applications to handle these complexities. These capabilities enable customers to monitor and analyze data from any source and use it to deliver operational intelligence and business performance.

Datadog is an AWS Partner Network (APN) Advanced Technology Partner with AWS Competencies in DevOps, Migration, Containers, and Microsoft Workloads, and enables you to easily explore and analyze logs to gain deeper insights into the state of your applications and AWS infrastructure. You can analyze all your AWS service logs while storing only the ones you need, and generate metrics from aggregated logs to uncover and alert on trends in your AWS services.

How it works

To process and deliver AWS service logs to Datadog, you can implement the following architecture.

Using this solution, you can configure AWS Web Application Firewall (AWS WAF) or Amazon API Gateway to deliver log events directly to Kinesis Data Firehose. If you use another service that delivers logs to Amazon CloudWatch Logs, you can use CloudWatch log subscriptions to feed log events from CloudWatch Logs to a Firehose delivery stream. By configuring Kinesis Data Firehose with the Datadog API as a destination, you can deliver the logs to Datadog for further analysis. You can also back up events to an S3 bucket to help prevent data loss. You can configure backup for all log events or only those exceeding a specified retry duration. For more information about adjusting your backup based on the amount of log data you’re streaming, see Amazon Kinesis Data Firehose Quota.
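When debugging a log subscription, it can help to look at what CloudWatch Logs actually hands to the delivery stream. The following is a minimal sketch, assuming the documented subscription format (a gzip-compressed JSON document with a logEvents array). The function name and the sample values are hypothetical and only illustrate the shape of the data.

```python
import gzip
import json
from typing import Dict, List


def decode_subscription_payload(payload: bytes) -> List[Dict]:
    """Unpack one gzip-compressed CloudWatch Logs subscription record."""
    document = json.loads(gzip.decompress(payload))
    # Control messages only check that the destination is reachable.
    if document.get("messageType") != "DATA_MESSAGE":
        return []
    return [
        {
            "logGroup": document["logGroup"],
            "logStream": document["logStream"],
            "timestamp": event["timestamp"],
            "message": event["message"],
        }
        for event in document["logEvents"]
    ]


# Hypothetical sample shaped like a subscription record.
sample = gzip.compress(json.dumps({
    "messageType": "DATA_MESSAGE",
    "logGroup": "/aws/lambda/example-function",
    "logStream": "2020/09/01/[$LATEST]abcdef",
    "logEvents": [
        {"id": "1", "timestamp": 1598918400000, "message": "START RequestId: 1234"},
    ],
}).encode("utf-8"))

events = decode_subscription_payload(sample)
```

Each decoded event keeps its log group and stream, which is the context you would typically filter on once the logs arrive in your analysis tool.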

Creating your delivery stream

Your new delivery stream needs an API key to send logs to Datadog. You can find your key in the API section of your Datadog account.

If you don’t already have an account, visit the Datadog website to sign up for a free 14-day trial.

To create your delivery stream, complete the following steps:

  1. On the Kinesis Data Firehose console, choose Create delivery stream.
  2. For Delivery stream name, enter a name.
  3. For Source, choose Direct PUT or other sources.
  4. Choose Next.

  5. On the Process records page, keep all settings at their default and choose Next.
  6. On the Choose a destination page, for Destination, choose Third-party partner.
  7. From the Third-party partner drop-down menu, choose Datadog.

  8. For HTTP endpoint URL, choose the desired HTTP endpoint based on your Region and Datadog account configuration.

For more information, see Logs Guides.

  9. For API key, enter your Datadog API key.

This allows your delivery stream to publish to the endpoint.

  10. For Content encoding, select GZIP.

  11. Accept the defaults for Retry duration.
  12. For S3 backup mode, select Failed data only.
  13. For S3 bucket, enter the S3 bucket for delivery of log events that exceeded the retry duration.

Alternatively, you can create a new bucket by choosing Create new.

  14. Choose Next.
  15. For Datadog buffer conditions, accept the default Datadog and Amazon S3 buffer conditions for your stream.

  16. In the IAM role section, configure permissions for your delivery stream by choosing Create or update IAM role.
  17. Choose Next.
  18. Review your settings and choose Create delivery stream.

Logs subscribed to your delivery stream are now available for analysis with Datadog.


Datadog allows you to monitor your servers, containers, databases, and third-party services to ensure high availability, optimize performance, and troubleshoot issues. With Kinesis Data Firehose HTTP endpoint delivery, AWS service log data is available for analysis quickly. This allows you to identify issues and performance bottlenecks with your applications by correlating logs from AWS services such as AWS CloudTrail, Amazon Relational Database Service (Amazon RDS), and your AWS Lambda functions with metrics and traces. By taking advantage of a fully managed AWS service, this delivery method provides high availability and scalability for your integration. For more information about configuring this integration directly via AWS CloudFormation, see Log collection.


About the Authors

Ilan Rabinovitch is Vice President of Product and Community at Datadog.






Kalyan Janaki is a Senior Technical Account Manager with AWS. Kalyan enjoys working with customers and helping them migrate their workloads to the cloud. In his spare time, he tries to keep up with his 2-year-old.






Bryan Hopkins is a Senior Technical Account Manager with AWS. Bryan helps large enterprises build secure and scalable solutions on the AWS cloud. In his spare time, he enjoys reading classic science fiction and building quirky creations with his family.

Stream data to an HTTP endpoint with Amazon Kinesis Data Firehose

Post Syndicated from Imtiaz Sayed original https://aws.amazon.com/blogs/big-data/stream-data-to-an-http-endpoint-with-amazon-kinesis-data-firehose/

The value of data is time sensitive. Streaming data services can help you move data quickly from data sources to new destinations for downstream processing. For example, Amazon Kinesis Data Firehose can reliably load streaming data into data stores like Amazon Simple Storage Service (Amazon S3), Amazon Redshift, Amazon Elasticsearch Service (Amazon ES), and Splunk.

We’re now expanding the Kinesis Data Firehose delivery destinations to include generic HTTP endpoints. This enables you to use a fully managed delivery service to HTTP endpoints without building custom applications or worrying about operating and managing the data delivery infrastructure. Additionally, the HTTP endpoint enhancement opens up a number of key integration opportunities between Kinesis Data Firehose and other AWS services such as Amazon DynamoDB or Amazon SNS or Amazon RDS using Amazon API Gateway’s AWS Service integrations.

All the existing Kinesis Data Firehose features are fully supported, including AWS Lambda service integration, retry option, data protection on delivery failure, and cross-account and cross-Region data delivery. Traffic between Kinesis Data Firehose and the HTTP endpoint is encrypted in transit using HTTPS. Kinesis Data Firehose incorporates error handling, automatic scaling, transformation, conversion, aggregation, and compression functionality to help you accelerate the deployment of data streams across your organization. There is no additional cost to use this feature.

In this post, we walk through setting up a Kinesis Data Firehose HTTP endpoint. Our example use case ingests data into a Firehose delivery stream and sends it to an Amazon API Gateway REST API endpoint that loads the data to a DynamoDB table using the DynamoDB AWS Service integration.

Configuring a delivery stream to an HTTP endpoint

To set this up, you provide the URL for your HTTP endpoint application or service, add an optional access key as a header to the HTTP calls made to your HTTP endpoint, and include optional key-value parameters. You can configure the endpoint on the AWS Management Console, the AWS Command Line Interface (AWS CLI), or the AWS SDK.

  1. On the console, under Analytics, choose Kinesis.

  2. Choose Create delivery stream.

  3. For Delivery stream name, enter a name.
  4. For Choose a source, select Direct PUT or other sources as the source using the Kinesis Data Firehose PutRecord API.
  5. Leave the rest of the settings at their default and choose Next.

  6. Leave all settings at their default in Step 2: Process records and choose Next.

  7. For Destination, select HTTP Endpoint.

  8. For HTTP endpoint name, enter a name.
  9. For HTTP endpoint URL, enter your endpoint URL.

For this post, we have provisioned an Amazon DynamoDB integrated REST API endpoint using Amazon API Gateway. For more information, see Creating a REST API in Amazon API Gateway.

  10. For Access key, enter an access key (optional).
  11. For Content encoding, select Disabled.

You can optionally choose to encode and compress your request body before posting it to your HTTP endpoint.

  12. For Parameters, you can pass optional key-value parameters as needed.

In this post, we pass two key-value pairs that serve as inputs to an Amazon API Gateway integration with our Amazon DynamoDB API endpoint.

    • TableName as the Key and Table1 as the Value.
    • Region as the Key and us-east-2 as the Value.

  13. For Retry duration, leave at its default of 300 seconds.
  14. For S3 backup mode, select Failed data only.
  15. For S3 bucket, enter an S3 bucket as a backup for the delivery stream to store data that failed delivery to the HTTP API endpoint.
  16. Choose Next.

  17. Leave everything at its default on the Configure settings page and choose Next.
  18. Review your settings and choose Create delivery stream.

When the delivery stream is active, your source can start streaming data to it.
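The same configuration can also be expressed through the AWS SDK. The following is a rough sketch, not the setup used in this post. It only builds the request dictionary for the Firehose CreateDeliveryStream API and mirrors the console choices above. The stream name, endpoint URL, bucket, and role ARNs are hypothetical placeholders, and the buffering settings are left at their defaults.

```python
def build_http_endpoint_stream_request(stream_name, endpoint_url, endpoint_name,
                                       role_arn, backup_bucket_arn, access_key=None):
    """Build CreateDeliveryStream parameters for an HTTP endpoint destination."""
    endpoint = {"Url": endpoint_url, "Name": endpoint_name}
    if access_key:
        endpoint["AccessKey"] = access_key  # optional, sent as a request header
    return {
        "DeliveryStreamName": stream_name,
        "DeliveryStreamType": "DirectPut",
        "HttpEndpointDestinationConfiguration": {
            "EndpointConfiguration": endpoint,
            "RequestConfiguration": {
                "ContentEncoding": "NONE",  # "Disabled" in the console
                "CommonAttributes": [
                    {"AttributeName": "TableName", "AttributeValue": "Table1"},
                    {"AttributeName": "Region", "AttributeValue": "us-east-2"},
                ],
            },
            "RetryOptions": {"DurationInSeconds": 300},
            "S3BackupMode": "FailedDataOnly",
            "RoleARN": role_arn,
            "S3Configuration": {"RoleARN": role_arn, "BucketARN": backup_bucket_arn},
        },
    }


# Hypothetical placeholder values; replace them with your own resources.
request = build_http_endpoint_stream_request(
    stream_name="http-endpoint-demo",
    endpoint_url="https://example.execute-api.us-east-2.amazonaws.com/prod",
    endpoint_name="dynamodb-api",
    role_arn="arn:aws:iam::123456789012:role/FirehoseDeliveryRole",
    backup_bucket_arn="arn:aws:s3:::my-firehose-backup-bucket",
)

# With boto3 installed and credentials configured, the request could be sent with:
#   boto3.client("firehose").create_delivery_stream(**request)
```

Keeping the request as plain data makes it easy to review or store in version control before anything is created in your account.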

Testing the delivery stream

For this post, we use the Test with demo data feature available in Kinesis Data Firehose to stream sample data to the newly created delivery stream.

  1. On the Kinesis Data Firehose console, choose the delivery stream you just created.
  2. Choose Test with demo data.

The delivery stream delivers the demo data to the API Gateway REST API that is configured as the HTTP endpoint. The integration endpoint reads the key-value header attributes to determine the DynamoDB table name and Region, and loads the payload to the specified table.
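To make the request format more concrete, here is a small sketch of how a custom endpoint might interpret a delivery request. It is an illustration only: the post itself uses an API Gateway service integration rather than custom code. The sketch assumes the documented Firehose HTTP endpoint format, in which the key-value parameters arrive in the X-Amz-Firehose-Common-Attributes header and each record is base64 encoded. The function name and the returned item layout are hypothetical.

```python
import base64
import json
import time
from typing import Dict, List, Tuple


def handle_firehose_request(headers: Dict[str, str], body: str) -> Tuple[dict, List[dict]]:
    """Parse one Firehose HTTP endpoint request into (response, decoded items)."""
    lowered = {name.lower(): value for name, value in headers.items()}
    raw_attributes = lowered.get("x-amz-firehose-common-attributes", "{}")
    attributes = json.loads(raw_attributes).get("commonAttributes", {})

    request = json.loads(body)
    items = [
        {
            "table": attributes.get("TableName"),
            "region": attributes.get("Region"),
            "payload": base64.b64decode(record["data"]).decode("utf-8"),
        }
        for record in request["records"]
    ]
    # Firehose expects the response to echo the requestId with a timestamp.
    response = {"requestId": request["requestId"], "timestamp": int(time.time() * 1000)}
    return response, items
```

A real endpoint would write each item to its target before returning the response with an HTTP 200 status. Returning an error status instead causes Kinesis Data Firehose to retry for the configured retry duration.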

Monitoring the delivery stream

You can view Amazon CloudWatch metrics on the Monitoring tab. Pertinent metrics to observe are Delivery to HTTP Endpoint data freshness and HTTP Endpoint delivery success.


This post demonstrated how to create a delivery stream to an HTTP endpoint, which eliminates the need to develop custom applications or manage the corresponding infrastructure. Kinesis Data Firehose provides a fully managed service that helps you reduce complexities, so you can expand and accelerate the use of data streams throughout your organization.


About the Authors

Imtiaz (Taz) Sayed is the World Wide Tech Leader for Data Analytics at AWS. He is an ardent data engineer and relishes connecting with the data-analytics community. He likes roller-coasters, good heist movies, and is undecided between “The Godfather” and “The Shawshank Redemption” as the greatest movie of all time.







Masudur Rahaman Sayem is a Specialist Solution Architect for Analytics at AWS. He is passionate about distributed systems. He also likes to read, especially the classic comic books.





ICYMI: Serverless Q2 2020

Post Syndicated from Moheeb Zara original https://aws.amazon.com/blogs/compute/icymi-serverless-q2-2020/

Welcome to the 10th edition of the AWS Serverless ICYMI (in case you missed it) quarterly recap. Every quarter, we share all of the most recent product launches, feature enhancements, blog posts, webinars, Twitch live streams, and other interesting things that you might have missed!

In case you missed our last ICYMI, check out what happened last quarter here.

AWS Lambda

AWS Lambda functions can now mount an Amazon Elastic File System (EFS). EFS is a scalable and elastic NFS file system storing data within and across multiple Availability Zones (AZ) for high availability and durability. In this way, you can use a familiar file system interface to store and share data across all concurrent execution environments of one, or more, Lambda functions. EFS supports full file system access semantics, such as strong consistency and file locking.

Using different EFS access points, each Lambda function can access different paths in a file system, or use different file system permissions. You can share the same EFS file system with Amazon EC2 instances, containerized applications using Amazon ECS and AWS Fargate, and on-premises servers.
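As a minimal sketch of what the file system interface looks like from inside a function, the following handler appends a line to a shared file and returns the line count. The mount path /mnt/shared, the EFS_MOUNT_PATH environment variable, and the file name are assumptions for illustration. The advisory lock relies on the file locking support mentioned above.

```python
import fcntl
import os

# Hypothetical local mount path configured on the function.
MOUNT_PATH = os.environ.get("EFS_MOUNT_PATH", "/mnt/shared")


def append_line(line: str, mount_path: str = MOUNT_PATH) -> int:
    """Append a line to a shared file and return the total number of lines."""
    path = os.path.join(mount_path, "events.log")
    with open(path, "a+", encoding="utf-8") as handle:
        # Take an exclusive advisory lock so concurrent writers do not interleave.
        fcntl.flock(handle, fcntl.LOCK_EX)
        try:
            handle.write(line + "\n")
            handle.flush()
            handle.seek(0)
            return sum(1 for _ in handle)
        finally:
            fcntl.flock(handle, fcntl.LOCK_UN)


def handler(event, context):
    """Lambda entry point: record the incoming message on the shared file system."""
    return {"lines": append_line(str(event.get("message", "")))}
```

Because every execution environment sees the same file, the returned count reflects writes from all concurrent invocations, not only the current one.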

Learn how to create an Amazon EFS-mounted Lambda function using the AWS Serverless Application Model in Sessions With SAM Episode 10.

With our recent launch of .NET Core 3.1 AWS Lambda runtime, we’ve also released version 2.0.0 of the PowerShell module AWSLambdaPSCore. The new version now supports PowerShell 7.

Amazon EventBridge

At AWS re:Invent 2019, we introduced a preview of Amazon EventBridge schema registry and discovery. This is a way to store the structure of the events (the schema) in a central location. It can simplify using events in your code by generating the code to process them for Java, Python, and TypeScript. In April, we announced general availability of EventBridge Schema Registry.

We also added support for resource policies. Resource policies allow sharing of schema repository across different AWS accounts and organizations. In this way, developers on different teams can search for and use any schema that another team has added to the shared registry.

Ben Smith, AWS Serverless Developer Advocate, published a guide on how to capture user events and monitor user behavior using the Amazon EventBridge partner integration with Auth0. This enables better insight into your application to help deliver a more customized experience for your users.

AWS Step Functions

In May, we launched a new AWS Step Functions service integration with AWS CodeBuild. CodeBuild is a fully managed continuous integration service that compiles source code, runs tests, and produces packages that are ready for deployment. Now, during the execution of a state machine, you can start or stop a build, get build report summaries, and delete past build executions records.
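For illustration, a state machine that starts a build and waits for it to finish could be defined roughly as follows. This is a sketch that only generates the Amazon States Language JSON. The project name and state name are hypothetical, and the .sync suffix on the resource asks Step Functions to wait for the build to complete.

```python
import json


def build_state_machine(project_name: str) -> dict:
    """Return a one-step state machine definition that runs a CodeBuild project."""
    return {
        "Comment": "Start a CodeBuild build and wait for the result",
        "StartAt": "RunBuild",
        "States": {
            "RunBuild": {
                "Type": "Task",
                # Service integration for CodeBuild; .sync waits for completion.
                "Resource": "arn:aws:states:::codebuild:startBuild.sync",
                "Parameters": {"ProjectName": project_name},
                "End": True,
            }
        },
    }


# "my-build-project" is a placeholder for an existing CodeBuild project.
definition = json.dumps(build_state_machine("my-build-project"), indent=2)
```

The resulting JSON string can be used as the definition when you create the state machine.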

With the new AWS CodePipeline support to invoke Step Functions you can customize your delivery pipeline with choices, external validations, or parallel tasks. Each of those tasks can now call CodeBuild to create a custom build following specific requirements. Learn how to build a continuous integration workflow with Step Functions and AWS CodeBuild.

Rob Sutter, AWS Serverless Developer Advocate, has published a video series on Step Functions. We’ve compiled a playlist on YouTube to help you on your serverless journey.

AWS Amplify

The AWS Amplify Framework announced in April that they have rearchitected the Amplify UI component library to enable JavaScript developers to easily add authentication scenarios to their web apps. The authentication components include numerous improvements over previous versions. These include the ability to automatically sign in users after sign-up confirmation, better customization, and improved accessibility.

Amplify also announced the availability of Amplify Framework iOS and Amplify Framework Android libraries and tools. These help mobile application developers to easily build secure and scalable cloud-powered applications. Previously, mobile developers relied on a combination of tools and SDKs along with the Amplify CLI to create and manage a backend.

These new native libraries are oriented around use cases such as authentication, data storage and access, and machine learning predictions. They provide a declarative interface that enables you to programmatically apply best practices with abstractions.

A mono-repository is a repository that contains more than one logical project, each in its own sub-folder. Monorepo support is now available for the AWS Amplify Console, allowing developers to connect Amplify Console to a sub-folder in their mono-repository. Learn how to set up continuous deployment and hosting on a monorepo with the Amplify Console.

Amazon Keyspaces (for Apache Cassandra)

Amazon Managed Apache Cassandra Service (MCS) is now generally available under the new name: Amazon Keyspaces (for Apache Cassandra). Amazon Keyspaces is built on Apache Cassandra and can be used as a fully managed serverless database. Your applications can read and write data from Amazon Keyspaces using your existing Cassandra Query Language (CQL) code, with little or no changes. Danilo Poccia explains how to use Amazon Keyspaces with API Gateway and Lambda in this launch post.

AWS Glue

In April we extended AWS Glue jobs, based on Apache Spark, to run continuously and consume data from streaming platforms such as Amazon Kinesis Data Streams and Apache Kafka (including the fully-managed Amazon MSK). Learn how to manage a serverless extract, transform, load (ETL) pipeline with Glue in this guide by Danilo Poccia.

Serverless posts

Our team is always working to build and write content to help our customers better understand all our serverless offerings. Here is a list of the latest posts published to the AWS Compute Blog this quarter.

Introducing the new serverless LAMP stack

Ben Smith, AWS Serverless Developer Advocate, introduces the Serverless LAMP stack. He explains how to use serverless technologies with PHP. Learn about the available tools, frameworks and strategies to build serverless applications, and why now is the right time to start.


Building a location-based, scalable, serverless web app

James Beswick, AWS Serverless Developer Advocate, walks through building a location-based, scalable, serverless web app. Ask Around Me is an example project that allows users to ask questions within a geofence to create an engaging community driven experience.

Building well-architected serverless applications

Julian Wood, AWS Serverless Developer Advocate, published two blog series on building well-architected serverless applications. Learn how to better understand application health and lifecycle management.

Device hacking with serverless

Go beyond the browser with these creative and physical projects. Moheeb Zara, AWS Serverless Developer Advocate, published several serverless-powered device hacks, all using off-the-shelf parts.




Tech Talks and events

We hold AWS Online Tech Talks covering serverless topics throughout the year. You can find these in the serverless section of the AWS Online Tech Talks page. We also regularly join podcasts and record short videos so that you can learn in quick, bite-sized chunks.

Here are the highlights from Q2.

Innovator Island Workshop

Learn how to build a complete serverless web application for a popular theme park called Innovator Island. James Beswick created a video series to walk you through this popular workshop at your own pace.

Serverless First Function

In May, we held a new virtual event series, the Serverless-First Function, to help you and your organization get the most out of the cloud. The first event, on May 21, included sessions from Amazon CTO, Dr. Werner Vogels, and VP of Serverless at AWS, David Richardson. The second event, May 28, was packed with sessions with our AWS Serverless Developer Advocate team. Catch up on the AWS Twitch channel.

Live streams

The AWS Serverless Developer Advocate team hosts several weekly livestreams on the AWS Twitch channel covering a wide range of topics. You can catch up on all our past content, including workshops, on the AWS Serverless YouTube channel.

Eric Johnson hosts “Sessions with SAM” every Thursday at 10AM PST. Each week, Eric shows how to use SAM to solve different serverless challenges. He explains how to use SAM templates to build powerful serverless applications. Catch up on the last few episodes.

James Beswick, AWS Serverless Developer Advocate, has compiled a round-up of all his content from Q2. He has plenty of videos ranging from beginner to advanced topics.

AWS Serverless Heroes

We’re pleased to welcome Kyuhyun Byun and Serkan Özal to the growing list of AWS Serverless Heroes. The AWS Hero program is a selection of worldwide experts that have been recognized for their positive impact within the community. They share helpful knowledge and organize events and user groups. They’re also contributors to numerous open-source projects in and around serverless technologies.

Still looking for more?

The Serverless landing page has much more information. The Lambda resources page contains case studies, webinars, whitepapers, customer stories, reference architectures, and even more getting started tutorials.

Follow the AWS Serverless team on our new LinkedIn page, where we share all the latest news and events. You can also follow all of us on Twitter to see the latest news, follow conversations, and interact with the team.

Chris Munns: @chrismunns
Eric Johnson: @edjgeek
James Beswick: @jbesw
Moheeb Zara: @virgilvox
Ben Smith: @benjamin_l_s
Rob Sutter: @rts_rob
Julian Wood: @julian_wood

Best practices from Delhivery on migrating from Apache Kafka to Amazon MSK

Post Syndicated from Akash Deep Verma original https://aws.amazon.com/blogs/big-data/best-practices-from-delhivery-on-migrating-from-apache-kafka-to-amazon-msk/

This is a guest post by Delhivery.

In this post, we describe the steps Delhivery took to migrate from self-managed Apache Kafka running on Amazon Elastic Compute Cloud (Amazon EC2) to Amazon Managed Streaming for Apache Kafka (Amazon MSK). “We’ve been in production for over a year now,” said Akash Deep Verma, Senior Technical Architect, at Delhivery. “We have over 350+ applications running on Amazon MSK that are producing and consuming data every second of every day. Overall, it’s a happy experience to be working with Amazon MSK!”

Delhivery is India’s leading fulfilment platform for digital commerce. With its nationwide network extending beyond 18,000 pin codes and 2,500 cities, the company provides a full suite of logistics services like express parcel transportation, LTL and FTL freight, reverse logistics, cross-border, B2B and B2C warehousing, and technology services.

“Our vision is to become the operating system for commerce in India, through a combination of world-class infrastructure, logistics operations of the highest quality, and cutting-edge engineering and technology capabilities,” Verma says. “Our team has successfully fulfilled over 650 million orders to more than 120 million households across India. We operate 24 automated sort centers, 75 fulfilment centers, 70 hubs, over 2,500+ direct delivery centers, over 8,000+ partner centers, and more than 14,000+ vehicles. Over 40,000 team members make it possible to deliver a million packages a day, 24 hours a day, 7 days a week, 365 days a year.”

Self-managing Apache Kafka was difficult

We process close to 1 TB of data per day to serve various analytical functions. The data comes from shipment tracking, order tracking, GPS, biometrics, handheld devices, sorters, weights, clients, and facilities. It moves through various systems and services using several real-time and batch pipelines. The data is processed and enriched to serve both business and technical use cases. Due to the nature of our business, message traffic on Apache Kafka was steady with intermittent spikes: 10,000–12,000 messages per second coming in and 50,000–55,000 messages per second going out.

Apache Kafka serves as the critical messaging and events backbone to these dependent systems and services.

We were self-managing Apache Kafka brokers and their associated components, like Apache ZooKeeper, on Amazon Elastic Compute Cloud (Amazon EC2) instances.

With the growth in our business, managing these components and ensuring uptime became a significant resource-intensive operation. We had to allocate two developers on a constant basis to manage our Apache Kafka infrastructure and maintain uptime. This undifferentiated heavy lifting caused productivity loss because the developers couldn’t contribute effectively towards business feature development.

“We wanted a managed Apache Kafka service to reduce the time and resources we used for infrastructure management,” says Verma. “This would enable us to reprioritize our technical team to focus on feature development that added more business value.”

Getting time back by migrating to Amazon MSK

We looked at several options to replace our self-hosted Apache Kafka on EC2 instances, and chose Amazon MSK. With Amazon MSK, we could continue to use native Apache Kafka APIs and run our existing applications on AWS without changing the code. It provisions, configures, and maintains Apache Kafka clusters and Apache ZooKeeper nodes for us. This enables us to shift developers from managing infrastructure to writing more creative applications for our business.

On advice from the AWS team, we took the following steps:

  1. Size the MSK cluster
  2. Migrate individual Apache Kafka topics to Amazon MSK
  3. Monitor on Amazon MSK

Sizing the MSK cluster

To properly size our MSK cluster, we needed to understand our existing workload dynamics. We retrieved the following metrics from our existing Amazon EC2-based Apache Kafka clusters:

  • Ingestion rate from producers – We took the broker-level metric BytesInPerSec, averaged it for each broker, and summed the averages across all brokers in the cluster to estimate the net ingestion rate (not to be confused with ReplicationBytesInPerSec, which gives the ingestion rate from other brokers).
  • Consumption rate from consumers – We took the broker-level metric BytesOutPerSec, averaged it for each broker, and summed the averages across all brokers in the cluster to estimate the net consumption rate for the cluster (not to be confused with ReplicationBytesOutPerSec, which gives the consumption rate to other brokers).
  • Data replication strategy – We determined the highest replication factor by comparing the cluster-wide parameter default.replication.factor with the values specified for individual topics.
  • Data retention strategy and target percentage of disk utilization – We determined the longest data retention by comparing the cluster-wide parameter log.retention.hours with the values specified for individual topics. We also specified the target percentage of used storage and estimated the headroom that we needed to be comfortable with our use case.

AWS provided us with an Amazon MSK Sizing and Pricing spreadsheet to help us estimate the number of brokers that we needed in our MSK cluster. We subsequently performed POCs in our environment and found the suggested cluster sizing from the spreadsheet accurate. The spreadsheet also helped us to easily estimate the cluster pricing well in advance. For more information, see Numbers of brokers per cluster.
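To show how these inputs interact, the following back-of-the-envelope sketch estimates total cluster storage from the ingestion rate, retention, replication factor, and target disk utilization. It is a simplified approximation, not the formula used in the AWS spreadsheet, and the example numbers are hypothetical rather than Delhivery's actual figures.

```python
def estimate_cluster_storage_gb(ingest_mb_per_sec: float,
                                retention_hours: float,
                                replication_factor: int,
                                target_utilization: float) -> float:
    """Estimate total storage (GB) needed across the cluster.

    retained data = ingestion rate x retention period x replication factor,
    then divided by the target utilization to leave headroom.
    """
    if not 0 < target_utilization <= 1:
        raise ValueError("target_utilization must be in the range (0, 1]")
    retained_mb = ingest_mb_per_sec * 3600 * retention_hours * replication_factor
    return retained_mb / 1024 / target_utilization


# Hypothetical workload: 12 MB/s in, 24-hour retention, replication factor 3,
# and disks kept at or below 70% full.
total_gb = estimate_cluster_storage_gb(12, 24, 3, 0.70)
per_broker_gb = total_gb / 6  # assuming six brokers share the data evenly
```

Doubling either the retention period or the replication factor doubles the storage requirement, which is why the highest value across all topics was used for both inputs.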

Migrating individual Apache Kafka topics to Amazon MSK

We considered several options to migrate individual topics to Amazon MSK:

  • MirrorMaker 1.0, which ships with Apache Kafka and is a standalone tool that can move data from self-managed Apache Kafka clusters to Amazon MSK with minimal downtime.
  • Using consumers to read data from self-managed Apache Kafka clusters and write to Amazon MSK. This migration approach required some application downtime.
  • Other replication tools that we had experience with.

We used a combination of the first two approaches for our migration. For time-critical topics, we used MirrorMaker 1.0 to migrate data to Amazon MSK. For non-time-critical topics, our internal SLAs allowed us to perform a redirection of application traffic from our self-managed Apache Kafka clusters to Amazon MSK.

The MirrorMaker option involved setting up a MirrorMaker 1.0 daemon on a self-managed EC2 instance to consume messages from the source cluster, and republish them to the target MSK cluster. Each MirrorMaker 1.0 thread gets equipped with a separate consumer instance and shares one common producer. The process is as follows:

  1. The MirrorMaker 1.0 instance spawns a consumer process that interacts with the Apache ZooKeeper ensemble supporting the source Apache Kafka cluster for topic discovery.
  2. The consumer process reads payload from the concerned topic.
  3. MirrorMaker 1.0 spawns a producer process that interacts with the managed Apache ZooKeeper fleet of Amazon MSK via the Apache ZooKeeper endpoint.
  4. The producer process relays the payload retrieved by the consumer process to the respective topic of Amazon MSK via the broker endpoint.

The following diagram shows our migration topology.

The MSK cluster creation process requires subnet IDs as input so that the broker and Apache ZooKeeper nodes can map to the customer VPC. This mapping is achieved by creating ENIs within these subnets, each with a primary private IPv4 address. The broker and Apache ZooKeeper endpoints attached to the MSK cluster resolve to these private IPv4 addresses.

We used the following command to mirror all topics from the Amazon EC2-based source cluster to Amazon MSK:

bin/kafka-mirror-maker.sh \
--consumer.config config/mirrormaker-consumer.properties \
--producer.config config/mirrormaker-producer.properties \
--whitelist '.*'

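The command contains the following details:

  • --consumer.config – The properties file for the consumer that reads from the source Apache Kafka cluster.
  • --producer.config – The properties file for the producer that writes to the target MSK cluster.
  • --whitelist – A regular expression that selects the topics to mirror; here it selects all topics.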
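MirrorMaker 1.0 interprets the whitelist as a Java regular expression that must match the whole topic name, with commas acting as alternation. The following sketch imitates that selection with Python's re module as a stand-in (Java and Python regular expressions differ in some corner cases). The topic names are hypothetical.

```python
import re
from typing import Iterable, List


def select_topics(whitelist: str, topics: Iterable[str]) -> List[str]:
    """Return the topics whose full name matches the whitelist expression."""
    # Commas in the whitelist separate alternatives, like | in a regex.
    pattern = re.compile(whitelist.strip().replace(",", "|"))
    return [topic for topic in topics if pattern.fullmatch(topic)]


# Hypothetical topic names for illustration.
topics = ["orders", "shipments", "tracking.gps", "tracking.biometrics"]

everything = select_topics(".*", topics)
two_topics = select_topics("orders,shipments", topics)
tracking_only = select_topics(r"tracking\..*", topics)
```

A narrower expression like the last one is useful when you migrate time-critical topics first and move the remaining topics later.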

With acceptance of KIP-382, Amazon MSK can now support MirrorMaker 2.0 and benefit from MirrorMaker 2.0 advantages. For instructions, configuration files, sample code, and labs using MirrorMaker 2.0 to migrate a self-managed Apache Kafka cluster to an MSK cluster, see the MirrorMaker2 on Amazon EC2 workshop.

You can also migrate an existing Apache Kafka cluster to Amazon MSK using Amazon Kinesis Data Analytics, a fully managed service for Apache Flink. This enables you to use fully managed Apache Flink applications to process streaming data stored in Amazon MSK. For more information about using Amazon Kinesis Data Analytics with Amazon MSK, see Tutorial: Using a Kinesis Data Analytics application to Replicate Data from One MSK Cluster to Another in a VPC and the Clickstream Lab.

At a steady state, our MSK cluster in production uses the following configuration:

  • Broker nodes – 6 x m5.4xlarge
  • Replication factor – 3
  • Number of producers – 110+
  • Number of consumers – 300+
  • Topics – 500+
  • Kstreams running on broker – 55+

Monitoring on Amazon MSK

Amazon MSK provides Amazon CloudWatch metrics under three levels of granularity:

  • DEFAULT
  • PER_BROKER
  • PER_TOPIC_PER_BROKER

For our use case, we used the highest granularity PER_TOPIC_PER_BROKER for monitoring with optimum visibility. To automate the detection of operational issues, we developed custom CloudWatch alarms with the following metrics.

| Metric Name | Alarm Trigger | Description |
| --- | --- | --- |
| UnderReplicatedPartitions | When not 0 | If partition replicas fall too far behind their leaders, the follower partition is removed from the ISR pool. A corresponding increase in the IsrShrinksPerSec metric is also expected. |
| OfflinePartitionsCount | When not 0 | Defines the number of partitions without an active leader. Any partition without an active leader is completely inaccessible, and both consumers and producers of that partition are blocked until a leader becomes available. |
| IsrShrinksPerSec | When it increases without an increase in IsrExpandsPerSec | A replica could be removed from the ISR pool for several reasons: the replica is too far behind the leader’s offset; the replica hasn’t contacted the leader for some time (configurable with replica.socket.timeout.ms); more brokers are being added to the cluster (IsrExpandsPerSec should increase as well); partitions are removed (IsrExpandsPerSec should increase as well). |
| UncleanLeaderElectionsPerSec | When not 0 | An unclean leader election is a special case in which no available replicas are in sync. Because each topic must have a leader, an election is held among the out-of-sync replicas and a leader is chosen, meaning any messages that weren’t synced prior to the loss of the former leader are lost forever. |
| ActiveControllerCount | When not 1 | The controller in an Apache Kafka cluster is responsible for maintaining the list of partition leaders and coordinating leadership transitions (in the event a partition leader becomes unavailable). |
| KafkaDataLogsDiskUsed | When disk utilization exceeds 85%; add storage either via API or using the console such that disk utilization drops to 70% or below | Percentage of space utilized in the EBS volume per broker. |
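
As a rough illustration of how some of the alarms in this table could be defined, the following sketch builds the keyword arguments accepted by the boto3 CloudWatch put_metric_alarm call. The cluster name, alarm naming scheme, period, statistic, and the namespace and dimension names are assumptions made for this example, not necessarily the settings used in production. It covers only the "greater than threshold" triggers; ActiveControllerCount ("when not 1") would need a different comparison and is left out.

```python
# Sketch only: cluster name, alarm naming, period, and statistic are assumed values.
from typing import Dict, List, Optional

NAMESPACE = "AWS/Kafka"  # assumed CloudWatch namespace for Amazon MSK metrics


def build_alarm(cluster: str, metric: str, threshold: float,
                broker_id: Optional[str] = None) -> Dict:
    """Return keyword arguments for CloudWatch put_metric_alarm."""
    dimensions: List[Dict[str, str]] = [{"Name": "Cluster Name", "Value": cluster}]
    if broker_id is not None:
        dimensions.append({"Name": "Broker ID", "Value": broker_id})
    suffix = f"-broker-{broker_id}" if broker_id is not None else ""
    return {
        "AlarmName": f"{cluster}-{metric}{suffix}",
        "Namespace": NAMESPACE,
        "MetricName": metric,
        "Dimensions": dimensions,
        "Statistic": "Maximum",
        "Period": 300,
        "EvaluationPeriods": 1,
        "Threshold": threshold,
        "ComparisonOperator": "GreaterThanThreshold",
    }


def build_alarms(cluster: str, broker_ids: List[str]) -> List[Dict]:
    """One cluster-level alarm plus two per-broker alarms for each broker."""
    alarms = [build_alarm(cluster, "OfflinePartitionsCount", 0)]
    for broker_id in broker_ids:
        alarms.append(build_alarm(cluster, "UnderReplicatedPartitions", 0, broker_id))
        alarms.append(build_alarm(cluster, "KafkaDataLogsDiskUsed", 85, broker_id))
    return alarms


def create_alarms(alarms: List[Dict]) -> None:
    """Create the alarms; requires boto3 and AWS credentials."""
    import boto3  # imported lazily so the builders work without AWS access

    cloudwatch = boto3.client("cloudwatch")
    for alarm in alarms:
        cloudwatch.put_metric_alarm(**alarm)
```

Calling build_alarms("my-cluster", ["1", "2"]) returns five alarm definitions that can be reviewed before create_alarms sends them to CloudWatch.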

Amazon MSK also supports metrics exposed via port 11001 (for the JMX Exporter) and port 11002 (for the Node Exporter) to be captured using monitoring tools such as Prometheus and a variety of third-party tools compatible with Prometheus-formatted metrics like Datadog, New Relic, and SumoLogic. For more information, see Open Monitoring with Prometheus. For instructions on configuring open monitoring, see the Open Monitoring lab.
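
To make the two ports concrete, the following sketch generates a Prometheus file-based service discovery document with one target group per exporter. The broker host names are hypothetical placeholders, and the job labels are an assumption chosen for this example.

```python
# Sketch only: broker host names and job labels are hypothetical.
import json
from typing import Dict, List

JMX_EXPORTER_PORT = 11001   # JMX Exporter port named in the text
NODE_EXPORTER_PORT = 11002  # Node Exporter port named in the text


def build_scrape_targets(broker_hosts: List[str]) -> List[Dict]:
    """Return target groups in the Prometheus file_sd JSON layout."""
    return [
        {
            "labels": {"job": "jmx"},
            "targets": [f"{host}:{JMX_EXPORTER_PORT}" for host in broker_hosts],
        },
        {
            "labels": {"job": "node"},
            "targets": [f"{host}:{NODE_EXPORTER_PORT}" for host in broker_hosts],
        },
    ]


if __name__ == "__main__":
    hosts = ["b-1.example.internal", "b-2.example.internal"]  # placeholders
    print(json.dumps(build_scrape_targets(hosts), indent=2))
```

The printed JSON can be saved to a file that a Prometheus file_sd_configs entry points at.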


Self-managed Apache Kafka brokers meant that we had to make sure Apache ZooKeeper always maintained quorum, the network connectivity between brokers was monitored, and different auxiliary Apache Kafka processes like LogCleaner were monitored. All these parts failed at some point, so we were constantly fixing issues with our infrastructure and monitoring. The amount of time that we spent taking care of Apache Kafka could have been better utilized delivering actual business value.

Amazon MSK enabled us to be more productive by reducing the time we spent maintaining our infrastructure, finding and fixing issues, and maintaining our brokers. It takes care of Apache Kafka maintenance in the background, gives us the level of monitoring that we needed, and frees our team to take on more activities to improve our applications and provide value to our customers.


About the Authors

Akash Deep Verma is a Senior Technical Architect at Delhivery. Akash joined Delhivery in 2016 and worked on many projects related to Big Data and Analytics. Most recently, he led architecture initiatives related to Democratization of Data. In his free time, he enjoys playing table tennis and watching mind-boggling movies.




Dipta S. Bhattacharya is an Enterprise Solutions Architect at AWS. Dipta joined AWS in 2018. He works with large startup customers to design and develop architectures on AWS and support their journey on the cloud.





Nikhil Khokhar is a Solutions Architect at AWS. He joined AWS in 2016 and specializes in building and supporting data streaming solutions that help customers analyze and get value out of their data. In his free time, he makes use of his 3D printing skills to solve everyday problems.

How Wind Mobility built a serverless data architecture

Post Syndicated from Pablo Giner original https://aws.amazon.com/blogs/big-data/how-wind-mobility-built-a-serverless-data-architecture/

Guest post by Pablo Giner, Head of BI, Wind Mobility.

Over the past few years, urban micro-mobility has become a trending topic. With pollution indexes hitting historic highs, cities and companies worldwide have been introducing regulations and working on a wide spectrum of solutions to alleviate the situation.

We at Wind Mobility strive to make commuters’ lives more sustainable and convenient by bringing short distance urban transportation to cities worldwide.

At Wind Mobility, we scale our services at the same pace as our users demand them, and we do it in an economically and environmentally viable way. We optimize our fleet distribution to avoid overcrowding cities with more scooters than those that are actually going to be used, and we position them just meters away from where our users need them and at the time of the day when they want them.

How do we do that? By optimizing our operations to their fullest. To do so, we need to be very well informed about our users’ behavior under varying conditions and understand our fleet’s potential.

Scalability and flexibility for rapid growth

We knew that before we could solve this challenge, we needed to collect data from many different sources, such as user interactions with our application, user demand, IoT signals from our scooters, and operational metrics. To analyze the numerous datasets collected and extract actionable insights, we needed to build a data lake. While the high-level goal was clear, the scope was less so. We were working hard to scale our operation as we continued to launch new markets. The rapid growth and expansion made it very difficult to predict the volume of data we would need to consume. We were also launching new microservices to support our growth, which resulted in more data sources to ingest. We needed an architecture that allowed us to be agile and quickly adapt to meet our growth. It became clear that a serverless architecture was best positioned to meet those needs, so we started to design our 100% serverless infrastructure.

The first challenge was ingesting and storing data from our scooters in the field, events from our mobile app, operational metrics, and partner APIs. We use AWS Lambda to capture changes in our operational databases and mobile app and push the events to Amazon Kinesis Data Streams, which allows us to take action in real time. We also use Amazon Kinesis Data Firehose to write the data to Amazon Simple Storage Service (Amazon S3), which we use for analytics.
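
A minimal sketch of the Lambda-to-Kinesis step described above might look like the following. The stream name, the shape of the incoming event, and the scooter_id partition key are hypothetical; the real functions and payloads are not shown in this post. The sketch does not inspect FailedRecordCount in the response, which a production function would need to handle.

```python
# Sketch only: stream name, event shape, and partition key field are hypothetical.
import json
from typing import Dict, Iterable, Iterator, List

MAX_BATCH = 500  # PutRecords accepts at most 500 records per request


def to_kinesis_entries(events: Iterable[Dict], key_field: str = "scooter_id") -> List[Dict]:
    """Convert change events into PutRecords entries."""
    entries = []
    for event in events:
        entries.append({
            "Data": json.dumps(event).encode("utf-8"),
            "PartitionKey": str(event.get(key_field, "unknown")),
        })
    return entries


def batches(entries: List, size: int = MAX_BATCH) -> Iterator[List]:
    """Yield consecutive slices of at most `size` entries."""
    for start in range(0, len(entries), size):
        yield entries[start:start + size]


def handler(event, context):
    """Lambda entry point; requires boto3 and permission to write to the stream."""
    import boto3  # imported lazily so the helpers can be tested offline

    kinesis = boto3.client("kinesis")
    entries = to_kinesis_entries(event.get("records", []))
    for batch in batches(entries):
        kinesis.put_records(StreamName="scooter-events", Records=batch)
    return {"forwarded": len(entries)}
```

Using the entity identifier as the partition key keeps events for the same scooter in order within a shard, which is one reasonable choice among several.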

After our data was in Amazon S3 and adequately partitioned as per its most common use cases (we partition by date, region, and business line, depending on the data source), we had to find a way to query this data for both data profiling (understanding structure, content, and interrelationships) and ad hoc analysis. For that we chose AWS Glue crawlers to catalog our data and Amazon Athena to read from the AWS Glue Data Catalog and run queries. However, ad hoc analysis and data profiling are relatively sporadic tasks in our team, because most of the data processing computing hours are actually dedicated to transforming the multiple data sources into our data warehouse, consolidating the raw data, modeling it, adding new attributes, and picking the data elements, which constitute 95% of our analytics and predictive needs.
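
To illustrate the partitioning scheme mentioned above, the following sketch builds an S3 object key partitioned by date, region, and business line. The key layout and the partition names are hypothetical; the post does not show the actual prefixes. Hive-style name=value folders are used here because AWS Glue crawlers can recognize them as partition columns.

```python
# Sketch only: the prefix layout and partition names are hypothetical.
from datetime import datetime, timezone


def partitioned_key(source: str, region: str, business_line: str,
                    event_time: datetime, filename: str) -> str:
    """Build a Hive-style partitioned S3 key from a timezone-aware timestamp."""
    day = event_time.astimezone(timezone.utc).strftime("%Y-%m-%d")
    return (
        f"{source}/dt={day}/region={region}/"
        f"business_line={business_line}/{filename}"
    )


if __name__ == "__main__":
    ts = datetime(2020, 3, 5, 23, 30, tzinfo=timezone.utc)
    print(partitioned_key("iot", "paris", "scooters", ts, "part-0.json"))
```

With this layout, a query that filters on dt and region only needs to scan the matching folders.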

This is where all the heavy lifting takes place. We parse through millions of scooter and user events generated daily (over 300 events per second) to extract actionable insight. We selected AWS Glue to perform this task. Our primary ETL job reads the newly added raw event data from Amazon S3, processes it using Apache Spark, and writes the results to our Amazon Redshift data warehouse. AWS Glue plays a critical role in our ability to scale on demand. After careful evaluation and testing, we concluded that AWS Glue ETL jobs meet all our needs and free us from procuring and managing infrastructure.

Architecture overview

The following diagram represents our current data architecture, showing two serverless data collection, processing, and reporting pipelines:

  • Operational databases from Amazon Relational Database Service (Amazon RDS) and MongoDB
  • IoT and application events, followed by Athena for data profiling and Amazon Redshift for reporting

Our data is curated and transformed multiple times a day using an automated pipeline running on AWS Glue. The team can now focus on analyzing the data and building machine learning (ML) applications.

We chose Amazon QuickSight as our business intelligence tool to help us visualize and better understand our operational KPIs. Additionally, we use Amazon Elastic Container Registry (Amazon ECR) to store our Docker images containing our custom ML algorithms and Amazon Elastic Container Service (Amazon ECS) where we train, evaluate, and host our ML models. We schedule our models to be trained and evaluated multiple times a day. Taking as input curated data about demand, conversion, and flow of scooters, we run the models to help us optimize fleet utilization for a particular city at any given time.

The following diagram represents how data from the data lake is incorporated into our ML training, testing, and serving system. First, our developers work in the application code and commit their changes, which are built into new Docker images by our CI/CD pipeline and stored in the Amazon ECR registry. These images are pushed into Amazon ECS and tested in DEV and UAT environments before moving to PROD (where they are triggered by the Amazon ECS task scheduler). During their execution, the Amazon ECS tasks (some train the demand and usage forecasting models, some produce the daily and hourly predictions, and others optimize the fleet distribution to satisfy the forecast) read their configuration and pull data from Amazon S3 (which has been previously produced by scheduled AWS Glue jobs), finally storing their results back into Amazon S3. Executions of these pipelines are tracked via MLflow (in a dedicated Amazon Elastic Compute Cloud (Amazon EC2) server), and the final result indicating the fleet operations required is fit into a Kepler map, which is then consumed by the operators in the field.


We at Wind Mobility place data at the forefront of our operations. For that, we need our data infrastructure to be as flexible as the industry and the context we operate in, which is why we chose serverless. Over the course of a year, we have built a data lake, a data warehouse, a BI suite, and a variety of (production) data science applications. All of that with a very small team.

Also, within the last 12 months, we have scaled up several of our data pipelines by a factor of 10, without slowing our momentum or redesigning any part of our architecture. When the time came to double our fleet in 1 week and increase the frequency at which we capture data from scooters by a factor of 10, our serverless data architecture scaled with no issues. This allowed us to focus on adding value by simplifying our operation, reacting to changes quickly, and delighting our users.

We have measured our success in multiple dimensions:

  • Speed – Serverless is faster to deploy and expand; we believe we have reduced our time to market for the entire infrastructure by a factor of 2
  • Visibility – We have 360 degree visibility of our operations worldwide, accessible by our city managers, finance team, and management board
  • Optimized fleet deployment – We know, at any minute of the day, the number of scooters that our customers need over the next few hours, which reduces unsatisfied demand by more than 50%

If you face a similar challenge, our advice is clear: go fully serverless and use the spectrum of solutions available from AWS.

Follow us and discover more about Wind Mobility on Facebook, Instagram and LinkedIn.


About the Author

Pablo Giner is Head of BI at Wind Mobility. Pablo’s background is in wheels (motorcycle racing > vehicle engineering > collision insurance > eScooters sharing…) and for the last few years he has specialized in forming and developing data teams. At Wind Mobility, he leads the data function (data engineering + analytics + data science), and the project he is most proud of is what they call smart fleet rebalancing, an AI-backed solution to reposition their fleet in real time. “In God we trust. All others must bring data.” – W. Edwards Deming




Build an AWS Well-Architected environment with the Analytics Lens

Post Syndicated from Nikki Rouda original https://aws.amazon.com/blogs/big-data/build-an-aws-well-architected-environment-with-the-analytics-lens/

Building a modern data platform on AWS enables you to collect data of all types, store it in a central, secure repository, and analyze it with purpose-built tools. Yet you may be unsure of how to get started and the impact of certain design decisions. To address the need to provide advice tailored to specific technology and application domains, AWS added the concept of well-architected lenses in 2017. AWS is now happy to announce the Analytics Lens for the AWS Well-Architected Framework. This post provides an introduction to its purpose, topics covered, common scenarios, and services included.

The new Analytics Lens offers comprehensive guidance to make sure that your analytics applications are designed in accordance with AWS best practices. The goal is to give you a consistent way to design and evaluate cloud architectures, based on the following five pillars:

  • Operational excellence
  • Security
  • Reliability
  • Performance efficiency
  • Cost optimization

The tool can help you assess the analytics workloads you have deployed in AWS by identifying potential risks and offering suggestions for improvements.

Using the Analytics Lens to address common requirements

The Analytics Lens models both the data architecture at the core of the analytics applications and the application behavior itself. These models are organized into the following six areas, which encompass the vast majority of analytics workloads deployed on AWS:

  1. Data ingestion
  2. Security and governance
  3. Catalog and search
  4. Central storage
  5. Processing and analytics
  6. User access

The following diagram illustrates these areas and their related AWS services.

There are a number of common scenarios where the Analytics Lens applies, such as the following:

  • Building a data lake as the foundation for your data and analytics initiatives
  • Efficient batch data processing at scale
  • Building a platform for streaming ingest and real-time event processing
  • Handling big data processing and streaming
  • Data-preparation operations

Whichever of these scenarios fits your needs, building to the principles of the Analytics Lens in the AWS Well-Architected Framework can help you implement best practices for success.

The Analytics Lens explains when and how to use the core services in the AWS analytics portfolio. These include Amazon Kinesis, Amazon Redshift, Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation. It also explains how Amazon Simple Storage Service (Amazon S3) can serve as the storage for your data lake and how to integrate with relevant AWS security services. With reference architectures, best practices advice, and answers to common questions, the Analytics Lens can help you make the right design decisions.


Applying the lens to your existing architectures can validate the stability and efficiency of your design (or provide recommendations to address the gaps that are identified). AWS is committed to the Analytics Lens as a living tool; as the analytics landscape evolves and new AWS services come online, we’ll update the Analytics Lens appropriately. Our mission will always be to help you design and deploy well-architected applications.

For more information about building your own Well-Architected environment using the Analytics Lens, see the Analytics Lens whitepaper.

Special thanks to the following individuals who contributed to building this resource, among many others who helped with review and implementation: Radhika Ravirala, Laith Al-Saadoon, Wallace Printz, Ujjwal Ratan, and Neil Mukerje.

Are there questions you’d like to see answered in the tool? Share your thoughts and questions in the comments.


About the Authors

Nikki Rouda is the principal product marketing manager for data lakes and big data at Amazon Web Services. Nikki has spent 20+ years helping enterprises in 40+ countries develop and implement solutions to their analytics and IT infrastructure challenges. Nikki holds an MBA from the University of Cambridge and an ScB in geophysics and math from Brown University.



Radhika Ravirala is a specialist solutions architect at Amazon Web Services, where she helps customers craft distributed analytics applications on the AWS platform. Prior to her cloud journey, she worked as a software engineer and designer for technology companies in Silicon Valley.

New – Serverless Streaming ETL with AWS Glue

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/new-serverless-streaming-etl-with-aws-glue/

When you have applications in production, you want to understand what is happening, and how the applications are being used. To analyze data, a first approach is a batch processing model: a set of data is collected over a period of time, then run through analytics tools. To be able to react quickly, you can use a streaming model, where data is processed as it arrives, a record at a time or in micro-batches of tens, hundreds, or thousands of records.

Managing continuous ingestion pipelines and processing data on-the-fly is quite complex, because it’s an always-on system that needs to be managed, patched, scaled, and generally taken care of. Today, we are making this easier and more cost-effective to implement by extending AWS Glue jobs, based on Apache Spark, to run continuously and consume data from streaming platforms such as Amazon Kinesis Data Streams and Apache Kafka (including the fully-managed Amazon MSK).

In this way, Glue can provision, manage, and scale the infrastructure needed to ingest data to data lakes on Amazon S3, data warehouses such as Amazon Redshift, or other data stores. For example, you can store streaming data in a DynamoDB table for quick lookups, or in Elasticsearch to look for specific patterns. This procedure is usually referred to as extract, transform, load (ETL).

As you process streaming data in a Glue job, you have access to the full capabilities of Spark Structured Streaming to implement data transformations, such as aggregating, partitioning, and formatting as well as joining with other data sets to enrich or cleanse the data for easier analysis. For example, you can access an external system to identify fraud in real-time, or use machine learning algorithms to classify data, or detect anomalies and outliers.

Processing Streaming Data with AWS Glue
To try this new feature, I want to collect data from IoT sensors and store all data points in an S3 data lake. I am using a Raspberry Pi with a Sense HAT to collect temperature, humidity, barometric pressure, and its position in space in real-time (using the integrated gyroscope, accelerometer, and magnetometer). Here’s an architectural view of what I am building:

First, I register the device with AWS IoT Core, and run the following Python code to send, once per second, a JSON message with sensor data to the streaming-data MQTT topic. I have a single device in this setup; with more devices, I would use a subtopic per device, for example streaming-data/{client_id}.

import time
import datetime
import json
import sys
from sense_hat import SenseHat
from awscrt import io, mqtt, auth, http
from awsiot import mqtt_connection_builder

sense = SenseHat()

topic = "streaming-data"
client_id = "raspberrypi"

# Callback when connection is accidentally lost.

def on_connection_interrupted(connection, error, **kwargs):
    print("Connection interrupted. error: {}".format(error))

# Callback when an interrupted connection is re-established.
def on_connection_resumed(connection, return_code, session_present, **kwargs):
    print("Connection resumed. return_code: {} session_present: {}".format(
        return_code, session_present))

    if return_code == mqtt.ConnectReturnCode.ACCEPTED and not session_present:
        print("Session did not persist. Resubscribing to existing topics...")
        resubscribe_future, _ = connection.resubscribe_existing_topics()

        # Cannot synchronously wait for resubscribe result because we're on the connection's event-loop thread,
        # evaluate result with a callback instead.
        resubscribe_future.add_done_callback(on_resubscribe_complete)

def on_resubscribe_complete(resubscribe_future):
    resubscribe_results = resubscribe_future.result()
    print("Resubscribe results: {}".format(resubscribe_results))

    for topic, qos in resubscribe_results['topics']:
        if qos is None:
            sys.exit("Server rejected resubscribe to topic: {}".format(topic))

# Callback when the subscribed topic receives a message
def on_message_received(topic, payload, **kwargs):
    print("Received message from topic '{}': {}".format(topic, payload))

def collect_and_send_data():
    publish_count = 0
    while True:

        humidity = sense.get_humidity()
        print("Humidity: %s %%rH" % humidity)

        temp = sense.get_temperature()
        print("Temperature: %s C" % temp)

        pressure = sense.get_pressure()
        print("Pressure: %s Millibars" % pressure)

        orientation = sense.get_orientation_degrees()
        print("p: {pitch}, r: {roll}, y: {yaw}".format(**orientation))

        timestamp = datetime.datetime.fromtimestamp(
            time.time()).strftime('%Y-%m-%d %H:%M:%S')

        message = {
            "client_id": client_id,
            "timestamp": timestamp,
            "humidity": humidity,
            "temperature": temp,
            "pressure": pressure,
            "pitch": orientation['pitch'],
            "roll": orientation['roll'],
            "yaw": orientation['yaw'],
            "count": publish_count
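        }
        print("Publishing message to topic '{}': {}".format(topic, message))
        mqtt_connection.publish(
            topic=topic,
            payload=json.dumps(message),
            qos=mqtt.QoS.AT_LEAST_ONCE)
        time.sleep(1)  # send one message per second
        publish_count += 1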

if __name__ == '__main__':
    # Spin up resources
    event_loop_group = io.EventLoopGroup(1)
    host_resolver = io.DefaultHostResolver(event_loop_group)
    client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)

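    mqtt_connection = mqtt_connection_builder.mtls_from_path(
        # Placeholders: replace the endpoint and file paths with your own values.
        endpoint="<your-iot-endpoint>",
        cert_filepath="<path-to-device-certificate>",
        pri_key_filepath="<path-to-private-key>",
        ca_filepath="<path-to-root-ca>",
        client_bootstrap=client_bootstrap,
        on_connection_interrupted=on_connection_interrupted,
        on_connection_resumed=on_connection_resumed,
        client_id=client_id,
        clean_session=False)

    connect_future = mqtt_connection.connect()

    # Future.result() waits until a result is available
    connect_future.result()
    print("Connected!")

    # Subscribe
    print("Subscribing to topic '{}'...".format(topic))
    subscribe_future, packet_id = mqtt_connection.subscribe(
        topic=topic,
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=on_message_received)

    subscribe_result = subscribe_future.result()
    print("Subscribed with {}".format(str(subscribe_result['qos'])))

    # Start collecting sensor data and publishing it
    collect_and_send_data()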


This is an example of the JSON messages sent by the device:

{
    "client_id": "raspberrypi",
    "timestamp": "2020-04-16 11:33:23",
    "humidity": 39.35261535644531,
    "temperature": 30.10732078552246,
    "pressure": 1020.447509765625,
    "pitch": 4.044007304723748,
    "roll": 7.533848064912158,
    "yaw": 77.01560798660883,
    "count": 104
}

In the Kinesis console, I create the my-data-stream data stream (1 shard is more than enough for my workload). Back in the AWS IoT console, I create an IoT rule to send all data from the MQTT topic to this Kinesis data stream.
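
The same IoT rule can be sketched in code instead of the console. The following example builds the arguments for the boto3 IoT create_topic_rule call. The rule name, the IAM role ARN, and the use of client_id as the partition key are assumptions made for illustration; the topic and stream names come from this walkthrough.

```python
# Sketch only: rule name, role ARN, and partition key choice are hypothetical.
from typing import Dict


def build_topic_rule(topic: str, stream_name: str, role_arn: str) -> Dict:
    """Return keyword arguments for the AWS IoT create_topic_rule call."""
    return {
        "ruleName": "send_sensor_data_to_kinesis",
        "topicRulePayload": {
            "sql": f"SELECT * FROM '{topic}'",
            "awsIotSqlVersion": "2016-03-23",
            "ruleDisabled": False,
            "actions": [
                {
                    "kinesis": {
                        "roleArn": role_arn,
                        "streamName": stream_name,
                        # Substitution template: use the message's client_id
                        "partitionKey": "${client_id}",
                    }
                }
            ],
        },
    }


def create_rule(rule: Dict) -> None:
    """Create the rule; requires boto3 and AWS credentials."""
    import boto3  # imported lazily so the builder works without AWS access

    boto3.client("iot").create_topic_rule(**rule)
```

The role passed in role_arn must allow AWS IoT to put records into the stream.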

Now that all sensor data is sent to Kinesis, I can leverage the new Glue integration to process data as it arrives. In the Glue console, I manually add a table in the Glue Data Catalog. I select Kinesis as the type of source, and enter my stream name and the endpoint of the Kinesis Data Streams service. Note that for Kafka streams, before creating the table, you need to create a Glue connection.

I select JSON as data format, and define the schema for the streaming data. If I don’t specify a column here, it will be ignored when processing the stream.

After that, I confirm the final recap step, and create the my_streaming_data table. We are working to add schema inference to streaming ETL jobs. With that, specifying the full schema up front won’t be necessary. Stay tuned.

To process the streaming data, I create a Glue job. For the IAM role, I create a new one attaching the AWSGlueServiceRole and AmazonKinesisReadOnlyAccess managed policies. Depending on your use case and the setup of your AWS accounts, you may want to use a role providing more fine-grained access.

For the data source, I select the table I just created, receiving data from the Kinesis stream.

To get a script generated by Glue, I select the Change schema transform type. As target, I create a new table in the Glue Data Catalog, using an efficient format like Apache Parquet. The Parquet files generated by this job are going to be stored in an S3 bucket whose name starts with aws-glue- (including the final hyphen). By following the naming convention for resources specified in the AWSGlueServiceRole policy, this job has the required permissions to access those resources.

I leave the default mapping that keeps in output all the columns in the source stream. In this way, I can ingest all the records using the proposed script, without having to write a single line of code.

I quickly review the proposed script and save. Each record is processed as a DynamicFrame, and I can apply any of the Glue PySpark Transforms or any transforms supported by Spark Structured Streaming. By default with this configuration, only ApplyMapping is used.
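
To give an idea of what the default mapping does, here is a plain-Python emulation of the mapping semantics. It is not the Glue API and not the generated script. Mappings are written as (source column, source type, target column, target type) tuples, which is the shape the ApplyMapping transform takes; the column types below are assumptions, because the schema I defined in the console is not reproduced here.

```python
# Sketch only: a plain-Python emulation; column types are assumed.
from typing import Any, Dict, List, Tuple

Mapping = Tuple[str, str, str, str]

MAPPINGS: List[Mapping] = [
    ("client_id", "string", "client_id", "string"),
    ("timestamp", "string", "timestamp", "string"),
    ("humidity", "double", "humidity", "double"),
    ("temperature", "double", "temperature", "double"),
    ("pressure", "double", "pressure", "double"),
    ("pitch", "double", "pitch", "double"),
    ("roll", "double", "roll", "double"),
    ("yaw", "double", "yaw", "double"),
    ("count", "int", "count", "int"),
]

# Python constructors standing in for the target column types
CASTS = {"string": str, "double": float, "int": int, "long": int}


def apply_mapping(record: Dict[str, Any], mappings: List[Mapping]) -> Dict[str, Any]:
    """Keep only mapped columns, rename them, and cast to the target type."""
    output = {}
    for source, _source_type, target, target_type in mappings:
        if source in record:
            output[target] = CASTS[target_type](record[source])
    return output
```

Columns that are not listed in the mappings are dropped, which mirrors how columns missing from the table schema are ignored.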

I start the job, and after a few minutes I see the Parquet files containing the output of the job appearing in the output S3 bucket. They are partitioned by ingest date (year, month, day, and hour).

To populate the Glue Data Catalog with tables based on the content of the S3 bucket, I add and run a crawler. In the crawler configuration, I exclude the checkpoint folder used by Glue to keep track of the data that has been processed. After less than a minute, a new table has been added.

In the Amazon Athena console, I refresh the database and tables, and choose to preview the output_my_data table, which contains the data ingested this year. In this way, I see the first ten records in the table, and get confirmation that my setup is working!

Now, as data is being ingested, I can run more complex queries. For example, I can get the minimum and maximum temperature, collected from the device sensors, and the overall number of records stored in the Parquet files.
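
A query along these lines can also be sketched in code. The following example builds the SQL statement and shows how it could be submitted with the boto3 Athena client. The column aliases, database name, and result location are assumptions for illustration; the table name comes from the previous step.

```python
# Sketch only: aliases, database name, and output location are hypothetical.
def build_temperature_query(table: str = "output_my_data") -> str:
    """Return SQL for min/max temperature and the total record count."""
    return (
        "SELECT min(temperature) AS min_temperature, "
        "max(temperature) AS max_temperature, "
        "count(*) AS records "
        f'FROM "{table}"'
    )


def run_query(sql: str, database: str, output_location: str) -> str:
    """Submit the query to Athena and return its execution ID.

    Requires boto3 and AWS credentials.
    """
    import boto3  # imported lazily so the query builder works offline

    athena = boto3.client("athena")
    response = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": output_location},
    )
    return response["QueryExecutionId"]
```

The execution ID returned by run_query can then be used to poll for completion and fetch the results.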

Looking at the results, I see more than 8,000 records have been processed, with a maximum temperature of 31 degrees Celsius (about 88 degrees Fahrenheit). Actually, it was never really this hot. These sensors measure temperature very close to the device, and the reading rises as the device warms up with usage.

I am using a single device in this setup, but the solution implemented here can easily scale up with the number of data sources.

Available Now
Support for streaming sources is available in all regions where Glue is offered, as described in the AWS Region table. For more information, please have a look at the documentation.

Managing a serverless ETL pipeline with Glue makes it easier and more cost-effective to set up and manage streaming ingestion processes, reducing implementation efforts so you can focus on the business outcomes of analytics. You can set up a whole ingestion pipeline without writing code, as I did in this walkthrough, or customize the proposed script based on your needs.

Let me know what you are going to use this new feature for!


Ingest streaming data into Amazon Elasticsearch Service within the privacy of your VPC with Amazon Kinesis Data Firehose

Post Syndicated from Tarik Makota original https://aws.amazon.com/blogs/big-data/ingest-streaming-data-into-amazon-elasticsearch-service-within-the-privacy-of-your-vpc-with-amazon-kinesis-data-firehose/

Today we are adding a new Amazon Kinesis Data Firehose feature to set up VPC delivery to your Amazon Elasticsearch Service domain. If you have been managing a custom application on Amazon Kinesis Data Streams to keep traffic private, you can now use Kinesis Data Firehose and load your data into an Amazon Elasticsearch Service endpoint in a VPC without having to invest in, operate, and scale ingestion and delivery infrastructure. You can start using this new feature from the Kinesis Data Firehose console, AWS CLI, and API by selecting Amazon Elasticsearch Service as the destination, choosing the specific domain with VPC access, and setting the VPC configuration with subnets and the optional security groups.

Before this feature

Amazon Elasticsearch Service domains can have public or private endpoints. Public endpoints are backed by IP addresses on the public internet. Private endpoints are backed by IP addresses within the IP space of your VPC.

If you have been using an Amazon Elasticsearch Service VPC endpoint, you most likely use Kinesis Data Streams or a similar solution to ingest streaming data. This means running a custom application on the stream that delivers it to the Amazon Elasticsearch Service VPC domain. You likely had to perform the following actions:

  • Implement buffering
  • Convert formats
  • Perform compression
  • Apply transformation
  • Manage backup
  • Handle transient delivery failures

Additionally, you have to build, scale, monitor, update, and maintain this custom application.

Kinesis Data Firehose delivery to Amazon Elasticsearch Service VPC endpoint

Kinesis Data Firehose can now deliver data into an Amazon Elasticsearch Service VPC endpoint. This provides a secure and easy way to ingest, transform, and deliver streaming data. You don’t need to worry about managing your data ingestion and delivery infrastructure. With this new feature, Kinesis Data Firehose enables additional secure communication to Amazon Elasticsearch Service VPC endpoints. Amazon Elasticsearch Service endpoints that live within a VPC give you an extra layer of security.

How it works

When you create a Kinesis Data Firehose delivery stream that delivers data to an Amazon Elasticsearch Service VPC endpoint, Kinesis Data Firehose creates an Elastic Network Interface (ENI) in each subnet you select. If you only use one Availability Zone, Kinesis Data Firehose places an endpoint into only one subnet. Similarly, when you create an Amazon Elasticsearch Service VPC endpoint, it creates endpoints in the subnets you chose. Kinesis Data Firehose uses ENI to deliver the data to your Amazon Elasticsearch Service ENI, all inside your VPC. The following screenshot outlines the resulting architecture with a single subnet.

For this walkthrough, you have two security groups:

  • kdf-sec-grp for your Kinesis Data Firehose endpoint
  • es-sec-grp for your Amazon Elasticsearch Service endpoint

To let Kinesis Data Firehose access your Amazon Elasticsearch Service VPC endpoint, security group es-sec-grp needs to allow the ENI that Kinesis Data Firehose created to make HTTPS calls. Kinesis Data Firehose scales the ENIs automatically to meet the throughput requirements. As Kinesis Data Firehose scales ENIs, the outbound rules of the enclosing security group kdf-sec-grp control the data stream. You should configure the Amazon Elasticsearch Service security group (es-sec-grp) to allow HTTPS traffic from the Kinesis Data Firehose security group (kdf-sec-grp). The Kinesis Data Firehose security group needs to allow outbound HTTPS traffic, and its destination is the Amazon Elasticsearch Service security group. With Kinesis Data Firehose VPC delivery, you do not need to make the Firehose security group open to outside traffic.
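
The two rules described above can be sketched as follows. The example builds the arguments for the boto3 EC2 authorize_security_group_ingress and authorize_security_group_egress calls; the security group IDs you pass in are your own, and the helper names are hypothetical.

```python
# Sketch only: helper names are hypothetical; pass your own security group IDs.
from typing import Dict, List

HTTPS_PORT = 443


def https_permission(peer_group_id: str) -> List[Dict]:
    """Allow TCP 443 to or from the given peer security group."""
    return [{
        "IpProtocol": "tcp",
        "FromPort": HTTPS_PORT,
        "ToPort": HTTPS_PORT,
        "UserIdGroupPairs": [{"GroupId": peer_group_id}],
    }]


def build_rules(kdf_group_id: str, es_group_id: str) -> Dict[str, Dict]:
    """es-sec-grp accepts HTTPS from kdf-sec-grp; kdf-sec-grp may send HTTPS to es-sec-grp."""
    return {
        "ingress": {"GroupId": es_group_id,
                    "IpPermissions": https_permission(kdf_group_id)},
        "egress": {"GroupId": kdf_group_id,
                   "IpPermissions": https_permission(es_group_id)},
    }


def apply_rules(rules: Dict[str, Dict]) -> None:
    """Apply both rules; requires boto3 and AWS credentials."""
    import boto3  # imported lazily so the builders work without AWS access

    ec2 = boto3.client("ec2")
    ec2.authorize_security_group_ingress(**rules["ingress"])
    ec2.authorize_security_group_egress(**rules["egress"])
```

Referencing the peer security group rather than IP ranges keeps the rules valid as Kinesis Data Firehose adds or removes ENIs.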

You can also use the same security group for Kinesis Data Firehose and Amazon Elasticsearch Service endpoints. If you use the same security group for both, make sure the security group inbound rule allows HTTPS traffic.

For your existing delivery streams, you can change the destination endpoint. The new destination must be accessible within the same VPC, subnets, and security groups. Changing any of the VPC, subnets, or security groups requires you to recreate the delivery stream.

All existing Kinesis Data Firehose limits apply to this capability. For example, you can increase the default 50 delivery streams per account by submitting a quota increase request. Also, Kinesis Data Firehose creates one or more ENIs per VPC destination subnet per delivery stream. Kinesis Data Firehose automatically scales the number of ENIs as needed based on the actual throughput. The default throughput limit per delivery stream is 5 MB/second (dependent on Region). You can request an increase to this limit by submitting a support case.

You need to make sure you have enough ENIs available. By default, VPC has a quota of 5000 ENIs per Region. For more information, see Amazon VPC Quotas.

The advantage of using a managed service like Kinesis Data Firehose is that you can focus on the value of your data and not the underlying plumbing. You can configure the frequency of data delivery from your delivery stream to your Amazon Elasticsearch Service domain. Kinesis Data Firehose buffers incoming data before delivering it to Amazon ES. You can configure the values for the Amazon Elasticsearch Service buffer size (1 MB–100 MB) and buffer interval (60–900 seconds); whichever condition is satisfied first triggers data delivery to Amazon Elasticsearch Service. When you create the delivery stream, you can also specify a retry duration between 0 and 7,200 seconds. If data delivery to your Amazon Elasticsearch Service endpoint fails, Kinesis Data Firehose retries data delivery for the specified duration. After the retry period, Kinesis Data Firehose skips the current batch of data and moves on to the next batch. Skipped documents go to the elasticsearch_failed folder of your Amazon S3 bucket, which you can use for manual backfill.
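The buffering and retry settings above can be sketched as a small validated configuration fragment. The keys `BufferingHints`, `RetryOptions`, and `S3BackupMode` follow the Elasticsearch destination configuration of the Firehose `CreateDeliveryStream` API. The function names and the default values are assumptions chosen for illustration, and `should_flush` only illustrates the "whichever condition is met first" rule; it is not how Kinesis Data Firehose is implemented internally.

```python
def es_destination_settings(buffer_mb=5, buffer_seconds=300, retry_seconds=300):
    """Validate the ranges quoted in the text and return a config fragment.

    The defaults are example values only.
    """
    if not 1 <= buffer_mb <= 100:
        raise ValueError("buffer size must be between 1 and 100 MB")
    if not 60 <= buffer_seconds <= 900:
        raise ValueError("buffer interval must be between 60 and 900 seconds")
    if not 0 <= retry_seconds <= 7200:
        raise ValueError("retry duration must be between 0 and 7,200 seconds")
    return {
        "BufferingHints": {
            "SizeInMBs": buffer_mb,
            "IntervalInSeconds": buffer_seconds,
        },
        "RetryOptions": {"DurationInSeconds": retry_seconds},
        # Only documents that could not be indexed are written to S3.
        "S3BackupMode": "FailedDocumentsOnly",
    }


def should_flush(buffered_bytes, seconds_since_last_flush, settings):
    """Illustrate the rule: deliver when either threshold is reached first."""
    hints = settings["BufferingHints"]
    size_reached = buffered_bytes >= hints["SizeInMBs"] * 1024 * 1024
    time_reached = seconds_since_last_flush >= hints["IntervalInSeconds"]
    return size_reached or time_reached
```

With a 1 MB buffer and a 60-second interval, for example, a slow trickle of records is still delivered once 60 seconds have passed, while a burst is delivered as soon as 1 MB has accumulated.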

For more information about sizing, see Get started with Amazon Elasticsearch Service: T-shirt-size your domain.

Solution overview

To show you how to use this new feature, this post uses stock demo data available on the Kinesis Data Firehose console to deliver to an Amazon Elasticsearch Service endpoint in VPC. The following diagram illustrates the workflow.

This use case simulates a producer sending stock ticker data to the delivery stream (A). You use an AWS Lambda function (B) to add a timestamp to the stock records so that you can create a Kibana visualization. Kinesis Data Firehose streams the stock records to the Amazon Elasticsearch Service endpoint (C) in your VPC. Finally, you can visualize the data using Kibana (D).

This post uses the AWS Management Console to implement this solution, but you can also use the AWS CLI.

Creating security groups

Start by creating two security groups: one for the Amazon Elasticsearch Service VPC endpoint (es-sec-grp) and another for the delivery stream (kdf-sec-grp). Create security groups without any rules first. After you have created them, set the inbound and outbound rules. The following table summarizes these rules.

Creating an Amazon Elasticsearch Service VPC endpoint

To create an Amazon Elasticsearch Service endpoint in VPC, complete the following steps:

  1. On the Amazon Elasticsearch Service console, choose Create a new Domain.
  2. For Deployment Type and Latest Version, choose Development and Testing.
  3. Choose Next.
  4. Give your Amazon Elasticsearch Service endpoint a name.
  5. Select your instance type.

This post uses m5.xlarge.elasticsearch. For production environments, select the appropriately sized instance type. For this post, leave the number of nodes at 1, though best practice is to set it to 2.

  6. Set EBS storage size per node to 100 GiB.
  7. Leave the rest of the settings at their defaults and choose Next.
  8. Select the VPC and private subnet for your Amazon Elasticsearch Service endpoint and the security group for Amazon Elasticsearch Service that you created previously (es-sec-grp).
  9. To access Kibana, choose fine-grained access.
  10. Choose Create Master User.

This post uses the internal user database with HTTP basic authentication enabled. For production environments, use IAM roles and configure the appropriate fine-grained access. For more information, see Fine-Grained Access Control in Amazon Elasticsearch Service.

  11. Choose Allow open access to Domain.

Security groups already enforce IP-based access policies. This step opens access to your Amazon Elasticsearch Service endpoint only to resources in your VPC; the endpoint is not accessible from the internet. For an additional layer of security in your Amazon Elasticsearch Service endpoint, use access policies that specify IAM users or roles. For more information about controlling access to your domains, see Identity and Access Management in Amazon Elasticsearch Service.

  12. Choose Next.
  13. Review your settings and choose Confirm.

The following screenshot shows an example of what your Amazon Elasticsearch Service endpoint VPC settings should look like.

Creating a Lambda function for record transformation

Create a Lambda function to add a timestamp to the data feed. Complete the following steps:

  1. On the Lambda console, choose Create Function.
  2. Choose Author from scratch.
  3. Name your function; for example, tmakAddTSToStream.
  4. Choose Python 3.7 as your runtime.
  5. Choose Create.

Use the following code for your Lambda function. Under Basic settings, change the timeout from 3 seconds to 45 seconds:

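import base64
import json
from datetime import datetime

def lambda_handler(event, context):
    send_back = []
    # One UTC timestamp (ISO 8601) is shared by every record in this batch.
    now = datetime.utcnow().isoformat()

    for record in event['records']:
        # Firehose passes each record payload base64-encoded.
        stock_rec = json.loads(base64.b64decode(record['data']))
        stock_rec["timestamp"] = now
        record_w_ts = {
            'recordId': record['recordId'],
            'result': 'Ok',
            # Re-encode the updated record, newline-terminated, as a base64 string.
            'data': base64.b64encode(json.dumps(stock_rec).encode('utf-8') + b'\n').decode('utf-8')
        }
        send_back.append(record_w_ts)

    return {'records': send_back}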
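To try a transformation function like the one above on your own machine before attaching it to a delivery stream, you can wrap sample records in the event shape that a transformation Lambda receives and decode what the handler returns. The following is a minimal sketch: the helper names, the sample field names, and `passthrough_handler` (a stand-in for your own handler) are all hypothetical, and the event contains only the fields that such a handler reads (`records`, `recordId`, `data`).

```python
import base64
import json


def make_firehose_event(payloads):
    """Wrap dict payloads in a minimal Firehose-style transformation event."""
    return {
        "records": [
            {
                "recordId": str(index),
                "data": base64.b64encode(
                    json.dumps(payload).encode("utf-8")
                ).decode("utf-8"),
            }
            for index, payload in enumerate(payloads)
        ]
    }


def decode_firehose_response(response):
    """Return (recordId, result, decoded payload) for each returned record."""
    return [
        (
            record["recordId"],
            record["result"],
            json.loads(base64.b64decode(record["data"])),
        )
        for record in response["records"]
    ]


def passthrough_handler(event, context):
    """Stand-in handler that returns every record unchanged."""
    return {
        "records": [
            {"recordId": r["recordId"], "result": "Ok", "data": r["data"]}
            for r in event["records"]
        ]
    }


sample_event = make_firehose_event([{"ticker_symbol": "QXZ", "price": 84.51}])
decoded = decode_firehose_response(passthrough_handler(sample_event, None))
```

Swapping `passthrough_handler` for your real handler lets you confirm that every `recordId` comes back and that the decoded payload contains the fields you expect, such as the added timestamp.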

Creating a Kinesis Data Firehose delivery stream

To create your delivery stream, complete the following steps:

  1. On the Kinesis Data Firehose console, under Data Firehose, choose Create Delivery Stream.
  2. Enter a name for your stream; for example, tmak-kdf-stock-delivery-stream.
  3. For source, choose Direct PUT or other sources.
  4. Choose Next.
  5. For Data transformation, choose Enabled.
  6. Choose the Lambda function you created.
  7. Choose Next.
  8. Choose Amazon Elasticsearch Service as the destination for your delivery stream.
  9. For Index, enter stockdata.

The VPC section populates automatically. Make sure you use the security group you created for Kinesis Data Firehose (kdf-sec-grp).

  10. For Backup Mode, choose Failed records only.

You can select an existing S3 bucket or create a new one. The following screenshot shows an example of your delivery stream settings.

  11. Choose Next.
  12. Review the buffering settings and set any tags to identify your stream.

A delivery stream that delivers to VPC destinations needs permissions to manage ENIs and to list VPCs and subnets. The console gives you the option to create a new role based on a template that includes all the needed permissions. You can also use an existing role if you already created one.

  13. Choose Next.
  14. Review the settings and choose Create Stream.
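If you reuse an existing role instead of the console template, the ENI and VPC permissions mentioned above might look like the following IAM policy statement. This is a sketch under stated assumptions: the action list reflects the EC2 actions we understand Kinesis Data Firehose to need for VPC delivery, so check it against the current Kinesis Data Firehose documentation. It also omits the Amazon ES, Amazon S3, Lambda, and logging permissions that the role needs as well.

```python
import json

# Assumed set of EC2 actions for VPC delivery; verify against the service docs.
ENI_ACTIONS = [
    "ec2:DescribeVpcs",
    "ec2:DescribeVpcAttribute",
    "ec2:DescribeSubnets",
    "ec2:DescribeSecurityGroups",
    "ec2:DescribeNetworkInterfaces",
    "ec2:CreateNetworkInterface",
    "ec2:CreateNetworkInterfacePermission",
    "ec2:DeleteNetworkInterface",
]


def vpc_delivery_statement():
    """Statement that lets the delivery role list VPC resources and manage ENIs."""
    return {"Effect": "Allow", "Action": list(ENI_ACTIONS), "Resource": "*"}


def policy_document():
    """Wrap the statement in a complete IAM policy document."""
    return {"Version": "2012-10-17", "Statement": [vpc_delivery_statement()]}


policy_json = json.dumps(policy_document(), indent=2)
```

The resulting JSON string can be attached to the delivery role as an inline policy.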

It may take up to a few minutes to see the stream status show as Active. See the following screenshot.

On the Amazon EC2 console, under Network and Security, you can see the endpoints created in your VPC by Kinesis Data Firehose and Amazon ES. See the following screenshot.

Configuring Kibana fine-grained access for Kinesis Data Firehose

You need to give Kinesis Data Firehose permissions to deliver stock data to your Amazon Elasticsearch Service endpoint. You can accomplish this via the Kibana console or API. For more information, see API on the Open Distro for Elasticsearch website.

For more information about controlling access to your Amazon Elasticsearch Service endpoint, see How to Control Access to Your Amazon Elasticsearch Service Domain.

Because your Amazon Elasticsearch Service endpoint is in the VPC, you must first connect to the VPC to access Kibana. This process varies by network configuration, but likely involves connecting to a VPN or corporate network. For this post, create a remote desktop EC2 instance in a public subnet of your VPC. A newly created security group (rdp-sec-grp) protects the instance. You can modify the es-sec-grp security group to allow inbound HTTPS traffic from rdp-sec-grp so you can access the Kibana URL from the remote desktop instance. The following diagram illustrates this architecture.

Kinesis Data Firehose uses the delivery role to sign HTTP (Signature Version 4) requests before sending the data to the Amazon Elasticsearch Service endpoint. You manage Amazon Elasticsearch Service fine-grained access control permissions using roles, users, and mappings. This section describes how to create roles and set permissions for Kinesis Data Firehose.

The roles you create in this section are different from IAM roles. For more information, see Key Concepts.

Complete the following steps:

  1. Navigate to Kibana (you can find the URL on the Amazon Elasticsearch Service console).
  2. Enter the master user and password that you set up when you created the Amazon Elasticsearch Service endpoint.
  3. Under Security, choose Roles.
  4. Choose Add New Role.
  5. Name your role; for example, firehose-role.
  6. For cluster permissions, add cluster_composite_ops and cluster_monitor.
  7. Under Index permissions, choose Index Patterns and enter stockdata*.
  8. Under Permissions, add three action groups: crud, create_index, and manage.
  9. Choose Save Role Definition.

In the next step, you map the IAM role that Kinesis Data Firehose uses to the role you just created.

  1. Under Security, choose Role Mappings.
  2. Choose the role you just created (firehose-role).
  3. For Backend Roles, choose Add Backend Role.
  4. Enter the IAM ARN of the role Kinesis Data Firehose uses: arn:aws:iam::123456789012:role/firehose_stream_role_name.

You can find the IAM role that your delivery stream uses in the delivery stream details on the Kinesis Data Firehose console.
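The role and role mapping created in Kibana above can also be expressed as requests to the Open Distro for Elasticsearch security REST API. The sketch below only builds the request paths and JSON bodies; how you send them (for example, with HTTP basic authentication as the master user from inside the VPC) is left out. The function names are hypothetical, and the ARN in the usage line is the placeholder from the steps above.

```python
import json

ROLE_NAME = "firehose-role"
SECURITY_API = "_opendistro/_security/api"


def role_definition(index_pattern="stockdata*"):
    """Body for the role: cluster permissions plus index-level action groups."""
    return {
        "cluster_permissions": ["cluster_composite_ops", "cluster_monitor"],
        "index_permissions": [
            {
                "index_patterns": [index_pattern],
                "allowed_actions": ["crud", "create_index", "manage"],
            }
        ],
    }


def role_mapping(firehose_role_arn):
    """Body that maps the IAM role used by Firehose as a backend role."""
    return {"backend_roles": [firehose_role_arn]}


def api_requests(firehose_role_arn):
    """Return (method, path, JSON body) tuples in the order to send them."""
    return [
        ("PUT", f"{SECURITY_API}/roles/{ROLE_NAME}",
         json.dumps(role_definition())),
        ("PUT", f"{SECURITY_API}/rolesmapping/{ROLE_NAME}",
         json.dumps(role_mapping(firehose_role_arn))),
    ]


requests_to_send = api_requests(
    "arn:aws:iam::123456789012:role/firehose_stream_role_name"
)
```

Scripting the two calls this way makes the permissions repeatable across domains, which is harder to guarantee when clicking through the Kibana security pages.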

Streaming stock data through Kinesis Data Firehose

To stream your stock data, complete the following steps:

  1. On the Kinesis Data Firehose console, choose the stream you created.
  2. Choose Test with demo data.
  3. Choose Start sending demo data.

If everything is working, you see the message Demo data is being sent to your delivery stream. Wait a few minutes before you choose Stop sending demo data.
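As an alternative to the console demo data, a small producer can send similar records with the Firehose `put_record_batch` API. This is a sketch under stated assumptions: the field names mirror the fields used in the Kibana steps (`ticker_symbol`, `price`) plus assumed `sector` and `change` fields, the ticker symbols and value ranges are made up, and the boto3 client is passed in by the caller so that nothing is sent until you call `send_batch`.

```python
import json
import random

# Placeholder values for illustration; the console demo data uses its own set.
TICKERS = ["QXZ", "ABC", "XYZ"]
SECTORS = ["HEALTHCARE", "ENERGY", "TECHNOLOGY"]


def random_stock_record(rng):
    """Create one stock record with assumed field names and value ranges."""
    return {
        "ticker_symbol": rng.choice(TICKERS),
        "sector": rng.choice(SECTORS),
        "change": round(rng.uniform(-5, 5), 2),
        "price": round(rng.uniform(1, 100), 2),
    }


def to_firehose_records(stock_records):
    """Encode records as newline-terminated JSON bytes for put_record_batch."""
    return [
        {"Data": (json.dumps(record) + "\n").encode("utf-8")}
        for record in stock_records
    ]


def send_batch(firehose_client, stream_name, stock_records):
    """Send one batch with a boto3 Firehose client (boto3.client('firehose'))."""
    return firehose_client.put_record_batch(
        DeliveryStreamName=stream_name,
        Records=to_firehose_records(stock_records),
    )
```

A call such as `send_batch(boto3.client("firehose"), "tmak-kdf-stock-delivery-stream", records)` would send the batch to the stream created earlier. Check `FailedPutCount` in the response, because individual records in a batch can fail even when the call itself succeeds.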

Analyzing and visualizing data

To analyze and visualize your data, complete the following steps:

  1. On the Kibana console, choose Management.
  2. Choose Index patterns.
  3. For Index pattern, enter stockdata*.
  4. Choose Next.
  5. For the Time filter field, choose timestamp.
  6. Choose Visualize.
  7. Create a new visualization and choose Line.
  8. For Index pattern, choose stockdata*.
  9. For Y-Axis, choose Aggregation=Average and Field=price.
  10. For X-Axis, choose Aggregation=Date Histogram, Field=timestamp, and Interval=seconds.
  11. Under X-Axis, choose Add Sub-buckets.
  12. Choose Split Series.
  13. Set Sub-Aggregation=Terms and Field=ticker_symbol.keyword.
  14. Choose Apply Changes.
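The visualization steps above correspond roughly to an Elasticsearch aggregation query: a date histogram on timestamp, split by ticker symbol, with the average price in each bucket. The following sketch builds that query body as a Python dictionary. The aggregation names are arbitrary, and `fixed_interval` assumes a recent Elasticsearch 7.x version; older versions use `interval` instead. Kibana may order the buckets differently in the request it generates.

```python
import json


def average_price_query(interval="1s"):
    """Build a search body: average price per ticker per time bucket."""
    return {
        "size": 0,  # return aggregation results only, no raw documents
        "aggs": {
            "per_interval": {
                "date_histogram": {
                    "field": "timestamp",
                    "fixed_interval": interval,
                },
                "aggs": {
                    "by_ticker": {
                        "terms": {"field": "ticker_symbol.keyword"},
                        "aggs": {
                            "avg_price": {"avg": {"field": "price"}}
                        },
                    }
                },
            }
        },
    }


query_json = json.dumps(average_price_query(), indent=2)
```

You could paste the resulting JSON into Kibana Dev Tools as the body of a search request against the stockdata* indices to inspect the numbers behind the chart.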

The following screenshot shows an example visualization.

You can see the raw data by choosing Discover on the Kibana dashboard. See the following screenshot.


This post demonstrated how you can use Kinesis Data Firehose to stream data to an Amazon Elasticsearch Service endpoint inside your VPC, without enabling and securing public access to that endpoint. If you have been reluctant to expose your Amazon Elasticsearch Service endpoint to the internet but want to stream data, you can now do so with Kinesis Data Firehose.


About the Authors

Tarik Makota is a Principal Solutions Architect with Amazon Web Services. He provides technical guidance, design advice, and thought leadership to AWS customers across the US Northeast. He holds an M.S. in Software Development and Management from Rochester Institute of Technology.