Tag Archives: Amazon Kinesis Analytics

Real-Time Hotspot Detection in Amazon Kinesis Analytics

Post Syndicated from Randall Hunt original https://aws.amazon.com/blogs/aws/real-time-hotspot-detection-in-amazon-kinesis-analytics/

Today we’re releasing a new machine learning feature in Amazon Kinesis Data Analytics for detecting “hotspots” in your streaming data. We launched Kinesis Data Analytics in August of 2016 and we’ve continued to add features since. As you may already know, Kinesis Data Analytics is a fully managed real-time processing engine for streaming data that lets you write SQL queries to derive meaning from your data and output the results to Kinesis Data Firehose, Kinesis Data Streams, or even an AWS Lambda function. The new HOTSPOT function adds to the existing machine learning capabilities in Kinesis that allow customers to leverage unsupervised streaming based machine learning algorithms. Customers don’t need to be experts in data science or machine learning to take advantage of these capabilities.

Hotspots

The HOTSPOTS function is a new Kinesis Data Analytics SQL function you can use to idenitfy relatively dense regions in your data without having to explicity build and train complicated machine learning models. You can identify subsections of your data that need immediate attention and take action programatically by streaming the hotspots out to a Kinesis Data stream, to a Firehose delivery stream, or by invoking a AWS Lambda function.

There are a ton of really cool scenarios where this could make your operations easier. Imagine a ride-share program or autonomous vehicle fleet communicating spatiotemporal data about traffic jams and congestion, or a datacenter where a number of servers start to overheat indicating an HVAC issue. HOTSPOTS is not limited to spatiotemporal data and you could apply it across many problem domains.

The function follows some simple syntax and accepts the DOUBLE, INTEGER, FLOAT, TINYINT, SMALLINT, REAL, and BIGINT data types.

The HOTSPOT function takes a cursor as input and returns a JSON string describing the hotspot. This will be easier to understand with an example.

Using Kinesis Data Analytics to Detect Hotspots

Let’s take a simple data set from NY Taxi and Limousine Commission that tracks yellow cab pickup and dropoff locations. Most of this data is already on S3 and publicly accessible at s3://nyc-tlc/. We will create a small python script to load our Kinesis Data Stream with Taxi records which will feed our Kinesis Data Analytics. Finally we’ll output all of this to a Kinesis Data Firehose connected to an Amazon Elasticsearch Service cluster for visualization with Kibana. I know from living in New York for 5 years that we’ll probably find a hotspot or two in this data.

First, we’ll create an input Kinesis stream and start sending our NYC Taxi Ride data into it. I just wrote a quick python script to read from one of the CSV files and used boto3 to push the records into Kinesis. You can put the record in whatever way works for you.

 

import csv
import json
import boto3
def chunkit(l, n):
    """Yield successive n-sized chunks from l."""
    for i in range(0, len(l), n):
        yield l[i:i + n]

kinesis = boto3.client("kinesis")
with open("taxidata2.csv") as f:
    reader = csv.DictReader(f)
    records = chunkit([{"PartitionKey": "taxis", "Data": json.dumps(row)} for row in reader], 500)
    for chunk in records:
        kinesis.put_records(StreamName="TaxiData", Records=chunk)

Next, we’ll create the Kinesis Data Analytics application and add our input stream with our taxi data as the source.

Next we’ll automatically detect the schema.

Now we’ll create a quick SQL Script to detect our hotspots and add that to the Real Time Analytics section of our application.

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    "pickup_longitude" DOUBLE,
    "pickup_latitude" DOUBLE,
    HOTSPOTS_RESULT VARCHAR(10000)
); 
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" 
    SELECT "pickup_longitude", "pickup_latitude", "HOTSPOTS_RESULT" FROM
        TABLE(HOTSPOTS(
            CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),
            1000,
            0.013,
            20
        )
    );


Our HOTSPOTS function takes an input stream, a window size, scan radius, and a minimum number of points to count as a hotspot. The values for these are application dependent but you can tinker with them in the console easily until you get the results you want. There are more details about the parameters themselves in the documentation. The HOTSPOTS_RESULT returns some useful JSON that would let us plot bounding boxes around our hotspots:

{
  "hotspots": [
    {
      "density": "elided",
      "minValues": [40.7915039, -74.0077401],
      "maxValues": [40.7915041, -74.0078001]
    }
  ]
}

 

When we have our desired results we can save the script and connect our application to our Amazon Elastic Search Service Firehose Delivery Stream. We can run an intermediate lambda function in the firehose to transform our record into a format more suitable for geographic work. Then we can update our mapping in Elasticsearch to index the hotspot objects as Geo-Shapes.

Finally, we can connect to Kibana and visualize the results.

Looks like Manhattan is pretty busy!

Available Now
This feature is available now in all existing regions with Kinesis Data Analytics. I think this is a really interesting new feature of Kinesis Data Analytics that can bring immediate value to many applications. Let us know what you build with it on Twitter or in the comments!

Randall

Best Practices for Running Apache Kafka on AWS

Post Syndicated from Prasad Alle original https://aws.amazon.com/blogs/big-data/best-practices-for-running-apache-kafka-on-aws/

This post was written in partnership with Intuit to share learnings, best practices, and recommendations for running an Apache Kafka cluster on AWS. Thanks to Vaishak Suresh and his colleagues at Intuit for their contribution and support.

Intuit, in their own words: Intuit, a leading enterprise customer for AWS, is a creator of business and financial management solutions. For more information on how Intuit partners with AWS, see our previous blog post, Real-time Stream Processing Using Apache Spark Streaming and Apache Kafka on AWS. Apache Kafka is an open-source, distributed streaming platform that enables you to build real-time streaming applications.

The best practices described in this post are based on our experience in running and operating large-scale Kafka clusters on AWS for more than two years. Our intent for this post is to help AWS customers who are currently running Kafka on AWS, and also customers who are considering migrating on-premises Kafka deployments to AWS.

AWS offers Amazon Kinesis Data Streams, a Kafka alternative that is fully managed.

Running your Kafka deployment on Amazon EC2 provides a high performance, scalable solution for ingesting streaming data. AWS offers many different instance types and storage option combinations for Kafka deployments. However, given the number of possible deployment topologies, it’s not always trivial to select the most appropriate strategy suitable for your use case.

In this blog post, we cover the following aspects of running Kafka clusters on AWS:

  • Deployment considerations and patterns
  • Storage options
  • Instance types
  • Networking
  • Upgrades
  • Performance tuning
  • Monitoring
  • Security
  • Backup and restore

Note: While implementing Kafka clusters in a production environment, make sure also to consider factors like your number of messages, message size, monitoring, failure handling, and any operational issues.

Deployment considerations and patterns

In this section, we discuss various deployment options available for Kafka on AWS, along with pros and cons of each option. A successful deployment starts with thoughtful consideration of these options. Considering availability, consistency, and operational overhead of the deployment helps when choosing the right option.

Single AWS Region, Three Availability Zones, All Active

One typical deployment pattern (all active) is in a single AWS Region with three Availability Zones (AZs). One Kafka cluster is deployed in each AZ along with Apache ZooKeeper and Kafka producer and consumer instances as shown in the illustration following.

In this pattern, this is the Kafka cluster deployment:

  • Kafka producers and Kafka cluster are deployed on each AZ.
  • Data is distributed evenly across three Kafka clusters by using Elastic Load Balancer.
  • Kafka consumers aggregate data from all three Kafka clusters.

Kafka cluster failover occurs this way:

  • Mark down all Kafka producers
  • Stop consumers
  • Debug and restack Kafka
  • Restart consumers
  • Restart Kafka producers

Following are the pros and cons of this pattern.

Pros Cons
  • Highly available
  • Can sustain the failure of two AZs
  • No message loss during failover
  • Simple deployment

 

  • Very high operational overhead:
    • All changes need to be deployed three times, one for each Kafka cluster
    • Maintaining and monitoring three Kafka clusters
    • Maintaining and monitoring three consumer clusters

A restart is required for patching and upgrading brokers in a Kafka cluster. In this approach, a rolling upgrade is done separately for each cluster.

Single Region, Three Availability Zones, Active-Standby

Another typical deployment pattern (active-standby) is in a single AWS Region with a single Kafka cluster and Kafka brokers and Zookeepers distributed across three AZs. Another similar Kafka cluster acts as a standby as shown in the illustration following. You can use Kafka mirroring with MirrorMaker to replicate messages between any two clusters.

In this pattern, this is the Kafka cluster deployment:

  • Kafka producers are deployed on all three AZs.
  • Only one Kafka cluster is deployed across three AZs (active).
  • ZooKeeper instances are deployed on each AZ.
  • Brokers are spread evenly across all three AZs.
  • Kafka consumers can be deployed across all three AZs.
  • Standby Kafka producers and a Multi-AZ Kafka cluster are part of the deployment.

Kafka cluster failover occurs this way:

  • Switch traffic to standby Kafka producers cluster and Kafka cluster.
  • Restart consumers to consume from standby Kafka cluster.

Following are the pros and cons of this pattern.

Pros Cons
  • Less operational overhead when compared to the first option
  • Only one Kafka cluster to manage and consume data from
  • Can handle single AZ failures without activating a standby Kafka cluster
  • Added latency due to cross-AZ data transfer among Kafka brokers
  • For Kafka versions before 0.10, replicas for topic partitions have to be assigned so they’re distributed to the brokers on different AZs (rack-awareness)
  • The cluster can become unavailable in case of a network glitch, where ZooKeeper does not see Kafka brokers
  • Possibility of in-transit message loss during failover

Intuit recommends using a single Kafka cluster in one AWS Region, with brokers distributing across three AZs (single region, three AZs). This approach offers stronger fault tolerance than otherwise, because a failed AZ won’t cause Kafka downtime.

Storage options

There are two storage options for file storage in Amazon EC2:

Ephemeral storage is local to the Amazon EC2 instance. It can provide high IOPS based on the instance type. On the other hand, Amazon EBS volumes offer higher resiliency and you can configure IOPS based on your storage needs. EBS volumes also offer some distinct advantages in terms of recovery time. Your choice of storage is closely related to the type of workload supported by your Kafka cluster.

Kafka provides built-in fault tolerance by replicating data partitions across a configurable number of instances. If a broker fails, you can recover it by fetching all the data from other brokers in the cluster that host the other replicas. Depending on the size of the data transfer, it can affect recovery process and network traffic. These in turn eventually affect the cluster’s performance.

The following table contrasts the benefits of using an instance store versus using EBS for storage.

Instance store EBS
  • Instance storage is recommended for large- and medium-sized Kafka clusters. For a large cluster, read/write traffic is distributed across a high number of brokers, so the loss of a broker has less of an impact. However, for smaller clusters, a quick recovery for the failed node is important, but a failed broker takes longer and requires more network traffic for a smaller Kafka cluster.
  • Storage-optimized instances like h1, i3, and d2 are an ideal choice for distributed applications like Kafka.

 

  • The primary advantage of using EBS in a Kafka deployment is that it significantly reduces data-transfer traffic when a broker fails or must be replaced. The replacement broker joins the cluster much faster.
  • Data stored on EBS is persisted in case of an instance failure or termination. The broker’s data stored on an EBS volume remains intact, and you can mount the EBS volume to a new EC2 instance. Most of the replicated data for the replacement broker is already available in the EBS volume and need not be copied over the network from another broker. Only the changes made after the original broker failure need to be transferred across the network. That makes this process much faster.

 

 

Intuit chose EBS because of their frequent instance restacking requirements and also other benefits provided by EBS.

Generally, Kafka deployments use a replication factor of three. EBS offers replication within their service, so Intuit chose a replication factor of two instead of three.

Instance types

The choice of instance types is generally driven by the type of storage required for your streaming applications on a Kafka cluster. If your application requires ephemeral storage, h1, i3, and d2 instances are your best option.

Intuit used r3.xlarge instances for their brokers and r3.large for ZooKeeper, with ST1 (throughput optimized HDD) EBS for their Kafka cluster.

Here are sample benchmark numbers from Intuit tests.

Configuration Broker bytes (MB/s)
  • r3.xlarge
  • ST1 EBS
  • 12 brokers
  • 12 partitions

 

Aggregate 346.9

If you need EBS storage, then AWS has a newer-generation r4 instance. The r4 instance is superior to R3 in many ways:

  • It has a faster processor (Broadwell).
  • EBS is optimized by default.
  • It features networking based on Elastic Network Adapter (ENA), with up to 10 Gbps on smaller sizes.
  • It costs 20 percent less than R3.

Note: It’s always best practice to check for the latest changes in instance types.

Networking

The network plays a very important role in a distributed system like Kafka. A fast and reliable network ensures that nodes can communicate with each other easily. The available network throughput controls the maximum amount of traffic that Kafka can handle. Network throughput, combined with disk storage, is often the governing factor for cluster sizing.

If you expect your cluster to receive high read/write traffic, select an instance type that offers 10-Gb/s performance.

In addition, choose an option that keeps interbroker network traffic on the private subnet, because this approach allows clients to connect to the brokers. Communication between brokers and clients uses the same network interface and port. For more details, see the documentation about IP addressing for EC2 instances.

If you are deploying in more than one AWS Region, you can connect the two VPCs in the two AWS Regions using cross-region VPC peering. However, be aware of the networking costs associated with cross-AZ deployments.

Upgrades

Kafka has a history of not being backward compatible, but its support of backward compatibility is getting better. During a Kafka upgrade, you should keep your producer and consumer clients on a version equal to or lower than the version you are upgrading from. After the upgrade is finished, you can start using a new protocol version and any new features it supports. There are three upgrade approaches available, discussed following.

Rolling or in-place upgrade

In a rolling or in-place upgrade scenario, upgrade one Kafka broker at a time. Take into consideration the recommendations for doing rolling restarts to avoid downtime for end users.

Downtime upgrade

If you can afford the downtime, you can take your entire cluster down, upgrade each Kafka broker, and then restart the cluster.

Blue/green upgrade

Intuit followed the blue/green deployment model for their workloads, as described following.

If you can afford to create a separate Kafka cluster and upgrade it, we highly recommend the blue/green upgrade scenario. In this scenario, we recommend that you keep your clusters up-to-date with the latest Kafka version. For additional details on Kafka version upgrades or more details, see the Kafka upgrade documentation.

The following illustration shows a blue/green upgrade.

In this scenario, the upgrade plan works like this:

  • Create a new Kafka cluster on AWS.
  • Create a new Kafka producers stack to point to the new Kafka cluster.
  • Create topics on the new Kafka cluster.
  • Test the green deployment end to end (sanity check).
  • Using Amazon Route 53, change the new Kafka producers stack on AWS to point to the new green Kafka environment that you have created.

The roll-back plan works like this:

  • Switch Amazon Route 53 to the old Kafka producers stack on AWS to point to the old Kafka environment.

For additional details on blue/green deployment architecture using Kafka, see the re:Invent presentation Leveraging the Cloud with a Blue-Green Deployment Architecture.

Performance tuning

