Tag Archives: Amazon Kinesis Analytics

AWS CloudFormation Supports Amazon Kinesis Analytics Applications

Post Syndicated from Ryan Nienhuis original https://aws.amazon.com/blogs/big-data/aws-cloudformation-supports-amazon-kinesis-analytics-applications/

You can now provision and manage resources for Amazon Kinesis Analytics applications using AWS CloudFormation.  Kinesis Analytics is the easiest way to process streaming data in real time with standard SQL, without having to learn new programming languages or processing frameworks. Kinesis Analytics enables you to query streaming data or build entire streaming applications using SQL. Using the service, you gain actionable insights and can respond to your business and customer needs promptly.

Customers can create CloudFormation templates that easily create or update Kinesis Analytics applications. Typically, a template is used as a way to manage code across different environments, or to prototype a new streaming data solution quickly.
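As a minimal illustration of what such a template can contain, the following Python sketch creates a stack with a single Kinesis Analytics application resource. It is a sketch, not a production template: the application name, stream ARN, role ARN, SQL code, and input schema are placeholders you would replace with your own.

import json

import boto3

# Minimal CloudFormation template with one Kinesis Analytics application.
# All names and ARNs below are illustrative placeholders.
template = {
    "AWSTemplateFormatVersion": "2010-09-09",
    "Resources": {
        "AnalyticsApp": {
            "Type": "AWS::KinesisAnalytics::Application",
            "Properties": {
                "ApplicationName": "demo-analytics-app",
                "ApplicationCode": "CREATE OR REPLACE STREAM ...",  # your SQL
                "Inputs": [{
                    "NamePrefix": "SOURCE_SQL_STREAM",
                    "KinesisStreamsInput": {
                        "ResourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/RawStreamData",
                        "RoleARN": "arn:aws:iam::123456789012:role/analytics-input-role"
                    },
                    "InputSchema": {
                        "RecordFormat": {
                            "RecordFormatType": "JSON",
                            "MappingParameters": {
                                "JSONMappingParameters": {"RecordRowPath": "$"}
                            }
                        },
                        "RecordColumns": [
                            {"Name": "iotName", "SqlType": "VARCHAR(40)", "Mapping": "$.iotName"},
                            {"Name": "iotValue", "SqlType": "INTEGER", "Mapping": "$.iotValue"}
                        ]
                    }
                }]
            }
        }
    }
}

cfn = boto3.client("cloudformation")
cfn.create_stack(StackName="kinesis-analytics-demo", TemplateBody=json.dumps(template))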

We have created two sample templates using past AWS Big Data Blog posts that referenced Kinesis Analytics.

For more information about the new feature, see the AWS CloudFormation User Guide.

 

Perform Near Real-time Analytics on Streaming Data with Amazon Kinesis and Amazon Elasticsearch Service

Post Syndicated from Tristan Li original https://aws.amazon.com/blogs/big-data/perform-near-real-time-analytics-on-streaming-data-with-amazon-kinesis-and-amazon-elasticsearch-service/

Nowadays, streaming data is seen and used everywhere—from social networks, to mobile and web applications, IoT devices, instrumentation in data centers, and many other sources. As the speed and volume of this type of data increases, the need to perform data analysis in real time with machine learning algorithms and extract a deeper understanding from the data becomes ever more important. For example, you might want a continuous monitoring system to detect sentiment changes in a social media feed so that you can react to the sentiment in near real time.

In this post, we use Amazon Kinesis Streams to collect and store streaming data. We then use Amazon Kinesis Analytics to process and analyze the streaming data continuously. Specifically, we use the Kinesis Analytics built-in RANDOM_CUT_FOREST function, a machine learning algorithm, to detect anomalies in the streaming data. Finally, we use Amazon Kinesis Firehose to export the anomalies data to Amazon Elasticsearch Service (Amazon ES). We then build a simple dashboard in the open source tool Kibana to visualize the result.

Solution overview

The following diagram depicts a high-level overview of this solution.

Amazon Kinesis Streams

You can use Amazon Kinesis Streams to build your own streaming application. This application can process and analyze streaming data by continuously capturing and storing terabytes of data per hour from hundreds of thousands of sources.

Amazon Kinesis Analytics

Kinesis Analytics provides an easy and familiar standard SQL language to analyze streaming data in real time. One of its most powerful features is that there are no new languages, processing frameworks, or complex machine learning algorithms that you need to learn.

Amazon Kinesis Firehose

Kinesis Firehose is the easiest way to load streaming data into AWS. It can capture, transform, and load streaming data into Amazon S3, Amazon Redshift, and Amazon Elasticsearch Service.

Amazon Elasticsearch Service

Amazon ES is a fully managed service that makes it easy to deploy, operate, and scale Elasticsearch for log analytics, full text search, application monitoring, and more.

Solution summary

The following is a quick walkthrough of the solution that’s presented in the diagram:

  1. IoT sensors send streaming data into Kinesis Streams. In this post, you use a Python script to simulate an IoT temperature sensor device that sends the streaming data.
  2. By using the built-in RANDOM_CUT_FOREST function in Kinesis Analytics, you can detect anomalies in real time with the sensor data that is stored in Kinesis Streams. RANDOM_CUT_FOREST is also an appropriate algorithm for many other kinds of anomaly-detection use cases—for example, the media sentiment example mentioned earlier in this post.
  3. The processed anomaly data is then loaded into the Kinesis Firehose delivery stream.
  4. By using the built-in integration that Kinesis Firehose has with Amazon ES, you can easily export the processed anomaly data into the service and visualize it with Kibana.

Implementation steps

The following sections walk through the implementation steps in detail.

Creating the delivery stream

  1. Open the Amazon Kinesis Streams console.
  2. Create a new Kinesis stream. Give it a name that indicates it’s for raw incoming stream data—for example, RawStreamData. For Number of shards, type 1.
  3. The Python code provided below simulates a streaming application, such as an IoT device, and writes random data and anomalies to a Kinesis stream. The code generates two temperature ranges: the hypothetical sensor’s normal operating temperature range (10–20) and the anomaly temperature range (100–120). Make sure to change the stream name in the two put_record calls and the Region in the connect_to_region call to match your configuration. Alternatively, you can download the Amazon Kinesis Data Generator from this repository and use it to generate the data.
    import json
    import random

    from boto import kinesis

    # Connect to the Region where your Kinesis stream was created.
    kinesis = kinesis.connect_to_region("us-east-1")

    def getData(iotName, lowVal, highVal):
        """Return a simulated sensor reading as a dict."""
        data = {}
        data["iotName"] = iotName
        data["iotValue"] = random.randint(lowVal, highVal)
        return data

    while True:
        rnd = random.random()
        if rnd < 0.01:
            # About 1% of records fall in the anomaly range (100-120).
            data = json.dumps(getData("DemoSensor", 100, 120))
            kinesis.put_record("RawStreamData", data, "DemoSensor")
            print('***************************** anomaly ************************* ' + data)
        else:
            # Normal operating range (10-20).
            data = json.dumps(getData("DemoSensor", 10, 20))
            kinesis.put_record("RawStreamData", data, "DemoSensor")
            print(data)

  4. Open the Amazon Elasticsearch Service console and create a new domain.
    1. Give the domain a unique name. In the Configure cluster screen, use the default settings.
    2. In the Set up access policy screen, in the Set the domain access policy list, choose Allow access to the domain from specific IP(s).
    3. Enter the public IP address of your computer.
      Note: If you’re working behind a proxy or firewall, see the “Use a proxy to simplify request signing” section in this AWS Database blog post to learn how to work with a proxy. For additional information about securing access to your Amazon ES domain, see How to Control Access to Your Amazon Elasticsearch Domain in the AWS Security Blog.
  5. After the Amazon ES domain is up and running, you can set up and configure Kinesis Firehose to export results to Amazon ES:
    1. Open the Amazon Kinesis Firehose console and choose Create Delivery Stream.
    2. In the Destination dropdown list, choose Amazon Elasticsearch Service.
    3. Type a stream name, and choose the Amazon ES domain that you created in Step 4.
    4. Provide an index name and ES type. In the S3 bucket dropdown list, choose Create New S3 bucket. Choose Next.
    5. In the configuration, change the Elasticsearch Buffer size to 1 MB and the Buffer interval to 60s. Use the default settings for all other fields. This shortens the time for the data to reach the ES cluster.
    6. Under IAM Role, choose Create/Update existing IAM role.
      The best practice is to create a new role each time. Otherwise, the console keeps adding policy documents to the same role, and eventually the size of the attached policies causes IAM to reject the role. The failure is non-obvious: the console essentially stops functioning.
    7. Choose Next to move to the Review page.
  6. Review the configuration, and then choose Create Delivery Stream.
  7. Run the Python file for 1–2 minutes, and then press Ctrl+C to stop the execution. This loads some data into the stream for you to visualize in the next step.

Analyzing the data

Now it’s time to analyze the IoT streaming data using Amazon Kinesis Analytics.

  1. Open the Amazon Kinesis Analytics console and create a new application. Give the application a name, and then choose Create Application.
  2. On the next screen, choose Connect to a source. Choose the raw incoming data stream that you created earlier. (Note the stream name Source_SQL_STREAM_001 because you will need it later.)
  3. Use the default settings for everything else. When the schema discovery process is complete, it displays a success message with the formatted stream sample in a table as shown in the following screenshot. Review the data, and then choose Save and continue.
  4. Next, choose Go to SQL editor. When prompted, choose Yes, start application.
  5. Copy the following SQL code and paste it into the SQL editor window.
    CREATE OR REPLACE STREAM "TEMP_STREAM" (
       "iotName"        varchar (40),
       "iotValue"   integer,
       "ANOMALY_SCORE"  DOUBLE);
    -- Creates an output stream and defines a schema
    CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
       "iotName"       varchar(40),
       "iotValue"       integer,
       "ANOMALY_SCORE"  DOUBLE,
       "created" TimeStamp);
     
    -- Compute an anomaly score for each record in the source stream
    -- using Random Cut Forest
    CREATE OR REPLACE PUMP "STREAM_PUMP_1" AS INSERT INTO "TEMP_STREAM"
    SELECT STREAM "iotName", "iotValue", ANOMALY_SCORE FROM
      TABLE(RANDOM_CUT_FOREST(
        CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001")
      )
    );
    
    -- Sort records by descending anomaly score, insert into output stream
    CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT STREAM "iotName", "iotValue", ANOMALY_SCORE, ROWTIME FROM "TEMP_STREAM"
    ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;


  6. Choose Save and run SQL.
    As the application runs, it displays the results as stream data arrives. If you don’t see any data coming in, run the Python script again to generate some fresh data. When there is data, it appears in a grid as shown in the following screenshot.
    Note that you are selecting data from the source stream name Source_SQL_STREAM_001 that you created previously. Also note the ANOMALY_SCORE column. This is the value that the RANDOM_CUT_FOREST function calculates based on the temperature ranges provided by the Python script. Higher (anomaly) temperature ranges have a higher score.
    Looking at the SQL code, note that the first two blocks of code create two new streams to store temporary data and the final result. The third block of code (the STREAM_PUMP_1 pump) analyzes the raw source data using the RANDOM_CUT_FOREST function, calculates an anomaly score (ANOMALY_SCORE), and inserts it into the TEMP_STREAM stream. The final code block loads the result stored in TEMP_STREAM into DESTINATION_SQL_STREAM.
  7. Choose Exit (done editing) next to the Save and run SQL button to return to the application configuration page.

Load processed data into the Kinesis Firehose delivery stream

Now, you can export the result from DESTINATION_SQL_STREAM into the Amazon Kinesis Firehose stream that you created previously.

  1. On the application configuration page, choose Connect to a destination.
  2. Choose the stream name that you created earlier, and use the default settings for everything else. Then choose Save and Continue.
  3. On the application configuration page, choose Exit to Kinesis Analytics applications to return to the Amazon Kinesis Analytics console.
  4. Run the Python script again for 4–5 minutes to generate enough data to flow through Amazon Kinesis Streams, Kinesis Analytics, Kinesis Firehose, and finally into the Amazon ES domain.
  5. Open the Kinesis Firehose console, choose the stream, and then choose the Monitoring tab.
  6. As the processed data flows into Kinesis Firehose and Amazon ES, the metrics appear on the Delivery Stream metrics page. Keep in mind that the metrics page takes a few minutes to refresh with the latest data.
  7. Open the Amazon Elasticsearch Service dashboard in the AWS Management Console. The count in the Searchable documents column increases as shown in the following screenshot. In addition, the domain shows a cluster health of Yellow because, by default, Elasticsearch needs a second instance to hold replica copies of the index. To fix this, deploy two instances instead of one.

Visualize the data using Kibana

Now it’s time to launch Kibana and visualize the data.

  1. Use the ES domain link to go to the cluster detail page, and then choose the Kibana link as shown in the following screenshot.

    If you’re working behind a proxy or firewall, see the “Use a proxy to simplify request signing” section in this blog post to learn how to work with a proxy.
  2. In the Kibana dashboard, choose the Discover tab to perform a query.
  3. You can also visualize the data using the different types of charts offered by Kibana. For example, by going to the Visualize tab, you can quickly create a split bar chart that aggregates by ANOMALY_SCORE per minute.


Conclusion

In this post, you learned how to use Amazon Kinesis to collect, process, and analyze real-time streaming data, and then export the results to Amazon ES for analysis and visualization with Kibana. If you have comments about this post, add them to the “Comments” section below. If you have questions or issues with implementing this solution, please open a new thread on the Amazon Kinesis or Amazon ES discussion forums.


Next Steps

Take your skills to the next level. Learn real-time clickstream anomaly detection with Amazon Kinesis Analytics.

 


About the Author

Tristan Li is a Solutions Architect with Amazon Web Services. He works with enterprise customers in the US, helping them adopt cloud technology to build scalable and secure solutions on AWS.


Under the Hood of Server-Side Encryption for Amazon Kinesis Streams

Post Syndicated from Damian Wylie original https://aws.amazon.com/blogs/big-data/under-the-hood-of-server-side-encryption-for-amazon-kinesis-streams/

Customers are using Amazon Kinesis Streams to ingest, process, and deliver data in real time from millions of devices or applications. Use cases for Kinesis Streams vary, but a few common ones include IoT data ingestion and analytics, log processing, clickstream analytics, and enterprise data bus architectures.

Within milliseconds of data arrival, applications (KCL, Apache Spark, AWS Lambda, Amazon Kinesis Analytics) attached to a stream are continuously mining value or delivering data to downstream destinations. Customers are then scaling their streams elastically to match demand. They pay incrementally for the resources that they need, while taking advantage of a fully managed, serverless streaming data service that allows them to focus on adding value closer to their customers.

These benefits are great; however, AWS learned that many customers could not take advantage of Kinesis Streams unless their data-at-rest within a stream was encrypted. Many customers did not want to manage encryption on their own, so they asked for a fully managed, automatic, server-side encryption mechanism leveraging centralized AWS Key Management Service (AWS KMS) customer master keys (CMK).

Motivated by this feedback, AWS added another fully managed, low cost aspect to Kinesis Streams by delivering server-side encryption via KMS managed encryption keys (SSE-KMS) in the following regions:

  • US East (N. Virginia)
  • US West (Oregon)
  • US West (N. California)
  • EU (Ireland)
  • Asia Pacific (Singapore)
  • Asia Pacific (Tokyo)

In this post, I cover the mechanics of the Kinesis Streams server-side encryption feature. I also share a few best practices and considerations so that you can get started quickly.

Understanding the mechanics

The following section walks you through how Kinesis Streams uses CMKs to encrypt a message in the PutRecord or PutRecords path before it is propagated to the Kinesis Streams storage layer, and then decrypt it in the GetRecords path after it has been retrieved from the storage layer.

When server-side encryption is enabled—which takes just a few clicks in the console—the partition key and payload for every incoming record is encrypted automatically as it’s flowing into Kinesis Streams, using the selected CMK. When data is at rest within a stream, it’s encrypted.

When records are retrieved through a GetRecords request from the encrypted stream, they are decrypted automatically as they are flowing out of the service. That means your Kinesis Streams producers and consumers do not need to be aware of encryption. You have a fully managed data encryption feature at your fingertips, which can be enabled within seconds.

AWS also makes it easy to audit the application of server-side encryption. You can use the AWS Management Console for instant stream-level verification; the responses from PutRecord, PutRecords, and GetRecords calls; or AWS CloudTrail.

Calling PutRecord or PutRecords

When server-side encryption is enabled for a particular stream, Kinesis Streams and KMS perform the following actions when your applications call PutRecord or PutRecords on a stream with server-side encryption enabled. The Amazon Kinesis Producer Library (KPL) uses PutRecords.

 

  1. Data is sent from a customer’s producer (client) to a Kinesis stream using TLS via HTTPS. Data in transit to a stream is encrypted by default.
  2. After data is received, it is momentarily stored in RAM within a front-end proxy layer.
  3. Kinesis Streams authenticates the producer, then impersonates the producer to request input keying material from KMS.
  4. KMS creates key material, encrypts it by using CMK, and sends both the plaintext and encrypted key material to the service, encrypted with TLS.
  5. The client uses the plaintext key material to derive data encryption keys (data keys) that are unique per-record.
  6. The client encrypts the payload and partition key using the data key in RAM within the front-end proxy layer and removes the plaintext data key from memory.
  7. The client appends the encrypted key material to the encrypted data.
  8. The plaintext key material is securely cached in memory within the front-end layer for reuse, until it expires after 5 minutes.
  9. The client delivers the encrypted message to a back-end store, where it is stored at rest and is fetchable by an authorized consumer through a GetRecords call. The Amazon Kinesis Client Library (KCL) calls GetRecords to retrieve records from a stream.

Calling GetRecords

Kinesis Streams and KMS perform the following actions when your applications call GetRecords on a server-side encrypted stream.

 

  1. When a GetRecords call is made, the front-end proxy layer retrieves the encrypted record from its back-end store.
  2. The consumer (client) makes a request to KMS using a token generated by the customer’s request. KMS authorizes it.
  3. The client requests that KMS decrypt the encrypted key material.
  4. KMS decrypts the encrypted key material and sends the plaintext key material to the client.
  5. Kinesis Streams derives the per-record data keys from the decrypted key material.
  6. If the calling application is authorized, the client decrypts the payload and removes the plaintext data key from memory.
  7. The client delivers the payload over TLS and HTTPS to the consumer that requested the records. Data in transit to a consumer is encrypted by default.

Verifying server-side encryption

Auditors or administrators often ask for proof that server-side encryption was or is enabled. Here are a few ways to do this.

To check if encryption is enabled now for your streams:

  • Use the AWS Management Console or the DescribeStream API operation. You can also see what CMK is being used for encryption.
  • See encryption in action by looking at responses from PutRecord, PutRecords, or GetRecords calls. When encryption is enabled, the EncryptionType parameter is set to “KMS”. If encryption is not enabled, EncryptionType is not included in the response.

Sample PutRecord response

{
    "SequenceNumber": "49573959617140871741560010162505906306417380215064887298",
    "ShardId": "shardId-000000000000",
    "EncryptionType": "KMS"
}

Sample GetRecords response

{
    "Records": [
        {
            "Data": "aGVsbG8gd29ybGQ=", 
            "PartitionKey": "test", 
            "ApproximateArrivalTimestamp": 1498292565.825, 
            "EncryptionType": "KMS", 
            "SequenceNumber": "495735762417140871741560010162505906306417380215064887298"
        }, 
        {
            "Data": "ZnJvZG8gbGl2ZXMK", 
            "PartitionKey": "3d0d9301-3c30-4c48-a9a8-e485b2982b28", 
            "ApproximateArrivalTimestamp": 1498292801.747, 
            "EncryptionType": "KMS", 
            "SequenceNumber": "49573959617140871741560010162507115232237011062036103170"
        }
    ], 
    "NextShardIterator": "AAAAAAAAAAEvFypHZDx/4bJVAS34puwdiNcwssKqbh/XhRK7HSYRq3RS+YXJnVKJ8j0gQUt94bONdqQYHk9X9JHgefMUDKzDzndy5WbZWO4CS3hRdMdrbmJ/9KoR4lOfZvqTLt6JWQjDqXv0IaKs06/LHYcEA3oPcyQLOTJHdJl2EzplCTZnn/U295ovxvqF9g9DY8y2nVoMkdFLmdcEMVXjhCDKiRIt", 
    "MillisBehindLatest": 0
}

To check if encryption was enabled, use CloudTrail, which logs the StartStreamEncryption() and StopStreamEncryption() API calls made against a particular stream.
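For a quick programmatic spot check, the same stream-level information shown in the console is returned by the DescribeStream API. Here is a minimal sketch using Python and boto3, assuming a stream named RawStreamData:

import boto3

kinesis = boto3.client("kinesis")

desc = kinesis.describe_stream(StreamName="RawStreamData")["StreamDescription"]
print(desc.get("EncryptionType", "NONE"))  # "KMS" when SSE is enabled
print(desc.get("KeyId"))                   # the CMK in use, if any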

Getting started

It’s very easy to enable, disable, or modify server-side encryption for a particular stream.

  1. In the Kinesis Streams console, select a stream and choose Details.
  2. Select a CMK and select Enabled.
  3. Choose Save.

You can enable encryption only for a live stream, not upon stream creation. Follow the same process to disable encryption for a stream. To use a different CMK, select it and choose Save.

Each of these tasks can also be accomplished using the StartStreamEncryption and StopStreamEncryption API operations.
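For reference, a sketch of the equivalent calls with Python and boto3 follows; the stream name is a placeholder, and alias/aws/kinesis is the default AWS managed key:

import boto3

kinesis = boto3.client("kinesis")

# Enable server-side encryption on a live stream.
kinesis.start_stream_encryption(
    StreamName="RawStreamData",
    EncryptionType="KMS",
    KeyId="alias/aws/kinesis",  # or the ID/ARN of a custom CMK
)

# Disable it again with the same parameters.
kinesis.stop_stream_encryption(
    StreamName="RawStreamData",
    EncryptionType="KMS",
    KeyId="alias/aws/kinesis",
)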

Considerations

There are a few considerations you should be aware of when using server-side encryption for Kinesis Streams:

  • Permissions
  • Costs
  • Performance

Permissions

One benefit of using the “(Default) aws/kinesis” AWS managed key is that every producer and consumer with permissions to call PutRecord, PutRecords, or GetRecords inherits the right permissions over the “(Default) aws/kinesis” key automatically.

However, this is not necessarily the case for a custom CMK. Kinesis Streams producers and consumers do not need to be aware of encryption; however, if you enable encryption using a custom master key and a producer or consumer doesn’t have IAM permissions to use it, PutRecord, PutRecords, or GetRecords requests fail.

This is a great security feature. On the other hand, it can effectively lead to data loss if you inadvertently apply a custom master key that restricts producers and consumers from interacting with the Kinesis stream. Take precautions when applying a custom master key. For more information about the minimum IAM permissions required for producers and consumers interacting with an encrypted stream, see Using Server-Side Encryption.

Costs

When you apply server-side encryption, you are subject to KMS API usage and key costs. Unlike custom KMS master keys, the “(Default) aws/kinesis” CMK is offered free of charge. However, you still need to pay for the API usage costs that Kinesis Streams incurs on your behalf.

API usage costs apply for every CMK, including custom ones. Kinesis Streams calls KMS approximately every 5 minutes when it is rotating the data key. In a 30-day month, the total cost of KMS API calls initiated by a Kinesis stream should be less than a few dollars.
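As a back-of-the-envelope check, assume one KMS call every 5 minutes and KMS pricing of $0.03 per 10,000 requests (verify current pricing for your Region):

calls_per_month = 30 * 24 * 60 / 5       # 8,640 KMS calls in a 30-day month
cost_per_month = calls_per_month / 10000 * 0.03
print(round(cost_per_month, 3))          # about $0.026 per stream per month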

Performance

During testing, AWS discovered a slight increase (typically 0.2 millisecond or less per record) in put and get record latencies due to the additional overhead of encryption.

If you have questions or suggestions, please comment below.

Visualize and Monitor Amazon EC2 Events with Amazon CloudWatch Events and Amazon Kinesis Firehose

Post Syndicated from Karan Desai original https://aws.amazon.com/blogs/big-data/visualize-and-monitor-amazon-ec2-events-with-amazon-cloudwatch-events-and-amazon-kinesis-firehose/

Monitoring your AWS environment is important for security, performance, and cost control purposes. For example, by monitoring and analyzing API calls made to your Amazon EC2 instances, you can trace security incidents and gain insights into administrative behaviors and access patterns. The kinds of events you might monitor include console logins, Amazon EBS snapshot creation/deletion/modification, VPC creation/deletion/modification, and instance reboots.

In this post, I show you how to build a near real-time API monitoring solution for EC2 events using Amazon CloudWatch Events and Amazon Kinesis Firehose. Please be sure to have Amazon CloudTrail enabled in your account.

  • CloudWatch Events offers a near real-time stream of system events that describe changes in AWS resources. CloudWatch Events now supports Kinesis Firehose as a target.
  • Kinesis Firehose is a fully managed service for continuously capturing, transforming, and delivering data in minutes to storage and analytics destinations such as Amazon S3, Amazon Kinesis Analytics, Amazon Redshift, and Amazon Elasticsearch Service.

Walkthrough

For this walkthrough, you create a CloudWatch event rule that matches specific EC2 events such as:

  • Starting, stopping, and terminating an instance
  • Creating and deleting VPC route tables
  • Creating and deleting a security group
  • Creating, deleting, and modifying instance volumes and snapshots

Your CloudWatch event target is a Kinesis Firehose delivery stream that delivers this data to an Elasticsearch cluster, where you set up Kibana for visualization. Using this solution, you can easily load and visualize EC2 events in minutes without setting up complicated data pipelines.

Set up the Elasticsearch cluster

Create the Amazon ES domain in the Amazon ES console, or by using the create-elasticsearch-domain command in the AWS CLI.

This example uses the following configuration:

  • Domain Name: esLogSearch
  • Elasticsearch Version: 1
  • Instance Count: 2
  • Instance type: elasticsearch
  • Enable dedicated master: true
  • Enable zone awareness: true
  • Restrict Amazon ES to an IP-based access policy

Other settings are left as the defaults.

Create a Kinesis Firehose delivery stream

In the Kinesis Firehose console, create a new delivery stream with Amazon ES as the destination. For detailed steps, see Create a Kinesis Firehose Delivery Stream to Amazon Elasticsearch Service.

Set up CloudWatch Events

Create a rule, and configure the event source and target. You can choose to configure multiple event sources with several AWS resources, along with options to specify specific or multiple event types.

In the CloudWatch console, choose Events.

For Service Name, choose EC2.

In Event Pattern Preview, choose Edit and copy the pattern below. For this walkthrough, I selected events that are specific to the EC2 API, but you can modify it to include events for any of your AWS resources.

 

{
    "source": [
        "aws.ec2"
    ],
    "detail-type": [
        "AWS API Call via CloudTrail"
    ],
    "detail": {
        "eventSource": [
            "ec2.amazonaws.com"
        ],
        "eventName": [
            "RunInstances",
            "StopInstances",
            "StartInstances",
            "CreateFlowLogs",
            "CreateImage",
            "CreateNatGateway",
            "CreateVpc",
            "DeleteKeyPair",
            "DeleteNatGateway",
            "DeleteRoute",
            "DeleteRouteTable",
            "CreateSnapshot",
            "DeleteSnapshot",
            "DeleteVpc",
            "DeleteVpcEndpoints",
            "DeleteSecurityGroup",
            "ModifyVolume",
            "ModifyVpcEndpoint",
            "TerminateInstances"
        ]
    }
}

The following screenshot shows what your event looks like in the console.

Next, choose Add target and select the delivery stream that you just created.
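If you prefer to script this setup, the rule and target can also be created with a few API calls. The following is a sketch using Python and boto3; the event pattern is trimmed to two event names, and the delivery stream ARN and the IAM role that CloudWatch Events assumes to write to Firehose are placeholders:

import json

import boto3

events = boto3.client("events")

# Match EC2 API calls recorded by CloudTrail (trimmed to a few event names).
events.put_rule(
    Name="ec2-api-calls",
    EventPattern=json.dumps({
        "source": ["aws.ec2"],
        "detail-type": ["AWS API Call via CloudTrail"],
        "detail": {"eventSource": ["ec2.amazonaws.com"],
                   "eventName": ["RunInstances", "TerminateInstances"]},
    }),
    State="ENABLED",
)

# Point the rule at the Firehose delivery stream; both ARNs are placeholders.
events.put_targets(
    Rule="ec2-api-calls",
    Targets=[{
        "Id": "firehose-delivery",
        "Arn": "arn:aws:firehose:us-east-1:123456789012:deliverystream/ec2-events",
        "RoleArn": "arn:aws:iam::123456789012:role/cwe-to-firehose-role",
    }],
)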

Set up Kibana on the Elasticsearch cluster

Amazon ES provides a default installation of Kibana with every Amazon ES domain. You can find the Kibana endpoint on your domain dashboard in the Amazon ES console. You can restrict access to the domain with an IP-based access policy.

In the Kibana console, for Index name or pattern, type log. This is the name of the Elasticsearch index.

For Time-field name, choose @time.

To view the events, choose Discover.

The following chart demonstrates the API operations and the number of times that they have been triggered in the past 12 hours.

Summary

In this post, you created a continuous, near real-time solution to monitor various EC2 events such as starting and shutting down instances, creating VPCs, etc. Likewise, you can build a continuous monitoring solution for all the API operations that are relevant to your daily AWS operations and resources.

With Kinesis Firehose as a new target for CloudWatch Events, you can retrieve, transform, and load system events to the storage and analytics destination of your choice in minutes, without setting up complicated data pipelines.

If you have any questions or suggestions, please comment below.


Additional Reading

Learn how to build a serverless architecture to analyze Amazon CloudFront access logs using AWS Lambda, Amazon Athena, and Amazon Kinesis Analytics


Build a Serverless Architecture to Analyze Amazon CloudFront Access Logs Using AWS Lambda, Amazon Athena, and Amazon Kinesis Analytics

Post Syndicated from Rajeev Srinivasan original https://aws.amazon.com/blogs/big-data/build-a-serverless-architecture-to-analyze-amazon-cloudfront-access-logs-using-aws-lambda-amazon-athena-and-amazon-kinesis-analytics/

Nowadays, it’s common for a web server to be fronted by a global content delivery service, like Amazon CloudFront. This type of front end accelerates delivery of websites, APIs, media content, and other web assets to provide a better experience to users across the globe.

The insights gained by analyzing Amazon CloudFront access logs help improve website availability through bot detection and mitigation, optimize web content based on the devices and browsers used to view your webpages, reduce perceived latency by caching popular objects closer to their viewers, and so on. The result is a significant improvement in the overall perceived experience for the user.

This blog post provides a way to build a serverless architecture to generate some of these insights. To do so, we analyze Amazon CloudFront access logs both at rest and in transit through the stream. This serverless architecture uses Amazon Athena to analyze large volumes of CloudFront access logs (on the scale of terabytes per day), and Amazon Kinesis Analytics for streaming analysis.

The analytic queries in this blog post focus on three common use cases:

  1. Detection of common bots using the user agent string
  2. Calculation of current bandwidth usage per Amazon CloudFront distribution per edge location
  3. Determination of the current top 50 viewers

However, you can easily extend the architecture described here to power dashboards for monitoring and reporting, and to trigger alarms based on deeper insights gained by processing and analyzing the logs. Some examples are dashboards for cache performance, usage and viewer patterns, and so on.

The following diagram shows this architecture.

Prerequisites

Before you set up this architecture, install the AWS Command Line Interface (AWS CLI) tool on your local machine, if you don’t have it already.

Setup summary

The following steps are involved in setting up the serverless architecture on the AWS platform:

  1. Create an Amazon S3 bucket for your Amazon CloudFront access logs to be delivered to and stored in.
  2. Create a second Amazon S3 bucket to receive processed logs and store the partitioned data for interactive analysis.
  3. Create an Amazon Kinesis Firehose delivery stream to batch, compress, and deliver the preprocessed logs for analysis.
  4. Create an AWS Lambda function to preprocess the logs for analysis.
  5. Configure Amazon S3 event notification on the CloudFront access logs bucket, which contains the raw logs, to trigger the Lambda preprocessing function.
  6. Create an Amazon DynamoDB table to look up partition details, such as partition specification and partition location.
  7. Create an Amazon Athena table for interactive analysis.
  8. Create a second AWS Lambda function to add new partitions to the Athena table based on the log delivered to the processed logs bucket.
  9. Configure Amazon S3 event notification on the processed logs bucket to trigger the Lambda partitioning function.
  10. Configure Amazon Kinesis Analytics application for analysis of the logs directly from the stream.

ETL and preprocessing

In this section, we parse the CloudFront access logs as they are delivered, which occurs multiple times in an hour. We filter out commented records and use the user agent string to decipher the browser name, the name of the operating system, and whether the request has been made by a bot. For more details on how to decipher the preceding information based on the user agent string, see the user-agents 1.1.0 package in the Python Package Index.

We use the Lambda preprocessing function to perform these tasks on individual rows of the access log. On successful completion, the rows are pushed to an Amazon Kinesis Firehose delivery stream to be persistently stored in an Amazon S3 bucket, the processed logs bucket.
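To make the per-row work concrete, here is a simplified Python sketch of such a transformation (the actual Lambda deployment package is provided later in this post, and the CloudFrontLogsToS3 delivery stream is created in the next step). The user agent field index assumes the standard CloudFront access log column order, and the function names are illustrative:

import boto3
from user_agents import parse  # the user-agents package mentioned above

firehose = boto3.client("firehose")

def enrich(line):
    """Drop comment lines; append browser family, OS family, and a bot flag."""
    if line.startswith("#"):
        return None
    fields = line.rstrip("\n").split("\t")
    ua = parse(fields[10])  # cs(User-Agent) column in CloudFront logs
    fields.extend([ua.browser.family, ua.os.family, str(ua.is_bot)])
    return "\t".join(fields) + "\n"

def push_rows(rows):
    """Send enriched rows to the CloudFrontLogsToS3 delivery stream."""
    records = [{"Data": r} for r in rows if r is not None]
    if records:
        firehose.put_record_batch(
            DeliveryStreamName="CloudFrontLogsToS3", Records=records)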

To create a Firehose delivery stream with a new or existing S3 bucket as the destination, follow the steps described in Create a Firehose Delivery Stream to Amazon S3 in the Kinesis Firehose documentation. Keep most of the default settings, but select an AWS Identity and Access Management (IAM) role that has write access to your S3 bucket and specify GZIP compression. Name the delivery stream CloudFrontLogsToS3.

Another prerequisite for this setup is to create an IAM role that provides the necessary permissions for our AWS Lambda function to get the data from S3, process it, and deliver it to the CloudFrontLogsToS3 delivery stream.

Let’s use the AWS CLI to create the IAM role using the following three steps:

  1. Create the IAM policy (lambda-exec-policy) for the Lambda execution role to use.
  2. Create the Lambda execution role (lambda-cflogs-exec-role) and assign the service to use this role.
  3. Attach the policy created in step 1 to the Lambda execution role.

To download the policy document to your local machine, type the following command.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/preprocessiong-lambda/lambda-exec-policy.json  <path_on_your_local_machine>

To download the assume policy document to your local machine, type the following command.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/preprocessiong-lambda/assume-lambda-policy.json  <path_on_your_local_machine>

Following is the lambda-exec-policy.json file, which is the IAM policy used by the Lambda execution role.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "CloudWatchAccess",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        },
        {
            "Sid": "S3Access",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject"
            ],
            "Resource": [
                "arn:aws:s3:::*"
            ]
        },
        {
            "Sid": "FirehoseAccess",
            "Effect": "Allow",
            "Action": [
                "firehose:ListDeliveryStreams",
                "firehose:PutRecord",
                "firehose:PutRecordBatch"
            ],
            "Resource": [
                "arn:aws:firehose:*:*:deliverystream/CloudFrontLogsToS3"
            ]
        }
    ]
}

To create the IAM policy used by Lambda execution role, type the following command.

aws iam create-policy --policy-name lambda-exec-policy --policy-document file://<path>/lambda-exec-policy.json

To create the AWS Lambda execution role and assign the service to use this role, type the following command.

aws iam create-role --role-name lambda-cflogs-exec-role --assume-role-policy-document file://<path>/assume-lambda-policy.json

Following is the assume-lambda-policy.json file, to grant Lambda permission to assume a role.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

To attach the policy (lambda-exec-policy) created to the AWS Lambda execution role (lambda-cflogs-exec-role), type the following command.

aws iam attach-role-policy --role-name lambda-cflogs-exec-role --policy-arn arn:aws:iam::<your-account-id>:policy/lambda-exec-policy

Now that we have created the CloudFrontLogsToS3 Firehose delivery stream and the lambda-cflogs-exec-role IAM role for Lambda, the next step is to create a Lambda preprocessing function.

This Lambda preprocessing function parses the CloudFront access logs delivered into the S3 bucket and performs a few transformation and mapping operations on the data. The Lambda function adds descriptive information, such as the browser and the operating system that were used to make this request based on the user agent string found in the logs. The Lambda function also adds information about the web distribution to support scenarios where CloudFront access logs are delivered to a centralized S3 bucket from multiple distributions. With the solution in this blog post, you can get insights across distributions and their edge locations.

Use the Lambda Management Console to create a new Lambda function with a Python 2.7 runtime and the s3-get-object-python blueprint. Open the console, and on the Configure triggers page, choose the name of the S3 bucket where the CloudFront access logs are delivered. Choose Put for Event type. For Prefix, type the name of the prefix, if any, for the folder where CloudFront access logs are delivered, for example cloudfront-logs/. To invoke Lambda to retrieve the logs from the S3 bucket as they are delivered, select Enable trigger.

Choose Next and provide a function name to identify this Lambda preprocessing function.

For Code entry type, choose Upload a file from Amazon S3. For S3 link URL, type https.amazonaws.com//preprocessing-lambda/pre-data.zip. In the Environment variables section, also create an environment variable with the key KINESIS_FIREHOSE_STREAM and, as its value, the name of the Firehose delivery stream, CloudFrontLogsToS3.

Choose lambda-cflogs-exec-role as the IAM role for the Lambda function, and type prep-data.lambda_handler for the value for Handler.

Choose Next, and then choose Create Lambda.

Table creation in Amazon Athena

In this step, we will build the Athena table. Use the Athena console in the same region and create the table using the query editor.

CREATE EXTERNAL TABLE IF NOT EXISTS cf_logs (
  logdate date,
  logtime string,
  location string,
  bytes bigint,
  requestip string,
  method string,
  host string,
  uri string,
  status bigint,
  referrer string,
  useragent string,
  uriquery string,
  cookie string,
  resulttype string,
  requestid string,
  header string,
  csprotocol string,
  csbytes string,
  timetaken bigint,
  forwardedfor string,
  sslprotocol string,
  sslcipher string,
  responseresulttype string,
  protocolversion string,
  browserfamily string,
  osfamily string,
  isbot string,
  filename string,
  distribution string
)
PARTITIONED BY(year string, month string, day string, hour string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LOCATION 's3://<pre-processing-log-bucket>/prefix/';

Creation of the Athena partition

A popular website with millions of requests each day routed using Amazon CloudFront can generate a large volume of logs, on the order of a few terabytes a day. We strongly recommend that you partition your data to effectively restrict the amount of data scanned by each query. Partitioning significantly improves query performance and substantially reduces cost. The Lambda partitioning function adds the partition information to the Athena table for the data delivered to the preprocessed logs bucket.

Before delivering the preprocessed Amazon CloudFront logs file into the preprocessed logs bucket, Amazon Kinesis Firehose adds a UTC time prefix in the format YYYY/MM/DD/HH. This approach supports multilevel partitioning of the data by year, month, date, and hour. You can invoke the Lambda partitioning function every time a new processed Amazon CloudFront log is delivered to the preprocessed logs bucket. To do so, configure the Lambda partitioning function to be triggered by an S3 Put event.

For a website with millions of requests, a large number of preprocessed logs can be delivered multiple times in an hour—for example, at the interval of one each second. To avoid querying the Athena table for partition information every time a preprocessed log file is delivered, you can create an Amazon DynamoDB table for fast lookup.

Based on the year, month, date, and hour in the prefix of the delivered log, the Lambda partitioning function checks whether the partition specification exists in the Amazon DynamoDB table. If it doesn’t, the function adds it to the table using an atomic operation and then updates the Athena table (a sketch of this check-and-add follows the attribute list below).

Type the following command to create the Amazon DynamoDB table.

aws dynamodb create-table --table-name athenapartitiondetails \
--attribute-definitions AttributeName=PartitionSpec,AttributeType=S \
--key-schema AttributeName=PartitionSpec,KeyType=HASH \
--provisioned-throughput ReadCapacityUnits=100,WriteCapacityUnits=100

Here the following is true:

  • PartitionSpec is the hash key and is a representation of the partition signature—for example, year="2017"; month="05"; day="15"; hour="10".
  • Depending on the rate at which the processed log files are delivered to the processed log bucket, you might have to increase the ReadCapacityUnits and WriteCapacityUnits values, if these are throttled.

The other attributes besides PartitionSpec are the following:

  • PartitionPath – The S3 path associated with the partition.
  • PartitionType – The type of partition used (Hour, Month, Date, Year, or ALL). In this case, ALL is used.
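To illustrate the atomic check-and-add described above, the following Python sketch uses a conditional put: only the invocation that first registers a partition proceeds to update the Athena table. The function name is illustrative:

import boto3
from botocore.exceptions import ClientError

table = boto3.resource("dynamodb").Table("athenapartitiondetails")

def register_partition(spec, s3_path):
    """Return True if this call added the partition, False if it already existed."""
    try:
        table.put_item(
            Item={"PartitionSpec": spec,
                  "PartitionPath": s3_path,
                  "PartitionType": "ALL"},
            # The put fails if another invocation already wrote this spec.
            ConditionExpression="attribute_not_exists(PartitionSpec)",
        )
        return True   # caller should now run ALTER TABLE ... ADD PARTITION
    except ClientError as e:
        if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
            return False
        raise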

The next step is to create the IAM role that provides permissions for the Lambda partitioning function. The role requires permissions to do the following:

  1. Look up and write partition information to DynamoDB.
  2. Alter the Athena table with new partition information.
  3. Perform Amazon CloudWatch logs operations.
  4. Perform Amazon S3 operations.

To download the policy document to your local machine, type the following command.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/partitioning-lambda/lambda-partition-function-execution-policy.json  <path_on_your_local_machine>

To download the assume policy document to your local machine, type the following command.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/partitioning-lambda/assume-lambda-policy.json <path_on_your_local_machine>

Let’s use the AWS CLI to create the IAM role using the following three steps:

  1. Create the IAM policy (lambda-partition-exec-policy) used by the Lambda execution role.
  2. Create the Lambda execution role (lambda-partition-execution-role) and assign the service to use this role.
  3. Attach the policy created in step 1 to the Lambda execution role.

To create the IAM policy used by Lambda execution role, type the following command.

aws iam create-policy --policy-name lambda-partition-exec-policy --policy-document file://<path>/lambda-partition-function-execution-policy.json

To create the Lambda execution role and assign the service to use this role, type the following command.

aws iam create-role --role-name lambda-partition-execution-role --assume-role-policy-document file://<path>/assume-lambda-policy.json

To attach the policy (lambda-partition-exec-policy) created to the AWS Lambda execution role (lambda-partition-execution-role), type the following command.

aws iam attach-role-policy --role-name lambda-partition-execution-role --policy-arn arn:aws:iam::<your-account-id>:policy/lambda-partition-exec-policy

Following is the lambda-partition-function-execution-policy.json file, which is the IAM policy used by the Lambda execution role.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "DDBTableAccess",
            "Effect": "Allow",
            "Action": "dynamodb:PutItem",
            "Resource": "arn:aws:dynamodb:*:*:table/athenapartitiondetails"
        },
        {
            "Sid": "S3Access",
            "Effect": "Allow",
            "Action": [
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:ListMultipartUploadParts",
                "s3:AbortMultipartUpload",
                "s3:PutObject"
            ],
            "Resource": "arn:aws:s3:::*"
        },
        {
            "Sid": "AthenaAccess",
            "Effect": "Allow",
            "Action": [ "athena:*" ],
            "Resource": [ "*" ]
        },
        {
            "Sid": "CloudWatchLogsAccess",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        }
    ]
}

Download the .jar file containing the Java deployment package to your local machine.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/partitioning-lambda/aws-lambda-athena-1.0.0.jar <path_on_your_local_machine>

From the AWS Management Console, create a new Lambda function with Java8 as the runtime. Select the Blank Function blueprint.

On the Configure triggers page, choose the name of the S3 bucket where the preprocessed logs are delivered. Choose Put for the Event Type. For Prefix, type the name of the prefix folder, if any, where preprocessed logs are delivered by Firehose—for example, out/. For Suffix, type the file extension of the compression format in which the Firehose stream (CloudFrontLogsToS3) delivers the preprocessed logs—for example, gz. To invoke Lambda to retrieve the logs from the S3 bucket as they are delivered, select Enable Trigger.

Choose Next and provide a function name to identify this Lambda partitioning function.

Choose Java8 for Runtime for the AWS Lambda function. Choose Upload a .ZIP or .JAR file for the Code entry type, and choose Upload to upload the downloaded aws-lambda-athena-1.0.0.jar file.

Next, create the following environment variables for the Lambda function:

  • TABLE_NAME – The name of the Athena table (for example, cf_logs).
  • PARTITION_TYPE – The partition granularity to create in the Athena table for the logs delivered to the subfolders of the S3 bucket: Year, Month, Date, or Hour. Set this to ALL to partition by year, month, date, and hour.
  • DDB_TABLE_NAME – The name of the DynamoDB table holding partition information (for example, athenapartitiondetails).
  • ATHENA_REGION – The current AWS Region for the Athena table to construct the JDBC connection string.
  • S3_STAGING_DIR – The Amazon S3 location where your query output is written. The JDBC driver asks Athena to read the results and provide rows of data back to the user (for example, s3://<bucketname>/<folder>/).

To configure the function handler and IAM, for Handler copy and paste the name of the handler: com.amazonaws.services.lambda.CreateAthenaPartitionsBasedOnS3EventWithDDB::handleRequest. Choose the existing IAM role, lambda-partition-execution-role.

Choose Next and then Create Lambda.

Interactive analysis using Amazon Athena

In this section, we analyze the historical data that’s been collected since we added the partitions to the Amazon Athena table for data delivered to the preprocessing logs bucket.

Scenario 1 is robot traffic by edge location.

SELECT COUNT(*) AS ct, requestip, location FROM cf_logs
WHERE isbot='True'
GROUP BY requestip, location
ORDER BY ct DESC;

Scenario 2 is total bytes transferred per distribution for each edge location for your website.

SELECT distribution, location, SUM(bytes) as totalBytes
FROM cf_logs
GROUP BY location, distribution;

Scenario 3 is the top 50 viewers of your website.

SELECT requestip, COUNT(*) AS ct  FROM cf_logs
GROUP BY requestip
ORDER BY ct DESC
LIMIT 50;
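These queries can also be submitted programmatically. The following sketch uses the boto3 Athena client rather than the JDBC driver that the partitioning Lambda function uses; the output location is a placeholder, and the table is assumed to be in the default database:

import boto3

athena = boto3.client("athena")

resp = athena.start_query_execution(
    QueryString="SELECT requestip, COUNT(*) AS ct FROM cf_logs "
                "GROUP BY requestip ORDER BY ct DESC LIMIT 50",
    ResultConfiguration={"OutputLocation": "s3://<bucketname>/athena-results/"},
)
# Poll get_query_execution with this ID until the query succeeds,
# then read the results with get_query_results.
print(resp["QueryExecutionId"])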

Streaming analysis using Amazon Kinesis Analytics

In this section, you deploy a stream processing application using Amazon Kinesis Analytics to analyze the preprocessed Amazon CloudFront log stream. The application reads directly from the Amazon Kinesis Firehose stream as the logs are delivered to the preprocessing logs bucket. The stream queries in this section are focused on gaining the following insights:

  • The IP address of the bot, identified by its Amazon CloudFront edge location, that is currently sending requests to your website. The query also includes the total bytes transferred as part of the response.
  • The total bytes served per distribution per edge location for your website.
  • The top 50 viewers of your website.

To download the firehose-access-policy.json file, type the following.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/kinesisanalytics/firehose-access-policy.json  <path_on_your_local_machine>

To download the assume-kinesisanalytics-policy.json file, type the following.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/kinesisanalytics/assume-kinesisanalytics-policy.json <path_on_your_local_machine>

Before we create the Amazon Kinesis Analytics application, we need to create the IAM role that gives the analytics application permission to access the Amazon Kinesis Firehose stream.

Let’s use the AWS CLI to create the IAM role using the following three steps:

  1. Create the IAM policy (firehose-access-policy) for the Kinesis Analytics execution role to use.
  2. Create the Kinesis Analytics execution role (ka-execution-role) and assign the service to use this role.
  3. Attach the policy created in step 1 to the execution role.

Following is the firehose-access-policy.json file, which is the IAM policy used by Kinesis Analytics to read Firehose delivery stream.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AmazonFirehoseAccess",
            "Effect": "Allow",
            "Action": [
                "firehose:DescribeDeliveryStream",
                "firehose:Get*"
            ],
            "Resource": [
                "arn:aws:firehose:*:*:deliverystream/CloudFrontLogsToS3"
            ]
        }
    ]
}

Following is the assume-kinesisanalytics-policy.json file, to grant Amazon Kinesis Analytics permissions to assume a role.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "kinesisanalytics.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

To create the IAM policy used by Analytics access role, type the following command.

aws iam create-policy --policy-name firehose-access-policy --policy-document file://<path>/firehose-access-policy.json

To create the Analytics execution role and assign the service to use this role, type the following command.

aws iam create-role --role-name ka-execution-role --assume-role-policy-document file://<path>/assume-kinesisanalytics-policy.json

To attach the policy (firehose-access-policy) created to the Analytics execution role (ka-execution-role), type the following command.

aws iam attach-role-policy --role-name ka-execution-role --policy-arn arn:aws:iam::<your-account-id>:policy/firehose-access-policy

To deploy the Analytics application, first download the configuration file and then modify ResourceARN and RoleARN for the Amazon Kinesis Firehose input configuration.

"KinesisFirehoseInput": { 
    "ResourceARN": "arn:aws:firehose:<region>:<account-id>:deliverystream/CloudFrontLogsToS3", 
    "RoleARN": "arn:aws:iam:<account-id>:role/ka-execution-role"
}

To download the Analytics application configuration file, type the following command.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis//kinesisanalytics/kinesis-analytics-app-configuration.json <path_on_your_local_machine>

To deploy the application, type the following command.

aws kinesisanalytics create-application --application-name "cf-log-analysis" --cli-input-json file://<path>/kinesis-analytics-app-configuration.json

To start the application, type the following command.

aws kinesisanalytics start-application --application-name "cf-log-analysis" --input-configuration Id="1.1",InputStartingPositionConfiguration={InputStartingPosition="NOW"}

SQL queries using Amazon Kinesis Analytics

Scenario 1 is a query for detecting bots that are sending requests to your website.

-- Create output stream, which can be used to send to a destination
CREATE OR REPLACE STREAM "BOT_DETECTION" (requesttime TIME, destribution VARCHAR(16), requestip VARCHAR(64), edgelocation VARCHAR(64), totalBytes BIGINT);
-- Create pump to insert into output 
CREATE OR REPLACE PUMP "BOT_DETECTION_PUMP" AS INSERT INTO "BOT_DETECTION"
--
SELECT STREAM 
    STEP("CF_LOG_STREAM_001"."request_time" BY INTERVAL '1' SECOND) as requesttime,
    "distribution_name" as distribution,
    "request_ip" as requestip, 
    "edge_location" as edgelocation, 
    SUM("bytes") as totalBytes
FROM "CF_LOG_STREAM_001"
WHERE "is_bot" = true
GROUP BY "request_ip", "edge_location", "distribution_name",
STEP("CF_LOG_STREAM_001"."request_time" BY INTERVAL '1' SECOND),
STEP("CF_LOG_STREAM_001".ROWTIME BY INTERVAL '1' SECOND);

Scenario 2 is a query for total bytes transferred per distribution for each edge location for your website.

-- Create output stream, which can be used to send to a destination
CREATE OR REPLACE STREAM "BYTES_TRANSFFERED" (requesttime TIME, destribution VARCHAR(16), edgelocation VARCHAR(64), totalBytes BIGINT);
-- Create pump to insert into output 
CREATE OR REPLACE PUMP "BYTES_TRANSFFERED_PUMP" AS INSERT INTO "BYTES_TRANSFFERED"
-- Bytes Transffered per second per web destribution by edge location
SELECT STREAM 
    STEP("CF_LOG_STREAM_001"."request_time" BY INTERVAL '1' SECOND) as requesttime,
    "distribution_name" as distribution,
    "edge_location" as edgelocation, 
    SUM("bytes") as totalBytes
FROM "CF_LOG_STREAM_001"
GROUP BY "distribution_name", "edge_location", "request_date",
STEP("CF_LOG_STREAM_001"."request_time" BY INTERVAL '1' SECOND),
STEP("CF_LOG_STREAM_001".ROWTIME BY INTERVAL '1' SECOND);

Scenario 3 is a query for the top 50 viewers for your website.

-- Create output stream, which can be used to send to a destination
CREATE OR REPLACE STREAM "TOP_TALKERS" (requestip VARCHAR(64), requestcount DOUBLE);
-- Create pump to insert into output 
CREATE OR REPLACE PUMP "TOP_TALKERS_PUMP" AS INSERT INTO "TOP_TALKERS"
-- Top 50 talkers
SELECT STREAM ITEM as requestip, ITEM_COUNT as requestcount FROM TABLE(TOP_K_ITEMS_TUMBLING(
  CURSOR(SELECT STREAM * FROM "CF_LOG_STREAM_001"),
  'request_ip', -- name of column in single quotes
  50, -- number of top items
  60 -- tumbling window size in seconds
  )
);

Conclusion

Following the steps in this blog post, you just built an end-to-end serverless architecture to analyze Amazon CloudFront access logs. You analyzed these logs in both interactive and streaming mode, using Amazon Athena and Amazon Kinesis Analytics respectively.

By creating a partition in Athena for the logs delivered to a centralized bucket, this architecture is optimized for performance and cost when analyzing large volumes of logs for popular websites that receive millions of requests. Here, we have focused on just three common use cases for analysis, sharing the analytic queries as part of the post. However, you can extend this architecture to gain deeper insights and generate usage reports to reduce latency and increase availability. This way, you can provide a better experience on your websites fronted with Amazon CloudFront.

In this blog post, we focused on building serverless architecture to analyze Amazon CloudFront access logs. Our plan is to extend the solution to provide rich visualization as part of our next blog post.


About the Authors

Rajeev Srinivasan is a Senior Solutions Architect for AWS. He works very closely with our customers to provide big data and NoSQL solutions leveraging the AWS platform, and he enjoys coding. In his spare time, he enjoys riding his motorcycle and reading books.

Sai Sriparasa is a consultant with AWS Professional Services. He works with our customers to provide strategic and tactical big data solutions with an emphasis on automation, operations & security on AWS. In his spare time, he follows sports and current affairs.

Related

Analyzing VPC Flow Logs with Amazon Kinesis Firehose, Amazon Athena, and Amazon QuickSight

Build a Visualization and Monitoring Dashboard for IoT Data with Amazon Kinesis Analytics and Amazon QuickSight

Post Syndicated from Karan Desai original https://aws.amazon.com/blogs/big-data/build-a-visualization-and-monitoring-dashboard-for-iot-data-with-amazon-kinesis-analytics-and-amazon-quicksight/

Customers across the world are increasingly building innovative Internet of Things (IoT) workloads on AWS. With AWS, they can handle the constant stream of data coming from millions of new, internet-connected devices. This data can be a valuable source of information if it can be processed, analyzed, and visualized quickly in a scalable, cost-efficient manner. Engineers and developers can monitor performance and troubleshoot issues, while sales and marketing teams can track usage patterns and statistics on which to base business decisions.

In this post, I demonstrate a sample solution to build a quick and easy monitoring and visualization dashboard for your IoT data using AWS serverless and managed services. There’s no need for purchasing any additional software or hardware. If you are already using AWS IoT, you can build this dashboard to tap into your existing device data. If you are new to AWS IoT, you can be up and running in minutes using sample data. Later, you can customize it to your needs, as your business grows to millions of devices and messages.

Architecture

The following is a high-level architecture diagram showing the serverless setup to configure.

AWS service overview

AWS IoT is a managed cloud platform that lets connected devices interact easily and securely with cloud applications and other devices. AWS IoT can process and route billions of messages to AWS endpoints and to other devices reliably and securely.

Amazon Kinesis Firehose is the easiest way to capture, transform, and load streaming data continuously into AWS from thousands of data sources, such as IoT devices. It is a fully managed service that automatically scales to match the throughput of your data and requires no ongoing administration.

Amazon Kinesis Analytics allows you to process streaming data coming from IoT devices in real time with standard SQL, without having to learn new programming languages or processing frameworks, providing actionable insights promptly.

The processed data is fed into Amazon QuickSight, which is a fast, cloud-powered business analytics service that makes it easy to build visualizations, perform ad-hoc analysis, and quickly get business insights from the data.

The most popular way for Internet-connected devices to send data is using MQTT messages. The AWS IoT gateway receives these messages from registered IoT devices. The solution in this post uses device data from AWS Simple Beer Service (SBS), a series of internet-connected kegerators sending sensor outputs such as temperature, humidity, and sound levels in a JSON payload. You can use any existing IoT data source that you may have.

The AWS IoT rules engine allows selecting data from message payloads, processing it, and sending it to other services. You forward the data to a Firehose delivery stream to consolidate the continuous data stream into batches for further processing. The batched data is also stored temporarily in an Amazon S3 bucket for later retrieval and can be set for deletion after a specified time using S3 Lifecycle Management rules.

The incoming data from the Firehose delivery stream is fed into an Analytics application that provides an easy way to process the data in real time using standard SQL queries. Analytics allows writing standard SQL queries to extract specific components from the incoming data stream and perform real-time ETL on it. In this post, you use this feature to aggregate minimum and maximum temperature values from the sensors per minute. You load it in Amazon QuickSight to create a monitoring dashboard and check if the devices are over-heating or cooling down during use. You also extract every device’s location, parameters such as temperature, sound levels, humidity, and the time stamp in Analytics to use on the visualization dashboard.

The processed data from the two queries is fed into two Firehose delivery streams, both of which batch the data into CSV files every minute and store it in S3. The batching time interval is configurable between 1 and 15 minutes in 1-second intervals.

Finally, you use Amazon QuickSight to ingest the processed CSV files from S3 as a data source to build visualizations. Amazon QuickSight’s super-fast, parallel, in-memory, calculation engine (SPICE) parses the ingested data and allows you to create a variety of visualizations with different graph types. You can also use the Amazon QuickSight built-in Story feature to combine visualizations into business dashboards that can be shared in a secure manner.

Implementation

AWS IoT, Amazon Kinesis, and Amazon QuickSight are all fully managed services, which means you can complete the entire setup in just a few steps using the AWS Management Console. Don’t worry about setting up any underlying hardware or installing any additional software. So, get started.

Step 1. Set up your AWS IoT data source

Do you currently use AWS IoT? If you have an existing IoT thing set up and running on AWS IoT, you can skip to Step 2.

If you have an AWS IoT button or other IoT devices that can publish MQTT messages and would like to use that for the setup, follow the Getting Started with AWS IoT topic to connect your thing to AWS IoT. Continue to Step 2.

If you do not have an existing IoT device, you can generate simulated device data using a script on your local machine and have it publish to AWS IoT. The following script lets you set up your AWS IoT environment and publish simulated data that mimics device data from Simple Beer Service.

Generate sample data

Running the sbs.py Python script generates fictitious AWS IoT messages from multiple SBS devices. The IoT rule sends the message to Firehose for further processing.

The script requires access to AWS CLI credentials and boto3 installation on the machine running the script. Download and run the following Python script:

https://github.com/awslabs/sbs-iot-data-generator/blob/master/sbs.py

The script generates random data that looks like the following:

{"deviceParameter": "Temperature", "deviceValue": 33, "deviceId": "SBS01", "dateTime": "2017-02-03 11:29:37"}
{"deviceParameter": "Sound", "deviceValue": 140, "deviceId": "SBS03", "dateTime": "2017-02-03 11:29:38"}
{"deviceParameter": "Humidity", "deviceValue": 63, "deviceId": "SBS01", "dateTime": "2017-02-03 11:29:39"}
{"deviceParameter": "Flow", "deviceValue": 80, "deviceId": "SBS04", "dateTime": "2017-02-03 11:29:41"}

Run the script and keep it running for the duration of the project to generate sufficient data.
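If you just want to see the shape of such a producer, the following minimal Python sketch (not the actual sbs.py script; the topic name, device IDs, and value ranges are assumptions for illustration) publishes similar JSON messages to AWS IoT using boto3:

import json
import random
import time
from datetime import datetime

import boto3

iot = boto3.client("iot-data")  # AWS IoT data-plane client; uses your AWS CLI credentials

DEVICES = ["SBS01", "SBS02", "SBS03", "SBS04", "SBS05"]        # assumed device IDs
PARAMETERS = {"Temperature": (10, 150), "Sound": (80, 160),
              "Humidity": (20, 90), "Flow": (0, 100)}          # assumed value ranges

while True:
    parameter, (low, high) = random.choice(list(PARAMETERS.items()))
    payload = {
        "deviceParameter": parameter,
        "deviceValue": random.randint(low, high),
        "deviceId": random.choice(DEVICES),
        "dateTime": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    }
    # Publish to a topic matched by the rule's topic filter in Step 3
    iot.publish(topic="/sbs/devicedata/demo", qos=0, payload=json.dumps(payload))
    time.sleep(1)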

Tip: If you encounter any issues running the script from your local machine, launch an EC2 instance and run the script there as a root user. Remember to assign an appropriate IAM role to your instance at the time of launch that allows it to access AWS IoT.

Step 2. Create three Firehose delivery streams

For this post, you require three Firehose delivery streams: one to batch raw data from AWS IoT, and two to batch output device data and aggregated data from Analytics.

  1. In the console, choose Firehose.
  2. Create all three Firehose delivery streams using the following field values.

Delivery stream 1:

  • Name: IoT-Source-Stream
  • S3 bucket: <your unique name>-kinesis
  • S3 prefix: source/

Delivery stream 2:

  • Name: IoT-Destination-Data-Stream
  • S3 bucket: <your unique name>-kinesis
  • S3 prefix: data/

Delivery stream 3:

  • Name: IoT-Destination-Aggregate-Stream
  • S3 bucket: <your unique name>-kinesis
  • S3 prefix: aggregate/
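If you prefer to script this step, a minimal boto3 sketch along the following lines could create all three delivery streams; the IAM role ARN is a placeholder for a role that allows Firehose to write to your bucket:

import boto3

firehose = boto3.client("firehose")

BUCKET_ARN = "arn:aws:s3:::<your unique name>-kinesis"              # your bucket
ROLE_ARN = "arn:aws:iam::<account-id>:role/firehose_delivery_role"  # placeholder role

for name, prefix in [("IoT-Source-Stream", "source/"),
                     ("IoT-Destination-Data-Stream", "data/"),
                     ("IoT-Destination-Aggregate-Stream", "aggregate/")]:
    firehose.create_delivery_stream(
        DeliveryStreamName=name,
        S3DestinationConfiguration={
            "RoleARN": ROLE_ARN,
            "BucketARN": BUCKET_ARN,
            "Prefix": prefix,
        },
    )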

Step 3. Set up AWS IoT to receive and forward incoming data

  1. In the console, choose IoT.
  2. Create a new AWS IoT rule with the following field values.
  • Name: IoT_to_Firehose
  • Attribute: *
  • Topic Filter: /sbs/devicedata/#
  • Add Action: Send messages to an Amazon Kinesis Firehose stream (select IoT-Source-Stream from the dropdown)
  • Select Separator: “\n (newline)”
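The same rule can also be created programmatically. Here is a hedged boto3 sketch; the role ARN is a placeholder for a role that lets AWS IoT write to Firehose:

import boto3

iot = boto3.client("iot")

iot.create_topic_rule(
    ruleName="IoT_to_Firehose",
    topicRulePayload={
        "sql": "SELECT * FROM '/sbs/devicedata/#'",  # all attributes, same topic filter
        "actions": [{
            "firehose": {
                "roleArn": "arn:aws:iam::<account-id>:role/iot_firehose_role",  # placeholder
                "deliveryStreamName": "IoT-Source-Stream",
                "separator": "\n",
            }
        }],
    },
)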

A quick check before proceeding further: make sure that you have run the script to generate simulated IoT data or that your IoT Thing is running and delivering data. If not, set it up now. The Amazon Kinesis Analytics application you set up in the next step needs the data to process it further.

Step 4: Create an Analytics application to process data

  1. In the console, choose Kinesis.
  2. Create a new application.
  3. Enter a name of your choice, for example, SBS-IoT-Data.
  4. For the source, choose IoT-Source-Stream.

Analytics auto-discovers the schema of the data by sampling records from the input stream. It also includes a built-in SQL editor that allows you to write standard SQL queries to transform incoming data.

Tip: If Analytics is unable to discover your incoming data, the IoT rule's role may be missing the appropriate IAM permissions. In the IAM console, select the role that you assigned to your IoT rule in Step 3. Make sure that it lists the ARN of the IoT-Source-Stream Firehose delivery stream in the firehose:PutRecord section.

Here is a sample SQL query that generates two output streams:

  • DESTINATION_SQL_BASIC_STREAM contains the device ID, device parameter, its value, and the time stamp from the incoming stream.
  • DESTINATION_SQL_AGGREGATE_STREAM aggregates the maximum and minimum values of temperatures from the sensors over a one-minute period from the incoming data.
-- Create an output stream with four columns, which is used to send IoT data to the destination
CREATE OR REPLACE STREAM "DESTINATION_SQL_BASIC_STREAM" (dateTime TIMESTAMP, deviceId VARCHAR(8), deviceParameter VARCHAR(16), deviceValue INTEGER);

-- Create a pump that continuously selects from the source stream and inserts it into the output data stream
CREATE OR REPLACE PUMP "STREAM_PUMP_1" AS INSERT INTO "DESTINATION_SQL_BASIC_STREAM"

-- Filter specific columns from the source stream
SELECT STREAM "dateTime", "deviceId", "deviceParameter", "deviceValue" FROM "SOURCE_SQL_STREAM_001";

-- Create a second output stream with three columns, which is used to send aggregated min/max data to the destination
CREATE OR REPLACE STREAM "DESTINATION_SQL_AGGREGATE_STREAM" (dateTime TIMESTAMP, highestTemp SMALLINT, lowestTemp SMALLINT);

-- Create a pump that continuously selects from a source stream 
CREATE OR REPLACE PUMP "STREAM_PUMP_2" AS INSERT INTO "DESTINATION_SQL_AGGREGATE_STREAM"

-- Extract time in minutes, plus the highest and lowest value of device temperature in that minute, into the destination aggregate stream, aggregated per minute
SELECT STREAM FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE) AS "dateTime",
    MAX("deviceValue") AS "highestTemp",
    MIN("deviceValue") AS "lowestTemp"
FROM "SOURCE_SQL_STREAM_001"
WHERE "deviceParameter" = 'Temperature'
GROUP BY FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE);

Real-time analytics shows the results of the SQL query. If everything is working correctly, you see three streams listed, similar to the following screenshot.

Step 5: Connect the Analytics application to output Firehose delivery streams

You create two destinations for the two delivery streams that you created in the previous step. A single Analytics application can have multiple destinations defined; however, this needs to be set up using the AWS CLI, not from the console. If you do not already have it, install the AWS CLI on your local machine and configure it with your credentials.

Tip: If you are running the IoT script from an EC2 instance, it comes pre-installed with the AWS CLI.

Create the first destination delivery stream 

The AWS CLI command to create a new output Firehose delivery stream is as follows:

aws kinesisanalytics add-application-output --application-name <Name of Analytics Application> --current-application-version-id <number> --application-output 'Name=DESTINATION_SQL_BASIC_STREAM,KinesisFirehoseOutput={ResourceARN=<ARN of IoT-Destination-Data-Stream>,RoleARN=<ARN of Analytics application role>},DestinationSchema={RecordFormatType=CSV}'

Do not copy this into the CLI just yet! Before entering this command, make the following four changes to personalize it:

  • For Name of Analytics Application, enter the value from Step 4, or from the Analytics console.
  • For current-application-version-ID, run the following command:
aws kinesisanalytics describe-application --application-name <application name from above> | grep ApplicationVersionId
  • For ResourceARN, run the following command:
aws firehose describe-delivery-stream --delivery-stream-name IoT-Destination-Data-Stream | grep DeliveryStreamARN
  • For RoleARN, run the following command:
aws kinesisanalytics describe-application --application-name <application name from above> | grep RoleARN

Now, paste the complete command in the AWS CLI and press Enter. If there are any errors, the response provides details. If everything goes well, a new destination delivery stream is created to send the first query (DESTINATION_SQL_BASIC_STREAM) to IoT-Destination-Data-Stream.

Create the second destination delivery stream

Following similar steps as above, create a second destination Firehose delivery stream with the following changes:

  • For Name of Analytics Application, enter the same name as the first delivery stream.
  • For current-application-version-ID, increment by 1 from the previous value (unless you made other changes in between these steps). If unsure, run the same command as above to get it again.
  • For ResourceARN, get the value by running the following CLI command:
aws firehose describe-delivery-stream --delivery-stream-name IoT-Destination-Aggregate-Stream | grep DeliveryStreamARN
  • For RoleArn, enter the same value as the first stream.

Run the aws kinesisanalytics CLI command, similar to the previous step but with the new parameters substituted. This creates the second output Firehose destination delivery stream.
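If you would rather make these calls from Python than assemble the CLI command by hand, a rough boto3 equivalent looks like the following sketch; the ARNs are placeholders, and describe_application fetches the current version ID so you don't have to grep for it:

import boto3

ka = boto3.client("kinesisanalytics")

app_name = "SBS-IoT-Data"  # your Analytics application name
version_id = ka.describe_application(
    ApplicationName=app_name
)["ApplicationDetail"]["ApplicationVersionId"]

ka.add_application_output(
    ApplicationName=app_name,
    CurrentApplicationVersionId=version_id,
    Output={
        "Name": "DESTINATION_SQL_AGGREGATE_STREAM",
        "KinesisFirehoseOutput": {
            "ResourceARN": "<ARN of IoT-Destination-Aggregate-Stream>",  # placeholder
            "RoleARN": "<ARN of the Analytics application role>",        # placeholder
        },
        "DestinationSchema": {"RecordFormatType": "CSV"},
    },
)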

Update the IAM role for Analytics to allow writing to both output streams.

  1. In the console, choose IAM, Roles.
  2. Select the role that you created with Analytics in Step 4.
  3. Choose Policy, JSON, and Edit.
  4. Find "Sid": "WriteOutputFirehose" in the JSON document, go to the "Resource" section, and make sure that it includes the Resource ARNs of both streams that you found in the previous step.
  5. If it has only one ARN, add the second ARN and choose Save.

This completes the Amazon Kinesis setup. The incoming IoT data is processed by Analytics and delivered, using two output delivery streams, to two separate folders in your S3 bucket.

Step 6: Set up Amazon QuickSight to analyze the data

To build the visualization dashboard, ingest the processed CSV files from the S3 bucket into Amazon QuickSight.

  1. In the console, choose QuickSight.
  2. If this is your first time using Amazon QuickSight, you are asked to create a new account. Follow the prompts.
  3. When you are logged in to your account, choose New Analysis and enter a name of your choice.
  4. Choose New data set for the analysis or, if you have previously imported your data set, select one from the available data sets.
  5. You import two data sets: one with general device parameter information, and the other with aggregates of maximum and minimum temperatures for monitoring. For the first data set, choose S3 from the list of available data sources and enter a name, for example, IoT Device Data.
  6. The location of the S3 bucket and the objects to use are provided to Amazon QuickSight as a manifest file. Create a new manifest file following the supported formats for Amazon S3 manifest files.
  7. In the URIPrefixes section, provide your appropriate S3 bucket and folder location for the general device data. Hint: it should include <your unique name>-kinesis/data/.

Your manifest file should look similar to the following:

{
    "fileLocations": [
        {"URIPrefixes": ["https://s3.amazonaws.com/<YOUR_BUCKET_NAME>/data/<YEAR>/<MONTH>/<DATE>/<HOUR>/"]}
    ],
    "globalUploadSettings": {
        "format": "CSV",
        "delimiter": ","
    }
}

Amazon QuickSight imports and parses the data set, and provides available data fields that can be used for making graphs. The Edit/Preview data button allows you to format and transform the data, change data types, and filter or join your data. Make sure that the columns have the correct titles. If not, you can edit them and then save.

Tip: choose the downward arrow on the top right and unselect Files include headers to give each column appropriate headers. Choose Save. This takes you back to the data sets page.

Follow the same steps as above to import the second data set. This time, your manifest should include your aggregate data set folder on S3, which is named <your unique name>-kinesis/aggregate/. Update headers if necessary and choose Save & visualize.

Build an analysis

The visualization screen shows the data set that you last imported, which in this case is the aggregate data. To include the general device data as well, for Fields on the top left, choose Edit analysis data sets. Choose Add data set and select the other data set that you saved earlier.

Now both data sets are available on the analysis screen. For Visual Types at bottom left, select the type of graph to make. For Fields, select the fields to visualize. For example, drag Device ID, Device Parameter, and Value to Field wells, as shown in the screenshot below, to generate a visualization of average parameter values compared across devices.

You can create another visual by choosing +Add. This time, select a line graph to monitor the maximum temperature values of the sensors in any given minute, from the aggregate data set.

If you would like to create an interactive story to present to your team or organization, you can choose the Story option on the left panel. Create a dashboard with multiple visualizations, to save and share securely with the intended audience. An example of a story is shown below.

Conclusion

Any data is valuable only when it can be actually put to use. In this post, you’ve seen how it’s possible to quickly build a simple Analytics application to ingest, process, and visualize IoT data in near real time entirely using AWS managed services. This solution is scalable and reliable, and costs a fraction of other business intelligence solutions. It is easy enough that anyone with an AWS account can build and use it without any special training.

If you have any questions or suggestions, please comment below.


About the Author

Karan Desai is a Solutions Architect with Amazon Web Services. He works with startups and small businesses in the US, helping them adopt cloud technology to build scalable and secure solutions using AWS. In his spare time, he likes to build personal IoT projects, travel to offbeat places and write about it.

Related

Visualize Big Data with Amazon QuickSight, Presto, and Apache Spark on Amazon EMR

Test Your Streaming Data Solution with the New Amazon Kinesis Data Generator

Post Syndicated from Allan MacInnis original https://aws.amazon.com/blogs/big-data/test-your-streaming-data-solution-with-the-new-amazon-kinesis-data-generator/

When building a streaming data solution, most customers want to test it with data that is similar to their production data. Creating this data and streaming it to your solution can often be the most tedious task in testing the solution.

Amazon Kinesis Streams and Amazon Kinesis Firehose enable you to continuously capture and store terabytes of data per hour from hundreds of thousands of sources. Amazon Kinesis Analytics gives you the ability to use standard SQL to analyze and aggregate this data in real-time. It’s easy to create an Amazon Kinesis stream or Firehose delivery stream with just a few clicks in the AWS Management Console (or a few commands using the AWS CLI or Amazon Kinesis API). However, to generate a continuous stream of test data, you must write a custom process or script that runs continuously, using the AWS SDK or CLI to send test records to Amazon Kinesis. Although this task is necessary to adequately test your solution, it means more complexity and longer development and testing times.
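For illustration, a bare-bones version of such a custom producer might look like the following Python sketch (the stream name and record shape are assumptions); the KDG saves you from writing, running, and maintaining this kind of loop:

import json
import random
import time

import boto3

kinesis = boto3.client("kinesis")

# Continuously put one fake record at a time onto a stream named "test-stream"
while True:
    record = {"sensorId": random.randint(1, 50),
              "currentTemperature": random.randint(10, 150),
              "status": "OK"}
    kinesis.put_record(
        StreamName="test-stream",
        Data=json.dumps(record),
        PartitionKey=str(record["sensorId"]),
    )
    time.sleep(0.01)  # roughly 100 records per second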

Wouldn’t it be great if there were a user-friendly tool to generate test data and send it to Amazon Kinesis? Well, now there is—the Amazon Kinesis Data Generator (KDG).

KDG overview

The KDG simplifies the task of generating data and sending it to Amazon Kinesis. The tool provides a user-friendly UI that runs directly in your browser. With the KDG, you can do the following:

  • Create templates that represent records for your specific use cases
  • Populate the templates with fixed data or random data
  • Save the templates for future use
  • Continuously send thousands of records per second to your Amazon Kinesis stream or Firehose delivery stream

The KDG is open source, and you can find the source code on the Amazon Kinesis Data Generator repo in GitHub. Because the tool is a collection of static HTML and JavaScript files that run directly in your browser, you can start using it immediately without downloading or cloning the project. It is enabled as a static site in GitHub, and we created a short URL to access it.

To get started immediately, check it out at http://amzn.to/datagen.

Using the KDG

Getting started with the KDG requires only three short steps:

  1. Create an Amazon Cognito user in your AWS account (first-time only).
  2. Use this user’s credentials to log in to the KDG.
  3. Create a record template for your data.

When you’ve completed these steps, you can then send data to Streams or Firehose.

Create an Amazon Cognito user

The KDG is a great example of a mobile application that uses Amazon Cognito for a user repository and user authentication, and the AWS JavaScript SDK to communicate with AWS services directly from your browser. For information about how to build your own JavaScript application that uses Amazon Cognito, see Use Amazon Cognito in your website for simple AWS authentication on the AWS Mobile Blog.

Before you can start sending data to your Amazon Kinesis stream, you must create an Amazon Cognito user in your account who can write to Streams and Firehose. When you create the user, you create a username and password for that user. You use those credentials to sign in to the KDG. To simplify creating the Amazon Cognito user in your account, we created a Lambda function and a CloudFormation template. For more information about creating the Amazon Cognito user in your AWS account, see Configure Your AWS Account.

Note: It's important that you use the URL provided by the output of the CloudFormation stack the first time that you access the KDG. This URL contains parameters needed by the KDG. The KDG stores the values of these parameters locally, so you can then access the tool using the short URL, http://amzn.to/datagen.

Log in to the KDG

After you create an Amazon Cognito user in your account, the next step is to log in to the KDG. To do this, provide the username and password that you created earlier.

On the main page, you can configure your data templates and send data to an Amazon Kinesis stream or Firehose delivery stream.

The basic configuration is simple enough. All fields on the page are required:

  • Region: Choose the AWS Region that contains the Amazon Kinesis stream or Firehose delivery stream to receive your streaming data.
  • Stream/firehose name: Choose the name of the stream or delivery stream to receive your streaming data.
  • Records per second: Enter the number of records to send to your stream or delivery stream each second.
  • Record template: Enter the raw data, or a template that represents your data structure, to be used for each record sent by the KDG. For information about creating templates for your data, see the “Creating Record Templates” section, later in this post.

When you set the Records per second value, consider that the KDG isn’t intended to be a data producer for load-testing your application. However, it can easily send several thousand records per second from a single tab in your browser, which is plenty of data for most applications. In testing, the KDG has produced 80,000 records per second to a single Amazon Kinesis stream, but your mileage may vary. The maximum rate at which it produces records depends on your computer’s specs and the complexity of your record template.

Ensure that your stream or delivery stream is scaled appropriately:

  • 1,000 records/second or 1 MB/second to an Amazon Kinesis stream
  • 5,000 records/second or 5 MB/second to a Firehose delivery stream

Otherwise, Amazon Kinesis may reject records, and you won’t achieve your desired throughput. For more information about adding capacity to a stream by adding more shards, see Resharding a Stream. For information about increasing the capacity of a delivery stream, see Amazon Kinesis Firehose Limits.
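As a back-of-the-envelope check, you can estimate the shard count that a stream needs from your target rate and average record size; the numbers below are assumptions for illustration:

# Rough shard estimate for an Amazon Kinesis stream
records_per_second = 5000        # target KDG send rate (assumption)
avg_record_bytes = 350           # average rendered template size (assumption)

shards_for_bytes = -(-records_per_second * avg_record_bytes // 1_000_000)  # 1 MB/s per shard
shards_for_records = -(-records_per_second // 1000)                        # 1,000 records/s per shard
print(max(shards_for_bytes, shards_for_records))  # prints 5 for these numbers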

Create record templates

The Record Template field is a free-text field where you can enter any text that represents a single streaming data record. You can create a single line of static data, so that each record sent to Amazon Kinesis is identical. Or, you can format the text as a template.

In this case, the KDG substitutes portions of the template with fake or random data before sending the record. This lets you introduce randomness or variability in each record that is sent in your data stream. The KDG uses Faker.js, an open source library, to generate fake data. For more information, see the faker.js project page in GitHub. The easiest way to see how this works is to review an example.

To simulate records being sent from a weather sensor Internet of Things (IoT) device, you want each record to be formatted in JSON. The following is an example of what a final record must look like:

{
	"sensorId": 40,
	"currentTemperature": 76,
	"status": "OK"
} 

For this use case, you want to simulate sending data from one of 50 sensors, so the sensorId field can be an integer between 1 and 50. The temperature value can range between 10 and 150, so the currentTemperature field should contain a value in this range. Finally, the status value can be one of three possible values: OK, FAIL, and WARN. The KDG template format uses moustache syntax (double curly-braces) to enclose items that should be replaced before the record is sent to Amazon Kinesis. To model the record, the template looks like this:

{
    "sensorId": {{random.number(50)}},
    "currentTemperature": {{random.number(
        {
            "min":10,
            "max":150
        }
    )}},
    "status": "{{random.arrayElement(
        ["OK","FAIL","WARN"]
    )}}"
}

Take a look at one more example, simulating a stream of records that represent rows from an Apache access log. A single Apache access log entry might look like this:

76.0.56.179 - - [29/Apr/2017:16:32:11 -05:00] "GET /wp-admin" 200 8233 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_0 rv:6.0; CY) AppleWebKit/535.0.0 (KHTML, like Gecko) Version/4.0.3 Safari/535.0.0"

The following example shows how to create a template for the Apache access log:

{{internet.ip}} - - [{{date.now("DD/MMM/YYYY:HH:mm:ss Z")}}] "{{random.weightedArrayElement({"weights":[0.6,0.1,0.1,0.2],"data":["GET","POST","DELETE","PUT"]})}} {{random.arrayElement(["/list","/wp-content","/wp-admin","/explore","/search/tag/list","/app/main/posts","/posts/posts/explore"])}}" {{random.weightedArrayElement({"weights": [0.9,0.04,0.02,0.04], "data":["200","404","500","301"]})}} {{random.number(10000)}} "-" "{{internet.userAgent}}"

For more information about creating your own templates, see the Record Template section of the KDG documentation.

The KDG saves the templates that you create in your local browser storage. As long as you use the same browser on the same computer, you can reuse up to five templates.

Summary

Testing your streaming data solution has never been easier. Get started today by visiting the KDG hosted UI or its Amazon Kinesis Data Generator page in GitHub. The project is licensed under the Apache 2.0 license, so feel free to clone and modify it for your own use as necessary. And of course, please submit any issues or pull requests via GitHub.

If you have any questions or suggestions, please add them below.

 


About the Author

Allan MacInnis is a Solutions Architect at Amazon Web Services. He works with our customers to help them build streaming data solutions using Amazon Kinesis. In his spare time, he enjoys mountain biking and spending time with his family.

Related

Scale Your Amazon Kinesis Stream Capacity with UpdateShardCount

Encrypt and Decrypt Amazon Kinesis Records Using AWS KMS

Post Syndicated from Temitayo Olajide original https://aws.amazon.com/blogs/big-data/encrypt-and-decrypt-amazon-kinesis-records-using-aws-kms/

Customers with strict compliance or data security requirements often require data to be encrypted at all times, including at rest or in transit within the AWS cloud. This post shows you how to build a real-time streaming application using Kinesis in which your records are encrypted while at rest or in transit.

Amazon Kinesis overview

The Amazon Kinesis platform enables you to build custom applications that analyze or process streaming data for specialized needs. Amazon Kinesis can continuously capture and store terabytes of data per hour from hundreds of thousands of sources such as website clickstreams, financial transactions, social media feeds, IT logs, and transaction tracking events.

Through the use of HTTPS, Amazon Kinesis Streams encrypts data in flight between clients and the service, which protects against eavesdropping on records as they are transferred. However, records encrypted by HTTPS are decrypted once the data enters the service. This data is stored at rest for 24 hours (configurable up to 168 hours) to ensure that your applications have enough headroom to process, replay, or catch up if they fall behind.

Walkthrough

In this post you build encryption and decryption into sample Kinesis producer and consumer applications using the Amazon Kinesis Producer Library (KPL), the Amazon Kinesis Consumer Library (KCL), AWS KMS, and the aws-encryption-sdk. The methods and the techniques used in this post to encrypt and decrypt Kinesis records can be easily replicated into your architecture. Some constraints:

  • AWS charges for the use of KMS API requests for encryption and decryption. For more information, see AWS KMS Pricing.
  • You cannot use Amazon Kinesis Analytics to query Amazon Kinesis Streams with records encrypted by clients in this sample application.
  • If your application requires low-latency processing, note that encryption and decryption add a slight latency overhead.

The following diagram shows the architecture of the solution.

Encrypting the records at the producer

Before you call the PutRecord or PutRecords API, you will encrypt the string record by calling KinesisEncryptionUtils.toEncryptedString.

In this example, we used a sample stock sales ticker object:

example {"tickerSymbol": "AMZN", "salesPrice": "900", "orderId": "300", "timestamp": "2017-01-30 02:41:38"}. 

The method (KinesisEncryptionUtils.toEncryptedString) call takes four parameters:

  • amazonaws.encryptionsdk.AwsCrypto
  • stock sales ticker object
  • amazonaws.encryptionsdk.kms.KmsMasterKeyProvider
  • util.Map of an encryption context

A ciphertext is returned to the main caller, which then checks its size by calling KinesisEncryptionUtils.calculateSizeOfObject. Encryption increases the size of an object. To prevent the object from being throttled, the size of the payload (one or more records) is validated to ensure that it is not greater than 1 MB. In this example, encrypted records whose payload exceeds 1 MB are logged as a warning. If the size is below the limit, then either addUserRecord (if you are using the KPL) or PutRecord and PutRecords (if you are using the Kinesis Streams API) is called.

Example: Encrypting records with KPL

//Encrypting the records
String encryptedString = KinesisEncryptionUtils.toEncryptedString(crypto, ticker, prov,context);
log.info("Size of encrypted object is : "+ KinesisEncryptionUtils.calculateSizeOfObject(encryptedString));
//check if size of record is greater than 1MB
if(KinesisEncryptionUtils.calculateSizeOfObject(encryptedString) >1024000)
   log.warn("Record added is greater than 1MB and may be throttled");
//UTF-8 encoding of encrypted record
ByteBuffer data = KinesisEncryptionUtils.toEncryptedByteStream(encryptedString);
//Adding the encrypted record to stream
ListenableFuture<UserRecordResult> f = producer.addUserRecord(streamName, randomPartitionKey(), data);
Futures.addCallback(f, callback);

In the above code, the example sales ticker record is passed to KinesisEncryptionUtils.toEncryptedString, and an encrypted record is returned. The encryptedString value is also passed to KinesisEncryptionUtils.calculateSizeOfObject, and the size of the encrypted payload is checked to see if it is less than 1 MB. If it is, the payload is then UTF-8 encoded (KinesisEncryptionUtils.toEncryptedByteStream) and sent to the stream for processing.

Example: Encrypting the records with Streams PutRecord

//Encrypting the records
String encryptedString = KinesisEncryptionUtils.toEncryptedString(crypto, ticker, prov, context);
log.info("Size of encrypted object is : " + KinesisEncryptionUtils.calculateSizeOfObject(encryptedString));
//check if size of record is greater than 1MB
if (KinesisEncryptionUtils.calculateSizeOfObject(encryptedString) > 1024000)
    log.warn("Record added is greater than 1MB and may be throttled");
//UTF-8 encoding of encrypted record
ByteBuffer data = KinesisEncryptionUtils.toEncryptedByteStream(encryptedString);
putRecordRequest.setData(data);
putRecordRequest.setPartitionKey(randomPartitionKey());
//putting the record into the stream
kinesis.putRecord(putRecordRequest);

Verifying that records are encrypted

After the call to KinesisEncryptionUtils.toEncryptedString, you can print out the encrypted string record just before UTF-8 encoding. An example of what is printed to standard output when running this sample application is shown below.

[main] INFO kinesisencryption.streams.EncryptedProducerWithStreams - String Record is TickerSalesObject{tickerSymbol='FB', salesPrice='184.285409142', orderId='2a0358f1-9f8a-4bbe-86b3-c2929047e15d', timeStamp='2017-01-30 02:41:38'} and Encrypted Record String is AYADeMf6zmVg9JvIkGNv5M39rhUAbgACAAdLaW5lc2lzAARjYXJzABVhd3MtY3J5cHRvLXB1YmxpYy1rZXkAREFpUkpCaG1UOFQ3UTZQZ253dm9FSU9iUDZPdE1xTHdBZ1JjNlZxN2doMDZ3QlBEWUZndWdJSEFKaXNvT0ZPUGsrdz09AAEAB2F3cy1rbXMAS2Fybjphd3M6a21zOnVzLWVhc3QtMTo1NzM5MDY1ODEwMDI6a2V5LzM3ZGM5MGRjLTNmMWMtNGE3Ny1hNTFkLWE2NTNiMTczZmNkYgCnAQEBAHgbPoaYTiF/oIMp49yPBkZmVVotylZpUqwkkzJJicLjLQAAAH4wfAYJKoZIhvcNAQcGoG8wbQIBADBoBgkqhkiG9w0BBwEwHgYJYIZIAWUDBAEuMBEEDCCYBk+hfB3tOGVx7QIBEIA7FqaEcOWpic+gKNeT+dUe4yttB9dsZSFPAUTlz2L2zlyLXSLMh1otRH24SO485ov+TCTtRCgiA8a9rYQCAAAAAAwAABAArlGWPO8BavNSJIpJOtJekRUhOwbM+WM1NBVXB/////8AAAABXNZnRND3J7u8EZx3AAAAkfSxVPMYUv0Ovrd4AIUTmMcaiR0Z+IcJNAXqAhvMmDKpsJaQG76Q6pYExarolwT+6i87UOi6TGvAiPnH74GbkEniWe66rAF6mOra2JkffK6pBdhh95mEOGLaVPBqs2jswUTfdcBJQl9NEb7wx9XpFX8fNDF56Vly7u6f8OQ7lY6fNrOupe5QBFnLvwehhtogd72NTQ/yEbDDoPKUZN3IlWIEAGYwZAIwISFw+zdghALtarsHSIgPMs7By7/Yuda2r3hqSmqlCyCXy7HMFIQxHcEILjiLp76NAjB1D8r8TC1Zdzsfiypi5X8FvnK/6EpUyFoOOp3y4nEuLo8M2V/dsW5nh4u2/m1oMbw=

You can also verify that the record stayed encrypted in Streams by printing out the UTF-8 decoded received record immediately after the getRecords API call. An example of the print output when running the sample application is shown below.

[Thread-2] INFO kinesisencryption.utils.KinesisEncryptionUtils - Verifying object received from stream is encrypted. -Encrypted UTF-8 decoded : AYADeBJz/kt7Fm3L1lvS8Wy8jhAAbgACAAdLaW5lc2lzAARjYXJzABVhd3MtY3J5cHRvLXB1YmxpYy1rZXkAREFrM2N4K2s1ODJuOGVlNWF3TVJ1dk1UUHZQc2FHeGoxQisxb09kNWtDUExHYjJDS0lMZW5LSnlYakRmdFR4dzQyUT09AAEAB2F3cy1rbXMAS2Fybjphd3M6a21zOnVzLWVhc3QtMTo1NzM5MDY1ODEwMDI6a2V5LzM3ZGM5MGRjLTNmMWMtNGE3Ny1hNTFkLWE2NTNiMTczZmNkYgCnAQEBAHgbPoaYTiF/oIMp49yPBkZmVVotylZpUqwkkzJJicLjLQAAAH4wfAYJKoZIhvcNAQcGoG8wbQIBADBoBgkqhkiG9w0BBwEwHgYJYIZIAWUDBAEuMBEEDAGI3oWLlIJ2p6kffQIBEIA7JVUOTsLtEyNK8vS4GIS9iyTejuB2xhIpRXfG8o0lUfHawcrCbNbNH8XLm/8RW5JbgXo10EpOs8dSjkICAAAAAAwAABAAy64r24sGVKWN4C1gXCwJYHvZkLpJJj16SZlhpv////8AAAABg2pPFchIiaM7D9VuAAAAkwh10ul5sZQ08KsgkFszOOvFoQu95CiY7cK8H+tBloVOZglMqhhhvoIIZLr9hmI8/lQvRXzGDdo7Xkp0FAT5Jpztt8Hq/ZuLfZtNYIWOw594jShqpZt6uXMdMnpb/38R3e5zLK5vrYkM6NS4WPMFrHsOKN5tn0CDForgojRcdpmCJ8+cWLNltb2S+EJiWiyWS+ibw2vJ/RFm6WZO6nD+MXn3vyMAZzBlAjAuIUTYL1cbQ3ENxDIeXHJAWQguNPqxq4HgaCmCEI9/rn/GAKSc2nT9ln3UsVq/2dgCMQC7yNJ3DCTnppavfxTbcVS+rXaDDpZZx/ZsluMqXAFM5/FFvKRqr0dVML28tGunxmU=

Decrypting the records at the consumer

After you receive the records into your consumer as a list, you can get the data as a ByteBuffer by calling record.getData. You then decode and decrypt the ByteBuffer by calling KinesisEncryptionUtils.decryptByteStream. This method takes five parameters:

  • amazonaws.encryptionsdk.AwsCrypto
  • record ByteBuffer
  • amazonaws.encryptionsdk.kms.KmsMasterKeyProvider
  • key arn string
  • java.util.Map of your encryption context

A string representation of the ticker sales object is returned back to the caller for further processing. In this example, this representation is just printed to standard output.

[Thread-2] INFO kinesisencryption.streams.DecryptShardConsumerThread - Decrypted Text Result is TickerSalesObject{tickerSymbol='AMZN', salesPrice='304.958313333', orderId='50defaf0-1c37-4e84-85d7-bc15597355eb', timeStamp='2017-01-30 02:41:38'}

Example: Decrypting records with the KCL and Streams API

ByteBuffer buffer = record.getData();
//Decrypting the encrypted record data
String decryptedResult = KinesisEncryptionUtils.decryptByteStream(crypto,buffer,prov,this.getKeyArn(), this.getContext());
log.info("Decrypted Text Result is " + decryptedResult);

With the above code, records in the Kinesis Streams are decrypted using the same key ARN and encryption context that was previously used to encrypt it at the producer side.
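The post's sample code is Java, but the same producer/consumer pattern in Python, using the aws-encryption-sdk library with boto3, might look like the following sketch; the key ARN, stream name, and encryption context are placeholders:

import json

import aws_encryption_sdk
import boto3
from aws_encryption_sdk import CommitmentPolicy

KEY_ARN = "arn:aws:kms:us-east-1:<account-id>:key/<key-id>"  # placeholder CMK
CONTEXT = {"purpose": "kinesis-demo"}                        # placeholder encryption context

esdk = aws_encryption_sdk.EncryptionSDKClient(
    commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT
)
provider = aws_encryption_sdk.StrictAwsKmsMasterKeyProvider(key_ids=[KEY_ARN])
kinesis = boto3.client("kinesis")

# Producer side: encrypt the payload before putting it on the stream
ticker = {"tickerSymbol": "AMZN", "salesPrice": "900"}
ciphertext, _ = esdk.encrypt(source=json.dumps(ticker).encode(),
                             key_provider=provider,
                             encryption_context=CONTEXT)
kinesis.put_record(StreamName="encrypted-stream", Data=ciphertext, PartitionKey="AMZN")

# Consumer side: decrypt the bytes retrieved by get_records; verify the context
plaintext, header = esdk.decrypt(source=ciphertext, key_provider=provider)
assert all(header.encryption_context.get(k) == v for k, v in CONTEXT.items())
print(plaintext.decode())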

Maven dependencies

To use the implementation I’ve outlined in this post, you need to use a few maven dependencies outlined below in the pom.xml together with the Bouncy Castle libraries. Bouncy Castle provides a cryptography API for Java.

<dependency>
    <groupId>org.bouncycastle</groupId>
    <artifactId>bcprov-ext-jdk15on</artifactId>
    <version>1.54</version>
</dependency>
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-encryption-sdk-java</artifactId>
    <version>0.0.1</version>
</dependency>

Summary

You can incorporate the above sample code snippets, or use them as a guide, in your application code to start encrypting and decrypting records to and from an Amazon Kinesis stream.

A complete producer and consumer example application and a more detailed step-by-step example of developing an Amazon Kinesis producer and consumer application on AWS with encrypted records is available at the kinesisencryption github repository.

If you have questions or suggestions, please comment below.


About the Author

Temitayo Olajide is a Cloud Support Engineer with Amazon Web Services. He works with customers to provide architectural solutions, support, and guidance for implementing high-velocity streaming data applications in the cloud. In his spare time, he plays ping-pong and hangs out with family and friends.

Related

Secure Amazon EMR with Encryption

Month in Review: February 2017

Post Syndicated from Derek Young original https://aws.amazon.com/blogs/big-data/month-in-review-february-2017/

Another month of big data solutions on the Big Data Blog!

Take a look at our summaries below and learn, comment, and share. Thank you for reading!

NEW POSTS

Implement Serverless Log Analytics Using Amazon Kinesis Analytics
In this post, learn how to implement a solution that analyzes streaming Apache access log data from an EC2 instance, aggregated over 5 minutes.

Migrate External Table Definitions from a Hive Metastore to Amazon Athena
For customers who use Hive external tables on Amazon EMR, or any flavor of Hadoop, a key challenge is how to effectively migrate an existing Hive metastore to Amazon Athena, an interactive query service that directly analyzes data stored in Amazon S3. In this post, learn an approach to migrate an existing Hive metastore to Athena, as well as how to use the Athena JDBC driver to run scripts.

AWS Big Data is Coming to HIMSS!
This year’s HIMSS conference was held at the Orange County Convention Center in Orlando, Florida from February 20 – 23. This blog post lists past AWS Big Data Blog posts to show how AWS technologies are being used to improve healthcare.

Create Tables in Amazon Athena from Nested JSON and Mappings Using JSONSerDe
In this post, you will use the tightly coupled integration of Amazon Kinesis Firehose for log delivery, Amazon S3 for log storage, and Amazon Athena with JSONSerDe to run SQL queries against these logs without the need for data transformation or insertion into a database.

Scheduled Refresh for SPICE Data Sets on Amazon QuickSight
QuickSight uses SPICE (Super-fast, Parallel, In-Memory Calculation Engine), a fully managed data store that enables blazing fast visualizations and can ingest data from AWS, on-premises, and cloud sources. Data in SPICE can be refreshed at any time with the click of a button within QuickSight. This post announced the ability to schedule these refreshes!

Harmonize, Search, and Analyze Loosely Coupled Datasets on AWS
You have come up with an exciting hypothesis, and now you are keen to find and analyze as much data as possible to prove (or refute) it. There are many datasets that might be applicable, but they have been created at different times by different people and don’t conform to any common standard. In this blog post, we will describe a sample application that illustrates how to solve these problems. You can install our sample app, which will harmonize and index three disparate datasets to make them searchable, present a data-driven, customizable UI for searching the datasets to do preliminary analysis and to locate relevant datasets, and integrate with Amazon Athena and Amazon QuickSight for custom analysis and visualization.

FROM THE ARCHIVE

Building Event-Driven Batch Analytics on AWS
Modern businesses typically collect data from internal and external sources at various frequencies throughout the day. In this post, you learn an elastic and modular approach for how to collect, process, and analyze data for event-driven applications in AWS.


Want to learn more about Big Data or Streaming Data? Check out our Big Data and Streaming data educational pages.

Leave a comment below to let us know what big data topics you’d like to see next on the AWS Big Data Blog.

Converging Data Silos to Amazon Redshift Using AWS DMS

Post Syndicated from Pratim Das original https://aws.amazon.com/blogs/big-data/converging-data-silos-to-amazon-redshift-using-aws-dms/

Organizations often grow organically—and so does their data in individual silos. Such systems are often powered by traditional relational database systems, and they grow orthogonally in size and features. To gain intelligence across heterogeneous data sources, you have to join the data sets. However, this imposes new challenges, because joining data over dblinks or into a single view is extremely cumbersome and an operational nightmare.

This post walks through using AWS Database Migration Service (AWS DMS) and other AWS services to make it easy to converge multiple heterogeneous data sources to Amazon Redshift. You can then use Amazon QuickSight to visualize the converged dataset and gain additional business insights.

AWS service overview

Here’s a brief overview of AWS services that help with data convergence.

AWS DMS

With DMS, you can migrate your data to and from most widely used commercial and open-source databases. The service supports homogenous migrations such as Oracle to Oracle, as well as heterogeneous migrations between different database platforms, such as Oracle to Amazon Aurora or Microsoft SQL Server to MySQL. It also allows you to stream data to Amazon Redshift from any of the supported sources including:

  • Amazon Aurora
  • PostgreSQL
  • MySQL
  • MariaDB
  • Oracle
  • SAP ASE
  • SQL Server

DMS enables consolidation and easy analysis of data in the petabyte-scale data warehouse. It can also be used for continuous data replication with high availability.

Amazon QuickSight

Amazon QuickSight provides very fast, easy-to-use, cloud-powered business intelligence at 1/10th the cost of traditional BI solutions. QuickSight uses a new, super-fast, parallel, in-memory calculation engine (“SPICE”) to perform advanced calculations and render visualizations rapidly.

QuickSight integrates automatically with AWS data services, enables organizations to scale to hundreds of thousands of users, and delivers fast and responsive query performance to them. You can easily connect QuickSight to AWS data services, including Amazon Redshift, Amazon RDS, Amazon Aurora, Amazon S3, and Amazon Athena. You can also upload CSV, TSV, and spreadsheet files or connect to third-party data sources such as Salesforce.

Amazon Redshift

Amazon Redshift delivers fast query performance by using columnar storage technology to improve I/O efficiency and parallelizing queries across multiple nodes. Amazon Redshift is typically priced at 1/10th of the price of the competition. We have many customers running petabyte scale data analytics on AWS using Amazon Redshift.

Amazon Redshift is also ANSI SQL compliant, supports JDBC/ODBC, and is easy to connect to your existing business intelligence (BI) solution. However, if your storage requirement is in the 10s of TB range and requires high levels of concurrency across small queries, you may want to consider Amazon Aurora as the target converged database.

Walkthrough

Assume that you have an events company specializing in sports, and you have built a MySQL database that holds data for the players and the sporting events. Customer and ticket information is stored in another database, in this case PostgreSQL, which gets updated when customers purchase tickets from your website and mobile apps. You can download a sample dataset from the aws-database-migration-samples GitHub repo.

These databases could be anywhere: at an on-premises facility; on AWS in Amazon EC2 or Amazon RDS, or other cloud provider; or in a mixture of such locations. To complicate things a little more, you can assume that the lost opportunities (where a customer didn’t complete buying the ticket even though it was added to the shopping cart) are streamed via clickstream through Amazon Kinesis and then stored on Amazon S3. We then use AWS Data Pipeline to orchestrate a process to cleanse that data using Amazon EMR and make it ready for loading to Amazon Redshift. The clickstream integration is not covered in this post but was demonstrated in the recent Real-time Clickstream Anomaly Detection with Amazon Kinesis Analytics post.

Architecture

In this solution, you use DMS to bring the two data sources into Amazon Redshift and run analytics to gain business insights. The following diagram demonstrates the proposed solution.

After the data is available on Amazon Redshift, you could easily build BI dashboards and generate intelligent reports to gain insights using Amazon QuickSight. You could also take this a step further and build a model using Amazon Machine Learning. Amazon Machine Learning uses powerful algorithms to create ML models by finding patterns in your existing data stored in Amazon S3, or Amazon Redshift. It is also highly scalable and can generate billions of predictions daily, and serve those predictions in real time and at high throughput.

Creating source databases

For the purposes of this post, create two RDS databases, one with a MySQL engine, and the other with PostgreSQL and then load some data. These represent a real-life scenario where databases could be located on-premises, on AWS, or both. Just as in real life, there may be more than two source databases; the process described in this post would still be reasonably similar.

Follow the steps in Tutorial: Create a Web Server and an Amazon RDS Database to create the two source databases. Use the links from the main tutorial page to see how to connect to specific databases and load data.

Make a note of the security group that you create and associate all the RDS instances with it. Call it “MyRDSSecurityGroup”.

Afterward, you should be able to see all the databases listed in the RDS Instances dashboard.

Setting up a target Amazon Redshift cluster

Set up a two-node cluster as shown below, with a cluster name similar to “consolidated-dwh” and a database named similar to “mydwh”. You could also set up a one-node cluster based on the instance type; the instance type may be available on the AWS Free Tier.

In the next step, choose Publicly Accessible for non-production usage to keep the configuration simple.

Also, for simplicity, choose the same VPC where you have placed the RDS instances and include the MyRDSSecurityGroup in the list of security groups allowed to access the Amazon Redshift cluster.
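If you want to script the cluster instead of using the console, a rough boto3 sketch follows; the credentials and security group ID are placeholders, and you should choose a node type available to your account:

import boto3

redshift = boto3.client("redshift")

redshift.create_cluster(
    ClusterIdentifier="consolidated-dwh",
    DBName="mydwh",
    NodeType="dc1.large",                      # assumption; pick what suits your workload
    ClusterType="multi-node",
    NumberOfNodes=2,
    MasterUsername="master",                   # placeholder credentials
    MasterUserPassword="<YourStrongPassword1>",
    PubliclyAccessible=True,                   # non-production simplicity, as noted above
    VpcSecurityGroupIds=["<MyRDSSecurityGroup-id>"],  # placeholder security group ID
)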

Setting up DMS

You can set up DMS easily, as indicated in the AWS Database Migration Service post on the AWS blog. However, rather than using the wizard, you may take a step-by-step approach:

  1. Create a replication instance.
  2. Create the endpoints for the two source databases and the target Amazon Redshift database.
  3. Create a task to synchronize each of the sources to the target.
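If you script your infrastructure, the sketch below shows rough boto3 equivalents of these three steps; all identifiers and connection details are placeholders, and the replication instance must be available before the task is created:

import json

import boto3

dms = boto3.client("dms")

# 1. Replication instance (wait for it to become available before creating tasks)
instance = dms.create_replication_instance(
    ReplicationInstanceIdentifier="dms-converge",      # placeholder name
    ReplicationInstanceClass="dms.t2.medium",          # size for your data volume
)

# 2. Endpoints (the PostgreSQL source shown; repeat for MySQL and the Redshift target)
source = dms.create_endpoint(
    EndpointIdentifier="postgres-ticketing",           # placeholder
    EndpointType="source",
    EngineName="postgres",
    ServerName="<rds-endpoint>",
    Port=5432,
    Username="<user>",
    Password="<password>",
    DatabaseName="ticketing",
)

# 3. A task that migrates existing data and replicates ongoing changes
dms.create_replication_task(
    ReplicationTaskIdentifier="postgres-to-redshift",  # placeholder
    SourceEndpointArn=source["Endpoint"]["EndpointArn"],
    TargetEndpointArn="<target-endpoint-arn>",         # placeholder
    ReplicationInstanceArn=instance["ReplicationInstance"]["ReplicationInstanceArn"],
    MigrationType="full-load-and-cdc",
    TableMappings=json.dumps({"rules": [{
        "rule-type": "selection", "rule-id": "1", "rule-name": "1",
        "object-locator": {"schema-name": "ticketing", "table-name": "%"},
        "rule-action": "include",
    }]}),
)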

Create a replication instance

In the DMS console, choose Replication instances, Create replication instance. The instance type you select depends on the data volume you deal with. After setup, you should be able to see your replication instance.

Create endpoints

In the DMS console, choose Endpoints, Create endpoint. You need to configure the two source endpoints representing the PostgreSQL and MySQL RDS databases. You also need to create the target endpoint by supplying the Amazon Redshift database that you created in the previous steps. After configuration, the endpoints look similar to the following screenshot:

Create a task and start data migration

You can rely on DMS to create the target tables in your target Amazon Redshift database or you may want to take advantage of AWS Schema Conversion Tool to create the target schema and also do a compatibility analysis in the process. Using the AWS Schema Conversion Tool is particularly useful when migrating using heterogeneous data sources. For more information, see Getting Started with the AWS Schema Conversion Tool.

For simplicity, I avoided using the AWS Schema Conversion Tool in this post and instead let DMS create the target schema and underlying tables, and then set up the synchronization between the data sources and the target.

In the DMS console, choose Tasks, Create Tasks. Fill in the fields as shown in the following screenshot:

Note that because the source is RDS MySQL and you chose Migrate data and replicate ongoing changes, you need to enable binlog retention. Other engines have other requirements, and DMS prompts you accordingly. For this particular case, run the following command:

call mysql.rds_set_configuration('binlog retention hours', 24);

Now, choose Start task on create. In the task settings, choose Drop tables on target to have DMS create the tables, if you haven't already created the target tables using the AWS Schema Conversion Tool, as described earlier. Choose Enable logging, but note that this incurs additional costs because the generated CloudWatch logs require storage.

In the table mappings, for Schema to migrate, ensure that the correct schema has been selected from the source databases. DMS creates the schema on the target if it does not already exist.

Repeat for the other data source, choosing the other source endpoint and the same Amazon Redshift target endpoint. In the table mappings section, choose Custom and customize as appropriate. For example, you can specify the schema names to include and tables to exclude, as shown in the following screenshot:

Using this custom configuration, you can perform some minor transformations, such as down casing target table names, or choosing a different target schema for both sources.
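For reference, the mapping document behind such a configuration can also be assembled in code. A hedged sketch of a selection rule plus a lowercase transformation follows; the schema name is an example:

import json

table_mappings = {
    "rules": [
        {   # include every table in the ticketing schema
            "rule-type": "selection",
            "rule-id": "1",
            "rule-name": "include-ticketing",
            "object-locator": {"schema-name": "ticketing", "table-name": "%"},
            "rule-action": "include",
        },
        {   # down-case all target table names
            "rule-type": "transformation",
            "rule-id": "2",
            "rule-name": "lowercase-tables",
            "rule-target": "table",
            "object-locator": {"schema-name": "%", "table-name": "%"},
            "rule-action": "convert-lowercase",
        },
    ]
}
print(json.dumps(table_mappings, indent=2))  # paste into the task's table mappings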

After both tasks have successfully completed, the Tasks tab now looks like the following:

Running queries on Amazon Redshift

In Amazon Redshift, select your target cluster and choose Loads. You can see all operations that DMS performed in the background to load the data from the two source databases into Amazon Redshift.

Ensure change data capture is working

Generate additional data on Amazon RDS PostgreSQL in the ticketing.sporting_event_ticket table by running the generate_mlb_season.sql script provided in the aws-database-migration-samples GitHub repository. Notice that the tasks have caught up and are showing the migration in progress. You can also query the target tables and see that the new data is in the target table.

Visualization options

Set up QuickSight and configure your data source to be your Amazon Redshift database. If you have a Redshift cluster in the same account and in the same region, it appears when you click Redshift (Auto-discovered) on the data sets page, as shown below.

Access to any other Redshift cluster can be configured as follows using the Redshift (Manual connect) link:

Now, create your data set. Choose New Data Set and select either a new data source or an existing data source listed at the bottom of the page. Choose Ticketing for Sports.

In the next step, choose Create Data Set.

In the next step, when QuickSight prompts you to choose your table, you can select the schema and the required table and choose Select. Alternatively, you may choose Edit/Preview data.

You could use the graphical options shown below to start creating your data set. Given that you have data from multiple sources, it's safe to assume that your target tables are in separate schemas. Select the schema and tables, select the other schemas, and bring the appropriate tables to the palette by selecting them using the check box to the right. For each join, select the join type and then map the appropriate keys between the tables until the two red join indicators turn to one of the blue join types.

In this case, rather than preparing the data set in the palette, you provide a custom SQL query. On the left pane, choose Tables, Switch to Custom SQL tool.

Paste the following SQL query in the Custom SQL field and enter a name.

select to_char( e.start_date_time, 'YYYY-MM-DD' ) event_date, 
to_char( e.start_date_time, 'HH24:MI' ) start_time, e.sold_out, 
e.sport_type_name, l.name event_location, l.city event_city, 
l.seating_capacity, hteam.name home_team, hl.name home_field, 
hl.city home_city, ateam.name away_team, al.name away_field, 
al.city away_city, sum( t.ticket_price ) total_ticket_price, 
avg( t.ticket_price ) average_ticket_price, 
min ( t.ticket_price ) cheapest_ticket, 
max( t.ticket_price ) most_expensive_ticket, count(*) num_tickets

from ticketing.sporting_event_ticket t, sourcemysql.sporting_event e, 
sourcemysql.sport_location l, sourcemysql.sport_team hteam, 
sourcemysql.sport_team ateam, sourcemysql.sport_location hl, 
sourcemysql.sport_location al

where t.sporting_event_id = e.id
and t.sport_location_id = l.id
and e.home_team_id = hteam.id
and e.away_team_id = ateam.id
and hteam.home_field_id = hl.id
and ateam.home_field_id = al.id

group by to_char( e.start_date_time, 'YYYY-MM-DD' ), 
to_char( e.start_date_time, 'HH24:MI' ), e.start_date_time, 
e.sold_out, e.sport_type_name, l.name, l.city, l.seating_capacity, 
hteam.name, ateam.name, hl.name, hl.city, al.name, al.city;

DataSilos_13

Choose Save and visualize to open the QuickSight visualization toolkit and filter options. Here you can build your story or dashboards and start sharing them with your team.

Now, you can choose various fields and measures from the field list to get the appropriate visualization, like the one shown below. This one is aimed at understanding the date on which each event in each city reached maximum capacity.

DataSilos_14

You can also combine many such visualizations and prepare your dashboard for management reporting. The analysis may also show where you need to invest in campaigns and where things are going better than expected, to ensure a healthy sales pipeline.

DataSilos_15

Summary

In this post, you used AWS DMS to converge multiple heterogeneous data sources to an Amazon Redshift cluster. You also used QuickSight to create a data visualization on the converged dataset to provide you with additional insights. Although we have used an e-commerce use case related to an events company, this concept of converging multiple data silos to a target is also applicable to other verticals such as retail, healthcare, finance, insurance and banking, gaming, and so on.

If you have questions or suggestions, please comment below.


About the Author

 

Pratim Das is a Specialist Solutions Architect for Analytics in EMEA. He works with customers on big data and analytical projects, helping them build solutions on AWS, using AWS services and/or other open source or commercial solutions from the big data ecosystem. In his spare time he enjoys cooking and creating exciting new recipes, always with that spicy kick.

 

 


Related

Derive Insights from IoT in Minutes using AWS IoT, Amazon Kinesis Firehose, Amazon Athena, and Amazon QuickSight

o_realtime_1_1_1

Month in Review: December 2016

Post Syndicated from Derek Young original https://aws.amazon.com/blogs/big-data/month-in-review-december-2016/

Another month of big data solutions on the Big Data Blog.

Take a look at our summaries below and learn, comment, and share. Thank you for reading!

Implementing Authorization and Auditing using Apache Ranger on Amazon EMR
Apache Ranger is a framework to enable, monitor, and manage comprehensive data security across the Hadoop platform. Features include centralized security administration, fine-grained authorization across many Hadoop components (Hadoop, Hive, HBase, Storm, Knox, Solr, Kafka, and YARN) and central auditing. In this post, walk through the steps to enable authorization and audit for Amazon EMR clusters using Apache Ranger.

Amazon Redshift Engineering’s Advanced Table Design Playbook
Amazon Redshift is a fully managed, petabyte scale, massively parallel data warehouse that offers simple operations and high performance. In practice, the best way to improve query performance by orders of magnitude is by tuning Amazon Redshift tables to better meet your workload requirements. This five-part blog series will guide you through applying distribution styles, sort keys, and compression encodings and configuring tables for data durability and recovery purposes.

Interactive Analysis of Genomic Datasets Using Amazon Athena
In this post, learn to prepare genomic data for analysis with Amazon Athena. We'll demonstrate how Athena is well-adapted to address common genomics query paradigms, using the Thousand Genomes dataset, a seminal genomics study, hosted on Amazon S3. Although this post is focused on genomic analysis, similar approaches can be applied to any discipline where large-scale, interactive analysis is required.

Joining and Enriching Streaming Data on Amazon Kinesis
In this blog post, learn three approaches for joining and enriching streaming data on Amazon Kinesis Streams by using Amazon Kinesis Analytics, AWS Lambda, and Amazon DynamoDB.

Using SaltStack to Run Commands in Parallel on Amazon EMR
SaltStack is an open source project for automation and configuration management. It started as a remote execution engine designed to scale to many machines while delivering high-speed execution. You can now use the new bootstrap action that installs SaltStack on Amazon EMR. It provides a basic configuration that enables selective targeting of the nodes based on instance roles, instance groups, and other parameters.

Building an Event-Based Analytics Pipeline for Amazon Game Studios’ Breakaway
Amazon Game Studios’ new title Breakaway is an online 4v4 team battle sport that delivers fast action, teamwork, and competition. In this post, learn the technical details of how the Breakaway team uses AWS to collect, process, and analyze gameplay telemetry to answer questions about arena design.

Respond to State Changes on Amazon EMR Clusters with Amazon CloudWatch Events
With new support for Amazon EMR in Amazon CloudWatch Events, you can quickly be notified of, and programmatically respond to, state changes in your EMR clusters. These events are also displayed in the Amazon EMR console. CloudWatch Events allows you to create filters and rules to match these events and route them to Amazon SNS topics, AWS Lambda functions, Amazon SQS queues, streams in Amazon Kinesis Streams, or built-in targets.

Run Jupyter Notebook and JupyterHub on Amazon EMR
Data scientists who run Jupyter and JupyterHub on Amazon EMR can use Python, R, Julia, and Scala to process, analyze, and visualize big data stored in Amazon S3. Jupyter notebooks can be saved to S3 automatically, so users can shut down and launch new EMR clusters, as needed. See how EMR makes it easy to spin up clusters with different sizes and CPU/memory configurations to suit different workloads and budgets.

Derive Insights from IoT in Minutes using AWS IoT, Amazon Kinesis Firehose, Amazon Athena, and Amazon QuickSight
In this post, see how you can build a business intelligence capability for streaming IoT device data using AWS serverless and managed services. You can be up and running in minutes―starting small, but able to easily grow to millions of devices and billions of messages.

Serving Real-Time Machine Learning Predictions on Amazon EMR
The typical progression for creating and using a trained model for recommendations falls into two general areas: training the model and hosting the model. Model training has become a well-known standard practice. In this post, we highlight one way to host those recommendations using Amazon EMR with JobServer.

Powering Amazon Redshift Analytics with Apache Spark and Amazon Machine Learning
In this post, learn to generate a predictive model for flight delays that can be used to help pick the flight least likely to add to your travel stress. To accomplish this, you’ll use Apache Spark running on Amazon EMR for extracting, transforming, and loading (ETL) the data, Amazon Redshift for analysis, and Amazon Machine Learning for creating predictive models.

FROM THE ARCHIVE

Running sparklyr – RStudio’s R Interface to Spark on Amazon EMR
Sparklyr is an R interface to Spark that allows users to use Spark as the backend for dplyr, one of the most popular data manipulation packages. Sparklyr provides interfaces to Spark packages and also allows users to query data in Spark using SQL and develop extensions for the full Spark API. This short post shows you how to run RStudio and sparklyr on EMR.


Want to learn more about Big Data or Streaming Data? Check out our Big Data and Streaming data educational pages.

Leave a comment below to let us know what big data topics you’d like to see next on the AWS Big Data Blog.

Derive Insights from IoT in Minutes using AWS IoT, Amazon Kinesis Firehose, Amazon Athena, and Amazon QuickSight

Post Syndicated from Ben Snively original https://aws.amazon.com/blogs/big-data/derive-insights-from-iot-in-minutes-using-aws-iot-amazon-kinesis-firehose-amazon-athena-and-amazon-quicksight/

Ben Snively is a Solutions Architect with AWS

Speed and agility are essential with today’s analytics tools. The quicker you can get from idea to first results, the more you can experiment and innovate with your data, perform ad-hoc analysis, and drive answers to new business questions.

Serverless architectures help in this respect by taking care of the non-differentiated heavy lifting in your environment―tasks such as managing servers, clusters, and device endpoints – allowing you to focus on assembling your IoT system, analyzing data, and building meaningful reports quickly and efficiently.

In this post, I show how you can build a business intelligence capability for streaming IoT device data using AWS serverless and managed services. You can be up and running in minutes―starting small, but able to easily grow to millions of devices and billions of messages.

AWS serverless services

AWS services offer a quicker time to intelligence. The following is a high-level diagram showing how the services in this post are configured:

o_IoT_Minutes_1

AWS IoT easily and securely connects devices through the MQTT and HTTPS protocols. The IoT Rules Engine continuously processes incoming messages, enabling your devices to interact with other AWS services.

Amazon S3 is object storage that provides highly reliable, secure, and scalable storage for all your data, big or small. It is designed to deliver 99.999999999% durability and to scale past trillions of objects.

Amazon Kinesis Firehose allows you to capture and automatically load streaming data into Amazon Kinesis Analytics, Amazon S3, Amazon Redshift, and Amazon Elasticsearch Service, enabling near-real-time business intelligence and the use of dashboards.

Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using SQL.  Simply point to your data in Amazon S3, define the schema, and start querying using standard SQL, with most results delivered in seconds.

Amazon QuickSight is a fast, cloud-powered business analytics service that makes it easy to build visualizations, perform ad-hoc analysis, and quickly get business insights from your data. You can easily run SQL queries using Athena on data stored in S3, and build business dashboards within QuickSight.

Walkthrough: Heartrate sensors

In this walkthrough, you run a script to mimic multiple sensors publishing messages on an IoT MQTT topic, with one message published every second. The events get sent to AWS IoT, where an IoT rule is configured. The IoT rule captures all messages and sends them to Firehose. From there, Firehose writes the messages in batches to objects stored in S3. Finally, you set up a table in Athena over the S3 data and use QuickSight to analyze the IoT data.

Configuring Firehose to write to S3

Create a Firehose delivery stream using the following field values.

Name: IoT-to-BI-Example
S3 Bucket: <Create one>
S3 Prefix: iot_to_bi_example/

Configuring the AWS IoT rule

Create a new AWS IoT rule with the following field values.

Name: Iot_to_bi_demoRule
Attribute: *
Topic Filter: /health/#
Add Action: Send messages to an Amazon Kinesis Firehose stream (the stream from the previous section).
Select Separator: “\n (newline)”

Generating sample data

Running the heartrate.py Python script generates fictitious IoT messages from multiple userId values. The IoT rule sends the messages to Firehose, which writes the data out to S3.

The script assumes that it has access to AWS CLI credentials and that boto3 is installed on the machine running the script.

Download and run the following Python script:

https://github.com/awslabs/aws-big-data-blog/blob/master/aws-blog-iot-athena-quicksight-bi/heartrate.py

The script generates random data that looks like the following:

{"heartRate": 67, "userId": "Brady", "rateType": "NORMAL", "dateTime": "2016-12-18 14:01:39"}
{"heartRate": 78, "userId": "Bailey", "rateType": "NORMAL", "dateTime": "2016-12-18 14:01:41"}
{"heartRate": 94, "userId": "Beatrice", "rateType": "NORMAL", "dateTime": "2016-12-18 14:01:42"}
…
{"heartRate": 73, "userId": "Ben", "rateType": "NORMAL", "dateTime": "2016-12-18 14:01:54"}

Run the script for a few hours and then end the task or process.
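
If you want to see the general shape of such a publisher without downloading the script, here is a minimal sketch. It is not the linked heartrate.py script itself; the topic name under the /health/# filter, the user list, and the HIGH threshold are illustrative assumptions, and it assumes your AWS credentials and region are already configured for boto3.

import json
import random
import time
from datetime import datetime

import boto3

# Assumption: credentials/region are configured; the client targets the AWS IoT data plane.
iot_client = boto3.client('iot-data')

USERS = ['Brady', 'Bailey', 'Beatrice', 'Ben']  # illustrative userId values

while True:
    heart_rate = random.randint(60, 110)
    message = {
        'heartRate': heart_rate,
        'userId': random.choice(USERS),
        'rateType': 'HIGH' if heart_rate > 100 else 'NORMAL',  # threshold is an assumption
        'dateTime': datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S'),
    }
    # Publish onto a topic that matches the /health/# filter in the IoT rule.
    iot_client.publish(topic='/health/heartrate', qos=0, payload=json.dumps(message))
    time.sleep(1)  # one message per second, as in the walkthrough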

Configuring Athena

Log into the Athena console. In the Query Editor, create a table using the following query:

CREATE EXTERNAL TABLE heartrate_iot_data (
    heartRate int,
    userId string,
    rateType string,
    dateTime timestamp)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
WITH SERDEPROPERTIES ( 'ignore.malformed.json' = 'true' )
LOCATION 's3://<CREATED-BUCKET>/iot_to_bi_example/';

After the query completes, you see the new table in the left-hand pane:

o_IoT_Minutes_2

Run the following command to see the different userid values that were generated by the test script:

SELECT userid, COUNT(userid) FROM heartrate_iot_data GROUP BY userid

Your counts depend on how long you left the script to run, but you should have the following names listed:

o_IoT_Minutes_3
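
The console is the quickest way to explore, but you can also submit the same query programmatically. Here is a minimal sketch using the boto3 Athena client; the results output location is an assumption you'd replace with a bucket you own.

import time

import boto3

athena = boto3.client('athena')

# Assumption: replace with an S3 location you own for Athena query results.
OUTPUT_LOCATION = 's3://<YOUR-RESULTS-BUCKET>/athena-results/'

query = athena.start_query_execution(
    QueryString='SELECT userid, COUNT(userid) FROM heartrate_iot_data GROUP BY userid',
    QueryExecutionContext={'Database': 'default'},
    ResultConfiguration={'OutputLocation': OUTPUT_LOCATION},
)
execution_id = query['QueryExecutionId']

# Poll until the query reaches a terminal state.
while True:
    state = athena.get_query_execution(
        QueryExecutionId=execution_id
    )['QueryExecution']['Status']['State']
    if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
        break
    time.sleep(1)

# Print each row of the result set.
if state == 'SUCCEEDED':
    results = athena.get_query_results(QueryExecutionId=execution_id)
    for row in results['ResultSet']['Rows']:
        print([col.get('VarCharValue') for col in row['Data']])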

Analyzing the data

Now, analyze this data in QuickSight, using Athena as the data source.

Set up a data source

Log into QuickSight and choose Manage data, New data set. Choose Athena as a new data source.

o_IoT_Minutes_4

Name your data source “AthenaDataSource”. Select the default schema and the heartrate_iot_data table.

o_IoT_Minutes_5

On the left, choose Visualize.

NOTE: The Edit/Preview data button allows you to prepare the dataset differently prior to visualizing it.  Examples include being able to format and transform your data, create alias data fields, change data types, and filter or join your data.

Build an analysis

On the left, choose userid; you’ll see the same names and counts that you saw when you ran the Athena SQL query earlier.

o_IoT_Minutes_6

Choose ratetype to query the IoT data for the number of userids that have had HIGH versus NORMAL heart rates.

o_IoT_Minutes_7

Focus on “HIGH” heart rates by interacting with the graph. Open one of the bars under the HIGH section and choose Focus only on HIGH.

o_IoT_Minutes_8

This configures a filter to include only “HIGH” records.

o_IoT_Minutes_9

On the left, choose Visualize. Clear ratetype (as you are now filtering by it), choose heartrate, and switch to AVERAGE instead of SUM.

IoT_Minutes_10

Selecting the datetime X axis title allows you to change the time units easily. Change it to hours.

You can easily change the parameters and see the users who haven't had any HIGH heart rate data (in this dataset: Bonny, Brady, and Branden).

o_IoT_Minutes_11

Conclusion

With these few easy steps, you’ve assembled a fully managed set of services for collecting sensor data from devices, batching the data into S3, and building intelligence from that data using QuickSight and Athena.

This was all done without launching a single server or cluster; throughout, you’ve been able to focus on the data and analytics, rather than the management of the infrastructure.

In this example, you kept the data in JSON. Converting it to a columnar format such as Parquet would make queries much more performant, and partitioning the data would further restrict the amount of data scanned.

If you have questions or suggestions, please comment below.


About the Author

 

Ben Snively is a Public Sector Specialist Solutions Architect. He works with government, non-profit, and education customers on big data and analytical projects, helping them build solutions using AWS. In his spare time he adds IoT sensors throughout his house and runs analytics on it.

 

 


Related

Integrating IoT Events into Your Analytic Platform

iot_1_smlal

Joining and Enriching Streaming Data on Amazon Kinesis

Post Syndicated from Assaf Mentzer original https://aws.amazon.com/blogs/big-data/joining-and-enriching-streaming-data-on-amazon-kinesis/

Are you trying to move away from a batch-based ETL pipeline? You might do this, for example, to get real-time insights into your streaming data, such as clickstream, financial transactions, sensor data, customer interactions, and so on.  If so, it’s possible that as soon as you get down to requirements, you realize your streaming data doesn’t have all of the fields you need for real-time processing, and you are really missing that JOIN keyword!

You might also have requirements to enrich the streaming data based on a static reference, a dynamic dataset, or even with another streaming data source.  How can you achieve this without sacrificing the velocity of the streaming data?

In this blog post, I provide three use cases and approaches for joining and enriching streaming data:

Joining streaming data with a relatively static dataset on Amazon S3 using Amazon Kinesis Analytics

In this use case, Amazon Kinesis Analytics can be used to define a reference data input on S3, and use S3 for enriching a streaming data source.

For example, bike share systems around the world can publish data files about available bikes and docks, at each station, in real time.  On bike-share system data feeds that follow the General Bikeshare Feed Specification (GBFS), there is a reference dataset that contains a static list of all stations, their capacities, and locations.

Let’s say you would like to enrich the bike-availability data (which changes throughout the day) with the bike station’s latitude and longitude (which is static) for downstream applications.  The architecture would look like this:

o_joiningenriching_1

To illustrate how you can use Amazon Kinesis Analytics to do this, follow these steps to set up an AWS CloudFormation stack, which will do the following:

  • Continuously produce sample bike-availability data onto an Amazon Kinesis stream (with, by default, a single shard).
  • Generate a sample S3 reference data file.
  • Create an Amazon Kinesis Analytics application that performs the join with the reference data file.

To create the CloudFormation stack

  1. Open the AWS CloudFormation console and choose Create Stack. Make sure the US East (N. Virginia) region is selected.
  2. Choose Specify an Amazon S3 template URL, enter or paste the following URL, and then choose Next.
    https://s3.amazonaws.com/aws-bigdata-blog/artifacts/Joining-and-Enriching-Streaming-Data-on-Amazon-Kinesis/demo-data-enrichment-s3-reference-cfn-template.json
  3. For the stack name, type demo-data-enrichment-s3-reference-stack. Under the KinesisAnalyticsReferenceDataS3Bucket parameter, type the name of an S3 bucket in the US East (N. Virginia) region where the reference data will be uploaded. This CloudFormation template will create a sample data file under KinesisAnalyticsReferenceDataS3Key. (Make sure the value of the KinesisAnalyticsReferenceDataS3Key parameter does not conflict with existing S3 objects in your bucket.) You'll also see parameters referencing an S3 bucket (LambdaKinesisAnalyticsApplicationCustomResourceCodeS3Bucket) and an associated S3 key. They represent the location of the AWS Lambda package (written in Python) that contains the AWS CloudFormation custom resources required to create an Amazon Kinesis Analytics application. Do not modify the values of these parameters.
  4. Follow the remaining steps in the Create Stack wizard (choose Next, and then choose Create). Wait until the status displayed for the stack is CREATE_COMPLETE.

You now have an Amazon Kinesis stream in your AWS account with sample bike-availability streaming data (named demo-data-enrichment-bike-availability-input-stream if you kept the defaults), and an Amazon Kinesis Analytics application that performs the joining.
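
If you want to confirm that sample records are flowing before you look at the Analytics application, you can read a few records directly off the stream. The following is a minimal sketch, assuming the default stream name and its single shard:

import boto3

kinesis = boto3.client('kinesis', region_name='us-east-1')

STREAM_NAME = 'demo-data-enrichment-bike-availability-input-stream'

# The demo stream has a single shard by default.
shard_id = kinesis.describe_stream(
    StreamName=STREAM_NAME
)['StreamDescription']['Shards'][0]['ShardId']

iterator = kinesis.get_shard_iterator(
    StreamName=STREAM_NAME,
    ShardId=shard_id,
    ShardIteratorType='TRIM_HORIZON',  # start from the oldest available record
)['ShardIterator']

# Fetch one batch of records and print the raw JSON payloads.
for record in kinesis.get_records(ShardIterator=iterator, Limit=10)['Records']:
    print(record['Data'].decode('utf-8'))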

You might want to examine the reference data file generated under your bucket (its default name is demo_data_enrichment_s3_reference_data.json).  If you are using a JSON formatter/viewer, it will look like the following.  Note how the records are put together as top-level objects (no commas separating the records).

{
  "station_id": "s001",
  "name": "Liberty Ave",
  "lat": 40.6892,
  "lon": -74.0445
}
{
  "station_id": "s002",
  "name": "Empire St",
  "lat": 40.7484,
  "lon": -73.9857
}

In the CloudFormation template, the reference data has been added to the Amazon Kinesis Analytics application through the AddApplicationReferenceDataSource API method.

Next, open the Amazon Kinesis Analytics console and examine the application.  If you did not modify the default parameters, the application name should be demo-data-enrichment-s3-reference-application.

The Application status should be RUNNING.  If its status is still STARTING, wait until it changes to RUNNING. Choose Application details, and then go to SQL results. You should see a page similar to the following:

o_joiningenriching_2

In the Amazon Kinesis Analytics SQL code, an in-application stream (DESTINATION_SQL_STREAM) is created with five columns. Data is inserted into the stream using a pump (STREAM_PUMP) by joining the source stream (SOURCE_SQL_STREAM_001) and the reference S3 data file (REFERENCE_DATA) using the station_id field. For more information about streams and pumps, see In-Application Streams and Pumps in the Amazon Kinesis Analytics Developer Guide.

The joined (enriched) data fields – station_name, station_lat and station_lon – should appear on the Real-time analytics tab, DESTINATION_SQL_STREAM.

o_joiningenriching_3

The LEFT OUTER JOIN works just like ANSI-standard SQL: the streaming records are preserved even if there is no matching station_id in the reference data. You can delete LEFT OUTER (leaving only JOIN, which means an INNER join), choose Save and then run SQL. Only the records with a matching station_id will then appear in the output.

o_joiningenriching_4

You can write the results to an Amazon Kinesis stream or to S3, Amazon Redshift, or Amazon Elasticsearch Service with an Amazon Kinesis Firehose delivery stream by adding a destination on the Destination tab.  For more information, see the Writing SQL on Streaming Data with Amazon Kinesis Analytics blog post.

Note: If you have not modified the default parameters in the CloudFormation template, the S3 reference data source should already have been added to your application. If you are interested in adding an S3 reference data source to your own applications, the following code snippet shows what it looks like in Python:

import boto3
kinesisanalytics_client = boto3.client('kinesisanalytics')

application_name = 'YOUR KINESIS ANALYTICS APPLICATION NAME'
reference_data_s3_bucket = 'YOUR REFERENCE DATA S3 BUCKET'
reference_data_s3_key = 'YOUR REFERENCE DATA S3 KEY' #example: folder/sub-folder/reference.json
reference_data_role_arn = 'YOUR IAM ROLE ARN' #with s3:GetObject permission on the reference data s3 key. Example: arn:aws:iam::123456789012:role/service-role/role-name

response = kinesisanalytics_client.describe_application(
    ApplicationName=application_name
)
application_version_id = response['ApplicationDetail']['ApplicationVersionId']

kinesisanalytics_client.add_application_reference_data_source(
    ApplicationName=application_name,
    CurrentApplicationVersionId=application_version_id,
    ReferenceDataSource={
        'TableName': 'REFERENCE_DATA',
        'S3ReferenceDataSource': {
            'BucketARN': 'arn:aws:s3:::%s' % reference_data_s3_bucket,
            'FileKey': reference_data_s3_key,
            'ReferenceRoleARN': reference_data_role_arn
        },
        'ReferenceSchema': {
            'RecordFormat': {
                'RecordFormatType': 'JSON',
                'MappingParameters': {
                    'JSONMappingParameters': {
                        'RecordRowPath': '$'
                    }
                }
            },
            'RecordEncoding': 'UTF-8',
            'RecordColumns': [
                {
                    'Name': 'station_id',
                    'Mapping': '$.station_id',
                    'SqlType': 'VARCHAR(4)'
                },
                {
                    'Name': 'name',
                    'Mapping': '$.name',
                    'SqlType': 'VARCHAR(64)'
                },
                {
                    'Name': 'lat',
                    'Mapping': '$.lat',
                    'SqlType': 'REAL'
                },
                {
                    'Name': 'lon',
                    'Mapping': '$.lon',
                    'SqlType': 'REAL'
                },
            ]
        }
    }
)

 

This data enrichment approach works for a relatively static dataset because it requires you to upload the entire reference dataset to S3 whenever there is a change. It might not work for datasets that change too frequently for the uploads to keep pace with the streaming data.

If you change the reference data stored in the S3 bucket, you need to use the UpdateApplication operation (using the API or AWS CLI) to refresh the data in the Amazon Kinesis Analytics in-application table.  You can use the following Python script to refresh the data. Although it will not be covered in this blog post, you can automate data refresh by setting up Amazon S3 event notification with AWS Lambda.

#!/usr/bin/env python
import json,boto3

# Name of the Kinesis Analytics Application
KINESIS_ANALYTICS_APPLICATION_NAME='demo-data-enrichment-s3-reference-application'

# AWS region
AWS_REGION='us-east-1'


def main_handler():

    kinesisanalytics_client=boto3.client('kinesisanalytics',AWS_REGION)

    # retrieve the current application version id
    response_describe_application = kinesisanalytics_client.describe_application(
        ApplicationName=KINESIS_ANALYTICS_APPLICATION_NAME
    )
    application_version_id=response_describe_application['ApplicationDetail']['ApplicationVersionId']

    # update the application; an empty ApplicationUpdate still increments the
    # application version, which reloads the S3 reference data
    response = kinesisanalytics_client.update_application(
        ApplicationName=KINESIS_ANALYTICS_APPLICATION_NAME,
        CurrentApplicationVersionId=application_version_id,
        ApplicationUpdate={
        }
    )

    print("Done")


if __name__ == "__main__":
    main_handler()
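
As mentioned above, you could trigger this refresh automatically from an Amazon S3 event notification. A minimal sketch of a Lambda handler wrapping the same call might look like the following (this wrapper is not part of the original walkthrough):

import boto3

APPLICATION_NAME = 'demo-data-enrichment-s3-reference-application'

def lambda_handler(event, context):
    # Invoked by an S3 ObjectCreated event on the reference data key.
    client = boto3.client('kinesisanalytics')
    version_id = client.describe_application(
        ApplicationName=APPLICATION_NAME
    )['ApplicationDetail']['ApplicationVersionId']
    # An empty update increments the application version, which reloads
    # the S3 reference data into the in-application table.
    client.update_application(
        ApplicationName=APPLICATION_NAME,
        CurrentApplicationVersionId=version_id,
        ApplicationUpdate={},
    )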

 

To clean up resources created during this demo

  1. To delete the Amazon Kinesis Analytics application, open the Amazon Kinesis Analytics console, choose the application (demo-data-enrichment-s3-reference-application), choose Actions, and then choose Delete application.
  2. To delete the stack, open the AWS CloudFormation console, choose the stack (demo-data-enrichment-s3-reference-stack), choose Actions, and then choose Delete Stack.

The S3 reference data file should have been removed from your bucket, but your other data files should remain intact.

Enriching streaming data with a dynamic dataset using AWS Lambda and Amazon DynamoDB

In many cases, the data you want to enrich is not static. Imagine a case in which you are capturing user activities from a web or mobile application and have to enrich the activity data with frequently changing fields from a user database table.

A better approach is to store the reference data in a way that supports random reads and writes efficiently. Amazon DynamoDB is a fully managed non-relational database that can be used in this case. AWS Lambda can be set up to automatically read batches of records from your Amazon Kinesis streams, perform data enrichment such as looking up data in DynamoDB, and then produce the enriched data onto another stream.

If you have a stream of user activities and want to look up a user’s birth year from a DynamoDB table, the architecture will look like this:

o_joiningenriching_5

To set up a demo with this architecture, follow these steps to set up an AWS CloudFormation stack, which continuously produces sample user scores onto an Amazon Kinesis stream. An AWS Lambda function enriches the records with data from a DynamoDB table and produces the results onto another Amazon Kinesis stream.

  1. Open the CloudFormation console and choose Create Stack. Make sure the US East (N. Virginia) region is selected.
  2. Choose Specify an Amazon S3 template URL, enter or paste the following URL, and then choose Next.
    https://s3.amazonaws.com/aws-bigdata-blog/artifacts/Joining-and-Enriching-Streaming-Data-on-Amazon-Kinesis/demo-data-enrichment-ddb-reference-cfn-template.json
  3. For the stack name, type demo-data-enrichment-ddb-reference-stack. Review the parameters. If none of them conflict with your Amazon Kinesis stream names or DynamoDB table name, you can accept the default values and choose Next. The following steps are written with the assumption that you are using the default values.
  4. Complete the remaining steps in the Create Stack wizard. Wait until CREATE_COMPLETE is displayed for the stack status. This CloudFormation template creates three Lambda functions: one for setting up sample data in a DynamoDB table, one for producing sample records continuously, and one for data enrichment. The description for this last function is Data Enrichment Demo – Lambda function to consume and enrich records from Kinesis.
  5. Open the Lambda console, select Data Enrichment Demo – Lambda function to consume and enrich records from Kinesis, and then choose the Triggers tab.
  6. If No records processed is displayed for Last result, wait a few minutes, and then reload the page. If things are set up correctly, OK will be displayed for Last result. This might take a few minutes.

o_joiningenriching_6

At this point, the Lambda function is performing the data enrichment and producing records onto an output stream. Lambda is also commonly used in this position to preprocess records before an analytics application consumes them, for example to handle more complicated data formats.

Although this data enrichment process doesn't involve Amazon Kinesis Analytics, you can still use the Kinesis Analytics service to examine, or even process, the enriched records by creating a new Kinesis Analytics application and connecting it to demo-data-enrichment-user-activity-output-stream.

The records on the demo-data-enrichment-user-activity-output-stream will look like the following and will show the enriched birthyear field on the streaming data.

o_joiningenriching_7

You can use this birthyear value in your Amazon Kinesis Analytics application. Although it’s not covered in this blog post, you can aggregate by age groups and count the user activities.

You can review the code for the Lambda function on the Code tab. The code used here is for demonstration purposes only. You might want to modify and thoroughly test it before applying it to your use case.

Note the following:

  • The Lambda function receives records in batches (instead of one record per Lambda invocation). You can control this behavior through Batch size on the Triggers tab (in the preceding example, Batch size is set to 100). The batch size you specify is the maximum number of records that you want your Lambda function to receive per invocation.
  • The retrieval from the reference data source (in this case, DynamoDB) can be done in batch instead of record by record. This example uses the batch_get_item() API method.
  • Depending on how strict the record sequencing requirement is, the writing of results to the output stream can also be done in batch. This example uses the put_records() API method.
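
Putting those three points together, here is a minimal sketch of such an enrichment function. It is not the exact code from the CloudFormation template; the DynamoDB table name and the user_id key field are assumptions, while the output stream name and birthyear field match the demo.

import base64
import json

import boto3

dynamodb = boto3.client('dynamodb')
kinesis = boto3.client('kinesis')

# Assumptions: the table name and user_id key are illustrative.
REFERENCE_TABLE = 'demo-data-enrichment-user-table'
OUTPUT_STREAM = 'demo-data-enrichment-user-activity-output-stream'

def lambda_handler(event, context):
    # Lambda delivers Kinesis records in batches; decode them all first.
    records = [json.loads(base64.b64decode(r['kinesis']['data'])) for r in event['Records']]

    # One batched read for all distinct user ids instead of per-record lookups
    # (batch_get_item accepts up to 100 keys per call).
    user_ids = list({r['user_id'] for r in records})
    response = dynamodb.batch_get_item(
        RequestItems={REFERENCE_TABLE: {'Keys': [{'user_id': {'S': uid}} for uid in user_ids]}}
    )
    birth_years = {
        item['user_id']['S']: item['birthyear']['N']
        for item in response['Responses'][REFERENCE_TABLE]
    }

    # Enrich each record, then write the results out in one batched put.
    for r in records:
        r['birthyear'] = birth_years.get(r['user_id'])
    kinesis.put_records(
        StreamName=OUTPUT_STREAM,
        Records=[{'Data': json.dumps(r), 'PartitionKey': r['user_id']} for r in records],
    )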

DynamoDB is not the only option. What's important is to have a data source that supports random reads (and writes) efficiently. Because it's a distributed database with built-in partitioning, DynamoDB is a good choice, but you can also use Amazon RDS as long as the data retrieval is fast enough (perhaps by having a proper index and by spreading the read workload over read replicas). You can also use an in-memory database like Amazon ElastiCache for Redis or Memcached.

To clean up the resources created for this demo

  1. If you created a Kinesis Analytics application: Open the Amazon Kinesis Analytics console, select your application, choose Actions, and then choose Delete application.
  2. Open the CloudFormation console, choose demo-data-enrichment-ddb-reference-stack, choose Actions, and then choose Delete Stack.

Joining multiple sets of streaming data using Amazon Kinesis Analytics

To illustrate this use case, let’s say you have two streams of temperature sensor data coming from a set of machines: one measuring the engine temperature and the other measuring the power supply temperature.  For the best prediction of machine failure, you’ve been told (perhaps by your data scientist) that the two temperature readings must be validated against a prediction model ─ not individually, but as a set.

Because this is streaming data, the time window for the data joining should be clearly defined.  In this example, the joining must occur on temperature readings within the same window (same minute) so that the temperature of the engine is matched against the temperature of the power supply in the same timeframe.

This data joining can be achieved with Amazon Kinesis Analytics, but has to follow a certain pattern.

First of all, an Amazon Kinesis Analytics application supports no more than one streaming data source. The different sets of streaming data have to be produced onto one Amazon Kinesis stream.

An Amazon Kinesis Analytics application can use this Amazon Kinesis stream as input and split the data into multiple in-application streams based on a certain field (in this example, sensor_location).
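
On the producer side, this means each reading must carry the field used for the split. Here is a minimal sketch of such a producer; the stream name, machine ids, and temperature ranges are illustrative assumptions.

import json
import random
import time

import boto3

kinesis = boto3.client('kinesis')

# Assumption: both sensor types write to this single stream.
STREAM_NAME = 'demo-sensor-input-stream'

while True:
    machine_id = 'machine-%02d' % random.randint(1, 5)
    for location, base_temp in (('engine', 90.0), ('power-supply', 60.0)):
        reading = {
            'machine_id': machine_id,
            'sensor_location': location,  # the field used to split in-application streams
            'temperature': round(base_temp + random.uniform(-5, 5), 1),
        }
        kinesis.put_record(
            StreamName=STREAM_NAME,
            Data=json.dumps(reading),
            PartitionKey=machine_id,
        )
    time.sleep(1)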

The joining can now occur on the two in-application streams.  In this example, the join fields are the machine_id and the one-minute tumbling window.

o_joiningenriching_8

The selection of the time field for the windowing criteria is a topic of its own.  For information, see Writing SQL on Streaming Data with Amazon Kinesis Analytics. It is important to get a well-aligned window for real-world applications.  In this example, the processing time (ROWTIME) will be used for the tumbling window calculation for simplicity.

Again, you can follow these steps to set up an AWS CloudFormation stack, which continuously produces two sets of sample sensor data onto a single Amazon Kinesis stream and creates an Amazon Kinesis Analytics application that performs the joining.

  1. Open the CloudFormation console and choose Create Stack. Make sure the US East (N. Virginia) region is selected.
  2. Choose Specify an Amazon S3 template URL, enter or paste the following URL, and then choose Next.
    https://s3.amazonaws.com/aws-bigdata-blog/artifacts/Joining-and-Enriching-Streaming-Data-on-Amazon-Kinesis/demo-data-joining-multiple-streams-cfn-template.json
  3. For the stack name, type demo-data-joining-multiple-streams-stack. Review the parameters. If none of them conflict with your Amazon Kinesis stream, you can accept the default values and choose Next. The following steps are written with the assumption that you are using the default values.
  4. You will also see parameters referencing an S3 bucket and S3 key. This is an AWS Lambda package (written in Python) that contains the AWS CloudFormation custom resources required to create an Amazon Kinesis Analytics application.
  5. Complete the remaining steps in the Create Stack wizard. Wait until the status displayed for the stack is CREATE_COMPLETE.

Open the Amazon Kinesis Analytics console and examine the application.  If you have not modified the default parameters in the CloudFormation template, the application name should be demo-data-joining-multiple-streams-application.

The Application status should be RUNNING.  If its status is still STARTING, wait until it changes to RUNNING.

Choose Application details, and then go to SQL results.

You should see a page similar to the following:

o_joiningenriching_9

The SQL statements have been populated for you. The script first creates two in-application streams (for engine and power supply temperature readings, respectively).  DESTINATION_SQL_STREAM holds the joined results.

On the Real-time analytics tab, you'll see that the average temperature readings of the engine and power supply have been joined together using the machine_id and the per-minute tumbling window. The results can be written to an Amazon Kinesis stream or to S3, Amazon Redshift, or Amazon Elasticsearch Service with a Firehose delivery stream.

o_joiningenriching_10

To clean up resources created for this demo

  1. To delete the Amazon Kinesis Analytics application, open the Amazon Kinesis Analytics console, choose demo-data-joining-multiple-streams-application, choose Actions, and then choose Delete application.
  2. Open the CloudFormation console, choose demo-data-joining-multiple-streams-stack, choose Actions, and then choose Delete Stack.

Summary

In this blog post, I shared three approaches for joining and enriching streaming data on Amazon Kinesis Streams by using Amazon Kinesis Analytics, AWS Lambda, and Amazon DynamoDB. I hope this information will be helpful in situations where your real-time analytics applications require additional data fields from reference data sources, or where real-time insights must be derived from data across multiple streaming data sources. These joining and enrichment techniques will help you get the best business value from your data.

If you have a question or suggestion, please leave a comment below.


About the author

 

Assaf Mentzer is a Senior Consultant in Big Data & Analytics for AWS Professional Services. He works with enterprise customers to provide leadership on big data projects, helping them reach their full data potential in the cloud. In his spare time, he enjoys watching sports, playing ping pong, and hanging out with family and friends.

 

 


Related

 

Writing SQL on Streaming Data with Amazon Kinesis Analytics

writingsql_image1


From ELK Stack to EKK: Aggregating and Analyzing Apache Logs with Amazon Elasticsearch Service, Amazon Kinesis, and Kibana

Post Syndicated from Pubali Sen original https://aws.amazon.com/blogs/devops/from-elk-stack-to-ekk-aggregating-and-analyzing-apache-logs-with-amazon-elasticsearch-service-amazon-kinesis-and-kibana/

By Pubali Sen, Shankar Ramachandran

Log aggregation is critical to your operational infrastructure. A reliable, secure, and scalable log aggregation solution makes all the difference during a crunch-time debugging session.

In this post, we explore an alternative to the popular log aggregation solution, the ELK stack (Elasticsearch, Logstash, and Kibana): the EKK stack (Amazon Elasticsearch Service, Amazon Kinesis, and Kibana). The EKK solution eliminates the undifferentiated heavy lifting of deploying, managing, and scaling your log aggregation solution. With the EKK stack, you can focus on analyzing logs and debugging your application, instead of managing and scaling the system that aggregates the logs.

In this blog post, we describe how to use an EKK stack to monitor Apache logs. Let’s look at the components of the EKK solution.

Amazon Elasticsearch Service is a popular search and analytics engine that provides real-time application monitoring and log and clickstream analytics. For this post, you will store and index Apache logs in Amazon ES. As a managed service, Amazon ES is easy to deploy, operate, and scale in the AWS Cloud. Using a managed service also eliminates administrative overhead, like patch management, failure detection, node replacement, backing up, and monitoring. Because Amazon ES includes built-in integration with Kibana, it eliminates installing and configuring that platform. This simplifies your process further. For more information about Amazon ES, see the Amazon Elasticsearch Service detail page.

Amazon Kinesis Agent is an easy-to-install standalone Java software application that collects and sends data. The agent continuously monitors the Apache log file and ships new data to the delivery stream. This agent is also responsible for file rotation, checkpointing, retrying upon failures, and delivering the log data reliably and in a timely manner. For more information, see Writing to Amazon Kinesis Firehose Using Amazon Kinesis Agent or Amazon Kinesis Agent in GitHub.

Amazon Kinesis Firehose provides the easiest way to load streaming data into AWS. In this post, Firehose helps you capture and automatically load the streaming log data to Amazon ES and back it up in Amazon Simple Storage Service (Amazon S3). For more information, see the Amazon Kinesis Firehose detail page.

You’ll provision an EKK stack by using an AWS CloudFormation template. The template provisions an Apache web server and sends the Apache access logs to an Amazon ES cluster using Amazon Kinesis Agent and Firehose. You’ll back up the logs to an S3 bucket. To see the logs, you’ll leverage the Amazon ES Kibana endpoint.

By using the template, you can quickly complete the following tasks:

  • Provision an Amazon ES cluster.
  • Provision an Amazon Elastic Compute Cloud (Amazon EC2) instance.
  • Install Apache HTTP Server version 2.4.23.
  • Install the Amazon Kinesis Agent on the web server.
  • Provision an Elastic Load Balancing load balancer.
  • Create the Amazon ES index and the associated log mappings.
  • Create an Amazon Kinesis Firehose delivery stream.
  • Create all AWS Identity and Access Management (IAM) roles and policies. For example, the Firehose delivery stream backs up the Apache logs to an S3 bucket. This requires that the Firehose delivery stream be associated with a role that gives it permission to upload the logs to the correct S3 bucket.
  • Configure Amazon CloudWatch Logs log streams and log groups for the Firehose delivery stream. This helps you to troubleshoot when the log events don't reach their destination.

EKK Stack Architecture
The following architecture diagram shows how an EKK stack works.

Arch8-2-2-2

Prerequisites
To build the EKK stack, you must have the following:

  • An Amazon EC2 key pair in the US West (Oregon) Region. If you don't have one, create one.
  • An S3 bucket in the US West (Oregon) Region. If you don't have one, create one.
  • A default VPC in the US West (Oregon) Region. If you have deleted the default VPC, request one.
  • Administrator-level permissions in IAM to enable Amazon ES and Amazon S3 to receive log data from the EC2 instance through Firehose.

Getting Started
Begin by launching the AWS CloudFormation template to create the stack.

1.     In the AWS CloudFormation console, choose Launch Stack to launch the AWS CloudFormation template. Make sure that you are in the US West (Oregon) region.

Note: If you want to download the template to your computer and then upload it to AWS CloudFormation, you can do so from this Amazon S3 bucket. Save the template to a location on your computer that’s easy to remember.

2.     Choose Next.

3.     On the Specify Details page, provide the following:

Screen Shot 2016-11-01 at 9.44.20 AM

a)    Stack Name: A name for your stack.

b)    InstanceType: Select the instance family for the EC2 instance hosting the web server.

c)     KeyName: Select the Amazon EC2 key pair in the US West (Oregon) Region.

d)    SSHLocation: The IP address range that can be used to connect to the EC2 instance by using SSH. Accept the default, 0.0.0.0/0.

e)    WebserverPort: The TCP/IP port of the web server. Accept the default, 80.

4.     Choose Next.

5.     On the Options page, optionally specify tags for your AWS CloudFormation template, and then choose Next.

createStack2

6.     On the Review page, review your template details. Select the Acknowledgement checkbox, and then choose Create to create the stack.

It takes about 10-15 minutes to create the entire stack.

Configure the Amazon Kinesis Agent
After AWS CloudFormation has created the stack, configure the Amazon Kinesis Agent.

1.     In the AWS CloudFormation console, choose the Resources tab to find the Firehose delivery stream name. Record this value; you will need it to configure the agent in the steps below.

createStack3

2.     On the Outputs tab, find and record the public IP address of the web server. You need it to connect to the web server using SSH to configure the agent. For instructions on how to connect to an EC2 instance using SSH, see Connecting to Your Linux Instance Using SSH.

outputs

3. On the web server’s command line, run the following command:

sudo vi /etc/aws-kinesis/agent.json

This command opens the configuration file, agent.json, as follows.

{ "cloudwatch.emitMetrics": true, "firehose.endpoint": "firehose.us-west-2.amazonaws.com", "awsAccessKeyId": "", "awsSecretAccessKey": "", "flows": [ { "filePattern": "/var/log/httpd/access_log", "deliveryStream": "", "dataProcessingOptions": [ { "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG" } ] } ] } 

4.     For the deliveryStream key, type the value of the KinesisFirehoseDeliveryName that you retrieved from the stack's Resources tab. After you type the value, save and close the agent.json file.

5.     Run the following command on the CLI:

sudo service aws-kinesis-agent restart

6.     On the AWS CloudFormation console, choose the Resources tab and note the name of the Amazon ES cluster corresponding to the logical ID ESDomain.

7.     Go to the AWS Management Console, and choose Amazon Elasticsearch Service. Under My Domains, you can see the Amazon ES domain that the AWS CloudFormation template created.

createStac5

Configure Kibana and View Your Apache Logs
Amazon ES provides a default installation of Kibana with every Amazon ES domain. You can find the Kibana endpoint on your domain dashboard in the Amazon ES console.

1.     In the Amazon ES console, choose the Kibana endpoint.

2.     In Kibana, for Index name or pattern, type logmonitor. logmonitor is the name of the Amazon ES index that you created for the web server access logs. The health checks from Elastic Load Balancing generate access logs on the web server, which flow through the EKK pipeline to Kibana for discovery and visualization.

3.     In Time-field name, select datetime.

kibana1

4.     On the Kibana console, choose the Discover tab to see the Apache logs.

kibana3

Use Kibana to visualize the log data by creating bar charts, line and scatter plots, histograms, pie charts, etc.

Kibana4

Pie chart of IP addresses accessing the web server in the last 30 days

Kibana5

Bar chart of IP addresses accessing the web server in the last 5 minutes

You can graph information about HTTP responses, bytes, or IP addresses to gain meaningful insights from the Apache logs. Kibana also makes it easy to build dashboards by combining graphs.

Monitor Your Log Aggregator

To monitor the Firehose delivery stream, navigate to the Firehose console. Choose the stream, and then choose the Monitoring tab to see the Amazon CloudWatch metrics for the stream.

monito1
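
You can also pull the same metrics programmatically, which is handy for alarms or scripted health checks. The following is a minimal sketch; substitute your delivery stream name from the stack's Resources tab, and note that DeliveryToElasticsearch.Success is just one of several metrics Firehose publishes.

from datetime import datetime, timedelta

import boto3

cloudwatch = boto3.client('cloudwatch', region_name='us-west-2')

# Assumption: replace with the delivery stream name created by the template.
DELIVERY_STREAM = '<YOUR-FIREHOSE-DELIVERY-STREAM>'

response = cloudwatch.get_metric_statistics(
    Namespace='AWS/Firehose',
    MetricName='DeliveryToElasticsearch.Success',
    Dimensions=[{'Name': 'DeliveryStreamName', 'Value': DELIVERY_STREAM}],
    StartTime=datetime.utcnow() - timedelta(hours=1),
    EndTime=datetime.utcnow(),
    Period=300,
    Statistics=['Average'],
)
for point in sorted(response['Datapoints'], key=lambda p: p['Timestamp']):
    print(point['Timestamp'], point['Average'])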

 

When log delivery fails, the Amazon S3 and Amazon ES logs help you troubleshoot. For example, the following screenshot shows logs when delivery to an Amazon ES destination fails because the date mapping on the index was not in line with the ingest log.

monitor2

Conclusion
In this post, we showed how to ship Apache logs to Kibana by using Amazon Kinesis Agent, Amazon ES, and Firehose. It’s worth pointing out that Firehose automatically scales up or down based on the rate at which your application generates logs. To learn more about scaling Amazon ES clusters, see the Amazon Elasticsearch Service Developer Guide.

Managed services like Amazon ES and Amazon Kinesis Firehose simplify provisioning and managing a log aggregation system. The ability to run SQL queries against your streaming log data using Amazon Kinesis Analytics further strengthens the case for using an EKK stack. The AWS CloudFormation template used in this post is available to extend and build your own EKK stack.

 

Real-time Clickstream Anomaly Detection with Amazon Kinesis Analytics

Post Syndicated from Chris Marshall original https://aws.amazon.com/blogs/big-data/real-time-clickstream-anomaly-detection-with-amazon-kinesis-analytics/

Chris Marshall is a Solutions Architect for Amazon Web Services

Analyzing web log traffic to gain insights that drive business decisions has historically been performed using batch processing. While effective, this approach results in delayed responses to emerging trends and user activities. There are solutions for processing data in real time using streaming and micro-batching technologies, but they can be complex to set up and maintain. Amazon Kinesis Analytics is a managed service that makes it very easy to identify and respond to changes in behavior in real time.

One use case where it’s valuable to have immediate insights is analyzing clickstream data.   In the world of digital advertising, an impression is when an ad is displayed in a browser and a clickthrough represents a user clicking on that ad.  A clickthrough rate (CTR) is one way to monitor the ad’s effectiveness.  CTR is calculated in the form of: CTR = Clicks / Impressions * 100.  Digital marketers are interested in monitoring CTR to know when particular ads perform better than normal, giving them a chance to optimize placements within the ad campaign.  They may also be interested in anomalous low-end CTR that could be a result of a bad image or bidding model.

In this post, I show an analytics pipeline which detects anomalies in real time for a web traffic stream, using the RANDOM_CUT_FOREST function available in Amazon Kinesis Analytics.

RANDOM_CUT_FOREST 

Amazon Kinesis Analytics includes a powerful set of analytics functions to analyze streams of data.  One such function is RANDOM_CUT_FOREST.  This function detects anomalies by scoring data flowing through a dynamic data stream. This novel approach identifies a normal pattern in streaming data, then compares new data points in reference to it. For more information, see Robust Random Cut Forest Based Anomaly Detection On Streams.

Analytics pipeline components

To demonstrate how the RANDOM_CUT_FOREST function can be used to detect anomalies in real-time click through rates, I will walk you through how to build an analytics pipeline and generate web traffic using a simple Python script.  When your injected anomaly is detected, you get an email or SMS message to alert you to the event.

This post first walks through the components of the analytics pipeline, then discusses how to build and test it in your own AWS account.  The accompanying AWS CloudFormation script builds the Amazon API Gateway API, Amazon Kinesis streams, AWS Lambda function, Amazon SNS components, and IAM roles required for this walkthrough. Then you manually create the Amazon Kinesis Analytics application that uses the RANDOM_CUT_FOREST function in SQL.

Web Client

Because Python is a popular scripting language available on many clients, we'll use it to generate web impression and click data by making HTTP GET requests with the requests library. This script mimics beacon traffic generated by web browsers. The GET request contains a query string value of click or impression to indicate the type of beacon being captured. The script has a while loop that iterates 2,500 times. On each pass through the loop, an impression beacon is sent. Then, the random function is used to potentially send an additional click beacon. For most passes through the loop, the click request is generated roughly 10% of the time. However, for some web requests, the CTR jumps to 50%, representing an anomaly. These artificially high CTR values are used for illustration in this post, but the functionality would be the same using small fractional values.

import requests
import random
import sys
import argparse

def getClicked(rate):
	if random.random() <= rate:
		return True
	else:
		return False

def httpGetImpression():
	url = args.target + '?browseraction=Impression' 
	r = requests.get(url)

def httpGetClick():
	url = args.target + '?browseraction=Click' 
	r = requests.get(url)
	sys.stdout.write('+')

parser = argparse.ArgumentParser()
parser.add_argument("target",help=" the http(s) location to send the GET request")
args = parser.parse_args()
i = 0
while (i < 2500):
	httpGetImpression()
	if(i<1950 or i>=2000):
		clicked = getClicked(.1)
		sys.stdout.write('_')
	else:
		clicked = getClicked(.5)
		sys.stdout.write('-')
	if(clicked):
		httpGetClick()
	i = i + 1
	sys.stdout.flush()
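
To run the script, pass the API Gateway endpoint created by the CloudFormation template as the target argument, for example: python beacon_client.py https://<your-api-id>.execute-api.us-east-1.amazonaws.com/<stage>. The script file name and stage name here are illustrative; use the values from your own stack.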

Web “front door”

Client web requests are handled by Amazon API Gateway, a fully managed service that makes it easy for developers to create, publish, maintain, monitor, and secure APIs at any scale. It handles the web request traffic for your analytics pipeline by acting as a service proxy to an Amazon Kinesis stream. API Gateway takes the click and impression web traffic, converts the HTTP header and query string data into a JSON message, and places it into your stream. The CloudFormation script creates an API endpoint with a body mapping template similar to the following:

#set($inputRoot = $input.path('$'))
{
  "Data": "$util.base64Encode("{ ""browseraction"" : ""$input.params('browseraction')"", ""site"" : ""$input.params('Host')"" }")",
  "PartitionKey" : "$input.params('Host')",
  "StreamName" : "CSEKinesisBeaconInputStream"
}

(API Gateway service proxy body mapping template)

This tells API Gateway to take the host header and query string inputs and convert them into a payload to be put into a stream using a service proxy for Amazon Kinesis Streams.

Input stream

Amazon Kinesis Streams can capture terabytes of data per hour from thousands of sources.  In this case, you use it to deliver your JSON messages to Amazon Kinesis Analytics.

Analytics application

Amazon Kinesis Analytics processes the incoming messages and allows you to perform analytics on the dynamic stream of data. You use SQL to calculate the CTR and pass that into your RANDOM_CUT_FOREST function to calculate an anomaly score.

First, calculate the counts of impressions and clicks using SQL with a tumbling window in Amazon Kinesis Analytics.  Analytics uses streams and pumps in SQL to process data.  See our earlier post to get an overview of streaming data with Kinesis Analytics. Inside the Analytics SQL, a stream is analogous to a table and a pump is the flow of data into those tables.  Here’s the description of streams for the impression and click data.

CREATE OR REPLACE STREAM "CLICKSTREAM" ( 
   "CLICKCOUNT" DOUBLE
);


CREATE OR REPLACE STREAM "IMPRESSIONSTREAM" ( 
   "IMPRESSIONCOUNT" DOUBLE
);

Later, you calculate the clicks divided by impressions; you are using a DOUBLE data type to contain the count values.  If you left them as integers, the division below would yield only a 0 or 1.  Next, you define the pumps that populate the stream using a tumbling window, a window of time during which all records received are considered as part of the statement.

CREATE OR REPLACE PUMP "CLICKPUMP" STARTED AS 
INSERT INTO "CLICKSTREAM" ("CLICKCOUNT") 
SELECT STREAM COUNT(*) 
FROM "SOURCE_SQL_STREAM_001"
WHERE "browseraction" = 'Click'
GROUP BY FLOOR(
  ("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00')
    SECOND / 10 TO SECOND
);
CREATE OR REPLACE PUMP "IMPRESSIONPUMP" STARTED AS 
INSERT INTO "IMPRESSIONSTREAM" ("IMPRESSIONCOUNT") 
SELECT STREAM COUNT(*) 
FROM "SOURCE_SQL_STREAM_001"
WHERE "browseraction" = 'Impression'
GROUP BY FLOOR(
  ("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00')
    SECOND / 10 TO SECOND
);

The GROUP BY statement uses FLOOR and ROWTIME to create a tumbling window of 10 seconds. For example, all records whose ROWTIME falls between 12:00:00 and 12:00:09 land in the same window and are counted together. This tumbling window outputs one record every ten seconds, assuming we are receiving data from the corresponding pump. In this case, you output the total number of records that were clicks or impressions.

Next, use the output of these pumps to calculate the CTR:

CREATE OR REPLACE STREAM "CTRSTREAM" (
  "CTR" DOUBLE
);

CREATE OR REPLACE PUMP "CTRPUMP" STARTED AS 
INSERT INTO "CTRSTREAM" ("CTR")
SELECT STREAM "CLICKCOUNT" / "IMPRESSIONCOUNT" * 100 as "CTR"
FROM "IMPRESSIONSTREAM",
  "CLICKSTREAM"
WHERE "IMPRESSIONSTREAM".ROWTIME = "CLICKSTREAM".ROWTIME;

Finally, these CTR values are used in RANDOM_CUT_FOREST to detect anomalous values.  The DESTINATION_SQL_STREAM is the output stream of data from the Amazon Kinesis Analytics application.

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    "CTRPERCENT" DOUBLE,
    "ANOMALY_SCORE" DOUBLE
);

CREATE OR REPLACE PUMP "OUTPUT_PUMP" STARTED AS 
INSERT INTO "DESTINATION_SQL_STREAM" 
SELECT STREAM * FROM
TABLE (RANDOM_CUT_FOREST( 
             CURSOR(SELECT STREAM "CTR" FROM "CTRSTREAM"), --inputStream
             100, --numberOfTrees (default)
             12, --subSampleSize 
             100000, --timeDecay (default)
             1) --shingleSize (default)
)
WHERE ANOMALY_SCORE > 2; 

The RANDOM_CUT_FOREST function greatly simplifies the programming required for anomaly detection.  However, understanding your data domain is paramount when performing data analytics.  The RANDOM_CUT_FOREST function is a tool for data scientists, not a replacement for them.  Knowing whether your data is logarithmic, circadian rhythmic, linear, etc. will provide the insights necessary to select the right parameters for RANDOM_CUT_FOREST.  For more information about parameters, see the RANDOM_CUT_FOREST Function.

Fortunately, the default values work in a wide variety of cases. In this case, use the default values for all but the subSampleSize parameter.  Typically, you would use a larger sample size to increase the pool of random samples used to calculate the anomaly score; for this post, use 12 samples so as to start evaluating the anomaly scores sooner.
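
If you later move an application like this toward production, you could raise the sample size again; the call below is an illustrative variation, not a recommendation from the original template:

-- Illustrative: a larger subSampleSize (for example, 256) smooths the scores
-- but delays the first meaningful results while the sample pool fills.
-- RANDOM_CUT_FOREST(CURSOR(SELECT STREAM "CTR" FROM "CTRSTREAM"), 100, 256, 100000, 1)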

Your SQL query outputs one record every ten seconds from the tumbling window, so after two minutes you have enough evaluation values to start calculating the anomaly score. You also use the WHERE clause as a cutoff: records are output to "DESTINATION_SQL_STREAM" only if the anomaly score from the function is greater than 2. To help visualize the cutoff point, here are the data points from a few runs through the pipeline using the sample Python script:

As you can see, the majority of the CTR data points are grouped around a 10% rate.  As the values move away from the 10% rate, their anomaly scores go up because they are more infrequent.  In the graph, an anomaly score above the cutoff is shaded orange. A cutoff value of 2 works well for this data set.

Output stream

The Amazon Kinesis Analytics application outputs the values from "DESTINATION_SQL_STREAM" to an Amazon Kinesis stream when the record has an anomaly score greater than 2.

Anomaly execution function

The output stream is an input event source for an AWS Lambda function with a batch size of 1, so the function is called once for each message put on the output stream. This function is the place where you could react to anomalous events. In a real-world scenario, you might instead update a real-time bidding system to react to current events. In this example, you send a message to SNS to see a tangible response to the changing CTR. The CloudFormation template creates a Lambda function similar to the following:

var AWS = require('aws-sdk');
var sns = new AWS.SNS({ region: "<region>" });

exports.handler = function(event, context) {
    // Our batch size is 1 record, so this loop is expected to process only once
    event.Records.forEach(function(record) {
        // Kinesis data is base64 encoded, so decode here
        var payload = new Buffer(record.kinesis.data, 'base64').toString('ascii');
        var rec = payload.split(',');
        var ctr = rec[0];
        var anomaly_score = rec[1];
        var detail = 'Anomaly detected with a click through rate of ' + ctr + '% and an anomaly score of ' + anomaly_score;
        var subject = 'Anomaly Detected';
        var params = {
            Message: detail,
            MessageStructure: 'String',
            Subject: subject,
            TopicArn: 'arn:aws:sns:us-east-1::ClickStreamEvent'
        };
        sns.publish(params, function(err, data) {
            if (err) context.fail(err.stack);
            else     context.succeed('Published Notification');
        });
    });
};

Notification system

Amazon SNS is a fully managed, highly scalable messaging platform. For this walkthrough, you send messages to an SNS topic with subscribers that deliver email and text messages to a mobile phone.

Analytics pipeline walkthrough

Sometimes it's best to build out a solution end to end so that you can see all the parts working together. Here are the steps to build the entire pipeline described above in your own account and perform real-time clickstream analysis yourself. Following this walkthrough creates resources in your account that you can use to test the example pipeline.

Prerequisites

To complete the walkthrough, you’ll need an AWS account and a place to execute Python scripts.

Configure the CloudFormation template

Download the CloudFormation template named CSE-cfn.json from the AWS Big Data Blog GitHub repository. In the CloudFormation console, set your region to N. Virginia, Oregon, or Ireland and then browse to that file.

When your script executes and an anomaly is detected, SNS sends a notification.  To see these in an email or text message, enter your email address and mobile phone number in the Parameters section and choose Next.

This CloudFormation template creates the policies and roles necessary to process the incoming messages. Acknowledge this by selecting the check box; otherwise, you get an error about the CAPABILITY_IAM capability.

If you entered a valid email or SMS number, you will receive a validation message when the SNS subscriptions are created by CloudFormation.  If you don’t wish to receive email or texts from your pipeline, you don’t need to complete the verification process.

Run the Python script

Copy ClickImpressionGenerator.py to a local folder.

This script uses the requests library to make HTTP requests. If you don't have that Python package globally installed, you can install it locally by running the following command in the same folder as the Python script:

pip install requests -t .

When the CloudFormation stack is complete, choose Outputs, ExecutionCommand, and run the command from the folder with the Python script.

This will start generating web traffic and sending it to the API Gateway at the front of your analytics pipeline. It is important to have data flowing into your Amazon Kinesis Analytics application for the next section so that a schema can be generated based on the incoming data. If the script completes before you finish the next section, simply restart it with the same command.

Create the Analytics pipeline application

Open the Amazon Kinesis console and choose Go to Analytics.

Choose Create new application to create the Kinesis Analytics application, enter an application name and description, and then choose Save and continue.

Choose Connect to a source to see a list of streams. Select the stream that starts with the name of your CloudFormation stack and contains CSEKinesisBeaconInputStream, in the form <stack name>-CSEKinesisBeaconInputStream-<random string>. This is the stream that your click and impression requests are sent to from API Gateway.

Scroll down and choose Choose an IAM role.  Select the role with the name in the form of <stack name>-CSEKinesisAnalyticsRole-<random string>.  If your ClickImpressionGenerator.py script is still running and generating data, you should see a stream sample.  Scroll to the bottom and choose Save and continue.

Next, add the SQL for your real-time analytics. Choose Go to SQL editor, choose Yes, start application, and paste the SQL contents from CSE-SQL.sql into the editor. Choose Save and run SQL. After a minute or so, you should see data showing up every ten seconds in the CLICKSTREAM. Choose Exit when you are done editing.

Choose Connect to a destination and select the <stack name>-CSEKinesisBeaconOutputStream-<random string> from the list of streams.  Change the output format to CSV.

Choose Choose an IAM role and select <stack name>-CSEKinesisAnalyticsRole-<random string> again. Choose Save and continue. Now your analytics pipeline is complete. When your script gets to the injected anomaly section, you should get a text message or email notifying you of the anomaly. To clean up the resources created here, delete the stack in CloudFormation, stop the Kinesis Analytics application, and then delete the application.

Summary

A pipeline like this can be used for many use cases where anomaly detection is valuable. What solutions have you enabled with this architecture? If you have questions or suggestions, please comment below.

 


Related

Process Amazon Kinesis Aggregated Data with AWS Lambda

Writing SQL on Streaming Data with Amazon Kinesis Analytics – Part 2

Post Syndicated from Ryan Nienhuis original https://aws.amazon.com/blogs/big-data/writing-sql-on-streaming-data-with-amazon-kinesis-analytics-part-2/

Ryan Nienhuis is a Senior Product Manager for Amazon Kinesis.

This is the second of two AWS Big Data posts on Writing SQL on Streaming Data with Amazon Kinesis Analytics. In the last post, I provided an overview of streaming data and key concepts, such as the basics of streaming SQL, and completed a walkthrough using a simple example. In this post, I cover more advanced stream processing concepts using Amazon Kinesis Analytics and you can complete an end-to-end application.


Amazon Kinesis Analytics allows you to easily write SQL on streaming data, providing a powerful way to build a stream processing application in minutes. The service allows you to connect to streaming data sources, process the data with sub-second latencies, and continuously emit results to downstream destinations for use in real-time alerts, dashboards, or further analysis.

This post introduces you to the different types of windows supported by Amazon Kinesis Analytics, the importance of time as it relates to stream data processing, and best practices for sending your SQL results to a configured destination. These are more advanced stream data processing concepts; for an introduction, see Writing SQL on Streaming Data with Amazon Kinesis Analytics Part 1.

Generating real-time analytics using windows

As I covered in the first post, streaming data is continuously generated; therefore, you need to specify bounds when processing data to make your result set deterministic. Some SQL statements operate on individual rows and have natural bounds, such as a continuous filter that evaluates each row based upon a defined SQL WHERE clause. However, SQL statements that process data across rows need to have set bounds, such as calculating the average of a particular column. The mechanism that provides these bounds is a window.

Windows are important because they define the bounds for which you want your query to operate. The starting bound is usually the current row that Amazon Kinesis Analytics is processing, and the window defines the ending bound.

Windows are required with any query that works across rows, because the in-application stream is unbounded and windows provide a mechanism to bind the result set and make the query deterministic. Analytics supports three types of windows: tumbling, sliding, and custom.

Tumbling windows

Tumbling windows are useful for periodic reports. You can use a tumbling window to compute an average number of visitors to your website in the last 5 minutes, or the maximum over the past hour. A single result is emitted for each key in the group, as specified by the GROUP BY clause, at the end of the defined window.

An important characteristic of a tumbling window is that the bounds do not overlap; the start of a new tumbling window begins with the end of the old window. The image below visually captures how a 5-minute tumbling window would aggregate results, in separate windows that do not overlap. The blue, green, and red windows represent minutes 0 to 5, 5 to 10, and 10 to 15, respectively.

Tumbling windows are always included in a GROUP BY clause and use the FLOOR function. The FLOOR function returns the largest integer that is less than or equal to its input; in other words, it rounds down. To create a tumbling time window, convert a timestamp to an integer representation and put it in a FLOOR function to create the window bound. Three examples are shown below:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
	ticker_symbol varchar(4), 
	ticker_symbol_count integer);

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM ticker_symbol, COUNT(*) AS ticker_symbol_count
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol, 
/* 10 second tumbling window */
--FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND) ;
/* 1 minute tumbling window */
-- FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE);
/* 2 minute tumbling window */
FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') MINUTE / 2 TO MINUTE);

This tumbling window syntax requires some explanation. Look at the first example:

FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND)

  • "SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00'

Timestamps in Amazon Kinesis Analytics are stored as signed 64-bit integers, with 0 representing the Unix epoch ('1970-01-01 00:00:00'). This statement calculates the number of seconds after the Unix epoch as an integer. For more information about the timestamp data type, see Data Types in the Amazon Kinesis Analytics SQL Reference Guide.

  • (<seconds since Unix epoch>) SECOND / 10 TO SECOND

This statement converts the interval to seconds, divides it by 10, and specifies the precision in seconds. The result is the number of 10-second intervals that have elapsed since the Unix epoch.

  • FLOOR(<number of 10-second intervals since the Unix epoch>)

The FLOOR function rounds this value down to an integer, so every row in the same 10-second span produces the same value and falls into the same group. The query reaches a deterministic result when it sees a value that is no longer part of the last range (that is, a new window has begun). Therefore, the first row whose ROWTIME crosses into the next 10-second boundary triggers a result for the previous window.
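
A short worked example (with invented values) may make this concrete:

-- Worked example for the 10-second window:
--   ROWTIME                      = 1970-01-01 00:00:17
--   seconds since the Unix epoch = 17
--   divided by 10                = 1.7
--   FLOOR(1.7)                   = 1   -> the window covering 00:00:10-00:00:19
-- Every row with a ROWTIME from 00:00:10.000 through 00:00:19.999 produces the
-- same value (1) and lands in the same group; the first row at or after
-- 00:00:20 produces 2, which closes the previous window and emits its result.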

Sliding windows

Sliding windows are useful in real-time monitoring use cases because they provide the most up-to-date information, and allow you to easily control the sensitivity of the metric through the window size. For example, you can use a sliding window to compute the average availability (or success rate) of a public API call.

Sliding windows are a fixed size like tumbling windows, but each subsequent window overlaps with the previous one, as shown in the image below. Sliding windows emit new results any time a new row enters the window.

For each new row entering the window, a new row is generated with the aggregated result. For this reason, sliding windows are typically used in conjunction with conditions in a WHERE clause; for example, only emitting the average availability of a public API action if the result breaches a specific threshold, as specified by the WHERE clause. An example of a sliding window with a WHERE clause is shown below.

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
	ticker_symbol varchar(4), 
	avg_change double);

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM ticker_symbol, avg_change
FROM (
    SELECT STREAM
        ticker_symbol, 
        AVG(change) OVER W1 as avg_change
    FROM "SOURCE_SQL_STREAM_001"
    WINDOW W1 AS (PARTITION BY ticker_symbol RANGE INTERVAL '10' SECOND PRECEDING)
)
WHERE ABS(avg_change) > 1;

The above example is a nested query. The nested SELECT statement calculates the average change over a 10-second sliding window, defined as W1. This window partitions results by the ticker symbol (PARTITION BY ticker_symbol), and uses an interval to define the start and end of the query (RANGE INTERVAL '10' SECOND PRECEDING). The outside SELECT statement filters results where the absolute value of the average change is greater than 1.

Custom windows

The final type of window is the custom window, which is useful for event correlation use cases such as sessionization. Sessionization is a common web and mobile analytics use case, and refers to grouping together all actions completed by a user during a session to describe their end-to-end experience. For example, it is extremely useful to determine the most successful route to purchasing a product on an ecommerce website. Custom windows are ideal for event correlation use cases because they do not have a fixed size; they end when a specific criterion is met. I will cover custom windows in more detail in a later post.

Time semantics in Amazon Kinesis Analytics

In streaming data, there are different types of time; how they are used is important to the analytics.  All of the above examples use ROWTIME, or the processing time, which is great for some use cases. However, in many scenarios, you want a time that more accurately reflects when the event occurred, such as the event or ingest time.

Amazon Kinesis Analytics supports three different time semantics for processing data:

  • Event time

The timestamp is assigned when the event occurred, also called client-side time. For example, the timestamp that is added in your client application, such as a web or mobile application. Event time is often desirable in analytics because it represents the actual time when the event occurred in the environment. However, many sources, such as mobile phones and web clients, do not have reliable clocks, which can lead to inaccurate timestamps. Moreover, connectivity issues can lead to records appearing in a stream out of order relative to when they occurred.

If you have a reasonably accurate clock on your source, event time is earlier than processing time.

  • Ingest time

The timestamp is assigned when a record is added to the stream, also called server-side time. Amazon Kinesis Streams includes a field called ApproximateArrivalTimeStamp in every record that provides this timestamp. The ingest time is often a close approximation of event time. If there is a delay in the record being delivered to the stream, this can lead to inaccuracies; however, this rarely happens and usually only when devices lose connectivity. Finally, while the ingest time is rarely out of order, this can occur due to the distributed nature of streaming data. Therefore, ingest time should be considered a mostly accurate and in-order reflection of the event time.

For all practical purposes, ingest time is earlier than processing time.

  • Processing time

The timestamp when Amazon Kinesis Analytics inserts a row in the first in-application stream. Analytics provides this timestamp in the ROWTIME column that exists in each in-application stream. The processing time is always monotonically increasing and is very accurate in relation to the wall clock, but it will not reflect when the events actually occurred if your application falls behind and processes older records.

A scenario may help you understand how these times manifest themselves in the real world. Say that you have a mobile application and you want to count the number of unique users of your app every minute and send the data to your live analytics store. You capture each user event, append a timestamp (event time), and send it to an Amazon Kinesis stream. In most cases, these events arrive as they occur, with less than a second of delay. However, some customers have inaccurate clocks, which causes the appended event timestamp to be wrong. Fortunately, an accurate timestamp is added to each event as it enters the stream, giving you a good approximation of when the event occurred.

You now set up an Amazon Kinesis Analytics application to read from the stream. Your Analytics application computes the unique users of your mobile application using the ROWTIME (processing time). After your application has been in production for a week, you find that you need some additional logic to make the query more accurate, and now you need to reprocess all records from the last 24 hours. When you start your application from earlier in the stream, the time used in the analysis is based on ROWTIME, which is right now, not when the events occurred. This means the time associated with your computation is incorrect.

How do you accurately compute the unique users of your mobile application?

One of the recommended approaches for gracefully handling out-of-order events and achieving an accurate logical time for your analytics is to use two time windows. The first window uses the processing time, or ROWTIME, to trigger when results are emitted. The second window uses the ingest time, or ApproximateArrivalTimeStamp, as the logical time that reflects when the event occurred. This strategy handles situations where data arrives late and where your application falls behind.
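
Before the full walkthrough below, here is a minimal sketch of the two-window pattern (the stream and column names mirror the demo stream used later; treat this as an outline rather than final code):

CREATE OR REPLACE STREAM "TWO_WINDOW_STREAM" (
    ticker_symbol VARCHAR(4),
    ticker_symbol_count INTEGER);

CREATE OR REPLACE PUMP "TWO_WINDOW_PUMP" AS INSERT INTO "TWO_WINDOW_STREAM"
SELECT STREAM ticker_symbol, COUNT(*) AS ticker_symbol_count
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol,
    -- First window (processing time): controls when results are emitted
    FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND),
    -- Second window (ingest time): the logical time the result belongs to
    FLOOR(("SOURCE_SQL_STREAM_001".APPROXIMATE_ARRIVAL_TIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);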

In the first post, you built an application that read from a demo stream containing simulated stock ticker data. The SQL code you wrote calculated the average price, average change, maximum price, and maximum change over a 10-second tumbling time window. In this post, you update that application to use two windows to gracefully handle out-of-order events and applications falling behind. You then send those final results to an Amazon Redshift table through an Amazon Kinesis Firehose delivery stream. The final solution:

  1. Ingests and reliably stores streaming data in an Amazon Kinesis stream
  2. Parses and validates the raw data through the source configuration of your Amazon Kinesis Analytics application
  3. Computes useful metrics on tumbling time windows using your Analytics application SQL code
  4. Transforms SQL results into CSV format using the destination configuration of your Analytics application
  5. Ingests and reliably delivers CSV data to an Amazon Redshift table through Firehose

Calculating simple metrics quickly and making them available to business and IT users in minutes is a common use case for streaming data. Many customers use long ETL pipelines for this scenario that are difficult to maintain and have hours to days of delay before a stakeholder can get visibility into what is happening with the business. You will use the stock ticker demo stream to perform a count of the number of transactions and the average/maximum/minimum prices and change for every stock over 1 minute.

Walkthrough:  Completing your first end-to-end application

You can jump right in if you completed the walkthrough in Part 1; go to the “Create the destination for your SQL results” section. If not, take five minutes to complete the following two setup steps.

Setup step 1:  Create an Amazon Kinesis Analytics application

  1. Open the Amazon Kinesis Analytics console and choose Create a new application.

  2. Provide a name and (optional) description for your application and choose Continue.

You are taken to the application hub page.

Setup step 2:  Create a streaming data source

For input, Analytics supports Amazon Kinesis Streams and Amazon Kinesis Firehose as streaming data sources, and reference data input through S3. The primary difference between these two source types is that data is read continuously from streaming data sources but only once for reference data sources. Reference data sources are used for joining against the incoming stream to enrich the data.
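
Reference data is not used in this walkthrough, but the join itself is plain SQL. The sketch below is only an illustration: the table name "COMPANY_LOOKUP" and its columns are hypothetical, and the reference table would be configured separately from an S3 object in the application's input settings.

CREATE OR REPLACE STREAM "ENRICHED_STREAM" (
    ticker_symbol VARCHAR(4),
    company_name VARCHAR(64));

CREATE OR REPLACE PUMP "ENRICH_PUMP" AS INSERT INTO "ENRICHED_STREAM"
SELECT STREAM s.ticker_symbol, r.company_name
FROM "SOURCE_SQL_STREAM_001" AS s
JOIN "COMPANY_LOOKUP" AS r
    ON s.ticker_symbol = r.ticker_symbol;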

In Amazon Kinesis Analytics, choose Connect to a source.

If you have existing Amazon Kinesis streams or Firehose delivery streams, they are shown here.

For the purposes of this post, you use a demo stream, which creates and populates a stream with sample data on your behalf. The demo stream is created under your account with a single shard, which supports up to 1 MB/sec of write throughput and 2 MB/sec of read throughput. Analytics writes simulated stock ticker data to the demo stream directly from your browser and your application reads data from the stream in real time.

Next, choose Configure a new stream and Create demo stream.

Later, you refer to the demo stream in your SQL code as "SOURCE_SQL_STREAM_001". Analytics calls the DiscoverInputSchema API operation, which infers a schema by sampling records from your selected input data stream. You can see the applied schema on your data in the formatted sample shown in the browser, as well as the original sample taken from the raw stream. You can then edit the schema to fine-tune it to your needs.

Feel free to explore; when you are ready, choose Save and continue. You are taken back to the streaming application hub.

On the streaming application hub, choose Go to SQL Editor and Run Application.

Copy and paste the following code into the text editor.

CREATE OR REPLACE STREAM "INTERMEDIATE_SQL_STREAM" (
    ticker_symbol VARCHAR(4),
    sector VARCHAR(16), 
    ticker_symbol_count INTEGER,
    avg_price DECIMAL(10,2),
    avg_change DECIMAL(10,2),
    max_price DECIMAL(10,2),
    min_price DECIMAL(10,2));

CREATE OR REPLACE  PUMP "STREAM_PUMP" AS INSERT INTO "INTERMEDIATE_SQL_STREAM"
SELECT STREAM   ticker_symbol,
                sector,
                COUNT(*) AS ticker_symbol_count,
                AVG(price) as avg_price,
                AVG(change) as avg_change,
                MAX(price) as max_price,
                MIN(price) as min_price
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol, sector, FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    ticker_symbol VARCHAR(4),
    sector VARCHAR(16), 
    ticker_symbol_count INTEGER,
    avg_price DECIMAL(10,2),
    avg_change DECIMAL(10,2),
    max_price DECIMAL(10,2),
    min_price DECIMAL(10,2));

CREATE OR REPLACE PUMP "STREAM_PUMP_02" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM   ticker_symbol, sector, ticker_symbol_count, avg_price, avg_change, max_price, min_price
FROM "INTERMEDIATE_SQL_STREAM";
WHERE sector = 'TECHNOLOGY';

Choose Save and run SQL.

Create the destination for your SQL results

Amazon Kinesis Analytics supports up to four destinations, which can be any combination of an Amazon S3 bucket, Elasticsearch cluster, or Amazon Redshift table (each through a Firehose delivery stream), or an Amazon Kinesis stream. If your specific destination isn't supported, you can write your SQL results to a stream and then use AWS Lambda to send them to your desired destination.

For this walkthrough, you are going to add an output that delivers data to an Amazon Redshift table using a Firehose delivery stream. There are many advantages to using Firehose to load data into Amazon Redshift, including the separation of the data processing and delivery.

In this context, Firehose serves as a buffer between the application and the eventual destination. The application doesn’t have to wait for data to be loaded to continue processing. If the Amazon Redshift cluster were to become temporarily unavailable for any reason, the application would still be able to process the latest data in the stream. This is especially important when you have multiple destinations; you don’t want to block all processing because a single destination is misconfigured or down.

Here are the steps for this walkthrough:

  1. Create a cluster, delivery stream, and destination table with a predefined CloudFormation template.
  2. Create a target table.
  3. Update the Firehose delivery stream and Analytics application.
  4. Update window timing.
  5. Set up the destination.
  6. Verify results.
  7. Clean up.

Create a cluster, delivery stream, and destination table

You need to create (a) an Amazon Redshift cluster, (b) a Firehose delivery stream, and (c) the Amazon Redshift table to which the SQL results are delivered. This would take about 10 minutes to do by hand, so I have created an AWS CloudFormation template that automates these steps for you.

AWS CloudFormation is a service that helps you model and set up your Amazon Web Services resources so that you can spend less time managing those resources and more time focusing on your applications that run in AWS. You provide a template to the service, and the service automatically provisions and sets up the resources for you.

  1. Open the CloudFormation console and choose Create stack.
  2. Choose Specify an Amazon S3 template URL and copy and paste the following link:

https://s3.amazonaws.com/amazon-kinesis-analytics/KA-demo-cf-template-v2.json

  3. Choose Next.
  4. Review the parameters and provide a stack name and Amazon Redshift password. Remember this password as you need it later. Choose Next.
  5. Review the optional parameters and choose Next.
  6. Review the final creation steps and choose Create. The template should take a couple of minutes to complete.

Create the target table in Amazon Redshift

After the stack has been created, you can use a tool like SQL Workbench to create the target table in Amazon Redshift. The target table will look very similar to the output in-application stream you create in your Analytics application. For more information about setting up SQL Workbench with Amazon Redshift, see Connect to Your Cluster by Using SQL Workbench/J.

Next, run the following SQL statement to create the target Amazon Redshift table:

CREATE TABLE stock_ticker_counts (
    process_time        TIMESTAMP,
    ingest_time         TIMESTAMP,
    ticker_symbol       VARCHAR(4),
    sector              VARCHAR(16),
    ticker_symbol_count INTEGER,
    average_price       DECIMAL(10,2),
    average_change      DECIMAL(10,2),
    maximum_price       DECIMAL(10,2),
    minimum_price       DECIMAL(10,2));

Update the Firehose delivery stream and Analytics application

After the Amazon Kinesis Firehose delivery stream "KADemoAggregatedTickerData" has been created, open the Firehose console and edit the delivery stream so that its Amazon Redshift table columns match the following list:

process_time, ingest_time, ticker_symbol, sector, ticker_symbol_count, average_price, average_change, maximum_price, minimum_price
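
This column list tells Firehose which Amazon Redshift columns to load; behind the scenes it becomes part of the COPY command that Firehose issues against your cluster. The command below only illustrates that shape (the bucket, prefix, and credentials clause are placeholders that Firehose fills in from the delivery stream configuration):

COPY stock_ticker_counts (process_time, ingest_time, ticker_symbol, sector,
    ticker_symbol_count, average_price, average_change, maximum_price, minimum_price)
FROM 's3://<intermediate-bucket>/<prefix>'
CSV;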

Next, update the Amazon Kinesis Analytics application with the final SQL code to match the columns of the destination.

I built on the previous blog post for this walkthrough. The starting SQL code is as follows:

CREATE OR REPLACE STREAM "INTERMEDIATE_SQL_STREAM" (
    ticker_symbol VARCHAR(4),
    sector VARCHAR(16), 
    ticker_symbol_count INTEGER,
    avg_price DECIMAL(10,2),
    avg_change DECIMAL(10,2),
    max_price DECIMAL(10,2),
    min_price DECIMAL(10,2));

CREATE OR REPLACE  PUMP "STREAM_PUMP" AS INSERT INTO "INTERMEDIATE_SQL_STREAM"
SELECT STREAM   ticker_symbol,
                sector,
                COUNT(*) AS ticker_symbol_count,
                AVG(price) as avg_price,
                AVG(change) as avg_change,
                MAX(price) as max_price,
                MIN(price) as min_price
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol, sector, FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    ticker_symbol VARCHAR(4),
    sector VARCHAR(16), 
    ticker_symbol_count INTEGER,
    avg_price DECIMAL(10,2),
    avg_change DECIMAL(10,2),
    max_price DECIMAL(10,2),
    min_price DECIMAL(10,2));

CREATE OR REPLACE PUMP "STREAM_PUMP_02" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM   ticker_symbol, sector, ticker_symbol_count, avg_price, avg_change, max_price, min_price
FROM "INTERMEDIATE_SQL_STREAM"
WHERE sector = 'TECHNOLOGY';

Remove the second step in the application code that performs the continuous filter and inserts data into "DESTINATION_SQL_STREAM".

  • Delete the second CREATE PUMP statement.
  • Delete the second CREATE STREAM statement.
  • Update the first two SQL statements to reference "DESTINATION_SQL_STREAM" instead of "INTERMEDIATE_SQL_STREAM". Your code should now look like the following:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    ticker_symbol VARCHAR(4),
    sector VARCHAR(16), 
    ticker_symbol_count INTEGER,
    avg_price DECIMAL(10,2),
    avg_change DECIMAL(10,2),
    max_price DECIMAL(10,2),
    min_price DECIMAL(10,2));

CREATE OR REPLACE  PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM   ticker_symbol,
                sector,
                COUNT(*) AS ticker_symbol_count,
                AVG(price) AS avg_price,
                AVG(change) AS avg_change,
                MAX(price) AS max_price,
                MIN(price) AS min_price
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol, sector, FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);

Add some additional statements to your query, including an additional window for the ingest time.

  • Add the sector to the SELECT and GROUP BY clause:
  • Add the process and ingest time to the SELECT clause:
ROWTIME AS process_time,
(TIMESTAMP '1970-01-01 00:00:00' + 10 * FLOOR(("SOURCE_SQL_STREAM_001".APPROXIMATE_ARRIVAL_TIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND)) AS ingest_time,

You add this last calculation for ingest_time in the SELECT and GROUP BY clauses in order to add the second window. Similar to the previous explanation, this calculation emits the 10-second intervals of the ingest time.

  • Add a window that uses ingest time to the GROUP BY clause:
FLOOR(("INPUT_SQL_STREAM".APPROXIMATE_ARRIVAL_TIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);

The second tumbling window you added uses the stream approximate arrival timestamp, also known as the ingest time. Now that you have added the second window, ingest_time becomes the logical timestamp that is used in the window. The previous window uses ROWTIME (the processing time), which is now simply a trigger to determine when results are emitted.

  • Finally, update the target stream to match your new output.
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    process_time		TIMESTAMP,    
    ingest_time		TIMESTAMP,
    ticker_symbol	VARCHAR(4),
    sector 		VARCHAR(16), 
    ticker_symbol_count	INTEGER,
    avg_price 		DECIMAL(10,2),
    avg_change 		DECIMAL(10,2),
    max_price 		DECIMAL(10,2),
    min_price 		DECIMAL(10,2));

Your application code should look like the following:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    process_time		TIMESTAMP,    
    ingest_time		TIMESTAMP,
    ticker_symbol		VARCHAR(4),
    sector 			VARCHAR(16), 
    ticker_symbol_count	INTEGER,
    avg_price 		DECIMAL(10,2),
    avg_change 		DECIMAL(10,2),
    max_price 		DECIMAL(10,2),
    min_price 		DECIMAL(10,2));


CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM
        ROWTIME AS process_time,
        (TIMESTAMP '1970-01-01 00:00:00' + 10 * FLOOR(("SOURCE_SQL_STREAM_001".APPROXIMATE_ARRIVAL_TIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND)) AS ingest_time,
        ticker_symbol,
        sector,
        COUNT(*),
        AVG(price),
        AVG(change),
        MAX(price),
        MIN(price)
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol, sector,
        FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND),
        FLOOR(("SOURCE_SQL_STREAM_001".APPROXIMATE_ARRIVAL_TIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);

Choose Save and Run and see the results.

ROWTIME is now just a trigger, and the ingest_time value becomes the time-series key used in the analysis, along with ticker_symbol. This approach gracefully handles rows that arrive late and can be combined for a final accurate result in the destination.

Update window timing

Make one final change before you set up the destination. I find that using short time windows while I am testing and developing code is much easier to work with because I see the results with little delay (in this example, it’s a 10-second window so that you have a 10-second delay). However, I don’t need that level of granularity in my Amazon Redshift table, so I update my windows to be 1 minute. The updated code is shown below.

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    process_time		TIMESTAMP,    
    ingest_time		TIMESTAMP,
    ticker_symbol		VARCHAR(4),
    sector 			VARCHAR(16), 
    ticker_symbol_count	INTEGER,
    avg_price 		DECIMAL(10,2),
    avg_change 		DECIMAL(10,2),
    max_price 		DECIMAL(10,2),
    min_price 		DECIMAL(10,2));


CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM
        ROWTIME AS process_time,
        FLOOR("SOURCE_SQL_STREAM_001".APPROXIMATE_ARRIVAL_TIME TO MINUTE) AS ingest_time,
        ticker_symbol,
        sector,
        COUNT(*),
        AVG(price),
        AVG(change),
        MAX(price),
        MIN(price)
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol, sector,
        FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE),
        FLOOR("SOURCE_SQL_STREAM_001".APPROXIMATE_ARRIVAL_TIME TO MINUTE);

Choose Save and Run and see the results.

Add the destination to the application

In your application code, you write the output of SQL statements to one or more in-application streams. You can optionally add an output configuration to your application to persist everything written to an in-application stream to an external destination, such as an Amazon Kinesis stream or a Firehose delivery stream. This lets you complete your use case: sending results to a downstream analytics tool or reacting to your data in real time.

  1. Set up the destination by choosing your application name at the top of the page and then choosing Add a destination. You are shown a list of your streams that can be used as a destination. The Firehose delivery stream that you created to deliver data to Amazon Redshift should be listed.
  2. Select the Firehose delivery stream "KADemoAggregatedTickerData" that you just created with CloudFormation.
  3. Choose CSV as output format, select a role, and choose Save.

Verify results

The end-to-end application is now running and emits results to the Amazon Redshift table continuously. Go back to SQL Workbench and see the results arrive. It takes approximately 2 minutes for the results to show up in your Amazon Redshift table. This is because you have a 1-minute tumbling window in your Analytics application code and a 1-minute buffer in Firehose.

In SQL Workbench, execute the following query:

SELECT * FROM stock_ticker_counts;

Combine all the results for the late-arriving rows to get a final, accurate count:

SELECT ingest_time,
    ticker_symbol,
    sector,
    SUM(ticker_symbol_count) AS ticker_symbol_count,
    SUM(ticker_symbol_count * average_price) / SUM(ticker_symbol_count) AS average_price,
    SUM(ticker_symbol_count * average_change) / SUM(ticker_symbol_count) AS average_change,
    MAX(maximum_price) AS maximum_price,
    MIN(minimum_price) AS minimum_price
FROM stock_ticker_counts
GROUP BY ingest_time, ticker_symbol, sector;

The query shows final and accurate results, and seamlessly handles out-of-order rows because of the added second window. For me, the results are making it to Amazon Redshift less than two minutes after the data was generated.

Clean up

The final step is to clean up. Take the following steps to avoid incurring charges.

  1. Delete the Streams demo stream.
  2. Stop the Analytics application.
  3. Delete the CloudFormation stack for the Amazon Redshift cluster and Firehose delivery stream.

Summary

Previously, real-time stream data processing was only accessible to those with the technical skills to build and manage a complex application. With Amazon Kinesis Analytics, anyone familiar with ANSI-Standard SQL can build and deploy a stream data processing application in minutes.

This end-to-end solution provided a managed and elastic data processing pipeline using Analytics that calculates useful results over streaming data. Results are calculated as they arrive, and Firehose delivers them to a persistent analytics store, Amazon Redshift.

It’s simple to get this solution working for your use case. All that is required is to replace the Amazon Kinesis demo stream with your own, and then set up data producers. From there, configure the analytics and you have an end-to-end solution for capturing, processing, and durably storing streaming data.

If you have questions or suggestions, please comment below.


Related

Querying Amazon Kinesis Streams Directly with SQL and Spark Streaming