Tag Archives: Amazon Kinesis

Introducing mutual TLS authentication for Amazon MSK as an event source

Post Syndicated from Julian Wood original https://aws.amazon.com/blogs/compute/introducing-mutual-tls-authentication-for-amazon-msk-as-an-event-source/

This post is written by Uma Ramadoss, Senior Specialist Solutions Architect, Integration.

Today, AWS Lambda is introducing mutual TLS (mTLS) authentication for Amazon Managed Streaming for Apache Kafka (Amazon MSK) and self-managed Kafka as an event source.

Many customers use Amazon MSK for streaming data from multiple producers. Multiple subscribers can then consume the streaming data and build data pipelines, analytics, and data integration. To learn more, read Using Amazon MSK as an event source for AWS Lambda.

You can activate any combination of authentication modes (mutual TLS, SASL SCRAM, or IAM access control) on new or existing clusters. This is useful if you are migrating to a new authentication mode or must run multiple authentication modes simultaneously. Lambda natively supports consuming messages from both self-managed Kafka and Amazon MSK through event source mapping.

By default, the TLS protocol only requires a server to authenticate itself to the client. The authentication of the client to the server is managed by the application layer. The TLS protocol also offers the ability for the server to request that the client send an X.509 certificate to prove its identity. This is called mutual TLS as both parties are authenticated via certificates with TLS.

Mutual TLS is a commonly used authentication mechanism for business-to-business (B2B) applications. It’s used in standards such as Open Banking, which enables secure open API integrations for financial institutions. It is one of the popular authentication mechanisms for customers using Kafka.

To use mutual TLS authentication for your Kafka-triggered Lambda functions, you provide a signed client certificate, the private key for the certificate, and an optional password if the private key is encrypted. This establishes a trust relationship between Lambda and Amazon MSK or self-managed Kafka. Lambda supports self-signed server certificates or server certificates signed by a private certificate authority (CA) for self-managed Kafka. Lambda trusts the Amazon MSK certificate by default as the certificates are signed by Amazon Trust Services CAs.

This blog post explains how to set up a Lambda function to process messages from an Amazon MSK cluster using mutual TLS authentication.

Overview

Using Amazon MSK as an event source operates in a similar way to using Amazon SQS or Amazon Kinesis. You create an event source mapping by attaching Amazon MSK as an event source to your Lambda function.

The Lambda service internally polls for new records from the event source, reading the messages from one or more partitions in batches. It then synchronously invokes your Lambda function, sending each batch as an event payload. Lambda continues to process batches until there are no more messages in the topic.

The Lambda function’s event payload contains an array of records. Each array item contains details of the topic and Kafka partition identifier, together with a timestamp and base64 encoded message.

Kafka event payload
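
Each record value is base64 encoded, so the consuming function must decode it before processing. The following is a minimal Python sketch of a consumer handler, assuming the standard event structure in which records maps topic-partition keys to lists of records (the print statement is purely illustrative):

    import base64

    def lambda_handler(event, context):
        # event["records"] maps "topic-partition" keys to lists of records
        for topic_partition, records in event.get("records", {}).items():
            for record in records:
                payload = base64.b64decode(record["value"]).decode("utf-8")
                print(f"topic={record['topic']} partition={record['partition']} "
                      f"offset={record['offset']} message={payload}")
        return "OK"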

You store the signed client certificate, the private key for the certificate, and an optional password (if the private key is encrypted) as a secret in AWS Secrets Manager. You provide the secret in the Lambda event source mapping.

The steps for using mutual TLS authentication for Amazon MSK as event source for Lambda are:

  1. Create a private certificate authority (CA) using AWS Certificate Manager (ACM) Private Certificate Authority (PCA).
  2. Create a client certificate and private key. Store them as secret in AWS Secrets Manager.
  3. Create an Amazon MSK cluster and a consuming Lambda function using the AWS Serverless Application Model (AWS SAM).
  4. Attach the event source mapping.

This blog walks through these steps in detail.

Prerequisites

1. Creating a private CA.

To use mutual TLS client authentication with Amazon MSK, create a root CA using AWS ACM Private Certificate Authority (PCA). We recommend using independent ACM PCAs for each MSK cluster when you use mutual TLS to control access. This ensures that TLS certificates signed by PCAs only authenticate with a single MSK cluster.

  1. From the AWS Certificate Manager console, choose Create a Private CA.
  2. In the Select CA type panel, select Root CA and choose Next.
  3. In the Configure CA subject name panel, provide your certificate details, and choose Next.
  4. From the Configure CA key algorithm panel, choose the key algorithm for your CA and choose Next.
  5. From the Configure revocation panel, choose any optional certificate revocation options you require and choose Next.
  6. Continue through the screens to add any tags required, allow ACM to renew certificates, review your options, and confirm pricing. Choose Confirm and create.
  7. Once the CA is created, choose Install CA certificate to activate your CA. Configure the validity of the certificate and the signature algorithm and choose Next.
  8. Review the certificate details and choose Confirm and install. Note down the Amazon Resource Name (ARN) of the private CA for the next section.

2. Creating a client certificate.

You generate a client certificate using the root certificate you previously created. This client certificate is used to authenticate the client with the Amazon MSK cluster using mutual TLS. You provide this client certificate and the private key as AWS Secrets Manager secrets to the AWS Lambda event source mapping.

  1. On your local machine, run the following command to create a private key and certificate signing request using OpenSSL. Enter your certificate details. This creates a private key file and a certificate signing request file in the current directory.

    openssl req -new -newkey rsa:2048 -days 365 -keyout key.pem -out client_cert.csr -nodes

  2. Use the AWS CLI to sign your certificate request with the private CA previously created. Replace Private-CA-ARN with the ARN of your private CA. The certificate validity value is set to 300 days; change this if necessary. Save the certificate ARN provided in the response.

    aws acm-pca issue-certificate --certificate-authority-arn Private-CA-ARN --csr fileb://client_cert.csr --signing-algorithm "SHA256WITHRSA" --validity Value=300,Type="DAYS"

  3. Retrieve the certificate that ACM signed for you. Replace Private-CA-ARN and Certificate-ARN with the ARNs you obtained from the previous commands. This creates a signed certificate file called client_cert.pem.

    aws acm-pca get-certificate --certificate-authority-arn Private-CA-ARN --certificate-arn Certificate-ARN | jq -r '.Certificate + "\n" + .CertificateChain' >> client_cert.pem

  4. Create a new file called secret.json with the following structure:

    {
    "certificate":"",
    "privateKey":""
    }

  5. Copy the contents of client_cert.pem into certificate and the contents of key.pem into privateKey. Ensure that there are no extra spaces added.

  6. Create the secret and save the ARN for the next section.

    aws secretsmanager create-secret --name msk/mtls/lambda/clientcert --secret-string file://secret.json

3. Setting up an Amazon MSK cluster with AWS Lambda as a consumer.

Amazon MSK is a highly available service, so it must be configured to run in a minimum of two Availability Zones in your preferred Region. To comply with security best practices, the brokers are usually configured in private subnets in each Availability Zone.

You can use the AWS CLI, AWS Management Console, AWS SDKs, or AWS CloudFormation to create the cluster and the Lambda functions. This blog uses AWS SAM to create the infrastructure, and the associated code is available in the GitHub repository.

The AWS SAM template creates the following resources:

  1. Amazon Virtual Private Cloud (VPC).
  2. Amazon MSK cluster with mutual TLS authentication.
  3. Lambda function for consuming the records from the Amazon MSK cluster.
  4. IAM roles.
  5. Lambda function for testing the Amazon MSK integration by publishing messages to the topic.

The VPC has public and private subnets in two Availability Zones with the private subnets configured to use a NAT Gateway. You can also set up VPC endpoints with PrivateLink to allow the Amazon MSK cluster to communicate with Lambda. To learn more about different configurations, see this blog post.

The Lambda function requires permission to describe VPCs and security groups, and manage elastic network interfaces to access the Amazon MSK data stream. The Lambda function also needs two Kafka permissions: kafka:DescribeCluster and kafka:GetBootstrapBrokers. The policy template AWSLambdaMSKExecutionRole includes these permissions. The Lambda function also requires permission to get the secret value from AWS Secrets Manager for the secret you configure in the event source mapping.

  ConsumerLambdaFunctionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaMSKExecutionRole
      Policies:
        - PolicyName: SecretAccess
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action: "secretsmanager:GetSecretValue"
                Resource: "*"

This release adds two new SourceAccessConfiguration types to the Lambda event source mapping:

1. CLIENT_CERTIFICATE_TLS_AUTH – (Amazon MSK, self-managed Apache Kafka) The Secrets Manager ARN of your secret containing the certificate chain (PEM), private key (PKCS#8 PEM), and private key password (optional) used for mutual TLS authentication with your Amazon MSK/Apache Kafka brokers. A private key password is required if the private key is encrypted.

2. SERVER_ROOT_CA_CERTIFICATE – This is only for self-managed Apache Kafka. This contains the Secrets Manager ARN of your secret containing the root CA certificate used by your Apache Kafka brokers in PEM format. This is not applicable for Amazon MSK as Amazon MSK brokers use public AWS Certificate Manager certificates which are trusted by AWS Lambda by default.
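
For illustration, the same configuration types can also be supplied programmatically. The following boto3 sketch creates an event source mapping for a self-managed Apache Kafka cluster using both types; the function name, broker addresses, topic, and secret ARNs are placeholders:

    import boto3

    lambda_client = boto3.client("lambda")

    response = lambda_client.create_event_source_mapping(
        FunctionName="my-kafka-consumer",            # placeholder function name
        Topics=["exampleTopic"],
        StartingPosition="TRIM_HORIZON",
        BatchSize=100,
        SelfManagedEventSource={
            "Endpoints": {"KAFKA_BOOTSTRAP_SERVERS": ["broker1.example.com:9092",
                                                      "broker2.example.com:9092"]}
        },
        SourceAccessConfigurations=[
            # Client certificate and private key for mutual TLS
            {"Type": "CLIENT_CERTIFICATE_TLS_AUTH",
             "URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:client-cert"},
            # Root CA that signed the broker certificates (self-managed Kafka only)
            {"Type": "SERVER_ROOT_CA_CERTIFICATE",
             "URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:server-root-ca"},
        ],
    )
    print(response["UUID"])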

Deploying the resources:

To deploy the example application:

  1. Clone the GitHub repository:

    git clone https://github.com/aws-samples/aws-lambda-msk-mtls-integration.git

  2. Navigate to the aws-lambda-msk-mtls-integration directory. Copy the client certificate file and the private key file to the producer Lambda function code:

    cd aws-lambda-msk-mtls-integration
    cp ../client_cert.pem code/producer/client_cert.pem
    cp ../key.pem code/producer/client_key.pem

  3. Navigate to the code directory and build the application artifacts using the AWS SAM build command:

    cd code
    sam build

  4. Run sam deploy to deploy the infrastructure. Provide the stack name, AWS Region, and the ARN of the private CA created in section 1. Provide additional information as prompted and deploy the stack:

    sam deploy -g

    The stack deployment takes about 30 minutes to complete. Once complete, note the output values.

  5. Create the event source mapping for the Lambda function. Replace CONSUMER_FUNCTION_NAME and MSK_CLUSTER_ARN with the values from the output of the stack created by the AWS SAM template. Replace SECRET_ARN with the ARN of the AWS Secrets Manager secret created previously:

    aws lambda create-event-source-mapping --function-name CONSUMER_FUNCTION_NAME --batch-size 10 --starting-position TRIM_HORIZON --topics exampleTopic --event-source-arn MSK_CLUSTER_ARN --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "SECRET_ARN"}]'

  6. Navigate one directory level up and configure the producer function with the Amazon MSK broker details. Replace PRODUCER_FUNCTION_NAME and MSK_CLUSTER_ARN with the values from the output of the stack created by the AWS SAM template:

    cd ../
    ./setup_producer.sh MSK_CLUSTER_ARN PRODUCER_FUNCTION_NAME

  7. Verify that the event source mapping state is enabled before moving on to the next step. Replace UUID with the value from the output of step 5:

    aws lambda get-event-source-mapping --uuid UUID

  8. Publish messages using the producer. Replace PRODUCER_FUNCTION_NAME with the value from the output of the stack created by the AWS SAM template. The following command creates a Kafka topic called exampleTopic and publishes 100 messages to the topic:

    ./produce.sh PRODUCER_FUNCTION_NAME exampleTopic 100

  9. Verify that the consumer Lambda function receives and processes the messages by checking the Amazon CloudWatch log group. Navigate to the log group by searching for aws/lambda/{stackname}-MSKConsumerLambda in the search bar.
Consumer function log stream

Conclusion

Lambda now supports mutual TLS authentication for Amazon MSK and self-managed Kafka as an event source. You now have the option to provide a client certificate to establish a trust relationship between Lambda and MSK or self-managed Kafka brokers. You can configure this via the AWS Management Console, AWS CLI, AWS SDKs, and AWS CloudFormation.

To learn more about how to use mutual TLS authentication for your Kafka-triggered AWS Lambda functions, visit AWS Lambda with self-managed Apache Kafka and Using AWS Lambda with Amazon MSK.

Designing a High-volume Streaming Data Ingestion Platform Natively on AWS

Post Syndicated from Soonam Jose original https://aws.amazon.com/blogs/architecture/designing-a-high-volume-streaming-data-ingestion-platform-natively-on-aws/

The total global data storage is projected to exceed 200 zettabytes by 2025. This exponential growth of data demands increased vigilance against cybercrimes. Emerging cybersecurity trends include an increasing number of attacks on services, ransomware, and critical infrastructure threats. Businesses are changing how they approach cybersecurity and are looking for new ways to tackle these threats. In the past, they have relied on internal IT or engaged a managed security services provider (MSSP) to monitor and prevent unauthorized access and attacks.

An end-to-end analytics solution should ingest and process log data streaming from various computing and IoT devices. It can then make processed data available to analytics systems users in near-real-time. However, the sheer volume of data in the future makes this difficult to address in a reliable and cost-effective manner.

In this blog post, we present three approaches for a high-volume log data ingestion and processing platform natively on Amazon Web Services (AWS). We also compare the pros and cons of each. We’ll discuss factors to consider when evaluating the different options and their associated flexibility, to take full advantage of AWS. We will showcase a fictional use case for a top MSSP who ingests high volumes of logs from security devices to cloud. This MSSP also performs downstream analytics and threat detection modeling.

The options we present here have a log collection platform (LCP) on-premises. It collects logs from security devices and sensors, performs necessary translations and tokenization, and pushes compressed log files to the processing tier on cloud. The collection platform can also be modernized to have the IoT-enabled devices send logs to AWS IoT services. This will push the data to Amazon Kinesis, a managed service for collecting and analyzing streaming data.

Approach 1: Amazon Kinesis for log ingestion and format conversion

Figure 1 illustrates a comprehensive solution that uses managed and serverless services on AWS.

Figure 1. Amazon Kinesis for log ingestion and format conversion

1. LCP will invoke a scalable producer application for Amazon Kinesis Data Streams running on AWS Fargate behind an Application Load Balancer. The producer application will use the Amazon Kinesis Producer Library (KPL). KPL aggregates and batches data records to make ingestion into the data stream more efficient. The application may provide compressed records to the KPL to have it manage object compression.

The application can be set up as an HTTP endpoint that receives log files and processes them using KPL. Customer ID sent as part of an HTTP request header can be used to maintain affinity. The application can run in a Docker container, which is orchestrated by Amazon ECS on AWS Fargate. A target tracking scaling policy can manage the number of parallel running data ingestion containers to manage scalability of the ingestion process.
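
The KPL itself is a Java library, so the production producer would typically be written in Java. As a rough sketch of the batching idea only, the following Python/boto3 snippet groups incoming log records into a single PutRecords call keyed by customer ID; the stream name and record shape are assumptions:

    import json
    import boto3

    kinesis = boto3.client("kinesis")
    STREAM_NAME = "log-ingestion-stream"  # assumed stream name

    def publish_batch(log_records, customer_id):
        # PutRecords accepts at most 500 records per call; the KPL performs this
        # batching, record aggregation, and retries automatically.
        entries = [
            {"Data": json.dumps(r).encode("utf-8"), "PartitionKey": customer_id}
            for r in log_records[:500]
        ]
        response = kinesis.put_records(StreamName=STREAM_NAME, Records=entries)
        # Retry any records the stream throttled or failed to persist
        failed = [e for e, r in zip(entries, response["Records"]) if "ErrorCode" in r]
        if failed:
            kinesis.put_records(StreamName=STREAM_NAME, Records=failed)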

2. Amazon Kinesis Scaling Utility can be used to scale data streams up or down by a count, or as a percentage of the total fleet. The scaling utility archive file can be imported as a library to AWS Lambda. It will automatically manage the number of shards in the stream based on the observed PUT or GET rate of the stream. The combination of customer ID and security device ID may be used to define the partition key.

3. Records uploaded to the stream by the producer application will be consumed by Lambda. It will perform gateway transformations (required by all downstream consumers) and the normalization of record format. Any additional consumer level transformations may be handled separately, associated with respective consumers.

A combination of batch window and batch size configurations can improve efficiency of function invocations. Batch windows are the maximum amount of time in seconds to gather records before invoking the function. Batch size is the number of records to send to the function in each batch. The Lambda function will throttle sending records to Amazon Kinesis Data Firehose. Error handling will be accomplished via retries with a smaller batch size, with number of retries limited as appropriate. It will discard records that are too old.

An Amazon Simple Queue Service (SQS) queue can be configured as a failed-event destination for further offline analysis. A Lambda function can read from the error SQS queue to do basic checks and determine appropriate follow-up actions. This can be initiating an email for additional investigation or a command to discard the message.
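
These batching and failure-handling settings map to standard event source mapping parameters. The following boto3 sketch is a minimal illustration under assumed names; the stream ARN, function name, queue ARN, and threshold values are placeholders:

    import boto3

    lambda_client = boto3.client("lambda")

    lambda_client.create_event_source_mapping(
        EventSourceArn="arn:aws:kinesis:us-east-1:111122223333:stream/log-ingestion-stream",
        FunctionName="gateway-transform-function",      # placeholder
        StartingPosition="LATEST",
        BatchSize=500,                                  # records per batch
        MaximumBatchingWindowInSeconds=30,              # batch window
        MaximumRetryAttempts=2,                         # limit retries
        MaximumRecordAgeInSeconds=3600,                 # discard records that are too old
        BisectBatchOnFunctionError=True,                # retry failed batches in smaller pieces
        DestinationConfig={
            "OnFailure": {"Destination": "arn:aws:sqs:us-east-1:111122223333:failed-log-events"}
        },
    )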

4. Output of transformations by Lambda will be saved to the short term (hot) storage Amazon S3 bucket via Kinesis Data Firehose. This can efficiently handle the Parquet format conversion required by downstream analytics applications. Kinesis Data Firehose delivery streams will be created per customer and configured with an associated AWS Glue Data Catalog table to perform the Parquet format conversion.
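
As an illustration, a per-customer delivery stream with Parquet conversion could be defined as in the following boto3 sketch; the stream names, ARNs, buffering hints, and Glue table are assumptions:

    import boto3

    firehose = boto3.client("firehose")

    firehose.create_delivery_stream(
        DeliveryStreamName="customer-123-logs",          # one delivery stream per customer
        DeliveryStreamType="KinesisStreamAsSource",
        KinesisStreamSourceConfiguration={
            "KinesisStreamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/normalized-logs",
            "RoleARN": "arn:aws:iam::111122223333:role/firehose-source-role",
        },
        ExtendedS3DestinationConfiguration={
            "RoleARN": "arn:aws:iam::111122223333:role/firehose-delivery-role",
            "BucketARN": "arn:aws:s3:::hot-storage-bucket",
            "BufferingHints": {"SizeInMBs": 128, "IntervalInSeconds": 300},
            "DataFormatConversionConfiguration": {
                "Enabled": True,
                "InputFormatConfiguration": {"Deserializer": {"OpenXJsonSerDe": {}}},
                "OutputFormatConfiguration": {"Serializer": {"ParquetSerDe": {}}},
                "SchemaConfiguration": {
                    "DatabaseName": "security_logs",     # Glue Data Catalog database
                    "TableName": "customer_123",         # table that defines the schema
                    "RoleARN": "arn:aws:iam::111122223333:role/firehose-delivery-role",
                },
            },
        },
    )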

5. AWS Glue jobs will be used to consolidate and write larger files to the long term (cold) storage bucket.

6. The data in the cold storage bucket will be accessed by internal security operations center (SOC) analysts for threat detection and mitigation.

7. The data in cold storage buckets will also be accessed by end customers via dashboards in Amazon QuickSight.

8. This architecture also provides additional options to modernize streaming analytics using Amazon Kinesis Data Analytics or AWS Glue streaming jobs as appropriate.

While this architecture proposes a fully managed, end-to-end solution, the sheer volume of log messages may drive up the total cost of the solution. This is especially true for Kinesis Data Streams and Kinesis Data Firehose costs.

Approach 2: Containerized application on AWS Fargate for ingestion and Amazon Kinesis for format conversion

An alternative approach shown in Figure 2 replaces the gateway Kinesis Data Streams and transformations, with a containerized application on Fargate. Conversion to Parquet format and writing to the S3 bucket is still handled by Kinesis Data Firehose.

Figure 2. Containerized application for ingestion and Amazon Kinesis for format conversion

1. LCP will upload log files to a raw storage bucket in Amazon S3.

2. A Lambda function will process Event Notifications from the raw data storage bucket. It can insert Amazon S3 object pointers to a Kinesis Data Stream partitioned by Customer ID and Device ID.
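
A minimal Python sketch of that notification handler might look like the following; the stream name and the S3 key convention used to extract customer and device IDs are assumptions:

    import json
    import boto3

    kinesis = boto3.client("kinesis")
    STREAM_NAME = "log-pointer-stream"  # assumed stream name

    def lambda_handler(event, context):
        for record in event.get("Records", []):
            bucket = record["s3"]["bucket"]["name"]
            key = record["s3"]["object"]["key"]
            # Assumed key convention: customerId/deviceId/filename.gz
            customer_id, device_id, _ = key.split("/", 2)
            pointer = {"bucket": bucket, "key": key}
            kinesis.put_record(
                StreamName=STREAM_NAME,
                Data=json.dumps(pointer).encode("utf-8"),
                PartitionKey=f"{customer_id}:{device_id}",
            )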

3. The producer application will retrieve the Event Notifications from the Data Stream and retrieve corresponding log files from S3. It will perform initial aggregations and transformations, and output to Kinesis Data Firehose. The application can run in a Docker container that is orchestrated by Amazon ECS on Fargate. A target tracking scaling policy can manage the number of parallel running data ingestion containers, to manage scalability of the ingestion process. ECS cluster capacity can be scaled up or down based on Amazon CloudWatch alarms.

4. Kinesis Data Firehose converts to Parquet format, zips the data, and persists to a short-term storage bucket in S3. This is backed by Glue Data Catalog.

Steps 5, 6, and 7 consolidate the processed data and make it available to downstream consumers, as in the previous approach.

This option uses the built-in capabilities of Kinesis Data Firehose to transform to Parquet format and deliver to S3. Note that higher costs associated with the service may still be cost prohibitive for larger data volumes.

Approach 3: Containerized application on AWS Fargate for ingestion and format conversion

Figure 3 uses a containerized application running on Fargate for the gateway transformations. This application also converts the records to Parquet format before writing the files to a short term (hot) storage bucket. All the other steps are the same as in option 2.

Figure 3. Containerized application for ingestion and format conversion

This option offers the least expensive way to transform, aggregate, and enrich the incoming log records, as well as convert them to Parquet format. But it comes with additional overhead for custom development of format conversion, checkpointing, error handling, and application management. Evaluate based on your business needs and workflow.

Conclusion

In this post, we discussed multiple approaches to design a platform on AWS to ingest and process high-volume security log records. We compared the pros and cons of each option. Amazon Kinesis is a fully managed and scalable service that helps easily collect, process, and analyze video and data streams in real time. A solution primarily based on Kinesis may become cost prohibitive due to large data volumes. Consider alternate approaches that use containerized applications on AWS Fargate. The trade-off is the flexibility of custom development versus the application management overhead.

To improve your security log analysis solution, explore one of the approaches we illustrate and customize as appropriate to fit your unique needs.

How NortonLifelock built a serverless architecture for real-time analysis of their VPN usage metrics

Post Syndicated from Madhu Nunna original https://aws.amazon.com/blogs/big-data/how-nortonlifelock-built-a-serverless-architecture-for-real-time-analysis-of-their-vpn-usage-metrics/

This post presents a reference architecture and optimization strategies for building serverless data analytics solutions on AWS using Amazon Kinesis Data Analytics. In addition, this post shows the design approach that the engineering team at NortonLifeLock took to build out an operational analytics platform that processes usage data for their VPN services, consuming petabytes of data across the globe on a daily basis.

NortonLifeLock is a global cybersecurity and internet privacy company that offers services to millions of customers for device security, and identity and online privacy for home and family. NortonLifeLock believes the digital world is only truly empowering when people are confident in their online security. NortonLifeLock has been an AWS customer since 2014.

For any organization, the value of operational data and metrics decreases with time. This lost value can equate to lost revenue and wasted resources. Real-time streaming analytics helps capture this value and provide new insights that can create new business opportunities.

AWS offers a rich set of services that you can use to provide real-time insights and historical trends. These services include managed Hadoop infrastructure services on Amazon EMR as well as serverless options such as Kinesis Data Analytics and AWS Glue.

Amazon EMR also supports multiple programming options for capturing business logic, such as Spark Streaming, Apache Flink, and SQL.

As a customer, it’s important to understand organizational capabilities, project timelines, business requirements, and AWS service best practices in order to define an optimal architecture from performance, cost, security, reliability, and operational excellence perspectives (the five pillars of the AWS Well-Architected Framework).

NortonLifeLock is taking a methodical approach to real-time analytics on AWS while using serverless technology to deliver on key business drivers such as time to market and total cost of ownership. In addition to NortonLifeLock’s implementation, this post provides key lessons learned and best practices for rapid development of real-time analytics workloads.

Business problem

NortonLifeLock offers a VPN product as a freemium service to users. Therefore, they need to enforce usage limits in real time to stop freemium users from using the service when their usage is over the limit. The challenge for NortonLifeLock is to do this in a reliable and affordable fashion.

NortonLifeLock runs its VPN infrastructure in almost all AWS Regions. Migrating to AWS from smaller hosting vendors has greatly improved user experience and VPN edge server performance, including a reduction in connection latency, time to connect and connection errors, faster upload and download speed, and more stability and uptime for VPN edge servers.

VPN usage data is collected by VPN edge servers and uploaded to backend stats servers every minute and persisted in backend databases. The usage information serves multiple purposes:

  • Displaying how much data a device has consumed for the past 30 days.
  • Enforcing usage limits on freemium accounts. When a user exhausts their free quota, that user is unable to connect through VPN until the next free cycle.
  • Analyzing usage data by the internal business intelligence (BI) team based on time, marketing campaigns, and account types, and using this data to predict future growth, ability to retain users, and more.

Design challenge

NortonLifeLock had the following design challenges:

  • The solution must be able to simultaneously satisfy both real-time and batch analysis.
  • The solution must be economical. NortonLifeLock VPN has hundreds of thousands of concurrent users, and if a user’s usage information is persisted as it comes in, it results in tens of thousands of reads and writes per second and tens of thousands of dollars a month in database costs.

Solution overview

NortonLifeLock decided to split storage into two parts by storing usage data in Amazon DynamoDB for real-time access and in Amazon Simple Storage Service (Amazon S3) for analysis, which addresses real-time enforcement and BI needs. Kinesis Data Analytics aggregates and loads data to Amazon S3 and DynamoDB. With Amazon Kinesis Data Streams and AWS Lambda as consumers of Kinesis Data Analytics, the implementation of user and device-level aggregations was simplified.

To keep costs down, user usage data was aggregated by the hour and persisted in DynamoDB. This spread hundreds of thousands of writes over an hour and reduced DynamoDB cost by 30 times.

Although increasing aggregation might not be an option for other problem domains, it’s acceptable in this case because it’s not necessary to be precise to the minute for user usage, and it’s acceptable to calculate and enforce the usage limit every hour.
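
As a simplified illustration of this hourly write pattern, the Lambda function consuming the outgoing Kinesis stream could persist one aggregated item per user per hour rather than one write per raw record; the table and attribute names here are assumptions:

    import base64
    import json
    import boto3

    table = boto3.resource("dynamodb").Table("HourlyUserUsage")  # assumed table name

    def lambda_handler(event, context):
        # Each Kinesis record carries an hourly aggregate emitted by the Flink application
        for record in event["Records"]:
            usage = json.loads(base64.b64decode(record["kinesis"]["data"]))
            table.update_item(
                Key={"userId": usage["userId"], "hour": usage["hour"]},  # e.g. "2021-09-01T13"
                UpdateExpression="ADD usedBytes :b",
                ExpressionAttributeValues={":b": int(usage["usedBytes"])},
            )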

The following diagram illustrates the high-level architecture. The solution is broken into three logical parts:

  • End-users – Real-time queries from devices to display current usage information (how much data is used daily)
  • Business analysts – Query historical usage information through Amazon Athena to extract business insights
  • Usage limit enforcement – Usage data ingestion and aggregation in real time

The solution has the following workflow:

  1. Usage data is collected by a VPN edge server, which sends it to the backend service through an Application Load Balancer.
  2. A single usage data record sent by the VPN edge server contains usage data for many users. A stats splitter splits the message into individual usage stats per user and forwards the message to Kinesis Data Streams.
  3. Usage data is consumed by both the legacy stats processor and the new Apache Flink application developed and deployed on Kinesis Data Analytics.
  4. The Apache Flink application carries out the following tasks:
    1. Aggregate device usage data hourly and send the aggregated result to Amazon S3 and the outgoing Kinesis data stream, which is picked up by a Lambda function that persists the usage data in DynamoDB.
    2. Aggregate device usage data daily and send the aggregated result to Amazon S3.
    3. Aggregate account usage data hourly and forward the aggregated results to the outgoing data stream, which is picked up by a Lambda function that checks if account usage is over the limit for that account. If account usage is over the limit, the function forwards the account information to another Lambda function, via Amazon Simple Queue Service (Amazon SQS), to cut off access on that account.

Design journey

NortonLifeLock needed a solution that was capable of real-time streaming and batch analytics. Kinesis Data Analytics fits this requirement because of the following key features:

  • Real-time streaming and batch analytics for data aggregation
  • Fully managed with a pay-as-you-go model
  • Auto scaling

NortonLifeLock needed Kinesis Data Analytics to do the following:

  • Aggregate customer usage data per device hourly and send results to Kinesis Data Streams (ultimately to DynamoDB) and the data lake (Amazon S3)
  • Aggregate customer usage data per account hourly and send results to Kinesis Data Streams (ultimately to DynamoDB and Lambda, which enforces usage limit)
  • Aggregate customer usage data per device daily and send results to the data lake (Amazon S3)

The legacy system processes usage data from an incoming Kinesis data stream, and they plan to use Kinesis Data Analytics to consume and process production data from the same stream. As such, NortonLifeLock started with SQL applications on Kinesis Data Analytics.

First attempt: Kinesis Data Analytics for SQL

Kinesis Data Analytics with SQL provides a high-level SQL-based abstraction for real-time stream processing and analytics. It's configuration driven and very simple to get started. NortonLifeLock was able to create a prototype from scratch, get to production, and process the production load in less than 2 weeks. The solution met 90% of the requirements, and there were alternatives for the remaining 10%.

However, they started to receive “read limit exceeded” alerts from the source data stream, and the legacy application was read throttled. With AWS Support’s help, they traced the issues to the drastic reversal of the Kinesis Data Analytics MillisBehindLatest metric in Kinesis record processing. This was correlated to the Kinesis Data Analytics auto scaling events and application restarts, as illustrated by the following diagram. The highlighted areas show the correlation between spikes due to auto scaling and the reversal of the MillisBehindLatest metric.

Here’s what happened:

  • Kinesis Data Analytics for SQL automatically scaled up KPUs (Kinesis Processing Units) due to load, and the Kinesis Data Analytics application was restarted (part of scaling up).
  • Kinesis Data Analytics for SQL supports the at-least-once delivery model and uses checkpoints to ensure no data loss. But it doesn’t support taking a snapshot and restoring from the snapshot after a restart. For more details, see Delivery Model for Persisting Application Output to an External Destination.
  • When the Kinesis Data Analytics for SQL application was restarted, it needed to reprocess data from the beginning of the aggregation window, resulting in a very large number of duplicate records, which led to a dramatic increase in the Kinesis Data Analytics MillisBehindLatest metric.
  • To catch up with incoming data, Kinesis Data Analytics started re-reading from the Kinesis data stream, which led to over-consumption of read throughput and the legacy application being throttled.

In summary, Kinesis Data Analytics for SQL’s duplicate record processing on restarts, the lack of other means to eliminate duplicates, and the limited ability to control auto scaling led to this issue.

Although they found Kinesis Data Analytics for SQL easy to get started, these limitations demanded other alternatives. NortonLifeLock reached out to the Kinesis Data Analytics team and discussed the following options:

  • Option 1 – AWS was planning to release a new service, Kinesis Data Analytics Studio for SQL, Python, and Scala, which addresses these limitations. But this service was still a few months away (this service is now available, launched May 27, 2021).
  • Option 2 – The alternative was to switch to Kinesis Data Analytics for Apache Flink, which also provides the necessary tools to address all their requirements.

Second attempt: Kinesis Data Analytics for Apache Flink

Apache Flink has a comparatively steep learning curve (we used Java for streaming analytics instead of SQL), and it took about 4 weeks to build the same prototype, deploy it to Kinesis Data Analytics, and test the application in production. NortonLifeLock had to overcome a few hurdles, which we document in this section along with the lessons learned.

Challenge 1: Too many writes to outgoing Kinesis data stream

The first thing they noticed was that the write threshold on the outgoing Kinesis data stream was greatly exceeded. Kinesis Data Analytics was attempting to write 10 times the amount of expected data to the data stream, with 95% of data throttled.

After a lengthy investigation, it turned out that having too much parallelism in the Kinesis Data Analytics application led to this issue. They had followed the default recommendations and set parallelism to 12, which scaled up to 16. This means that every hour, 16 separate threads were attempting to write to the destination data stream simultaneously, leading to massive contention and throttled writes. These threads attempted to retry continuously until all records were written to the data stream. This resulted in 10 times the amount of data processing attempted, even though only one tenth of the writes eventually succeeded.

The solution was to reduce parallelism to 4 and disable auto scaling. In the preceding diagram, the percentage of throttled records dropped to 0 from 95% after they reduced parallelism to 4 in the Kinesis Data Analytics application. This also greatly improved KPU utilization and reduced Kinesis Data Analytics cost from $50 a day to $8 a day.

Challenge 2: Use Kinesis Data Analytics sink aggregation

After tuning parallelism, they still noticed occasional throttling by Kinesis Data Streams because of the number of records being written, not record size. To overcome this, they turned on Kinesis Data Analytics sink aggregation to reduce the number of records being written to the data stream, and the result was dramatic. They were able to reduce the number of writes by 1,000 times.

Challenge 3: Handle Kinesis Data Analytics Flink restarts and the resulting duplicate records

Kinesis Data Analytics applications restart because of auto scaling or recovery from application or task manager crashes. When this happens, Kinesis Data Analytics saves a snapshot before shutdown and automatically reloads the latest snapshot and picks up where the work was left off. Kinesis Data Analytics also saves a checkpoint every minute so no data is lost, guaranteeing exactly-once processing.

However, when the Kinesis Data Analytics application shuts down in the middle of sending results to Kinesis Data Streams, it doesn’t guarantee exactly-once data delivery. In fact, Flink only guarantees at-least-once delivery to the Kinesis Data Analytics sink, meaning that Kinesis Data Analytics guarantees to send a record at least once, which leads to duplicate records being sent when Kinesis Data Analytics is restarted.

How were duplicate records handled in the outgoing data stream?

Because duplicate records aren’t handled by Kinesis Data Analytics when sinks do not have exactly-once semantics, the downstream application must deal with the duplicate records. The first question you should ask is whether it’s necessary to deal with the duplicate records. Maybe it’s acceptable to tolerate duplicate records in your application? This, however, is not an option for NortonLifeLock, because no user wants to have their available usage taken twice within the same hour. So, logic had to be built in the application to handle duplicate usage records.

To deal with duplicate records, you can employ a strategy in which the application saves an update timestamp along with the user’s latest usage. When a record comes in, the application reads existing daily usage and compares the update timestamp against the current time. If the difference is less than a configured window (50 minutes if the aggregation window is 60 minutes), the application ignores the new record because it’s a duplicate. It’s acceptable for the application to potentially undercount vs. overcount user usage.
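
A minimal sketch of that duplicate check follows, assuming the usage table keeps the latest daily usage and an update timestamp per user (table and attribute names are assumptions):

    import time
    import boto3

    table = boto3.resource("dynamodb").Table("UserDailyUsage")  # assumed table name
    DUPLICATE_WINDOW_SECONDS = 50 * 60  # 50 minutes for a 60-minute aggregation window

    def apply_usage(user_id, usage_bytes):
        now = int(time.time())
        item = table.get_item(Key={"userId": user_id}).get("Item", {})
        if now - int(item.get("updatedAt", 0)) < DUPLICATE_WINDOW_SECONDS:
            return False  # a record for this window was already applied; treat as duplicate
        table.update_item(
            Key={"userId": user_id},
            UpdateExpression="SET updatedAt = :t ADD dailyUsage :b",
            ExpressionAttributeValues={":t": now, ":b": usage_bytes},
        )
        return True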

How were duplicate records handled in the outgoing S3 bucket?

Kinesis Data Analytics writes temporary files in Amazon S3 before finalizing and removing them. When Kinesis Data Analytics restarts, it attempts to write new S3 files, and potentially leaves behind temporary S3 files because of the restart. Because Athena ignores all temporary S3 files, no further action is needed. If your BI tools take temporary S3 files into consideration, you have to configure an Amazon S3 lifecycle policy to clean up temporary S3 files after a certain time.

Conclusion

NortonLifeLock has been successfully running a Kinesis Data Analytics application in production since May 2021. It provides several key benefits. VPN users can now keep track of their usage in near-real time. BI analysts can get timely insights that are used for targeted sales and marketing campaigns, and upselling features and services. VPN usage limits are enforced in near-real time, thereby optimizing the network resources. NortonLifeLock is saving tens of thousands of dollars each month with this real-time streaming analytics solution. And this telemetry solution is able to keep up with petabytes of data flowing through their global VPN service, which is seeing double-digit monthly growth.

To learn more about Kinesis Data Analytics and getting started with serverless streaming solutions on AWS, please see the Developer Guide for Studio, the easiest way to build Apache Flink applications in SQL, Python, or Scala in a notebook interface.


About the Authors

Lei Gu has 25 years of software development experience and is the architect for three key Norton products: Norton Secure Backup, VPN, and Norton Family. He is passionate about cloud transformation and most recently spoke about moving from Cassandra to Amazon DynamoDB at AWS re:Invent 2019. Check out his LinkedIn profile at https://www.linkedin.com/in/leigu/.

Madhu Nunna is a Sr. Solutions Architect at AWS, with over 20 years of experience in networks and cloud, with the last two years focused on AWS Cloud. He is passionate about Analytics and AI/ML. Outside of work, he enjoys hiking and reading books on philosophy, economics, history, astronomy and biology.

Register now for Flink Forward Global, October 26-27, 2021

Post Syndicated from Deepthi Mohan original https://aws.amazon.com/blogs/big-data/register-now-for-flink-forward-global-october-26-27-2021/

Flink Forward Global 2021 is a 2-day virtual conference for the Apache Flink and stream processing communities. Apache Flink is an open-source distributed engine for processing data streams that can support both streaming and batch workloads. Amazon Kinesis Data Analytics is a fully managed service for Apache Flink on AWS that reduces the complexity of building, managing, and integrating Apache Flink applications with other AWS services. You can use Kinesis Data Analytics for Apache Flink to process data from Amazon Managed Streaming for Apache Kafka (Amazon MSK), Amazon Kinesis Data Streams, and a variety of data sources for use cases such as streaming ETL (extract, transform, and load), log analysis, event-driven applications, and anomaly and fraud detection in real time.

Flink Forward has keynote presentations and talks on production Flink use cases, technical deep dive sessions, and the growth of the Flink ecosystem. You can meet core Flink committers, new and experienced users, and thought leaders who share experiences and best practices in stream processing, real-time analytics, and the management of mission-critical Flink deployments in production.

AWS is a Platinum sponsor for Flink Forward. If you’re interested in learning about real-time data processing at scale, register now to attend.

Toyota Connected and AWS Design and Deliver Collision Assistance Application

Post Syndicated from Srikanth Kodali original https://aws.amazon.com/blogs/architecture/toyota-connected-and-aws-design-and-deliver-collision-assistance-application/

This post was cowritten by Srikanth Kodali, Sr. IoT Data Architect at AWS, and Will Dombrowski, Sr. Data Engineer at Toyota Connected

Toyota Connected North America (TC) is a technology/big data company that partners with Toyota Motor Corporation and Toyota Motor North America to develop products that aim to improve the driving experience for Toyota and Lexus owners.

TC’s Mobility group provides backend cloud services that are built and hosted in AWS. Together, TC and AWS engineers designed, built, and delivered their new Collision Assistance product, which debuted in early August 2021.

In the aftermath of an accident, Collision Assistance offers Toyota and Lexus drivers instructions to help them navigate a post-collision situation. This includes documenting the accident, filing an insurance claim, and transitioning to the repair process.

In this blog post, we’ll talk about how our team designed, built, refined, and deployed the Collision Assistance product with Serverless on AWS services. We’ll discuss our goals in developing this product and the architecture we developed based on those goals. We’ll also present issues we encountered when testing our initial architecture and how we resolved them to create the final product.

Building a scalable, affordable, secure, and high performing product

We used a serverless architecture because it is often less complex than other architecture types. Our goals in developing this initial architecture were to achieve scalability, affordability, security, and high performance, as described in the following sections.

Scalability and affordability

In our initial architecture, Amazon Simple Queue Service (Amazon SQS) queues, Amazon Kinesis streams, and AWS Lambda functions allow data pipelines to run servers only when they’re needed, which introduces cost savings. They also process data in smaller units and run them in parallel, which allows data pipelines to scale up efficiently to handle peak traffic loads. These services allow for an architecture that can handle non-uniform traffic without needing additional application logic.

Security

Collision Assistance can deliver information to customers via push notifications. This data must be encrypted because many data points the application collects are sensitive, like geolocation.

To secure this data outside our private network, we use Amazon Simple Notification Service (Amazon SNS) as our delivery mechanism. Amazon SNS provides HTTPS endpoint delivery of messages coming to topics and subscriptions. AWS allows us to enable at-rest and/or in-transit encryption for all of our other architectural components as well.

Performance

To quantify our product’s performance, we review the “notification delay.” This metric evaluates the time between the initial collision and when the customer receives a push notification from Collision Assistance. Our ultimate goal is to have the push notification sent within minutes of a crash, so drivers have this information in near real time.

Initial architecture

Figure 1 presents our initial architecture implementation that aims to predict whether a crash has occurred and reduce false positives through the following data pipeline:

  1. The Kinesis stream receives vehicle data from an upstream ingestion service, as discussed in the Enhancing customer safety by leveraging the scalable, secure, and cost-optimized Toyota Connected Data Lake blog.
  2. A Lambda function writes lookup data to Amazon DynamoDB for every Kinesis record.
  3. This Lambda function filters out obvious non-crash data. It sends the current record (X) to Amazon SQS. If X exceeds a certain threshold, it will remain a crash candidate.
  4. Amazon SQS sets a delivery delay so that there will be more Kinesis/DynamoDB records available when X is processed later in the pipeline.
  5. A second Lambda function reads the data from the SQS message. It queries DynamoDB to find the Kinesis lookup data for the message before (X-1) and after (X+1) the crash candidate.
  6. Kinesis GetRecords retrieves X-1 and X+1, because X+1 will exist after the SQS delivery delay times out.
  7. The X-1, X, and X+1 messages are sent to the data science (DS) engine.
  8. When a crash is accurately predicted, these results are stored in a DynamoDB table.
  9. The push notification is sent to the vehicle owner. (Note: the push notification is still in a select testing phase.)
Figure 1. Diagram and description of our initial architecture implementation

To be consistent with privacy best practices and reduce server uptime, this architecture uses the minimum amount of data the DS engine needs.

We filter out records that are lower than extremely low thresholds. Once these records are filtered out, around 40% of the data fits the criteria to be evaluated further. This reduces the server capacity needed by the DS engine by 60%.

To reduce false positives, we gather data before and after the timestamps where the extremely low thresholds are exceeded. We then evaluate the sensor data across this timespan and discard any sets with patterns of abnormal sensor readings or other false positive conditions. Figure 2 shows the time window we initially used.

Figure 2. Longitudinal acceleration versus time

Adjusting our initial architecture for better performance

Our initial design worked well for processing a few sample messages and achieved the desired near real-time delivery of the push notification. However, when the pipeline was enabled for over 1 million vehicles, certain limits were exceeded, particularly for Kinesis and Lambda integrations:

  • Our Kinesis GetRecords API exceeded the allowed five requests per shard per second. With each crash candidate retrieving an X-1 and X+1 message, we could only evaluate two per shard per second, which isn’t cost effective.
  • Additionally, the downstream SQS-reading Lambda function was limited to 10 records per second per invocation. This meant any slowdown that occurs downstream, such as during DS engine processing, could cause the queue to back up significantly.

To improve cost and performance for the Kinesis-related functionality, we abandoned the DynamoDB lookup table and the GetRecords calls in favor of using a Redis cache cluster on Amazon ElastiCache. This allows us to avoid all throughput exceptions from Kinesis and focus on scaling the stream based on the incoming throughput alone. The ElastiCache cluster scales capacity by adding or removing shards, which improves performance and cost efficiency.
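
The following Python sketch illustrates the idea, assuming records are keyed by vehicle ID and a per-vehicle sequence number; the endpoint, key scheme, and TTL are assumptions, not the production implementation:

    import json
    import redis

    # Assumed ElastiCache for Redis endpoint
    cache = redis.Redis(host="collision-cache.example.amazonaws.com", port=6379)
    TTL_SECONDS = 600  # keep entries only as long as the delayed evaluation needs them

    def cache_record(vehicle_id, sequence, record):
        # Written for every incoming Kinesis record, replacing the DynamoDB lookup table
        cache.setex(f"{vehicle_id}:{sequence}", TTL_SECONDS, json.dumps(record))

    def get_neighbors(vehicle_id, sequence):
        # Fetch X-1 and X+1 from the cache instead of calling Kinesis GetRecords
        prev_raw, next_raw = cache.mget(f"{vehicle_id}:{sequence - 1}",
                                        f"{vehicle_id}:{sequence + 1}")
        return (json.loads(prev_raw) if prev_raw else None,
                json.loads(next_raw) if next_raw else None)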

To solve the Amazon SQS/Lambda integration issue, we funneled messages directly to an additional Kinesis stream. This allows the final Lambda function to use some of the better scaling options provided to Kinesis-Lambda event source integrations, like larger batch sizes and max-parallelism.

After making these adjustments, our tests proved we could scale to millions of vehicles as needed. Figure 3 shows a diagram of this final architecture.

Figure 3. Final architecture

Conclusion

Engineers across many functions worked closely to deliver the Collision Assistance product.

Our team of backend Java developers, infrastructure experts, and data scientists from TC and AWS built and deployed a near real-time product that helps Toyota and Lexus drivers document crash damage, file an insurance claim, and get updates on the actual repair process.

The managed services and serverless components available on AWS provided TC with many options to test and refine our team’s architecture. This helped us find the best fit for our use case. Having this flexibility in design was a key factor in designing and delivering the best architecture for our product.

 

How MEDHOST’s cardiac risk prediction successfully leveraged AWS analytic services

Post Syndicated from Pandian Velayutham original https://aws.amazon.com/blogs/big-data/how-medhosts-cardiac-risk-prediction-successfully-leveraged-aws-analytic-services/

MEDHOST has been providing products and services to healthcare facilities of all types and sizes for over 35 years. Today, more than 1,000 healthcare facilities are partnering with MEDHOST and enhancing their patient care and operational excellence with its integrated clinical and financial EHR solutions. MEDHOST also offers a comprehensive Emergency Department Information System with business and reporting tools. Since 2013, MEDHOST’s cloud solutions have been utilizing Amazon Web Services (AWS) infrastructure, data sources, and computing power to solve complex healthcare business cases.

MEDHOST can utilize the data available in the cloud to provide value-added solutions for hospitals solving complex problems, like predicting sepsis, cardiac risk, and length of stay (LOS), as well as reducing re-admission rates. This requires a solid data lake foundation and an elastic data pipeline to keep up with multi-terabyte data from thousands of hospitals. MEDHOST has invested a significant amount of time evaluating numerous vendors to determine the best solution for its data needs. Ultimately, MEDHOST designed and implemented machine learning/artificial intelligence capabilities by leveraging AWS Data Lab and an end-to-end data lake platform that enables a variety of use cases such as data warehousing for analytics and reporting.


Getting started

MEDHOST’s initial objectives in evaluating vendors were to:

  • Build a low-cost data lake solution to provide cardiac risk prediction for patients based on health records
  • Provide an analytical solution for hospital staff to improve operational efficiency
  • Implement a proof of concept to extend to other machine learning/artificial intelligence solutions

The AWS team proposed AWS Data Lab to architect, develop, and test a solution to meet these objectives. The collaborative relationship between AWS and MEDHOST, AWS’s continuous innovation, excellent support, and technical solution architects helped MEDHOST select AWS over other vendors and products. AWS Data Lab’s well-structured engagement helped MEDHOST define clear, measurable success criteria that drove the implementation of the cardiac risk prediction and analytical solution platform. The MEDHOST team consisted of architects, builders, and subject matter experts (SMEs). By connecting MEDHOST experts directly to AWS technical experts, the MEDHOST team gained a quick understanding of industry best practices and available services, allowing the MEDHOST team to achieve most of the success criteria at the end of a four-day design session. MEDHOST is now in the process of moving this work from its lower to upper environment to make the solution available for its customers.

Solution

For this solution, MEDHOST and AWS built a layered pipeline consisting of ingestion, processing, storage, analytics, machine learning, and reinforcement components. The following diagram illustrates the Proof of Concept (POC) that was implemented during the four-day AWS Data Lab engagement.

Ingestion layer

The ingestion layer is responsible for moving data from hospital production databases to the landing zone of the pipeline.

The hospital data was stored in an Amazon RDS for PostgreSQL instance and moved to the landing zone of the data lake using AWS Database Migration Service (DMS). DMS made migrating databases to the cloud simple and secure. Using its ongoing replication feature, MEDHOST and AWS implemented change data capture (CDC) quickly and efficiently so the MEDHOST team could spend more time focusing on the most interesting parts of the pipeline.

Processing layer

The processing layer was responsible for performing extract, transform, load (ETL) on the data to curate it for subsequent uses.

MEDHOST used AWS Glue within its data pipeline for crawling its data layers and performing ETL tasks. The hospital data copied from RDS to Amazon S3 was cleaned, curated, enriched, denormalized, and stored in parquet format to act as the heart of the MEDHOST data lake and a single source of truth to serve any further data needs. During the four-day Data Lab, MEDHOST and AWS targeted two needs: powering MEDHOST’s data warehouse used for analytics and feeding training data to the machine learning prediction model. Even though there were multiple challenges, data curation is a critical task which requires an SME. AWS Glue’s serverless nature, along with the SME’s support during the Data Lab, made developing the required transformations cost efficient and uncomplicated. Scaling and cluster management was addressed by the service, which allowed the developers to focus on cleaning data coming from homogenous hospital sources and translating the business logic to code.
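
A minimal AWS Glue (PySpark) job sketch for this kind of curation step might look like the following; the database, table, column, and bucket names are hypothetical rather than MEDHOST's actual schema:

    import sys
    from awsglue.transforms import ApplyMapping
    from awsglue.utils import getResolvedOptions
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from pyspark.context import SparkContext

    args = getResolvedOptions(sys.argv, ["JOB_NAME"])
    glue_context = GlueContext(SparkContext.getOrCreate())
    job = Job(glue_context)
    job.init(args["JOB_NAME"], args)

    # Read the raw landing-zone table crawled by AWS Glue (hypothetical names)
    raw = glue_context.create_dynamic_frame.from_catalog(
        database="hospital_landing_zone", table_name="patient_records"
    )

    # Example curation step: keep and rename only the columns needed downstream
    curated = ApplyMapping.apply(
        frame=raw,
        mappings=[("patient_id", "string", "patient_id", "string"),
                  ("heart_rate", "long", "heart_rate", "long"),
                  ("encounter_date", "string", "encounter_date", "string")],
    )

    # Write the curated zone in Parquet, the format served to Athena and Redshift Spectrum
    glue_context.write_dynamic_frame.from_options(
        frame=curated,
        connection_type="s3",
        connection_options={"path": "s3://example-curated-zone/patients/"},
        format="parquet",
    )
    job.commit()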

Storage layer

The storage layer provided low-cost, secure, and efficient storage infrastructure.

MEDHOST used Amazon S3 as a core component of its data lake. AWS DMS migration tasks saved data to S3 in .CSV format. Crawling data with AWS Glue made this landing zone data queryable and available for further processing. The initial AWS Glue ETL job stored the parquet formatted data to the data lake and its curated zone bucket. MEDHOST also used S3 to store the .CSV formatted data set that will be used to train, test, and validate its machine learning prediction model.

Analytics layer

The analytics layer gave MEDHOST pipeline reporting and dashboarding capabilities.

The data was in parquet format and partitioned in the curation zone bucket populated by the processing layer. This made querying with Amazon Athena or Amazon Redshift Spectrum fast and cost efficient.

From the Amazon Redshift cluster, MEDHOST created external tables that were used as staging tables for the MEDHOST data warehouse and implemented an UPSERT logic to merge new data into its production tables. To showcase the reporting potential that was unlocked by the MEDHOST analytics layer, a connection was made from the Redshift cluster to Amazon QuickSight. Within minutes, MEDHOST was able to create interactive analytics dashboards with filtering and drill-down capabilities, such as a chart that showed the number of confirmed disease cases per US state.

Machine learning layer

The machine learning layer used MEDHOST’s existing data sets to train its cardiac risk prediction model and make it accessible via an endpoint.

Before the Data Lab engagement, the MEDHOST team was not intimately familiar with machine learning. AWS Data Lab architects helped MEDHOST quickly understand machine learning concepts and select a model appropriate for its use case. MEDHOST selected XGBoost as its model because cardiac risk prediction can be framed as a regression problem. MEDHOST's well-architected data lake enabled it to quickly generate training, testing, and validation data sets using AWS Glue.

Amazon SageMaker abstracted the underlying complexity of setting up infrastructure for machine learning. With a few clicks, MEDHOST started a Jupyter notebook and coded the components leading to fitting and deploying its machine learning prediction model. Finally, MEDHOST created an endpoint for the model and ran REST calls to validate the endpoint and the trained model. As a result, MEDHOST achieved its goal of predicting cardiac risk. Additionally, with Amazon QuickSight's SageMaker integration, AWS made it easy to use SageMaker models directly in visualizations. QuickSight can call the model's endpoint, send the input data to it, and put the inference results into existing QuickSight data sets. This capability made it easy to display the results of the models directly in the dashboards. Read more about QuickSight's SageMaker integration here.
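To validate a deployed endpoint outside of the notebook, a small client can call the SageMaker runtime API. The following is a minimal sketch using the AWS SDK for Java v2; the endpoint name and the CSV feature vector are hypothetical placeholders, not values from the MEDHOST solution.

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.sagemakerruntime.SageMakerRuntimeClient;
import software.amazon.awssdk.services.sagemakerruntime.model.InvokeEndpointRequest;
import software.amazon.awssdk.services.sagemakerruntime.model.InvokeEndpointResponse;

public class CardiacRiskEndpointTest {
    public static void main(String[] args) {
        // Hypothetical endpoint name and CSV feature vector; replace with your own values.
        String endpointName = "cardiac-risk-xgboost-endpoint";
        String csvFeatures = "63,1,145,233,150,2.3";

        try (SageMakerRuntimeClient runtime = SageMakerRuntimeClient.create()) {
            InvokeEndpointRequest request = InvokeEndpointRequest.builder()
                    .endpointName(endpointName)
                    .contentType("text/csv")                    // the built-in XGBoost algorithm accepts CSV
                    .body(SdkBytes.fromUtf8String(csvFeatures))
                    .build();

            InvokeEndpointResponse response = runtime.invokeEndpoint(request);
            // The XGBoost container returns the prediction as plain text.
            System.out.println("Predicted cardiac risk score: " + response.body().asUtf8String());
        }
    }
}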

Reinforcement layer

Finally, the reinforcement layer ensured that the results of the MEDHOST model were captured and processed to improve the performance of the model.

The MEDHOST team went beyond the original goal: it created an inference microservice to interact with the endpoint for predictions, abstracted the machine learning endpoint behind a well-defined domain REST endpoint, and added a standard security layer to the MEDHOST application.

When there is a real-time call from the facility, the inference microservice gets an inference from the SageMaker endpoint. Records containing the input and inference data are fed back into the data pipeline. MEDHOST used Amazon Kinesis Data Streams to push records in real time. However, because retraining the machine learning model does not need to happen in real time, Amazon Kinesis Data Firehose enabled MEDHOST to micro-batch records and efficiently save them to the landing zone bucket so that the data could be reprocessed.
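A minimal sketch of how such a microservice might put an inference record onto a data stream with the AWS SDK for Java v2 follows; the stream name, partition key choice, and payload shape are assumptions for illustration, not details of the MEDHOST implementation.

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;

public class InferenceRecordPublisher {

    private final KinesisClient kinesis = KinesisClient.create();

    // Hypothetical stream name; replace with the stream created for the reinforcement layer.
    private static final String STREAM_NAME = "inference-feedback-stream";

    /** Publishes a JSON document containing both the input features and the returned inference. */
    public String publish(String recordId, String inferenceJson) {
        PutRecordRequest request = PutRecordRequest.builder()
                .streamName(STREAM_NAME)
                .partitionKey(recordId)                        // spreads records across shards
                .data(SdkBytes.fromUtf8String(inferenceJson))
                .build();

        PutRecordResponse response = kinesis.putRecord(request);
        return response.sequenceNumber();
    }
}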

Conclusion

Collaborating with AWS Data Lab enabled MEDHOST to:

  • Store a single source of truth in a low-cost storage solution (data lake)
  • Complete a data pipeline for a low-cost data analytics solution
  • Create almost production-ready code for cardiac risk prediction

The MEDHOST team learned many concepts related to data analytics and machine learning within four days. AWS Data Lab truly helped MEDHOST deliver results in an accelerated manner.


About the Authors

Pandian Velayutham is the Director of Engineering at MEDHOST. His team is responsible for delivering cloud solutions, integration and interoperability, and business analytics solutions. MEDHOST utilizes a modern technology stack to provide innovative solutions to its customers. Pandian Velayutham is a technology evangelist and public cloud technology speaker.

George Komninos is a Data Lab Solutions Architect at AWS. He helps customers convert their ideas to a production-ready data product. Before AWS, he spent 3 years in the Alexa Information domain as a data engineer. Outside of work, George is a football fan and supports the greatest team in the world, Olympiacos Piraeus.

Building a Data Pipeline for Tracking Sporting Events Using AWS Services

Post Syndicated from Ashwini Rudra original https://aws.amazon.com/blogs/architecture/building-a-data-pipeline-for-tracking-sporting-events-using-aws-services/

In an evolving world that is increasingly connected, data-centric, and fast-paced, the sports industry is no exception. Amazon Web Services (AWS) has been helping customers in the sports industry gain real-time insights through analytics. You can re-invent and reimagine the fan experience by tracking sports actions and activities. In this blog post, we will highlight common architectural and design patterns for building a data pipeline to track sporting events in real time.

The sports industry largely comprises two subsegments: participatory and spectator sports. Participatory sports, for example fitness, golf, boating, and skiing, make up the largest share of the market. Spectator sports, such as teams/clubs/leagues, individual sports, and racing, are expected to be the fastest-growing segment. Sports teams/leagues/clubs make up the largest share of the spectator sports segment and are growing most rapidly.

IoT data pipeline architecture overview

Let’s discuss the infrastructure in three parts:

  1. Infrastructure at the arena itself
  2. Processing data using AWS services
  3. Leveraging this analysis using a graphics overlay (this can be especially useful for broadcasters, OTT channels, and arena users)

Data-gathering devices

Radio-frequency identification (RFID) chips or IoT devices can be worn by players or embedded in the playing equipment. These devices emit 20–50 messages per second, which are collected and output as JSON. This information may include player coordinates, player speed, statistics, health information, and more. To analyze the game, leagues, coaches, or broadcasters can process this data using analytics tools and/or machine learning.

Figure 1. Data pipeline architecture using AWS Services

Processing data, feature engineering, and model training at AWS

Use serverless services from AWS when possible in order to keep your solution scalable and cost-efficient. This also reduces operational overhead for teams. You can use the Kinesis family of services for stream ingestion and processing. The streaming data from hundreds to thousands of IoT sources (from equipment and clothing) can be fed to Amazon Kinesis Data Streams (KDS). KDS and Amazon Kinesis Data Firehose provide a buffering mechanism for streaming data before it lands on Amazon Simple Storage Service (Amazon S3). With Amazon Kinesis Data Analytics, you can process and analyze Kinesis stream data using powerful SQL, Apache Flink, or Apache Beam. Kinesis Data Analytics also supports building applications in SQL, Java, Scala, and Python. With this service, you can quickly author and run powerful SQL code against Amazon Kinesis data streams as your source. This way you can perform time series analytics, feed real-time dashboards, and create real-time metrics. Read more about Amazon Kinesis Data Analytics for SQL Applications.

You might want to transform or enhance the streaming data before it is delivered to Amazon S3. Amazon Kinesis Data Firehose can be used with an AWS Lambda function to do the transformation. Let's say you have a player prediction timestamp that you want to represent in a different time format for different ML algorithms. Lambda can process and transform this data. Kinesis Data Firehose then delivers the transformed and raw data to the destination (Amazon S3). This occurs after the specified buffer size is reached or when the buffering interval elapses, whichever happens first.

For more complex transformations, AWS Glue can be used. For example, once the data lands in Amazon S3, you can start preparing and aggregating the training dataset using Amazon SageMaker Data Wrangler. As part of the feature engineering process, you can do the following:

  • Transform the data
  • Delete unneeded columns
  • Impute missing values
  • Perform label encoding
  • Use the quick model option to get a sense of which features are adding predictive power as you progress with your data preparation

All the data preparation and feature engineering tasks can be performed from Data Wrangler’s single visual interface.

Once data is prepared in Amazon S3, Amazon SageMaker can be used for model training. In soccer, for example, you can predict a goal percentage based on a player's position, acceleration, and past performance. SageMaker provides several built-in algorithms that can be trained. For real-time predictions, Amazon API Gateway provides an API layer to clients such as an OTT platform, a broadcasting service, or a web browser. API Gateway can invoke a Lambda function with logic to call a SageMaker endpoint and persist the output to a database. This data can be used later for further analysis or to fine-tune your models.

Figure 2. Deliver real-time prediction using Amazon SageMaker
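The following sketch shows what a Lambda handler behind API Gateway might look like in Java, assuming the aws-lambda-java-events library and the AWS SDK for Java v2. The class name, endpoint name, and CSV payload format are hypothetical, and the persistence step is only indicated by a comment.

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent;
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyResponseEvent;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.sagemakerruntime.SageMakerRuntimeClient;
import software.amazon.awssdk.services.sagemakerruntime.model.InvokeEndpointRequest;

public class GoalPredictionHandler
        implements RequestHandler<APIGatewayProxyRequestEvent, APIGatewayProxyResponseEvent> {

    private static final SageMakerRuntimeClient RUNTIME = SageMakerRuntimeClient.create();
    // Hypothetical endpoint name for the trained prediction model.
    private static final String ENDPOINT_NAME = "goal-prediction-endpoint";

    @Override
    public APIGatewayProxyResponseEvent handleRequest(APIGatewayProxyRequestEvent event, Context context) {
        // The request body carries the player features (position, acceleration, history) as CSV.
        String features = event.getBody();

        String prediction = RUNTIME.invokeEndpoint(InvokeEndpointRequest.builder()
                        .endpointName(ENDPOINT_NAME)
                        .contentType("text/csv")
                        .body(SdkBytes.fromUtf8String(features))
                        .build())
                .body().asUtf8String();

        // Persisting the prediction to a database for later analysis would go here.

        return new APIGatewayProxyResponseEvent()
                .withStatusCode(200)
                .withBody(prediction);
    }
}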

Computer vision-based object detection techniques can be very useful in sports. These techniques use deep learning algorithms to predict pass probability, player face-offs, or win likelihood. For the sports industry, object detection technologies like these are crucial because they obviate the need for sensors. Real-time object identification can be used to:

  • Generate new advanced analytics regarding player and team performance
  • Aid game officials in making correct calls
  • Provide fans an improved and more data-rich viewing experience

Read Football tracking in the NFL with Amazon SageMaker for more information on how to track using broadcast video data. Using SageMaker, you can train object detection models that analyze thousands of images. You can then locate and classify the football itself, and distinguish it from background objects.

Creating a graphics overlay

When you have the ML inference data and video ingestion ready, you may want to represent this data on your broadcasted video. The graphic overlay feature lets you insert an image (a BMP, PNG, or TGA file) at a specified time. It is displayed as a static overlay on the underlying video for a specified duration. The motion graphic overlay feature lets you insert an animation (a MOV or SWF file, or a series of PNG files) on the underlying video. This can be displayed at a specified time for a specified duration.

For example, a player's motion prediction can be inserted into the video during a game through a RESTful API call that returns ML inferences. You can use AWS Elemental Live to achieve this. Read about the AWS Elemental Live graphic overlay feature in the AWS documentation.

Reducing latency

You may want to reduce latency for analytics, such as for player health and safety. You can run video, data, or machine learning processing at the arena with AWS Outposts, or use AWS Wavelength along with 5G infrastructure. For more information, read Catch Important Moments in Sports with 5G and AWS Wavelength.

Summary

In this blog, we’ve highlighted how customers in the sports industry are using AWS to increase the quality of the game, and enhance the sports fan’s experience. The following benefits can be achieved by building a data pipeline for tracking sporting events using AWS services:

  • Amazon Kinesis collects, processes, and analyzes in-game streaming data in real time. This way both teams and fans get timely insights and can react quickly to new information.
  • The serverless nature of this architecture enables a cost-effective, scalable, and operationally efficient environment for customers.
  • Amazon machine learning services like Amazon SageMaker can be used to enrich the fan viewing experience by presenting in-game predictions, such as who will score next or which team will win the game.

Visit our AWS Sports Partnerships page for more information on how AWS is changing the game.

Secure multi-tenant data ingestion pipelines with Amazon Kinesis Data Streams and Kinesis Data Analytics for Apache Flink

Post Syndicated from Abhinav Krishna Vadlapatla original https://aws.amazon.com/blogs/big-data/secure-multi-tenant-data-ingestion-pipelines-with-amazon-kinesis-data-streams-and-kinesis-data-analytics-for-apache-flink/

When designing multi-tenant streaming ingestion pipelines, there are myriad ways to design and build your streaming solution, each with its own set of trade-offs. The first decision you have to make is the strategy that determines how you choose to physically or logically separate one tenant’s data from another.

Sharing compute and storage resources helps reduce costs; however, it requires strong security measures to prevent cross-tenant data access. This strategy is known as a pool model. In contrast, a silo model helps reduce security complexity by having each tenant have its own set of isolated resources. However, this increases cost and operational overhead. A more detailed review of tenant isolation models is covered in the SaaS Storage Strategies whitepaper. In this post, we focus on the pool model to optimize for cost when supporting a multi-tenant streaming ingestion architecture.

Consider a retail industry data as a service (DaaS) company that ingests point of sale (POS) data from multiple customers and curates reports that blend sale transactions with third-party data in near-real time. The DaaS company can benefit from sharing compute and storage resources to reduce costs and stay competitive. For security, the DaaS company needs to authenticate each customer request and, to support a pool model, also needs to guarantee that data issues from one tenant don’t affect reports consumed by other customers. Similar scenarios apply to other industries that need to ingest data from semi-trusted servers. For example, in supply chain, a company could be streaming data from multiple suppliers to maintain a near-real-time status of SKUs and purchase orders. In the education industry, a third-party company could ingest data from servers at multiple schools and provide aggregated data to government agencies.

To build a multi-tenant streaming ingestion pipeline with shared resources, we walk you through an architecture that allows semi-trusted servers to use Amazon Kinesis Data Streams using the AWS IoT credentials provider feature for authentication, Amazon API Gateway as a proxy for authorization, and an Amazon Kinesis Data Analytics for Apache Flink application to aggregate and write data partitioned by the tenant in near-real time into an Amazon Simple Storage Service (Amazon S3) data lake. With this architecture, you remove the operational overhead of maintaining multiple Kinesis data streams (one per customer) and allow for cost optimization opportunities by performing better utilization of your provisioned Kinesis data stream shards.

The following architecture diagram illustrates the data ingestion pipeline.

In this architecture, authorized servers from one or multiple third-party companies send messages to an API Gateway endpoint. The endpoint puts messages into the proper partition of a shared Kinesis data stream. Finally, a Kinesis Data Analytics consumer application aggregates, compresses, and writes data into the proper partition of an S3 data lake.

The following sections describe in more detail the multi-tenant architecture considerations and implementation details for this architecture.

Authentication

First, you need to decide on the desired authentication mechanisms. To simplify onboarding new customers and eliminate the need for hardcoded credentials on customers' servers, we recommend looking into the credentials provider feature of AWS IoT. Each tenant can use a provisioned x.509 certificate to securely retrieve temporary credentials and authenticate against AWS services using an AWS Identity and Access Management (IAM) role. For more information on how this works, see How to Eliminate the Need for Hardcoded AWS Credentials in Devices by Using the AWS IoT Credentials Provider.

For additional authentication mechanisms directly with API Gateway, see Controlling and managing access to a REST API in API Gateway.

Authorization

After you're authenticated with IAM, the next step is authorization. Simply put, make sure each tenant can only write to their respective data lake partition. One of the key risks to mitigate in a multi-tenant streaming ingestion workflow is the scenario where a tenant server is compromised and it attempts to impersonate other tenants sending bogus data. To guarantee isolation of data ingest and reduce the blast radius of bad data, you could consider the following options:

  • Use a silo model and provision one Kinesis data stream per tenant – Kinesis Data Streams provides access control at the stream level. This approach provides you with complete isolation and the ability to scale your stream capacity up or down on a per-tenant basis. However, there is operational overhead in maintaining multiple streams, and optimizing for cost has limitations. Each data stream is provisioned in increments of one shard, each providing 1 MB/sec of ingestion capacity and up to 1,000 PUT records per second. Pricing is based on shards per hour. One shard could be well beyond a tenant's requirements, and tenant onboarding costs could scale rapidly.
  • Use AWS IoT Core with one topic per tenant using topic filters and an AWS IoT rule to push data into a shared data stream – AWS IoT Core gives access control at the topic level. Each tenant can send data to only their respective topic (for example, tenantID topic) based on their IAM credentials. We can then use an AWS IoT rule to extract the tenantID from the topic and push data into a shared data stream using tenantID as the partition key.
  • Use API Gateway as a proxy with mapping templates and a shared data stream – Kinesis Data Streams doesn’t provide access control at the data partition level. However, API Gateway provides access control at the method and path level. With API Gateway as a proxy, you can use mapping templates to programmatically fetch the tenant UUID from the path and set it as the partition key before pushing the data to Kinesis Data Streams.

Optimize for costs

The last two preceding options use a pool model and share a single Kinesis data stream to reduce operational overhead and costs. To optimize costs even further, you need to consider the pricing model of each of these services (API Gateway vs. AWS IoT Core) and three factors in your use case: the average size for each message, the rate at which the data is being ingested, and the data latency requirements.

Consider an example where you have 1,000 tenants (devices) and each produces data at the rate of one request per second with an average payload of 8 KB. AWS IoT Core is priced per million messages and per million rules. Each message is metered in 5 KB increments, so you're charged for two messages per payload. If you have small payloads and very low latency requirements, AWS IoT Core is likely your best choice. If you can introduce some latency and buffer your messages at each tenant, then API Gateway is your best option because the pricing model for REST API requests is on a per-API-call basis and not metered by KB. You can use the AWS Pricing Calculator to quickly decide which option offers the best price for your use case.

For example, with API Gateway, you can optimize your cost even further by reducing the number of API requests. Instead of each tenant sending 8 KB of data per second, you can send 240 KB every 30 seconds and reduce costs considerably. A sample cost calculation for API Gateway in this scenario looks like the following: average message size of 240 KB, and REST API requests per month of 2 requests per minute x 60 minutes x 24 hours x 30 days = 86,400 requests per tenant x 1,000 tenants = 86,400,000 requests.

The following sections walk you through the configuration of API Gateway and Kinesis to prevent cross-data access when you support a multi-tenant streaming ingestion pipeline architecture.

Enable API Gateway as a Kinesis Data Streams proxy

API Gateway is a fully managed service that makes it easy for developers to publish, maintain, monitor, and secure APIs at any scale. You can create an API Gateway endpoint to expose other AWS services, such as Amazon Simple Notification Service (Amazon SNS), Amazon S3, Kinesis, and even AWS Lambda. All AWS services support dedicated APIs to expose their features. However, the application protocols or programming interfaces are likely to differ from service to service. An API Gateway API with the AWS integration has the advantage of providing a consistent application protocol for your client to access different AWS services. In our use case, we use API Gateway as a proxy to Kinesis in order to handle IAM authentication and authorize clients to invoke URL paths with their unique tenant ID. API Gateway has additional features that are beneficial for multi-tenant applications, like rate limiting API calls per tenant, requests and response transformations, logging and monitoring, and more.

When you configure API Gateway with IAM resource-level permissions, you can make sure each tenant can only make requests to a unique URL path. For example, if the tenant invokes the API Gateway URL with their tenant ID in the path (for example, https://api-id.execute-api.us-east-2.amazonaws.com/{tenantId}), IAM validates that the tenant is authorized to invoke this URL only. For more details on how to set up an IAM policy to a specific API Gateway URL path, see Control access for invoking an API.

Then, to ensure no authorized customer can impersonate another tenant by sending bogus data, API Gateway extracts the tenant ID from the URL path programmatically using the API Gateway mapping template feature. API Gateway allows developers to transform payloads before passing them to backend resources using mapping templates written with JSONPath expressions. With this feature, we can extract the tenant ID from the URL and pass it as the partition key of the shared data stream. The following is a sample mapping template:

{
    "StreamName": "$input.params('stream-name')",
    "Data": "$util.base64Encode($input.json('$.Data'))",
    "PartitionKey": "$input.params('partition')"
}

In the preceding code, partition is the parameter name you specify in your API Gateway resource path. The following screenshot shows what the configuration looks like on the API Gateway console.

After messages in the data stream use the proper partition, the next step is to transform, enrich, and aggregate messages before writing them into an S3 data lake. For this workflow, we use Kinesis Data Analytics for Apache Flink to have full control of the data lake partition configuration. The following section describes the approach to ensure data is written in the proper partition.

Use Kinesis Data Analytics for Apache Flink to process and write data into an S3 data lake

After we guarantee that messages within the data stream have the right tenant ID as the partition key, we can use Kinesis Data Analytics for Apache Flink to continuously process messages in near-real time and store them in Amazon S3. Kinesis Data Analytics for Apache Flink is an easy way to transform and analyze streaming data in real time. Apache Flink is an open-source framework and engine for processing data streams. Kinesis Data Analytics reduces the complexity of building, managing, and integrating Apache Flink applications with other AWS services. Because this solution is also serverless, there are no servers to manage, it scales automatically to match the volume and throughput of your incoming data, and you only pay for the resources your streaming applications consume.

In this scenario, we want to extract the partition key (tenantId) from each Kinesis data stream message, then process all messages within a time window and use the tenant ID as the file prefix of the files we write into the destination S3 bucket. In other words, we write the data into the proper tenant partition. The result writes data in files that look like the following:

s3://mybucket/year=2020/month=1/day=1/tenant=A01/part-0-0
s3://mybucket/year=2020/month=1/day=1/tenant=A02/part-0-1
s3://mybucket/year=2020/month=1/day=1/tenant=A03/part-0-3

To achieve this, we need to implement two custom classes within the Apache Flink application code.

First, we use a custom deserializer class to extract the partition key from the data stream and append it to the body of the message. We can achieve this by overriding the deserialize method of the KinesisDeserializationSchema class:

class CustomKinesisDeserializer implements KinesisDeserializationSchema<String> {
    private static final Logger log = LogManager.getLogger(CustomKinesisDeserializer.class);

    @Override
    public String deserialize(byte[] bytes, String partitionKey, String seqNum,
                              long approxArrivalTimeStamp, String stream, String shardId) throws IOException {
        log.debug("deserialize - enter");
        String s = new String(bytes);
        JSONObject json = new JSONObject(s);
        json.put("tenantid", partitionKey);
        return json.toString();
    }
    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

Next, we use a custom BucketAssigner class to use the partition key in the body of the message (in our case, the tenant ID) as the bucket prefix:

private static final BucketAssigner<String, String> assigner = new BucketAssigner<String, String> () {

        @Override
        public String getBucketId(String element, BucketAssigner.Context context) {
            log.debug("getBucketId - enter");
            JSONObject json = new JSONObject(element);
            if (json.has("tenantid")) {
                String tenantId = json.getString("tenantid");
                return "tenantid=" + tenantId;
            }
            return "tenantid=unknown";
        }

        @Override
        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
};

The following code is the full sample class for the Kinesis Data Analytics with Apache Flink application. The purpose of the sample code is to illustrate how you can obtain the partition key from the data stream and use it as your bucket prefix via the BucketAssigner class. Your implementation might require additional windowing logic to enrich, aggregate, and transform your data before writing it into an S3 bucket. In this post, we write data into a tenantId partition, but your code might require additional partition fields (such as by date). For additional code examples, see Kinesis Data Analytics for Apache Flink: Examples.

package com.amazonaws.services.kinesisanalytics;

import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class S3StreamingSinkWithPartitionsJob {

    private static final Logger log = LogManager.getLogger(S3StreamingSinkWithPartitionsJob.class);
    private static String s3SinkPath;
    private static String inputStreamName;
    private static String region;

    /**
     * Custom BucketAssigner to specify the bucket path/prefix with the Kinesis Stream partitionKey.
     *
     * Sample code. Running application with debug mode with this implementation will expose data into log files
     */
    private static final BucketAssigner<String, String> assigner = new BucketAssigner<String, String> () {

        @Override
        public String getBucketId(String element, BucketAssigner.Context context) {
            log.debug("getBucketId - enter");
            JSONObject json = new JSONObject(element);
            if (json.has("tenantid")) {
                String tenantId = json.getString("tenantid");
                return "tenantid=" + tenantId;
            }
            return "tenantid=unknown";
        }

        @Override
        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    };


    private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) throws IOException {
        log.debug("createSourceFromStaticConfig - enter - variables: {region:" + region +
                ", inputStreamName:" + inputStreamName + "}");
        Properties inputProperties = new Properties();
        inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
        inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

        /*
     * Implementing a custom deserializer class that implements the KinesisDeserializationSchema interface
         * to get additional values from partition keys.
         */
        return env.addSource(new FlinkKinesisConsumer<>(inputStreamName,
                new CustomKinesisDeserializer(),
                inputProperties
        ));
    }

    private static StreamingFileSink<String> createS3SinkFromStaticConfig() {
        log.debug("createS3SinkFromStaticConfig - enter - variables: { s3SinkPath:" + s3SinkPath + "}");
        final StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path(s3SinkPath), new SimpleStringEncoder<String>("UTF-8"))
                .withBucketAssigner(assigner)
                .build();
        return sink;
    }

    public static void main(String[] args) throws Exception {

        Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
        Properties consumerProperties = applicationProperties.get("ConsumerConfigProperties");
        region = consumerProperties.getProperty("Region","us-west-2");
        inputStreamName = consumerProperties.getProperty("InputStreamName");
        s3SinkPath = "s3a://" + consumerProperties.getProperty("S3SinkPath") + "/data";

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> input = createSourceFromStaticConfig(env);
        input.addSink(createS3SinkFromStaticConfig());
        env.execute("Flink S3 Streaming with Partitions Sink Job");
    }

}

/**
 * Custom deserializer to pass partitionKey from KDS into the record value. The partition key can be used
 * by the bucket assigner to leverage it as the s3 path/prefix/partition.
 *
 * Sample code. Running application with debug mode with this implementation will expose data into log files
 */

class CustomKinesisDeserializer implements KinesisDeserializationSchema<String> {

    private static final Logger log = LogManager.getLogger(CustomKinesisDeserializer.class);

    @Override
    public String deserialize(byte[] bytes, String partitionKey, String seqNum,
                              long approxArrivalTimeStamp, String stream, String shardId) throws IOException {
        log.debug("deserialize - enter");
        String s = new String(bytes);
        JSONObject json = new JSONObject(s);
        json.put("tenantid", partitionKey);
        return json.toString();
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }

}

To test and build this multi-tenant stream ingestion pipeline, you can deploy an AWS CloudFormation template in your AWS environment. The following section provides step-by-step instructions on how to deploy and test the sample template.

Deploy a sample multi-tenant streaming ingestion pipeline

AWS CloudFormation simplifies provisioning and managing infrastructure and services on AWS via JSON or YAML templates. Follow these instructions to deploy and test the sample workflow described in this post. The instructions assume a basic understanding of AWS Cloud concepts, the AWS Management Console, and working with REST APIs.

  1. Create a destination S3 bucket.
  2. Deploy the CloudFormation template.

The template has only been tested in the us-west-2 Region, and creates IAM roles and users with limited access scope. This template doesn’t register CA certificates or implement the AWS IoT credentials provider feature for authentication. To test the pipeline, the template creates an IAM user for authentication with API Gateway. If you want to test the AWS IoT credentials provider feature with this implementation, follow the instructions in How to Eliminate the Need for Hardcoded AWS Credentials in Devices by Using the AWS IoT Credentials Provider.

  1. For Stack name, enter a name (for example, flinkapp).
  2. For KDAS3DestinationBucket, enter the name of the S3 bucket you created.
  3. Leave the other parameters as default.

  1. Accept all other options, including acknowledging the template will create IAM principals on your behalf.
  2. Wait until the stack shows the status CREATE_COMPLETE.

Now you can start your Kinesis Data Analytics for Apache Flink application.

  1. On the Kinesis Data Analytics console, choose Analytics applications.
  2. Select the application that starts with KinesisAnalyticsFI_*.
  3. Choose Run.

  1. Choose Run without snapshot.
  2. Wait for the application to show the status Running.

Now you can test sending messages to your API Gateway endpoint. Remember requests should be authenticated. The CloudFormation template created an IAM test user for this purpose. We recommend using a development API tool for this step. For this post, we use Postman.

  1. On the AWS CloudFormation console, navigate to the Outputs tab of your stack.
  2. Note the API Gateway endpoint (InvokeURL) and the name of the IAM test user.

  1. Create and retrieve the access key and secret key of your test user. For instructions, see Programmatic access.

AWS recommends using temporary keys when authenticating requests to AWS services. For testing purposes, we use a long-lived access key from this limited scope test user.

  1. Use your API development tool to build a POST request to your API Gateway endpoint using your IAM test user secrets.

The following screenshot shows the Authorization tab of the request using Postman.

The following screenshot shows the Body tab of the request using Postman.

  1. For the body of the request, you can use the following payload:
{
    "Data": {
        "key1": "value1",
        "key2": "value2",
        "key3": "value3"
    }
}

You should get a response from the data stream that looks as follows:

{
 "EncryptionType": "KMS",
 "SequenceNumber": "49619151594519161991565402527623825830782609999622307842",
 "ShardId": "shardId-000000000000"
}

  1. Try to make a request to a different tenant by changing the path from /prod/T001 to /prod/T002.

Because the user isn’t authorized to send data to this endpoint, you get the following error message:

{
    "Message": "User: arn:aws:iam::*******4205:user/flinkapp-MultiTenantStreamTestUser-EWUSMWR0T5I5 is not authorized to perform: execute-api:Invoke on resource: arn:aws:execute-api:us-west-2:********4205:fktw02penb/prod/POST/T002"
}

  1. Browse to your destination S3 bucket.

You should be able to see a new file within your T001 tenant’s folder or partition.

  1. Download and open your file (part-*-*).

The content should look like the following data (in this scenario, we made six requests to the tenant’s API Gateway endpoint):

{"key1":"value1","key2":"value2","key3":"value3","tenantid":"T001"}
{"key1":"value1","key2":"value2","key3":"value3","tenantid":"T001"}
{"key1":"value1","key2":"value2","key3":"value3","tenantid":"T001"}
{"key1":"value1","key2":"value2","key3":"value3","tenantid":"T001"}
{"key1":"value1","key2":"value2","key3":"value3","tenantid":"T001"}
{"key1":"value1","key2":"value2","key3":"value3","tenantid":"T001"}

Clean up

After you finalize your testing, delete the CloudFormation stack and any data written into your destination S3 bucket to prevent incurring unnecessary charges.

Conclusion

Sharing resources in multi-tenant architectures allows organizations to optimize for costs while providing controls for proper tenant isolation and security. In this post, we showed you how to use API Gateway as a proxy to authorize tenants to a specific partition in your shared Kinesis data stream and prevent cross-tenant data access when performing data ingestion from semi-trusted servers. We also showed you how buffering data and sharing a single data stream with multiple tenants reduces operational overhead and optimizes for costs by taking advantage of better resource utilization. Check out the Kinesis Data Streams and Kinesis Data Analytics quick starts to evaluate them for your multi-tenant ingestion use case.


About the Authors

Abhinav Krishna Vadlapatla is a Solutions Architect with Amazon Web Services. He supports startups and small businesses with their cloud adoption to build scalable and secure solutions using AWS. In his free time, he likes to cook and travel.

Pablo Redondo Sanchez is a Senior Solutions Architect at Amazon Web Services. He is a data enthusiast and works with customers to help them achieve better insights and faster outcomes from their data analytics workflows. In his spare time, Pablo enjoys woodworking and spending time outdoor with his family in Northern California.

Auto scaling Amazon Kinesis Data Streams using Amazon CloudWatch and AWS Lambda

Post Syndicated from Matthew Nolan original https://aws.amazon.com/blogs/big-data/auto-scaling-amazon-kinesis-data-streams-using-amazon-cloudwatch-and-aws-lambda/

This post is co-written with Noah Mundahl, Director of Public Cloud Engineering at United Health Group.

In this post, we cover a solution to add auto scaling to Amazon Kinesis Data Streams. Whether you have one stream or many streams, you often need to scale them up when traffic increases and scale them down when traffic decreases. Scaling your streams manually can create a lot of operational overhead. If you leave your streams overprovisioned, costs can increase. If you want the best of both worlds—increased throughput and reduced costs—then auto scaling is a great option. This was the case for United Health Group. Their Director of Public Cloud Engineering, Noah Mundahl, joins us later in this post to talk about how adding this auto scaling solution impacted their business.

Overview of solution

In this post, we showcase a lightweight serverless architecture that can auto scale one or many Kinesis data streams based on throughput. It uses Amazon CloudWatch, Amazon Simple Notification Service (Amazon SNS), and AWS Lambda. A single SNS topic and Lambda function process the scaling of any number of streams. Each stream requires one scale-up and one scale-down CloudWatch alarm. For an architecture that uses Application Auto Scaling, see Scale Amazon Kinesis Data Streams with AWS Application Auto Scaling.

The workflow is as follows:

  1. Metrics flow from the Kinesis data stream into CloudWatch (bytes/second, records/second).
  2. Two CloudWatch alarms, scale-up and scale-down, evaluate those metrics and decide when to scale.
  3. When one of these scaling alarms triggers, it sends a message to the scaling SNS topic.
  4. The scaling Lambda function processes the SNS message:
    1. The function scales the data stream up or down using UpdateShardCount (see the sketch after this list):
      1. Scale-up events double the number of shards in the stream
      2. Scale-down events halve the number of shards in the stream
    2. The function updates the metric math on the scale-up and scale-down alarms to reflect the new shard count.
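The production logic lives in the scaling Lambda function provided in the solution's GitHub repo. The following is only a minimal sketch, assuming the AWS SDK for Java v2, of what the doubling and halving call to UpdateShardCount looks like; the stream name and current shard count are supplied by the caller.

import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.ScalingType;
import software.amazon.awssdk.services.kinesis.model.UpdateShardCountRequest;

public class StreamScaler {

    private final KinesisClient kinesis = KinesisClient.create();

    /** Doubles the shard count on a scale-up event, halves it on a scale-down event. */
    public void scale(String streamName, int currentShardCount, boolean scaleUp) {
        int targetShardCount = scaleUp
                ? currentShardCount * 2
                : Math.max(1, currentShardCount / 2);

        kinesis.updateShardCount(UpdateShardCountRequest.builder()
                .streamName(streamName)
                .targetShardCount(targetShardCount)
                .scalingType(ScalingType.UNIFORM_SCALING)   // Kinesis splits or merges shards evenly
                .build());
    }
}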

Implementation

The scaling alarms rely on CloudWatch alarm metric math to calculate a stream’s maximum usage factor. This usage factor is a percentage calculation from 0.00–1.00, with 1.00 meaning the stream is 100% utilized in either bytes per second or records per second. We use the usage factor for triggering scale-up and scale-down events. Our alarms use the following usage factor thresholds to trigger scaling events: >= 0.75 for scale-up and < 0.25 for scale-down. We use 5-minute data points (period) on all alarms because they’re more resistant to Kinesis traffic micro spikes.

Scale-up usage factor

The following screenshot shows the metric math on a scale-up alarm.

The scale-up max usage factor for a stream is calculated as follows:

s1 = Current shard count of the stream
m1 = Incoming Bytes Per Period, directly from CloudWatch metrics
m2 = Incoming Records Per Period, directly from CloudWatch metrics
e1 = Incoming Bytes Per Period with missing data points filled by zeroes
e2 = Incoming Records Per Period with missing data points filled by zeroes
e3 = Incoming Bytes Usage Factor 
   = Incoming Bytes Per Period / Max Bytes Per Period
   = e1/(1024*1024*60*$kinesis_period_mins*s1)
e4 = Incoming Records Usage Factor  
   = Incoming Records Per Period / Max Records Per Period 
   = e2/(1000*60*$kinesis_period_mins*s1) 
e6 = Max Usage Factor: Incoming Bytes or Incoming Records 
   = MAX([e3,e4])
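The alarms evaluate these expressions as CloudWatch metric math; the following Java helper only mirrors the same arithmetic for illustration, with parameter names mapped to e1, e2, s1, and the alarm period.

/**
 * Mirrors the CloudWatch metric math above in plain Java, for illustration only.
 * The real solution evaluates these expressions inside the CloudWatch alarms.
 */
public final class UsageFactor {

    /**
     * @param incomingBytesPerPeriod   e1: incoming bytes over the period (missing data treated as 0)
     * @param incomingRecordsPerPeriod e2: incoming records over the period (missing data treated as 0)
     * @param periodMinutes            alarm period in minutes (5 in this solution)
     * @param shardCount               s1: current shard count of the stream
     * @return e6: the larger of the bytes and records usage factors
     */
    public static double maxUsageFactor(double incomingBytesPerPeriod,
                                        double incomingRecordsPerPeriod,
                                        int periodMinutes,
                                        int shardCount) {
        double maxBytesPerPeriod = 1024.0 * 1024 * 60 * periodMinutes * shardCount;    // 1 MB/s per shard
        double maxRecordsPerPeriod = 1000.0 * 60 * periodMinutes * shardCount;         // 1,000 records/s per shard

        double bytesUsageFactor = incomingBytesPerPeriod / maxBytesPerPeriod;          // e3
        double recordsUsageFactor = incomingRecordsPerPeriod / maxRecordsPerPeriod;    // e4
        return Math.max(bytesUsageFactor, recordsUsageFactor);                         // e6
    }

    private UsageFactor() {}
}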

Scale-down usage factor

We calculate the scale-down usage factor the same as the scale-up usage factor with some additional metric math to (optionally) take into account the iterator age of the stream to block scale-downs when stream processing is falling behind. This is useful if you’re using Lambda functions per shard, known as the Parallelization Factor, to process your streams. If you have a backlog of data, scaling down reduces the number of Lambda functions you need to process that backlog.

The following screenshot shows the metric math on a scale-down alarm.

The scale-down max usage factor for a stream is calculated as follows:

s1 = Current shard count of the stream
s2 = Iterator Age (in minutes) after which we begin blocking scale downs	
m1 = Incoming Bytes Per Period, directly from CloudWatch metrics
m2 = Incoming Records Per Period, directly from CloudWatch metrics
m3 = Iterator Age of the stream, directly from CloudWatch metrics
e1 = Incoming Bytes Per Period with missing data points filled by zeroes
e2 = Incoming Records Per Period with missing data points filled by zeroes
e3 = Incoming Bytes Usage Factor 
   = Incoming Bytes Per Period / Max Bytes Per Period
   = e1/(1024*1024*60*$kinesis_period_mins*s1)
e4 = Incoming Records Usage Factor  
   = Incoming Records Per Period / Max Records Per Period 
   = e2/(1000*60*$kinesis_period_mins*s1)
e5 = Iterator Age Adjusted Factor 
   = Scale Down Threshold * (Iterator Age Minutes / Iterator Age Minutes to Block Scale Down)
   = $kinesis_scale_down_threshold * ((FILL(m3,0)*1000/60)/s2)
e6 = Max Usage Factor: Incoming Bytes, Incoming Records, or Iterator Age Adjusted Factor
   = MAX([e3,e4,e5])

Deployment

You can deploy this solution via AWS CloudFormation. For more information, see the GitHub repo.

If you need to generate traffic on your streams for testing, consider using the Amazon Kinesis Data Generator. For more information, see Test Your Streaming Data Solution with the New Amazon Kinesis Data Generator.

Optum’s story

As the health services innovation arm of UnitedHealth Group, Optum has been on a multi-year journey towards advancing maturity and capabilities in the public cloud. Our multi-cloud strategy includes using many cloud-native services offered by AWS. The elasticity and self-healing features of the public cloud are among its many strengths, and we use the automation provided natively by AWS through auto scaling capabilities. However, some services don't natively provide those capabilities, such as Kinesis Data Streams. That doesn't mean that we're complacent and accept inelasticity.

Reducing operational toil

At the scale at which Optum operates in the public cloud, monitoring for errors or latency related to our Kinesis data stream shard counts and manually adjusting those values in response could become a significant source of toil for our public cloud platform engineering teams. Rather than engaging in that toil, we prefer to engineer automated solutions that respond much faster than humans and help us maintain performance, data resilience, and cost-efficiency.

Serving our mission through engineering

Optum is a large organization with thousands of software engineers. Our mission is to help people live healthier lives and help make the health system work better for everyone. To accomplish that mission, our public cloud platform engineers must act as force multipliers across the organization. With solutions such as this, we ensure that our engineers can focus on building and not on responding to needless alerts.

Conclusion

In this post, we presented a lightweight auto scaling solution for Kinesis Data Streams. Whether you have one stream or many streams, this solution can handle scaling for you. The benefits include less operational overhead, increased throughput, and reduced costs. Everything you need to get started is available on the Kinesis Auto Scaling GitHub repo.


About the authors

Matthew Nolan is a Senior Cloud Application Architect at Amazon Web Services. He has over 20 years of industry experience and over 10 years of cloud experience. At AWS he helps customers rearchitect and reimagine their applications to take full advantage of the cloud. Matthew lives in New England and enjoys skiing, snowboarding, and hiking.

Paritosh Walvekar is a Cloud Application Architect with AWS Professional Services, where he helps customers build cloud native applications. He has a Master's degree in Computer Science from University at Buffalo. In his free time, he enjoys watching movies and is learning to play the piano.

Noah Mundahl is Director of Public Cloud Engineering at United Health Group.

Understanding data streaming concepts for serverless applications

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/understanding-data-streaming-concepts-for-serverless-applications/

Amazon Kinesis is a suite of managed services that can help you collect, process, and analyze streaming data in near-real time. It consists of four separate services that are designed for common tasks with streaming data: Kinesis Data Streams, Kinesis Data Firehose, Kinesis Data Analytics, and Kinesis Video Streams. This blog post focuses on Kinesis Data Streams.

One of the main benefits of processing streaming data is that an application can react as new data is generated, instead of waiting for batches. This real-time capability enables new functionality for applications. For example, payment processors can analyze payments in real time to detect fraudulent transactions. Ecommerce websites can use streams of clickstream activity to determine site engagement metrics in near-real time.

Kinesis can be used with Amazon EC2-based and container-based workloads. However, its integration with AWS Lambda can make it a useful data source for serverless applications. Using Lambda as a stream consumer can also help minimize the amount of operational overhead for managing streaming applications.

In this post, I explain important streaming concepts and how they affect the design of serverless applications. This post references the Alleycat racing application. Alleycat is a home fitness system that allows users to compete in an intense series of 5-minute virtual bicycle races. Up to 1,000 racers at a time take the saddle and push the limits of cadence and resistance to set personal records and rank on leaderboards. The Alleycat software connects the stationary exercise bike with a backend application that processes the data from thousands of remote devices.

The Alleycat frontend allows users to configure their races and view real-time leaderboards and historical rankings. The frontend could wait until the end of each race and collect the total output from each racer. Once the batch is ready, it could rank the results and provide a leaderboard after the race is completed. However, this is not very engaging for competitors. By using streaming data instead of a batch, the application can show racers a view of who is winning during the race. This makes the virtual environment more like a real-life cycling race.

Producers and consumers

In streaming data workloads, producers are the applications that produce data and consumers are those that process it. In a serverless streaming application, a consumer is usually a Lambda function, Amazon Kinesis Data Firehose, or Amazon Kinesis Data Analytics.

Kinesis producers and consumers

There are a number of ways to put data into a Kinesis stream in serverless applications, including direct service integrations, client libraries, and the AWS SDK.

Producer                        | Kinesis Data Streams                          | Kinesis Data Firehose
Amazon CloudWatch Logs          | Yes, using subscription filters               | Yes, using subscription filters
AWS IoT Core                    | Yes, using IoT rule actions                   | Yes, using IoT rule actions
AWS Database Migration Service  | Yes – set stream as target                    | Not directly
Amazon API Gateway              | Yes, via REST API direct service integration  | Yes, via REST API direct service integration
AWS Amplify                     | Yes – via JavaScript library                  | Not directly
AWS SDK                         | Yes                                           | Yes

A single stream may have tens of thousands of producers, which could be web or mobile applications or IoT devices. The Alleycat application uses the AWS IoT SDK for JavaScript to publish messages to an IoT topic. An IoT rule action then uses a direct integration with Kinesis Data Streams to push the data to the stream. This configuration is ideal at the device level, especially since the device may already use AWS IoT Core to receive messages.

The Alleycat simulator uses the AWS SDK to send a large number of messages to the stream. The SDK provides two methods: PutRecord and PutRecords. The first allows you to send a single record, while the second supports up to 500 records per request (or up to 5 MB in total). The simulator uses the putRecords JavaScript API to batch messages to the stream.
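The simulator uses the JavaScript SDK, but the same batching pattern looks similar in other SDKs. The following is a sketch using the AWS SDK for Java v2; the stream name and message contents are hypothetical, not the Alleycat implementation.

import java.util.List;
import java.util.stream.Collectors;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;

public class RaceMessageBatcher {

    private final KinesisClient kinesis = KinesisClient.create();

    /** Sends up to 500 race messages in a single PutRecords call. */
    public void sendBatch(String raceId, List<String> jsonMessages) {
        List<PutRecordsRequestEntry> entries = jsonMessages.stream()
                .map(msg -> PutRecordsRequestEntry.builder()
                        .partitionKey(raceId)                   // all messages for a race land on one shard
                        .data(SdkBytes.fromUtf8String(msg))
                        .build())
                .collect(Collectors.toList());

        PutRecordsResponse response = kinesis.putRecords(PutRecordsRequest.builder()
                .streamName("alleycat-races")                   // hypothetical stream name
                .records(entries)
                .build());

        // PutRecords is not all-or-nothing: failed entries should be retried by the producer.
        if (response.failedRecordCount() > 0) {
            System.err.println(response.failedRecordCount() + " records failed and should be retried");
        }
    }
}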

A producer can put records directly on a stream, for example via the AWS SDK, or indirectly via other services such as Amazon API Gateway or AWS IoT Core. If direct, the producer must have appropriate permission to write data to the stream. If indirect, the producer must have permission to invoke the proxy service, and then the service must have permission to put data onto the stream.

While there may be many producers, there are comparatively fewer consumers. You can register up to 20 consumers per data stream, which share the outgoing throughput limit per shard. Consumers receive batches of records sequentially, which means processing latency increases as you add more consumers to a stream. For latency-sensitive applications, Kinesis offers enhanced fan-out, which gives consumers 2 MB per second of dedicated throughput and uses a push model to reduce latency.

Shards, streams, and partition keys

A shard is a sequence of data records in a stream with a fixed capacity. Part of Kinesis billing is based upon the number of shards. A single shard can process up to 1 MB per second or 1,000 records per second of incoming data. One shard can also send up to 2 MB per second of outgoing data to downstream consumers. These are hard limits on the throughput of a shard, and as your application approaches them, you must add more shards to avoid exceeding them.

A stream is a collection of these shards and is often a grouping at the workload or project level. Adding another shard to a stream effectively doubles the throughput, though it also doubles the cost. When there is only one shard in a stream, all records sent to the stream are routed to the same shard. With multiple shards, the routing of incoming messages to shards is determined by a partition key.

The data producer adds the partition key before sending the record to Kinesis. The service calculates an MD5 hash of the key, which maps to one of the shards in the stream. Each shard is assigned a range of non-overlapping hash values, so each partition key maps to only one shard.

MD5 hash function
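The hashing itself is performed by the Kinesis service, so you never write this code in an application. The following sketch only illustrates how the MD5 hash of a partition key maps into a shard's hash key range, using a hypothetical two-shard split of the 128-bit hash key space.

import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class PartitionKeyHashing {

    /** Returns the 128-bit MD5 hash of a partition key as an unsigned integer, the value Kinesis
     *  compares against each shard's non-overlapping hash key range. */
    public static BigInteger hashKey(String partitionKey) throws NoSuchAlgorithmException {
        byte[] digest = MessageDigest.getInstance("MD5")
                .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
        return new BigInteger(1, digest);   // treat the 16 bytes as an unsigned value
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        // Two shards splitting the full 0..2^128-1 hash key space in half (illustrative only).
        BigInteger midpoint = BigInteger.valueOf(2).pow(127);

        BigInteger hash = hashKey("race-42");   // hypothetical partition key
        int shard = hash.compareTo(midpoint) < 0 ? 0 : 1;
        System.out.println("Partition key race-42 maps to shard " + shard);
    }
}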

The partition key exists as an alternative to specifying a shard ID directly, since it’s common in production applications to add and remove shards depending upon traffic. How you use the partition key determines the shard-mapping behavior. For example:

  • Same value: If you specify the same string as the partition key, every message is routed to a single shard, regardless of the number of shards in the stream. This is called overheating a shard.
  • Random value: Using a pseudo-random value, such as a UUID, evenly distributes messages between all the shards available.
  • Time-based: Using a timestamp as a partition key may result in a preference for a single shard if multiple messages arrive at the same time.
  • Application-specific: The Alleycat application uses the raceId as a partition key to ensure that all messages from a single race are processed by the same shard consumer.

A Lambda function is a consumer application for a data stream and processes one batch of records for each shard. Since Alleycat uses a tumbling window to calculate aggregates between batches, this use of the partition key ensures that all messages for each raceId are processed by the same function. The downside to this architecture is that it is limited to 1,000 incoming messages per second with the same raceId since it is bound to a single shard.

Deciding on a partition key strategy depends upon the specific needs of your workload. In most cases, a random value partition key is often the best approach.

Streaming payloads in serverless applications

When using the SDK to put messages to a stream, the Data attribute can be a buffer, typed array, blob, or string. Combined with the partition key value, the maximum record size is 1 MB. The Data value is base64 encoded when serialized by Kinesis and delivered as an encoded value to downstream consumers. When using a Lambda function consumer, Kinesis delivers batches in a Records array. Each record contains the encoded data attribute, partition key, and additional metadata in a JSON envelope:

JSON transformation from producer to consumer
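A consumer written in Java receives the same envelope through the aws-lambda-java-events library, which exposes the already decoded Data attribute as a ByteBuffer. The following is a minimal sketch for illustration; the Alleycat consumer itself is not necessarily written this way.

import java.nio.charset.StandardCharsets;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;

public class RaceStreamConsumer implements RequestHandler<KinesisEvent, Void> {

    @Override
    public Void handleRequest(KinesisEvent event, Context context) {
        for (KinesisEvent.KinesisEventRecord record : event.getRecords()) {
            // The events library has already base64-decoded the Data attribute into a ByteBuffer.
            String payload = StandardCharsets.UTF_8
                    .decode(record.getKinesis().getData())
                    .toString();
            String partitionKey = record.getKinesis().getPartitionKey();

            context.getLogger().log("partitionKey=" + partitionKey + " payload=" + payload);
        }
        return null;
    }
}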

Ordering and idempotency

Records in a Kinesis stream are delivered to consuming applications in the same order that they arrive at the Kinesis service. The service assigns a sequence number to the record when it is received and this is delivered as part of the payload to a Kinesis consumer:

Sequence number in payload

When using Lambda as a consuming application for Kinesis, by default each shard has a single instance of the function processing records. In this case, ordering is guaranteed as Kinesis invokes the function serially, one batch of records at a time.

Parallelization factor of 1

You can increase the number of concurrent function invocations by setting the ParallelizationFactor on the event source mapping. This allows you to set a concurrency of between 1 and 10, which provides a way to increase Lambda throughput if the IteratorAge metric is increasing. However, one side effect is that ordering per shard is no longer guaranteed, since the shard's messages are split into multiple subgroups based upon an internal hash.

Parallelization factor is 2
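As a sketch, the following shows how an event source mapping with a parallelization factor might be created with the AWS SDK for Java v2; the function name and stream ARN are placeholders, and you could equally configure this in the console or with infrastructure as code.

import software.amazon.awssdk.services.lambda.LambdaClient;
import software.amazon.awssdk.services.lambda.model.CreateEventSourceMappingRequest;
import software.amazon.awssdk.services.lambda.model.EventSourcePosition;

public class EventSourceMappingSetup {
    public static void main(String[] args) {
        // Hypothetical function name and stream ARN; replace with your own.
        String functionName = "alleycat-race-consumer";
        String streamArn = "arn:aws:kinesis:us-east-1:123456789012:stream/alleycat-races";

        try (LambdaClient lambda = LambdaClient.create()) {
            lambda.createEventSourceMapping(CreateEventSourceMappingRequest.builder()
                    .functionName(functionName)
                    .eventSourceArn(streamArn)
                    .startingPosition(EventSourcePosition.LATEST)
                    .batchSize(100)
                    .parallelizationFactor(2)   // up to 10 concurrent invocations per shard
                    .build());
        }
    }
}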

Kinesis guarantees that every record is delivered “at least once”, but occasionally messages are delivered more than once. This is caused by producers that retry messages, network-related timeouts, and consumer retries, which can occur when worker processes restart. In all cases, these are normal activities, and you should design your application to handle infrequent duplicate records.

To prevent duplicate messages from causing unintentional side effects, such as charging a payment twice, it's important to design your application with idempotency in mind. By using transaction IDs appropriately, your code can determine if a given message has been processed previously, and ignore any duplicates. In the Alleycat application, the aggregation and processing of messages is idempotent. If two identical messages are received, processing completes with the same end result.
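A minimal sketch of transaction-ID-based deduplication is shown below. In production, the processed IDs would live in a durable store such as DynamoDB with conditional writes rather than in memory; this in-memory version, which is not the Alleycat implementation, only illustrates the idea.

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class IdempotentProcessor {

    // In production this would be a durable store (for example, DynamoDB with a conditional put);
    // an in-memory set is used here only to illustrate the idea.
    private final Set<String> processedTransactionIds = ConcurrentHashMap.newKeySet();

    /** Processes a message only if its transaction ID has not been seen before. */
    public void process(String transactionId, String payload) {
        if (!processedTransactionIds.add(transactionId)) {
            // Duplicate delivery: safe to ignore because the work was already done.
            return;
        }
        // ... perform the side effect exactly once, for example charge the payment ...
    }
}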

To learn more about implementing idempotency in serverless applications, read the “Serverless Application Lens: AWS Well-Architected Framework”.

Conclusion

In this post, I introduce some of the core streaming concepts for serverless applications. I explain some of the benefits of streaming architectures and how Kinesis works with producers and consumers. I compare different ways to ingest data, how streams are composed of shards, and how partition keys determine which shard is used. Finally, I explain the payload formats at the different stages of a streaming workload, how message ordering works with shards, and why idempotency is important to handle.

To learn more about building serverless web applications, visit Serverless Land.

Get started with Flink SQL APIs in Amazon Kinesis Data Analytics Studio

Post Syndicated from Sam Mokhtari original https://aws.amazon.com/blogs/big-data/get-started-with-flink-sql-apis-in-amazon-kinesis-data-analytics-studio/

Before the release of Amazon Kinesis Data Analytics Studio, customers relied on Amazon Kinesis Data Analytics for SQL on Amazon Kinesis Data Streams. With the release of Kinesis Data Analytics Studio, data engineers and analysts can use an Apache Zeppelin notebook within Studio to query streaming data interactively from a variety of sources, like Kinesis Data Streams, Amazon Managed Streaming for Apache Kafka (Amazon MSK), Amazon Simple Storage Service (Amazon S3), and other sources using custom connectors.

In this post, we cover some of the most common query patterns to run on streaming data using Apache Flink relational APIs. Of the two relational API types supported by Apache Flink, SQL and Table APIs, our focus is on the SQL APIs. We expect readers to have knowledge of Kinesis Data Streams, AWS Glue, and AWS Identity and Access Management (IAM), as well as a basic knowledge of SQL queries and streaming window concepts. We use a sales transaction use case to walk you through examples of tumbling, sliding, and session windows, group by, and join query operations.

Solution architecture

To show the working solution of interactive analytics on streaming data, we use a Kinesis Data Generator UI application to generate the stream of data, which continuously writes to Kinesis Data Streams. For the interactive analytics on Kinesis Data Streams, we use Kinesis Data Analytics Studio that uses Apache Flink as the processing engine, and notebooks powered by Apache Zeppelin. These notebooks come with preconfigured Apache Flink, which allows you to query data from Kinesis Data Streams interactively using SQL APIs. To use SQL queries in the Apache Zeppelin notebook, we configure an AWS Glue Data Catalog table, which is configured to use Kinesis Data Streams as a source. This configuration allows you to query the data stream by referring to the AWS Glue table in SQL queries.

We use an AWS CloudFormation template to create the AWS resources shown in the following diagram.

Set up the environment

After you sign in to your AWS account, launch the CloudFormation template by choosing Launch Stack:


The CloudFormation template configures the following resources in your account:

  • Two Kinesis data streams, one for sales transactions and one for card data
  • A Kinesis Data Analytics Studio application
  • An IAM role (service execution role) for Kinesis Data Analytics Studio
  • Two AWS Glue Data Catalog tables: sales and card

After you complete the setup, sign in to the Kinesis Data Analytics console. On the Kinesis Data Analytics applications page, choose the Studio tab, where you can see the Studio notebook in ready status. Select the Studio notebook, choose Run, and wait until the notebook is in running status. It can take a couple of minutes for the notebook to get into running status.

To run the analysis on streaming data, select the Apache Zeppelin notebook environment and open it. You have the option to create a new note in the notebook.

Run stream analytics in an interactive application

Before you start running interactive analytics with a Studio notebook, you need to start streaming data into your Kinesis data stream, which you created earlier using the CloudFormation stack. To generate streaming data into the data stream, we use a hosted Kinesis Data Generator UI application.

  1. Create an Amazon Cognito user pool in your account and user in that pool. For instructions, see the GitHub repo.
  2. Log in to the Kinesis Data Generator application.
  3. Choose the Region where the CloudFormation template was run to create the Kinesis data stream.
  4. Choose the data stream from the drop-down menu and select the data stream for sales.
  5. Set records per second to 10.
  6. Use the following code for the record template:
{
    "customer_card_id": {{random.number({
            "min":1,
            "max":99
        })}},
    "customer_id": {{random.number({
            "min":100,
            "max":110
        })}},
    "price": {{random.number(
        {
            "min":10,
            "max":500
        }
    )}},
    "product_id": "{{random.arrayElement(
        ["4E5750DC2A1D","E6DA5387367B","B552B4B940D0"]
    )}}"
}
  7. Choose Send Data.

To run the table join queries in the example section, you need to stream sample card data to a separate data stream.

  1. Choose the Region where you created the data stream.
  2. Choose the data stream from the drop-down menu.
  3. Select the data stream for card.
  4. Set records per second to 5.
  5. Use the following code for the record template:
{
    "card_id": {{random.number({
            "min":75,
            "max":99
        })}},
    "card_number": {{random.number({
            "min":23274397,
            "max":47547920
        })}},
    "card_zip": "{{random.arrayElement(
        ["07422","23738","03863"]
    )}}",
    "card_name": "{{random.arrayElement(
        ["Laura Perez","Peter Han","Karla Johnson"]
    )}}"
}
  6. Choose Send Data.
  7. Go back to the notebook note and specify the language Studio uses to run the application.

You need to specify a Flink interpreter supported by the Apache Zeppelin notebook, such as Python, IPython, stream SQL, or batch SQL. Because we use the Flink streaming SQL APIs in this post, we use the stream SQL interpreter ssql as the first statement:

%flink.ssql(type=update)

Common query patterns with Flink SQL

In this section, we walk you through examples of common query patterns using Flink SQL APIs. In all the examples, we refer to the sales table, which is the AWS Glue table created by the CloudFormation template that has Kinesis Data Streams as a source. It’s the same data stream where you publish the sales data using the Kinesis Data Generator application.

Windows and aggregation

In this section, we cover examples of windowed and aggregate queries: tumbling, sliding, and session window operations.

Tumbling window

In the following example, we use SUM aggregation on a tumbling window. The query emits the total spend for every customer every 30-second window interval.

The following table shows our input data.

proctime customer_id customer_card_id product_id price
2021-04-20 21:31:01.10 75 101 4E5750DC2A1D 110
2021-04-20 21:31:01.115 78 118 B552B4B940D0 80
2021-04-20 21:31:01.328 75 101 E6DA5387367B 60
2021-04-20 21:31:01.504 78 101 4E5750DC2A1D 110
2021-04-20 21:31:01.678 75 148 4E5750DC2A1D 110
2021-04-20 21:31:01.960 78 118 B552B4B940D0 80

We use the following code for our query:

%flink.ssql(type=update)
SELECT TUMBLE_END(proctime, INTERVAL '30' SECOND) as window_end_time, customer_id
, SUM(price) as tumbling_30_seconds_sum
FROM sales
GROUP BY TUMBLE(proctime, INTERVAL '30' SECOND), customer_id

The following table shows our results.

window_end_time customer_id tumbling_30_seconds_sum
2021-04-20 21:31:01.0 75 170
2021-04-20 21:31:01.0 78 80
2021-04-20 21:31:30.0 75 110
2021-04-20 21:31:30.0 78 190

Sliding window

In this sliding window example, we run a SUM aggregate query that emits the total spend for every customer every 10 seconds for the 30-second window.

The following table shows our input data.

proctime customer_id customer_card_id product_id price
2021-04-20 21:31:01.10 75 101 4E5750DC2A1D 110
2021-04-20 21:31:01.20 78 118 B552B4B940D0 80
2021-04-20 21:31:01.28 75 101 E6DA5387367B 60
2021-04-20 21:31:01.30 78 101 4E5750DC2A1D 110
2021-04-20 21:31:01.36 75 148 4E5750DC2A1D 110
2021-04-20 21:31:01.40 78 118 B552B4B940D0 80

We use the following code for our query:

%flink.ssql(type=update)
SELECT HOP_END(proctime, INTERVAL '10' SECOND, INTERVAL '30' SECOND) AS window_end_time
, customer_id, SUM(price) AS sliding_30_seconds_sum
FROM sales
GROUP BY HOP(proctime, INTERVAL '10' SECOND, INTERVAL '30' SECOND), customer_id

The following table shows our results.

window_end_time customer_id sliding_30_seconds_sum
2021-04-20 21:31:01.10 75 110
2021-04-20 21:31:01.20 75 110
2021-04-20 21:31:01.20 78 80
2021-04-20 21:31:30.30 75 170
2021-04-20 21:31:30.30 78 190
2021-04-20 21:31:30.40 75 280
2021-04-20 21:31:30.40 78 270

Session window

The following example of a session window query finds the total spend per session for a 1-minute gap of inactivity. To generate the result, we stream the data from the Kinesis Data Generator application and stop streaming for more than a minute to create a 1-minute gap of inactivity.

The following table shows our input data.

proctime customer_id customer_card_id product_id price
2021-04-20 21:31:01.10 75 101 4E5750DC2A1D 110
2021-04-20 21:31:01.20 78 118 B552B4B940D0 80
2021-04-20 21:31:01.28 75 101 E6DA5387367B 60
2021-04-20 21:32:50.30 78 101 4E5750DC2A1D 110
2021-04-20 21:32:50.36 75 148 4E5750DC2A1D 110

We use the following code for our query:

%flink.ssql(type=update)
SELECT customer_id, SESSION_START(proctime, INTERVAL '1' MINUTE) AS session_start_time
, SESSION_PROCTIME(proctime, INTERVAL '1' MINUTE) AS session_end_time, SUM(price) AS total_spend
FROM sales
GROUP BY SESSION(proctime, INTERVAL '1' MINUTE), customer_id

The following table shows our results.

session_start_time session_end_time total_spend
2021-04-20 21:31:01.10 2021-04-20 21:32:01.28 250
2021-04-20 21:32:50.30 2021-04-20 21:32:50.36 220

Data filter and consolidation

To show an example of a filter and union operation, we create two separate datasets using the filter condition and combine them using the UNION operation.

The following table shows our input data.

proctime customer_id customer_card_id product_id price
2021-04-20 21:31:01.10 75 101 4E5750DC2A1D 110
2021-04-20 21:31:01.20 78 118 B552B4B940D0 80
2021-04-20 21:31:01.28 75 101 E6DA5387367B 60
2021-04-20 21:32:50.30 78 101 4E5750DC2A1D 110
2021-04-20 21:32:50.36 75 148 4E5750DC2A1D 110

We use the following code for our query:

%flink.ssql(type=update)
SELECT * FROM (
(SELECT customer_id, product_id, price FROM sales WHERE price > 100 AND  product_id <> '4E5750DC2A1D')
UNION
(SELECT customer_id, product_id, price FROM sales WHERE product_id = '4E5750DC2A1D' AND price > 250)
)

The following table shows our results.

customer_id product_id price
78 4E5750DC2A1D 300
75 B552B4B940D0 170
78 B552B4B940D0 110
75 4E5750DC2A1D 260

Table joins

Flink SQL APIs support different types of join conditions, like inner join, outer join, and interval join. You want to keep resource utilization from growing indefinitely and to run joins efficiently. For that reason, our example uses an interval join. An interval join requires one equi-join predicate and a join condition that bounds the time on both sides. In this example, we join the datasets of the two Kinesis Data Streams tables based on the card ID, which is a common field between the two stream datasets. The filter condition in the query is based on a time constraint, which keeps resource utilization from growing.

The following table shows our sales input data.

proctime customer_id customer_card_id product_id price
2021-04-20 21:31:01.10 75 101 4E5750DC2A1D 110
2021-04-20 21:31:01.20 78 118 B552B4B940D0 80
2021-04-20 21:31:01.28 75 101 E6DA5387367B 60
2021-04-20 21:32:50.30 78 101 4E5750DC2A1D 110
2021-04-20 21:32:50.36 75 148 4E5750DC2A1D 110

The following table shows our cards input data.

card_id card_number card_zip card_name
101 23274397 23738 Laura Perez
118 54093472 7422 Karla Johnson
101 23274397 23738 Laura Perez
101 23274397 23738 Laura Perez
148 91368810 7422 Peter Han

We use the following code for our query:

%flink.ssql(type=update)
SELECT sales.proctime, customer_card_id, card_zip, product_id, price
FROM card INNER JOIN sales ON card.card_id = sales.customer_card_id
WHERE sales.proctime BETWEEN card.proctime - INTERVAL '5' MINUTE AND card.proctime;

The following table shows our results.

proctime customer_card_id card_zip product_id price
2021-04-20 21:31:01.10 101 23738 4E5750DC2A1D 110
2021-04-20 21:31:01.20 118 7422 B552B4B940D0 80
2021-04-20 21:31:01.28 101 23738 E6DA5387367B 60
2021-04-20 21:32:50.30 101 23738 4E5750DC2A1D 110
2021-04-20 21:32:50.36 148 7422 4E5750DC2A1D 110

Data partitioning and ranking

To show an example of Top-N records, we use the same input dataset as in the previous join example. In this example, we run a query to find the top sales record by price in each zip code. We use the OVER window clause to rank sales in each zip code using a PARTITION BY clause. Next, we order the records in each zip code with an ORDER BY clause on the price field in descending order. The result of this operation is a ranking of each record based on the OVER clause condition. We then use the outer block of the query to filter the result on that ranking so that we get the top sale in each zip code.

We use the following code for our query:

%flink.ssql(type=update)
SELECT card_zip, customer_card_id, product_id, price FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY card_zip ORDER BY price DESC) as row_num
FROM card INNER JOIN sales ON card.card_id = sales.customer_card_id
WHERE sales.proctime BETWEEN card.proctime - INTERVAL '5' MINUTE AND card.proctime
)
WHERE row_num = 1

The following table shows our results.

card_zip customer_card_id product_id price
23738 101 4E5750DC2A1D 110
7422 148 4E5750DC2A1D 110

Data transformation

There are times when you want to transform incoming data. The Flink SQL API has many built-in functions to support a wide range of data transformation requirements, including string functions, date functions, arithmetic functions, and so on. For the complete list, see System (Built-in) Functions.

Extract a portion of a string

In this example, we use the SUBSTR string function to remove the first four digits and return only the last four digits of the card number.

The following table shows our sales input data.

proctime customer_id customer_card_id product_id price
2021-04-20 21:31:01.10 75 101 4E5750DC2A1D 110
2021-04-20 21:31:01.20 78 118 B552B4B940D0 80
2021-04-20 21:31:01.28 75 101 E6DA5387367B 60
2021-04-20 21:32:50.30 78 101 4E5750DC2A1D 110
2021-04-20 21:32:50.36 75 148 4E5750DC2A1D 110

The following table shows our cards input data.

card_id card_number card_zip card_name
101 23274397 23738 Laura Perez
118 54093472 7422 Karla Johnson
101 23274397 23738 Laura Perez
101 23274397 23738 Laura Perez
148 91368810 7422 Peter Han

We use the following code for our query:

%flink.ssql(type=update)
SELECT proctime, SUBSTR(card_number,5) AS partial_card_number,    card_zip, product_id, price
FROM card INNER JOIN sales ON card.card_id = sales.customer_card_id

The following table shows our results.

proctime partial_card_number card_zip product_id price
2021-04-20 21:31:01.10 4397 23738 4E5750DC2A1D 110
2021-04-20 21:31:01.20 3472 7422 B552B4B940D0 80
2021-04-20 21:31:01.28 4397 23738 E6DA5387367B 60
2021-04-20 21:32:50.30 4397 23738 4E5750DC2A1D 110
2021-04-20 21:32:50.36 8810 7422 4E5750DC2A1D 110

Replace a substring

In this example, we use the REGEXP_REPLACE string function to remove all the characters after the space from the card_name field. Assuming that the first name and last name are separated by a space, the query returns the first name only.

The following table shows our cards input data.

card_id card_number card_zip card_name
101 23274397 23738 Laura Perez
118 54093472 7422 Karla Johnson
101 23274397 23738 Laura Perez
101 23274397 23738 Laura Perez
148 91368810 7422 Peter Han

We use the following code for our query:

%flink.ssql(type=update)
SELECT card_id, REGEXP_REPLACE(card_name,' .*','') card_name
FROM card

The following table shows our results.

card_id card_name
101 Laura
118 Karla
101 Laura
101 Laura
148 Peter

Split the string field into multiple fields

In this example, we use the SPLIT_INDEX string function to split the card_name field into first_name and last_name, assuming the card_name field is a full name separated by space.

The following table shows our cards input data.

card_id card_number card_zip card_name
101 23274397 23738 Laura Perez
118 54093472 7422 Karla Johnson
101 23274397 23738 Laura Perez
101 23274397 23738 Laura Perez
148 91368810 7422 Peter Han

We use the following code for our query:

%flink.ssql(type=update)
SELECT card_id, SPLIT_INDEX(card_name,' ',0) first_name, SPLIT_INDEX(card_name,' ',1) last_name
FROM card

The following table shows our results.

card_id first_name last_name
101 Laura Perez
118 Karla Johnson
101 Laura Perez
101 Laura Perez
148 Peter Han

Transform data using a CASE statement

There are times when you want to transform the result value and apply labels to get insights. For our example, we label the risk level as high, medium, or low for every customer (who is purchasing in the window) based on the number of purchases in the last 5-minute sliding window that emits results every 30 seconds.

The following table shows our input data.

proctime customer_id customer_card_id product_id price
2021-04-20 21:31:30.10 75 101 4E5750DC2A1D 110
2021-04-20 21:31:38.20 78 118 B552B4B940D0 80
2021-04-20 21:31:42.28 75 101 E6DA5387367B 60
2021-04-20 21:31:50.30 78 101 4E5750DC2A1D 110
2021-04-20 21:31:50.36 75 148 4E5750DC2A1D 110

We use the following code for our query:

%flink.ssql(type=update)
SELECT customer_id, CASE
WHEN total_purchases BETWEEN 1 AND 2 THEN 'LOW'
WHEN total_purchases BETWEEN 3 AND 10 THEN 'MEDIUM'
ELSE 'HIGH'
END as risk
FROM (
SELECT HOP_END(proctime, INTERVAL '30' SECOND, INTERVAL '5' MINUTE) AS winend
, customer_id, COUNT(1) AS total_purchases
FROM sales
GROUP BY HOP(proctime, INTERVAL '30' SECOND, INTERVAL '5' MINUTE), customer_id
)

The following table shows our results.

customer_id risk
78 LOW
75 HIGH

DateTime data transformation

The Flink SQL API has a wide range of built-in functions to operate on the date timestamp field, like extracting the day, month, week, hour, minute, day of the month, and so on. There are functions to convert the date timestamp field. In this example, we use the MINUTE and HOUR functions to extract the minute of an hour and the hour from the timestamp field.

The following table shows our sales input data.

proctime customer_id customer_card_id product_id price
2021-04-20 21:31:01.10 75 101 4E5750DC2A1D 110
2021-04-20 21:31:01.20 78 118 B552B4B940D0 80
2021-04-20 21:31:01.28 75 101 E6DA5387367B 60
2021-04-20 21:32:50.30 78 101 4E5750DC2A1D 110
2021-04-20 21:32:50.36 75 148 4E5750DC2A1D 110

We use the following code for our query:

%flink.ssql(type=update)
SELECT HOUR(proctime) AS transaction_hour, MINUTE(proctime) AS transaction_min, customer_id, product_id, price
FROM sales

The following table shows our results.

transaction_hour transaction_min customer_id product_id price
21 31 75 4E5750DC2A1D 110
21 31 78 B552B4B940D0 80
21 31 75 E6DA5387367B 60
21 32 78 4E5750DC2A1D 110
21 32 75 4E5750DC2A1D 110

Conclusion

In this post, we used sales and card examples to demonstrate different query patterns to get insight from streaming data using Apache Flink SQL APIs. We walked you through examples of Flink SQL queries that you can run within Kinesis Data Analytics Studio. In just a few minutes, you can start running interactive analytics with the examples in this post.

You can quickly start developing a stream processing application using Studio from the supported languages like SQL, Python, and Scala. If you want to generate continuous actionable insights, you can easily build and deploy your code as an Apache Flink application with durable state from the notebook within Studio. For more information, see Deploying as an application with durable state.

For further reading on Flink SQL queries that you can use in Kinesis Data Analytics Studio, visit the official page at Apache Flink 1.11 SQL Queries.


About the Authors

Dr. Sam Mokhtari is a Senior Solutions Architect at AWS. His main area of depth is “Data & Analytics” and he published more than 30 influential articles in this field. He is also a respected data & analytics advisor who led several large-scale implementation projects across different industries including energy, health, telecom and transport.

 

 

Mitesh Patel is a Senior Solutions Architect at AWS. He works with customers in SMB to help them develop scalable, secure and cost effective solutions in AWS. He enjoys helping customers in modernizing applications using microservices and implementing serverless analytics platform.

Build and optimize real-time stream processing pipeline with Amazon Kinesis Data Analytics for Apache Flink, Part 2

Post Syndicated from Amit Chowdhury original https://aws.amazon.com/blogs/big-data/part2-build-and-optimize-real-time-stream-processing-pipeline-with-amazon-kinesis-data-analytics-for-apache-flink/

In Part 1 of this series, you learned how to calibrate an Amazon Kinesis data stream and an Apache Flink application deployed in Amazon Kinesis Data Analytics by tuning Kinesis Processing Units (KPUs) to achieve higher performance. Although collecting, processing, and analyzing a spiky data stream in real time is crucial, reacting to the spiky data is equally important in many real-life situations, because the value of derived insights diminishes with time.

To build a highly responsive, scalable streaming application, we need to auto scale both the Kinesis data stream and the Kinesis data analytics application based on the incoming data. Refer to this blog post to learn how to easily monitor and automatically scale your Apache Flink application with Amazon Kinesis Data Analytics. You can use the Kinesis Scaling Utility, which is designed to give you the ability to scale Amazon Kinesis data streams in the same way that you scale EC2 Auto Scaling groups – up or down by a count or as a percentage of the total fleet. You can also simply scale to an exact number of shards.

In this post, we dive deep into important metrics that generate meaningful insights about the health and performance of the Amazon Kinesis Data Analytics for Apache Flink application. We also walk you through the steps to build a fully automated, scalable, and highly available pipeline that scales streaming capacity in and out for both the Kinesis data stream (based on incoming records) and the Kinesis data analytics application (by calibrating KPUs and parallelism). You use AWS managed services to reduce operational overhead compared to the manual approach of scaling the streaming application. Because this is a continuation of the previous post, make sure to walk through Part 1 as a prerequisite before deploying the automated pipeline code in this post.

Deploy the advanced monitoring and scaling architecture

This section uses an AWS CloudFormation template to build an advanced monitoring dashboard to capture vital metrics data. You also create an advanced scaling environment to auto scale the Kinesis data stream and Kinesis data analytics application, which scales both services in and out depending on the volume of spiky data. Furthermore, we use managed services for better operational efficiency. The template builds the following architecture.

The CloudFormation template includes the following components:

  • An advanced Amazon CloudWatch dashboard
  • Two CloudWatch alarms for scaling your Kinesis Data Analytics for Apache Flink application
  • Two CloudWatch alarms for scaling your Kinesis Data Streams
  • Accompanying auto scaling policy actions in these alarms
  • An Amazon API Gateway endpoint for triggering AWS Lambda
  • A Lambda function responsible for handling the scale-in and scale-out functions

These components work in tandem to monitor the metrics configured in the CloudWatch alarm and respond to metrics accordingly.

To provision your resources, complete the following steps:

  1. Choose Launch Stack (right-click) to open a new tab to run the CloudFormation template. Make sure to choose us-east-1 (N. Virginia) region.
  2. Choose Next.

  1. For FlinkApplicationName, enter the name of your application.
  2. For KinesisStreamName, enter the name of your data stream.
  3. Make sure ShardCount is the same as the current shard count of the Kinesis data stream created in Part 1.

This information is available on the Outputs tab of the CloudFormation stack detailed in Part 1.

  1. Choose Next.

  1. Follow the remaining instructions and choose Create stack.

This dashboard should take a few minutes to launch.

 

  1. When the stack is complete, go to the Outputs tab of the stack on the AWS CloudFormation console and choose the dashboard link.

Metrics deep dive

There are many critical operational metrics to measure when assessing the performance of a running Apache Flink application. This section looks at the essential CloudWatch metrics for Kinesis Data Analytics for Apache Flink applications, what they mean, and what appropriate alarms might be vital indicators for each. Let’s dive into how to monitor your application.

First, let’s look at the running application using our CloudWatch dashboard and point out potential issues with a given Apache Flink application indicated by our CloudWatch metrics.

The Application Health section of this dashboard can help identify fundamental issues with your application that are causing it to be inoperable. Let’s start with the first two cells: Uptime and Downtime. In an ideal state, this is precisely how your application should look—uptime measures the cumulative time in milliseconds that the application has been running without interruption, and downtime measures the time elapsed during an outage.

In an ideal state, your lastCheckpointSize and lastCheckpointDuration metrics should remain relatively stable over time. If you observe an increasing checkpoint size, this can indicate a state not being cleared within your application or a memory leak. Similarly, a longer and unexpected spike in checkpoint duration can cause backpressure of your application. Monitor these metrics for stability over time.

The resource utilization metrics section gives a glimpse into the resource usage of the running Flink application. In a healthy application, try to keep CPU utilization under 75%. This is also the same metric that Kinesis Data Analytics for Apache Flink uses to auto scale your application if you have auto scaling enabled. Also, it’s normal to see CPU spikes during application startup or restart. HeapMemoryUtilization measures the memory taken up by the application, on-heap state, and any other operations that may take up memory space.
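
If you want to pull these utilization metrics programmatically instead of viewing them on the dashboard, a boto3 sketch might look like the following; the application name is a placeholder, and the namespace, metric, and dimension names are assumptions based on the metrics discussed here.

from datetime import datetime, timedelta
import boto3

cloudwatch = boto3.client("cloudwatch")

# Assumed namespace, metric, and dimension for a Kinesis Data Analytics for
# Apache Flink application; replace the application name with your own.
response = cloudwatch.get_metric_statistics(
    Namespace="AWS/KinesisAnalytics",
    MetricName="cpuUtilization",
    Dimensions=[{"Name": "Application", "Value": "my-flink-application"}],
    StartTime=datetime.utcnow() - timedelta(hours=1),
    EndTime=datetime.utcnow(),
    Period=300,
    Statistics=["Average", "Maximum"],
)
for point in sorted(response["Datapoints"], key=lambda p: p["Timestamp"]):
    print(point["Timestamp"], point["Average"], point["Maximum"])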

Let’s now evaluate our Flink application progress. Incoming and outgoing records per second are measured on an application level in this image. You can also measure them on a task or subtask level for finer granularity and visibility into the operators of your application. The ideal state for these depends on the use case, but if it’s a straight read, process, and write action without filtering the records, you can expect to see an equal amount of records in and out per second. If a deviation occurs on either end of these metrics, it’s a good indicator of where the bottleneck is. If numRecordsInPerSecond is lower, the source might be configured to read in less data, or it could be indicative of backpressure on the sink causing a slowdown. If numRecordsOutPerSecond is lower, it could be identifying a slow operator process in the middle of your application.

Next, let’s look at the InputWatermark and OutputWatermark metrics and EventTimeLatency. The watermarks indicate the event time with which data is arriving into the data stream. A large difference between these two values could indicate significantly late-arriving data in the stream. Your stream should handle this according to your use case, and EventTimeLatency measures the total latency of the streaming workload, calculated as the OutputWatermark minus the InputWatermark.

The LateRecordsDropped metric measures the number of records dropped due to arriving late. If this number is spiking, there is an issue with data arriving late to the Flink application.

Now let’s dive into Kinesis source and sink metrics. The millisBehindLatest metric shows the time the consumer is behind the head of the stream, which is a good indicator of how far behind the consumer’s current time is. You can measure this metric on an application or a parallelism level—a value of 0 shows that the application is completely caught up with processing records. This is ideal; a higher value means that the application is falling behind. It could indicate that the consumer isn’t tuned to read records efficiently, backpressure, or some slowness in processing. Scale the application accordingly.

The RetriesPerRecord, UserRecordsPending, and BufferingTime metrics come from the Kinesis Producer Library (KPL), and in this case refer to our terminal script, which is writing to the Kinesis data stream. All applications that use the KPL report these metrics, and it’s important to monitor them in case of frequent retries or timeouts. The other metrics can grow exceedingly large if the data stream is under-provisioned.

Advanced scaling

Let’s dive deep into how to scale your Kinesis data analytics application based on the previously discussed metrics. The only way to scale a Kinesis data analytics application automatically is to use the built-in auto scale feature. This feature monitors your application’s CPU usage over time, and if it remains at 75% or above for 15 minutes, it increases your application’s overall parallelism. You experience some downtime during this scale-up, and an application developer should take this into account when using the auto scaling feature. It’s an excellent and helpful feature of Kinesis Data Analytics for Apache Flink. However, some applications need to scale based on other factors, not just CPU usage. In this section, we look at an external way to scale your application based on IncomingRecords or millisBehindLatest metrics on the source Kinesis data stream.

To add the functionality of scaling based on other metrics, we utilize Application Auto Scaling to specify our scaling policy and other attributes, such as cooldown periods. We can also take advantage of any auto scaling types—step scaling, target tracking scaling, and schedule-based scaling. The CloudFormation template we launched already created the necessary resources covering step scaling. For a more detailed list, view the Resources tab on the AWS CloudFormation console or view the designer before launching.
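
For reference, an alarm similar to the scale-out alarm provisioned by the template could be created with boto3 as in the sketch below; the alarm name, stream name, threshold, and scaling policy ARN are placeholder assumptions, because the template already creates equivalent resources for you.

import boto3

cloudwatch = boto3.client("cloudwatch")

# Placeholder names, threshold, and action ARN - the CloudFormation template
# in this post already provisions equivalent alarms and scaling policies.
cloudwatch.put_metric_alarm(
    AlarmName="KinesisStreamScaleOut",
    Namespace="AWS/Kinesis",
    MetricName="IncomingRecords",
    Dimensions=[{"Name": "StreamName", "Value": "my-data-stream"}],
    Statistic="Sum",
    Period=60,
    EvaluationPeriods=5,
    Threshold=10000,
    ComparisonOperator="GreaterThanThreshold",
    AlarmActions=["arn:aws:autoscaling:us-east-1:123456789012:scalingPolicy:example-scale-out-policy"],
)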

Currently, the settings are tuned to the max throughput per KPU, which is ideal for a production workload. Let’s tune this setting down to a lower value to more quickly see results.

  1. On the CloudWatch console, choose Alarms in the navigation pane.
  2. Choose the alarm KDAScaleOutAlam.

The alarm has been preconfigured for you in CloudWatch to listen for specific metrics breaching a threshold. Optionally, you can adjust the alarm to trigger scale-out or scale-in events as needed.

  1. On the Actions menu, choose Edit.

  1. In the Conditions section, adjust the threshold value as needed.
  2. Choose Update alarm.

You can also use the speedup value in the ProducerCommand found in the outputs of the CloudFormation stack from Part 1 to increase and decrease the data volume per second, replicating real-life scenarios of spiky data. Observe the CloudWatch alarms changing state between OK and In alarm. When in alarm, they trigger auto scaling of the Kinesis data stream, scaling the number of shards in or out. The alarms also scale the KPUs allocated to the Kinesis data analytics application.

  1. Navigate back to your Kinesis data analytics application.
  2. On the Details tab, see in the Scaling section if this alarm has impacted the parallelism.

  1. Alternatively, stop the producer in the terminal.

Turning off the producer should show an inverse effect, causing the application to trigger the KDAScaleInAlarm, and the application parallelism should scale back down in a few minutes.

  1. On the Configuration tab of your data stream, observe the scaling operation of allocated shards.

You can open the Apache Flink dashboard from your Kinesis data analytics application, analyze the application performance, and troubleshoot by looking at Flink job-level insights, Flink task-level insights, Flink exceptions, and checkpoints. You can also calibrate your application by looking at the Flink dashboard metrics, which gives you additional granularity out of the box, and using the metrics for debugging purposes.

Conclusion

In this post, you built a reliable, scalable, and highly available advanced scaling mechanism for streaming applications based on Kinesis Data Analytics for Apache Flink and Kinesis Data Streams. The post also discussed how to auto scale your applications based on a metric other than CPU utilization and explored ways to extend observability of the application with advanced monitoring and error handling. This solution was largely enabled by using managed services, so you didn’t need to spend time provisioning and configuring the underlying infrastructure. The sources of the AWS CloudFormation templates used in this post are available from GitHub for your reference.

You should now have a good understanding of how to build, monitor and auto scale a real-time streaming application using Amazon Kinesis. You can also calibrate various components based on your application needs and volume of data by applying advanced monitoring and scaling techniques.


About the Authors

 Amit Chowdhury is a Partner Solutions Architect in the Global System Integrator (GSI) team at Amazon Web Services. He helps AWS GSI partners migrate customer workloads to the AWS Cloud, and provides guidance to build, design, and architect scalable, highly available, and secure solutions on AWS by applying AWS recommended best practices. He enjoys spending time with his family, outdoor adventures and traveling.

 

Saurabh Shrivastava is a solutions architect leader and analytics specialist working with global systems integrators. He works with AWS Partners and customers to provide them with architectural guidance for building scalable architecture in hybrid and AWS environments. He enjoys spending time with his family outdoors and traveling to new destinations to discover new cultures.

Build and optimize a real-time stream processing pipeline with Amazon Kinesis Data Analytics for Apache Flink, Part 1

Post Syndicated from Amit Chowdhury original https://aws.amazon.com/blogs/big-data/part1-build-and-optimize-a-real-time-stream-processing-pipeline-with-amazon-kinesis-data-analytics-for-apache-flink/

In real-time stream processing, it becomes critical to collect, process, and analyze high-velocity real-time data to provide timely insights and react quickly to new information. Streaming data velocity could be unpredictable, and volume could spike based on user demand at a given time of day. Real-time analysis needs to handle the data spike, because any delay in processing streaming data can cause a significant impact on the desired outcome. The value of insights and actions reduces over time, whereas real-time analysis can substantially improve the effectiveness of the analytics application.

A widespread use case is fleet management for vehicles, especially in the autonomous car industry. It’s essential to collect, process, and analyze high-velocity traffic data and react in real time to control and reroute traffic. Real-time stream processing is crucial in many other use cases, such as manufacturing production lines, robotics automation, analyzing high-volume web and application logs, website clickstreams or database event streams, aggregating social media feeds, and tracking financial transactions.

Amazon Kinesis Data Analytics reduces the complexity of building, managing, and integrating Apache Flink applications with other AWS services. Kinesis Data Analytics takes care of everything required to run streaming applications continuously and scales automatically to match your incoming data volume and throughput in a serverless manner.

In this post, you learn the required concepts to implement robust, scalable, and flexible real-time streaming extract, transform, and load (ETL) pipelines with Apache Flink and Kinesis Data Analytics. We demonstrate how to calibrate the Kinesis streaming analytics pipeline to achieve higher performance efficiency and better cost optimization with the right amount of Kinesis Processing Units (KPUs). Estimating the optimal number of KPUs to handle your streaming workload depends on several factors, including the type of stream processing involved. For instance, if you’re performing CPU-intensive statistical calculations, your application might need more CPU or memory. On the other hand, if your application is simply enriching records via external API calls as they flow through, you might be I/O bound. In Part 1 of this series, you learn various parameters such as Parallelism and ParallelismPerKPU for KPU calibration. In Part 2, you learn about applying auto scaling to add the right number of KPUs based on spikes in the streaming data.

For this post, we analyze the telemetry data of a taxi fleet in New York City in real time to optimize fleet operation using Amazon Kinesis Data Analytics for Apache Flink. Kinesis Data Analytics helps process and analyze the data in real time to identify areas currently requesting a high number of taxi rides. The derived insights are visualized on a dashboard for operational teams to inspect.

Architecture

As shown in the following architecture diagram, every taxi in the fleet is capturing information about completed trips. The tracked information includes the pickup and drop-off locations, number of passengers, and generated revenue. This information is ingested into Amazon Kinesis Data Streams as a simple JSON blob. The application reads the timestamp attribute of the stored events and replays them as if they occurred in real time. From there, the data is processed and analyzed by a Flink application, which is deployed to Kinesis Data Analytics for Apache Flink.

The application  identifies areas that are currently requesting a high number of taxi rides. The derived insights are finally persisted into Amazon Elasticsearch Service, where they can be accessed and visualized using Kibana.

In this post, you build a fully managed infrastructure that can analyze the data in near-real time—within seconds—while being scalable and highly available. The architecture uses Kinesis Data Streams as a streaming store, Kinesis Data Analytics to run an Apache Flink application in a fully managed environment, and Amazon Elasticsearch Service (Amazon ES) and Kibana for visualization.

Additionally, we discuss basic Apache Flink concepts and common patterns for streaming analytics. We also cover how Kinesis Data Analytics for Apache Flink is different from a self-managed environment and how to effectively operate and monitor streaming architectures. You also calibrate KPUs in Kinesis Data Analytics to improve performance efficiency and cost optimization.

Deploy the real-time streaming and analysis workload

To replicate the real-life scenario, you connect to a preconfigured Amazon Elastic Compute Cloud (Amazon EC2) instance running Linux over SSH. Then you use a Java application to replay a historic set of taxi trips made in NYC stored in objects in Amazon Simple Storage Service (Amazon S3) into the data stream. The Java application is compiled and loaded onto the EC2 instance.

This section uses an AWS CloudFormation template to build a producer client program that sends NYC taxi trip data to our Kinesis data stream. The template creates the following resources:

  • An S3 bucket to house the data resources.
  • A new Kinesis data stream that we use to stream a dataset of NYC taxi trips.
  • An Amazon ES cluster with Kibana integration for displaying dashboard information.
  • A build pipeline and AWS CodeBuild project along with sources for a Flink Kinesis connector application.
  • An EC2 instance for running a Flink application to replay data onto the data stream. An Elastic IP is provisioned for the EC2 instance to allow SSH access.
  • A Java application hosted on the EC2 instance, which replays the historic NYC taxi trip dataset from Amazon S3 onto the data stream.
  • A Kinesis data analytics application to continuously monitor and analyze data from the connected data stream and run the Apache Flink 1.11 application.
  • The necessary AWS Identity and Access Management (IAM) service roles, security groups, and policies to run and communicate between the resources you create.
  • An Amazon CloudWatch alarm when the application falls behind based on the millisBehindLatest metric.

To provision these resources, complete the following steps:

  1. Choose Launch Stack (right-click) and open a new tab to run the CloudFormation template. Make sure to choose the us-east-1 (N. Virginia) Region.
  2. Choose Next.

  1. For Stack name, enter a name.
  2. For ClientipAddressRange, enter the IP address from which the Kibana dashboard is accessed.

Use https://checkip.amazonaws.com to find your IP address and add /32 at the end.

  1. For SshKeyName, enter the SSH key name for the Linux-based EC2 instance you created.
  2. Choose Next.

  1. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names and I acknowledge that AWS CloudFormation might require the following capability: CAPABILITY_AUTO_EXPAND.
  2. Choose Create stack.
  3. Wait until the CloudFormation template has been successfully created.

Connect to the new EC2 instance and run the producer client program

In this section, we connect to our new EC2 instance and run the producer client program.

  1. On the AWS CloudFormation console, choose the parent stack that was deployed.
  2. In the Outputs section, locate the AWS Systems Manager Session Manager URL for KinesisReplayInstance.
  3. Choose the Session Manager URL to connect to the EC2 instance.

  1. After the connection has been established, start ingesting events into the Kinesis data stream by running the JAR file that was downloaded to the EC2 instance.

The command with pre-populated parameters is available in the Outputs section of the CloudFormation template for ProducerCommand.

You have now successfully created the basic infrastructure and are ingesting data into the data stream.

  1. On the Kinesis Data Streams console, go to your data stream.
  2. On the Monitoring tab, locate the Incoming Data – Sum metric.

You may need to wait 2–3 minutes and use the refresh button for the monitoring charts to see the metrics.

Visualize the data

To visualize the data, navigate to the Kibana dashboard. The dashboard URL is in the Outputs section of the CloudFormation stack for KibanaDashboard. You can inspect the preloaded dashboard or even create your visualizations. If no data shows up, choose the clock icon and change the timeframe to January 2010 – December 2011.

AWS CloudFormation automatically grants access to the IP address provided during stack creation. However, if you encounter access issues in the Kibana dashboard, modify your Amazon ES domain’s access policy and change your local IP address on the Amazon ES console.

To change your IP address, find and choose the Amazon ES domain that you provisioned. On the Actions menu, choose Modify access policy.

Replace the IP address (for example, 123.123.123.123) with your local IP. If you don’t know your local IP, use http://checkip.amazonaws.com.

Scale the Kinesis data stream to adapt to a spiky data stream

Now that the Kinesis Data Analytics for Apache Flink application is running and sending results to Amazon ES, we can look at operational aspects, such as monitoring and scaling. When you closely inspect the output of the producer application, you can observe that it’s experiencing write provisioned throughput exceeded exceptions and can’t send data fast enough. If the resources of the Apache Flink application aren’t adapted accordingly, particularly for a spiky data stream, the application may fall substantially behind. It may then generate results that are no longer relevant because they’re already too old when the overloaded application can eventually produce them.

The Kinesis data stream was deliberately under-provisioned so that the Kinesis Replay Java application is completely saturating the data stream. When you closely inspect the output of the Java application, you can notice that the replay lag is continuously increasing. This means that the producer can’t ingest events as quickly as required according to the specified speedup parameter.

In this section, we scale the Kinesis data stream to accommodate the throughput generated by the Java application ingesting events into the data stream. We then observe how the Kinesis Data Analytics for Apache Flink application automatically scales to adapt to the increased throughput.

  1. On the Kinesis Data Streams console, navigate to the stream you created.
  2. In the Shards section, choose Edit.
  3. Double the throughput of the Kinesis stream by changing the number of open shards to 16.
  4. Choose Save changes to confirm the changes.

  1. While the service doubles the number of shards and therefore the throughput of the stream, examine the metrics of the data stream on the Monitoring tab.

After a few minutes, you should notice the effect of the scaling operation as the throughput of the stream substantially increases.
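
If you prefer to perform the same resharding from code instead of the console, the following boto3 sketch shows the equivalent operation; the stream name is a placeholder for the stream created by the Part 1 stack.

import boto3

kinesis = boto3.client("kinesis")

# Placeholder stream name - use the data stream from the Part 1 stack outputs.
kinesis.update_shard_count(
    StreamName="taxi-trip-stream",
    TargetShardCount=16,          # matches the console step above
    ScalingType="UNIFORM_SCALING",
)

# Resharding is asynchronous; wait until the stream returns to ACTIVE.
kinesis.get_waiter("stream_exists").wait(StreamName="taxi-trip-stream")

The UNIFORM_SCALING type splits or merges shards evenly to reach the target count.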

While we have scaled Kinesis Data Streams manually, the Kinesis Scaling Utility is designed to give you the ability to auto scale Amazon Kinesis data streams in the same way that you scale EC2 Auto Scaling groups – up or down by a count or as a percentage of the total fleet. You can also simply scale to an exact number of shards. In Part 2, you learn more about auto scaling Kinesis Data Streams based on the incoming data stream.

Calibrate KPUs

Currently, Kinesis Data Analytics scales your application solely based on the underlying CPU usage. However, because not all applications are CPU bound, depending on your needs, you may want to use a different mechanism for sizing your application. In this section, we demonstrate how you can use the millisBehindLatest metric (available when consuming data from a Kinesis data stream) to responsively size your Kinesis data analytics application.

Kinesis Data Analytics provisions capacity in the form of Amazon Kinesis Processing Units (KPUs). One KPU provides you with 1 vCPU and 4 GB memory. The default limit for KPUs for your application is 32. You can also request an increase to this limit in AWS Service Limits.

We recommend that you test your application with production loads to get an accurate estimate of the number of KPUs required for your application. KPUs usage can vary considerably based on your data volume and velocity, code complexity, integrations, and more. This is especially true when using the Apache Flink runtime in Kinesis Data Analytics.

You can configure the parallel run of tasks and allocate resources for Kinesis Data Analytics for Apache Flink to implement scaling. We use the following properties:

  • Parallelism – Use this property to set the default Apache Flink application parallelism. All operators, sources, and sinks run with this parallelism unless overridden in the application code. The default is 1, and the default maximum is 256.
  • ParallelismPerKPU – Use this property to set the number of parallel tasks that can be scheduled per KPU of your application. The default is 1, and the maximum is 8. For applications that have blocking operations (for example, I/O), a higher value of ParallelismPerKPU leads to full utilization of KPU resources.

Kinesis Data Analytics calculates the KPUs needed to run your application as Parallelism/ParallelismPerKPU. For example, an application with a Parallelism of 4 and a ParallelismPerKPU of 4 needs 4/4 = 1 KPU.

The following example request for the CreateApplication action sets parallelism when you create an application.

{
   "ApplicationName": "string",
   "RuntimeEnvironment":"FLINK-1_11",
   "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole",
   "ApplicationConfiguration": { 
      "ApplicationCodeConfiguration":{
      "CodeContent":{
         "S3ContentLocation":{
            "BucketARN":"arn:aws:s3:::mybucket",
            "FileKey":"myflink.jar",
            "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345"
            }
         },
      "CodeContentType":"ZIPFILE"
   },   
      "FlinkApplicationConfiguration": { 
         "ParallelismConfiguration": { 
            "AutoScalingEnabled": "true",
            "ConfigurationType": "CUSTOM",
            "Parallelism": 4,
            "ParallelismPerKPU": 4
         }
      }
   }
}

 

For more examples and instructions for using request blocks with API actions, see Kinesis Data Analytics API Example Code.

The following example request for the UpdateApplication action sets parallelism for an existing application:

{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 4,
   "ApplicationConfigurationUpdate": { 
      "FlinkApplicationConfigurationUpdate": { 
         "ParallelismConfigurationUpdate": { 
            "AutoScalingEnabledUpdate": "true",
            "ConfigurationTypeUpdate": "CUSTOM",
            "ParallelismPerKPUUpdate": 4,
            "ParallelismUpdate": 4
         }
      }
   }
}

Scale the Kinesis Data Analytics for Apache Flink application

Because you increased the throughput of the Kinesis data stream by doubling the number of shards, more events are sent into the stream. However, as a direct result, more events need to be processed. Now the Kinesis data analytics application becomes overloaded and can no longer keep up with the increased number of incoming events. You can observe this through the millisBehindLatest metric, which is published to CloudWatch.

Kinesis Data Analytics natively supports auto scaling. After a few minutes, you can see the effect of the auto scaling activities in the metrics. The millisBehindLatest metric starts to decrease until it reaches zero, when the processing has caught up with the tip of the Kinesis data stream.

You can calibrate the scaling operation based on your application needs by adjusting the KPU.

  1. On the Kinesis Data Analytics console, navigate to your application.
  2. Under Scaling, choose Configure.
  3. Adjust Parallelism to 6 and Parallelism per KPU to 2, which results in 6/2 = 3 KPUs.
  4. Choose Update.

The other method that you can apply to improve throughput is the AsyncIO function. You can make AsyncIO calls asynchronously to improve throughput while other requests are in progress. The two essential parameters when defining an AsyncFunction are Capacity (how many requests are in-flight concurrently per parallel sub-task) and Timeout (the timeout duration of an individual request to the external data source). It helps if you allocate enough capacity to account for the throughput, but not more than the external data source can handle. For example, an application with a parallelism of 5 and a capacity of 10 sends 50 concurrent requests to your external data source. You can learn more about using the AsyncIO function with Kinesis Data Analytics for Apache Flink on the GitHub repo.

Conclusion

In this post, you built a reliable, scalable, and highly available streaming application based on Apache Flink and Kinesis Data Analytics. You also scaled the different components while ingesting and analyzing thousands of events per second in near-real time. The solution utilizes managed services without having to provision and configure underlying infrastructure. The post also discussed what it takes to auto scale your application based on metrics such as CPU utilization and millisBehindLatest. The sources of the AWS CloudFormation templates used in this post are available from GitHub for your reference.

You now know how to build a real-time streaming application using Kinesis Data Analytics on AWS. You can also calibrate KPUs based on your application needs and volume of data. Check out Part 2 of this post to explore advanced monitoring techniques and auto scale your real-time streaming application, adapting with streaming data.


About the Author

Amit Chowdhury is a Partner Solutions Architect in the Global System Integrator (GSI) team at Amazon Web Services. He helps AWS GSI partners migrate customer workloads to AWS, and provides guidance to build, design, and architect scalable, highly available, and secure solutions on AWS by applying AWS recommended best practices. He enjoys spending time with his family, outdoor adventures and traveling.

 

Saurabh Shrivastava is a solutions architect leader and analytics specialist working with global systems integrators. He works with AWS Partners and customers to provide them with architectural guidance for building scalable architecture in hybrid and AWS environments. He enjoys spending time with his family outdoors and traveling to new destinations to discover new cultures.

ICYMI: Serverless Q2 2021

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/icymi-serverless-q2-2021/

Welcome to the 14th edition of the AWS Serverless ICYMI (in case you missed it) quarterly recap. Every quarter, we share all of the most recent product launches, feature enhancements, blog posts, webinars, Twitch live streams, and other interesting things that you might have missed!

Q2 calendar

In case you missed our last ICYMI, check out what happened last quarter here.

AWS Step Functions

Step Functions launched Workflow Studio, a new visual tool that provides a drag-and-drop user interface to build Step Functions workflows. This exposes all the capabilities of Step Functions that are available in Amazon States Language (ASL). This makes it easier to build and change workflows and build definitions in near-real time.

For more:

Workflow Studio

The new data flow simulator in the Step Functions console helps you evaluate the inputs and outputs passed through your state machine. It allows you to simulate each of the fields used to process data and updates in real time. It can help accelerate development with workflows and help visualize JSONPath processing.

For more:

Data flow simulator

Also, Amazon API Gateway can now invoke synchronous Express Workflows using REST APIs.

Amazon EventBridge

EventBridge now supports cross-Region event routing from any commercial AWS Region to a list of supported Regions. This feature allows you to centralize global events for auditing and monitoring or replicate events across Regions.

EventBridge cross-Region routing

The service now also supports bus-to-bus event routing in the same Region and in the same AWS account. This can be useful for centralizing events related to a single project, application, or team within your organization.

EventBridge bus-to-bus

You can now use EventBridge as a resource within Step Functions workflows. This provides a direct service integration for both standard and Express Workflows. You can publish events directly to a specified event bus using either a request-response or wait-for-callback pattern.

EventBridge added a new target for rules – Amazon SageMaker Pipelines. This allows you to use a rule to trigger a continuous integration and continuous deployment (CI/CD) service for your machine learning workloads.

AWS Lambda

Lambda Extensions

AWS Lambda extensions are now generally available including some performance and functionality improvements. Lambda extensions provide a new way to integrate your chosen monitoring, observability, security, and governance tools with AWS Lambda. These use the Lambda Runtime Extensions API to integrate with the execution environment and provide hooks into the Lambda lifecycle.

To help build your own extensions, there is an updated GitHub repository with example code.

To learn more:

  • Watch a Tech Talk with Julian Wood.
  • Watch the 8-episode Learning Path series covering all aspects of extensions.

Extensions available today

Amazon CloudWatch Lambda Insights support for Lambda container images is now generally available.

Amazon SNS

Amazon SNS has expanded the set of filter operators available to include IP address matching, existence of an attribute key, and “anything-but” matching.

The service has also introduced an SMS sandbox to help developers testing workloads that send text messages.

To learn more:

Amazon DynamoDB

DynamoDB announced CloudFormation support for several features. First, it now supports configuring Kinesis Data Streams using CloudFormation. This allows you to use infrastructure as code to set up Kinesis Data Streams instead of DynamoDB streams.

The service also announced that NoSQL Workbench now supports CloudFormation, so you can build data models and configure table capacity settings directly from the tool. Finally, you can now create and manage global tables with CloudFormation.

Learn how to use the recently launched Serverless Patterns Collection to configure DynamoDB as an event source for Lambda.

AWS Amplify

Amplify Hosting announced support for server-side rendered (SSR) apps built with the Next.js framework. This provides a zero configuration option for developers to deploy and host their Next.js-based applications.

The Amplify CLI now allows developers to make multiple DynamoDB GSI updates in a single deployment. This can help accelerate data model iterations. Additionally, the data management experience in the Amplify Admin UI launched at AWS re:Invent 2020 is now generally available.

AWS Serverless Application Model (AWS SAM)

AWS SAM has a public preview of support for local development and testing of AWS Cloud Development Kit (AWS CDK) projects.

To learn more:

Serverless blog posts

Operating Lambda

The “Operating Lambda” blog series includes the following posts in this quarter:

Streaming data

The “Building serverless applications with streaming data” blog series shows how to use Lambda with Kinesis.

Getting started with serverless for developers

Learn how to build serverless applications from your local integrated development environment (IDE).

April

May

June

Tech Talks & Events

We hold AWS Online Tech Talks covering serverless topics throughout the year. These are listed in the Serverless section of the AWS Online Tech Talks page. We also regularly deliver talks at conferences and events around the world, speak on podcasts, and record videos you can watch to learn in bite-sized chunks.

Here are some from Q2:

Serverless Live was a day of talks held on May 19, featuring the serverless developer advocacy team, along with Adrian Cockcroft and Jeff Barr. You can watch a replay of all the talks on the AWS Twitch channel.

Videos

YouTube ServerlessLand channel

Serverless Office Hours – Tues 10 AM PT / 1 PM ET

Weekly live virtual office hours. In each session we talk about a specific topic or technology related to serverless and open it up to help you with your real serverless challenges and issues. Ask us anything you want about serverless technologies and applications.

YouTube: youtube.com/serverlessland
Twitch: twitch.tv/aws

April

May

June

DynamoDB Office Hours

Are you an Amazon DynamoDB customer with a technical question you need answered? If so, join us for weekly Office Hours on the AWS Twitch channel led by Rick Houlihan, AWS principal technologist and Amazon DynamoDB expert. See upcoming and previous shows.

Learning Path – AWS Lambda Extensions: The deep dive

Are you looking for a way to more easily integrate AWS Lambda with your favorite monitoring, observability, security, governance, and other tools? Welcome to AWS Lambda extensions: The deep dive, a learning path video series that shows you everything about augmenting Lambda functions using Lambda extensions.

There are also other helpful videos covering serverless available on the Serverless Land YouTube channel.

Still looking for more?

The Serverless landing page has more information. The Lambda resources page contains case studies, webinars, whitepapers, customer stories, reference architectures, and even more Getting Started tutorials.

You can also follow the Serverless Developer Advocacy team on Twitter to see the latest news, follow conversations, and interact with the team.

How Optus improves broadband and mobile customer experience using the Network Data Analytics platform on AWS

Post Syndicated from Rajagopal Mahendran original https://aws.amazon.com/blogs/big-data/how-optus-improves-broadband-and-mobile-customer-experience-using-the-network-data-analytics-platform-on-aws/

This is a guest blog post co-written by Rajagopal Mahendran, Development Manager at the Optus IT Innovation Team.


Optus is part of The Singtel group, which operates in one of the world’s fastest growing and most dynamic regions, with a presence in 21 countries. Optus provides not only core telecom services, but also an extensive range of digital solutions, including cloud, cybersecurity, and digital advertising to enterprises, as well as entertainment and mobile financial services to millions of consumers. Optus provides mobile communication services to over 10.4 million customers and broadband services to over 1.1 million homes and businesses. In addition, Optus Sport connects close to 1 million fans to Premier League, international football, and fitness content.

In this post, we look at how Optus used Amazon Kinesis to ingest and analyze network related data in a data lake on AWS and improve customer experience and the service planning process.

The challenge

A common challenge for telecommunication providers is to form an accurate, real-time view of quality of service and issues experienced by their customers. Home network and broadband connectivity quality has a significant impact on customer productivity and satisfaction, especially considering the increased reliance on home networks for work, connecting with family and friends, and entertainment during the COVID-19 pandemic.

Additionally, network operations and planning teams often don’t have access to the right data and insights to plan new rollouts and manage their current fleet of devices.

The network analytics platform provides troubleshooting and planning data and insights to Optus teams and their customers in near-real time, which helps reduce mean time to rectify and enhance the customer experience. With the right data and insights, customers have a better experience because instead of starting a support call with a lot of questions, the support staff and the customer have a current and accurate view of the services and the customer’s home network.

Service owner teams within Optus can also use the insights and trends derived from this platform to better plan for the future and provide higher-quality service to customers.

Design considerations

To address this challenge and its requirements, we embarked on a project to transform our current batch collection and processing system to a stream-based, near-real-time processing system, and introduce APIs for insights so that support systems and customer applications can show the latest snapshot of the network and service status.

We had the following functional and non-functional requirements:

  • The new platform must be capable of supporting data capture from future types of customer equipment as well as new ways of ingestion (new protocols and frequency) and new formats of data.
  • It should support multiple consumers (a near-real-time API for support staff and customer applications, plus operational and business reporting) that consume data and generate insights. The aim is for the platform to proactively detect issues and generate appropriate alerts to support staff as well as customers.
  • After the data arrives, insights from the data should be ready in the form of an API in a few seconds (5 seconds maximum).
  • The new platform should be resilient enough to continue processing when parts of the infrastructure fail, such as nodes or Availability Zones.
  • It can support an increased number of devices and services as well as more frequent collection from the devices.
  • A small cross-functional team across business and technology will build and run this platform. We need to ensure minimal infrastructure and operational overhead in the long run.
  • The pipeline should be highly available and allow for new deployments with no downtime.

Solution overview

With the goal of the platform and design considerations in mind, we decided to use higher-order services and serverless services from AWS where possible, to avoid unnecessary operational overhead for our team and focus on the core business needs. This includes using the Kinesis family of services for stream ingestion and processing; AWS Lambda for processing; Amazon DynamoDB, Amazon Relational Database Service (Amazon RDS), and Amazon Simple Storage Service (Amazon S3) for data persistence; and AWS Elastic Beanstalk and Amazon API Gateway for application and API serving. The following diagram shows the overall solution.

 

The solution ingests log files from thousands of pieces of customer network equipment (home routers) at predefined intervals. The customer equipment is only capable of sending simple HTTP PUT and POST requests to transfer log files. To receive these files, we use a Java application running in an Auto Scaling group of Amazon Elastic Compute Cloud (Amazon EC2) instances. After some initial checks, the receiver application performs cleansing and formatting, then it streams the log files to Amazon Kinesis Data Streams.

We intentionally use a custom receiver application in the ingestion layer to provide flexibility in supporting different devices and file formats.

To understand the rest of the architecture, let’s take a look at the expected insights. The platform produces two types of insights:

  • Individual insights – Questions answered in this category include:
    • How many errors has a particular customer device experienced in the last 15 minutes?
    • What was the last error?
    • How many devices are currently connected at a particular customer home?
    • What’s the transfer/receive rate as captured by a particular customer device?
  • Base insights – Pertaining to a group or the whole user base, questions in this category include:
    • How many customer devices reported service disruption in the past 24 hours?
    • Which device types (models) have experienced the highest number of errors in the past 6 months?
    • After last night’s patch update on a group of devices, have they reported any errors? Was the maintenance successful?

The top lane in the architecture shows the pipeline that generates the individual insights.

 

The event source mapping of the Lambda function is configured to consume records from the Kinesis data stream. This function reads the records, formats, and prepares them based on the insights required. Finally, it stores the results in the Amazon S3 location and also updates a DynamoDB table that maintains a summary and the metadata of the actual data stored in Amazon S3.

To optimize performance, we configured two settings in the Lambda event source mapping (a boto3 sketch follows this list):

  • Batch size – The number of records to send to the function in each batch, which helps achieve higher throughput
  • Concurrent batches per shard – The number of batches from the same shard that are processed concurrently, which helps with faster processing
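
For reference, here is a minimal boto3 sketch of creating an event source mapping with both settings; the stream ARN, function name, and values are placeholders rather than the platform's actual configuration:

import boto3

lambda_client = boto3.client("lambda")

lambda_client.create_event_source_mapping(
    EventSourceArn="arn:aws:kinesis:ap-southeast-2:111122223333:stream/device-logs",
    FunctionName="individual-insights-processor",
    StartingPosition="LATEST",
    BatchSize=500,             # records sent to the function in each batch
    ParallelizationFactor=5,   # concurrent batches processed per shard (1-10)
)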

Finally, the API is provided via API Gateway and runs on a Spring Boot application that is hosted on Elastic Beanstalk. In the future, we may need to keep state between API calls, which is why we use Elastic Beanstalk instead of a serverless application.

The bottom lane in the architecture is the pipeline that generates base reports.

 

We use Amazon Kinesis Data Analytics, running stateful computation on streaming data, to summarize certain metrics like transfer rates or error rates in given time windows. These summaries are then pushed to an Amazon Aurora database with a data model that’s suitable for dashboarding and reporting purposes.

The insights are then presented in dashboards using a web application running on Elastic Beanstalk.

Lessons learned

Using serverless patterns and higher-order services, in particular Lambda, Kinesis Data Streams, Kinesis Data Analytics, and DynamoDB, provided a lot of flexibility in our architecture and helped us move more towards microservices rather than big monolith batch jobs.

This shift also helped us dramatically decrease our operational and service management overhead. For example, over the last several months since the launch, customers of this platform didn’t experience any service disruption.

This solution also enabled us to adopt more DevOps and agile ways of working, in the sense that a single small team develops and runs the system. This in turn enabled the organization to be more agile and innovative in this domain.

We also discovered some technical tips through the course of development and production that are worth sharing:

Outcomes and benefits

We now have near-real-time visibility of our fixed and mobile networks performance as experienced by our customers. In the past, we only had data that came in batch mode with a delay and also only from our own network probes and equipment.

With the near-real-time view of the network when changes occur, our operational teams can also carry out upgrades and maintenance across the fleet of customer devices with higher confidence and frequency.

Lastly, our planning teams use these insights to form an accurate, up-to-date performance view of various equipment and services. This leads to higher-quality service for our customers at better prices because our service planning teams are enabled to optimize cost, better negotiate with vendors and service providers, and plan for the future.

Looking ahead

With the network analytics platform in production for several months and stable now, there is demand for more insights and new use cases. For example, we’re looking into a mobile use case to better manage capacity at large-scale events (such as sporting events). The aim is for our teams to be data driven and able to react in near-real time to capacity needs in these events.

Another area of demand is around predictive maintenance: we are looking to introduce machine learning into these pipelines to help drive insights faster and more accurately by using the AWS Machine Learning portfolio of services.


About the authors

Rajagopal Mahendran is a Development Manager at the Optus IT Innovation Team. Mahendran has over 14 years of experience in various organizations delivering enterprise applications from medium-scale to very large-scale using proven to cutting-edge technologies in big data, streaming data applications, mobile, and cloud native applications. His passion is to power innovative ideas using technology for better living. In his spare time, he loves bush walking and swimming.

 

Mostafa Safipour is a Solutions Architect at AWS based out of Sydney. He works with customers to realize business outcomes using technology and AWS. Over the past decade he has helped many large organizations in the ANZ region build their data, digital, and enterprise workloads on AWS.

 

Masudur Rahaman Sayem is a Specialist Solution Architect for Analytics at AWS. He works with AWS customers to provide guidance and technical assistance on data and analytics projects, helping them improve the value of their solutions when using AWS. He is passionate about distributed systems. He also likes to read, especially classic comic books.

Introducing Amazon Kinesis Data Analytics Studio – Quickly Interact with Streaming Data Using SQL, Python, or Scala

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/introducing-amazon-kinesis-data-analytics-studio-quickly-interact-with-streaming-data-using-sql-python-or-scala/

The best way to get timely insights and react quickly to new information you receive from your business and your applications is to analyze streaming data. This is data that must usually be processed sequentially and incrementally on a record-by-record basis or over sliding time windows, and can be used for a variety of analytics including correlations, aggregations, filtering, and sampling.

To make it easier to analyze streaming data, today we are pleased to introduce Amazon Kinesis Data Analytics Studio.

Now, from the Amazon Kinesis console you can select a Kinesis data stream and with a single click start a Kinesis Data Analytics Studio notebook powered by Apache Zeppelin and Apache Flink to interactively analyze data in the stream. Similarly, you can select a cluster in the Amazon Managed Streaming for Apache Kafka console to start a notebook to analyze data in Apache Kafka streams. You can also start a notebook from the Kinesis Data Analytics Studio console and connect to custom sources.

Architectural diagram.

In the notebook, you can interact with streaming data and get results in seconds using SQL queries and Python or Scala programs. When you are satisfied with your results, with a few clicks you can promote your code to a production stream processing application that runs reliably at scale with no additional development effort.

For new projects, we recommend that you use the new Kinesis Data Analytics Studio over Kinesis Data Analytics for SQL Applications. Kinesis Data Analytics Studio combines ease of use with advanced analytical capabilities, which makes it possible to build sophisticated stream processing applications in minutes. Let’s see how that works in practice.

Using Kinesis Data Analytics Studio to Analyze Streaming Data
I want to get a better understanding of the data sent by some sensors to a Kinesis data stream.

To simulate the workload, I use this random_data_generator.py Python script. You don’t need to know Python to use Kinesis Data Analytics Studio. In fact, I am going to use SQL in the following steps. Also, you can avoid any coding and use the Amazon Kinesis Data Generator user interface (UI) to send test data to Kinesis Data Streams or Kinesis Data Firehose. I am using a Python script to have finer control over the data that is being sent.

import datetime
import json
import random
import boto3

STREAM_NAME = "my-input-stream"


def get_random_data():
    current_temperature = round(10 + random.random() * 170, 2)
    if current_temperature > 160:
        status = "ERROR"
    elif current_temperature > 140 or random.randrange(1, 100) > 80:
        status = random.choice(["WARNING","ERROR"])
    else:
        status = "OK"
    return {
        'sensor_id': random.randrange(1, 100),
        'current_temperature': current_temperature,
        'status': status,
        'event_time': datetime.datetime.now().isoformat()
    }


def send_data(stream_name, kinesis_client):
    while True:
        data = get_random_data()
        partition_key = str(data["sensor_id"])
        print(data)
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data),
            PartitionKey=partition_key)


if __name__ == '__main__':
    kinesis_client = boto3.client('kinesis')
    send_data(STREAM_NAME, kinesis_client)

This script sends random records to my Kinesis data stream using JSON syntax. For example:

{'sensor_id': 77, 'current_temperature': 93.11, 'status': 'OK', 'event_time': '2021-05-19T11:20:00.978328'}
{'sensor_id': 47, 'current_temperature': 168.32, 'status': 'ERROR', 'event_time': '2021-05-19T11:20:01.110236'}
{'sensor_id': 9, 'current_temperature': 140.93, 'status': 'WARNING', 'event_time': '2021-05-19T11:20:01.243881'}
{'sensor_id': 27, 'current_temperature': 130.41, 'status': 'OK', 'event_time': '2021-05-19T11:20:01.371191'}

From the Kinesis console, I select a Kinesis data stream (my-input-stream) and choose Process data in real time from the Process drop-down. In this way, the stream is configured as a source for the notebook.

Console screenshot.

Then, in the following dialog box, I create an Apache Flink – Studio notebook.

I enter a name (my-notebook) and a description for the notebook. The AWS Identity and Access Management (IAM) permissions to read from the Kinesis data stream I selected earlier (my-input-stream) are automatically attached to the IAM role assumed by the notebook.

Console screenshot.

I choose Create to open the AWS Glue console and create an empty database. Back in the Kinesis Data Analytics Studio console, I refresh the list and select the new database. It will define the metadata for my sources and destinations. From here, I can also review the default Studio notebook settings. Then, I choose Create Studio notebook.

Console screenshot.

Now that the notebook has been created, I choose Run.

Console screenshot.

When the notebook is running, I choose Open in Apache Zeppelin to get access to the notebook and write code in SQL, Python, or Scala to interact with my streaming data and get insights in real time.

In the notebook, I create a new note and call it Sensors. Then, I create a sensor_data table describing the format of the data in the stream:

%flink.ssql

CREATE TABLE sensor_data (
    sensor_id INTEGER,
    current_temperature DOUBLE,
    status VARCHAR(6),
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
PARTITIONED BY (sensor_id)
WITH (
    'connector' = 'kinesis',
    'stream' = 'my-input-stream',
    'aws.region' = 'us-east-1',
    'scan.stream.initpos' = 'LATEST',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
)

The first line in the previous command tells Apache Zeppelin to provide a stream SQL environment (%flink.ssql) for the Apache Flink interpreter. I can also interact with the streaming data using a batch SQL environment (%flink.bsql), Python (%flink.pyflink), or Scala (%flink) code.

The first part of the CREATE TABLE statement is familiar to anyone who has used SQL with a database. A table is created to store the sensor data in the stream. The WATERMARK option is used to measure progress in the event time, as described in the Event Time and Watermarks section of the Apache Flink documentation.

The second part of the CREATE TABLE statement describes the connector used to receive data in the table (for example, kinesis or kafka), the name of the stream, the AWS Region, the overall data format of the stream (such as json or csv), and the syntax used for timestamps (in this case, ISO 8601). I can also choose the starting position for processing the stream; here I use LATEST to read the most recent data first.

When the table is ready, I find it in the AWS Glue Data Catalog database I selected when I created the notebook:

Console screenshot.

Now I can run SQL queries on the sensor_data table and use sliding or tumbling windows to get a better understanding of what is happening with my sensors.

For an overview of the data in the stream, I start with a simple SELECT to get all the content of the sensor_data table:

%flink.ssql(type=update)

SELECT * FROM sensor_data;

This time the first line of the command has a parameter (type=update) so that the output of the SELECT, which is more than one row, is continuously updated when new data arrives.

On the terminal of my laptop, I start the random_data_generator.py script:

$ python3 random_data_generator.py

At first I see a table that contains the data as it comes. To get a better understanding, I select a bar graph view. Then, I group the results by status to see their average current_temperature, as shown here:

Notebook screenshot.

As expected by the way I am generating these results, I have different average temperatures depending on the status (OK, WARNING, or ERROR). The higher the temperature, the greater the probability that something is not working correctly with my sensors.

I can run the aggregated query explicitly using a SQL syntax. This time, I want the result computed on a sliding window of 1 minute with results updated every 10 seconds. To do so, I am using the HOP function in the GROUP BY section of the SELECT statement. To add the time to the output of the select, I use the HOP_ROWTIME function. For more information, see how group window aggregations work in the Apache Flink documentation.

%flink.ssql(type=update)

SELECT sensor_data.status,
       COUNT(*) AS num,
       AVG(sensor_data.current_temperature) AS avg_current_temperature,
       HOP_ROWTIME(event_time, INTERVAL '10' second, INTERVAL '1' minute) as hop_time
  FROM sensor_data
 GROUP BY HOP(event_time, INTERVAL '10' second, INTERVAL '1' minute), sensor_data.status;

This time, I look at the results in table format:

Notebook screenshot.

To send the result of the query to a destination stream, I create a table and connect the table to the stream. First, I need to give permissions to the notebook to write into the stream.

In the Kinesis Data Analytics Studio console, I select my-notebook. Then, in the Studio notebooks details section, I choose Edit IAM permissions. Here, I can configure the sources and destinations used by the notebook and the IAM role permissions are updated automatically.

Console screenshot.

In the Included destinations in IAM policy section, I choose the destination and select my-output-stream. I save changes and wait for the notebook to be updated. I am now ready to use the destination stream.

In the notebook, I create a sensor_state table connected to my-output-stream.

%flink.ssql

CREATE TABLE sensor_state (
    status VARCHAR(6),
    num INTEGER,
    avg_current_temperature DOUBLE,
    hop_time TIMESTAMP(3)
)
WITH (
'connector' = 'kinesis',
'stream' = 'my-output-stream',
'aws.region' = 'us-east-1',
'scan.stream.initpos' = 'LATEST',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601');

I now use this INSERT INTO statement to continuously insert the result of the select into the sensor_state table.

%flink.ssql(type=update)

INSERT INTO sensor_state
SELECT sensor_data.status,
    COUNT(*) AS num,
    AVG(sensor_data.current_temperature) AS avg_current_temperature,
    HOP_ROWTIME(event_time, INTERVAL '10' second, INTERVAL '1' minute) as hop_time
FROM sensor_data
GROUP BY HOP(event_time, INTERVAL '10' second, INTERVAL '1' minute), sensor_data.status;

The data is also sent to the destination Kinesis data stream (my-output-stream) so that it can be used by other applications. For example, the data in the destination stream can be used to update a real-time dashboard, or to monitor the behavior of my sensors after a software update.

I am satisfied with the result. I want to deploy this query and its output as a Kinesis Analytics application. To do so, I need to provide an S3 location to store the application executable.

In the configuration section of the console, I edit the Deploy as application configuration settings. There, I choose a destination bucket in the same region and save changes.

Console screenshot.

I wait for the notebook to be ready after the update. Then, I create a SensorsApp note in my notebook and copy the statements that I want to execute as part of the application. The tables have already been created, so I just copy the INSERT INTO statement above.

From the menu at the top right of my notebook, I choose Build SensorsApp and export to Amazon S3 and confirm the application name.

Notebook screenshot.

When the export is ready, I choose Deploy SensorsApp as Kinesis Analytics application in the same menu. After that, I fine-tune the configuration of the application. I set parallelism to 1 because I have only one shard in my input Kinesis data stream and not a lot of traffic. Then, I run the application, without having to write any code.

From the Kinesis Data Analytics applications console, I choose Open Apache Flink dashboard to get more information about the execution of my application.

Apache Flink console screenshot.

Availability and Pricing
You can use Amazon Kinesis Data Analytics Studio today in all AWS Regions where Kinesis Data Analytics is generally available. For more information, see the AWS Regional Services List.

In Kinesis Data Analytics Studio, we run the open-source versions of Apache Zeppelin and Apache Flink, and we contribute changes upstream. For example, we have contributed bug fixes for Apache Zeppelin, and we have contributed to AWS connectors for Apache Flink, such as those for Kinesis Data Streams and Kinesis Data Firehose. Also, we are working with the Apache Flink community to contribute availability improvements, including automatic classification of errors at runtime to understand whether errors are in user code or in application infrastructure.

With Kinesis Data Analytics Studio, you pay based on the average number of Kinesis Processing Units (KPU) per hour, including those used by your running notebooks. One KPU comprises 1 vCPU of compute, 4 GB of memory, and associated networking. You also pay for running application storage and durable application storage. For more information, see the Kinesis Data Analytics pricing page.

Start using Kinesis Data Analytics Studio today to get better insights from your streaming data.

Danilo

Build a data lake using Amazon Kinesis Data Streams for Amazon DynamoDB and Apache Hudi

Post Syndicated from Dhiraj Thakur original https://aws.amazon.com/blogs/big-data/build-a-data-lake-using-amazon-kinesis-data-streams-for-amazon-dynamodb-and-apache-hudi/

Amazon DynamoDB helps you capture high-velocity data, such as clickstream data used to form customized user profiles and online order transaction data used to develop customer order fulfillment applications, improve customer satisfaction, and get insights into sales revenue to create promotional offers for customers. It’s essential to store these data points in a centralized data lake, which can be transformed, analyzed, and combined with diverse organizational datasets to derive meaningful insights and make predictions.

A popular use case in order management is receiving, tracking, and fulfilling customer orders. The order management process begins when an order is placed and ends when the customer receives their package. When storing high-velocity order transaction data in DynamoDB, you can use Amazon Kinesis streaming to extract data and store it in a centralized data lake built on Amazon Simple Storage Service (Amazon S3).

Amazon Kinesis Data Streams for DynamoDB helps you to publish item-level changes in any DynamoDB table to a Kinesis data stream of your choice. Additionally, you can take advantage of this feature for use cases that require longer data retention on the stream and fan out to multiple concurrent stream readers. You also can integrate with Amazon Kinesis Data Analytics or Amazon Kinesis Data Firehose to publish data to downstream destinations such as Amazon Elasticsearch Service (Amazon ES), Amazon Redshift, or Amazon S3.

In this post, you use Kinesis Data Streams for DynamoDB and take advantage of managed streaming delivery of DynamoDB data to a Kinesis data stream by simply enabling the Kinesis streaming connection from the Amazon DynamoDB console. To process DynamoDB events from Kinesis, you have multiple options: Amazon Kinesis Client Library (KCL) applications, Lambda, Kinesis Data Analytics for Apache Flink, and Kinesis Data Firehose. In this post, you use Kinesis Data Firehose to save the raw data in the S3 data lake and Apache Hudi to batch process the data.

Architecture

The following diagram illustrates the order processing system architecture.

In this architecture, users buy products in online retail shops, which internally creates an order transaction that is stored in DynamoDB. The order transaction data is ingested into the data lake and stored in the raw data layer. To achieve this, you enable Kinesis Data Streams for DynamoDB and use Kinesis Data Firehose to store the data in Amazon S3. You use Lambda to transform the data from the delivery stream, removing unwanted data, and finally store it in Parquet format. Next, you batch process the raw data and store it back as a Hudi dataset in the S3 data lake. You can then use Amazon Athena for sales analysis. You build this entire data pipeline in a serverless manner.

Prerequisites

Complete the following steps to create AWS resources to build a data pipeline as mentioned in the architecture. For this post, we use the AWS Region us-west-1.

  1. On the Amazon Elastic Compute Cloud (Amazon EC2) console, create a keypair.
  2. Download the data files, Amazon EMR cluster, and Athena DDL code from GitHub.
  3. Deploy the necessary AWS resources using the provided AWS CloudFormation template.
  4. For Stack name, enter a stack name of your choice.
  5. For Keypair name, choose a key pair.

A key pair is required to connect to the EMR cluster nodes. For more information, see Use an Amazon EC2 Key Pair for SSH Credentials.

  6. Keep the remaining default parameters.
  7. Acknowledge that AWS CloudFormation might create AWS Identity and Access Management (IAM) resources.

For more information about IAM, see Resources to learn more about IAM.

  8. Choose Create stack.

You can check the Resources tab for the stack after the stack is created.

The following table summarizes the resources that you created, which you use to build the data pipeline and analysis.

Logical ID Physical ID Type
DeliveryPolicy kines-Deli-* AWS::IAM::Policy
DeliveryRole kinesis-hudi-DeliveryRole-* AWS::IAM::Role
Deliverystream kinesis-hudi-Deliverystream-* AWS::KinesisFirehose::DeliveryStream
DynamoDBTable order_transaction_* AWS::DynamoDB::Table
EMRClusterServiceRole kinesis-hudi-EMRClusterServiceRole-* AWS::IAM::Role
EmrInstanceProfile kinesis-hudi-EmrInstanceProfile-* AWS::IAM::InstanceProfile
EmrInstanceRole kinesis-hudi-EmrInstanceRole-* AWS::IAM::Role
GlueDatabase gluedatabase-* AWS::Glue::Database
GlueTable gluetable-* AWS::Glue::Table
InputKinesisStream order-data-stream-* AWS::Kinesis::Stream
InternetGateway igw-* AWS::EC2::InternetGateway
InternetGatewayAttachment kines-Inter-* AWS::EC2::VPCGatewayAttachment
MyEmrCluster AWS::EMR::Cluster
ProcessLambdaExecutionRole kinesis-hudi-ProcessLambdaExecutionRole-* AWS::IAM::Role
ProcessLambdaFunction kinesis-hudi-ProcessLambdaFunction-* AWS::Lambda::Function
ProcessedS3Bucket kinesis-hudi-processeds3bucket-* AWS::S3::Bucket
PublicRouteTable AWS::EC2::RouteTable
PublicSubnet1 AWS::EC2::Subnet
PublicSubnet1RouteTableAssociation AWS::EC2::SubnetRouteTableAssociation
PublicSubnet2 AWS::EC2::Subnet
PublicSubnet2RouteTableAssociation AWS::EC2::SubnetRouteTableAssociation
RawS3Bucket kinesis-hudi-raws3bucket-* AWS::S3::Bucket
S3Bucket kinesis-hudi-s3bucket-* AWS::S3::Bucket
SourceS3Bucket kinesis-hudi-sources3bucket-* AWS::S3::Bucket
VPC vpc-* AWS::EC2::VPC

Enable Kinesis streaming for DynamoDB

AWS recently launched Kinesis Data Streams for DynamoDB so you can send data from DynamoDB to Kinesis data streams. You can use the AWS Command Line Interface (AWS CLI) or the AWS Management Console to enable this feature.

To enable this feature from the console, complete the following steps:

  1. On the DynamoDB console, choose the table you created in the CloudFormation stack earlier (it begins with the prefix order_transaction_).
  2. On the Overview tab, choose Manage streaming to Kinesis.
  3. Choose your input stream (it starts with order-data-stream-).
  4. Choose Enable.
  5. Choose Close.
  6. Make sure that stream enabled is set to Yes.
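
You can also enable the same connection programmatically. The following boto3 sketch assumes placeholder table and stream names rather than the exact names generated by the CloudFormation stack:

import boto3

dynamodb = boto3.client("dynamodb")

# Start streaming item-level changes from the table to the Kinesis data stream.
dynamodb.enable_kinesis_streaming_destination(
    TableName="order_transaction_example",
    StreamArn="arn:aws:kinesis:us-west-1:111122223333:stream/order-data-stream-example",
)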

Populate the sales order transaction dataset

To replicate a real-life use case, you need an online retail application. For this post, you upload raw data files in the S3 bucket and use a Lambda function to upload the data in DynamoDB. You can download the order data CSV files from the AWS Sample GitHub repository. Complete the following steps to upload the data in DynamoDB:

  1. On the Amazon S3 console, choose the bucket <stack-name>-sourcess3bucket-*.
  2. Choose Upload.
  3. Choose Add files.
  4. Choose the order_data_09_02_2020.csv and order_data_10_02_2020.csv files.
  5. Choose Upload.
  6. On the Lambda console, choose the function <stack-name>-CsvToDDBLambdaFunction-*.
  7. Choose Test.
  8. For Event template, enter an event name.
  9. Choose Create.
  10. Choose Test.

This runs the Lambda function and loads the CSV file order_data_09_02_2020.csv to the DynamoDB table.
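
The stack provides this function for you; for context, a simplified sketch of what such a CSV-to-DynamoDB loader could look like is shown below (the bucket, key, and table environment variables are assumptions, not the stack's actual code):

import csv
import os

import boto3

s3 = boto3.client("s3")
dynamodb = boto3.resource("dynamodb")


def handler(event, context):
    bucket = os.environ["SOURCE_BUCKET"]
    key = os.environ.get("key", "order_data_09_02_2020.csv")
    table = dynamodb.Table(os.environ["ORDER_TABLE"])

    # Read the CSV file from Amazon S3 and parse it into rows.
    body = s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")
    rows = csv.DictReader(body.splitlines())

    # Batch-write each CSV row as an item into the DynamoDB table.
    with table.batch_writer() as batch:
        for row in rows:
            batch.put_item(Item=row)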

  11. Wait until the message appears that the function ran successfully.

You can now view the data on the DynamoDB console, in the details page for your table.

Because you enabled the Kinesis data stream in the DynamoDB table, it starts streaming the data to Amazon S3. You can check the data by viewing the bucket on the Amazon S3 console. The following screenshot shows that a Parquet file is under the prefix in the bucket.

Use Apache Hudi with Amazon EMR

Now it’s time to process the streaming data using Hudi.

  1. Log in to the Amazon EMR leader node.

You can use the key pair you chose in the security options to SSH into the leader node.

  2. Use the following bash command to start the Spark shell for use with Apache Hudi:
spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.hive.convertMetastoreParquet=false" --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

The Amazon EMR instance looks like the following screenshot.

  3. You can use the following Scala code to import the order transaction data from the S3 data lake into a Hudi dataset using the copy-on-write storage type. Change inputDataPath to match the file path in <stack-name>-raws3bucket-* in your environment, and replace the bucket name in hudiTablePath with <stack-name>-processeds3bucket-*.
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor

//Set up various input values as variables
val inputDataPath = "s3://kinesis-hudi-raws3bucket-1p6nszvqd9awz/2021/02/01/15/"
val hudiTableName = "order_hudi_cow"
val hudiTablePath = "s3://kinesis-hudi-processeds3bucket-yfc6843vmq1o/" + hudiTableName

// Set up our Hudi Data Source Options
val hudiOptions = Map[String,String](
    DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "order_id",
    DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "transaction_date", 
    HoodieWriteConfig.TABLE_NAME -> hudiTableName, 
    DataSourceWriteOptions.OPERATION_OPT_KEY ->
        DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, 
    DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "time_stamp", 
    DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", 
    DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName, 
    DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "transaction_date", 
    DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY ->
        classOf[MultiPartKeysValueExtractor].getName)

// Read data from S3 and create a DataFrame with Partition and Record Key
val inputDF = spark.read.format("parquet").load(inputDataPath)

// Write data into the Hudi dataset
inputDF.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save(hudiTablePath)

For more information about DataSourceWriteOptions, see Work with a Hudi Dataset.

  4. In the Spark shell, you can now count the total number of records in the Apache Hudi dataset:
scala> inputDF.count()
res1: Long = 1000

You can check the processed Apache Hudi dataset in the S3 data lake via the Amazon S3 console. The following screenshot shows that the prefix order_hudi_cow is in <stack-name>-processeds3bucket-*.

When navigating into the order_hudi_cow prefix, you can find a list of Hudi datasets that are partitioned using the transaction_date key—one for each date in our dataset.

Let’s analyze the data stored in Amazon S3 using Athena.

Analyze the data with Athena

To analyze your data, complete the following steps:

  1. On the Athena console, create the database order_db using the following command:
create database order_db;

You use this database to create all the Athena tables.

  2. Create your table using the following command (replace the S3 bucket name with the <stack-name>-processeds3bucket-* bucket created in your environment):
    CREATE EXTERNAL TABLE order_transaction_cow (
      `_hoodie_commit_time` string,
      `_hoodie_commit_seqno` string,
      `_hoodie_record_key` string,
      `_hoodie_partition_path` string,
      `_hoodie_file_name` string,
      `order_id` string,
      `item_id` string,
      `customer_id` string,
      `product` string,
      `amount` decimal(3,1),
      `currency` string,
      `time_stamp` string
      )
      PARTITIONED BY ( 
      `transaction_date` string)
    ROW FORMAT SERDE
      'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
    STORED AS INPUTFORMAT
      'org.apache.hudi.hadoop.HoodieParquetInputFormat'
    OUTPUTFORMAT
      'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
    LOCATION
      's3://kinesis-hudi-processeds3bucket-yfc6843vmq1o/order_hudi_cow'

  3. Add partitions by running the following query on the Athena console:
    ALTER TABLE order_transaction_cow ADD
    PARTITION (transaction_date = '2020-09-02') LOCATION 's3://kinesis-hudi-processeds3bucket-yfc6843vmq1o/order_hudi_cow/2020-09-02/';

  4. Check the total number of records in the Hudi dataset with the following query:
    SELECT count(*) FROM "order_db"."order_transaction_cow";

It should return a single row with a count of 1,000.

Now check the record that you want to update.

 

  5. Run the following query on the Athena console:
SELECT * FROM "order_db"."order_transaction_cow"
where order_id ='3801'
and item_id ='23'
and transaction_date ='2020-09-02';

The output should look like the following screenshot. Note down the value of product and amount.

Analyze the change data capture

Now let’s test the change data capture (CDC) in streaming. Let’s take an example where the customer changed an existing order. We load the order_data_10_02_2020.csv file, where order_id 3801 has a different product and amount.

To test the CDC feature, complete the following steps:

  1. On the Lambda console, choose the function <stack-name>-CsvToDDBLambdaFunction-*.
  2. In the Environment variables section, choose Edit.
  3. For key, enter order_data_10_02_2020.csv.
  4. Choose Save.

You can see another prefix has been created in <stack-name>-raws3bucket-*.

  5. In Amazon EMR, run the following code in the Scala shell prompt to update the data (change inputDataPath to the file path in <stack-name>-raws3bucket-* and hudiTablePath to <stack-name>-processeds3bucket-*):
    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.functions._
    import org.apache.hudi.DataSourceWriteOptions
    import org.apache.hudi.config.HoodieWriteConfig
    import org.apache.hudi.hive.MultiPartKeysValueExtractor
    
    //Set up various input values as variables
    val inputDataPath = "s3://kinesis-hudi-raws3bucket-1p6nszvqd9awz/2021/02/01/18/"
    val hudiTableName = "order_hudi_cow"
    val hudiTablePath = "s3://kinesis-hudi-processeds3bucket-yfc6843vmq1o/" + hudiTableName
    
    // Set up our Hudi Data Source Options
    val hudiOptions = Map[String,String](
        DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "order_id",
        DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "transaction_date", 
        HoodieWriteConfig.TABLE_NAME -> hudiTableName, 
        DataSourceWriteOptions.OPERATION_OPT_KEY ->
            DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, 
        DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "time_stamp", 
        DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", 
        DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName, 
        DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "transaction_date", 
        DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY ->
            classOf[MultiPartKeysValueExtractor].getName)
    
    // Read data from S3 and create a DataFrame with Partition and Record Key
    val inputDF = spark.read.format("parquet").load(inputDataPath)
    
    // Write data into the Hudi dataset
    inputDF.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save(hudiTablePath)
    

  6. Run the following query on the Athena console to confirm that the total number of records is still 1,000:
    SELECT count(*) FROM "order_db"."order_transaction_cow";

  7. Run the following query on the Athena console to test for the update:
SELECT * FROM "order_db"."order_transaction_cow"
where order_id ='3801'
and item_id ='23'
and transaction_date ='2020-09-02';

The following screenshot shows that the product and amount values for the same order are updated.

In a production workload, you can trigger the updates on a schedule or by S3 modification events. A fully automated data lake makes sure your business analysts are always viewing the latest available data.

Clean up the resources

To avoid incurring future charges, follow these steps to remove the example resources:

  1. If you created the EMR cluster with the CloudFormation template, delete the resources you created in the prerequisites section by deleting the stack instances from your stack set.
  2. If you launched the EMR cluster manually, stop the cluster via the Amazon EMR console.
  3. Empty all the relevant buckets via the Amazon S3 console.

Conclusion

You can build an end-to-end serverless data lake to get real-time insights from DynamoDB by using Kinesis Data Streams—all without writing any complex code. It allows your team to focus on solving business problems by getting useful insights immediately. Application developers have various use cases for moving data quickly through an analytics pipeline, and you can make this happen by enabling Kinesis Data Streams for DynamoDB.

If this post helps you or inspires you to solve a problem, we would love to hear about it! The code for this solution is available in the GitHub repository for you to use and extend. Contributions are always welcome!


About the Authors

Dhiraj Thakur is a Solutions Architect with Amazon Web Services. He works with AWS customers and partners to guide enterprise cloud adoption, migration, and strategy. He is passionate about technology and enjoys building and experimenting in the analytics and AI/ML space.

 

 

 

Saurabh Shrivastava is a solutions architect leader and analytics/ML specialist working with global systems integrators. He works with AWS Partners and customers to provide them with architectural guidance for building scalable architecture in hybrid and AWS environments. He enjoys spending time with his family outdoors and traveling to new destinations to discover new cultures.

 

 

 

Dylan Qu is an AWS solutions architect responsible for providing architectural guidance across the full AWS stack with a focus on data analytics, AI/ML, and DevOps.

 

Retaining data streams up to one year with Amazon Kinesis Data Streams

Post Syndicated from Nihar Sheth original https://aws.amazon.com/blogs/big-data/retaining-data-streams-up-to-one-year-with-amazon-kinesis-data-streams/

Streaming data is used extensively for use cases like sharing data between applications, streaming ETL (extract, transform, and load), real-time analytics, processing data from internet of things (IoT) devices, application monitoring, fraud detection, live leaderboards, and more. Typically, data streams are stored for short durations of time before being loaded into a permanent data store like a data lake or analytics service.

Additional use cases are becoming more prevalent that may require you retain data in streams for longer periods of time. For example, compliance programs like HIPAA and FedRAMP may require you to store raw data for more than a few days or weeks, or you may want to backtest machine learning (ML) algorithms with historical data that may be several months old.

A challenge arises when you want to process historical data and newly arriving data streams. This requires complex logic to access your data lake and your data stream store, or two sets of code—one to process data from your data lake and one to process your new data streams.

Amazon Kinesis Data Streams solves this challenge by storing your data streams up to 1 year with long-term retention. You can use the same Kinesis Data Streams code base to process both historical and newly arriving data streams, and continue to use features like enhanced fan-out to read large data volumes at very high throughput.

In this post, we describe how long-term retention enables new use cases by bridging real-time and historical data processing. We also demonstrate how you can reduce the time to retrieve 30 days of data from a data stream by an order of magnitude using Kinesis Data Streams enhanced fan-out.

Simple setup, no resource provisioning

Kinesis Data Streams durably stores all data stream records in a shard, an append-only log ordered by arrival time. The time period from when a record is added to when it’s no longer accessible is called the retention period. A Kinesis data stream stores records for 24 hours by default, up to 365 days (8,760 hours). Applications can start reading data at any point in the retention period in the exact order in which the data stream is stored. Shards enable these applications to process data in parallel and at low-latency.

You can select a preset retention period or define a custom retention period in days or hours using the Kinesis Data Streams console, as in the following screenshot.


The default retention period is 24 hours and covers scenarios where intermittent lags in processing need to catch up with the real-time data. You can extend retention up to 7 days to reprocess slightly aged data to resolve potential downstream data losses. You can also use long-term retention to store data for more than 7 days and up to 365 days to reprocess historical data for use cases like algorithm backtesting, data store backfills, and auditing. For more information, see Changing the Data Retention Period.

Similarly, you can use the following AWS Command Line Interface (AWS CLI) command to set the retention period in hours (the following code sets it to 9 days, or 216 hours):

aws kinesis increase-stream-retention-period \
    --stream-name samplestream \
    --retention-period-hours 216
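
The equivalent call with boto3 looks like the following (same stream name and 9-day retention):

import boto3

kinesis = boto3.client("kinesis")

# Raise the retention period of the stream to 216 hours (9 days).
kinesis.increase_stream_retention_period(
    StreamName="samplestream",
    RetentionPeriodHours=216,
)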

Read new and historical data, no code changes necessary

All the data captured in the stream is stored in a durable, encrypted, and secure manner for the specified retention period up to a maximum of 1 year. You can store any amount of data, retrieve it by specifying a start position, and read sequentially using the familiar getRecords and SubscribeToShard APIs. The start position can be the sequence number of a data record in a shard or a timestamp. This enables you to use the same code to process older data. You can set up multiple consuming applications to start processing data at different points in the data stream.
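
For example, the following boto3 sketch replays a shard from a chosen timestamp; the stream and shard IDs are placeholders:

import datetime

import boto3

kinesis = boto3.client("kinesis")

# Get an iterator positioned at a point in time within the retention period.
iterator = kinesis.get_shard_iterator(
    StreamName="samplestream",
    ShardId="shardId-000000000000",
    ShardIteratorType="AT_TIMESTAMP",
    Timestamp=datetime.datetime(2021, 5, 1),
)["ShardIterator"]

while iterator:
    response = kinesis.get_records(ShardIterator=iterator, Limit=1000)
    for record in response["Records"]:
        print(record["Data"])                  # replace with your processing logic
    if response["MillisBehindLatest"] == 0:    # caught up to the tip of the stream
        break
    iterator = response.get("NextShardIterator")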

Speed up data reads using enhanced fan-out consumers

Kinesis Data Streams provides two types of models to consume data: shared throughput consumer and enhanced fan-out (EFO) consumer. In the shared throughput consumer model, all the consuming applications share 2 MB/s per shard read throughput and a 5 transactions per second (TPS) quota. In the enhanced fan-out model, each consumer gets a dedicated read throughput of 2MB/s per shard. Because it uses an HTTP/2 data retrieval API, there is no longer a limit of 5 TPS. You can attach up to 20 EFO consumers to a single stream and read data at a total rate of 40MB/s per shard. Because each consumer gets dedicated read throughput, processing one doesn’t impact another. So you can attach new consumers to process old data without worrying about the performance of the existing consumer processing real-time data. For example, you can retrain an ML model in an ad hoc fashion without impacting real-time workflows.

You can add and remove EFO consumers at any time and avoid paying for over-provisioned resources. For example, when backtesting, you can register EFO consumers before the test and remove them after completion. You’re only charged for resources used during the test. Also, you can use EFO consumers to accelerate the speed of processing. Each consuming application can process different parts of streams across the retention period to process all the data in parallel, thereby dramatically reducing the total processing time.
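
Registering and removing an EFO consumer is a single API call each way, which is what makes this on-demand pattern practical. A minimal boto3 sketch, assuming a placeholder stream ARN and consumer name:

import boto3

kinesis = boto3.client("kinesis")
stream_arn = "arn:aws:kinesis:us-east-1:111122223333:stream/samplestream"

# Register a dedicated-throughput consumer before a backtest...
consumer = kinesis.register_stream_consumer(
    StreamARN=stream_arn,
    ConsumerName="backtest-consumer-01",
)["Consumer"]
print(consumer["ConsumerARN"], consumer["ConsumerStatus"])

# ...and deregister it when the test completes so you stop paying for it.
kinesis.deregister_stream_consumer(
    StreamARN=stream_arn,
    ConsumerName="backtest-consumer-01",
)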

Clickstream pipeline use case

Let’s look at a clickstream use case to see how this works for an existing streaming pipeline like the one in the following diagram.


This pipeline takes clickstream data and creates an alert every time a user leaves your ecommerce site without purchasing the items in their cart. A simple pipeline like this is a great way to start with stream processing, but soon you may want to implement a recommendation system based on user activity on your website and mobile app. To do this, you need to gather historical data in your existing data stream and send it to Amazon Simple Storage Service (Amazon S3) so it can be used for training a recommendation ML model. This scenario illustrates a key benefit of enabling long-term retention: it gives you the flexibility to “go back in time” and replay the existing data in your stream to generate new analytics that you may not have considered when you initially set up the streaming pipeline.

Let’s say you enabled 30 days of retention on your Kinesis data stream. After you train your ML model, you can set up a new streaming pipeline that generates recommendations by calling an inference endpoint hosted on Amazon SageMaker based on the trained ML model. The following diagram illustrates the final state of this architecture.

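
As a rough sketch of the inference call in that final pipeline (the endpoint name and payload shape are assumptions, not the actual model interface):

import json

import boto3

runtime = boto3.client("sagemaker-runtime")


def recommend(user_id, recent_events):
    # Call the hosted SageMaker endpoint with the user's recent clickstream events.
    response = runtime.invoke_endpoint(
        EndpointName="clickstream-recommender",
        ContentType="application/json",
        Body=json.dumps({"user_id": user_id, "events": recent_events}),
    )
    return json.loads(response["Body"].read())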

You can efficiently and quickly consume the existing data in the stream and write it to Amazon S3 so it can be used for training your ML model. The following diagram illustrates the architecture of this intermediate pipeline to generate training data.


You may wonder, why read from Kinesis Data Streams and write to Amazon S3? Why not write to Amazon S3 directly without enabling long-term retention? First, ingesting into Kinesis Data Streams with long-term retention enabled gives you the flexibility to generate additional streaming analytics as time passes. Second, this gives you the flexibility to filter and transform the data being read from Kinesis Data Streams before generating analytics or writing to Amazon S3. Lastly, you can use this approach to render analytics onto other systems besides Amazon S3, such as Amazon Elasticsearch Service (Amazon ES) using the Elasticsearch sink for Apache Flink.

Keep in mind that we only use this pipeline to bootstrap our second, long-lived pipeline that does recommendations, but this is an important step and we need a way to do this efficiently. Although there are multiple options for consuming data from Kinesis Data Streams, Amazon Kinesis Data Analytics for Apache Flink provides an elegant way to attach multiple EFO consumers in the same consuming application.

You can find more information at the official Apache Flink website, and about Kinesis Data Analytics for Apache Flink in the Kinesis Data Analytics developer guide. Apache Flink has a number of connectors, like the recently released FlinkKinesisConsumer, which supports enhanced fan-out for consuming from Kinesis Data Streams, or the Streaming File Sink to write to Amazon S3 from your Apache Flink application.

Accelerating data consumption

For the sake of simplicity, let’s use just one shard in our data stream, ingest data at the maximum rate of 1MB/s, and specify a retention period of 30 days. To bootstrap our new analytics, reading the full amount of data over 30 days with one EFO consumer at 2MB/s could potentially take up to 15 days to load this data into Amazon S3. However, you can accelerate this to 20 times faster using 20 EFO consumers at the same time, each reading from different points in the stream at 2 MB/s. The following diagram illustrates the architecture of multiple EFO consumers reading from multiple time slices.


This gives us a total of 40MB/s in consumption capacity as opposed to 2MB/s per shard with just one EFO consumer, reducing the overall time by 95%. In most use cases, this combination of Kinesis Data Analytics and EFO allows you to process 30 days of data in hours, instead of days.

A point of clarification regarding our approach: When all 20 consumers are finished reading past their respective endpoints in the stream, we stop the Apache Flink application. You can do this by raising an exception when all 20 consumers finish reading their respective time slices—effectively stopping the application. The following diagram illustrates the time savings we get from using 20 EFO consumers.

For more information about implementing this approach, see the GitHub repo.

Pricing

An additional cost is associated with long-term retention (from 7 to 365 days) and EFO consumers. For more information, see Amazon Kinesis Data Streams pricing. Because you can register EFO consumers on demand, you pay only for the limited time you used all 20 consumers to load data, resulting in faster loads. It’s important to point out that you pay roughly the same amount to consume a fixed volume of data from the stream with 20 EFO consumers as you do with 1 EFO consumer, because the 20 consumers run for a correspondingly shorter duration.

Summary

In this post, we discussed long-term retention use cases of Kinesis Data Streams, how to increase the retention of a data stream, and related feature enhancements with Kinesis Data Streams APIs and KCL. We took a deep dive into the Apache Flink-based enhanced fan-out consumer approach to replay long-term data quickly. We shared open-source code based on this approach so you can easily implement your use cases using Kinesis Data Streams long-term retention.

You should use long-term retention if you’re planning to develop ML systems, generate customer behavior insights, or have compliance requirements for retaining raw data for more than 7 days. We would love to hear about your use cases with the long-term retention feature. Please submit your feedback to [email protected].


About the Authors

Nihar Sheth is a Senior Product Manager on the Amazon Kinesis Data Streams team at Amazon Web Services. He is passionate about developing intuitive product experiences that solve complex customer problems and enable customers to achieve their business goals. Outside of work, he is focusing on hiking 200 miles of beautiful PNW trails with his son in 2021.

Karthi Thyagarajan is a Solutions Architect on the Amazon Kinesis Team focusing on all things streaming, and he enjoys helping customers tackle distributed systems challenges.

Sai Maddali is a Sr. Product Manager – Tech at Amazon Web Services where he works on Amazon Kinesis Data Streams. He is passionate about understanding customer needs, and using technology to deliver services that empower customers to build innovative applications. Besides work, he enjoys traveling, cooking, and running.

Larry Heathcote is a Senior Product Marketing Manager at Amazon Web Services for data streaming and analytics. Larry is passionate about seeing the results of data-driven insights on business outcomes. He enjoys walking his Samoyed Sasha in the mornings so she can look for squirrels to bark at.

How Baqend built a real-time web analytics platform using Amazon Kinesis Data Analytics for Apache Flink

Post Syndicated from Wolfram Wingerath original https://aws.amazon.com/blogs/big-data/how-baqend-built-a-real-time-web-analytics-platform-using-amazon-kinesis-data-analytics-for-apache-flink/

This is a customer post written by the engineers from German startup Baqend and the AWS EMEA Prototyping Labs team.

Baqend is one of the fastest-growing software as a service (SaaS) startups in Germany, serving over 5,000 business customers with more than 100 million monthly users and €2 billion in revenue per year. Baqend’s main product is a one-click solution to accelerate ecommerce websites called Speed Kit. By rerouting a portion of the web traffic through Speed Kit’s caching infrastructure, it achieves a typical performance boost of 1.5–3 times faster page loads.

To measure the impact of Speed Kit and confirm its uplift to Baqend’s customers, we maintain several dashboards that display the technical and business performance improvements achieved by Speed Kit. This requires complex aggregations of tracking data collected during A/B tests on our customers’ websites.

The Challenge: Real-time analytics and reporting at scale

One of the key issues was the amount of data that our legacy solution for monitoring and reporting needed to process. The raw tracking data from all users was batched through various systems, which resulted in processing delays of up to 24 hours for some analytics jobs. This impacted our operations monitoring and sales activities negatively, because our customers sometimes couldn’t analyze the impact of deployment changes until the next day. Furthermore, our legacy reporting service lacked any support for custom visualization development.

This post shows you how we transformed our batch-based analytics process into a continuous complex event-processing pipeline, which is managed by Amazon Kinesis Data Analytics for Apache Flink. The new solution exhibits less than a minute of end-to-end latency from data ingestion to visual output in the dashboard.

The key topics presented in this post are:

  • Solution overview and key components
  • Continuous aggregation with Kinesis Data Analytics
  • State storage and application management
  • Workload distribution and scalability
  • Event processing, delivery semantics, and fault tolerance
  • Data retention and multi-tenancy in Amazon ES
  • Kibana for continuous reporting

Solution overview and key components

Following a remote planning phase in which we defined our requirements and laid out the basic design, we built the solution during an on-site prototyping engagement with AWS over the course of 4 weeks in early 2020 in Hamburg. Seven team members from Baqend and AWS EMEA Prototyping Labs implemented the following architecture.

The workflow includes the following steps:

  1. The performance tracking data is streamed by Speed Kit Amazon Elastic Compute Cloud (Amazon EC2) instances.
  2. This data goes into an Amazon Kinesis data stream.
  3. This data stream is consumed by a Kinesis Data Analytics for Apache Flink application.
  4. The data is ingested into Amazon ES.
  5. This streaming application relies on AWS Secrets Manager to store and access the credentials for Elasticsearch with basic HTTP authentication.
  6. An Nginx proxy server application hosted on EC2 instances in multiple public subnets and Availability Zones redirects user requests to Kibana with Amazon Cognito authentication (for more information, see How do I use an NGINX proxy to access Kibana from outside a VPC that’s using Amazon Cognito authentication?).
  7. The Apache Flink application also uses Amazon DynamoDB as a backend for long-living external states required for certain operations (covered later in this post).
  8. The streaming application also delivers the raw and intermediate data outputs to an Amazon Simple Storage Service (Amazon S3) bucket to enable historical data analysis and operational troubleshooting with Amazon Athena.

Although the prototyping engagement also covered other aspects, we focus on the Kinesis Data Analytics application in the following sections of this post.

Continuous aggregation with Kinesis Data Analytics

We need to collect all kinds of technical data points on every page load of a website visitor. Details on the individual page impressions (PI) help us analyze web performance for the websites of our customers. Speed Kit provides a performance tracking functionality that collects data within the browser of every website visitor and sends it to our analytics backend.

Aggregating page impressions

Intuitively, there should be only one data beacon for any given PI, because the data could be aggregated in the browser before it’s sent to our backend. However, Speed Kit sends several data beacons during the page load to minimize the possibility of any data loss, as shown in the following figure.

For example, static information such as the target URL or the current time can be sent away as soon as the navigation starts (navigation beacon), whereas certain measurements can’t be sent until very late in the load process, like the time it took to load the entire page (load beacon). Certain events may even occur minutes after the page load, or not at all (for example, user interaction with the page or JavaScript errors) and are therefore handled via dedicated and optional transmissions (event beacons). These beacons need to be correlated in our analytical backend later on.
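
As an illustration of this correlation step, the following sketch keys a hypothetical Beacon type by its page impression ID and merges the beacons that arrive within a session window; the field names and window settings are placeholders, not the ones used in our actual pipeline.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class BeaconCorrelationSketch {

    // Hypothetical beacon record: one page impression (PI) produces several of these.
    public static class Beacon {
        public String piId;        // page impression identifier shared by all beacons of one page load
        public String url;         // filled by the navigation beacon
        public long loadTimeMs;    // filled by the load beacon (0 until then)
        public long eventTimeMs;   // client-side timestamp of the beacon

        public Beacon() { }

        public Beacon(String piId, String url, long loadTimeMs, long eventTimeMs) {
            this.piId = piId; this.url = url; this.loadTimeMs = loadTimeMs; this.eventTimeMs = eventTimeMs;
        }

        // Merge two partial beacons that belong to the same page impression.
        public Beacon merge(Beacon other) {
            return new Beacon(piId,
                    url != null ? url : other.url,
                    Math.max(loadTimeMs, other.loadTimeMs),
                    Math.max(eventTimeMs, other.eventTimeMs));
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Beacon> beacons = env.fromElements(
                new Beacon("pi-1", "https://example.com/", 0, 1_000),   // navigation beacon
                new Beacon("pi-1", null, 1_820, 3_500))                 // load beacon, arrives later
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<Beacon>forMonotonousTimestamps()
                    .withTimestampAssigner((beacon, ts) -> beacon.eventTimeMs));

        beacons
            .keyBy(beacon -> beacon.piId)
            // Beacons of one page impression can arrive minutes apart, so only close
            // the aggregation after a period of inactivity.
            .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
            .reduce(Beacon::merge)
            .print();

        env.execute("Correlate beacons per page impression");
    }
}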

Aggregating session data

Because some of the most interesting metrics are computed on the level of user sessions, aggregating all data beacons for the individual PIs isn’t enough to analyze web performance. For instance, the user engagement metrics are often quantified by the number of pages visited in one sitting (session length) or the share of users that left on the very first page (bounce rate).

Aggregating relevant information may even involve identifying and removing duplicates, as illustrated in the following figure.

Suppose the user first checks out the landing page and immediately leaves (Session 1), and then comes back later to browse through some products and buy some blue shoes (Session 2), and finally returns after a few hours to reload the order confirmation page and browse some more products (Session 3). Because Session 3 starts with a reload of the order confirmation page, tracking data on the order that was completed in Session 2 is transmitted a second time, resulting in a potentially duplicated count of the completed orders. Therefore, our analytical backend needs to identify the duplicated tracking information as such and ignore it for further analysis. To enable this, we persistently store a salted hash of every order ID and simply have the aggregation pipeline drop the tracking data on any order that has already been written to the external key value store (see the diagram in the following section).
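
The following sketch shows what such a salted-hash check against a key-value store could look like. It uses DynamoDB with a hypothetical table and attribute name and a simple conditional write as a stand-in for the transactions-based deduplication rule described in the next section.

import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Base64;
import java.util.Map;

public class OrderDeduplicator {

    private final DynamoDbClient dynamoDb = DynamoDbClient.create();
    private final String salt;          // kept secret, for example loaded from AWS Secrets Manager
    private final String tableName;     // hypothetical table with partition key "orderHash"

    public OrderDeduplicator(String salt, String tableName) {
        this.salt = salt;
        this.tableName = tableName;
    }

    /** Returns true if this order ID has not been seen before; false for a duplicate. */
    public boolean markIfNew(String orderId) throws Exception {
        String hash = saltedHash(orderId);
        try {
            dynamoDb.putItem(PutItemRequest.builder()
                    .tableName(tableName)
                    .item(Map.of("orderHash", AttributeValue.builder().s(hash).build()))
                    // The write only succeeds if no item with this hash exists yet.
                    .conditionExpression("attribute_not_exists(orderHash)")
                    .build());
            return true;
        } catch (ConditionalCheckFailedException duplicate) {
            return false;   // the order was already counted; drop the tracking data
        }
    }

    private String saltedHash(String orderId) throws Exception {
        MessageDigest digest = MessageDigest.getInstance("SHA-256");
        byte[] bytes = digest.digest((salt + orderId).getBytes(StandardCharsets.UTF_8));
        return Base64.getEncoder().encodeToString(bytes);
    }
}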

Anatomy of the streaming application

The following diagram shows our event processing pipeline from raw data collection to the storage of aggregation results.

The workflow is as follows:

  1. The first step is tracking the data within the browsers of the end users.
  2. The data is sent to Kinesis Data Streams for consumption through a custom stateful Apache Flink process function within a Kinesis Data Analytics application.
  3. Raw data beacons are initially normalized, and invalid data beacons are delivered to Amazon S3 via side outputs to facilitate later analysis of all data that has been sorted out (see the sketch after this list).
  4. As mentioned earlier, we use a DynamoDB table to run a deduplication rule over all incoming order data (confirmation pages) using the DynamoDB Transactions API. We also use another DynamoDB table to identify bot traffic by persistently storing user agent strings that have been associated with suspicious behavior (because they belong to web crawlers). Finally, the stream of cleaned tracking beacons is processed in stateful window aggregation steps for storage.
  5. We aggregate all beacons referring to the same PI and write them off to our data lake on Amazon S3 to enable offline analysis with Athena.
  6. Furthermore, we compile the tracking beacon stream into 1-minute summaries containing both PI and session data for storage via Amazon ES to enable efficient reporting with Kibana.
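
The side-output pattern mentioned in step 3 looks roughly like the following sketch; the validation rule and record format are hypothetical placeholders, and in production the side output would go to an Amazon S3 sink rather than to print().

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class BeaconValidationSketch {

    // Side-output tag for beacons that fail validation.
    private static final OutputTag<String> INVALID = new OutputTag<String>("invalid-beacons") {};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> rawBeacons = env.fromElements("{\"piId\":\"pi-1\"}", "not-json");

        SingleOutputStreamOperator<String> valid = rawBeacons.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String beacon, Context ctx, Collector<String> out) {
                // Hypothetical validation rule: well-formed beacons look like JSON objects.
                if (beacon.startsWith("{") && beacon.endsWith("}")) {
                    out.collect(beacon);                 // continue in the main pipeline
                } else {
                    ctx.output(INVALID, beacon);         // route to the side output for later analysis
                }
            }
        });

        valid.print();                                   // main output: cleaned beacons
        valid.getSideOutput(INVALID).print();            // side output: invalid beacons

        env.execute("Normalize beacons and divert invalid ones");
    }
}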

State storage and application management

Most of the application state for the streaming application is held in the built-in RocksDB state backend with incremental checkpointing. This default built-in state storage mechanism is subject to a 50 GB storage limit for each Kinesis Processing Unit (KPU) allocated to a Kinesis Data Analytics application. In contrast, we used DynamoDB tables to store the state permanently for unique conversions and user agent strings, in order to decouple the historical state for these two data types from Apache Flink application management and to keep the checkpointing duration and size under control. Using DynamoDB for these two use cases helps control the overhead of creating and restoring checkpoints and thereby keeps the application startup time short.
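
Kinesis Data Analytics configures and manages the state backend for you; for comparison, in a self-managed Apache Flink job (Flink 1.11 APIs) the equivalent incremental RocksDB setup would look roughly like the following sketch, with a placeholder checkpoint location.

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IncrementalCheckpointingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // RocksDB state backend with incremental checkpoints: only the state changed since
        // the previous checkpoint is uploaded, which keeps checkpoint duration small.
        env.setStateBackend(new RocksDBStateBackend("s3://my-checkpoint-bucket/checkpoints", true));
        env.enableCheckpointing(60_000);   // checkpoint every 60 seconds

        // Placeholder pipeline so the job graph is not empty.
        env.fromElements("a", "b", "c").print();

        env.execute("Job with incremental RocksDB checkpoints");
    }
}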

Workload distribution and scalability

As of February 2021, our processing pipeline handles over 2.8 billion tracking beacons per month, which corresponds to more than 500 million individual PIs from over 140 million user sessions and more than 100 million unique users. Achieving this scale requires even distribution of both processing and storage load across all stream partitions. Therefore, we use randomly generated session IDs as the partition key for the input Kinesis data stream and throughout most of the remaining stages of our pipeline.
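
On the producer side, using the session ID as the partition key is a one-liner. The following sketch uses the AWS SDK for Java v2 with a hypothetical stream name and beacon payload.

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;

import java.util.UUID;

public class BeaconIngestSketch {
    public static void main(String[] args) {
        KinesisClient kinesis = KinesisClient.create();

        // A randomly generated session ID spreads load evenly across shards,
        // while still keeping all beacons of one session on the same shard.
        String sessionId = UUID.randomUUID().toString();
        String beaconJson = "{\"sessionId\":\"" + sessionId + "\",\"piId\":\"pi-1\"}";

        kinesis.putRecord(PutRecordRequest.builder()
                .streamName("tracking-beacons")           // hypothetical stream name
                .partitionKey(sessionId)                  // partition key = session ID
                .data(SdkBytes.fromUtf8String(beaconJson))
                .build());
    }
}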

In the presence of certain anomalies such as heavy bot traffic, a load skew may occur regardless, which may impair overall throughput or even crash the entire application in extreme cases. We monitor the number of incoming and outgoing records (to derive the current buffer size) for the individual Apache Flink operators in every stream partition to identify issues with the load distribution quickly and generate alert notifications via multiple channels (such as Slack and email) if the measurements for different stream partitions diverge significantly. For convenience, we further visualize custom Amazon CloudWatch metrics in a Grafana dashboard.

Event processing, delivery semantics, and fault tolerance

Application restarts and downtime (such as during and after application deployment) can be handled seamlessly by using Apache Flink’s event time processing semantics, because the generated output is independent of the wall-clock time of the processing nodes. All processing is based on monotonically increasing ingestion timestamps to eliminate the possibility of late arrivals. While our data cleaning procedure identifies invalid records, it never drops any data items from the stream; instead, it attaches information about the detected issue to the data item in question. This approach enables us to analyze the frequency and distribution of every problem in our aggregation pipeline by using the same Kibana dashboard.

Even though the data ingestion to Amazon ES provides at-least-once delivery guarantees by default, we managed to achieve exactly-once delivery guarantees from the source Kinesis data stream to the Elasticsearch index by generating document identifiers in a deterministic fashion. Therefore, the data stream can be replayed safely because the existing data records are overwritten on re-insertion into the Elasticsearch index.
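
A minimal sketch of this idea with the Flink Elasticsearch connector follows. The aggregate type, index naming, and ID scheme are hypothetical placeholders; the key point is that the document ID is derived deterministically from the aggregate's dimensions, so a replay overwrites rather than duplicates documents.

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.Collections;
import java.util.Map;

public class DeterministicEsSinkSketch {

    // Hypothetical 1-minute aggregate, reduced to the dimensions that make it unique.
    public static class MinuteAggregate {
        public String customer;
        public String browser;
        public String device;
        public String testGroup;
        public long minuteEpoch;
        public double avgFcpMs;
    }

    public static ElasticsearchSink<MinuteAggregate> buildSink() {
        ElasticsearchSinkFunction<MinuteAggregate> emitter = new ElasticsearchSinkFunction<MinuteAggregate>() {
            @Override
            public void process(MinuteAggregate agg, RuntimeContext ctx, RequestIndexer indexer) {
                // Deterministic document ID: replaying the stream overwrites the same
                // documents instead of duplicating them, so results are effectively exactly-once.
                String docId = String.join("|", agg.customer, agg.browser, agg.device,
                        agg.testGroup, Long.toString(agg.minuteEpoch));

                IndexRequest request = Requests.indexRequest()
                        .index("aggregates-" + agg.customer)
                        .id(docId)
                        .source(Map.of("avgFcpMs", agg.avgFcpMs, "minute", agg.minuteEpoch));
                indexer.add(request);
            }
        };

        return new ElasticsearchSink.Builder<>(
                Collections.singletonList(new HttpHost("my-es-domain.example.com", 443, "https")),
                emitter).build();
    }
}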

Data retention and multi-tenancy in Amazon ES

We store pre-aggregated data at the minute level in Amazon ES to make sure our Kibana dashboard remains responsive even when analyzing a scope of weeks or months. As illustrated in the following figure, the Apache Flink application summarizes the raw tracking data along different dimensions (browser, device, test group, and aggregation time in minutes) before writing it to Elasticsearch.

The Elasticsearch documents are composed of bucketed histogram data for performance timers such as the First Contentful Paint (FCP) instead of the actual timer values. Running queries over these aggregates instead of the raw data minimizes query run costs significantly: traffic-heavy customers may have tens of millions of raw tracking beacons in a single week, whereas the number of 1-minute buckets is several orders of magnitudes lower (for small and large customers alike). We observe over 5 times more PIs and 30 times more raw beacons than aggregates stored in Elasticsearch across all of our customers.
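
The bucketing itself is simple; the following standalone sketch shows the idea with a hypothetical 100 ms bucket width.

import java.util.Map;
import java.util.TreeMap;

public class FcpHistogram {

    private static final int BUCKET_WIDTH_MS = 100;   // hypothetical bucket width

    // bucket lower bound (ms) -> number of page impressions in that bucket
    private final Map<Integer, Long> buckets = new TreeMap<>();

    public void add(int fcpMillis) {
        int bucket = (fcpMillis / BUCKET_WIDTH_MS) * BUCKET_WIDTH_MS;
        buckets.merge(bucket, 1L, Long::sum);
    }

    public Map<Integer, Long> snapshot() {
        return buckets;
    }

    public static void main(String[] args) {
        FcpHistogram histogram = new FcpHistogram();
        for (int fcp : new int[]{480, 510, 530, 1900, 2100}) {
            histogram.add(fcp);
        }
        // Prints {400=1, 500=2, 1900=1, 2100=1}: five raw timer values collapse into four
        // buckets, so dashboard queries touch per-minute buckets instead of millions of raw beacons.
        System.out.println(histogram.snapshot());
    }
}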

We store the data for different customers in separate indexes generated for a fixed temporal rolling period by the Apache Flink Elasticsearch Sink Connector. We also implemented customer-specific retention policies in Amazon ES by deleting the old indexes as required. Our deployment is multi-tenant so that our customers can receive fine-grained access only to their own data stored in the indexes created for them.

Kibana for continuous reporting

We used Kibana to build our dashboards because it provides powerful and easy-to-create built-in visualizations and virtually boundless flexibility through custom Vega chart visualizations. Kibana also works well in combination with Elasticsearch indexes, thereby facilitating the role-based access management that enables us to provide individual customers access to the data in our multi-tenant dashboard.

Easy data exploration

The following illustration shows one of the standard visualizations in Kibana that we use to understand the distribution of device types and browsers used by website visitors.

Real-time histogram visualization

Illustrating the distribution of performance metrics requires a custom visualization. The following custom Vega histogram chart uses the Largest Contentful Paint (LCP) metric to illustrate how Speed Kit improves webpage load time.

In comparison with the vanilla website, where page loads are almost never faster than 2 seconds (pink area), end users accelerated by Speed Kit experience considerably faster and even sub-second load times (blue area).

Because our main business revolves around accelerating our customers’ websites, visualizing the actual uplift is critical for all developers (to debug performance and identify issues quickly) as well as our customers (highlighting the value of our product). With the continuous aggregation and reporting solution outlined in this post, we were able to satisfy all these requirements in a scalable and fully managed fashion.

Conclusion and future directions

In this post, we shared our journey from a high-volume batch analytics solution to a continuous aggregation pipeline using Kinesis Data Analytics for Apache Flink. Key aspects are:

  • End-to-end processing time is reduced from 24 hours to sub-minute latency.
  • We implemented a fully functional prototype within 4 weeks. The AWS Prototyping team enabled us to build our system on a multitude of managed AWS services.
  • The system was used with production load after 8 weeks.
  • The new system based on the Kinesis Data Analytics for Apache Flink application exhibits extreme scalability as it handles workloads with ease that were infeasible for the old system. As of February 2021, our system processes more than 500 million page loads from over 100 million unique users every month.
  • Elasticsearch and Kibana with customized Vega visualizations provide flexible and continuously updating dashboards for all our customers.

Additional Resources

For more details on the challenges and solutions discussed in this article, we recommend the following resources:

We would be glad to get feedback on our work, so please drop us a line in case of any remaining questions!


About the Authors

Wolfram “Wolle” Wingerath heads the data engineering team that is responsible for developing and operating Baqend’s infrastructure for analytics and reporting.

Florian Bücklers is Baqend’s Chief Technology Officer and therefore responsible for coordinating between the different teams for front-end and backend development, DevOps, onboarding, and data engineering.

Benjamin Wollmer develops data-intensive systems at Baqend, but he is also doing his PhD at the University of Hamburg and therefore likes to read and write about related topics.

Stephan Succo is one of the core developers of Baqend’s continuous analytics pipeline.

Jörn Domnik is a Senior Software Engineer at Baqend with a focus on backend development and reliability engineering.

As a DevOps engineer, Virginia Amberg monitors cluster health and keeps all systems running smoothly at Baqend.

As a Principal Prototyping Engagement Manager in AWS, Markus Bestehorn is responsible for building business-critical prototypes with AWS customers and is a specialist for IoT and machine learning.

As a Data Prototyping Architect in AWS, Anil Sener builds prototypes on big data analytics, data streaming, and machine learning, which accelerates the production journey on the AWS Cloud for top EMEA customers.

As B2B Strategic Account Manager for Startups at AWS, Daniel Zäeh works with customers to make their ideas come true and helps them grow by connecting tech and business.

Building a real-time notification system with Amazon Kinesis Data Streams for Amazon DynamoDB and Amazon Kinesis Data Analytics for Apache Flink

Post Syndicated from Saurabh Shrivastava original https://aws.amazon.com/blogs/big-data/building-a-real-time-notification-system-with-amazon-kinesis-data-streams-for-amazon-dynamodb-and-amazon-kinesis-data-analytics-for-apache-flink/

Amazon DynamoDB helps you capture high-velocity data such as clickstream data to form customized user profiles and Internet of Things (IoT) data so that you can develop insights on sensor activity across various industries, including smart spaces, connected factories, smart packing, fitness monitoring, and more. It’s important to store these data points in a centralized data lake in real time, where they can be transformed, analyzed, and combined with diverse organizational datasets to derive meaningful insights and make predictions.

A popular use case in the wind energy sector is to protect wind turbines from high wind speeds. As per National Wind Watch, every wind turbine has a range of wind speeds, typically 30–55 mph, in which it produces maximum capacity. When wind speed is greater than 70 mph, it’s important to start a shutdown to protect the turbine from a high wind storm. Customers often store high-velocity IoT data in DynamoDB and use Amazon Kinesis streaming to extract data and store it in a centralized data lake built on Amazon Simple Storage Service (Amazon S3). To facilitate this ingestion pipeline, you can deploy AWS Lambda functions or write custom code to build a bridge between DynamoDB Streams and Kinesis streaming.

Amazon Kinesis Data Streams for DynamoDB helps you publish item-level changes in any DynamoDB table to a Kinesis data stream of your choice. Additionally, you can take advantage of this feature for use cases that require longer data retention on the stream and fan-out to multiple concurrent stream readers. You can also integrate with Amazon Kinesis Data Analytics or Amazon Kinesis Data Firehose to publish data to downstream destinations such as Amazon Elasticsearch Service, Amazon Redshift, or Amazon S3.

In this post, you use Kinesis Data Analytics for Apache Flink (Data Analytics for Flink) and Amazon Simple Notification Service (Amazon SNS) to send a real-time notification when wind speed is greater than 60 mph so that the operator can take action to protect the turbine. You use Kinesis Data Streams for DynamoDB and take advantage of managed streaming delivery of DynamoDB data to other AWS services without having to use Lambda or write and maintain complex code. To process DynamoDB events from Kinesis, you have multiple options: Amazon Kinesis Client Library (KCL) applications, Lambda, and Data Analytics for Flink. In this post, we showcase Data Analytics for Flink, but this is just one of many available options.

Architecture

The following architecture diagram illustrates the wind turbine protection system.

In this architecture, high-velocity wind speed data comes from the wind turbine and is stored in DynamoDB. To send an instant notification, you need to query the data in real time and send a notification when the wind speed is greater than the established maximum. To achieve this goal, you enable Kinesis Data Streams for DynamoDB, and then use Data Analytics for Flink to query real-time data in a 60-second tumbling window. This aggregated data is stored in another data stream, which triggers an email notification via Amazon SNS using Lambda when the wind speed is greater than 60 mph. You will build this entire data pipeline in a serverless manner.
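
The Lambda function that turns records on the output stream into notifications is created for you by the CloudFormation template described later, and its code is in the GitHub repository for this solution. The following is only a minimal sketch of its general shape, assuming the SNS topic ARN is passed in as an environment variable.

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sns.model.PublishRequest;

import java.nio.charset.StandardCharsets;

public class HighWindSpeedNotifier implements RequestHandler<KinesisEvent, Void> {

    private final SnsClient sns = SnsClient.create();
    private final String topicArn = System.getenv("SNS_TOPIC_ARN");   // hypothetical environment variable

    @Override
    public Void handleRequest(KinesisEvent event, Context context) {
        for (KinesisEvent.KinesisEventRecord record : event.getRecords()) {
            // The Flink application only writes records when the one-minute average exceeds 60 mph,
            // so every record on the output stream results in a notification.
            String message = StandardCharsets.UTF_8.decode(record.getKinesis().getData()).toString();
            sns.publish(PublishRequest.builder()
                    .topicArn(topicArn)
                    .subject("High wind speed alert")
                    .message(message)
                    .build());
        }
        return null;
    }
}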

Deploying the wind turbine data simulator

To replicate a real-life scenario, you need a wind turbine data simulator. We use AWS Amplify in this post to deploy a user-friendly web application that can generate the required data and store it in DynamoDB. You must have a GitHub account, which is needed to fork the Amplify app code and deploy it in your AWS account automatically.

Complete the following steps to deploy the data simulator web application:

  1. Choose the following AWS Amplify link to launch the wind turbine data simulator web app.

  2. Choose Connect to GitHub and provide credentials, if required.

  3. In the Deploy App section, under Select service role, choose Create new role.
  4. Follow the instructions to create the role amplifyconsole-backend-role.
  5. When the role is created, choose it from the drop-down menu.
  6. Choose Save and deploy.

On the next page, the dynamodb-streaming app is ready to deploy.

  7. Choose Continue.

On the next page, you can see the app build and deployment progress, which might take as many as 10 minutes to complete.

  8. When the process is complete, choose the URL on the left to access the data generator user interface (UI).
  9. Make sure to save this URL because you will use it in later steps.

You also get an email during the build process related to your SSH key. This email indicates that the build process created an SSH key on your behalf to connect to the Amplify application with GitHub.

  10. On the sign-in page, choose Create account.

  11. Provide a user name, password, and valid email to which the app can send you a one-time passcode to access the UI.
  12. After you sign in, choose Generate data to generate wind speed data.
  13. Choose the Refresh icon to show the data in the graph.

You can generate a variety of data by changing the range of minimum and maximum speeds and the number of values.

To see the data in DynamoDB, choose the DynamoDB icon, note the table name that starts with windspeed-, and navigate to the table in the DynamoDB console.

Now that the wind speed data simulator is ready, let’s deploy the rest of the data pipeline.

Deploying the automated data pipeline by using AWS CloudFormation

You use AWS CloudFormation templates to create all the necessary resources for the data pipeline. This removes opportunities for manual error, increases efficiency, and ensures consistent configurations over time. You can view the template and code in the GitHub repository.

  1. Choose Launch with CloudFormation Console:
  2. Choose the US West (Oregon) Region (us-west-2).
  3. For pEmail, enter a valid email to which the analytics pipeline can send notifications.
  4. Choose Next.

  5. Acknowledge that the template may create AWS Identity and Access Management (IAM) resources.
  6. Choose Create stack.

This CloudFormation template creates the following resources in your AWS account:

  • An IAM role to provide a trust relationship between Kinesis and DynamoDB to replicate data from DynamoDB to the data stream
  • Two data streams:
    • An input stream to replicate data from DynamoDB
    • An output stream to store aggregated data from the Data Analytics for Flink app
  • A Lambda function
  • An SNS topic to send email notifications about high wind speeds
  7. When the stack is ready, on the Outputs tab, note the values of both data streams.

Check your email and confirm your subscription to receive notifications. Make sure to check your junk folder if you don’t see the email in your inbox.

Now you can use Kinesis Data Streams for DynamoDB, which allows you to have your data in both DynamoDB and Kinesis without having to use Lambda or write custom code.

Enabling Kinesis streaming for DynamoDB

AWS recently launched Kinesis Data Streams for DynamoDB so that you can send data from DynamoDB to Kinesis Data Streams. You can use the AWS Command Line Interface (AWS CLI) or the AWS Management Console to enable this feature.
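
If you prefer to enable it programmatically instead, the following sketch uses the AWS SDK for Java v2 with placeholder table and stream names; it assumes the SDK version you use includes the EnableKinesisStreamingDestination operation.

import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.EnableKinesisStreamingDestinationRequest;

public class EnableStreamingSketch {
    public static void main(String[] args) {
        DynamoDbClient dynamoDb = DynamoDbClient.create();

        // Placeholder table name and stream ARN; use the values from your own account.
        dynamoDb.enableKinesisStreamingDestination(EnableKinesisStreamingDestinationRequest.builder()
                .tableName("windspeed-example")
                .streamArn("arn:aws:kinesis:us-west-2:123456789012:stream/kds-ddb-blog-InputKinesisStream")
                .build());
    }
}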

To enable this feature from the console, complete the following steps:

  1. In the DynamoDB console, choose the table that you created earlier (it begins with the prefix windspeed-).
  2. On the Overview tab, choose Manage streaming to Kinesis.

  3. Choose your input stream.
  4. Choose Enable.
  5. Choose Close.

Make sure that Stream enabled is set to Yes.

Building the Data Analytics for Flink app for real-time data queries

As part of the CloudFormation stack, a new Data Analytics for Flink application is deployed in the configured AWS Region. When the stack is up and running, you should be able to see this application in the Kinesis Data Analytics console. Choose Run to start the app.

When your app is running, you should see the following application graph.

Review the Properties section of the app, which shows you the input and output streams that the app is using.

Let’s look at the important code snippets of the Flink Java application in the next section, which explain how the Flink application reads data from a data stream, aggregates the data, and outputs it to another data stream.

Diving deep into the Flink Java application code

In the following code, createSourceFromStaticConfig provides all the wind turbine speed readings from the input stream in string format, which we pass to the WindTurbineInputMap map function. This function parses the string into the Tuple3 data type (for example, Tuple3<>(turbineID, speed, 1)). All Tuple3 messages are grouped by turbineID to further apply a one-minute tumbling window. The AverageReducer reduce function provides two things: the sum of all the speeds for the specific turbineID in the one-minute window, and a count of the messages for the specific turbineID in the one-minute window. The AverageMap map function takes the output of the AverageReducer reduce function and transforms it into Tuple2 (for example, Tuple2<>(turbineID, averageSpeed)). Then only the turbineIDs with an average speed greater than 60 are kept and mapped to a JSON-formatted message, which we send to the output stream by using the createSinkFromStaticConfig sink function.

final StreamExecutionEnvironment env =
   StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> input = createSourceFromStaticConfig(env);

input.map(new WindTurbineInputMap())
   .filter(v -> v.f2 > 0)
   .keyBy(0)
      .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
   .reduce(new AverageReducer())
   .map(new AverageMap())
   .filter(v -> v.f1 > 60)
   .map(v -> "{ \"turbineID\": \"" + v.f0 + "\", \"avgSpeed\": "+ v.f1 +" }")
   .addSink(createSinkFromStaticConfig());

env.execute("Wind Turbine Data Aggregator");

The following code demonstrates how the createSourceFromStaticConfig and createSinkFromStaticConfig functions read the input and output stream names from the properties of the Data Analytics for Flink application and establish the source and sink of the streams.

private static DataStream<String> createSourceFromStaticConfig(
   StreamExecutionEnvironment env) throws IOException {
   Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
   Properties inputProperties = new Properties();
   inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, (String) applicationProperties.get("WindTurbineEnvironment").get("region"));
   inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");

   return env.addSource(new FlinkKinesisConsumer<>((String) applicationProperties.get("WindTurbineEnvironment").get("inputStreamName"),
      new SimpleStringSchema(), inputProperties));
}

private static FlinkKinesisProducer<String> createSinkFromStaticConfig() throws IOException {
   Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
   Properties outputProperties = new Properties();
   outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, (String) applicationProperties.get("WindTurbineEnvironment").get("region"));

   FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new
      SimpleStringSchema(), outputProperties);
   sink.setDefaultStream((String) applicationProperties.get("WindTurbineEnvironment").get("outputStreamName"));
   sink.setDefaultPartition("0");
   return sink;
}

In the following code, the WindTurbineInputMap map function parses Tuple3 out of the string message. Additionally, the AverageMap map and AverageReducer reduce functions process messages to accumulate and transform data.

public static class WindTurbineInputMap implements MapFunction<String, Tuple3<String, Integer, Integer>> {
   @Override
   public Tuple3<String, Integer, Integer> map(String value) throws Exception {
      String eventName = JsonPath.read(value, "$.eventName");
      if(eventName.equals("REMOVE")) {
         return new Tuple3<>("", 0, 0);
      }
      String turbineID = JsonPath.read(value, "$.dynamodb.NewImage.deviceID.S");
      Integer speed = Integer.parseInt(JsonPath.read(value, "$.dynamodb.NewImage.value.N"));
      return new Tuple3<>(turbineID, speed, 1);
   }
}

public static class AverageMap implements MapFunction<Tuple3<String, Integer, Integer>, Tuple2<String, Integer>> {
   @Override
   public Tuple2<String, Integer> map(Tuple3<String, Integer, Integer> value) throws Exception {
      return new Tuple2<>(value.f0, (value.f1 / value.f2));
   }
}

public static class AverageReducer implements ReduceFunction<Tuple3<String, Integer, Integer>> {
   @Override
   public Tuple3<String, Integer, Integer> reduce(Tuple3<String, Integer, Integer> value1, Tuple3<String, Integer, Integer> value2) {
      return new Tuple3<>(value1.f0, value1.f1 + value2.f1, value1.f2 + 1);
   }
}

Receiving email notifications of high wind speed

The following screenshot shows an example of the notification email you will receive about high wind speeds.

To test the feature, in this section you generate high wind speed data from the simulator, which is stored in DynamoDB, and get an email notification when the average wind speed is greater than 60 mph for a one-minute period. You’ll observe wind data flowing through the data stream and Data Analytics for Flink.

To test this feature:

  1. Generate wind speed data in the simulator and confirm that it’s stored in DynamoDB.
  2. In the Kinesis Data Streams console, choose the input data stream, kds-ddb-blog-InputKinesisStream.
  3. On the Monitoring tab of the stream, you can observe the Get records – sum (Count) metrics, which show multiple records captured by the data stream automatically.
  4. In the Kinesis Data Analytics console, choose the Data Analytics for Flink application, kds-ddb-blog-windTurbineAggregator.
  5. On the Monitoring tab, you can see the Last Checkpoint metrics, which show multiple records captured by the Data Analytics for Flink app automatically.
  6. In the Kinesis Data Streams console, choose the output stream, kds-ddb-blog-OutputKinesisStream.
  7. On the Monitoring tab, you can see the Get records – sum (Count) metrics, which show multiple records output by the app.
  8. Finally, check your email for a notification.

If you don’t see a notification, change the data simulator value range between a minimum of 50 mph and maximum of 90 mph and wait a few minutes.

Conclusion

As you have learned in this post, you can build an end-to-end serverless analytics pipeline to get real-time insights from DynamoDB by using Kinesis Data Streams—all without writing any complex code. This allows your team to focus on solving business problems by getting useful insights immediately. IoT and application development have a variety of use cases for moving data quickly through an analytics pipeline, and you can make this happen by enabling Kinesis Data Streams for DynamoDB.

If this blog post helps you or inspires you to solve a problem, we would love to hear about it! The code for this solution is available in the GitHub repository for you to use and extend. Contributions are always welcome!


About the Authors

Saurabh Shrivastava is a solutions architect leader and analytics/machine learning specialist working with global systems integrators. He works with AWS partners and customers to provide them with architectural guidance for building scalable architecture in hybrid and AWS environments. He enjoys spending time with his family outdoors and traveling to new destinations to discover new cultures.

Sameer Goel is a solutions architect in Seattle who drives customers’ success by building prototypes on cutting-edge initiatives. Prior to joining AWS, Sameer graduated with a Master’s degree with a Data Science concentration from NEU Boston. He enjoys building and experimenting with creative projects and applications.

Pratik Patel is a senior technical account manager and streaming analytics specialist. He works with AWS customers and provides ongoing support and technical guidance to help plan and build solutions by using best practices, and proactively helps keep customers’ AWS environments operationally healthy.