You can tune Kafka performance in multiple dimensions. Following are some best practices for performance tuning.

 These are some general performance tuning techniques:

  • If throughput is less than network capacity, try the following:
    • Add more threads
    • Increase batch size
    • Add more producer instances
    • Add more partitions
  • To improve latency when acks =-1, increase your num.replica.fetches value.
  • For cross-AZ data transfer, tune your buffer settings for sockets and for OS TCP.
  • Make sure that num.io.threads is greater than the number of disks dedicated for Kafka.
  • Adjust num.network.threads based on the number of producers plus the number of consumers plus the replication factor.
  • Your message size affects your network bandwidth. To get higher performance from a Kafka cluster, select an instance type that offers 10 Gb/s performance.

For Java and JVM tuning, try the following:

  • Minimize GC pauses by using the Oracle JDK, which uses the new G1 garbage-first collector.
  • Try to keep the Kafka heap size below 4 GB.

Monitoring

Knowing whether a Kafka cluster is working correctly in a production environment is critical. Sometimes, just knowing that the cluster is up is enough, but Kafka applications have many moving parts to monitor. In fact, it can easily become confusing to understand what’s important to watch and what you can set aside. Items to monitor range from simple metrics about the overall rate of traffic, to producers, consumers, brokers, controller, ZooKeeper, topics, partitions, messages, and so on.

For monitoring, Intuit used several tools, including Newrelec, Wavefront, Amazon CloudWatch, and AWS CloudTrail. Our recommended monitoring approach follows.

For system metrics, we recommend that you monitor:

  • CPU load
  • Network metrics
  • File handle usage
  • Disk space
  • Disk I/O performance
  • Garbage collection
  • ZooKeeper

For producers, we recommend that you monitor:

  • Batch-size-avg
  • Compression-rate-avg
  • Waiting-threads
  • Buffer-available-bytes
  • Record-queue-time-max
  • Record-send-rate
  • Records-per-request-avg

For consumers, we recommend that you monitor:

  • Batch-size-avg
  • Compression-rate-avg
  • Waiting-threads
  • Buffer-available-bytes
  • Record-queue-time-max
  • Record-send-rate
  • Records-per-request-avg

Security

Like most distributed systems, Kafka provides the mechanisms to transfer data with relatively high security across the components involved. Depending on your setup, security might involve different services such as encryption, Kerberos, Transport Layer Security (TLS) certificates, and advanced access control list (ACL) setup in brokers and ZooKeeper. The following tells you more about the Intuit approach. For details on Kafka security not covered in this section, see the Kafka documentation.

Encryption at rest

For EBS-backed EC2 instances, you can enable encryption at rest by using Amazon EBS volumes with encryption enabled. Amazon EBS uses AWS Key Management Service (AWS KMS) for encryption. For more details, see Amazon EBS Encryption in the EBS documentation. For instance store–backed EC2 instances, you can enable encryption at rest by using Amazon EC2 instance store encryption.

Encryption in transit

Kafka uses TLS for client and internode communications.

Authentication

Authentication of connections to brokers from clients (producers and consumers) to other brokers and tools uses either Secure Sockets Layer (SSL) or Simple Authentication and Security Layer (SASL).

Kafka supports Kerberos authentication. If you already have a Kerberos server, you can add Kafka to your current configuration.

Authorization

In Kafka, authorization is pluggable and integration with external authorization services is supported.

Backup and restore

The type of storage used in your deployment dictates your backup and restore strategy.

The best way to back up a Kafka cluster based on instance storage is to set up a second cluster and replicate messages using MirrorMaker. Kafka’s mirroring feature makes it possible to maintain a replica of an existing Kafka cluster. Depending on your setup and requirements, your backup cluster might be in the same AWS Region as your main cluster or in a different one.

For EBS-based deployments, you can enable automatic snapshots of EBS volumes to back up volumes. You can easily create new EBS volumes from these snapshots to restore. We recommend storing backup files in Amazon S3.

For more information on how to back up in Kafka, see the Kafka documentation.

Conclusion

In this post, we discussed several patterns for running Kafka in the AWS Cloud. AWS also provides an alternative managed solution with Amazon Kinesis Data Streams, there are no servers to manage or scaling cliffs to worry about, you can scale the size of your streaming pipeline in seconds without downtime, data replication across availability zones is automatic, you benefit from security out of the box, Kinesis Data Streams is tightly integrated with a wide variety of AWS services like Lambda, Redshift, Elasticsearch and it supports open source frameworks like Storm, Spark, Flink, and more. You may refer to kafka-kinesis connector.

If you have questions or suggestions, please comment below.


Additional Reading

If you found this post useful, be sure to check out Implement Serverless Log Analytics Using Amazon Kinesis Analytics and Real-time Clickstream Anomaly Detection with Amazon Kinesis Analytics.


About the Author

Prasad Alle is a Senior Big Data Consultant with AWS Professional Services. He spends his time leading and building scalable, reliable Big data, Machine learning, Artificial Intelligence and IoT solutions for AWS Enterprise and Strategic customers. His interests extend to various technologies such as Advanced Edge Computing, Machine learning at Edge. In his spare time, he enjoys spending time with his family.

 

 

Optimize Delivery of Trending, Personalized News Using Amazon Kinesis and Related Services

Post Syndicated from Yukinori Koide original https://aws.amazon.com/blogs/big-data/optimize-delivery-of-trending-personalized-news-using-amazon-kinesis-and-related-services/

This is a guest post by Yukinori Koide, an the head of development for the Newspass department at Gunosy.

Gunosy is a news curation application that covers a wide range of topics, such as entertainment, sports, politics, and gourmet news. The application has been installed more than 20 million times.

Gunosy aims to provide people with the content they want without the stress of dealing with a large influx of information. We analyze user attributes, such as gender and age, and past activity logs like click-through rate (CTR). We combine this information with article attributes to provide trending, personalized news articles to users.

In this post, I show you how to process user activity logs in real time using Amazon Kinesis Data Firehose, Amazon Kinesis Data Analytics, and related AWS services.

Why does Gunosy need real-time processing?

Users need fresh and personalized news. There are two constraints to consider when delivering appropriate articles:

  • Time: Articles have freshness—that is, they lose value over time. New articles need to reach users as soon as possible.
  • Frequency (volume): Only a limited number of articles can be shown. It’s unreasonable to display all articles in the application, and users can’t read all of them anyway.

To deliver fresh articles with a high probability that the user is interested in them, it’s necessary to include not only past user activity logs and some feature values of articles, but also the most recent (real-time) user activity logs.

We optimize the delivery of articles with these two steps.

  1. Personalization: Deliver articles based on each user’s attributes, past activity logs, and feature values of each article—to account for each user’s interests.
  2. Trends analysis/identification: Optimize delivering articles using recent (real-time) user activity logs—to incorporate the latest trends from all users.

Optimizing the delivery of articles is always a cold start. Initially, we deliver articles based on past logs. We then use real-time data to optimize as quickly as possible. In addition, news has a short freshness time. Specifically, day-old news is past news, and even the news that is three hours old is past news. Therefore, shortening the time between step 1 and step 2 is important.

To tackle this issue, we chose AWS for processing streaming data because of its fully managed services, cost-effectiveness, and so on.

Solution

The following diagrams depict the architecture for optimizing article delivery by processing real-time user activity logs

There are three processing flows:

  1. Process real-time user activity logs.
  2. Store and process all user-based and article-based logs.
  3. Execute ad hoc or heavy queries.

In this post, I focus on the first processing flow and explain how it works.

Process real-time user activity logs

The following are the steps for processing user activity logs in real time using Kinesis Data Streams and Kinesis Data Analytics.

  1. The Fluentd server sends the following user activity logs to Kinesis Data Streams:
{"article_id": 12345, "user_id": 12345, "action": "click"}
{"article_id": 12345, "user_id": 12345, "action": "impression"}
...
  1. Map rows of logs to columns in Kinesis Data Analytics.

  1. Set the reference data to Kinesis Data Analytics from Amazon S3.

a. Gunosy has user attributes such as gender, age, and segment. Prepare the following CSV file (user_id, gender, segment_id) and put it in Amazon S3:

101,female,1
102,male,2
103,female,3
...

b. Add the application reference data source to Kinesis Data Analytics using the AWS CLI:

$ aws kinesisanalytics add-application-reference-data-source \
  --application-name <my-application-name> \
  --current-application-version-id <version-id> \
  --reference-data-source '{
  "TableName": "REFERENCE_DATA_SOURCE",
  "S3ReferenceDataSource": {
    "BucketARN": "arn:aws:s3:::<my-bucket-name>",
    "FileKey": "mydata.csv",
    "ReferenceRoleARN": "arn:aws:iam::<account-id>:role/..."
  },
  "ReferenceSchema": {
    "RecordFormat": {
      "RecordFormatType": "CSV",
      "MappingParameters": {
        "CSVMappingParameters": {"RecordRowDelimiter": "\n", "RecordColumnDelimiter": ","}
      }
    },
    "RecordEncoding": "UTF-8",
    "RecordColumns": [
      {"Name": "USER_ID", "Mapping": "0", "SqlType": "INTEGER"},
      {"Name": "GENDER",  "Mapping": "1", "SqlType": "VARCHAR(32)"},
      {"Name": "SEGMENT_ID", "Mapping": "2", "SqlType": "INTEGER"}
    ]
  }
}'

This application reference data source can be referred on Kinesis Data Analytics.

  1. Run a query against the source data stream on Kinesis Data Analytics with the application reference data source.

a. Define the temporary stream named TMP_SQL_STREAM.

CREATE OR REPLACE STREAM "TMP_SQL_STREAM" (
  GENDER VARCHAR(32), SEGMENT_ID INTEGER, ARTICLE_ID INTEGER
);

b. Insert the joined source stream and application reference data source into the temporary stream.

CREATE OR REPLACE PUMP "TMP_PUMP" AS
INSERT INTO "TMP_SQL_STREAM"
SELECT STREAM
  R.GENDER, R.SEGMENT_ID, S.ARTICLE_ID, S.ACTION
FROM      "SOURCE_SQL_STREAM_001" S
LEFT JOIN "REFERENCE_DATA_SOURCE" R
  ON S.USER_ID = R.USER_ID;

c. Define the destination stream named DESTINATION_SQL_STREAM.

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
  TIME TIMESTAMP, GENDER VARCHAR(32), SEGMENT_ID INTEGER, ARTICLE_ID INTEGER, 
  IMPRESSION INTEGER, CLICK INTEGER
);

d. Insert the processed temporary stream, using a tumbling window, into the destination stream per minute.

CREATE OR REPLACE PUMP "STREAM_PUMP" AS
INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM
  ROW_TIME AS TIME,
  GENDER, SEGMENT_ID, ARTICLE_ID,
  SUM(CASE ACTION WHEN 'impression' THEN 1 ELSE 0 END) AS IMPRESSION,
  SUM(CASE ACTION WHEN 'click' THEN 1 ELSE 0 END) AS CLICK
FROM "TMP_SQL_STREAM"
GROUP BY
  GENDER, SEGMENT_ID, ARTICLE_ID,
  FLOOR("TMP_SQL_STREAM".ROWTIME TO MINUTE);

The results look like the following:

  1. Insert the results into Amazon Elasticsearch Service (Amazon ES).
  2. Batch servers get results from Amazon ES every minute. They then optimize delivering articles with other data sources using a proprietary optimization algorithm.

How to connect a stream to another stream in another AWS Region

When we built the solution, Kinesis Data Analytics was not available in the Asia Pacific (Tokyo) Region, so we used the US West (Oregon) Region. The following shows how we connected a data stream to another data stream in the other Region.

There is no need to continue containing all components in a single AWS Region, unless you have a situation where a response difference at the millisecond level is critical to the service.

Benefits

The solution provides benefits for both our company and for our users. Benefits for the company are cost savings—including development costs, operational costs, and infrastructure costs—and reducing delivery time. Users can now find articles of interest more quickly. The solution can process more than 500,000 records per minute, and it enables fast and personalized news curating for our users.

Conclusion

In this post, I showed you how we optimize trending user activities to personalize news using Amazon Kinesis Data Firehose, Amazon Kinesis Data Analytics, and related AWS services in Gunosy.

AWS gives us a quick and economical solution and a good experience.

If you have questions or suggestions, please comment below.


Additional Reading

If you found this post useful, be sure to check out Implement Serverless Log Analytics Using Amazon Kinesis Analytics and Joining and Enriching Streaming Data on Amazon Kinesis.


About the Authors

Yukinori Koide is the head of development for the Newspass department at Gunosy. He is working on standardization of provisioning and deployment flow, promoting the utilization of serverless and containers for machine learning and AI services. His favorite AWS services are DynamoDB, Lambda, Kinesis, and ECS.

 

 

 

Akihiro Tsukada is a start-up solutions architect with AWS. He supports start-up companies in Japan technically at many levels, ranging from seed to later-stage.

 

 

 

 

Yuta Ishii is a solutions architect with AWS. He works with our customers to provide architectural guidance for building media & entertainment services, helping them improve the value of their services when using AWS.

 

 

 

 

 

Now Available: New Digital Training to Help You Learn About AWS Big Data Services

Post Syndicated from Sara Snedeker original https://aws.amazon.com/blogs/big-data/now-available-new-digital-training-to-help-you-learn-about-aws-big-data-services/

AWS Training and Certification recently released free digital training courses that will make it easier for you to build your cloud skills and learn about using AWS Big Data services. This training includes courses like Introduction to Amazon EMR and Introduction to Amazon Athena.

You can get free and unlimited access to more than 100 new digital training courses built by AWS experts at aws.training. It’s easy to access training related to big data. Just choose the Analytics category on our Find Training page to browse through the list of courses. You can also use the keyword filter to search for training for specific AWS offerings.

Recommended training

Just getting started, or looking to learn about a new service? Check out the following digital training courses:

Introduction to Amazon EMR (15 minutes)
Covers the available tools that can be used with Amazon EMR and the process of creating a cluster. It includes a demonstration of how to create an EMR cluster.

Introduction to Amazon Athena (10 minutes)
Introduces the Amazon Athena service along with an overview of its operating environment. It covers the basic steps in implementing Athena and provides a brief demonstration.

Introduction to Amazon QuickSight (10 minutes)
Discusses the benefits of using Amazon QuickSight and how the service works. It also includes a demonstration so that you can see Amazon QuickSight in action.

Introduction to Amazon Redshift (10 minutes)
Walks you through Amazon Redshift and its core features and capabilities. It also includes a quick overview of relevant use cases and a short demonstration.

Introduction to AWS Lambda (10 minutes)
Discusses the rationale for using AWS Lambda, how the service works, and how you can get started using it.

Introduction to Amazon Kinesis Analytics (10 minutes)
Discusses how Amazon Kinesis Analytics collects, processes, and analyzes streaming data in real time. It discusses how to use and monitor the service and explores some use cases.

Introduction to Amazon Kinesis Streams (15 minutes)
Covers how Amazon Kinesis Streams is used to collect, process, and analyze real-time streaming data to create valuable insights.

Introduction to AWS IoT (10 minutes)
Describes how the AWS Internet of Things (IoT) communication architecture works, and the components that make up AWS IoT. It discusses how AWS IoT works with other AWS services and reviews a case study.

