Tag Archives: Amazon Kinesis

ICYMI: Serverless Q2 2021

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/icymi-serverless-q2-2021/

Welcome to the 14th edition of the AWS Serverless ICYMI (in case you missed it) quarterly recap. Every quarter, we share all of the most recent product launches, feature enhancements, blog posts, webinars, Twitch live streams, and other interesting things that you might have missed!

Q2 calendar

In case you missed our last ICYMI, check out what happened last quarter here.

AWS Step Functions

Step Functions launched Workflow Studio, a new visual tool that provides a drag-and-drop user interface to build Step Functions workflows. This exposes all the capabilities of Step Functions that are available in Amazon States Language (ASL). This makes it easier to build and change workflows and build definitions in near-real time.

For more:

Workflow Studio

The new data flow simulator in the Step Functions console helps you evaluate the inputs and outputs passed through your state machine. It allows you to simulate each of the fields used to process data and updates in real time. It can help accelerate development with workflows and help visualize JSONPath processing.

For more:

Data flow simulator

Also, Amazon API Gateway can now invoke synchronous Express Workflows using REST APIs.

Amazon EventBridge

EventBridge now supports cross-Region event routing from any commercial AWS Region to a list of supported Regions. This feature allows you to centralize global events for auditing and monitoring or replicate events across Regions.

EventBridge cross-Region routing

The service now also supports bus-to-bus event routing in the same Region and in the same AWS account. This can be useful for centralizing events related to a single project, application, or team within your organization.

EventBridge bus-to-bus
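
As a rough sketch of how either kind of routing can be wired up, the destination event bus is added as a target of a rule on the source bus. The function name, target ID, and arguments below are hypothetical; `put_targets` is the boto3 EventBridge call, and the client is passed in (for example `boto3.client("events")`).

```python
def route_to_central_bus(events_client, rule_name, central_bus_arn, role_arn):
    """Add an event bus (same or another Region) as the target of an existing rule."""
    return events_client.put_targets(
        Rule=rule_name,
        Targets=[
            {
                "Id": "central-bus",     # hypothetical target ID
                "Arn": central_bus_arn,  # ARN of the destination event bus
                "RoleArn": role_arn,     # role EventBridge assumes to put events on that bus
            }
        ],
    )
```

The rule's event pattern decides which events are forwarded, so the same helper works for centralizing all events or only a subset.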

You can now use EventBridge as a resource within Step Functions workflows. This provides a direct service integration for both standard and Express Workflows. You can publish events directly to a specified event bus using either a request-response or wait-for-callback pattern.
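
For illustration, a Task state that publishes an event might be built like this. The bus name, source, detail type, and payload field are hypothetical; the resource ARN is the Step Functions EventBridge integration, and the `.waitForTaskToken` suffix selects the callback pattern.

```python
import json


def put_events_state(bus_name, wait_for_callback=False):
    """Build an Amazon States Language Task state that publishes one event."""
    resource = "arn:aws:states:::events:putEvents"
    detail = {"orderId.$": "$.orderId"}  # hypothetical payload field
    if wait_for_callback:
        resource += ".waitForTaskToken"
        # Pass the task token so a consumer can report success or failure later.
        detail["taskToken.$"] = "$$.Task.Token"
    return {
        "Type": "Task",
        "Resource": resource,
        "Parameters": {
            "Entries": [
                {
                    "EventBusName": bus_name,
                    "Source": "my.application",   # hypothetical
                    "DetailType": "OrderPlaced",  # hypothetical
                    "Detail": detail,
                }
            ]
        },
        "End": True,
    }


print(json.dumps(put_events_state("my-event-bus"), indent=2))
```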

EventBridge added a new target for rules – Amazon SageMaker Pipelines. This allows you to use a rule to trigger a continuous integration and continuous deployment (CI/CD) service for your machine learning workloads.

AWS Lambda

Lambda Extensions

AWS Lambda extensions are now generally available including some performance and functionality improvements. Lambda extensions provide a new way to integrate your chosen monitoring, observability, security, and governance tools with AWS Lambda. These use the Lambda Runtime Extensions API to integrate with the execution environment and provide hooks into the Lambda lifecycle.

To help build your own extensions, there is an updated GitHub repository with example code.

To learn more:

  • Watch a Tech Talk with Julian Wood.
  • Watch the 8-episode Learning Path series covering all aspects of extensions.

Extensions available today

Amazon CloudWatch Lambda Insights support for Lambda container images is now generally available.

Amazon SNS

Amazon SNS has expanded the set of filter operators available to include IP address matching, existence of an attribute key, and “anything-but” matching.
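
A subscription filter policy that combines the three operators could look like the following sketch. The attribute names and values are hypothetical; only the operator keywords (`cidr`, `exists`, `anything-but`) come from SNS.

```python
import json

# Hypothetical message attribute names; the operators are the SNS ones.
filter_policy = {
    "source_ip": [{"cidr": "10.0.0.0/24"}],                 # IP address matching
    "customer_interests": [{"exists": True}],               # attribute key must be present
    "event_type": [{"anything-but": ["order_cancelled"]}],  # any value except this one
}

# SNS expects the policy as a JSON string, for example in the
# FilterPolicy subscription attribute.
filter_policy_json = json.dumps(filter_policy)
print(filter_policy_json)
```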

The service has also introduced an SMS sandbox to help developers test workloads that send text messages.

Amazon DynamoDB

DynamoDB announced CloudFormation support for several features. First, it now supports configuring Kinesis Data Streams using CloudFormation. This allows you to use infrastructure as code to set up Kinesis Data Streams instead of DynamoDB streams.
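
As a minimal sketch, a template that attaches a Kinesis data stream to a table might look like this, expressed as a Python dictionary that is dumped to JSON. The resource names, key schema, and shard count are hypothetical; `KinesisStreamSpecification` is the table property that points at the stream.

```python
import json

template = {
    "AWSTemplateFormatVersion": "2010-09-09",
    "Resources": {
        "ChangeStream": {  # hypothetical logical ID
            "Type": "AWS::Kinesis::Stream",
            "Properties": {"ShardCount": 1},
        },
        "OrdersTable": {  # hypothetical logical ID
            "Type": "AWS::DynamoDB::Table",
            "Properties": {
                "BillingMode": "PAY_PER_REQUEST",
                "AttributeDefinitions": [
                    {"AttributeName": "order_id", "AttributeType": "S"}
                ],
                "KeySchema": [{"AttributeName": "order_id", "KeyType": "HASH"}],
                # Item-level changes are published to this stream.
                "KinesisStreamSpecification": {
                    "StreamArn": {"Fn::GetAtt": ["ChangeStream", "Arn"]}
                },
            },
        },
    },
}

print(json.dumps(template, indent=2))
```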

The service also announced that NoSQL Workbench now supports CloudFormation, so you can build data models and configure table capacity settings directly from the tool. Finally, you can now create and manage global tables with CloudFormation.

Learn how to use the recently launched Serverless Patterns Collection to configure DynamoDB as an event source for Lambda.

AWS Amplify

Amplify Hosting announced support for server-side rendered (SSR) apps built with the Next.js framework. This provides a zero configuration option for developers to deploy and host their Next.js-based applications.

The Amplify CLI now allows developers to make multiple DynamoDB GSI updates in a single deployment. This can help accelerate data model iterations. Additionally, the data management experience in the Amplify Admin UI launched at AWS re:Invent 2020 is now generally available.

AWS Serverless Application Model (AWS SAM)

AWS SAM has a public preview of support for local development and testing of AWS Cloud Development Kit (AWS CDK) projects.

Serverless blog posts

Operating Lambda

The “Operating Lambda” blog series includes the following posts in this quarter:

Streaming data

The “Building serverless applications with streaming data” blog series shows how to use Lambda with Kinesis.

Getting started with serverless for developers

Learn how to build serverless applications from your local integrated development environment (IDE).

Tech Talks & Events

We hold AWS Online Tech Talks covering serverless topics throughout the year. These are listed in the Serverless section of the AWS Online Tech Talks page. We also regularly deliver talks at conferences and events around the world, speak on podcasts, and record videos you can find to learn in bite-sized chunks.

Here are some from Q2:

Serverless Live was a day of talks held on May 19, featuring the serverless developer advocacy team, along with Adrian Cockcroft and Jeff Barr. You can watch a replay of all the talks on the AWS Twitch channel.

Videos

YouTube ServerlessLand channel

Serverless Office Hours – Tues 10 AM PT / 1PM EST

Weekly live virtual office hours. In each session we talk about a specific topic or technology related to serverless and open it up to helping you with your real serverless challenges and issues. Ask us anything you want about serverless technologies and applications.

YouTube: youtube.com/serverlessland
Twitch: twitch.tv/aws

DynamoDB Office Hours

Are you an Amazon DynamoDB customer with a technical question you need answered? If so, join us for weekly Office Hours on the AWS Twitch channel led by Rick Houlihan, AWS principal technologist and Amazon DynamoDB expert. See upcoming and previous shows.

Learning Path – AWS Lambda Extensions: The deep dive

Are you looking for a way to more easily integrate AWS Lambda with your favorite monitoring, observability, security, governance, and other tools? Welcome to AWS Lambda extensions: The deep dive, a learning path video series that shows you everything about augmenting Lambda functions using Lambda extensions.

There are also other helpful videos covering serverless available on the Serverless Land YouTube channel.

Still looking for more?

The Serverless landing page has more information. The Lambda resources page contains case studies, webinars, whitepapers, customer stories, reference architectures, and even more Getting Started tutorials.

You can also follow the Serverless Developer Advocacy team on Twitter to see the latest news, follow conversations, and interact with the team.

How Optus improves broadband and mobile customer experience using the Network Data Analytics platform on AWS

Post Syndicated from Rajagopal Mahendran original https://aws.amazon.com/blogs/big-data/how-optus-improves-broadband-and-mobile-customer-experience-using-the-network-data-analytics-platform-on-aws/

This is a guest blog post co-written by Rajagopal Mahendran, Development Manager at the Optus IT Innovation Team.


Optus is part of The Singtel group, which operates in one of the world’s fastest growing and most dynamic regions, with a presence in 21 countries. Optus provides not only core telecom services, but also an extensive range of digital solutions, including cloud, cybersecurity, and digital advertising to enterprises, as well as entertainment and mobile financial services to millions of consumers. Optus provides mobile communication services to over 10.4 million customers and broadband services to over 1.1 million homes and businesses. In addition, Optus Sport connects close to 1 million fans to Premier League, international football, and fitness content.

In this post, we look at how Optus used Amazon Kinesis to ingest and analyze network related data in a data lake on AWS and improve customer experience and the service planning process.

The challenge

A common challenge for telecommunication providers is to form an accurate, real-time view of quality of service and issues experienced by their customers. Home network and broadband connectivity quality has a significant impact on customer productivity and satisfaction, especially considering the increased reliance on home networks for work, connecting with family and friends, and entertainment during the COVID-19 pandemic.

Additionally, network operations and planning teams often don’t have access to the right data and insights to plan new rollouts and manage their current fleet of devices.

The network analytics platform provides troubleshooting and planning data and insights to Optus teams and their customers in near-real time, which helps reduce mean time to rectify and enhance the customer experience. With the right data and insights, customers have a better experience because instead of starting a support call with a lot of questions, the support staff and the customer have a current and accurate view of the services and the customer’s home network.

Service owner teams within Optus can also use the insights and trends derived from this platform to better plan for the future and provide higher-quality service to customers.

Design considerations

To address this challenge and its requirements, we embarked on a project to transform our current batch collection and processing system to a stream-based, near-real-time processing system, and introduce APIs for insights so that support systems and customer applications can show the latest snapshot of the network and service status.

We had the following functional and non-functional requirements:

  • The new platform must be capable of supporting data capture from future types of customer equipment as well as new ways of ingestion (new protocols and frequency) and new formats of data.
  • It should support multiple consumers (a near-real-time API for support staff and customer applications and operational and business reporting) to consume data and generate insights. The aim is for the platform to proactively detect issues and generate appropriate alerting to support staff as well as customers.
  • After the data arrives, insights from the data should be ready in the form of an API in a few seconds (5 seconds maximum).
  • The new platform should be resilient enough to continue processing when parts of the infrastructure fail, such as nodes or Availability Zones.
  • It should support an increased number of devices and services as well as more frequent collection from the devices.
  • A small cross-functional team across business and technology will build and run this platform. We need to ensure minimal infrastructure and operational overhead in the long run.
  • The pipeline should be highly available and allow for new deployments with no downtime.

Solution overview

With the goal of the platform and design considerations in mind, we decided to use higher-order services and serverless services from AWS where possible, to avoid unnecessary operational overhead for our team and focus on the core business needs. This includes using the Kinesis family of services for stream ingestion and processing; AWS Lambda for processing; Amazon DynamoDB, Amazon Relational Database Service (Amazon RDS), and Amazon Simple Storage Service (Amazon S3) for data persistence; and AWS Elastic Beanstalk and Amazon API Gateway for application and API serving. The following diagram shows the overall solution.

 

The solution ingests log files from thousands of customer network equipment (home routers) in predefined periods. The customer equipment is only capable of sending simple HTTP PUT and POST requests to transfer log files. To receive these files, we use a Java application running in an Auto Scaling group of Amazon Elastic Compute Cloud (Amazon EC2) instances. After some initial checks, the receiver application performs cleansing and formatting, then it streams the log files to Amazon Kinesis Data Streams.

We intentionally use a custom receiver application in the ingestion layer to provide flexibility in supporting different devices and file formats.

To understand the rest of the architecture, let’s take a look at the expected insights. The platform produces two types of insights:

  • Individual insights – Questions answered in this category include:
    • How many errors has a particular customer device experienced in the last 15 minutes?
    • What was the last error?
    • How many devices are currently connected at a particular customer home?
    • What’s the transfer/receive rate as captured by a particular customer device?
  • Base insights – Pertaining to a group or the whole user base, questions in this category include:
    • How many customer devices reported service disruption in the past 24 hours?
    • Which device types (models) have experienced the highest number of errors in the past 6 months?
    • After last night’s patch update on a group of devices, have they reported any errors? Was the maintenance successful?

The top lane in the architecture shows the pipeline that generates the individual insights.

 

The event source mapping of the Lambda function is configured to consume records from the Kinesis data stream. This function reads the records, formats, and prepares them based on the insights required. Finally, it stores the results in the Amazon S3 location and also updates a DynamoDB table that maintains a summary and the metadata of the actual data stored in Amazon S3.
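
To make the shape of such a function concrete, here is a minimal sketch of a handler that decodes a batch of Kinesis records and counts errors per device. It is not the Optus implementation: the `device_id` and `level` fields are hypothetical, and the writes to Amazon S3 and DynamoDB are left out.

```python
import base64
import json
from collections import Counter


def handler(event, context):
    """Decode a batch of Kinesis records and count errors per device."""
    errors = Counter()
    for record in event["Records"]:
        # Lambda delivers the Kinesis payload base64-encoded.
        payload = base64.b64decode(record["kinesis"]["data"])
        entry = json.loads(payload)
        if entry.get("level") == "ERROR":    # hypothetical field
            errors[entry["device_id"]] += 1  # hypothetical field
    # A real function would persist the results here (for example to
    # Amazon S3 and a DynamoDB summary table); that part is omitted.
    return dict(errors)
```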

To optimize performance, we configured two settings in the Lambda event source mapping:

  • Batch size – Sets the maximum number of records to send to the function in each batch, which helps achieve higher throughput
  • Concurrent batches per shard – Processes multiple batches from the same shard concurrently, which helps with faster processing
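
With boto3, these two settings correspond to the `BatchSize` and `ParallelizationFactor` parameters of `create_event_source_mapping`. The sketch below assumes illustrative values (they are not the ones used in this platform) and takes the Lambda client as an argument, for example `boto3.client("lambda")`.

```python
def create_stream_mapping(lambda_client, stream_arn, function_name,
                          batch_size=500, batches_per_shard=4):
    """Create a Kinesis event source mapping tuned for throughput."""
    return lambda_client.create_event_source_mapping(
        EventSourceArn=stream_arn,
        FunctionName=function_name,
        StartingPosition="LATEST",
        BatchSize=batch_size,                     # maximum records per invocation
        ParallelizationFactor=batches_per_shard,  # concurrent batches per shard (1 to 10)
    )
```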

Finally, the API is provided via API Gateway and runs on a Spring Boot application that is hosted on Elastic Beanstalk. In the future, we may need to keep state between API calls, which is why we use Elastic Beanstalk instead of a serverless application.

The bottom lane in the architecture is the pipeline that generates base reports.

 

We use Amazon Kinesis Data Analytics, running stateful computation on streaming data, to summarize certain metrics like transfer rates or error rates in given time windows. These summaries are then pushed to an Amazon Aurora database with a data model that’s suitable for dashboarding and reporting purposes.
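
The idea of summarizing a metric per time window can be sketched in plain Python. This is only a conceptual model of a tumbling (fixed, non-overlapping) window, not the Kinesis Data Analytics application itself; the record shape and the 60-second window are assumptions.

```python
from collections import defaultdict
from datetime import datetime, timezone


def tumbling_error_rate(records, window_seconds=60):
    """Return {window_start_epoch: error_rate} for fixed, non-overlapping windows.

    `records` is an iterable of (timestamp, is_error) pairs.
    """
    totals = defaultdict(lambda: [0, 0])  # window start -> [errors, total]
    for ts, is_error in records:
        epoch = int(ts.timestamp())
        window_start = epoch - epoch % window_seconds
        totals[window_start][0] += int(is_error)
        totals[window_start][1] += 1
    return {start: errors / total
            for start, (errors, total) in sorted(totals.items())}


sample = [
    (datetime.fromtimestamp(0, tz=timezone.utc), True),
    (datetime.fromtimestamp(30, tz=timezone.utc), False),
    (datetime.fromtimestamp(61, tz=timezone.utc), True),
]
print(tumbling_error_rate(sample))
```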

The insights are then presented in dashboards using a web application running on Elastic Beanstalk.

Lessons learned

Using serverless patterns and higher-order services, in particular Lambda, Kinesis Data Streams, Kinesis Data Analytics, and DynamoDB, provided a lot of flexibility in our architecture and helped us move more towards microservices rather than big monolith batch jobs.

This shift also helped us dramatically decrease our operational and service management overhead. For example, over the last several months since the launch, customers of this platform didn’t experience any service disruption.

This solution also enabled us to adopt more DevOps and agile ways of working, in the sense that a single small team develops and runs the system. This in turn enabled the organization to be more agile and innovative in this domain.

We also discovered some technical tips through the course of development and production that are worth sharing:

Outcomes and benefits

We now have near-real-time visibility of our fixed and mobile network performance as experienced by our customers. In the past, we only had data that came in batch mode with a delay and also only from our own network probes and equipment.

With the near-real-time view of the network when changes occur, our operational teams can also carry out upgrades and maintenance across the fleet of customer devices with higher confidence and frequency.

Lastly, our planning teams use these insights to form an accurate, up-to-date performance view of various equipment and services. This leads to higher-quality service for our customers at better prices because our service planning teams are enabled to optimize cost, better negotiate with vendors and service providers, and plan for the future.

Looking ahead

With the network analytics platform in production for several months and stable now, there is demand for more insights and new use cases. For example, we’re looking into a mobile use case to better manage capacity at large-scale events (such as sporting events). The aim is for our teams to be data driven and able to react in near-real time to capacity needs in these events.

Another area of demand is around predictive maintenance: we are looking to introduce machine learning into these pipelines to help drive insights faster and more accurately by using the AWS Machine Learning portfolio of services.


About the authors

Rajagopal Mahendran is a Development Manager at the Optus IT Innovation Team. Mahendran has over 14 years of experience in various organizations delivering enterprise applications from medium-scale to very large-scale using proven to cutting-edge technologies in big data, streaming data applications, mobile, and cloud native applications. His passion is to power innovative ideas using technology for better living. In his spare time, he loves bush walking and swimming.

 

Mostafa Safipour is a Solutions Architect at AWS based out of Sydney. He works with customers to realize business outcomes using technology and AWS. Over the past decade he has helped many large organizations in the ANZ region build their data, digital, and enterprise workloads on AWS.

 

Masudur Rahaman Sayem is a Specialist Solution Architect for Analytics at AWS. He works with AWS customers to provide guidance and technical assistance on data and analytics projects, helping them improve the value of their solutions when using AWS. He is passionate about distributed systems. He also likes to read, especially classic comic books.

Introducing Amazon Kinesis Data Analytics Studio – Quickly Interact with Streaming Data Using SQL, Python, or Scala

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/introducing-amazon-kinesis-data-analytics-studio-quickly-interact-with-streaming-data-using-sql-python-or-scala/

The best way to get timely insights and react quickly to new information you receive from your business and your applications is to analyze streaming data. This is data that must usually be processed sequentially and incrementally on a record-by-record basis or over sliding time windows, and can be used for a variety of analytics including correlations, aggregations, filtering, and sampling.

To make it easier to analyze streaming data, today we are pleased to introduce Amazon Kinesis Data Analytics Studio.

Now, from the Amazon Kinesis console you can select a Kinesis data stream and with a single click start a Kinesis Data Analytics Studio notebook powered by Apache Zeppelin and Apache Flink to interactively analyze data in the stream. Similarly, you can select a cluster in the Amazon Managed Streaming for Apache Kafka console to start a notebook to analyze data in Apache Kafka streams. You can also start a notebook from the Kinesis Data Analytics Studio console and connect to custom sources.

Architectural diagram.

In the notebook, you can interact with streaming data and get results in seconds using SQL queries and Python or Scala programs. When you are satisfied with your results, with a few clicks you can promote your code to a production stream processing application that runs reliably at scale with no additional development effort.

For new projects, we recommend that you use the new Kinesis Data Analytics Studio over Kinesis Data Analytics for SQL Applications. Kinesis Data Analytics Studio combines ease of use with advanced analytical capabilities, which makes it possible to build sophisticated stream processing applications in minutes. Let’s see how that works in practice.

Using Kinesis Data Analytics Studio to Analyze Streaming Data
I want to get a better understanding of the data sent by some sensors to a Kinesis data stream.

To simulate the workload, I use this random_data_generator.py Python script. You don’t need to know Python to use Kinesis Data Analytics Studio. In fact, I am going to use SQL in the following steps. Also, you can avoid any coding and use the Amazon Kinesis Data Generator user interface (UI) to send test data to Kinesis Data Streams or Kinesis Data Firehose. I am using a Python script to have finer control over the data that is being sent.

import datetime
import json
import random
import boto3

STREAM_NAME = "my-input-stream"


def get_random_data():
    current_temperature = round(10 + random.random() * 170, 2)
    if current_temperature > 160:
        status = "ERROR"
    elif current_temperature > 140 or random.randrange(1, 100) > 80:
        status = random.choice(["WARNING","ERROR"])
    else:
        status = "OK"
    return {
        'sensor_id': random.randrange(1, 100),
        'current_temperature': current_temperature,
        'status': status,
        'event_time': datetime.datetime.now().isoformat()
    }


def send_data(stream_name, kinesis_client):
    while True:
        data = get_random_data()
        partition_key = str(data["sensor_id"])
        print(data)
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data),
            PartitionKey=partition_key)


if __name__ == '__main__':
    kinesis_client = boto3.client('kinesis')
    send_data(STREAM_NAME, kinesis_client)

This script sends random records to my Kinesis data stream as JSON. It also prints each record as a Python dictionary before sending it. For example:

{'sensor_id': 77, 'current_temperature': 93.11, 'status': 'OK', 'event_time': '2021-05-19T11:20:00.978328'}
{'sensor_id': 47, 'current_temperature': 168.32, 'status': 'ERROR', 'event_time': '2021-05-19T11:20:01.110236'}
{'sensor_id': 9, 'current_temperature': 140.93, 'status': 'WARNING', 'event_time': '2021-05-19T11:20:01.243881'}
{'sensor_id': 27, 'current_temperature': 130.41, 'status': 'OK', 'event_time': '2021-05-19T11:20:01.371191'}

From the Kinesis console, I select a Kinesis data stream (my-input-stream) and choose Process data in real time from the Process drop-down. In this way, the stream is configured as a source for the notebook.

Console screenshot.

Then, in the following dialog box, I create an Apache Flink – Studio notebook.

I enter a name (my-notebook) and a description for the notebook. The AWS Identity and Access Management (IAM) permissions to read from the Kinesis data stream I selected earlier (my-input-stream) are automatically attached to the IAM role assumed by the notebook.

Console screenshot.

I choose Create to open the AWS Glue console and create an empty database. Back in the Kinesis Data Analytics Studio console, I refresh the list and select the new database. It will define the metadata for my sources and destinations. From here, I can also review the default Studio notebook settings. Then, I choose Create Studio notebook.

Console screenshot.

Now that the notebook has been created, I choose Run.

Console screenshot.

When the notebook is running, I choose Open in Apache Zeppelin to get access to the notebook and write code in SQL, Python, or Scala to interact with my streaming data and get insights in real time.

In the notebook, I create a new note and call it Sensors. Then, I create a sensor_data table describing the format of the data in the stream:

%flink.ssql

CREATE TABLE sensor_data (
    sensor_id INTEGER,
    current_temperature DOUBLE,
    status VARCHAR(6),
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
PARTITIONED BY (sensor_id)
WITH (
    'connector' = 'kinesis',
    'stream' = 'my-input-stream',
    'aws.region' = 'us-east-1',
    'scan.stream.initpos' = 'LATEST',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
)

The first line in the previous command tells Apache Zeppelin to provide a stream SQL environment (%flink.ssql) for the Apache Flink interpreter. I can also interact with the streaming data using a batch SQL environment (%flink.bsql), or Python (%flink.pyflink) or Scala (%flink) code.

The first part of the CREATE TABLE statement is familiar to anyone who has used SQL with a database. A table is created to store the sensor data in the stream. The WATERMARK option is used to measure progress in the event time, as described in the Event Time and Watermarks section of the Apache Flink documentation.
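
To build some intuition for the WATERMARK clause, here is a simplified model in plain Python: the watermark trails the largest event time seen so far by 5 seconds, and an event older than the current watermark is considered late. This is a sketch of the concept only, not how Apache Flink implements it, and the class name is made up for the example.

```python
from datetime import datetime, timedelta


class WatermarkTracker:
    """Track a watermark that trails the maximum event time by a fixed delay."""

    def __init__(self, max_delay=timedelta(seconds=5)):
        self.max_delay = max_delay
        self.max_event_time = None

    @property
    def watermark(self):
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.max_delay

    def observe(self, event_time):
        """Record an event; return True if it arrived behind the watermark."""
        late = self.watermark is not None and event_time < self.watermark
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        return late


tracker = WatermarkTracker()
start = datetime(2021, 5, 19, 11, 20, 0)
for offset in (0, 10, 3, 7):
    event_time = start + timedelta(seconds=offset)
    print(event_time.time(), "late" if tracker.observe(event_time) else "on time")
```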

The second part of the CREATE TABLE statement describes the connector used to receive data in the table (for example, kinesis or kafka), the name of the stream, the AWS Region, the overall data format of the stream (such as json or csv), and the syntax used for timestamps (in this case, ISO 8601). I can also choose the starting position to process the stream. I am using LATEST to start reading from the most recent records.

When the table is ready, I find it in the AWS Glue Data Catalog database I selected when I created the notebook:

Console screenshot.

Now I can run SQL queries on the sensor_data table and use sliding or tumbling windows to get a better understanding of what is happening with my sensors.

For an overview of the data in the stream, I start with a simple SELECT to get all the content of the sensor_data table:

%flink.ssql(type=update)

SELECT * FROM sensor_data;

This time the first line of the command has a parameter (type=update) so that the output of the SELECT, which is more than one row, is continuously updated when new data arrives.

On the terminal of my laptop, I start the random_data_generator.py script:

$ python3 random_data_generator.py

At first I see a table that contains the data as it comes. To get a better understanding, I select a bar graph view. Then, I group the results by status to see their average current_temperature, as shown here:

Notebook screenshot.

As expected from the way I am generating these results, I have different average temperatures depending on the status (OK, WARNING, or ERROR). The higher the temperature, the greater the probability that something is not working correctly with my sensors.

I can run the aggregated query explicitly using SQL syntax. This time, I want the result computed on a sliding window of 1 minute with results updated every 10 seconds. To do so, I am using the HOP function in the GROUP BY section of the SELECT statement. To add the time to the output of the select, I use the HOP_ROWTIME function. For more information, see how group window aggregations work in the Apache Flink documentation.

%flink.ssql(type=update)

SELECT sensor_data.status,
       COUNT(*) AS num,
       AVG(sensor_data.current_temperature) AS avg_current_temperature,
       HOP_ROWTIME(event_time, INTERVAL '10' second, INTERVAL '1' minute) as hop_time
  FROM sensor_data
 GROUP BY HOP(event_time, INTERVAL '10' second, INTERVAL '1' minute), sensor_data.status;

This time, I look at the results in table format:

Notebook screenshot.
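
One way to picture a hopping window of 1 minute that slides every 10 seconds is that each record falls into six overlapping windows. The following plain Python sketch lists them for a given event time. It models the concept only and assumes windows aligned to the epoch; the function name is made up for the example.

```python
from datetime import datetime, timedelta


def hop_windows(event_time, slide=timedelta(seconds=10), size=timedelta(minutes=1)):
    """Return the (start, end) of every hopping window that contains event_time."""
    epoch = datetime(1970, 1, 1)
    # Start of the latest window containing the event, aligned to the slide.
    start = event_time - (event_time - epoch) % slide
    windows = []
    while start > event_time - size:
        windows.append((start, start + size))  # window is [start, end)
        start -= slide
    return windows


for window_start, window_end in hop_windows(datetime(2021, 5, 19, 11, 20, 5)):
    print(window_start.time(), "to", window_end.time())
```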

To send the result of the query to a destination stream, I create a table and connect the table to the stream. First, I need to give permissions to the notebook to write into the stream.

In the Kinesis Data Analytics Studio console, I select my-notebook. Then, in the Studio notebooks details section, I choose Edit IAM permissions. Here, I can configure the sources and destinations used by the notebook and the IAM role permissions are updated automatically.

Console screenshot.

In the Included destinations in IAM policy section, I choose the destination and select my-output-stream. I save changes and wait for the notebook to be updated. I am now ready to use the destination stream.

In the notebook, I create a sensor_state table connected to my-output-stream.

%flink.ssql

CREATE TABLE sensor_state (
    status VARCHAR(6),
    num INTEGER,
    avg_current_temperature DOUBLE,
    hop_time TIMESTAMP(3)
)
WITH (
    'connector' = 'kinesis',
    'stream' = 'my-output-stream',
    'aws.region' = 'us-east-1',
    'scan.stream.initpos' = 'LATEST',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
);

I now use this INSERT INTO statement to continuously insert the result of the select into the sensor_state table.

%flink.ssql(type=update)

INSERT INTO sensor_state
SELECT sensor_data.status,
    COUNT(*) AS num,
    AVG(sensor_data.current_temperature) AS avg_current_temperature,
    HOP_ROWTIME(event_time, INTERVAL '10' second, INTERVAL '1' minute) as hop_time
FROM sensor_data
GROUP BY HOP(event_time, INTERVAL '10' second, INTERVAL '1' minute), sensor_data.status;

The data is also sent to the destination Kinesis data stream (my-output-stream) so that it can be used by other applications. For example, the data in the destination stream can be used to update a real-time dashboard, or to monitor the behavior of my sensors after a software update.

I am satisfied with the result. I want to deploy this query and its output as a Kinesis Analytics application. To do so, I need to provide an S3 location to store the application executable.

In the configuration section of the console, I edit the Deploy as application configuration settings. There, I choose a destination bucket in the same Region and save changes.

Console screenshot.

I wait for the notebook to be ready after the update. Then, I create a SensorsApp note in my notebook and copy the statements that I want to execute as part of the application. The tables have already been created, so I just copy the INSERT INTO statement above.

From the menu at the top right of my notebook, I choose Build SensorsApp and export to Amazon S3 and confirm the application name.

Notebook screenshot.

When the export is ready, I choose Deploy SensorsApp as Kinesis Analytics application in the same menu. After that, I fine-tune the configuration of the application. I set parallelism to 1 because I have only one shard in my input Kinesis data stream and not a lot of traffic. Then, I run the application, without having to write any code.

From the Kinesis Data Analytics applications console, I choose Open Apache Flink dashboard to get more information about the execution of my application.

Apache Flink console screenshot.

Availability and Pricing
You can use Amazon Kinesis Data Analytics Studio today in all AWS Regions where Kinesis Data Analytics is generally available. For more information, see the AWS Regional Services List.

In Kinesis Data Analytics Studio, we run the open-source versions of Apache Zeppelin and Apache Flink, and we contribute changes upstream. For example, we have contributed bug fixes for Apache Zeppelin, and we have contributed to AWS connectors for Apache Flink, such as those for Kinesis Data Streams and Kinesis Data Firehose. Also, we are working with the Apache Flink community to contribute availability improvements, including automatic classification of errors at runtime to understand whether errors are in user code or in application infrastructure.

With Kinesis Data Analytics Studio, you pay based on the average number of Kinesis Processing Units (KPU) per hour, including those used by your running notebooks. One KPU comprises 1 vCPU of compute, 4 GB of memory, and associated networking. You also pay for running application storage and durable application storage. For more information, see the Kinesis Data Analytics pricing page.

Start using Kinesis Data Analytics Studio today to get better insights from your streaming data.

Danilo

Build a data lake using Amazon Kinesis Data Streams for Amazon DynamoDB and Apache Hudi

Post Syndicated from Dhiraj Thakur original https://aws.amazon.com/blogs/big-data/build-a-data-lake-using-amazon-kinesis-data-streams-for-amazon-dynamodb-and-apache-hudi/

Amazon DynamoDB helps you capture high-velocity data such as clickstream data to form customized user profiles and online order transaction data to develop customer order fulfillment applications, improve customer satisfaction, and get insights into sales revenue to create a promotional offer for the customer. It’s essential to store these data points in a centralized data lake, which can be transformed, analyzed, and combined with diverse organizational datasets to derive meaningful insights and make predictions.

A popular use case in order management is receiving, tracking, and fulfilling customer orders. The order management process begins when an order is placed and ends when the customer receives their package. When storing high-velocity order transaction data in DynamoDB, you can use Amazon Kinesis streaming to extract data and store it in a centralized data lake built on Amazon Simple Storage Service (Amazon S3).

Amazon Kinesis Data Streams for DynamoDB helps you to publish item-level changes in any DynamoDB table to a Kinesis data stream of your choice. Additionally, you can take advantage of this feature for use cases that require longer data retention on the stream and fan out to multiple concurrent stream readers. You also can integrate with Amazon Kinesis Data Analytics or Amazon Kinesis Data Firehose to publish data to downstream destinations such as Amazon Elasticsearch Service (Amazon ES), Amazon Redshift, or Amazon S3.

In this post, you use Kinesis Data Streams for DynamoDB and take advantage of managed streaming delivery of DynamoDB data to a Kinesis data stream by enabling the Kinesis streaming connection on the Amazon DynamoDB console. To process DynamoDB events from Kinesis, you have multiple options: Amazon Kinesis Client Library (KCL) applications, AWS Lambda, Kinesis Data Analytics for Apache Flink, and Kinesis Data Firehose. Here, you use Kinesis Data Firehose to save the raw data in the S3 data lake and Apache Hudi to batch process the data.

Architecture

The following diagram illustrates the order processing system architecture.

In this architecture, users buy products in online retail shops and internally create an order transaction stored in DynamoDB. The order transaction data is ingested to the data lake and stored in the raw data layer. To achieve this, you enable Kinesis Data Streams for DynamoDB and use Kinesis Data Firehose to store data in Amazon S3. You use Lambda to transform the data from the delivery stream to remove unwanted data and finally store it in Parquet format. Next, you batch process the raw data and store it back in the Hudi dataset in the S3 data lake. You can then use Amazon Athena to do sales analysis. You build this entire data pipeline in a serverless manner.
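As a rough illustration of the Lambda transformation step, the following Python sketch shows how a Kinesis Data Firehose transformation function could strip a DynamoDB change record down to the order fields. It is not the function deployed by the CloudFormation template: the field list is borrowed from the Athena table definition later in this post, the `dynamodb.NewImage` layout is assumed from the DynamoDB change record format, and the conversion to Parquet is assumed to be handled by the delivery stream rather than by this code.

```python
import base64
import json

# Assumed field list, borrowed from the Athena table definition later in this post.
WANTED_FIELDS = ("order_id", "item_id", "customer_id", "product",
                 "amount", "currency", "time_stamp", "transaction_date")


def flatten_image(image):
    """Turn DynamoDB-typed attributes such as {"S": "x"} or {"N": "1"} into plain values."""
    # Each attribute is a one-entry dict mapping a type tag to the value.
    # Numeric ("N") values stay strings, exactly as DynamoDB serializes them.
    return {name: next(iter(typed.values())) for name, typed in image.items()}


def lambda_handler(event, context):
    """Firehose transformation: keep only the wanted order fields of each change record."""
    output = []
    for record in event["records"]:
        try:
            change = json.loads(base64.b64decode(record["data"]))
            new_image = change["dynamodb"]["NewImage"]
        except (ValueError, KeyError):
            # Not a change record this sketch understands (for example, no NewImage).
            output.append({"recordId": record["recordId"], "result": "Dropped",
                           "data": record["data"]})
            continue
        row = {k: v for k, v in flatten_image(new_image).items() if k in WANTED_FIELDS}
        payload = (json.dumps(row) + "\n").encode("utf-8")
        output.append({"recordId": record["recordId"], "result": "Ok",
                       "data": base64.b64encode(payload).decode("utf-8")})
    return {"records": output}
```

Every incoming record ID is returned with a result of Ok or Dropped, which is the contract Firehose expects from a transformation function.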

Prerequisites

Complete the following steps to create AWS resources to build a data pipeline as mentioned in the architecture. For this post, we use the AWS Region us-west-1.

  1. On the Amazon Elastic Compute Cloud (Amazon EC2) console, create a keypair.
  2. Download the data files, Amazon EMR cluster, and Athena DDL code from GitHub.
  3. Deploy the necessary Amazon resources using the provided AWS CloudFormation template.
  4. For Stack name, enter a stack name of your choice.
  5. For Keypair name, choose a key pair.

A key pair is required to connect to the EMR cluster nodes. For more information, see Use an Amazon EC2 Key Pair for SSH Credentials.

  6. Keep the remaining default parameters.
  7. Acknowledge that AWS CloudFormation might create AWS Identity and Access Management (IAM) resources.

For more information about IAM, see Resources to learn more about IAM.

  8. Choose Create stack.

You can check the Resources tab for the stack after the stack is created.

The following table summarizes the resources that you created, which you use to build the data pipeline and analysis.

| Logical ID | Physical ID | Type |
| --- | --- | --- |
| DeliveryPolicy | kines-Deli-* | AWS::IAM::Policy |
| DeliveryRole | kinesis-hudi-DeliveryRole-* | AWS::IAM::Role |
| Deliverystream | kinesis-hudi-Deliverystream-* | AWS::KinesisFirehose::DeliveryStream |
| DynamoDBTable | order_transaction_* | AWS::DynamoDB::Table |
| EMRClusterServiceRole | kinesis-hudi-EMRClusterServiceRole-* | AWS::IAM::Role |
| EmrInstanceProfile | kinesis-hudi-EmrInstanceProfile-* | AWS::IAM::InstanceProfile |
| EmrInstanceRole | kinesis-hudi-EmrInstanceRole-* | AWS::IAM::Role |
| GlueDatabase | gluedatabase-* | AWS::Glue::Database |
| GlueTable | gluetable-* | AWS::Glue::Table |
| InputKinesisStream | order-data-stream-* | AWS::Kinesis::Stream |
| InternetGateway | igw-* | AWS::EC2::InternetGateway |
| InternetGatewayAttachment | kines-Inter-* | AWS::EC2::VPCGatewayAttachment |
| MyEmrCluster | | AWS::EMR::Cluster |
| ProcessLambdaExecutionRole | kinesis-hudi-ProcessLambdaExecutionRole-* | AWS::IAM::Role |
| ProcessLambdaFunction | kinesis-hudi-ProcessLambdaFunction-* | AWS::Lambda::Function |
| ProcessedS3Bucket | kinesis-hudi-processeds3bucket-* | AWS::S3::Bucket |
| PublicRouteTable | | AWS::EC2::RouteTable |
| PublicSubnet1 | | AWS::EC2::Subnet |
| PublicSubnet1RouteTableAssociation | | AWS::EC2::SubnetRouteTableAssociation |
| PublicSubnet2 | | AWS::EC2::Subnet |
| PublicSubnet2RouteTableAssociation | | AWS::EC2::SubnetRouteTableAssociation |
| RawS3Bucket | kinesis-hudi-raws3bucket-* | AWS::S3::Bucket |
| S3Bucket | kinesis-hudi-s3bucket-* | AWS::S3::Bucket |
| SourceS3Bucket | kinesis-hudi-sources3bucket-* | AWS::S3::Bucket |
| VPC | vpc-* | AWS::EC2::VPC |

Enable Kinesis streaming for DynamoDB

AWS recently launched Kinesis Data Streams for DynamoDB so you can send data from DynamoDB to Kinesis data streams. You can use the AWS Command Line Interface (AWS CLI) or the AWS Management Console to enable this feature.

To enable this feature from the console, complete the following steps:

  1. On the DynamoDB console, choose the table you created in the CloudFormation stack earlier (it begins with the prefix order_transaction_).
  2. On the Overview tab, choose Manage streaming to Kinesis.
  3. Choose your input stream (it starts with order-data-stream-).
  4. Choose Enable.
  5. Choose Close.
  6. Make sure that stream enabled is set to Yes.
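If you prefer to script this step rather than use the console, the following Python sketch shows one way it could look with boto3. The helper name and the placeholder table and stream values are ours, not part of the original walkthrough; the two client calls are the DynamoDB `EnableKinesisStreamingDestination` and `DescribeKinesisStreamingDestination` operations.

```python
def enable_kinesis_streaming(dynamodb_client, table_name, stream_arn):
    """Enable Kinesis Data Streams for a DynamoDB table and return the reported status."""
    dynamodb_client.enable_kinesis_streaming_destination(
        TableName=table_name, StreamArn=stream_arn)
    described = dynamodb_client.describe_kinesis_streaming_destination(
        TableName=table_name)
    # Find the destination we just enabled and report its status.
    for destination in described.get("KinesisDataStreamDestinations", []):
        if destination.get("StreamArn") == stream_arn:
            return destination.get("DestinationStatus")
    return None


# Usage (requires boto3 and AWS credentials; names in angle brackets are placeholders):
#   import boto3
#   client = boto3.client("dynamodb", region_name="us-west-1")
#   print(enable_kinesis_streaming(client, "<order-transaction-table>", "<stream-arn>"))
```

The status typically moves from ENABLING to ACTIVE, so you may need to call the describe operation again before loading data.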

Populate the sales order transaction dataset

To replicate a real-life use case, you need an online retail application. For this post, you upload raw data files to the S3 bucket and use a Lambda function to load the data into DynamoDB. You can download the order data CSV files from the AWS Sample GitHub repository. Complete the following steps to load the data into DynamoDB:

  1. On the Amazon S3 console, choose the bucket <stack-name>-sources3bucket-*.
  2. Choose Upload.
  3. Choose Add files.
  4. Choose the order_data_09_02_2020.csv and order_data_10_02_2020.csv files.
  5. Choose Upload.
  6. On the Lambda console, choose the function <stack-name>-CsvToDDBLambdaFunction-*.
  7. Choose Test.
  8. For Event template, enter an event name.
  9. Choose Create.
  10. Choose Test.

This runs the Lambda function and loads the CSV file order_data_09_02_2020.csv to the DynamoDB table.

  11. Wait until the message appears that the function ran successfully.

You can now view the data on the DynamoDB console, in the details page for your table.

Because you enabled the Kinesis data stream in the DynamoDB table, it starts streaming the data to Amazon S3. You can check the data by viewing the bucket on the Amazon S3 console. The following screenshot shows that a Parquet file is under the prefix in the bucket.

Use Apache Hudi with Amazon EMR

Now it’s time to process the streaming data using Hudi.

  1. Log in to the Amazon EMR leader node.

You can use the key pair you chose in the security options to SSH into the leader node.

  2. Use the following bash command to start the Spark shell to use it with Apache Hudi:
spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.hive.convertMetastoreParquet=false" --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

The Amazon EMR instance looks like the following screenshot.

  3. You can use the following Scala code to import the order transaction data from the S3 data lake to a Hudi dataset using the copy-on-write storage type. Change inputDataPath to match the file path in <stack-name>-raws3bucket-* in your environment, and replace the bucket name in hudiTablePath with <stack-name>-processeds3bucket-*.
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor

//Set up various input values as variables
val inputDataPath = "s3://kinesis-hudi-raws3bucket-1p6nszvqd9awz/2021/02/01/15/"
val hudiTableName = "order_hudi_cow"
val hudiTablePath = "s3://kinesis-hudi-processeds3bucket-yfc6843vmq1o/" + hudiTableName

// Set up our Hudi Data Source Options
val hudiOptions = Map[String,String](
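    // Note: a Scala Map keeps only the last value bound to a repeated key, so of the
    // two RECORDKEY_FIELD_OPT_KEY entries below only "time_stamp" takes effect.
    DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "order_id",
    DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "time_stamp",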
    DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "transaction_date", 
    HoodieWriteConfig.TABLE_NAME -> hudiTableName, 
    DataSourceWriteOptions.OPERATION_OPT_KEY ->
        DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, 
    DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "time_stamp", 
    DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", 
    DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName, 
    DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "transaction_date", 
    DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY ->
        classOf[MultiPartKeysValueExtractor].getName)

// Read data from S3 and create a DataFrame with Partition and Record Key
val inputDF = spark.read.format("parquet").load(inputDataPath)

// Write data into the Hudi dataset
inputDF.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save(hudiTablePath)

For more information about DataSourceWriteOptions, see Work with a Hudi Dataset.

  4. In the Spark shell, you can now count the records in the DataFrame that you wrote to the Apache Hudi dataset:
scala> inputDF.count()
res1: Long = 1000

You can check the processed Apache Hudi dataset in the S3 data lake via the Amazon S3 console. The following screenshot shows the prefix order_hudi_cow is in <stack-name>-processeds3bucket-*.

When navigating into the order_hudi_cow prefix, you can find the Hudi dataset’s partitions, keyed by transaction_date, with one partition for each date in our dataset.

Let’s analyze the data stored in Amazon S3 using Athena.

Analyze the data with Athena

To analyze your data, complete the following steps:

  1. On the Athena console, create the database order_db using the following command:
create database order_db;

You use this database to create all the Athena tables.

  2. Create your table using the following command (replace the S3 bucket name with <stack-name>-processeds3bucket-* created in your environment):
    CREATE EXTERNAL TABLE order_transaction_cow (
      `_hoodie_commit_time` string,
      `_hoodie_commit_seqno` string,
      `_hoodie_record_key` string,
      `_hoodie_partition_path` string,
      `_hoodie_file_name` string,
      `order_id` string,
      `item_id` string,
      `customer_id` string,
      `product` string,
      `amount` decimal(3,1),
      `currency` string,
      `time_stamp` string
      )
      PARTITIONED BY ( 
      `transaction_date` string)
    ROW FORMAT SERDE
      'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
    STORED AS INPUTFORMAT
      'org.apache.hudi.hadoop.HoodieParquetInputFormat'
    OUTPUTFORMAT
      'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
    LOCATION
      's3://kinesis-hudi-processeds3bucket-yfc6843vmq1o/order_hudi_cow'

  3. Add partitions by running the following query on the Athena console:
    ALTER TABLE order_transaction_cow ADD
    PARTITION (transaction_date = '2020-09-02') LOCATION 's3://kinesis-hudi-processeds3bucket-yfc6843vmq1o/order_hudi_cow/2020-09-02/';

  4. Check the total number of records in the Hudi dataset with the following query:
    SELECT count(*) FROM "order_db"."order_transaction_cow";

It should return a single row with a count of 1,000.

Now check the record that you want to update.

 

  5. Run the following query on the Athena console:
SELECT * FROM "order_db"."order_transaction_cow"
where order_id ='3801'
and item_id ='23'
and transaction_date ='2020-09-02';

The output should look like the following screenshot. Note down the value of product and amount.

Analyze the change data capture

Now let’s test the change data capture (CDC) in streaming. Let’s take an example where the customer changed an existing order. We load the order_data_10_02_2020.csv file, where order_id 3801 has a different product and amount.

To test the CDC feature, complete the following steps:

  1. On the Lambda console, choose the function <stack-name>-CsvToDDBLambdaFunction-*.
  2. In the Environment variables section, choose Edit.
  3. For key, enter order_data_10_02_2020.csv.
  4. Choose Save.

You can see another prefix has been created in <stack-name>-raws3bucket-*.

  5. In Amazon EMR, run the following code in the Scala shell prompt to update the data (change inputDataPath to the file path in <stack-name>-raws3bucket-* and hudiTablePath to <stack-name>-processeds3bucket-*):
    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.functions._
    import org.apache.hudi.DataSourceWriteOptions
    import org.apache.hudi.config.HoodieWriteConfig
    import org.apache.hudi.hive.MultiPartKeysValueExtractor
    
    //Set up various input values as variables
    val inputDataPath = "s3://kinesis-hudi-raws3bucket-1p6nszvqd9awz/2021/02/01/18/"
    val hudiTableName = "order_hudi_cow"
    val hudiTablePath = "s3://kinesis-hudi-processeds3bucket-yfc6843vmq1o/" + hudiTableName
    
    // Set up our Hudi Data Source Options
    val hudiOptions = Map[String,String](
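        // Note: a Scala Map keeps only the last value bound to a repeated key, so of the
        // two RECORDKEY_FIELD_OPT_KEY entries below only "time_stamp" takes effect.
        DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "order_id",
        DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "time_stamp",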
        DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "transaction_date", 
        HoodieWriteConfig.TABLE_NAME -> hudiTableName, 
        DataSourceWriteOptions.OPERATION_OPT_KEY ->
            DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, 
        DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "time_stamp", 
        DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", 
        DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName, 
        DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "transaction_date", 
        DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY ->
            classOf[MultiPartKeysValueExtractor].getName)
    
    // Read data from S3 and create a DataFrame with Partition and Record Key
    val inputDF = spark.read.format("parquet").load(inputDataPath)
    
    // Write data into the Hudi dataset
    inputDF.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save(hudiTablePath)
    

  6. Run the following query on the Athena console to confirm that the total number of records is still 1,000:
    SELECT count(*) FROM "order_db"."order_transaction_cow";

  7. Run the following query on the Athena console to test for the update:
SELECT * FROM "order_db"."order_transaction_cow"
where order_id ='3801'
and item_id ='23'
and transaction_date ='2020-09-02';

The following screenshot shows that the product and amount values for the same order are updated.

In a production workload, you can trigger the updates on a schedule or by S3 modification events. A fully automated data lake makes sure your business analysts are always viewing the latest available data.
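To build intuition for why the record count can stay the same while product and amount change, here is a plain-Python model of a keyed merge: records that share a key collapse into the one with the greatest ordering field. This is a simplified illustration, not Hudi's implementation, and the key fields, product names, and timestamps in the usage example are made up.

```python
def merge_by_key(existing, incoming, key_fields, ordering_field):
    """Merge incoming records into existing ones, keeping the latest record per key."""
    merged = {tuple(r[f] for f in key_fields): r for r in existing}
    for record in incoming:
        key = tuple(record[f] for f in key_fields)
        current = merged.get(key)
        # A new key is added; an existing key is replaced only by a record that is
        # at least as recent according to the ordering field.
        if current is None or record[ordering_field] >= current[ordering_field]:
            merged[key] = record
    return list(merged.values())


# Hypothetical sample rows, loosely shaped like the order transaction data.
first_load = [
    {"order_id": "3801", "item_id": "23", "product": "Laptop",
     "amount": "10.0", "time_stamp": "2020-09-02T10:00:00"},
    {"order_id": "3802", "item_id": "11", "product": "Phone",
     "amount": "20.0", "time_stamp": "2020-09-02T10:05:00"},
]
changed_order = [
    {"order_id": "3801", "item_id": "23", "product": "Tablet",
     "amount": "15.0", "time_stamp": "2020-10-02T09:00:00"},
]
result = merge_by_key(first_load, changed_order, ("order_id", "item_id"), "time_stamp")
```

In this model, `result` still holds two records, and the record for order 3801 carries the new product and amount.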

Clean up the resources

To avoid incurring future charges, follow these steps to remove the example resources:

  1. If you created the EMR cluster with the CloudFormation template, delete the resources you created earlier in the Prerequisites section by deleting the stack instances from your stack set.
  2. Stop the cluster via the Amazon EMR console, if you launched the EMR cluster manually.
  3. Empty all the relevant buckets via the Amazon S3 console.

Conclusion

You can build an end-to-end serverless data lake to get real-time insights from DynamoDB by using Kinesis Data Streams—all without writing any complex code. It allows your team to focus on solving business problems by getting useful insights immediately. Application developers have various use cases for moving data quickly through an analytics pipeline, and you can make this happen by enabling Kinesis Data Streams for DynamoDB.

If this post helps you or inspires you to solve a problem, we would love to hear about it! The code for this solution is available in the GitHub repository for you to use and extend. Contributions are always welcome!


About the Authors

Dhiraj Thakur is a Solutions Architect with Amazon Web Services. He works with AWS customers and partners to guide enterprise cloud adoption, migration, and strategy. He is passionate about technology and enjoys building and experimenting in the analytics and AI/ML space.

 

 

 

Saurabh Shrivastava is a solutions architect leader and analytics/ML specialist working with global systems integrators. He works with AWS Partners and customers to provide them with architectural guidance for building scalable architecture in hybrid and AWS environments. He enjoys spending time with his family outdoors and traveling to new destinations to discover new cultures.

 

 

 

Dylan Qu is an AWS solutions architect responsible for providing architectural guidance across the full AWS stack with a focus on data analytics, AI/ML, and DevOps.

 

Retaining data streams up to one year with Amazon Kinesis Data Streams

Post Syndicated from Nihar Sheth original https://aws.amazon.com/blogs/big-data/retaining-data-streams-up-to-one-year-with-amazon-kinesis-data-streams/

Streaming data is used extensively for use cases like sharing data between applications, streaming ETL (extract, transform, and load), real-time analytics, processing data from internet of things (IoT) devices, application monitoring, fraud detection, live leaderboards, and more. Typically, data streams are stored for short durations of time before being loaded into a permanent data store like a data lake or analytics service.

Additional use cases are becoming more prevalent that may require you to retain data in streams for longer periods of time. For example, compliance programs like HIPAA and FedRAMP may require you to store raw data for more than a few days or weeks, or you may want to backtest machine learning (ML) algorithms with historical data that may be several months old.

A challenge arises when you want to process historical data and newly arriving data streams. This requires complex logic to access your data lake and your data stream store, or two sets of code—one to process data from your data lake and one to process your new data streams.

Amazon Kinesis Data Streams solves this challenge by storing your data streams up to 1 year with long-term retention. You can use the same Kinesis Data Streams code base to process both historical and newly arriving data streams, and continue to use features like enhanced fan-out to read large data volumes at very high throughput.

In this post, we describe how long-term retention enables new use cases by bridging real-time and historical data processing. We also demonstrate how you can reduce the time to retrieve 30 days of data from a data stream by an order of magnitude using Kinesis Data Streams enhanced fan-out.

Simple setup, no resource provisioning

Kinesis Data Streams durably stores all data stream records in a shard, an append-only log ordered by arrival time. The time period from when a record is added to when it’s no longer accessible is called the retention period. A Kinesis data stream stores records for 24 hours by default, up to 365 days (8,760 hours). Applications can start reading data at any point in the retention period in the exact order in which the data stream is stored. Shards enable these applications to process data in parallel and at low latency.

You can select a preset retention period or define a custom retention period in days or hours using the Kinesis Data Streams console, as in the following screenshot.

The default retention period is 24 hours and covers scenarios where intermittent lags in processing need to catch up with the real-time data. You can extend retention up to 7 days to reprocess slightly aged data to resolve potential downstream data losses. You can also use long-term retention to store data for more than 7 days and up to 365 days to reprocess historical data for use cases like algorithm backtesting, data store backfills, and auditing. For more information, see Changing the Data Retention Period.

Similarly, you can use the following AWS Command Line Interface (AWS CLI) command to set the retention period in hours (the following code sets it to 9 days, or 216 hours):

aws kinesis increase-stream-retention-period \
    --stream-name samplestream \
    --retention-period-hours 216
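The same change can be scripted. The following Python sketch converts days to hours, checks the 24-hour to 8,760-hour range described earlier, and then calls the matching Kinesis operation through a boto3 client. The helper names are ours, and the caller is assumed to know the stream's current retention (for example, from a describe call).

```python
MIN_RETENTION_HOURS = 24      # default retention
MAX_RETENTION_HOURS = 8760    # 365 days


def retention_hours(days):
    """Convert a retention period in days to hours, checking the allowed range."""
    hours = days * 24
    if not MIN_RETENTION_HOURS <= hours <= MAX_RETENTION_HOURS:
        raise ValueError(f"retention must be 1-365 days, got {days}")
    return hours


def set_retention(kinesis_client, stream_name, days, current_hours):
    """Raise or lower the stream's retention period to the requested number of days."""
    target = retention_hours(days)
    if target > current_hours:
        kinesis_client.increase_stream_retention_period(
            StreamName=stream_name, RetentionPeriodHours=target)
    elif target < current_hours:
        kinesis_client.decrease_stream_retention_period(
            StreamName=stream_name, RetentionPeriodHours=target)
    return target


# Usage (requires boto3 and AWS credentials):
#   import boto3
#   set_retention(boto3.client("kinesis"), "samplestream", days=9, current_hours=24)
```

Increasing and decreasing retention are separate API operations, which is why the sketch compares the target with the current value first.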

Read new and historical data, no code changes necessary

All the data captured in the stream is stored in a durable, encrypted, and secure manner for the specified retention period up to a maximum of 1 year. You can store any amount of data, retrieve it by specifying a start position, and read sequentially using the familiar GetRecords and SubscribeToShard APIs. The start position can be the sequence number of a data record in a shard or a timestamp. This enables you to use the same code to process older data. You can set up multiple consuming applications to start processing data at different points in the data stream.
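As a small illustration of choosing a start position, the following Python sketch builds the parameters for a GetShardIterator request from either a timestamp or a sequence number. The helper name is ours; the parameter names and iterator types are those of the Kinesis API, and the stream and shard names in the usage comment are placeholders.

```python
from datetime import datetime, timezone


def shard_iterator_request(stream_name, shard_id, *, timestamp=None, sequence_number=None):
    """Build GetShardIterator parameters for a timestamp or sequence-number start."""
    if timestamp is not None and sequence_number is not None:
        raise ValueError("choose either a timestamp or a sequence number, not both")
    request = {"StreamName": stream_name, "ShardId": shard_id}
    if timestamp is not None:
        request.update(ShardIteratorType="AT_TIMESTAMP", Timestamp=timestamp)
    elif sequence_number is not None:
        request.update(ShardIteratorType="AT_SEQUENCE_NUMBER",
                       StartingSequenceNumber=sequence_number)
    else:
        # No start position given: begin at the oldest record still retained.
        request["ShardIteratorType"] = "TRIM_HORIZON"
    return request


# Usage (requires boto3 and AWS credentials):
#   import boto3
#   kinesis = boto3.client("kinesis")
#   start = datetime(2021, 1, 1, tzinfo=timezone.utc)
#   req = shard_iterator_request("samplestream", "shardId-000000000000", timestamp=start)
#   iterator = kinesis.get_shard_iterator(**req)["ShardIterator"]
#   records = kinesis.get_records(ShardIterator=iterator, Limit=100)["Records"]
```

Because only the start position changes, the reading loop that follows can be the same for historical and newly arriving data.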

Speed up data reads using enhanced fan-out consumers

Kinesis Data Streams provides two types of models to consume data: shared throughput consumer and enhanced fan-out (EFO) consumer. In the shared throughput consumer model, all the consuming applications share 2 MB/s per shard read throughput and a 5 transactions per second (TPS) quota. In the enhanced fan-out model, each consumer gets a dedicated read throughput of 2 MB/s per shard. Because it uses an HTTP/2 data retrieval API, there is no longer a limit of 5 TPS. You can attach up to 20 EFO consumers to a single stream and read data at a total rate of 40 MB/s per shard. Because each consumer gets dedicated read throughput, processing one doesn’t impact another. So you can attach new consumers to process old data without worrying about the performance of the existing consumer processing real-time data. For example, you can retrain an ML model in an ad hoc fashion without impacting real-time workflows.
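The difference between the two models is easy to see with a little arithmetic. The following Python sketch uses the per-shard figures quoted above and assumes, for simplicity, that shared throughput consumers split the 2 MB/s evenly.

```python
SHARD_READ_MB_PER_S = 2.0   # per-shard read throughput quoted above
MAX_EFO_CONSUMERS = 20      # EFO consumers per stream quoted above


def per_consumer_read_mb_per_s(consumers, enhanced_fan_out):
    """Read throughput each consumer gets from one shard."""
    if consumers < 1:
        raise ValueError("need at least one consumer")
    if enhanced_fan_out:
        if consumers > MAX_EFO_CONSUMERS:
            raise ValueError("at most 20 EFO consumers per stream")
        return SHARD_READ_MB_PER_S              # dedicated to each consumer
    return SHARD_READ_MB_PER_S / consumers      # shared; an even split is assumed


def total_read_mb_per_s(consumers, enhanced_fan_out):
    """Aggregate read throughput across all consumers of one shard."""
    return consumers * per_consumer_read_mb_per_s(consumers, enhanced_fan_out)
```

With four shared throughput consumers, each gets 0.5 MB/s and the total stays at 2 MB/s; with 20 EFO consumers, each keeps 2 MB/s and the total reaches 40 MB/s per shard.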

You can add and remove EFO consumers at any time and avoid paying for over-provisioned resources. For example, when backtesting, you can register EFO consumers before the test and remove them after completion. You’re only charged for resources used during the test. Also, you can use EFO consumers to accelerate the speed of processing. Each consuming application can process different parts of streams across the retention period to process all the data in parallel, thereby dramatically reducing the total processing time.

Clickstream pipeline use case

Let’s look at a clickstream use case to see how this works for an existing streaming pipeline like the one in the following diagram.

This pipeline takes clickstream data and creates an alert every time a user leaves your ecommerce site without purchasing the items in their cart. A simple pipeline like this is a great way to start with stream processing, but soon you may want to implement a recommendation system based on user activity on your website and mobile app. To do this, you need to gather historical data in your existing data stream and send it to Amazon Simple Storage Service (Amazon S3) so it can be used for training a recommendation ML model. This scenario illustrates a key benefit of enabling long-term retention: it gives you the flexibility to “go back in time” and replay the existing data in your stream to generate new analytics that you may not have considered when you initially set up the streaming pipeline.

Let’s say you enabled 30 days of retention on your Kinesis data stream. After you train your ML model, you can set up a new streaming pipeline that generates recommendations by calling an inference endpoint hosted on Amazon SageMaker based on the trained ML model. The following diagram illustrates the final state of this architecture.

You can efficiently and quickly consume the existing data in the stream and write it to Amazon S3 so it can be used for training your ML model. The following diagram illustrates the architecture of this intermediate pipeline to generate training data.

You may wonder, why read from Kinesis Data Streams and write to Amazon S3? Why not write to Amazon S3 directly without enabling long-term retention? First, ingesting into Kinesis Data Streams with long-term retention enabled gives you the flexibility to generate additional streaming analytics as time passes. Second, this gives you the flexibility to filter and transform the data being read from Kinesis Data Streams before generating analytics or writing to Amazon S3. Lastly, you can use this approach to render analytics onto other systems besides Amazon S3, such as Amazon Elasticsearch Service (Amazon ES) using the Elasticsearch sink for Apache Flink.

Keep in mind that we only use this pipeline to bootstrap our second, long-lived pipeline that does recommendations, but this is an important step and we need a way to do this efficiently. Although there are multiple options for consuming data from Kinesis Data Streams, Amazon Kinesis Data Analytics for Apache Flink provides an elegant way to attach multiple EFO consumers in the same consuming application.

You can find more information at the official Apache Flink website, and about Kinesis Data Analytics for Apache Flink in the Kinesis Data Analytics developer guide. Apache Flink has a number of connectors, like the recently released FlinkKinesisConsumer, which supports enhanced fan-out for consuming from Kinesis Data Streams, or the Streaming File Sink to write to Amazon S3 from your Apache Flink application.

Accelerating data consumption

For the sake of simplicity, let’s use just one shard in our data stream, ingest data at the maximum rate of 1 MB/s, and specify a retention period of 30 days. To bootstrap our new analytics, reading the full 30 days of data with one EFO consumer at 2 MB/s could take up to 15 days to load this data into Amazon S3. However, you can make this up to 20 times faster by using 20 EFO consumers at the same time, each reading from a different point in the stream at 2 MB/s. The following diagram illustrates the architecture of multiple EFO consumers reading from multiple time slices.

This gives us a total of 40 MB/s in consumption capacity as opposed to 2 MB/s per shard with just one EFO consumer, reducing the overall time by 95%. In most use cases, this combination of Kinesis Data Analytics and EFO allows you to process 30 days of data in hours, instead of days.
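The numbers above follow from simple arithmetic, which the following Python sketch reproduces. It assumes a single shard that was filled at a constant rate, and it ignores new data arriving while the replay runs.

```python
def replay_hours(retention_days, ingest_mb_per_s, consumers, read_mb_per_s=2.0):
    """Hours needed to read everything retained in one shard."""
    stored_mb = retention_days * 24 * 3600 * ingest_mb_per_s
    seconds = stored_mb / (consumers * read_mb_per_s)
    return seconds / 3600


one_consumer = replay_hours(30, 1.0, consumers=1)        # 360 hours, or 15 days
twenty_consumers = replay_hours(30, 1.0, consumers=20)   # 18 hours
reduction = 1 - twenty_consumers / one_consumer          # about 0.95
```

Thirty days of data written at 1 MB/s takes 15 days to read at 2 MB/s, and 18 hours when 20 consumers read in parallel.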

A point of clarification regarding our approach: When all 20 consumers are finished reading past their respective endpoints in the stream, we stop the Apache Flink application. You can do this by raising an exception when all 20 consumers finish reading their respective time slices—effectively stopping the application. The following diagram illustrates the time savings we get from using 20 EFO consumers.

For more information about implementing this approach, see the GitHub repo.
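To make the idea of time slices concrete, the following Python sketch splits a retention window into equal, contiguous slices, one per consumer. It is an illustration only and isn't taken from the GitHub repo; the function name and the even split are our assumptions.

```python
from datetime import datetime, timedelta, timezone


def time_slices(start, end, consumers):
    """Split [start, end) into equal, contiguous slices, one per consumer."""
    if consumers < 1 or end <= start:
        raise ValueError("need at least one consumer and a non-empty window")
    width = (end - start) / consumers
    # The last bound is pinned to `end` so the slices cover the window exactly.
    bounds = [start + i * width for i in range(consumers)] + [end]
    return list(zip(bounds[:-1], bounds[1:]))


window_end = datetime(2021, 1, 31, tzinfo=timezone.utc)
window_start = window_end - timedelta(days=30)
slices = time_slices(window_start, window_end, consumers=20)
```

Each consumer would begin reading at its slice start (for example, with a timestamp start position) and stop once it sees records at or past its slice end. For a 30-day window and 20 consumers, every slice is 36 hours long.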

Pricing

An additional cost is associated with long-term retention (from 7–365 days) and EFO consumers. For more information, see Amazon Kinesis Data Streams pricing. Because you can register EFO consumers on demand, you pay only for the limited time you used all 20 consumers to load data, resulting in faster loads. It’s important to point out that you pay roughly the same amount to consume a fixed volume of data from the stream with 20 EFO consumers as you do with 1 EFO consumer because of the shorter duration required when using 20 consumers. 
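A quick calculation shows why the cost is roughly the same. The following Python sketch computes only usage quantities, in consumer-shard hours and GB retrieved, for the single-shard example from the previous section. It contains no prices, and the function name is ours.

```python
def efo_usage(retention_days, ingest_mb_per_s, consumers, read_mb_per_s=2.0):
    """Usage for replaying one shard: consumer-shard hours and GB retrieved."""
    stored_mb = retention_days * 24 * 3600 * ingest_mb_per_s
    hours = stored_mb / (consumers * read_mb_per_s) / 3600
    return {
        "consumer_shard_hours": consumers * hours,  # one shard in this example
        "gb_retrieved": stored_mb / 1024,
    }


single = efo_usage(30, 1.0, consumers=1)
parallel = efo_usage(30, 1.0, consumers=20)
```

Both cases come to 360 consumer-shard hours (1 × 360 hours versus 20 × 18 hours) and the same volume of data retrieved, so the faster load doesn't change these two usage quantities.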

Summary

In this post, we discussed long-term retention use cases of Kinesis Data Streams, how to increase the retention of a data stream, and related feature enhancements with Kinesis Data Streams APIs and KCL. We took a deep dive into the Apache Flink-based enhanced fan-out consumer approach to replay long-term data quickly. We shared open-source code based on this approach so you can easily implement your use cases using Kinesis Data Streams long-term retention.

You should use long-term retention if you’re planning to develop ML systems, generate customer behavior insights, or have compliance requirements for retaining raw data for more than 7 days. We would love to hear about your use cases with the long-term retention feature. Please submit your feedback to [email protected].


About the Authors

Nihar Sheth is a Senior Product Manager on the Amazon Kinesis Data Streams team at Amazon Web Services. He is passionate about developing intuitive product experiences that solve complex customer problems and enable customers to achieve their business goals. Outside of work, he is focusing on hiking 200 miles of beautiful PNW trails with his son in 2021.

 

 

Karthi Thyagarajan is a Solutions Architect on the Amazon Kinesis Team focusing on all things streaming and he enjoys helping customers tackle distributed systems challenges.

 

 

 

 

Sai Maddali is a Sr. Product Manager – Tech at Amazon Web Services where he works on Amazon Kinesis Data Streams. He is passionate about understanding customer needs, and using technology to deliver services that empower customers to build innovative applications. Besides work, he enjoys traveling, cooking, and running.

 

 

Larry Heathcote is a Senior Product Marketing Manager at Amazon Web Services for data streaming and analytics. Larry is passionate about seeing the results of data-driven insights on business outcomes. He enjoys walking his Samoyed Sasha in the mornings so she can look for squirrels to bark at.

How Baqend built a real-time web analytics platform using Amazon Kinesis Data Analytics for Apache Flink

Post Syndicated from Wolfram Wingerath original https://aws.amazon.com/blogs/big-data/how-baqend-built-a-real-time-web-analytics-platform-using-amazon-kinesis-data-analytics-for-apache-flink/

This is a customer post written by the engineers from German startup Baqend and the AWS EMEA Prototyping Labs team.

Baqend is one of the fastest-growing software as a service (SaaS) startups in Germany, serving over 5,000 business customers with more than 100 million monthly users and $2 billion EUR revenue per year. Baqend’s main product is Speed Kit, a one-click solution to accelerate ecommerce websites. By rerouting a portion of the web traffic through Speed Kit’s caching infrastructure, it typically makes websites 1.5–3 times faster.

To measure the impact of Speed Kit and confirm its uplift to Baqend’s customers, we maintain several dashboards that display the technical and business performance improvements achieved by Speed Kit. This requires complex aggregations of tracking data collected during A/B tests on our customers’ websites.

The Challenge: Real-time analytics and reporting at scale

One of the key issues with our legacy solution for monitoring and reporting was the time it needed to process data. The raw tracking data from all users was batched through various systems, which resulted in processing delays of up to 24 hours for some analytics jobs. This impacted our operations monitoring and sales activities negatively, because our customers sometimes couldn’t analyze the impact of deployment changes until the next day. Furthermore, our legacy reporting service lacked any support for custom visualization development.

This post shows you how we transformed our batch-based analytics process into a continuous complex event-processing pipeline, which is managed by Amazon Kinesis Data Analytics for Apache Flink. The new solution exhibits less than a minute of end-to-end latency from data ingestion to visual output in the dashboard.

The key topics presented in this post are:

Solution overview and key components

Following a remote planning phase in which we defined our requirements and laid out the basic design, we built the solution during an on-site prototyping engagement with AWS over the course of 4 weeks in early 2020 in Hamburg. Seven team members from Baqend and AWS EMEA Prototyping Labs implemented the following architecture.

The workflow includes the following steps:

  1. The performance tracking data is streamed by Speed Kit Amazon Elastic Compute Cloud (Amazon EC2) instances.
  2. This data goes into an Amazon Kinesis Data Streams data stream.
  3. This data stream is consumed by a Kinesis Data Analytics for Apache Flink application.
  4. The data is ingested into Amazon Elasticsearch Service (Amazon ES).
  5. This streaming application relies on AWS Secrets Manager to store and access the credentials for Elasticsearch with basic HTTP authentication.
  6. An Nginx proxy server application hosted on EC2 instances in multiple public subnets and Availability Zones redirects user requests to Kibana with Amazon Cognito authentication (for more information, see How do I use an NGINX proxy to access Kibana from outside a VPC that’s using Amazon Cognito authentication?).
  7. The Apache Flink application also uses Amazon DynamoDB as a backend for long-living external states required for certain operations (covered later in this post).
  8. The streaming application also delivers the raw and intermediate data outputs to an Amazon Simple Storage Service (Amazon S3) bucket to enable historical data analysis and operational troubleshooting with Amazon Athena.

Although the prototyping engagement also covered other aspects, we focus on the Kinesis Data Analytics application in the following sections of this post.

Continuous aggregation with Kinesis Data Analytics

We need to collect all kinds of technical data points on every page load of a website visitor. Details on the individual page impressions (PI) help us analyze web performance for the websites of our customers. Speed Kit provides a performance tracking functionality that collects data within the browser of every website visitor and sends it to our analytics backend.

Aggregating page impressions

Intuitively, there should be only one data beacon for any given PI because the data could be aggregated in the browser before it’s sent to our backend. However, Speed Kit sends several data beacons during the page load to minimize the possibility of any data loss, as shown in the following figure.

For example, static information such as the target URL or the current time can be sent as soon as the navigation starts (navigation beacon), whereas certain measurements can’t be sent until very late in the load process, like the time it took to load the entire page (load beacon). Certain events may even occur minutes after the page load, or not at all (for example, user interaction with the page or JavaScript errors) and are therefore handled via dedicated and optional transmissions (event beacons). These beacons need to be correlated in our analytical backend later on.

Aggregating session data

Because some of the most interesting metrics are computed on the level of user sessions, aggregating all data beacons for the individual PIs isn’t enough to analyze web performance. For instance, the user engagement metrics are often quantified by the number of pages visited in one sitting (session length) or the share of users that left on the very first page (bounce rate).
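To make the two session-level metrics concrete, the following minimal sketch computes them from a list of page counts per session. The class and method names are hypothetical, and treating a one-page session as a bounce is a simplifying assumption for illustration, not a description of the production pipeline.

```java
import java.util.List;

/** Hypothetical helper: session-level engagement metrics from page counts. */
final class SessionMetrics {

    /** Average number of pages visited in one sitting (session length). */
    static double averageSessionLength(List<Integer> pagesPerSession) {
        if (pagesPerSession.isEmpty()) {
            return 0.0;
        }
        int totalPages = 0;
        for (int pages : pagesPerSession) {
            totalPages += pages;
        }
        return (double) totalPages / pagesPerSession.size();
    }

    /** Share of sessions that ended on the very first page (bounce rate). */
    static double bounceRate(List<Integer> pagesPerSession) {
        if (pagesPerSession.isEmpty()) {
            return 0.0;
        }
        long bounces = pagesPerSession.stream().filter(pages -> pages == 1).count();
        return (double) bounces / pagesPerSession.size();
    }

    public static void main(String[] args) {
        // Made-up example: three sessions with 1, 4, and 3 page impressions.
        List<Integer> sessions = List.of(1, 4, 3);
        System.out.println("Average session length: " + averageSessionLength(sessions));
        System.out.println("Bounce rate: " + bounceRate(sessions));
    }
}
```

Neither metric can be derived from a single PI, which is why beacons have to be grouped by session before these numbers are computed.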

Aggregating relevant information may even involve identifying and removing duplicates, as illustrated in the following figure.

Suppose the user first checks out the landing page and immediately leaves (Session 1), and then comes back later to browse through some products and buy some blue shoes (Session 2), and finally returns after a few hours to reload the order confirmation page and browse some more products (Session 3). Because Session 3 starts with a reload of the order confirmation page, tracking data on the order that was completed in Session 2 is transmitted a second time, resulting in a potentially duplicated count of the completed orders. Therefore, our analytical backend needs to identify the duplicated tracking information as such and ignore it for further analysis. To enable this, we persistently store a salted hash of every order ID and simply have the aggregation pipeline drop the tracking data on any order that has already been written to the external key value store (see the diagram in the following section).
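The deduplication rule can be sketched in a few lines. This is an illustration under stated assumptions: an in-memory set stands in for the external key-value store, SHA-256 is an assumed hash function, and the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch: drop tracking data for orders that were already counted. */
final class OrderDeduplicator {
    private final String salt;
    // In-memory stand-in for the external key-value store (assumption).
    private final Set<String> seenOrderHashes = new HashSet<>();

    OrderDeduplicator(String salt) {
        this.salt = salt;
    }

    /** Hex-encoded SHA-256 digest of the input string. */
    static String sha256Hex(String input) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(input.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : hash) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is not available", e);
        }
    }

    /** Returns true the first time an order ID is seen and false for every repeat. */
    boolean isFirstOccurrence(String orderId) {
        // Only the salted hash is stored, never the order ID itself.
        return seenOrderHashes.add(sha256Hex(salt + ":" + orderId));
    }

    public static void main(String[] args) {
        OrderDeduplicator deduplicator = new OrderDeduplicator("example-salt");
        System.out.println(deduplicator.isFirstOccurrence("order-42")); // true: Session 2 completes the order
        System.out.println(deduplicator.isFirstOccurrence("order-42")); // false: Session 3 reloads the confirmation page
    }
}
```

In a distributed pipeline, the check and the write have to happen atomically in the shared store; the in-memory set above sidesteps that concern.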

Anatomy of the streaming application

The following diagram shows our event processing pipeline from raw data collection to the storage of aggregation results.

The workflow is as follows:

  1. The first step is tracking the data within the browsers of the end users.
  2. The data is sent to Kinesis Data Streams for consumption through a custom stateful Apache Flink process function within a Kinesis Data Analytics application.
  3. Raw data beacons are initially normalized and invalid data beacons are delivered to Amazon S3 via side outputs to facilitate later analysis of all data that has been sorted out.
  4. As mentioned earlier, we use a DynamoDB table to run a deduplication rule over all incoming order data (confirmation pages) by the DynamoDB Transactions API. We also use another DynamoDB table to identify bot traffic by storing the user agent strings that have been associated with suspicious behavior consistently (because they belong to web crawlers). Finally, the stream of cleaned tracking beacons is processed in stateful window aggregation steps for storage.
  5. We aggregate all beacons referring to the same PI and write them off to our data lake on Amazon S3 to enable offline analysis with Athena.
  6. Furthermore, we compile the tracking beacon stream into 1-minute summaries containing both PI and session data for storage via Amazon ES to enable efficient reporting with Kibana.

State storage and application management

Most of the application state for the streaming application is held in the built-in RocksDB state backend with incremental checkpointing. This default built-in state storage mechanism depends on a 50 GB storage limit provided for each Kinesis Processing Unit (KPU) allocated to a Kinesis Data Analytics application. On the other hand, we used DynamoDB tables to store the state permanently for unique conversions and user agent strings in order to decouple historical state for these two data types from Apache Flink application management and to keep the checkpointing duration and size under control. Using DynamoDB for these two use cases helps to control the overhead for creating and restoring checkpoints and thereby controls the application startup time.

Workload distribution and scalability

As of February 2021, our processing pipeline handles over 2.8 billion tracking beacons per month, which corresponds to more than 500 million individual PIs from over 140 million user sessions and more than 100 million unique users. Achieving this scale requires even distribution of both processing and storage load across all stream partitions. Therefore, we use randomly generated session IDs as a partitioning key for the input Kinesis data stream and throughout most of the remaining sections of our pipeline.
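The effect of random partitioning keys can be checked with a small simulation. Kinesis Data Streams maps each partition key to a 128-bit hash key with MD5 and routes the record to the shard whose hash key range contains it. The sketch below assumes that the shards split the hash key space evenly, and it uses hypothetical class and method names and seeded pseudo-random session IDs.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Random;

/** Hypothetical simulation of how random session IDs spread over shards. */
final class PartitionSpread {
    private static final BigInteger HASH_SPACE = BigInteger.ONE.shiftLeft(128);

    /** Maps a partition key to a shard index, assuming evenly split hash key ranges. */
    static int shardFor(String partitionKey, int shardCount) {
        try {
            byte[] md5 = MessageDigest.getInstance("MD5")
                    .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            BigInteger hashKey = new BigInteger(1, md5); // unsigned 128-bit value
            return hashKey.multiply(BigInteger.valueOf(shardCount))
                    .divide(HASH_SPACE)
                    .intValue();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    /** Counts how many generated session IDs land on each shard. */
    static int[] spread(int sessionCount, int shardCount, long seed) {
        Random random = new Random(seed);
        int[] perShard = new int[shardCount];
        for (int i = 0; i < sessionCount; i++) {
            String sessionId = Long.toHexString(random.nextLong());
            perShard[shardFor(sessionId, shardCount)]++;
        }
        return perShard;
    }

    public static void main(String[] args) {
        // 10,000 simulated sessions over 4 shards; counts should be close to 2,500 each.
        System.out.println(Arrays.toString(spread(10_000, 4, 42L)));
    }
}
```

A key with few distinct values, such as a customer ID, would concentrate the traffic of large customers on a handful of shards, which is the skew that random session IDs avoid.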

In the presence of certain anomalies such as heavy bot traffic, a load skew may occur regardless, which may impair overall throughput or even crash the entire application in extreme cases. We monitor the number of incoming and outgoing records (to derive the current buffer size) for the individual Apache Flink operators in every stream partition to identify issues with the load distribution quickly and generate alert notifications via multiple channels (such as Slack and email) if the measurements for different stream partitions diverge significantly. For convenience, we further visualize custom Amazon CloudWatch metrics in a Grafana dashboard.
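A divergence check of this kind could look like the following sketch. The comparison against the median and the ratio threshold are assumptions chosen for illustration, and the names are hypothetical; the post does not specify the actual alerting rule.

```java
import java.util.Arrays;

/** Hypothetical skew check over per-partition buffer sizes. */
final class LoadSkewCheck {

    /** Derives the current buffer size per partition as incoming minus outgoing records. */
    static long[] bufferSizes(long[] recordsIn, long[] recordsOut) {
        long[] buffered = new long[recordsIn.length];
        for (int i = 0; i < recordsIn.length; i++) {
            buffered[i] = recordsIn[i] - recordsOut[i];
        }
        return buffered;
    }

    /**
     * Flags skew when the largest buffer exceeds the median by the given ratio.
     * Expects at least one partition.
     */
    static boolean isSkewed(long[] buffered, double maxRatioToMedian) {
        long[] sorted = buffered.clone();
        Arrays.sort(sorted);
        long median = sorted[sorted.length / 2];
        long max = sorted[sorted.length - 1];
        // Guard against a zero median so that idle partitions don't trigger alerts.
        return max > maxRatioToMedian * Math.max(median, 1);
    }

    public static void main(String[] args) {
        long[] in = {1000, 1000, 1000, 1000};
        long[] out = {990, 985, 400, 992}; // the third partition is falling behind
        System.out.println(isSkewed(bufferSizes(in, out), 3.0));
    }
}
```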

Event processing, delivery semantics, and fault tolerance

Application restarts and downtime (such as during and after application deployment) can be handled seamlessly by using Apache Flink’s event time processing semantics, because the generated output is independent of the wall-clock time of the processing nodes. All processing is based on monotonically increasing ingestion timestamps to eliminate the possibility of late arrivers. While our data cleaning procedure identifies invalid records, it never drops any data items from the stream; instead, it only attaches information on the detected issue to the data item in question. This approach enables us to analyze the frequency and distribution of every problem in our aggregation pipeline by using the same Kibana dashboard.

Even though the data ingestion to Amazon ES provides at-least-once delivery guarantees by default, we managed to achieve exactly-once delivery guarantees from the source Kinesis data stream to the Elasticsearch index by generating document identifiers in a deterministic fashion. Therefore, the data stream can be replayed safely because the existing data records are overwritten on re-insertion into the Elasticsearch index.
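The idea behind deterministic document identifiers can be illustrated without Elasticsearch. In the sketch below, a map stands in for the index, and the ID is derived from the customer and the aggregation dimensions; the exact ID layout and all names are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: at-least-once delivery plus deterministic IDs yields idempotent writes. */
final class IdempotentIndexing {

    /** Builds the same ID for the same customer, dimensions, and minute bucket. */
    static String documentId(String customer, String browser, String device,
                             String testGroup, long windowStartEpochMinute) {
        return String.join("|", customer, browser, device, testGroup,
                Long.toString(windowStartEpochMinute));
    }

    /** Writes a summary; a replayed record overwrites the existing entry instead of adding one. */
    static void index(Map<String, Long> indexById, String id, long pageImpressions) {
        indexById.put(id, pageImpressions);
    }

    public static void main(String[] args) {
        Map<String, Long> index = new HashMap<>(); // stand-in for the Elasticsearch index
        String id = documentId("shop-a", "chrome", "mobile", "speedkit", 26_900_000L);

        index(index, id, 120L); // first delivery
        index(index, id, 120L); // replay after a restart
        System.out.println(index.size()); // 1: the replay did not create a duplicate
    }
}
```

With randomly generated IDs, the replay would have produced a second document and inflated every aggregate computed over the index.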

Data retention and multi-tenancy in Amazon ES

We store pre-aggregated data at the minute level in Amazon ES to make sure our Kibana dashboard remains responsive even when analyzing a scope of weeks or months. As illustrated in the following figure, the Apache Flink application summarizes the raw tracking data along different dimensions (browser, device, test group, and aggregation time in minutes) before writing it to Elasticsearch.

The Elasticsearch documents are composed of bucketed histogram data for performance timers such as the First Contentful Paint (FCP) instead of the actual timer values. Running queries over these aggregates instead of the raw data reduces query costs significantly: traffic-heavy customers may have tens of millions of raw tracking beacons in a single week, whereas the number of 1-minute buckets is several orders of magnitude lower (for small and large customers alike). We observe over 5 times more PIs and 30 times more raw beacons than aggregates stored in Elasticsearch across all of our customers.
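As an illustration of bucketed histogram data, the following sketch counts timer values per fixed-width bucket and merges two histograms. The bucket width, the non-negative millisecond values, and the names are assumptions; the actual document layout is not shown in this post.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: store bucket counts instead of individual timer values. */
final class TimerHistogram {

    /** Counts non-negative timer values per bucket, keyed by the bucket's lower bound in ms. */
    static Map<Long, Integer> bucketize(long[] timerValuesMs, long bucketWidthMs) {
        Map<Long, Integer> histogram = new TreeMap<>();
        for (long value : timerValuesMs) {
            long lowerBound = (value / bucketWidthMs) * bucketWidthMs;
            histogram.merge(lowerBound, 1, Integer::sum);
        }
        return histogram;
    }

    /** Adds up the counts of two histograms, for example to combine two 1-minute summaries. */
    static Map<Long, Integer> merge(Map<Long, Integer> first, Map<Long, Integer> second) {
        Map<Long, Integer> merged = new TreeMap<>(first);
        second.forEach((bucket, count) -> merged.merge(bucket, count, Integer::sum));
        return merged;
    }

    public static void main(String[] args) {
        // Made-up FCP values in milliseconds, grouped into 500 ms buckets.
        Map<Long, Integer> minuteOne = bucketize(new long[] {120, 180, 450, 990, 1010}, 500);
        Map<Long, Integer> minuteTwo = bucketize(new long[] {300, 700}, 500);
        System.out.println(merge(minuteOne, minuteTwo));
    }
}
```

Because histograms merge by adding counts, a dashboard query over weeks only has to sum small per-minute documents rather than scan every raw beacon.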

We store the data for different customers in separate indexes generated for a fixed temporal rolling period by the Apache Flink Elasticsearch Sink Connector. We also implemented customer-specific retention policies in Amazon ES by deleting the old indexes as required. Our deployment is multi-tenant so that our customers can receive fine-grained access only to their own data stored in the indexes created for them.

Kibana for continuous reporting

We used Kibana to build our dashboards because it provides powerful and easy-to-create built-in visualizations and virtually boundless flexibility through custom Vega chart visualizations. Kibana also works well in combination with Elasticsearch indexes, thereby facilitating the role-based access management that enables us to provide individual customers access to the data in our multi-tenant dashboard.

Easy data exploration

The following illustration shows one of the standard visualizations in Kibana that we use to understand the distribution of device types and browsers used by website visitors.

Real-time histogram visualization

Illustrating the distribution of performance metrics requires using a custom visualization. The following custom Vega histogram chart uses the Largest Contentful Paint (LCP) metric to illustrate how Speed Kit improves webpage load time.

In comparison with the vanilla website where page loads are almost never faster than 2 seconds (pink area), Speed Kit-accelerated end users experience comparatively faster and even sub-second level load times (blue area).

Because our main business revolves around accelerating our customers’ websites, visualizing the actual uplift is critical for all developers (to debug performance and identify issues quickly) as well as our customers (highlighting the value of our product). With the continuous aggregation and reporting solution outlined in this post, we were able to satisfy all these requirements in a scalable and fully managed fashion.

Conclusion and future directions

In this post, we shared our journey from a high-volume batch analytics solution to a continuous aggregation pipeline using Kinesis Data Analytics for Apache Flink. Key aspects are:

  • End-to-end processing time is reduced from 24 hours to sub-minute latency.
  • We implemented a fully functional prototype within 4 weeks. The AWS Prototyping team enabled us to build our system on a multitude of managed AWS services.
  • The system was used with production load after 8 weeks.
  • The new system based on the Kinesis Data Analytics for Apache Flink application exhibits extreme scalability as it handles workloads with ease that were infeasible for the old system. As of February 2021, our system processes more than 500 million page loads from over 100 million unique users every month.
  • Elasticsearch and Kibana with customized Vega visualizations provide flexible and continuously updating dashboards for all our customers.

Additional Resources

For more details on the challenges and solutions discussed in this article, we recommend the following resources:

We would be glad to get feedback on our work, so please drop us a line in case of any remaining questions!


About the Authors

Wolfram “Wolle” Wingerath heads the data engineering team that is responsible for developing and operating Baqend’s infrastructure for analytics and reporting.

 

 

 

Florian Bücklers is Baqend’s Chief Technology Officer and therefore responsible for coordinating between the different teams for front-end and backend development, DevOps, onboarding, and data engineering.

 

Benjamin Wollmer develops data-intensive systems at Baqend, but he is also doing his PhD at the University of Hamburg and therefore likes to read and write about related topics.

 

 

Stephan Succo is one of the core developers of Baqend’s continuous analytics pipeline.

 

Jörn Domnik is a Senior Software Engineer at Baqend with a focus on backend development and reliability engineering.

 

 

 

As a DevOps engineer, Virginia Amberg monitors cluster health and keeps all systems running smoothly at Baqend.

 

 

As a Principal Prototyping Engagement Manager in AWS, Markus Bestehorn is responsible for building business-critical prototypes with AWS customers and is a specialist for IoT and machine learning.

 

 

 

As a Data Prototyping Architect in AWS, Anil Sener builds prototypes on big data analytics, data streaming, and machine learning, which accelerates the production journey on the AWS Cloud for top EMEA customers.

 

 

As B2B Strategic Account Manager for Startups at AWS, Daniel Zäeh works with customers to make their ideas come true and helps them grow, by connecting tech and business.

 

 

 

 

 

 

Building a real-time notification system with Amazon Kinesis Data Streams for Amazon DynamoDB and Amazon Kinesis Data Analytics for Apache Flink

Post Syndicated from Saurabh Shrivastava original https://aws.amazon.com/blogs/big-data/building-a-real-time-notification-system-with-amazon-kinesis-data-streams-for-amazon-dynamodb-and-amazon-kinesis-data-analytics-for-apache-flink/

Amazon DynamoDB helps you capture high-velocity data such as clickstream data to form customized user profiles and Internet of Things (IoT) data so that you can develop insights on sensor activity across various industries, including smart spaces, connected factories, smart packing, fitness monitoring, and more. It’s important to store these data points in a centralized data lake in real time, where they can be transformed, analyzed, and combined with diverse organizational datasets to derive meaningful insights and make predictions.

A popular use case in the wind energy sector is to protect wind turbines from excessive wind speeds. As per National Wind Watch, every wind turbine has a range of wind speeds, typically 30–55 mph, in which it produces maximum capacity. When wind speed is greater than 70 mph, it’s important to start shutdown to protect the turbine from a high wind storm. Customers often store high-velocity IoT data in DynamoDB and use Amazon Kinesis streaming to extract data and store it in a centralized data lake built on Amazon Simple Storage Service (Amazon S3). To facilitate this ingestion pipeline, you can deploy AWS Lambda functions or write custom code to build a bridge between DynamoDB Streams and Kinesis streaming.

Amazon Kinesis Data Streams for DynamoDB helps you publish item-level changes in any DynamoDB table to a Kinesis data stream of your choice. Additionally, you can take advantage of this feature for use cases that require longer data retention on the stream and fan out to multiple concurrent stream readers. You can also integrate with Amazon Kinesis Data Analytics or Amazon Kinesis Data Firehose to publish data to downstream destinations such as Amazon Elasticsearch Service, Amazon Redshift, or Amazon S3.

In this post, you use Kinesis Data Analytics for Apache Flink (Data Analytics for Flink) and Amazon Simple Notification Service (Amazon SNS) to send a real-time notification when wind speed is greater than 60 mph so that the operator can take action to protect the turbine. You use Kinesis Data Streams for DynamoDB and take advantage of managed streaming delivery of DynamoDB data to other AWS services without having to use Lambda or write and maintain complex code. To process DynamoDB events from Kinesis, you have multiple options: Amazon Kinesis Client Library (KCL) applications, Lambda, and Data Analytics for Flink. In this post, we showcase Data Analytics for Flink, but this is just one of many available options.

Architecture

The following architecture diagram illustrates the wind turbine protection system.

In this architecture, high-velocity wind speed data comes from the wind turbine and is stored in DynamoDB. To send an instant notification, you need to query the data in real time and send a notification when the wind speed is greater than the established maximum. To achieve this goal, you enable Kinesis Data Streams for DynamoDB, and then use Data Analytics for Flink to query real-time data in a 60-second tumbling window. This aggregated data is stored in another data stream, which triggers an email notification via Amazon SNS using Lambda when the wind speed is greater than 60 mph. You will build this entire data pipeline in a serverless manner.

Deploying the wind turbine data simulator

To replicate a real-life scenario, you need a wind turbine data simulator. We use AWS Amplify in this post to deploy a user-friendly web application that can generate the required data and store it in DynamoDB. You need a GitHub account so that you can fork the Amplify app code and deploy it in your AWS account automatically.

Complete the following steps to deploy the data simulator web application:

  1. Choose the following AWS Amplify link to launch the wind turbine data simulator web app.
  2. Choose Connect to GitHub and provide credentials, if required.
  3. In the Deploy App section, under Select service role, choose Create new role.
  4. Follow the instructions to create the role amplifyconsole-backend-role.
  5. When the role is created, choose it from the drop-down menu.
  6. Choose Save and deploy.

On the next page, the dynamodb-streaming app is ready to deploy.

  7. Choose Continue.

On the next page, you can see the app build and deployment progress, which might take as many as 10 minutes to complete.

  8. When the process is complete, choose the URL on the left to access the data generator user interface (UI).
  9. Make sure to save this URL because you will use it in later steps.

You also get an email during the build process related to your SSH key. This email indicates that the build process created an SSH key on your behalf to connect to the Amplify application with GitHub.

  10. On the sign-in page, choose Create account.
  11. Provide a user name, password, and valid email to which the app can send you a one-time passcode to access the UI.
  12. After you sign in, choose Generate data to generate wind speed data.
  13. Choose the Refresh icon to show the data in the graph.

You can generate a variety of data by changing the range of minimum and maximum speeds and the number of values.

To see the data in DynamoDB, choose the DynamoDB icon, note the table name that starts with windspeed-, and navigate to the table in the DynamoDB console.

Now that the wind speed data simulator is ready, let’s deploy the rest of the data pipeline.

Deploying the automated data pipeline by using AWS CloudFormation

You use AWS CloudFormation templates to create all the necessary resources for the data pipeline. This removes opportunities for manual error, increases efficiency, and ensures consistent configurations over time. You can view the template and code in the GitHub repository.

  1. Choose Launch with CloudFormation Console.
  2. Choose the US West (Oregon) Region (us-west-2).
  3. For pEmail, enter a valid email to which the analytics pipeline can send notifications.
  4. Choose Next.
  5. Acknowledge that the template may create AWS Identity and Access Management (IAM) resources.
  6. Choose Create stack.

This CloudFormation template creates the following resources in your AWS account:

  • An IAM role to provide a trust relationship between Kinesis and DynamoDB to replicate data from DynamoDB to the data stream
  • Two data streams:
    • An input stream to replicate data from DynamoDB
    • An output stream to store aggregated data from the Data Analytics for Flink app
  • A Lambda function
  • An SNS topic to send email notifications about high wind speeds

  7. When the stack is ready, on the Outputs tab, note the values of both data streams.

Check your email and confirm your subscription to receive notifications. Make sure to check your junk folder if you don’t see the email in your inbox.

Now you can use Kinesis Data Streams for DynamoDB, which allows you to have your data in both DynamoDB and Kinesis without having to use Lambda or write custom code.

Enabling Kinesis streaming for DynamoDB

AWS recently launched Kinesis Data Streams for DynamoDB so that you can send data from DynamoDB to Kinesis Data Streams. You can use the AWS Command Line Interface (AWS CLI) or the AWS Management Console to enable this feature.

To enable this feature from the console, complete the following steps:

  1. In the DynamoDB console, choose the table that you created earlier (it begins with the prefix windspeed-).
  2. On the Overview tab, choose Manage streaming to Kinesis.
  3. Choose your input stream.
  4. Choose Enable.
  5. Choose Close.

Make sure that Stream enabled is set to Yes.

Building the Data Analytics for Flink app for real-time data queries

As part of the CloudFormation stack, the new Data Analytics for Flink application is deployed in the configured AWS Region. When the stack is up and running, you should be able to see the new Data Analytics for Flink application in the configured Region. Choose Run to start the app.

When your app is running, you should see the following application graph.

Review the Properties section of the app, which shows you the input and output streams that the app is using.

The next section walks through the important code snippets of the Flink Java application, which show how the application reads data from a data stream, aggregates the data, and outputs it to another data stream.

Diving deep into the Flink Java application code

In the following code, createSourceFromStaticConfig provides all the wind turbine speed readings from the input stream in string format, which we pass to the WindTurbineInputMap map function. This function parses the string into the Tuple3 data type (for example, Tuple3<>(turbineID, speed, 1)). All Tuple3 messages are grouped by turbineID so that a one-minute tumbling window can be applied. The AverageReducer reduce function provides two things: the sum of all the speeds for the specific turbineId in the one-minute window, and a count of the messages for the specific turbineId in the one-minute window. The AverageMap map function takes the output of the AverageReducer reduce function and transforms it into Tuple2 (for example, Tuple2<>(turbineId, averageSpeed)). Then we keep only the turbineIds with an average speed greater than 60 and map them to a JSON-formatted message, which we send to the output stream by using the createSinkFromStaticConfig sink function.

final StreamExecutionEnvironment env =
   StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> input = createSourceFromStaticConfig(env);

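// Parse each record, drop the placeholder tuples (count 0) emitted for REMOVE events,
// and average the speed per turbine over one-minute tumbling windows.
input.map(new WindTurbineInputMap())
   .filter(v -> v.f2 > 0)
   .keyBy(0)
   .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
   .reduce(new AverageReducer())
   .map(new AverageMap())
   // Keep only turbines whose average speed exceeds 60 mph.
   .filter(v -> v.f1 > 60)
   .map(v -> "{ \"turbineID\": \"" + v.f0 + "\", \"avgSpeed\": "+ v.f1 +" }")
   .addSink(createSinkFromStaticConfig());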

env.execute("Wind Turbine Data Aggregator");

The following code demonstrates how the createSourceFromStaticConfig and createSinkFromStaticConfig functions read the input and output stream names from the properties of the Data Analytics for Flink application and establish the source and sink of the streams.

private static DataStream<String> createSourceFromStaticConfig(
   StreamExecutionEnvironment env) throws IOException {
   Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
   Properties inputProperties = new Properties();
   inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, (String) applicationProperties.get("WindTurbineEnvironment").get("region"));
   inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");

   return env.addSource(new FlinkKinesisConsumer<>((String) applicationProperties.get("WindTurbineEnvironment").get("inputStreamName"),
      new SimpleStringSchema(), inputProperties));
}

private static FlinkKinesisProducer<String> createSinkFromStaticConfig() throws IOException {
   Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
   Properties outputProperties = new Properties();
   outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, (String) applicationProperties.get("WindTurbineEnvironment").get("region"));

   FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new
      SimpleStringSchema(), outputProperties);
   sink.setDefaultStream((String) applicationProperties.get("WindTurbineEnvironment").get("outputStreamName"));
   sink.setDefaultPartition("0");
   return sink;
}

In the following code, the WindTurbineInputMap map function parses Tuple3 out of the string message. Additionally, the AverageMap map and AverageReducer reduce functions process messages to accumulate and transform data.

public static class WindTurbineInputMap implements MapFunction<String, Tuple3<String, Integer, Integer>> {
   @Override
   public Tuple3<String, Integer, Integer> map(String value) throws Exception {
      String eventName = JsonPath.read(value, "$.eventName");
      if(eventName.equals("REMOVE")) {
         return new Tuple3<>("", 0, 0);
      }
      String turbineID = JsonPath.read(value, "$.dynamodb.NewImage.deviceID.S");
      Integer speed = Integer.parseInt(JsonPath.read(value, "$.dynamodb.NewImage.value.N"));
      return new Tuple3<>(turbineID, speed, 1);
   }
}

public static class AverageMap implements MapFunction<Tuple3<String, Integer, Integer>, Tuple2<String, Integer>> {
   @Override
   public Tuple2<String, Integer> map(Tuple3<String, Integer, Integer> value) throws Exception {
      return new Tuple2<>(value.f0, (value.f1 / value.f2));
   }
}

public static class AverageReducer implements ReduceFunction<Tuple3<String, Integer, Integer>> {
   @Override
   public Tuple3<String, Integer, Integer> reduce(Tuple3<String, Integer, Integer> value1, Tuple3<String, Integer, Integer> value2) {
      // Add both speed sums and both message counts so that the result stays correct
      // even when value2 is itself a partial aggregate.
      return new Tuple3<>(value1.f0, value1.f1 + value2.f1, value1.f2 + value2.f2);
   }
}
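To follow the sum-and-count arithmetic without a Flink runtime, the following plain-Java sketch applies the same steps to the readings of a single one-minute window. It is an illustration only: the class and method names are hypothetical, int arrays of the form {sum, count} stand in for the Tuple3 fields, and the input is assumed to contain at least one reading.

```java
/** Hypothetical plain-Java walk-through of the windowed average and the 60 mph filter. */
final class WindowAverage {

    /** Combines two {sum, count} pairs by adding speed sums and message counts. */
    static int[] reduce(int[] first, int[] second) {
        return new int[] {first[0] + second[0], first[1] + second[1]};
    }

    /** Integer average of a {sum, count} pair; the fractional part is truncated. */
    static int average(int[] sumAndCount) {
        return sumAndCount[0] / sumAndCount[1];
    }

    /** Returns true when the window's average speed is greater than 60 mph. */
    static boolean shouldAlert(int[] speedsInWindow) {
        int[] accumulated = {speedsInWindow[0], 1};
        for (int i = 1; i < speedsInWindow.length; i++) {
            accumulated = reduce(accumulated, new int[] {speedsInWindow[i], 1});
        }
        return average(accumulated) > 60;
    }

    public static void main(String[] args) {
        System.out.println(shouldAlert(new int[] {55, 70, 64})); // true: 189 / 3 = 63
        System.out.println(shouldAlert(new int[] {60, 61, 60})); // false: 181 / 3 truncates to 60
    }
}
```

The second call shows a consequence of integer division: an exact average of about 60.3 mph is truncated to 60 and doesn't pass the filter. Using a floating-point type for the average would avoid this edge case.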

Receiving email notifications of high wind speed

The following screenshot shows an example of the notification email you will receive about high wind speeds.

To test the feature, in this section you generate high wind speed data from the simulator, which is stored in DynamoDB, and get an email notification when the average wind speed is greater than 60 mph for a one-minute period. You’ll observe wind data flowing through the data stream and Data Analytics for Flink.

To test this feature:

  1. Generate wind speed data in the simulator and confirm that it’s stored in DynamoDB.
  2. In the Kinesis Data Streams console, choose the input data stream, kds-ddb-blog-InputKinesisStream.
  3. On the Monitoring tab of the stream, you can observe the Get records – sum (Count) metrics, which show multiple records captured by the data stream automatically.
  4. In the Kinesis Data Analytics console, choose the Data Analytics for Flink application, kds-ddb-blog-windTurbineAggregator.
  5. On the Monitoring tab, you can see the Last Checkpoint metrics, which show multiple records captured by the Data Analytics for Flink app automatically.
  6. In the Kinesis Data Streams console, choose the output stream, kds-ddb-blog-OutputKinesisStream.
  7. On the Monitoring tab, you can see the Get records – sum (Count) metrics, which show multiple records output by the app.
  8. Finally, check your email for a notification.

If you don’t see a notification, change the data simulator value range to a minimum of 50 mph and a maximum of 90 mph, and wait a few minutes.

Conclusion

As you have learned in this post, you can build an end-to-end serverless analytics pipeline to get real-time insights from DynamoDB by using Kinesis Data Streams—all without writing any complex code. This allows your team to focus on solving business problems by getting useful insights immediately. IoT and application development have a variety of use cases for moving data quickly through an analytics pipeline, and you can make this happen by enabling Kinesis Data Streams for DynamoDB.

If this blog post helps you or inspires you to solve a problem, we would love to hear about it! The code for this solution is available in the GitHub repository for you to use and extend. Contributions are always welcome!


About the Authors

Saurabh Shrivastava is a solutions architect leader and analytics/machine learning specialist working with global systems integrators. He works with AWS partners and customers to provide them with architectural guidance for building scalable architecture in hybrid and AWS environments. He enjoys spending time with his family outdoors and traveling to new destinations to discover new cultures.

 

 

Sameer Goel is a solutions architect in Seattle who drives customers’ success by building prototypes on cutting-edge initiatives. Prior to joining AWS, Sameer graduated with a Master’s degree with a Data Science concentration from NEU Boston. He enjoys building and experimenting with creative projects and applications.

 

 

Pratik Patel is a senior technical account manager and streaming analytics specialist. He works with AWS customers and provides ongoing support and technical guidance to help plan and build solutions by using best practices, and proactively helps keep customers’ AWS environments operationally healthy.

Building an ad-to-order conversion engine with Amazon Kinesis, AWS Glue, and Amazon QuickSight

Post Syndicated from Gandhi Raketla original https://aws.amazon.com/blogs/big-data/building-an-ad-to-order-conversion-engine-with-aws-glue-amazon-kinesis-data-streams-and-amazon-quicksight/

Businesses in ecommerce have the challenge of measuring their ad-to-order conversion ratio for ads or promotional campaigns displayed on a webpage. Tracking the number of users who clicked on a particular promotional ad and the number of users who actually added items to their cart or placed an order helps measure the ad’s effectiveness. Prioritizing promotional ads that have higher conversion rates lets you make effective use of the limited space on your ecommerce websites and applications.

This post demonstrates how to sessionize and aggregate clickstream and order data, compute the conversion ratio in real time, and generate data visualizations. We use Amazon Kinesis Data Streams to ingest and send data to Amazon Simple Storage Service (Amazon S3), and AWS Glue, Amazon Athena, and Amazon QuickSight to catalog, analyze, and visualize the data, respectively.

Solution overview

To measure ad-to-order conversion, you need two important pieces of data: user clicks and orders. Clickstream data is captured each time a user clicks on the webpage while navigating the site, together with the metadata associated with those clicks. Depending on the user base and number of active users at any moment, clickstream data can amount to a large volume of data generated per second. Typically, every ecommerce system has a centralized order management system that captures orders created from different channels like a web portal or mobile app. To compute an ad-to-order conversion rate, you join clickstream data and order data over time: (total number of orders / total number of clicks) * 100.
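
As a worked example of the conversion formula, the following minimal sketch computes the percentage for one promotion. The class and method names are hypothetical, and returning 0 when there are no clicks is an assumption made here to avoid dividing by zero.

```java
// Hypothetical helper; the class and method names are illustrative only.
final class ConversionRatio {
    // Returns (orders / clicks) * 100, or 0 when there are no clicks (an assumption).
    static double percent(long orders, long clicks) {
        if (clicks <= 0) {
            return 0.0;
        }
        return (orders * 100.0) / clicks;
    }
}
```

For example, 25 orders attributed to an ad that received 200 clicks gives a conversion rate of 12.5%.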

The following diagram illustrates the architecture of our solution.

The solution has six main categories.

  • Data generators – Clickstream and order data is generated with the help of an AWS Lambda function. The function is triggered by a scheduled Amazon CloudWatch Events event every minute and generates random clicks for ingestion into a Kinesis data stream. Similarly, another function triggered by a CloudWatch event generates random orders for ingestion into a second data stream. In a production environment, this data comes from clickstream generators and a centralized order management system.
  • Data ingestion – Kinesis data streams ingest clickstream and order data as they are generated.
  • Data sessionization – Data sessionization helps group related data. For clickstream data, we can group clicks on an ad by different users or time periods. For order data, we can group orders by different ads. We use Amazon Kinesis Data Analytics for SQL to analyze streaming data in real time with standard SQL. Sessionized clickstream and order data is ingested into another in-application stream.
  • Data processing and storage – The sessionization stream from Kinesis Data Analytics for SQL is ingested into an Amazon Kinesis Data Firehose delivery stream, which delivers the data to a pre-configured S3 bucket.
  • Data Catalog – You use AWS Glue to crawl the clickstream and orders data in their respective S3 buckets, as well as build metadata definitions and tables in Athena. AWS Glue crawlers run every hour to update table definitions, and Athena views are built to compute the ad-to-order conversion.
  • Data visualization – You use QuickSight to generate visualizations.

Prerequisites

Before getting started, you must provision your resources with AWS CloudFormation. 

  1. Choose Launch Stack.
  2. Choose Next.
  3. For Stack name, enter a name for the stack.
  4. For Bucket Name for Clicks, enter the name of the S3 bucket that holds clickstream data (for this post, click-stream).
  5. For Bucket Name for Orders, enter the name of the S3 bucket that holds order data (order-stream).
  6. Enter any tags you wish to assign to the stack.
  7. Choose Next.
  8. Verify that the stack has been created successfully.

If you have never used QuickSight in this account before, sign up for QuickSight before moving on to the next step. Keep in mind that admin access to the Enterprise Edition QuickSight instance is needed to complete setup. 

Generating and ingesting clickstream data

On the Lambda console, view your function ingest-clickstream for ingesting clickstream data. The clickstream data attributes include UserId, Device, Event, EventType, and Timestamp. The event contains promotional ad information on the webpage clicked by the user. This function generates random clickstream events and ingests them into the data stream ClickStream. The following screenshot shows your function details on the console.

A CloudWatch Events rule invokes this function every minute. The following screenshot shows sample data that was ingested into the data stream. The Event column represents the portion of the webpage the user clicked; every click on the webpage has a unique ID and type assigned (for example, P601 has the event type Promotion, C301 has the event type Checkout).

Generating and ingesting order data

On the AWS Lambda console, view your function ingest-order for ingesting order data. This function ingests random orders.

Each order has order lines, which contain the attributes ItemId, Promotion, UnitPrice, and Quantity (see the following screenshot). The Promotion attribute indicates the ad the user clicked before adding the item to their shopping cart. This function generates random orders and ingests them into OrderStream. The Promotion attribute joins clickstream data and order data.

Sessionizing the data

To sessionize the data, complete the following steps:

  1. On the Kinesis Data Analytics console, select <Stack Name>-ClickStreamApplication.
  2. Choose Run.
  3. Repeat the same step for <Stack Name>-OrderAnalysisApp.
  4. When the status changes to Running, choose the application name.
  5. Under Real time analytics, choose Go to SQL results.
  6. Choose the Real-time analytics

The application groups clicks in 1-minute intervals. Let’s take the ad P701 as an example. If this ad is clicked by multiple users, this SQL function adds all the clicks by different users in the last minute. If five users clicked on P701 in the last minute, the function outputs a ClickCount of 5. A stagger window is used because it’s well-suited for analyzing groups of data that arrive at inconsistent times.

  7. On the Kinesis Data Analytics console, choose OrderAnalysisApp.
  8. Choose Go to SQL results.
    This application groups orders by Promotion, as shown in the following screenshot.

Processing and storing the data

In the data processing and storage stage, aggregated clickstream and order data is delivered to a Kinesis Data Firehose delivery stream. Kinesis Data Firehose delivers clickstream aggregated records and orders to the click-stream and order-stream buckets, respectively. The data is partitioned by year, month, day, and hour. The following screenshot shows the delivery streams on the console.
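
The time-based partitioning matches the default behavior of Kinesis Data Firehose, which prefixes S3 object keys with the UTC arrival time in YYYY/MM/dd/HH format. The following minimal sketch, assuming the delivery streams in this stack keep that default prefix, shows how an arrival time maps to a key prefix. The class and method names are hypothetical.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical illustration of the default year/month/day/hour key prefix.
final class PartitionPrefix {
    // The prefix is derived from the arrival time in UTC.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy/MM/dd/HH/").withZone(ZoneOffset.UTC);

    // Returns the key prefix under which a record arriving at this instant would land.
    static String forArrival(Instant arrival) {
        return FORMAT.format(arrival);
    }
}
```

A record arriving at 2021-03-05T14:30:00Z would therefore be stored under the prefix 2021/03/05/14/ in its bucket.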

Analyzing the data

To analyze your data, complete the following steps:

  1. Verify that the S3 bucket was created for clickstream and orders.

The data in the bucket is partitioned by year, month, date, and hour.

  2. On the AWS Glue console, view the clickstream and orders crawlers.

These two crawlers crawl the click-stream and order-stream buckets every 15 minutes and create tables.

  3. To run the crawlers on demand, choose Run crawler.

When the crawler is finished, the Tables added column displays 1.

  4. In the navigation pane, choose Tables.
  5. Verify that the crawlers created the tables.
  6. On the Athena console, choose Saved queries.

You can see three queries have been created.

  7. Select view_clicks_aggregate to load it in the query editor.
  8. Select ad_to_order_conversion and choose Run Query.

If the Amazon S3 bucket name has -, the crawler replaces - with _ while creating the table.

  9. Replace - with _ in the table name when creating the view.
  10. Repeat the same process for view_orders_aggregate and view_conversion_ratio.

Make sure you run view_clicks_aggregate and view_orders_aggregate before running view_conversion_ratio.

  11. Choose view_conversion_ratio and choose Preview.

Orders and clicks for each promotion and the corresponding conversion ratio are displayed.

Visualizing the data

To visualize your data, you first load it into QuickSight. You can then create visualizations. In this section, we also configure a scheduled data refresh.

Loading the data

To visualize your data, you must first load your data into QuickSight.

  1. On the QuickSight console, from the Admin drop-down menu, choose Manage QuickSight.
  2. In the navigation pane, choose Security & Permissions.
  3. Choose Add or remove.
  4. Select Amazon Athena.
  5. Select Amazon S3 to edit QuickSight access to your S3 buckets.
  6. Choose the Details link next to Amazon S3.
  7. Choose Select S3 buckets.
  8. Select the bucket names you provided for clicks and orders.
  9. Choose Finish.
  10. Choose Update.
  11. Choose the QuickSight icon on the top left of the admin panel to proceed back to the home screen.
  12. In the navigation pane, choose Datasets.
  13. Choose New dataset.
  14. Choose Athena.
  15. For Data source name, enter Ad-To-Order-Conversion.
  16. Choose Validate Connection.
  17. After your connection is validated, choose Create data source.
  18. For Database, choose ad-to-order-conversion.
  19. For Tables, select view_conversion_ratio.
  20. Choose Select.
  21. Choose Visualize.

Creating visualizations

In this section, we create two visualizations of our data. We first make a horizontal bar chart.

  1. From the Add menu, choose Add Calculated Field.
  2. Enter Clicks_to_Orders.
  3. Enter the formula sum(orders)/sum(clicks).
  4. Choose Save.
  5. Choose next to Click to orders.
  6. For Show as, choose Percent.
  7. For Visual type, choose Horizontal bar chart.
  8. Drag promotion to Y-axis.
  9. Drag clicks_to_orders to Value.
  10. Drag date to Group/Color.

The following screenshot shows our visualization.

We now make our second visualization, a vertical bar chart.

  1. Choose the + icon next to Sheet1.
  2. For Visual types, choose Vertical bar chart.
  3. Drag promotions to Y-axis.
  4. Drag clicks and orders to Value.

This graph displays clicks and orders for each promotion.

  5. Choose Insights on the left panel to see a summary of your insights.

Refreshing the data

We can also set up a scheduled refresh for our data.

  1. Choose Manage Data.
  2. Choose view_conversion_ratio.
  3. Choose Schedule refresh.
  4. Choose Create.
  5. For Repeats, choose Hourly.
  6. Choose Create.

You see a confirmation message that you configured a refresh one time per hour.

Conclusion

In this post, we showed you how to use AWS analytics and storage services to address business challenges that require handling large volumes of data. Kinesis Data Streams and Kinesis Data Analytics let you ingest large volumes of data and sessionize the data. We also showed you how to analyze and visualize the clickstream and order data using AWS Glue, Athena, and QuickSight.


About the Authors

Gandhi Raketla is a Senior Solutions Architect for AWS. He works with AWS customers and partners on cloud adoption, architecting solutions that help customers foster agility and innovation.

 

 

 

Nick Sack is a DevOps Consultant for AWS Professional Services. He is passionate about working with customers and building automated solutions to help customers on their cloud journeys. When not working, Nick enjoys hiking, playing soccer, reading, and learning about technology.

Building a scalable streaming data processor with Amazon Kinesis Data Streams on AWS Fargate

Post Syndicated from Florian Mair original https://aws.amazon.com/blogs/big-data/building-a-scalable-streaming-data-processor-with-amazon-kinesis-data-streams-on-aws-fargate/

Data is ubiquitous in businesses today, and the volume and speed of incoming data are constantly increasing. To derive insights from data, it’s essential to deliver it to a data lake or a data store and analyze it. Real-time or near-real-time data delivery can be cost prohibitive, so an efficient processing architecture is key, and it becomes more important as data volume and velocity grow.

In this post, we show you how to build a scalable producer and consumer application for Amazon Kinesis Data Streams running on AWS Fargate. Kinesis Data Streams is a fully managed and scalable data stream that enables you to ingest, buffer, and process data in real time. AWS Fargate is a serverless compute engine for containers that works with AWS container orchestration services like Amazon Elastic Container Service (Amazon ECS), which allows us to easily run, scale, and secure containerized applications.

This solution also uses the Amazon Kinesis Producer Library (KPL) and Amazon Kinesis Client Library (KCL) to ingest data into the stream and to process it. KPL helps you optimize shard utilization in your data stream by specifying settings for aggregation and batching as data is being produced into your data stream. KCL helps you write robust and scalable consumers that can keep up with fluctuating data volumes being sent to your data stream.

The sample code for this post is available in a GitHub repo, which also includes an AWS CloudFormation template to get you started.

What is data streaming?

Before we look into the details of data streaming architectures, let’s get started with a brief overview of data streaming. Streaming data is data that is generated continuously by a large number of sources that transmit the data records simultaneously in small packages. You can use data streaming for many use cases, such as log processing, clickstream analysis, device geo-location, social media data processing, and financial trading.

A data streaming application consists of two layers: the storage layer and the processing layer. As stream storage, AWS offers the managed services Kinesis Data Streams and Amazon Managed Streaming for Apache Kafka (Amazon MSK), but you can also run stream storage systems like Apache Kafka or Apache Flume on Amazon Elastic Compute Cloud (Amazon EC2) or Amazon EMR. The processing layer consumes the data from the storage layer and runs computations on that data. This could be an Apache Flink application running fully managed on Amazon Kinesis Data Analytics for Apache Flink, an application running stream processing frameworks like Apache Spark Streaming and Apache Storm, or a custom application using the Kinesis API or KCL. For this post, we use Kinesis Data Streams as the storage layer and the containerized KCL application on AWS Fargate as the processing layer.

Streaming data processing architecture

This section gives a brief introduction to the solution’s architecture, as shown in the following diagram.

The architecture consists of four components:

  • Producer group (data ingestion)
  • Stream storage
  • Consumer group (stream processing)
  • Kinesis Data Streams auto scaling

Data ingestion

For ingesting data into the data stream, you use the KPL, which aggregates, compresses, and batches data records to make the ingestion more efficient. In this architecture, the KPL increased the per-shard throughput up to 100 times, compared to ingesting the records with the PutRecord API (more on this in the Monitoring your stream and applications section). This is because the records are smaller than 1 KB each and the example code uses the KPL to buffer and send a collection of records in one HTTP request.

Record buffering can consume enough memory to crash the producer application; therefore, we recommend handling back-pressure. A sample on handling back-pressure is available in the KPL GitHub repo.

Not every use case is suited for using the KPL for ingestion. Due to batching and aggregation, the KPL has to buffer records, and therefore introduces some additional per-record latency. For a large number of small producers (such as mobile applications), you should use the PutRecords API to batch records or implement a proxy that handles aggregation and batching.
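
For producers that batch with the PutRecords API instead of the KPL, the batching itself can be sketched as follows. This is a minimal illustration that only groups payloads and doesn't call the AWS SDK. It relies on the PutRecords limits of 500 records and 5 MiB (including partition keys) per request. The class name and the use of a single partition key for all records are simplifying assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical batcher; it groups payloads but does not send them.
final class PutRecordsBatcher {
    static final int MAX_RECORDS = 500;            // records per PutRecords request
    static final int MAX_BYTES = 5 * 1024 * 1024;  // request size, including partition keys

    // Splits payloads into batches that respect both limits.
    // Assumes each single payload is below the per-record size limit.
    static List<List<String>> batch(List<String> payloads, String partitionKey) {
        int keyBytes = partitionKey.getBytes(StandardCharsets.UTF_8).length;
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int currentBytes = 0;
        for (String payload : payloads) {
            int size = payload.getBytes(StandardCharsets.UTF_8).length + keyBytes;
            // Start a new batch when adding this record would exceed a limit.
            if (!current.isEmpty()
                    && (current.size() == MAX_RECORDS || currentBytes + size > MAX_BYTES)) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(payload);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

Each returned batch would then be sent as one PutRecords request, so 1,200 small records result in three requests instead of 1,200 PutRecord calls.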

In this post, you set up a simple HTTP endpoint that receives data records and processes them using the KPL. The producer application runs in a Docker container, which is orchestrated by Amazon ECS on AWS Fargate. A target tracking scaling policy manages the number of parallel running data ingestion containers. It adjusts the number of running containers so you maintain an average CPU utilization of 65%.

Stream storage: Kinesis Data Streams

As mentioned earlier, you can run a variety of streaming platforms on AWS. However, for the data processor in this post, you use Kinesis Data Streams. Kinesis Data Streams is a data store that holds data for 24 hours by default, with retention configurable up to 1 year. Kinesis Data Streams is designed to be highly available and redundant by storing data across three Availability Zones in the specified Region.

The stream consists of one or more shards, which are uniquely identified sequences of data records in a stream. Each shard supports up to 2 MB/s of reads (up to five read transactions per second) and up to 1 MB/s of writes (up to 1,000 records per second). Consumers with Dedicated Throughput (Enhanced Fan-Out) support up to 2 MB/s data egress per consumer and shard.
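
The per-shard limits translate into a simple capacity estimate. The following minimal sketch assumes shared-throughput consumers (all consumers share the 2 MB/s read limit of a shard) and evenly distributed partition keys. The class and method names are hypothetical.

```java
// Hypothetical capacity estimate based on the per-shard limits.
final class ShardEstimate {
    // Returns the smallest shard count that satisfies all three limits.
    static int requiredShards(double writeMBps, double recordsPerSecond, double readMBps) {
        int forWriteBytes = (int) Math.ceil(writeMBps / 1.0);            // 1 MB/s writes per shard
        int forWriteRecords = (int) Math.ceil(recordsPerSecond / 1000.0); // 1,000 records/s per shard
        int forReads = (int) Math.ceil(readMBps / 2.0);                   // 2 MB/s reads per shard
        // A stream always has at least one shard.
        return Math.max(1, Math.max(forWriteBytes, Math.max(forWriteRecords, forReads)));
    }
}
```

For example, ingesting 3.5 MB/s at 2,000 records per second with a combined read rate of 9 MB/s needs five shards, because reads are the limiting factor.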

Each record written to Kinesis Data Streams has a partition key, which is used to group data by shard. In this example, the data stream starts with five shards. You use randomly generated partition keys for the records because records don’t have to be in a specific shard. Kinesis Data Streams assigns a sequence number to each data record, which is unique per partition key within its shard. Sequence numbers generally increase over time so you can identify which record was written to the stream before or after another.
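
Kinesis Data Streams maps each partition key to a 128-bit integer with an MD5 hash and routes the record to the shard whose hash key range contains that value. The following minimal sketch reproduces that mapping under the assumption that the shards have equal, contiguous hash key ranges, which is typical for a newly created stream but no longer holds after uneven splits or merges. The class and method names are hypothetical.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical illustration of partition key to shard mapping.
final class ShardMapper {
    // 2^128, the size of the hash key space.
    private static final BigInteger KEY_SPACE = BigInteger.ONE.shiftLeft(128);

    // Interprets the MD5 digest of the partition key as an unsigned 128-bit integer.
    static BigInteger hashKey(String partitionKey) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    // Assumes shardCount shards with equal, contiguous hash key ranges.
    static int shardIndex(String partitionKey, int shardCount) {
        BigInteger rangeSize = KEY_SPACE.divide(BigInteger.valueOf(shardCount));
        int index = hashKey(partitionKey).divide(rangeSize).intValue();
        // The last shard absorbs the remainder of the key space.
        return Math.min(index, shardCount - 1);
    }
}
```

Because MD5 output is close to uniformly distributed, random partition keys spread records roughly evenly across the five shards, which is why this post doesn't need to target specific shards.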

Stream processing: KCL application on AWS Fargate

This post shows you how to use custom consumers—specifically, enhanced fan-out consumers—using the KCL. Enhanced fan-out consumers have a dedicated throughput of 2 MB/s and use a push model instead of pull to get data. Records are pushed to the consumer from the Kinesis Data Streams shards using HTTP/2 Server Push, which also reduces the latency for record processing. If you have more than one instance of a consumer, each instance has a 2 MB/s fan-out pipe to each shard independent from any other consumers. You can use enhanced fan-out consumers with the AWS SDK or the KCL.

For the producer application, this example uses the KPL, which aggregates and batches records. For the consumer to be able to process these records, the application needs to deaggregate the records. To do this, you can use the KCL or the Kinesis Producer Library Deaggregation Modules for AWS Lambda (support for Java, Node.js, Python, and Go). The KCL is a Java library but also supports other languages via a MultiLangDaemon. The MultiLangDaemon uses STDIN and STDOUT to communicate with the record processor, so be aware of logging limitations. For this sample application, you use enhanced fan-out consumers with the KCL for Python 2.0.1.

Due to the STDOUT limitation, the record processor logs data records to a file that is written to the container logs and published to Amazon CloudWatch. If you create your own record processor, make sure it handles exceptions, otherwise records may be skipped.

The KCL creates an Amazon DynamoDB table to keep track of consumer progress. For example, if your stream has four shards and you have one consumer instance, your instance runs a separate record processor for each shard. If the consumer scales to two instances, the KCL rebalances the record processors and runs two record processors on each instance. For more information, see Using the Kinesis Client Library.
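
The target distribution of record processors can be sketched as an even split of shards across consumer instances. This is only an illustration of the end result; the KCL reaches it through shard leases tracked in the DynamoDB table, which this sketch doesn't model. The class and method names are hypothetical.

```java
// Hypothetical illustration of the balanced end state, not the KCL lease algorithm.
final class LeaseBalance {
    // Returns how many record processors each instance runs when shards are spread evenly.
    static int[] processorsPerInstance(int shardCount, int instanceCount) {
        if (instanceCount <= 0) {
            throw new IllegalArgumentException("instanceCount must be positive");
        }
        int[] counts = new int[instanceCount];
        for (int shard = 0; shard < shardCount; shard++) {
            counts[shard % instanceCount]++; // one record processor per shard
        }
        return counts;
    }
}
```

With four shards, one instance runs four record processors, and two instances run two each. With five shards and two instances, one instance runs three and the other runs two.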

A target tracking scaling policy manages the number of parallel running data processor containers. It adjusts the number of running containers to maintain an average CPU utilization of 65%.

Container configuration

The base layer of the container is Amazon Linux 2 with Python 3 and Java 8. Although you use KCL for Python, you need Java because the record processor communicates with the MultiLangDaemon of the KCL.

During the Docker image build, the Python library for the KCL (version 2.0.1 of amazon_kclpy) is installed, and the sample application (release 2.0.1) from the KCL for Python GitHub repo is cloned. This allows you to use helper tools (samples/amazon_kclpy_helper.py) so you can focus on developing the record processor. The KCL is configured via a properties file (record_processor.properties).

For logging, you have to distinguish between logging of the MultiLangDaemon and the record processor. The logging configuration for the MultiLangDaemon is specified in logback.xml, whereas the record processor has its own logger. The record processor logs to a file and not to STDOUT, because the MultiLangDaemon uses STDOUT for communication and would otherwise throw an unrecognized messages error.

Logs written to a file (app/logs/record_processor.log) are attached to container logs by a subprocess that runs in the container entry point script (run.sh). The starting script also runs set_properties_py, which uses environment variables to set the AWS Region, stream name, and application name dynamically. If you want to also change other properties, you can extend this script.

The container gets its permissions (such as to read from Kinesis Data Streams and write to DynamoDB) by assuming the role ECSTaskConsumerRole01. This sample deployment uses 2 vCPU and 4 GB memory to run the container.

Kinesis capacity management

When changes in the rate of data flow occur, you may have to increase or decrease the capacity. With Kinesis Data Streams, you can have one or more hot shards as a result of unevenly distributed partition keys, very similar to a hot key in a database. This means that a certain shard receives more traffic than others, and if it’s overloaded, it produces a ProvisionedThroughputExceededException (enable detailed monitoring to see that metric on shard level).

You need to split these hot shards to increase throughput, and merge cold shards to increase efficiency. For this post, you use random partition keys (and therefore random shard assignment) for the records, so we don’t dive deeper into splitting and merging specific shards. Instead, we show how to increase and decrease throughput capacity for the whole stream. For more information about scaling on a shard level, see Strategies for Resharding.

You can build your own scaling application utilizing the UpdateShardCount, SplitShard, and MergeShards APIs or use the custom resource scaling solution as described in Scale Amazon Kinesis Data Streams with AWS Application Auto Scaling or Amazon Kinesis Scaling Utils. The Application Auto Scaling solution is an event-driven scaling architecture based on CloudWatch alarms, whereas the Kinesis Scaling Utils run as a Docker container that constantly monitors your data stream. Application Auto Scaling manages only the number of shards, whereas the Kinesis Scaling Utils additionally handle shard keyspace allocations, hot shard splitting, and cold shard merging. For this solution, you use the Kinesis Scaling Utils and deploy it on Amazon ECS. You can also deploy it on AWS Elastic Beanstalk as a container or on an Apache Tomcat platform.

Prerequisites

For this walkthrough, you must have an AWS account.

Solution overview

In this post, we walk through the following steps:

  1. Deploying the CloudFormation template.
  2. Sending records to Kinesis Data Streams.
  3. Monitoring your stream and applications.

Deploying the CloudFormation template

Deploy the CloudFormation stack by choosing Launch Stack:

The template launches in the US East (N. Virginia) Region by default. To launch it in a different Region, use the Region selector in the console navigation bar. The following Regions are supported:

  • US East (Ohio)
  • US West (N. California)
  • US West (Oregon)
  • Asia Pacific (Singapore)
  • Asia Pacific (Sydney)
  • Europe (Frankfurt)
  • Europe (Ireland)

Alternatively, you can download the CloudFormation template and deploy it manually. When asked to provide an IPv4 CIDR range, enter the CIDR range that can send records to your application. You can change it later by adapting the security group’s inbound rule for the Application Load Balancer.

Sending records to Kinesis Data Streams

You have several options to send records to Kinesis Data Streams. You can do it from the CLI or any API client that can send REST requests, or use a load testing solution like Distributed Load Testing on AWS or Artillery. With load testing, additional charges for requests occur; as a guideline, 10,000 requests per second for 10 minutes generate an AWS bill of less than $5.00. To do a POST request via curl, run the following command and replace ALB_ENDPOINT with the DNS record of your Application Load Balancer. You can find it on the CloudFormation stack’s Outputs tab. Ensure you have a JSON element “data”. Otherwise, the application can’t process the record.

curl --location --request POST '<ALB_ENDPOINT>' --header 'Content-Type: application/json' --data-raw '{"data":" This is a testing record"}'
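
If you prefer to send the record from application code, the same request can be built with the HTTP client that ships with Java 11 and later. This is a minimal sketch: the class and method names are hypothetical, and the JSON escaping covers only quotes and backslashes, so use a JSON library for arbitrary payloads.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper that builds the same POST request as the curl command.
final class RecordRequest {
    // Wraps the payload in the {"data": "..."} envelope the producer expects.
    // Only backslashes and double quotes are escaped (a simplification).
    static String toJson(String payload) {
        String escaped = payload.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"data\":\"" + escaped + "\"}";
    }

    // albEndpoint is the DNS record of your Application Load Balancer, including the scheme.
    static HttpRequest build(String albEndpoint, String payload) {
        return HttpRequest.newBuilder(URI.create(albEndpoint))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(toJson(payload)))
                .build();
    }
}
```

To send the request, pass it to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) and check the status code of the response.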

Your Application Load Balancer is the entry point for your data records, so all traffic has to pass through it. Application Load Balancers automatically scale to the appropriate size based on traffic by adding or removing different sized load balancer nodes.

Monitoring your stream and applications

The CloudFormation template creates a CloudWatch dashboard. You can find it on the CloudWatch console or by choosing the link on the stack’s Outputs tab on the CloudFormation console. The following screenshot shows the dashboard.

This dashboard shows metrics for the producer, consumer, and stream. The metric Consumer Behind Latest gives you the offset between current time and when the last record was written to the stream. An increase in this metric means that your consumer application can’t keep up with the rate at which records are ingested. For more information, see Consumer Record Processing Falling Behind.

The dashboard also shows you the average CPU utilization for the consumer and producer applications, the number of PutRecords API calls to ingest data into Kinesis Data Streams, and how many user records are ingested.

Without using the KPL, you would see one PutRecord equals one user record, but in our architecture, you should see a significantly higher number of user records than PutRecords. The ratio between UserRecords and PutRecords operations strongly depends on KPL configuration parameters. For example, if you increase the value of RecordMaxBufferedTime, data records are buffered longer at the producer, more records can be aggregated, but the latency for ingestion is increased.

All three applications (including the Kinesis Data Streams scaler) publish logs to their respective log group (for example, ecs/kinesis-data-processor-producer) in CloudWatch. You can either check the CloudWatch logs of the Auto Scaling Application or the data stream metrics to see the scaling behavior of Kinesis Data Streams.

Cleaning up

To avoid additional cost, ensure that the provisioned resources are decommissioned. To do that, delete the images in the Amazon Elastic Container Registry (Amazon ECR) repository, the CloudFormation stack, and any remaining resources that the CloudFormation stack didn’t automatically delete. Additionally, delete the DynamoDB table DataProcessorConsumer, which the KCL created.

Conclusion

In this post, you saw how to run the KCL for Python on AWS Fargate to consume data from Kinesis Data Streams. The post also showed you how to scale the data production layer (KPL), data storage layer (Kinesis Data Streams), and the stream processing layer (KCL). You can build your own data streaming solution by deploying the sample code from the GitHub repo. To get started with Kinesis Data Streams, see Getting Started with Amazon Kinesis Data Streams.


About the Author

Florian Mair is a Solutions Architect at AWS. He is a technologist who helps customers in Germany succeed and innovate by solving business challenges using AWS Cloud services. Besides working as a Solutions Architect, Florian is a passionate mountaineer, and has climbed some of the highest mountains across Europe.

Best practices for consuming Amazon Kinesis Data Streams using AWS Lambda

Post Syndicated from Dylan Qu original https://aws.amazon.com/blogs/big-data/best-practices-for-consuming-amazon-kinesis-data-streams-using-aws-lambda/

Many organizations are processing and analyzing clickstream data in real time from customer-facing applications to look for new business opportunities and identify security incidents. A common practice is to consolidate and enrich logs from applications and servers in real time to proactively identify and resolve failure scenarios and significantly reduce application downtime. The Internet of Things (IoT) is also driving more adoption of real-time data processing. For example, a connected factory, connected cars, and smart spaces enable seamless sharing of information between people, machines, and sensors.

To help ingest real-time data or streaming data at large scales, you can use Amazon Kinesis Data Streams. Kinesis Data Streams can continuously capture gigabytes of data per second from hundreds of thousands of sources. The data collected is available in milliseconds, enabling real-time analytics. You can use an AWS Lambda function to process records in a Kinesis data stream.

This post discusses common use cases for Lambda stream processing and describes how to optimize the integration between Kinesis Data Streams and Lambda at high throughput with low system overhead and processing latencies.

Using Lambda to process a Kinesis data stream

Before diving into best practices, we discuss good use cases for Lambda stream processing and anti-patterns.

When to use Lambda for Kinesis data stream processing

Lambda integrates natively with Kinesis Data Streams. The polling, checkpointing, and error handling complexities are abstracted when you use this native integration. This allows the Lambda function code to focus on business logic processing. For example, one application can take in IP addresses from the streaming records and enrich them with geographic fields. Another application can take in all system logs from the stream and filter out non-critical ones. Another common use case is to take in text-based system logs and transform them into JSON format.

One key pattern the previous examples share is that the transformation works on a per-record basis. You can still receive batches of records, but the transformation of the records happens individually.
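
The per-record pattern can be sketched as a Lambda handler. This is a minimal sketch rather than code from the post: the log line format (`LEVEL message`) and the `to_json` helper are assumptions made for illustration. The event shape (`Records[].kinesis.data`, base64-encoded) is the one Lambda passes to functions for Kinesis event sources.

```python
import base64
import json


def to_json(line):
    """Turn a text log line such as 'ERROR disk full' into a dict (hypothetical format)."""
    level, _, message = line.partition(" ")
    return {"level": level, "message": message}


def handler(event, context=None):
    """Decode each Kinesis record in the batch and transform it individually."""
    results = []
    for record in event["Records"]:
        # Kinesis record payloads arrive base64-encoded.
        line = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")
        results.append(json.dumps(to_json(line)))
    return results
```

The function receives a whole batch, but no record depends on any other record in it, which is what makes this kind of workload a good fit for Lambda.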

When not to use Lambda for Kinesis data stream processing

By default, Lambda invokes one function instance per Kinesis shard. Lambda invokes your function as soon as it has gathered a full batch or when the batch window expires, as shown in the following diagram.

This means each Lambda invocation only holds records from one shard. Each invocation is also ephemeral, and the batch window for any invocation can be arbitrarily small. Therefore, the following use cases are challenging for Lambda stream processing:

  • Correlation of events across different shards
  • Stateful stream processing, such as windowed aggregations
  • Buffering large volumes of streaming data before writing elsewhere

For the first two use cases, consider using Amazon Kinesis Data Analytics. Kinesis Data Analytics allows you to transform and analyze streaming data in real time. You can build sophisticated streaming applications with Apache Flink. Apache Flink is an open-source framework and engine for processing data streams. Kinesis Data Analytics takes care of everything required to run streaming applications continuously, and scales automatically to match the volume and throughput of your incoming data.

For the third use case, consider using Amazon Kinesis Data Firehose. Kinesis Data Firehose is the easiest way to reliably load streaming data into data lakes, data stores, and analytics services. It can capture, transform, and deliver streaming data to Amazon Simple Storage Service (Amazon S3), Amazon Redshift, Amazon Elasticsearch Service (Amazon ES), generic HTTP endpoints, and service providers like Datadog, New Relic, MongoDB, and Splunk. Kinesis Data Firehose enables you to transform your data with Lambda before it’s loaded to data stores.

Developing a Lambda consumer with shared throughput or dedicated throughput

You can use Lambda in two different ways to consume data stream records: you can map a Lambda function to a shared-throughput consumer (standard iterator), or to a dedicated-throughput consumer with enhanced fan-out (EFO).

For standard iterators, Lambda polls each shard in your stream one time per second for records using the HTTP protocol. By default, Lambda invokes your function as soon as records are available in the stream. The invoked function instances share read throughput with other consumers of the shard. Each shard in a data stream provides 2 MB/second of read throughput. You can increase stream throughput by adding more shards. When it comes to latency, the Kinesis Data Streams GetRecords API has a limit of five reads per second per shard. This means you can achieve 200-millisecond data retrieval latency for one consumer. With more consumer applications, propagation delay increases. For example, with five consumer applications, each can only retrieve records one time per second, and each can retrieve less than 400 KB/second.
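
The arithmetic behind these figures can be written out as a short calculation. This sketch assumes the shard limits quoted above (five GetRecords calls per second and 2 MB/second per shard) are split evenly among consumers, which is a simplification. The function name is made up for illustration.

```python
SHARD_READS_PER_SECOND = 5      # GetRecords calls per second per shard
SHARD_READ_MB_PER_SECOND = 2.0  # read throughput per shard


def shared_read_budget(consumers):
    """Return (polls per second, polling interval in ms, MB/second) per consumer,
    assuming the shard limits are divided evenly."""
    if consumers < 1:
        raise ValueError("at least one consumer is required")
    polls = SHARD_READS_PER_SECOND / consumers
    return polls, 1000.0 / polls, SHARD_READ_MB_PER_SECOND / consumers


print(shared_read_budget(1))  # a single consumer on the shard
print(shared_read_budget(5))  # five consumers sharing the shard
```

With one consumer the polling interval is 200 milliseconds. With five consumers it grows to one second, and each consumer's share of the read throughput drops to a fifth of the shard's limit.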

To minimize latency and maximize read throughput, you can create a data stream consumer with enhanced fan-out. An EFO consumer gets an isolated connection to the stream that provides a 2 MB/second outbound throughput. It doesn’t impact other applications reading from the stream. Stream consumers use HTTP/2 to push records to Lambda over a long-lived connection. Records can be delivered from producers to consumers in 70 milliseconds or better (a 65% improvement) in typical scenarios.

When to use shared throughput vs. dedicated throughput (EFO)

It’s advisable to use standard consumers when there are fewer than three consuming applications and your use cases aren’t sensitive to latency. EFO is better for use cases that require low latency (70 milliseconds or better) for message delivery to consumers. This is achieved by automatically provisioning an EFO pipe per consumer, which guarantees low latency irrespective of the number of consumers linked to the shard. EFO has cost dimensions associated with it: there is an additional hourly charge per EFO consumer and a charge per GB of data retrieved through EFO.

Monitoring ongoing stream processing

Kinesis Data Streams and Amazon CloudWatch are integrated so you can collect, view, and analyze CloudWatch metrics for your streaming application. It’s a best practice to make monitoring a priority to head off small problems before they become big ones. In this section, we discuss some key metrics to monitor.

Enhanced shard-level metrics

It’s a best practice to enable shard-level metrics with Kinesis Data Streams. As the name suggests, Kinesis Data Streams sends additional shard-level metrics to CloudWatch every minute. This can help you pinpoint failing consumers for specific records or shards and identify hot shards. Enhanced shard-level metrics come with additional cost. For information about pricing, see Amazon CloudWatch pricing.

IteratorAge

Make sure you keep a close eye on the IteratorAge (GetRecords.IteratorAgeMilliseconds) metric. Age is the difference between the current time and when the last record of the GetRecords call was written to the stream. If this value spikes, data processing from the stream is delayed. If the iterator age gets beyond your retention period, the expired records are permanently lost. Use CloudWatch alarms on the Maximum statistic to alert you before this loss is a risk.

The following screenshot shows a visualization of GetRecords.IteratorAgeMilliseconds.

In a single-source, multiple-consumer use case, each Lambda consumer reports its own IteratorAge metric. This helps identify the problematic consumer for further analysis.

You can find common causes and resolutions later in this post.
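
As one way to set up the alarm on the Maximum statistic, the following sketch builds the arguments for the CloudWatch `put_metric_alarm` call. The alarm name, the 24-hour retention period, the 50% threshold, and the evaluation settings are all assumptions to adapt to your own stream. The `boto3` import is deferred so the argument builder can be inspected without AWS credentials.

```python
def iterator_age_alarm(stream_name, retention_hours=24, fraction=0.5):
    """Build put_metric_alarm arguments that fire when the iterator age
    exceeds a fraction of the stream's retention period."""
    threshold_ms = retention_hours * 3600 * 1000 * fraction
    return {
        "AlarmName": f"{stream_name}-iterator-age",  # hypothetical naming scheme
        "Namespace": "AWS/Kinesis",
        "MetricName": "GetRecords.IteratorAgeMilliseconds",
        "Dimensions": [{"Name": "StreamName", "Value": stream_name}],
        "Statistic": "Maximum",
        "Period": 60,               # assumed: evaluate one-minute data points
        "EvaluationPeriods": 5,     # assumed: five consecutive breaches
        "Threshold": threshold_ms,
        "ComparisonOperator": "GreaterThanThreshold",
    }


def create_alarm(stream_name):
    """Create the alarm in the current account and Region."""
    import boto3  # imported lazily; needs AWS credentials to run

    boto3.client("cloudwatch").put_metric_alarm(**iterator_age_alarm(stream_name))
```

Alarming at a fraction of the retention period, rather than at the retention period itself, leaves time to react before records expire.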

ReadProvisionedThroughputExceeded

The ReadProvisionedThroughputExceeded metric shows the count of GetRecords calls that have been throttled during a given time period. Use this metric to determine if your reads are being throttled due to exceeding your read throughput limits. If the Average statistic has a value other than 0, some of your consumers are throttled. You can add shards to the stream to increase throughput or use an EFO consumer to trigger your Lambda function.

Being aware of poison messages

A Lambda function is invoked for a batch of records from a shard, and it checkpoints upon the success of each batch. Either a batch is processed successfully, or the entire batch is retried until processing succeeds or the records fall off the stream based on the retention period. A poison message causes a batch to fail. This can create two possible scenarios: duplicates in the results, or delayed data processing and loss of data.

The following diagram illustrates how a poison message causes duplicates in the results. If there are 300 records in the data stream and the batch size is 200, the Lambda instance is invoked to process the first 200 records. If processing fails at the 83rd record, the entire batch is tried again, which can cause duplicates in the target for the first 82 records, depending on the target application.

The following diagram illustrates the problem of delayed data processing and data loss. If there are 300 records in the data stream and the batch size is 200, a Lambda instance is invoked to process the first 200 records. If processing keeps failing, the batch is retried until these records expire. The records are then lost, and processing of the remaining data in the stream is delayed significantly.

Addressing poison messages

There are two ways to handle failures gracefully. The first option is to implement logic in the Lambda function code that catches exceptions, logs them for offline analysis, and returns success so that the next batch can be processed. Exceptions can be logged to Amazon Simple Queue Service (Amazon SQS), CloudWatch Logs, Amazon S3, or other services.
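
A minimal sketch of this first option follows. The `process` function stands in for your business logic and is hypothetical, as is the JSON payload format. Here failures go to the standard logger, which Lambda forwards to CloudWatch Logs.

```python
import base64
import json
import logging

logger = logging.getLogger(__name__)


def process(payload):
    """Hypothetical business logic; raises on malformed input."""
    return json.loads(payload)["value"] * 2


def handler(event, context=None):
    """Process every record; log failures instead of failing the batch."""
    processed, failed = 0, 0
    for record in event["Records"]:
        try:
            process(base64.b64decode(record["kinesis"]["data"]))
            processed += 1
        except Exception:
            # Log the sequence number for offline analysis, then move on so
            # that one poison message doesn't block the rest of the shard.
            logger.exception("skipping record %s",
                             record["kinesis"].get("sequenceNumber"))
            failed += 1
    return {"processed": processed, "failed": failed}
```

Because the handler returns normally, Lambda treats the batch as successful and moves on. The trade-off is that the failed records are only recoverable from wherever you logged them.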

The second (and recommended) option is to configure the following retry and failure behaviors settings with Lambda as the consumer for Kinesis Data Streams:

  • On-failure destination – Automatically send records to an SQS queue or Amazon Simple Notification Service (Amazon SNS) topic
  • Retry attempts – Control the maximum retries per batch
  • Maximum age of record – Control the maximum age of records to process
  • Split batch on error – Split a failed batch into two smaller batches and retry each one separately, to automatically home in on poison messages
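
These four settings correspond to parameters of the Lambda event source mapping API. The following sketch builds them for the boto3 `update_event_source_mapping` call. The default values and the helper names are assumptions for illustration, not recommendations from the post, and the `boto3` import is deferred so the argument builder runs without AWS access.

```python
def failure_handling_settings(queue_arn, max_retries=3, max_age_seconds=3600):
    """Build the retry and failure arguments for a Kinesis event source mapping."""
    return {
        # On-failure destination: an SQS queue or SNS topic ARN
        "DestinationConfig": {"OnFailure": {"Destination": queue_arn}},
        # Retry attempts: maximum retries per batch
        "MaximumRetryAttempts": max_retries,
        # Maximum age of record: older records are not processed
        "MaximumRecordAgeInSeconds": max_age_seconds,
        # Split batch on error
        "BisectBatchOnFunctionError": True,
    }


def apply_settings(mapping_uuid, queue_arn):
    """Apply the settings to an existing event source mapping."""
    import boto3  # imported lazily; needs AWS credentials to run

    boto3.client("lambda").update_event_source_mapping(
        UUID=mapping_uuid, **failure_handling_settings(queue_arn)
    )
```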

Optimizing for performance

In this section, we discuss common causes for Lambda not being able to keep up with Kinesis Data Streams and how to fix it.

Lambda is hitting concurrency limit

This happens when Lambda has reached the maximum number of parallel runs within the account, which means that Lambda can’t instantiate additional instances of the function. To identify this, set up CloudWatch alarms on the Throttles metric exposed by the function. To resolve this issue, consider assigning reserved concurrency to the function.

Lambda is throttled on egress throughput of a data stream

This can happen if there are many consumers for a data stream and not enough provisioned read throughput available. To identify this, monitor the ReadProvisionedThroughputExceeded metric and set up a CloudWatch alarm. One or more of the following options can help resolve this issue:

  • Add more shards and scale the data stream
  • Reduce the batch window to process messages more frequently
  • Use a consumer with enhanced fan-out 

Business logic in Lambda is taking too long

To address this issue, consider increasing the memory assigned to the function or adding shards to the data stream to increase parallelism.

Another approach is to enable concurrent Lambda invocations by configuring Parallelization Factor, a feature that allows more than one simultaneous Lambda invocation per shard. Lambda can process up to 10 batches in each shard simultaneously. Each parallelized batch contains messages with the same partition key. This means the record processing order is still maintained at the partition-key level. The following diagram illustrates this architecture.

For more information, see New AWS Lambda scaling controls for Kinesis and DynamoDB event sources.
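
To see how the parallelization factor interacts with the shard count, the following sketch computes the upper bound on simultaneous invocations for one event source. The function name is made up for illustration, and the 1–10 range reflects the limit of 10 batches per shard mentioned above.

```python
def max_concurrent_invocations(shards, parallelization_factor=1):
    """Upper bound on simultaneous Lambda invocations for one Kinesis event source."""
    if not 1 <= parallelization_factor <= 10:
        raise ValueError("parallelization factor must be between 1 and 10")
    return shards * parallelization_factor


print(max_concurrent_invocations(4))      # default: one invocation per shard
print(max_concurrent_invocations(4, 10))  # up to 10 batches per shard
```

This bound is worth comparing against the account or reserved concurrency limit discussed earlier, because raising the factor can move the bottleneck from the stream to Lambda concurrency.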

Optimizing for cost

Kinesis Data Streams has the following cost components:

  • Shard hours
  • PUT payload units (charged in 25 KB increments for each record put into a data stream)
  • Extended data retention
  • Enhanced fan-out

One of the key components you can optimize is PUT payload units. As noted above, you’re charged for each record you put in a data stream in 25 KB increments, so if you’re sending small messages, it’s advisable to aggregate messages to optimize cost. One way to aggregate multiple small records into a larger record is to use Kinesis Producer Library (KPL) aggregation.

The following is an example of a use case with and without record aggregation:

  • Without aggregation:
    • 1,000 records per second, with record size of 512 bytes each
    • Cost is $47.74 per month in us-east-1 Region (with $36.79 PUT payload units)
  • With aggregation:
    • 10 records per second, with record size of 50 KB each
    • Cost is $11.69 per month in us-east-1 Region (with $0.74 PUT payload units)
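
The comparison above can be reproduced with a short calculation. The unit prices, the 730-hour month, and the single shard are assumptions chosen for this sketch, not figures stated in the post. Check current Kinesis Data Streams pricing before relying on them.

```python
import math

PUT_UNIT_BYTES = 25 * 1024       # one PUT payload unit covers up to 25 KB
PRICE_PER_MILLION_UNITS = 0.014  # assumed us-east-1 price in USD
PRICE_PER_SHARD_HOUR = 0.015     # assumed us-east-1 price in USD
HOURS_PER_MONTH = 730            # assumed length of a billing month


def monthly_cost(records_per_second, record_bytes, shards=1):
    """Return (PUT payload cost, total cost) per month in USD."""
    # Each record is rounded up to a whole number of 25 KB units.
    units_per_record = math.ceil(record_bytes / PUT_UNIT_BYTES)
    units = records_per_second * units_per_record * 3600 * HOURS_PER_MONTH
    put_cost = units / 1_000_000 * PRICE_PER_MILLION_UNITS
    return put_cost, put_cost + shards * PRICE_PER_SHARD_HOUR * HOURS_PER_MONTH


for label, rate, size in [("without aggregation", 1000, 512),
                          ("with aggregation", 10, 50 * 1024)]:
    put_cost, total = monthly_cost(rate, size)
    print(f"{label}: PUT ${put_cost:.2f}, total ${total:.2f}")
```

Under these assumptions the two scenarios differ only in the PUT payload cost: a 512-byte record is billed as a full 25 KB unit, so a thousand small records per second cost fifty times as many units as ten 50 KB records carrying about the same data.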

Another way to optimize cost is to increase the batch window, which lets Lambda gather more records per invocation and so reduces the number of invocations.

Conclusion

In this post, we covered the following aspects of Kinesis Data Streams processing with Lambda:

  • Suitable use cases for Lambda stream processing
  • Shared throughput consumers vs. dedicated-throughput consumers (enhanced fan-out)
  • Monitoring
  • Error handling
  • Performance tuning
  • Cost-optimization

To learn more about Amazon Kinesis, see Getting Started with Amazon Kinesis. If you have questions or suggestions, please leave a comment.


About the Authors

Dylan Qu is an AWS solutions architect responsible for providing architectural guidance across the full AWS stack with a focus on Data Analytics, AI/ML and DevOps.

Vishwa Gupta is a Data and ML Engineer with AWS Professional Services Intelligence Practice. He helps customers implement big data and analytics solutions. Outside of work, he enjoys spending time with family, traveling, and playing badminton.

Detect change points in your event data stream using Amazon Kinesis Data Streams, Amazon DynamoDB and AWS Lambda

Post Syndicated from Marco Guerriero original https://aws.amazon.com/blogs/big-data/detect-change-points-in-your-event-data-stream-using-amazon-kinesis-data-streams-amazon-dynamodb-and-aws-lambda/

The success of many modern streaming applications depends on the ability to sequentially detect each change as soon as possible after it occurs, while continuing to monitor the data stream as it evolves. Applications of change point detection range across genomics, marketing, and finance, to name a few. In genomics, change point detection can help identify genes that are damaged. In marketing, we can identify things like customer churn in real time or when customer engagement changes over time. This is very useful in areas like online retail where, if change detection is implemented, we can adapt much more quickly to customer behavior. In finance, we can detect moments in time when stock prices have significantly changed. As opposed to offline batch methodologies that take a batch of historical data and look for the change points in that data, we use a streaming procedure where we detect change points as fast as possible in real time as new data comes in.

In this post, we demonstrate automated event detection (AED), a fully automated, non-parametric, multiple change point detection algorithm that can operate on massive streaming data with low computational complexity, quickly reacting to the changes. Quick change detection [1] can help a system raise a timely alarm. Quickly reacting to a sudden fault arising in an industrial process or a production process could lead to significant savings in unplanned downtime. Detecting the onset of the outbreak of a disease, or the effect of a bio-terrorist attack, is critical, both for effective initiation of public health intervention measures and for alerting government agencies and the general public in a timely manner.

In AED, no statistical assumption (such as Gaussian) is made on the generative process of the time series or data streams being processed. We rely on a class of tests called non-parametric or distribution-free tests [2, 3, 4, 5]. These tests are the natural choice for performing change point detection on data streams with unknown statistical distribution, which represents a common scenario that applies to a wide variety of real-world processes. We demonstrate a Python implementation of AED embedded within AWS Lambda over a data stream processed and managed by Amazon Kinesis Data Streams.

Understanding event detection

Let’s talk for a moment about anomaly detection, data drift, and change point detection in time series.

In machine learning (ML), we hear a lot about anomaly detection. In the context of a time series, that often means a data point that is outside the expected range. That range may have a static definition, such as “we never expect the temperature to exceed 130F,” or a dynamic definition, such as “it should not vary from recent averages by more than three standard deviations.” An example of a dynamic anomaly detection algorithm is Random Cut Forest.

In contrast, data drift describes a slower shift in the stream’s data distribution. It may mean a shift in the mean of a variable, such as a shift in temperatures between summer and winter, or a change in behavior of visitors to a website as certain product categories fall out of favor and others become fashionable. A frequently recommended approach for dealing with data drift is frequent retraining of an ML model, to keep recommendations fresh and relevant based on recent data.

The events AED focuses on are in a different category altogether: sudden shifts in the data stream to a new level, sometimes called regime change. These are occasions where human intervention may be desired to see whether automated responses are appropriate. For example, a slow shift in readings from a temperature sensor may be normal, whereas a sudden large change may be a sign of a major malfunction in progress. Other data streams that constantly fluctuate but where a sudden large change may require oversight include equipment monitoring and malfunctions, cyber attacks, stock market movements, power grids and outages, and operational variations in manufacturing processes, to name a few [1].

Our goal is to automatically detect multiple events when the time series or the data streams change their behavior. In this version, we focus on sudden changes in mean or offset in time series. AED is an unsupervised, automatic, statistically rigorous, real-time procedure for detecting relevant events, which doesn’t require any training data. It can also be used as an automatic process for feature extraction (such as time series segmentation) that can be used in downstream ML systems.

Automated event detection

AED operates sequentially as data comes in, continuously performing a novel statistical test to identify a change event, that is, a significant shift in the data from the data that preceded that point in time. If an event is detected at time t_i, all the data prior to t_i is discarded. AED is then reset to process new data points starting with the (t_i + 1)th data point, looking for a new change event. This procedure is repeated sequentially until no more data points are available.
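
The detect-and-reset loop can be sketched independently of the statistical test. In the following sketch, `run_sequential` accepts any detector function. The `toy_detect` detector, which flags a point that jumps away from the running mean, is a hypothetical stand-in and is not the test AED uses.

```python
def run_sequential(stream, detect):
    """Feed points one at a time; when `detect` reports a change at window
    index t, record it, discard everything up to t, and start over."""
    window, offset, events = [], 0, []
    for x in stream:
        window.append(x)
        t = detect(window)
        if t is not None:
            events.append(offset + t)  # position in the full stream
            offset += t + 1            # processing resumes at point t + 1
            window = window[t + 1:]
    return events


def toy_detect(window, jump=5.0, warmup=3):
    """Hypothetical detector: flag the newest point if it deviates from the
    mean of the earlier points in the window by more than `jump`."""
    if len(window) <= warmup:
        return None
    prior = window[:-1]
    baseline = sum(prior) / len(prior)
    return len(window) - 1 if abs(window[-1] - baseline) > jump else None


series = [0, 0, 0, 0, 10, 10, 10, 10, 10, 0, 0, 0, 0, 0]
print(run_sequential(series, toy_detect))
```

Because the window is emptied after each detection, the amount of data the detector sees is bounded by the distance between consecutive change points rather than by the length of the whole stream.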

The core of AED is a statistical test called the Modified Practical Pettitt (MPP) test. The MPP test is a variant of the Pettitt test [6], a powerful and robust non-parametric statistical procedure for detecting a single change point in data streams where the distribution is completely unknown. Pettitt’s test is based on Null Hypothesis Significance Testing (NHST), which comes with some constraints and fallacies. In particular, an important problem with NHST is that the result of the test doesn’t provide information about the magnitude of the effect of the event, which is key information we’re interested in. A consequence of this limitation is that an event (change point) can be detected despite a very small effect. But is an increase of 0.001 degrees Celsius in temperature data streams coming from some IoT devices, or a decrease of $1 in a particular stock price, a noteworthy event? Statistical significance doesn’t tell us anything about practical relevance. The MPP test solves this problem by incorporating a practical significance threshold into the definition of the decision test statistic. In this way, only events that are both statistically and practically significant are detected and reported by AED.

The decision statistic of the MPP test can be computed recursively, resulting in a computational complexity that is linear in the number of data points processed by AED up to the reset time.
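
The post doesn't give the MPP statistic itself, so the following sketch shows the classic Pettitt test [6] instead, with a separate practical significance check applied afterwards. That after-the-fact check, the use of the mean offset as the effect size, and the function names are assumptions for illustration. The MPP test builds the threshold into the statistic, and this sketch doesn't reproduce its linear-time recursion.

```python
import math


def sign(v):
    return (v > 0) - (v < 0)


def pettitt(x):
    """Classic Pettitt test. Returns (t, K, p): the split index t (x[:t] vs.
    x[t:]), the statistic K = max |U_t|, and the approximate p-value."""
    n = len(x)
    if n < 2:
        raise ValueError("need at least two points")
    u, best_t, k = 0, 1, -1
    for t in range(n - 1):
        # Recursive update: U_t = U_{t-1} + sum_j sign(x_t - x_j)
        u += sum(sign(x[t] - xj) for xj in x)
        if abs(u) > k:
            best_t, k = t + 1, abs(u)
    p = min(1.0, 2.0 * math.exp(-6.0 * k * k / (n ** 3 + n ** 2)))
    return best_t, k, p


def detect_change(x, alpha=0.01, practical_threshold=0.0):
    """Report a change only if it is statistically and practically significant."""
    t, _, p = pettitt(x)
    shift = sum(x[t:]) / len(x[t:]) - sum(x[:t]) / t  # offset between segment means
    if p < alpha and abs(shift) >= practical_threshold:
        return {"index": t, "p_value": p, "offset": shift}
    return None
```

For example, `detect_change([0.0] * 20 + [10.0] * 20)` reports a change at index 20 with an offset of 10. The same call with `practical_threshold=20.0` returns `None`, because the shift is statistically significant but smaller than the threshold.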

Prerequisites

To get started, we need the following:

  • A continuous or discrete time series (data stream). No knowledge of the distribution of the data is needed.
  • Some guidance for the algorithm in terms of when to alert us:
    • The statistical confidence level we require that an event has occurred (p-value).
    • The practical significance level we require (our practical significance threshold).

The output of AED is a list of change points specified by their time of occurrence, their p-value, and the magnitude of the offset.

Solution overview

The following diagram illustrates the AWS services used to implement the solution.

The steps in the process are as follows:

  1. One or more programs, devices, or sensors generate events and place them into a Kinesis data stream. These events make up a time series. In our case, we provide a sample generator function.
  2. The event recorder Lambda function consumes records from the data stream.
  3. The Lambda function stores them in an Amazon DynamoDB events table.
  4. The DynamoDB table streams the inserted events to the event detection Lambda function.
  5. The Lambda function checks each event to see whether this is a change point. To do so, it performs the following actions:
    • Reads the last change point recorded from the DynamoDB change points table (or creates one if this is the first data point for this device).
    • Reads the prior events since the last change point from the events table, as a time series.
    • Runs the event-detection algorithm over the retrieved time series.
  6. If a new change point is detected, the function does the following:
    • Writes the change point into the DynamoDB change points table, for later use.
    • Sends an Amazon Simple Notification Service (Amazon SNS) message to a topic with the change point details.
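
Step 4 delivers inserted items to the event detection function as DynamoDB stream records. The following sketch shows how such a function might extract the new events before running detection. The attribute names (`device_id`, `timestamp`, `value`) are assumptions, because the post doesn't list the table schema.

```python
def new_events(stream_event):
    """Extract (device_id, timestamp, value) tuples from the INSERT records
    of a DynamoDB stream event."""
    events = []
    for record in stream_event["Records"]:
        if record["eventName"] != "INSERT":
            continue  # ignore MODIFY and REMOVE records
        image = record["dynamodb"]["NewImage"]
        events.append((
            image["device_id"]["S"],       # hypothetical attribute names
            int(image["timestamp"]["N"]),  # numbers arrive as strings
            float(image["value"]["N"]),
        ))
    return events
```

Each extracted event can then be checked as described in step 5: load the last change point for the device, read the events since then, and run the detection algorithm over that series.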

Setting up the solution infrastructure

To set up the infrastructure components and configure the connections, launch a predefined AWS CloudFormation stack. The stack sets up the following resources:

  • A Kinesis data stream.
  • Two Lambda functions: the event recorder, and the event detection. Each function has an associated AWS Identity and Access Management (IAM) role.
  • Two DynamoDB tables: one to hold events, and one for detected change points.
  • An SNS topic and a subscription, for notifying that a change point has been detected.

To see this solution in operation in the US East (N. Virginia) Region, launch the provided CloudFormation stack. The total solution costs approximately $0.02 per hour to run, depending on the volume of data sent to the components. Remember to delete the CloudFormation stack when you’re finished with the solution to avoid additional charges.

To run the stack, complete the following steps:

  1. Choose Launch Stack.
  2. Choose Next.
  3. Update the email address for notifications to be a valid email address of your choice.
  4. Update the following parameters for your environment if necessary, or, for now, leave the defaults:
    1. Environment parameters – Names for your DynamoDB tables, Kinesis stream, and SNS topic
    2. AED parameters – Startup, alpha level, change difference window, and practical significance threshold
  5. Choose Next.
  6. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  7. Choose Create stack.
  8. Wait for the CloudFormation stack to reach a status of CREATE_COMPLETE (around 5 minutes).
  9. Check the Outputs tab for the resources created.

If you want to receive the SNS messages for detected change points, remember to confirm your SNS subscription for the email address.

Generating data

After AWS CloudFormation creates the system resources for us, we’re ready to start generating some data. For this post, we provide a client that generates a time series with periodic regime shifts. For simplicity and to show generality, the client also runs AED over the time series it creates. At the end, it generates a time series plot of the generated data along with the detected change points. These should match the SNS notifications you receive.

  1. To download and set up the test clients, copy the following files to your local desktop:
    1. time_series_client.py
    2. time_series_validation_client.py
    3. AmazonAED.py
  2. Install boto3, numpy, and matplotlib, if you don’t already have them installed.

Two test clients are provided:

  • The time_series_client.py generates data and puts it into Kinesis Data Streams; this is what your device does.
  • For easier demonstration, we also provide a time_series_validation_client. This version also runs AED locally and generates an annotated plot showing the input data, where the change points are detected, and where the change actually occurred.
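
To give a feel for what such a client does, the following sketch generates a noisy series with periodic level shifts and builds the arguments for the Kinesis `put_record` call. It isn't the code from `time_series_client.py`. The generator parameters, the record layout, and the function names are all assumptions.

```python
import json
import random


def generate_series(n, shift_every=50, shift_size=10.0, noise=1.0, seed=0):
    """Return n (value, is_change_point) pairs with a level shift every
    `shift_every` points and Gaussian noise around the current level."""
    rng = random.Random(seed)
    level, series = 0.0, []
    for i in range(n):
        changed = i > 0 and i % shift_every == 0
        if changed:
            level += rng.choice([-1, 1]) * shift_size
        series.append((level + rng.gauss(0.0, noise), changed))
    return series


def to_put_record_args(stream_name, device_id, index, value):
    """Build keyword arguments for the Kinesis put_record call."""
    payload = {"device_id": device_id, "t": index, "value": value}  # assumed layout
    return {
        "StreamName": stream_name,
        "Data": json.dumps(payload).encode("utf-8"),
        "PartitionKey": device_id,  # keeps one device's events on one shard
    }
```

With boto3 installed and credentials configured, each record could be sent with `boto3.client("kinesis").put_record(**to_put_record_args(...))`.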

To run the validation client, open a command prompt to the folder where you copied the client files and run the following code:

python time_series_validation_client.py -r us-east-1 -s aed_events_data_stream

You can pass the -h parameter to find out the other parameters. By default, the client generates 1 minute’s worth of data. The parameters let you configure the client for your environment, and, when running the validation client, for the local AED detection (which is independent of that being performed in the Lambda function).

You can use four parameters to configure AED’s behavior to match your environment:

  • “-a”, “--alpha” – The AED alpha level, that is, the smallest change to identify. The default is 0.01.
  • “-w”, “--window” – The AED change difference window, the number of points around the change to ignore. The default is 5.
  • “-x”, “--startup” – The number of samples to include in the AED initialization stage. The default is 5.
  • “-p”, “--practicalthreshold” – The AED practical significance threshold. Changes smaller than this are not designated as change points. The default is 0.
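
For reference, the options above could be declared with `argparse` roughly as follows. This is a sketch of how such a command line might be defined, not the client's actual source. The defaults for `-r` and `-s` and the value types are assumptions. The four AED options use the names and defaults listed above.

```python
import argparse


def build_parser():
    """Declare the Region, stream, and AED options of the validation client."""
    parser = argparse.ArgumentParser(description="AED validation client options (sketch)")
    parser.add_argument("-r", dest="region", default="us-east-1",
                        help="AWS Region (assumed default)")
    parser.add_argument("-s", dest="stream", default="aed_events_data_stream",
                        help="Kinesis data stream name (assumed default)")
    parser.add_argument("-a", "--alpha", type=float, default=0.01,
                        help="AED alpha level")
    parser.add_argument("-w", "--window", type=int, default=5,
                        help="AED change difference window")
    parser.add_argument("-x", "--startup", type=int, default=5,
                        help="samples in the AED initialization stage")
    parser.add_argument("-p", "--practicalthreshold", type=float, default=0.0,
                        help="AED practical significance threshold")
    return parser


if __name__ == "__main__":
    print(build_parser().parse_args())
```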

At the end of the time period, the client displays an image of the generated time series. The following time series is annotated with a small blue triangle whenever the client created a change point. The small red triangles mark the points where the AED algorithm, running independently over the generated data, detected the change points. The blue triangle always precedes the red triangle in our DetectBeforeDecide framework, where the AED algorithm has to first detect that something is changing (red triangle) and then work backwards to decide when in time the change most likely occurred (blue triangle).

The turquoise dot shows where the data generation was restarted; this change point may not be detected because AED isn’t running at that time. As you can see, each generated change point was detected. At the same time, the significant variation between the change points—in essence, within a single regime—didn’t cause a spurious change point detection.

The practical significance threshold additionally allows you to specify when the change is small enough that it’s not relevant and should be ignored.

You can also review the contents of the DynamoDB tables on the AWS Management Console (see the following screenshot). The table aed_stream_data shows each event in the stream logged. The aed_change_points table shows the individual change points detected, along with the timestamp of the detection and the data point at the time the change was detected by the Lambda function running over the stream data. This data lets you construct a history of the event stream and the change points, and manipulate them according to your need.

Each Lambda function is started by the Kinesis stream; it checks to see when the last change point occurred. It then retrieves the data since that change point from the aed_stream_data table and reprocesses it, looking for a new change point. As the gaps between change points get very large, this may become a large sequence to retrieve and process. If this is your use case, you may wish to artificially create a new set point every so often to reduce the amount of data that must be reread.

Cleaning up

To avoid additional charges, you should delete the CloudFormation stack after walking through the solution.

To implement AED in a different Region or to adapt it to your own needs, download the aed.zip file.

Conclusion

In this post, we introduced automated event detection (AED), a new, fast, scalable, fully automated, non-parametric, multiple change point detection service that can operate on massive streaming data with low computational complexity. AED efficiently identifies shifts in the data stream that have both statistical and practical significance. This ability to locate the time and magnitude of a shift in the data stream, and to differentiate it from normal data fluctuations, is key in many applications. The appropriate action varies by use case: it may be appropriate to take automated action, or call a human to evaluate whether the preplanned actions should be taken. A natural extension of AED would be to add an algorithm to detect trends (gradual changes as opposed to sudden shifts) in data streams using non-parametric tests such as Mann-Kendall [7, 8]. Combining these functions allows you to identify both kinds of changes in your incoming data stream and distinguish between them, further broadening the use cases.

In this post, we demonstrated a Python implementation of AED embedded within Lambda over a data stream processed and managed by Kinesis Data Streams. We also provided a standalone client implementation to show an alternate implementation. The code is available for you to download and integrate into your applications. Do you have any data streams where sudden shifts may happen? Do you want to know when they happen? Are there automated actions that should be taken, or should someone be alerted? If your answer to any of these questions is “Yes!” then consider implementing AED.

If you have any comments about this post, submit them in the comments section.

References

[1] Poor, H. V., and O. Hadjiliadis (2009). Quickest detection. Cambridge University Press.

[2] Brodsky, E., and Boris S. Darkhovsky (2013). Nonparametric methods in change point problems. Vol. 243. Springer Science & Business Media.

[3] Csörgö, Miklós, and Lajos Horváth (1997). Limit theorems in change-point analysis. Vol. 18. John Wiley & Sons Inc.

[4] Ross, G. J., D. K. Tasoulis, and N. M. Adams (2011). Nonparametric Monitoring of Data Streams for Changes in Location and Scale. Technometrics, 53(4), 379–389.

[5] Chu, Lynna, and Hao Chen (2019). Asymptotic distribution-free change-point detection for multivariate and non-Euclidean data. The Annals of Statistics, 47(1), 382–414.

[6] Pettitt, A. N. (1979). A Non-Parametric Approach to the Change-Point Problem. Journal of the Royal Statistical Society C, 28(2), 126–135.

[7] Mann, H. B. (1945). Nonparametric tests against trend. Econometrica, 13, 245–259.

[8] Kendall, M. G. (1975). Rank Correlation Methods. Griffin, London, UK.


About the Authors

Marco Guerriero, PhD, is a Practice Manager for Emergent Technologies and Intelligence Platform for AWS Professional Services. He loves working on ways for emergent technologies such as AI/ML, Big Data, IoT, and Quantum to help businesses across different industry verticals succeed within their innovation journey.

Veronika Megler, PhD, is Principal Data Scientist for Amazon.com Customer Packaging Experience. Until recently, she was the Principal Data Scientist for AWS Professional Services. She enjoys adapting innovative big data, AI, and ML technologies to help companies solve new problems, and to solve old problems more efficiently and effectively. Her work has lately been focused more heavily on economic impacts of ML models and exploring causality.

Rapid and flexible Infrastructure as Code using the AWS CDK with AWS Solutions Constructs

Post Syndicated from Biff Gaut original https://aws.amazon.com/blogs/devops/rapid-flexible-infrastructure-with-solutions-constructs-cdk/

Introduction

As workloads move to the cloud and all infrastructure becomes virtual, infrastructure as code (IaC) becomes essential to leverage the agility of this new world. JSON and YAML are the powerful, declarative modeling languages of AWS CloudFormation, allowing you to define complex architectures using IaC. Just as higher-level languages like BASIC and C abstracted away the details of assembly language and made developers more productive, the AWS Cloud Development Kit (AWS CDK) provides a programming model above the native template languages, a model that makes developers more productive when creating IaC. When you instantiate CDK objects in your TypeScript (or Python, Java, etc.) application, those objects “compile” into a CloudFormation template that the CDK deploys as an AWS CloudFormation stack.

AWS Solutions Constructs take this simplification a step further by providing a library of common service patterns built on top of the CDK. These multi-service patterns allow you to deploy multiple resources with a single object, resources that follow best practices by default – both independently and throughout their interaction.

Comparison of an Application stack with Assembly Language, 4th generation language and Object libraries such as Hibernate with an IaC stack of CloudFormation, AWS CDK and AWS Solutions Constructs

Application Development Stack vs. IaC Development Stack

Solution overview

To demonstrate how using Solutions Constructs can accelerate the development of IaC, in this post you will create an architecture that ingests and stores sensor readings using Amazon Kinesis Data Streams, AWS Lambda, and Amazon DynamoDB.

An architecture diagram showing sensor readings being sent to a Kinesis data stream. A Lambda function will receive the Kinesis records and store them in a DynamoDB table.

Prerequisite – Setting up the CDK environment

Tip – If you want to try this example but are concerned about the impact of changing the tools or versions on your workstation, try running it on AWS Cloud9. An AWS Cloud9 environment is launched with an AWS Identity and Access Management (AWS IAM) role and doesn’t require configuring with an access key. It uses the current region as the default for all CDK infrastructure.

To prepare your workstation for CDK development, confirm the following:

  • Node.js 10.3.0 or later is installed on your workstation (regardless of the language used to write CDK apps).
  • You have configured credentials for your environment. If you’re running locally you can do this by configuring the AWS Command Line Interface (AWS CLI).
  • TypeScript 2.7 or later is installed globally (npm -g install typescript)

Before creating your CDK project, install the CDK toolkit using the following command:

npm install -g aws-cdk

Create the CDK project

  1. First create a project folder called stream-ingestion with these two commands:

mkdir stream-ingestion
cd stream-ingestion

  2. Now create your CDK application using this command:

npx cdk@1.68.0 init app --language=typescript

Tip – This example will be written in TypeScript – you can also specify other languages for your projects.

At this time, you must use the same version of the CDK and Solutions Constructs. We’re using version 1.68.0 of both based upon what’s available at publication time, but you can update this with a later version for your projects in the future.

Let’s explore the files in the application this command created:

  • bin/stream-ingestion.ts – This is the module that launches the application. The key line of code is:

new StreamIngestionStack(app, 'StreamIngestionStack');

This creates the actual stack, and it’s in StreamIngestionStack that you will write the CDK code that defines the resources in your architecture.

  • lib/stream-ingestion-stack.ts – This is the important class. In the constructor of StreamIngestionStack you will add the constructs that will create your architecture.

During the deployment process, the CDK uploads your Lambda function to an Amazon S3 bucket so it can be incorporated into your stack.

  3. To create that S3 bucket and any other infrastructure the CDK requires, run this command:

cdk bootstrap

The CDK uses the same supporting infrastructure for all projects within a region, so you only need to run the bootstrap command once in any region in which you create CDK stacks.

  4. To install the required Solutions Constructs packages for our architecture, run these two commands from the command line:

npm install @aws-solutions-constructs/aws-lambda-dynamodb@1.68.0
npm install @aws-solutions-constructs/aws-kinesisstreams-lambda@1.68.0

Write the code

First you will write the Lambda function that processes the Kinesis data stream messages.

  1. Create a folder named lambda under stream-ingestion
  2. Within the lambda folder save a file called lambdaFunction.js with the following contents:
var AWS = require("aws-sdk");

// Set the region before creating any service clients
AWS.config.update({ region: process.env.AWS_REGION });

// Create the DynamoDB service object
var ddb = new AWS.DynamoDB({ apiVersion: "2012-08-10" });

// We will configure our construct to 
// look for the .handler function
exports.handler = async function (event) {
  try {
    // Kinesis will deliver records 
    // in batches, so we need to iterate through
    // each record in the batch
    for (let record of event.Records) {
      const reading = parsePayload(record.kinesis.data);
      await writeRecord(record.kinesis.partitionKey, reading);
    }
  } catch (err) {
    console.log(`Write failed, err:\n${JSON.stringify(err, null, 2)}`);
    throw err;
  }
  return;
};

// Write the provided sensor reading data to the DynamoDB table
async function writeRecord(partitionKey, reading) {

  var params = {
    // Notice that Constructs automatically sets up 
    // an environment variable with the table name.
    TableName: process.env.DDB_TABLE_NAME,
    Item: {
      partitionKey: { S: partitionKey },  // sensor Id
      timestamp: { S: reading.timestamp },
      value: { N: reading.value}
    },
  };

  // Call DynamoDB to add the item to the table
  await ddb.putItem(params).promise();
}

// Decode the payload and extract the sensor data from it
function parsePayload(payload) {

  const decodedPayload = Buffer.from(payload, "base64").toString(
    "ascii"
  );

  // Our CLI command will send the records to Kinesis
  // with the values delimited by '|'
  const payloadValues = decodedPayload.split("|", 2)
  return {
    value: payloadValues[0],
    timestamp: payloadValues[1]
  }
}

We won’t spend a lot of time explaining this function – it’s pretty straightforward and heavily commented. It receives an event with one or more sensor readings, and for each reading it extracts the pertinent data and saves it to the DynamoDB table.
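If you want to try the parsing logic before deploying anything, you can feed it a hand-built event in Node.js. The following is only a sketch: makeKinesisEvent is a hypothetical helper (not part of the project) that mimics the shape of a Kinesis batch as the function above reads it, and parsePayload is repeated so the snippet runs on its own.

```javascript
// Repeated from lambdaFunction.js so this snippet is self-contained.
function parsePayload(payload) {
  const decoded = Buffer.from(payload, "base64").toString("ascii");
  const [value, timestamp] = decoded.split("|", 2);
  return { value, timestamp };
}

// Hypothetical helper: builds an event shaped like a Kinesis batch,
// with each reading base64-encoded as "value|timestamp".
function makeKinesisEvent(readings) {
  return {
    Records: readings.map(({ sensorId, value, timestamp }) => ({
      kinesis: {
        partitionKey: sensorId,
        data: Buffer.from(`${value}|${timestamp}`).toString("base64"),
      },
    })),
  };
}

const sampleEvent = makeKinesisEvent([
  { sensorId: "1301", value: "15.4", timestamp: "2020-08-22T01:16:36+00:00" },
]);

// Walk the batch the same way the handler does, without touching DynamoDB.
const parsedReadings = sampleEvent.Records.map((record) => ({
  sensorId: record.kinesis.partitionKey,
  ...parsePayload(record.kinesis.data),
}));

console.log(parsedReadings);
```

Because the snippet never calls DynamoDB, it needs no AWS credentials and no aws-sdk installation.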

You will use two Solutions Constructs to create your infrastructure:

The aws-kinesisstreams-lambda construct deploys an Amazon Kinesis data stream and a Lambda function.

  • aws-kinesisstreams-lambda creates the Kinesis data stream and Lambda function that subscribes to that stream. To support this, it also creates other resources, such as IAM roles and encryption keys.

The aws-lambda-dynamodb construct deploys a Lambda function and a DynamoDB table.

  • aws-lambda-dynamodb creates an Amazon DynamoDB table and a Lambda function with permission to access the table.
  3. To deploy the first of these two constructs, replace the code in lib/stream-ingestion-stack.ts with the following code:
import * as cdk from "@aws-cdk/core";
import * as lambda from "@aws-cdk/aws-lambda";
import { KinesisStreamsToLambda } from "@aws-solutions-constructs/aws-kinesisstreams-lambda";

import * as ddb from "@aws-cdk/aws-dynamodb";
import { LambdaToDynamoDB } from "@aws-solutions-constructs/aws-lambda-dynamodb";

export class StreamIngestionStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    const kinesisLambda = new KinesisStreamsToLambda(
      this,
      "KinesisLambdaConstruct",
      {
        lambdaFunctionProps: {
          // Where the CDK can find the lambda function code
          runtime: lambda.Runtime.NODEJS_10_X,
          handler: "lambdaFunction.handler",
          code: lambda.Code.fromAsset("lambda"),
        },
      }
    );

    // Next Solutions Construct goes here
  }
}

Let’s explore this code:

  • It instantiates a new KinesisStreamsToLambda object. This Solutions Construct will launch a new Kinesis data stream and a new Lambda function, setting up the Lambda function to receive all the messages in the Kinesis data stream. It will also deploy all the additional resources and policies required for the architecture to follow best practices.
  • The third argument to the constructor is the properties object, where you specify overrides of default values or any other information the construct needs. In this case you provide properties for the encapsulated Lambda function that tell the CDK where to find the code for the Lambda function that you stored as lambda/lambdaFunction.js earlier.
  4. Now you’ll add the second construct that connects the Lambda function to a new DynamoDB table. In the same lib/stream-ingestion-stack.ts file, replace the line // Next Solutions Construct goes here with the following code:
    // Define the primary key for the new DynamoDB table
    const primaryKeyAttribute: ddb.Attribute = {
      name: "partitionKey",
      type: ddb.AttributeType.STRING,
    };

    // Define the sort key for the new DynamoDB table
    const sortKeyAttribute: ddb.Attribute = {
      name: "timestamp",
      type: ddb.AttributeType.STRING,
    };

    const lambdaDynamoDB = new LambdaToDynamoDB(
      this,
      "LambdaDynamodbConstruct",
      {
        // Tell construct to use the Lambda function in
        // the first construct rather than deploy a new one
        existingLambdaObj: kinesisLambda.lambdaFunction,
        tablePermissions: "Write",
        dynamoTableProps: {
          partitionKey: primaryKeyAttribute,
          sortKey: sortKeyAttribute,
          billingMode: ddb.BillingMode.PROVISIONED,
          removalPolicy: cdk.RemovalPolicy.DESTROY
        },
      }
    );

    // Add autoscaling
    const readScaling = lambdaDynamoDB.dynamoTable.autoScaleReadCapacity({
      minCapacity: 1,
      maxCapacity: 50,
    });

    readScaling.scaleOnUtilization({
      targetUtilizationPercent: 50,
    });

Let’s explore this code:

  • The first two const objects define the names and types for the partition key and sort key of the DynamoDB table.
  • The LambdaToDynamoDB construct instantiated creates a new DynamoDB table and grants access to your Lambda function. The key to this call is the properties object you pass in the third argument.
    • The first property sent to LambdaToDynamoDB is existingLambdaObj – by setting this value to the Lambda function created by KinesisStreamsToLambda, you’re telling the construct to not create a new Lambda function, but to grant the Lambda function in the other Solutions Construct access to the DynamoDB table. This illustrates how you can chain many Solutions Constructs together to create complex architectures.
    • The second property sent to LambdaToDynamoDB tells the construct to limit the Lambda function’s access to the table to write only.
    • The third property sent to LambdaToDynamoDB is actually a full properties object defining the DynamoDB table. It provides the two attribute definitions you created earlier as well as the billing mode. It also sets the RemovalPolicy to DESTROY. This policy setting ensures that the table is deleted when you delete this stack – in most cases you should accept the default setting to protect your data.
  • The last two statements show how you can modify a construct outside the constructor. In this case we set up auto scaling on the new DynamoDB table, which we can access with the dynamoTable property on the construct we just instantiated.

That’s all it takes to create all the resources for your architecture.

  5. Save all the files, then compile the TypeScript into a CDK program using this command:

npm run build

  6. Finally, launch the stack using this command:

cdk deploy

(Enter “y” in response to Do you wish to deploy all these changes (y/n)?)

You will see some warnings where you override CDK default values. Because you are doing this intentionally you may disregard these, but it’s always a good idea to review these warnings when they occur.

Tip – Many mysterious CDK project errors stem from mismatched versions. If you get stuck on an inexplicable error, check package.json and confirm that all CDK and Solutions Constructs libraries have the same version number (with no leading caret ^). If necessary, correct the version numbers, delete the package-lock.json file and node_modules tree and run npm install. Think of this as the “turn it off and on again” first response to CDK errors.
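The version check described in the tip above can be automated with a few lines of Node.js. This is a minimal sketch, not a tool shipped with the CDK: findVersionProblems is a hypothetical helper, and the sample package contents are made-up values used only to show the output.

```javascript
// Hypothetical helper: lists CDK and Solutions Constructs dependencies whose
// version string is not exactly the expected one (a leading ^ counts as a mismatch).
function findVersionProblems(pkg, expectedVersion) {
  const deps = { ...pkg.dependencies, ...pkg.devDependencies };
  const problems = [];
  for (const [name, version] of Object.entries(deps)) {
    const isCdkLibrary =
      name === "aws-cdk" ||
      name.startsWith("@aws-cdk/") ||
      name.startsWith("@aws-solutions-constructs/");
    if (isCdkLibrary && version !== expectedVersion) {
      problems.push(`${name}: ${version}`);
    }
  }
  return problems;
}

// Sample package.json contents (illustrative values only).
const samplePackage = {
  dependencies: {
    "@aws-cdk/core": "1.68.0",
    "@aws-solutions-constructs/aws-lambda-dynamodb": "^1.68.0",
    "source-map-support": "^0.5.16",
  },
  devDependencies: { "aws-cdk": "1.68.0", typescript: "~3.9.7" },
};

const versionProblems = findVersionProblems(samplePackage, "1.68.0");
console.log(versionProblems);
```

To check a real project, replace samplePackage with the parsed contents of its package.json file.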

You have now deployed the entire architecture for the demo – open the CloudFormation stack in the AWS Management Console and take a few minutes to explore all 12 resources that the program deployed (and the 380-line template generated to create them).

Feed the Stream

Now use the CLI to send some data through the stack.

Go to the Kinesis Data Streams console and copy the name of the data stream. Replace the stream name in the following command and run it from the command line.

aws kinesis put-records \
--stream-name StreamIngestionStack-KinesisLambdaConstructKinesisStreamXXXXXXXX-XXXXXXXXXXXX \
--records \
PartitionKey=1301,'Data=15.4|2020-08-22T01:16:36+00:00' \
PartitionKey=1503,'Data=39.1|2020-08-22T01:08:15+00:00'

Tip – If you are using the AWS CLI v2, the previous command will result in an “Invalid base64…” error because v2 expects the inputs to be Base64 encoded by default. Adding the argument --cli-binary-format raw-in-base64-out will fix the issue.
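To see what the Base64 form of a record looks like, you can encode one of the sample readings yourself. This sketch only illustrates the encoding the tip refers to; encodeReading is a hypothetical helper name.

```javascript
// Hypothetical helper: Base64-encodes a "value|timestamp" reading.
function encodeReading(value, timestamp) {
  return Buffer.from(`${value}|${timestamp}`, "utf8").toString("base64");
}

const encoded = encodeReading("15.4", "2020-08-22T01:16:36+00:00");

// Decoding returns the original raw text, which is what the Lambda function parses.
const roundTrip = Buffer.from(encoded, "base64").toString("utf8");

console.log(encoded);
console.log(roundTrip);
```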

To confirm that the messages made it through the service, open the DynamoDB console – you should see the two records in the table.

Now that you’ve got it working, pause to think about what you just did. You deployed a system that can ingest and store sensor readings and scale to handle heavy loads. You did that by instantiating two objects – well under 60 lines of code. Experiment with changing some property values and deploying the changes by running npm run build and cdk deploy again.

Cleanup

To clean up the resources in the stack, run this command:

cdk destroy

Conclusion

Just as languages like BASIC and C allowed developers to write programs at a higher level of abstraction than assembly language, the AWS CDK and AWS Solutions Constructs allow us to create CloudFormation stacks in TypeScript, Java, or Python instead of JSON or YAML. Just as there will always be a place for assembly language, there will always be situations where we want to write CloudFormation templates manually – but for most situations, we can now use the AWS CDK and AWS Solutions Constructs to create complex and complete architectures in a fraction of the time with very little code.

AWS Solutions Constructs can currently be used in CDK applications written in TypeScript, JavaScript, Java and Python and will be available in C# applications soon.

About the Author

Biff Gaut has been shipping software since 1983, from small startups to large IT shops. Along the way he has contributed to 2 books, spoken at several conferences and written many blog posts. He is now a Principal Solutions Architect at AWS working on the AWS Solutions Constructs team, helping customers deploy better architectures more quickly.

ICYMI: Serverless Q3 2020

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/icymi-serverless-q3-2020/

Welcome to the 11th edition of the AWS Serverless ICYMI (in case you missed it) quarterly recap. Every quarter, we share all of the most recent product launches, feature enhancements, blog posts, webinars, Twitch live streams, and other interesting things that you might have missed!

Q3 Calendar

In case you missed our last ICYMI, check out what happened last quarter here.

AWS Lambda

MSK trigger in Lambda

In August, we launched support for using Amazon Managed Streaming for Apache Kafka (Amazon MSK) as an event source for Lambda functions. Lambda has existing support for processing streams from Kinesis and DynamoDB. Now you can process data streams from Amazon MSK and easily integrate with downstream serverless workflows. This integration allows you to process batches of records, one per partition at a time, and scale concurrency by increasing the number of partitions in a topic.
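As a rough sketch of what processing such a batch can look like, the following code decodes an Amazon MSK event in Node.js. It assumes the event groups records under keys of the form topic-partition and that each record value is Base64-encoded; the topic name and payload in the sample event are made up, and decodeMskEvent is a hypothetical helper.

```javascript
// Flattens an MSK event into a list of decoded messages.
// Assumption: event.records maps "topic-partition" keys to arrays of records
// whose value field is Base64-encoded.
function decodeMskEvent(event) {
  const messages = [];
  for (const [topicPartition, records] of Object.entries(event.records)) {
    for (const record of records) {
      messages.push({
        topicPartition,
        offset: record.offset,
        value: Buffer.from(record.value, "base64").toString("utf8"),
      });
    }
  }
  return messages;
}

// A Lambda handler would export a function like this one.
async function handler(event) {
  for (const message of decodeMskEvent(event)) {
    console.log(`${message.topicPartition} @${message.offset}: ${message.value}`);
  }
}

// Hand-built sample event (topic name and payload are illustrative).
const sampleMskEvent = {
  eventSource: "aws:kafka",
  records: {
    "sensor-readings-0": [
      {
        topic: "sensor-readings",
        partition: 0,
        offset: 15,
        value: Buffer.from("hello").toString("base64"),
      },
    ],
  },
};

const decodedMessages = decodeMskEvent(sampleMskEvent);
handler(sampleMskEvent);
```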

We also announced support for Java 8 (Corretto) in Lambda, and you can now use Amazon Linux 2 for custom runtimes. Amazon Linux 2 is the latest generation of Amazon Linux and provides an application environment with access to the latest innovations in the Linux ecosystem.

Amazon API Gateway

API integrations

API Gateway continued to launch new features for HTTP APIs, including new integrations for five AWS services. HTTP APIs can now route requests to AWS AppConfig, Amazon EventBridge, Amazon Kinesis Data Streams, Amazon SQS, and AWS Step Functions. This makes it easy to create webhooks for business logic hosted in these services. The service also expanded the authorization capabilities, adding Lambda and IAM authorizers, and enabled wildcards in custom domain names. Over time, we will continue to improve and migrate features from REST APIs to HTTP APIs.

In September, we launched mutual TLS for both regional REST APIs and HTTP APIs. This is a new method for client-to-server authentication to enhance the security of your API. It can protect your data from exploits such as client spoofing or man-in-the-middle attacks. This enforces two-way TLS (or mTLS), which enables certificate-based authentication both ways, from client to server and from server to client.

Enhanced observability variables now make it easier to troubleshoot each phase of an API request. Each phase from AWS WAF through to integration adds latency to a request, returns a status code, or raises an error. Developers can use these variables to identify the cause of latency within the API request. You can configure these variables in AWS SAM templates – see the demo application to see how you can use these variables in your own application.

AWS Step Functions

X-Ray tracing in Step Functions

We added X-Ray tracing support for Step Functions workflows, giving you full visibility across state machine executions, making it easier to analyze and debug distributed applications. Using the service map view, you can visually identify errors in resources and view error rates across workflow executions. You can then drill into the root cause of an error. You can enable X-Ray in existing workflows with a single click in the console. Additionally, you can now also visualize Step Functions workflows directly in the Lambda console. To see this new feature, open the Step Functions state machines page in the Lambda console.

Step Functions also increased the payload size to 256 KB and added support for string manipulation, new comparison operators, and improved output processing. These updates were made to the Amazon States Language (ASL), which is a JSON-based language for defining state machines. The new operators include comparison operators, detecting the existence of a field, wildcarding, and comparing two input fields.
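To make the new operators more concrete, here is a sketch of a Choice state that uses three of them: IsPresent (field existence), StringMatches (wildcarding), and NumericGreaterThanPath (comparing two input fields). The state names and field paths are hypothetical, and the definition is built as a JavaScript object only so that it can be printed as JSON.

```javascript
// Illustrative Choice state; state names and JSON paths are made up.
const choiceState = {
  Type: "Choice",
  Choices: [
    // Detect whether a field exists in the input.
    { Variable: "$.order.coupon", IsPresent: true, Next: "ApplyCoupon" },
    // Wildcard match on a string field.
    { Variable: "$.order.sku", StringMatches: "BOOK-*", Next: "ShipMediaMail" },
    // Compare one input field against another input field.
    {
      Variable: "$.order.total",
      NumericGreaterThanPath: "$.customer.creditLimit",
      Next: "RequireApproval",
    },
  ],
  Default: "ProcessOrder",
};

// Serialize the fragment as it would appear inside the States section.
const definitionFragment = JSON.stringify({ CheckOrder: choiceState }, null, 2);
console.log(definitionFragment);
```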

AWS Serverless Application Model (AWS SAM)

AWS SAM goes GA

AWS SAM is an open source framework for building serverless applications that converts a shorthand syntax into CloudFormation resources.

In July, the AWS SAM CLI became generally available (GA). This tool operates on SAM templates and provides developers with local tooling for building serverless applications. The AWS SAM CLI offers a rich set of tools that enable developers to build serverless applications quickly.

AWS X-Ray

X-Ray Insights

X-Ray launched a public preview of X-Ray Insights, which can help produce actionable insights for anomalies within your applications. Designed to make it easier to analyze and debug distributed applications, it can proactively identify issues caused by increases in faults. Using the incident timeline, you can visualize when the issue started and how it developed. The service identifies a probable root cause along with any anomalous services. There is no additional instrumentation needed to use X-Ray Insights – you can enable this feature within X-Ray Groups.

Amazon Kinesis

In July, Kinesis announced support for data delivery to generic HTTP endpoints, and service providers like Datadog, New Relic, MongoDB, and Splunk. Use the Amazon Kinesis console to configure your data producers to send data to Amazon Kinesis Data Firehose and specify one of these new delivery targets. Additionally, Amazon Kinesis Data Firehose is now available in the Europe (Milan) and Africa (Cape Town) AWS Regions.

Serverless Posts

Our team is always working to build and write content to help our customers better understand all our serverless offerings. Here is a list of the latest posts published to the AWS Compute Blog this quarter.

July

August

September

Tech Talks & Events

We hold several AWS Online Tech Talks covering serverless topics throughout the year, so look out for them in the Serverless section of the AWS Online Tech Talks page. We also regularly deliver talks at conferences and events around the globe, join podcasts, and record short videos that you can learn from in quick, byte-sized chunks.

Here are some from Q3:

Learning Paths

Ask Around Me

Learn How to Build and Deploy a Web App Backend that Supports Authentication, Geohashing, and Real-Time Messaging

Ask Around Me is an example web app that shows how to build authentication, geohashing and real-time messaging into your serverless applications. This learning path includes videos and learning resources to help walk you through the application.

Build a Serverless Web App for a Theme Park

This five-video learning path walks you through the Innovator Island workshop, and provides learning resources for building real-time serverless web applications.

Live streams

July

August

September

There are also a number of other helpful video series covering serverless available on the Serverless Land YouTube channel.

New AWS Serverless Heroes

Serverless Heroes Q3 2020

We’re pleased to welcome Angela Timofte, Luca Bianchi, Matthieu Napoli, Peter Hanssens, Sheen Brisals, and Tom McLaughlin to the growing list of AWS Serverless Heroes.

The AWS Hero program is a selection of worldwide experts that have been recognized for their positive impact within the community. They share helpful knowledge and organize events and user groups. They’re also contributors to numerous open-source projects in and around serverless technologies.

New! The Serverless Land website

Serverless Land

To help developers find serverless learning resources, we have curated a list of serverless blogs, videos, events and training programs at a new site, Serverless Land. This is regularly updated with new information – you can subscribe to the RSS feed for automatic updates, follow the LinkedIn page or subscribe to the YouTube channel.

Still looking for more?

The Serverless landing page has lots of information. The Lambda resources page contains case studies, webinars, whitepapers, customer stories, reference architectures, and even more Getting Started tutorials.

You can also follow all of us on Twitter to see the latest news, follow conversations, and interact with the team.

Unified serverless streaming ETL architecture with Amazon Kinesis Data Analytics

Post Syndicated from Ram Vittal original https://aws.amazon.com/blogs/big-data/unified-serverless-streaming-etl-architecture-with-amazon-kinesis-data-analytics/

Businesses across the world are seeing a massive influx of data at an enormous pace through multiple channels. With the advent of cloud computing, many companies are realizing the benefits of getting their data into the cloud to gain meaningful insights and save costs on data processing and storage. As businesses embark on their journey towards cloud solutions, they often come across challenges involving building serverless, streaming, real-time ETL (extract, transform, load) architecture that enables them to extract events from multiple streaming sources, correlate those streaming events, perform enrichments, run streaming analytics, and build data lakes from streaming events.

In this post, we discuss the concept of unified streaming ETL architecture using a generic serverless streaming architecture with Amazon Kinesis Data Analytics at the heart of the architecture for event correlation and enrichments. This solution can address a variety of streaming use cases with various input sources and output destinations. We then walk through a specific implementation of the generic serverless unified streaming architecture that you can deploy into your own AWS account for experimenting and evolving this architecture to address your business challenges.

Overview of solution

As data sources grow in volume, variety, and velocity, the management of data and event correlation become more challenging. Most of the challenges stem from data silos, in which different teams and applications manage data and events using their own tools and processes.

Modern businesses need a single, unified view of the data environment to get meaningful insights through streaming multi-joins, such as the correlation of sensory events and time-series data. Event correlation plays a vital role in automatically reducing noise and allowing the team to focus on those issues that really matter to the business objectives.

To realize this outcome, the solution proposes creating a three-stage architecture:

  • Ingestion
  • Processing
  • Analysis and visualization

The source can be a varied set of inputs comprising structured datasets like databases or raw data feeds like sensor data that can be ingested as single or multiple parallel streams. The solution envisions multiple hybrid data sources as well. After it’s ingested, the data is divided into single or multiple data streams depending on the use case and passed through a preprocessor (via an AWS Lambda function). This highly customizable processor transforms and cleanses the data before the analytics application processes it. Furthermore, the architecture allows you to enrich data or validate it against standard sets of reference data, for example validating against postal codes for address data received from the source to verify its accuracy. After the data is processed, it’s sent to various sink platforms depending on your preferences, which could range from storage solutions to visualization solutions, or even stored as a dataset in a high-performance database.

The solution is designed with flexibility as a key tenet to address multiple, real-world use cases. The following diagram illustrates the solution architecture.

The architecture has the following workflow:

  1. We use AWS Database Migration Service (AWS DMS) to push records from the data source into AWS in real time or batch. For our use case, we use AWS DMS to fetch records from an on-premises relational database.
  2. AWS DMS writes records to Amazon Kinesis Data Streams. The data is split into multiple streams as necessitated through the channels.
  3. A Lambda function picks up the data stream records and preprocesses them (adding the record type). This is an optional step, depending on your use case.
  4. Processed records are sent to the Kinesis Data Analytics application for querying and correlating in-application streams, taking into account Amazon Simple Storage Service (Amazon S3) reference data for enrichment.
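The optional preprocessing step (step 3) could be sketched as follows. This is not the code from the solution's repository: it assumes the function is attached as a Kinesis Data Analytics record preprocessor (records arrive with a recordId and Base64 data and are returned with a result status), and it assumes AWS DMS places the source table name in metadata["table-name"]. The sample records are made up.

```javascript
// Sketch of a preprocessor that tags each record with a record type.
function preprocess(event) {
  return {
    records: event.records.map((record) => {
      try {
        const payload = JSON.parse(
          Buffer.from(record.data, "base64").toString("utf8")
        );
        // Assumption: the DMS metadata carries the source table name.
        payload.recordType = payload.metadata["table-name"];
        return {
          recordId: record.recordId,
          result: "Ok",
          data: Buffer.from(JSON.stringify(payload)).toString("base64"),
        };
      } catch (err) {
        // Hand the record back unchanged and flag it as failed.
        return {
          recordId: record.recordId,
          result: "ProcessingFailed",
          data: record.data,
        };
      }
    }),
  };
}

// Hand-built input: one valid CDC record and one that is not JSON.
const sampleInput = {
  records: [
    {
      recordId: "1",
      data: Buffer.from(
        JSON.stringify({ data: { orderId: 7 }, metadata: { "table-name": "orders" } })
      ).toString("base64"),
    },
    { recordId: "2", data: Buffer.from("not json").toString("base64") },
  ],
};

const preprocessed = preprocess(sampleInput);
const firstPayload = JSON.parse(
  Buffer.from(preprocessed.records[0].data, "base64").toString("utf8")
);
console.log(preprocessed.records.map((r) => r.result), firstPayload.recordType);
```

Returning every recordId with an explicit status keeps a single malformed record from failing the whole batch.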

Solution walkthrough

For this post, we demonstrate an implementation of the unified streaming ETL architecture using Amazon RDS for MySQL as the data source and Amazon DynamoDB as the target. We use a simple order service data model that comprises orders, items, and products, where an order can have multiple items and the product is linked to an item in a reference relationship that provides detail about the item, such as description and price.

We implement a streaming serverless data pipeline that ingests orders and items as they are recorded in the source system into Kinesis Data Streams via AWS DMS. We build a Kinesis Data Analytics application that correlates orders and items along with reference product information and creates a unified and enriched record. Kinesis Data Analytics outputs this unified and enriched data to Kinesis Data Streams. A Lambda function consumer processes the data stream and writes the unified and enriched data to DynamoDB.
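Conceptually, the correlation and enrichment step is a join between orders, items, and product reference data. The following sketch shows that idea on small in-memory arrays. It is an illustration only: the real application does this with streaming SQL, and the field names productId, name, and price are assumptions.

```javascript
// Joins items to their order and enriches each with product reference data.
// Items without a matching order are dropped.
function enrichItems(orders, items, products) {
  const ordersById = new Map(orders.map((order) => [order.orderId, order]));
  const productsById = new Map(products.map((p) => [p.productId, p]));
  return items
    .filter((item) => ordersById.has(item.orderId))
    .map((item) => {
      const order = ordersById.get(item.orderId);
      const product = productsById.get(item.productId) || {};
      return {
        orderId: order.orderId,
        orderStatus: order.orderStatus,
        itemId: item.itemId,
        itemQuantity: item.itemQuantity,
        productName: product.name,
        productPrice: product.price,
      };
    });
}

const enriched = enrichItems(
  [{ orderId: 1, orderStatus: "PENDING" }],
  [
    { itemId: 10, orderId: 1, productId: "P1", itemQuantity: 2 },
    { itemId: 11, orderId: 99, productId: "P1", itemQuantity: 1 }, // no matching order
  ],
  [{ productId: "P1", name: "Widget", price: 9.99 }]
);

console.log(enriched);
```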

To launch this solution in your AWS account, use the GitHub repo.

Prerequisites

Before you get started, make sure you have the following prerequisites:

Setting up AWS resources in your account

To set up your resources for this walkthrough, complete the following steps:

  1. Set up the AWS CDK for Java on your local workstation. For instructions, see Getting Started with the AWS CDK.
  2. Install Maven binaries for Java if you don’t have Maven installed already.
  3. If this is the first installation of the AWS CDK, make sure to run cdk bootstrap.
  4. Clone the following GitHub repo.
  5. Navigate to the project root folder and run the following commands to build and deploy:
    1. mvn compile
    2. cdk deploy UnifiedStreamETLCommonStack UnifiedStreamETLDataStack UnifiedStreamETLProcessStack

Setting up the orders data model for CDC

In this next step, you set up the orders data model for change data capture (CDC).

  1. On the Amazon Relational Database Service (Amazon RDS) console, choose Databases.
  2. Choose your database and make sure that you can connect to it securely for testing, using a bastion host or another mechanism (not covered in this post).
  3. Start MySQL Workbench and connect to your database using your DB endpoint and credentials.
  4. To create the data model in your Amazon RDS for MySQL database, run orderdb-setup.sql.
  5. On the AWS DMS console, test the connections to your source and target endpoints.
  6. Choose Database migration tasks.
  7. Choose your AWS DMS task and choose Table statistics.
  8. To update your table statistics, restart the migration task (with full load) for replication.
  9. From your MySQL Workbench session, run orders-data-setup.sql to create orders and items.
  10. Verify that CDC is working by checking the Table statistics.

Setting up your Kinesis Data Analytics application

To set up your Kinesis Data Analytics application, complete the following steps:

  1. Upload the product reference products.json to your S3 bucket with the logical ID prefix unifiedBucketId (which was previously created by cdk deploy).

You can now create a Kinesis Data Analytics application and map the resources to the data fields.

  1. On the Amazon Kinesis console, choose Analytics Application.
  2. Choose Create application.
  3. For Runtime, choose SQL.
  4. Connect the streaming data created using the AWS CDK as a unified order stream.
  5. Choose Discover schema and wait for it to discover the schema for the unified order stream. If discovery fails, update the records on the source Amazon RDS tables and send streaming CDC records.
  6. Save and move to the next step.
  7. Connect the reference S3 bucket you created with the AWS CDK and uploaded with the reference data.
  8. Input the following:
    1. “products.json” on the path to the S3 object
    2. Products on the in-application reference table name
  9. Discover the schema, then save and close.
  10. Choose SQL Editor and start the Kinesis Data Analytics application.
  11. Edit the schema for SOURCE_SQL_STREAM_001 and map the data resources as follows:
Column Name Column Type Row Path
orderId INTEGER $.data.orderId
itemId INTEGER $.data.itemId
itemQuantity INTEGER $.data.itemQuantity
itemAmount REAL $.data.itemAmount
itemStatus VARCHAR $.data.itemStatus
COL_timestamp VARCHAR $.metadata.timestamp
recordType VARCHAR $.metadata.table-name
operation VARCHAR $.metadata.operation
partitionkeytype VARCHAR $.metadata.partition-key-type
schemaname VARCHAR $.metadata.schema-name
tablename VARCHAR $.metadata.table-name
transactionid BIGINT $.metadata.transaction-id
orderAmount DOUBLE $.data.orderAmount
orderStatus VARCHAR $.data.orderStatus
orderDateTime TIMESTAMP $.data.orderDateTime
shipToName VARCHAR $.data.shipToName
shipToAddress VARCHAR $.data.shipToAddress
shipToCity VARCHAR $.data.shipToCity
shipToState VARCHAR $.data.shipToState
shipToZip VARCHAR $.data.shipToZip

 

  1. Choose Save schema and update stream samples.

When it’s complete, verify for 1 minute that nothing is in the error stream. If an error occurs, check that you defined the schema correctly.

  1. On the Kinesis Data Analytics console, choose your application and choose Real-time analytics.
  2. Go to the SQL results and run kda-orders-setup.sql to create in-application streams.
  3. From the application, choose Connect to destination.
  4. For Kinesis data stream, choose unifiedOrderEnrichedStream.
  5. For In-application stream, choose ORDER_ITEM_ENRICHED_STREAM.
  6. Choose Save and Continue.

Testing the unified streaming ETL architecture

You’re now ready to test your architecture.

  1. Navigate to your Kinesis Data Analytics application.
  2. Choose your app and choose Real-time analytics.
  3. Go to the SQL results and choose Real-time analytics.
  4. Choose the in-application stream ORDER_ITEM_ENRICHED_STREAM to see the results of the real-time join of records from the order and order item streaming Kinesis events.
  5. On the Lambda console, search for UnifiedStreamETLProcess.
  6. Choose the function and choose Monitoring, Recent invocations.
  7. Verify the Lambda function run results.
  8. On the DynamoDB console, choose the OrderEnriched table.
  9. Verify the unified and enriched records that combine order, item, and product records.

The following screenshot shows the OrderEnriched table.

Operational aspects

When you’re ready to operationalize this architecture for your workloads, you need to consider several aspects:

  • Monitoring metrics for Kinesis Data Streams: GetRecords.IteratorAgeMilliseconds, ReadProvisionedThroughputExceeded, and WriteProvisionedThroughputExceeded
  • Monitoring metrics available for the Lambda function, including but not limited to Duration, IteratorAge, Error count and success rate (%), Concurrent executions, and Throttles
  • Monitoring metrics for Kinesis Data Analytics (millisBehindLatest)
  • Monitoring DynamoDB provisioned read and write capacity units
  • Using the DynamoDB automatic scaling feature to automatically manage throughput

We used the solution architecture with the following configuration settings to evaluate the operational performance:

  • Kinesis OrdersStream with two shards and Kinesis OrdersEnrichedStream with two shards
  • The Lambda function code processes Kinesis OrdersEnrichedStream records asynchronously in five concurrent batches, with a batch size of 500
  • DynamoDB provisioned WCU is 3000, RCU is 300

We observed the following results:

  • 100,000 order items are enriched with order event data and product reference data and persisted to DynamoDB
  • An average latency of 900 milliseconds from the time an event was ingested into the Kinesis pipeline to the time the record landed in DynamoDB

The following screenshot shows the visualizations of these metrics.

Cleaning up

To avoid incurring future charges, delete the resources you created as part of this post (the AWS CDK provisioned AWS CloudFormation stacks).

Conclusion

In this post, we designed a unified streaming architecture that extracts events from multiple streaming sources, correlates and performs enrichments on events, and persists those events to destinations. We then reviewed a use case and walked through the code for ingesting, correlating, and consuming real-time streaming data with Amazon Kinesis, using Amazon RDS for MySQL as the source and DynamoDB as the target.

Managing an ETL pipeline through Kinesis Data Analytics provides a cost-effective, unified solution for real-time and batch database migrations using common technical skills such as SQL querying.


About the Authors

Ram Vittal is an enterprise solutions architect at AWS. His current focus is to help enterprise customers with their cloud adoption and optimization journey to improve their business outcomes. In his spare time, he enjoys tennis, photography, and movies.

Akash Bhatia is a Sr. solutions architect at AWS. His current focus is helping customers achieve their business outcomes through architecting and implementing innovative and resilient solutions at scale.

Using AWS Lambda as a consumer for Amazon Kinesis

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/using-aws-lambda-as-a-consumer-for-amazon-kinesis/

This post is courtesy of Prateek Mehrotra, Software Development Engineer.

AWS Lambda integrates natively with Amazon Kinesis as a consumer to process data ingested through a data stream. The polling, checkpointing, and error handling complexities are abstracted when you use this native integration. This allows the Lambda function code to focus on business logic processing.

This blog post describes how to operate and optimize this integration at high throughput with low system overhead time and processing latencies.

To learn more about Kinesis concepts and terminology, visit the documentation page.

Overview

You can attach a Lambda function to a Kinesis stream to process data. Multiple Lambda functions can consume from a single Kinesis stream for different kinds of processing independently. These can be used alongside other consumers such as Amazon Kinesis Data Firehose.

If a Kinesis stream has ‘n’ shards, then at least ‘n’ concurrency is required for a consuming Lambda function to process data without any induced delay. Less than ‘n’ available concurrency results in elevated iterator age in the Kinesis stream and elevated iterator age in the Lambda consumer. In a multi-consumer paradigm, if the Kinesis iterator age spikes then at least one of the stream consumers also reports a corresponding iterator age spike.

Stream poller

When the parallelization factor is greater than 1 for a Lambda consumer, the record processor polls up to ‘parallelization-factor’ partition keys at a time while processing from a single shard. To learn more, read about handling traffic with a parallelization factor.

Kinesis shard level metrics

When using Kinesis streams, it’s best practice to enable enhanced shard level metrics. These metrics can help detect whether data is distributed uniformly across the shards of the stream.

In a single-source, multiple-consumer use case, enhanced shard level metrics can help identify the cause of elevated iterator age. This could be due to a single shard receiving data too quickly, or at least one of the consumers failing to process the data.

To learn more about Kinesis monitoring, visit the documentation page. If per-partition processing is not a requirement, distribute data uniformly across shards. To learn more about Kinesis partition keys, visit the documentation page.

Processing delay caused by consumer misconfiguration

Kinesis reports an iterator age metric. If this value spikes, data processing from the stream is delayed. The metric value is set by the earliest record read from the stream measured over the specified time period.

This delay slows the data processing of the pipeline. This happens when a single shard is receiving data faster than the consumer can process it or the consumer is failing to complete processing due to errors.

Graph of records iterator age

In a single-source, multiple-consumer use case, at least one of the consumers shows a corresponding iterator age spike. If there are multiple Lambda consumers of the same data stream, then each Lambda consumer will report its own iterator age metric. This helps identify the problematic consumer for further analysis.

Tuning the configuration to optimize for iterator age

There are several tuning options available when the iterator age is increasing for the consumer Lambda function.

1. Increase the batch size

If the Lambda function operates at a low maximum duration, a single invocation may process fewer records than the maximum batch size. Increase the batch size (up to a maximum of 10,000) to read more records from a shard in a single batch. This can help normalize the iterator age.

2. Change the parallelization factor

Increasing the parallelization factor in the Lambda function allows concurrent invocations to read a single shard. Multiple batches of records are created in the shard based on partition keys, resulting in faster data consumption.

Iterator age can spike when the batch size is set to 10,000 and the parallelization factor is set to 10. This can happen when data is produced faster than the consumer can process it, backing up the per-shard/per-partition queues. To mitigate this, subdivide the partition into multiple keys. This helps distribute the data for that partition key more evenly across shards.

Partition keys
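One common way to subdivide a hot partition key is to append a bucket suffix on the producer side, sometimes called key salting. The following is a minimal sketch of that idea, not code from the original post. The `#` separator, the bucket count, and all names are hypothetical, and the sketch assumes that logical keys do not themselves contain `#`.

```java
import java.util.concurrent.ThreadLocalRandom;

// Sketch of splitting one logical partition key into several physical keys.
final class SaltedPartitionKey {
    /** Builds a physical partition key from a logical key and a bucket number. */
    static String salt(String logicalKey, int bucket) {
        return logicalKey + "#" + bucket;
    }

    /** Picks a random bucket in [0, buckets) so writes for one logical key spread out. */
    static String randomSalt(String logicalKey, int buckets) {
        return salt(logicalKey, ThreadLocalRandom.current().nextInt(buckets));
    }

    /** Recovers the logical key on the consumer side by dropping the bucket suffix. */
    static String logicalKey(String physicalKey) {
        int separator = physicalKey.lastIndexOf('#');
        return separator < 0 ? physicalKey : physicalKey.substring(0, separator);
    }

    public static void main(String[] args) {
        String physical = randomSalt("customer-42", 4);
        System.out.println(physical + " -> " + logicalKey(physical));
    }
}
```

The trade-off is ordering: Kinesis preserves order only within a physical partition key, so records for the same logical key in different buckets can be processed out of order relative to each other.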

3. Reduce the batch window

If data is distributed unequally across shards, or there is low write volume from producers, the Lambda poller may wait for an entire batch. You can reduce this wait time by reducing the batch window, which results in faster processing.

To learn more about Lambda poller batch window for Kinesis, visit the documentation page.
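The interaction between batch size and batch window can be pictured as a simple flush rule: the poller invokes the function once enough records have been gathered or once it has waited long enough. The following sketch is a simplified model for reasoning about the two settings, not the Lambda poller's actual implementation. It ignores other limits such as payload size, and the class and method names are hypothetical.

```java
// Simplified model of when a buffered batch is handed to the consumer function.
final class BatchPolicy {
    private final int batchSize;
    private final long batchWindowMillis;

    BatchPolicy(int batchSize, long batchWindowMillis) {
        this.batchSize = batchSize;
        this.batchWindowMillis = batchWindowMillis;
    }

    /** Returns true when the buffered records should be sent to the function. */
    boolean shouldInvoke(int bufferedRecords, long waitedMillis) {
        if (bufferedRecords == 0) {
            return false; // nothing to process yet
        }
        // Flush when the batch is full or the window has elapsed, whichever comes first.
        return bufferedRecords >= batchSize || waitedMillis >= batchWindowMillis;
    }

    public static void main(String[] args) {
        BatchPolicy policy = new BatchPolicy(500, 5_000);
        System.out.println(policy.shouldInvoke(120, 1_000));
        System.out.println(policy.shouldInvoke(120, 5_000));
    }
}
```

In this model, a low-volume shard with 120 buffered records waits out the full window before processing starts, which is why a shorter window lowers latency when batches rarely fill.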

4. De-scale the Kinesis stream if overprovisioned

If the Kinesis stream metrics indicate that the stream is over-provisioned, de-scaling the stream helps increase data compaction within shards. This results in better throughput per Lambda invocation.

After reducing stream size, reduce the Lambda concurrency to maintain a 1:1 ratio of shard count to Lambda concurrency mapping. As load increases, increase the parallelization factor to keep the shard size constant. With this increase, the Lambda concurrency should be at least shard count * parallelization factor.

To learn more, read about handling traffic with a parallelization factor.
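The concurrency rule above is simple arithmetic, sketched here for clarity. The class and method names are hypothetical. The range check assumes the documented parallelization factor range of 1 to 10.

```java
// Minimum Lambda concurrency needed to consume a stream without induced delay.
final class ConsumerConcurrency {
    /** Returns shard count * parallelization factor, the lower bound stated in the text. */
    static int requiredConcurrency(int shardCount, int parallelizationFactor) {
        if (shardCount < 1) {
            throw new IllegalArgumentException("shardCount must be at least 1");
        }
        if (parallelizationFactor < 1 || parallelizationFactor > 10) {
            throw new IllegalArgumentException("parallelizationFactor must be between 1 and 10");
        }
        return shardCount * parallelizationFactor;
    }

    public static void main(String[] args) {
        // Four shards with the default factor of 1, then the same stream with a factor of 5.
        System.out.println(requiredConcurrency(4, 1));
        System.out.println(requiredConcurrency(4, 5));
    }
}
```

For example, halving a stream from eight shards to four while raising the parallelization factor from 1 to 2 keeps the required concurrency at eight.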

5. Enable enhanced fan-out for consumers

Enhanced fan-out allows developers to scale up the number of stream consumers by offering each stream consumer its own read throughput.

To learn more about Kinesis enhanced fan-out, visit the documentation page.
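To make the difference concrete, the following sketch compares the aggregate read throughput available to consumers with and without enhanced fan-out. It assumes the published Kinesis Data Streams limits of 2 MB per second per shard, shared across all standard consumers, and 2 MB per second per shard for each enhanced fan-out consumer. The class and method names are hypothetical.

```java
// Aggregate read throughput across all consumers of a stream, in MB per second.
final class FanOutThroughput {
    static final double READ_MB_PER_SECOND_PER_SHARD = 2.0;

    /** Standard consumers share one read pipe per shard, so adding consumers adds no capacity. */
    static double sharedTotal(int shards, int consumers) {
        return shards * READ_MB_PER_SECOND_PER_SHARD;
    }

    /** Each enhanced fan-out consumer gets its own read pipe per shard. */
    static double enhancedTotal(int shards, int consumers) {
        return shards * READ_MB_PER_SECOND_PER_SHARD * consumers;
    }

    public static void main(String[] args) {
        System.out.println("shared:   " + sharedTotal(2, 3));
        System.out.println("enhanced: " + enhancedTotal(2, 3));
    }
}
```

With two shards and three consumers, the shared model offers 4 MB per second in total, while enhanced fan-out offers 12 MB per second under these assumptions.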

Conclusion

This blog post shows some of the best practices when using Lambda with Kinesis. It covers operational levers for high-throughput, low-latency, single-source data processing pipelines.

The enhanced Amazon Kinesis shard level metrics help monitor the maximum overhead processing delay per shard. When correlated with the Lambda consumer’s iterator age metrics, this shows each consumer’s performance. The effective combination of batch size, parallelization factor, batch window, and partition key can lead to more efficient stream processing.

To learn more about Amazon Kinesis, visit the Getting Started page.

Enhanced monitoring and automatic scaling for Apache Flink

Post Syndicated from Karthi Thyagarajan original https://aws.amazon.com/blogs/big-data/enhanced-monitoring-and-automatic-scaling-for-apache-flink/

Thousands of developers use Apache Flink to build streaming applications to transform and analyze data in real time. Apache Flink is an open-source framework and engine for processing data streams. It’s highly available and scalable, delivering high throughput and low latency for the most demanding stream-processing applications. Monitoring and scaling your applications is critical to keep your applications running successfully in a production environment.

Amazon Kinesis Data Analytics reduces the complexity of building and managing Apache Flink applications. Amazon Kinesis Data Analytics manages the underlying Apache Flink components that provide durable application state, metrics and logs, and more. Kinesis Data Analytics recently announced new Amazon CloudWatch metrics and the ability to create custom metrics to provide greater visibility into your application.

In this post, we show you how to easily monitor and automatically scale your Apache Flink applications with Amazon Kinesis Data Analytics. We walk through three examples. First, we create a custom metric in the Kinesis Data Analytics for Apache Flink application code. Second, we use application metrics to automatically scale the application. Finally, we share a CloudWatch dashboard for monitoring your application and recommend metrics that you can alarm on.

Custom metrics

Kinesis Data Analytics uses Apache Flink’s metrics system to send custom metrics to CloudWatch from your applications. For more information, see Using Custom Metrics with Amazon Kinesis Data Analytics for Apache Flink.

We use a basic word count program to illustrate the use of custom metrics. The following code shows how to extend RichFlatMapFunction to track the number of words it sees. This word count is then surfaced via the Flink metrics API.

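// Imports needed by this snippet.
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.util.Collector;

// Declared as a nested class inside the word count job class.
private static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

    // Counter registered with the Flink metrics system; created in open(), so it is transient.
    private transient Counter counter;

    @Override
    public void open(Configuration config) {
        // Register the TotalWords counter under the kinesisanalytics metric group.
        this.counter = getRuntimeContext().getMetricGroup()
                .addGroup("kinesisanalytics")
                .addGroup("Service", "WordCountApplication")
                .addGroup("Tokenizer")
                .counter("TotalWords");
    }

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // normalize and split the line
        String[] tokens = value.toLowerCase().split("\\W+");

        // emit the pairs and count every non-empty token
        for (String token : tokens) {
            if (token.length() > 0) {
                counter.inc();
                out.collect(new Tuple2<>(token, 1));
            }
        }
    }
}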

Custom metrics emitted through the Flink metrics API are forwarded to CloudWatch metrics by Kinesis Data Analytics for Apache Flink. The following screenshot shows the word count metric in CloudWatch.
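To see what the TotalWords counter would report for a given input line, the tokenization rule from the Tokenizer can be exercised without Flink. The following standalone sketch reuses the same normalize-and-split logic. The class and method names are hypothetical, and a plain long stands in for the Flink Counter.

```java
// Standalone version of the Tokenizer's counting rule, for experimenting without a Flink runtime.
final class WordCountSketch {
    /** Counts the non-empty tokens that the Tokenizer would emit for one input line. */
    static long countWords(String line) {
        long total = 0;
        // Same normalization as the Tokenizer: lowercase, then split on runs of non-word characters.
        for (String token : line.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                total++; // corresponds to counter.inc() in the Flink function
            }
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(countWords("Hello, world! Hello"));
    }
}
```

The length check matters because a line that starts with punctuation or whitespace produces an empty first token, which should not be counted as a word.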

Custom automatic scaling

This section describes how to implement an automatic scaling solution for Kinesis Data Analytics for Apache Flink based on CloudWatch metrics. You can configure Kinesis Data Analytics for Apache Flink to perform CPU-based automatic scaling. However, you might want to scale your application based on a metric other than CPU utilization. To perform custom automatic scaling, use Application Auto Scaling with the appropriate metric.

For applications that read from a Kinesis stream source, you can use the metric millisBehindLatest. This captures how far behind your application is from the head of the stream.

A target tracking policy is one of two scaling policy types offered by Application Auto Scaling. You can specify a threshold value around which to vary the degree of parallelism of your Kinesis Data Analytics application. The following sample code on GitHub configures Application Auto Scaling to scale out when millisBehindLatest for the consuming application exceeds 1 minute. This increases the parallelism, which increases the number of KPUs.

The following diagram shows how Application Auto Scaling, used with Amazon API Gateway and AWS Lambda, scales a Kinesis Data Analytics application in response to a CloudWatch alarm.

The sample code includes examples for automatic scaling based on the target tracking policy and step scaling policy.

Automatic scaling solution components

The following is a list of key components used in the automatic scaling solution. You can find these components in the AWS CloudFormation template in the GitHub repo accompanying this post.

  • Application Auto Scaling scalable target – A scalable target is a resource that Application Auto Scaling can scale in and out. It’s uniquely identified by the combination of resource ID, scalable dimension, and namespace. For more information, see RegisterScalableTarget.
  • Scaling policy – The scaling policy defines how your scalable target should scale. As described in PutScalingPolicy, Application Auto Scaling supports two policy types: TargetTrackingScaling and StepScaling. In addition, you can configure a scheduled scaling action using Application Auto Scaling. If you specify TargetTrackingScaling, Application Auto Scaling also creates corresponding CloudWatch alarms for you.
  • API Gateway – Because the scalable target is a custom resource, we have to specify an API endpoint. Application Auto Scaling invokes this to perform scaling and get information about the current state of our scalable resource. We use an API Gateway and Lambda function to implement this endpoint.
  • Lambda – API Gateway invokes the Lambda function. This is called by Application Auto Scaling to perform the scaling actions. It also fetches information such as current scale value and returns information requested by Application Auto Scaling.

Additionally, you should be aware of the following:

  • When scaling out or in, this sample only updates the overall parallelism. It doesn’t adjust the parallelism per KPU.
  • When scaling occurs, the Kinesis Data Analytics application experiences downtime.
  • The throughput of a Flink application depends on many factors, such as complexity of processing and destination throughput. The step-scaling example assumes a relationship between incoming record throughput and scaling. The millisBehindLatest metric used for target tracking automatic scaling works the same way.
  • We recommend using the default scaling policy provided by Kinesis Data Analytics for CPU-based scaling, the target tracking auto scaling policy for the millisBehindLatest metric, and a step scaling auto scaling policy for a metric such as numRecordsInPerSecond. However, you can use any automatic scaling policy for the metric you choose.
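The step scaling idea can be sketched as a pure function that maps an observed throughput to a change in parallelism. This is an illustration only and is not taken from the sample repository: the thresholds, step sizes, and names are hypothetical. The KPU calculation assumes that the number of KPUs is the parallelism divided by the parallelism per KPU, rounded up.

```java
// Illustrative step scaling rule driven by numRecordsInPerSecond.
final class StepScalingSketch {
    /** Returns the new parallelism, clamped to [min, max]. Thresholds are hypothetical. */
    static int nextParallelism(int current, double recordsInPerSecond, int min, int max) {
        int step;
        if (recordsInPerSecond >= 10_000) {
            step = 2;   // heavy load: scale out by two
        } else if (recordsInPerSecond >= 5_000) {
            step = 1;   // moderate load: scale out by one
        } else if (recordsInPerSecond < 1_000) {
            step = -1;  // light load: scale in by one
        } else {
            step = 0;   // within the normal band: no change
        }
        return Math.max(min, Math.min(max, current + step));
    }

    /** KPUs implied by a parallelism setting (assumed: parallelism / parallelismPerKpu, rounded up). */
    static int kpus(int parallelism, int parallelismPerKpu) {
        return (parallelism + parallelismPerKpu - 1) / parallelismPerKpu;
    }

    public static void main(String[] args) {
        int parallelism = nextParallelism(2, 12_000, 1, 8);
        System.out.println("parallelism=" + parallelism + ", KPUs=" + kpus(parallelism, 1));
    }
}
```

Because each scaling action restarts the application, wider bands between the scale-out and scale-in thresholds help avoid repeated restarts when throughput hovers near a boundary.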

CloudWatch operational dashboard

Customers often ask us about best practices and the operational aspects of Kinesis Data Analytics for Apache Flink. We created a CloudWatch dashboard that captures the key metrics to monitor. We categorize the most common metrics in this dashboard with the recommended statistics for each metric.

This GitHub repo contains a CloudFormation template to deploy the dashboard for any Kinesis Data Analytics for Apache Flink application. You can also deploy a demo application with the dashboard. The dashboard includes the following:

  • Application health metrics:
    • Use uptime to see how long the job has been running without interruption and downtime to determine if a job failed to run. Non-zero downtime can indicate issues with your application.
    • Higher-than-normal job restarts can indicate an unhealthy application.
    • Checkpoint information size, duration, and number of failed checkpoints can help you understand application health and progress. Increasing checkpoint duration values can signify application health problems like backpressure and the inability to keep up with input data. Increasing checkpoint size over time can point to an infinitely growing state that can lead to out-of-memory errors.
  • Resource utilization metrics:
    • You can check the CPU and heap memory utilization along with the thread count. You can also check the garbage collection time taken across all Flink task managers.
  • Flink application progress metrics:
    • numRecordsInPerSecond and numRecordsOutPerSecond show the number of records accepted and emitted per second.
    • numLateRecordsDropped shows the number of records this operator or task has dropped due to arriving late.
    • Input and output watermarks are valid only when using event time semantics. You can use the difference between these two values to calculate event time latency.
  • Source metrics:
    • The Kinesis Data Streams-specific metric millisBehindLatest shows how far the consumer is behind the head of the stream, which indicates how far behind current time it is. We used this metric to demonstrate Application Auto Scaling earlier in this post.
    • The Kafka-specific metric recordsLagMax shows the maximum lag in terms of number of records for any partition in this window.

The dashboard contains useful metrics to gauge the operational health of a Flink application. You can modify the threshold, configure additional alarms, and add other system or custom metrics to customize the dashboard for your use. The following screenshot shows a section of the dashboard.

Summary

In this post, we covered how to use the enhanced monitoring features for Kinesis Data Analytics for Apache Flink applications. We created custom metrics for an Apache Flink application within application code and emitted it to CloudWatch. We also used Application Auto Scaling to scale an application. Finally, we shared a CloudWatch dashboard to monitor the operational health of Kinesis Data Analytics for Apache Flink applications. For more information about using Kinesis Data Analytics, see Getting Started with Amazon Kinesis Data Analytics.


About the Authors

Karthi Thyagarajan is a Principal Solutions Architect on the Amazon Kinesis team.

Deepthi Mohan is a Sr. TPM on the Amazon Kinesis Data Analytics team.

Stream CDC into an Amazon S3 data lake in Parquet format with AWS DMS

Post Syndicated from Viral Shah original https://aws.amazon.com/blogs/big-data/stream-cdc-into-an-amazon-s3-data-lake-in-parquet-format-with-aws-dms/

Most organizations generate data in real time and ever-increasing volumes. Data is captured from a variety of sources, such as transactional and reporting databases, application logs, customer-facing websites, and external feeds. Companies want to capture, transform, and analyze this time-sensitive data to improve customer experiences, increase efficiency, and drive innovations. With increased data volume and velocity, it’s imperative to capture the data from source systems as soon as they are generated and store them on a secure, scalable, and cost-efficient platform.

AWS Database Migration Service (AWS DMS) performs continuous data replication using change data capture (CDC). Using CDC, you can determine and track data that has changed and provide it as a stream of changes that a downstream application can consume and act on. Most database management systems manage a transaction log that records changes made to the database contents and metadata. AWS DMS reads the transaction log by using engine-specific API operations and functions and captures the changes made to the database in a nonintrusive manner.

Amazon Simple Storage Service (Amazon S3) is the largest and most performant object storage service for structured and unstructured data and the storage service of choice to build a data lake. With Amazon S3, you can cost-effectively build and scale a data lake of any size in a secure environment where data is protected by 99.999999999% of durability.

AWS DMS offers many options to capture data changes from relational databases and store the data in columnar format (Apache Parquet) into Amazon S3:

The second option helps you build a flexible data pipeline to ingest data into an Amazon S3 data lake from several relational and non-relational data sources, whereas the first option supports only relational data sources. Kinesis Data Firehose provides pre-built AWS Lambda blueprints for converting common data sources such as Apache logs and system logs to JSON and CSV formats, or you can write your own custom functions. It can also convert the format of incoming data from JSON to Parquet or Apache ORC before storing the data in Amazon S3. Data stored in columnar format gives you faster and lower-cost queries with downstream analytics services like Amazon Athena.

In this post, we focus on the technical challenges outlined in the second option and how to address them.

As shown in the following reference architecture, data is ingested from a database into Parquet format in Amazon S3 via AWS DMS integrating with Kinesis Data Streams and Kinesis Data Firehose.

Our solution provides flexibility to ingest data from several sources using Kinesis Data Streams and Kinesis Data Firehose with built-in data format conversion and integrated data transformation capabilities before storing data in a data lake. For more information about data ingestion into Kinesis Data Streams, see Writing Data into Amazon Kinesis Data Streams. You can then query Parquet data in Amazon S3 efficiently with Athena.

Implementing the architecture

AWS DMS can migrate data to and from most widely used commercial and open-source databases. You can migrate and replicate data directly to Amazon S3 in CSV and Parquet formats. In this post, we store data in Amazon S3 in Parquet because it offers efficient compression and encoding schemes. Parquet format allows compression schemes on a per-column level, and is future-proofed to allow adding more encodings as they are invented and implemented.

AWS DMS supports Kinesis Data Streams as a target. Kinesis Data Streams is a massively scalable and durable real-time data streaming service that can collect and process large streams of data records in real time. AWS DMS publishes records to a data stream using JSON. For more information about configuration details, see Use the AWS Database Migration Service to Stream Change Data to Amazon Kinesis Data Streams.

Kinesis Data Firehose can pull data from Kinesis Data Streams. It’s a fully managed service that delivers real-time streaming data to destinations such as Amazon S3, Amazon Redshift, Amazon Elasticsearch Service (Amazon ES), and Splunk. Kinesis Data Firehose can convert the format of input data from JSON to Parquet or ORC before sending it to Amazon S3. It needs a reference schema to interpret the AWS DMS streaming data in JSON and convert it into Parquet. In this post, we use AWS Glue, a fully managed ETL service, to create a schema in the AWS Glue Data Catalog for Kinesis Data Firehose to reference.

When AWS DMS migrates records, it creates additional fields (metadata) for each migrated record. The metadata provides additional information about the record being migrated, such as source table name, schema name, and type of operation. Most metadata field names contain a hyphen (for example, record-type, schema-name, table-name, transaction-id). See the following code:

{
        "data": {
            "MEET_CODE": 5189459,
            "MEET_DATE": "2020-02-21T19:20:04Z",
            "RACE_CODE": 5189459,
            "LAST_MODIFIED_DATE": "2020-02-24T19:20:04Z",
            "RACE_ENTRY_CODE": 11671651,
            "HORSE_CODE": 5042811
        },
        "metadata": {
            "transaction-id": 917505,
            "schema-name": "SH",
            "operation": "insert",
            "table-name": "RACE_ENTRY",
            "record-type": "data",
            "timestamp": "2020-02-26T00:20:07.482592Z",
            "partition-key-type": "schema-table"
        }
    }

The additional metadata added by AWS DMS leads to an error during the data format conversion phase in Kinesis Data Firehose. Kinesis Data Firehose follows Hive-style formatting and therefore doesn’t recognize the hyphen (-) in the metadata field names during data conversion from JSON into Parquet. It returns an error message: expected at the position 30 of ‘struct’ but ‘-’ is found. For example, see the following code:

{
	"deliveryStreamARN": "arn:aws:firehose:us-east-1:1234567890:deliverystream/abc-def-KDF",
	"destination": "arn:aws:s3:::abc-streaming-bucket",
	"deliveryStreamVersionId": 13,
	"message": "The schema is invalid. Error parsing the schema:
	 Error: : expected at the position 30 of 'struct<timestamp:string,record-type:string,operation:string,partition-key-type:string,schema-name:string,table-name:string,transaction-id:int>' but '-' is found.",
	"errorCode": "DataFormatConversion.InvalidSchema"
}

You can resolve the issue by making the following changes: specifying JSON key mappings and creating a reference table in AWS Glue before configuring Kinesis Data Firehose.

Specifying JSON key mappings

In your Kinesis Data Firehose configuration, specify JSON key mappings for fields with a hyphen (-) in their names. The mapping replaces the hyphen in these metadata field names with an underscore (for example, record-type changes to record_type).

Use AWS Command Line Interface (AWS CLI) to create Kinesis Data Firehose with the JSON key mappings. Modify the parameters to meet your specific requirements.

Kinesis Data Firehose configuration mapping is only possible through the AWS CLI or API and not through the AWS Management Console.

The following code configures Kinesis Data Firehose to map the five columns with a hyphen (-) in their field names to new field names with an underscore (_):

"S3BackupMode": "Disabled",
                    "DataFormatConversionConfiguration": {
                        "SchemaConfiguration": {
                            "RoleARN": "arn:aws:iam::123456789012:role/sample-firehose-delivery-role",
                            "DatabaseName": "sample-db",
                            "TableName": "sample-table",
                            "Region": "us-east-1",
                            "VersionId": "LATEST"
                        },
                        "InputFormatConfiguration": {
                            "Deserializer": {
                                "OpenXJsonSerDe": {
                                "ColumnToJsonKeyMappings":
                                {
                                 "record_type": "record-type","partition_key_type": "partition-key-type","schema_name":"schema-name","table_name":"table-name","transaction_id":"transaction-id"
                                }
                                }
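The column-to-key mapping in the fragment above follows a mechanical rule: every JSON key that contains a hyphen gets a column name with the hyphen replaced by an underscore. The following sketch derives such a mapping from a list of field names. It is an illustration of the rule, not part of the Kinesis Data Firehose API, and the class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Derives Hive-friendly column names for JSON keys that contain hyphens.
final class DmsKeyMappings {
    /** Returns column name -> original JSON key, for keys that contain a hyphen. */
    static Map<String, String> columnToJsonKeyMappings(String... jsonKeys) {
        Map<String, String> mappings = new LinkedHashMap<>();
        for (String key : jsonKeys) {
            if (key.indexOf('-') >= 0) {
                mappings.put(key.replace('-', '_'), key);
            }
        }
        return mappings;
    }

    public static void main(String[] args) {
        // Metadata field names from the AWS DMS record shown earlier in this post.
        Map<String, String> mappings = columnToJsonKeyMappings(
                "timestamp", "record-type", "operation", "partition-key-type",
                "schema-name", "table-name", "transaction-id");
        mappings.forEach((column, key) -> System.out.println(column + " <- " + key));
    }
}
```

Keys without a hyphen, such as timestamp and operation, need no mapping and are left out of the result.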

Creating a reference table in AWS Glue

Because Kinesis Data Firehose uses the Data Catalog to reference the schema for Parquet format conversion, you must first create a reference table in AWS Glue before configuring Kinesis Data Firehose. Use Athena to create a Data Catalog table. For instructions, see CREATE TABLE. In the table, make sure that the column names use _ instead of -. If needed, modify them manually in advance through the Edit schema option for the referenced table in AWS Glue.

Use Athena to query the results of data ingested by Kinesis Data Firehose into Amazon S3.

This solution is only applicable in the following use cases:

  • Capturing data changes from your source with AWS DMS
  • Converting data into Parquet with Kinesis Data Firehose

If you want to store data in a non-Parquet format (such as CSV or JSON) or ingest into Kinesis through other routes, then you don’t need to modify your Kinesis Data Firehose configuration.

Conclusion

This post demonstrated how to convert AWS DMS data into Parquet format and specific configurations to make sure metadata follows the expected format of Kinesis Data Streams and Kinesis Data Firehose. We encourage you to try this solution and take advantage of all the benefits of using AWS DMS with Kinesis Data Streams and Kinesis Data Firehose. For more information, see Getting started with AWS Database Migration Service and Setting up Amazon Kinesis Firehose.

If you have questions or suggestions, please leave a comment.

About the Author

Viral Shah is a Data Lab Architect with Amazon Web Services. Viral helps our customers architect and build data and analytics prototypes in just four days in the AWS Data Lab. He has over 20 years of experience working with enterprise customers and startups primarily in the Data and Database space.

 

 

Building storage-first serverless applications with HTTP APIs service integrations

Post Syndicated from Eric Johnson original https://aws.amazon.com/blogs/compute/building-storage-first-applications-with-http-apis-service-integrations/

Over the last year, I have been talking about “storage first” serverless patterns. With these patterns, data is stored persistently before any business logic is applied. The advantage of this pattern is increased application resiliency. By persisting the data before processing, the original data is still available, if or when errors occur.

Common pattern for serverless API backend

Using Amazon API Gateway as a proxy to an AWS Lambda function is a common pattern in serverless applications. The Lambda function handles the business logic and communicates with other AWS or third-party services to route, modify, or store the processed data. One option is to place the data in an Amazon Simple Queue Service (SQS) queue for processing downstream. In this pattern, the developer is responsible for handling errors and retry logic within the Lambda function code.

The storage first pattern flips this around. It uses native error handling with retry logic or dead-letter queues (DLQ) at the SQS layer before any code is run. By directly integrating API Gateway to SQS, developers can increase application reliability while reducing lines of code.
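As a minimal sketch of what error handling at the SQS layer can look like, the following AWS SAM fragment attaches a dead-letter queue to the main queue through a redrive policy. The resource names and the maxReceiveCount value are assumptions chosen for illustration, not values from a specific application.

```yaml
Resources:
  # Hypothetical queue that receives messages that repeatedly fail processing
  MyDeadLetterQueue:
    Type: AWS::SQS::Queue

  # Main queue that the API writes to
  MyQueue:
    Type: AWS::SQS::Queue
    Properties:
      RedrivePolicy:
        deadLetterTargetArn: !GetAtt MyDeadLetterQueue.Arn
        # Assumed threshold: move a message to the DLQ after 3 failed receives
        maxReceiveCount: 3
```

With a configuration along these lines, retries and failure isolation are handled by the queue itself rather than by custom code in a Lambda function.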

Storage first pattern for serverless API backend

Previously, direct integrations required REST APIs with transformation templates written in Velocity Template Language (VTL). However, developers tell us they would like to integrate directly with services in a simpler way, without using VTL. As a result, HTTP APIs now offers the ability to integrate directly with five AWS services without needing a transformation template or code layer.

The first five service integrations

This release of HTTP APIs direct integrations includes Amazon EventBridge, Amazon Kinesis Data Streams, Amazon SQS, AWS Systems Manager AppConfig, and AWS Step Functions. With these new integrations, customers can create APIs and webhooks for their business logic hosted in these AWS services. They can also take advantage of HTTP APIs features like authorizers, throttling, and enhanced observability for securing and monitoring these applications.

Amazon EventBridge

HTTP APIs service integration with Amazon EventBridge

The HTTP APIs direct integration for EventBridge uses the PutEvents API to enable client applications to place events on an EventBridge bus. Once the events are on the bus, EventBridge routes them to specific targets based upon EventBridge filtering rules.

This integration is a storage first pattern because data is written to the bus before any routing or logic is applied. If the downstream target service has issues, then EventBridge implements a retry strategy with incremental back-off for up to 24 hours. Additionally, the integration helps developers reduce code by filtering events at the bus. It routes to downstream targets without the need for a Lambda function as a transport layer.

Use this direct integration when:

  • Different tasks are required based upon incoming event details
  • Only data ingestion is required
  • Payload size is less than 256 KB
  • Expected requests per second are less than the Region quotas
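As a rough sketch, an HTTP APIs route that places events on a bus could be described in OpenAPI with the x-amazon-apigateway-integration extension. The /events path, the request body field names, and the MyHttpApiRole resource (an IAM role that would need the events:PutEvents permission) are assumptions for illustration. Because no EventBusName is mapped, this sketch targets the default event bus.

```yaml
paths:
  /events:
    post:
      responses:
        default:
          description: "Default response for POST /events"
      x-amazon-apigateway-integration:
        integrationSubtype: "EventBridge-PutEvents"
        credentials:
          # Assumed role defined elsewhere in the AWS SAM template
          Fn::GetAtt: [MyHttpApiRole, Arn]
        requestParameters:
          # Assumed client contract: each value is read from the JSON request body
          Detail: "$request.body.Detail"
          DetailType: "$request.body.DetailType"
          Source: "$request.body.Source"
        payloadFormatVersion: "1.0"
        type: "aws_proxy"
        connectionType: "INTERNET"
```

A client would then POST a JSON body containing Detail, DetailType, and Source fields to the /events route, and EventBridge rules take over the routing.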

Amazon Kinesis Data Streams

HTTP APIs service integration with Amazon Kinesis Data Streams

The HTTP APIs direct integration for Kinesis Data Streams offers the PutRecord integration action, enabling client applications to place events on a Kinesis data stream. Kinesis Data Streams is designed to handle up to 1,000 writes per second per shard, with payloads up to 1 MB in size. Developers can increase throughput by increasing the number of shards in the data stream. You can route the incoming data to targets like an Amazon S3 bucket as part of a data lake or a Kinesis Data Analytics application for real-time analytics.

This integration is a storage first option because data is stored on the stream for up to seven days until it is processed and routed elsewhere. When processing stream events with a Lambda function, errors are handled at the Lambda layer through a configurable error handling strategy.
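To illustrate the configurable error handling strategy, here is a minimal AWS SAM sketch of a Lambda function consuming a stream. The resource names, shard count, and retry values are assumptions for illustration only.

```yaml
Resources:
  # Hypothetical stream with a single shard
  MyStream:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1

  MyStreamProcessor:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: src/
      Handler: app.lambdaHandler
      # Runtime mirrors the template in this post; substitute one that Lambda currently supports
      Runtime: nodejs12.x
      Events:
        StreamTrigger:
          Type: Kinesis
          Properties:
            Stream: !GetAtt MyStream.Arn
            StartingPosition: TRIM_HORIZON
            # Split a failing batch in two to isolate the bad record
            BisectBatchOnFunctionError: true
            # Assumed limit: give up on a batch after 3 retries
            MaximumRetryAttempts: 3
```

Settings like these live on the event source mapping, so the retry behavior can be tuned without changing the function code.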

Use this direct integration when:

  • Ingesting large amounts of data
  • Ingesting large payload sizes
  • Order is important
  • Routing the same data to multiple targets
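A route that writes to a stream might be sketched in OpenAPI as follows. The /records path, the body field names, and the MyStream and MyHttpApiRole resources (the role would need the kinesis:PutRecord permission) are assumptions, and how the Data value must be encoded by the client should be checked against the service documentation.

```yaml
paths:
  /records:
    post:
      responses:
        default:
          description: "Default response for POST /records"
      x-amazon-apigateway-integration:
        integrationSubtype: "Kinesis-PutRecord"
        credentials:
          # Assumed role defined elsewhere in the AWS SAM template
          Fn::GetAtt: [MyHttpApiRole, Arn]
        requestParameters:
          # Ref on a Kinesis stream resource resolves to the stream name
          StreamName:
            Ref: MyStream
          # Assumed client contract: both values are read from the JSON request body
          Data: "$request.body.Data"
          PartitionKey: "$request.body.PartitionKey"
        payloadFormatVersion: "1.0"
        type: "aws_proxy"
        connectionType: "INTERNET"
```

Records that share a partition key land on the same shard, which is what preserves their order.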

Amazon SQS

HTTP APIs service integration with Amazon SQS

The HTTP APIs direct integration for Amazon SQS offers the SendMessage, ReceiveMessage, DeleteMessage, and PurgeQueue integration actions. This integration differs from the EventBridge and Kinesis integrations in that data flows both ways. Events can be created, read, and deleted from the SQS queue via REST calls through the HTTP API endpoint. Additionally, a full purge of the queue can be managed using the PurgeQueue action.

This pattern is a storage first pattern because the data remains on the queue for four days by default (configurable to 14 days), unless it is processed and removed. When the Lambda service polls the queue, the messages that are returned are hidden in the queue for a set amount of time. Once the calling service has processed these messages, it uses the DeleteMessage API to remove the messages permanently.

When triggering a Lambda function with an SQS queue, the Lambda service manages this process internally. However, HTTP APIs direct integration with SQS enables developers to move this process to client applications without the need for a Lambda function as a transport layer.

Use this direct integration when:

  • Data must be received from, as well as sent to, the service
  • Downstream services need reduced concurrency
  • The queue requires custom management
  • Order is important (FIFO queues)
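The two-way data flow can be sketched as a pair of OpenAPI routes, one that reads messages and one that deletes a processed message. The /messages path, the receiptHandle query string name, and the MyQueue and MyHttpApiRole resources are assumptions. The role would need the sqs:ReceiveMessage and sqs:DeleteMessage permissions.

```yaml
paths:
  /messages:
    get:
      responses:
        default:
          description: "Default response for GET /messages"
      x-amazon-apigateway-integration:
        integrationSubtype: "SQS-ReceiveMessage"
        credentials:
          Fn::GetAtt: [MyHttpApiRole, Arn]
        requestParameters:
          # Ref on an SQS queue resource resolves to the queue URL
          QueueUrl:
            Ref: MyQueue
        payloadFormatVersion: "1.0"
        type: "aws_proxy"
        connectionType: "INTERNET"
    delete:
      responses:
        default:
          description: "Default response for DELETE /messages"
      x-amazon-apigateway-integration:
        integrationSubtype: "SQS-DeleteMessage"
        credentials:
          Fn::GetAtt: [MyHttpApiRole, Arn]
        requestParameters:
          QueueUrl:
            Ref: MyQueue
          # Assumed contract: the client passes back the receipt handle it received
          ReceiptHandle: "$request.querystring.receiptHandle"
        payloadFormatVersion: "1.0"
        type: "aws_proxy"
        connectionType: "INTERNET"
```

In this arrangement the client application takes on the receive, process, and delete cycle that the Lambda service would otherwise manage.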

AWS AppConfig

HTTP APIs service integration with AWS Systems Manager AppConfig

The HTTP APIs direct integration for AWS AppConfig offers the GetConfiguration integration action and allows applications to check for application configuration updates. By exposing the AppConfig API through an HTTP APIs endpoint, developers can automate configuration changes for their applications. While this integration is not considered a storage first integration, it does enable direct communication from external services to AppConfig without the need for a Lambda function as a transport layer.

Use this direct integration when:

  • Access to AWS AppConfig is required
  • Managing application configurations
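A configuration lookup route might look like the following OpenAPI sketch. The application, environment, and configuration profile names are hypothetical placeholders, as are the /config path and the clientId query string parameter. The MyHttpApiRole resource is an assumed IAM role with the appconfig:GetConfiguration permission.

```yaml
paths:
  /config:
    get:
      responses:
        default:
          description: "Default response for GET /config"
      x-amazon-apigateway-integration:
        integrationSubtype: "AppConfig-GetConfiguration"
        credentials:
          Fn::GetAtt: [MyHttpApiRole, Arn]
        requestParameters:
          # Hypothetical static values identifying what to fetch
          Application: "my-app"
          Environment: "prod"
          Configuration: "feature-flags"
          # Assumed contract: the caller identifies itself in the query string
          ClientId: "$request.querystring.clientId"
        payloadFormatVersion: "1.0"
        type: "aws_proxy"
        connectionType: "INTERNET"
```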

AWS Step Functions

HTTP APIs service integration with AWS Step Functions

The HTTP APIs direct integration for Step Functions offers the StartExecution and StopExecution integration actions. These actions allow for programmatic control of a Step Functions state machine via an API. When starting a Step Functions workflow, JSON data is passed in the request and mapped to the state machine. Error messages are also mapped to the state machine when stopping the execution.

This pattern provides a storage first integration because Step Functions maintains a persistent state during the life of the orchestrated workflow. Step Functions also supports service integrations that allow the workflows to send and receive data without needing a Lambda function as a transport layer.

Use this direct integration when:

  • Orchestrating multiple actions
  • Order of action is required
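As an illustrative sketch, the start and stop actions could be exposed as two OpenAPI routes. The paths, the body field names, and the MyStateMachine and MyHttpApiRole resources are assumptions. The role would need the states:StartExecution and states:StopExecution permissions, and passing the whole request body as the workflow input is one possible mapping rather than the only one.

```yaml
paths:
  /executions:
    post:
      responses:
        default:
          description: "Default response for POST /executions"
      x-amazon-apigateway-integration:
        integrationSubtype: "StepFunctions-StartExecution"
        credentials:
          Fn::GetAtt: [MyHttpApiRole, Arn]
        requestParameters:
          # Ref on a state machine resource resolves to its ARN
          StateMachineArn:
            Ref: MyStateMachine
          # Assumed mapping: the JSON request body becomes the execution input
          Input: "$request.body"
        payloadFormatVersion: "1.0"
        type: "aws_proxy"
        connectionType: "INTERNET"
  /executions/stop:
    post:
      responses:
        default:
          description: "Default response for POST /executions/stop"
      x-amazon-apigateway-integration:
        integrationSubtype: "StepFunctions-StopExecution"
        credentials:
          Fn::GetAtt: [MyHttpApiRole, Arn]
        requestParameters:
          # Assumed contract: the client supplies the ARN returned when the execution started
          ExecutionArn: "$request.body.ExecutionArn"
          # Optional error details recorded on the stopped execution
          Error: "$request.body.Error"
          Cause: "$request.body.Cause"
        payloadFormatVersion: "1.0"
        type: "aws_proxy"
        connectionType: "INTERNET"
```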

Building HTTP APIs direct integrations

HTTP APIs service integrations can be built using the AWS CLI, AWS SAM, or through the API Gateway console. The console walks through contextual choices to help you understand what is required for each integration. Each of the integrations also includes an Advanced section to provide additional information for the integration.

Creating an HTTP APIs service integration

Once you build an integration, you can export it as an OpenAPI template that can be used with infrastructure as code (IaC) tools like AWS SAM. The exported template can also include the API Gateway extensions that define the specific integration information.

Exporting the HTTP APIs configuration to OpenAPI

OpenAPI template

An example of a direct integration from HTTP APIs to SQS is located in the Sessions With SAM repository. This example includes the following architecture:

AWS SAM template resource architecture

The AWS SAM template creates the HTTP API, the SQS queue, the Lambda function, and both of the AWS Identity and Access Management (IAM) roles required. This is all generated in 58 lines of code and looks like this:

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: HTTP API direct integrations

Resources:
  MyQueue:
    Type: AWS::SQS::Queue
    
  MyHttpApi:
    Type: AWS::Serverless::HttpApi
    Properties:
      DefinitionBody:
        'Fn::Transform':
          Name: 'AWS::Include'
          Parameters:
            Location: './api.yaml'
          
  MyHttpApiRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service: "apigateway.amazonaws.com"
            Action: 
              - "sts:AssumeRole"
      Policies:
        - PolicyName: ApiDirectWriteToSQS
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              Action:
              - sqs:SendMessage
              Effect: Allow
              Resource:
                - !GetAtt MyQueue.Arn
                
  MyTriggeredLambda:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: src/
      Handler: app.lambdaHandler
      Runtime: nodejs12.x
      Policies:
        - SQSPollerPolicy:
            QueueName: !GetAtt MyQueue.QueueName
      Events:
        SQSTrigger:
          Type: SQS
          Properties:
            Queue: !GetAtt MyQueue.Arn

Outputs:
  ApiEndpoint:
    Description: "HTTP API endpoint URL"
    Value: !Sub "https://${MyHttpApi}.execute-api.${AWS::Region}.amazonaws.com"

The OpenAPI template handles the route definitions for the HTTP API configuration and configures the service integration. The template looks like this:

openapi: "3.0.1"
info:
  title: "my-sqs-api"
paths:
  /:
    post:
      responses:
        default:
          description: "Default response for POST /"
      x-amazon-apigateway-integration:
        integrationSubtype: "SQS-SendMessage"
        credentials:
          Fn::GetAtt: [MyHttpApiRole, Arn]
        requestParameters:
          MessageBody: "$request.body.MessageBody"
          QueueUrl:
            Ref: MyQueue
        payloadFormatVersion: "1.0"
        type: "aws_proxy"
        connectionType: "INTERNET"
x-amazon-apigateway-importexport-version: "1.0"

Because the OpenAPI template is included in the AWS SAM template via a transform, the API Gateway integration can reference the roles and services created within the AWS SAM template.

Conclusion

This post covers the concept of storage first integration patterns and how the new HTTP APIs direct integrations can help. I cover the five current integrations and possible use cases for each. Additionally, I demonstrate how to use AWS SAM to build and manage the integrated applications using infrastructure as code.

Using the storage first pattern with direct integrations can help developers build serverless applications that are more durable with fewer lines of code. A Lambda function is no longer required to transport data from the API endpoint to the desired service. Instead, use Lambda function invocations for differentiating business logic.

To learn more, join us for the HTTP API service integrations session of Sessions With SAM!

#ServerlessForEveryone