Tag Archives: Amazon Kinesis Analytics

Build and run streaming applications with Apache Flink and Amazon Kinesis Data Analytics for Java Applications

Post Syndicated from Steffen Hausmann original https://aws.amazon.com/blogs/big-data/build-and-run-streaming-applications-with-apache-flink-and-amazon-kinesis-data-analytics-for-java-applications/

Stream processing facilitates the collection, processing, and analysis of real-time data and enables the continuous generation of insights and quick reactions to emerging situations. This capability is useful when the value of derived insights diminishes over time. Hence, the faster you can react to a detected situation, the more valuable the reaction is going to be. Consider, for instance, a streaming application that analyzes and blocks fraudulent credit card transactions while they occur. Compare that application to a traditional batch-oriented approach that identifies fraudulent transactions at the end of every business day and generates a nice report for you to read the next morning.

It is quite common for the value of insights to diminish over time. Therefore, using stream processing can substantially improve the value of your analytics application. However, building and operating a streaming application that continuously receives and processes data is much more challenging than operating a traditional batch-oriented analytics application.

In this post, we discuss how you can use Apache Flink and Amazon Kinesis Data Analytics for Java Applications to address these challenges. We explore how to build a reliable, scalable, and highly available streaming architecture based on managed services that substantially reduce the operational overhead compared to a self-managed environment. We particularly focus on how to prepare and run Flink applications with Kinesis Data Analytics for Java Applications. To this end, we use an exemplary scenario that includes source code and AWS CloudFormation templates. You can follow along with this example using your own AWS account or adapt the code according to your specific requirements.

Challenges of running streaming applications

When you build a streaming application, the downstream systems naturally rely on a continuous and timely generation of output. Accordingly, there are much higher requirements on the availability of the streaming application. There is also much less time to address operational issues compared to a traditional batch-based approach. In a batch-processing scenario, when a job that runs once at the end of a business day fails, you can often restart the failed job and still complete the computation by the next morning, when the results are needed. In contrast, when a streaming application fails, downstream systems that consume the output might be affected within minutes, or even sooner, when the expected output no longer arrives in time.

Moreover, in case of failure, you can’t simply delete all intermediate results and restart a failed processing job, as it is commonly done in the batch-processing case. The output of a streaming job is continuously consumed by downstream systems. Output that has already been consumed cannot easily be retracted. Therefore, the entire processing pipeline is much more sensitive to duplicates that are introduced by an application that is restarted on failure. Furthermore, the computations of a streaming application often rely on some kind of internal state that can be corrupted or even lost when the application fails.

Last but not least, streaming applications often deal with a varying amount of throughput. Therefore, scaling the application according to the current load is highly desirable. When the load increases, the infrastructure that supports the streaming application must scale to keep the application from becoming overloaded, falling behind, and producing results that are no longer relevant. On the other hand, when the load decreases, the infrastructure should scale in again to remain cost effective by not provisioning more resources than are needed.

A reliable and scalable streaming architecture based on Flink and Kinesis Data Analytics for Java Applications

Apache Flink is an open-source project that is tailored to stateful computations over unbounded and bounded datasets. Flink addresses many of the challenges that are common when analyzing streaming data by supporting different APIs (including Java and SQL), rich time semantics, and state management capabilities. It can also recover from failures while maintaining exactly-once processing semantics. Therefore, Flink is well suited for analyzing streaming data with low latency.

In this post, we illustrate how to deploy, operate, and scale a Flink application with Kinesis Data Analytics for Java Applications. We use a scenario to analyze the telemetry data of a taxi fleet in New York City in near-real time to optimize the fleet operation. In this scenario, every taxi in the fleet is capturing information about completed trips. The tracked information includes the pickup and drop-off locations, number of passengers, and generated revenue. This information is ingested into a Kinesis data stream as a simple JSON blob. From there, the data is processed by a Flink application, which is deployed to Kinesis Data Analytics for Java Applications. This application identifies areas that are currently requesting a high number of taxi rides. The derived insights are finally persisted into Amazon Elasticsearch Service, where they can be accessed and visualized using Kibana.

This scenario leads to the following architecture, which is separated into three stages for the ingestion, processing, and presentation of data.

Separating the different aspects of the infrastructure is a common approach in this domain and has several benefits over a more tightly coupled architecture.

First, the Kinesis data stream serves as a buffer that decouples the producers from the consumers. Taxis can persist the events that they generate into the data stream regardless of the condition of, for instance, the processing layer, which might be currently recovering from a node failure. Likewise, the derived data remains available through Kibana even if the ingestion or processing layer is currently unavailable due to some operational issues. Last but not least, all components can be scaled independently and can use infrastructure that is specifically tailored according to their individual requirements.

This architecture also allows you to experiment and adopt new technologies in the future. Multiple independent applications can concurrently consume the data stored in the Kinesis data stream. You can then test how a new version of an existing application performs with a copy of the production traffic. But you can also introduce a different tool and technology stack to analyze the data, again without affecting the existing production application. For example, it is common to persist the raw event data to Amazon S3 by adding a Kinesis Data Firehose delivery stream as a second consumer to the Kinesis data stream. This facilitates long-term archiving of the data, which you can then use to evaluate ad hoc queries or analyze historic trends.

All in all, separating the different aspects of the architecture into ingestion, processing, and presentation nicely decouples different components, making the architecture more robust. It furthermore allows you to choose different tools for different purposes and gives you a lot of flexibility to change or evolve the architecture over time.

For the rest of this post, we focus on using Apache Flink and Kinesis Data Analytics for Java Applications to identify areas that currently request a high number of taxi rides. We also derive the average trip duration to the New York City airports. But with this architecture, you also have the option to consume the incoming events using other tools, such as Apache Spark Structured Streaming and Kinesis Data Firehose, instead of, or in addition to, what is described here.

Let’s kick the tires!

To see the described architecture in action, execute the following AWS CloudFormation template in your own AWS account. The template first builds the Flink application that analyzes the incoming taxi trips, including the Flink Kinesis Connector that is required to read data from a Kinesis data stream. It then creates the infrastructure and submits the Flink application to Kinesis Data Analytics for Java Applications.

The entire process of building the application and creating the infrastructure takes about 20 minutes. After the AWS CloudFormation stack is created, the Flink application is deployed as a Kinesis Data Analytics for Java application and waits for events in the data stream to arrive. Checkpointing is enabled so that the application can seamlessly recover from failures of the underlying infrastructure while Kinesis Data Analytics for Java Applications manages the checkpoints on your behalf. In addition, automatic scaling is configured so that Kinesis Data Analytics for Java Applications automatically allocates or removes resources and scales the application (that is, it adapts its parallelism) in response to changes in the incoming traffic.

To populate the Kinesis data stream, we use a Java application that replays a public dataset of historic taxi trips made in New York City into the data stream. The Java application has already been downloaded to an Amazon EC2 instance that was provisioned by AWS CloudFormation. You just need to connect to the instance and execute the JAR file to start ingesting events into the stream.

You can obtain all of the following commands, including their correct parameters, from the output section of the AWS CloudFormation template that you executed previously.

$ ssh ec2-user@«Replay instance DNS name»

$ java -jar amazon-kinesis-replay-1.0.jar -stream «Kinesis data stream name» -region «AWS region» -speedup 3600

The speedup parameter determines how much faster the data is ingested into the Kinesis data stream relative to the actual occurrence of the historic events. With the given parameters, the Java application ingests an hour of historic data within one second. This results in a throughput of roughly 13k events and 6 MB of data per second, which completely saturates the Kinesis data stream (more on this later).

You can then go ahead and inspect the derived data through the Kibana dashboard that has been created. Or you can create your own visualizations to explore the data in Kibana.

https://«Elasticsearch endpoint»/_plugin/kibana/app/kibana#/dashboard/nyc-tlc-dashboard

The prepared Kibana dashboard contains a heatmap and a line graph. The heatmap visualizes locations where taxis are currently requested, and it shows that the highest demand for taxis is in Manhattan. Moreover, the JFK and LaGuardia airports are also spots on the map where substantially more rides are requested compared to their immediate surroundings. The line graph visualizes the average trip duration to these two airports. The following image shows how it steadily increases throughout the day until it abruptly drops in the evening.

For this post, the Elasticsearch cluster is configured to accept connections from the IP address range specified as a parameter of the AWS CloudFormation template. For production workloads, it’s much more desirable to further tighten the security of your Elasticsearch domain, for instance, by using Amazon Cognito for Kibana access control.

Scaling the architecture to increase its throughput

For this post, the Kinesis data stream was deliberately underprovisioned so that the Java application completely saturates the data stream. When you closely inspect the output of the Java application, you’ll notice that the “replay lag” is continuously increasing. This means that the producer cannot ingest events as quickly as is required by the specified speedup parameter.

You can dive deeper into the metrics of the data stream by accessing it through an Amazon CloudWatch dashboard. You can then see that the WriteProvisionedThroughputExceeded metric is slightly increased: roughly 0.4 percent of the records are not accepted into the stream because the respective requests are throttled. In other words, the data stream is underprovisioned, particularly as the producer pauses the ingestion of new events when too many events are in flight.
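
If you prefer the command line over the console dashboard, you can retrieve the same metric with the AWS CLI. The following call is only a sketch that sums the throttled records in one-minute periods; the stream name placeholder follows the convention used earlier in this post, and the time range is an example that you need to adjust to your own run.

$ aws cloudwatch get-metric-statistics \
    --namespace AWS/Kinesis \
    --metric-name WriteProvisionedThroughputExceeded \
    --dimensions Name=StreamName,Value=«Kinesis data stream name» \
    --statistics Sum \
    --period 60 \
    --start-time 2019-02-13T12:00:00Z \
    --end-time 2019-02-13T13:00:00Z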

To increase the throughput of the data stream, you can simply update the number of shards from 6 to 12, either with a couple of clicks on the console or through a single API call. For production environments, you might even want to automate this procedure. For details on how to automatically scale a Kinesis data stream, see the blog post Scaling Amazon Kinesis Data Streams with AWS Application Auto Scaling.
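
For reference, the API behind those clicks is UpdateShardCount, which you can also invoke from the AWS CLI, for example as follows:

$ aws kinesis update-shard-count \
    --stream-name «Kinesis data stream name» \
    --target-shard-count 12 \
    --scaling-type UNIFORM_SCALING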

When the scaling operation of the stream finishes, you can observe how the “replay lag” decreases and more events are ingested into the stream.

However, as a direct result, more events need to be processed. So now the Kinesis Data Analytics for Java application becomes overloaded and can no longer keep up with the increased number of incoming events. You can observe this through the millisBehindLatest metric, which is published to CloudWatch. The metric reports the time difference, in milliseconds, between the oldest record currently read by the Kinesis Data Analytics for Java application and the latest record in the stream, based on their ingestion time. So it indicates how far behind the processing is from the tip of the stream.

As these metrics show, 10 minutes after the scaling operation finishes, processing is already more than 3 minutes behind the latest event in the stream. Even worse, it steadily keeps falling behind, continuously widening this gap.

However, in contrast to Kinesis Data Streams, Kinesis Data Analytics for Java Applications natively supports auto scaling. After a couple of minutes, you can see the effect of the scaling activities in the metrics. The millisBehindLatest metric starts to decrease until it reaches zero, when the processing has caught up with the tip of the Kinesis data stream.

However, notice how the millisBehindLatest metric spikes just before it starts to decline. This is caused by the way that scaling a Kinesis Data Analytics for Java application works today. To scale a running application, the internal state of the application is persisted into a so-called savepoint. This savepoint is exposed as a snapshot by Kinesis Data Analytics for Java Applications. Subsequently, the running instance of the application is terminated, and a new instance of the same application with more resources and a higher parallelism is created. The new instance of the application then populates its internal state from the snapshot and resumes the processing from where the now terminated instance left off.

Accordingly, the scaling operation causes a brief interruption of the processing, which explains the spike in the metric. However, this operation is transparent to the producers and consumers. Producers can continue to write to the Kinesis data stream because they are nicely decoupled from the application. Likewise, consumers can still use Kibana to view their dashboards, although they might not see the latest data because it hasn’t yet been processed.

Let’s step back for a moment and review what you just did: You created a fully managed, highly available, scalable streaming architecture. You ingested and analyzed up to 25k events per second. You doubled the throughput of the architecture by scaling the Kinesis data stream and the Kinesis Data Analytics for Java application with a couple of clicks. You did all this while the architecture remained fully functional and kept receiving and processing events, not losing a single event. You also could have scaled the Elasticsearch cluster as seamlessly as the other components. But we’ll leave that as an exercise for the interested reader.

Try to imagine what it would have taken you to build something similar from scratch.

Prepare Flink applications for Kinesis Data Analytics for Java Applications

Now that you have seen the streaming application in action, let’s look at what is required to deploy and run a Flink application with Kinesis Data Analytics for Java Applications.

Similar to other deployment methods, the Flink application is first built and packaged into a fat JAR, which contains all the necessary dependencies for the application to run. The resulting fat JAR is then uploaded to Amazon S3. The location of the fat JAR on S3 and some additional configuration parameters are then used to create an application that can be executed by Kinesis Data Analytics for Java Applications. So instead of logging in to a cluster and directly submitting a job to the Flink runtime, you upload the respective fat JAR to S3. You then create a Kinesis Data Analytics for Java application that you can interact with through API calls, the console, or the AWS CLI.
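
As a rough sketch of that last step, the following AWS CLI call creates such an application from a fat JAR stored in S3. The placeholders follow the convention used earlier in this post, and the application configuration is trimmed down to the code location; the CloudFormation template used in this post configures considerably more (runtime properties, checkpointing, monitoring, and so on).

$ aws kinesisanalyticsv2 create-application \
    --application-name «application name» \
    --runtime-environment FLINK-1_6 \
    --service-execution-role «service execution role ARN» \
    --application-configuration '{
        "ApplicationCodeConfiguration": {
            "CodeContent": {
                "S3ContentLocation": {
                    "BucketARN": "«bucket ARN»",
                    "FileKey": "«path to the fat JAR»"
                }
            },
            "CodeContentType": "ZIPFILE"
        }
    }'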

Adapt the Flink configuration and runtime parameters

To obtain a valid Kinesis Data Analytics for Java application, the fat JAR of the Flink application must include certain dependencies. When you use Apache Maven to build your Flink application, you can simply add another dependency to the pom.xml file of your project.

<!-- pom.xml -->
<project>
    ...
    <dependencies>
        ...
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-kinesisanalytics-runtime</artifactId>
            <version>1.0.1</version>
        </dependency>
    </dependencies>
    ...
</project>

You can then specify parameters that are passed to the resulting Kinesis Data Analytics for Java application when it is created or updated. These parameters are basically key-value pairs that are contained in a property map that is part of a property group.

"ApplicationConfiguration": {
    "EnvironmentProperties": {
        "PropertyGroups": [
            {
                "PropertyGroupId": "FlinkApplicationProperties",
                "PropertyMap": {
                    "InputStreamName": "...",
                    ...
                }
            }
        ]
    },
    ...
}

You can then obtain the values of these parameters in the application code from the Kinesis Data Analytics for Java Applications runtime. For example, the following code snippet gets the name of the Kinesis data stream that the application should connect to from the FlinkApplicationProperties property group.

Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();

Properties flinkProperties = applicationProperties.get("FlinkApplicationProperties");

String kinesisStreamName = flinkProperties.getProperty("InputStreamName");

You use the same mechanism to configure other properties for the Kinesis Data Analytics for Java application (for example, checkpointing and the parallelism of the application) that are usually specified as a parameter or configuration option directly to the Flink runtime.

"ApplicationConfiguration": {
    "FlinkApplicationConfiguration": {
        "CheckpointConfiguration": {
            "ConfigurationType": "DEFAULT"
        },
        "MonitoringConfiguration": {
            "ConfigurationType": "CUSTOM",
            "MetricsLevel": "TASK",
            "LogLevel": "INFO"
        },
        "ParallelismConfiguration": {
            "ConfigurationType": "DEFAULT"
        }
    },
    ...
}

With this configuration, the checkpointing and parallelism settings are left at their defaults. This enables checkpointing and auto scaling, and it sets the initial parallelism of the Kinesis Data Analytics for Java application to one. Moreover, the log level is set to INFO, and CloudWatch metrics are collected for every subtask of the application.
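
If the defaults don’t fit your workload, you can switch the respective ConfigurationType to CUSTOM and set the values explicitly. The following fragment is an illustrative variation of the configuration above rather than what the template in this post deploys; the interval and parallelism values are example numbers.

"FlinkApplicationConfiguration": {
    "CheckpointConfiguration": {
        "ConfigurationType": "CUSTOM",
        "CheckpointingEnabled": true,
        "CheckpointInterval": 60000,
        "MinPauseBetweenCheckpoints": 5000
    },
    "ParallelismConfiguration": {
        "ConfigurationType": "CUSTOM",
        "Parallelism": 2,
        "ParallelismPerKPU": 1,
        "AutoScalingEnabled": true
    }
}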

Build the Flink Kinesis Connector

When you are building a Flink application that reads data from a Kinesis data stream, you might notice that the Flink Kinesis Connector is not available from Maven central. You actually need to build it yourself. The following steps build the connector for any recent Apache Flink release. However, because Kinesis Data Analytics for Java Applications is based on Flink 1.6.2, you can use this specific version for now.

$ wget -qO- https://github.com/apache/flink/archive/release-1.6.2.zip | bsdtar -xf-

$ cd flink-release-1.6.2

$ mvn clean package -B -DskipTests -Dfast -Pinclude-kinesis -pl flink-connectors/flink-connector-kinesis

Note that the connector has already been built and stored on S3 by the AWS CloudFormation template. You can simply download the JAR file of the connector from there and put it in your local Maven repository using the following Maven command:

$ mvn install:install-file -Dfile=flink-connector-kinesis_2.11-1.6.2.jar -DpomFile=flink-connector-kinesis_2.11-1.6.2.pom.xml

Integrate the Flink Elasticsearch sink with Amazon Elasticsearch Service

Beginning with the 1.6 release, Apache Flink comes with an Elasticsearch connector that supports the Elasticsearch APIs over HTTP. Therefore, it can natively talk to the endpoints that are provided by Amazon Elasticsearch Service.

You just need to decide how to authenticate requests against the public endpoint of the Elasticsearch cluster. You can whitelist individual IPs for access to the cluster. However, the recommended way of authenticating against the Amazon Elasticsearch Service endpoint is to add authentication information to AWS requests using IAM credentials and the Signature Version 4 signing process.

To extend the Flink Elasticsearch connector, which is not aware of the AWS-specific signing process, you can use the open-source aws-signing-request-interceptor, which is available from Maven central. You just need to add an interceptor to the Elasticsearch sink that is called just before the request is sent to the Amazon Elasticsearch Service endpoint. The interceptor can then sign the request using the permissions of the role that has been configured for the Kinesis Data Analytics for Java application.

final List<HttpHost> httpHosts = Arrays.asList(HttpHost.create("https://..."));

ElasticsearchSink.Builder<T> esSinkBuilder = new ElasticsearchSink.Builder<>(
    httpHosts,
    new ElasticsearchSinkFunction<T>() {
      ...
    }
);

final Supplier<LocalDateTime> clock = () -> LocalDateTime.now(ZoneOffset.UTC);
final AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain();
final AWSSigner awsSigner = new AWSSigner(credentialsProvider, "eu-west-1", "es", clock);

esSinkBuilder.setRestClientFactory(
    restClientBuilder -> restClientBuilder.setHttpClientConfigCallback(
        callback -> callback.addInterceptorLast(new AWSSigningRequestInterceptor(awsSigner))
    )
);

esSinkBuilder.build();

Note that the actual code in the GitHub repository is a bit more sophisticated because you need to obtain a serializable request interceptor. But the basic approach to sign requests remains the same.
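
One way to sidestep the serialization issue, sketched below, is to defer the creation of the signer and interceptor into the rest client factory itself, so that the sink does not capture any non-serializable objects. This is only one possible variation of the snippet above and differs from the code in the repository; it reuses the classes and region shown in the previous example.

esSinkBuilder.setRestClientFactory(
    restClientBuilder -> restClientBuilder.setHttpClientConfigCallback(callback -> {
        // create the non-serializable signer lazily, on the worker, when the factory is invoked
        final AWSSigner signer = new AWSSigner(
            new DefaultAWSCredentialsProviderChain(), "eu-west-1", "es",
            () -> LocalDateTime.now(ZoneOffset.UTC));

        // sign every request just before it is sent to the Amazon Elasticsearch Service endpoint
        return callback.addInterceptorLast(new AWSSigningRequestInterceptor(signer));
    })
);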

Monitor and debug the Flink application

When running a Kinesis Data Analytics for Java application, you don’t get direct access to the cluster that runs Flink. This is because the underlying infrastructure is completely managed by the service. You merely interact with the service through an API. However, you can still obtain metrics and logging information through CloudWatch and CloudWatch Logs, respectively.

The Kinesis Data Analytics for Java application exposes a lot of operational metrics, ranging from metrics for the entire application down to metrics for individual processes of operators of the application. You can control which level of detail is adequate or required for your purposes. In fact, the metrics used in the previous section were all obtained through CloudWatch.

In addition to operational metrics, you can configure the Kinesis Data Analytics for Java application to write messages to CloudWatch Logs. This capability seamlessly integrates with common logging frameworks, such as Apache Log4j and the Simple Logging Facade for Java (SLF4J). So it is useful for debugging and identifying the cause of operational issues.

To enable logging for your Kinesis Data Analytics for Java application, just specify an existing CloudWatch log stream as a logging option when you start the application. You can then emit log messages from your application code through SLF4J, as follows:

final Logger LOG = LoggerFactory.getLogger(...);

LOG.info("Starting to consume events from stream {}", flinkProperties.getProperty("InputStreamName"));

After the log messages are persisted into CloudWatch Logs, you can easily query and analyze them through CloudWatch Logs Insights.

Conclusion

In this post, you not only built a reliable, scalable, and highly available streaming application based on Apache Flink and Kinesis Data Analytics for Java Applications. You also scaled the different components while ingesting and analyzing up to 25k events per second in near-real time. In large parts, this scenario was enabled by using managed services, so you didn’t need to spend time on provisioning and configuring the underlying infrastructure.

The sources of the application and the AWS CloudFormation template used in this post are available from GitHub for your reference. You can dive into all the details of the Flink application and the configuration of the underlying services. I’m curious to see what you will build when you can focus on analyzing data in a streaming fashion rather than spending time on managing and operating infrastructure.

 


About the Author

Steffen Hausmann is a specialist solutions architect with AWS.


Create real-time clickstream sessions and run analytics with Amazon Kinesis Data Analytics, AWS Glue, and Amazon Athena

Post Syndicated from Hugo Rozestraten original https://aws.amazon.com/blogs/big-data/create-real-time-clickstream-sessions-and-run-analytics-with-amazon-kinesis-data-analytics-aws-glue-and-amazon-athena/

Clickstream events are small pieces of data that are generated continuously with high speed and volume. Often, clickstream events are generated by user actions, and it is useful to analyze them.

For example, you can detect user behavior in a website or application by analyzing the sequence of clicks a user makes, the amount of time the user spends, where they usually begin the navigation, and how it ends. By tracking this user behavior in real time, you can update recommendations, perform advanced A/B testing, push notifications based on session length, and much more. To track and analyze these events, you need to identify and create sessions from them. The process of identifying events in the data and creating sessions is known as sessionization.

Capturing and processing clickstream events in real time can be difficult. As the number of users and web and mobile assets you have increases, so does the volume of data. Amazon Kinesis provides you with the capabilities necessary to ingest this data in real time and generate useful statistics immediately so that you can take action.

When you run sessionization on clickstream data, you identify events and assign them to a session with a specified key and lag period. After each event has a key, you can perform analytics on them. The use cases for sessionization vary widely, and have different requirements. For example, you might need to identify and create sessions from events in web analytics to track user actions. Sessionization is also broadly used across many different areas, such as log data and IoT.

This blog post demonstrates how to identify and create sessions from real-time clickstream events and then analyze them using Amazon Kinesis Data Analytics.

Why did we choose Kinesis Data Analytics?

Clickstream data arrives continuously, often as thousands of messages per second carrying new events. When you analyze the effectiveness of new application features, site layout, or marketing campaigns, it is important to analyze them in real time so that you can take action faster.

To perform the sessionization in batch jobs, you could use a tool such as AWS Glue or Amazon EMR. But with daily schedules, queries, and aggregation, this approach takes more resources and time, because each aggregation involves working with large amounts of data. Performing the sessionization in Kinesis Data Analytics takes less time and gives you lower latency between session generation and the availability of results. You can trigger real-time alerts with AWS Lambda functions based on conditions, such as a session time that is shorter than 20 seconds, or invoke a machine learning endpoint.

Identifying a session among thousands of clicks

A session is a short-lived and interactive exchange between two or more devices and/or users. For example, it can be a user browsing and then exiting your website, or an IoT device waking up to perform a job and then going back to sleep. These interactions result in a series of events that occur in sequence, with a start and an end: in other words, a session. A start and an end of a session can be difficult to determine, and are often defined by a time period without a relevant event associated with a user or device. A session starts when a new event arrives after a specified “lag” time period has passed without an event arriving. A session ends in a similar manner, when a new event does not arrive within the specified lag period.
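
The lag-based rule is straightforward to express in code. The following self-contained Java sketch is purely illustrative and not part of the solution in this post; it assigns the sorted event timestamps of a single user to sessions, starting a new session whenever the gap to the previous event exceeds the lag period.

import java.util.ArrayList;
import java.util.List;

public class Sessionizer {

    // Assign timestamps (epoch millis, sorted ascending) of one user to sessions.
    // A new session starts whenever the gap to the previous event exceeds lagMillis.
    public static List<List<Long>> sessionize(List<Long> timestamps, long lagMillis) {
        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = new ArrayList<>();

        for (Long ts : timestamps) {
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > lagMillis) {
                sessions.add(current);          // close the current session
                current = new ArrayList<>();    // and open a new one
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            sessions.add(current);
        }
        return sessions;
    }

    public static void main(String[] args) {
        // Events at 0s, 10s, and 34s, then a 5-minute pause, then two more events:
        List<Long> events = List.of(0L, 10_000L, 34_000L, 334_000L, 340_000L);
        // With a 30-second lag, this yields two sessions: [0, 10s, 34s] and [334s, 340s].
        System.out.println(sessionize(events, 30_000L));
    }
}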

This blog post relies on several other posts about performing batch analytics on SQL data with sessions. My two favorite posts on this subject are Sessionization in SQL, Hive, Pig and Python from Dataiku and Finding User Session with SQL by Benn Stancil at Mode. Both posts take advantage of SQL window functions to identify and build sessions from clickstream events.

ANSI added SQL window functions to the SQL standard in 2003 and has since expanded them. Window functions work naturally with streaming data and enable you to easily translate batch SQL examples to Kinesis Data Analytics.

In this use case, I group the events of a specific user as described in the following simplified example. In this example, I use distinct navigation patterns from three users to analyze user behavior. To begin, I group events by user ID to obtain some statistics from the data, as shown following:

In this example, for “User ID 20,” the minimum timestamp is 2018-11-29 23:35:10 and the maximum timestamp is 2018-11-29 23:35:44. This provides a 34-second session, starting with action “B_10” and ending with action “A_02.” In this example, these “actions” identify the application’s buttons.

Suppose that after several minutes, new “User ID 20” actions arrive. Would you consider them to be part of the same session? A user can abandon a navigation or start a new one. Also, applications often have timeouts. You have to decide on the maximum session length beyond which events are considered a new session. A session can run anywhere from 20 to 50 seconds, or from 1 to 5 minutes.

There are other elements that you might want to consider, such as a client IP or a machine ID. These elements allow you to separate sessions that occur on different devices.

High-level solution overview

The end-to-end scenario described in this post uses Amazon Kinesis Data Streams to capture the clickstream data and Kinesis Data Analytics to build and analyze the sessions. The aggregated analytics are used to trigger real-time events on Lambda and then send them to Kinesis Data Firehose. Kinesis Data Firehose sends data to an Amazon S3 bucket, where it is ingested to a table by an AWS Glue crawler and made available for running queries with Amazon Athena. You can use this table for ad hoc analysis.

The following diagram shows an end-to-end sessionization solution.

  • Data ingestion: You can use Kinesis Data Streams to build custom applications that process or analyze streaming data for specialized needs. Kinesis Data Streams can continuously capture and store terabytes of data per hour from hundreds of thousands of sources, such as website clickstreams, financial transactions, social media feeds, IT logs, and location-tracking events.
  • Data sessionization: Kinesis Data Analytics is the easiest way to process streaming data in real time with standard SQL without having to learn new programming languages or processing frameworks. With Kinesis Data Analytics, you can query streaming data or build entire streaming applications using SQL, so that you can gain actionable insights and respond to your business and customer needs promptly.
  • Data processing and storage: The sessionization stream is read from Kinesis Data Analytics using an AWS Lambda function. The function triggers two actions: updating a real-time dashboard in Amazon CloudWatch and persisting data with Kinesis Data Firehose.
  • Data analysis: AWS Glue is used to crawl Amazon S3 and build or update metadata definition for Amazon Athena tables.

Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena provides connectivity to any application using JDBC or ODBC drivers.

  • Data visualization: Amazon QuickSight is a visualization tool that is natively used to build dashboards over Amazon Athena data.
  • Monitoring: Amazon CloudWatch is a tool that lets you monitor the streaming activities, such as the number of bytes processed or delivered per second, or the number of failures.

After you finish the sessionization stage in Kinesis Data Analytics, you can output data into different tools. For example, you can use a Lambda function to process the data on the fly and take actions such as send SMS alerts or roll back a deployment. To learn how to implement such workflows based on AWS Lambda output, see my other blog post Implement Log Analytics using Amazon Kinesis Data Analytics. In this post, we send data to Amazon CloudWatch, and build a real-time dashboard.

Lambda clickstream generator

To generate the workload, you can use a Python Lambda function with random values, simulating a beer-selling application.

The same user ID can have sessions on different devices, such as a tablet, a browser, or a phone application. This information is captured by the device ID. As a result, the data for the Lambda function payload has these parameters: a user ID, a device ID, a client event, and a client timestamp, as shown in the following example.
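
Here is an illustrative record of that shape; the values are made up but match what the generator code below produces:

{
    "user_id": 63,
    "device_id": "mobile",
    "client_event": "beer_checkout",
    "client_timestamp": "2018-11-29T23:35:10.451321"
}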

The following is the code for the Lambda function payload generator, which is scheduled using CloudWatch Events scheduled events:

import datetime
import json
import random

import boto3

# boto3 Kinesis client used by the handler below
kinesis = boto3.client('kinesis')

def getReferrer():
    x = random.randint(1, 5)
    x = x * 50
    y = x + 30
    data = {}
    data['user_id'] = random.randint(x, y)
    data['device_id'] = random.choice(['mobile', 'computer', 'tablet', 'mobile', 'computer'])
    data['client_event'] = random.choice(['beer_vitrine_nav', 'beer_checkout', 'beer_product_detail',
                                          'beer_products', 'beer_selection', 'beer_cart'])
    now = datetime.datetime.now()
    str_now = now.isoformat()
    data['client_timestamp'] = str_now
    return data

def lambda_handler(event, context):
...
        data = json.dumps(getReferrer())
        kinesis.put_record(
                StreamName='sessionsclicks',
                Data=data,
                PartitionKey='partitionkey')

As a result, payloads like the preceding example are sent continuously to Kinesis Data Analytics.

Using window SQL functions in Kinesis Data Analytics

Grouping sessions lets us combine all the events from a given user ID or a device ID that occurred during a specific time period. Amazon Kinesis Data Analytics SQL queries in your application code execute continuously over in-application streams. You need to specify bounded queries using a window defined in terms of time or rows. These queries are called window SQL functions.

I had three available options for windowed query functions in Kinesis Data Analytics: sliding windows, tumbling windows, and stagger windows. I chose stagger window because it has some good features for the sessionization use case, as follows:

  • Stagger windows open when the first event matching a partition key condition arrives. So for each key, the window is evaluated individually, as opposed to the other window types, which evaluate one single window for all matched partition keys.
  • When dealing with clickstreams, you cannot rely on the order in which events arrive in the stream, but rather on when they were generated. Stagger windows handle the arrival of out-of-order events well. The time when the window is opened and when the window closes is determined based on the age specified, which is measured from the time when the window opened.

To partition by the timestamp, I chose to write two distinct SQL functions.

In Kinesis Data Analytics, SOURCE_SQL_STREAM_001 is by default the main stream from the source. In this case, it’s receiving the source payload from Kinesis Data Streams.

Kinesis Data Analytics SQL – Create a stream

The following function creates a stream to receive the query aggregation result:

-- CREATE a Stream to receive the query aggregation result
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"
(
  session_id VARCHAR(60),
  user_id INTEGER,
  device_id VARCHAR(10),
  timeagg timestamp,
  events INTEGER,
  beginnavigation VARCHAR(32),
  endnavigation VARCHAR(32),
  beginsession VARCHAR(25),
  endsession VARCHAR(25),
  duration_sec INTEGER
);

Kinesis Data Analytics SQL – Using a SECOND interval “STEP” function

The following function creates the PUMP and inserts it as SELECT to STREAM:

-- Create the PUMP
CREATE OR REPLACE PUMP "WINDOW_PUMP_SEC" AS INSERT INTO "DESTINATION_SQL_STREAM"
-- Insert as Select 
    SELECT  STREAM
-- Make the Session ID using user_ID+device_ID and Timestamp
    UPPER(cast("user_id" as VARCHAR(3))|| '_' ||SUBSTRING("device_id",1,3)
    ||cast( UNIX_TIMESTAMP(STEP("client_timestamp" by interval '30' second))/1000 as VARCHAR(20))) as session_id,
    "user_id" , "device_id",
-- create a common rounded STEP timestamp for this session
    STEP("client_timestamp" by interval '30' second),
-- Count the number of client events , clicks on this session
    COUNT("client_event") events,
-- What was the first navigation action
    first_value("client_event") as beginnavigation,
-- what was the last navigation action    
    last_value("client_event") as endnavigation,
-- beginning minute and second
    SUBSTRING(cast(min("client_timestamp") AS VARCHAR(25)),15,19) as beginsession,
-- ending minute and second      
    SUBSTRING(cast(max("client_timestamp") AS VARCHAR(25)),15,19) as endsession,
-- session duration    
    TSDIFF(max("client_timestamp"),min("client_timestamp"))/1000 as duration_sec
-- from the source stream    
    FROM "SOURCE_SQL_STREAM_001"
-- using stagger window , with STEP to Seconds, for Seconds intervals    
    WINDOWED BY STAGGER (
                PARTITION BY "user_id", "device_id", STEP("client_timestamp" by interval '30' second) 
                RANGE INTERVAL '30' SECOND );

Kinesis Data Analytics SQL – Using a MINUTE interval “FLOOR” function

The following code creates the PUMP and inserts as SELECT to STREAM:

-- Create the PUMP
CREATE OR REPLACE PUMP "WINDOW_PUMP_MIN" AS INSERT INTO "DESTINATION_SQL_STREAM"
-- Insert as Select 
SELECT  STREAM
-- Make the Session ID using user_ID+device_ID and Timestamp
UPPER(cast("user_id" as VARCHAR(3))|| '_' ||SUBSTRING("device_id",1,3)
||cast(UNIX_TIMESTAMP(FLOOR("client_timestamp" TO MINUTE))/1000 as VARCHAR(20))) as session_id,
"user_id" , "device_id",
-- create a common rounded timestamp for this session
FLOOR("client_timestamp" TO MINUTE),
-- Count the number of client events , clicks on this session
COUNT("client_event") events,
-- What was the first navigation action
first_value("client_event") as beginnavigation,
-- what was the last navigation action
last_value("client_event") as endnavigation,
-- beginning minute and second
SUBSTRING(cast(min("client_timestamp") AS VARCHAR(25)),15,19) as beginsession,
-- ending minute and second
SUBSTRING(cast(max("client_timestamp") AS VARCHAR(25)),15,19) as endsession,
-- session duration
TSDIFF(max("client_timestamp"),min("client_timestamp"))/1000 as duration_sec
-- from the source stream
FROM "SOURCE_SQL_STREAM_001"
-- using stagger window , with floor to Minute, for Minute intervals
WINDOWED BY STAGGER (
            PARTITION BY "user_id", "device_id", FLOOR("client_timestamp" TO MINUTE) 
            RANGE INTERVAL '1' MINUTE);

Sessions

In Kinesis Data Analytics, you can view the resulting data transformed by the SQL, including the session identification and information. The Session_ID is calculated from the User_ID, the first three characters of the Device_ID, and the rounded Unix timestamp without the milliseconds. For example, user 20 on a mobile device in the window starting at Unix time 1543534800 gets the session ID 20_MOB1543534800.

Automated deployment with AWS CloudFormation

All the steps of this end-to-end solution are included in an AWS CloudFormation template. Fire up the template, add the code on your web server, and voilà, you get real-time sessionization.

This AWS CloudFormation template is intended to be deployed only in the us-east-1 Region.

Create the stack

Step 1: To get started, sign into the AWS Management Console, and then open the stagger window template.

Step 2: On the AWS CloudFormation console, choose Next, and complete the AWS CloudFormation parameters:

  • Stack name: The name of the stack (blog-sessionization or sessions-blog)
  • StreamName: sessionsblog
  • Stream Shard Count: 1 or 2 shards (1 MB/s of write capacity per shard).
  • Bucket Name:  Change to a unique name, for example session-n-bucket-hhug123121.
  • Buffer Interval: 60–900 seconds; the buffering hint for Kinesis Data Firehose before the data is sent to Amazon S3.
  • Buffer Size: 1–128 MB per file, if the buffer interval is not reached first.
  • Destination Prefix: Aggregated (internal folder of the bucket to save aggregated data).
  • Base sessions on seconds or minutes: Choose which you want (minutes will start with 1 minute, seconds will start with 30 seconds).

Step 3: Check if the launch has completed, and if it has not, check for errors.

The most common error is when you point to an Amazon S3 bucket that already exists.

Process the data

Step 1: After the deployment, navigate to the solution on the Amazon Kinesis console.

Step 2: Go to the Kinesis Analytics applications page, and choose AnalyticsApp-blog-sessionizationXXXXX, as follows.

Step 3: Choose Run application to start the application.

Step 4: Wait a few seconds for the application to be available, and then choose Application details.

Step 5: On the Application details page, choose Go to SQL results.

Step 6: Examine the SQL code and SOURCE_SQL_STREAM, and change the INTERVAL if you’d like.

Step 7: Choose the Real-time analytics tab to check the DESTINATION_SQL_STREAM results.

 

Step 8: Check the Destination tab to view the AWS Lambda function as the destination to your aggregation.

Step 9: Check the CloudWatch real-time dashboard.

Open the Sessionization-<your cloudformation stack name> dashboard.

Check the number of “events” during the sessions, and the “session duration” behavior from a timeframe. Then you can make decisions, such as whether you need to roll back a new site layout or new features of your application.

Step 10: Open the AWS Glue console and run the crawler that the AWS CloudFormation template created for you.

Choose the crawler job, and then choose Run crawler.

Analyze the data

Step 1: After the job finishes, open the Amazon Athena console and explore the data.

On the Athena console, choose the sessionization database in the list. You should see two tables created based on the data in Amazon S3: rawdata and aggregated.

Step 2: Choose the vertical ellipsis (three dots) on the right side to explore each of the tables, as shown in the following screenshots.

Step 3: Create a view on the Athena console to query only today’s data from your aggregated table, as follows:

CREATE OR REPLACE VIEW clicks_today AS
SELECT 
*
FROM "aggregated" 
WHERE
cast(partition_0 as integer)=year(current_date) and
cast(partition_1 as integer)=month(current_date) and
cast(partition_2 as integer)=day(current_date) ;

The successful query appears on the console as follows:

Step 4: Create a view to query only the current month data from your aggregated table, as in the following example:

CREATE OR REPLACE VIEW clicks_month AS
SELECT 
*
FROM "aggregated" 
WHERE
cast(partition_0 as integer)=year(current_date) and
cast(partition_1 as integer)=month(current_date) ;

The successful query appears as follows:

Step 5: Query data with the sessions grouped by the session duration ordered by sessions, as follows:

SELECT duration_sec, count(1) sessions 
FROM "clicks_today"
where duration_sec>0
group by duration_sec
order by sessions desc;

The query results appear as follows:

Visualize the data

Step 1: Open the Amazon QuickSight console.

If you have never used Amazon QuickSight, perform this setup first.

Step 2: Set up Amazon QuickSight account settings to access Athena and your S3 bucket.

First, select the Amazon Athena check box. Select the Amazon S3 check box to edit Amazon QuickSight access to your S3 buckets.

Choose the buckets that you want to make available, and then choose Select buckets.

Step 3: Choose Manage data.

Step 4: Choose NEW DATASET.

In the list of data sources, choose Athena.

Step 5: Enter daily_session as your data source name.

Step 6: Choose the view that you created for daily sessions, and choose Select.

Step 7: Then you can choose to use either SPICE (cache) or direct query access.

Step 8: Choose beginnavigation and duration_sec as metrics.

Step 9: Choose +Add to add a new visualization.

Step 10: In Visual types, choose the Tree map graph type.

Step 11: For Group by, choose device_id; for Size, choose duration_sec (Sum); and for Color, choose events (Sum).

Summary

In this post, I described how to perform sessionization of clickstream events and analyze them in a serverless architecture. The use of a Kinesis Data Analytics stagger window makes the SQL code short and easy to write and understand. The integration between the services enables a complete data flow with minimal coding.

You also learned about ways to explore and visualize this data using Amazon Athena, AWS Glue, and Amazon QuickSight.

To learn more about the Amazon Kinesis family of use cases, check the Amazon Kinesis Big Data Blog page.

If you have questions or suggestions, please leave a comment below.

Do more with Amazon Kinesis Data Analytics

To explore other ways to gain insights using Kinesis Data Analytics, see Real-time Clickstream Anomaly Detection with Amazon Kinesis Analytics.

 


About the Author

Hugo is an analytics and database specialist solutions architect at Amazon Web Services out of São Paulo (Brazil). He is currently engaged with several Data Lake and Analytics projects for customers in Latin America. He loves family time, dogs and mountain biking.


Your guide to Amazon Kinesis sessions, chalk talks, and workshops at AWS re:Invent 2018

Post Syndicated from Larry Heathcote original https://aws.amazon.com/blogs/big-data/your-guide-to-amazon-kinesis-sessions-chalk-talks-and-workshops-at-aws-reinvent-2018/

AWS re:Invent 2018 is almost here! This post includes a list of Amazon Kinesis sessions, chalk talks, and workshops at AWS re:Invent 2018. You can choose the link next to each session description for the session schedule. Use the information to help schedule your conference week in Las Vegas to learn more about Amazon Kinesis.

Sessions

ANT208 – Serverless Video Ingestion & Analytics with Amazon Kinesis Video Streams

Amazon Kinesis Video Streams makes it easy to capture live video, play it back, and store it for real-time and batch-oriented ML-driven analytics. In this session, we first dive deep into the top five best practices for getting started and scaling with Amazon Kinesis Video Streams. Next, we demonstrate streaming video from a standard USB camera connected to a laptop, and we perform live playback in a standard browser within minutes. We also have on stage members of Amazon Go, who are building the next generation of physical retail store experiences powered by their “just walk out” technology. They walk through the technical details of their integration with Kinesis Video Streams and highlight their successes and difficulties along the way.

ANT310 – Architecting for Real-Time Insights with Amazon Kinesis

Amazon Kinesis makes it easy to speed up the time it takes for you to get valuable, real-time insights from your streaming data. In this session, we walk through the most popular applications that customers implement using Amazon Kinesis, including streaming extract-transform-load, continuous metric generation, and responsive analytics. Our customer Autodesk joins us to describe how they created real-time metrics generation and analytics using Amazon Kinesis and Amazon Elasticsearch Service. They walk us through their architecture and the best practices they learned in building and deploying their real-time analytics solution.

ANT322-R – High Performance Data Streaming with Amazon Kinesis: Best Practices

Amazon Kinesis makes it easy to collect, process, and analyze real-time, streaming data so you can get timely insights and react quickly to new information. In this session, we dive deep into best practices for Kinesis Data Streams and Kinesis Data Firehose to get the most performance out of your data streaming applications. Our customer NICE inContact joins us to discuss how they utilize Amazon Kinesis Data Streams to make real-time decisions on customer contact routing and agent assignments for its Call Center as a Service (CCaaS) Platform. NICE inContact walks through their architecture and requirements for low-latency, accurate processing to be as responsive as possible to changes.

ANT322-R1 – High Performance Data Streaming with Amazon Kinesis: Best Practices

Amazon Kinesis makes it easy to collect, process, and analyze real-time, streaming data so you can get timely insights and react quickly to new information. In this session, we dive deep into best practices for Kinesis Data Streams and Kinesis Data Firehose to get the most performance out of your data streaming applications. Comcast uses Amazon Kinesis Data Streams to build a Streaming Data Platform that centralizes data exchanges. It is foundational to the way their data analysts and data scientists derive real-time insights from the data. In the second part of this talk, Comcast zooms in on how to properly scale a Kinesis stream. They first list the factors to consider to avoid scaling issues with standard Kinesis stream consumption, and then show how the new fan-out feature changes these scaling considerations.

SRV316-R & SRV316-R1 – Serverless Stream Processing Pipeline Best Practices

Real-time data has traditionally been analyzed using batch processing in DWH/Hadoop environments. Common use cases include data lakes, data science, and machine learning (ML). Creating serverless data-driven architectures and serverless streaming solutions with services like Amazon Kinesis, AWS Lambda, and Amazon Athena can solve real-time ingestion, storage, and analytics challenges, and help you focus on application logic without managing infrastructure. In this session, we introduce design patterns and best practices, and we share customer journeys from batch to real-time insights in building modern serverless data-driven architecture applications. Hear how Intel built the Intel Pharma Analytics Platform using a serverless architecture. This AI cloud-based offering enables remote monitoring of patients using an array of sensors, wearable devices, and ML algorithms to objectively quantify the impact of interventions and power clinical studies in various therapeutic conditions.

SEC402-R – AWS, I Choose You: Pokemon’s Battle against the Bots

Join us for this advanced-level talk to learn about Pokemon’s journey defending against DDoS attacks and bad bots with AWS WAF, AWS Shield, and other AWS services. We go through their initial challenges and the evolution of their bot mitigation solution, which includes offline log analysis and dynamic updates of badbot IPs along with rate-based rules. This is an advanced talk and assumes some knowledge of Amazon DynamoDB, Amazon Kinesis Data Firehose, Amazon Kinesis Data Analytics, AWS Firewall Manager, AWS Shield, and AWS WAF.

Chalk Talks

ANT358 – Serverless Stream Processing Tips & Tricks

Streaming data ingestion and near real-time analysis gives you immediate insights into your data. By using AWS Lambda with Amazon Kinesis, you can obtain these insights without the need to manage servers. But are you doing this in the most optimal way? In this interactive session, we review the best practices for using Lambda with Kinesis, and how to avoid common pitfalls.

ANT359 – Considerations for Building Your First Streaming Application

Do you want to increase your knowledge of AWS big data web services and launch your first big data application on the cloud? In this chalk talk, we provide an overview of many of the AWS analytics services, including Amazon EMR, Amazon Kinesis, Amazon Athena, and Amazon Redshift. We discuss how they are architected together to solve common big data problems, such as ingestion, ETL, and real-time analytics.

ANT360 – Don’t Wait Until Tomorrow: From Batch to Streaming

In recent years, there has been explosive growth in the number of connected devices and real-time data sources. Data is being produced continuously and its production rate is accelerating. Businesses can no longer wait for hours or days to use this data. To gain the most valuable insights, they must use this data immediately so they can react quickly to new information. In this chalk talk, we discuss how to take advantage of streaming data sources to analyze and react in near-real time. In addition, we present different options for how to solve a real-world scenario and walk through those solutions.

ANT361 – Using Amazon Kinesis Data Streams as a Low-Latency Message Bus

Amazon Kinesis makes it easy to collect, process, and analyze real-time, streaming data so you can get timely insights and react quickly to new information. In this chalk talk, we dive deep into best practices for Kinesis Data Streams and how to optimize for low-latency, multi-consumer solutions.

BAP328-R & BAP328-R1 – Architectures for Gaining Data Insights into Your Contact Center Experience

Join us for a deep dive into using Amazon Kinesis Data Analytics for insight into what’s happening with the contacts and agents in your Amazon Connect contact center. Learn how to leverage AWS analytics and ML services to inspect, transform, and gain insight into the customer’s journey through your contact center. We also show you how to use Alexa for Business to receive timely voice-activated business intelligence on your contact center’s performance.

Workshops

Before starting a workshop, you should have a basic understanding of Amazon Kinesis. Please bring your laptop and power supply to the workshop.

ANT213-R – Build Your First Big Data Application on AWS

Do you want to increase your knowledge of AWS big data web services and launch your first big data application on the cloud? In this session, we walk you through simplifying big data processing as a data bus comprising ingest, store, process, and visualize. You will build a big data application using AWS managed services, including Amazon Athena, Amazon Kinesis, Amazon DynamoDB, and Amazon S3. Along the way, we review architecture design patterns for big data applications and give you access to a take-home lab so you can rebuild and customize the application yourself.

ANT213-R1 – Build Your First Big Data Application on AWS

Do you want to increase your knowledge of AWS big data web services and launch your first big data application on the cloud? In this session, we walk you through simplifying big data processing as a data bus comprising ingest, store, process, and visualize. You will build a big data application using AWS managed services, including Amazon Athena, Amazon Kinesis, Amazon DynamoDB, and Amazon S3. Along the way, we review architecture design patterns for big data applications and give you access to a take-home lab so you can rebuild and customize the application yourself.

ANT357 – Stream Video, Analyze It in Real Time, and Share It in Real Time

Video is ‘big data.’ Image sensors—in our smartphones, smart home devices, and traffic cameras—are getting Internet-connected. Massive streams of video data are generated, but currently not mined for real-time insights to drive businesses forward. In this workshop, learn to capture, process, and analyze video streams. Build and configure your camera device’s media pipeline to start streaming video into the AWS Cloud using Amazon Kinesis Video Streams. Next, build and deploy your own machine learning (ML) model in Amazon SageMaker to generate inferences about objects or activities in your video stream. Finally, build a browser-based web player to view the video in Live and On-Demand modes, including the analyzed video stream. In this workshop, you use Amazon Kinesis Video Streams, Amazon SageMaker, Amazon Rekognition Video, and Amazon ECS.

ANT362 – Use Streaming Data to Gain Real-Time Insights into Your Business

In recent years, there has been an explosive growth in the number of connected devices and real-time data sources. Because of this, data is being continuously produced, and its production rate is accelerating. Businesses can no longer wait for hours or days to use this data. To gain the most valuable insights, they must use this data immediately so they can react quickly to new information. In this workshop, you will learn how to take advantage of streaming data sources to analyze and react in near real time. We provide several requirements for a real-world streaming data scenario, and you’re tasked with creating a solution that successfully satisfies the requirements using services such as Amazon Kinesis, AWS Lambda, and Amazon SNS.

ANT318-R – Build, Deploy and Serve Machine learning models on streaming data using Amazon SageMaker, Apache Spark on Amazon EMR and Amazon Kinesis

As data exponentially grows in organizations, there is an increasing need to use machine learning (ML) to gather insights from this data at scale and to use those insights to perform real-time predictions on incoming data. In this workshop, we walk you through how to train an Apache Spark model using Amazon SageMaker, pointing to Apache Livy running on an Amazon EMR Spark cluster. We also show you how to host the Spark model on Amazon SageMaker to serve a RESTful inference API. Finally, we show you how to use the RESTful API to serve real-time predictions on streaming data from Amazon Kinesis Data Streams.

ANT318-R1 – Build, Deploy and Serve Machine learning models on streaming data using Amazon SageMaker, Apache Spark on Amazon EMR and Amazon Kinesis

As data exponentially grows in organizations, there is an increasing need to use machine learning (ML) to gather insights from this data at scale and to use those insights to perform real-time predictions on incoming data. In this workshop, we walk you through how to train an Apache Spark model using Amazon SageMaker, pointing to Apache Livy running on an Amazon EMR Spark cluster. We also show you how to host the Spark model on Amazon SageMaker to serve a RESTful inference API. Finally, we show you how to use the RESTful API to serve real-time predictions on streaming data from Amazon Kinesis Data Streams.

GPSWS406 – Advanced Serverless Data Processing

In this hands-on workshop, you learn best practices and architectural patterns for building streaming data processing pipelines without servers. Using Amazon Kinesis, AWS Lambda, and other services, you have the opportunity to build, deploy, and monitor an application to ingest and process high-velocity data at scale. This advanced workshop assumes that you have experience writing Lambda functions and understand the basics of the AWS serverless platform, so come ready to dive into the deep end. Bring your laptop with a full keyboard. We provide a sandbox AWS account for you to use during the workshop.

MAE309 – Build an AWS Analytics Solution to Monitor the Video Streaming Experience

In this workshop, we build and deploy an end-to-end analytics solution for monitoring the video streaming experience. We integrate an open source video player with Amazon Kinesis Data Streams to capture events in real time. We explore the data available for capture and a variety of use cases: from generating alerts on poor experience to content recommendations based on user behavior. We also show you how this real-time data can be archived in a data lake and further used to generate reports of aggregate performance and experience across a number of dimensions.

ADT401 – Real-Time Web Analytics with Amazon Kinesis Data Analytics

Knowing what users are doing on your websites in real time provides insights you can act on without waiting for delayed batch processing of clickstream data. Watching the immediate impact on user behavior after new releases, detecting and responding to anomalies, situational awareness, and evaluating trends are all benefits of real-time website analytics. In this workshop, we build a cost-optimized platform to capture web beacon traffic, analyze it for interesting metrics, and display it on a customized dashboard. We start by deploying the Web Analytics Solution Accelerator; once the core is complete, we extend the solution to capture new and interesting metrics, process those with Amazon Kinesis Data Analytics, and display new graphs on the custom dashboard. Participants come away with a fully functional system for capturing, analyzing, and displaying valuable website metrics in real time.

GAM305 – Dynamic Encounters for Veteran Players Using Machine Learning

Are you trying to keep your game fresh for long-time players but don’t have the resources to keep building new handcrafted content? Join this session to learn how to launch dynamic content for groups of players without relying on static techniques like instancing or spawn points. We dive into Amazon Kinesis and Amazon Kinesis Data Analytics for real-time data collection and hotspot detection, and machine learning for encounter-building based on observed player behavior.

Conclusion

We look forward to seeing you at AWS re:Invent 2018 in Las Vegas. In addition to the sessions described in this blog post, please stop by the Analytics booth during Expo hours to learn more about Amazon Kinesis.

 


About the Author

Larry Heathcote is a Principal Product Marketing Manager at Amazon Web Services. Larry is passionate about seeing the results of data-driven insights on business outcomes. He enjoys family time, home projects, grilling out, and classic barbeque.

 

 

Real-Time Hotspot Detection in Amazon Kinesis Analytics

Post Syndicated from Randall Hunt original https://aws.amazon.com/blogs/aws/real-time-hotspot-detection-in-amazon-kinesis-analytics/

Today we’re releasing a new machine learning feature in Amazon Kinesis Data Analytics for detecting “hotspots” in your streaming data. We launched Kinesis Data Analytics in August of 2016 and we’ve continued to add features since. As you may already know, Kinesis Data Analytics is a fully managed real-time processing engine for streaming data that lets you write SQL queries to derive meaning from your data and output the results to Kinesis Data Firehose, Kinesis Data Streams, or even an AWS Lambda function. The new HOTSPOTS function adds to the existing machine learning capabilities in Kinesis that allow customers to leverage unsupervised, streaming-based machine learning algorithms. Customers don’t need to be experts in data science or machine learning to take advantage of these capabilities.

Hotspots

The HOTSPOTS function is a new Kinesis Data Analytics SQL function you can use to identify relatively dense regions in your data without having to explicitly build and train complicated machine learning models. You can identify subsections of your data that need immediate attention and take action programmatically by streaming the hotspots out to a Kinesis data stream, to a Firehose delivery stream, or by invoking an AWS Lambda function.

There are a ton of really cool scenarios where this could make your operations easier. Imagine a ride-share program or autonomous vehicle fleet communicating spatiotemporal data about traffic jams and congestion, or a datacenter where a number of servers start to overheat indicating an HVAC issue. HOTSPOTS is not limited to spatiotemporal data and you could apply it across many problem domains.

The function follows some simple syntax and accepts the DOUBLE, INTEGER, FLOAT, TINYINT, SMALLINT, REAL, and BIGINT data types.

The HOTSPOTS function takes a cursor as input and returns a JSON string describing the hotspot. This will be easier to understand with an example.

Using Kinesis Data Analytics to Detect Hotspots

Let’s take a simple data set from the New York City Taxi and Limousine Commission that tracks yellow cab pickup and dropoff locations. Most of this data is already on S3 and publicly accessible at s3://nyc-tlc/. We will create a small Python script to load our Kinesis data stream with taxi records, which will feed our Kinesis Data Analytics application. Finally, we’ll output all of this to a Kinesis Data Firehose delivery stream connected to an Amazon Elasticsearch Service cluster for visualization with Kibana. I know from living in New York for 5 years that we’ll probably find a hotspot or two in this data.

First, we’ll create an input Kinesis stream and start sending our NYC taxi ride data into it. I just wrote a quick Python script to read from one of the CSV files and used boto3 to push the records into Kinesis. You can put the records in whatever way works for you.

 

import csv
import json
import boto3

def chunkit(l, n):
    """Yield successive n-sized chunks from l."""
    for i in range(0, len(l), n):
        yield l[i:i + n]

kinesis = boto3.client("kinesis")
with open("taxidata2.csv") as f:
    reader = csv.DictReader(f)
    # PutRecords accepts at most 500 records per call, so batch the rows accordingly.
    records = chunkit([{"PartitionKey": "taxis", "Data": json.dumps(row)} for row in reader], 500)
    for chunk in records:
        kinesis.put_records(StreamName="TaxiData", Records=chunk)

Next, we’ll create the Kinesis Data Analytics application and add our input stream with our taxi data as the source.

Next, we’ll automatically detect the schema.

Now we’ll create a quick SQL script to detect our hotspots and add it to the Real-Time Analytics section of our application.

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    "pickup_longitude" DOUBLE,
    "pickup_latitude" DOUBLE,
    HOTSPOTS_RESULT VARCHAR(10000)
); 
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" 
    SELECT "pickup_longitude", "pickup_latitude", "HOTSPOTS_RESULT" FROM
        TABLE(HOTSPOTS(
            CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),
            1000,
            0.013,
            20
        )
    );


Our HOTSPOTS function takes an input stream, a window size, scan radius, and a minimum number of points to count as a hotspot. The values for these are application dependent but you can tinker with them in the console easily until you get the results you want. There are more details about the parameters themselves in the documentation. The HOTSPOTS_RESULT returns some useful JSON that would let us plot bounding boxes around our hotspots:

{
  "hotspots": [
    {
      "density": "elided",
      "minValues": [40.7915039, -74.0077401],
      "maxValues": [40.7915041, -74.0078001]
    }
  ]
}

 

When we have our desired results, we can save the script and connect our application to our Amazon Elasticsearch Service Firehose delivery stream. We can run an intermediate Lambda function in the Firehose delivery stream to transform our records into a format more suitable for geographic work. Then we can update our mapping in Elasticsearch to index the hotspot objects as geo-shapes.
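If you go the intermediate Lambda route, a transformation function along the following lines could do the reshaping. This is a minimal sketch only: it assumes the record payload is the DESTINATION_SQL_STREAM row serialized as JSON, that minValues/maxValues are ordered as [latitude, longitude], and the output field names are illustrative rather than taken from the walkthrough.

import base64
import json

def lambda_handler(event, context):
    """Kinesis Data Firehose transformation sketch: convert HOTSPOTS_RESULT
    into Elasticsearch geo_shape envelopes per hotspot."""
    output = []
    for record in event["records"]:
        payload = json.loads(base64.b64decode(record["data"]))
        hotspots = json.loads(payload.get("HOTSPOTS_RESULT", "{}")).get("hotspots", [])
        envelopes = []
        for h in hotspots:
            # Assumption: minValues/maxValues are [lat, lon]; an ES envelope expects
            # [[min_lon, max_lat], [max_lon, min_lat]] (upper-left, lower-right).
            min_lat, min_lon = h["minValues"]
            max_lat, max_lon = h["maxValues"]
            envelopes.append({
                "type": "envelope",
                "coordinates": [[min_lon, max_lat], [max_lon, min_lat]],
            })
        transformed = {
            "pickup_location": {"lat": payload["pickup_latitude"], "lon": payload["pickup_longitude"]},
            "hotspots": envelopes,
        }
        output.append({
            "recordId": record["recordId"],
            "result": "Ok",
            "data": base64.b64encode((json.dumps(transformed) + "\n").encode()).decode(),
        })
    return {"records": output}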

Finally, we can connect to Kibana and visualize the results.

Looks like Manhattan is pretty busy!

Available Now
This feature is available now in all regions where Kinesis Data Analytics is offered. I think this is a really interesting new feature of Kinesis Data Analytics that can bring immediate value to many applications. Let us know what you build with it on Twitter or in the comments!

Randall

Best Practices for Running Apache Kafka on AWS

Post Syndicated from Prasad Alle original https://aws.amazon.com/blogs/big-data/best-practices-for-running-apache-kafka-on-aws/

This post was written in partnership with Intuit to share learnings, best practices, and recommendations for running an Apache Kafka cluster on AWS. Thanks to Vaishak Suresh and his colleagues at Intuit for their contribution and support.

Intuit, in their own words: Intuit, a leading enterprise customer for AWS, is a creator of business and financial management solutions. For more information on how Intuit partners with AWS, see our previous blog post, Real-time Stream Processing Using Apache Spark Streaming and Apache Kafka on AWS. Apache Kafka is an open-source, distributed streaming platform that enables you to build real-time streaming applications.

The best practices described in this post are based on our experience in running and operating large-scale Kafka clusters on AWS for more than two years. Our intent for this post is to help AWS customers who are currently running Kafka on AWS, and also customers who are considering migrating on-premises Kafka deployments to AWS.

AWS offers Amazon Kinesis Data Streams, a Kafka alternative that is fully managed.

Running your Kafka deployment on Amazon EC2 provides a high performance, scalable solution for ingesting streaming data. AWS offers many different instance types and storage option combinations for Kafka deployments. However, given the number of possible deployment topologies, it’s not always trivial to select the most appropriate strategy suitable for your use case.

In this blog post, we cover the following aspects of running Kafka clusters on AWS:

  • Deployment considerations and patterns
  • Storage options
  • Instance types
  • Networking
  • Upgrades
  • Performance tuning
  • Monitoring
  • Security
  • Backup and restore

Note: While implementing Kafka clusters in a production environment, make sure also to consider factors like your number of messages, message size, monitoring, failure handling, and any operational issues.

Deployment considerations and patterns

In this section, we discuss various deployment options available for Kafka on AWS, along with pros and cons of each option. A successful deployment starts with thoughtful consideration of these options. Considering availability, consistency, and operational overhead of the deployment helps when choosing the right option.

Single AWS Region, Three Availability Zones, All Active

One typical deployment pattern (all active) is in a single AWS Region with three Availability Zones (AZs). One Kafka cluster is deployed in each AZ along with Apache ZooKeeper and Kafka producer and consumer instances as shown in the illustration following.

In this pattern, this is the Kafka cluster deployment:

  • Kafka producers and Kafka cluster are deployed on each AZ.
  • Data is distributed evenly across three Kafka clusters by using Elastic Load Balancer.
  • Kafka consumers aggregate data from all three Kafka clusters.

Kafka cluster failover occurs this way:

  • Mark down all Kafka producers
  • Stop consumers
  • Debug and restack Kafka
  • Restart consumers
  • Restart Kafka producers

Following are the pros and cons of this pattern.

Pros:

  • Highly available
  • Can sustain the failure of two AZs
  • No message loss during failover
  • Simple deployment

Cons:

  • Very high operational overhead:
    • All changes need to be deployed three times, one for each Kafka cluster
    • Maintaining and monitoring three Kafka clusters
    • Maintaining and monitoring three consumer clusters

A restart is required for patching and upgrading brokers in a Kafka cluster. In this approach, a rolling upgrade is done separately for each cluster.

Single Region, Three Availability Zones, Active-Standby

Another typical deployment pattern (active-standby) is in a single AWS Region with a single Kafka cluster and Kafka brokers and Zookeepers distributed across three AZs. Another similar Kafka cluster acts as a standby as shown in the illustration following. You can use Kafka mirroring with MirrorMaker to replicate messages between any two clusters.

In this pattern, this is the Kafka cluster deployment:

  • Kafka producers are deployed on all three AZs.
  • Only one Kafka cluster is deployed across three AZs (active).
  • ZooKeeper instances are deployed on each AZ.
  • Brokers are spread evenly across all three AZs.
  • Kafka consumers can be deployed across all three AZs.
  • Standby Kafka producers and a Multi-AZ Kafka cluster are part of the deployment.

Kafka cluster failover occurs this way:

  • Switch traffic to standby Kafka producers cluster and Kafka cluster.
  • Restart consumers to consume from standby Kafka cluster.

Following are the pros and cons of this pattern.

Pros:

  • Less operational overhead when compared to the first option
  • Only one Kafka cluster to manage and consume data from
  • Can handle single AZ failures without activating a standby Kafka cluster

Cons:

  • Added latency due to cross-AZ data transfer among Kafka brokers
  • For Kafka versions before 0.10, replicas for topic partitions have to be assigned so they’re distributed to the brokers on different AZs (rack-awareness)
  • The cluster can become unavailable in case of a network glitch, where ZooKeeper does not see Kafka brokers
  • Possibility of in-transit message loss during failover

Intuit recommends using a single Kafka cluster in one AWS Region, with brokers distributed across three AZs (single region, three AZs). This approach offers stronger fault tolerance, because a failed AZ won’t cause Kafka downtime.

Storage options

There are two storage options for file storage in Amazon EC2:

  • Ephemeral storage (Amazon EC2 instance store)
  • Amazon Elastic Block Store (Amazon EBS)

Ephemeral storage is local to the Amazon EC2 instance. It can provide high IOPS based on the instance type. On the other hand, Amazon EBS volumes offer higher resiliency, and you can configure IOPS based on your storage needs. EBS volumes also offer some distinct advantages in terms of recovery time. Your choice of storage is closely related to the type of workload supported by your Kafka cluster.

Kafka provides built-in fault tolerance by replicating data partitions across a configurable number of instances. If a broker fails, you can recover it by fetching all the data from other brokers in the cluster that host the other replicas. Depending on the size of the data transfer, it can affect recovery process and network traffic. These in turn eventually affect the cluster’s performance.

The following table contrasts the benefits of using an instance store versus using EBS for storage.

Instance store:

  • Instance storage is recommended for large- and medium-sized Kafka clusters. For a large cluster, read/write traffic is distributed across a high number of brokers, so the loss of a broker has less of an impact. For smaller clusters, however, quick recovery of a failed node matters more, and recovering a failed broker takes longer and requires more network traffic.
  • Storage-optimized instances like h1, i3, and d2 are an ideal choice for distributed applications like Kafka.

EBS:

  • The primary advantage of using EBS in a Kafka deployment is that it significantly reduces data-transfer traffic when a broker fails or must be replaced. The replacement broker joins the cluster much faster.
  • Data stored on EBS is persisted in case of an instance failure or termination. The broker’s data stored on an EBS volume remains intact, and you can mount the EBS volume to a new EC2 instance. Most of the replicated data for the replacement broker is already available in the EBS volume and need not be copied over the network from another broker. Only the changes made after the original broker failure need to be transferred across the network, which makes the recovery process much faster.

Intuit chose EBS because of their frequent instance restacking requirements and also other benefits provided by EBS.

Generally, Kafka deployments use a replication factor of three. EBS offers replication within its service, so Intuit chose a replication factor of two instead of three.
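For example, with the kafka-python client, a topic that uses a replication factor of two might be created as follows. This is a sketch only; the broker addresses, topic name, and partition count are placeholders, not values from Intuit’s deployment.

from kafka.admin import KafkaAdminClient, NewTopic

# Placeholder broker addresses; point these at your own cluster.
admin = KafkaAdminClient(bootstrap_servers=["broker1:9092", "broker2:9092", "broker3:9092"])

# A replication factor of 2 leans on EBS durability for the data that a
# third replica would otherwise provide, matching the trade-off above.
admin.create_topics([NewTopic(name="clickstream", num_partitions=12, replication_factor=2)])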

Instance types

The choice of instance types is generally driven by the type of storage required for your streaming applications on a Kafka cluster. If your application requires ephemeral storage, h1, i3, and d2 instances are your best option.

Intuit used r3.xlarge instances for their brokers and r3.large for ZooKeeper, with ST1 (throughput optimized HDD) EBS for their Kafka cluster.

Here are sample benchmark numbers from Intuit tests.

Configuration:

  • r3.xlarge
  • ST1 EBS
  • 12 brokers
  • 12 partitions

Broker bytes (MB/s): 346.9 (aggregate)

If you need EBS storage, then AWS has a newer-generation r4 instance. The r4 instance is superior to r3 in many ways:

  • It has a faster processor (Broadwell).
  • EBS is optimized by default.
  • It features networking based on the Elastic Network Adapter (ENA), with up to 10 Gbps on smaller sizes.
  • It costs 20 percent less than r3.

Note: It’s always best practice to check for the latest changes in instance types.

Networking

The network plays a very important role in a distributed system like Kafka. A fast and reliable network ensures that nodes can communicate with each other easily. The available network throughput controls the maximum amount of traffic that Kafka can handle. Network throughput, combined with disk storage, is often the governing factor for cluster sizing.

If you expect your cluster to receive high read/write traffic, select an instance type that offers 10-Gb/s performance.

In addition, choose an option that keeps interbroker network traffic on the private subnet, because this approach allows clients to connect to the brokers. Communication between brokers and clients uses the same network interface and port. For more details, see the documentation about IP addressing for EC2 instances.

If you are deploying in more than one AWS Region, you can connect the two VPCs in the two AWS Regions using cross-region VPC peering. However, be aware of the networking costs associated with cross-AZ deployments.

Upgrades

Kafka has a history of not being backward compatible, but its support of backward compatibility is getting better. During a Kafka upgrade, you should keep your producer and consumer clients on a version equal to or lower than the version you are upgrading from. After the upgrade is finished, you can start using a new protocol version and any new features it supports. There are three upgrade approaches available, discussed following.

Rolling or in-place upgrade

In a rolling or in-place upgrade scenario, upgrade one Kafka broker at a time. Take into consideration the recommendations for doing rolling restarts to avoid downtime for end users.

Downtime upgrade

If you can afford the downtime, you can take your entire cluster down, upgrade each Kafka broker, and then restart the cluster.

Blue/green upgrade

Intuit followed the blue/green deployment model for their workloads, as described following.

If you can afford to create a separate Kafka cluster and upgrade it, we highly recommend the blue/green upgrade scenario. In this scenario, we recommend that you keep your clusters up-to-date with the latest Kafka version. For additional details on Kafka version upgrades, see the Kafka upgrade documentation.

The following illustration shows a blue/green upgrade.

In this scenario, the upgrade plan works like this:

  • Create a new Kafka cluster on AWS.
  • Create a new Kafka producers stack to point to the new Kafka cluster.
  • Create topics on the new Kafka cluster.
  • Test the green deployment end to end (sanity check).
  • Using Amazon Route 53, change the new Kafka producers stack on AWS to point to the new green Kafka environment that you have created.

The roll-back plan works like this:

  • Switch Amazon Route 53 to the old Kafka producers stack on AWS to point to the old Kafka environment.

For additional details on blue/green deployment architecture using Kafka, see the re:Invent presentation Leveraging the Cloud with a Blue-Green Deployment Architecture.

Performance tuning

You can tune Kafka performance in multiple dimensions. Following are some best practices for performance tuning.

These are some general performance tuning techniques (a sample producer configuration sketch follows this list):

  • If throughput is less than network capacity, try the following:
    • Add more threads
    • Increase batch size
    • Add more producer instances
    • Add more partitions
  • To improve latency when acks = -1, increase your num.replica.fetchers value.
  • For cross-AZ data transfer, tune your buffer settings for sockets and for OS TCP.
  • Make sure that num.io.threads is greater than the number of disks dedicated for Kafka.
  • Adjust num.network.threads based on the number of producers plus the number of consumers plus the replication factor.
  • Your message size affects your network bandwidth. To get higher performance from a Kafka cluster, select an instance type that offers 10 Gb/s performance.
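As referenced above, here is a hedged sketch of the producer-side items using the kafka-python client. The broker addresses and specific values are placeholders to tune for your own workload, not recommendations from the original benchmark.

from kafka import KafkaProducer

# Larger batches plus a small linger let the producer fill batches before sending;
# gzip compression trades CPU for network bandwidth; acks='all' waits for all
# in-sync replicas (the acks = -1 case discussed above).
producer = KafkaProducer(
    bootstrap_servers=["broker1:9092", "broker2:9092", "broker3:9092"],
    acks="all",
    batch_size=64 * 1024,         # bytes per partition batch
    linger_ms=10,                 # wait up to 10 ms to fill a batch
    compression_type="gzip",
    buffer_memory=64 * 1024 * 1024,
)

for i in range(1000):
    producer.send("clickstream", value=("message-%d" % i).encode())
producer.flush()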

For Java and JVM tuning, try the following:

  • Minimize GC pauses by using the Oracle JDK, which uses the new G1 garbage-first collector.
  • Try to keep the Kafka heap size below 4 GB.

Monitoring

Knowing whether a Kafka cluster is working correctly in a production environment is critical. Sometimes, just knowing that the cluster is up is enough, but Kafka applications have many moving parts to monitor. In fact, it can easily become confusing to understand what’s important to watch and what you can set aside. Items to monitor range from simple metrics about the overall rate of traffic, to producers, consumers, brokers, controller, ZooKeeper, topics, partitions, messages, and so on.

For monitoring, Intuit used several tools, including New Relic, Wavefront, Amazon CloudWatch, and AWS CloudTrail. Our recommended monitoring approach follows; a short sketch of publishing custom metrics to CloudWatch appears after the lists below.

For system metrics, we recommend that you monitor:

  • CPU load
  • Network metrics
  • File handle usage
  • Disk space
  • Disk I/O performance
  • Garbage collection
  • ZooKeeper

For producers, we recommend that you monitor:

  • Batch-size-avg
  • Compression-rate-avg
  • Waiting-threads
  • Buffer-available-bytes
  • Record-queue-time-max
  • Record-send-rate
  • Records-per-request-avg

For consumers, we recommend that you monitor:

  • Batch-size-avg
  • Compression-rate-avg
  • Waiting-threads
  • Buffer-available-bytes
  • Record-queue-time-max
  • Record-send-rate
  • Records-per-request-avg
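If you rely on Amazon CloudWatch, as mentioned above, a small publisher like the following could push broker-level values you collect yourself (for example, from JMX) as custom metrics. The namespace, metric name, dimension, and value here are illustrative assumptions, not part of Intuit’s setup.

import boto3

cloudwatch = boto3.client("cloudwatch")

# Publish a custom metric for one broker; in practice the value would come
# from the broker's JMX metrics or from log scraping.
cloudwatch.put_metric_data(
    Namespace="Custom/Kafka",
    MetricData=[{
        "MetricName": "UnderReplicatedPartitions",
        "Dimensions": [{"Name": "BrokerId", "Value": "1"}],
        "Value": 0.0,
        "Unit": "Count",
    }],
)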

Security

Like most distributed systems, Kafka provides the mechanisms to transfer data with relatively high security across the components involved. Depending on your setup, security might involve different services such as encryption, Kerberos, Transport Layer Security (TLS) certificates, and advanced access control list (ACL) setup in brokers and ZooKeeper. The following tells you more about the Intuit approach. For details on Kafka security not covered in this section, see the Kafka documentation.

Encryption at rest

For EBS-backed EC2 instances, you can enable encryption at rest by using Amazon EBS volumes with encryption enabled. Amazon EBS uses AWS Key Management Service (AWS KMS) for encryption. For more details, see Amazon EBS Encryption in the EBS documentation. For instance store–backed EC2 instances, you can enable encryption at rest by using Amazon EC2 instance store encryption.
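As an illustration, an encrypted ST1 data volume for a broker could be created with boto3 as follows. The Availability Zone, size, and KMS key alias are placeholders.

import boto3

ec2 = boto3.client("ec2")

# Create a throughput-optimized HDD (st1) volume with encryption at rest,
# using a customer-managed KMS key (placeholder alias).
volume = ec2.create_volume(
    AvailabilityZone="us-east-1a",
    Size=1000,                       # GiB
    VolumeType="st1",
    Encrypted=True,
    KmsKeyId="alias/kafka-broker-data",
)
print(volume["VolumeId"])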

Encryption in transit

Kafka uses TLS for client and internode communications.
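With the kafka-python client, for instance, TLS-encrypted client connections look roughly like the following sketch. The certificate paths and broker address are placeholders, and the broker side must expose an SSL listener for this to work.

from kafka import KafkaProducer

# Client-side TLS configuration; the broker must be configured with an SSL listener.
producer = KafkaProducer(
    bootstrap_servers=["broker1:9093"],
    security_protocol="SSL",
    ssl_cafile="/etc/kafka/ssl/ca.pem",        # CA that signed the broker certificates
    ssl_certfile="/etc/kafka/ssl/client.pem",  # client certificate, if client auth is required
    ssl_keyfile="/etc/kafka/ssl/client.key",
    ssl_check_hostname=True,
)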

Authentication

Authentication of connections to brokers, whether from clients (producers and consumers), from other brokers, or from tools, uses either Secure Sockets Layer (SSL) or Simple Authentication and Security Layer (SASL).

Kafka supports Kerberos authentication. If you already have a Kerberos server, you can add Kafka to your current configuration.

Authorization

In Kafka, authorization is pluggable and integration with external authorization services is supported.

Backup and restore

The type of storage used in your deployment dictates your backup and restore strategy.

The best way to back up a Kafka cluster based on instance storage is to set up a second cluster and replicate messages using MirrorMaker. Kafka’s mirroring feature makes it possible to maintain a replica of an existing Kafka cluster. Depending on your setup and requirements, your backup cluster might be in the same AWS Region as your main cluster or in a different one.

For EBS-based deployments, you can enable automatic snapshots of EBS volumes to back up volumes. You can easily create new EBS volumes from these snapshots to restore. We recommend storing backup files in Amazon S3.
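For example, with boto3 you could snapshot a broker’s data volume and later restore it. The volume ID and Availability Zone below are placeholders.

import boto3

ec2 = boto3.client("ec2")

# Back up: snapshot the broker's data volume (placeholder volume ID).
snapshot = ec2.create_snapshot(
    VolumeId="vol-0123456789abcdef0",
    Description="Nightly backup of Kafka broker 1 data volume",
)

# Restore: create a fresh volume from the snapshot in the target AZ,
# then attach it to the replacement broker instance.
restored = ec2.create_volume(
    SnapshotId=snapshot["SnapshotId"],
    AvailabilityZone="us-east-1a",
    VolumeType="st1",
)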

For more information on how to back up in Kafka, see the Kafka documentation.

Conclusion

In this post, we discussed several patterns for running Kafka in the AWS Cloud. AWS also provides an alternative managed solution, Amazon Kinesis Data Streams: there are no servers to manage or scaling cliffs to worry about, you can scale the size of your streaming pipeline in seconds without downtime, data replication across Availability Zones is automatic, and you benefit from security out of the box. Kinesis Data Streams is tightly integrated with a wide variety of AWS services, such as AWS Lambda, Amazon Redshift, and Amazon Elasticsearch Service, and it supports open source frameworks like Storm, Spark, Flink, and more. To bridge the two systems, you can refer to the Kafka-Kinesis Connector.

If you have questions or suggestions, please comment below.


Additional Reading

If you found this post useful, be sure to check out Implement Serverless Log Analytics Using Amazon Kinesis Analytics and Real-time Clickstream Anomaly Detection with Amazon Kinesis Analytics.


About the Author

Prasad Alle is a Senior Big Data Consultant with AWS Professional Services. He spends his time leading and building scalable, reliable Big data, Machine learning, Artificial Intelligence and IoT solutions for AWS Enterprise and Strategic customers. His interests extend to various technologies such as Advanced Edge Computing, Machine learning at Edge. In his spare time, he enjoys spending time with his family.

 

 

Optimize Delivery of Trending, Personalized News Using Amazon Kinesis and Related Services

Post Syndicated from Yukinori Koide original https://aws.amazon.com/blogs/big-data/optimize-delivery-of-trending-personalized-news-using-amazon-kinesis-and-related-services/

This is a guest post by Yukinori Koide, the head of development for the Newspass department at Gunosy.

Gunosy is a news curation application that covers a wide range of topics, such as entertainment, sports, politics, and gourmet news. The application has been installed more than 20 million times.

Gunosy aims to provide people with the content they want without the stress of dealing with a large influx of information. We analyze user attributes, such as gender and age, and past activity logs like click-through rate (CTR). We combine this information with article attributes to provide trending, personalized news articles to users.

In this post, I show you how to process user activity logs in real time using Amazon Kinesis Data Firehose, Amazon Kinesis Data Analytics, and related AWS services.

Why does Gunosy need real-time processing?

Users need fresh and personalized news. There are two constraints to consider when delivering appropriate articles:

  • Time: Articles have freshness—that is, they lose value over time. New articles need to reach users as soon as possible.
  • Frequency (volume): Only a limited number of articles can be shown. It’s unreasonable to display all articles in the application, and users can’t read all of them anyway.

To deliver fresh articles with a high probability that the user is interested in them, it’s necessary to include not only past user activity logs and some feature values of articles, but also the most recent (real-time) user activity logs.

We optimize the delivery of articles with these two steps.

  1. Personalization: Deliver articles based on each user’s attributes, past activity logs, and feature values of each article—to account for each user’s interests.
  2. Trends analysis/identification: Optimize delivering articles using recent (real-time) user activity logs—to incorporate the latest trends from all users.

Optimizing the delivery of articles is always a cold start. Initially, we deliver articles based on past logs. We then use real-time data to optimize as quickly as possible. In addition, news has a short freshness time. Specifically, day-old news is past news, and even the news that is three hours old is past news. Therefore, shortening the time between step 1 and step 2 is important.

To tackle this issue, we chose AWS for processing streaming data because of its fully managed services, cost-effectiveness, and so on.

Solution

The following diagram depicts the architecture for optimizing article delivery by processing real-time user activity logs.

There are three processing flows:

  1. Process real-time user activity logs.
  2. Store and process all user-based and article-based logs.
  3. Execute ad hoc or heavy queries.

In this post, I focus on the first processing flow and explain how it works.

Process real-time user activity logs

The following are the steps for processing user activity logs in real time using Kinesis Data Streams and Kinesis Data Analytics.

  1. The Fluentd server sends the following user activity logs to Kinesis Data Streams:
{"article_id": 12345, "user_id": 12345, "action": "click"}
{"article_id": 12345, "user_id": 12345, "action": "impression"}
...
  2. Map rows of logs to columns in Kinesis Data Analytics.

  3. Set the reference data to Kinesis Data Analytics from Amazon S3.

a. Gunosy has user attributes such as gender, age, and segment. Prepare the following CSV file (user_id, gender, segment_id) and put it in Amazon S3:

101,female,1
102,male,2
103,female,3
...

b. Add the application reference data source to Kinesis Data Analytics using the AWS CLI:

$ aws kinesisanalytics add-application-reference-data-source \
  --application-name <my-application-name> \
  --current-application-version-id <version-id> \
  --reference-data-source '{
  "TableName": "REFERENCE_DATA_SOURCE",
  "S3ReferenceDataSource": {
    "BucketARN": "arn:aws:s3:::<my-bucket-name>",
    "FileKey": "mydata.csv",
    "ReferenceRoleARN": "arn:aws:iam::<account-id>:role/..."
  },
  "ReferenceSchema": {
    "RecordFormat": {
      "RecordFormatType": "CSV",
      "MappingParameters": {
        "CSVMappingParameters": {"RecordRowDelimiter": "\n", "RecordColumnDelimiter": ","}
      }
    },
    "RecordEncoding": "UTF-8",
    "RecordColumns": [
      {"Name": "USER_ID", "Mapping": "0", "SqlType": "INTEGER"},
      {"Name": "GENDER",  "Mapping": "1", "SqlType": "VARCHAR(32)"},
      {"Name": "SEGMENT_ID", "Mapping": "2", "SqlType": "INTEGER"}
    ]
  }
}'

This application reference data source can then be referenced from Kinesis Data Analytics.

  4. Run a query against the source data stream on Kinesis Data Analytics with the application reference data source.

a. Define the temporary stream named TMP_SQL_STREAM.

CREATE OR REPLACE STREAM "TMP_SQL_STREAM" (
  -- ACTION is included so the destination pump can aggregate clicks and impressions
  GENDER VARCHAR(32), SEGMENT_ID INTEGER, ARTICLE_ID INTEGER, ACTION VARCHAR(16)
);

b. Insert the joined source stream and application reference data source into the temporary stream.

CREATE OR REPLACE PUMP "TMP_PUMP" AS
INSERT INTO "TMP_SQL_STREAM"
SELECT STREAM
  R.GENDER, R.SEGMENT_ID, S.ARTICLE_ID, S.ACTION
FROM      "SOURCE_SQL_STREAM_001" S
LEFT JOIN "REFERENCE_DATA_SOURCE" R
  ON S.USER_ID = R.USER_ID;

c. Define the destination stream named DESTINATION_SQL_STREAM.

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
  TIME TIMESTAMP, GENDER VARCHAR(32), SEGMENT_ID INTEGER, ARTICLE_ID INTEGER, 
  IMPRESSION INTEGER, CLICK INTEGER
);

d. Insert the processed temporary stream, using a tumbling window, into the destination stream per minute.

CREATE OR REPLACE PUMP "STREAM_PUMP" AS
INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM
  ROWTIME AS TIME,
  GENDER, SEGMENT_ID, ARTICLE_ID,
  SUM(CASE ACTION WHEN 'impression' THEN 1 ELSE 0 END) AS IMPRESSION,
  SUM(CASE ACTION WHEN 'click' THEN 1 ELSE 0 END) AS CLICK
FROM "TMP_SQL_STREAM"
GROUP BY
  GENDER, SEGMENT_ID, ARTICLE_ID,
  FLOOR("TMP_SQL_STREAM".ROWTIME TO MINUTE);

The results look like the following:

  5. Insert the results into Amazon Elasticsearch Service (Amazon ES).
  6. Batch servers get results from Amazon ES every minute. They then optimize delivering articles with other data sources using a proprietary optimization algorithm; a sketch of the kind of Amazon ES query they might run follows.
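The following is a rough sketch of such a per-minute query against Amazon ES. The endpoint, index name, and field names are assumptions based on the destination stream schema above, and a real deployment would also need signed requests or an IP-based access policy rather than plain HTTP calls.

import requests

# Placeholder Amazon ES endpoint and index name.
ES_ENDPOINT = "https://search-my-domain.us-west-2.es.amazonaws.com"
INDEX = "user-activity"

# Top articles by clicks over the last minute, matching the tumbling-window output.
query = {
    "size": 0,
    "query": {"range": {"TIME": {"gte": "now-1m"}}},
    "aggs": {
        "by_article": {
            "terms": {"field": "ARTICLE_ID", "size": 20},
            "aggs": {
                "clicks": {"sum": {"field": "CLICK"}},
                "impressions": {"sum": {"field": "IMPRESSION"}},
            },
        }
    },
}

response = requests.post("%s/%s/_search" % (ES_ENDPOINT, INDEX), json=query, timeout=10)
for bucket in response.json()["aggregations"]["by_article"]["buckets"]:
    print(bucket["key"], bucket["clicks"]["value"], bucket["impressions"]["value"])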

How to connect a stream to another stream in another AWS Region

When we built the solution, Kinesis Data Analytics was not available in the Asia Pacific (Tokyo) Region, so we used the US West (Oregon) Region. The following shows how we connected a data stream to another data stream in the other Region.

There is no need to keep all components in a single AWS Region, unless you have a situation where a response difference at the millisecond level is critical to the service.

Benefits

The solution provides benefits for both our company and for our users. Benefits for the company are cost savings—including development costs, operational costs, and infrastructure costs—and reducing delivery time. Users can now find articles of interest more quickly. The solution can process more than 500,000 records per minute, and it enables fast and personalized news curating for our users.

Conclusion

In this post, I showed you how we optimize trending user activities to personalize news using Amazon Kinesis Data Firehose, Amazon Kinesis Data Analytics, and related AWS services in Gunosy.

AWS gives us a quick and economical solution and a good experience.

If you have questions or suggestions, please comment below.


Additional Reading

If you found this post useful, be sure to check out Implement Serverless Log Analytics Using Amazon Kinesis Analytics and Joining and Enriching Streaming Data on Amazon Kinesis.


About the Authors

Yukinori Koide is the head of development for the Newspass department at Gunosy. He is working on standardization of provisioning and deployment flow, promoting the utilization of serverless and containers for machine learning and AI services. His favorite AWS services are DynamoDB, Lambda, Kinesis, and ECS.

 

 

 

Akihiro Tsukada is a start-up solutions architect with AWS. He supports start-up companies in Japan technically at many levels, ranging from seed to later-stage.

 

 

 

 

Yuta Ishii is a solutions architect with AWS. He works with our customers to provide architectural guidance for building media & entertainment services, helping them improve the value of their services when using AWS.

 

 

 

 

 

Now Available: New Digital Training to Help You Learn About AWS Big Data Services

Post Syndicated from Sara Snedeker original https://aws.amazon.com/blogs/big-data/now-available-new-digital-training-to-help-you-learn-about-aws-big-data-services/

AWS Training and Certification recently released free digital training courses that will make it easier for you to build your cloud skills and learn about using AWS Big Data services. This training includes courses like Introduction to Amazon EMR and Introduction to Amazon Athena.

You can get free and unlimited access to more than 100 new digital training courses built by AWS experts at aws.training. It’s easy to access training related to big data. Just choose the Analytics category on our Find Training page to browse through the list of courses. You can also use the keyword filter to search for training for specific AWS offerings.

Recommended training

Just getting started, or looking to learn about a new service? Check out the following digital training courses:

Introduction to Amazon EMR (15 minutes)
Covers the available tools that can be used with Amazon EMR and the process of creating a cluster. It includes a demonstration of how to create an EMR cluster.

Introduction to Amazon Athena (10 minutes)
Introduces the Amazon Athena service along with an overview of its operating environment. It covers the basic steps in implementing Athena and provides a brief demonstration.

Introduction to Amazon QuickSight (10 minutes)
Discusses the benefits of using Amazon QuickSight and how the service works. It also includes a demonstration so that you can see Amazon QuickSight in action.

Introduction to Amazon Redshift (10 minutes)
Walks you through Amazon Redshift and its core features and capabilities. It also includes a quick overview of relevant use cases and a short demonstration.

Introduction to AWS Lambda (10 minutes)
Discusses the rationale for using AWS Lambda, how the service works, and how you can get started using it.

Introduction to Amazon Kinesis Analytics (10 minutes)
Discusses how Amazon Kinesis Analytics collects, processes, and analyzes streaming data in real time. It discusses how to use and monitor the service and explores some use cases.

Introduction to Amazon Kinesis Streams (15 minutes)
Covers how Amazon Kinesis Streams is used to collect, process, and analyze real-time streaming data to create valuable insights.

Introduction to AWS IoT (10 minutes)
Describes how the AWS Internet of Things (IoT) communication architecture works, and the components that make up AWS IoT. It discusses how AWS IoT works with other AWS services and reviews a case study.

Introduction to AWS Data Pipeline (10 minutes)
Covers components like tasks, task runner, and pipeline. It also discusses what a pipeline definition is, and reviews the AWS services that are compatible with AWS Data Pipeline.

Go deeper with classroom training

Want to learn more? Enroll in classroom training to learn best practices, get live feedback, and hear answers to your questions from an instructor.

Big Data on AWS (3 days)
Introduces you to cloud-based big data solutions such as Amazon EMR, Amazon Redshift, Amazon Kinesis, and the rest of the AWS big data platform.

Data Warehousing on AWS (3 days)
Introduces you to concepts, strategies, and best practices for designing a cloud-based data warehousing solution, and demonstrates how to collect, store, and prepare data for the data warehouse.

Building a Serverless Data Lake (1 day)
Teaches you how to design, build, and operate a serverless data lake solution with AWS services. Includes topics such as ingesting data from any data source at large scale, storing the data securely and durably, using the right tool to process large volumes of data, and understanding the options available for analyzing the data in near-real time.

More training coming in 2018

We’re always evaluating and expanding our training portfolio, so stay tuned for more training options in the new year. You can always visit us at aws.training to explore our latest offerings.

AWS CloudFormation Supports Amazon Kinesis Analytics Applications

Post Syndicated from Ryan Nienhuis original https://aws.amazon.com/blogs/big-data/aws-cloudformation-supports-amazon-kinesis-analytics-applications/

You can now provision and manage resources for Amazon Kinesis Analytics applications using AWS CloudFormation.  Kinesis Analytics is the easiest way to process streaming data in real time with standard SQL, without having to learn new programming languages or processing frameworks. Kinesis Analytics enables you to query streaming data or build entire streaming applications using SQL. Using the service, you gain actionable insights and can respond to your business and customer needs promptly.

Customers can create CloudFormation templates that easily create or update Kinesis Analytics applications. Typically, a template is used as a way to manage code across different environments, or to prototype a new streaming data solution quickly.
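As a rough illustration of what such a template can contain, the following sketch builds a minimal template in Python and creates a stack with boto3. The application code, stream ARN, role ARN, and record schema are placeholders, and the property names reflect the AWS::KinesisAnalytics::Application resource as we understand it; check the CloudFormation User Guide for the authoritative shape.

import json
import boto3

# Placeholder ARNs; replace with your own stream and IAM role.
STREAM_ARN = "arn:aws:kinesis:us-east-1:123456789012:stream/ExampleInputStream"
ROLE_ARN = "arn:aws:iam::123456789012:role/service-role/kinesis-analytics-example"

template = {
    "AWSTemplateFormatVersion": "2010-09-09",
    "Resources": {
        "ExampleApplication": {
            "Type": "AWS::KinesisAnalytics::Application",
            "Properties": {
                "ApplicationName": "example-sql-application",
                "ApplicationCode": "CREATE OR REPLACE STREAM ...;",  # your SQL goes here
                "Inputs": [{
                    "NamePrefix": "SOURCE_SQL_STREAM",
                    "KinesisStreamsInput": {"ResourceARN": STREAM_ARN, "RoleARN": ROLE_ARN},
                    "InputSchema": {
                        "RecordFormat": {
                            "RecordFormatType": "JSON",
                            "MappingParameters": {"JSONMappingParameters": {"RecordRowPath": "$"}},
                        },
                        "RecordEncoding": "UTF-8",
                        "RecordColumns": [
                            {"Name": "ticker", "SqlType": "VARCHAR(8)", "Mapping": "$.ticker"},
                            {"Name": "price", "SqlType": "DOUBLE", "Mapping": "$.price"},
                        ],
                    },
                }],
            },
        }
    },
}

cloudformation = boto3.client("cloudformation")
cloudformation.create_stack(
    StackName="kinesis-analytics-example",
    TemplateBody=json.dumps(template),
)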

We have created two sample templates using past AWS Big Data Blog posts that referenced Kinesis Analytics.

For more information about the new feature, see the AWS CloudFormation User Guide.

 

Perform Near Real-time Analytics on Streaming Data with Amazon Kinesis and Amazon Elasticsearch Service

Post Syndicated from Tristan Li original https://aws.amazon.com/blogs/big-data/perform-near-real-time-analytics-on-streaming-data-with-amazon-kinesis-and-amazon-elasticsearch-service/

Nowadays, streaming data is seen and used everywhere—from social networks, to mobile and web applications, IoT devices, instrumentation in data centers, and many other sources. As the speed and volume of this type of data increases, the need to perform data analysis in real time with machine learning algorithms and extract a deeper understanding from the data becomes ever more important. For example, you might want a continuous monitoring system to detect sentiment changes in a social media feed so that you can react to the sentiment in near real time.

In this post, we use Amazon Kinesis Streams to collect and store streaming data. We then use Amazon Kinesis Analytics to process and analyze the streaming data continuously. Specifically, we use the Kinesis Analytics built-in RANDOM_CUT_FOREST function, a machine learning algorithm, to detect anomalies in the streaming data. Finally, we use Amazon Kinesis Firehose to export the anomalies data to Amazon Elasticsearch Service (Amazon ES). We then build a simple dashboard in the open source tool Kibana to visualize the result.

Solution overview

The following diagram depicts a high-level overview of this solution.

Amazon Kinesis Streams

You can use Amazon Kinesis Streams to build your own streaming application. This application can process and analyze streaming data by continuously capturing and storing terabytes of data per hour from hundreds of thousands of sources.

Amazon Kinesis Analytics

Kinesis Analytics provides an easy and familiar standard SQL language to analyze streaming data in real time. One of its most powerful features is that there are no new languages, processing frameworks, or complex machine learning algorithms that you need to learn.

Amazon Kinesis Firehose

Kinesis Firehose is the easiest way to load streaming data into AWS. It can capture, transform, and load streaming data into Amazon S3, Amazon Redshift, and Amazon Elasticsearch Service.

Amazon Elasticsearch Service

Amazon ES is a fully managed service that makes it easy to deploy, operate, and scale Elasticsearch for log analytics, full text search, application monitoring, and more.

Solution summary

The following is a quick walkthrough of the solution that’s presented in the diagram:

  1. IoT sensors send streaming data into Kinesis Streams. In this post, you use a Python script to simulate an IoT temperature sensor device that sends the streaming data.
  2. By using the built-in RANDOM_CUT_FOREST function in Kinesis Analytics, you can detect anomalies in real time with the sensor data that is stored in Kinesis Streams. RANDOM_CUT_FOREST is also an appropriate algorithm for many other kinds of anomaly-detection use cases—for example, the media sentiment example mentioned earlier in this post.
  3. The processed anomaly data is then loaded into the Kinesis Firehose delivery stream.
  4. By using the built-in integration that Kinesis Firehose has with Amazon ES, you can easily export the processed anomaly data into the service and visualize it with Kibana.

Implementation steps

The following sections walk through the implementation steps in detail.

Creating the delivery stream

  1. Open the Amazon Kinesis Streams console.
  2. Create a new Kinesis stream. Give it a name that indicates it’s for raw incoming stream data—for example, RawStreamData. For Number of shards, type 1.
  3. The Python code provided below simulates a streaming application, such as an IoT device, and generates random data and anomalies into a Kinesis stream. The code generates two temperature ranges, where the first range is the hypothetical sensor’s normal operating temperature range (10–20), and the second is the anomaly temperature range (100–120). Make sure to change the stream name in the two put_record calls and the Region in the connect_to_region call to match your configuration. Alternatively, you can download the Amazon Kinesis Data Generator from this repository and use it to generate the data.
    import json
    import datetime
    import random
    import testdata
    from boto import kinesis
    
    kinesis = kinesis.connect_to_region("us-east-1")
    
    def getData(iotName, lowVal, highVal):
       data = {}
       data["iotName"] = iotName
       data["iotValue"] = random.randint(lowVal, highVal) 
       return data
    
    while 1:
       rnd = random.random()
       if (rnd < 0.01):
          data = json.dumps(getData("DemoSensor", 100, 120))  
          kinesis.put_record("RawStreamData", data, "DemoSensor")
          print '***************************** anomaly ************************* ' + data
       else:
          data = json.dumps(getData("DemoSensor", 10, 20))  
          kinesis.put_record("RawStreamData", data, "DemoSensor")
          print data

  4. Open the Amazon Elasticsearch Service console and create a new domain.
    1. Give the domain a unique name. In the Configure cluster screen, use the default settings.
    2. In the Set up access policy screen, in the Set the domain access policy list, choose Allow access to the domain from specific IP(s).
    3. Enter the public IP address of your computer.
      Note: If you’re working behind a proxy or firewall, see the “Use a proxy to simplify request signing” section in this AWS Database blog post to learn how to work with a proxy. For additional information about securing access to your Amazon ES domain, see How to Control Access to Your Amazon Elasticsearch Domain in the AWS Security Blog.
  5. After the Amazon ES domain is up and running, you can set up and configure Kinesis Firehose to export results to Amazon ES:
    1. Open the Amazon Kinesis Firehose console and choose Create Delivery Stream.
    2. In the Destination dropdown list, choose Amazon Elasticsearch Service.
    3. Type a stream name, and choose the Amazon ES domain that you created in Step 4.
    4. Provide an index name and ES type. In the S3 bucket dropdown list, choose Create New S3 bucket. Choose Next.
    5. In the configuration, change the Elasticsearch Buffer size to 1 MB and the Buffer interval to 60s. Use the default settings for all other fields. This shortens the time for the data to reach the ES cluster.
    6. Under IAM Role, choose Create/Update existing IAM role.
      The best practice is to create a new role every time. Otherwise, the console keeps adding policy documents to the same role. Eventually the size of the attached policies causes IAM to reject the role, but it does it in a non-obvious way, where the console basically quits functioning.
    7. Choose Next to move to the Review page.
  6. Review the configuration, and then choose Create Delivery Stream.
  7. Run the Python file for 1–2 minutes, and then press Ctrl+C to stop the execution. This loads some data into the stream for you to visualize in the next step.

Analyzing the data

Now it’s time to analyze the IoT streaming data using Amazon Kinesis Analytics.

  1. Open the Amazon Kinesis Analytics console and create a new application. Give the application a name, and then choose Create Application.
  2. On the next screen, choose Connect to a source. Choose the raw incoming data stream that you created earlier. (Note the stream name Source_SQL_STREAM_001 because you will need it later.)
  3. Use the default settings for everything else. When the schema discovery process is complete, it displays a success message with the formatted stream sample in a table as shown in the following screenshot. Review the data, and then choose Save and continue.
  4. Next, choose Go to SQL editor. When prompted, choose Yes, start application.
  5. Copy the following SQL code and paste it into the SQL editor window.
    CREATE OR REPLACE STREAM "TEMP_STREAM" (
       "iotName"        varchar (40),
       "iotValue"   integer,
       "ANOMALY_SCORE"  DOUBLE);
    -- Creates an output stream and defines a schema
    CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
       "iotName"       varchar(40),
       "iotValue"       integer,
       "ANOMALY_SCORE"  DOUBLE,
       "created" TimeStamp);
     
    -- Compute an anomaly score for each record in the source stream
    -- using Random Cut Forest
    CREATE OR REPLACE PUMP "STREAM_PUMP_1" AS INSERT INTO "TEMP_STREAM"
    SELECT STREAM "iotName", "iotValue", ANOMALY_SCORE FROM
      TABLE(RANDOM_CUT_FOREST(
        CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001")
      )
    );
    
    -- Sort records by descending anomaly score, insert into output stream
    CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT STREAM "iotName", "iotValue", ANOMALY_SCORE, ROWTIME FROM "TEMP_STREAM"
    ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;

 

  6. Choose Save and run SQL.
    As the application is running, it displays the results as stream data arrives. If you don’t see any data coming in, run the Python script again to generate some fresh data. When there is data, it appears in a grid as shown in the following screenshot. Note that you are selecting data from the source stream name Source_SQL_STREAM_001 that you created previously. Also note the ANOMALY_SCORE column. This is the value that the Random_Cut_Forest function calculates based on the temperature ranges provided by the Python script. Higher (anomaly) temperature ranges have a higher score. Looking at the SQL code, note that the first two blocks of code create two new streams to store temporary data and the final result. The third block of code analyzes the raw source data (Stream_Pump_1) using the Random_Cut_Forest function. It calculates an anomaly score (ANOMALY_SCORE) and inserts it into the TEMP_STREAM stream. The final code block loads the result stored in the TEMP_STREAM into DESTINATION_SQL_STREAM.
  7. Choose Exit (done editing) next to the Save and run SQL button to return to the application configuration page.

Load processed data into the Kinesis Firehose delivery stream

Now, you can export the result from DESTINATION_SQL_STREAM into the Amazon Kinesis Firehose stream that you created previously.

  1. On the application configuration page, choose Connect to a destination.
  2. Choose the stream name that you created earlier, and use the default settings for everything else. Then choose Save and Continue.
  3. On the application configuration page, choose Exit to Kinesis Analytics applications to return to the Amazon Kinesis Analytics console.
  4. Run the Python script again for 4–5 minutes to generate enough data to flow through Amazon Kinesis Streams, Kinesis Analytics, Kinesis Firehose, and finally into the Amazon ES domain.
  5. Open the Kinesis Firehose console, choose the stream, and then choose the Monitoring tab.
  6. As the processed data flows into Kinesis Firehose and Amazon ES, the metrics appear on the Delivery Stream metrics page. Keep in mind that the metrics page takes a few minutes to refresh with the latest data.
  7. Open the Amazon Elasticsearch Service dashboard in the AWS Management Console. The count in the Searchable documents column increases as shown in the following screenshot. In addition, the domain shows a cluster health of Yellow. This is because, by default, it needs two instances to deploy redundant copies of the index. To fix this, you can deploy two instances instead of one.

Visualize the data using Kibana

Now it’s time to launch Kibana and visualize the data.

  1. Use the ES domain link to go to the cluster detail page, and then choose the Kibana link as shown in the following screenshot.

    If you’re working behind a proxy or firewall, see the “Use a proxy to simplify request signing” section in this blog post to learn how to work with a proxy.
  2. In the Kibana dashboard, choose the Discover tab to perform a query.
  3. You can also visualize the data using the different types of charts offered by Kibana. For example, by going to the Visualize tab, you can quickly create a split bar chart that aggregates by ANOMALY_SCORE per minute.


Conclusion

In this post, you learned how to use Amazon Kinesis to collect, process, and analyze real-time streaming data, and then export the results to Amazon ES for analysis and visualization with Kibana. If you have comments about this post, add them to the “Comments” section below. If you have questions or issues with implementing this solution, please open a new thread on the Amazon Kinesis or Amazon ES discussion forums.


Next Steps

Take your skills to the next level. Learn real-time clickstream anomaly detection with Amazon Kinesis Analytics.

 


About the Author

Tristan Li is a Solutions Architect with Amazon Web Services. He works with enterprise customers in the US, helping them adopt cloud technology to build scalable and secure solutions on AWS.

 

 

 

 

Under the Hood of Server-Side Encryption for Amazon Kinesis Streams

Post Syndicated from Damian Wylie original https://aws.amazon.com/blogs/big-data/under-the-hood-of-server-side-encryption-for-amazon-kinesis-streams/

Customers are using Amazon Kinesis Streams to ingest, process, and deliver data in real time from millions of devices or applications. Use cases for Kinesis Streams vary, but a few common ones include IoT data ingestion and analytics, log processing, clickstream analytics, and enterprise data bus architectures.

Within milliseconds of data arrival, applications (KCL, Apache Spark, AWS Lambda, Amazon Kinesis Analytics) attached to a stream are continuously mining value or delivering data to downstream destinations. Customers are then scaling their streams elastically to match demand. They pay incrementally for the resources that they need, while taking advantage of a fully managed, serverless streaming data service that allows them to focus on adding value closer to their customers.

These benefits are great; however, AWS learned that many customers could not take advantage of Kinesis Streams unless their data-at-rest within a stream was encrypted. Many customers did not want to manage encryption on their own, so they asked for a fully managed, automatic, server-side encryption mechanism leveraging centralized AWS Key Management Service (AWS KMS) customer master keys (CMK).

Motivated by this feedback, AWS added another fully managed, low cost aspect to Kinesis Streams by delivering server-side encryption via KMS managed encryption keys (SSE-KMS) in the following regions:

  • US East (N. Virginia)
  • US West (Oregon)
  • US West (N. California)
  • EU (Ireland)
  • Asia Pacific (Singapore)
  • Asia Pacific (Tokyo)

In this post, I cover the mechanics of the Kinesis Streams server-side encryption feature. I also share a few best practices and considerations so that you can get started quickly.

Understanding the mechanics

The following section walks you through how Kinesis Streams uses CMKs to encrypt a message in the PutRecord or PutRecords path before it is propagated to the Kinesis Streams storage layer, and then decrypt it in the GetRecords path after it has been retrieved from the storage layer.

When server-side encryption is enabled—which takes just a few clicks in the console—the partition key and payload for every incoming record is encrypted automatically as it’s flowing into Kinesis Streams, using the selected CMK. When data is at rest within a stream, it’s encrypted.

When records are retrieved through a GetRecords request from the encrypted stream, they are decrypted automatically as they are flowing out of the service. That means your Kinesis Streams producers and consumers do not need to be aware of encryption. You have a fully managed data encryption feature at your fingertips, which can be enabled within seconds.

AWS also makes it easy to audit the application of server-side encryption. You can use the AWS Management Console for instant stream-level verification; the responses from PutRecord, PutRecords, and GetRecords; or AWS CloudTrail.

Calling PutRecord or PutRecords

When server-side encryption is enabled for a particular stream, Kinesis Streams and KMS perform the following actions when your applications call PutRecord or PutRecords on a stream with server-side encryption enabled. The Amazon Kinesis Producer Library (KPL) uses PutRecords.

 

  1. Data is sent from a customer’s producer (client) to a Kinesis stream using TLS via HTTPS. Data in transit to a stream is encrypted by default.
  2. After data is received, it is momentarily stored in RAM within a front-end proxy layer.
  3. Kinesis Streams authenticates the producer, then impersonates the producer to request input keying material from KMS.
  4. KMS creates key material, encrypts it by using CMK, and sends both the plaintext and encrypted key material to the service, encrypted with TLS.
  5. The client uses the plaintext key material to derive data encryption keys (data keys) that are unique per-record.
  6. The client encrypts the payload and partition key using the data key in RAM within the front-end proxy layer and removes the plaintext data key from memory.
  7. The client appends the encrypted key material to the encrypted data.
  8. The plaintext key material is securely cached in memory within the front-end layer for reuse, until it expires after 5 minutes.
  9. The client delivers the encrypted message to a back-end store where it is stored at rest and is fetchable by an authorized consumer through a GetRecords call. The Amazon Kinesis Client Library (KCL) calls GetRecords to retrieve records from a stream.
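
Because encryption and decryption happen entirely inside the service, producer code does not change. The following boto3 sketch is a minimal illustration, assuming a stream named my-encrypted-stream (a placeholder) that already has server-side encryption enabled; the PutRecord call is the same one you would make against an unencrypted stream.

import boto3

kinesis = boto3.client("kinesis")

# The producer writes to an SSE-enabled stream exactly as it would to any other stream.
response = kinesis.put_record(
    StreamName="my-encrypted-stream",  # placeholder stream name
    Data=b"hello world",
    PartitionKey="test",
)

# When server-side encryption is enabled, the response includes EncryptionType "KMS".
print(response.get("EncryptionType"), response["SequenceNumber"])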

Calling GetRecords

Kinesis Streams and KMS perform the following actions when your applications call GetRecords on a server-side encrypted stream.

 

  1. When a GetRecords call is made, the front-end proxy layer retrieves the encrypted record from its back-end store.
  2. The consumer (client) makes a request to KMS using a token generated by the customer’s request. KMS authorizes it.
  3. The client requests that KMS decrypt the encrypted key material.
  4. KMS decrypts the encrypted key material and sends the plaintext key material to the client.
  5. Kinesis Streams derives the per-record data keys from the decrypted key material.
  6. If the calling application is authorized, the client decrypts the payload and removes the plaintext data key from memory.
  7. The client delivers the payload over TLS and HTTPS to the consumer, requesting the records. Data in transit to a consumer is encrypted by default.

Verifying server-side encryption

Auditors or administrators often ask for proof that server-side encryption was or is enabled. Here are a few ways to do this.

To check if encryption is enabled now for your streams:

  • Use the AWS Management Console or the DescribeStream API operation. You can also see what CMK is being used for encryption.
  • See encryption in action by looking at responses from PutRecord, PutRecords, or GetRecords. When encryption is enabled, the EncryptionType parameter is set to "KMS". If encryption is not enabled, EncryptionType is not included in the response.

Sample PutRecord response

{
    "SequenceNumber": "49573959617140871741560010162505906306417380215064887298",
    "ShardId": "shardId-000000000000",
    "EncryptionType": "KMS"
}

Sample GetRecords response

{
    "Records": [
        {
            "Data": "aGVsbG8gd29ybGQ=", 
            "PartitionKey": "test", 
            "ApproximateArrivalTimestamp": 1498292565.825, 
            "EncryptionType": "KMS", 
            "SequenceNumber": "495735762417140871741560010162505906306417380215064887298"
        }, 
        {
            "Data": "ZnJvZG8gbGl2ZXMK", 
            "PartitionKey": "3d0d9301-3c30-4c48-a9a8-e485b2982b28", 
            "ApproximateArrivalTimestamp": 1498292801.747, 
            "EncryptionType": "KMS", 
            "SequenceNumber": "49573959617140871741560010162507115232237011062036103170"
        }
    ], 
    "NextShardIterator": "AAAAAAAAAAEvFypHZDx/4bJVAS34puwdiNcwssKqbh/XhRK7HSYRq3RS+YXJnVKJ8j0gQUt94bONdqQYHk9X9JHgefMUDKzDzndy5WbZWO4CS3hRdMdrbmJ/9KoR4lOfZvqTLt6JWQjDqXv0IaKs06/LHYcEA3oPcyQLOTJHdJl2EzplCTZnn/U295ovxvqF9g9DY8y2nVoMkdFLmdcEMVXjhCDKiRIt", 
    "MillisBehindLatest": 0
}

To check if encryption was enabled, use CloudTrail, which logs the StartStreamEncryption() and StopStreamEncryption() API calls made against a particular stream.
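
For scripted audits, a minimal boto3 sketch along the following lines (the stream name is a placeholder) surfaces the same information that the console and the DescribeStream API operation expose:

import boto3

kinesis = boto3.client("kinesis")

# DescribeStream reports whether server-side encryption is on and which CMK is in use.
description = kinesis.describe_stream(StreamName="my-encrypted-stream")["StreamDescription"]

print("EncryptionType:", description.get("EncryptionType"))  # "KMS" when encryption is enabled
print("KeyId:", description.get("KeyId"))                    # the CMK in use, when encrypted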

Getting started

It’s very easy to enable, disable, or modify server-side encryption for a particular stream.

  1. In the Kinesis Streams console, select a stream and choose Details.
  2. Select a CMK and select Enabled.
  3. Choose Save.

You can enable encryption only for a live stream, not upon stream creation. Follow the same process to disable encryption for a stream. To use a different CMK, select it and choose Save.

Each of these tasks can also be accomplished using the StartStreamEncryption and StopStreamEncryption API operations.
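
As a sketch of that API route (the stream name is a placeholder, and the key shown is the default AWS managed CMK), the equivalent boto3 calls look like this:

import boto3

kinesis = boto3.client("kinesis")

# Enable server-side encryption on an existing, active stream.
kinesis.start_stream_encryption(
    StreamName="my-stream",       # placeholder
    EncryptionType="KMS",
    KeyId="alias/aws/kinesis",    # default AWS managed CMK; a custom CMK ARN also works
)

# Disable it again with the matching call.
kinesis.stop_stream_encryption(
    StreamName="my-stream",
    EncryptionType="KMS",
    KeyId="alias/aws/kinesis",
)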

Considerations

There are a few considerations you should be aware of when using server-side encryption for Kinesis Streams:

  • Permissions
  • Costs
  • Performance

Permissions

One benefit of using the “(Default) aws/kinesis” AWS managed key is that every producer and consumer with permissions to call PutRecord, PutRecords, or GetRecords inherits the right permissions over the “(Default) aws/kinesis” key automatically.

However, this is not necessarily the case for a custom CMK. Kinesis Streams producers and consumers do not need to be aware of encryption. However, if you enable encryption using a custom master key and a producer or consumer doesn't have IAM permissions to use it, PutRecord, PutRecords, or GetRecords requests fail.

This is a great security feature. On the other hand, it can effectively lead to data loss if you inadvertently apply a custom master key that restricts producers and consumers from interacting with the Kinesis stream. Take precautions when applying a custom master key. For more information about the minimum IAM permissions required for producers and consumers interacting with an encrypted stream, see Using Server-Side Encryption.

Costs

When you apply server-side encryption, you are subject to KMS API usage and key costs. Unlike custom KMS master keys, the “(Default) aws/kinesis” CMK is offered free of charge. However, you still need to pay for the API usage costs that Kinesis Streams incurs on your behalf.

API usage costs apply for every CMK, including custom ones. Kinesis Streams calls KMS approximately every 5 minutes when it is rotating the data key. In a 30-day month, the total cost of KMS API calls initiated by a Kinesis stream should be less than a few dollars.

Performance

During testing, AWS discovered that there was a slight increase (typically 0.2 millisecond or less per record) in put and get record latencies due to the additional overhead of encryption.

If you have questions or suggestions, please comment below.

Visualize and Monitor Amazon EC2 Events with Amazon CloudWatch Events and Amazon Kinesis Firehose

Post Syndicated from Karan Desai original https://aws.amazon.com/blogs/big-data/visualize-and-monitor-amazon-ec2-events-with-amazon-cloudwatch-events-and-amazon-kinesis-firehose/

Monitoring your AWS environment is important for security, performance, and cost control purposes. For example, by monitoring and analyzing API calls made to your Amazon EC2 instances, you can trace security incidents and gain insights into administrative behaviors and access patterns. The kinds of events you might monitor include console logins, Amazon EBS snapshot creation/deletion/modification, VPC creation/deletion/modification, and instance reboots.

In this post, I show you how to build a near real-time API monitoring solution for EC2 events using Amazon CloudWatch Events and Amazon Kinesis Firehose. Please be sure to have Amazon CloudTrail enabled in your account.

  • CloudWatch Events offers a near real-time stream of system events that describe changes in AWS resources. CloudWatch Events now supports Kinesis Firehose as a target.
  • Kinesis Firehose is a fully managed service for continuously capturing, transforming, and delivering data in minutes to storage and analytics destinations such as Amazon S3, Amazon Kinesis Analytics, Amazon Redshift, and Amazon Elasticsearch Service.

Walkthrough

For this walkthrough, you create a CloudWatch event rule that matches specific EC2 events such as:

  • Starting, stopping, and terminating an instance
  • Creating and deleting VPC route tables
  • Creating and deleting a security group
  • Creating, deleting, and modifying instance volumes and snapshots

Your CloudWatch event target is a Kinesis Firehose delivery stream that delivers this data to an Elasticsearch cluster, where you set up Kibana for visualization. Using this solution, you can easily load and visualize EC2 events in minutes without setting up complicated data pipelines.

Set up the Elasticsearch cluster

Create the Amazon ES domain in the Amazon ES console, or by using the create-elasticsearch-domain command in the AWS CLI.

This example uses the following configuration:

  • Domain Name: esLogSearch
  • Elasticsearch Version: 1
  • Instance Count: 2
  • Instance type: elasticsearch
  • Enable dedicated master: true
  • Enable zone awareness: true
  • Restrict Amazon ES to an IP-based access policy

Other settings are left as the defaults.

Create a Kinesis Firehose delivery stream

In the Kinesis Firehose console, create a new delivery stream with Amazon ES as the destination. For detailed steps, see Create a Kinesis Firehose Delivery Stream to Amazon Elasticsearch Service.
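
If you prefer to script this step, a boto3 sketch along the following lines creates a comparable delivery stream; the delivery stream name, role ARN, domain ARN, bucket ARN, and index name are all placeholders you would replace with your own values.

import boto3

firehose = boto3.client("firehose")

# Creates a Firehose delivery stream that indexes records into Amazon ES
# and backs up the data to S3. All names and ARNs below are placeholders.
firehose.create_delivery_stream(
    DeliveryStreamName="ec2-events-to-es",
    ElasticsearchDestinationConfiguration={
        "RoleARN": "arn:aws:iam::123456789012:role/firehose-delivery-role",
        "DomainARN": "arn:aws:es:us-east-1:123456789012:domain/esLogSearch",
        "IndexName": "log",
        "TypeName": "event",
        "S3Configuration": {
            "RoleARN": "arn:aws:iam::123456789012:role/firehose-delivery-role",
            "BucketARN": "arn:aws:s3:::my-firehose-backup-bucket",
        },
    },
)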

Set up CloudWatch Events

Create a rule, and configure the event source and target. You can choose to configure multiple event sources with several AWS resources, along with options to specify specific or multiple event types.

In the CloudWatch console, choose Events.

For Service Name, choose EC2.

In Event Pattern Preview, choose Edit and copy the pattern below. For this walkthrough, I selected events that are specific to the EC2 API, but you can modify it to include events for any of your AWS resources.

 

{
	"source": [
		"aws.ec2"
	],
	"detail-type": [
		"AWS API Call via CloudTrail"
	],
	"detail": {
		"eventSource": [
			"ec2.amazonaws.com"
		],
		"eventName": [
			"RunInstances",
			"StopInstances",
			"StartInstances",
			"CreateFlowLogs",
			"CreateImage",
			"CreateNatGateway",
			"CreateVpc",
			"DeleteKeyPair",
			"DeleteNatGateway",
			"DeleteRoute",
			"DeleteRouteTable",
"CreateSnapshot",
"DeleteSnapshot",
			"DeleteVpc",
			"DeleteVpcEndpoints",
			"DeleteSecurityGroup",
			"ModifyVolume",
			"ModifyVpcEndpoint",
			"TerminateInstances"
		]
	}
}

The following screenshot shows what your event looks like in the console.

Next, choose Add target and select the delivery stream that you just created.
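
The rule and target can also be created programmatically. The following boto3 sketch abbreviates the event pattern shown above and assumes that the Firehose delivery stream and an IAM role allowing CloudWatch Events to write to it already exist; both ARNs are placeholders.

import json
import boto3

events = boto3.client("events")

# Abbreviated version of the event pattern shown above.
pattern = {
    "source": ["aws.ec2"],
    "detail-type": ["AWS API Call via CloudTrail"],
    "detail": {
        "eventSource": ["ec2.amazonaws.com"],
        "eventName": ["RunInstances", "StopInstances", "StartInstances", "TerminateInstances"],
    },
}

events.put_rule(Name="ec2-api-events", EventPattern=json.dumps(pattern))

# Point the rule at the Firehose delivery stream; the role grants CloudWatch Events
# permission to call firehose:PutRecord on that stream.
events.put_targets(
    Rule="ec2-api-events",
    Targets=[{
        "Id": "firehose-target",
        "Arn": "arn:aws:firehose:us-east-1:123456789012:deliverystream/my-ec2-events-stream",
        "RoleArn": "arn:aws:iam::123456789012:role/cwe-to-firehose-role",
    }],
)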

Set up Kibana on the Elasticsearch cluster

Amazon ES provides a default installation of Kibana with every Amazon ES domain. You can find the Kibana endpoint on your domain dashboard in the Amazon ES console. You can restrict Amazon ES access to an IP-based access policy.

In the Kibana console, for Index name or pattern, type log. This is the name of the Elasticsearch index.

For Time-field name, choose @time.

To view the events, choose Discover.

The following chart demonstrates the API operations and the number of times that they have been triggered in the past 12 hours.

Summary

In this post, you created a continuous, near real-time solution to monitor various EC2 events such as starting and shutting down instances, creating VPCs, etc. Likewise, you can build a continuous monitoring solution for all the API operations that are relevant to your daily AWS operations and resources.

With Kinesis Firehose as a new target for CloudWatch Events, you can retrieve, transform, and load system events to the storage and analytics destination of your choice in minutes, without setting up complicated data pipelines.

If you have any questions or suggestions, please comment below.


Additional Reading

Learn how to build a serverless architecture to analyze Amazon CloudFront access logs using AWS Lambda, Amazon Athena, and Amazon Kinesis Analytics

 

 

 

Build a Serverless Architecture to Analyze Amazon CloudFront Access Logs Using AWS Lambda, Amazon Athena, and Amazon Kinesis Analytics

Post Syndicated from Rajeev Srinivasan original https://aws.amazon.com/blogs/big-data/build-a-serverless-architecture-to-analyze-amazon-cloudfront-access-logs-using-aws-lambda-amazon-athena-and-amazon-kinesis-analytics/

Nowadays, it’s common for a web server to be fronted by a global content delivery service, like Amazon CloudFront. This type of front end accelerates delivery of websites, APIs, media content, and other web assets to provide a better experience to users across the globe.

The insights gained by analyzing Amazon CloudFront access logs help improve website availability through bot detection and mitigation, optimize web content based on the devices and browsers used to view your webpages, reduce perceived latency by caching popular objects closer to their viewers, and so on. The result is a significant improvement in the overall perceived experience for the user.

This blog post provides a way to build a serverless architecture to generate some of these insights. To do so, we analyze Amazon CloudFront access logs both at rest and in transit through the stream. This serverless architecture uses Amazon Athena to analyze large volumes of CloudFront access logs (on the scale of terabytes per day), and Amazon Kinesis Analytics for streaming analysis.

The analytic queries in this blog post focus on three common use cases:

  1. Detection of common bots using the user agent string
  2. Calculation of current bandwidth usage per Amazon CloudFront distribution per edge location
  3. Determination of the current top 50 viewers

However, you can easily extend the architecture described here to power dashboards for monitoring and reporting, and to trigger alarms based on deeper insights gained by processing and analyzing the logs. Some examples are dashboards for cache performance, usage and viewer patterns, and so on.

Following we show a diagram of this architecture.

Prerequisites

Before you set up this architecture, install the AWS Command Line Interface (AWS CLI) tool on your local machine, if you don’t have it already.

Setup summary

The following steps are involved in setting up the serverless architecture on the AWS platform:

  1. Create an Amazon S3 bucket for your Amazon CloudFront access logs to be delivered to and stored in.
  2. Create a second Amazon S3 bucket to receive processed logs and store the partitioned data for interactive analysis.
  3. Create an Amazon Kinesis Firehose delivery stream to batch, compress, and deliver the preprocessed logs for analysis.
  4. Create an AWS Lambda function to preprocess the logs for analysis.
  5. Configure Amazon S3 event notification on the CloudFront access logs bucket, which contains the raw logs, to trigger the Lambda preprocessing function.
  6. Create an Amazon DynamoDB table to look up partition details, such as partition specification and partition location.
  7. Create an Amazon Athena table for interactive analysis.
  8. Create a second AWS Lambda function to add new partitions to the Athena table based on the log delivered to the processed logs bucket.
  9. Configure Amazon S3 event notification on the processed logs bucket to trigger the Lambda partitioning function.
  10. Configure Amazon Kinesis Analytics application for analysis of the logs directly from the stream.

ETL and preprocessing

In this section, we parse the CloudFront access logs as they are delivered, which occurs multiple times in an hour. We filter out commented records and use the user agent string to decipher the browser name, the name of the operating system, and whether the request has been made by a bot. For more details on how to decipher the preceding information based on the user agent string, see user-agents 1.1.0 in the Python documentation.

We use the Lambda preprocessing function to perform these tasks on individual rows of the access log. On successful completion, the rows are pushed to an Amazon Kinesis Firehose delivery stream to be persistently stored in an Amazon S3 bucket, the processed logs bucket.
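
The complete preprocessing function is provided as a deployment package later in this section. Purely to illustrate the idea, a trimmed-down Python sketch might look like the following; the user-agent column position, the delivery stream name, and the handling of gzip-compressed log files are simplifying assumptions rather than the exact layout the packaged function uses.

import gzip
import io
import os
import boto3
from user_agents import parse  # third-party package bundled with the deployment package

s3 = boto3.client("s3")
firehose = boto3.client("firehose")
STREAM = os.environ.get("KINESIS_FIREHOSE_STREAM", "CloudFrontLogsToS3")

def lambda_handler(event, context):
    # Triggered by an S3 Put event for a newly delivered CloudFront access log.
    bucket = event["Records"][0]["s3"]["bucket"]["name"]
    key = event["Records"][0]["s3"]["object"]["key"]
    raw = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
    with gzip.GzipFile(fileobj=io.BytesIO(raw)) as gz:
        body = gz.read().decode("utf-8")

    records = []
    for line in body.splitlines():
        if line.startswith("#"):   # skip the commented header records
            continue
        fields = line.split("\t")
        ua = parse(fields[10])     # user-agent column position is an assumption
        enriched = fields + [ua.browser.family, ua.os.family, str(ua.is_bot)]
        records.append({"Data": ("\t".join(enriched) + "\n").encode("utf-8")})

    # PutRecordBatch accepts up to 500 records per call; chunking is omitted for brevity.
    if records:
        firehose.put_record_batch(DeliveryStreamName=STREAM, Records=records)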

To create a Firehose delivery stream with a new or existing S3 bucket as the destination, follow the steps described in Create a Firehose Delivery Stream to Amazon S3 in the S3 documentation. Keep most of the default settings, but select an AWS Identity and Access Management (IAM) role that has write access to your S3 bucket and specify GZIP compression. Name the delivery stream CloudFrontLogsToS3.

Another prerequisite for this setup is to create an IAM role that provides the necessary permissions for our AWS Lambda function to get the data from S3, process it, and deliver it to the CloudFrontLogsToS3 delivery stream.

Let's use the AWS CLI to create the IAM role using the following steps:

  1. Create the IAM policy (lambda-exec-policy) for the Lambda execution role to use.
  2. Create the Lambda execution role (lambda-cflogs-exec-role) and assign the service to use this role.
  3. Attach the policy created in step 1 to the Lambda execution role.

To download the policy document to your local machine, type the following command.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/preprocessiong-lambda/lambda-exec-policy.json  <path_on_your_local_machine>

To download the assume policy document to your local machine, type the following command.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/preprocessiong-lambda/assume-lambda-policy.json  <path_on_your_local_machine>

Following is the lambda-exec-policy.json file, which is the IAM policy used by the Lambda execution role.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "CloudWatchAccess",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        },
        {
            "Sid": "S3Access",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject"
            ],
            "Resource": [
                "arn:aws:s3:::*"
            ]
        },
        {
            "Sid": "FirehoseAccess",
            "Effect": "Allow",
            "Action": [
                "firehose:ListDeliveryStreams",
                "firehose:PutRecord",
                "firehose:PutRecordBatch"
            ],
            "Resource": [
                "arn:aws:firehose:*:*:deliverystream/CloudFrontLogsToS3"
            ]
        }
    ]
}

To create the IAM policy used by the Lambda execution role, type the following command.

aws iam create-policy --policy-name lambda-exec-policy --policy-document file://<path>/lambda-exec-policy.json

To create the AWS Lambda execution role and assign the service to use this role, type the following command.

aws iam create-role --role-name lambda-cflogs-exec-role --assume-role-policy-document file://<path>/assume-lambda-policy.json

Following is the assume-lambda-policy.json file, to grant Lambda permission to assume a role.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

To attach the policy (lambda-exec-policy) created to the AWS Lambda execution role (lambda-cflogs-exec-role), type the following command.

aws iam attach-role-policy --role-name lambda-cflogs-exec-role --policy-arn arn:aws:iam::<your-account-id>:policy/lambda-exec-policy

Now that we have created the CloudFrontLogsToS3 Firehose delivery stream and the lambda-cflogs-exec-role IAM role for Lambda, the next step is to create a Lambda preprocessing function.

This Lambda preprocessing function parses the CloudFront access logs delivered into the S3 bucket and performs a few transformation and mapping operations on the data. The Lambda function adds descriptive information, such as the browser and the operating system that were used to make this request based on the user agent string found in the logs. The Lambda function also adds information about the web distribution to support scenarios where CloudFront access logs are delivered to a centralized S3 bucket from multiple distributions. With the solution in this blog post, you can get insights across distributions and their edge locations.

Use the Lambda Management Console to create a new Lambda function with a Python 2.7 runtime and the s3-get-object-python blueprint. Open the console, and on the Configure triggers page, choose the name of the S3 bucket where the CloudFront access logs are delivered. Choose Put for Event type. For Prefix, type the name of the prefix, if any, for the folder where CloudFront access logs are delivered, for example cloudfront-logs/. To invoke Lambda to retrieve the logs from the S3 bucket as they are delivered, select Enable trigger.

Choose Next and provide a function name to identify this Lambda preprocessing function.

For Code entry type, choose Upload a file from Amazon S3. For S3 link URL, type https.amazonaws.com//preprocessing-lambda/pre-data.zip. In the same section, also create an environment variable with the key KINESIS_FIREHOSE_STREAM and, as its value, the name of the Firehose delivery stream, CloudFrontLogsToS3.

Choose lambda-cflogs-exec-role as the IAM role for the Lambda function, and type prep-data.lambda_handler for the value for Handler.

Choose Next, and then choose Create Lambda.

Table creation in Amazon Athena

In this step, we will build the Athena table. Use the Athena console in the same region and create the table using the query editor.

CREATE EXTERNAL TABLE IF NOT EXISTS cf_logs (
  logdate date,
  logtime string,
  location string,
  bytes bigint,
  requestip string,
  method string,
  host string,
  uri string,
  status bigint,
  referrer string,
  useragent string,
  uriquery string,
  cookie string,
  resulttype string,
  requestid string,
  header string,
  csprotocol string,
  csbytes string,
  timetaken bigint,
  forwardedfor string,
  sslprotocol string,
  sslcipher string,
  responseresulttype string,
  protocolversion string,
  browserfamily string,
  osfamily string,
  isbot string,
  filename string,
  distribution string
)
PARTITIONED BY(year string, month string, day string, hour string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LOCATION 's3://<pre-processing-log-bucket>/prefix/';

Creation of the Athena partition

A popular website with millions of requests each day routed using Amazon CloudFront can generate a large volume of logs, on the order of a few terabytes a day. We strongly recommend that you partition your data to effectively restrict the amount of data scanned by each query. Partitioning significantly improves query performance and substantially reduces cost. The Lambda partitioning function adds the partition information to the Athena table for the data delivered to the preprocessed logs bucket.

Before delivering the preprocessed Amazon CloudFront logs file into the preprocessed logs bucket, Amazon Kinesis Firehose adds a UTC time prefix in the format YYYY/MM/DD/HH. This approach supports multilevel partitioning of the data by year, month, date, and hour. You can invoke the Lambda partitioning function every time a new processed Amazon CloudFront log is delivered to the preprocessed logs bucket. To do so, configure the Lambda partitioning function to be triggered by an S3 Put event.

For a website with millions of requests, a large number of preprocessed logs can be delivered multiple times in an hour—for example, at the interval of one each second. To avoid querying the Athena table for partition information every time a preprocessed log file is delivered, you can create an Amazon DynamoDB table for fast lookup.

Based on the year, month, date, and hour in the prefix of the delivered log, the Lambda partitioning function checks if the partition specification exists in the Amazon DynamoDB table. If it doesn't, it's added to the table using an atomic operation, and then the Athena table is updated.
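
The partitioning function used in this post is a Java Lambda function that you deploy below and that talks to Athena through the JDBC driver. Purely to illustrate the lookup-then-alter logic described above, a minimal Python sketch of the same idea (using the Athena API directly, with placeholder bucket and output-location names) could look like this:

import boto3
from botocore.exceptions import ClientError

dynamodb = boto3.client("dynamodb")
athena = boto3.client("athena")

def add_partition_if_new(year, month, day, hour, bucket, prefix):
    spec = 'year="{0}"; month="{1}"; day="{2}"; hour="{3}"'.format(year, month, day, hour)
    try:
        # Atomic insert: succeeds only if this partition has not been recorded yet.
        dynamodb.put_item(
            TableName="athenapartitiondetails",
            Item={"PartitionSpec": {"S": spec}},
            ConditionExpression="attribute_not_exists(PartitionSpec)",
        )
    except ClientError as error:
        if error.response["Error"]["Code"] == "ConditionalCheckFailedException":
            return  # partition already registered, nothing to do
        raise

    # Register the new partition with the Athena table created earlier.
    location = "s3://{0}/{1}/{2}/{3}/{4}/{5}/".format(bucket, prefix, year, month, day, hour)
    query = (
        "ALTER TABLE cf_logs ADD IF NOT EXISTS "
        "PARTITION (year='{0}', month='{1}', day='{2}', hour='{3}') "
        "LOCATION '{4}'"
    ).format(year, month, day, hour, location)
    athena.start_query_execution(
        QueryString=query,
        ResultConfiguration={"OutputLocation": "s3://my-athena-query-results/"},  # placeholder
    )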

Type the following command to create the Amazon DynamoDB table.

aws dynamodb create-table --table-name athenapartitiondetails \
--attribute-definitions AttributeName=PartitionSpec,AttributeType=S \
--key-schema AttributeName=PartitionSpec,KeyType=HASH \
--provisioned-throughput ReadCapacityUnits=100,WriteCapacityUnits=100

Here the following is true:

  • PartitionSpec is the hash key and is a representation of the partition signature—for example, year=”2017”; month=”05”; day=”15”; hour=”10”.
  • Depending on the rate at which the processed log files are delivered to the processed log bucket, you might have to increase the ReadCapacityUnits and WriteCapacityUnits values, if these are throttled.

The other attributes besides PartitionSpec are the following:

  • PartitionPath – The S3 path associated with the partition.
  • PartitionType – The type of partition used (Hour, Month, Date, Year, or ALL). In this case, ALL is used.

The next step is to create the IAM role that provides permissions for the Lambda partitioning function. The role requires permissions to do the following:

  1. Look up and write partition information to DynamoDB.
  2. Alter the Athena table with new partition information.
  3. Perform Amazon CloudWatch logs operations.
  4. Perform Amazon S3 operations.

To download the policy document to your local machine, type the following command.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/partitioning-lambda/lambda-partition-function-execution-policy.json  <path_on_your_local_machine>

To download the assume policy document to your local machine, type the following command.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/partitioning-lambda/assume-lambda-policy.json <path_on_your_local_machine>


Let’s use the AWS CLI to create the IAM role using the following three steps:

  1. Create the IAM policy (lambda-partition-exec-policy) used by the Lambda execution role.
  2. Create the Lambda execution role (lambda-partition-execution-role) and assign the service to use this role.
  3. Attach the policy created in step 1 to the Lambda execution role.

To create the IAM policy used by the Lambda execution role, type the following command.

aws iam create-policy --policy-name lambda-partition-exec-policy --policy-document file://<path>/lambda-partition-function-execution-policy.json

To create the Lambda execution role and assign the service to use this role, type the following command.

aws iam create-role --role-name lambda-partition-execution-role --assume-role-policy-document file://<path>/assume-lambda-policy.json

To attach the policy (lambda-partition-exec-policy) created to the AWS Lambda execution role (lambda-partition-execution-role), type the following command.

aws iam attach-role-policy --role-name lambda-partition-execution-role --policy-arn arn:aws:iam::<your-account-id>:policy/lambda-partition-exec-policy

Following is the lambda-partition-function-execution-policy.json file, which is the IAM policy used by the Lambda execution role.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "DDBTableAccess",
            "Effect": "Allow",
            "Action": "dynamodb:PutItem",
            "Resource": "arn:aws:dynamodb:*:*:table/athenapartitiondetails"
        },
        {
            "Sid": "S3Access",
            "Effect": "Allow",
            "Action": [
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:ListMultipartUploadParts",
                "s3:AbortMultipartUpload",
                "s3:PutObject"
            ],
            "Resource": "arn:aws:s3:::*"
        },
        {
            "Sid": "AthenaAccess",
            "Effect": "Allow",
            "Action": [ "athena:*" ],
            "Resource": [ "*" ]
        },
        {
            "Sid": "CloudWatchLogsAccess",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        }
    ]
}

Download the .jar file containing the Java deployment package to your local machine.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/partitioning-lambda/aws-lambda-athena-1.0.0.jar <path_on_your_local_machine>

From the AWS Management Console, create a new Lambda function with Java8 as the runtime. Select the Blank Function blueprint.

On the Configure triggers page, choose the name of the S3 bucket where the preprocessed logs are delivered. Choose Put for the Event Type. For Prefix, type the name of the prefix folder, if any, where preprocessed logs are delivered by Firehose, for example, out/. For Suffix, type the file name suffix of the compression format in which the Firehose stream (CloudFrontLogsToS3) delivers the preprocessed logs, for example, gz. To invoke Lambda to retrieve the logs from the S3 bucket as they are delivered, select Enable Trigger.

Choose Next and provide a function name to identify this Lambda partitioning function.

Choose Java8 for Runtime for the AWS Lambda function. Choose Upload a .ZIP or .JAR file for the Code entry type, and choose Upload to upload the downloaded aws-lambda-athena-1.0.0.jar file.

Next, create the following environment variables for the Lambda function:

  • TABLE_NAME – The name of the Athena table (for example, cf_logs).
  • PARTITION_TYPE – The partition to be created in the Athena table for the logs delivered to the subfolders in the S3 bucket: Year, Month, Date, Hour, or ALL. Set this to ALL to partition by year, month, date, and hour.
  • DDB_TABLE_NAME – The name of the DynamoDB table holding partition information (for example, athenapartitiondetails).
  • ATHENA_REGION – The current AWS Region for the Athena table to construct the JDBC connection string.
  • S3_STAGING_DIR – The Amazon S3 location where your query output is written. The JDBC driver asks Athena to read the results and provide rows of data back to the user (for example, s3://<bucketname>/<folder>/).

To configure the function handler and IAM, for Handler copy and paste the name of the handler: com.amazonaws.services.lambda.CreateAthenaPartitionsBasedOnS3EventWithDDB::handleRequest. Choose the existing IAM role, lambda-partition-execution-role.

Choose Next and then Create Lambda.

Interactive analysis using Amazon Athena

In this section, we analyze the historical data that has been collected since we added the partitions to the Amazon Athena table for the data delivered to the preprocessed logs bucket.

Scenario 1 is robot traffic by edge location.

SELECT COUNT(*) AS ct, requestip, location FROM cf_logs
WHERE isbot='True'
GROUP BY requestip, location
ORDER BY ct DESC;

Scenario 2 is total bytes transferred per distribution for each edge location for your website.

SELECT distribution, location, SUM(bytes) as totalBytes
FROM cf_logs
GROUP BY location, distribution;

Scenario 3 is the top 50 viewers of your website.

SELECT requestip, COUNT(*) AS ct FROM cf_logs
GROUP BY requestip
ORDER BY ct DESC
LIMIT 50;

Streaming analysis using Amazon Kinesis Analytics

In this section, you deploy a stream processing application using Amazon Kinesis Analytics to analyze the preprocessed Amazon CloudFront log streams. This application analyzes directly from the Amazon Kinesis Stream as it is delivered to the preprocessing logs bucket. The stream queries in this section are focused on gaining the following insights:

  • The IP address of the bot, identified by its Amazon CloudFront edge location, that is currently sending requests to your website. The query also includes the total bytes transferred as part of the response.
  • The total bytes served per distribution per edge location for your website.
  • The top 10 viewers of your website.

To download the firehose-access-policy.json file, type the following.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/kinesisanalytics/firehose-access-policy.json  <path_on_your_local_machine>

To download the assume-kinesisanalytics-policy.json file, type the following.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/kinesisanalytics/assume-kinesisanalytics-policy.json <path_on_your_local_machine>

Before we create the Amazon Kinesis Analytics application, we need to create the IAM role that gives the analytics application permission to access the Amazon Kinesis Firehose delivery stream.

Let’s use the AWS CLI to create the IAM role using the following three steps:

  1. Create the IAM policy (firehose-access-policy) for the Analytics execution role to use.
  2. Create the Analytics execution role (ka-execution-role) and assign the service to use this role.
  3. Attach the policy created in step 1 to the Analytics execution role.

Following is the firehose-access-policy.json file, which is the IAM policy that Kinesis Analytics uses to read the Firehose delivery stream.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AmazonFirehoseAccess",
            "Effect": "Allow",
            "Action": [
                "firehose:DescribeDeliveryStream",
                "firehose:Get*"
            ],
            "Resource": [
                "arn:aws:firehose:*:*:deliverystream/CloudFrontLogsToS3"
            ]
        }
    ]
}

Following is the assume-kinesisanalytics-policy.json file, to grant Amazon Kinesis Analytics permissions to assume a role.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "kinesisanalytics.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

To create the IAM policy used by the Analytics access role, type the following command.

aws iam create-policy --policy-name firehose-access-policy --policy-document file://<path>/firehose-access-policy.json

To create the Analytics execution role and assign the service to use this role, type the following command.

aws iam create-role --role-name ka-execution-role --assume-role-policy-document file://<path>/assume-kinesisanalytics-policy.json

To attach the policy (firehose-access-policy) that you created to the Analytics execution role (ka-execution-role), type the following command.

aws iam attach-role-policy --role-name ka-execution-role --policy-arn arn:aws:iam::<your-account-id>:policy/firehose-access-policy

To deploy the Analytics application, first download the configuration file and then modify ResourceARN and RoleARN for the Amazon Kinesis Firehose input configuration.

"KinesisFirehoseInput": { 
    "ResourceARN": "arn:aws:firehose:<region>:<account-id>:deliverystream/CloudFrontLogsToS3", 
    "RoleARN": "arn:aws:iam:<account-id>:role/ka-execution-role"
}

To download the Analytics application configuration file, type the following command.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis//kinesisanalytics/kinesis-analytics-app-configuration.json <path_on_your_local_machine>

To deploy the application, type the following command.

aws kinesisanalytics create-application --application-name "cf-log-analysis" --cli-input-json file://<path>/kinesis-analytics-app-configuration.json

To start the application, type the following command.

aws kinesisanalytics start-application --application-name "cf-log-analysis" --input-configuration Id="1.1",InputStartingPositionConfiguration={InputStartingPosition="NOW"}

SQL queries using Amazon Kinesis Analytics

Scenario 1 is a query for detecting bots that are sending requests to your website.

-- Create output stream, which can be used to send to a destination
CREATE OR REPLACE STREAM "BOT_DETECTION" (requesttime TIME, destribution VARCHAR(16), requestip VARCHAR(64), edgelocation VARCHAR(64), totalBytes BIGINT);
-- Create pump to insert into output 
CREATE OR REPLACE PUMP "BOT_DETECTION_PUMP" AS INSERT INTO "BOT_DETECTION"
--
SELECT STREAM 
    STEP("CF_LOG_STREAM_001"."request_time" BY INTERVAL '1' SECOND) as requesttime,
    "distribution_name" as distribution,
    "request_ip" as requestip, 
    "edge_location" as edgelocation, 
    SUM("bytes") as totalBytes
FROM "CF_LOG_STREAM_001"
WHERE "is_bot" = true
GROUP BY "request_ip", "edge_location", "distribution_name",
STEP("CF_LOG_STREAM_001"."request_time" BY INTERVAL '1' SECOND),
STEP("CF_LOG_STREAM_001".ROWTIME BY INTERVAL '1' SECOND);

Scenario 2 is a query for total bytes transferred per distribution for each edge location for your website.

-- Create output stream, which can be used to send to a destination
CREATE OR REPLACE STREAM "BYTES_TRANSFFERED" (requesttime TIME, destribution VARCHAR(16), edgelocation VARCHAR(64), totalBytes BIGINT);
-- Create pump to insert into output 
CREATE OR REPLACE PUMP "BYTES_TRANSFFERED_PUMP" AS INSERT INTO "BYTES_TRANSFFERED"
-- Bytes Transffered per second per web destribution by edge location
SELECT STREAM 
    STEP("CF_LOG_STREAM_001"."request_time" BY INTERVAL '1' SECOND) as requesttime,
    "distribution_name" as distribution,
    "edge_location" as edgelocation, 
    SUM("bytes") as totalBytes
FROM "CF_LOG_STREAM_001"
GROUP BY "distribution_name", "edge_location", "request_date",
STEP("CF_LOG_STREAM_001"."request_time" BY INTERVAL '1' SECOND),
STEP("CF_LOG_STREAM_001".ROWTIME BY INTERVAL '1' SECOND);

Scenario 3 is a query for the top 50 viewers for your website.

-- Create output stream, which can be used to send to a destination
CREATE OR REPLACE STREAM "TOP_TALKERS" (requestip VARCHAR(64), requestcount DOUBLE);
-- Create pump to insert into output 
CREATE OR REPLACE PUMP "TOP_TALKERS_PUMP" AS INSERT INTO "TOP_TALKERS"
-- Top 50 talkers
SELECT STREAM ITEM as requestip, ITEM_COUNT as requestcount FROM TABLE(TOP_K_ITEMS_TUMBLING(
  CURSOR(SELECT STREAM * FROM "CF_LOG_STREAM_001"),
  'request_ip', -- name of column in single quotes
  50, -- number of top items
  60 -- tumbling window size in seconds
  )
);

Conclusion

Following the steps in this blog post, you just built an end-to-end serverless architecture to analyze Amazon CloudFront access logs. You analyzed these both in interactive and streaming mode, using Amazon Athena and Amazon Kinesis Analytics respectively.

By creating a partition in Athena for the logs delivered to a centralized bucket, this architecture is optimized for performance and cost when analyzing large volumes of logs for popular websites that receive millions of requests. Here, we have focused on just three common use cases for analysis, sharing the analytic queries as part of the post. However, you can extend this architecture to gain deeper insights and generate usage reports to reduce latency and increase availability. This way, you can provide a better experience on your websites fronted with Amazon CloudFront.

In this blog post, we focused on building serverless architecture to analyze Amazon CloudFront access logs. Our plan is to extend the solution to provide rich visualization as part of our next blog post.


About the Authors

Rajeev Srinivasan is a Senior Solution Architect for AWS. He works very closely with our customers to provide big data and NoSQL solutions leveraging the AWS platform, and he enjoys coding. In his spare time, he enjoys riding his motorcycle and reading books.

 

Sai Sriparasa is a consultant with AWS Professional Services. He works with our customers to provide strategic and tactical big data solutions with an emphasis on automation, operations & security on AWS. In his spare time, he follows sports and current affairs.

 

 


Related

Analyzing VPC Flow Logs with Amazon Kinesis Firehose, Amazon Athena, and Amazon QuickSight

Build a Visualization and Monitoring Dashboard for IoT Data with Amazon Kinesis Analytics and Amazon QuickSight

Post Syndicated from Karan Desai original https://aws.amazon.com/blogs/big-data/build-a-visualization-and-monitoring-dashboard-for-iot-data-with-amazon-kinesis-analytics-and-amazon-quicksight/

Customers across the world are increasingly building innovative Internet of Things (IoT) workloads on AWS. With AWS, they can handle the constant stream of data coming from millions of new, internet-connected devices. This data can be a valuable source of information if it can be processed, analyzed, and visualized quickly in a scalable, cost-efficient manner. Engineers and developers can monitor performance and troubleshoot issues, while sales and marketing teams can track usage patterns and statistics to inform business decisions.

In this post, I demonstrate a sample solution to build a quick and easy monitoring and visualization dashboard for your IoT data using AWS serverless and managed services. There’s no need for purchasing any additional software or hardware. If you are already using AWS IoT, you can build this dashboard to tap into your existing device data. If you are new to AWS IoT, you can be up and running in minutes using sample data. Later, you can customize it to your needs, as your business grows to millions of devices and messages.

Architecture

The following is a high-level architecture diagram showing the serverless setup to configure.

 

AWS service overview

AWS IoT is a managed cloud platform that lets connected devices interact easily and securely with cloud applications and other devices. AWS IoT can process and route billions of messages to AWS endpoints and to other devices reliably and securely.

Amazon Kinesis Firehose is the easiest way to capture, transform, and load streaming data continuously into AWS from thousands of data sources, such as IoT devices. It is a fully managed service that automatically scales to match the throughput of your data and requires no ongoing administration.

Amazon Kinesis Analytics allows you to process streaming data coming from IoT devices in real time with standard SQL, without having to learn new programming languages or processing frameworks, providing actionable insights promptly.

The processed data is fed into Amazon QuickSight, which is a fast, cloud-powered business analytics service that makes it easy to build visualizations, perform ad-hoc analysis, and quickly get business insights from the data.

The most popular way for Internet-connected devices to send data is using MQTT messages. The AWS IoT gateway receives these messages from registered IoT devices. The solution in this post uses device data from AWS Simple Beer Service (SBS), a series of internet-connected kegerators sending sensor outputs such as temperature, humidity, and sound levels in a JSON payload. You can use any existing IoT data source that you may have.

The AWS IoT rules engine allows selecting data from message payloads, processing it, and sending it to other services. You forward the data to a Firehose delivery stream to consolidate the continuous data stream into batches for further processing. The batched data is also stored temporarily in an Amazon S3 bucket for later retrieval and can be set for deletion after a specified time using S3 Lifecycle Management rules.

The incoming data from the Firehose delivery stream is fed into an Analytics application that provides an easy way to process the data in real time using standard SQL queries. Analytics allows writing standard SQL queries to extract specific components from the incoming data stream and perform real-time ETL on it. In this post, you use this feature to aggregate minimum and maximum temperature values from the sensors per minute. You load it in Amazon QuickSight to create a monitoring dashboard and check if the devices are over-heating or cooling down during use. You also extract every device’s location, parameters such as temperature, sound levels, humidity, and the time stamp in Analytics to use on the visualization dashboard.

The processed data from the two queries is fed into two Firehose delivery streams, both of which batch the data into CSV files every minute and store it in S3. The batching time interval is configurable between 1 and 15 minutes in 1-second intervals.

Finally, you use Amazon QuickSight to ingest the processed CSV files from S3 as a data source to build visualizations. Amazon QuickSight’s super-fast, parallel, in-memory, calculation engine (SPICE) parses the ingested data and allows you to create a variety of visualizations with different graph types. You can also use the Amazon QuickSight built-in Story feature to combine visualizations into business dashboards that can be shared in a secure manner.

Implementation

AWS IoT, Amazon Kinesis, and Amazon QuickSight are all fully managed services, which means you can complete the entire setup in just a few steps using the AWS Management Console. Don’t worry about setting up any underlying hardware or installing any additional software. So, get started.

Step 1. Set up your AWS IoT data source

Do you currently use AWS IoT? If you have an existing IoT thing set up and running on AWS IoT, you can skip to Step 2.

If you have an AWS IoT button or other IoT devices that can publish MQTT messages and would like to use that for the setup, follow the Getting Started with AWS IoT topic to connect your thing to AWS IoT. Continue to Step 2.

If you do not have an existing IoT device, you can generate simulated device data using a script on your local machine and have it publish to AWS IoT. The following script lets you set up your AWS IoT environment and publish simulated data that mimics device data from Simple Beer Service.

Generate sample Data

Running the sbs.py Python script generates fictitious AWS IoT messages from multiple SBS devices. The IoT rule sends the message to Firehose for further processing.

The script requires access to AWS CLI credentials and boto3 installation on the machine running the script. Download and run the following Python script:

https://github.com/awslabs/sbs-iot-data-generator/blob/master/sbs.py

The script generates random data that looks like the following:

{"deviceParameter": "Temperature", "deviceValue": 33, "deviceId": "SBS01", "dateTime": "2017-02-03 11:29:37"}
{"deviceParameter": "Sound", "deviceValue": 140, "deviceId": "SBS03", "dateTime": "2017-02-03 11:29:38"}
{"deviceParameter": "Humidity", "deviceValue": 63, "deviceId": "SBS01", "dateTime": "2017-02-03 11:29:39"}
{"deviceParameter": "Flow", "deviceValue": 80, "deviceId": "SBS04", "dateTime": "2017-02-03 11:29:41"}

Run the script and keep it running for the duration of the project to generate sufficient data.

Tip: If you encounter any issues running the script from your local machine, launch an EC2 instance and run the script there as a root user. Remember to assign an appropriate IAM role to your instance at the time of launch that allows it to access AWS IoT.

Step 2. Create three Firehose delivery streams

For this post, you require three Firehose delivery streams:  one to batch raw data from AWS IoT, and two to batch output device data and aggregated data from Analytics.

  1. In the console, choose Firehose.
  2. Create all three Firehose delivery streams using the following field values.

Delivery stream 1:

  • Name: IoT-Source-Stream
  • S3 bucket: <your unique name>-kinesis
  • S3 prefix: source/

Delivery stream 2:

  • Name: IoT-Destination-Data-Stream
  • S3 bucket: <your unique name>-kinesis
  • S3 prefix: data/

Delivery stream 3:

  • Name: IoT-Destination-Aggregate-Stream
  • S3 bucket: <your unique name>-kinesis
  • S3 prefix: aggregate/
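
If you prefer to script this step, a boto3 sketch along these lines creates the first stream; the role ARN and bucket ARN are placeholders, and you would repeat the call with the other two names and prefixes.

import boto3

firehose = boto3.client("firehose")

# Creates the raw-data delivery stream; repeat for the data/ and aggregate/ streams.
firehose.create_delivery_stream(
    DeliveryStreamName="IoT-Source-Stream",
    S3DestinationConfiguration={
        "RoleARN": "arn:aws:iam::123456789012:role/firehose-delivery-role",  # placeholder
        "BucketARN": "arn:aws:s3:::your-unique-name-kinesis",                # placeholder
        "Prefix": "source/",
    },
)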

Step 3. Set up AWS IoT to receive and forward incoming data

  1. In the console, choose IoT.
  2. Create a new AWS IoT rule with the following field values.
  • Name: IoT_to_Firehose
  • Attribute: *
  • Topic Filter: /sbs/devicedata/#
  • Add Action: Send messages to an Amazon Kinesis Firehose stream (select IoT-Source-Stream from the dropdown)
  • Select Separator: "\n (newline)"
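
The same rule can also be created through the AWS IoT API. The following boto3 sketch mirrors the console settings above; the role ARN, which must allow AWS IoT to write to the delivery stream, is a placeholder.

import boto3

iot = boto3.client("iot")

# Forwards every message published under /sbs/devicedata/ to the
# IoT-Source-Stream delivery stream, one JSON document per line.
iot.create_topic_rule(
    ruleName="IoT_to_Firehose",
    topicRulePayload={
        "sql": "SELECT * FROM '/sbs/devicedata/#'",
        "actions": [{
            "firehose": {
                "deliveryStreamName": "IoT-Source-Stream",
                "roleArn": "arn:aws:iam::123456789012:role/iot-to-firehose-role",  # placeholder
                "separator": "\n",
            }
        }],
    },
)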

A quick check before proceeding further: make sure that you have run the script to generate simulated IoT data or that your IoT Thing is running and delivering data. If not, set it up now. The Amazon Kinesis Analytics application you set up in the next step needs the data to process it further.

Step 4: Create an Analytics application to process data

  1. In the console, choose Kinesis.
  2. Create a new application.
  3. Enter a name of your choice, for example, SBS-IoT-Data.
  4. For the source, choose IoT-Source-Stream.

Analytics auto-discovers the schema on the data by sampling records from the input stream. It also includes an in-built SQL editor that allows you to write standard SQL queries to transform incoming data.

Tip: If Analytics is unable to discover your incoming data, it may be missing the appropriate IAM permissions. In the IAM console, select the role that you assigned to your IoT rule in Step 3. Make sure that it has the ARN of the IoT-Source-Stream Firehose delivery stream listed in the firehose:PutRecord section.

Here is a sample SQL query that generates two output streams:

  • DESTINATION_SQL_BASIC_STREAM contains the device ID, device parameter, its value, and the time stamp from the incoming stream.
  • DESTINATION_SQL_AGGREGATE_STREAM aggregates the maximum and minimum values of temperatures from the sensors over a one-minute period from the incoming data.
-- Create an output stream with four columns, which is used to send IoT data to the destination
CREATE OR REPLACE STREAM "DESTINATION_SQL_BASIC_STREAM" (dateTime TIMESTAMP, deviceId VARCHAR(8), deviceParameter VARCHAR(16), deviceValue INTEGER);

-- Create a pump that continuously selects from the source stream and inserts it into the output data stream
CREATE OR REPLACE PUMP "STREAM_PUMP_1" AS INSERT INTO "DESTINATION_SQL_BASIC_STREAM"

-- Filter specific columns from the source stream
SELECT STREAM "dateTime", "deviceId", "deviceParameter", "deviceValue" FROM "SOURCE_SQL_STREAM_001";

-- Create a second output stream with three columns, which is used to send aggregated min/max data to the destination
CREATE OR REPLACE STREAM "DESTINATION_SQL_AGGREGATE_STREAM" (dateTime TIMESTAMP, highestTemp SMALLINT, lowestTemp SMALLINT);

-- Create a pump that continuously selects from a source stream 
CREATE OR REPLACE PUMP "STREAM_PUMP_2" AS INSERT INTO "DESTINATION_SQL_AGGREGATE_STREAM"

-- Extract time in minutes, plus the highest and lowest value of device temperature in that minute, into the destination aggregate stream, aggregated per minute
SELECT STREAM FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE) AS "dateTime", MAX("deviceValue") AS "highestTemp", MIN("deviceValue") AS "lowestTemp" FROM "SOURCE_SQL_STREAM_001" WHERE "deviceParameter"='Temperature' GROUP BY FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE);

Real-time analytics shows the results of the SQL query. If everything is working correctly, you see three streams listed, similar to the following screenshot.

Step 5: Connect the Analytics application to output Firehose delivery streams

You create two destinations for the two delivery streams that you created in the previous step. A single Analytics application can have multiple destinations defined; however, this needs to be set up using the AWS CLI, not from the console. If you do not already have it, install the AWS CLI on your local machine and configure it with your credentials.

Tip: If you are running the IoT script from an EC2 instance, it comes pre-installed with the AWS CLI.

Create the first destination delivery stream 

The AWS CLI command to create a new output Firehose delivery stream is as follows:

aws kinesisanalytics add-application-output --application-name <Name of Analytics Application> --current-application-version-id <number> --application-output 'Name=DESTINATION_SQL_BASIC_STREAM,KinesisFirehoseOutput={ResourceARN=<ARN of IoT-Data-Stream>,RoleARN=<ARN of Analytics application>,DestinationSchema={RecordFormatType=CSV}}'

Do not copy this into the CLI just yet! Before entering this command, make the following four changes to personalize it:

  • For Name of Analytics Application, enter the value from Step 4, or from the Analytics console.
  • For current-application-version-ID, run the following command:
aws kinesisanalytics describe-application --application-name <application name from above> | grep ApplicationVersionId
  • For ResourceARN, run the following command:
aws firehose describe-delivery-stream --delivery-stream-name IoT-Destination-Data-Stream | grep DeliveryStreamARN
  • For RoleARN, run the following command:
aws kinesisanalytics describe-application --application-name <application name from above> | grep RoleARN

Now, paste the complete command in the AWS CLI and press Enter. If there are any errors, the response provides details. If everything goes well, a new destination delivery stream is created to send the first query (DESTINATION_SQL_BASIC_STREAM) to IoT-Destination-Data-Stream.

Create the second destination delivery stream

Following similar steps as above, create a second destination Firehose delivery stream with the following changes:

  • For Name of Analytics Application, enter the same name as the first delivery stream.
  • For current-application-version-ID, increment by 1 from the previous value (unless you made other changes in between these steps). If unsure, run the same command as above to get it again.
  • For ResourceARN, get the value by running the following CLI command:
aws firehose describe-delivery-stream --delivery-stream-name IoT-Destination-Aggregate-Stream | grep DeliveryStreamARN
  • For RoleArn, enter the same value as the first stream.

Run the aws kinesisanalytics CLI command, similar to the previous step but with the new parameters substituted. This creates the second output Firehose destination delivery stream.

Update the IAM role for Analytics to allow writing to both output streams.

  1. In the console, choose IAM, Roles.
  2. Select the role that you created with Analytics in Step 4.
  3. Choose Policy, JSON, and Edit.
  4. Find “Sid”: “WriteOutputFirehose” in the JSON document, go to its “Resource” section, and make sure that it includes the Resource ARNs of both delivery streams that you found in the previous step.
  5. If it lists only one ARN, add the second ARN and choose Save. A sketch of the finished statement follows this list.
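
After the update, the WriteOutputFirehose statement should resemble the following sketch. The region, account ID, and exact action list are illustrative assumptions; keep whatever actions the generated policy already contains and only extend the "Resource" array with the second delivery stream ARN.

{
    "Sid": "WriteOutputFirehose",
    "Effect": "Allow",
    "Action": [
        "firehose:DescribeDeliveryStream",
        "firehose:PutRecord",
        "firehose:PutRecordBatch"
    ],
    "Resource": [
        "arn:aws:firehose:us-east-1:123456789012:deliverystream/IoT-Destination-Data-Stream",
        "arn:aws:firehose:us-east-1:123456789012:deliverystream/IoT-Destination-Aggregate-Stream"
    ]
}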

This completes the Amazon Kinesis setup. The incoming IoT data is processed by Analytics and delivered, using two output delivery streams, to two separate folders in your S3 bucket.

Step 6: Set up Amazon QuickSight to analyze the data

To build the visualization dashboard, ingest the processed CSV files from the S3 bucket into Amazon QuickSight.

  1. In the console, choose QuickSight.
  2. If this is your first time using Amazon QuickSight, you are asked to create a new account. Follow the prompts.
  3. When you are logged in to your account, choose New Analysis and enter a name of your choice.
  4. Choose New data set for the analysis or, if you have previously imported your data set, select one from the available data sets.
  5. You import two data sets: one with the general device parameter data, and the other with the per-minute maximum and minimum temperature aggregates used for monitoring. For the first data set, choose S3 from the list of available data sources and enter a name, for example, IoT Device Data.
  6. The location of the S3 bucket and the objects to use are provided to Amazon QuickSight as a manifest file. Create a new manifest file following the supported formats for Amazon S3 manifest files.
  7. In the URIPrefixes section, provide your appropriate S3 bucket and folder location for the general device data. Hint: it should include <your unique name>-kinesis/data/.

Your manifest file should look similar to the following:

{
    "fileLocations": [
        {"URIPrefixes": ["https://s3.amazonaws.com/<YOUR_BUCKET_NAME>/data/<YEAR>/<MONTH>/<DATE>/<HOUR>/"]}
    ],
    "globalUploadSettings": {
        "format": "CSV",
        "delimiter": ","
    }
}

Amazon QuickSight imports and parses the data set, and provides available data fields that can be used for making graphs. The Edit/Preview data button allows you to format and transform the data, change data types, and filter or join your data. Make sure that the columns have the correct titles. If not, you can edit them and then save.

Tip: choose the downward arrow on the top right and unselect Files include headers to give each column appropriate headers. Choose Save. This takes you back to the data sets page.

Follow the same steps as above to import the second data set. This time, your manifest should include your aggregate data set folder on S3, which is named <your unique name>-kinesis/aggregate/. Update headers if necessary and choose Save & visualize.
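
For example, the manifest for the aggregate data set might look like the following. The date-partitioned path segments are an assumption based on the first manifest; adjust the prefix to match the actual folder structure in your bucket.

{
    "fileLocations": [
        {"URIPrefixes": ["https://s3.amazonaws.com/<YOUR_BUCKET_NAME>/aggregate/<YEAR>/<MONTH>/<DATE>/<HOUR>/"]}
    ],
    "globalUploadSettings": {
        "format": "CSV",
        "delimiter": ","
    }
}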

Build an analysis

The visualization screen shows the data set that you last imported, which in this case is the aggregate data. To include the general device data as well, for Fields on the top left, choose Edit analysis data sets. Choose Add data set and select the other data set that you saved earlier.

Now both data sets are available on the analysis screen. For Visual Types at bottom left, select the type of graph to make. For Fields, select the fields to visualize. For example, drag Device ID, Device Parameter, and Value to Field wells, as shown in the screenshot below, to generate a visualization of average parameter values compared across devices.

You can create another visual by choosing +Add. This time, select a line graph to show the maximum temperature recorded by the sensors in any given minute, from the aggregate data set.

If you would like to create an interactive story to present to your team or organization, you can choose the Story option on the left panel. Create a dashboard with multiple visualizations, to save and share securely with the intended audience. An example of a story is shown below.

Conclusion

Data is valuable only when it can actually be put to use. In this post, you’ve seen how to quickly build a simple Analytics application that ingests, processes, and visualizes IoT data in near real time, entirely with AWS managed services. The solution is scalable and reliable, costs a fraction of what comparable business intelligence solutions do, and is simple enough that anyone with an AWS account can build and use it without special training.

If you have any questions or suggestions, please comment below.


About the Author

Karan Desai is a Solutions Architect with Amazon Web Services. He works with startups and small businesses in the US, helping them adopt cloud technology to build scalable and secure solutions using AWS. In his spare time, he likes to build personal IoT projects, travel to offbeat places and write about it.

Related

Visualize Big Data with Amazon QuickSight, Presto, and Apache Spark on Amazon EMR

Test Your Streaming Data Solution with the New Amazon Kinesis Data Generator

Post Syndicated from Allan MacInnis original https://aws.amazon.com/blogs/big-data/test-your-streaming-data-solution-with-the-new-amazon-kinesis-data-generator/

When building a streaming data solution, most customers want to test it with data that is similar to their production data. Creating this data and streaming it to your solution can often be the most tedious task in testing the solution.

Amazon Kinesis Streams and Amazon Kinesis Firehose enable you to continuously capture and store terabytes of data per hour from hundreds of thousands of sources. Amazon Kinesis Analytics gives you the ability to use standard SQL to analyze and aggregate this data in real-time. It’s easy to create an Amazon Kinesis stream or Firehose delivery stream with just a few clicks in the AWS Management Console (or a few commands using the AWS CLI or Amazon Kinesis API). However, to generate a continuous stream of test data, you must write a custom process or script that runs continuously, using the AWS SDK or CLI to send test records to Amazon Kinesis. Although this task is necessary to adequately test your solution, it means more complexity and longer development and testing times.

Wouldn’t it be great if there were a user-friendly tool to generate test data and send it to Amazon Kinesis? Well, now there is—the Amazon Kinesis Data Generator (KDG).

KDG overview

The KDG simplifies the task of generating data and sending it to Amazon Kinesis. The tool provides a user-friendly UI that runs directly in your browser. With the KDG, you can do the following:

  • Create templates that represent records for your specific use cases
  • Populate the templates with fixed data or random data
  • Save the templates for future use
  • Continuously send thousands of records per second to your Amazon Kinesis stream or Firehose delivery stream

The KDG is open source, and you can find the source code on the Amazon Kinesis Data Generator repo in GitHub. Because the tool is a collection of static HTML and JavaScript files that run directly in your browser, you can start using it immediately without downloading or cloning the project. It is enabled as a static site in GitHub, and we created a short URL to access it.

To get started immediately, check it out at http://amzn.to/datagen.

Using the KDG

Getting started with the KDG requires only three short steps:

  1. Create an Amazon Cognito user in your AWS account (first-time only).
  2. Use this user’s credentials to log in to the KDG.
  3. Create a record template for your data.

When you’ve completed these steps, you can then send data to Streams or Firehose.

Create an Amazon Cognito user

The KDG is a great example of a browser-based JavaScript application that uses Amazon Cognito for a user repository and user authentication, and the AWS JavaScript SDK to communicate with AWS services directly from your browser. For information about how to build your own JavaScript application that uses Amazon Cognito, see Use Amazon Cognito in your website for simple AWS authentication on the AWS Mobile Blog.

Before you can start sending data to your Amazon Kinesis stream, you must create an Amazon Cognito user in your account who can write to Streams and Firehose. When you create the user, you create a username and password for that user. You use those credentials to sign in to the KDG. To simplify creating the Amazon Cognito user in your account, we created a Lambda function and a CloudFormation template. For more information about creating the Amazon Cognito user in your AWS account, see Configure Your AWS Account.
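
If you prefer the CLI over the console, you can create the stack with a command along the following lines. This is a minimal sketch that assumes you have saved the CloudFormation template locally as cognito-setup.json and that it exposes Username and Password parameters; check the actual template for its parameter names and requirements before running it.

# Hypothetical example: the template file name and parameter names are assumptions
aws cloudformation create-stack \
    --stack-name Kinesis-Data-Generator-Cognito-User \
    --template-body file://cognito-setup.json \
    --capabilities CAPABILITY_IAM \
    --parameters ParameterKey=Username,ParameterValue=<username> ParameterKey=Password,ParameterValue=<password>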

Note:  It’s important that you use the URL provided by the output of the CloudFormation stack the first time that you access the KDG. This URL contains parameters needed by the KDG. The KDG stores the values of these parameters locally, so you can then access the tool using the short URL, http://amzn.to/datagen.

Log in to the KDG

After you create an Amazon Cognito user in your account, the next step is to log in to the KDG. To do this, provide the username and password that you created earlier.

On the main page, you can configure your data templates and send data to an Amazon Kinesis stream or Firehose delivery stream.

The basic configuration is simple enough. All fields on the page are required:

  • Region: Choose the AWS Region that contains the Amazon Kinesis stream or Firehose delivery stream to receive your streaming data.
  • Stream/firehose name: Choose the name of the stream or delivery stream to receive your streaming data.
  • Records per second: Enter the number of records to send to your stream or delivery stream each second.
  • Record template: Enter the raw data, or a template that represents your data structure, to be used for each record sent by the KDG. For information about creating templates for your data, see the “Creating Record Templates” section, later in this post.

When you set the Records per second value, consider that the KDG isn’t intended to be a data producer for load-testing your application. However, it can easily send several thousand records per second from a single tab in your browser, which is plenty of data for most applications. In testing, the KDG has produced 80,000 records per second to a single Amazon Kinesis stream, but your mileage may vary. The maximum rate at which it produces records depends on your computer’s specs and the complexity of your record template.

Ensure that your stream or delivery stream is scaled appropriately:

  • 1,000 records/second or 1 MB/second per shard when sending to an Amazon Kinesis stream
  • 5,000 records/second or 5 MB/second when sending to a Firehose delivery stream

Otherwise, Amazon Kinesis may reject records, and you won’t achieve your desired throughput. For more information about adding capacity to a stream by adding more shards, see Resharding a Stream. For information about increasing the capacity of a delivery stream, see Amazon Kinesis Firehose Limits.
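
For example, you can add shards to an existing stream with the UpdateShardCount API; the stream name and target shard count below are placeholders:

aws kinesis update-shard-count \
    --stream-name <your stream name> \
    --target-shard-count 4 \
    --scaling-type UNIFORM_SCALING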

Create record templates

The Record Template field is a free-text field where you can enter any text that represents a single streaming data record. You can create a single line of static data, so that each record sent to Amazon Kinesis is identical. Or, you can format the text as a template.

In this case, the KDG substitutes portions of the template with fake or random data before sending the record. This lets you introduce randomness or variability in each record that is sent in your data stream. The KDG uses Faker.js, an open source library, to generate fake data. For more information, see the faker.js project page in GitHub. The easiest way to see how this works is to review an example.

To simulate records being sent from a weather sensor Internet of Things (IoT) device, you want each record to be formatted in JSON. The following is an example of what a final record must look like:

{
	"sensorId": 40,
	"currentTemperature": 76,
	"status": "OK"
} 

For this use case, you want to simulate sending data from one of 50 sensors, so the sensorID field can be an integer between 1 and 50. The temperature value can range between 10 and 150, so the currentTemperature field should contain a value in this range. Finally, the status value can be one of three possible values: OK, FAIL, and WARN. The KDG template format uses moustache syntax (double curly-braces) to enclose items that should be replaced before the record is sent to Amazon Kinesis. To model the record, the template looks like this:

{
    "sensorId": {{random.number(50)}},
    "currentTemperature": {{random.number(
        {
            "min":10,
            "max":150
        }
    )}},
    "status": "{{random.arrayElement(
        ["OK","FAIL","WARN"]
    )}}"
}

Take a look at one more example, simulating a stream of records that represent rows from an Apache access log. A single Apache access log entry might look like this:

76.0.56.179 - - [29/Apr/2017:16:32:11 -05:00] "GET /wp-admin" 200 8233 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_0 rv:6.0; CY) AppleWebKit/535.0.0 (KHTML, like Gecko) Version/4.0.3 Safari/535.0.0"

The following example shows how to create a template for the Apache access log:

{{internet.ip}} - - [{{date.now("DD/MMM/YYYY:HH:mm:ss Z")}}] "{{random.weightedArrayElement({"weights":[0.6,0.1,0.1,0.2],"data":["GET","POST","DELETE","PUT"]})}} {{random.arrayElement(["/list","/wp-content","/wp-admin","/explore","/search/tag/list","/app/main/posts","/posts/posts/explore"])}}" {{random.weightedArrayElement({"weights": [0.9,0.04,0.02,0.04], "data":["200","404","500","301"]})}} {{random.number(10000)}} "-" "{{internet.userAgent}}"

For more information about creating your own templates, see the Record Template section of the KDG documentation.

The KDG saves the templates that you create in your local browser storage. As long as you use the same browser on the same computer, you can reuse up to five templates.

Summary

Testing your streaming data solution has never been easier. Get started today by visiting the KDG hosted UI or its Amazon Kinesis Data Generator page in GitHub. The project is licensed under the Apache 2.0 license, so feel free to clone and modify it for your own use as necessary. And of course, please submit any issues or pull requests via GitHub.

If you have any questions or suggestions, please add them below.


About the Author

Allan MacInnis is a Solutions Architect at Amazon Web Services. He works with our customers to help them build streaming data solutions using Amazon Kinesis. In his spare time, he enjoys mountain biking and spending time with his family.


Related

Scale Your Amazon Kinesis Stream Capacity with UpdateShardCount