Introduction to AWS Data Pipeline (10 minutes)
Covers components like tasks, task runner, and pipeline. It also discusses what a pipeline definition is, and reviews the AWS services that are compatible with AWS Data Pipeline.

Go deeper with classroom training

Want to learn more? Enroll in classroom training to learn best practices, get live feedback, and hear answers to your questions from an instructor.

Big Data on AWS (3 days)
Introduces you to cloud-based big data solutions such as Amazon EMR, Amazon Redshift, Amazon Kinesis, and the rest of the AWS big data platform.

Data Warehousing on AWS (3 days)
Introduces you to concepts, strategies, and best practices for designing a cloud-based data warehousing solution, and demonstrates how to collect, store, and prepare data for the data warehouse.

Building a Serverless Data Lake (1 day)
Teaches you how to design, build, and operate a serverless data lake solution with AWS services. Includes topics such as ingesting data from any data source at large scale, storing the data securely and durably, using the right tool to process large volumes of data, and understanding the options available for analyzing the data in near-real time.

More training coming in 2018

We’re always evaluating and expanding our training portfolio, so stay tuned for more training options in the new year. You can always visit us at aws.training to explore our latest offerings.

AWS CloudFormation Supports Amazon Kinesis Analytics Applications

Post Syndicated from Ryan Nienhuis original https://aws.amazon.com/blogs/big-data/aws-cloudformation-supports-amazon-kinesis-analytics-applications/

You can now provision and manage resources for Amazon Kinesis Analytics applications using AWS CloudFormation.  Kinesis Analytics is the easiest way to process streaming data in real time with standard SQL, without having to learn new programming languages or processing frameworks. Kinesis Analytics enables you to query streaming data or build entire streaming applications using SQL. Using the service, you gain actionable insights and can respond to your business and customer needs promptly.

Customers can create CloudFormation templates that easily create or update Kinesis Analytics applications. Typically, a template is used as a way to manage code across different environments, or to prototype a new streaming data solution quickly.

We have created two sample templates using past AWS Big Data Blog posts that referenced Kinesis Analytics.

For more information about the new feature, see the AWS Cloudformation User Guide.

 

Perform Near Real-time Analytics on Streaming Data with Amazon Kinesis and Amazon Elasticsearch Service

Post Syndicated from Tristan Li original https://aws.amazon.com/blogs/big-data/perform-near-real-time-analytics-on-streaming-data-with-amazon-kinesis-and-amazon-elasticsearch-service/

Nowadays, streaming data is seen and used everywhere—from social networks, to mobile and web applications, IoT devices, instrumentation in data centers, and many other sources. As the speed and volume of this type of data increases, the need to perform data analysis in real time with machine learning algorithms and extract a deeper understanding from the data becomes ever more important. For example, you might want a continuous monitoring system to detect sentiment changes in a social media feed so that you can react to the sentiment in near real time.

In this post, we use Amazon Kinesis Streams to collect and store streaming data. We then use Amazon Kinesis Analytics to process and analyze the streaming data continuously. Specifically, we use the Kinesis Analytics built-in RANDOM_CUT_FOREST function, a machine learning algorithm, to detect anomalies in the streaming data. Finally, we use Amazon Kinesis Firehose to export the anomalies data to Amazon Elasticsearch Service (Amazon ES). We then build a simple dashboard in the open source tool Kibana to visualize the result.

Solution overview

The following diagram depicts a high-level overview of this solution.

Amazon Kinesis Streams

You can use Amazon Kinesis Streams to build your own streaming application. This application can process and analyze streaming data by continuously capturing and storing terabytes of data per hour from hundreds of thousands of sources.

Amazon Kinesis Analytics

Kinesis Analytics provides an easy and familiar standard SQL language to analyze streaming data in real time. One of its most powerful features is that there are no new languages, processing frameworks, or complex machine learning algorithms that you need to learn.

Amazon Kinesis Firehose

Kinesis Firehose is the easiest way to load streaming data into AWS. It can capture, transform, and load streaming data into Amazon S3, Amazon Redshift, and Amazon Elasticsearch Service.

Amazon Elasticsearch Service

Amazon ES is a fully managed service that makes it easy to deploy, operate, and scale Elasticsearch for log analytics, full text search, application monitoring, and more.

Solution summary

The following is a quick walkthrough of the solution that’s presented in the diagram:

  1. IoT sensors send streaming data into Kinesis Streams. In this post, you use a Python script to simulate an IoT temperature sensor device that sends the streaming data.
  2. By using the built-in RANDOM_CUT_FOREST function in Kinesis Analytics, you can detect anomalies in real time with the sensor data that is stored in Kinesis Streams. RANDOM_CUT_FOREST is also an appropriate algorithm for many other kinds of anomaly-detection use cases—for example, the media sentiment example mentioned earlier in this post.
  3. The processed anomaly data is then loaded into the Kinesis Firehose delivery stream.
  4. By using the built-in integration that Kinesis Firehose has with Amazon ES, you can easily export the processed anomaly data into the service and visualize it with Kibana.

Implementation steps

The following sections walk through the implementation steps in detail.

Creating the delivery stream

  1. Open the Amazon Kinesis Streams console.
  2. Create a new Kinesis stream. Give it a name that indicates it’s for raw incoming stream data—for example, RawStreamData. For Number of shards, type 1.
  3. The Python code provided below simulates a streaming application, such as an IoT device, and generates random data and anomalies into a Kinesis stream. The code generates two temperature ranges, where the first range is the hypothetical sensor’s normal operating temperature range (10–20), and the second is the anomaly temperature range (100–120).Make sure to change the stream name on line 16 and 20 and the Region on line 6 to match your configuration. Alternatively, you can download the Amazon Kinesis Data Generator from this repository and use it to generate the data.
    import json
    import datetime
    import random
    import testdata
    from boto import kinesis
    
    kinesis = kinesis.connect_to_region("us-east-1")
    
    def getData(iotName, lowVal, highVal):
       data = {}
       data["iotName"] = iotName
       data["iotValue"] = random.randint(lowVal, highVal) 
       return data
    
    while 1:
       rnd = random.random()
       if (rnd < 0.01):
          data = json.dumps(getData("DemoSensor", 100, 120))  
          kinesis.put_record("RawStreamData", data, "DemoSensor")
          print '***************************** anomaly ************************* ' + data
       else:
          data = json.dumps(getData("DemoSensor", 10, 20))  
          kinesis.put_record("RawStreamData", data, "DemoSensor")
          print data

  4. Open the Amazon Elasticsearch Service console and create a new domain.
    1. Give the domain a unique name. In the Configure cluster screen, use the default settings.
    2. In the Set up access policy screen, in the Set the domain access policy list, choose Allow access to the domain from specific IP(s).
    3. Enter the public IP address of your computer.
      Note: If you’re working behind a proxy or firewall, see the “Use a proxy to simplify request signing” section in this AWS Database blog post to learn how to work with a proxy. For additional information about securing access to your Amazon ES domain, see How to Control Access to Your Amazon Elasticsearch Domain in the AWS Security Blog.
  5. After the Amazon ES domain is up and running, you can set up and configure Kinesis Firehose to export results to Amazon ES:
    1. Open the Amazon Kinesis Firehose console and choose Create Delivery Stream.
    2. In the Destination dropdown list, choose Amazon Elasticsearch Service.
    3. Type a stream name, and choose the Amazon ES domain that you created in Step 4.
    4. Provide an index name and ES type. In the S3 bucket dropdown list, choose Create New S3 bucket. Choose Next.
    5. In the configuration, change the Elasticsearch Buffer size to 1 MB and the Buffer interval to 60s. Use the default settings for all other fields. This shortens the time for the data to reach the ES cluster.
    6. Under IAM Role, choose Create/Update existing IAM role.
      The best practice is to create a new role every time. Otherwise, the console keeps adding policy documents to the same role. Eventually the size of the attached policies causes IAM to reject the role, but it does it in a non-obvious way, where the console basically quits functioning.
    7. Choose Next to move to the Review page.
  6. Review the configuration, and then choose Create Delivery Stream.
  7. Run the Python file for 1–2 minutes, and then press Ctrl+C to stop the execution. This loads some data into the stream for you to visualize in the next step.

Analyzing the data

Now it’s time to analyze the IoT streaming data using Amazon Kinesis Analytics.

  1. Open the Amazon Kinesis Analytics console and create a new application. Give the application a name, and then choose Create Application.
  2. On the next screen, choose Connect to a source. Choose the raw incoming data stream that you created earlier. (Note the stream name Source_SQL_STREAM_001 because you will need it later.)
  3. Use the default settings for everything else. When the schema discovery process is complete, it displays a success message with the formatted stream sample in a table as shown in the following screenshot. Review the data, and then choose Save and continue.
  4. Next, choose Go to SQL editor. When prompted, choose Yes, start application.
  5. Copy the following SQL code and paste it into the SQL editor window.
    CREATE OR REPLACE STREAM "TEMP_STREAM" (
       "iotName"        varchar (40),
       "iotValue"   integer,
       "ANOMALY_SCORE"  DOUBLE);
    -- Creates an output stream and defines a schema
    CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
       "iotName"       varchar(40),
       "iotValue"       integer,
       "ANOMALY_SCORE"  DOUBLE,
       "created" TimeStamp);
     
    -- Compute an anomaly score for each record in the source stream
    -- using Random Cut Forest
    CREATE OR REPLACE PUMP "STREAM_PUMP_1" AS INSERT INTO "TEMP_STREAM"
    SELECT STREAM "iotName", "iotValue", ANOMALY_SCORE FROM
      TABLE(RANDOM_CUT_FOREST(
        CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001")
      )
    );
    
    -- Sort records by descending anomaly score, insert into output stream
    CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT STREAM "iotName", "iotValue", ANOMALY_SCORE, ROWTIME FROM "TEMP_STREAM"
    ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;

 

  1. Choose Save and run SQL.
    As the application is running, it displays the results as stream data arrives. If you don’t see any data coming in, run the Python script again to generate some fresh data. When there is data, it appears in a grid as shown in the following screenshot.Note that you are selecting data from the source stream name Source_SQL_STREAM_001 that you created previously. Also note the ANOMALY_SCORE column. This is the value that the Random_Cut_Forest function calculates based on the temperature ranges provided by the Python script. Higher (anomaly) temperature ranges have a higher score.Looking at the SQL code, note that the first two blocks of code create two new streams to store temporary data and the final result. The third block of code analyzes the raw source data (Stream_Pump_1) using the Random_Cut_Forest function. It calculates an anomaly score (ANOMALY_SCORE) and inserts it into the TEMP_STREAM stream. The final code block loads the result stored in the TEMP_STREAM into DESTINATION_SQL_STREAM.
  2. Choose Exit (done editing) next to the Save and run SQL button to return to the application configuration page.

Load processed data into the Kinesis Firehose delivery stream

Now, you can export the result from DESTINATION_SQL_STREAM into the Amazon Kinesis Firehose stream that you created previously.

  1. On the application configuration page, choose Connect to a destination.
  2. Choose the stream name that you created earlier, and use the default settings for everything else. Then choose Save and Continue.
  3. On the application configuration page, choose Exit to Kinesis Analytics applications to return to the Amazon Kinesis Analytics console.
  4. Run the Python script again for 4–5 minutes to generate enough data to flow through Amazon Kinesis Streams, Kinesis Analytics, Kinesis Firehose, and finally into the Amazon ES domain.
  5. Open the Kinesis Firehose console, choose the stream, and then choose the Monitoring
  6. As the processed data flows into Kinesis Firehose and Amazon ES, the metrics appear on the Delivery Stream metrics page. Keep in mind that the metrics page takes a few minutes to refresh with the latest data.
  7. Open the Amazon Elasticsearch Service dashboard in the AWS Management Console. The count in the Searchable documents column increases as shown in the following screenshot. In addition, the domain shows a cluster health of Yellow. This is because, by default, it needs two instances to deploy redundant copies of the index. To fix this, you can deploy two instances instead of one.

Visualize the data using Kibana

Now it’s time to launch Kibana and visualize the data.

  1. Use the ES domain link to go to the cluster detail page, and then choose the Kibana link as shown in the following screenshot.

    If you’re working behind a proxy or firewall, see the “Use a proxy to simplify request signing” section in this blog post to learn how to work with a proxy.
  2. In the Kibana dashboard, choose the Discover tab to perform a query.
  3. You can also visualize the data using the different types of charts offered by Kibana. For example, by going to the Visualize tab, you can quickly create a split bar chart that aggregates by ANOMALY_SCORE per minute.


Conclusion

In this post, you learned how to use Amazon Kinesis to collect, process, and analyze real-time streaming data, and then export the results to Amazon ES for analysis and visualization with Kibana. If you have comments about this post, add them to the “Comments” section below. If you have questions or issues with implementing this solution, please open a new thread on the Amazon Kinesis or Amazon ES discussion forums.


Next Steps

Take your skills to the next level. Learn real-time clickstream anomaly detection with Amazon Kinesis Analytics.

 


About the Author

Tristan Li is a Solutions Architect with Amazon Web Services. He works with enterprise customers in the US, helping them adopt cloud technology to build scalable and secure solutions on AWS.

 

 

 

 

Under the Hood of Server-Side Encryption for Amazon Kinesis Streams

Post Syndicated from Damian Wylie original https://aws.amazon.com/blogs/big-data/under-the-hood-of-server-side-encryption-for-amazon-kinesis-streams/

Customers are using Amazon Kinesis Streams to ingest, process, and deliver data in real time from millions of devices or applications. Use cases for Kinesis Streams vary, but a few common ones include IoT data ingestion and analytics, log processing, clickstream analytics, and enterprise data bus architectures.

Within milliseconds of data arrival, applications (KCL, Apache Spark, AWS Lambda, Amazon Kinesis Analytics) attached to a stream are continuously mining value or delivering data to downstream destinations. Customers are then scaling their streams elastically to match demand. They pay incrementally for the resources that they need, while taking advantage of a fully managed, serverless streaming data service that allows them to focus on adding value closer to their customers.

These benefits are great; however, AWS learned that many customers could not take advantage of Kinesis Streams unless their data-at-rest within a stream was encrypted. Many customers did not want to manage encryption on their own, so they asked for a fully managed, automatic, server-side encryption mechanism leveraging centralized AWS Key Management Service (AWS KMS) customer master keys (CMK).

Motivated by this feedback, AWS added another fully managed, low cost aspect to Kinesis Streams by delivering server-side encryption via KMS managed encryption keys (SSE-KMS) in the following regions:

  • US East (N. Virginia)
  • US West (Oregon)
  • US West (N. California)
  • EU (Ireland)
  • Asia Pacific (Singapore)
  • Asia Pacific (Tokyo)

In this post, I cover the mechanics of the Kinesis Streams server-side encryption feature. I also share a few best practices and considerations so that you can get started quickly.

Understanding the mechanics

The following section walks you through how Kinesis Streams uses CMKs to encrypt a message in the PutRecord or PutRecords path before it is propagated to the Kinesis Streams storage layer, and then decrypt it in the GetRecords path after it has been retrieved from the storage layer.

When server-side encryption is enabled—which takes just a few clicks in the console—the partition key and payload for every incoming record is encrypted automatically as it’s flowing into Kinesis Streams, using the selected CMK. When data is at rest within a stream, it’s encrypted.

When records are retrieved through a GetRecords request from the encrypted stream, they are decrypted automatically as they are flowing out of the service. That means your Kinesis Streams producers and consumers do not need to be aware of encryption. You have a fully managed data encryption feature at your fingertips, which can be enabled within seconds.

AWS also makes it easy to audit the application of server-side encryption. You can use the AWS Management Console for instant stream-level verification; the responses from PutRecord, PutRecords, and getRecords; or AWS CloudTrail.

Calling PutRecord or PutRecords

When server-side encryption is enabled for a particular stream, Kinesis Streams and KMS perform the following actions when your applications call PutRecord or PutRecords on a stream with server-side encryption enabled. The Amazon Kinesis Producer Library (KPL) uses PutRecords.

 

  1. Data is sent from a customer’s producer (client) to a Kinesis stream using TLS via HTTPS. Data in transit to a stream is encrypted by default.
  2. After data is received, it is momentarily stored in RAM within a front-end proxy layer.
  3. Kinesis Streams authenticates the producer, then impersonates the producer to request input keying material from KMS.
  4. KMS creates key material, encrypts it by using CMK, and sends both the plaintext and encrypted key material to the service, encrypted with TLS.
  5. The client uses the plaintext key material to derive data encryption keys (data keys) that are unique per-record.
  6. The client encrypts the payload and partition key using the data key in RAM within the front-end proxy layer and removes the plaintext data key from memory.
  7. The client appends the encrypted key material to the encrypted data.
  8. The plaintext key material is securely cached in memory within the front-end layer for reuse, until it expires after 5 minutes.
  9. The client delivers the encrypted message to a back-end store where it is stored at rest and fetchable by an authorized consumer through a GetRecords The Amazon Kinesis Client Library (KCL) calls GetRecords to retrieve records from a stream.

Calling getRecords

Kinesis Streams and KMS perform the following actions when your applications call GetRecords on a server-side encrypted stream.

 

  1. When a GeRecords call is made, the front-end proxy layer retrieves the encrypted record from its back-end store.
  2. The consumer (client) makes a request to KMS using a token generated by the customer’s request. KMS authorizes it.
  3. The client requests that KMS decrypt the encrypted key material.
  4. KMS decrypts the encrypted key material and sends the plaintext key material to the client.
  5. Kinesis Streams derives the per-record data keys from the decrypted key material.
  6. If the calling application is authorized, the client decrypts the payload and removes the plaintext data key from memory.
  7. The client delivers the payload over TLS and HTTPS to the consumer, requesting the records. Data in transit to a consumer is encrypted by default.

Verifying server-side encryption

Auditors or administrators often ask for proof that server-side encryption was or is enabled. Here are a few ways to do this.

To check if encryption is enabled now for your streams:

  • Use the AWS Management Console or the DescribeStream API operation. You can also see what CMK is being used for encryption.
  • See encryption in action by looking at responses from PutRecord, PutRecords, or GetRecords When encryption is enabled, the encryptionType parameter is set to “KMS”. If encryption is not enabled, encryptionType is not included in the response.

Sample PutRecord response

{
    "SequenceNumber": "49573959617140871741560010162505906306417380215064887298",
    "ShardId": "shardId-000000000000",
    "EncryptionType": "KMS"
}

Sample GetRecords response

{
    "Records": [
        {
            "Data": "aGVsbG8gd29ybGQ=", 
            "PartitionKey": "test", 
            "ApproximateArrivalTimestamp": 1498292565.825, 
            "EncryptionType": "KMS", 
            "SequenceNumber": "495735762417140871741560010162505906306417380215064887298"
        }, 
        {
            "Data": "ZnJvZG8gbGl2ZXMK", 
            "PartitionKey": "3d0d9301-3c30-4c48-a9a8-e485b2982b28", 
            "ApproximateArrivalTimestamp": 1498292801.747, 
            "EncryptionType": "KMS", 
            "SequenceNumber": "49573959617140871741560010162507115232237011062036103170"
        }
    ], 
    "NextShardIterator": "AAAAAAAAAAEvFypHZDx/4bJVAS34puwdiNcwssKqbh/XhRK7HSYRq3RS+YXJnVKJ8j0gQUt94bONdqQYHk9X9JHgefMUDKzDzndy5WbZWO4CS3hRdMdrbmJ/9KoR4lOfZvqTLt6JWQjDqXv0IaKs06/LHYcEA3oPcyQLOTJHdJl2EzplCTZnn/U295ovxvqF9g9DY8y2nVoMkdFLmdcEMVXjhCDKiRIt", 
    "MillisBehindLatest": 0
}

To check if encryption was enabled, use CloudTrail, which logs the StartStreamEncryption() and StopStreamEncryption() API calls made against a particular stream.

Getting started

It’s very easy to enable, disable, or modify server-side encryption for a particular stream.

  1. In the Kinesis Streams console, select a stream and choose Details.
  2. Select a CMK and select Enabled.
  3. Choose Save.

You can enable encryption only for a live stream, not upon stream creation.  Follow the same process to disable a stream. To use a different CMK, select it and choose Save.

Each of these tasks can also be accomplished using the StartStreamEncryption and StopStreamEncryption API operations.

Considerations

There are a few considerations you should be aware of when using server-side encryption for Kinesis Streams:

  • Permissions
  • Costs
  • Performance

Permissions

One benefit of using the “(Default) aws/kinesis” AWS managed key is that every producer and consumer with permissions to call PutRecord, PutRecords, or GetRecords inherits the right permissions over the “(Default) aws/kinesis” key automatically.

However, this is not necessarily the same case for a CMK. Kinesis Streams producers and consumers do not need to be aware of encryption. However, if you enable encryption using a custom master key but a producer or consumer doesn’t have IAM permissions to use it, PutRecord, PutRecords, or GetRecords requests fail.

This is a great security feature. On the other hand, it can effectively lead to data loss if you inadvertently apply a custom master key that restricts producers and consumers from interacting from the Kinesis stream. Take precautions when applying a custom master key. For more information about the minimum IAM permissions required for producers and consumers interacting with an encrypted stream, see Using Server-Side Encryption.

Costs

When you apply server-side encryption, you are subject to KMS API usage and key costs. Unlike custom KMS master keys, the “(Default) aws/kinesis” CMK is offered free of charge. However, you still need to pay for the API usage costs that Kinesis Streams incurs on your behalf.

API usage costs apply for every CMK, including custom ones. Kinesis Streams calls KMS approximately every 5 minutes when it is rotating the data key. In a 30-day month, the total cost of KMS API calls initiated by a Kinesis stream should be less than a few dollars.

Performance

During testing, AWS discovered that there was a slight increase (typically 0.2 millisecond or less per record) with put and get record latencies due to the additional overhead of encryption.

If you have questions or suggestions, please comment below.

Visualize and Monitor Amazon EC2 Events with Amazon CloudWatch Events and Amazon Kinesis Firehose

Post Syndicated from Karan Desai original https://aws.amazon.com/blogs/big-data/visualize-and-monitor-amazon-ec2-events-with-amazon-cloudwatch-events-and-amazon-kinesis-firehose/

Monitoring your AWS environment is important for security, performance, and cost control purposes. For example, by monitoring and analyzing API calls made to your Amazon EC2 instances, you can trace security incidents and gain insights into administrative behaviors and access patterns. The kinds of events you might monitor include console logins, Amazon EBS snapshot creation/deletion/modification, VPC creation/deletion/modification, and instance reboots, etc.

In this post, I show you how to build a near real-time API monitoring solution for EC2 events using Amazon CloudWatch Events and Amazon Kinesis Firehose. Please be sure to have Amazon CloudTrail enabled in your account.

  • CloudWatch Events offers a near real-time stream of system events that describe changes in AWS resources. CloudWatch Events now supports Kinesis Firehose as a target.
  • Kinesis Firehose is a fully managed service for continuously capturing, transforming, and delivering data in minutes to storage and analytics destinations such as Amazon S3, Amazon Kinesis Analytics, Amazon Redshift, and Amazon Elasticsearch Service.

Walkthrough

For this walkthrough, you create a CloudWatch event rule that matches specific EC2 events such as:

  • Starting, stopping, and terminating an instance
  • Creating and deleting VPC route tables
  • Creating and deleting a security group
  • Creating, deleting, and modifying instance volumes and snapshots

Your CloudWatch event target is a Kinesis Firehose delivery stream that delivers this data to an Elasticsearch cluster, where you set up Kibana for visualization. Using this solution, you can easily load and visualize EC2 events in minutes without setting up complicated data pipelines.

Set up the Elasticsearch cluster

Create the Amazon ES domain in the Amazon ES console, or by using the create-elasticsearch-domain command in the AWS CLI.

This example uses the following configuration:

  • Domain Name: esLogSearch
  • Elasticsearch Version: 1
  • Instance Count: 2
  • Instance type:elasticsearch
  • Enable dedicated master: true
  • Enable zone awareness: true
  • Restrict Amazon ES to an IP-based access policy

Other settings are left as the defaults.

Create a Kinesis Firehose delivery stream

In the Kinesis Firehose console, create a new delivery stream with Amazon ES as the destination. For detailed steps, see Create a Kinesis Firehose Delivery Stream to Amazon Elasticsearch Service.

Set up CloudWatch Events

Create a rule, and configure the event source and target. You can choose to configure multiple event sources with several AWS resources, along with options to specify specific or multiple event types.

In the CloudWatch console, choose Events.

For Service Name, choose EC2.

In Event Pattern Preview, choose Edit and copy the pattern below. For this walkthrough, I selected events that are specific to the EC2 API, but you can modify it to include events for any of your AWS resources.

 

{
	"source": [
		"aws.ec2"
	],
	"detail-type": [
		"AWS API Call via CloudTrail"
	],
	"detail": {
		"eventSource": [
			"ec2.amazonaws.com"
		],
		"eventName": [
			"RunInstances",
			"StopInstances",
			"StartInstances",
			"CreateFlowLogs",
			"CreateImage",
			"CreateNatGateway",
			"CreateVpc",
			"DeleteKeyPair",
			"DeleteNatGateway",
			"DeleteRoute",
			"DeleteRouteTable",
"CreateSnapshot",
"DeleteSnapshot",
			"DeleteVpc",
			"DeleteVpcEndpoints",
			"DeleteSecurityGroup",
			"ModifyVolume",
			"ModifyVpcEndpoint",
			"TerminateInstances"
		]
	}
}

The following screenshot shows what your event looks like in the console.

Next, choose Add target and select the delivery stream that you just created.

Set up Kibana on the Elasticsearch cluster

Amazon ES provides a default installation of Kibana with every Amazon ES domain. You can find the Kibana endpoint on your domain dashboard in the Amazon ES console. You can restrict Amazon ES access to an IP-based access policy.

In the Kibana console, for Index name or pattern, type log. This is the name of the Elasticsearch index.

For Time-field name, choose @time.

To view the events, choose Discover.

The following chart demonstrates the API operations and the number of times that they have been triggered in the past 12 hours.

Summary

In this post, you created a continuous, near real-time solution to monitor various EC2 events such as starting and shutting down instances, creating VPCs, etc. Likewise, you can build a continuous monitoring solution for all the API operations that are relevant to your daily AWS operations and resources.

With Kinesis Firehose as a new target for CloudWatch Events, you can retrieve, transform, and load system events to the storage and analytics destination of your choice in minutes, without setting up complicated data pipelines.

If you have any questions or suggestions, please comment below.


Additional Reading

Learn how to build a serverless architecture to analyze Amazon CloudFront access logs using AWS Lambda, Amazon Athena, and Amazon Kinesis Analytics

 

 

 

Build a Serverless Architecture to Analyze Amazon CloudFront Access Logs Using AWS Lambda, Amazon Athena, and Amazon Kinesis Analytics

Post Syndicated from Rajeev Srinivasan original https://aws.amazon.com/blogs/big-data/build-a-serverless-architecture-to-analyze-amazon-cloudfront-access-logs-using-aws-lambda-amazon-athena-and-amazon-kinesis-analytics/

Nowadays, it’s common for a web server to be fronted by a global content delivery service, like Amazon CloudFront. This type of front end accelerates delivery of websites, APIs, media content, and other web assets to provide a better experience to users across the globe.

The insights gained by analysis of Amazon CloudFront access logs helps improve website availability through bot detection and mitigation, optimizing web content based on the devices and browser used to view your webpages, reducing perceived latency by caching of popular object closer to its viewer, and so on. This results in a significant improvement in the overall perceived experience for the user.

This blog post provides a way to build a serverless architecture to generate some of these insights. To do so, we analyze Amazon CloudFront access logs both at rest and in transit through the stream. This serverless architecture uses Amazon Athena to analyze large volumes of CloudFront access logs (on the scale of terabytes per day), and Amazon Kinesis Analytics for streaming analysis.

The analytic queries in this blog post focus on three common use cases:

  1. Detection of common bots using the user agent string
  2. Calculation of current bandwidth usage per Amazon CloudFront distribution per edge location
  3. Determination of the current top 50 viewers

However, you can easily extend the architecture described to power dashboards for monitoring, reporting, and trigger alarms based on deeper insights gained by processing and analyzing the logs. Some examples are dashboards for cache performance, usage and viewer patterns, and so on.

Following we show a diagram of this architecture.

Prerequisites

Before you set up this architecture, install the AWS Command Line Interface (AWS CLI) tool on your local machine, if you don’t have it already.

Setup summary

The following steps are involved in setting up the serverless architecture on the AWS platform:

  1. Create an Amazon S3 bucket for your Amazon CloudFront access logs to be delivered to and stored in.
  2. Create a second Amazon S3 bucket to receive processed logs and store the partitioned data for interactive analysis.
  3. Create an Amazon Kinesis Firehose delivery stream to batch, compress, and deliver the preprocessed logs for analysis.
  4. Create an AWS Lambda function to preprocess the logs for analysis.
  5. Configure Amazon S3 event notification on the CloudFront access logs bucket, which contains the raw logs, to trigger the Lambda preprocessing function.
  6. Create an Amazon DynamoDB table to look up partition details, such as partition specification and partition location.
  7. Create an Amazon Athena table for interactive analysis.
  8. Create a second AWS Lambda function to add new partitions to the Athena table based on the log delivered to the processed logs bucket.
  9. Configure Amazon S3 event notification on the processed logs bucket to trigger the Lambda partitioning function.
  10. Configure Amazon Kinesis Analytics application for analysis of the logs directly from the stream.

ETL and preprocessing

In this section, we parse the CloudFront access logs as they are delivered, which occurs multiple times in an hour. We filter out commented records and use the user agent string to decipher the browser name, the name of the operating system, and whether the request has been made by a bot. For more details on how to decipher the preceding information based on the user agent string, see user-agents 1.1.0 in the Python documentation.

We use the Lambda preprocessing function to perform these tasks on individual rows of the access log. On successful completion, the rows are pushed to an Amazon Kinesis Firehose delivery stream to be persistently stored in an Amazon S3 bucket, the processed logs bucket.

To create a Firehose delivery stream with a new or existing S3 bucket as the destination, follow the steps described in Create a Firehose Delivery Stream to Amazon S3 in the S3 documentation. Keep most of the default settings, but select an AWS Identity and Access Management (IAM) role that has write access to your S3 bucket and specify GZIP compression. Name the delivery stream CloudFrontLogsToS3.

Another pre-requisite for this setup is to create an IAM role that provides the necessary permissions our AWS Lambda function to get the data from S3, process it, and deliver it to the CloudFrontLogsToS3 delivery stream.

Let’s use the AWS CLI to create the IAM role using the following the steps:

  1. Create the IAM policy (lambda-exec-policy) for the Lambda execution role to use.
  2. Create the Lambda execution role (lambda-cflogs-exec-role) and assign the service to use this role.
  3. Attach the policy created in step 1 to the Lambda execution role.

To download the policy document to your local machine, type the following command.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/preprocessiong-lambda/lambda-exec-policy.json  <path_on_your_local_machine>

To download the assume policy document to your local machine, type the following command.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/preprocessiong-lambda/assume-lambda-policy.json  <path_on_your_local_machine>

Following is the lambda-exec-policy.json file, which is the IAM policy used by the Lambda execution role.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "CloudWatchAccess",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        },
        {
            "Sid": "S3Access",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject"
            ],
            "Resource": [
                "arn:aws:s3:::*"
            ]
        },
        {
            "Sid": "FirehoseAccess",
            "Effect": "Allow",
            "Action": [
                "firehose:ListDeliveryStreams",
                "firehose:PutRecord",
                "firehose:PutRecordBatch"
            ],
            "Resource": [
                "arn:aws:firehose:*:*:deliverystream/CloudFrontLogsToS3"
            ]
        }
    ]
}

To create the IAM policy used by Lambda execution role, type the following command.

aws iam create-policy --policy-name lambda-exec-policy --policy-document file://<path>/lambda-exec-policy.json

To create the AWS Lambda execution role and assign the service to use this role, type the following command.

aws iam create-role --role-name lambda-cflogs-exec-role --assume-role-policy-document file://<path>/assume-lambda-policy.json

Following is the assume-lambda-policy.json file, to grant Lambda permission to assume a role.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

To attach the policy (lambda-exec-policy) created to the AWS Lambda execution role (lambda-cflogs-exec-role), type the following command.

aws iam attach-role-policy --role-name lambda-cflogs-exec-role --policy-arn arn:aws:iam::<your-account-id>:policy/lambda-exec-policy

Now that we have created the CloudFrontLogsToS3 Firehose delivery stream and the lambda-cflogs-exec-role IAM role for Lambda, the next step is to create a Lambda preprocessing function.

This Lambda preprocessing function parses the CloudFront access logs delivered into the S3 bucket and performs a few transformation and mapping operations on the data. The Lambda function adds descriptive information, such as the browser and the operating system that were used to make this request based on the user agent string found in the logs. The Lambda function also adds information about the web distribution to support scenarios where CloudFront access logs are delivered to a centralized S3 bucket from multiple distributions. With the solution in this blog post, you can get insights across distributions and their edge locations.

Use the Lambda Management Console to create a new Lambda function with a Python 2.7 runtime and the s3-get-object-python blueprint. Open the console, and on the Configure triggers page, choose the name of the S3 bucket where the CloudFront access logs are delivered. Choose Put for Event type. For Prefix, type the name of the prefix, if any, for the folder where CloudFront access logs are delivered, for example cloudfront-logs/. To invoke Lambda to retrieve the logs from the S3 bucket as they are delivered, select Enable trigger.

Choose Next and provide a function name to identify this Lambda preprocessing function.

For Code entry type, choose Upload a file from Amazon S3. For S3 link URL, type https.amazonaws.com//preprocessing-lambda/pre-data.zip. In the section, also create an environment variable with the key KINESIS_FIREHOSE_STREAM and a value with the name of the Firehose delivery stream as CloudFrontLogsToS3.

Choose lambda-cflogs-exec-role as the IAM role for the Lambda function, and type prep-data.lambda_handler for the value for Handler.

Choose Next, and then choose Create Lambda.

Table creation in Amazon Athena

In this step, we will build the Athena table. Use the Athena console in the same region and create the table using the query editor.

CREATE EXTERNAL TABLE IF NOT EXISTS cf_logs (
  logdate date,
  logtime string,
  location string,
  bytes bigint,
  requestip string,
  method string,
  host string,
  uri string,
  status bigint,
  referrer string,
  useragent string,
  uriquery string,
  cookie string,
  resulttype string,
  requestid string,
  header string,
  csprotocol string,
  csbytes string,
  timetaken bigint,
  forwardedfor string,
  sslprotocol string,
  sslcipher string,
  responseresulttype string,
  protocolversion string,
  browserfamily string,
  osfamily string,
  isbot string,
  filename string,
  distribution string
)
PARTITIONED BY(year string, month string, day string, hour string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LOCATION 's3://<pre-processing-log-bucket>/prefix/';

Creation of the Athena partition

A popular website with millions of requests each day routed using Amazon CloudFront can generate a large volume of logs, on the order of a few terabytes a day. We strongly recommend that you partition your data to effectively restrict the amount of data scanned by each query. Partitioning significantly improves query performance and substantially reduces cost. The Lambda partitioning function adds the partition information to the Athena table for the data delivered to the preprocessed logs bucket.

Before delivering the preprocessed Amazon CloudFront logs file into the preprocessed logs bucket, Amazon Kinesis Firehose adds a UTC time prefix in the format YYYY/MM/DD/HH. This approach supports multilevel partitioning of the data by year, month, date, and hour. You can invoke the Lambda partitioning function every time a new processed Amazon CloudFront log is delivered to the preprocessed logs bucket. To do so, configure the Lambda partitioning function to be triggered by an S3 Put event.

For a website with millions of requests, a large number of preprocessed logs can be delivered multiple times in an hour—for example, at the interval of one each second. To avoid querying the Athena table for partition information every time a preprocessed log file is delivered, you can create an Amazon DynamoDB table for fast lookup.

Based on the year, month, data and hour in the prefix of the delivered log, the Lambda partitioning function checks if the partition specification exists in the Amazon DynamoDB table. If it doesn’t, it’s added to the table using an atomic operation, and then the Athena table is updated.

Type the following command to create the Amazon DynamoDB table.

aws dynamodb create-table --table-name athenapartitiondetails \
--attribute-definitions AttributeName=PartitionSpec,AttributeType=S \
--key-schema AttributeName=PartitionSpec,KeyType=HASH \
--provisioned-throughput ReadCapacityUnits=100,WriteCapacityUnits=100

Here the following is true:

  • PartitionSpec is the hash key and is a representation of the partition signature—for example, year=”2017”; month=”05”; day=”15”; hour=”10”.
  • Depending on the rate at which the processed log files are delivered to the processed log bucket, you might have to increase the ReadCapacityUnits and WriteCapacityUnits values, if these are throttled.

The other attributes besides PartitionSpec are the following:

  • PartitionPath – The S3 path associated with the partition.
  • PartitionType – The type of partition used (Hour, Month, Date, Year, or ALL). In this case, ALL is used.

Next step is to create the IAM role to provide permissions for the Lambda partitioning function. You require permissions to do the following:

  1. Look up and write partition information to DynamoDB.
  2. Alter the Athena table with new partition information.
  3. Perform Amazon CloudWatch logs operations.
  4. Perform Amazon S3 operations.

To download the policy document to your local machine, type following command.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/partitioning-lambda/lambda-partition-function-execution-policy.json  <path_on_your_local_machine>

To download the assume policy document to your local machine, type the following command.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/partitioning-lambda/assume-lambda-policy.json <path_on_your_local_machine>

To create the Lambda execution role and assign the service to use this role, type the following command.

aws iam create-role --role-name lambda-cflogs-exec-role --assume-role-policy-document file://<path>/assume-lambda-policy.json

Let’s use the AWS CLI to create the IAM role using the following three steps:

  1. Create the IAM policy(lambda-partition-exec-policy) used by the Lambda execution role.
  2. Create the Lambda execution role (lambda-partition-execution-role)and assign the service to use this role.
  3. Attach the policy created in step 1 to the Lambda execution role.

To create the IAM policy used by Lambda execution role, type the following command.

aws iam create-policy --policy-name lambda-partition-exec-policy --policy-document file://<path>/lambda-partition-function-execution-policy.json

To create the Lambda execution role and assign the service to use this role, type the following command.

aws iam create-role --role-name lambda-partition-execution-role --assume-role-policy-document file://<path>/assume-lambda-policy.json

To attach the policy (lambda-partition-exec-policy) created to the AWS Lambda execution role (lambda-partition-execution-role), type the following command.

aws iam attach-role-policy --role-name lambda-partition-execution-role --policy-arn arn:aws:iam::<your-account-id>:policy/lambda-partition-exec-policy

Following is the lambda-partition-function-execution-policy.json file, which is the IAM policy used by the Lambda execution role.

{
    "Version": "2012-10-17",
    "Statement": [
      	{
            	"Sid": "DDBTableAccess",
            	"Effect": "Allow",
            	"Action": "dynamodb:PutItem"
            	"Resource": "arn:aws:dynamodb*:*:table/athenapartitiondetails"
        	},
        	{
            	"Sid": "S3Access",
            	"Effect": "Allow",
            	"Action": [
                		"s3:GetBucketLocation",
                		"s3:GetObject",
                		"s3:ListBucket",
                		"s3:ListBucketMultipartUploads",
                		"s3:ListMultipartUploadParts",
                		"s3:AbortMultipartUpload",
                		"s3:PutObject"
            	],
          		"Resource":"arn:aws:s3:::*"
		},
	              {
		      "Sid": "AthenaAccess",
      		"Effect": "Allow",
      		"Action": [ "athena:*" ],
      		"Resource": [ "*" ]
	      },
        	{
            	"Sid": "CloudWatchLogsAccess",
            	"Effect": "Allow",
            	"Action": [
                		"logs:CreateLogGroup",
                		"logs:CreateLogStream",
             	   	"logs:PutLogEvents"
            	],
            	"Resource": "arn:aws:logs:*:*:*"
        	}
    ]
}

Download the .jar file containing the Java deployment package to your local machine.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/partitioning-lambda/aws-lambda-athena-1.0.0.jar <path_on_your_local_machine>

From the AWS Management Console, create a new Lambda function with Java8 as the runtime. Select the Blank Function blueprint.

On the Configure triggers page, choose the name of the S3 bucket where the preprocessed logs are delivered. Choose Put for the Event Type. For Prefix, type the name of the prefix folder, if any, where preprocessed logs are delivered by Firehose—for example, out/. For Suffix, type the name of the compression format that the Firehose stream (CloudFrontLogToS3) delivers the preprocessed logs —for example, gz. To invoke Lambda to retrieve the logs from the S3 bucket as they are delivered, select Enable Trigger.

Choose Next and provide a function name to identify this Lambda partitioning function.

Choose Java8 for Runtime for the AWS Lambda function. Choose Upload a .ZIP or .JAR file for the Code entry type, and choose Upload to upload the downloaded aws-lambda-athena-1.0.0.jar file.

Next, create the following environment variables for the Lambda function:

  • TABLE_NAME – The name of the Athena table (for example, cf_logs).
  • PARTITION_TYPE – The partition to be created based on the Athena table for the logs delivered to the sub folders in S3 bucket based on Year, Month, Date, Hour, or Set this to ALL to use Year, Month, Date, and Hour.
  • DDB_TABLE_NAME – The name of the DynamoDB table holding partition information (for example, athenapartitiondetails).
  • ATHENA_REGION – The current AWS Region for the Athena table to construct the JDBC connection string.
  • S3_STAGING_DIR – The Amazon S3 location where your query output is written. The JDBC driver asks Athena to read the results and provide rows of data back to the user (for example, s3://<bucketname>/<folder>/).

To configure the function handler and IAM, for Handler copy and paste the name of the handler: com.amazonaws.services.lambda.CreateAthenaPartitionsBasedOnS3EventWithDDB::handleRequest. Choose the existing IAM role, lambda-partition-execution-role.

Choose Next and then Create Lambda.

Interactive analysis using Amazon Athena

In this section, we analyze the historical data that’s been collected since we added the partitions to the Amazon Athena table for data delivered to the preprocessing logs bucket.

Scenario 1 is robot traffic by edge location.

SELECT COUNT(*) AS ct, requestip, location FROM cf_logs
WHERE isbot='True'
GROUP BY requestip, location
ORDER BY ct DESC;

Scenario 2 is total bytes transferred per distribution for each edge location for your website.

SELECT distribution, location, SUM(bytes) as totalBytes
FROM cf_logs
GROUP BY location, distribution;

Scenario 3 is the top 50 viewers of your website.

SELECT requestip, COUNT(*) AS ct  FROM cf_logs
GROUP BY requestip
ORDER BY ct DESC;

Streaming analysis using Amazon Kinesis Analytics

In this section, you deploy a stream processing application using Amazon Kinesis Analytics to analyze the preprocessed Amazon CloudFront log streams. This application analyzes directly from the Amazon Kinesis Stream as it is delivered to the preprocessing logs bucket. The stream queries in section are focused on gaining the following insights:

  • The IP address of the bot, identified by its Amazon CloudFront edge location, that is currently sending requests to your website. The query also includes the total bytes transferred as part of the response.
  • The total bytes served per distribution per population for your website.
  • The top 10 viewers of your website.

To download the firehose-access-policy.json file, type the following.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/kinesisanalytics/firehose-access-policy.json  <path_on_your_local_machine>

To download the kinesisanalytics-policy.json file, type the following.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/kinesisanalytics/assume-kinesisanalytics-policy.json <path_on_your_local_machine>

Before we create the Amazon Kinesis Analytics application, we need to create the IAM role to provide permission for the analytics application to access Amazon Kinesis Firehose stream.

Let’s use the AWS CLI to create the IAM role using the following three steps:

  1. Create the IAM policy(firehose-access-policy) for the Lambda execution role to use.
  2. Create the Lambda execution role (ka-execution-role) and assign the service to use this role.
  3. Attach the policy created in step 1 to the Lambda execution role.

Following is the firehose-access-policy.json file, which is the IAM policy used by Kinesis Analytics to read Firehose delivery stream.

{
    "Version": "2012-10-17",
    "Statement": [
      	{
    	"Sid": "AmazonFirehoseAccess",
    	"Effect": "Allow",
    	"Action": [
       	"firehose:DescribeDeliveryStream",
        	"firehose:Get*"
    	],
    	"Resource": [
              "arn:aws:firehose:*:*:deliverystream/CloudFrontLogsToS3”
       ]
     }
}

Following is the assume-kinesisanalytics-policy.json file, to grant Amazon Kinesis Analytics permissions to assume a role.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "kinesisanalytics.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

To create the IAM policy used by Analytics access role, type the following command.

aws iam create-policy --policy-name firehose-access-policy --policy-document file://<path>/firehose-access-policy.json

To create the Analytics execution role and assign the service to use this role, type the following command.

aws iam attach-role-policy --role-name ka-execution-role --policy-arn arn:aws:iam::<your-account-id>:policy/firehose-access-policy

To attach the policy (irehose-access-policy) created to the Analytics execution role (ka-execution-role), type the following command.

aws iam attach-role-policy --role-name ka-execution-role --policy-arn arn:aws:iam::<your-account-id>:policy/firehose-access-policy

To deploy the Analytics application, first download the configuration file and then modify ResourceARN and RoleARN for the Amazon Kinesis Firehose input configuration.

"KinesisFirehoseInput": { 
    "ResourceARN": "arn:aws:firehose:<region>:<account-id>:deliverystream/CloudFrontLogsToS3", 
    "RoleARN": "arn:aws:iam:<account-id>:role/ka-execution-role"
}

To download the Analytics application configuration file, type the following command.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis//kinesisanalytics/kinesis-analytics-app-configuration.json <path_on_your_local_machine>

To deploy the application, type the following command.

aws kinesisanalytics create-application --application-name "cf-log-analysis" --cli-input-json file://<path>/kinesis-analytics-app-configuration.json

To start the application, type the following command.

aws kinesisanalytics start-application --application-name "cf-log-analysis" --input-configuration Id="1.1",InputStartingPositionConfiguration={InputStartingPosition="NOW"}

SQL queries using Amazon Kinesis Analytics

Scenario 1 is a query for detecting bots for sending request to your website detection for your website.

-- Create output stream, which can be used to send to a destination
CREATE OR REPLACE STREAM "BOT_DETECTION" (requesttime TIME, destribution VARCHAR(16), requestip VARCHAR(64), edgelocation VARCHAR(64), totalBytes BIGINT);
-- Create pump to insert into output 
CREATE OR REPLACE PUMP "BOT_DETECTION_PUMP" AS INSERT INTO "BOT_DETECTION"
--
SELECT STREAM 
    STEP("CF_LOG_STREAM_001"."request_time" BY INTERVAL '1' SECOND) as requesttime,
    "distribution_name" as distribution,
    "request_ip" as requestip, 
    "edge_location" as edgelocation, 
    SUM("bytes") as totalBytes
FROM "CF_LOG_STREAM_001"
WHERE "is_bot" = true
GROUP BY "request_ip", "edge_location", "distribution_name",
STEP("CF_LOG_STREAM_001"."request_time" BY INTERVAL '1' SECOND),
STEP("CF_LOG_STREAM_001".ROWTIME BY INTERVAL '1' SECOND);

Scenario 2 is a query for total bytes transferred per distribution for each edge location for your website.

-- Create output stream, which can be used to send to a destination
CREATE OR REPLACE STREAM "BYTES_TRANSFFERED" (requesttime TIME, destribution VARCHAR(16), edgelocation VARCHAR(64), totalBytes BIGINT);
-- Create pump to insert into output 
CREATE OR REPLACE PUMP "BYTES_TRANSFFERED_PUMP" AS INSERT INTO "BYTES_TRANSFFERED"
-- Bytes Transffered per second per web destribution by edge location
SELECT STREAM 
    STEP("CF_LOG_STREAM_001"."request_time" BY INTERVAL '1' SECOND) as requesttime,
    "distribution_name" as distribution,
    "edge_location" as edgelocation, 
    SUM("bytes") as totalBytes
FROM "CF_LOG_STREAM_001"
GROUP BY "distribution_name", "edge_location", "request_date",
STEP("CF_LOG_STREAM_001"."request_time" BY INTERVAL '1' SECOND),
STEP("CF_LOG_STREAM_001".ROWTIME BY INTERVAL '1' SECOND);

Scenario 3 is a query for the top 50 viewers for your website.

-- Create output stream, which can be used to send to a destination
CREATE OR REPLACE STREAM "TOP_TALKERS" (requestip VARCHAR(64), requestcount DOUBLE);
-- Create pump to insert into output 
CREATE OR REPLACE PUMP "TOP_TALKERS_PUMP" AS INSERT INTO "TOP_TALKERS"
-- Top Ten Talker
SELECT STREAM ITEM as requestip, ITEM_COUNT as requestcount FROM TABLE(TOP_K_ITEMS_TUMBLING(
  CURSOR(SELECT STREAM * FROM "CF_LOG_STREAM_001"),
  'request_ip', -- name of column in single quotes
  50, -- number of top items
  60 -- tumbling window size in seconds
  )
);

Conclusion

Following the steps in this blog post, you just built an end-to-end serverless architecture to analyze Amazon CloudFront access logs. You analyzed these both in interactive and streaming mode, using Amazon Athena and Amazon Kinesis Analytics respectively.

By creating a partition in Athena for the logs delivered to a centralized bucket, this architecture is optimized for performance and cost when analyzing large volumes of logs for popular websites that receive millions of requests. Here, we have focused on just three common use cases for analysis, sharing the analytic queries as part of the post. However, you can extend this architecture to gain deeper insights and generate usage reports to reduce latency and increase availability. This way, you can provide a better experience on your websites fronted with Amazon CloudFront.

In this blog post, we focused on building serverless architecture to analyze Amazon CloudFront access logs. Our plan is to extend the solution to provide rich visualization as part of our next blog post.


About the Authors

Rajeev Srinivasan is a Senior Solution Architect for AWS. He works very close with our customers to provide big data and NoSQL solution leveraging the AWS platform and enjoys coding . In his spare time he enjoys riding his motorcycle and reading books.

 

Sai Sriparasa is a consultant with AWS Professional Services. He works with our customers to provide strategic and tactical big data solutions with an emphasis on automation, operations & security on AWS. In his spare time, he follows sports and current affairs.

 

 


Related

Analyzing VPC Flow Logs with Amazon Kinesis Firehose, Amazon Athena, and Amazon QuickSight

Build a Visualization and Monitoring Dashboard for IoT Data with Amazon Kinesis Analytics and Amazon QuickSight

Post Syndicated from Karan Desai original https://aws.amazon.com/blogs/big-data/build-a-visualization-and-monitoring-dashboard-for-iot-data-with-amazon-kinesis-analytics-and-amazon-quicksight/

Customers across the world are increasingly building innovative Internet of Things (IoT) workloads on AWS. With AWS, they can handle the constant stream of data coming from millions of new, internet-connected devices. This data can be a valuable source of information if it can be processed, analyzed, and visualized quickly in a scalable, cost-efficient manner. Engineers and developers can monitor performance and troubleshoot issues while sales and marketing can track usage patterns and statistics to base business decisions.

In this post, I demonstrate a sample solution to build a quick and easy monitoring and visualization dashboard for your IoT data using AWS serverless and managed services. There’s no need for purchasing any additional software or hardware. If you are already using AWS IoT, you can build this dashboard to tap into your existing device data. If you are new to AWS IoT, you can be up and running in minutes using sample data. Later, you can customize it to your needs, as your business grows to millions of devices and messages.

Architecture

The following is a high-level architecture diagram showing the serverless setup to configure.

 

AWS service overview

AWS IoT is a managed cloud platform that lets connected devices interact easily and securely with cloud applications and other devices. AWS IoT can process and route billions of messages to AWS endpoints and to other devices reliably and securely.

Amazon Kinesis Firehose is the easiest way to capture, transform, and load streaming data continuously into AWS from thousands of data sources, such as IoT devices. It is a fully managed service that automatically scales to match the throughput of your data and requires no ongoing administration.

Amazon Kinesis Analytics allows you to process streaming data coming from IoT devices in real time with standard SQL, without having to learn new programming languages or processing frameworks, providing actionable insights promptly.

The processed data is fed into Amazon QuickSight, which is a fast, cloud-powered business analytics service that makes it easy to build visualizations, perform ad-hoc analysis, and quickly get business insights from the data.

The most popular way for Internet-connected devices to send data is using MQTT messages. The AWS IoT gateway receives these messages from registered IoT devices. The solution in this post uses device data from AWS Simple Beer Service (SBS), a series of internet-connected kegerators sending sensor outputs such as temperature, humidity, and sound levels in a JSON payload. You can use any existing IoT data source that you may have.

The AWS IoT rules engine allows selecting data from message payloads, processing it, and sending it to other services. You forward the data to a Firehose delivery stream to consolidate the continuous data stream into batches for further processing. The batched data is also stored temporarily in an Amazon S3 bucket for later retrieval and can be set for deletion after a specified time using S3 Lifecycle Management rules.

The incoming data from the Firehose delivery stream is fed into an Analytics application that provides an easy way to process the data in real time using standard SQL queries. Analytics allows writing standard SQL queries to extract specific components from the incoming data stream and perform real-time ETL on it. In this post, you use this feature to aggregate minimum and maximum temperature values from the sensors per minute. You load it in Amazon QuickSight to create a monitoring dashboard and check if the devices are over-heating or cooling down during use. You also extract every device’s location, parameters such as temperature, sound levels, humidity, and the time stamp in Analytics to use on the visualization dashboard.

The processed data from the two queries is fed into two Firehose delivery streams, both of which batch the data into CSV files every minute and store it in S3. The batching time interval is configurable between 1 and 15 minutes in 1-second intervals.

Finally, you use Amazon QuickSight to ingest the processed CSV files from S3 as a data source to build visualizations. Amazon QuickSight’s super-fast, parallel, in-memory, calculation engine (SPICE) parses the ingested data and allows you to create a variety of visualizations with different graph types. You can also use the Amazon QuickSight built-in Story feature to combine visualizations into business dashboards that can be shared in a secure manner.

Implementation

AWS IoT, Amazon Kinesis, and Amazon QuickSight are all fully managed services, which means you can complete the entire setup in just a few steps using the AWS Management Console. Don’t worry about setting up any underlying hardware or installing any additional software. So, get started.

Step 1. Set up your AWS IoT data source

Do you currently use AWS IoT? If you have an existing IoT thing set up and running on AWS IoT, you can skip to Step 2.

If you have an AWS IoT button or other IoT devices that can publish MQTT messages and would like to use that for the setup, follow the Getting Started with AWS IoT topic to connect your thing to AWS IoT. Continue to Step 2.

If you do not have an existing IoT device, you can generate simulated device data using a script on your local machine and have it publish to AWS IoT. The following script lets you set up your AWS IoT environment and publish simulated data that mimics device data from Simple Beer Service.

Generate sample Data

Running the sbs.py Python script generates fictitious AWS IoT messages from multiple SBS devices. The IoT rule sends the message to Firehose for further processing.

The script requires access to AWS CLI credentials and boto3 installation on the machine running the script. Download and run the following Python script:

https://github.com/awslabs/sbs-iot-data-generator/blob/master/sbs.py

The script generates random data that looks like the following:

{"deviceParameter": "Temperature", "deviceValue": 33, "deviceId": "SBS01", "dateTime": "2017-02-03 11:29:37"}
{"deviceParameter": "Sound", "deviceValue": 140, "deviceId": "SBS03", "dateTime": "2017-02-03 11:29:38"}
{"deviceParameter": "Humidity", "deviceValue": 63, "deviceId": "SBS01", "dateTime": "2017-02-03 11:29:39"}
{"deviceParameter": "Flow", "deviceValue": 80, "deviceId": "SBS04", "dateTime": "2017-02-03 11:29:41"}

Run the script and keep it running for the duration of the project to generate sufficient data.

Tip: If you encounter any issues running the script from your local machine, launch an EC2 instance and run the script there as a root user. Remember to assign an appropriate IAM role to your instance at the time of launch that allows it to access AWS IoT.

Step 2. Create three Firehose delivery streams

For this post, you require three Firehose delivery streams:  one to batch raw data from AWS IoT, and two to batch output device data and aggregated data from Analytics.

  1. In the console, choose Firehose.
  2. Create all three Firehose delivery streams using the following field values.

Delivery stream 1:

Name IoT-Source-Stream
S3 bucket <your unique name>-kinesis
S3 prefix source/

Delivery stream 2:

Name IoT-Destination-Data-Stream
S3 bucket <your unique name>-kinesis
S3 prefix data/

Delivery stream 3:

Name IoT-Destination-Aggregate-Stream
S3 bucket <your unique name>-kinesis
S3 prefix aggregate/

Step 3. Set up AWS IoT to receive and forward incoming data

  1. In the console, choose IoT.
  2. Create a new AWS IoT rule with the following field values.
Name IoT_to_Firehose
Attribute *
Topic Filter /sbs/devicedata/#
Add Action Send messages to an Amazon Kinesis Firehose stream (select IoT-Source-Stream from dropdown)
Select Separator “\n (newline)”

A quick check before proceeding further: make sure that you have run the script to generate simulated IoT data or that your IoT Thing is running and delivering data. If not, set it up now. The Amazon Kinesis Analytics application you set up in the next step needs the data to process it further.

Step 4: Create an Analytics application to process data

  1. In the console, choose Kinesis.
  2. Create a new application.
  3. Enter a name of your choice, for example, SBS-IoT-Data.
  4. For the source, choose IoT-Source-Stream.

Analytics auto-discovers the schema on the data by sampling records from the input stream. It also includes an in-built SQL editor that allows you to write standard SQL queries to transform incoming data.

Tip: If Analytics is unable to discover your incoming data, it may be missing the appropriate IAM permissions. In the IAM console, select the role that you assigned to your IoT rule in Step 3. Make sure that it has the ARN of the IoT-Source-Data Firehose stream listed in the firehose:putRecord section.

Here is a sample SQL query that generates two output streams:

  • DESTINATION_SQL_BASIC_STREAM contains the device ID, device parameter, its value, and the time stamp from the incoming stream.
  • DESTINATION_SQL_AGGREGATE_STREAM aggregates the maximum and minimum values of temperatures from the sensors over a one-minute period from the incoming data.
-- Create an output stream with four columns, which is used to send IoT data to the destination
CREATE OR REPLACE STREAM "DESTINATION_SQL_BASIC_STREAM" (dateTime TIMESTAMP, deviceId VARCHAR(8), deviceParameter VARCHAR(16), deviceValue INTEGER);

-- Create a pump that continuously selects from the source stream and inserts it into the output data stream
CREATE OR REPLACE PUMP "STREAM_PUMP_1" AS INSERT INTO "DESTINATION_SQL_BASIC_STREAM"

-- Filter specific columns from the source stream
SELECT STREAM "dateTime", "deviceId", "deviceParameter", "deviceValue" FROM "SOURCE_SQL_STREAM_001";

-- Create a second output stream with three columns, which is used to send aggregated min/max data to the destination
CREATE OR REPLACE STREAM "DESTINATION_SQL_AGGREGATE_STREAM" (dateTime TIMESTAMP, highestTemp SMALLINT, lowestTemp SMALLINT);

-- Create a pump that continuously selects from a source stream 
CREATE OR REPLACE PUMP "STREAM_PUMP_2" AS INSERT INTO "DESTINATION_SQL_AGGREGATE_STREAM"

-- Extract time in minutes, plus the highest and lowest value of device temperature in that minute, into the destination aggregate stream, aggregated per minute
SELECT STREAM FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE) AS "dateTime", MAX("deviceValue") AS "highestTemp", MIN("deviceValue") AS "lowestTemp" FROM "SOURCE_SQL_STREAM_001" WHERE "deviceParameter"='Temperature' GROUP BY FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE);

Real-time analytics shows the results of the SQL query. If everything is working correctly, you see three streams listed, similar to the following screenshot.

Step 5: Connect the Analytics application to output Firehose delivery streams

You create two destinations for the two delivery streams that you created in the previous step. A single Analytics application can have multiple destinations defined; however, this needs to be set up using the AWS CLI, not from the console. If you do not already have it, install the AWS CLI on your local machine and configure it with your credentials.

Tip: If you are running the IoT script from an EC2 instance, it comes pre-installed with the AWS CLI.

Create the first destination delivery stream 

The AWS CLI command to create a new output Firehose delivery stream is as follows:

aws kinesisanalytics add-application-output --application-name <Name of Analytics Application> --current-application-version-id <number> --application-output 'Name=DESTINATION_SQL_BASIC_STREAM,KinesisFirehoseOutput={ResourceARN=<ARN of IoT-Data-Stream>,RoleARN=<ARN of Analytics application>,DestinationSchema={RecordFormatType=CSV}'

Do not copy this into the CLI just yet! Before entering this command, make the following four changes to personalize it:

  • For Name of Analytics Application, enter the value from Step 4, or from the Analytics console.
  • For current-application-version-ID, run the following command:
aws kinesisanalytics describe-application --application-name <application name from above>; | grep ApplicationVersionId
  • For ResourceARN, run the following command:
aws firehose describe-delivery-stream --delivery-stream-name IoT-Destination-Data-Stream | grep DeliveryStreamARN
  • For RoleARN, run the following command:
aws kinesisanalytics describe-application --application-name <application name from above>; | grep RoleARN

Now, paste the complete command in the AWS CLI and press Enter. If there are any errors, the response provides details. If everything goes well, a new destination delivery stream is created to send the first query (DESTINATION_SQL_BASIC_STREAM) to IoT-Destination-Data-Stream.

Create the second destination delivery stream

Following similar steps as above, create a second destination Firehose delivery stream with the following changes:

  • For Name of Analytics Application, enter the same name as the first delivery stream.
  • For current-application-version-ID, increment by 1 from the previous value (unless you made other changes in between these steps). If unsure, run the same command as above to get it again.
  • For ResourceARN, get the value by running the following CLI command:
aws firehose describe-delivery-stream --delivery-stream-name IoT-Destination-Aggregate-Stream | grep DeliveryStreamARN
  • For RoleArn, enter the same value as the first stream.

Run the aws kinesisanalytics CLI command, similar to the previous step but with the new parameters substituted. This creates the second output Firehose destination delivery stream.

Update the IAM role for Analytics to allow writing to both output streams.

  1. In the console, choose IAM, Roles.
  2. Select the role that you created with Analytics in Step 4.
  3. Choose Policy, JSON, and Edit.
  4. Find “Sid”: “WriteOutputFirehose” in the JSON document, go to the “Resource” section and make sure that it includes Resource ARNs of both streams that you found in the previous step.
  5. If it has only one ARN, add the second ARN and choose Save.

This completes the Amazon Kinesis setup. The incoming IoT data is processed by Analytics and delivered, using two output delivery streams, to two separate folders in your S3 bucket.

Step 6: Set up Amazon QuickSight to analyze the data

To build the visualization dashboard, ingest the processed CSV files from the S3 bucket into Amazon QuickSight.

  1. In the console, choose QuickSight.
  2. If this is your first time using Amazon QuickSight, you are asked to create a new account. Follow the prompts.
  3. When you are logged in to your account, choose New Analysis and enter a name of your choice.
  4. Choose New data set for the analysis or, if you have previously imported your data set, select one from the available data sets.
  5. You import two data sets: one with general device parameters information, and the other with aggregates of maximum and minimum temperatures for monitoring. For the first data set, choose S3 from the list of available data sources and enter a name, for example, IoT Device Data.
  6. The location of the S3 bucket and the objects to use are provided to Amazon QuickSight as a manifest file. Create a new manifest file following the supported formats for Amazon S3 manifest files.
  7. In the URIPrefixes section, provide your appropriate S3 bucket and folder location for the general device data. Hint: it should include <your unique name>-kinesis/data/.

Your manifest file should look similar to the following:

{ 
    "fileLocations": [                                                    
              {"URIPrefixes": ["https://s3.amazonaws.com/<YOUR_BUCKET_NAME>/data/<YEAR>/<MONTH>/<DATE>/<HOUR>/"]}
     ],
     "globalUploadSettings": { 
     "format": "CSV",  
     "delimiter": ","
    }
}

Amazon QuickSight imports and parses the data set, and provides available data fields that can be used for making graphs. The Edit/Preview data button allows you to format and transform the data, change data types, and filter or join your data. Make sure that the columns have the correct titles. If not, you can edit them and then save.

Tip: choose the downward arrow on the top right and unselect Files include headers to give each column appropriate headers. Choose Save. This takes you back to the data sets page.

Follow the same steps as above to import the second data set. This time, your manifest should include your aggregate data set folder on S3, which is named <your unique name>-kinesis/aggregate/. Update headers if necessary and choose Save & visualize.

Build an analysis

The visualization screen shows the data set that you last imported, which in this case is the aggregate data. To include the general device data as well, for Fields on the top left, choose Edit analysis data sets. Choose Add data set and select the other data set that you saved earlier.

Now both data sets are available on the analysis screen. For Visual Types at bottom left, select the type of graph to make. For Fields, select the fields to visualize. For example, drag Device ID, Device Parameter, and Value to Field wells, as shown in the screenshot below, to generate a visualization of average parameter values compared across devices.

You can create another visual by choose +Add. This time, select a line graph to show monitoring of the maximum temperature values of the sensors in any minute, from the aggregate data set.

If you would like to create an interactive story to present to your team or organization, you can choose the Story option on the left panel. Create a dashboard with multiple visualizations, to save and share securely with the intended audience. An example of a story is shown below.

Conclusion

Any data is valuable only when it can be actually put to use. In this post, you’ve seen how it’s possible to quickly build a simple Analytics application to ingest, process, and visualize IoT data in near real time entirely using AWS managed services. This solution is scalable and reliable, and costs a fraction of other business intelligence solutions. It is easy enough that anyone with an AWS account can build and use it without any special training.

If you have any questions or suggestions, please comment below.


About the Author

Karan Desai is a Solutions Architect with Amazon Web Services. He works with startups and small businesses in the US, helping them adopt cloud technology to build scalable and secure solutions using AWS. In his spare time, he likes to build personal IoT projects, travel to offbeat places and write about it.

 

 


Related

Visualize Big Data with Amazon QuickSight, Presto, and Apache Spark on Amazon EMR

 

 

 

 

 

 

 

Test Your Streaming Data Solution with the New Amazon Kinesis Data Generator

Post Syndicated from Allan MacInnis original https://aws.amazon.com/blogs/big-data/test-your-streaming-data-solution-with-the-new-amazon-kinesis-data-generator/

When building a streaming data solution, most customers want to test it with data that is similar to their production data. Creating this data and streaming it to your solution can often be the most tedious task in testing the solution.

Amazon Kinesis Streams and Amazon Kinesis Firehose enable you to continuously capture and store terabytes of data per hour from hundreds of thousands of sources. Amazon Kinesis Analytics gives you the ability to use standard SQL to analyze and aggregate this data in real-time. It’s easy to create an Amazon Kinesis stream or Firehose delivery stream with just a few clicks in the AWS Management Console (or a few commands using the AWS CLI or Amazon Kinesis API). However, to generate a continuous stream of test data, you must write a custom process or script that runs continuously, using the AWS SDK or CLI to send test records to Amazon Kinesis. Although this task is necessary to adequately test your solution, it means more complexity and longer development and testing times.

Wouldn’t it be great if there were a user-friendly tool to generate test data and send it to Amazon Kinesis? Well, now there is—the Amazon Kinesis Data Generator (KDG).

KDG overview

The KDG simplifies the task of generating data and sending it to Amazon Kinesis. The tool provides a user-friendly UI that runs directly in your browser. With the KDG, you can do the following:

  • Create templates that represent records for your specific use cases
  • Populate the templates with fixed data or random data
  • Save the templates for future use
  • Continuously send thousands of records per second to your Amazon Kinesis stream or Firehose delivery stream

The KDG is open source, and you can find the source code on the Amazon Kinesis Data Generator repo in GitHub. Because the tool is a collection of static HTML and JavaScript files that run directly in your browser, you can start using it immediately without downloading or cloning the project. It is enabled as a static site in GitHub, and we created a short URL to access it.

To get started immediately, check it out at http://amzn.to/datagen.

Using the KDG

Getting started with the KDG requires only three short steps:

  1. Create an Amazon Cognito user in your AWS account (first-time only).
  2. Use this user’s credentials to log in to the KDG.
  3. Create a record template for your data.

When you’ve completed these steps, you can then send data to Streams or Firehose.

Create an Amazon Cognito user

The KDG is a great example of a mobile application that uses Amazon Cognito for a user repository and user authentication, and the AWS JavaScript SDK to communicate with AWS services directly from your browser. For information about how to build your own JavaScript application that uses Amazon Cognito, see Use Amazon Cognito in your website for simple AWS authentication on the AWS Mobile Blog.

Before you can start sending data to your Amazon Kinesis stream, you must create an Amazon Cognito user in your account who can write to Streams and Firehose. When you create the user, you create a username and password for that user. You use those credentials to sign in to the KDG. To simplify creating the Amazon Cognito user in your account, we created a Lambda function and a CloudFormation template. For more information about creating the Amazon Cognito user in your AWS account, see Configure Your AWS Account.

Note:  It’s important that you use the URL provided by the output of the CloudFormation stack the first time that you access the KDG. This URL contains parameters needed by the KDG. The KDG stores the values of these parameters locally, so you can then access the tool using the short URL, http://amzn.to/datagen.

Log in to the KDG

After you create an Amazon Cognito user in your account, the next step is to log in to the KDG. To do this, provide the username and password that you created earlier.

On the main page, you can configure your data templates and send data to an Amazon Kinesis stream or Firehose delivery stream.

The basic configuration is simple enough. All fields on the page are required:

  • Region: Choose the AWS Region that contains the Amazon Kinesis stream or Firehose delivery stream to receive your streaming data.
  • Stream/firehose name: Choose the name of the stream or delivery stream to receive your streaming data.
  • Records per second: Enter the number of records to send to your stream or delivery stream each second.
  • Record template: Enter the raw data, or a template that represents your data structure, to be used for each record sent by the KDG. For information about creating templates for your data, see the “Creating Record Templates” section, later in this post.

When you set the Records per second value, consider that the KDG isn’t intended to be a data producer for load-testing your application. However, it can easily send several thousand records per second from a single tab in your browser, which is plenty of data for most applications. In testing, the KDG has produced 80,000 records per second to a single Amazon Kinesis stream, but your mileage may vary. The maximum rate at which it produces records depends on your computer’s specs and the complexity of your record template.

Ensure that your stream or delivery stream is scaled appropriately:

  • 1,000 records/second or 1 MB/second to an Amazon Kinesis stream
  • 5,000 records/second or 5 MB/second to a Firehose delivery stream

Otherwise, Amazon Kinesis may reject records, and you won’t achieve your desired throughput. For more information about adding capacity to a stream by adding more shards, see Resharding a Stream. For information about increasing the capacity of a delivery stream, see Amazon Kinesis Firehose Limits.

Create record templates

The Record Template field is a free-text field where you can enter any text that represents a single streaming data record. You can create a single line of static data, so that each record sent to Amazon Kinesis is identical. Or, you can format the text as a template.

In this case, the KDG substitutes portions of the template with fake or random data before sending the record. This lets you introduce randomness or variability in each record that is sent in your data stream. The KDG uses Faker.js, an open source library, to generate fake data. For more information, see the faker.js project page in GitHub. The easiest way to see how this works is to review an example.

To simulate records being sent from a weather sensor Internet of Things (IoT) device, you want each record to be formatted in JSON. The following is an example of what a final record must look like:

{
	"sensorId": 40,
	"currentTemperature": 76,
	"status": "OK"
} 

For this use case, you want to simulate sending data from one of 50 sensors, so the sensorID field can be an integer between 1 and 50. The temperature value can range between 10 and 150, so the currentTemperature field should contain a value in this range. Finally, the status value can be one of three possible values: OK, FAIL, and WARN. The KDG template format uses moustache syntax (double curly-braces) to enclose items that should be replaced before the record is sent to Amazon Kinesis. To model the record, the template looks like this:

{
    "sensorId": {{random.number(50)}},
    "currentTemperature": {{random.number(
        {
            "min":10,
            "max":150
        }
    )}},
    "status": "{{random.arrayElement(
        ["OK","FAIL","WARN"]
    )}}"
}

Take a look at one more example, simulating a stream of records that represent rows from an Apache access log. A single Apache access log entry might look like this:

76.0.56.179 - - [29/Apr/2017:16:32:11 -05:00] "GET /wp-admin" 200 8233 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_0 rv:6.0; CY) AppleWebKit/535.0.0 (KHTML, like Gecko) Version/4.0.3 Safari/535.0.0"

The following example shows how to create a template for the Apache access log:

{{internet.ip}} - - [{{date.now("DD/MMM/YYYY:HH:mm:ss Z")}}] "{{random.weightedArrayElement({"weights":[0.6,0.1,0.1,0.2],"data":["GET","POST","DELETE","PUT"]})}} {{random.arrayElement(["/list","/wp-content","/wp-admin","/explore","/search/tag/list","/app/main/posts","/posts/posts/explore"])}}" {{random.weightedArrayElement({"weights": [0.9,0.04,0.02,0.04], "data":["200","404","500","301"]})}} {{random.number(10000)}} "-" "{{internet.userAgent}}"

For more information about creating your own templates, see the Record Template section of the KDG documentation.

The KDG saves the templates that you create in your local browser storage. As long as you use the same browser on the same computer, you can reuse up to five templates.

Summary

Testing your streaming data solution has never been easier. Get started today by visiting the KDG hosted UI or its Amazon Kinesis Data Generator page in GitHub. The project is licensed under the Apache 2.0 license, so feel free to clone and modify it for your own use as necessary. And of course, please submit any issues or pull requests via GitHub.

If you have any questions or suggestions, please add them below.

 


About the Author

Allan MacInnis is a Solutions Architect at Amazon Web Services. He works with our customers to help them build streaming data solutions using Amazon Kinesis. In his spare time, he enjoys mountain biking and spending time with his family.

 

 


Related

Scale Your Amazon Kinesis Stream Capacity with UpdateShardCount

 

 

Encrypt and Decrypt Amazon Kinesis Records Using AWS KMS

Post Syndicated from Temitayo Olajide original https://aws.amazon.com/blogs/big-data/encrypt-and-decrypt-amazon-kinesis-records-using-aws-kms/

Customers with strict compliance or data security requirements often require data to be encrypted at all times, including at rest or in transit within the AWS cloud. This post shows you how to build a real-time streaming application using Kinesis in which your records are encrypted while at rest or in transit.

Amazon Kinesis overview

The Amazon Kinesis platform enables you to build custom applications that analyze or process streaming data for specialized needs. Amazon Kinesis can continuously capture and store terabytes of data per hour from hundreds of thousands of sources such as website clickstreams, financial transactions, social media feeds, IT logs, and transaction tracking events.

Through the use of HTTPS, Amazon Kinesis Streams encrypts data in-flight between clients which protects against someone eavesdropping on records being transferred. However, the records encrypted by HTTPS are decrypted once the data enters the service. This data is stored at rest for 24 hours (configurable up to 168 hours) to ensure that your applications have enough headroom to process, replay, or catch up if they fall behind.

Walkthrough

In this post you build encryption and decryption into sample Kinesis producer and consumer applications using the Amazon Kinesis Producer Library (KPL), the Amazon Kinesis Consumer Library (KCL), AWS KMS, and the aws-encryption-sdk. The methods and the techniques used in this post to encrypt and decrypt Kinesis records can be easily replicated into your architecture. Some constraints:

  • AWS charges for the use of KMS API requests for encryption and decryption, for more information see AWS KMS Pricing.
  • You cannot use Amazon Kinesis Analytics to query Amazon Kinesis Streams with records encrypted by clients in this sample application.
  • If your application requires low latency processing, note that there will be a slight hit in latency.

The following diagram shows the architecture of the solution.

Encrypting the records at the producer

Before you call the PutRecord or PutRecords API, you will encrypt the string record by calling KinesisEncryptionUtils.toEncryptedString.

In this example, we used a sample stock sales ticker object:

example {"tickerSymbol": "AMZN", "salesPrice": "900", "orderId": "300", "timestamp": "2017-01-30 02:41:38"}. 

The method (KinesisEncryptionUtils.toEncryptedString) call takes four parameters:

  • amazonaws.encryptionsdk.AwsCrypto
  • stock sales ticker object
  • amazonaws.encryptionsdk.kms.KmsMasterKeyProvider
  • util.Map of an encryption context

A ciphertext is returned back to the main caller which is then also checked for size by calling KinesisEncryptionUtils.calculateSizeOfObject. Encryption increases the size of an object. To prevent the object from being throttled, the size of the payload (one or more records) is validated to ensure it is not greater than 1MB. In this example encrypted records sizes with payload exceeding 1MB are logged as warning. If the size is less than the limit, then either addUserRecord or PutRecord and PutRecords are called if you are using the KPL or the Kinesis Streams API respectively 

Example: Encrypting records with KPL

//Encrypting the records
String encryptedString = KinesisEncryptionUtils.toEncryptedString(crypto, ticker, prov,context);
log.info("Size of encrypted object is : "+ KinesisEncryptionUtils.calculateSizeOfObject(encryptedString));
//check if size of record is greater than 1MB
if(KinesisEncryptionUtils.calculateSizeOfObject(encryptedString) >1024000)
   log.warn("Record added is greater than 1MB and may be throttled");
//UTF-8 encoding of encrypted record
ByteBuffer data = KinesisEncryptionUtils.toEncryptedByteStream(encryptedString);
//Adding the encrypted record to stream
ListenableFuture<UserRecordResult> f = producer.addUserRecord(streamName, randomPartitionKey(), data);
Futures.addCallback(f, callback);

In the above code, the example sales ticker record is passed to the KinesisEncryptionUtils.toEncryptedString and an encrypted record is returned. The encryptedRecord value is also passed to KinesisEncryptionUtils.calculateSizeOfObject and the size of the encrypted payload is returned and checked to see if it is less than 1MB. If it is, the payload is then UTF-8 encoded (KinesisEncryptionUtils.toEncryptedByteStream), then sent to the stream for processing.

Example: Encrypting the records with Streams PutRecord

//Encrypting the records
String encryptedString = KinesisEncryptionUtils.toEncryptedString(crypto, ticker, prov, context);
log.info("Size of encrypted object is : " + KinesisEncryptionUtils.calculateSizeOfObject(encryptedString));
//check if size of record is greater than 1MB
if (KinesisEncryptionUtils.calculateSizeOfObject(encryptedString) > 1024000)
    log.warn("Record added is greater than 1MB and may be throttled");
//UTF-8 encoding of encryptyed record
ByteBuffer data = KinesisEncryptionUtils.toEncryptedByteStream(encryptedString);
putRecordRequest.setData(data);
putRecordRequest.setPartitionKey(randomPartitionKey());
//putting the record into the stream
kinesis.putRecord(putRecordRequest);

Verifying that records are encrypted

After the call to KinesisEncryptionUtils.toEncryptedString, you can print out the encrypted string record just before UTF-8 encoding. An example of what is printed to standard output when running this sample application is shown below.

[main] INFO kinesisencryption.streams.EncryptedProducerWithStreams - String Record is TickerSalesObject{tickerSymbol='FB', salesPrice='184.285409142', orderId='2a0358f1-9f8a-4bbe-86b3-c2929047e15d', timeStamp='2017-01-30 02:41:38'} and Encrypted Record String is AYADeMf6zmVg9JvIkGNv5M39rhUAbgACAAdLaW5lc2lzAARjYXJzABVhd3MtY3J5cHRvLXB1YmxpYy1rZXkAREFpUkpCaG1UOFQ3UTZQZ253dm9FSU9iUDZPdE1xTHdBZ1JjNlZxN2doMDZ3QlBEWUZndWdJSEFKaXNvT0ZPUGsrdz09AAEAB2F3cy1rbXMAS2Fybjphd3M6a21zOnVzLWVhc3QtMTo1NzM5MDY1ODEwMDI6a2V5LzM3ZGM5MGRjLTNmMWMtNGE3Ny1hNTFkLWE2NTNiMTczZmNkYgCnAQEBAHgbPoaYTiF/oIMp49yPBkZmVVotylZpUqwkkzJJicLjLQAAAH4wfAYJKoZIhvcNAQcGoG8wbQIBADBoBgkqhkiG9w0BBwEwHgYJYIZIAWUDBAEuMBEEDCCYBk+hfB3tOGVx7QIBEIA7FqaEcOWpic+gKNeT+dUe4yttB9dsZSFPAUTlz2L2zlyLXSLMh1otRH24SO485ov+TCTtRCgiA8a9rYQCAAAAAAwAABAArlGWPO8BavNSJIpJOtJekRUhOwbM+WM1NBVXB/////8AAAABXNZnRND3J7u8EZx3AAAAkfSxVPMYUv0Ovrd4AIUTmMcaiR0Z+IcJNAXqAhvMmDKpsJaQG76Q6pYExarolwT+6i87UOi6TGvAiPnH74GbkEniWe66rAF6mOra2JkffK6pBdhh95mEOGLaVPBqs2jswUTfdcBJQl9NEb7wx9XpFX8fNDF56Vly7u6f8OQ7lY6fNrOupe5QBFnLvwehhtogd72NTQ/yEbDDoPKUZN3IlWIEAGYwZAIwISFw+zdghALtarsHSIgPMs7By7/Yuda2r3hqSmqlCyCXy7HMFIQxHcEILjiLp76NAjB1D8r8TC1Zdzsfiypi5X8FvnK/6EpUyFoOOp3y4nEuLo8M2V/dsW5nh4u2/m1oMbw=

You can also verify that the record stayed encrypted in Streams by printing out the UTF-8 decoded received record immediately after the getRecords API call. An example of the print output when running the sample application is shown below.

[Thread-2] INFO kinesisencryption.utils.KinesisEncryptionUtils - Verifying object received from stream is encrypted. -Encrypted UTF-8 decoded : AYADeBJz/kt7Fm3L1lvS8Wy8jhAAbgACAAdLaW5lc2lzAARjYXJzABVhd3MtY3J5cHRvLXB1YmxpYy1rZXkAREFrM2N4K2s1ODJuOGVlNWF3TVJ1dk1UUHZQc2FHeGoxQisxb09kNWtDUExHYjJDS0lMZW5LSnlYakRmdFR4dzQyUT09AAEAB2F3cy1rbXMAS2Fybjphd3M6a21zOnVzLWVhc3QtMTo1NzM5MDY1ODEwMDI6a2V5LzM3ZGM5MGRjLTNmMWMtNGE3Ny1hNTFkLWE2NTNiMTczZmNkYgCnAQEBAHgbPoaYTiF/oIMp49yPBkZmVVotylZpUqwkkzJJicLjLQAAAH4wfAYJKoZIhvcNAQcGoG8wbQIBADBoBgkqhkiG9w0BBwEwHgYJYIZIAWUDBAEuMBEEDAGI3oWLlIJ2p6kffQIBEIA7JVUOTsLtEyNK8vS4GIS9iyTejuB2xhIpRXfG8o0lUfHawcrCbNbNH8XLm/8RW5JbgXo10EpOs8dSjkICAAAAAAwAABAAy64r24sGVKWN4C1gXCwJYHvZkLpJJj16SZlhpv////8AAAABg2pPFchIiaM7D9VuAAAAkwh10ul5sZQ08KsgkFszOOvFoQu95CiY7cK8H+tBloVOZglMqhhhvoIIZLr9hmI8/lQvRXzGDdo7Xkp0FAT5Jpztt8Hq/ZuLfZtNYIWOw594jShqpZt6uXMdMnpb/38R3e5zLK5vrYkM6NS4WPMFrHsOKN5tn0CDForgojRcdpmCJ8+cWLNltb2S+EJiWiyWS+ibw2vJ/RFm6WZO6nD+MXn3vyMAZzBlAjAuIUTYL1cbQ3ENxDIeXHJAWQguNPqxq4HgaCmCEI9/rn/GAKSc2nT9ln3UsVq/2dgCMQC7yNJ3DCTnppavfxTbcVS+rXaDDpZZx/ZsluMqXAFM5/FFvKRqr0dVML28tGunxmU=

Decrypting the records at the consumer

After you receive the records into your consumer as a list, you can get the data as a ByteBuffer by calling record.getData. You then decode and decrypt the byteBuffer by calling the KinesisEncryptionUtils.decryptByteStream. This method takes five parameters:

  • amazonaws.encryptionsdk.AwsCrypto
  • record ByteBuffer
  • amazonaws.encryptionsdk.kms.KmsMasterKeyProvider
  • key arn string
  • java.util.Map of your encryption context

A string representation of the ticker sales object is returned back to the caller for further processing. In this example, this representation is just printed to standard output.

[Thread-2] INFO kinesisencryption.streams.DecryptShardConsumerThread - Decrypted Text Result is TickerSalesObject{tickerSymbol='AMZN', salesPrice='304.958313333', orderId='50defaf0-1c37-4e84-85d7-bc15597355eb', timeStamp='2017-01-30 02:41:38'}

Example: Decrypting records with the KCL and Streams API

ByteBuffer buffer = record.getData();
//Decrypting the encrypted record data
String decryptedResult = KinesisEncryptionUtils.decryptByteStream(crypto,buffer,prov,this.getKeyArn(), this.getContext());
log.info("Decrypted Text Result is " + decryptedResult);

With the above code, records in the Kinesis Streams are decrypted using the same key ARN and encryption context that was previously used to encrypt it at the producer side.

Maven dependencies

To use the implementation I’ve outlined in this post, you need to use a few maven dependencies outlined below in the pom.xml together with the Bouncy Castle libraries. Bouncy Castle provides a cryptography API for Java.

 <dependency>
        <groupId>org.bouncycastle</groupId>
        <artifactId>bcprov-ext-jdk15on</artifactId>
        <version>1.54</version>
    </dependency>
<dependency>
   <groupId>com.amazonaws</groupId>
   <artifactId>aws-encryption-sdk-java</artifactId>
   <version>0.0.1</version>
</dependency>

Summary

You may incorporate above sample code snippets or use it as a guide in your application code to just start encrypting and decrypting your records to and from an Amazon Kinesis Stream.

A complete producer and consumer example application and a more detailed step-by-step example of developing an Amazon Kinesis producer and consumer application on AWS with encrypted records is available at the kinesisencryption github repository.

If you have questions or suggestions, please comment below.


About the Author

Temitayo Olajide is a Cloud Support Engineer with Amazon Web Services. He works with customers to provide architectural solutions, support and guidance to implementing high velocity streaming data applications in the cloud. In his spare time, he plays ping-pong and hangs out with family and friends

 

 


Related

Secure Amazon EMR with Encryption

 

 

Month in Review: February 2017

Post Syndicated from Derek Young original https://aws.amazon.com/blogs/big-data/month-in-review-february-2017/

Another month of big data solutions on the Big Data Blog!

Take a look at our summaries below and learn, comment, and share. Thank you for reading!

NEW POSTS

Implement Serverless Log Analytics Using Amazon Kinesis Analytics
In this post, learn how how to implement a solution that analyzes streaming Apache access log data from an EC2 instance aggregated over 5 minutes.

Migrate External Table Definitions from a Hive Metastore to Amazon Athena
For customers who use Hive external tables on Amazon EMR, or any flavor of Hadoop, a key challenge is how to effectively migrate an existing Hive metastore to Amazon Athena, an interactive query service that directly analyzes data stored in Amazon S3. In this post, learn an approach to migrate an existing Hive metastore to Athena, as well as how to use the Athena JDBC driver to run scripts.

AWS Big Data is Coming to HIMSS!
This year’s HIMSS conference was held at the Orange County Convention Center in Orlando, Florida from February 20 – 23. This blog post lists past AWS Big Data Blog posts to show how AWS technologies are being used to improve healthcare.

Create Tables in Amazon Athena from Nested JSON and Mappings Using JSONSerDe
In this post, you will use the tightly coupled integration of Amazon Kinesis Firehose for log delivery, Amazon S3 for log storage, and Amazon Athena with JSONSerDe to run SQL queries against these logs without the need for data transformation or insertion into a database.

Scheduled Refresh for SPICE Data Sets on Amazon QuickSight
QuickSight uses SPICE (Super-fast, Parallel, In-Memory Calculation Engine), a fully managed data store that enables blazing fast visualizations and can ingest data from AWS, on-premises, and cloud sources. Data in SPICE could be refreshed at any time with the click of a button within QuickSight. This post announced the ability to schedule these refreshes!

Harmonize, Search, and Analyze Loosely Coupled Datasets on AWS
You have come up with an exciting hypothesis, and now you are keen to find and analyze as much data as possible to prove (or refute) it. There are many datasets that might be applicable, but they have been created at different times by different people and don’t conform to any common standard. In this blog post, we will describe a sample application that illustrates how to solve these problems. You can install our sample app, which will harmonize and index three disparate datasets to make them searchable, present a data-driven, customizable UI for searching the datasets to do preliminary analysis and to locate relevant datasets, and integrate with Amazon Athena and Amazon QuickSight for custom analysis and visualization.

FROM THE ARCHIVE

Building Event-Driven Batch Analytics on AWS
Modern businesses typically collect data from internal and external sources at various frequencies throughout the day. In this post, you learn an elastic and modular approach for how to collect, process, and analyze data for event-driven applications in AWS.


Want to learn more about Big Data or Streaming Data? Check out our Big Data and Streaming data educational pages.

Leave a comment below to let us know what big data topics you’d like to see next on the AWS Big Data Blog.