Tag Archives: Amazon Kinesis

Amazon Kinesis Data Firehose custom prefixes for Amazon S3 objects

Post Syndicated from Rajeev Chakrabarti original https://aws.amazon.com/blogs/big-data/amazon-kinesis-data-firehose-custom-prefixes-for-amazon-s3-objects/

In February 2019, Amazon Web Services (AWS) announced a new feature in Amazon Kinesis Data Firehose called Custom Prefixes for Amazon S3 Objects. It lets customers specify a custom expression for the Amazon S3 prefix where data records are delivered. Previously, Kinesis Data Firehose allowed only specifying a literal prefix. This prefix was then combined with a static date-formatted prefix to create the output folder in a fixed format. Customers asked for flexibility, so AWS listened and delivered.

Kinesis Data Firehose is most commonly used to consume event data from streaming sources, such as applications or IoT devices. The data is then typically stored in a data lake, so it can be processed and eventually queried. When storing data on Amazon S3, it is a best practice to partition or group related data and store it together in the same folder. This provides the ability to filter the partitioned data and control the amount of data scanned by each query, thus improving performance and reducing cost.

A common way to group data is by date. Kinesis Data Firehose automatically groups data and stores it into the appropriate folders on Amazon S3 based on the date. However, the default folder naming in Amazon S3 is not compatible with Apache Hive naming conventions. This makes the data more difficult to catalog using AWS Glue crawlers and to analyze using big data tools.

This post discusses a new capability that lets you customize how Kinesis Data Firehose names the output folders in Amazon S3. It covers how custom prefixes work and their intended use cases, and it includes step-by-step instructions to try the feature in your own account.

The need for custom prefixes for Amazon S3 objects

Previously, Kinesis Data Firehose created a static folder structure based on Coordinated Universal Time (UTC) in the format YYYY/MM/DD/HH. It then appended this structure to the provided prefix before writing objects to Amazon S3. For example, if you provided the prefix “mydatalake/”, the generated folder hierarchy would be “mydatalake/2019/02/09/13”. However, to be compatible with Hive naming conventions, the folder structure is expected to follow the format “/partitionkey=partitionvalue”. Using this naming convention, data can be easily cataloged with AWS Glue crawlers, resulting in proper partition names.

Other methods for managing partitions also become possible, such as running MSCK REPAIR TABLE in Amazon Athena or Apache Hive on Amazon EMR, which can add all partitions through a single statement. Furthermore, you can use other date-based partitioning patterns like “/dt=2019-02-09-13/” instead of expanding the date out into folders. This is helpful in reducing the total number of partitions that need to be maintained as the table grows over time. It also simplifies range queries. The ability to specify custom prefixes removes the need for an additional ETL step to put the data in the right folder structure, which shortens the time to insight.
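To see why the naming convention matters, the following minimal Python sketch mimics how a partition-aware tool can read partition columns straight from Hive-style folder names, and why the legacy YYYY/MM/DD/HH layout yields none. The function parse_hive_partitions and the object name part-0001 are hypothetical; this only approximates what AWS Glue crawlers or Hive do.

```python
def parse_hive_partitions(key):
    """Return {partition_key: partition_value} for each key=value folder in an S3 object key."""
    partitions = {}
    # The last path component is the object name, so only the folders are inspected
    for folder in key.split("/")[:-1]:
        name, sep, value = folder.partition("=")
        if sep and name and value:
            partitions[name] = value
    return partitions


hive_key = "mydatalake/year=2019/month=02/day=09/hour=13/part-0001"
dt_key = "mydatalake/dt=2019-02-09-13/part-0001"
legacy_key = "mydatalake/2019/02/09/13/part-0001"

print(parse_hive_partitions(hive_key))    # {'year': '2019', 'month': '02', 'day': '09', 'hour': '13'}
print(parse_hive_partitions(dt_key))      # {'dt': '2019-02-09-13'}
print(parse_hive_partitions(legacy_key))  # {}
```

With the legacy layout, a tool has to be told separately which folder level means what, which is exactly the extra work the Hive-style names avoid.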

How custom prefixes for Amazon S3 objects work

This new capability does not let you use any date or timestamp value from your event data, nor can you use any other arbitrary value in the event. Kinesis Data Firehose uses an internal timestamp field called ApproximateArrivalTimestamp. Each data record includes an ApproximateArrivalTimestamp (in UTC) that is set when a stream successfully receives and stores the record. This is commonly referred to as a server-side timestamp. Kinesis Data Firehose buffers incoming records according to the configured buffering hints and delivers them into Amazon S3 objects for the Amazon S3 destination. The resulting objects in Amazon S3 may contain multiple records, each with a different ApproximateArrivalTimestamp. When evaluating timestamps, Kinesis Data Firehose uses the ApproximateArrivalTimestamp of the oldest record that’s contained in the Amazon S3 object being written.
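A minimal Python sketch of this rule, assuming a buffered object whose records straddle an hour boundary: because the oldest arrival timestamp is used, the whole object lands in the earlier hour's folder. The helper names object_timestamp and hour_partition are hypothetical; this emulates the documented behavior locally rather than calling the service.

```python
from datetime import datetime, timezone


def object_timestamp(arrival_millis):
    """Timestamp used for one S3 object: the oldest ApproximateArrivalTimestamp (ms since epoch)."""
    return datetime.fromtimestamp(min(arrival_millis) / 1000, tz=timezone.utc)


def hour_partition(arrival_millis):
    """Hive-style hour folder that the object's timestamp maps to."""
    return object_timestamp(arrival_millis).strftime("year=%Y/month=%m/day=%d/hour=%H/")


# Three records buffered into one object; the last one arrived after 17:00 UTC
batch = [
    1549990740000,  # 2019-02-12T16:59:00Z
    1549990790000,  # 2019-02-12T16:59:50Z
    1549990830000,  # 2019-02-12T17:00:30Z
]
print(hour_partition(batch))  # year=2019/month=02/day=12/hour=16/
```

A consequence is that an hourly folder can contain a few records that arrived shortly after the hour ended, so queries that need exact hour boundaries should filter on the timestamp value itself.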

Kinesis Data Firehose also provides the ability to deliver records to a different error output location when there is a delivery, AWS Lambda transformation, or format conversion failure. Previously, the error output location could not be configured and was determined by the type of delivery failure. With this release, the error output location (ErrorOutputPrefix) can also be configured. One benefit of this new capability is that you can separate failed records into date-partitioned folders for easy reprocessing.

So how do you specify the custom Prefix and the ErrorOutputPrefix? You use an expression of the form !{namespace:value}, where the namespace can be either firehose or timestamp. For the firehose namespace, the value can be either “random-string” or “error-output-type”; for the timestamp namespace, the value is a date pattern in the Java DateTimeFormatter format. You can combine the two namespaces in a single expression, although !{firehose:error-output-type} can be used only in the ErrorOutputPrefix. For more information and examples, see Custom Prefixes for Amazon S3 Objects.
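The following Python sketch emulates this expansion locally so that you can preview the keys a prefix would produce before creating a delivery stream. It is not the service's implementation: evaluate_prefix is a hypothetical helper, only the pattern letters yyyy, MM, dd, HH, and mm are supported, and the letters-and-digits alphabet for random-string is an assumption based on sample values such as “ztWxkdg3Thg”.

```python
import random
import re
import string
from datetime import datetime, timezone

# DateTimeFormatter letter runs supported by this sketch, mapped to strftime codes.
# Lowercase yyyy is the calendar year; uppercase YYYY (week-based year) is rejected here.
_PATTERN_CODES = {"yyyy": "%Y", "MM": "%m", "dd": "%d", "HH": "%H", "mm": "%M"}
_EXPRESSION = re.compile(r"!\{(\w+):([^}]*)\}")
_RANDOM_ALPHABET = string.ascii_letters + string.digits  # assumption, see lead-in


def _format_timestamp(pattern, ts):
    out, i = [], 0
    while i < len(pattern):
        j = i
        while j < len(pattern) and pattern[j] == pattern[i]:
            j += 1  # extend the run of identical characters
        run = pattern[i:j]
        if run in _PATTERN_CODES:
            out.append(ts.strftime(_PATTERN_CODES[run]))
        elif run[0].isalpha():
            raise ValueError(f"pattern {run!r} is not supported by this sketch")
        else:
            out.append(run)  # literal separators such as "/" or "-"
        i = j
    return "".join(out)


def evaluate_prefix(expression, ts, error_output_type=None):
    """Expand every !{namespace:value} in expression for the UTC datetime ts."""
    def expand(match):
        namespace, value = match.group(1), match.group(2)
        if namespace == "timestamp":
            return _format_timestamp(value, ts)
        if namespace == "firehose" and value == "random-string":
            # A fresh 11-character string for each occurrence
            return "".join(random.choice(_RANDOM_ALPHABET) for _ in range(11))
        if namespace == "firehose" and value == "error-output-type":
            if error_output_type is None:
                raise ValueError("error-output-type is only valid in an ErrorOutputPrefix")
            return error_output_type
        raise ValueError(f"unsupported expression !{{{namespace}:{value}}}")

    return _EXPRESSION.sub(expand, expression)


ts = datetime(2019, 2, 9, 16, 13, 1, tzinfo=timezone.utc)
print(evaluate_prefix(
    "fhbase/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/", ts))
# fhbase/year=2019/month=02/day=09/hour=16/
print(evaluate_prefix("dt=!{timestamp:yyyy-MM-dd-HH}/", ts))
# dt=2019-02-09-16/
```

Raising an error for error-output-type outside an error prefix mirrors the restriction described above, so a mistaken Prefix fails early in the preview.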

Writing streaming data into Amazon S3 with Kinesis Data Firehose

This walkthrough describes how streaming data can be written into Amazon S3 with Kinesis Data Firehose using a Hive-compatible folder structure. It then shows how AWS Glue crawlers can infer the schema, extract the partition names that we designated in Kinesis Data Firehose, and catalog them in the AWS Glue Data Catalog. Finally, we run sample queries to show that the partitions are indeed being recognized.

To demonstrate this, we use Python code to generate sample data. We also use a Lambda transform on Kinesis Data Firehose to deliberately create failures, which demonstrates how data is saved to the error output location. The code that you need for this walkthrough is available in the GitHub repository.

For this walkthrough, this is the architecture that we are building:

Step 1: Create an Amazon S3 bucket

Create an S3 bucket for Kinesis Data Firehose to deliver event records to. We use the AWS Command Line Interface (AWS CLI) to create the bucket in the US East (N. Virginia) Region. Remember to substitute your own bucket name for the one in the example.

aws s3 mb s3://kdfs3customprefixesexample --region us-east-1

Step 2: Lambda Transform (optional)

Each record in the incoming Lambda event carries an ApproximateArrivalTimestamp field alongside its data payload. This is sufficient to create a proper folder structure on Amazon S3. However, when querying the data, it may be beneficial to expose this timestamp value as a top-level column for easy filtering and validation. To accomplish this, we create a Lambda function that adds the ApproximateArrivalTimestamp as a top-level field in the data payload, which is what Kinesis Data Firehose writes to the object in Amazon S3. Additionally, the Lambda code artificially generates some processing errors, which are delivered to the “ErrorOutputPrefix” location specified for the delivery destination, to illustrate the use of expressions in the “ErrorOutputPrefix.”

Create an IAM role for the Lambda transform function

First, create a role for the Lambda function called KDFLambdaBasicRole. The TrustPolicyForLambda.json file is included in the GitHub repository.

$ aws iam create-role --role-name KDFLambdaBasicRole --assume-role-policy-document file://TrustPolicyForLambda.json
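The repository's TrustPolicyForLambda.json is not reproduced in this post. A standard trust policy that lets the Lambda service assume a role looks like the following, written out here with a short Python script; we assume the repository file is equivalent, so compare the two before relying on this one.

```python
import json

# Standard trust policy allowing the AWS Lambda service to assume the role
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "lambda.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}

# Write the file that the create-role command reads via file://
with open("TrustPolicyForLambda.json", "w") as f:
    json.dump(trust_policy, f, indent=2)
```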

After the role is created, attach the managed Lambda basic execution policy to it.

$ aws iam attach-role-policy --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole --role-name KDFLambdaBasicRole

Lambda function

To create the Lambda function, start with the Python Kinesis Data Firehose blueprint “General Firehose Processing” and then modify it. For more information about the structure of the records and what must be returned, see Amazon Kinesis Data Firehose Data Transformation.
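The modified function itself lives in the GitHub repository. As a rough sketch of what it does, here is a Python handler that follows the documented transformation contract: each input record has recordId, approximateArrivalTimestamp, and base64-encoded data, and each output record returns recordId, result, and data. The field name ApproxArrTimestampUTCFH is a hypothetical choice that would surface as the approxarrtimestamputcfh column queried later in Athena, and the injectable rng parameter exists only to make the 10 percent failure rate testable.

```python
import base64
import json
import random
from datetime import datetime, timezone

FAILURE_RATE = 0.10  # share of records deliberately marked ProcessingFailed for the demo
TIMESTAMP_FIELD = "ApproxArrTimestampUTCFH"  # hypothetical field name


def _utc_iso(epoch_millis):
    return datetime.fromtimestamp(epoch_millis / 1000, tz=timezone.utc).strftime(
        "%Y-%m-%dT%H:%M:%S.%fZ")


def lambda_handler(event, context, rng=random.random):
    output = []
    for record in event["records"]:
        if rng() < FAILURE_RATE:
            # Return the original data unchanged; Firehose routes it to the ErrorOutputPrefix
            output.append({"recordId": record["recordId"],
                           "result": "ProcessingFailed",
                           "data": record["data"]})
            continue
        payload = json.loads(base64.b64decode(record["data"]))
        # Expose the server-side arrival time as a top-level field of the payload
        payload[TIMESTAMP_FIELD] = _utc_iso(record["approximateArrivalTimestamp"])
        # A trailing newline keeps records on separate lines in the S3 object
        data = (json.dumps(payload) + "\n").encode("utf-8")
        output.append({"recordId": record["recordId"],
                       "result": "Ok",
                       "data": base64.b64encode(data).decode("utf-8")})
    return {"records": output}
```

Lambda invokes the handler with only event and context, so the default rng applies in production.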

Zip up the Python file, and then create the Lambda function using the AWS CLI. The CreateLambdaFunctionS3CustomPrefixes.json file is included in the GitHub repository.

aws lambda create-function --zip-file "fileb://lambda_function.zip" --cli-input-json file://CreateLambdaFunctionS3CustomPrefixes.json

Step 3: Delivery Stream

Next, create the Kinesis Data Firehose delivery stream. The createdeliverystream.json file is included in the GitHub repository.

aws firehose create-delivery-stream --cli-input-json file://createdeliverystream.json

In the previous configuration, we defined a Prefix and an ErrorOutputPrefix under the “ExtendedS3DestinationConfiguration” element. We defined the same for the “S3BackupConfiguration” element. Note that when the “ProcessingConfiguration” element is set to “Disabled”, the ErrorOutputPrefix parameter of the “ExtendedS3DestinationConfiguration” element exists only for consistency. It otherwise has no significance.

We’ve chosen a prefix that results in a folder structure compatible with Hive-style partitioning. This is the prefix we used:

“fhbase/year=!{timestamp:YYYY}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/”

Kinesis Data Firehose first creates a base folder called “fhbase” directly under the Amazon S3 bucket. Second, it evaluates the expressions !{timestamp:YYYY}, !{timestamp:MM}, !{timestamp:dd}, and !{timestamp:HH} to year, month, day, and hour using the Java DateTimeFormatter format. For example, an ApproximateArrivalTimestamp of 1549728781000 in UNIX epoch time (milliseconds), which is 2019-02-09T16:13:01.000000Z in UTC, would evaluate to “year=2019”, “month=02”, “day=09”, and “hour=16”. Therefore, the Amazon S3 location where the data records are delivered evaluates to “fhbase/year=2019/month=02/day=09/hour=16/”. Note that in DateTimeFormatter, uppercase YYYY is the week-based year, which can differ from the calendar year for a few days around New Year; lowercase yyyy, as used in the ErrorOutputPrefix below, is the calendar year and is usually the safer choice for partition names.

Similarly, the ErrorOutputPrefix “fherroroutputbase/!{firehose:random-string}/!{firehose:error-output-type}/!{timestamp:yyyy/MM/dd}/” results in a base folder called “fherroroutputbase” directly under the S3 bucket. The expression !{firehose:random-string} evaluates to an 11-character random string like “ztWxkdg3Thg”. If you use it more than once in the same expression, every instance evaluates to a new random string. The expression !{firehose:error-output-type} evaluates to one of the following:

  1. “processing-failed” for Lambda transformation delivery failures
  2. “elasticsearch-failed” for Amazon ES destination delivery failures
  3. “splunk-failed” for Splunk destination delivery failures
  4. “format-conversion-failed” for data format conversion failures

So, the location for an Amazon S3 object containing the delivery failed records for a Lambda transformation could evaluate to: fherroroutputbase/ztWxkdg3Thg/processing-failed/2019/02/09/.
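For readers who want to see the shape of that configuration, here is a Python sketch that writes a createdeliverystream.json along the lines described above, using the field names of the CreateDeliveryStream API. It is not the repository's file: the <account-id> placeholder, the firehose_delivery_role and KDFS3CustomPrefixesTransform ARNs, and the fhrawbase/ and fhrawerroroutputbase/ backup folders are assumptions, and it uses lowercase yyyy for the calendar year. The buffering hints match the 1 MB and 240 seconds used in the console steps.

```python
import json

# Hypothetical placeholders: substitute your own account ID, role, and function names
ACCOUNT_ID = "<account-id>"
BUCKET_ARN = "arn:aws:s3:::kdfs3customprefixesexample"
ROLE_ARN = f"arn:aws:iam::{ACCOUNT_ID}:role/firehose_delivery_role"
LAMBDA_ARN = f"arn:aws:lambda:us-east-1:{ACCOUNT_ID}:function:KDFS3CustomPrefixesTransform:$LATEST"

HOUR_PARTITIONS = "year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/"
ERROR_SUFFIX = "!{firehose:random-string}/!{firehose:error-output-type}/!{timestamp:yyyy/MM/dd}/"
BUFFERING = {"SizeInMBs": 1, "IntervalInSeconds": 240}

config = {
    "DeliveryStreamName": "KDFS3customPrefixesExample",
    "DeliveryStreamType": "DirectPut",
    "ExtendedS3DestinationConfiguration": {
        "RoleARN": ROLE_ARN,
        "BucketARN": BUCKET_ARN,
        "Prefix": "fhbase/" + HOUR_PARTITIONS,
        "ErrorOutputPrefix": "fherroroutputbase/" + ERROR_SUFFIX,
        "BufferingHints": BUFFERING,
        "CompressionFormat": "UNCOMPRESSED",
        "ProcessingConfiguration": {
            "Enabled": True,
            "Processors": [{
                "Type": "Lambda",
                "Parameters": [{"ParameterName": "LambdaArn", "ParameterValue": LAMBDA_ARN}],
            }],
        },
        # Back up the raw records under a different base folder with the same layout
        "S3BackupMode": "Enabled",
        "S3BackupConfiguration": {
            "RoleARN": ROLE_ARN,
            "BucketARN": BUCKET_ARN,
            "Prefix": "fhrawbase/" + HOUR_PARTITIONS,
            "ErrorOutputPrefix": "fhrawerroroutputbase/" + ERROR_SUFFIX,
            "BufferingHints": BUFFERING,
            "CompressionFormat": "UNCOMPRESSED",
        },
    },
}

with open("createdeliverystream.json", "w") as f:
    json.dump(config, f, indent=2)
```

Building the prefixes from shared constants keeps the transformed and backup locations on the same folder layout, which is what makes them equally easy to crawl.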

You can run aws firehose describe-delivery-stream --delivery-stream-name KDFS3customPrefixesExample to describe the delivery stream created.

Next, enable encryption-at-rest for the delivery stream:

aws firehose start-delivery-stream-encryption --delivery-stream-name KDFS3customPrefixesExample

Or create the delivery stream using the AWS console

  1. Choose the source. For this example, I use Direct PUT.
  2. Choose whether you would like to transform the incoming records with a Lambda transformation. I chose Enabled, and chose the name of the Lambda function that I had created earlier.

  3. Choose the destination. I chose the Amazon S3 destination.

  4. Choose the Amazon S3 bucket. I chose the Amazon S3 bucket that I had created earlier in this exercise.

  5. Specify the Amazon S3 prefix and the Amazon S3 error prefix. These correspond to the “Prefix” and “ErrorOutputPrefix” explained earlier in the context of the AWS CLI input JSON.

  6. Choose whether you would like to back up the raw (before transformation) records to another Amazon S3 location. I chose Enabled and specified the same bucket (you could choose a different bucket). I also specified a prefix that differs from the one for the transformed records: the base folder is different, but the folder structure below it is the same. This makes it more efficient to crawl this location using an AWS Glue crawler, or to create external tables in Athena or Redshift Spectrum that point to this location.

  7. Specify the buffering hints for the Amazon S3 destination. I chose 1 MB and 240 seconds.
  8. Choose the S3 compression and encryption settings. I chose no compression for the transformed records’ location. I chose to encrypt the Amazon S3 location at rest by using the service-managed AWS KMS customer master key (CMK).
  9. Choose whether you want to enable error logging in CloudWatch. I chose Enabled.
  10. Specify the IAM role that you want Kinesis Data Firehose to assume to access resources on your behalf. Choose either Create new or Choose to display a new screen. Choose Create a new IAM role, name the role, and then choose Allow.
  11. Choose Create Delivery Stream.

The delivery stream is now created and active. You can send events to it.

Test with sample data

I used Python code to generate sample data. The structure of the generated data is as follows:

{"sector": "HEALTHCARE", "price": 194.07, "ticker_symbol": "UFG", "EventTime": "2019-02-12T07:10:52.649000Z", "change": 20.56}
{"sector": "HEALTHCARE", "price": 124.01, "ticker_symbol": "QXZ", "EventTime": "2019-02-12T07:10:53.745000Z", "change": 3.32}
{"sector": "MANUFACTURING", "price": 26.95, "ticker_symbol": "QXZ", "EventTime": "2019-02-12T07:10:54.864000Z", "change": 24.53}

Sample code to generate data and push it into Kinesis Data Firehose is included in the GitHub repository.
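If you want to generate similar events without the repository, the following Python sketch builds records with the same fields and sends them with the Firehose PutRecordBatch API through boto3. The sector and ticker lists and the price and change ranges are made up for illustration, and make_event and send_events are hypothetical helpers; in this walkthrough the stream name would be KDFS3customPrefixesExample.

```python
import json
import random
from datetime import datetime, timezone

# Hypothetical value pools modeled on the sample records above
SECTORS = ["HEALTHCARE", "MANUFACTURING", "ENERGY", "TECHNOLOGY"]
TICKERS = ["UFG", "QXZ", "ABC", "XYZ"]


def make_event(rng=random):
    """Build one stock-ticker event with the same fields as the sample data."""
    return {
        "sector": rng.choice(SECTORS),
        "price": round(rng.uniform(10, 200), 2),
        "ticker_symbol": rng.choice(TICKERS),
        "EventTime": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
        "change": round(rng.uniform(-25, 25), 2),
    }


def send_events(stream_name, count, batch_size=500, region="us-east-1"):
    """Push events with PutRecordBatch, which accepts at most 500 records per call."""
    import boto3  # imported here so make_event() works without boto3 installed

    client = boto3.client("firehose", region_name=region)
    for start in range(0, count, batch_size):
        records = [{"Data": (json.dumps(make_event()) + "\n").encode("utf-8")}
                   for _ in range(min(batch_size, count - start))]
        response = client.put_record_batch(DeliveryStreamName=stream_name, Records=records)
        # PutRecordBatch can partially fail; a production producer would retry these records
        if response["FailedPutCount"]:
            print(f"{response['FailedPutCount']} records failed in this batch")
```

For example, send_events("KDFS3customPrefixesExample", 1000) would send two batches of 500 events, assuming your AWS credentials are configured.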

After you start sending events to the Kinesis Data Firehose delivery stream, objects should start appearing under the specified prefixes in Amazon S3.

I wanted to illustrate Lambda invoke errors and the appearance of files in the ErrorOutputPrefix location for Lambda transform errors. Therefore, I did not give permissions to the “firehose_delivery_role” to invoke my Lambda function. The following file showed up in the location specified by the ErrorOutputPrefix.

aws s3 ls s3://kdfs3customprefixesexample/fherroroutputbase/FxvO2Tf9MQP/processing-failed/2019/02/12/

2019-02-12 16:57:24     260166 KDFS3customPrefixesExample-1-2019-02-12-16-53-20-5262db81-0f3a-48bf-8fc6-2249124923ff

Here is a snippet of the contents of the error file that I previously mentioned.

{"attemptsMade":4,"arrivalTimestamp":1549990400391,"errorCode":"Lambda.InvokeAccessDenied","errorMessage":"Access was denied. Ensure that the access policy allows access to the Lambda function.","attemptEndingTimestamp":1549990478018,"rawData":"eyJzZWN0b3IiOiAiSEVBTFRIQ0FSRSIsICJwcmljZSI6IDE4Ny45NCwgInRpY2tlcl9zeW1ib2wiOiAiVUZHIiwgIkV2ZW50VGltZSI6ICIyMDE5LTAyLTEyVDE2OjUzOjE5Ljk5MzAwMFoiLCAiY2hhbmdlIjogOS4yNn0=","lambdaArn":"arn:aws:lambda:us-east-1:<account-id>:function:KDFS3CustomPrefixesTransform:$LATEST"}
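The rawData field holds the original record, base64-encoded. A small Python helper (decode_error_record is a hypothetical name) is enough to inspect an error record or to recover the original event for reprocessing; here it is applied to the record shown above.

```python
import base64
import json


def decode_error_record(line):
    """Parse one error-output record and return (error code, original event payload)."""
    error = json.loads(line)
    payload = json.loads(base64.b64decode(error["rawData"]))
    return error["errorCode"], payload


line = (
    '{"attemptsMade":4,"arrivalTimestamp":1549990400391,"errorCode":"Lambda.InvokeAccessDenied",'
    '"errorMessage":"Access was denied. Ensure that the access policy allows access to the Lambda function.",'
    '"attemptEndingTimestamp":1549990478018,'
    '"rawData":"eyJzZWN0b3IiOiAiSEVBTFRIQ0FSRSIsICJwcmljZSI6IDE4Ny45NCwgInRpY2tlcl9zeW1ib2wiOiAiVUZHIiwgIkV2ZW50VGltZSI6ICIyMDE5LTAyLTEyVDE2OjUzOjE5Ljk5MzAwMFoiLCAiY2hhbmdlIjogOS4yNn0=",'
    '"lambdaArn":"arn:aws:lambda:us-east-1:<account-id>:function:KDFS3CustomPrefixesTransform:$LATEST"}'
)

code, event = decode_error_record(line)
print(code, event["sector"], event["ticker_symbol"])  # Lambda.InvokeAccessDenied HEALTHCARE UFG
```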

After I gave the “firehose_delivery_role” the appropriate permissions, the data objects showed up in the “Prefix” location specified for the Amazon S3 destination.

aws s3 ls s3://kdfs3customprefixesexample/fhbase/year=2019/month=02/day=12/hour=17/

2019-02-12 17:17:26    1392167 KDFS3customPrefixesExample-1-2019-02-12-17-14-51-fc63e8f6-7421-491d-8417-c5002fca1722

2019-02-12 17:18:39    1391946 KDFS3customPrefixesExample-1-2019-02-12-17-16-43-e080a18a-3e1e-45ad-8f1a-98c7887f5430

Also, because my Lambda transform set the result to ProcessingFailed for 10 percent of the records, those records showed up in the ErrorOutputPrefix location for Lambda transform errors.

aws s3 ls s3://kdfs3customprefixesexample/fherroroutputbase/ztWxkdg3Thg/processing-failed/2019/02/12/

2019-02-12 17:25:54     180092 KDFS3customPrefixesExample-1-2019-02-12-17-21-53-3bbfe7c0-f505-47d0-b880-797ce9035f73

Here is a snippet of the content of the error file:

{"attemptsMade":1,"arrivalTimestamp":1549992113419,"errorCode":"Lambda.ProcessingFailedStatus","errorMessage":"ProcessingFailed status set for record","attemptEndingTimestamp":1549992138424,"rawData":"eyJ0aWNrZXJfc3ltYm9sIjogIlFYWiIsICJzZWN0b3IiOiAiSEVBTFRIQ0FSRSIsICJwcmljZSI6IDE3LjUyLCAiY2hhbmdlIjogMTcuNTUsICJFdmVudFRpbWUiOiAiMjAxOS0wMi0xMlQxNzoyMTo1My4zOTY2NDdaIn0=","lambdaArn":"arn:aws:lambda:us-east-1:<account-id>:function:KDFS3CustomPrefixesTransform:$LATEST"}

You’re now ready to create an AWS Glue crawler. For more information about using the AWS Glue Data Catalog, see Populating the AWS Glue Data Catalog.

  1. In the AWS Glue console, go to Crawlers, and choose Add Crawler.

  2. Add information about your crawler, then choose Next.
  3. In the Include Path, specify the Amazon S3 bucket name that you entered under the Amazon S3 destination. Also include the static prefix used when you created the Kinesis Data Firehose delivery stream. Do not include the custom prefix expression.
  4. Choose Next.

  5. Choose Next, No, Next.
  6. Specify the IAM role that AWS Glue should use. I chose to create a new IAM role. Choose Next.
  7. Specify a schedule to run the crawler. I chose to run it on demand. Choose Next.
  8. Specify where the crawler adds the crawled and discovered tables. I chose the default database. Choose Next.

  9. Choose Finish.
  10. The crawler has been created and is ready to be run. Choose Run crawler.

  11. In the AWS Glue console, go to Tables. You can see that a table has been created with the name of the base folder. Choose fhbase.

The crawler has discovered and populated the table and its properties.

You can see the discovered schema. The crawler has identified and created the partitions based on the folder structure specified by the prefix expression.

Open the Amazon Athena console, and select the default database from the drop-down menu. Write the following query in the New query1 window, then choose Run query.

SELECT * FROM "default"."fhbase"
where year = '2019' and day = '12' and hour = '17'
order by approxarrtimestamputcfh desc

Notice that Amazon Athena recognizes the fhbase table as a partitioned table. Because the query filters on the partition columns, Athena can restrict the data it reads to the matching partitions.

Conclusion

As this post illustrates, custom prefixes for Amazon S3 objects provide a lot of flexibility in customizing the folder structure in which Kinesis Data Firehose delivers data records and failed records in Amazon S3. Having control over the folder structure and naming in Amazon S3 simplifies data discovery, cataloging, and access. As a result, it helps you gain insights faster and better manage the cost of your queries.

About the Author

Rajeev Chakrabarti is a Kinesis specialist solutions architect.

Build and run streaming applications with Apache Flink and Amazon Kinesis Data Analytics for Java Applications

Post Syndicated from Steffen Hausmann original https://aws.amazon.com/blogs/big-data/build-and-run-streaming-applications-with-apache-flink-and-amazon-kinesis-data-analytics-for-java-applications/

Stream processing facilitates the collection, processing, and analysis of real-time data and enables the continuous generation of insights and quick reactions to emerging situations. This capability is useful when the value of derived insights diminishes over time. Hence, the faster you can react to a detected situation, the more valuable the reaction is going to be. Consider, for instance, a streaming application that analyzes and blocks fraudulent credit card transactions while they occur. Compare that application to a traditional batch-oriented approach that identifies fraudulent transactions at the end of every business day and generates a nice report for you to read the next morning.

It is quite common for the value of insights to diminish over time. Therefore, using stream processing can substantially improve the value of your analytics application. However, building and operating a streaming application that continuously receives and processes data is much more challenging than operating a traditional batch-oriented analytics application.

In this post, we discuss how you can use Apache Flink and Amazon Kinesis Data Analytics for Java Applications to address these challenges. We explore how to build a reliable, scalable, and highly available streaming architecture based on managed services that substantially reduce the operational overhead compared to a self-managed environment. We particularly focus on how to prepare and run Flink applications with Kinesis Data Analytics for Java Applications. To this end, we use an exemplary scenario that includes source code and AWS CloudFormation templates. You can follow along with this example using your own AWS account or adapt the code according to your specific requirements.

Challenges of running streaming applications

When you build a streaming application, the downstream systems naturally rely on a continuous and timely generation of output. Accordingly, the availability requirements for the streaming application are much higher, and there is much less time to address operational issues than with a traditional batch-based approach. In a batch-processing scenario, when a job that runs once at the end of a business day fails, you can often restart the failed job and still complete the computation by the next morning, when the results are needed. In contrast, when a streaming application fails, downstream systems that consume the output might be affected within minutes, or even sooner, when the expected output no longer arrives in time.

Moreover, in case of failure, you can’t simply delete all intermediate results and restart a failed processing job, as is commonly done in the batch-processing case. The output of a streaming job is continuously consumed by downstream systems. Output that has already been consumed cannot easily be retracted. Therefore, the entire processing pipeline is much more sensitive to duplicates that are introduced by an application that is restarted on failure. Furthermore, the computations of a streaming application often rely on some kind of internal state that can be corrupted or even lost when the application fails.

Last but not least, streaming applications often deal with a varying amount of throughput. Therefore, scaling the application according to the current load is highly desirable. When the load increases, the infrastructure that supports the streaming application must scale to keep the application from becoming overloaded, falling behind, and producing results that are no longer relevant. On the other hand, when the load decreases, the infrastructure should scale in again to remain cost effective by not provisioning more resources than are needed.

A reliable and scalable streaming architecture based on Flink and Kinesis Data Analytics for Java Applications

Apache Flink is an open-source project that is tailored to stateful computations over unbounded and bounded datasets. Flink addresses many of the challenges that are common when analyzing streaming data by supporting different APIs (including Java and SQL), rich time semantics, and state management capabilities. It can also recover from failures while maintaining exactly-once processing semantics. Therefore, Flink is well suited for analyzing streaming data with low latency.

In this post, we illustrate how to deploy, operate, and scale a Flink application with Kinesis Data Analytics for Java Applications. We use a scenario to analyze the telemetry data of a taxi fleet in New York City in near-real time to optimize the fleet operation. In this scenario, every taxi in the fleet is capturing information about completed trips. The tracked information includes the pickup and drop-off locations, number of passengers, and generated revenue. This information is ingested into a Kinesis data stream as a simple JSON blob. From there, the data is processed by a Flink application, which is deployed to Kinesis Data Analytics for Java Applications. This application identifies areas that are currently requesting a high number of taxi rides. The derived insights are finally persisted into Amazon Elasticsearch Service, where they can be accessed and visualized using Kibana.

This scenario leads to the following architecture, which is separated into three stages for the ingestion, processing, and presentation of data.

Separating the different aspects of the infrastructure is a common approach in this domain and has several benefits over a more tightly coupled architecture.

First, the Kinesis data stream serves as a buffer that decouples the producers from the consumers. Taxis can persist the events that they generate into the data stream regardless of the condition of, for instance, the processing layer, which might be currently recovering from a node failure. Likewise, the derived data remains available through Kibana even if the ingestion or processing layer is currently unavailable due to some operational issues. Last but not least, all components can be scaled independently and can use infrastructure that is specifically tailored according to their individual requirements.

This architecture also allows you to experiment and adopt new technologies in the future. Multiple independent applications can concurrently consume the data stored in the Kinesis data stream. You can then test how a new version of an existing application performs with a copy of the production traffic. But you can also introduce a different tool and technology stack to analyze the data, again without affecting the existing production application. For example, it is common to persist the raw event data to Amazon S3 by adding a Kinesis Data Firehose delivery stream as a second consumer to the Kinesis data stream. This facilitates long-term archiving of the data, which you can then use to evaluate ad hoc queries or analyze historic trends.

All in all, separating the different aspects of the architecture into ingestion, processing, and presentation nicely decouples different components, making the architecture more robust. It furthermore allows you to choose different tools for different purposes and gives you a lot of flexibility to change or evolve the architecture over time.

For the rest of this post, we focus on using Apache Flink and Kinesis Data Analytics for Java Applications to identify areas that currently request a high number of taxi rides. We also derive the average trip duration to the New York City airports. But with this architecture, you also have the option to consume the incoming events using other tools, such as Apache Spark Structured Streaming and Kinesis Data Firehose, instead of, or in addition to, what is described here.
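The Flink application itself is written in Java and is not reproduced here. As a language-neutral illustration of the idea (counting pickups per grid cell within tumbling windows and keeping the busiest cells), here is a small batch Python sketch. The function hot_cells, the 10-minute window, the 0.01-degree grid, and the threshold are all hypothetical, and the sketch ignores streaming concerns such as watermarks and late events that Flink handles for you.

```python
import math
from collections import Counter


def hot_cells(pickups, window_seconds=600, cell_degrees=0.01, threshold=3):
    """Count pickups per (window start, grid cell) and keep cells at or above threshold.

    pickups: iterable of (epoch_seconds, latitude, longitude) tuples.
    """
    counts = Counter()
    for ts, lat, lon in pickups:
        window_start = ts - ts % window_seconds  # tumbling window the event falls into
        cell = (math.floor(lat / cell_degrees), math.floor(lon / cell_degrees))
        counts[(window_start, cell)] += 1
    return {key: n for key, n in counts.items() if n >= threshold}


pickups = [
    (1000, 40.7505, -73.9855),  # three pickups close together in the same window
    (1100, 40.7512, -73.9851),
    (1150, 40.7509, -73.9853),
    (1300, 40.6413, -73.7781),  # a single pickup elsewhere, in the next window
]
print(hot_cells(pickups))  # {(600, (4075, -7399)): 3}
```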

Let’s kick the tires!

To see the described architecture in action, execute the following AWS CloudFormation template in your own AWS account. The template first builds the Flink application that analyzes the incoming taxi trips, including the Flink Kinesis Connector that is required to read data from a Kinesis data stream. It then creates the infrastructure and submits the Flink application to Kinesis Data Analytics for Java Applications.

The entire process of building the application and creating the infrastructure takes about 20 minutes. After the AWS CloudFormation stack is created, the Flink application has been deployed as a Kinesis Data Analytics for Java application. It then waits for events in the data stream to arrive. Checkpointing is enabled so that the application can seamlessly recover from failures of the underlying infrastructure while Kinesis Data Analytics for Java Applications manages the checkpoints on your behalf. In addition, automatic scaling is configured so that Kinesis Data Analytics for Java Applications automatically allocates or removes resources and scales the application (that is, it adapts its parallelism) in response to changes in the incoming traffic.

To populate the Kinesis data stream, we use a Java application that replays a public dataset of historic taxi trips made in New York City into the data stream. The Java application has already been downloaded to an Amazon EC2 instance that was provisioned by AWS CloudFormation. You just need to connect to the instance and execute the JAR file to start ingesting events into the stream.

You can obtain all of the following commands, including their correct parameters, from the output section of the AWS CloudFormation template that you executed previously.

$ ssh ec2-user@«Replay instance DNS name»

$ java -jar amazon-kinesis-replay-1.0.jar -stream «Kinesis data stream name» -region «AWS region» -speedup 3600

The speedup parameter determines how much faster the data is ingested into the Kinesis data stream relative to the actual occurrence of the historic events. With the given parameters, the Java application ingests an hour of historic data within one second. This results in a throughput of roughly 13k events and 6 MB of data per second, which completely saturates the Kinesis data stream (more on this later).
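The arithmetic behind the speedup parameter can be sketched in Python, assuming the replay schedules each event at (event time - first event time) / speedup seconds after it starts. The functions replay_offset_seconds and replay_lag_seconds are hypothetical, and the replay tool's actual definition of “replay lag” may differ.

```python
def replay_offset_seconds(event_time, first_event_time, speedup):
    """Seconds after the replay starts at which an event is due (times in seconds)."""
    return (event_time - first_event_time) / speedup


def replay_lag_seconds(event_time, first_event_time, speedup, elapsed_seconds):
    """Positive when the producer ingests an event later than its scheduled offset."""
    return elapsed_seconds - replay_offset_seconds(event_time, first_event_time, speedup)


# With -speedup 3600, one hour (3,600 s) of historic events is due within one second
print(replay_offset_seconds(3600, 0, 3600))    # 1.0
# An event due after 1 s but actually ingested after 1.5 s is 0.5 s behind schedule
print(replay_lag_seconds(3600, 0, 3600, 1.5))  # 0.5
```

When the stream throttles the producer, the elapsed time grows faster than the scheduled offsets, so the lag keeps increasing.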

You can then go ahead and inspect the derived data through the Kibana dashboard that has been created. Or you can create your own visualizations to explore the data in Kibana.

https://«Elasticsearch endpoint»/_plugin/kibana/app/kibana#/dashboard/nyc-tlc-dashboard

The prepared Kibana dashboard contains a heatmap and a line graph. The heatmap visualizes locations where taxis are currently requested, and it shows that the highest demand for taxis is in Manhattan. Moreover, the JFK and LaGuardia airports also stand out on the map as spots where substantially more rides are requested than in their immediate surroundings. The line graph visualizes the average trip duration to these two airports. The following image shows how it steadily increases throughout the day until it abruptly drops in the evening.

For this post, the Elasticsearch cluster is configured to accept connections from the IP address range specified as a parameter of the AWS CloudFormation template. For production workloads, it’s much more desirable to further tighten the security of your Elasticsearch domain, for instance, by using Amazon Cognito for Kibana access control.

Scaling the architecture to increase its throughput

For this post, the Kinesis data stream was deliberately underprovisioned so that the Java application completely saturates the data stream. When you closely inspect the output of the Java application, you’ll notice that the “replay lag” is continuously increasing. This means that the producer cannot ingest events as quickly as the specified speedup parameter requires.

You can dive deeper into the metrics of the data stream by accessing it through an Amazon CloudWatch dashboard. You can then see that the WriteProvisionedThroughputExceeded metric is slightly elevated: roughly 0.4 percent of the records are not accepted into the stream because the respective requests are throttled. In other words, the data stream is underprovisioned, particularly because the producer pauses the ingestion of new events when too many events are in flight.

To increase the throughput of the data stream, you can simply update the number of shards from 6 to 12, either with a couple of clicks in the console or with a single API call. For production environments, you might even want to automate this procedure. For details on how to automatically scale a Kinesis data stream, see the blog post Scaling Amazon Kinesis Data Streams with AWS Application Auto Scaling.
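To size the stream, you can compare the expected write rate with the per-shard write limits of Kinesis Data Streams: 1 MB per second and 1,000 records per second. The following Python sketch (shards_needed is a hypothetical helper, and the 5,000 records per second in the example is an assumed figure) takes the larger of the two requirements. Note that the record limit counts Kinesis records as written to the stream, so a producer that aggregates several events into one record, as the Kinesis Producer Library can, consumes fewer records than it sends events.

```python
import math

# Per-shard write limits of Kinesis Data Streams (1 MB treated here as 1 MiB)
MAX_BYTES_PER_SHARD = 1024 * 1024
MAX_RECORDS_PER_SHARD = 1000


def shards_needed(bytes_per_second, records_per_second, headroom=1.0):
    """Smallest shard count that covers both the byte and the record write limits."""
    by_bytes = math.ceil(bytes_per_second * headroom / MAX_BYTES_PER_SHARD)
    by_records = math.ceil(records_per_second * headroom / MAX_RECORDS_PER_SHARD)
    return max(by_bytes, by_records, 1)


# Roughly 6 MB per second already needs all 6 shards; doubling the headroom gives 12
print(shards_needed(6 * 1024 * 1024, 5000))                # 6
print(shards_needed(6 * 1024 * 1024, 5000, headroom=2.0))  # 12
```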

When the scaling operation of the stream finishes, you can observe how the “replay lag” decreases and more events are ingested into the stream.

However, as a direct result, more events need to be processed. So now the Kinesis Data Analytics for Java application becomes overloaded and can no longer keep up with the increased number of incoming events. You can observe this through the millisBehindLatest metric, which is published to CloudWatch. The metric reports, in milliseconds, the difference in ingestion time between the oldest record currently read by the Kinesis Data Analytics for Java application and the latest record in the stream. In other words, it indicates how far the processing lags behind the tip of the stream.

As these metrics show, 10 minutes after the scaling operation finishes, processing is already more than 3 minutes behind the latest event in the stream. Even worse, the gap keeps widening steadily.
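To relate these numbers, here is a small Java sketch of the millisBehindLatest definition, plus a back-of-the-envelope model of how the lag grows. The model assumes a constant input rate, a constant processing rate, and no lag when the measurement starts. The metrics do not confirm these simplifications; they are assumptions. Under them, the application falls behind by `1 - processingRate / inputRate` for every unit of wall-clock time.

```java
// Sketch relating millisBehindLatest to input and processing rates.
// The constant-rate model is a simplifying assumption for illustration.
class LagMath {
    // Ingestion time of the newest record in the stream minus the ingestion time of the
    // record the application is currently reading, clamped at zero.
    static long millisBehindLatest(long latestIngestionMillis, long currentRecordIngestionMillis) {
        return Math.max(0L, latestIngestionMillis - currentRecordIngestionMillis);
    }

    // Lag after wallClockMillis, assuming constant rates and zero lag at the start.
    static double lagAfter(double wallClockMillis, double inputRate, double processingRate) {
        if (processingRate >= inputRate) {
            return 0.0; // the application keeps up, so no lag accumulates
        }
        return wallClockMillis * (1.0 - processingRate / inputRate);
    }

    // Inverse of lagAfter: the processingRate / inputRate ratio implied by an observed lag.
    static double impliedRateRatio(double lagMillis, double wallClockMillis) {
        return 1.0 - lagMillis / wallClockMillis;
    }
}
```

Under these assumptions, a lag of 3 minutes after 10 minutes implies that the application processes at most about 70 percent of the incoming rate.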

However, in contrast to Kinesis Data Streams, Kinesis Data Analytics for Java Applications natively supports auto scaling. After a couple of minutes, you can see the effect of the scaling activities in the metrics. The millisBehindLatest metric starts to decrease until it reaches zero, when the processing has caught up with the tip of the Kinesis data stream.

However, notice how the millisBehindLatest metric spikes just before it starts to decline. This is caused by the way that scaling a Kinesis Data Analytics for Java application works today. To scale a running application, the internal state of the application is persisted into a so-called savepoint. This savepoint is exposed as a snapshot by Kinesis Data Analytics for Java Applications. Subsequently, the running instance of the application is terminated, and a new instance of the same application with more resources and a higher parallelism is created. The new instance of the application then populates its internal state from the snapshot and resumes the processing from where the now terminated instance left off.
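The text does not explain how the new, more parallel instance divides the restored state. For keyed state, Apache Flink uses key groups. Each key maps to one of a fixed number of key groups (bounded by the maximum parallelism), and each key group is assigned to one operator instance. Only that assignment changes when the parallelism changes. The following Java sketch models this idea. The `Math.floorMod` on `hashCode()` is a simplified stand-in for Flink's actual murmur-based hash, and the class name is illustrative.

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch of key-group based redistribution of keyed state, modeled on Apache Flink.
// Flink hashes keys with a murmur hash; Math.floorMod(hashCode()) is a simplified stand-in.
class KeyGroupRescaling {
    // A key's key group depends only on the key and maxParallelism, so it stays the same
    // when the application is rescaled. State snapshotted per key group can be moved as a unit.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Assigns contiguous ranges of key groups to the parallel operator instances.
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Counts how many key groups each operator instance owns at a given parallelism.
    static Map<Integer, Integer> keyGroupsPerOperator(int maxParallelism, int parallelism) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (int kg = 0; kg < maxParallelism; kg++) {
            counts.merge(operatorIndexForKeyGroup(maxParallelism, parallelism, kg), 1, Integer::sum);
        }
        return counts;
    }
}
```

When the parallelism grows from 1 to 4 with a maximum parallelism of 128, the single instance's 128 key groups are split into four ranges of 32. Each new instance restores only those ranges from the snapshot.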

Accordingly, the scaling operation causes a brief interruption of the processing, which explains the spike in the metric. However, this operation is transparent to the producers and consumers. Producers can continue to write to the Kinesis data stream because they are decoupled from the application. Likewise, consumers can still use Kibana to view their dashboards, although they might not see the latest data because it hasn’t yet been processed.

Let’s step back for a moment and review what you just did: You created a fully managed, highly available, scalable streaming architecture. You ingested and analyzed up to 25k events per second. You doubled the throughput of the architecture by scaling the Kinesis data stream and the Kinesis Data Analytics for Java application with a couple of clicks. You did all this while the architecture remained fully functional and kept receiving and processing events, not losing a single event. You also could have scaled the Elasticsearch cluster as seamlessly as the other components. But we’ll leave that as an exercise for the interested reader.

Try to imagine what it would have taken you to build something similar from scratch.

Prepare Flink applications for Kinesis Data Analytics for Java Applications

Now that you have seen the streaming application in action, let’s look at what is required to deploy and run a Flink application with Kinesis Data Analytics for Java Applications.

Similar to other deployment methods, the Flink application is first built and packaged into a fat JAR, which contains all the dependencies the application needs to run. The resulting fat JAR is then uploaded to Amazon S3. The location of the fat JAR on S3 and some additional configuration parameters are then used to create an application that Kinesis Data Analytics for Java Applications can execute. So instead of logging in to a cluster and directly submitting a job to the Flink runtime, you upload the fat JAR to S3. You then create a Kinesis Data Analytics for Java application that you can interact with through API calls, the console, or the AWS CLI.

Adapt the Flink configuration and runtime parameters

To obtain a valid Kinesis Data Analytics for Java application, the fat JAR of the Flink application must include certain dependencies. When you use Apache Maven to build your Flink application, you can simply add another dependency to the pom.xml file of your project.

<!-- pom.xml -->
<project>
    ...
    <dependencies>
        ...
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-kinesisanalytics-runtime</artifactId>
            <version>1.0.1</version>
        </dependency>
    </dependencies>
    ...
</project>

You can then specify parameters that are passed to the resulting Kinesis Data Analytics for Java application when it is created or updated. These parameters are key-value pairs stored in a property map, and each property map belongs to a property group.

"ApplicationConfiguration": {
    "EnvironmentProperties": {
        "PropertyGroups": [
            {
                "PropertyGroupId": "FlinkApplicationProperties",
                "PropertyMap": {
                    "InputStreamName": "...",
                    ...
                }
            }
        ]
    },
    ...
}

You can then obtain the values of these parameters in the application code from the Kinesis Data Analytics for Java Applications runtime. For example, the following code snippet gets the name of the Kinesis data stream that the application should connect to from the FlinkApplicationProperties property group.

Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();

Properties flinkProperties = applicationProperties.get("FlinkApplicationProperties");

String kinesisStreamName = flinkProperties.getProperty("InputStreamName");
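`getProperty` returns `null` when a key is missing, and that failure then surfaces somewhere else. The following Java sketch adds hypothetical helpers that fail fast with a clear message or fall back to a default. They operate on the same `Map<String, Properties>` shape that `KinesisAnalyticsRuntime.getApplicationProperties()` returns. `AppConfig`, `required`, and `intOrDefault` are illustrative names, not part of the Kinesis Data Analytics runtime.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical helpers for the Map<String, Properties> returned by
// KinesisAnalyticsRuntime.getApplicationProperties(); not part of that library.
class AppConfig {
    // Returns a trimmed property value or fails with a message naming the group and key.
    static String required(Map<String, Properties> groups, String groupId, String key) {
        Properties group = groups.get(groupId);
        if (group == null) {
            throw new IllegalArgumentException("Missing property group: " + groupId);
        }
        String value = group.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing property " + key + " in group " + groupId);
        }
        return value.trim();
    }

    // Returns an integer property, or defaultValue if the group or key is absent.
    static int intOrDefault(Map<String, Properties> groups, String groupId, String key, int defaultValue) {
        Properties group = groups.get(groupId);
        String value = group == null ? null : group.getProperty(key);
        return value == null ? defaultValue : Integer.parseInt(value.trim());
    }
}
```

In the application, you could then write `AppConfig.required(KinesisAnalyticsRuntime.getApplicationProperties(), "FlinkApplicationProperties", "InputStreamName")`. In unit tests, you can pass a map built by hand.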

You use the same mechanism to configure other properties for the Kinesis Data Analytics for Java application (for example, checkpointing and the parallelism of the application) that are usually specified as a parameter or configuration option directly to the Flink runtime.

"ApplicationConfiguration": {
    "FlinkApplicationConfiguration": {
        "CheckpointConfiguration": {
            "ConfigurationType": "DEFAULT"
        },
        "MonitoringConfiguration": {
            "ConfigurationType": "CUSTOM",
            "MetricsLevel": "TASK",
            "LogLevel": "INFO"
        },
        "ParallelismConfiguration": {
            "ConfigurationType": "DEFAULT"
        }
    },
    ...
}

With this configuration, the checkpointing and parallelism settings are left at their default. This enables checkpointing and auto scaling and sets the initial parallelism of the Kinesis Data Analytics for Java application to one. Moreover, the log level is increased to INFO and CloudWatch metrics are collected for every subtask of the application.

Build the Flink Kinesis Connector

When you are building a Flink application that reads data from a Kinesis data stream, you might notice that the Flink Kinesis Connector is not available from Maven Central. You need to build it yourself. The following steps build the connector for any recent Apache Flink release. However, because Kinesis Data Analytics for Java Applications is based on Flink 1.6.2, you should use this specific version for now.

$ wget -qO- https://github.com/apache/flink/archive/release-1.6.2.zip | bsdtar -xf-

$ cd flink-release-1.6.2

$ mvn clean package -B -DskipTests -Dfast -Pinclude-kinesis -pl flink-connectors/flink-connector-kinesis

Note that the connector has already been built and stored on S3 by the AWS CloudFormation template. You can simply download the JAR file of the connector from there and put it in your local Maven repository using the following Maven command:

$ mvn install:install-file -Dfile=flink-connector-kinesis_2.11-1.6.2.jar -DpomFile=flink-connector-kinesis_2.11-1.6.2.pom.xml

Integrate the Flink Elasticsearch sink with Amazon Elasticsearch Service

Beginning with the 1.6 release, Apache Flink comes with an Elasticsearch connector that supports the Elasticsearch APIs over HTTP. Therefore, it can natively talk to the endpoints that are provided by Amazon Elasticsearch Service.

You just need to decide how to authenticate requests against the public endpoint of the Elasticsearch cluster. You can whitelist individual IPs for access to the cluster. However, the recommended way of authenticating against the Amazon Elasticsearch Service endpoint is to add authentication information to AWS requests using IAM credentials and the Signature Version 4 signing process.
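To show what Signature Version 4 involves, the following Java sketch implements its signing-key derivation step with only the JDK. The step chains HMAC-SHA256 over the date, Region, service, and the literal `aws4_request`. A complete signature also requires building the canonical request and the string to sign, which the sketch omits and which the interceptor library handles for you. The class name and sample inputs are illustrative. The sample inputs use `es`, the service name from the sink code later in this section.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

// Sketch of the signing-key derivation step of AWS Signature Version 4 (JDK only).
// Canonical request and string-to-sign construction are intentionally omitted.
class SigV4Key {
    static byte[] hmacSha256(byte[] key, String data) {
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(key, "HmacSHA256"));
            return mac.doFinal(data.getBytes(StandardCharsets.UTF_8));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // dateStamp uses the yyyyMMdd format, for example "20190301".
    static byte[] signingKey(String secretKey, String dateStamp, String region, String service) {
        byte[] kDate = hmacSha256(("AWS4" + secretKey).getBytes(StandardCharsets.UTF_8), dateStamp);
        byte[] kRegion = hmacSha256(kDate, region);
        byte[] kService = hmacSha256(kRegion, service);
        return hmacSha256(kService, "aws4_request");
    }

    // Lowercase hex encoding, as used for the final signature.
    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }
}
```

The key is scoped to a single day, Region, and service, so a key derived for `eu-west-1` cannot sign requests for another Region.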

To extend the Flink Elasticsearch connector, which is not aware of the AWS specific signing process, you can use the open-source aws-signing-request-interceptor, which is available from Maven Central. You just need to add an interceptor to the Elasticsearch sink that is called just before the request is sent to the Amazon Elasticsearch Service endpoint. The interceptor can then sign the request using the permissions of the role that has been configured for the Kinesis Data Analytics for Java application.

final List<HttpHost> httpHosts = Arrays.asList(HttpHost.create("https://..."));

ElasticsearchSink.Builder<T> esSinkBuilder = new ElasticsearchSink.Builder<>(
    httpHosts,
    new ElasticsearchSinkFunction<T>() {
      ...
    }
);

final Supplier<LocalDateTime> clock = () -> LocalDateTime.now(ZoneOffset.UTC);
final AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain();
final AWSSigner awsSigner = new AWSSigner(credentialsProvider, "eu-west-1", "es", clock);

esSinkBuilder.setRestClientFactory(
    restClientBuilder -> restClientBuilder.setHttpClientConfigCallback(
        callback -> callback.addInterceptorLast(new AWSSigningRequestInterceptor(awsSigner))
    )
);

esSinkBuilder.build();

Note that the actual code in the GitHub repository is a bit more sophisticated because you need to obtain a serializable request interceptor. But the basic approach to sign requests remains the same.
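The repository's exact solution isn't shown here. One common pattern, sketched below in plain Java, stores only serializable configuration such as the Region and service name in fields. The non-serializable signer is then created lazily in a `transient` field after the object has been deserialized on a worker. `LazySigningConfig` and `createSigner` are hypothetical names, and `createSigner` returns a placeholder instead of the real AWS signer and interceptor.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical sketch: keep only serializable configuration in fields and build the
// non-serializable signer lazily after the object has been shipped to a task.
class LazySigningConfig implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String region;
    private final String service;
    // transient: skipped during serialization and rebuilt on first use.
    private transient Object signer;

    LazySigningConfig(String region, String service) {
        this.region = region;
        this.service = service;
    }

    Object signer() {
        if (signer == null) {
            signer = createSigner(region, service);
        }
        return signer;
    }

    // Placeholder for constructing the real, non-serializable AWS signer and interceptor.
    private static Object createSigner(String region, String service) {
        return new Object() {
            @Override
            public String toString() {
                return "signer[" + region + "/" + service + "]";
            }
        };
    }

    // Round-trips a value through Java serialization, similar to how Flink ships user functions.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Serialization succeeds even after `signer()` has been called, because the placeholder signer lives in a transient field. The deserialized copy builds its own signer on first use.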

Monitor and debug the Flink application

When running a Kinesis Data Analytics for Java application, you don’t get direct access to the cluster that runs Flink. This is because the underlying infrastructure is completely managed by the service. You merely interact with the service through an API. However, you can still obtain metrics and logging information through CloudWatch and CloudWatch Logs, respectively.

The Kinesis Data Analytics for Java application exposes a lot of operational metrics, ranging from metrics for the entire application down to metrics for individual processes of operators of the application. You can control which level of detail is adequate or required for your purposes. In fact, the metrics used in the previous section were all obtained through CloudWatch.

In addition to operational metrics, you can configure the Kinesis Data Analytics for Java application to write messages to CloudWatch Logs. This capability seamlessly integrates with common logging frameworks, such as Apache Log4j and the Simple Logging Facade for Java (SLF4J). So it is useful for debugging and identifying the cause of operational issues.

To enable logging for your Kinesis Data Analytics for Java application, just specify an existing CloudWatch log stream as a logging option for the application. You can then emit log messages from your code through SLF4J, as follows:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final Logger LOG = LoggerFactory.getLogger(...);

LOG.info("Starting to consume events from stream {}", flinkProperties.getProperty("InputStreamName"));

After the log messages are persisted into CloudWatch Logs, you can easily query and analyze them through CloudWatch Logs Insights.

Conclusion

In this post, you built a reliable, scalable, and highly available streaming application based on Apache Flink and Kinesis Data Analytics for Java Applications. You also scaled the different components while ingesting and analyzing up to 25k events per second in near-real time. This scenario was enabled largely by managed services, so you didn’t need to spend time provisioning and configuring the underlying infrastructure.

The sources of the application and the AWS CloudFormation template used in this post are available from GitHub for your reference. You can dive into all the details of the Flink application and the configuration of the underlying services. I’m curious to see what you will build when you can focus on analyzing data in a streaming fashion rather than spending time on managing and operating infrastructure.



About the Author

Steffen Hausmann is a specialist solutions architect with AWS.


Improve clinical trial outcomes by using AWS technologies

Post Syndicated from Mayank Thakkar original https://aws.amazon.com/blogs/big-data/improve-clinical-trial-outcomes-by-using-aws-technologies/

We are living in a golden age of innovation, where personalized medicine is making it possible to cure diseases that we never thought curable. Digital medicine is helping people with diseases get healthier, and we are constantly discovering how to use the body’s immune system to target and eradicate cancer cells. According to a report published by ClinicalTrials.gov, the number of registered studies hit 293,000 in 2018, representing a 250x growth since 2000.

However, an internal rate of return (IRR) analysis by Endpoints News, using data from EvaluatePharma, highlights a contrasting trend. Although strong growth in registered studies reflects flourishing pharma innovation, the IRR has declined rapidly, from around 17 percent in 2000 to below the cost of capital in 2017. It is projected to reach 0 percent by 2020.

This blog post is the first installment in a series that focuses on the end-to-end workflow of collecting, storing, processing, visualizing, and acting on clinical trials data in a compliant and secure manner. The series also discusses the application of artificial intelligence and machine learning technologies to the world of clinical trials. In this post, we highlight common architectural patterns that AWS customers use to modernize their clinical trials. These incorporate mobile technologies for better evidence generation, cost reduction, increasing quality, improving access, and making medicine more personalized for patients.

Improving the outcomes of clinical trials and reducing costs

Biotech and pharma organizations are feeling the pressure to use resources as efficiently as possible. This pressure forces them to search for any opportunity to streamline processes, get faster, and stay more secure, all while decreasing costs. More and more life sciences companies are targeting biologics, CAR-T, and precision medicine therapeutics, with focus shifting towards smaller, geographically distributed patient segments. This shift has resulted in an increasing mandate to capture data from previously unavailable, nontraditional sources. These sources include mobile devices, IoT devices, and in-home and clinical devices. Life sciences companies merge data from these sources with data from traditional clinical trials to build robust evidence around the safety and efficacy of a drug.

Early last year, the Clinical Trials Transformation Initiative (CTTI) provided recommendations about using mobile technologies for capturing holistic, high quality, attributable, real-world data from patients and for submission to the U.S. Food and Drug Administration (FDA). By using mobile technologies, life sciences companies can reduce barriers to trial participation and lower costs associated with conducting clinical trials. Global regulatory groups such as the FDA, Health Canada, and Medicines and Healthcare products Regulatory Agency (MHRA), among others, are also in favor of using mobile technologies. Mobile technologies can make patient recruitment more efficient, reach endpoints faster, and reduce the cost and time required to conduct clinical trials.

Improved data ingestion using mobile technologies can speed up outcomes, reduce costs, and improve the accuracy of clinical trials. This is especially true when mobile data ingestion is supplemented with artificial intelligence and machine learning (AI/ML) technologies.

Together, they can usher in a new age of smart clinical trials.

At the same time, traditional clinical trial processes and technology designed for mass-marketed blockbuster drugs can’t effectively meet emerging industry needs. This leaves life sciences and pharmaceutical companies in need of assistance in evolving their clinical trial operations. As a result, clinical trials have become one of the largest areas of investment in bringing a new drug to market.

Using mobile technologies with traditional technologies in clinical trials can improve the outcomes of the trials and simultaneously reduce costs. Some of the use cases that the integration of various technologies enables include these:

  • Identifying and tracking participants in clinical trials
    • Identifying participants for clinical trials recruitment
    • Educating and informing patients participating in clinical trials
    • Implementing standardized protocols and sharing associated information to trial participants
    • Tracking adverse events and safety profiles
  • Integrating genomic and phenotypic data for identifying novel biomarkers
  • Integrating mobile data into clinical trials for better clinical trial management
  • Creating a patient-control arm based on historical data
  • Stratifying cohorts based on treatment, claims, and registry datasets
  • Building a collaborative, interoperable network for data sharing and knowledge creation
  • Building compliance-ready infrastructure for clinical trial management

The AWS Cloud provides HIPAA eligible services and solutions. As an AWS customer, you can use these to build solutions for global implementation of mobile devices and sensors in trials, secure capture of streaming Internet of Things (IoT) data, and advanced analytics through visualization tools or AI/ML capabilities. Some of the use cases these services and solutions enable are finding and recruiting patients using smart analytics, facilitating global data management, and remote or in-patient site monitoring. Others include predicting lack of adherence, detecting adverse events, and accelerating trial outcomes along with optimizing trial costs.

Clinical Trials 2.0 (CT2.0) at AWS is geared toward facilitating wider adoption of cloud-native services to enable data ingestion from disparate sources, cost-optimized and reliable storage, and holistic analytics. At the same time, CT2.0 provides the granular access control, end-to-end security, and global scalability needed to conduct clinical trials more efficiently.

Reference architecture

The following diagram shows a typical architecture for managing a clinical trial using mobile technologies. This architecture focuses on capturing real-time data from mobile sources and providing a way to process it.

* – Additional considerations such as data security, access control, and compliance need to be incorporated into the architecture and are discussed in the remainder of this post.

Managing a trial by using this architecture consists of the following five major steps.

Step 1: Collect data

Global pharmaceutical companies use (or are considering) mobile devices, personal wearables, instruments, and smart devices extensively in patient care and clinical trials to provide real-time data for activity tracking, vital signs monitoring, and so on. Devices such as infusion pumps and personal-use dialysis machines require tracking and alerting of device consumables and calibration status. Remote settings management is also a major use case for these kinds of devices. The end-user mobile devices used in the clinical trial emit a lot of telemetry data that requires real-time data capture, data cleansing, transformation, and analysis.

Typically, these devices are connected to an edge node or a smartphone. Such a connection provides sufficient computing resources to stream data to AWS IoT Core. AWS IoT Core can then be configured to write data to Amazon Kinesis Data Firehose in near real time. Kinesis Data Firehose is a fully managed service for delivering real-time streaming data to destinations such as Amazon S3. S3 provides online, flexible, cost-efficient, pay-as-you-go storage that can replicate your data across three Availability Zones within an AWS Region. The edge node or smartphone can use the AWS IoT SDKs to publish and subscribe to device data on AWS IoT Core using MQTT. These connections can be authenticated with AWS Signature Version 4 (SigV4), X.509 certificates, or customer-created tokens (through custom authorizers). This authenticated approach enables you to map your choice of policies to each certificate and remotely control device or application access. You can also use the Kinesis Data Firehose encryption feature to enable server-side data encryption.

You can also capture additional data such as Case Report Forms (CRF), Electronic Medical Records (EMR), and medical images using Picture Archiving and Communication Systems (PACS). In addition, you can capture laboratory data (Labs) and other Patient Reported Outcomes data (ePRO). AWS provides multiple tools and services to effectively and securely connect to these data sources, enabling you to ingest data of varying volume, variety, and velocity. For more information about creating a HealthCare Data Hub and ingesting Digital Imaging and Communications in Medicine (DICOM) data, see the AWS Big Data Blog post Create a Healthcare Data Hub with AWS and Mirth Connect.

Step 2: Store data

After data is ingested from the devices and wearables used in the clinical trial, Kinesis Data Firehose stores the data on Amazon S3. This stored data serves as a raw copy and can later be used for historical analysis and pattern prediction. Using Amazon S3 lifecycle policies, you can periodically move your data to lower-cost storage such as Amazon S3 Glacier to further optimize your storage costs. Amazon S3 Intelligent-Tiering can automatically optimize costs when data access patterns change, without performance impact or operational overhead, by moving data between two access tiers: frequent access and infrequent access. You can also choose to encrypt data at rest and in transit using the various encryption options available on S3.

Amazon S3 offers an extremely durable, highly available, and infinitely scalable data storage infrastructure, simplifying most data processing, backup, and replication tasks.

Step 3: Data processing (fast lane)

After a raw copy of the data is collected and stored, Amazon S3 is configured to publish events to AWS Lambda and invoke a Lambda function, passing the event data as a parameter. The Lambda function extracts key performance indicators (KPIs) such as adverse event notifications, medication adherence, and treatment schedule management from the incoming data. You can use Lambda to process these KPIs and store them in Amazon DynamoDB with encryption at rest. DynamoDB then powers a near-real-time clinical trial status dashboard that alerts clinical trial coordinators so that appropriate interventions can take place.
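The KPI extraction step can be sketched in plain Java. The sketch below computes medication adherence (doses taken divided by doses scheduled) and the set of patients who reported an adverse event from a batch of events. The event shape and the type names `DOSE_SCHEDULED`, `DOSE_TAKEN`, and `ADVERSE_EVENT` are hypothetical assumptions, not a format defined by AWS. The Lambda handler wiring and the DynamoDB write are left out.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch of KPI extraction from clinical trial telemetry.
// The event format is an illustrative assumption; Lambda and DynamoDB wiring are omitted.
class TrialKpis {
    enum EventType { DOSE_SCHEDULED, DOSE_TAKEN, ADVERSE_EVENT }

    static final class Event {
        final String patientId;
        final EventType type;

        Event(String patientId, EventType type) {
            this.patientId = patientId;
            this.type = type;
        }
    }

    // Fraction of scheduled doses that were taken, per patient, capped at 1.0.
    // Patients without any scheduled dose are skipped.
    static Map<String, Double> adherence(List<Event> events) {
        Map<String, int[]> counts = new TreeMap<>(); // patientId -> [scheduled, taken]
        for (Event e : events) {
            int[] c = counts.computeIfAbsent(e.patientId, id -> new int[2]);
            if (e.type == EventType.DOSE_SCHEDULED) c[0]++;
            if (e.type == EventType.DOSE_TAKEN) c[1]++;
        }
        Map<String, Double> result = new TreeMap<>();
        counts.forEach((id, c) -> {
            if (c[0] > 0) result.put(id, Math.min(1.0, (double) c[1] / c[0]));
        });
        return result;
    }

    // Patients who reported at least one adverse event and should trigger an alert.
    static Set<String> adverseEventPatients(List<Event> events) {
        Set<String> ids = new TreeSet<>();
        for (Event e : events) {
            if (e.type == EventType.ADVERSE_EVENT) ids.add(e.patientId);
        }
        return ids;
    }
}
```

Each resulting adherence value and alert could then become one item in the DynamoDB table that backs the status dashboard.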

In addition to this, using a data warehouse full of medical records, you can train and implement a machine learning model. This model can predict which patients are about to switch medications or might exhibit adherence challenges in the future. Such prediction can enable clinical trial coordinators to narrow in on those patients with mitigation strategies.

Step 4: Data processing (batch)

For historical analysis and pattern prediction, the staged data (stored in S3) is processed in batches. A Lambda function triggers the extract, transform, and load (ETL) process every time new data is added to the raw data S3 bucket. This Lambda function starts an ETL process using AWS Glue, a fully managed ETL service that makes it easy for you to prepare and load your data for analytics. This approach helps in mining current and historical data to derive actionable insights, which are stored on Amazon S3.

From there, data is loaded on to Amazon Redshift, a cost-effective, petabyte-scale data warehouse offering from AWS. You can also use Amazon Redshift Spectrum to extend data warehousing out to exabytes without loading any data to Amazon Redshift, as detailed in the Big Data blog post Amazon Redshift Spectrum Extends Data Warehousing Out to Exabytes—No Loading Required. This enables you to provide an all-encompassing picture of the entire clinical trial to your clinical trial coordinators, enabling you to react and respond faster.

In addition to this, you can train and implement a machine learning model to identify patients who might be at risk for adherence challenges. This enables clinical trial coordinators to reinforce patient education and support.

Step 5: Visualize and act on data

After the data is processed and ready to be consumed, you can visualize it with Amazon QuickSight, a cloud-native business intelligence service from AWS that offers native Amazon Redshift connectivity. Amazon QuickSight is serverless and so can be rolled out to your audiences in hours. You can also use a host of third-party reporting tools, which can use AWS-supplied JDBC or ODBC drivers or open-source PostgreSQL drivers to connect with Amazon Redshift. These tools include TIBCO Spotfire Analytics, Tableau Server, Qlik Sense Enterprise, Looker, and others. Real-time data processing (step 3) combines with historical-view batch processing (step 4). Together, they empower contract research organizations (CROs), study managers, trial coordinators, and other entities involved in the clinical trial journey to make effective and informed decisions at a speed and frequency that was previously unavailable. Using Amazon QuickSight’s Pay-per-Session pricing model, you can optimize costs for bursty usage by paying only when users access the dashboards.

Using Amazon Simple Notification Service (Amazon SNS), real-time feedback based on incoming data and telemetry is sent to patients through text messages, mobile push notifications, and emails. In addition, study managers and coordinators can send Amazon SNS notifications to patients. Amazon SNS provides fully managed pub/sub messaging for microservices, distributed systems, and serverless applications. It’s designed for high-throughput, push-based, many-to-many messaging. Alerts and notifications can be based on current data or a combination of current and historical data.

To encrypt messages published to Amazon SNS, you can follow the steps listed in the post Encrypting messages published to Amazon SNS with AWS KMS, on the AWS Compute Blog.   

Data security, data privacy, data integrity, and compliance considerations

At AWS, customer trust is our top priority. We deliver services to millions of active customers, including enterprises, educational institutions, and government agencies in over 190 countries. Our customers include financial services providers, healthcare providers, and governmental agencies, who trust us with some of their most sensitive information.

To facilitate this, along with the services mentioned earlier, you should also use AWS Identity and Access Management (IAM) service. IAM enables you to maintain segregation of access, fine-grained access control, and securing end user mobile and web applications. You can also use AWS Security Token Service (AWS STS) to provide secure, self-expiring, time-boxed, temporary security credentials to third-party administrators and service providers, greatly strengthening your security posture. You can use AWS CloudTrail to log IAM and STS API calls. Additionally, AWS IoT Device Management makes it easy to securely onboard, organize, monitor, and remotely manage IoT devices at scale.

With AWS, you can add an additional layer of security to your data at rest in the cloud. AWS provides scalable and efficient encryption features for services like Amazon EBS, Amazon S3, Amazon Redshift, Amazon SNS, AWS Glue, and many more. Flexible key management options, including AWS Key Management Service, enable you to choose whether to have AWS manage the encryption keys or to keep complete control over your keys. In addition, AWS provides APIs for you to integrate encryption and data protection with any of the services that you develop or deploy in an AWS environment.

As a customer, you maintain ownership of your data, and select which AWS services can process, store, and host the content. Generally speaking, AWS doesn’t access or use customers’ content for any purpose without their consent. AWS never uses customer data to derive information for marketing or advertising.

When evaluating the security of a cloud solution, it’s important that you understand and distinguish between the security of the cloud and security in the cloud. The AWS Shared Responsibility Model details this relationship.

To assist you with your compliance efforts, AWS continues to add more services to the various compliance regulations, attestations, certifications, and programs across the world. To decide which services are suitable for you, see the services in scope page.

You can also use services such as AWS CloudTrail, AWS Config, Amazon GuardDuty, and AWS Key Management Service (AWS KMS) to enhance your compliance and auditing efforts. Find more details in the AWS Compliance Solutions Guide.

Final thoughts

With the ever-growing interconnectivity and technological advances in the field of medical devices, mobile devices and sensors can improve numerous aspects of clinical trials. They can help in recruitment, informed consent, patient counseling, and patient communication management. They can also improve protocol and medication adherence, clinical endpoints measurement, and the process of alerting participants on adverse events. Smart sensors, smart mobile devices, and robust interconnecting systems can be central in conducting clinical trials.

Every biopharma organization conducting or sponsoring a clinical trial faces the conundrum of advancing its approach to trials while maintaining overall trial performance and data consistency. The AWS Cloud enables a new dimension for how data is collected, stored, and used for clinical trials. It thus addresses that conundrum as we march towards a new reality of how drugs are brought to market. The AWS Cloud abstracts away technical challenges such as scaling, security, and establishing a cost-efficient IT infrastructure. In doing so, it allows biopharma organizations to focus on their core mission of improving patient lives through the development of effective, groundbreaking treatments.



About the Author

Mayank Thakkar – Global Solutions Architect, AWS HealthCare and Life Sciences


Deven Atnoor, Ph.D. – Industry Specialist, AWS HealthCare and Life Sciences


Create real-time clickstream sessions and run analytics with Amazon Kinesis Data Analytics, AWS Glue, and Amazon Athena

Post Syndicated from Hugo Rozestraten original https://aws.amazon.com/blogs/big-data/create-real-time-clickstream-sessions-and-run-analytics-with-amazon-kinesis-data-analytics-aws-glue-and-amazon-athena/

Clickstream events are small pieces of data that are generated continuously with high speed and volume. Often, clickstream events are generated by user actions, and it is useful to analyze them.

For example, you can detect user behavior in a website or application by analyzing the sequence of clicks a user makes, the amount of time the user spends, where they usually begin the navigation, and how it ends. By tracking this user behavior in real time, you can update recommendations, perform advanced A/B testing, push notifications based on session length, and much more. To track and analyze these events, you need to identify and create sessions from them. The process of identifying events in the data and creating sessions is known as sessionization.

Capturing and processing clickstream events in real time can be difficult. As the number of users and web and mobile assets you have increases, so does the volume of data. Amazon Kinesis provides you with the capabilities necessary to ingest this data in real time and generate useful statistics immediately so that you can take action.

When you run sessionization on clickstream data, you identify events and assign them to a session with a specified key and lag period. After each event has a key, you can perform analytics on them. The use cases for sessionization vary widely, and have different requirements. For example, you might need to identify and create sessions from events in web analytics to track user actions. Sessionization is also broadly used across many different areas, such as log data and IoT.

This blog post demonstrates how to identify and create sessions from real-time clickstream events and then analyze them using Amazon Kinesis Data Analytics.

Why did we choose Kinesis Data Analytics?

Clickstream data arrives continuously, often as thousands of new events per second. When you analyze the effectiveness of new application features, site layout, or marketing campaigns, it is important to analyze them in real time so that you can take action faster.

To perform the sessionization in batch jobs, you could use a tool such as AWS Glue or Amazon EMR. But with daily schedules, each query and aggregation has to work through large amounts of accumulated data, which takes more resources and time. Performing sessionization in Kinesis Data Analytics takes less time and lowers the latency between an event arriving and its session being generated. You can also trigger real-time alerts with AWS Lambda functions based on conditions, such as a session time shorter than 20 seconds, or on the result of a machine learning endpoint.

Identifying a session among thousands of clicks

A session is a short-lived and interactive exchange between two or more devices and/or users. For example, it can be a user browsing and then exiting your website, or an IoT device waking up to perform a job and then going back to sleep. These interactions produce a sequence of events with a beginning and an end: a session. The start and end of a session can be difficult to determine, and are often defined by a time period without any relevant event for a user or device. A session starts when a new event arrives after a specified “lag” time period has passed without an event arriving. A session ends in a similar manner, when a new event does not arrive within the specified lag period.
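The lag-based definition above can be sketched in a few lines of Python. This is a minimal illustration, assuming events are (key, timestamp) pairs and an arbitrary 60-second lag; the function name `sessionize` is hypothetical, and the SQL later in this post approximates sessions with fixed stagger windows rather than this exact gap rule.

```python
from datetime import datetime, timedelta


def sessionize(events, lag):
    """Group (key, timestamp) events into sessions separated by gaps longer than `lag`.

    `key` can be a user ID or a (user_id, device_id) tuple. Returns a dict that maps
    each key to a list of sessions, where each session is a sorted list of timestamps.
    """
    sessions = {}
    last_seen = {}
    # Process each key's events in timestamp order, regardless of arrival order.
    for key, ts in sorted(events, key=lambda e: (e[0], e[1])):
        if key not in last_seen or ts - last_seen[key] > lag:
            # First event for this key, or the lag elapsed: start a new session.
            sessions.setdefault(key, []).append([])
        sessions[key][-1].append(ts)
        last_seen[key] = ts
    return sessions


t0 = datetime(2018, 11, 29, 23, 35, 10)
clicks = [
    (20, t0),
    (20, t0 + timedelta(seconds=34)),
    (20, t0 + timedelta(minutes=5)),  # arrives after a long pause
]
result = sessionize(clicks, lag=timedelta(seconds=60))
print([len(s) for s in result[20]])  # [2, 1]
```

The first two clicks are 34 seconds apart and share a session; the third arrives more than 60 seconds after the previous one and starts a new session.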

This blog post relies on several other posts about performing batch analytics on SQL data with sessions. My two favorite posts on this subject are Sessionization in SQL, Hive, Pig and Python from Dataiku and Finding User Session with SQL by Benn Stancil at Mode. Both posts take advantage of SQL window functions to identify and build sessions from clickstream events.

ANSI added SQL window functions to the SQL standard in 2003 and has since expanded them. Window functions work naturally with streaming data and enable you to easily translate batch SQL examples to Kinesis Data Analytics.

In this use case, I group the events of a specific user, as in the following simplified example, which uses the distinct navigation patterns of three users to analyze user behavior. To begin, I group events by user ID to obtain some statistics from the data:

In this example, for “User ID 20,” the minimum timestamp is 2018-11-29 23:35:10 and the maximum timestamp is 2018-11-29 23:35:44. This gives a 34-second session, starting with action “B_10” and ending with action “A_02.” In this example, these “actions” identify the application’s buttons.
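A minimal Python sketch of the same per-user statistics, assuming the events are (user_id, timestamp, action) tuples. Only the User ID 20 timestamps and the B_10/A_02 actions come from the example; the intermediate C_07 click and the function name `session_stats` are hypothetical.

```python
from datetime import datetime


def session_stats(events):
    """Compute per-user min/max timestamp, duration, and first/last action."""
    by_user = {}
    for user_id, ts, action in events:
        by_user.setdefault(user_id, []).append((datetime.fromisoformat(ts), action))
    stats = {}
    for user_id, rows in by_user.items():
        rows.sort()  # order each user's events by timestamp
        start, first_action = rows[0]
        end, last_action = rows[-1]
        stats[user_id] = {
            "min": start,
            "max": end,
            "duration_sec": int((end - start).total_seconds()),
            "begin_action": first_action,
            "end_action": last_action,
        }
    return stats


events = [
    (20, "2018-11-29 23:35:44", "A_02"),
    (20, "2018-11-29 23:35:10", "B_10"),
    (20, "2018-11-29 23:35:21", "C_07"),  # hypothetical intermediate click
]
print(session_stats(events)[20]["duration_sec"])  # 34
```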

Suppose that after several minutes, new “User ID 20” actions arrive. Would you consider them part of the same session? A user can abort a navigation or start a new one. Also, applications often have timeouts. You have to decide the maximum session length beyond which new events start a new session. A session can run anywhere from 20 to 50 seconds, or from 1 to 5 minutes.

There are other elements that you might want to consider, such as a client IP or a machine ID. These elements allow you to separate sessions that occur on different devices.

High-level solution overview

The end-to-end scenario described in this post uses Amazon Kinesis Data Streams to capture the clickstream data and Kinesis Data Analytics to build and analyze the sessions. The aggregated analytics trigger real-time events in Lambda, which then sends them to Kinesis Data Firehose. Kinesis Data Firehose delivers the data to an Amazon S3 bucket, where an AWS Glue crawler catalogs it as a table that you can query with Amazon Athena. You can use this table for ad hoc analysis.

The following diagram shows an end-to-end sessionization solution.

  • Data ingestion: You can use Kinesis Data Streams to build custom applications that process or analyze streaming data for specialized needs. Kinesis Data Streams can continuously capture and store terabytes of data per hour from hundreds of thousands of sources, such as website clickstreams, financial transactions, social media feeds, IT logs, and location-tracking events.
  • Data sessionization: Kinesis Data Analytics is the easiest way to process streaming data in real time with standard SQL without having to learn new programming languages or processing frameworks. With Kinesis Data Analytics, you can query streaming data or build entire streaming applications using SQL, so that you can gain actionable insights and respond to your business and customer needs promptly.
  • Data processing and storage: An AWS Lambda function reads the sessionization stream from Kinesis Data Analytics. The function does two things: it feeds a real-time dashboard in Amazon CloudWatch, and it persists the data with Kinesis Data Firehose.
  • Data analysis: AWS Glue is used to crawl Amazon S3 and build or update metadata definition for Amazon Athena tables.

Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena provides connectivity to any application using JDBC or ODBC drivers.

  • Data visualization: Amazon QuickSight is a visualization tool that connects natively to Amazon Athena to build dashboards over its data.
  • Monitoring: Amazon CloudWatch is a tool that lets you monitor the streaming activities, such as the number of bytes processed or delivered per second, or the number of failures.

After you finish the sessionization stage in Kinesis Data Analytics, you can output data into different tools. For example, you can use a Lambda function to process the data on the fly and take actions such as sending SMS alerts or rolling back a deployment. To learn how to implement such workflows based on AWS Lambda output, see my other blog post Implement Log Analytics using Amazon Kinesis Data Analytics. In this post, we send data to Amazon CloudWatch and build a real-time dashboard.
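As a sketch of what such a Lambda destination might look like, the following Python assumes the Kinesis Data Analytics output contract for Lambda: each invocation receives `records` with a `recordId` and base64-encoded `data`, and must return each `recordId` with a result such as `Ok`. The CloudWatch namespace, the metric names, and the `DELIVERY_STREAM_NAME` environment variable are hypothetical, and a production version should also check `FailedPutCount` in the `put_record_batch` response.

```python
import base64
import json
import os


def decode_records(event):
    """Decode the base64 JSON rows that Kinesis Data Analytics sends to a Lambda destination."""
    rows = []
    for record in event.get("records", []):
        row = json.loads(base64.b64decode(record["data"]))
        rows.append((record["recordId"], row))
    return rows


def field(row, name):
    """Case-insensitive lookup; unquoted SQL column names may arrive upper-cased."""
    lowered = {k.lower(): v for k, v in row.items()}
    return lowered[name.lower()]


def metric_data(row):
    # Hypothetical metric names for the CloudWatch dashboard.
    return [
        {"MetricName": "events", "Value": float(field(row, "events")), "Unit": "Count"},
        {"MetricName": "session_duration", "Value": float(field(row, "duration_sec")), "Unit": "Seconds"},
    ]


def chunks(items, size):
    for i in range(0, len(items), size):
        yield items[i:i + size]


def lambda_handler(event, context):
    import boto3  # imported here so the helpers above can be tested without AWS
    cloudwatch = boto3.client("cloudwatch")
    firehose = boto3.client("firehose")
    rows = decode_records(event)
    metrics = [m for _, row in rows for m in metric_data(row)]
    for batch in chunks(metrics, 20):  # conservative batch size for put_metric_data
        cloudwatch.put_metric_data(Namespace="Sessionization", MetricData=batch)
    records = [{"Data": (json.dumps(row) + "\n").encode("utf-8")} for _, row in rows]
    for batch in chunks(records, 500):  # put_record_batch accepts up to 500 records
        firehose.put_record_batch(DeliveryStreamName=os.environ["DELIVERY_STREAM_NAME"], Records=batch)
    # Acknowledge every record so Kinesis Data Analytics does not retry it.
    return {"records": [{"recordId": rid, "result": "Ok"} for rid, _ in rows]}
```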

Lambda clickstream generator

To generate the workload, you can use a Python Lambda function with random values, simulating a beer-selling application.

The same user ID can have sessions on different devices, such as a tablet, a browser, or a phone application. This information is captured by the device ID. As a result, the data for the Lambda function payload has these parameters: a user ID, a device ID, a client event, and a client timestamp, as shown in the following example.

The following is the code for the Lambda function payload generator, which is scheduled using CloudWatch Events scheduled events:

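import datetime
import json
import random

import boto3

kinesis = boto3.client('kinesis')

def getReferrer():
    # Pick one of five user-ID bands (50-80, 100-130, ..., 250-280)
    x = random.randint(1,5)
    x = x*50
    y = x+30
    data = {}
    data['user_id'] = random.randint(x,y)
    # 'mobile' and 'computer' appear twice, so they are drawn more often than 'tablet'
    data['device_id'] = random.choice(['mobile','computer', 'tablet', 'mobile','computer'])
    data['client_event'] = random.choice(['beer_vitrine_nav','beer_checkout','beer_product_detail',
    'beer_products','beer_selection','beer_cart'])
    # ISO 8601 timestamp of when the event was generated
    now = datetime.datetime.now()
    str_now = now.isoformat()
    data['client_timestamp'] = str_now
    return data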

def lambda_handler(event, context):
...
        data = json.dumps(getReferrer())
        kinesis.put_record(
                StreamName='sessionsclicks',
                Data=data,
                PartitionKey='partitionkey')

As a result, payloads like the following are written to the Kinesis data stream, where Kinesis Data Analytics reads them:

Using window SQL functions in Kinesis Data Analytics

Grouping sessions lets us combine all the events from a given user ID or a device ID that occurred during a specific time period. Amazon Kinesis Data Analytics SQL queries in your application code execute continuously over in-application streams. You need to specify bounded queries using a window defined in terms of time or rows. These queries are called window SQL functions.

I had three options for windowed queries in Kinesis Data Analytics: sliding windows, tumbling windows, and stagger windows. I chose stagger windows because they have some good features for the sessionization use case, as follows:

  • Stagger windows open when the first event that matches a partition key condition arrives. So each key gets its own window, as opposed to the other window types, which evaluate one common window for all the matched partition keys.
  • When dealing with clickstreams, you cannot rely on the order in which events arrive in the stream, only on when each event was generated. Stagger windows handle out-of-order events well. Each window closes once the specified age (the RANGE interval) has elapsed, measured from the time the window opened.
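To make the stagger window behavior concrete, here is a simplified Python model, not the Kinesis Data Analytics engine: each partition key gets its own window, opened by that key's first arriving event and closed once `range_sec` seconds have elapsed. Arrival times are plain seconds, and `stagger_windows` is a hypothetical name.

```python
def stagger_windows(events, range_sec):
    """Simplified model of stagger windows.

    events: iterable of (arrival_sec, partition_key).
    Returns (key, open_sec, close_sec, event_count) tuples in the order the windows
    are closed; windows still open at the end are flushed last.
    """
    open_windows = {}  # partition key -> [open time, event count]
    emitted = []
    for arrival, key in sorted(events, key=lambda e: e[0]):
        # Close every window whose age has reached range_sec before this arrival.
        expired = [k for k, (start, _) in open_windows.items() if arrival >= start + range_sec]
        for k in expired:
            start, count = open_windows.pop(k)
            emitted.append((k, start, start + range_sec, count))
        if key in open_windows:
            open_windows[key][1] += 1
        else:
            # First event for this key: open a window that belongs to this key only.
            open_windows[key] = [arrival, 1]
    for k, (start, count) in open_windows.items():
        emitted.append((k, start, start + range_sec, count))
    return emitted


arrivals = [(0, "A"), (5, "B"), (10, "A"), (31, "A"), (40, "B")]
for window in stagger_windows(arrivals, range_sec=30):
    print(window)
```

The windows for keys A and B open at different times (0 and 5 seconds), whereas a tumbling window would align both keys to the same boundaries.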

To partition by the timestamp, I chose to write two distinct SQL queries: one that uses STEP to build 30-second windows, and one that uses FLOOR to build 1-minute windows.

In Kinesis Data Analytics, SOURCE_SQL_STREAM_001 is by default the main stream from the source. In this case, it’s receiving the source payload from Kinesis Data Streams.

Kinesis Data Analytics SQL – Create a stream

The following statement creates a stream to receive the query aggregation result:

-- CREATE a Stream to receive the query aggregation result
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"
(
  session_id VARCHAR(60),
  user_id INTEGER,
  device_id VARCHAR(10),
  timeagg timestamp,
  events INTEGER,
  beginnavigation VARCHAR(32),
  endnavigation VARCHAR(32),
  beginsession VARCHAR(25),
  endsession VARCHAR(25),
  duration_sec INTEGER
);

Kinesis Data Analytics SQL – Using a SECOND interval “STEP” function

The following code creates a pump that continuously inserts the results of a SELECT into DESTINATION_SQL_STREAM:

-- Create the PUMP
CREATE OR REPLACE PUMP "WINDOW_PUMP_SEC" AS INSERT INTO "DESTINATION_SQL_STREAM"
-- Insert as Select 
    SELECT  STREAM
-- Make the Session ID using user_ID+device_ID and Timestamp
    UPPER(cast("user_id" as VARCHAR(3))|| '_' ||SUBSTRING("device_id",1,3)
    ||cast( UNIX_TIMESTAMP(STEP("client_timestamp" by interval '30' second))/1000 as VARCHAR(20))) as session_id,
    "user_id" , "device_id",
-- create a common rounded STEP timestamp for this session
    STEP("client_timestamp" by interval '30' second),
-- Count the number of client events , clicks on this session
    COUNT("client_event") events,
-- What was the first navigation action
    first_value("client_event") as beginnavigation,
-- what was the last navigation action    
    last_value("client_event") as endnavigation,
-- beginning minute and second
    SUBSTRING(cast(min("client_timestamp") AS VARCHAR(25)),15,19) as beginsession,
-- ending minute and second      
    SUBSTRING(cast(max("client_timestamp") AS VARCHAR(25)),15,19) as endsession,
-- session duration    
    TSDIFF(max("client_timestamp"),min("client_timestamp"))/1000 as duration_sec
-- from the source stream    
    FROM "SOURCE_SQL_STREAM_001"
-- using stagger window , with STEP to Seconds, for Seconds intervals    
    WINDOWED BY STAGGER (
                PARTITION BY "user_id", "device_id", STEP("client_timestamp" by interval '30' second) 
                RANGE INTERVAL '30' SECOND );

Kinesis Data Analytics SQL – Using a MINUTE interval “FLOOR” function

The following code creates a pump that continuously inserts the results of a SELECT into DESTINATION_SQL_STREAM:

-- Create the PUMP
CREATE OR REPLACE PUMP "WINDOW_PUMP_MIN" AS INSERT INTO "DESTINATION_SQL_STREAM"
-- Insert as Select 
SELECT  STREAM
-- Make the Session ID using user_ID+device_ID and Timestamp
UPPER(cast("user_id" as VARCHAR(3))|| '_' ||SUBSTRING("device_id",1,3)
||cast(UNIX_TIMESTAMP(FLOOR("client_timestamp" TO MINUTE))/1000 as VARCHAR(20))) as session_id,
"user_id" , "device_id",
-- create a common rounded timestamp for this session
FLOOR("client_timestamp" TO MINUTE),
-- Count the number of client events , clicks on this session
COUNT("client_event") events,
-- What was the first navigation action
first_value("client_event") as beginnavigation,
-- what was the last navigation action
last_value("client_event") as endnavigation,
-- beginning minute and second
SUBSTRING(cast(min("client_timestamp") AS VARCHAR(25)),15,19) as beginsession,
-- ending minute and second
SUBSTRING(cast(max("client_timestamp") AS VARCHAR(25)),15,19) as endsession,
-- session duration
TSDIFF(max("client_timestamp"),min("client_timestamp"))/1000 as duration_sec
-- from the source stream
FROM "SOURCE_SQL_STREAM_001"
-- using stagger window , with floor to Minute, for Minute intervals
WINDOWED BY STAGGER (
            PARTITION BY "user_id", "device_id", FLOOR("client_timestamp" TO MINUTE) 
            RANGE INTERVAL '1' MINUTE);

Sessions

In Kinesis Data Analytics, you can view the data transformed by the SQL, with the session identification and information. The session_id is built from the user_id, an underscore, the first three characters of the device_id, and the rounded Unix timestamp in seconds (without milliseconds), all upper-cased.
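The session ID rule can be reproduced in Python to check what a given click maps to. This sketch assumes UTC timestamps and mirrors STEP with a 30-second interval and FLOOR to the minute by rounding the epoch seconds down; the helper names are hypothetical.

```python
from datetime import datetime, timezone


def round_down_epoch(ts, interval_sec):
    """Round a timestamp down to a multiple of interval_sec, as epoch seconds."""
    epoch = int(ts.timestamp())
    return epoch - epoch % interval_sec


def session_id(user_id, device_id, ts, interval_sec=30):
    # user_id + '_' + first three characters of device_id + rounded epoch seconds, upper-cased
    return f"{user_id}_{device_id[:3]}{round_down_epoch(ts, interval_sec)}".upper()


click = datetime(2018, 11, 29, 23, 35, 44, tzinfo=timezone.utc)
print(session_id(20, "mobile", click))      # 20_MOB1543534530 (30-second STEP window)
print(session_id(20, "mobile", click, 60))  # 20_MOB1543534500 (1-minute FLOOR window)
```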

Automated deployment with AWS CloudFormation

All the steps of this end-to-end solution are included in an AWS CloudFormation template. Fire up the template, add the code on your web server, and voilà, you get real-time sessionization.

This AWS CloudFormation template is intended to be deployed only in the us-east-1 Region.

Create the stack

Step 1: To get started, sign in to the AWS Management Console, and then open the stagger window template.

Step 2: On the AWS CloudFormation console, choose Next, and complete the AWS CloudFormation parameters:

  • Stack name: The name of the stack (blog-sessionization or sessions-blog)
  • StreamName: sessionsblog
  • Stream Shard Count: 1 or 2 (each shard supports 1 MB/s of input).
  • Bucket Name: Change to a unique name, for example session-n-bucket-hhug123121.
  • Buffer Interval: 60–900 seconds. This is the buffering hint that Kinesis Data Firehose uses before it sends the data to Amazon S3.
  • Buffer Size: 1–128 MB per file, if the interval is not achieved first.
  • Destination Prefix: Aggregated (internal folder of the bucket to save aggregated data).
  • Base sessions on seconds or minutes: Choose which you want (minutes will start with 1 minute, seconds will start with 30 seconds).
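The two buffering hints interact as “whichever comes first.” A back-of-the-envelope Python sketch, assuming a steady ingest rate and ignoring compression; the parameters loosely mirror the Firehose `SizeInMBs` and `IntervalInSeconds` hints, and the function itself is hypothetical:

```python
def seconds_until_delivery(ingest_mb_per_sec, size_in_mbs, interval_in_seconds):
    """Estimate when Firehose flushes a buffer: whichever hint is reached first."""
    if ingest_mb_per_sec <= 0:
        return float(interval_in_seconds)  # nothing fills the buffer, so the interval wins
    seconds_to_fill = size_in_mbs / ingest_mb_per_sec
    return min(float(interval_in_seconds), seconds_to_fill)


print(seconds_until_delivery(0.01, 5, 60))  # 60.0: the interval is reached first
print(seconds_until_delivery(1.0, 5, 300))  # 5.0: the size is reached first
```

With a trickle of data, the interval usually wins; at a steady 1 MB/s, a 5 MB buffer fills in about 5 seconds.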

Step 3: Check if the launch has completed, and if it has not, check for errors.

The most common error is when you point to an Amazon S3 bucket that already exists.

Process the data

Step 1: After the deployment, navigate to the solution on the Amazon Kinesis console.

Step 2: Go to the Kinesis Analytics applications page, and choose AnalyticsApp-blog-sessionizationXXXXX, as follows.

Step 3: Choose Run application to start the application.

Step 4: Wait a few seconds for the application to be available, and then choose Application details.

Step 5: On the Application details page, choose Go to SQL results.

Step 6: Examine the SQL code and SOURCE_SQL_STREAM, and change the INTERVAL if you’d like.

Step 7: Choose the Real-time analytics tab to check the DESTINATION_SQL_STREAM results.

Step 8: Check the Destination tab to view the AWS Lambda function as the destination to your aggregation.

Step 9: Check the CloudWatch real-time dashboard.

Open the Sessionization-<your cloudformation stack name> dashboard.

Check the number of “events” during the sessions, and how the “session duration” behaves over a time frame. Then you can make decisions, such as whether you need to roll back a new site layout or new features of your application.

Step 10: Open the AWS Glue console and run the crawler that the AWS CloudFormation template created for you.

Choose the crawler job, and then choose Run crawler.

Analyze the data

Step 1: After the job finishes, open the Amazon Athena console and explore the data.

On the Athena console, choose the sessionization database in the list. You should see two tables created based on the data in Amazon S3: rawdata and aggregated.
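When the crawler finds folders without key=value names, such as the YYYY/MM/DD/HH folders that Kinesis Data Firehose creates (in UTC) under the destination prefix, it names the partition columns partition_0, partition_1, and so on. The views below rely on that mapping. A small Python sketch of it, assuming the Aggregated prefix and a hypothetical object name:

```python
from datetime import date


def crawler_partitions(key, prefix):
    """Map a Firehose object key under `prefix` to crawler-style partition_N values."""
    folders = key[len(prefix):].strip("/").split("/")[:-1]  # drop the file name
    return {f"partition_{i}": value for i, value in enumerate(folders)}


def is_today(partitions, today):
    # Same test as the clicks_today view: year, month, and day partitions all match
    return (int(partitions["partition_0"]), int(partitions["partition_1"]),
            int(partitions["partition_2"])) == (today.year, today.month, today.day)


parts = crawler_partitions("Aggregated/2018/11/29/23/object-1.gz", "Aggregated")
print(parts)
print(is_today(parts, date(2018, 11, 29)))
```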

Step 2: Choose the vertical ellipsis (three dots) on the right side to explore each of the tables, as shown in the following screenshots.

Step 3: Create a view on the Athena console to query only today’s data from your aggregated table, as follows:

CREATE OR REPLACE VIEW clicks_today AS
SELECT 
*
FROM "aggregated" 
WHERE
cast(partition_0 as integer)=year(current_date) and
cast(partition_1 as integer)=month(current_date) and
cast(partition_2 as integer)=day(current_date) ;

The successful query appears on the console as follows:

Step 4: Create a view to query only the current month data from your aggregated table, as in the following example:

CREATE OR REPLACE VIEW clicks_month AS
SELECT 
*
FROM "aggregated" 
WHERE
cast(partition_0 as integer)=year(current_date) and
cast(partition_1 as integer)=month(current_date) ;

The successful query appears as follows:

Step 5: Query the number of sessions for each session duration, ordered by number of sessions, as follows:

SELECT duration_sec, count(1) sessions 
FROM "clicks_today"
where duration_sec>0
group by duration_sec
order by sessions desc;

The query results appear as follows:

Visualize the data

Step 1: Open the Amazon QuickSight console.

If you have never used Amazon QuickSight, perform this setup first.

Step 2: Set up Amazon QuickSight account settings to access Athena and your S3 bucket.

First, select the Amazon Athena check box. Select the Amazon S3 check box to edit Amazon QuickSight access to your S3 buckets.

Choose the buckets that you want to make available, and then choose Select buckets.

Step 3: Choose Manage data.

Step 4: Choose NEW DATASET.

In the list of data sources, choose Athena.

Step 5: Enter daily_session as your data source name.

Step 6: Choose the view that you created for daily sessions, and choose Select.

Step 7: Then you can choose to use either SPICE (cache) or direct query access.

Step 8: Choose beginnavigation and duration_sec as metrics.

Step 9: Choose +Add to add a new visualization.

Step 10: In Visual types, choose the Tree map graph type.

Step 11: For Group by, choose device_id; for Size, choose duration_sec (Sum); and for Color, choose events (Sum).

Summary

In this post, I described how to perform sessionization of clickstream events and analyze them in a serverless architecture. The use of a Kinesis Data Analytics stagger window makes the SQL code short and easy to write and understand. The integration between the services enables a complete data flow with minimal coding.

You also learned about ways to explore and visualize this data using Amazon Athena, AWS Glue, and Amazon QuickSight.

To learn more about the Amazon Kinesis family of use cases, check the Amazon Kinesis Big Data Blog page.

If you have questions or suggestions, please leave a comment below.

Do more with Amazon Kinesis Data Analytics

To explore other ways to gain insights using Kinesis Data Analytics, see Real-time Clickstream Anomaly Detection with Amazon Kinesis Analytics.

About the Author

Hugo is an analytics and database specialist solutions architect at Amazon Web Services out of São Paulo (Brazil). He is currently engaged with several Data Lake and Analytics projects for customers in Latin America. He loves family time, dogs and mountain biking.


Our data lake story: How Woot.com built a serverless data lake on AWS

Post Syndicated from Karthik Kumar Odapally original https://aws.amazon.com/blogs/big-data/our-data-lake-story-how-woot-com-built-a-serverless-data-lake-on-aws/

In this post, we talk about designing a cloud-native data warehouse as a replacement for our legacy data warehouse built on a relational database.

At the beginning of the design process, the simplest solution appeared to be a straightforward lift-and-shift migration from one relational database to another. However, we decided to step back and focus first on what we really needed out of a data warehouse. We started looking at how we could decouple our legacy Oracle database into smaller microservices, using the right tool for the right job. Our process wasn’t just about using the AWS tools. Rather, it was about a shift in mindset toward cloud-native technologies to get us to our final state.

This migration required developing new extract, transform, load (ETL) pipelines to get new data flowing in while also migrating existing data. Because of this migration, we were able to deprecate multiple servers and move to a fully serverless data warehouse orchestrated by AWS Glue.

In this blog post, we are going to show you:

  • Why we chose a serverless data lake for our data warehouse.
  • An architectural diagram of Woot’s systems.
  • An overview of the migration project.
  • Our migration results.

Architectural and design concerns

Here are some of the design points that we considered:

  • Customer experience. We always start with what our customer needs, and then work backwards from there. Our data warehouse is used across the business by people with varying levels of technical expertise. We focused on the ability for different types of users to gain insights into their operations and to provide better feedback mechanisms to improve the overall customer experience.
  • Minimal infrastructure maintenance. The “Woot data warehouse team” is really just one person—Chaya! Because of this, it’s important for us to focus on AWS services that enable us to use cloud-native technologies. These remove the undifferentiated heavy lifting of managing infrastructure as demand changes and technologies evolve.
  • Responsiveness to data source changes. Our data warehouse gets data from a range of internal services. In our existing data warehouse, any updates to those services required manual updates to ETL jobs and tables. The response times for these data sources are critical to our key stakeholders. This requires us to take a data-driven approach to selecting a high-performance architecture.
  • Separation from production systems. Access to our production systems is tightly coupled. To allow multiple users, we needed to decouple it from our production systems and minimize the complexities of navigating resources in multiple VPCs.

Based on these requirements, we decided to change the data warehouse both operationally and architecturally. From an operational standpoint, we designed a new shared responsibility model for data ingestion. Architecturally, we chose a serverless model over a traditional relational database. These two decisions ended up driving every design and implementation decision that we made in our migration.

As we moved to a shared responsibility model, several important points came up. First, our new way of data ingestion was a major cultural shift for Woot’s technical organization. In the past, data ingestion had been exclusively the responsibility of the data warehouse team and required customized pipelines to pull data from services. We decided to shift to “push, not pull”: Services should send data to the data warehouse.

This is where shared responsibility came in. For the first time, our development teams had ownership over their services’ data in the data warehouse. However, we didn’t want our developers to have to become mini data engineers. Instead, we had to give them an easy way to push data that fit with the existing skill set of a developer. The data also needed to be accessible by the range of technologies used by our website.

These considerations led us to select the following AWS services for our serverless data warehouse:

The following diagram shows at a high level how we use these services.

Tradeoffs

These components together met all of our requirements and enabled our shared responsibility model. However, we made a few tradeoffs compared to a lift-and-shift migration to another relational database:

  • The biggest tradeoff was upfront effort vs. ongoing maintenance. We effectively had to start from scratch with all of our data pipelines and introduce a new technology into all of our website services, which required a concerted effort across multiple teams. Minimal ongoing maintenance was a core requirement. We were willing to make this tradeoff to take advantage of the managed infrastructure of the serverless components that we use.
  • Another tradeoff was balancing usability for nontechnical users vs. taking advantage of big data technologies. Making customer experience a core requirement helped us navigate these tradeoffs. Ultimately, simply switching to another relational database would have given our customers the same experience, not a better one.

Building data pipelines with Kinesis Data Firehose and Lambda

Because our site already runs on AWS, using an AWS SDK to send data to Kinesis Data Firehose was an easy sell to developers. Our considerations included the following:

  • Direct PUT ingestion for Kinesis Data Firehose is natural for developers to implement, works in all languages used across our services, and delivers data to Amazon S3.
  • Using S3 for data storage means that we automatically get high availability, scalability, and durability. And because S3 is a global resource, it enables us to manage the data warehouse in a separate AWS account and avoid the complexity of navigating multiple VPCs.
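A minimal direct PUT sketch in Python, assuming a boto3 Firehose client is passed in (for example `boto3.client("firehose")`) and a hypothetical delivery stream name. The newline after each JSON document is a common convention so that records stay separable once Firehose concatenates them into one S3 object.

```python
import json


def send_event(firehose, stream_name, event):
    """Direct PUT one JSON event to a Kinesis Data Firehose delivery stream.

    `firehose` is a boto3 Firehose client; `stream_name` is the delivery stream.
    """
    data = (json.dumps(event, separators=(",", ":")) + "\n").encode("utf-8")
    return firehose.put_record(DeliveryStreamName=stream_name, Record={"Data": data})
```

A service would call it as, for example, `send_event(boto3.client("firehose"), "orders-stream", {"order_id": 7})`, where the stream name is hypothetical.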

We also consume data stored in Amazon DynamoDB tables. Kinesis Data Firehose again provided the core of the solution, this time combined with DynamoDB Streams and Lambda. For each DynamoDB table, we enabled DynamoDB Streams and then used the stream to trigger a Lambda function.

The Lambda function cleans the DynamoDB stream output and writes the cleaned JSON to Kinesis Data Firehose using boto3. From there, the data follows the same path as the direct PUT pipeline and lands in S3. For more information, see How to Stream Data from Amazon DynamoDB to Amazon Aurora using AWS Lambda and Amazon Kinesis Firehose on the AWS Database Blog.
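The post doesn't show the cleaning logic, so the following Python is only a sketch of one plausible version: it flattens DynamoDB's typed attribute values (such as {"N": "42"}) into plain JSON and forwards the rows with `put_record_batch`. boto3 also ships `boto3.dynamodb.types.TypeDeserializer` for the first step; the hand-written `from_dynamodb` here is a simplified stand-in, and all function names are hypothetical.

```python
import json


def from_dynamodb(attr):
    """Convert one DynamoDB-typed attribute (e.g. {"N": "42"}) into a plain Python value."""
    (kind, value), = attr.items()
    if kind in ("S", "BOOL"):
        return value
    if kind == "NULL":
        return None
    if kind == "N":
        return int(value) if value.lstrip("-").isdigit() else float(value)
    if kind == "M":
        return {k: from_dynamodb(v) for k, v in value.items()}
    if kind == "L":
        return [from_dynamodb(v) for v in value]
    if kind == "SS":
        return list(value)
    if kind == "NS":
        return [from_dynamodb({"N": v}) for v in value]
    raise ValueError(f"unsupported DynamoDB type: {kind}")  # e.g. binary B/BS


def clean_record(record):
    ddb = record["dynamodb"]
    image = ddb.get("NewImage") or ddb.get("Keys", {})  # REMOVE events carry no NewImage
    row = {name: from_dynamodb(attr) for name, attr in image.items()}
    row["event_name"] = record["eventName"]  # INSERT, MODIFY, or REMOVE
    return row


def forward_to_firehose(event, firehose, stream_name):
    """Clean every stream record and send the rows to Firehose in batches."""
    rows = [clean_record(r) for r in event.get("Records", [])]
    records = [{"Data": (json.dumps(row) + "\n").encode("utf-8")} for row in rows]
    for i in range(0, len(records), 500):  # put_record_batch takes at most 500 records
        firehose.put_record_batch(DeliveryStreamName=stream_name, Records=records[i:i + 500])
    return rows
```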

Lambda gave us more fine-grained control and enabled us to move files between accounts:

  • We enabled S3 event notifications on the S3 bucket and created an Amazon SNS topic to receive notifications whenever Kinesis Data Firehose put an object in the bucket.
  • The SNS topic triggered a Lambda function, which took the Kinesis output and moved it to the data warehouse account in our chosen partition structure.

S3 event notifications can trigger Lambda functions, but we chose SNS as an intermediary because the S3 bucket and Lambda function were in separate accounts.
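A sketch of the first half of such a function, in Python: unwrap the SNS message to get the S3 bucket and key, then compute a destination key. The Hive-style year=/month=/day=/hour= layout and the table name are assumptions, since the post doesn't describe its partition structure; the copy itself would then use S3 `copy_object` with credentials that can write to the data warehouse account.

```python
import json
from urllib.parse import unquote_plus


def s3_objects_from_sns_event(event):
    """Return (bucket, key) pairs from S3 event notifications delivered through SNS."""
    objects = []
    for record in event.get("Records", []):
        message = json.loads(record["Sns"]["Message"])  # the S3 event is a JSON string
        for s3_record in message.get("Records", []):  # s3:TestEvent messages have none
            bucket = s3_record["s3"]["bucket"]["name"]
            key = unquote_plus(s3_record["s3"]["object"]["key"])  # keys arrive URL-encoded
            objects.append((bucket, key))
    return objects


def destination_key(source_key, table):
    """Rewrite a Firehose <prefix>YYYY/MM/DD/HH/<file> key into a Hive-style layout."""
    year, month, day, hour, filename = source_key.split("/")[-5:]
    return f"{table}/year={year}/month={month}/day={day}/hour={hour}/{filename}"
```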

Migrating existing data with AWS DMS and AWS Glue

We needed to migrate data from our existing RDS database to S3, which we accomplished with AWS DMS. DMS natively supports S3 as a target, as described in the DMS documentation.

Setting this up was relatively straightforward. We exported data directly from our production VPC to the separate data warehouse account by tweaking the connection attributes in DMS. The string that we used was this:

"cannedAclForObjects=BUCKET_OWNER_FULL_CONTROL;compressionType=GZIP;addColumnName=true;"

These attributes give the bucket owner (the destination data warehouse account) full control of the objects, compress the files with GZIP to save on storage costs, and include column names in the output. After the data was in S3, we used an AWS Glue crawler to infer the schemas of all exported tables and then compared them against the source data.

With AWS Glue, some of the challenges we overcame were these:

  • Unstructured text data, such as forum and blog posts. DMS exports these to CSV, which conflicted with the commas present in the text data. We opted to use AWS Glue to export this data from RDS to S3 in Parquet format, which is unaffected by commas because it is a binary columnar format rather than a delimited text format.
  • Cross-account exports. We resolved this by adding the following line at the top of each AWS Glue job, which grants the bucket owner access to all S3 files produced by AWS Glue:

glueContext._jsc.hadoopConfiguration().set("fs.s3.canned.acl", "BucketOwnerFullControl")

Overall, AWS DMS was quicker to set up and great for exporting large amounts of data with rule-based transformations. AWS Glue required more upfront effort to set up jobs, but provided better results for cases where we needed more control over the output.

If you’re looking to convert existing raw data (CSV or JSON) into Parquet, you can set up an AWS Glue job to do that. The process is described in the AWS Big Data Blog post Build a data lake foundation with AWS Glue and Amazon S3.

Bringing it all together with AWS Glue, Amazon Athena, and Amazon QuickSight

After data landed in S3, it was time for the real fun to start: actually working with the data! Can you tell I’m a data engineer? For me, a big part of the fun was exploring AWS Glue:

  • AWS Glue handles our ETL job scheduling.
  • AWS Glue crawlers manage the metadata in the AWS Glue Data Catalog.

Crawlers are the “secret sauce” that enables us to be responsive to schema changes. Throughout the pipeline, we chose to make each step as schema-agnostic as possible, which allows any schema changes to flow through until they reach AWS Glue.

However, raw data is not ideal for most of our business users, because it often has duplicates or incorrect data types. Most importantly, the data out of Firehose is in JSON format, but we quickly observed significant query performance gains from using Parquet format. Here, we used one of the performance tips in the Big Data Blog post Top 10 performance tuning tips for Amazon Athena.

With our shared responsibility model, the data warehouse and BI teams are responsible for the final processing of data into curated datasets ready for reporting. Using Lambda and AWS Glue enables these teams to work in Python and SQL (the core languages for Amazon data engineering and BI roles). It also enables them to deploy code with minimal infrastructure setup or maintenance.

Our ETL process is as follows:

  • Scheduled triggers.
  • Series of conditional triggers that control the flow of subsequent jobs that depend on previous jobs.
  • A similar pattern across many jobs of reading in the raw data, deduplicating the data, and then writing to Parquet. We centralized this logic by creating a Python library of functions and uploading it to S3. We then included that library in the AWS Glue job as an additional Python library. For more information on how to do this, see Using Python Libraries with AWS Glue in the AWS Glue documentation.
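The deduplication step in that shared library isn't shown. As an illustration of the idea only, here is a plain-Python sketch that keeps the latest version of each record by primary key; in an AWS Glue (PySpark) job the same logic would typically use a window with `row_number()` or `dropDuplicates`. The function and field names are hypothetical.

```python
def deduplicate(rows, key_fields, version_field):
    """Keep one row per primary key: the one with the highest version_field value."""
    latest = {}
    for row in rows:
        key = tuple(row[f] for f in key_fields)
        if key not in latest or row[version_field] > latest[key][version_field]:
            latest[key] = row
    return list(latest.values())


raw = [
    {"order_id": 1, "status": "placed", "updated_at": "2019-01-05T10:00:00"},
    {"order_id": 1, "status": "shipped", "updated_at": "2019-01-06T08:30:00"},
    {"order_id": 2, "status": "placed", "updated_at": "2019-01-05T11:15:00"},
    {"order_id": 1, "status": "placed", "updated_at": "2019-01-05T10:00:00"},  # exact duplicate
]
for row in deduplicate(raw, ["order_id"], "updated_at"):
    print(row["order_id"], row["status"])
```

ISO 8601 strings in a single format compare correctly as text, which is why the sketch can order versions without parsing them.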

We also migrated complex jobs used to create reporting tables with business metrics:

  • The AWS Glue use of PySpark simplified the migration of these queries, because you can embed SparkSQL queries directly in the job.
  • Converting to SparkSQL took some trial and error, but ultimately required less work than translating SQL queries into Spark methods. However, for people on our BI team who had previously worked with Pandas or Spark, working with Spark dataframes was a natural transition. As someone who used SQL for several years before learning Python, I appreciate that PySpark lets me quickly switch back and forth between SQL and an object-oriented framework.

Another hidden benefit of using AWS Glue jobs is that the AWS Glue version of Python (like Lambda) already has boto3 installed. Thus, ETL jobs can directly use AWS API operations without additional configuration.

For example, some of our longer-running jobs created read inconsistency if a user happened to query that table while AWS Glue was writing data to S3. We modified the AWS Glue jobs to write to a temporary directory with Spark and then used boto3 to move the files into place. Doing this reduced read inconsistency by up to 90 percent. It was great to have this functionality readily available, which may not have been the case if we managed our own Spark cluster.
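The write-then-move pattern can be sketched with boto3-style calls. This Python assumes an S3 client with `list_objects_v2`, `copy_object`, and `delete_object` (as `boto3.client("s3")` provides) and hypothetical prefixes. Because S3 has no atomic rename, this only narrows the window in which readers can see partial output, and `copy_object` handles objects up to 5 GB in a single call.

```python
def promote_prefix(s3, bucket, temp_prefix, final_prefix):
    """Move every object under temp_prefix to final_prefix; returns the new keys."""
    # 1. List everything first, following continuation tokens page by page.
    keys, token = [], None
    while True:
        kwargs = {"Bucket": bucket, "Prefix": temp_prefix}
        if token:
            kwargs["ContinuationToken"] = token
        page = s3.list_objects_v2(**kwargs)
        keys.extend(obj["Key"] for obj in page.get("Contents", []))
        if not page.get("IsTruncated"):
            break
        token = page["NextContinuationToken"]
    # 2. Copy each object into place.
    moved = []
    for key in keys:
        new_key = final_prefix + key[len(temp_prefix):]
        s3.copy_object(Bucket=bucket, Key=new_key, CopySource={"Bucket": bucket, "Key": key})
        moved.append(new_key)
    # 3. Remove the temporary copies only after every copy succeeded.
    for key in keys:
        s3.delete_object(Bucket=bucket, Key=key)
    return moved
```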

Comparing previous state and current state

After we had all the datasets in place, it was time for our customers to come on board and start querying. This is where we really leveled up the customer experience.

Previously, users had to download a SQL client, request a user name and password, set it up, and learn SQL to get data out. Now, users just sign in to the AWS Management Console through automatically provisioned IAM roles and run queries in their browser with Athena. Or if they want to skip SQL altogether, they can use our Amazon QuickSight account, with user access managed through our pre-existing Active Directory server.

Integration with Active Directory was a big win for us. We wanted to enable users to get up and running without having to wait for an account to be created or to manage separate credentials. We already use Active Directory across the company for access to multiple resources. Upgrading to Amazon QuickSight Enterprise Edition enabled us to manage access with our existing AD groups and credentials.

Migration results

Our legacy data warehouse was developed over the course of five years. We recreated it as a serverless data lake using AWS Glue in about three months.

In the end, it took more upfront effort than simply migrating to another relational database. We also dealt with more uncertainty because we used many products that were relatively new to us (especially AWS Glue).

However, in the months since the migration was completed, we’ve gotten great feedback from data warehouse users about the new tools. Our users have been amazed by these things:

  • How fast Athena is.
  • How intuitive and beautiful Amazon QuickSight is. They love that no setup is required—it’s easy enough that even our CEO has started using it!
  • That Athena plus the AWS Glue Data Catalog have given us the performance gains of a true big data platform, but for end users it retains the look and feel of a relational database.

Summary

From an operational perspective, the investment has already started to pay off. Literally: Our operating costs have fallen by almost 90 percent.

Personally, I was thrilled that recently I was able to take a three-week vacation and didn't get paged once, thanks to the serverless infrastructure. And for our BI engineers and me, the S3-centric architecture is enabling us to experiment with new technologies by integrating seamlessly with other services, such as Amazon EMR, Amazon SageMaker, Amazon Redshift Spectrum, and Lambda. It's been exciting to see how these services have grown in the time since we've adopted them (for example, the recent AWS Glue launch of Amazon CloudWatch metrics and Athena's launch of views).

We are thrilled that we've invested in technologies that continue to grow as we do. We are incredibly proud of our team for accomplishing this ambitious migration. We hope our experience can inspire other engineers to dive into building a data lake of their own.


About the authors

Chaya Carey is a data engineer at Woot.com. At Woot, she’s responsible for managing the data warehouse and other scalable data solutions. Outside of work, she’s passionate about Seattle’s bar and restaurant scene, books, and video games.

Karthik Odapally is a senior solutions architect at AWS. His passion is to build cost-effective and highly scalable solutions on the cloud. In his spare time, he bakes cookies and cupcakes for family and friends here in the PNW. He loves vintage racing cars.

ICYMI: Serverless Q4 2018

Post Syndicated from Chris Munns original https://aws.amazon.com/blogs/compute/icymi-serverless-q4-2018/

This post is courtesy of Eric Johnson, Senior Developer Advocate – AWS Serverless

Welcome to the fourth edition of the AWS Serverless ICYMI (in case you missed it) quarterly recap. Every quarter, we share all of the most recent product launches, feature enhancements, blog posts, webinars, Twitch live streams, and other interesting things that you might have missed!

This edition of ICYMI includes all announcements from AWS re:Invent 2018!

If you didn't see them, check our Q1 ICYMI, Q2 ICYMI, and Q3 ICYMI posts for what happened then.

So, what might you have missed this past quarter? Here’s the recap.

New features

AWS Lambda introduced the Lambda runtime API and Lambda layers, which enable developers to bring their own runtime and share common code across Lambda functions. With the release of the runtime API, we can now support runtimes from AWS partners such as the PHP runtime from Stackery and the Erlang and Elixir runtimes from Alert Logic. Using layers, partners such as Datadog and Twistlock have also simplified the process of using their Lambda code libraries.
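The following minimal sketch illustrates what the runtime API involves. It runs one iteration of a custom runtime's event loop using only the Python standard library: it long-polls the `/2018-06-01/runtime/invocation/next` endpoint, runs a handler, and posts the result (or an error) back. Python is used only for consistency with the other examples; the same HTTP calls apply to any language. The `urlopen` parameter exists so the loop can be exercised without a real Lambda environment. `my_handler` is a hypothetical name. A production bootstrap would also report initialization errors and use the other invocation headers.

```python
import json
import urllib.request

API_VERSION = "2018-06-01"


def process_next_event(api, handler, urlopen=urllib.request.urlopen):
    """Handle one invocation through the Lambda Runtime API and return its request ID.

    `api` is the host:port value of the AWS_LAMBDA_RUNTIME_API environment variable.
    `urlopen` is injectable so the function can be tested outside Lambda.
    """
    base = f"http://{api}/{API_VERSION}/runtime/invocation"
    # Blocks until Lambda has an event; the request ID arrives in a response header.
    with urlopen(f"{base}/next") as resp:
        request_id = resp.headers["Lambda-Runtime-Aws-Request-Id"]
        event = json.loads(resp.read())
    try:
        body = json.dumps(handler(event)).encode()
        url = f"{base}/{request_id}/response"
    except Exception as exc:
        # Handler failures go to the invocation's error endpoint instead.
        body = json.dumps({"errorMessage": str(exc),
                           "errorType": type(exc).__name__}).encode()
        url = f"{base}/{request_id}/error"
    req = urllib.request.Request(url, data=body, method="POST",
                                 headers={"Content-Type": "application/json"})
    with urlopen(req):
        pass
    return request_id


# A bootstrap script would run this forever, for example:
#   import os
#   while True:
#       process_next_event(os.environ["AWS_LAMBDA_RUNTIME_API"], my_handler)
```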

To meet the demand of larger Lambda payloads, Lambda doubled the payload size of asynchronous calls to 256 KB.

In early October, Lambda also raised the maximum execution time for Lambda functions to 15 minutes.

Lambda also rolled out native support for Ruby 2.5 and Python 3.7.

You can now process Amazon Kinesis streams up to 68% faster with AWS Lambda support for Kinesis Data Streams enhanced fan-out and HTTP/2 for faster streaming.

Lambda also released a new Application view in the console. It’s a high-level view of all of the resources in your application. It also gives you a quick view of deployment status with the ability to view service metrics and custom dashboards.

Application Load Balancers added support for targeting Lambda functions. ALBs can provide a simple HTTP/S front end to Lambda functions. ALB features such as host- and path-based routing are supported to allow flexibility in triggering Lambda functions.
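When a Lambda function is registered as an ALB target, it receives the HTTP request as an event. It must return a response object containing the status code, headers, body, and a Base64 flag. Here is a minimal sketch. The `/hello` route and `name` query parameter are hypothetical. In practice, ALB listener rules would usually handle host- or path-based routing before the function is invoked.

```python
import json


def lambda_handler(event, context):
    """Minimal handler for an Application Load Balancer target group (sketch)."""
    path = event.get("path", "/")
    # queryStringParameters can be missing when the request has no query string.
    name = (event.get("queryStringParameters") or {}).get("name", "world")
    if path == "/hello":
        status, body = 200, {"message": f"hello, {name}"}
    else:
        status, body = 404, {"message": "not found"}
    return {
        "statusCode": status,
        "statusDescription": f"{status} {'OK' if status == 200 else 'Not Found'}",
        "isBase64Encoded": False,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps(body),
    }
```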

Amazon API Gateway added support for AWS WAF. You can use AWS WAF with your Amazon API Gateway APIs to protect them from attacks such as SQL injection and cross-site scripting (XSS).

API Gateway has also improved parameter support by adding support for multi-value parameters. You can now pass multiple values for the same key in the header and query string when calling the API. Returning multiple headers with the same name in the API response is also supported. For example, you can send multiple Set-Cookie headers.
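Multi-value support in the Lambda proxy integration changes both the request and the response. Repeated query string keys arrive in `multiValueQueryStringParameters`, and repeated response headers are returned in `multiValueHeaders`. The sketch below assumes a proxy integration. The `color` parameter and the cookie values are made up for illustration.

```python
import json


def lambda_handler(event, context):
    """API Gateway Lambda proxy handler using multi-value parameters and headers (sketch)."""
    # ?color=red&color=blue arrives as {"color": ["red", "blue"]}; it is null with no query string.
    params = event.get("multiValueQueryStringParameters") or {}
    colors = params.get("color", [])
    return {
        "statusCode": 200,
        # Headers that repeat with the same name are returned as lists.
        "multiValueHeaders": {
            "Set-Cookie": ["session=abc123; Secure", "theme=dark"],
            "Content-Type": ["application/json"],
        },
        "body": json.dumps({"colors": colors}),
    }
```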

In October, API Gateway relaunched the Serverless Developer Portal. It provides a catalog of published APIs and associated documentation that enable self-service discovery and onboarding. You can customize it for branding through either custom domain names or logo/styling updates. In November, we made it easier to launch the developer portal from the Serverless Application Repository.

In the continuous effort to decrease customer costs, API Gateway introduced tiered pricing. With tiered pricing, the per-request cost of API Gateway drops at scale, to as low as $1.51 per million API requests at the highest tier.

Last but definitely not least, API Gateway released support for WebSocket APIs in mid-December as a final holiday gift. With this new feature, developers can build bidirectional communication applications without having to provision and manage any servers. This has been a long-awaited and highly anticipated announcement for the serverless community.

AWS Step Functions added eight new service integrations. With this release, the steps of your workflow can exist on Amazon ECS, AWS Fargate, Amazon DynamoDB, Amazon SNS, Amazon SQS, AWS Batch, AWS Glue, and Amazon SageMaker. This is in addition to the services that Step Functions already supports: AWS Lambda and Amazon EC2.
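A service integration lets a Task state call another service directly through an `arn:aws:states:::service:action` resource, so the call does not need to be wrapped in a Lambda function. The following sketch builds a two-step Amazon States Language definition that writes an item to DynamoDB and then publishes to SNS. The table, topic, and `orderId` field are hypothetical.

```python
import json


def build_state_machine(table_name, topic_arn):
    """Amazon States Language definition that calls DynamoDB and SNS without Lambda (sketch)."""
    definition = {
        "Comment": "Record an order, then notify subscribers",
        "StartAt": "SaveOrder",
        "States": {
            "SaveOrder": {
                "Type": "Task",
                "Resource": "arn:aws:states:::dynamodb:putItem",
                "Parameters": {
                    "TableName": table_name,
                    # A ".$" suffix takes the value from the state input via JSONPath.
                    "Item": {"orderId": {"S.$": "$.orderId"}},
                },
                "ResultPath": None,  # serialized as null: pass the input through unchanged
                "Next": "Notify",
            },
            "Notify": {
                "Type": "Task",
                "Resource": "arn:aws:states:::sns:publish",
                "Parameters": {"TopicArn": topic_arn, "Message.$": "$.orderId"},
                "End": True,
            },
        },
    }
    return json.dumps(definition)
```

You could deploy the result with `boto3.client("stepfunctions").create_state_machine(name=..., definition=..., roleArn=...)`. The role must allow `dynamodb:PutItem` and `sns:Publish`.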

Step Functions expanded by announcing availability in the EU (Paris) and South America (São Paulo) Regions.

The AWS Serverless Application Repository increased its functionality by supporting more resources in the repository. The Serverless Application Repository now supports Application Auto Scaling, Amazon Athena, AWS AppSync, AWS Certificate Manager, Amazon CloudFront, AWS CodeBuild, AWS CodePipeline, AWS Glue, AWS Identity and Access Management, Amazon SNS, Amazon SQS, AWS Systems Manager, and AWS Step Functions.

The Serverless Application Repository also released support for nested applications. Nested applications enable you to build highly sophisticated serverless architectures by reusing services that are independently authored and maintained but easily composed using AWS SAM and the Serverless Application Repository.

AWS SAM made authorization simpler by introducing SAM support for authorizers. Enabling authorization for your APIs is as simple as defining an Amazon Cognito user pool or an API Gateway Lambda authorizer as a property of your API in your SAM template.

AWS SAM CLI introduced two new commands. First, you can now build locally with the sam build command. This functionality allows you to compile deployment artifacts for Lambda functions written in Python. Second, the sam publish command allows you to publish your SAM application to the Serverless Application Repository.

Our SAM tooling team also released the AWS Toolkit for PyCharm, which provides an integrated experience for developing serverless applications in Python.

The AWS Toolkits for Visual Studio Code (Developer Preview) and IntelliJ (Developer Preview) are still in active development and will include similar features when they become generally available.

AWS SAM and the AWS SAM CLI implemented support for Lambda layers. Using a SAM template, you can manage your layers, and using the AWS SAM CLI, you can develop and debug Lambda functions that are dependent on layers.

Amazon DynamoDB added support for transactions, allowing developers to enforce all-or-nothing operations. In addition, Amazon DynamoDB Accelerator (DAX) added support for DynamoDB transactions.
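The following sketch illustrates all-or-nothing writes. It builds the `TransactItems` list for boto3's `transact_write_items` to move a balance between two items. The table name, the `accountId` key, and the `balance` attribute are hypothetical, and both items are assumed to exist already. If the condition on the debit fails, DynamoDB cancels both updates.

```python
def transfer_items(table, from_id, to_id, amount):
    """TransactItems for an all-or-nothing balance transfer (hypothetical schema)."""
    amount_value = {":amt": {"N": str(amount)}}  # DynamoDB numbers are sent as strings
    balance_name = {"#bal": "balance"}  # placeholder avoids any reserved-word clash
    return [
        {"Update": {
            "TableName": table,
            "Key": {"accountId": {"S": from_id}},
            "UpdateExpression": "SET #bal = #bal - :amt",
            # If this condition fails, the whole transaction is cancelled.
            "ConditionExpression": "#bal >= :amt",
            "ExpressionAttributeNames": balance_name,
            "ExpressionAttributeValues": amount_value,
        }},
        {"Update": {
            "TableName": table,
            "Key": {"accountId": {"S": to_id}},
            "UpdateExpression": "SET #bal = #bal + :amt",
            "ExpressionAttributeNames": balance_name,
            "ExpressionAttributeValues": amount_value,
        }},
    ]


# Usage (requires AWS credentials):
#   import boto3
#   boto3.client("dynamodb").transact_write_items(
#       TransactItems=transfer_items("Accounts", "alice", "bob", 25))
```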

Amazon DynamoDB also announced Amazon DynamoDB on-demand, a flexible new billing option for DynamoDB capable of serving thousands of requests per second without capacity planning. DynamoDB on-demand offers simple pay-per-request pricing for read and write requests so that you only pay for what you use, making it easy to balance costs and performance.

AWS Amplify released the Amplify Console, which is a continuous deployment and hosting service for modern web applications with serverless backends. Modern web applications include single-page app frameworks such as React, Angular, and Vue and static-site generators such as Jekyll, Hugo, and Gatsby.

Amazon SQS announced support for Amazon VPC Endpoints using PrivateLink.

Tech talks

We hold several Serverless tech talks throughout the year, so look out for them in the Serverless section of the AWS Online Tech Talks page. Here are the three tech talks that we delivered in Q4:

Twitch

We’ve been so busy livestreaming on Twitch that you’re most certainly missing out if you aren’t following along!

For information about upcoming broadcasts and recent livestreams, keep an eye on AWS on Twitch for more Serverless videos and on the Join us on Twitch AWS page.

New Home for SAM Docs

This quarter, we moved all SAM docs to https://docs.aws.amazon.com/serverless-application-model. Everything you need to know about SAM is there. If you don’t find what you’re looking for, let us know!

In other news

The schedule is out for 2019 AWS Global Summits in cities around the world. AWS Global Summits are free events that bring the cloud computing community together to connect, collaborate, and learn about AWS. Summits are held in major cities around the world. They attract technologists from all industries and skill levels who want to discover how AWS can help them innovate quickly and deliver flexible, reliable solutions at scale. Get notified when to register and learn more at the AWS Global Summits website.

Still looking for more?

The Serverless landing page has lots of information. The resources page contains case studies, webinars, whitepapers, customer stories, reference architectures, and even more Getting Started tutorials. Check it out!

Manage centralized Microsoft Exchange Server logs using Amazon Kinesis Agent for Windows

Post Syndicated from Vijay Amirtharaj original https://aws.amazon.com/blogs/big-data/manage-centralized-microsoft-exchange-server-logs-using-amazon-kinesis-agent-for-windows/

Microsoft Exchange servers store different types of logs. These log types include message tracking, Exchange Web Services (EWS), Internet Information Services (IIS), and application/system event logs. With Exchange servers deployed on a global scale, logs are often scattered in multiple directories that are local to these servers. This requires Exchange administrators to log in to each server to monitor status, health, and events. Centralizing these logs and converting them into useful metrics allows Exchange administrators to identify a majority of issues, like high load or service/application errors, without logging in to each server.

This blog post discusses an efficient architecture to stream, analyze, and store Microsoft Exchange Server logs. For frequent queries and operational analytics, we use Amazon Elasticsearch Service (Amazon ES) with Kibana for real-time visualization. For example, you can build reports of the top email senders and recipients, the top HTTP status codes in IIS logs, and the top error codes in EWS logs, and narrow down spikes in load or errors. For infrequent queries, such as for audit, legal, and compliance requirements, we use Amazon S3 as the final destination because it offers low-cost storage options and high durability. We pair it with Amazon Athena for simple queries using standard SQL.

Amazon Kinesis Agent for Microsoft Windows (Kinesis Agent for Windows) is a highly configurable and extensible agent. Kinesis Agent for Windows gathers, parses, transforms, and streams logs, events, and metrics to various AWS services, including Amazon Kinesis Data Streams, Amazon Kinesis Data Firehose, and Amazon CloudWatch. It makes centralizing logs from Windows-based services more efficient and reliable. This lets you see the extent of an issue, monitor it, and generate an alarm when errors or load breach certain thresholds. For more information about Kinesis Agent for Windows, see What is Amazon Kinesis Agent for Microsoft Windows?

Parsing logs

Amazon ES requires JSON-formatted data. Kinesis Agent for Windows efficiently parses Exchange log lines in CSV format and converts them to JSON. You can also use Kinesis Agent for Windows to enrich the data with details such as the hostname, EC2 instance ID, and custom date and time formats, to help pinpoint the exact issue reported in the logs. Kinesis Agent for Windows dynamically computes the log header, even if the header names change or a single log file contains multiple header lines because of a service restart. It also streams the right data when logs are rotated.
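The agent does this parsing for you. The following standard-library sketch only illustrates the idea of header-aware CSV-to-JSON conversion. It assumes that each header arrives as a `#Fields:` line, as in Exchange's W3C-style logs, and it is not the agent's implementation.

```python
import csv
import json


def exchange_lines_to_json(lines):
    """Convert CSV log lines to JSON strings using the most recent '#Fields:' header.

    Illustrative only: assumes W3C-style '#Fields:' headers as in Exchange logs.
    """
    fields = None
    out = []
    for line in lines:
        if line.startswith("#Fields:"):
            # A new header (for example after a service restart) replaces the previous one.
            fields = next(csv.reader([line[len("#Fields:"):].strip()]))
        elif line.startswith("#") or not line.strip() or fields is None:
            continue  # other metadata lines, blank lines, or data before any header
        else:
            values = next(csv.reader([line]))
            out.append(json.dumps(dict(zip(fields, values))))
    return out
```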

The log flow

In this use case, we send the same log to both Amazon ES for real-time analytics and to Amazon S3 for offline analytics with Amazon Athena. Instead of streaming the data twice from the host to each destination, you can configure Kinesis Agent for Windows to stream it once to a Kinesis data stream. From the stream, Amazon Kinesis Data Firehose gathers logs and delivers them to Amazon ES. Another Kinesis Data Firehose gathers the same logs and delivers them to an Amazon S3 bucket for Amazon Athena. If there is a need to send logs to another destination, we can use another Kinesis Data Firehose instance.
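This fan-out can be modeled as one Firehose delivery stream per destination, each reading from the same Kinesis data stream. The following sketch builds the arguments for boto3's Firehose `create_delivery_stream` for the S3 branch. The names and ARNs are placeholders. The Amazon ES branch would use `ElasticsearchDestinationConfiguration` in place of the S3 destination block.

```python
def firehose_from_stream(name, stream_arn, role_arn, bucket_arn):
    """Keyword arguments for firehose.create_delivery_stream reading from a Kinesis data stream."""
    return {
        "DeliveryStreamName": name,
        # Read from an existing data stream instead of accepting direct PutRecord calls.
        "DeliveryStreamType": "KinesisStreamAsSource",
        "KinesisStreamSourceConfiguration": {
            "KinesisStreamARN": stream_arn,
            "RoleARN": role_arn,  # role that Firehose assumes to read the stream
        },
        "ExtendedS3DestinationConfiguration": {
            "RoleARN": role_arn,  # one role reused for writing to S3, for brevity
            "BucketARN": bucket_arn,
        },
    }


# Usage (requires AWS credentials):
#   import boto3
#   boto3.client("firehose").create_delivery_stream(**firehose_from_stream(...))
```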

AWS Lambda periodically analyzes the logs in Amazon ES and posts statistics to CloudWatch metrics. CloudWatch alarms then trigger on anomalies detected in the posted metrics.

Kibana visualizes the log data. By looking at spikes and anomalies in the graphs, we can drill down into specific log data. That helps us diagnose specific problems with the Exchange service. Several authentication features protect access to Kibana. For information about using Amazon Cognito with an identity provider, see Amazon Cognito Authentication for Kibana.

Agent configuration

Kinesis Agent for Windows is configured through the appsettings.json file in the %PROGRAMFILES%\Amazon\AWSKinesisTap\ directory. This file defines the sources (log locations), the sinks (Kinesis data stream information), and the pipes, which connect sources to sinks.

The following is an example source configuration that queries all files with a .log extension under the specified directory. When ExchangeLogSource is set as the type, it dynamically parses the log lines for the header. It then automatically picks up the column that is needed for ‘Time Stamp’.

    "Sources": [
        {
            "Id": "MessageTracking-LogsSource",
            "SourceType": "ExchangeLogSource",
            "Directory": "C:\\Program Files\\Microsoft\\Exchange Server\\V15\\TransportRoles\\Logs\\MessageTracking",
            "FileNameFilter": "*.log",
            "TimeZoneKind": "UTC",
            "TimeStampField": "date-time" //Optional. ExchangeLogSource can automatically detect if the TimestampField name is "date-time" or "DateTime". For other names, please specify
        }
    ]

Message tracking logs are similar to the following sample.


2018-10-22T10:53:13.404Z, 10.00.00.00, ExchangeServer01,10.00.00.00,ExchangeServer01,;250 2.0.0 OK;ClientSubmitTime:2018-10-22T10:53:10.680Z,Intra-Organization SMTP Send Connector,SMTP,SEND,157882997807893,<[email protected]>,9b3f4489-a158-4126-0d41-08d6380c8f0f,[email protected],250 2.1.5 Recipient OK,[email protected], [email protected],…


Sinks (destinations) define where the logs go

Next, we define the sinks, or destinations, where the logs go. We can also stream logs to a Kinesis data stream in another AWS account by assuming a role that has access to the stream. For information about how to set up access, see Sink Security Configuration. Logs are converted to JSON when Format is set to json.

"Sinks": [
        {
            "Id": "MessageTracking-Kinesis-Sink",
            "SinkType": "KinesisStream",
            "Region": "us-west-2",
            "RoleARN": "arn:aws:iam::<another aws account>:role/exch-kinesis-log", // only if logs are sent to a Kinesis Data Stream in another account.
            "StreamName": "ex-messagetracking",
            "Format": "json"
        }
    ]

Pipes connect sources and sinks

Pipes connect the source and destination. This is helpful when there is a need to take multiple sources to a destination, or vice versa.

   "Pipes": [
        {
            "Id": "MessageTracking-Kinesis-Pipe",
            "SourceRef": "MessageTracking-LogsSource",
            "SinkRef": "MessageTracking-Kinesis-Sink"
        }
    ]
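Every pipe refers to a source and a sink by `Id`, so a typo in either reference is easy to miss. The following hypothetical helper checks those references in a parsed configuration. It assumes the file has already been loaded into a dictionary, for example with `json.load` after removing the `//` comments shown in these samples, which standard JSON parsers reject.

```python
def check_pipes(config):
    """Return a list of problems where a pipe references an undefined source or sink Id."""
    source_ids = {s["Id"] for s in config.get("Sources", [])}
    sink_ids = {s["Id"] for s in config.get("Sinks", [])}
    problems = []
    for pipe in config.get("Pipes", []):
        if pipe["SourceRef"] not in source_ids:
            problems.append(f"{pipe['Id']}: unknown SourceRef {pipe['SourceRef']}")
        if pipe["SinkRef"] not in sink_ids:
            problems.append(f"{pipe['Id']}: unknown SinkRef {pipe['SinkRef']}")
    return problems
```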

Kinesis Agent for Windows converts each log line into a JSON blob before sending it to Kinesis Data Streams, as shown in the following sample.

{
      "date-time": "2018-10-22T10:53:13.404Z",
      "client-ip": "10.00.00.00",
      "client-hostname": "ExchangeServer01",
      "server-ip": "10.00.00.00",
      "server-hostname": "ExchangeServer01",
      "source-context": ";250 2.0.0 OK;ClientSubmitTime:2018-10-22T10:53:10.680Z",
      "connector-id": "Intra-Organization SMTP Send Connector",
      "source": "SMTP",
      "event-id": "SEND",
      "internal-message-id": "157882997807893",
      "message-id": "<[email protected]>",
      "network-message-id": "9b3f4489-a158-4126-0d41-08d6380c8f0f",
      "recipient-address": "[email protected]",
      "recipient-status": "250 2.1.5 Recipient OK",
      "sender-address": "[email protected]",
      "return-path": "[email protected]",
      ….
    }

Operational analytics with visualization

When working on an outage or a critical issue, immediate availability of logs is helpful. For example, during a spam outbreak, we must know the top senders and top recipients. Having those results while the issue is happening helps Exchange administrators mitigate the risk by writing a rule to drop the spam messages.

For example, the following Kibana graph visualizes data from Amazon ES. It represents the top email senders within a time window. The graph shows a top sender, [email protected], which should be investigated further. By using an Amazon ES API call, you can retrieve the aggregated results and take action programmatically. Placing alerts on this data helps with early detection and with mitigation actions that prevent more spam from coming through. For more information about how to create a visualization in Kibana, see Creating a Visualization.

The following is a line graph in Amazon CloudWatch showing statistics posted by AWS Lambda querying logs from Amazon ES.

You can then configure CloudWatch alarms with thresholds on these metrics.
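For example, you could define an alarm on the metric that the following Lambda function publishes by using boto3's `put_metric_alarm`. In this sketch, the namespace, metric name, and dimension match the `log2CW` call in that code. The alarm name, threshold, and period are assumptions to tune for your environment. You would typically also pass `AlarmActions` with an SNS topic ARN.

```python
def top_sender_alarm(threshold, period_seconds=900):
    """Keyword arguments for cloudwatch.put_metric_alarm on the top-sender metric (sketch)."""
    return {
        "AlarmName": "Exchange-TopSender-Spike",  # placeholder name
        "Namespace": "Exchange/TransportService",
        "MetricName": "NoOfMsgsByTopSender",
        "Dimensions": [{"Name": "TransportService", "Value": "TopSenders"}],
        "Statistic": "Maximum",
        "Period": period_seconds,  # 900 s matches the 15-minute query window
        "EvaluationPeriods": 1,
        "Threshold": float(threshold),
        "ComparisonOperator": "GreaterThanThreshold",
        "TreatMissingData": "notBreaching",
    }


# Usage (requires AWS credentials):
#   import boto3
#   boto3.client("cloudwatch").put_metric_alarm(**top_sender_alarm(500))
```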

Here is an AWS Lambda code example in Python for querying an Amazon ES endpoint. It returns the top five senders from Microsoft Exchange message tracking logs for the last 15 minutes. It then posts the count of messages sent by the top-most sender to CloudWatch metrics.

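import datetime
import boto3
# aws_requests_auth and elasticsearch are third-party packages; include them in the
# function's deployment package or in a Lambda layer.
from aws_requests_auth.aws_auth import AWSRequestsAuth
from elasticsearch import Elasticsearch, RequestsHttpConnection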

#Constants Declarations
epoch = datetime.datetime.utcfromtimestamp(0)
session = boto3.Session()
credentials = session.get_credentials().get_frozen_credentials()
es_host = 'search-ex-messagetracking-xxxxxxxxxxxxxxxxxxxxxxxxxx.us-west-2.es.amazonaws.com'

#Main function
def lambda_handler(event, context):
    # Use UTC so the 15-minute window matches the UTC epoch above and the UTC log timestamps.
    now_time = datetime.datetime.utcnow()
    query_end_time = unix_time_millis(now_time)
    query_start_time = unix_time_millis(now_time - datetime.timedelta(minutes=15))
          
    awsauth = AWSRequestsAuth(
        aws_access_key=credentials.access_key,
        aws_secret_access_key=credentials.secret_key,
        aws_token=credentials.token,
        aws_host=es_host,
        aws_region=session.region_name,
        aws_service='es'
    )

    es = Elasticsearch(
        hosts=[{'host': es_host, 'port': 443}],
        http_auth=awsauth,
        use_ssl=True,
        verify_certs=True,
        connection_class=RequestsHttpConnection
    )
    
    querybody = construct_agg_query("sender-address.keyword", query_start_time, query_end_time)
    log2CW("TopSenders","NoOfMsgsByTopSender",query_es(es,querybody))

# Function Declarations 

def query_es(es_param,querybody_param):
    es_results = es_param.search(index="timestamp-*", body=querybody_param)
    parsed_value = es_results['aggregations']['3']['buckets']
    print(parsed_value)
    # With no messages in the window there are no buckets; report 0 instead of raising IndexError.
    if not parsed_value:
        return 0
    print(parsed_value[0]['doc_count'])
    return parsed_value[0]['doc_count']

def unix_time_millis(dt):
    return int((dt - epoch).total_seconds() * 1000.0) 
    
def construct_agg_query(query_keyword, start_time, end_time):
    return_query_body = {
                            "query": {
                                "bool": {
                                "must": [
                                    {
                                    "match_all": {}
                                    },
                                    {
                                    "range": {
                                        "date-time": {
                                        "gte": start_time,
                                        "lte": end_time,
                                        "format": "epoch_millis"
                                        }
                                    }
                                    }
                                ],
                                "must_not": []
                                }
                            },
                            "size": 0,
                            "_source": {
                                "excludes": []
                            },
                            "aggs": {
                                "3": {
                                "terms": {
                                    "field": query_keyword,
                                    "size": 5,
                                    "order": {
                                    "_count": "desc"
                                    }
                                }
                                }
                            }
                        }
    return return_query_body
    
def log2CW (dimension_value, error_code, error_value):
    cloudwatch = boto3.client('cloudwatch')
    response = cloudwatch.put_metric_data(
        MetricData = [
            {
                'MetricName': error_code,
                'Dimensions': [
                    {
                        'Name': 'TransportService',
                        'Value': dimension_value
                    }
                ],
                'Unit': 'None',
                'Value': error_value
            }
        ],
        Namespace = 'Exchange/TransportService'
    )

Summary

Amazon Kinesis Agent for Microsoft Windows parses log lines, converts them to JSON, and streams the data to Amazon Kinesis Data Streams. In the use case in this blog post, we streamed several hundred Microsoft Exchange Server logs into Amazon Kinesis Data Streams in less than a minute. With native integration to AWS services, Kinesis Agent for Windows is effective in getting the logs to centralized AWS storage and AWS analytics services. Let us know your use cases and happy logging!


About the Author

Vijay Amirtharaj is a Systems and Development Engineer at Amazon Web Services. Vijay is passionate about developing well-architected email solutions. He enjoys reading about new technologies and loves spending time with family and friends.

Stream Amazon CloudWatch Logs to a Centralized Account for Audit and Analysis

Post Syndicated from David Bailey original https://aws.amazon.com/blogs/architecture/stream-amazon-cloudwatch-logs-to-a-centralized-account-for-audit-and-analysis/

A key component of enterprise multi-account environments is logging. Centralized logging provides a single point of access to all salient logs generated across accounts and regions, and is critical for auditing, security and compliance. While some customers use the built-in ability to push Amazon CloudWatch Logs directly into Amazon Elasticsearch Service for analysis, others would prefer to move all logs into a centralized Amazon Simple Storage Service (Amazon S3) bucket location for access by several custom and third-party tools. In this blog post, I will show you how to forward existing and any new CloudWatch Logs log groups created in the future to a cross-account centralized logging Amazon S3 bucket.

The streaming architecture I use in the destination logging account is a streamlined version of the architecture and AWS CloudFormation templates from the Central logging in Multi-Account Environments blog post by Mahmoud Matouk. This blog post assumes some knowledge of CloudFormation, Python3 and the boto3 AWS SDK. You will need to have or configure an AWS working account and logging account, an IAM access and secret key for those accounts, and a working environment containing Python and the boto3 SDK. (For assistance, see the Getting Started Resource Center and Start Building with SDKs and Tools.) All CloudFormation templates and Python code used in this article can be found in this GitHub Repository.

Setting Up the Solution

You need to create or use an existing S3 bucket for storing CloudFormation templates and Python code for an AWS Lambda function. This S3 bucket is referred to throughout the blog post as the <S3 infrastructure-bucket>. Ensure that the bucket does not block new bucket policies or cross-account access by checking the bucket’s Permissions tab and the Public access settings button.

You also need a bucket policy that grants each account that streams logs read access to this bucket, which those accounts need when we create the AWS Lambda function below. To do so, modify the following template with the account number of each account (adding each new account as you create it) and with the <S3 infrastructure-bucket> ARN shown at the top of the Bucket policy editor page:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": [
                  "03XXXXXXXX85",
                  "29XXXXXXXX02",
                  "13XXXXXXXX96",
                  "37XXXXXXXX30",
                  "86XXXXXXXX95"
                ]
            },
            "Action": [
                "s3:Get*",
                "s3:List*"
            ],
            "Resource": [
                "arn:aws:s3:::<S3 infrastructure-bucket>",
                "arn:aws:s3:::<S3 infrastructure-bucket>/*"
            ]
        }
    ]
}
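If you manage many accounts, generating the policy can be less error-prone than editing it by hand. This sketch produces the same statement as the template above for a given bucket name and list of account IDs (both placeholders). You could then apply the result with boto3's `put_bucket_policy(Bucket=..., Policy=...)`.

```python
import json


def infrastructure_bucket_policy(bucket, account_ids):
    """Bucket policy JSON granting Get*/List* on the infrastructure bucket to each account."""
    return json.dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            # Deduplicate and sort so repeated runs produce identical policies.
            "Principal": {"AWS": sorted(set(account_ids))},
            "Action": ["s3:Get*", "s3:List*"],
            "Resource": [f"arn:aws:s3:::{bucket}", f"arn:aws:s3:::{bucket}/*"],
        }],
    }, indent=4)
```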

Clone a local copy of the CloudFormation templates and Python code from the GitHub repository. Compress CentralLogging.py and lambda.py into a .zip file named AddSubscriptionFilter.zip for the Lambda function we create below. Upload these local files to the <S3 infrastructure-bucket>. I recommend using folders called /python for the .py files, /lambdas for the AddSubscriptionFilter.zip file, and /cfn for the CloudFormation templates.
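You can script the packaging step with the standard library. This sketch assumes the two source files sit in one local directory and places them at the root of the archive. The upload to the suggested /lambdas folder appears only as a comment because it needs AWS credentials.

```python
import os
import zipfile


def build_lambda_zip(src_dir, zip_path, files=("CentralLogging.py", "lambda.py")):
    """Zip the Lambda source files at the archive root and return the archive path."""
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for name in files:
            # arcname drops the local directory so each file sits at the zip root.
            zf.write(os.path.join(src_dir, name), arcname=name)
    return zip_path


# Upload (bucket name is a placeholder):
#   import boto3
#   boto3.client("s3").upload_file("AddSubscriptionFilter.zip", "<S3 infrastructure-bucket>",
#                                  "lambdas/AddSubscriptionFilter.zip")
```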

Multi-Account Configuration and the Central Logging Account

One form of multi-account configuration is the Landing Zone offering, which provides a core logging account for storing all logs for auditing. I use this account configuration as an example in this blog post. Initially, the Landing Zone setup creates several stack sets and resources, including roles, security groups, alarms, Lambda functions, a CloudTrail trail, and an S3 bucket.

If you are not using a Landing Zone, create an appropriately named S3 bucket in the account you have chosen as a logging account. This S3 bucket will be referred to later as the <LoggingS3Bucket>. To mimic what the Landing Zone calls its logging bucket, you can use the format aws-landing-zone-logs-<Account Number><Region>, or simply pick an appropriate name for the centralized logging location. In a production environment, remember that it is critical to lock down the access to logging resources and the permissions allowed within the account to prevent deletion or tampering with the logs.

Figure 1 – Initial Landing Zone logging account resources

The S3 bucket – aws-landing-zone-logs-<Account Number><Region> is the most important resource created by the stack-sets for logging purposes. It contains all of the logs streamed to it from all of the accounts. Initially, the Landing Zone only sends the AWS CloudTrail and AWS Config logs to this S3 bucket.

In order to send all of the other CloudWatch Logs that are necessary for auditing, we need to add a destination and streaming mechanism to the logging account.

Logging Account Infrastructure

The additional infrastructure required in the central logging account provides a destination for the log group subscription filters and a stream for log events that are sent from all accounts and appropriate regions to load them into the <LoggingS3Bucket> repository. The selection of these particular AWS resources is important, because Kinesis Data Streams is the only resource currently supported as a destination for cross-account CloudWatch Logs subscription filters.

The centralLogging.yml CloudFormation template automates the creation of the entire required infrastructure in the core logging account. Make sure to run it in each of the regions in which you need to centralize logs. The log group subscription filter and destination regions must match in order to successfully stream the logs.
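The following sketch shows the core API call for subscribing a source account's log groups to the central destination. It is not the code from the GitHub repository. It pages through `describe_log_groups` and calls `put_subscription_filter` with the destination ARN. The filter name is a placeholder. The call can fail for a log group that already has its maximum number of subscription filters.

```python
def subscribe_all_log_groups(logs, destination_arn, filter_name="CentralLogging"):
    """Point every log group at the cross-account destination (sketch).

    `logs` is a boto3 CloudWatch Logs client in the same Region as the destination.
    Returns the names of the log groups that were subscribed.
    """
    subscribed = []
    for page in logs.get_paginator("describe_log_groups").paginate():
        for group in page.get("logGroups", []):
            logs.put_subscription_filter(
                logGroupName=group["logGroupName"],
                filterName=filter_name,
                filterPattern="",  # an empty pattern forwards every log event
                destinationArn=destination_arn,
            )
            subscribed.append(group["logGroupName"])
    return subscribed
```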

Installation Instructions:

  1. Modify the centralLogging.yml template to add your account numbers for all of the accounts you want to stream logs from into the DestinationPolicy where you see the <AccountNumberHere> placeholders. Remove any unused placeholders.
  2. In the same DestinationPolicy, modify the final ARN statement, replacing <region> with the Region it will run in (e.g., us-east-1) and <logging account number> with the account number of the logging account where this template is to be run.
  3. Log in to the core logging account and access the AWS management console using administrator credentials.
  4. Navigate to CloudFormation and click the Create Stack button.
  5. Select Specify an Amazon S3 template URL and enter the Link for the centralLogging.yml template found in the <S3 infrastructure-bucket>.
  6. Enter a stack name, such as CentralizedLogging, and the one parameter, LoggingS3Bucket. Enter the ARN of the logging bucket: arn:aws:s3:::<LoggingS3Bucket>. You can obtain it by opening the S3 console, clicking the bucket icon next to this bucket, and then clicking the Copy Bucket ARN button.
  7. Skip the next page, acknowledge the creation of IAM resources, and Create the stack.
  8. When the stack completes, select the stack name to go to stack details and open the Outputs. Copy the value of the DestinationArnExport, which will be needed as a parameter for the script in the next section.

Upon successful creation of this CloudFormation stack, the following new resources will be created:

  • Amazon CloudWatch Logs Destination
  • Amazon Kinesis Stream
  • Amazon Kinesis Firehose Stream
  • Two AWS Identity and Access Management (IAM) Roles

Figure 2 – New infrastructure required in the centralized logging account

Because the Landing Zone is a multi-account offering, the Log Destination serves as the single destination for the subscription filters in every account. The key feature of the destination is its DestinationPolicy. Whenever a new account is added to the environment, its account number must be added to this DestinationPolicy before the new account can send logs to it. Add the new account number to the centralLogging.yml CloudFormation template, and then run a stack update in CloudFormation to complete the addition. A sample DestinationPolicy looks like this:

{
  "Version" : "2012-10-17",
  "Statement" : [
    {
      "Effect" : "Allow",
      "Principal" : {
        "AWS" : [
          "03XXXXXXXX85",
          "29XXXXXXXX02",
          "13XXXXXXXX96",
          "37XXXXXXXX30",
          "86XXXXXXXX95"
        ]
      },
      "Action" : "logs:PutSubscriptionFilter",
      "Resource" : "arn:aws:logs:<Region>:<LoggingAccountNumber>:destination:CentralLogDestination"
    }
  ]
}
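Keeping the account list in one place makes the DestinationPolicy easier to maintain as accounts are added. The following Python sketch, using a hypothetical `build_destination_policy` helper and placeholder account numbers, shows how such a document could be generated; the centralLogging.yml template remains the place where the policy is actually deployed.

```python
import json


def build_destination_policy(source_accounts, region, logging_account,
                             destination_name="CentralLogDestination"):
    """Return a DestinationPolicy that lets each source account call
    logs:PutSubscriptionFilter against the central Log Destination."""
    # Deduplicate while keeping order, so repeated updates produce stable diffs.
    principals = list(dict.fromkeys(source_accounts))
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"AWS": principals},
                "Action": "logs:PutSubscriptionFilter",
                "Resource": (f"arn:aws:logs:{region}:{logging_account}"
                             f":destination:{destination_name}"),
            }
        ],
    }


# Example: print the policy for two placeholder accounts.
print(json.dumps(build_destination_policy(
    ["111111111111", "222222222222"], "us-east-1", "999999999999"), indent=2))
```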

The Kinesis Stream gets records from the Log Destination and holds them for 48 hours. Kinesis Streams scale by adding shards, and the CloudFormation template starts the stream with two shards. Because all CloudWatch log objects flow through this stream, you need to monitor its throughput as instances and applications are deployed into the accounts; at some point it will need to be scaled up. To scale, change the number of shards (ShardCount) in the Kinesis Stream resource (KinesisLoggingStream) to the required number. See the Amazon Kinesis Data Streams FAQ to confirm the capacity and throughput of each shard.
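As a rough sizing aid, the following Python sketch computes a ShardCount from a peak write rate. It assumes the commonly documented per-shard write limits of 1 MiB per second and 1,000 records per second, plus a hypothetical 80 percent headroom factor; confirm the current limits in the Kinesis Data Streams FAQ before relying on it.

```python
import math

# Assumed per-shard write limits for Kinesis Data Streams.
MAX_BYTES_PER_SHARD = 1024 * 1024   # 1 MiB per second
MAX_RECORDS_PER_SHARD = 1000        # records per second


def shards_needed(peak_bytes_per_sec, peak_records_per_sec, headroom=0.8):
    """Smallest ShardCount whose write capacity, scaled by `headroom`,
    covers both the peak byte rate and the peak record rate."""
    by_bytes = peak_bytes_per_sec / (MAX_BYTES_PER_SHARD * headroom)
    by_records = peak_records_per_sec / (MAX_RECORDS_PER_SHARD * headroom)
    # Whichever dimension is tighter decides; never go below one shard.
    return max(1, math.ceil(by_bytes), math.ceil(by_records))
```

For example, a peak of 3 MiB per second at 500 records per second needs 4 shards, because the byte rate, not the record rate, is the binding limit.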

Kinesis Firehose provides a simple and efficient mechanism to retrieve the records from the Kinesis Stream and load them into the <LoggingS3Bucket> repository. It uses the CloudFormation template parameter to know where to load the logs. All of the CloudWatch logs loaded by Firehose are stored under the prefix /CentralizedAccountsLog. The Firehose buffering hints are set to 5 minutes or 50 MB, so Firehose delivers a batch to S3 when either limit is reached, whichever comes first. Leave the CompressionFormat set to UNCOMPRESSED, because the log records arrive already compressed.
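Compression is left off because CloudWatch Logs already gzip-compresses each batch of log events it writes to the stream. The following Python sketch, assuming the documented subscription payload format (a gzip-compressed JSON object with messageType and logEvents fields), decodes one such record; the function names are hypothetical.

```python
import gzip
import json


def decode_subscription_record(data: bytes) -> dict:
    """Decompress one CloudWatch Logs subscription payload read from the stream."""
    return json.loads(gzip.decompress(data))


def log_messages(data: bytes) -> list:
    """Return the raw log lines carried by one record."""
    payload = decode_subscription_record(data)
    # CONTROL_MESSAGE records are connectivity checks from CloudWatch Logs; skip them.
    if payload.get("messageType") != "DATA_MESSAGE":
        return []
    return [event["message"] for event in payload.get("logEvents", [])]
```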

Two AWS Identity and Access Management (IAM) roles are created for this infrastructure. The first, CWLtoKinesisRole, is used by the destination to allow CloudWatch Logs from all regions to put the log object records into the Kinesis Stream, as well as to pass the role. The second, FirehoseDeliveryRole, allows Firehose to get the log object records from the Kinesis Stream and then load them into the S3 logging bucket.
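For reference, the following Python sketch builds trust and permissions documents for a role like CWLtoKinesisRole. It is modeled on the cross-account subscription example in the CloudWatch Logs documentation rather than on the actual centralLogging.yml template, so treat the statements, the region list, and the stream name as assumptions.

```python
def cwl_to_kinesis_trust_policy(regions):
    """Trust policy letting CloudWatch Logs in each listed region assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": [f"logs.{r}.amazonaws.com" for r in regions]},
            "Action": "sts:AssumeRole",
        }],
    }


def cwl_to_kinesis_permissions(region, account, stream_name, role_name="CWLtoKinesisRole"):
    """Permissions to write into the logging stream and to pass this same role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {"Effect": "Allow", "Action": "kinesis:PutRecord",
             "Resource": f"arn:aws:kinesis:{region}:{account}:stream/{stream_name}"},
            {"Effect": "Allow", "Action": "iam:PassRole",
             "Resource": f"arn:aws:iam::{account}:role/{role_name}"},
        ],
    }
```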

Once you have successfully created this infrastructure, the next step is to add the subscription filters to existing log groups.

Adding Subscription Filters to Existing Log Groups

The next step in the process is to add subscription filters for the Log Destination in the core logging account to all existing log groups. Several log groups are created by the Landing Zone, or you may have created them by using various AWS services or by logging application events. For every new AWS account, you will need to run the init_account_central_logging.py Python script to add the subscription filters to all the existing log groups.

The init_account_central_logging.py script takes one parameter, which is the Log Destination ARN. Use the Destination ARN you copied from the stack details output in the previous section as the parameter to the script.

The init_account_central_logging.py script first adds this Destination ARN to the AWS Systems Manager Parameter Store so that the core logic that creates the subscription filters can use it. The script then gets a list of all existing log groups and iterates over them. For each log group, it deletes any existing subscription filters (there can be only one subscription filter per log group, and attempting to create another would cause an error), and then adds a new subscription filter that sends the log events to the Log Destination in the centralized logging account.
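The following Python sketch shows the same flow using boto3 client methods (the describe_log_groups paginator, describe_subscription_filters, delete_subscription_filter, put_subscription_filter, and the SSM put_parameter call). It is not the contents of init_account_central_logging.py or CentralLogging.py; the helper names and the filter name are hypothetical, and the clients are passed in so the logic can be exercised without AWS credentials.

```python
def store_destination(ssm, destination_arn, name="LogDestination"):
    """Save the Destination ARN in Parameter Store for later use by the Lambda function."""
    ssm.put_parameter(Name=name, Value=destination_arn, Type="String", Overwrite=True)


def subscribe_all_log_groups(logs, destination_arn, filter_name="CentralLogDestination"):
    """Point every existing log group at the central destination; return the names updated."""
    updated = []
    for page in logs.get_paginator("describe_log_groups").paginate():
        for group in page["logGroups"]:
            name = group["logGroupName"]
            # Remove existing filters first, so the new one does not exceed the per-group limit.
            existing = logs.describe_subscription_filters(logGroupName=name)["subscriptionFilters"]
            for f in existing:
                logs.delete_subscription_filter(logGroupName=name, filterName=f["filterName"])
            logs.put_subscription_filter(
                logGroupName=name,
                filterName=filter_name,
                filterPattern="",              # empty pattern matches every log event
                destinationArn=destination_arn,
            )
            updated.append(name)
    return updated
```

In practice you would call these with `boto3.client("ssm")` and `boto3.client("logs")` created for the target account and region.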

Figure 3 – Run script to add subscription filters to existing log groups

Installation Instructions:

  1. Make sure that Python and boto3 are installed and accessible on the client computer. Consider installing them in a virtual environment to keep dependencies separate.
  2. Set the AWS_PROFILE environment variable to the appropriate AWS account profile.
  3. Log in to the proper account, obtain administrator credentials (or other credentials with appropriate permissions), and add the account's access key and secret key to the AWS credentials file.
  4. Set the region and output format in the AWS config file.
  5. Download the two Python files, init_account_central_logging.py and CentralLogging.py, and place them in a working directory.
  6. Run the script using the command python3 ./init_account_central_logging.py -d <LogDestinationArn>.

Use the AWS Management Console to validate the results. Navigate to CloudWatch Logs and view all of the log groups. Each one should now have a subscription filter named “Logs (CentralLogDestination).”

Automatically Adding Subscription Filters to New Log Groups

The final step in setting up the centralized log streaming capability is to run a CloudFormation template that creates resources to automatically add subscription filters to new log groups. New log groups are created in accounts by resources (e.g., Lambda functions) and by applications. A subscription filter must be added to every new log group in order to deliver its log events to the logging account.

The AddSubscriptionFilter.yml CloudFormation template contains resources to automatically add subscription filters.

First, it creates a role that allows it to access the Lambda code stored in a centralized location, the <S3 infrastructure-bucket>. (Remember that the bucket policy of this S3 bucket must include this account number in order for the account to access the Lambda code.)

Second, the template creates the AddSubscriptionLambda function, which reuses the core logic shared by the script in the previous section. It retrieves the proper destination from the Parameter Store, deletes any existing subscription filter from the newly created log group, and adds the new subscription filter to it. This Lambda function is triggered by a CloudWatch Events rule.

Third, the template creates a Lambda permission, which allows the CloudWatch Events rule to invoke this particular Lambda function.

Finally, the CloudFormation template creates an Amazon CloudWatch Events rule that acts as the trigger for the Lambda function. This rule matches events coming from CloudTrail that signal the creation of a new log group. For each create log group event it matches, it invokes the AddSubscriptionLambda function.
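The rule and the handler can be sketched in Python as follows. The event pattern matches CreateLogGroup API calls recorded by CloudTrail, and the handler reads the new log group name from the event's requestParameters. The pattern, the `make_handler` factory, and the filter name are illustrative assumptions, not the code in AddSubscriptionFilter.zip; CloudTrail must be recording management events in the region for the rule to fire.

```python
# Hypothetical event pattern for the CloudWatch Events rule.
EVENT_PATTERN = {
    "source": ["aws.logs"],
    "detail-type": ["AWS API Call via CloudTrail"],
    "detail": {
        "eventSource": ["logs.amazonaws.com"],
        "eventName": ["CreateLogGroup"],
    },
}


def make_handler(logs, ssm, filter_name="CentralLogDestination"):
    """Build a Lambda handler bound to boto3 logs and ssm clients (injected for testability)."""
    def handler(event, context):
        group = event["detail"]["requestParameters"]["logGroupName"]
        # The destination ARN was stored earlier under the LogDestination parameter.
        destination = ssm.get_parameter(Name="LogDestination")["Parameter"]["Value"]
        for f in logs.describe_subscription_filters(logGroupName=group)["subscriptionFilters"]:
            logs.delete_subscription_filter(logGroupName=group, filterName=f["filterName"])
        logs.put_subscription_filter(logGroupName=group, filterName=filter_name,
                                     filterPattern="", destinationArn=destination)
        return group
    return handler
```

In a deployed function, the module would create the handler once, for example `handler = make_handler(boto3.client("logs"), boto3.client("ssm"))`.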

Figure 4 – Infrastructure to automatically add a subscription filter to a new log group and the log flow to the centralized account

Installation Instructions:

(Important note: This functionality requires that the LogDestination parameter be properly set to the LogDestinationArn in the Parameter Store before the Lambda will run successfully. The script in the previous step sets this parameter, or it can be done manually. Make certain that the destination specified is in this same region.)

  1. Ensure that the <S3 infrastructure-bucket> has the AddSubscriptionFilter.zip file containing the Python code files lambda.py and CentralLogging.py.
  2. Log in to the appropriate account, and access using administrator credentials. Make sure that the region is set properly.
  3. Navigate to CloudFormation and click the Create Stack button.
  4. Select Specify an Amazon S3 template URL and enter the Link for the AddSubscriptionFilter.yml template found in the <S3 infrastructure-bucket>.
  5. Enter a stack name, such as AddSubscription.
  6. Enter the two parameters: the <S3 infrastructure-bucket> name (not the ARN), and the folder and file name (e.g., lambdas/AddSubscriptionFilter.zip).
  7. Skip the next page, acknowledge the creation of IAM resources, and Create the stack.

In order to test that the automated addition of subscription filters is working properly, use the AWS Management Console to navigate to CloudWatch Logs and click the Actions button. Select Create New Log Group and enter a random log group name, such as “testLogGroup.” When first created, the log group will not have a subscription filter. After a few minutes, refresh the display and you should see the new subscription filter on the log group. At this point, you can delete the test log group.
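The same check can be scripted. The following Python sketch polls describe_subscription_filters until a filter appears; the `wait_for_subscription` helper, the timeout, and the polling interval are assumptions.

```python
import time


def wait_for_subscription(logs, log_group, timeout_s=600, interval_s=30, sleep=time.sleep):
    """Poll until the log group has a subscription filter; return the filter names."""
    deadline = time.monotonic() + timeout_s
    while True:
        filters = logs.describe_subscription_filters(logGroupName=log_group)["subscriptionFilters"]
        if filters:
            return [f["filterName"] for f in filters]
        if time.monotonic() >= deadline:
            raise TimeoutError(f"no subscription filter on {log_group} after {timeout_s}s")
        sleep(interval_s)  # give CloudTrail, CloudWatch Events, and Lambda time to react
```

With `logs = boto3.client("logs")`, you could create the group with `logs.create_log_group(logGroupName="testLogGroup")`, call `wait_for_subscription(logs, "testLogGroup")`, and clean up with `logs.delete_log_group(logGroupName="testLogGroup")`.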

New Account Setup

As a reminder, when you add new accounts that you want to have stream log events to the central logging account, you will need to configure the new accounts in two places in order for this functionality to work properly.

First, add the account number to the DestinationPolicy property of the LoggingDestination resource in the centralLogging.yml template. Then, update the CloudFormation stack.

Second, modify the bucket policy for the <S3 infrastructure-bucket>. Select the Permissions tab, then the Bucket Policy button. Add the new account to allow cross-account access to the lambda code by adding the line “arn:aws:iam::<new account number>:root” to the Principal.AWS list.
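If you prefer to script this change, the following Python sketch adds the new account's root ARN to every statement that already lists AWS principals. The helper name is hypothetical, and it assumes the infrastructure bucket policy grants access through Principal.AWS as described above.

```python
import json


def add_account_to_bucket_policy(policy_json, account_id):
    """Return the bucket policy text with arn:aws:iam::<account>:root added to Principal.AWS."""
    policy = json.loads(policy_json)
    arn = f"arn:aws:iam::{account_id}:root"
    for statement in policy["Statement"]:
        principal = statement.get("Principal")
        if not isinstance(principal, dict) or "AWS" not in principal:
            continue  # only touch statements that already grant access to AWS principals
        current = principal["AWS"]
        # Principal.AWS may be a single string or a list; normalize to a list.
        if isinstance(current, str):
            current = [current]
        if arn not in current:
            current.append(arn)
        principal["AWS"] = current
    return json.dumps(policy, indent=2)
```

The current policy text can be read with the S3 client's `get_bucket_policy(Bucket=...)` call (the `Policy` field of the response) and written back with `put_bucket_policy(Bucket=..., Policy=...)`.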

Conclusion

Centralized logging is a key component of enterprise multi-account architectures. In this blog post, I have built on the central logging in multi-account environments streaming architecture to automatically subscribe all CloudWatch Logs log groups so that they send all log events to an S3 bucket in a designated logging account. The solution uses a script to add subscription filters to existing log groups, and a Lambda function to automatically place a subscription filter on every new log group created within the account. You can use it to forward application logs, security logs, VPC flow logs, or any other important logs that are required for audit, security, or compliance purposes.

About the author

David Bailey is a Cloud Infrastructure Architect with AWS Professional Services specializing in serverless application architecture, IoT, and artificial intelligence. He has spent decades architecting and developing complex custom software applications, as well as teaching internationally on object-oriented design, expert systems, and neural networks.


New – Amazon Kinesis Data Analytics for Java

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/new-amazon-kinesis-data-analytics-for-java/

Customers are using Amazon Kinesis to collect, process, and analyze real-time streaming data. In this way, they can react quickly to new information from their business, their infrastructure, or their customers. For example, Epic Games ingests more than 1.5 million game events per second for its popular online game, Fortnite.

With Amazon Kinesis Data Analytics you can process data in real-time using standard SQL. While SQL provides an easy way to quickly query large volumes of streaming data without learning new frameworks or languages, many customers also want to build more sophisticated data processing applications using general-purpose programming languages.

Using Java with Amazon Kinesis Data Analytics

Today, we are introducing support for Java in Amazon Kinesis Data Analytics. Now, developers can use their own Java code to create powerful real-time applications that process streaming data like continuously transforming and loading data into their data lakes, generating metrics to feed real-time gaming leaderboards, applying machine learning models to data streams from connected devices, and more.

To use this new functionality, developers build applications using open source libraries that include built-in operators for common data processing functions, which allow applications to organize, transform, aggregate, and analyze data at any scale. Because these libraries are open source, you can run them anywhere:

  • Apache Flink, an open source framework and engine for processing data streams.
  • AWS SDK for Java, providing Java APIs for many AWS services.

Developers can use these Java libraries within their Integrated Development Environment (IDE) of choice. Using these libraries, the following AWS services can be integrated with as little as one line of code:

  • Streaming Data Sources: Amazon Kinesis Data Streams
  • Streaming Destinations: Amazon S3, Amazon DynamoDB, Amazon Kinesis Data Streams, Amazon Kinesis Data Firehose

In addition to the pre-built AWS integrations, the Java libraries include connectors to tools such as Cassandra, Elasticsearch, RabbitMQ, Redis, and more, as well as the ability to build custom integrations.

Building a Kinesis Data Streams Java Application

I prepared a simple Java application that implements the “mandatory” word count example for data processing. I send some paragraphs of text as input, and every five seconds I get as output the number of times each word has been used.

First, I create two Kinesis Data Streams:

  • TextInputStream, where I am going to send my input records
  • WordCountOutputStream, where I am going to read the output of the Java application

Here is the code of the word-count Java application. To read and write from Kinesis Data Streams, I am using the Kinesis Connector from the Apache Flink project.

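import java.util.Properties;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.util.Collector;

public class StreamingJob {

    private static final String region = "us-east-1";
    private static final String inputStreamName = "TextInputStream";
    private static final String outputStreamName = "WordCountOutputStream";

    // Source: read strings from the input stream, starting from the newest records.
    private static DataStream<String> createSourceFromStaticConfig(
            StreamExecutionEnvironment env) {
        Properties inputProperties = new Properties();
        inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
        inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
            "LATEST");

        return env.addSource(new FlinkKinesisConsumer<>(inputStreamName,
            new SimpleStringSchema(), inputProperties));
    }

    // Sink: write strings to the output stream; every record uses partition key "0".
    private static FlinkKinesisProducer<String> createSinkFromStaticConfig() {
        Properties outputProperties = new Properties();
        outputProperties.setProperty(AWSConfigConstants.AWS_REGION, region);

        FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(
            new SimpleStringSchema(), outputProperties);
        sink.setDefaultStream(outputStreamName);
        sink.setDefaultPartition("0");
        return sink;
    }

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> input = createSourceFromStaticConfig(env);

        input.flatMap(new Tokenizer())        // emit (word, 1) pairs
             .keyBy(0)                        // partition the stream by word
             .timeWindow(Time.seconds(5))     // 5-second tumbling window
             .sum(1)                          // add up the 1s for each word
             .map(new MapFunction<Tuple2<String, Integer>, String>() {
                 @Override
                 public String map(Tuple2<String, Integer> value) throws Exception {
                     return value.f0 + "," + value.f1.toString();
                 }
             })
             .addSink(createSinkFromStaticConfig());

        env.execute("Word Count");
    }

    // Splits a line into lowercase words and emits (word, 1) for each non-empty word.
    public static final class Tokenizer
        implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}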

The most important part of the application is the manipulation of the input object, where I apply a few DataStream Transformations:

  1. I start with a DataStream containing the Strings from the input stream.
  2. I use a Tokenizer in a FlatMap to split each sentence into “words”, emitting each word paired with the number “1”.
  3. I apply the KeyBy operator to logically partition the stream with respect to the “word”.
  4. I use a 5-second tumbling window.
  5. I aggregate within the window, summing up for each word the number “1” to count them.
  6. I use a simple Map for each record to join the word and the number into a comma-separated values (CSV) String that I send to the output stream.
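To make the transformations concrete, the following Python sketch reproduces what a single 5-second window computes for the lines that arrive in it. It is a plain-Python model for illustration only, not Flink code, and it ignores windowing, parallelism, and state. Python's `\W` is Unicode-aware while Java's is ASCII-only by default, so the two tokenizers can differ on non-ASCII text.

```python
import re
from collections import Counter


def tokenize(text):
    # Same rule as the Java Tokenizer: lowercase, split on non-word characters, drop empties.
    return [t for t in re.split(r"\W+", text.lower()) if t]


def count_window(lines):
    """Word counts for all lines that fell into one tumbling window, as 'word,count' strings."""
    counts = Counter()
    for line in lines:
        counts.update(tokenize(line))  # flatMap + keyBy(word) + sum(1)
    return [f"{word},{n}" for word, n in counts.items()]  # final map to CSV
```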

One of the most powerful operators shown here is the KeyBy operator. It enables you to re-organize a particular stream by a specified key in real-time. This type of re-keying enables further downstream operations like aggregations, counts, and much more. This enables you to set up streaming map-reduce on different keys within the same application.

I build the Java application using Maven and load the output JAR to an Amazon Simple Storage Service (S3) bucket in the region where I want to deploy the application. In the Kinesis Data Analytics console, I create a new application and select “Flink” as runtime:

I then configure the application to use the code on my S3 bucket. The console updates the IAM role for the application to have permissions to read the code.

You can optionally add key/value properties to the configuration of the application. You can read those properties from within the application, to provide customization at deployment time.

For monitoring, I leave the default metrics. I enable logging to Amazon CloudWatch, for errors only.

Don’t forget to add permissions to the IAM role created by the console so that the Kinesis Data Analytics application can read from and write to the streams used for input and output, TextInputStream and WordCountOutputStream in my case.
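A minimal permissions policy for those two streams might look like the document built below. The action list is an assumption based on what the Kinesis consumer and the KPL-based producer typically call; some AWS examples grant broader `kinesis:*` access instead, and you may also need `cloudwatch:PutMetricData` if KPL metrics are enabled, so verify against the IAM documentation.

```python
def kda_stream_access_policy(region, account, input_stream, output_stream):
    """Hypothetical IAM policy granting read on the input stream and write on the output stream."""
    base = f"arn:aws:kinesis:{region}:{account}:stream/"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {   # Read side, used by FlinkKinesisConsumer
                "Sid": "ReadInputStream",
                "Effect": "Allow",
                "Action": ["kinesis:DescribeStream", "kinesis:ListShards",
                           "kinesis:GetShardIterator", "kinesis:GetRecords"],
                "Resource": base + input_stream,
            },
            {   # Write side, used by FlinkKinesisProducer
                "Sid": "WriteOutputStream",
                "Effect": "Allow",
                "Action": ["kinesis:DescribeStream", "kinesis:ListShards",
                           "kinesis:PutRecord", "kinesis:PutRecords"],
                "Resource": base + output_stream,
            },
        ],
    }
```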

I can now start the application with the “Run” button, and when it is running, I use a script that I prepared to put some text (I am using a description of the Amazon Kinesis platform) in the input stream:

$ python put_records.py TextInputStream
Amazon Kinesis makes it easy to collect, process, and analyze real-time, streaming data...
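The post does not include put_records.py. A script with the same command-line shape could be built around the following hypothetical Python function, which sends each non-empty line read from standard input as one record through the boto3 put_record call; its entry point would be something like `put_lines(boto3.client("kinesis"), sys.argv[1], sys.stdin)`.

```python
import uuid


def put_lines(kinesis, stream_name, lines):
    """Send each non-empty line as its own record; return how many records were sent."""
    sent = 0
    for line in lines:
        line = line.strip()
        if not line:
            continue
        kinesis.put_record(
            StreamName=stream_name,
            Data=line.encode("utf-8"),
            PartitionKey=str(uuid.uuid4()),  # random key spreads records across shards
        )
        sent += 1
    return sent
```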

The behavior of my application is summarized in the console in the Application Graph, a visual representation of the data flow consisting of operators and intermediate results (complex applications, using multiple streams, have a much more interesting graph):

To read the output stream, I am using a Lambda function written in Python. I am using the one provided with the Kinesis Record Aggregation & Deaggregation Modules for AWS Lambda, which provides automatic “de-aggregation” of records aggregated by the Amazon Kinesis Producer Library (KPL).
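Because FlinkKinesisProducer writes through the KPL, output records may be aggregated, which is why the deaggregation module is used. The following Python sketch of a consumer handler is deliberately simpler: it decodes plain records and only detects aggregated ones by the KPL's four-byte magic prefix (0xF3 0x89 0x9A 0xC2), leaving them to a real deaggregation library. The handler follows the standard Kinesis event shape for Lambda; everything else is an assumption.

```python
import base64

# Prefix that the KPL writes at the start of every aggregated record.
KPL_MAGIC = bytes([0xF3, 0x89, 0x9A, 0xC2])


def handler(event, context):
    """Print 'word,count' lines from a Kinesis-triggered Lambda event; return what was printed."""
    printed = []
    for record in event["Records"]:
        data = base64.b64decode(record["kinesis"]["data"])
        if data.startswith(KPL_MAGIC):
            # Aggregated KPL record: needs the deaggregation module, not handled here.
            print("skipping aggregated record", record["kinesis"].get("sequenceNumber"))
            continue
        line = data.decode("utf-8")
        print(line)  # Lambda sends print() output to CloudWatch Logs
        printed.append(line)
    return printed
```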

As expected, in the CloudWatch Logs console I get the list of the words and the number of times they were used, updated every 5 seconds by the Lambda function:

Pricing and Availability

With Amazon Kinesis Data Analytics for Java, you pay only for what you use. Pricing is similar to Amazon Kinesis Data Analytics for SQL, but there are a few differences.

For Java applications, you are charged a single additional Amazon Kinesis Processing Unit (KPU) per application, used for application orchestration. Java applications are also charged for running application storage and durable application backups. Running application storage is used for Amazon Kinesis Data Analytics’ stateful processing capabilities and is charged per GB-month. Durable application backups are optional and provide a point-in-time recovery point for applications, charged per GB-month.

For example, pricing is $0.11 per KPU hour in US East (N. Virginia), and you are charged for running application storage ($0.10 per GB-month) and durable application backups ($0.023 per GB-month).
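As a worked example, the following Python sketch estimates a monthly bill from the US East (N. Virginia) prices above. The inputs (one processing KPU, 50 GB of running application storage, 10 GB of backups, and 730 hours in a month) are hypothetical, and the sketch ignores any other charges.

```python
KPU_HOUR = 0.11          # US East (N. Virginia) price per KPU hour
STORAGE_GB_MONTH = 0.10  # running application storage
BACKUP_GB_MONTH = 0.023  # durable application backups


def monthly_cost(processing_kpus, storage_gb, backup_gb, hours=730):
    """Estimated monthly cost in USD for one Java application."""
    kpus = processing_kpus + 1  # one additional KPU per Java application for orchestration
    compute = kpus * hours * KPU_HOUR
    return round(compute + storage_gb * STORAGE_GB_MONTH + backup_gb * BACKUP_GB_MONTH, 2)
```

With those inputs, 2 KPUs × 730 hours × $0.11 = $160.60, plus $5.00 of running storage and $0.23 of backups, for about $165.83 per month.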

Available Now

Amazon Kinesis Data Analytics for Java is available now in US East (N. Virginia), US East (Ohio), US West (Oregon), and EU West (Ireland).

I only scratched the surface of the capabilities for stream processing enabled by the support of Java in Amazon Kinesis Data Analytics. I think this is a powerful tool that can enable new use cases. Let me know what you are going to build with it!

Scaling Amazon Kinesis Data Streams with AWS Application Auto Scaling

Post Syndicated from Giorgio Nobile original https://aws.amazon.com/blogs/big-data/scaling-amazon-kinesis-data-streams-with-aws-application-auto-scaling/

Recently, AWS launched a new feature of AWS Application Auto Scaling that lets you define scaling policies that automatically add shards to and remove shards from an Amazon Kinesis Data Stream. For more detailed information about this feature, see the Application Auto Scaling GitHub repository.

As your streaming traffic increases, you need a scaling solution that accommodates all requests; when traffic decreases, you can scale down to reduce costs. Currently, you scale the number of shards in an Amazon Kinesis Data Stream programmatically. Alternatively, you can use the Amazon Kinesis Scaling Utilities, either running each utility manually or automating them with an AWS Elastic Beanstalk environment.

With the new feature of Application Auto Scaling, you can use AWS services to create a scaling solution without manual intervention or complex solutions.

Auto scaling solution overview

This blog post shows you how to deploy an auto scaling solution for your Amazon Kinesis Data Streams based on the default Amazon CloudWatch metrics. It also provides an AWS CloudFormation template that sets up the environment automatically, and the code for the Lambda function.

How the auto scaling solution works

The solution begins with a CloudWatch alarm that monitors Kinesis Data Stream shard metrics. When the alarm's custom threshold is reached, for example because the number of requests has grown, the alarm fires. The alarm sends a notification to an Application Auto Scaling policy, which responds by scaling up or down according to its configuration.

When the scaling policy is triggered, Application Auto Scaling calls an API operation on the Amazon API Gateway endpoint that identifies the resource to scale. The call passes the desired capacity, which here is the new number of Kinesis Data Stream shards (for more information, see here). API Gateway invokes an AWS Lambda function, which uses the information sent by Application Auto Scaling to increase or decrease the number of shards in the Kinesis Data Stream through Kinesis Data Streams' UpdateShardCount API operation. The following diagram illustrates the scenario.

As you can see from the diagram, AWS Systems Manager Parameter Store is also involved. We use Parameter Store to store the desired capacity value that Application Auto Scaling sends to API Gateway to increase or decrease the capacity. (In this scenario, the capacity is the number of shards.) Application Auto Scaling frequently invokes API Gateway to get the status of the custom resource, in this case the Kinesis Data Stream, to see whether there are actions to be taken and whether previous actions were successful. Because Lambda is stateless, we need somewhere to save the desired capacity value most recently communicated by Application Auto Scaling.

Solution components

This solution uses the following components:

Application Auto Scaling scalable target – A scalable target is a resource registered with the Application Auto Scaling service. The service can scale any defined and registered resources. A scalable target handles the minimum and maximum value for the scalable dimension. It requires the following parameters:

  • ResourceId: The resource that is the scalable target. For custom resources, such as in the following example, specify the OutputValue returned from the AWS CloudFormation template.
  • RoleARN: The service-linked role used to grant permission to modify scalable target resources.
  • ScalableDimension: The dimension of the scalable target. For custom resources, the value must be custom-resource:ResourceType:Property.
  • ServiceNamespace: The namespace of the AWS service. In this case, the value is custom-resource.
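Registering the scalable target can be sketched with the boto3 application-autoscaling client as follows. The minimum and maximum shard counts and the helper name are hypothetical, and the ResourceId is the OutputValue from the CloudFormation stack as described above; the client is passed in so the call can be tested without AWS.

```python
def register_stream_target(aas, resource_id, role_arn, min_shards=1, max_shards=8):
    """Register the API Gateway-backed custom resource as an Application Auto Scaling target."""
    return aas.register_scalable_target(
        ServiceNamespace="custom-resource",
        ResourceId=resource_id,  # OutputValue returned by the CloudFormation stack
        ScalableDimension="custom-resource:ResourceType:Property",
        MinCapacity=min_shards,
        MaxCapacity=max_shards,
        RoleARN=role_arn,
    )
```

In practice `aas` would be `boto3.client("application-autoscaling")`.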

Scaling policy – After you register a scalable target, you can apply a scaling policy that describes how the service should scale.

The following policy types are supported:

  • TargetTrackingScaling — Only for Amazon DynamoDB
  • StepScaling — Supported by Amazon ECS, Amazon EC2 Spot Fleets, and Amazon RDS
  • TargetTrackingScaling — Supported by Amazon ECS, EC2 Spot Fleets, and Amazon RDS
  • StepScaling — Supported by other services

In our scenario, we use a StepScaling policy because we are using a custom resource type, as discussed later in the Scaling policy and scheduled actions section. Custom resource types can also support scheduled actions.
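A StepScaling policy for the custom resource could be created as in the following Python sketch. The policy name, the single +1 step, and the cooldown value are assumptions chosen to stay within the Kinesis resharding limits; the returned PolicyARN is what a CloudWatch alarm would reference as its alarm action.

```python
def put_scale_out_policy(aas, resource_id, cooldown_s=600):
    """Create a step policy that adds one shard whenever its CloudWatch alarm fires."""
    response = aas.put_scaling_policy(
        PolicyName="KinesisScaleOut",  # hypothetical name
        ServiceNamespace="custom-resource",
        ResourceId=resource_id,
        ScalableDimension="custom-resource:ResourceType:Property",
        PolicyType="StepScaling",
        StepScalingPolicyConfiguration={
            "AdjustmentType": "ChangeInCapacity",
            "StepAdjustments": [{"MetricIntervalLowerBound": 0.0, "ScalingAdjustment": 1}],
            "Cooldown": cooldown_s,  # seconds to wait before another scaling activity
            "MetricAggregationType": "Average",
        },
    )
    return response["PolicyARN"]
```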

API Gateway – In our solution, we use Amazon API Gateway to expose a secure REST endpoint. Application Auto Scaling sends authenticated calls to this endpoint, using IAM, to get the current capacity of the custom service with HTTP GET and to adjust the capacity of the custom service with HTTP PATCH.

CloudWatch metrics and alarms – The KPIs to monitor, and the alarms that trigger the Application Auto Scaling policy when a threshold is crossed.

Lambda function – In our scenario, the AWS Lambda function mainly does two tasks:

  1. If the API request is GET, the Lambda function returns JSON that includes the information of the status of the custom resource that Application Auto Scaling controls. In this case, this custom resource is the Kinesis Data Stream.
  2. If the API request is PATCH, the Lambda function stores the new desired capacity in a DynamoDB table. The Lambda function then calls the UpdateShardCount API operation for the Kinesis Data Stream.
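The two tasks can be sketched as a single Python handler behind an API Gateway Lambda proxy integration. The response fields follow the custom resource format described in the Application Auto Scaling custom resource GitHub repository as we understand it, so verify them there. The handler shape, the dict-like `store` (standing in for Parameter Store or the DynamoDB table), and the status mapping are assumptions; the Kinesis calls use the real describe_stream_summary and update_shard_count operations.

```python
import json


def make_handler(kinesis, store, stream_name):
    """Build a hypothetical API Gateway proxy handler for the custom scalable resource.

    `store` is any dict-like object keeping the last desired capacity; a plain dict only
    survives inside one warm Lambda container, so back it with durable storage in practice."""

    def handler(event, context):
        summary = kinesis.describe_stream_summary(StreamName=stream_name)["StreamDescriptionSummary"]
        actual, status = summary["OpenShardCount"], summary["StreamStatus"]

        if event["httpMethod"] == "PATCH":
            # Application Auto Scaling sends the new target as {"desiredCapacity": N}.
            store["desiredCapacity"] = int(json.loads(event["body"])["desiredCapacity"])
        desired = store.get("desiredCapacity", actual)

        # Start a resize only when the stream is idle and the counts differ.
        if status == "ACTIVE" and desired != actual:
            kinesis.update_shard_count(StreamName=stream_name,
                                       TargetShardCount=desired,
                                       ScalingType="UNIFORM_SCALING")
            status = "UPDATING"

        if status == "UPDATING":
            scaling_status = "InProgress"
        elif desired == actual:
            scaling_status = "Successful"
        else:
            scaling_status = "Pending"

        body = {
            "actualCapacity": float(actual),
            "desiredCapacity": float(desired),
            "dimensionName": stream_name,
            "resourceName": stream_name,
            "scalableTargetDimensionId": stream_name,
            "scalingStatus": scaling_status,
            "version": "1.0",
        }
        return {"statusCode": 200, "body": json.dumps(body)}

    return handler
```

Errors from UpdateShardCount (for example, when a resharding limit is hit) propagate out of the handler, so API Gateway returns an error and Application Auto Scaling records the failed attempt.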

AWS Systems Manager Parameter Store – Stores the desired capacity value sent by Application Auto Scaling, so that the stateless Lambda function can report it when Application Auto Scaling checks the status of the resource.

Prerequisites

Prerequisites for this solution include the following:

  • User credentials with permissions that allow you to configure automatic scaling and create the required service-linked role. For more information, see the Application Auto Scaling User Guide.
  • Permissions to create a stack using an AWS CloudFormation template, plus full access permissions to resources within the stack. For more information, see the AWS CloudFormation User Guide.

Scaling policy and scheduled actions

You can use the same architecture to work in two different situations for your Amazon Kinesis Data Stream:

  1. The first is predictable traffic, which you handle with scheduled actions. An example of predictable traffic is when your Kinesis Data Stream endpoint sees growing traffic in a specific time window. In this case, you can make sure that an Application Auto Scaling scheduled action increases the number of Kinesis Data Stream shards to meet the demand. For instance, you might increase the number of shards at 12:00 p.m. and decrease it at 8:00 p.m.
  2. The second is the classic on-demand scenario, which uses a scaling policy. In this case, you create an Application Auto Scaling scaling policy that increases or decreases the number of Kinesis Data Stream shards to meet the client demand.
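For the predictable-traffic case, the 12:00 p.m. and 8:00 p.m. example could be expressed as two scheduled actions, as in this Python sketch. The action names and shard counts are hypothetical, the cron expressions are evaluated in UTC by default, and pinning MinCapacity and MaxCapacity to the same value fixes the capacity until the next action runs.

```python
def schedule_daily_window(aas, resource_id, peak_shards, base_shards):
    """Raise capacity at 12:00 and lower it at 20:00 every day."""
    actions = (
        ("kinesis-scale-up-noon", "cron(0 12 * * ? *)", peak_shards),
        ("kinesis-scale-down-8pm", "cron(0 20 * * ? *)", base_shards),
    )
    for name, cron, shards in actions:
        aas.put_scheduled_action(
            ServiceNamespace="custom-resource",
            Schedule=cron,
            ScheduledActionName=name,
            ResourceId=resource_id,
            ScalableDimension="custom-resource:ResourceType:Property",
            ScalableTargetAction={"MinCapacity": shards, "MaxCapacity": shards},
        )
```

Keep peak_shards at no more than double base_shards, because Kinesis cannot scale past double (or below half) the current shard count in one operation.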

In this blog post, we focus on the second scenario, with the scaling policy, because we believe it is more challenging to implement.

Limitations

Application Auto Scaling can scale up and down continuously to make sure that you can meet your demand. However, Kinesis Data Streams have some limitations to consider when configuring Application Auto Scaling. With Kinesis Data Streams, you can’t do the following:

  • Scale more than twice for each rolling 24-hour period for each stream
  • Scale up to more than double your current shard count for a stream
  • Scale down below half your current shard count for a stream
  • Scale up to more than 500 shards in a stream
  • Scale a stream with more than 500 shards down unless the result is fewer than 500 shards
  • Scale up to more than the shard limit for your account
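Before a scaling action calls UpdateShardCount, it can be useful to check the target against these rules. The following Python sketch encodes the shard-count rules from the list above; it does not track the number of scaling operations in the last 24 hours, and the function name is hypothetical.

```python
def shard_update_problems(current, target, account_shard_limit):
    """Return the listed UpdateShardCount rules that a current -> target change would break."""
    problems = []
    if target > current:
        if target > 2 * current:
            problems.append("more than double the current shard count")
        if target > 500:
            problems.append("more than 500 shards")
        if target > account_shard_limit:
            problems.append("above the account shard limit")
    elif target < current:
        if 2 * target < current:
            problems.append("below half the current shard count")
        if current > 500 and target >= 500:
            problems.append("a stream above 500 shards must end below 500")
    return problems
```

An empty list means the change passes the shard-count rules; for example, going from 2 to 5 shards is rejected because it more than doubles the stream.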

If you need to scale more often than this, you can use this AWS Support form to request an increase to this limit.

Choosing the metric

When choosing the metrics to monitor for scaling up and down, we can use the stream-level metrics IncomingBytes and IncomingRecords, as described in the Kinesis Data Streams documentation. Each shard supports writes of up to 1 MiB of data per second or 1,000 records per second. We can use IncomingBytes and IncomingRecords to set an alarm based on a threshold, let’s say 80 percent of that capacity, so that we call the Application Auto Scaling service before Amazon Kinesis starts throttling our requests. This is the most effective method to proactively scale our resource. However, we need to set the right cooldown period in Application Auto Scaling to avoid multiple scaling actions triggered by both metrics at the same time.
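The 80 percent threshold translates into concrete alarm settings as in the following Python sketch, which builds keyword arguments for the CloudWatch put_metric_alarm call. Because IncomingRecords is summed over the alarm period, the per-second limit is multiplied by the period length. The 60-second period, the alarm name, and the helper name are assumptions; the threshold depends on the current shard count, so it has to be updated after each scaling action.

```python
PER_SHARD_RECORDS_PER_SEC = 1000


def incoming_records_alarm(stream_name, shard_count, policy_arn, utilization=0.8, period_s=60):
    """Arguments for put_metric_alarm that fire at `utilization` of the stream's record capacity."""
    # IncomingRecords is a Sum over the period, so scale the per-second limit by period_s.
    threshold = utilization * PER_SHARD_RECORDS_PER_SEC * shard_count * period_s
    return {
        "AlarmName": "IncomingRecords-alarm-out",
        "Namespace": "AWS/Kinesis",
        "MetricName": "IncomingRecords",
        "Dimensions": [{"Name": "StreamName", "Value": stream_name}],
        "Statistic": "Sum",
        "Period": period_s,
        "EvaluationPeriods": 1,
        "Threshold": threshold,
        "ComparisonOperator": "GreaterThanOrEqualToThreshold",
        "AlarmActions": [policy_arn],  # the StepScaling policy ARN
    }
```

For a two-shard stream, 0.8 × 1,000 × 2 × 60 gives a threshold of 96,000 records per minute; the arguments would be passed as `boto3.client("cloudwatch").put_metric_alarm(**args)`.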

Alternatively, we can use the WriteProvisionedThroughputExceeded metric to scale when we reach the Amazon Kinesis shard limit, as described in the CloudWatch documentation.

In this example, we use the first approach, using IncomingRecords.

Deploying and testing the solution

To test the solution, we can use the AWS CloudFormation template found here. The AWS CloudFormation template automatically creates for you: the API Gateway, the Lambda function, the Kinesis Data Stream, the DynamoDB table, and the Application Auto Scaling group, and its scaling policy.

Deploying the solution

To let AWS CloudFormation create these resources on your behalf:

  1. Open the AWS Management Console in the AWS Region you want to deploy the solution to, and on the Services menu, choose CloudFormation.
  2. Choose Create Stack, choose Upload a template to Amazon S3, and then choose the file custom-application-autoscaling-kinesis.yaml included in the solution.
  3. Give the stack a friendly name. Specify the Amazon S3 bucket that contains the compressed version of the AWS Lambda function (index.py) included in the solution.
  4. For Options, you can specify tags for your stack and an optional IAM role to be used by AWS CloudFormation to create resources. If the role isn’t specified, a new role is created. You can also perform additional configuration for rollback settings and notification options.
  5. The review section shows a recap of the information. Be sure to select the two AWS CloudFormation acknowledgements to allow AWS CloudFormation to create resources with custom names on your behalf. Also, create a change set, because the AWS CloudFormation template includes the AWS::Serverless-2016-10-31 transform.
  6. Execute the change set to create the resources present in the stack.

Testing the solution

Now that the environment is created, test it. To fire the Amazon CloudWatch alarm manually, we must generate traffic to the stream. The Amazon Kinesis Data Generator is an efficient way to do this.

  1. First, follow this guide to set up your Amazon Kinesis Data Generator: https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html
  2. After the generator is set up, select the Region and the newly created Kinesis data stream, in our case Kinesis-MyKinesisStream-1MUOGAD9OBCJH.
  3. For Records per second, enter a value greater than 1000 if you have one shard. Otherwise, multiply a per-shard value above 1000 by the number of shards (for instance, with two shards, 1500 * 2 = 3000).
  4. In the form, enter test, and then choose Send data.
  5. Now that traffic is being generated, open the Amazon CloudWatch console and choose Alarms.
  6. In the ALARM list, select IncomingRecords-alarm-out. Open the History tab at the bottom of the page to see that the alarm triggered Application Auto Scaling.

To verify that the number of open shards has been updated:

  1. Open the Amazon Kinesis console, choose Data Streams, and then choose your data stream, in our case Kinesis-MyKinesisStream-1MUOGAD9OBCJH.
  2. In Details, you can see that the number of shards has increased to three.
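
To check the shard count programmatically, note that after a reshard the closed parent shards remain listed until they age out of the stream's retention period. A closed shard has an EndingSequenceNumber in its SequenceNumberRange; an open shard does not. The sketch below counts open shards in a response shaped like the output of the Kinesis ListShards API (for example, from boto3's list_shards). The sample shard IDs and sequence numbers are made up, and pagination through NextToken is omitted.

```python
def count_open_shards(list_shards_response: dict) -> int:
    """Count shards still open for writes in a ListShards-shaped response."""
    shards = list_shards_response.get("Shards", [])
    # Closed shards carry an EndingSequenceNumber; open shards do not.
    return sum(
        1
        for shard in shards
        if "EndingSequenceNumber" not in shard.get("SequenceNumberRange", {})
    )


# Illustrative response: one closed parent shard and three open shards.
sample_response = {
    "Shards": [
        {"ShardId": "shardId-000000000000",
         "SequenceNumberRange": {"StartingSequenceNumber": "100",
                                 "EndingSequenceNumber": "200"}},
        {"ShardId": "shardId-000000000001",
         "SequenceNumberRange": {"StartingSequenceNumber": "201"}},
        {"ShardId": "shardId-000000000002",
         "SequenceNumberRange": {"StartingSequenceNumber": "202"}},
        {"ShardId": "shardId-000000000003",
         "SequenceNumberRange": {"StartingSequenceNumber": "203"}},
    ]
}
```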

Cleaning up the environment after testing

Cleaning up the environment after testing is straightforward: deleting the AWS CloudFormation stack removes everything it created, as follows:

  1. Open the AWS CloudFormation console in the AWS Region where you deployed the solution, and select the stack from the list.
  2. Choose Actions, and then choose Delete Stack.
  3. Optionally, delete the S3 bucket that you created and the Lambda function package that you uploaded to it.

Conclusion

This post described how to use Application Auto Scaling to automatically scale an Amazon Kinesis data stream. Amazon API Gateway lets Application Auto Scaling securely invoke the AWS Lambda function that interacts with the stream.


About the Authors

Giorgio Nobile works as a Solutions Architect for Amazon Web Services in Italy. He works with enterprise customers and helps them embrace digital transformation. Giorgio’s field of expertise covers big data. In his free time, Giorgio loves playing with his two children and is addicted to DIY and snowboarding.

Diego Natali works as a Solutions Architect for Amazon Web Services in Italy. With several years of engineering background, he helps ISV and startup customers design flexible and resilient architectures using AWS services. In his spare time, he enjoys watching movies and riding his dirt bike.

Your guide to Amazon Kinesis sessions, chalk talks, and workshops at AWS re:Invent 2018

Post Syndicated from Larry Heathcote original https://aws.amazon.com/blogs/big-data/your-guide-to-amazon-kinesis-sessions-chalk-talks-and-workshops-at-aws-reinvent-2018/

AWS re:Invent 2018 is almost here! This post includes a list of Amazon Kinesis sessions, chalk talks, and workshops at AWS re:Invent 2018. You can choose the link next to each session description for the session schedule. Use the information to help schedule your conference week in Las Vegas to learn more about Amazon Kinesis.

Sessions

ANT208 – Serverless Video Ingestion & Analytics with Amazon Kinesis Video Streams

Amazon Kinesis Video Streams makes it easy to capture live video, play it back, and store it for real-time and batch-oriented ML-driven analytics. In this session, we first dive deep on the top five best practices for getting started and scaling with Amazon Kinesis Video Streams. Next, we demonstrate streaming video from a standard USB camera connected to a laptop and perform live playback in a standard browser within minutes. We are also joined on stage by members of Amazon Go, who are building the next generation of physical retail store experiences powered by their “just walk out” technology. They walk through the technical details of their integration with Kinesis Video Streams and highlight their successes and difficulties along the way.

ANT310 – Architecting for Real-Time Insights with Amazon Kinesis

Amazon Kinesis makes it easy to speed up the time it takes for you to get valuable, real-time insights from your streaming data. In this session, we walk through the most popular applications that customers implement using Amazon Kinesis, including streaming extract-transform-load, continuous metric generation, and responsive analytics. Our customer Autodesk joins us to describe how they created real-time metrics generation and analytics using Amazon Kinesis and Amazon Elasticsearch Service. They walk us through their architecture and the best practices they learned in building and deploying their real-time analytics solution.

ANT322-R – High Performance Data Streaming with Amazon Kinesis: Best Practices

Amazon Kinesis makes it easy to collect, process, and analyze real-time, streaming data so you can get timely insights and react quickly to new information. In this session, we dive deep into best practices for Kinesis Data Streams and Kinesis Data Firehose to get the most performance out of your data streaming applications. Our customer NICE inContact joins us to discuss how they utilize Amazon Kinesis Data Streams to make real-time decisions on customer contact routing and agent assignments for its Call Center as a Service (CCaaS) Platform. NICE inContact walks through their architecture and requirements for low-latency, accurate processing to be as responsive as possible to changes.

ANT322-R1 – High Performance Data Streaming with Amazon Kinesis: Best Practices

Amazon Kinesis makes it easy to collect, process, and analyze real-time, streaming data so you can get timely insights and react quickly to new information. In this session, we dive deep into best practices for Kinesis Data Streams and Kinesis Data Firehose to get the most performance out of your data streaming applications. Comcast uses Amazon Kinesis Data Streams to build a Streaming Data Platform that centralizes data exchanges. The platform is foundational to the way Comcast's data analysts and data scientists derive real-time insights from the data. In the second part of this talk, Comcast zooms into how to properly scale a Kinesis stream. We first list the factors to consider to avoid scaling issues with standard Kinesis stream consumption, and then we see how the new enhanced fan-out feature changes these scaling considerations.

SRV316-R & SRV316-R1 – Serverless Stream Processing Pipeline Best Practices

Analytics has traditionally been performed using batch processing in DWH/Hadoop environments. Common use cases involve data lakes, data science, and machine learning (ML). Creating serverless data-driven architectures and serverless streaming solutions with services like Amazon Kinesis, AWS Lambda, and Amazon Athena can solve real-time ingestion, storage, and analytics challenges, and help you focus on application logic without managing infrastructure. In this session, we introduce design patterns and best practices, and share customer journeys from batch to real-time insights in building modern serverless data-driven applications. Hear how Intel built the Intel Pharma Analytics Platform using a serverless architecture. This AI cloud-based offering enables remote monitoring of patients using an array of sensors, wearable devices, and ML algorithms to objectively quantify the impact of interventions and power clinical studies in various therapeutic conditions.

SEC402-R – AWS, I Choose You: Pokemon’s Battle against the Bots

Join us for this advanced-level talk to learn about Pokemon’s journey defending against DDoS attacks and bad bots with AWS WAF, AWS Shield, and other AWS services. We go through their initial challenges and the evolution of their bot mitigation solution, which includes offline log analysis and dynamic updates of bad-bot IPs along with rate-based rules. This is an advanced talk and assumes some knowledge of Amazon DynamoDB, Amazon Kinesis Data Firehose, Amazon Kinesis Data Analytics, AWS Firewall Manager, AWS Shield, and AWS WAF.

Chalk Talks

ANT358 – Serverless Stream Processing Tips & Tricks

Streaming data ingestion and near real-time analysis gives you immediate insights into your data. By using AWS Lambda with Amazon Kinesis, you can obtain these insights without the need to manage servers. But are you doing this in the optimal way? In this interactive session, we review the best practices for using Lambda with Kinesis, and how to avoid common pitfalls.

ANT359 – Considerations for Building Your First Streaming Application

Do you want to increase your knowledge of AWS big data web services and launch your first big data application on the cloud? In this chalk talk, we provide an overview of many of the AWS analytics services, including Amazon EMR, Amazon Kinesis, Amazon Athena, and Amazon Redshift. We discuss how they are architected together to solve common big data problems, such as ingestion, ETL, and real-time analytics.

ANT360 – Don’t Wait Until Tomorrow: From Batch to Streaming

In recent years, there has been explosive growth in the number of connected devices and real-time data sources. Data is being produced continuously and its production rate is accelerating. Businesses can no longer wait for hours or days to use this data. To gain the most valuable insights, they must use this data immediately so they can react quickly to new information. In this chalk talk, we discuss how to take advantage of streaming data sources to analyze and react in near-real time. In addition, we present different options for how to solve a real-world scenario and walk through those solutions.

ANT361 – Using Amazon Kinesis Data Streams as a Low-Latency Message Bus

Amazon Kinesis makes it easy to collect, process, and analyze real-time, streaming data so you can get timely insights and react quickly to new information. In this chalk talk, we dive deep into best practices for Kinesis Data Streams and how to optimize for low-latency, multi-consumer solutions.

BAP328-R & BAP328-R1 – Architectures for Gaining Data Insights into Your Contact Center Experience

Join us for a deep dive into using Amazon Kinesis Data Analytics for insight into what’s happening with the contacts and agents in your Amazon Connect contact center. Learn how to leverage AWS analytics and ML services to inspect, transform, and gain insight into the customer’s journey through your contact center. We also show you how to use Alexa for Business to receive timely voice-activated business intelligence on your contact center’s performance.

Workshops

Before starting a workshop, you should have a basic understanding of Amazon Kinesis. Please bring your laptop and power supply to the workshop.

ANT213-R – Build Your First Big Data Application on AWS

Do you want to increase your knowledge of AWS big data web services and launch your first big data application on the cloud? In this session, we walk you through simplifying big data processing as a data bus comprising ingest, store, process, and visualize. You will build a big data application using AWS managed services, including Amazon Athena, Amazon Kinesis, Amazon DynamoDB, and Amazon S3. Along the way, we review architecture design patterns for big data applications and give you access to a take-home lab so you can rebuild and customize the application yourself.

ANT213-R1 – Build Your First Big Data Application on AWS

Do you want to increase your knowledge of AWS big data web services and launch your first big data application on the cloud? In this session, we walk you through simplifying big data processing as a data bus comprising ingest, store, process, and visualize. You will build a big data application using AWS managed services, including Amazon Athena, Amazon Kinesis, Amazon DynamoDB, and Amazon S3. Along the way, we review architecture design patterns for big data applications and give you access to a take-home lab so you can rebuild and customize the application yourself.

ANT357 – Stream Video, Analyze It in Real Time, and Share It in Real Time

Video is ‘big data.’ Image sensors—in our smartphones, smart home devices, and traffic cameras—are getting Internet-connected. Massive streams of video data are generated, but currently not mined for real-time insights to drive businesses forward. In this workshop, learn to capture, process, and analyze video streams. Build and configure your camera device’s media pipeline to start streaming video into the AWS Cloud using Amazon Kinesis Video Streams. Next, build and deploy your own machine learning (ML) model in Amazon SageMaker to generate inferences about objects or activities in your video stream. Finally, build a browser-based web player to view the video in Live and On-Demand modes, including the analyzed video stream. In this workshop, you use Amazon Kinesis Video Streams, Amazon SageMaker, Amazon Rekognition Video, and Amazon ECS.

ANT362 – Use Streaming Data to Gain Real-Time Insights into Your Business

In recent years, there has been an explosive growth in the number of connected devices and real-time data sources. Because of this, data is being continuously produced, and its production rate is accelerating. Businesses can no longer wait for hours or days to use this data. To gain the most valuable insights, they must use this data immediately so they can react quickly to new information. In this workshop, you will learn how to take advantage of streaming data sources to analyze and react in near real time. We provide several requirements for a real-world streaming data scenario, and you’re tasked with creating a solution that successfully satisfies the requirements using services such as Amazon Kinesis, AWS Lambda, and Amazon SNS.

ANT318-R – Build, Deploy and Serve Machine learning models on streaming data using Amazon SageMaker, Apache Spark on Amazon EMR and Amazon Kinesis

As data grows exponentially in organizations, there is an increasing need to use machine learning (ML) to gather insights from this data at scale and to use those insights to perform real-time predictions on incoming data. In this workshop, we walk you through how to train an Apache Spark model using Amazon SageMaker connected through Apache Livy to an Amazon EMR Spark cluster. We also show you how to host the Spark model on Amazon SageMaker to serve a RESTful inference API. Finally, we show you how to use the RESTful API to serve real-time predictions on streaming data from Amazon Kinesis Data Streams.

ANT318-R1 – Build, Deploy and Serve Machine learning models on streaming data using Amazon SageMaker, Apache Spark on Amazon EMR and Amazon Kinesis

As data grows exponentially in organizations, there is an increasing need to use machine learning (ML) to gather insights from this data at scale and to use those insights to perform real-time predictions on incoming data. In this workshop, we walk you through how to train an Apache Spark model using Amazon SageMaker connected through Apache Livy to an Amazon EMR Spark cluster. We also show you how to host the Spark model on Amazon SageMaker to serve a RESTful inference API. Finally, we show you how to use the RESTful API to serve real-time predictions on streaming data from Amazon Kinesis Data Streams.

GPSWS406 – Advanced Serverless Data Processing

In this hands-on workshop, you learn best practices and architectural patterns for building streaming data processing pipelines without servers. Using Amazon Kinesis, AWS Lambda, and other services, you have the opportunity to build, deploy, and monitor an application to ingest and process high-velocity data at scale. This advanced workshop assumes that you have experience writing Lambda functions and understand the basics of the AWS serverless platform, so come ready to dive into the deep end. Bring your laptop with a full keyboard. We provide a sandbox AWS account for you to use during the workshop.

MAE309 – Build an AWS Analytics Solution to Monitor the Video Streaming Experience

In this workshop, we build and deploy an end-to-end analytics solution for monitoring the video streaming experience. We integrate an open source video player with Amazon Kinesis Data Streams to capture events in real time. We explore the data available for capture and a variety of use cases: from generating alerts on poor experience to content recommendations based on user behavior. We also show you how this real-time data can be archived in a data lake and further used to generate reports of aggregate performance and experience across a number of dimensions.

ADT401 – Real-Time Web Analytics with Amazon Kinesis Data Analytics

Knowing what users are doing on your websites in real time provides insights you can act on without waiting for delayed batch processing of clickstream data. Watching the immediate impact on user behavior after new releases, detecting and responding to anomalies, situational awareness, and evaluating trends are all benefits of real-time website analytics. In this workshop, we build a cost-optimized platform to capture web beacon traffic, analyze it for interesting metrics, and display it on a customized dashboard. We start by deploying the Web Analytics Solution Accelerator. Once the core is complete, we extend the solution to capture new and interesting metrics, process those with Amazon Kinesis Data Analytics, and display new graphs on the custom dashboard. Participants come away with a fully functional system for capturing, analyzing, and displaying valuable website metrics in real time.

GAM305 – Dynamic Encounters for Veteran Players Using Machine Learning

Are you trying to keep your game fresh for long-time players but don’t have the resources to keep building new handcrafted content? Join this session to learn how to launch dynamic content for groups of players without relying on static techniques like instancing or spawn points. We dive into Amazon Kinesis and Amazon Kinesis Data Analytics for real-time data collection and hotspot detection, and machine learning for encounter-building based on observed player behavior.

Conclusion

We look forward to seeing you at AWS re:Invent 2018 in Las Vegas. In addition to the sessions described in this blog post, please stop by the Analytics booth during Expo hours to learn more about Amazon Kinesis.

About the Author

Larry Heathcote is a Principal Product Marketing Manager at Amazon Web Services. Larry is passionate about seeing the results of data-driven insights on business outcomes. He enjoys family time, home projects, grilling out, and classic barbeque.

Turn Windows DHCP Server logs into actionable metrics using Amazon Kinesis Agent for Windows

Post Syndicated from Ted Balsimo original https://aws.amazon.com/blogs/big-data/turn-windows-dhcp-server-logs-into-actionable-metrics-using-amazon-kinesis-agent-for-windows/

Understanding Windows system and service health on a global scale is challenging. It requires capturing server log data and then analyzing and manipulating that data in real time to create actionable telemetry insights. Amazon Kinesis Agent for Microsoft Windows makes it efficient to ingest Windows server log data into your AWS ecosystem for analysis. This blog post discusses using Kinesis Agent for Windows to capture and aggregate Windows Dynamic Host Configuration Protocol (DHCP) server logs, and then turning that data into service health graphs in Amazon CloudWatch.

How do you quantify network access metrics for a team across the globe, specifically in the northeast corner of the ninth floor of that team’s building? Does the Wireless Access Point (WAP) in that part of the building provide the team with reliable and consistent network access? Or does the subnet configured on that WAP run out of IP addresses and deny the team network access? This is the specific problem this post solves using Kinesis Agent for Windows.

Detecting customer impact as a result of scope exhaustion

Windows DHCP leases are divided into network subnets, referred to as scopes. These scopes are mapped to dedicated physical locations on a large corporate network. A scope is considered full when all IP addresses that belong to it are in use. This is known as “scope exhaustion.” When scope exhaustion occurs, any new clients are denied an IP address lease on that subnet. This is referred to as a “lease refusal.” Commonly, a DHCP scope is defined for the exact number of devices that are expected, and no more. In these instances, an exhausted scope is expected, so an alert based purely on scope exhaustion is meaningless.
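
The distinction above can be made concrete with a small sketch: exhaustion is a property of the scope, while a lease refusal means a client was actually turned away, so alerting keys on refusals. The ScopeWindow class and its fields are hypothetical and are not taken from the solution's code.

```python
from dataclasses import dataclass


@dataclass
class ScopeWindow:
    """Hypothetical counters for one DHCP scope over one observation window."""
    scope: str
    size: int       # leasable addresses in the scope
    in_use: int     # addresses currently leased
    refusals: int   # Event ID 14 records observed in the window


def is_exhausted(window: ScopeWindow) -> bool:
    return window.in_use >= window.size


def should_alert(window: ScopeWindow) -> bool:
    # A full scope on its own is often by design; only refused clients signal impact.
    return window.refusals > 0
```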

When a Windows DHCP server refuses a lease due to scope exhaustion, it writes a specific record to the DHCP audit log: Event ID 14, “a lease request could not be satisfied because the scope’s address pool was exhausted.” The record itself has limited value unless such records are continuously observed and tallied for occurrences and patterns. Monitoring for this scenario is challenging for a globally scaled DHCP service with hundreds of Windows Server DHCP failover relationships (two servers per relationship), thousands of scopes, and millions of IP addresses. This is where Kinesis Agent for Windows provides a significant advantage.

The DHCP Server log files are in the default directory C:\Windows\System32\dhcp. IPv4 log file names are prefixed with DhcpSrvLog, and IPv6 log file names are prefixed with DhcpV6SrvLog. This post focuses on the IPv4 logs.
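
As a small illustration of this naming convention, the following sketch classifies a log file path by address family. It uses PureWindowsPath so that it runs on any operating system; the function name and the example file name DhcpSrvLog-Mon.log are illustrative assumptions.

```python
from pathlib import PureWindowsPath
from typing import Optional

DEFAULT_DHCP_LOG_DIR = PureWindowsPath(r"C:\Windows\System32\dhcp")


def dhcp_log_family(path: str) -> Optional[str]:
    """Return 'ipv4', 'ipv6', or None for a file in the default DHCP log directory."""
    p = PureWindowsPath(path)
    if p.parent != DEFAULT_DHCP_LOG_DIR:
        return None
    # "DhcpSrvLog" is not a prefix of "DhcpV6SrvLog", so the checks do not overlap.
    if p.name.startswith("DhcpV6SrvLog"):
        return "ipv6"
    if p.name.startswith("DhcpSrvLog"):
        return "ipv4"
    return None
```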

The log files have a header at the top of each file that defines and describes the potential Event IDs. It also shows the order of the comma-separated values in each log record. For a better idea of what these logs contain, look at the ones generated by your own DHCP service. For more information, see Analyze DHCP Server Log Files, the official Microsoft documentation. The following is a set of example log records that include an Event 14 record, indicating that a scope is full.

24,10/19/18,00:00:18,Database Cleanup Begin,,,,,0,6,,,,,,,,,0
18,10/19/18,00:00:18,Expired,192.168.1.251,,,,0,6,,,,,,,,,0
30,10/19/18,00:00:18,DNS Update Request,192.168.1.35,TEST-SERVER.domain.com,,,0,6,,,,,,,,,0
17,10/19/18,00:00:18,DNS record not deleted,192.168.5.35,,,,0,6,,,,,,,,,0
25,10/19/18,00:00:18,0 leases expired and 5 leases deleted,,,,,0,6,,,,,,,,,0
32,10/19/18,00:00:18,DNS Update Successful,192.168.1.35,TEST-SERVER.domain.com,,,0,6,,,,,,,,,0
14,10/19/18,00:00:19,Scope Full,192.168.3.10,,,,0,6,,,,,,,,,0
11,10/19/18,00:00:20,Renew,192.168.2.105,,00AABBCCDDEE,,1584371322,0,,,,0x506F6C79636F6D2D53504950333335,Polycom,,,,0
36,10/19/18,00:00:25,Packet dropped because of Client ID hash mismatch or standby server.,192.168.1.100,,EEDDCCBBAA00,,0,6,,,,,,,,,0
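
Records in this format are straightforward to process. The following sketch reads lines like the ones above with Python's csv module, skips header lines (which do not start with a numeric event ID), and tallies Event ID 14 records per minute. It relies only on the first three columns (ID, Date, Time); the function name and the per-minute grouping are illustrative choices, not necessarily what the solution's Lambda function does.

```python
import csv
from collections import Counter
from typing import Iterable

SCOPE_FULL_EVENT_ID = "14"


def tally_scope_full(lines: Iterable[str]) -> Counter:
    """Count Event ID 14 (Scope Full) records per (date, HH:MM) minute."""
    tally: Counter = Counter()
    for row in csv.reader(lines):
        # Skip blank lines and header lines, which do not start with a numeric ID.
        if not row or not row[0].strip().isdigit():
            continue
        if row[0].strip() == SCOPE_FULL_EVENT_ID:
            date, time_of_day = row[1], row[2]
            tally[(date, time_of_day[:5])] += 1
    return tally
```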

Important: Windows DHCP Server logs do not include data identifying the server that generated them, which is important information when analyzing an aggregated dataset for full scope events. Kinesis Agent for Windows includes a feature called ObjectDecoration that allows custom values to be injected into each log record. Values can be hardcoded or dynamically created from environment variables, such as ComputerName, on each computer. In this use case, metrics are posted for multiple DHCP failover relationships, so the failover relationship names are derived from the server hostnames. An example hostname is DHCP-<FailoverRelationshipName>-nn. The appsettings.json file contents used to configure Kinesis Agent for Windows, including the ObjectDecoration used in this implementation, are provided in this post.
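
Deriving the relationship name from a hostname that follows the DHCP-<FailoverRelationshipName>-nn convention can be sketched with a regular expression. The pattern and function name are hypothetical; where this parsing happens in the pipeline (on the agent or in the Lambda function) is an assumption left open here.

```python
import re
from typing import Optional

# Hypothetical pattern for hostnames such as DHCP-<FailoverRelationshipName>-nn.
# The greedy name group backtracks to leave the trailing "-nn" for the two-digit
# server index, so hyphens inside the relationship name are preserved.
HOSTNAME_PATTERN = re.compile(
    r"^DHCP-(?P<relationship>.+)-(?P<index>\d{2})$", re.IGNORECASE
)


def failover_relationship(hostname: str) -> Optional[str]:
    """Return the failover relationship name encoded in a DHCP server hostname."""
    match = HOSTNAME_PATTERN.match(hostname.strip())
    return match.group("relationship") if match else None
```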

Collecting, storing, and analyzing DHCP server logs

This section guides you through setting up the AWS serverless infrastructure to collect, store and analyze Windows DHCP server logs. You first define two workflows: log ingestion and log processing.

Log Ingestion

Kinesis Agent for Windows detects new records in near real-time and sends them to an Amazon Kinesis Data Firehose delivery stream. The delivery stream is configured to send batches of data, on a time interval or a size limit, in a compressed format to an Amazon S3 bucket.
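
The size-or-interval batching behavior can be illustrated with a toy model: a batch is delivered as soon as either the buffered size or the time since the first buffered record reaches its limit, whichever comes first. The class below is a hypothetical simulation with an injectable clock, not the Kinesis Data Firehose implementation.

```python
import gzip
from typing import Callable, List, Optional


class SizeOrIntervalBuffer:
    """Toy model of size-or-interval batching with GZIP compression.

    This model checks the interval only when a record arrives, whereas the
    real service also delivers a partially filled buffer on a timer.
    """

    def __init__(self, size_limit_bytes: int, interval_seconds: float,
                 clock: Callable[[], float]) -> None:
        self.size_limit_bytes = size_limit_bytes
        self.interval_seconds = interval_seconds
        self.clock = clock
        self._records: List[bytes] = []
        self._size = 0
        self._opened_at: Optional[float] = None

    def add(self, record: bytes) -> Optional[bytes]:
        """Buffer a record; return a compressed batch when either limit is reached."""
        now = self.clock()
        if self._opened_at is None:
            self._opened_at = now
        self._records.append(record)
        self._size += len(record)
        if (self._size >= self.size_limit_bytes
                or now - self._opened_at >= self.interval_seconds):
            return self.flush()
        return None

    def flush(self) -> bytes:
        batch = gzip.compress(b"".join(self._records))
        self._records, self._size, self._opened_at = [], 0, None
        return batch
```

With the settings used later in this post, the equivalent model would be SizeOrIntervalBuffer(128 * 1024 * 1024, 60, time.monotonic).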

Log Processing

The S3 bucket is configured to send Amazon S3 Event Notifications to an Amazon Simple Notification Service (Amazon SNS) topic. An AWS Lambda function is subscribed to the SNS topic. When it is triggered, it gets the object from S3, and decompresses and processes the log data. It then posts an Amazon CloudWatch metric for every Event ID 14 found.
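
The first steps of that Lambda function can be sketched as follows: unwrap the S3 notification that SNS delivers as a JSON string, URL-decode the object key, and decompress the GZIP object body. The function names are hypothetical, and the sketch assumes the delivered records are newline-delimited UTF-8 text. Fetching the object itself (for example with boto3's get_object) is omitted so the sketch runs without AWS credentials.

```python
import gzip
import json
from typing import List, Tuple
from urllib.parse import unquote_plus


def s3_objects_from_sns_event(event: dict) -> List[Tuple[str, str]]:
    """Extract (bucket, key) pairs from an SNS-delivered S3 event notification."""
    objects = []
    for sns_record in event.get("Records", []):
        # SNS delivers the S3 notification as a JSON string in Sns.Message.
        message = json.loads(sns_record["Sns"]["Message"])
        # S3's one-time s3:TestEvent message has no "Records" key, so it yields nothing.
        for s3_record in message.get("Records", []):
            bucket = s3_record["s3"]["bucket"]["name"]
            # Object keys arrive URL-encoded (for example, spaces become '+').
            key = unquote_plus(s3_record["s3"]["object"]["key"])
            objects.append((bucket, key))
    return objects


def log_lines_from_object(body: bytes) -> List[str]:
    """Decompress a GZIP object body into lines (assumes newline-delimited UTF-8)."""
    return gzip.decompress(body).decode("utf-8").splitlines()
```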

Note: Because the S3 bucket sends its notifications to an Amazon SNS topic, other consumers of the data can subscribe to the same topic. Also, if you do not want to collect records other than Event 14, you can filter them out directly in Kinesis Agent for Windows using the RegexFilterPipe feature. Additionally, you can configure Kinesis Agent for Windows to “pipe” another stream of the same log data, or a filtered subset of it, to a separate, dedicated Kinesis Data Firehose delivery stream or to any other destination supported by the Kinesis Agent for Windows sink declarations.
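
A regular expression that keeps only Event 14 records might look like the one below, tested here in Python. This is a sketch of the pattern only; the RegexFilterPipe configuration syntax and its exact matching semantics are not shown, so check the Kinesis Agent for Windows documentation before using it.

```python
import re
from typing import Iterable, List

# The ^ anchor restricts the match to the event ID at the start of the line, so
# "14," appearing later (for example, at the end of 192.168.2.14,) is ignored
# even when a filter searches anywhere in the line.
SCOPE_FULL_LINE = re.compile(r"^14,")


def keep_scope_full(lines: Iterable[str]) -> List[str]:
    """Return only the log lines whose event ID is 14 (Scope Full)."""
    return [line for line in lines if SCOPE_FULL_LINE.search(line)]
```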

Build the log ingestion infrastructure

The next step is to create the AWS resources. Because the log source is a Windows environment, we use PowerShell to automate the build process. The AWS Tools for PowerShell provides cmdlets for developing and managing infrastructure within the AWS ecosystem.

For help with the log ingestion path, see this article about Using Amazon Kinesis Firehose that shows how to automate building the Amazon S3 bucket and Amazon Kinesis Firehose delivery stream. The Kinesis Agent for Windows setup part of this path is covered after the infrastructure components are built.

After creating your S3 bucket and Kinesis Firehose delivery stream, do the following to add the additional configurations:

  • Enable a bucket lifecycle policy to archive data to Amazon Glacier. This helps to minimize storage costs.
  • Compress the data prior to transmitting to Amazon S3. This helps to minimize storage costs.
  • Batch the data before transmitting it to Amazon S3. With the 128 MB buffer size and 60-second interval used below, data is delivered as soon as either limit is reached: large batches under heavy load, and roughly a minute of delay at most under light load.
  • Include an S3 bucket prefix for the log destination. This helps separate logs with different schemas.
  • Enable CloudWatch logging on the delivery stream. This helps troubleshoot any potential problems with ingestion.

Here is a code sample for making these configurations.

# Get existing objects
$s3BucketName = '<s3-bucket-name>'
$firehoseDeliveryStreamName = '<delivery-stream-name>'
$roleName = '<role-name>'

$s3Bucket = Get-S3Bucket -BucketName $s3BucketName
$firehoseDeliveryStream = Get-KINFDeliveryStream -DeliveryStreamName $firehoseDeliveryStreamName
$iamRole = Get-IAMRole -RoleName $roleName

# Enable S3 Bucket archival to Glacier
$s3BucketLifecycleRuleId = 'Archive to Glacier after 14 days'
$s3BucketLifecycleRuleTransition = New-Object Amazon.S3.Model.LifecycleTransition
$s3BucketLifecycleRuleTransition.Days = 14
$s3BucketLifecycleRuleTransition.StorageClass = 'GLACIER'

$s3BucketLifecycleRule = New-Object Amazon.S3.Model.LifecycleRule
$s3BucketLifecycleRule.Id = $s3BucketLifecycleRuleId
$s3BucketLifecycleRule.Status = 'Enabled'
$s3BucketLifecycleRule.Transition = $s3BucketLifecycleRuleTransition
$s3BucketLifecycleRule.Prefix = $null

Write-S3LifecycleConfiguration -BucketName $s3Bucket.BucketName -Configuration_Rule $s3BucketLifecycleRule

# Enable CW Logging options including creation of a log group and a log stream 
$logGroupName = "/aws/kinesisfirehose/$firehoseDeliveryStreamName"
$logStreamName = 'S3Delivery'

# Create CloudWatch LogGroup and LogStream
New-CWLLogGroup -LogGroupName $logGroupName
New-CWLLogStream -LogGroupName $logGroupName -LogStreamName $logStreamName

# Define Kinesis Firehose Logging Options
$loggingOptions = New-Object Amazon.KinesisFirehose.Model.CloudWatchLoggingOptions
$loggingOptions.Enabled = $true
$loggingOptions.LogGroupName = $logGroupName
$loggingOptions.LogStreamName = $logStreamName

# Define Buffering hints object for traffic between Delivery Stream and S3
$bufferingHints = New-Object Amazon.KinesisFirehose.Model.BufferingHints
$bufferingHints.IntervalInSeconds = 60
$bufferingHints.SizeInMBs = 128

# Define Kinesis Firehose S3 Destination Update
$s3Destination = New-Object Amazon.KinesisFirehose.Model.ExtendedS3DestinationUpdate
$s3Destination.BucketARN = 'arn:aws:s3:::{0}' -f $s3Bucket.BucketName
$s3Destination.RoleARN = $iamRole.Arn
$s3Destination.CompressionFormat = 'GZIP'
$s3Destination.BufferingHints = $bufferingHints
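# Firehose appends the YYYY/MM/DD/HH time path directly to the prefix, so end it with '/'
# to produce a folder that matches the 'DHCPServerLogs/' S3 event notification filter
$s3Destination.Prefix = 'DHCPServerLogs/'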
$s3Destination.CloudWatchLoggingOptions = $loggingOptions

# Update the Kinesis Firehose Delivery Stream
$kinfdUpdateParams = @{
    CurrentDeliveryStreamVersionId   = $firehoseDeliveryStream.VersionId
    DeliveryStreamName               = $firehoseDeliveryStreamName
    DestinationId                    = $firehoseDeliveryStream.Destinations.DestinationId
    ExtendedS3DestinationUpdate      = $s3Destination
}
Update-KINFDestination @kinfdUpdateParams
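
Without the custom prefixes feature, Kinesis Data Firehose builds each S3 object key by appending a UTC YYYY/MM/DD/HH/ path directly to the configured prefix, without adding a separator. The hypothetical helper below (it omits the object-name part of the key) shows why the prefix should end with a slash when a downstream filter, such as the DHCPServerLogs/ prefix filter on the S3 event notification configured next, expects a folder.

```python
from datetime import datetime, timezone


def firehose_time_path(prefix: str, arrival: datetime) -> str:
    """Prefix plus the UTC YYYY/MM/DD/HH/ path, concatenated with no separator."""
    return prefix + arrival.astimezone(timezone.utc).strftime("%Y/%m/%d/%H/")
```

For a record arriving at 00:05 UTC on October 19, 2018, a prefix of DHCPServerLogs yields DHCPServerLogs2018/10/19/00/, which does not start with DHCPServerLogs/, while DHCPServerLogs/ yields DHCPServerLogs/2018/10/19/00/.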

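Build the log processing infrastructure

The following script creates the SNS topic and its access policy, configures the S3 bucket event notification, creates the IAM role for the Lambda function, and then deploys the function and subscribes it to the topic.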

# Create the SNS topic. The ARN of the created topic is returned.
$snsTopicName = 'DHCPLogLambdaNotifier'
$topicArn = New-SNSTopic -Name $snsTopicName

# Give S3 permissions to send notifications to the Topic
$topicPolicy = @"
{
    "Version": "2008-10-17",
    "Id": "__default_policy_ID",
    "Statement": [
        {
            "Sid": "S3Publish",
            "Effect": "Allow",
            "Principal": {
                "AWS": "*"
            },
            "Action": [
                "SNS:Publish"
            ],
            "Resource": "$topicArn",
            "Condition": {
                "StringEquals": {
                    "aws:SourceArn": "arn:aws:s3:::$s3BucketName"
                }
            }
        }
    ]
}
"@
Set-SNSTopicAttribute -TopicArn $topicArn -AttributeName 'Policy' -AttributeValue $topicPolicy

# Configure S3 Bucket SNS Topic Event Notification
$s3BucketTopicConfigurationFilterRule = New-Object Amazon.S3.Model.FilterRule
$s3BucketTopicConfigurationFilterRule.Name = 'Prefix'
$s3BucketTopicConfigurationFilterRule.Value = 'DHCPServerLogs/'

$s3BucketTopicConfigurationS3KeyFilter = New-Object Amazon.S3.Model.S3KeyFilter
$s3BucketTopicConfigurationS3KeyFilter.FilterRules = $s3BucketTopicConfigurationFilterRule

$s3BucketTopicConfigurationFilter = New-Object Amazon.S3.Model.Filter
$s3BucketTopicConfigurationFilter.S3KeyFilter = $s3BucketTopicConfigurationS3KeyFilter

$s3BucketTopicConfiguration = New-Object Amazon.S3.Model.TopicConfiguration
$s3BucketTopicConfiguration.Id = 'NotifySNS'
$s3BucketTopicConfiguration.Topic = $topicArn
$s3BucketTopicConfiguration.Filter = $s3BucketTopicConfigurationFilter
$s3BucketTopicConfiguration.Events = New-Object Amazon.S3.EventType 's3:ObjectCreated:*'
Write-S3BucketNotification -BucketName $s3BucketName -TopicConfiguration $s3BucketTopicConfiguration

# Create IAM policy and add AssumeRole policy for Lambda
$lambdaPolicy = @"
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "CloudWatch",
            "Effect": "Allow",
            "Action": [
                  "cloudwatch:PutMetricData",
                  "cloudwatch:GetMetricData",
                  "cloudwatch:GetMetricStatistics",
                  "cloudwatch:ListMetrics"
            ],
            "Resource": "*"
        },
        {
            "Sid": "S3",
            "Effect": "Allow",
            "Action": [
                  "s3:ListBucket",
                  "s3:GetObject",
                  "s3:GetBucketLocation",
                  "s3:GetBucketNotification"
            ],
            "Resource": [
                  "arn:aws:s3:::$s3BucketName",
                  "arn:aws:s3:::$s3BucketName/*"
            ]
        },
        {
            "Sid": "Log",
            "Effect": "Allow",
            "Action": [
                  "logs:CreateLogGroup",
                  "logs:CreateLogStream",
                  "logs:PutLogEvents"
            ],
            "Resource": [
                  "arn:aws:logs:*:*:*"
            ]
        }
    ]
}
"@

$iamRoleName = 'LambdaDHCPLogProcessor'
New-IAMPolicy -PolicyName $iamRoleName -PolicyDocument $lambdaPolicy
$assumeRolePolicy = @"
{
    "Version": "2012-10-17",
    "Statement": [
    {
        "Sid": "LambdaAssumeRole",
        "Effect": "Allow",
        "Principal": {
            "Service": "lambda.amazonaws.com"
        },
        "Action": [
            "sts:AssumeRole"
        ]
    }]
}
"@
$lambdaIamRole = New-IAMRole -RoleName $iamRoleName -AssumeRolePolicyDocument $assumeRolePolicy
$iamPolicy = Get-IAMPolicies | Where-Object {$_.PolicyName -eq $iamRoleName}
Register-IAMRolePolicy -RoleName $iamRoleName -PolicyArn $iamPolicy.Arn

# Create the Lambda Function
$lambdaFunctionParams = @{
        Description = 'For DHCP lease refusal metric posting'
        FunctionName = 'LeaseRefusalMetrics'
        ZipFilename = '.\LeaseRefusalMetrics.zip'
        Handler = 'LeaseRefusalMetrics.lambda_handler'
        Role = $lambdaIamRole.Arn
        Runtime = 'python3.6'
}
$lambdaFunction = Publish-LMFunction @lambdaFunctionParams

# Subscribe the Lambda Function to the SNS topic
$snsSubscriptionArn = Connect-SNSNotification -TopicARN $topicArn -Protocol Lambda -Endpoint $lambdaFunction.FunctionArn

# Add permission to the Lambda Function's policy so SNS can invoke it
Add-LMPermission -FunctionName $lambdaFunctionParams.FunctionName -Action "lambda:InvokeFunction" -Principal sns.amazonaws.com -SourceArn $topicArn -StatementId (Get-Random)
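For readers who script with Python rather than PowerShell, the two wiring steps above (the S3 event notification to SNS and the Lambda permission for SNS) can be sketched with boto3 request parameters. This is a minimal sketch, not part of the original buildout. The helper names, the placeholder bucket name and the statement ID are hypothetical. The parameter shapes follow the S3 PutBucketNotificationConfiguration and Lambda AddPermission APIs.

```python
# Hypothetical Python counterparts of two PowerShell steps above. The helpers only
# build request parameters; the commented calls show how they could be sent.

def bucket_notification_config(topic_arn, prefix="DHCPServerLogs/", config_id="NotifySNS"):
    """NotificationConfiguration that sends ObjectCreated events under a prefix to SNS."""
    return {
        "TopicConfigurations": [
            {
                "Id": config_id,
                "TopicArn": topic_arn,
                "Events": ["s3:ObjectCreated:*"],
                "Filter": {"Key": {"FilterRules": [{"Name": "prefix", "Value": prefix}]}},
            }
        ]
    }


def sns_invoke_permission(function_name, topic_arn, statement_id):
    """Keyword arguments for Lambda AddPermission so the SNS topic can invoke the function."""
    return {
        "FunctionName": function_name,
        "StatementId": statement_id,
        "Action": "lambda:InvokeFunction",
        "Principal": "sns.amazonaws.com",
        "SourceArn": topic_arn,
    }

# Example (requires AWS credentials; the bucket name and topic ARN are placeholders):
# import boto3
# boto3.client("s3").put_bucket_notification_configuration(
#     Bucket="my-dhcp-log-bucket",
#     NotificationConfiguration=bucket_notification_config(topic_arn))
# boto3.client("lambda").add_permission(
#     **sns_invoke_permission("LeaseRefusalMetrics", topic_arn, "sns-invoke"))
```

Like Write-S3BucketNotification, put_bucket_notification_configuration replaces the bucket's entire notification configuration. Merge any existing configurations into the dictionary before you send it.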

Lambda function for processing logs and creating metrics

The Lambda function inspects the log data to identify Scope Full events and posts the resulting metrics. The code was originally written in Python, but it could be converted to run with the recently released PowerShell language support in AWS Lambda. Name this script LeaseRefusalMetrics.py and package it in the LeaseRefusalMetrics.zip file that the buildout script in the previous section references. The workflow is as follows:

  • Get and decompress the S3 object.
  • Read the S3 object data in as a byte stream and break it into individual records.
  • Scan the data for Event 14 (scope exhausted).
  • Break apart each comma-separated value from the Event 14 record.
  • Parse out values for the metric dimensions.
  • Post metric data to CloudWatch, resulting in three metrics.
from __future__ import print_function
import json
import urllib.parse
from io import BytesIO
from gzip import GzipFile
import datetime
from datetime import timedelta
import re
import boto3

s3Client = boto3.client('s3')
cwClient = boto3.client('cloudwatch')

def lambda_handler(event, context):
    print(json.dumps(event))

    s3message = json.loads(event['Records'][0]['Sns']['Message'])
    print(json.dumps(s3message))

    bucket = s3message['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(s3message['Records'][0]['s3']['object']['key'], encoding='utf-8')

    # Decompress the S3 object and read data in as byte stream
    try:
        response = s3Client.get_object(Bucket=bucket, Key=key)
        bytestream = BytesIO(response['Body'].read())
        s3object_text = GzipFile(None, 'rb', fileobj=bytestream).read().decode('utf-8')
    except Exception as e:
        print(e)
        print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
        raise e


    # Split ByteStream into individual records and search for event 14 (scope exhausted)
    # When Event 14 is found, break apart each individual CSV record to parse out values
    # for metric dimensions: dhcpFailoverRelationshipName, dhcpScopeId. Finally, post a metric to CloudWatch.
    # Sample record: "DHCP-FailoverRelationshipName-01:::14,11/06/18,03:04:00,Scope Full,192.168.243.0,,,,0,6,,,,,,,,,0"

    records = s3object_text.splitlines()
    for record in records:
        recordValues = ""
        hostTemp = ""
        temp = ""
        date = ""
        time = ""
        dhcpFailoverRelationshipNameTemp = ""
        dhcpFailoverRelationshipName = ""
        dhcpHostName = ""
        dhcpScopeId = ""

        if re.match(r'^DHCP-.*:::14,', record, re.IGNORECASE):
            print("DHCP Full Scope Event: " + record)
            recordValues = record.split(',')
            hostTemp = recordValues[0]
            dhcpScopeId = str(recordValues[4])

            eventDate = str(recordValues[1]).replace("/","-")
            eventTime = recordValues[2]
            strDateTime = eventDate + " " + eventTime
            eventDTime = datetime.datetime.strptime(strDateTime, "%m-%d-%y %H:%M:%S")

            temp = hostTemp.split(':::')
            dhcpHostName = temp[0]
            dhcpFailoverRelationshipNameTemp = dhcpHostName.split('-')
            dhcpFailoverRelationshipName = dhcpFailoverRelationshipNameTemp[1]

            response = cwClient.put_metric_data(
                Namespace='DHCPService',
                MetricData=[
                    {
                        'MetricName': 'ScopeFullLeaseRefusal',
                        'Dimensions': [
                            {
                                'Name': 'DHCPFailoverRelationshipName',
                                'Value': dhcpFailoverRelationshipName
                            },
                            {
                                'Name': 'DHCPScopeId',
                                'Value': dhcpScopeId
                            }
                        ],
                        'Timestamp': eventDTime,
                        'Value': 1,
                        'Unit': 'Count',
                        'StorageResolution': 60
                    },
                    {
                        'MetricName': 'ScopeFullLeaseRefusal',
                        'Dimensions': [
                            {
                                'Name': 'DHCPFailoverRelationshipName',
                                'Value': dhcpFailoverRelationshipName
                            }
                        ],
                        'Timestamp': eventDTime,
                        'Value': 1,
                        'Unit': 'Count',
                        'StorageResolution': 60
                    },
                    {
                        'MetricName': 'AggregateLeaseRefusal',
                        'Dimensions': [
                            {
                                'Name': 'ScopeFullLeaseRefusal',
                                'Value': 'ScopeFullLeaseRefusal'
                            }
                        ],
                        'Timestamp': eventDTime,
                        'Value': 1,
                        'Unit': 'Count',
                        'StorageResolution': 60
                    }
                ]
            )

            print("CW.Put_Metric Response: ", response)
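The parsing logic in the handler is easier to unit test if you pull it out of the AWS calls. The following is a minimal sketch of that refactoring. The function names are hypothetical and the sample record comes from the comment in the handler. As in the handler, it assumes that host names look like DHCP-<FailoverRelationshipName>-<nn>.

```python
import json
import re
import urllib.parse
from datetime import datetime

# Same test the handler uses: a decorated record whose DHCP event ID is 14.
EVENT14 = re.compile(r'^DHCP-.*:::14,', re.IGNORECASE)


def s3_location_from_sns(event):
    """Extract (bucket, key) from an S3 notification wrapped in an SNS message."""
    s3message = json.loads(event['Records'][0]['Sns']['Message'])
    s3info = s3message['Records'][0]['s3']
    key = urllib.parse.unquote_plus(s3info['object']['key'], encoding='utf-8')
    return s3info['bucket']['name'], key


def parse_scope_full(record):
    """Return metric dimensions and timestamp for an Event 14 record, else None."""
    if not EVENT14.match(record):
        return None
    fields = record.split(',')
    host = fields[0].split(':::')[0]        # e.g. DHCP-FailoverRelationshipName-01
    relationship = host.split('-')[1]       # second dash-separated token
    timestamp = datetime.strptime(fields[1] + ' ' + fields[2], '%m/%d/%y %H:%M:%S')
    return {'relationship': relationship, 'scope_id': fields[4], 'timestamp': timestamp}
```

With these helpers, the handler only needs to download and decompress the object, call parse_scope_full on each line, and send the non-None results to put_metric_data.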

Configure Kinesis Agent for Windows to send logs

Next, configure Kinesis Agent for Windows to start collecting log data so it can be processed. Follow the Installing Kinesis Agent for Windows guide to install and configure agents. The appsettings.json file contents used by the agents are shown below, along with some comments that are particular to this implementation.

{
    "Sources": [
        {
            "Id": "DHCPServerLog",
            "SourceType": "DirectorySource",
            "Directory": "C:\\Windows\\System32\\dhcp",
            "FileNameFilter": "Dhcp*SrvLog-*.log",
            "InitialPosition": "Bookmark",
            "RecordParser": "SingleLine"
        }
    ],
    "Pipes": [
        {
            "Id": "DHCPServerLog2Firehose",
            "SourceRef": "DHCPServerLog",
            "SinkRef": "DHCPServerLogsToFirehose",
            "Type": "RegexFilterPipe",
            "FilterPattern": "^\\d{2},.*"
        }
    ],
    "Sinks": [
        {
            "Id": "DHCPServerLogsToFirehose",
            "SinkType": "KinesisFirehose",
            "StreamName": "DHCPServerLogs",
            "TextDecoration": "{ComputerName}:::{_record}"
        }
    ]
}

Notes on the settings that are particular to this implementation:

  • FileNameFilter: only capture specific file names.
  • RecordParser: SingleLine indicates that each distinct record is line delimited.
  • FilterPattern: only collect records that begin with two digits followed by a comma. Recall that the log file has a header, which we don't need to collect.
  • TextDecoration: decorate each log record by prepending the host name and the custom delimiter ':::'.
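To see what the RegexFilterPipe and the TextDecoration setting do to each line, you can mimic them locally. This is a rough Python approximation, not the agent's implementation. The function name and sample lines are hypothetical. Note that the JSON-escaped pattern "^\\d{2},.*" is the regular expression ^\d{2},.* after unescaping.

```python
import re

# Approximation of the pipe's FilterPattern: two digits followed by a comma.
FILTER = re.compile(r'^\d{2},.*')


def filter_and_decorate(lines, computer_name):
    """Drop header lines, then prepend '<ComputerName>:::' like the TextDecoration."""
    return [f"{computer_name}:::{line}" for line in lines if FILTER.match(line)]


sample = [
    "Microsoft DHCP Service Activity Log",
    "ID,Date,Time,Description,IP Address",
    "14,11/06/18,03:04:00,Scope Full,192.168.243.0,,,,0,6,,,,,,,,,0",
]
print(filter_and_decorate(sample, "DHCP-ABC-01"))
```

The decorated output starts with DHCP-<relationship>-<nn>:::14, which is the shape the Lambda function's regular expression looks for.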

View the log data

The following is an efficient way to view the ingested data in Amazon S3:

  1. Sign in to the AWS Management Console and open the Amazon S3 console. Go to the S3 bucket that the Kinesis Data Firehose delivery streams are streaming to, and choose an object.
  2. Choose the Select from tab.
  3. Under File format, choose CSV, and then choose Show file preview.

You should see DHCP logs appear in the preview text box. Each record includes the TextDecoration (the host name followed by the ':::' delimiter) that was applied in the Kinesis Agent for Windows configuration.

Log processing output

The log processing populates the following metrics in CloudWatch:

  • An aggregate lease refusal metric of all DHCP failover relationships globally.
  • A metric graph that shows lease refusals for each DHCP failover relationship.
  • A metric graph for lease refusals for each scope, with the owning DHCP failover relationship.

View the graphs in CloudWatch Metrics

This section shows how to view the graphs that the Lambda function populated in CloudWatch metrics. It assumes that some Event 14 records have already been processed.

Note: You can inject a test Event 14 record into your DHCP log file. For more information, see Using Amazon CloudWatch Metrics or, more specifically, how to View Available Metrics.

To view the graphs:

  1. Sign in to the Amazon CloudWatch console.
  2. In the navigation pane, choose Metrics, and browse to the DHCPService namespace that the Lambda function populated. In the namespace, you should see the three metric dimension sets that the function created: “DHCPScopeId, DHCPFailoverRelationshipName”, “DHCPFailoverRelationshipName”, and “ScopeFullLeaseRefusal”.
  3. You can drill down further into any one of the metric dimensions to view the graphs. Choose the ScopeFullLeaseRefusal metric dimension. There should be one metric present, AggregateLeaseRefusal, which is the aggregate of all lease refusals, globally.
  4. Choose the metric to view the data points.

Here, the graphs for each of our metric dimensions have been added to a CloudWatch dashboard for an overall snapshot. For more information about how to accomplish this, see Using Amazon CloudWatch Dashboards. The following graphs show one failover relationship name (ABC).

The graphs show that a particular DHCP scope (192.168.4.0) in the ABC failover relationship has a high volume of lease refusals caused by scope exhaustion. At its peak, it reached as many as 335 lease refusals per hour.

Another observation is that the lines in the metric graphs break at certain times and never return to zero. This is because a value is posted only when Event 14 is detected, and no zero is posted when it isn't. This keeps the Lambda function simple, because it doesn't have to decide at runtime whether to post zeros. When there are many failover relationships or scopes, posting zeros would also add visual clutter. Because the only concern is seeing when, where, and how many lease refusals are occurring, these metric graphs serve their purpose.
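Each Event 14 posts a data point with Value 1, so a Sum statistic over a one-hour period counts refusals per hour. Hours with no events have no data point at all, which is why the graph has gaps. The following Python sketch approximates that aggregation locally. The helper name and timestamps are hypothetical, and CloudWatch's own period alignment may differ slightly.

```python
from collections import Counter
from datetime import datetime


def hourly_sums(event_times):
    """Sum one value-1 data point per event into hour-long buckets."""
    return Counter(t.replace(minute=0, second=0, microsecond=0) for t in event_times)


events = [
    datetime(2018, 11, 6, 3, 4),
    datetime(2018, 11, 6, 3, 40),
    datetime(2018, 11, 6, 3, 59),
    datetime(2018, 11, 6, 5, 1),
]
sums = hourly_sums(events)
# The 04:00 bucket is absent rather than zero, which matches the break in the graph line.
print(sorted(sums.items()))
```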

Conclusion

Using Kinesis Agent for Windows, our disparate DHCP server log data has been transformed into actionable CloudWatch metrics. This gives us the ability to identify DHCP scope exhaustion and the resulting customer impact. We can then work with the network engineering team to expand certain DHCP scopes with additional IP addresses to improve the overall customer experience.

The Amazon Kinesis Agent for Microsoft Windows makes your job more efficient. It speeds up results by letting you focus on building your solution. You don’t get bogged down with implementing solutions to collect and store log data. We hope that this use case sparked your curiosity to use Amazon Kinesis Agent for Microsoft Windows for more creative and sophisticated purposes, especially when taking advantage of the full breadth of AWS services. There are so many possibilities when using Amazon Kinesis Agent for Microsoft Windows.

About the Author

Ted Balsimo is a Systems Development Engineer at Amazon Web Services.

Collect, parse, transform, and stream Windows events, logs, and metrics using Amazon Kinesis Agent for Microsoft Windows

Post Syndicated from Harvir Singh original https://aws.amazon.com/blogs/big-data/collect-parse-transform-and-stream-windows-events-logs-and-metrics-using-amazon-kinesis-agent-for-microsoft-windows/

A complete data pipeline that includes Amazon Kinesis Agent for Microsoft Windows (KA4W) can help you analyze and monitor the performance, security, and availability of Windows-based services. You can build near-real-time dashboards and alarms for your Windows services. You can also use visualization and business intelligence tools such as Amazon Athena, Kibana, Amazon QuickSight, and Amazon CloudWatch to rapidly locate, diagnose, and resolve operational and security issues.

KA4W eliminates the need for cloud-based log processing by parsing and transforming logs into standard formats such as JSON on the host. The visualization and business intelligence tools in the data pipeline can then consume these formats immediately.

Here are a few words from one of our customers summarizing their experience with KA4W:

“The new Amazon Kinesis Agent for Microsoft Windows has simplified our workflow for streaming logs by eliminating complicated orchestration between multiple interconnected systems. The agent was easy to set up, configure, update, and most importantly performs significantly better. In all, the Amazon Kinesis Agent for Microsoft Windows has the potential to markedly improve visibility of issues in our environment and reduce operational cost.”– Sanjay Kumar, senior software engineer at Autodesk Inc.

In this post, we review how the new Kinesis Agent for Windows enables streaming analytics use cases related to Windows applications, servers, and workstations. We also show you how to get started with the new agent. By using KA4W to push real-time data into Amazon Kinesis services, you can solve a wide range of operational issues, including the following:

  • Monitoring of Dynamic Host Configuration Protocol (DHCP) servers for identifying IP lease refusals in case of exhausted scopes
  • Monitoring of Microsoft Exchange servers for identifying top email senders, mail-storm situations, and heavy load conditions
  • Monitoring of web-based application and Internet Information Services (IIS) logs for performance, availability, and security issues
  • Monitoring of domain controllers for Active Directory and security issues
  • Enabling security intelligence platforms to ingest log files for forensics and penetration testing

Overview of Kinesis Agent for Windows

Amazon Kinesis Agent for Microsoft Windows (KA4W) is a configurable and extensible agent. It runs on Windows laptops, desktop computers, and servers, either on-premises or in the AWS Cloud. KA4W efficiently and reliably gathers, parses, transforms, and streams logs, events, and metrics to various AWS services, including Kinesis Data Streams, Kinesis Data Firehose, CloudWatch, and CloudWatch Logs.

Kinesis Agent for Windows provides built-in parsers that simplify the processing of logs from common Windows services such as Exchange, Active Directory, DHCP, Microsoft SharePoint, and Windows security logs. It solves many technical and operational challenges for streaming logs, events, and metrics to AWS services, including the following:

  • Handling large volumes of log files, and processing and transforming logs and events from many different sources and formats in near-real time
  • Handling different kinds of log rotation approaches, and accessing log files even when those log files are locked by log writers
  • Reducing data transfer and storage costs by filtering unnecessary data before delivering to AWS services
  • Adding context to the collected data that enables precise analysis and rapid resolution of operational and security issues
  • Providing data about the health of the agent itself, which confirms the accuracy and completeness of the data collected and streamed

The following diagram illustrates some of the ways you can build custom, real-time data pipelines by using Kinesis Agent for Windows and stream-processing frameworks.

Amazon Kinesis Agent for Microsoft Windows includes an array of plugins. By configuring these plugins, you can customize KA4W to satisfy most requirements for the collection, transformation, and near-real-time delivery of logs, events, and metrics. You can even create your own plugin if you have custom requirements. Plugins are categorized by sources, pipes, and sinks.

Sources are the plugins that gather various Windows logs, events, and metrics. KA4W comes with multiple built-in source plugins, including the following:

  • DirectorySource
  • ExchangeLogSource
  • W3SVCLogSource
  • UlsSource (SharePoint)
  • WindowsEventLogSource
  • WindowsETWEventSource
  • WindowsPerformanceCounterSource

For more information about these sources, see Source Declarations in the Kinesis Agent for Windows User Guide.

Pipes connect sources and sinks. You can use pipes to filter unnecessary data to improve data transfer and reduce storage and API usage cost. Filters improve data quality and provide an effective way to restrict the amount of data for analysis. For details on configuring pipes, see Pipe Declarations in the Kinesis Agent for Windows User Guide.

Sinks are the plugins that stream logs, events, and metrics data to different AWS services. Kinesis Agent for Windows comes with multiple built-in sink plugins, such as KinesisStream, KinesisFirehose, CloudWatch, and CloudWatchLogs. Kinesis Agent for Windows needs to authenticate with AWS services to send data. For details on sinks and security configuration, see Sink Declarations in the Kinesis Agent for Windows User Guide.

Monitoring a web server example

Suppose that you have a web-based application and want to monitor the underlying web server. To do so, follow these steps to build the data pipeline that is required to move Windows events and web server logs to Kinesis Data Firehose and Amazon S3 for analytics:

  1. Install Kinesis Agent for Windows
  2. Create the streams
  3. Set the permissions
  4. Configure Kinesis Agent for Windows
  5. Start Kinesis Agent for Windows
  6. View the ingested logs

Install Kinesis Agent for Windows

Go to the Amazon Kinesis Agent for Microsoft Windows download page, and follow the instructions to download the agent. For example, you can run the following command in an elevated PowerShell command prompt:

Invoke-Expression ((New-Object System.Net.WebClient).DownloadString('https://s3-us-west-2.amazonaws.com/kinesis-agent-windows/downloads/InstallKinesisTap.ps1'))

Create streams

Create two Kinesis Data Firehose delivery streams named EventLogStream and W3SVCLogStream. Configure both streams to deliver data to Amazon Simple Storage Service (Amazon S3). To expedite log delivery to Amazon S3, reduce the delivery stream buffer size to 1 MB and the buffer interval to 60 seconds. This results in more frequent writes to Amazon S3.
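If you prefer to create the streams from code, the following sketch builds CreateDeliveryStream parameters with the 1 MB / 60 second buffering described above. It assumes Direct PUT streams and an IAM role that Kinesis Data Firehose can assume to write to your bucket. The helper name and all ARNs are hypothetical placeholders.

```python
# Hypothetical helper: builds CreateDeliveryStream parameters for an S3 destination.
def delivery_stream_request(name, bucket_arn, role_arn, size_mb=1, interval_s=60):
    """Parameters for a Direct PUT stream with small buffers, for faster S3 writes."""
    return {
        "DeliveryStreamName": name,
        "DeliveryStreamType": "DirectPut",
        "ExtendedS3DestinationConfiguration": {
            "RoleARN": role_arn,      # role that Firehose assumes to write to the bucket
            "BucketARN": bucket_arn,
            # Firehose delivers to S3 when the first of these thresholds is reached.
            "BufferingHints": {"SizeInMBs": size_mb, "IntervalInSeconds": interval_s},
        },
    }

# Example (placeholder ARNs; requires AWS credentials):
# import boto3
# firehose = boto3.client("firehose")
# for stream in ("EventLogStream", "W3SVCLogStream"):
#     firehose.create_delivery_stream(**delivery_stream_request(
#         stream, "arn:aws:s3:::my-log-bucket",
#         "arn:aws:iam::123456789012:role/FirehoseToS3"))
```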

Set permissions

If your host is an Amazon EC2 instance, the security configuration is simpler, and you only need to grant your Amazon EC2 instance role PutRecordBatch permission to the streams created. A sample IAM (security) policy looks like the following:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "firehose:PutRecordBatch"
            ],
            "Resource": [
                "arn:aws:firehose:region:account-id:deliverystream/delivery-stream-name"
            ]
        }
    ]
}
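Rather than granting access with a wildcard, you can generate the policy for exactly the two streams in this example. This sketch fills in the ARN pattern shown above. The Region and account ID are placeholders, and the helper name is hypothetical.

```python
import json


def put_record_batch_policy(region, account_id, stream_names):
    """IAM policy document that allows PutRecordBatch on the named delivery streams only."""
    resources = [
        f"arn:aws:firehose:{region}:{account_id}:deliverystream/{name}"
        for name in stream_names
    ]
    return {
        "Version": "2012-10-17",
        "Statement": [
            {"Effect": "Allow", "Action": ["firehose:PutRecordBatch"], "Resource": resources}
        ],
    }


policy = put_record_batch_policy("us-west-2", "123456789012", ["EventLogStream", "W3SVCLogStream"])
print(json.dumps(policy, indent=4))
```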

Configure Kinesis Agent for Windows

For this example, replace the content of “C:\Program Files\Amazon\AWSKinesisTap\appsettings.json” with the following:

{
    "Sources": [
        {
            "Id": "ApplicationLog",
            "SourceType": "WindowsEventLogSource",
            "LogName": "Application"
        },
        {
            "Id": "SecurityLog",
            "SourceType": "WindowsEventLogSource",
            "LogName": "Security"
        },
        {
            "Id": "SystemLog",
            "SourceType": "WindowsEventLogSource",
            "LogName": "System"
        },
        {
            "Id": "W3SVCLog1",
            "SourceType": "W3SVCLogSource",
            "Directory": "C:\\inetpub\\logs\\LogFiles\\W3SVC1",
            "FileNameFilter": "*.log",
            "TimeZoneKind": "UTC"
        }
    ],
    "Sinks": [
        {
            "Id": "EventLogSink",
            "SinkType": "KinesisFirehose",
            "StreamName": "EventLogStream",
            "Format": "json"
        },
        {
            "Id": "W3SVCLogSink",
            "SinkType": "KinesisFirehose",
            "Region": "us-west-2",
            "StreamName": "W3SVCLogStream",
            "Format": "json"
        }
    ],
    "Pipes": [
        {
            "Id": "ApplicationLogToFirehose",
            "SourceRef": "ApplicationLog",
            "SinkRef": "EventLogSink"
        },
        {
            "Id": "SecurityLogToFirehose",
            "SourceRef": "SecurityLog",
            "SinkRef": "EventLogSink"
        },
        {
            "Id": "SystemLogToFirehose",
            "SourceRef": "SystemLog",
            "SinkRef": "EventLogSink"
        },
        {
            "Id": "W3SVCLog1ToKinesisStream",
            "SourceRef": "W3SVCLog1",
            "SinkRef": "W3SVCLogSink"
        }
    ]
}

In the preceding configuration file, there are four sources. The first three are for different Windows Event Logs, and the last one is for IIS (W3SVC) logs. Typically IIS writes logs to the C:\inetpub\logs\LogFiles\W3SVC1 directory.

There are two KinesisFirehose sinks: EventLogSink sends data to EventLogStream, and W3SVCLogSink sends data to W3SVCLogStream. For both sinks, you set the Format key-value pair to json. This instructs the agent to send logs in JSON-formatted data to Kinesis Data Firehose.

The first three pipes connect the first three sources to the EventLogSink, and the last pipe connects the last source to the W3SVCLogSink.
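Pipes work only when their SourceRef and SinkRef values match declared Ids. A typo there is easy to make in a long appsettings.json file. This hypothetical helper, which is not part of the agent, checks those references before you restart the service.

```python
def unresolved_pipe_refs(settings):
    """List pipes whose SourceRef or SinkRef does not match a declared source or sink Id."""
    source_ids = {s["Id"] for s in settings.get("Sources", [])}
    sink_ids = {s["Id"] for s in settings.get("Sinks", [])}
    problems = []
    for pipe in settings.get("Pipes", []):
        if pipe.get("SourceRef") not in source_ids:
            problems.append(f"{pipe['Id']}: unknown SourceRef {pipe.get('SourceRef')!r}")
        if pipe.get("SinkRef") not in sink_ids:
            problems.append(f"{pipe['Id']}: unknown SinkRef {pipe.get('SinkRef')!r}")
    return problems

# Example:
# import json
# with open(r"C:\Program Files\Amazon\AWSKinesisTap\appsettings.json") as f:
#     print(unresolved_pipe_refs(json.load(f)) or "all pipe references resolve")
```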

In this example, authentication and authorization are based on the instance profile that is associated with the Amazon EC2 instance that is running Kinesis Agent for Windows. There are other approaches to authentication and authorization. For more information, see “Sink Security Configuration” in Sink Declarations in the Kinesis Agent for Windows User Guide.

Start Kinesis Agent for Windows

Note: During the development of Kinesis Agent for Windows, the internal name was “AWSKinesisTap”. To maintain backward compatibility, we have maintained this terminology within the agent’s configuration.

There are a few ways to start the agent:

  1. Start the agent from the services applet.
  2. Open an elevated command prompt and run net start AWSKinesisTap.
  3. Open an elevated PowerShell prompt and run Start-Service -Name AWSKinesisTap.

If Kinesis Agent for Windows does not start, check the application event log. If the agent starts, you can find its logs in “C:\ProgramData\Amazon\AWSKinesisTap\logs”.

If you run into difficulty, see Troubleshooting in the Kinesis Agent for Windows User Guide.

View the log data

Here is a quick and easy way to view the ingested data in Amazon S3:

  1. Sign in to the AWS Management Console, and open the Amazon S3 console. Go to the S3 bucket that the Kinesis Data Firehose delivery streams are streaming to, and choose an object.
  2. Choose the Select from tab.
  3. Under File format, choose JSON, and then choose Show file preview.

The following shows an example for a Windows Event Log:

The Amazon Kinesis Agent for Microsoft Windows User Guide contains a tutorial that demonstrates a powerful scenario using Amazon Athena to query the data in Amazon S3.

Cost

Amazon Kinesis Agent for Microsoft Windows is free to use. However, you pay for the AWS resources that interact with your complete data pipeline, such as Kinesis Data Streams, Kinesis Data Firehose, and Amazon S3.

Performance

Amazon Kinesis Agent for Microsoft Windows consumes a minimal amount of system resources. Memory and CPU usage can vary based on the amount of data streaming to AWS services and the virtual or physical hardware configuration of the machines hosting Kinesis Agent for Windows.

Summary

Amazon Kinesis Agent for Microsoft Windows streams logs, events, and metrics to AWS services. It is a key part of constructing an efficient, reliable, and cost-effective data pipeline for discovering, preventing, and resolving complex operational and security issues with Windows desktop and server machines. The example in this post demonstrates the simplicity of configuring a custom-tailored solution for gathering and streaming operational data from a web server host. Kinesis Agent for Windows allows you to choose the right AWS services for your scenarios to construct data pipelines and gain deeper insight into your operational challenges.

Going further

Kinesis Agent for Windows is very flexible and has many additional features. Here are a few suggestions for next steps:

  • Review the many configuration examples in the Kinesis Agent for Windows User Guide.
  • Learn about DirectorySource, a flexible source plugin that parses many types of text-based logs stored in the Windows file system. For details on supported parsers, see Source Declarations.
  • Use variables in the configuration file, which let the agent read values from environment variables and Amazon EC2 metadata.
  • Decorate the logs using TextDecoration and ObjectDecoration.
  • Configure Kinesis Agent for Windows to update itself and its configuration file. For more information, see Configuring Automatic Updates.

About the authors

Harvir Singh is a Software Development Manager for Amazon Kinesis Agent for Microsoft Windows.

Li Chen is a Senior Software Development Engineer for Amazon Kinesis Agent for Microsoft Windows.

Bonnie Feinberg is a Senior Software Development Engineer for Amazon Kinesis Agent for Microsoft Windows.

Analyze and visualize your VPC network traffic using Amazon Kinesis and Amazon Athena

Post Syndicated from Allan MacInnis original https://aws.amazon.com/blogs/big-data/analyze-and-visualize-your-vpc-network-traffic-using-amazon-kinesis-and-amazon-athena/

Network log analysis is a common practice in many organizations. By capturing and analyzing network logs, you can learn how devices on your network communicate with each other and with the internet. There are many reasons for performing log analysis, such as audit and compliance, system troubleshooting, or security forensics. Within an Amazon Virtual Private Cloud (VPC), you can capture network flows with VPC Flow Logs. You can create a flow log for a VPC, a subnet, or a network interface. If you create a flow log for a subnet or VPC, each network interface in the VPC or subnet is monitored. Flow log data is published to a log group in Amazon CloudWatch Logs, and each network interface has a unique log stream.

CloudWatch Logs provides some great tools to get insights into this log data. However, in most cases, you want to efficiently archive the log data to S3 and query it using SQL. This provides more flexibility and control over log retention and the analysis you want to perform. You often also want near real-time insights into that log data, by analyzing it automatically soon after it is generated. And you want to visualize certain network characteristics on a dashboard so you can more clearly understand the network traffic within your VPC. So how can you accomplish efficient log archival to S3, near real-time network analysis, and data visualization all at once? You can combine several capabilities of CloudWatch, Amazon Kinesis, AWS Glue, and Amazon Athena, but setting up this solution and configuring all the services can be daunting.

In this blog post, we describe the complete solution for collecting, analyzing, and visualizing VPC flow log data.  In addition, we created a single AWS CloudFormation template that lets you efficiently deploy this solution into your own account.

Solution overview

This section describes the overall architecture and each step of this solution.

We want the ability to query the flow log data in a one-time, or ad hoc, fashion. We also want to analyze it in near real time. So our flow log data takes two paths through the solution.  For ad hoc queries, we use Amazon Athena.  By using Athena, you can use standard SQL to query data that has been written to S3.  An Athena best practice to improve query performance and reduce cost is to store data in a columnar format such as Apache Parquet.  This solution uses Kinesis Data Firehose’s record format conversion feature to convert the flow log data to Parquet before it writes the files to S3. Converting the data into a compressed, columnar format lowers your cost and improves query performance by enabling Athena to scan less data from S3 when executing your queries.

By streaming the data to Kinesis Data Firehose from CloudWatch Logs, we enable the second path: near real-time analysis of the flow log data. Kinesis Data Analytics analyzes the log data as soon as it is delivered to Kinesis Data Firehose. The Analytics application aggregates key data from the flow logs and creates custom CloudWatch metrics that drive a near real-time CloudWatch dashboard.

Let’s review each step in detail.

1.  VPC Flow Logs

The VPC Flow Logs feature captures the network flows in a VPC. This solution assumes that you want to capture all network traffic within a single VPC, which you specify in the CloudFormation template. Each line in the flow log contains space-delimited information about the packets traversing the network between two entities: a source and a destination. The log line contains details including the source and destination IP addresses and ports, the number of packets, and the action taken on the traffic, such as whether it was accepted or rejected. Here’s an example of a typical flow log:

2 123456789010 eni-abc123de 172.31.16.139 172.31.16.21 20641 22 6 20 4249 1418530010 1418530070 ACCEPT OK

For more information about each item in the line, see Flow Log Records.  Note that VPC flow logs buffer for about 10 minutes before they’re delivered to CloudWatch Logs.

2.  Stream to Kinesis Data Firehose

By creating a CloudWatch Logs subscription, our flow logs are automatically streamed when they arrive in CloudWatch Logs. This solution’s subscription filter uses Kinesis Data Firehose as its destination. Kinesis Data Firehose is the most effective way to load streaming data into data stores, such as Amazon S3. The CloudWatch Logs subscription filter is also configured to parse the space-delimited log lines and create a structured JSON object for each line in the log. Each attribute in the object is named after the corresponding field defined by VPC Flow Logs. Therefore, the example log line shown earlier streams as the following JSON record:

{
    "version": 2,
    "account-id": "123456789010",
    "interface-id": "eni-abc123de",
    "srcaddr": "172.31.16.139",
    "dstaddr": "172.31.16.21",
    "srcport": 20641,
    "dstport": 22,
    "protocol": 6,
    "packets": 20,
    "bytes": 4249,
    "start": 1418530010,
    "end": 1418530070,
    "action": "ACCEPT",
    "log-status": "OK"
}
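The subscription filter performs this parsing inside CloudWatch Logs, so the solution itself needs no code for it. To make the field mapping concrete, here is a minimal Python sketch that turns the example line into the same record. It assumes the default version 2 flow log format; the function and constant names are illustrative, not part of the solution.

```python
import json

# Field order for the default (version 2) VPC flow log format.
FLOW_LOG_FIELDS = [
    "version", "account-id", "interface-id", "srcaddr", "dstaddr",
    "srcport", "dstport", "protocol", "packets", "bytes",
    "start", "end", "action", "log-status",
]

# Fields that appear as JSON numbers in the record shown above.
NUMERIC_FIELDS = {"version", "srcport", "dstport", "protocol",
                  "packets", "bytes", "start", "end"}


def parse_flow_log_line(line: str) -> dict:
    """Split a space-delimited flow log line into a dict keyed by field name."""
    values = line.split()
    if len(values) != len(FLOW_LOG_FIELDS):
        raise ValueError(f"expected {len(FLOW_LOG_FIELDS)} fields, got {len(values)}")
    record = {}
    for name, value in zip(FLOW_LOG_FIELDS, values):
        # NODATA and SKIPDATA records use "-" placeholders; keep those as strings.
        record[name] = int(value) if name in NUMERIC_FIELDS and value != "-" else value
    return record


if __name__ == "__main__":
    example = ("2 123456789010 eni-abc123de 172.31.16.139 172.31.16.21 "
               "20641 22 6 20 4249 1418530010 1418530070 ACCEPT OK")
    print(json.dumps(parse_flow_log_line(example), indent=4))
```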

CloudWatch Logs subscriptions send data to the configured destination as a gzipped collection of records. Before we can analyze the data, we must first decompress it.

3.  Decompress records with AWS Lambda

There may be situations where you want to transform or enrich streaming data before writing it to its final destination.  In this solution, we must decompress the data that is streamed from CloudWatch Logs.  With the Amazon Kinesis Data Firehose Data Transformation feature, we can decompress the data with an AWS Lambda function.  Kinesis Data Firehose manages the invocation of the function.  Inside the function, the data is decompressed and returned to Kinesis Data Firehose.  The complete source code for the Lambda function can be found here.
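The following is a minimal sketch of what such a transformation function can look like; it is not the solution's published source. It follows the Firehose transformation contract (each output record echoes the `recordId` and sets `result` to `Ok` or `Dropped`), and it assumes that the subscription filter's parsed fields arrive in each log event's `extractedFields` map, whose values are strings. The handler returns the events as newline-delimited JSON.

```python
import base64
import gzip
import json


def lambda_handler(event, context):
    """Kinesis Data Firehose transformation handler (illustrative sketch).

    Each incoming record holds a base64-encoded, gzipped CloudWatch Logs
    subscription payload. The handler decompresses it and returns one
    JSON object per log event, separated by newlines.
    """
    output = []
    for record in event["records"]:
        payload = json.loads(gzip.decompress(base64.b64decode(record["data"])))

        # CloudWatch Logs sends CONTROL_MESSAGE payloads to check that the
        # destination is reachable; they carry no log data, so drop them.
        if payload.get("messageType") != "DATA_MESSAGE":
            output.append({"recordId": record["recordId"], "result": "Dropped"})
            continue

        # Assumption: the filter pattern extracted named fields; fall back to
        # the raw message if an event has none.
        lines = [
            json.dumps(e.get("extractedFields", {"message": e["message"]}))
            for e in payload["logEvents"]
        ]
        data = ("\n".join(lines) + "\n").encode("utf-8")
        output.append({
            "recordId": record["recordId"],
            "result": "Ok",
            "data": base64.b64encode(data).decode("utf-8"),
        })
    return {"records": output}
```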

4.  Convert data to Apache Parquet

To take advantage of the performance capabilities in Amazon Athena, we convert the streaming data to Apache Parquet before persisting it to S3.  We use the record format conversion capabilities of Kinesis Data Firehose to perform this conversion.  When converting from JSON to Parquet, Kinesis Data Firehose must know the schema.  To accomplish this, we configure a table in the Glue Data Catalog.  In this table, we map the attributes of our incoming JSON records to fields in the table.
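The stack sets this up for you. If you instead create a delivery stream with the AWS SDK for Python (boto3), the conversion is described by the `DataFormatConversionConfiguration` block inside `ExtendedS3DestinationConfiguration`. The following sketch only builds that block. The database and table names come from this post; the role ARN and the snake_case column names (such as `account_id`) are assumptions about how the Glue table is defined.

```python
def build_format_conversion_config(role_arn: str, region: str) -> dict:
    """Build a Firehose DataFormatConversionConfiguration (illustrative sketch)."""
    return {
        "Enabled": True,
        # Firehose reads the target schema from this Glue Data Catalog table.
        "SchemaConfiguration": {
            "DatabaseName": "vpc_flow_logs",
            "TableName": "flow_logs",
            "RoleARN": role_arn,
            "Region": region,
        },
        "InputFormatConfiguration": {
            "Deserializer": {
                "OpenXJsonSerDe": {
                    # Hypothetical column names: map table columns to the
                    # hyphenated JSON keys used by VPC Flow Logs.
                    "ColumnToJsonKeyMappings": {
                        "account_id": "account-id",
                        "interface_id": "interface-id",
                        "log_status": "log-status",
                    }
                }
            }
        },
        # Write Snappy-compressed Parquet files.
        "OutputFormatConfiguration": {
            "Serializer": {"ParquetSerDe": {"Compression": "SNAPPY"}}
        },
    }
```

For example, `build_format_conversion_config("arn:aws:iam::123456789010:role/example-firehose-role", "us-east-1")` returns a dictionary you could place under `DataFormatConversionConfiguration` in a `create_delivery_stream` call; the role name is a placeholder.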

5.  Persist data to Amazon S3

When using the data format conversion feature in Kinesis Data Firehose, the only supported destination is S3. Kinesis Data Firehose buffers data for a period of time, or until a data size threshold is met, before it creates the Parquet files in S3. Because Parquet compresses data effectively, the resulting files can be small, and small files aren't optimal for Athena queries. To maximize the file sizes created in S3, the solution is configured to buffer for 15 minutes or 128 MB, whichever comes first. You can adjust this configuration to meet your needs by using the Kinesis Data Firehose console.
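To illustrate the "whichever comes first" behavior, here is a toy Python model of size-or-time buffering. The `BUFFERING_HINTS` keys mirror the Firehose `BufferingHints` settings; the class is a simplification of my own, not how Firehose is implemented. In particular, it only checks the clock when a record arrives, whereas Firehose also flushes on a timer.

```python
import time

# Buffering hints as they would appear in a Firehose S3 destination:
# 128 MB or 900 seconds (15 minutes).
BUFFERING_HINTS = {"SizeInMBs": 128, "IntervalInSeconds": 900}


class SizeOrTimeBuffer:
    """Flush a batch as soon as either the size or the time threshold is reached."""

    def __init__(self, max_bytes, max_seconds, clock=time.monotonic):
        self.max_bytes = max_bytes
        self.max_seconds = max_seconds
        self.clock = clock
        self.items = []
        self.size = 0
        self.opened_at = clock()

    def add(self, record: bytes):
        """Buffer a record; return the flushed batch if a threshold was hit, else None."""
        self.items.append(record)
        self.size += len(record)
        elapsed = self.clock() - self.opened_at
        if self.size >= self.max_bytes or elapsed >= self.max_seconds:
            return self.flush()
        return None

    def flush(self):
        """Return the buffered records and start a new, empty batch."""
        batch = self.items
        self.items, self.size, self.opened_at = [], 0, self.clock()
        return batch


def buffer_from_hints(hints, clock=time.monotonic):
    """Create a buffer that mirrors the given buffering hints."""
    return SizeOrTimeBuffer(hints["SizeInMBs"] * 1024 * 1024,
                            hints["IntervalInSeconds"], clock)
```

Larger thresholds produce fewer, larger Parquet files, which is the trade-off the solution makes in favor of Athena performance at the cost of up to 15 minutes of delivery latency.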

6.  Query flow logs with SQL in Athena

In this solution, Athena uses the database and table created in the Glue Data Catalog to make your flow log data queryable.  There are sample queries to review later in this article.

7.  Analyze the network flows in near real-time with Kinesis Data Analytics

Following the data through the first six steps, the solution enables you to query flow log data using SQL in Athena. This is great for ad hoc queries, or for querying data that was generated over a long period of time. However, to get the most out of the data, you should analyze it as soon as possible after it is generated. To accomplish this, the solution uses Kinesis Data Analytics (KDA), which enables you to query streaming data using SQL, to analyze the flow logs and extract immediate insights. In this solution, the KDA application uses a Lambda function to decompress the gzipped records from Kinesis Data Firehose, and then analyzes the flow log data to create some aggregations of interest. The KDA application creates the following aggregations:

  • A count of rejected TCP packets, every 15 minutes.
  • A count of rejected packets by protocol, every 15 minutes.

These metrics are aggregated over a 15-minute window.
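The KDA application expresses these aggregations in SQL. The same logic can be sketched in plain Python, assuming non-overlapping (tumbling) 15-minute windows keyed on each record's `start` timestamp and counting packets by summing the `packets` field. The real application may window on arrival time or count flow records instead, so treat this as an illustration only.

```python
from collections import defaultdict

WINDOW_SECONDS = 15 * 60  # assumed 15-minute tumbling windows
TCP = 6                   # IANA protocol number for TCP


def window_start(epoch_seconds: int) -> int:
    """Align a Unix timestamp to the start of its 15-minute window."""
    return epoch_seconds - epoch_seconds % WINDOW_SECONDS


def aggregate_rejected(records):
    """Return (rejected TCP packets per window,
               rejected packets per (window, protocol))."""
    tcp_rejected = defaultdict(int)
    by_protocol = defaultdict(int)
    for r in records:
        if r["action"] != "REJECT":
            continue
        w = window_start(r["start"])
        by_protocol[(w, r["protocol"])] += r["packets"]
        if r["protocol"] == TCP:
            tcp_rejected[w] += r["packets"]
    return dict(tcp_rejected), dict(by_protocol)
```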

8.  Write the aggregations as custom CloudWatch metrics

At the end of the 15-minute window, KDA invokes a Lambda function, passing the aggregated values. The Lambda function writes these values to CloudWatch as custom metrics. This lets you create CloudWatch alarms on those metrics and build custom dashboards from them.
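A minimal sketch of the metric-building step inside such a Lambda function follows. It produces the `MetricData` list that CloudWatch `PutMetricData` accepts. The metric names, the `Protocol` dimension, and the input shape are hypothetical, not taken from the solution's code.

```python
from datetime import datetime, timezone


def build_metric_data(window_start_epoch, tcp_rejected, rejected_by_protocol):
    """Turn one window's aggregates into a PutMetricData MetricData list.

    rejected_by_protocol maps an IANA protocol number to a packet count.
    """
    timestamp = datetime.fromtimestamp(window_start_epoch, tz=timezone.utc)
    metric_data = [{
        "MetricName": "RejectedTCPPackets",
        "Timestamp": timestamp,
        "Value": float(tcp_rejected),
        "Unit": "Count",
    }]
    # One data point per protocol, distinguished by a "Protocol" dimension
    # whose value is the protocol number as a string.
    for protocol, packets in sorted(rejected_by_protocol.items()):
        metric_data.append({
            "MetricName": "RejectedPacketsByProtocol",
            "Dimensions": [{"Name": "Protocol", "Value": str(protocol)}],
            "Timestamp": timestamp,
            "Value": float(packets),
            "Unit": "Count",
        })
    return metric_data
```

Inside the function, the list would then be sent with `boto3.client("cloudwatch").put_metric_data(Namespace="VPCFlowLogAnalysis", MetricData=metric_data)`, where the namespace is also a placeholder.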

9.  View the aggregated data in CloudWatch dashboards

CloudWatch dashboards are customizable home pages in the CloudWatch console for monitoring your resources in a single view.  You can use CloudWatch dashboards to create customized views of the metrics and alarms for your AWS resources. In this solution, we create a dashboard that monitors the custom aggregations created in our KDA application. The solution creates a sample dashboard to get you started, but you should review the metrics and create a dashboard and alarms to meet your needs.

Deploying the solution

To deploy this solution into your own account, you use the CloudFormation template to create the stack. You can deploy the solution stack into the following AWS Regions: US East (N. Virginia), US West (Oregon), and EU (Ireland).  To deploy, choose the link for the Region where you want to deploy.  The CloudFormation console for that Region opens, and the template URL is pre-populated:

Deploy the solution in:

US East (N. Virginia)

The CloudFormation Create Stack wizard opens with the template location pre-populated. Choose Next, and you are prompted to provide values for several template parameters.

Let’s review what each parameter represents:

  • Stack name — The name for this CloudFormation stack.  You can rename it from the default, but choose a short (up to 16 characters) name, and ensure your name uses only lower-case letters.  The value you use here will be used as a prefix in the name of many of the resources created by this stack.  By providing a short name with lower-case letters, the names for those resources will pass the resource naming rules.
  • S3BucketName — The name of the S3 bucket into which the Parquet files are delivered. This name must be globally unique.
  • VPCId — The ID of the existing VPC for which flow logs are captured.
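Before launching, you can pre-check a stack name against the guidance above (up to 16 characters, lower-case letters only). This small sketch encodes only that guidance from the post. It is not CloudFormation's own validation, which has different rules.

```python
import re

# Guidance from this post: 1 to 16 characters, lower-case letters only.
STACK_NAME_PATTERN = re.compile(r"[a-z]{1,16}")


def is_valid_stack_name(name: str) -> bool:
    """Return True if the name follows the post's stack-naming guidance."""
    return STACK_NAME_PATTERN.fullmatch(name) is not None
```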

Choose Next, and accept any defaults for the remainder of the CloudFormation wizard. The stack is created in a few minutes.

Analyze the flow log data

After the stack has been deployed, it may take up to 15 minutes before data can be queried in Athena, or viewed in the CloudWatch dashboard.  Let’s look at a few sample queries you can run in Athena to learn more about the network traffic within your VPC.

Navigate to the Athena console in the Region where you deployed the stack.  In the console, choose the database named “vpc_flow_logs”.  Notice that this database contains one table, named “flow_logs.”  Run the following query to see which protocol is being rejected the most within your VPC:

select protocol, sum(packets) as rejected_packets
from flow_logs
where action = 'REJECT'
group by protocol
order by rejected_packets desc

Your results should look similar to the following example

This example shows that the values in the protocol column are the protocol numbers assigned by the Internet Assigned Numbers Authority (IANA). So in the previous example, the top two rejected protocols are TCP (protocol number 6) and ICMP (protocol number 1).
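If you post-process query results in Python, a small lookup table makes these numbers readable. The sketch below covers only a few common IANA protocol numbers and falls back to the number itself for anything else; the helper names are illustrative.

```python
# A few common IANA-assigned protocol numbers; see the IANA
# "Assigned Internet Protocol Numbers" registry for the full list.
IANA_PROTOCOLS = {1: "ICMP", 6: "TCP", 17: "UDP", 58: "IPv6-ICMP"}


def protocol_name(number: int) -> str:
    """Return the protocol keyword, or the number as a string if unknown."""
    return IANA_PROTOCOLS.get(number, str(number))


def label_rows(rows):
    """Attach readable names to (protocol, rejected_packets) result rows."""
    return [(protocol_name(p), count) for p, count in rows]
```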

Here are a few additional queries to help you understand the network traffic in your VPC:

Identify the top 10 IP addresses from which packets were rejected in the past 2 weeks:

SELECT
	srcaddr,
	SUM(packets) AS rejected_packets
FROM flow_logs
WHERE action = 'REJECT'
	AND from_unixtime(start) >= current_timestamp - interval '14' day
GROUP BY srcaddr
ORDER BY rejected_packets DESC
LIMIT 10;

Identify the top 10 servers that receive the most HTTPS traffic (packets sent to port 443):

SELECT
	dstaddr,
	SUM(packets) AS packet_count
FROM flow_logs
WHERE dstport = 443
GROUP BY dstaddr
ORDER BY packet_count DESC
LIMIT 10;

Now let’s look at the analysis we’re performing in real time with Kinesis Data Analytics.  By default, the solution creates a dashboard named “VPC-Flow-Log-Analysis.”  On this dashboard, we’ve created a few default widgets.  The aggregate data being generated by KDA is plotted in a few charts, as shown in the following example:

This example shows that the Rejected Packets per Protocol chart has been created to plot only a subset of all possible protocols.  Modify this widget to show the protocols that are relevant for your environment.

Next steps

The solution outlined in this blog post provides an efficient way to get started with analyzing VPC Flow Logs.  To get the most out of this solution, consider these next steps:

  • Create partitions in the Glue table to help optimize Athena query performance. The current solution writes data to S3 under Y/M/D/H prefixes; however, these S3 prefixes are not automatically mapped to Glue partitions. This means that Athena queries scan all Parquet files, and as the volume of data grows, Athena query performance degrades. For more information about partitioning and Athena tuning, see Top 10 Performance Tuning Tips for Amazon Athena.
  • Apply the solution to additional VPCs, or in different Regions. If your infrastructure is deployed in multiple Regions, you must create the stack in each of those Regions. If you have multiple VPCs within the same Region, you can create a new flow log for each additional VPC by using the VPC console. Configure each flow log to deliver to the same destination log group that was created when the stack was initially deployed (the CWLogGroupName parameter value in the CloudFormation template).
  • Modify the default widgets in the CloudWatch dashboard. The stack creates a dashboard with a few default widgets; you can modify them or create more to meet your needs, based on the insights you'd like to get from the flow logs in your environment.
  • Create additional queries in Athena to learn more about your network behavior.
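One way to act on the first suggestion is to register each hour of data as a partition with Athena's `ALTER TABLE ... ADD PARTITION`. This assumes that you redefine the Glue table with string partition columns `year`, `month`, `day`, and `hour`, and that Firehose writes under its default UTC `YYYY/MM/DD/HH/` prefixes. The sketch below only generates the DDL; the table name comes from this post, and the bucket path is a placeholder.

```python
from datetime import datetime, timedelta, timezone


def add_partition_statements(table, location_root, start, hours):
    """Generate Athena DDL mapping Firehose YYYY/MM/DD/HH prefixes to partitions.

    Assumes the table is partitioned by string columns year, month, day, hour,
    and that `start` is a UTC datetime (Firehose prefixes use UTC).
    """
    root = location_root.rstrip("/")
    statements = []
    for i in range(hours):
        t = start + timedelta(hours=i)
        y, m, d, h = t.strftime("%Y"), t.strftime("%m"), t.strftime("%d"), t.strftime("%H")
        statements.append(
            f"ALTER TABLE {table} ADD IF NOT EXISTS "
            f"PARTITION (year='{y}', month='{m}', day='{d}', hour='{h}') "
            f"LOCATION '{root}/{y}/{m}/{d}/{h}/'"
        )
    return statements


if __name__ == "__main__":
    for stmt in add_partition_statements(
            "flow_logs", "s3://my-flow-log-bucket/",
            datetime(2019, 2, 9, 23, tzinfo=timezone.utc), 2):
        print(stmt)
```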

Conclusion

Using the solution provided in this blog post, you can quickly analyze the network traffic in your VPC.  It provides both a near real-time solution, and also the capabilities to query historical data.  You can get the most out of this solution by extending it with queries and visualizations of your own to meet the needs of your system.

Additional Reading

If you found this post useful, be sure to check out Analyze Apache Parquet optimized data using Amazon Kinesis Data Firehose, Amazon Athena, and Amazon Redshift and Preprocessing Data in Amazon Kinesis Analytics with AWS Lambda.


About the Author

Allan MacInnis is a Solutions Architect at Amazon Web Services. He works with our customers to help them build streaming data solutions using Amazon Kinesis. In his spare time, he enjoys mountain biking and spending time with his family.

How to build a front-line concussion monitoring system using AWS IoT and serverless data lakes – Part 2

Post Syndicated from Saurabh Shrivastava original https://aws.amazon.com/blogs/big-data/how-to-build-a-front-line-concussion-monitoring-system-using-aws-iot-and-serverless-data-lakes-part-2/

In part 1 of this series, we demonstrated how to build a data pipeline in support of a data lake. We used key AWS services such as Amazon Kinesis Data Streams, Kinesis Data Analytics, Kinesis Data Firehose, and AWS Lambda. In part 2, we discuss how to process and visualize the data by creating a serverless data lake that uses key analytics to create actionable data.

Create a serverless data lake and explore data using AWS Glue, Amazon Athena, and Amazon QuickSight

As we discussed in part 1, you can ingest heart rate data with Kinesis Data Streams and store it in an Amazon S3 bucket using Kinesis Data Firehose. However, storing data in a repository is not enough. You also need to catalog the data and store the associated metadata so that you can extract the meaningful pieces for analytics.

For a serverless data lake, you can use AWS Glue, which is a fully managed data catalog and ETL (extract, transform, and load) service. AWS Glue simplifies and automates the difficult and time-consuming tasks of data discovery, conversion, and job scheduling. After your data is cataloged in the AWS Glue Data Catalog, and partitioned and compressed for optimal performance, you can use Amazon Athena to query it directly in S3. You can then visualize the data using Amazon QuickSight.

The following diagram depicts the data lake that is created in this demonstration:

Amazon S3 now has the raw data stored from the Kinesis process. The first task is to prepare the Data Catalog and identify what data attributes are available to query and analyze. To do this task, you need to create a database in AWS Glue that will hold the table created by the AWS Glue crawler.

An AWS Glue crawler scans the raw data available in an S3 bucket and creates a table in the Data Catalog. You can add a schedule to the crawler so that it runs periodically and scans new data as required. For specific steps to create a database and crawler in AWS Glue, see the blog post Build a Data Lake Foundation with AWS Glue and Amazon S3.

The following figure shows the summary screen for a crawler configuration in AWS Glue:

After configuring the crawler, choose Finish, and then choose Crawler in the navigation bar. Select the crawler that you created, and choose Run crawler.

The crawler can take 20–60 seconds to start. When it finishes, it creates a table in your database as defined during the crawler configuration.

You can choose the table name and explore the Data Catalog and table:

In the demonstration table details, our data has three attributes: the timestamp as value_time, the person's ID as id, and the heart rate as colvalue. The AWS Glue crawler identifies and lists these attributes. You can also see other information, such as the data format (text) and the record count (approximately 15,000 records, each about 61 bytes).

You can use Athena to query the raw data. To access Athena directly from the AWS Glue console, choose the table, and then choose View data on the Actions menu, as shown following:

As noted, the data is currently in JSON format and we haven't partitioned it. This means that Athena must scan all of the data for every query, which increases the query cost. The best practice is to always partition data and to convert it into a columnar format like Apache Parquet or Apache ORC. This reduces the amount of data scanned while running a query, and scanning less data means better query performance at a lower cost.

To accomplish this, AWS Glue generates an ETL script for you. You can schedule it to run periodically for your data processing, which removes the necessity for complex code writing. AWS Glue is a managed service that runs on top of a warm Apache Spark cluster that is managed by AWS. You can run your own script in AWS Glue or modify a script provided by AWS Glue that meets your requirements. For examples of how to build a custom script for your solution, see Providing Your Own Custom Scripts in the AWS Glue Developer Guide.

For detailed steps to create a job, see the blog post Build a Data Lake Foundation with AWS Glue and Amazon S3. The following figure shows the final AWS Glue job configuration summary for this demonstration:

In this example configuration, we enabled the job bookmark, which helps AWS Glue maintain state information and prevents the reprocessing of old data. You only want to process new data when rerunning on a scheduled interval.
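Conceptually, a bookmark is saved state that lets each run skip input that an earlier run already processed. The following toy Python model uses a last-modified high-water mark to show the idea; it is an illustration of the concept, not how AWS Glue implements job bookmarks internally.

```python
def select_new_objects(objects, bookmark):
    """Return keys of objects modified after the bookmark, and the new bookmark.

    objects: list of (key, last_modified_epoch) tuples.
    Simplification: an object that arrives late with a timestamp equal to or
    older than the bookmark would be skipped; a real implementation must
    handle that case.
    """
    fresh = sorted((o for o in objects if o[1] > bookmark), key=lambda o: o[1])
    new_bookmark = max((ts for _, ts in fresh), default=bookmark)
    return [key for key, _ in fresh], new_bookmark
```

On a scheduled rerun, only objects written since the previous run are returned, which is what keeps the job from reprocessing old data.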

When you choose Finish, AWS Glue generates a Python script. This script processes your data and stores it in a columnar format in the destination S3 bucket specified in the job configuration.

If you choose Run Job, it takes time to complete depending on the amount of data and data processing units (DPUs) configured. By default, a job is configured with 10 DPUs, which can be increased. A single DPU provides processing capacity that consists of 4 vCPUs of compute and 16 GB of memory.

After the job is complete, inspect your destination S3 bucket, and you will find that your data is now in columnar Parquet format.

Partitioning has emerged as an important technique for organizing datasets so that they can be queried efficiently by a variety of big data systems. Data is organized in a hierarchical directory structure based on the distinct values of one or more columns. For information about efficiently processing partitioned datasets using AWS Glue, see the blog post Work with partitioned data in AWS Glue.
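As a concrete picture of that hierarchical layout, the sketch below builds a Hive-style `key=value` prefix from a timestamp and parses partition values back out of an object key, which is essentially how partition names become visible to crawlers and query engines. The bucket, folder, and partition column names are hypothetical.

```python
from datetime import datetime, timezone


def hive_partition_prefix(base: str, ts: datetime) -> str:
    """Build a Hive-style prefix such as base/year=2019/month=02/day=09/."""
    return f"{base.rstrip('/')}/year={ts:%Y}/month={ts:%m}/day={ts:%d}/"


def partition_values(key: str) -> dict:
    """Extract {column: value} pairs from the key=value segments of an S3 key."""
    return dict(seg.split("=", 1) for seg in key.split("/") if "=" in seg)
```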

You can create triggers for your job that run the job periodically to process new data as it is transmitted to your S3 bucket. For detailed steps on how to configure a job trigger, see Triggering Jobs in AWS Glue.

The next step is to create a crawler for the Parquet data so that a table can be created. The following image shows the configuration for our Parquet crawler:

Choose Finish, and execute the crawler.

Explore your database, and you will notice that one more table was created in the Parquet format.

You can use this new table for direct queries to reduce costs and to increase the query performance of this demonstration.

Because AWS Glue is integrated with Athena, the AWS Glue Data Catalog, including your tables, is already available in the Athena console. Fetch 10 rows from the new Parquet table in Athena, as you did for the JSON data table in the previous steps.

As the following image shows, we fetched the first 10 rows of heartbeat data from a Parquet format table. This same Athena query scanned only 4.99 KB of data compared to 205 KB of data that was scanned in a raw format. Also, there was a significant improvement in query performance in terms of run time.

Visualize data in Amazon QuickSight

Amazon QuickSight is a data visualization service that you can use to analyze data that has been combined. For more detailed instructions, see the Amazon QuickSight User Guide.

The first step in Amazon QuickSight is to create a new Amazon Athena data source. Choose the heartbeat database created in AWS Glue, and then choose the table that was created by the AWS Glue crawler.

Choose Import to SPICE for quicker analytics. This option creates a data cache and improves graph loading. All non-database datasets must use SPICE. To learn more about SPICE, see Managing SPICE Capacity.

Choose Visualize, and wait for SPICE to import the data to the cache. You can also schedule a periodic refresh so that new data is loaded to SPICE as the data is pipelined to the S3 bucket.

When the SPICE import is complete, you can easily create a visual dashboard. The following figure shows graphs displaying the occurrence of heart rate records per device. The first graph is a horizontally stacked bar chart that shows the percentage of heart rate occurrences per device. The second graph shows the count of heart rate readings grouped by device.

Conclusion

Processing streaming data at scale is relevant in every industry. Whether you process data from wearables to tackle human health issues or address predictive maintenance in manufacturing centers, AWS can help you simplify your data ingestion and analysis while keeping your overall IT expenditure manageable.

In this two-part series, you learned how to ingest streaming data from a heart rate sensor and visualize it in such a way to create actionable insights. The current state of the art available in the big data and machine learning space makes it possible to ingest terabytes and petabytes of data and extract useful and actionable information from that process.


Additional Reading

If you found this post useful, be sure to check out Work with partitioned data in AWS Glue, and 10 visualizations to try in Amazon QuickSight with sample data.

About the Authors

Saurabh Shrivastava is a partner solutions architect and big data specialist working with global systems integrators. He works with AWS partners and customers to provide them architectural guidance for building scalable architecture in hybrid and AWS environments.

Abhinav Krishna Vadlapatla is a Solutions Architect with Amazon Web Services. He supports startups and small businesses with their cloud adoption to build scalable and secure solutions using AWS. During his free time, he likes to cook and travel.

John Cupit is a partner solutions architect for AWS’ Global Telecom Alliance Team. His passion is leveraging the cloud to transform the carrier industry. He has a son and daughter who have both graduated from college. His daughter is gainfully employed, while his son is in his first year of law school at Tulane University. As such, he has no spare money and no spare time to work a second job.

David Cowden is partner solutions architect and IoT specialist working with AWS emerging partners. He works with customers to provide them architectural guidance for building scalable architecture in IoT space.

Josh Ragsdale is an enterprise solutions architect at AWS. His focus is on adapting to a cloud operating model at very large scale. He enjoys cycling and spending time with his family outdoors.

Pierre-Yves Aquilanti, Ph.D., is a senior specialized HPC solutions architect at AWS. He spent several years in the oil & gas industry optimizing R&D applications for large-scale HPC systems and enabling the potential of machine learning for the upstream. He and his family crave living in Singapore again for the human and cultural experience, and to eat fresh durians.

Manuel Puron is an enterprise solutions architect at AWS. He has been working in cloud security and IT service management for over 10 years. He is focused on the telecommunications industry. He enjoys video games and traveling to new destinations to discover new cultures.

How to build a front-line concussion monitoring system using AWS IoT and serverless data lakes – Part 1

Post Syndicated from Saurabh Shrivastava original https://aws.amazon.com/blogs/big-data/how-to-build-a-front-line-concussion-monitoring-system-using-aws-iot-and-serverless-data-lakes-part-1/

Sports-related minor traumatic brain injuries (mTBI) continue to incite concern among different groups in the medical, sports, and parenting community. At the recreational level, approximately 1.6–3.8 million related mTBI incidents occur in the United States every year, and in most cases, are not treated at the hospital. (See “The epidemiology and impact of traumatic brain injury: a brief overview” in Additional resources.) The estimated medical and indirect costs of minor traumatic brain injury are reaching $60 billion annually.

Although emergency facilities in North America collect data on admitted traumatic brain injury (TBI) cases, there isn’t meaningful data on the number of unreported mTBIs among athletes. Recent studies indicate a significant rate of under-reporting of sports-related mTBI due to many factors. These factors include the simple inability of team staff to either recognize the signs and symptoms or to actually witness the impact. (See “A prospective study of physician-observed concussions during junior ice hockey: implications for incidence rates” in Additional resources.)

The majority of players involved in hockey and football are not college or professional athletes. There are over 3 million youth hockey players and approximately 5 million registered participants in football. (See “Head Impact Exposure in Youth Football” in Additional resources.) These recreational athletes don’t have basic access to medical staff trained in concussion recognition and sideline injury assessment. A user-friendly measurement and smartphone-based assessment tool would facilitate the process of identifying potential head injuries, assessing them, and applying return to play (RTP) criteria.

Recently, the use of instrumented sports helmets, including the Head Impact Telemetry System (HITS), has allowed for detailed recording of impacts to the head in many research trials. This practice has led to recommendations to alter contact in practices and certain helmet design parameters. (See “Head impact severity measures for evaluating mild traumatic brain injury risk exposure” in Additional resources.) However, due to the higher costs of the HITS system and complexity of the equipment, it is not a practical impact alert device for the general recreational population.

A simple, practical, and affordable system for measuring head trauma within the sports environment, in the absence of trained medical personnel, is required.

Given the proliferation of smartphones, we felt that this was a practical device to investigate to provide this type of monitoring. All smartphone devices have an embedded Bluetooth communication system to receive and transmit data at various ranges. For the purposes of this demonstration, we chose a class 1 Bluetooth device as the hardware communication method. We chose it because of its simplicity, its widely accepted standard, and its compatibility with existing smartphones and IoT devices.

Remote monitoring typically involves collecting information from devices (for example, wearables) at the edge, integrating that information into a data lake, and generating inferences that can then be served back to the relevant stakeholders. Additionally, in some cases, compute and inference must also be done at the edge to shorten the feedback loop between data collection and response.

This use case can be extended to many other use cases in myriad verticals. In this two-part series, we show you how to build a data pipeline in support of a data lake. We use key AWS services such as Amazon Kinesis Data Streams, Kinesis Data Analytics, Kinesis Data Firehose, and AWS Lambda. In part 2, we focus on generating simple inferences from that data that can support RTP parameters.

Architectural overview

Here is the AWS architecture that we cover in this two-part series:

Note: For the purposes of our demonstration, we chose to use heart rate monitoring sensors rather than helmet sensors because they are significantly easier to acquire. Both types of sensors are very similar in how they transmit data. They are also very similar in terms of how they are integrated into a data lake solution.

The resulting demonstration transfers the heartbeat data using the following components:

  • AWS Greengrass set up with a Raspberry Pi 3 to stream heart rate data into the cloud.
  • Data is ingested via Amazon Kinesis Data Streams, and raw data is stored in an Amazon S3 bucket using Kinesis Data Firehose. Find more details about writing to Kinesis Data Firehose using Kinesis Data Streams.
  • Kinesis Data Analytics averages the heartbeat-per-minute data during stream data ingestion and passes the average to an AWS Lambda function.
  • AWS Lambda enriches the heartbeat data by comparing the real-time data with baseline information stored in Amazon DynamoDB.
  • AWS Lambda sends SMS/email alerts via an Amazon SNS topic if the heartbeat rate is greater than 120 BPM, for example.
  • AWS Glue runs an extract, transform, and load (ETL) job. This job transforms the data stored in JSON format into compressed Apache Parquet columnar format and partitions it for faster query processing. AWS Glue is a fully managed ETL service for crawling data stored in an Amazon S3 bucket and building a metadata catalog.
  • Amazon Athena is used for ad hoc query analysis on the data that is processed by AWS Glue. This data is also available for machine learning processing using predictive analysis to reduce heart disease risk.
  • Amazon QuickSight is a fully managed visualization tool. It uses Amazon Athena as a data source and depicts visual line and pie charts to show the heart rate data in a visual dashboard.
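The averaging and alerting steps in this pipeline can be sketched in a few lines of Python. In the actual architecture, Kinesis Data Analytics computes the per-minute average in SQL, the baseline lives in DynamoDB, and alerts go out through Amazon SNS. Here a plain dictionary stands in for the DynamoDB baseline, the function names are hypothetical, and the code only builds the alert messages rather than publishing them.

```python
from collections import defaultdict
from statistics import mean

ALERT_BPM = 120  # example threshold from the post


def per_minute_averages(readings):
    """readings: iterable of (device_id, epoch_seconds, bpm).

    Returns {(device_id, minute_start_epoch): average_bpm}.
    """
    buckets = defaultdict(list)
    for device_id, ts, bpm in readings:
        buckets[(device_id, ts - ts % 60)].append(bpm)
    return {key: mean(values) for key, values in buckets.items()}


def build_alerts(averages, baselines, threshold=ALERT_BPM):
    """Return alert messages for per-minute averages above the threshold.

    baselines stands in for the per-person baseline stored in DynamoDB.
    """
    messages = []
    for (device_id, minute), avg in sorted(averages.items()):
        if avg > threshold:
            baseline = baselines.get(device_id, "unknown")
            messages.append(
                f"device {device_id}: {avg:.0f} BPM at minute {minute} (baseline {baseline})"
            )
    return messages
```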

All data pipelines are serverless and are refreshed periodically to provide up-to-date data.

You can use Kinesis Data Firehose to transform the data in the pipeline to a compressed Parquet format without needing to use AWS Glue. For the purposes of this post, we are using AWS Glue to highlight its capabilities, including a centralized AWS Glue Data Catalog. This Data Catalog can be used by Athena for ad hoc queries and by Apache Spark on Amazon EMR to run complex machine learning processes. AWS Glue also lets you edit generated ETL scripts and supports “bring your own ETL” to process data for more complex use cases.

Configuring key processes to support the pipeline

The following sections describe how to set up and configure the devices and services used in the demonstration to build a data pipeline in support of a data lake.

Remote sensors and IoT devices

You can use commercially available heart rate monitors to collect electrocardiography (ECG) information such as heart rate. The monitor is strapped around the chest area with the sensor placed over the sternum for better accuracy. The monitor measures the heart rate and sends the data over Bluetooth Low Energy (BLE) to a Raspberry Pi 3. The following figure depicts the device-side architecture for our demonstration.

The Raspberry Pi 3 is host to both the IoT device and the AWS Greengrass core. The IoT device is responsible for connecting to the heart rate monitor over BLE and collecting the heart rate data. The collected data is then sent locally to the AWS Greengrass core, where it can be processed and routed to the cloud through a secure connection. The AWS Greengrass core serves as the “edge” gateway for the heart rate monitor.

Set up AWS Greengrass core software on Raspberry Pi 3

To prepare your Raspberry Pi for running AWS Greengrass software, follow the instructions in Environment Setup for Greengrass in the AWS Greengrass Developer Guide.

After setting up your Raspberry Pi, you are ready to install AWS Greengrass and create your first Greengrass group. Create a Greengrass group by following the steps in Configure AWS Greengrass on AWS IoT. Then install the appropriate certificates to the Raspberry Pi by following the steps to start AWS Greengrass on a core device.

The preceding steps deploy a Greengrass group that consists of three discrete configurable items: a device, a subscription list, and the connectivity information.

The device is a set of code that is responsible for collecting the heart rate information from the sensor and sending it to the AWS Greengrass core. It uses the AWS IoT Device SDK for Python, including the Greengrass Discovery API.

Use the following AWS CLI command to create a Greengrass group:

aws greengrass create-group --name heartRateGroup

To complete the setup, follow the steps in Create AWS IoT Devices in an AWS Greengrass Group.

After you complete the setup, add a single subscription to the Greengrass group so that the heart rate data is routed from the device to the AWS IoT Core service through AWS Greengrass:

Here, your device is named Heartrate_Sensor, and the target is the IoT Cloud on the topic iot/heartrate. That means that when your device publishes to the iot/heartrate topic, AWS Greengrass also sends this message to the AWS IoT Core service on the same topic. Then you can use the breadth of AWS services to process the data.
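If you prefer to script this instead of using the console, a Greengrass (V1) subscription is described by a `Source`, a `Subject` (the topic), and a `Target`, where the literal `cloud` means AWS IoT Core. The sketch below builds such a definition as a Python dictionary. The thing ARN is a placeholder, and passing the result (for example, as the initial version of a subscription definition) is left to you.

```python
import uuid


def heart_rate_subscription(device_thing_arn: str) -> dict:
    """Build a Greengrass V1 subscription that forwards iot/heartrate
    messages from the device to AWS IoT Core ("cloud")."""
    return {
        "Subscriptions": [{
            "Id": str(uuid.uuid4()),      # any unique ID within the definition
            "Source": device_thing_arn,   # the Heartrate_Sensor device
            "Subject": "iot/heartrate",   # topic the device publishes on
            "Target": "cloud",            # AWS IoT Core
        }]
    }
```

For example, `heart_rate_subscription("arn:aws:iot:us-east-1:123456789012:thing/Heartrate_Sensor")` produces a single-entry subscription list; the account ID and Region in that ARN are hypothetical.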

The connectivity information is configured to use the local host because the IoT device resides on the Raspberry Pi 3 along with the AWS Greengrass core software. The IoT device uses the Discovery API, which is responsible for retrieving the connectivity information of the AWS Greengrass core that the IoT device is associated with.

The IoT device then uses the endpoint and port information to open a secure TLS connection to AWS Greengrass core, where the heart rate data is sent. The AWS Greengrass core connectivity information should be depicted as follows:

The power of AWS Greengrass core is that you can deploy AWS Lambda functions and new subscriptions to process the heart rate information locally on the Raspberry Pi 3. For example, you can deploy an AWS Lambda function that can trigger a reaction if the detected heart rate is reaching a set threshold. In this scenario, different individuals might require different thresholds and responses, so you could theoretically deploy unique Lambda functions on a per-individual basis if needed.

Configure AWS Greengrass and AWS IoT Core

To enable further processing and storage of the heart rate data messages published from AWS Greengrass core to AWS IoT Core, create an AWS IoT rule. The AWS IoT rule retrieves messages published to the iot/heartrate topic and sends them to the Kinesis data stream through the AWS IoT rule action for Kinesis Data Streams.
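As a sketch of what such a rule contains, the following Python function builds a topic rule payload in the shape accepted by the AWS IoT CreateTopicRule API (for example, boto3's iot client create_topic_rule(ruleName=..., topicRulePayload=...)). The stream name and role ARN are placeholders you must supply, and the ${newuuid()} partition key is an assumption chosen to spread records across shards; you could instead use a field from the message if per-device ordering matters.

```python
import json


def kinesis_rule_payload(stream_name: str, role_arn: str, topic: str = "iot/heartrate") -> dict:
    """Build an AWS IoT topic rule payload that forwards every message on `topic` to a Kinesis data stream."""
    return {
        "sql": f"SELECT * FROM '{topic}'",      # select every field of each message on the topic
        "awsIotSqlVersion": "2016-03-23",
        "ruleDisabled": False,
        "actions": [
            {
                "kinesis": {
                    "streamName": stream_name,   # your Kinesis data stream (placeholder)
                    "roleArn": role_arn,         # IAM role that allows kinesis:PutRecord (placeholder)
                    "partitionKey": "${newuuid()}",  # assumption: random key per message
                }
            }
        ],
    }


if __name__ == "__main__":
    payload = kinesis_rule_payload("heartrate_stream", "arn:aws:iam::123456789012:role/iot-to-kinesis")
    print(json.dumps(payload, indent=2))
```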

Simulate heart rate data

If you don’t have access to an IoT device but still want to run a proof of concept (PoC) around heart rate use cases, you can simulate data with a shell script deployed on an Amazon EC2 instance. Refer to the EC2 user guide to get started with Amazon EC2 Linux instances.

On the Amazon EC2 instance, create a shell script named kinesis_client_HeartRate.sh and copy the following code into it to start writing records into the Kinesis data stream. Be sure to create your Kinesis data stream first and replace the placeholder <your_stream_name> in the script.

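#!/bin/bash
# Writes simulated "deviceID,heartRate" records to a Kinesis data stream until stopped (Ctrl+C).
# bash (not sh) is required for $RANDOM and the $'\n' newline syntax.
while true
do
  deviceID=$(( (RANDOM % 10) + 1 ))      # device IDs 1-10
  heartRate=$(( (RANDOM % 81) + 60 ))    # heart rate 60-140 bpm (replaces BSD-only jot)
  echo "$deviceID,$heartRate"
  # With AWS CLI version 2, also add --cli-binary-format raw-in-base64-out
  aws kinesis put-record --stream-name <your_stream_name> --data "$deviceID,$heartRate"$'\n' \
    --partition-key "$deviceID" --region us-east-1
done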
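The script uses the device ID as the partition key. Kinesis Data Streams hashes each partition key with MD5 into a 128-bit integer and routes the record to the shard whose hash key range contains that value, so all records for one device land on the same shard. The following Python sketch models that mapping under the assumption that the hash key space is split evenly across shards (real shard boundaries come from the stream and may differ, for example after resharding).

```python
import hashlib

MAX_HASH_KEY = 2**128 - 1  # hash keys are unsigned 128-bit integers


def hash_key(partition_key: str) -> int:
    """MD5 digest of the UTF-8 partition key, read as an unsigned 128-bit integer."""
    return int.from_bytes(hashlib.md5(partition_key.encode("utf-8")).digest(), "big")


def shard_for(partition_key: str, shard_count: int) -> int:
    """Index of the shard whose range contains the key, assuming evenly split hash key ranges."""
    return hash_key(partition_key) * shard_count // (MAX_HASH_KEY + 1)


if __name__ == "__main__":
    # With one shard every device maps to shard 0; with four shards devices spread out.
    for device_id in range(1, 11):
        print(device_id, shard_for(str(device_id), 4))
```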

You can also use the Kinesis Data Generator to create data and then stream it to your solution or demonstration. For details on its use, see the blog post Test Your Streaming Data Solution with the New Amazon Kinesis Data Generator.

Ingest data using Kinesis and manage alerts with Lambda, DynamoDB, and Amazon SNS

Now you need to ingest data from the IoT device, which can be processed for real-time notifications when abnormal heart rates are detected.

Streaming data from the heart rate monitoring device is ingested into Kinesis Data Streams. Amazon Kinesis makes it easy to collect, process, and analyze real-time, streaming data. For this project, the data stream was configured with one open shard and a data retention period of 24 hours. A single shard lets you write up to 1 MB of data or 1,000 records per second and read up to 2 MB of data per second. If you need to support more devices, you can add shards using the UpdateShardCount API or the Amazon Kinesis scaling utility.
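To estimate how many shards a larger deployment needs, take the per-shard limits above and size for whichever limit is hit first. The following Python sketch does that arithmetic; it assumes traffic is spread evenly across shards and that the read figure is the total read by all standard consumers, so treat the result as a lower bound rather than a capacity plan.

```python
import math

# Per-shard limits quoted in the text.
WRITE_MB_PER_SHARD = 1.0
WRITE_RECORDS_PER_SHARD = 1000
READ_MB_PER_SHARD = 2.0


def shards_needed(write_mb_s: float, records_s: float, read_mb_s: float) -> int:
    """Smallest shard count that satisfies all three per-shard limits (never fewer than one)."""
    return max(
        1,
        math.ceil(write_mb_s / WRITE_MB_PER_SHARD),
        math.ceil(records_s / WRITE_RECORDS_PER_SHARD),
        math.ceil(read_mb_s / READ_MB_PER_SHARD),
    )


if __name__ == "__main__":
    # Example: 5,000 devices each sending one small record per second.
    print(shards_needed(write_mb_s=0.05, records_s=5000, read_mb_s=0.05))
```

For small heart rate records like these, the 1,000 records per second limit dominates long before the byte limits do.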

You can create your data stream with the following AWS CLI command, and then turn on server-side encryption for it with the aws kinesis start-stream-encryption command.

aws kinesis create-stream --stream-name heartrate_stream --shard-count 1

You can use an AWS CloudFormation template to create the entire stack depicted in the following architecture diagram.

When launching the AWS CloudFormation template, be sure to enter your email address or mobile phone number, along with the matching endpoint protocol (“Email” or “SMS”), as parameters:

Alternatively, you can follow the manual steps in the documentation links that are provided in this post.

Streaming data in Kinesis can be processed and analyzed in real time by Kinesis clients. Refer to the Kinesis Data Streams Developer Guide to learn how to create a Kinesis data stream.

To identify abnormal heart rates, you need to analyze the data in real time as it arrives. Kinesis Data Analytics performs analytics on streaming data in real time. A Kinesis Data Analytics application consists of three configurable components: a source, real-time analytics, and a destination. Refer to the AWS documentation to learn the detailed steps to configure Kinesis Data Analytics.

Kinesis Data Analytics uses Kinesis Data Streams as the source stream for the data. During source configuration, if you need to filter or mask records, you can preprocess them using AWS Lambda. The data in this particular case is relatively simple, so you don’t need to preprocess the records.
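If your data did need cleansing, a preprocessing function receives a batch of records, each with a recordId and base64-encoded data, and must return every recordId with a result of Ok, Dropped, or ProcessingFailed plus base64-encoded data. The following Python sketch is a hypothetical preprocessor (not part of this solution) that drops rows that are not a pair of integers in the deviceID,heartRate format the simulator produces.

```python
import base64


def lambda_handler(event, context):
    """Hypothetical preprocessor: keep well-formed 'deviceID,heartRate' rows and drop everything else."""
    output = []
    for record in event["records"]:
        line = base64.b64decode(record["data"]).decode("utf-8").strip()
        parts = line.split(",")
        if len(parts) == 2 and all(p.strip().isdigit() for p in parts):
            # Pass the row through, re-encoded as base64 as the service expects.
            data = base64.b64encode((line + "\n").encode("utf-8")).decode("utf-8")
            output.append({"recordId": record["recordId"], "result": "Ok", "data": data})
        else:
            # Malformed rows never reach the SQL application.
            output.append({"recordId": record["recordId"], "result": "Dropped", "data": record["data"]})
    return {"records": output}
```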

The Kinesis Data Analytics schema editor lets you edit and transform the schema if required. In the following example, we renamed the second column from COL_Value to Value.

Copy the SQL code that performs the real-time analysis of the data into the SQL editor for real-time analytics. The following is the sample code that was used for this demonstration.

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    VALUEROWTIME TIMESTAMP,
    ID INTEGER,
    COLVALUE INTEGER);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
  INSERT INTO "DESTINATION_SQL_STREAM"
    -- HEARTRATE is written to COLVALUE (columns are matched by position)
    SELECT STREAM ROWTIME,
                  ID,
                  AVG("Value") AS HEARTRATE
    FROM "SOURCE_SQL_STREAM_001"
    -- one 60-second tumbling window per device
    GROUP BY ID,
             STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND)
    HAVING AVG("Value") > 120 OR AVG("Value") < 40;

This code generates DESTINATION_SQL_STREAM. It inserts a row into the stream only when a device’s average heart rate received from SOURCE_SQL_STREAM_001 is greater than 120 or less than 40 within a 60-second window.

For more information about the tumbling window concept, see Tumbling Windows (Aggregations Using GROUP BY).
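To see what the query computes, the following Python sketch mimics it over a finite list of readings: it buckets each reading into a 60-second tumbling window per device, averages each bucket, and keeps only averages above 120 or below 40. It is an offline model, not Kinesis Data Analytics itself; the service emits each result when its window closes, and its AVG over INTEGER columns may truncate where Python returns a float.

```python
from collections import defaultdict


def tumbling_window_alerts(readings, window_s=60, high=120, low=40):
    """Mimic the SQL above: average per (device, window) and keep only out-of-range averages.

    readings: iterable of (timestamp_seconds, device_id, heart_rate) tuples.
    Returns a sorted list of (window_start, device_id, average) tuples.
    """
    groups = defaultdict(list)
    for ts, device_id, value in readings:
        window_start = int(ts // window_s) * window_s   # like STEP(ROWTIME BY INTERVAL '60' SECOND)
        groups[(window_start, device_id)].append(value)
    alerts = []
    for (window_start, device_id), values in groups.items():
        avg = sum(values) / len(values)
        if avg > high or avg < low:                     # the HAVING clause
            alerts.append((window_start, device_id, avg))
    return sorted(alerts)


if __name__ == "__main__":
    sample = [(0, 1, 130), (30, 1, 125), (10, 2, 80), (65, 1, 90), (70, 2, 30), (80, 2, 35)]
    print(tumbling_window_alerts(sample))
```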

Next, add an AWS Lambda function as one of your destinations, and configure it as follows:

In the destination editor, make sure that the stream name selected is the DESTINATION_SQL_STREAM. You only want to trigger the Lambda function when anomalies in the heart rate are detected. The output format can be JSON or CSV. In this example, our Lambda function expects the data in JSON format, so we chose JSON.

Athlete and athletic trainer registration information is stored in the heartrate Registrations DynamoDB table. Amazon DynamoDB offers fully managed encryption at rest using an AWS Key Management Service (AWS KMS) managed encryption key for DynamoDB. You need to create a table with encryption at rest enabled. Follow the detailed steps in Amazon DynamoDB Encryption at Rest.

Each record in the table should include deviceid, customerid, firstname, lastname, email, and mobile. The following is an example table record for reference.

{
  "customerid": {
    "S": "3"
  },
  "deviceid": {
    "S": "7"
  },
  "email": {
    "S": "[email protected]"
  },
  "firstname": {
    "S": "John"
  },
  "lastname": {
    "S": "Smith"
  },
  "mobile": {
    "S": "19999999999"
  }
}

Refer to the DynamoDB Developer Guide for complete instructions for creating and populating a DynamoDB table.
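The example record uses DynamoDB’s attribute-value JSON, in which every value is wrapped in a type descriptor such as "S" for string. A function reading the table has to unwrap those descriptors. boto3 provides a general TypeDeserializer for this; the following hand-rolled Python sketch handles only the string attributes used in the registration table and rejects anything else.

```python
def unmarshal_strings(item: dict) -> dict:
    """Convert a DynamoDB item whose attributes are all strings ({"S": ...}) into a plain dict."""
    plain = {}
    for name, typed_value in item.items():
        if set(typed_value) != {"S"}:
            # Numbers ("N"), maps ("M"), and other types are out of scope for this sketch.
            raise ValueError(f"attribute {name!r} is not a string attribute: {typed_value}")
        plain[name] = typed_value["S"]
    return plain


if __name__ == "__main__":
    record = {"deviceid": {"S": "7"}, "firstname": {"S": "John"}, "mobile": {"S": "19999999999"}}
    print(unmarshal_strings(record))
```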

The Lambda function processes the records passed from the Kinesis Data Analytics application. This Node.js Lambda function retrieves the athlete and athletic trainer information from the DynamoDB registrations table. It then alerts the athletic trainer to the event by sending an SMS text message through Amazon Simple Notification Service (Amazon SNS).

Note: The default AWS account limit for Amazon SNS for mobile messages is $1.00 per month. You can increase this limit through an SNS Limit Increase case as described in AWS Service Limits.

Now create a new Lambda function with a runtime of Node.js 6.10 and choose the Create a custom role option for IAM permissions. If you are new to deploying Lambda functions, see Create a Simple Lambda Function.

You must configure the new Lambda function with a specific IAM role, providing privileges to Amazon CloudWatch Logs, Amazon DynamoDB, and Amazon SNS as provided in the supplied AWS CloudFormation template.

The provided AWS Lambda function retrieves the HR Monitor Device ID and HR Average from the base64-encoded JSON message that is passed from Kinesis Data Analytics.  After retrieving the HR Monitor Device ID, the function then queries the DynamoDB Athlete registration table to retrieve the athlete and athletic trainer information.
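The provided function is written in Node.js; the following Python sketch only illustrates the same flow and is not that function. Kinesis Data Analytics passes a batch of records, each with a recordId and base64-encoded data, and a Lambda destination returns each recordId with a result of Ok or DeliveryFailed. The sketch assumes the JSON keys match the DESTINATION_SQL_STREAM columns (ID and COLVALUE), and the lookup_athlete and notify callables are hypothetical stand-ins for the DynamoDB query and the SNS publish, injected so they can be swapped or stubbed.

```python
import base64
import json


def process_records(event, lookup_athlete, notify):
    """Decode each analytics output record, look up the athlete, and alert the trainer.

    lookup_athlete(device_id) -> dict or None   (hypothetical stand-in for the DynamoDB query)
    notify(phone_number, message)                (hypothetical stand-in for the SNS publish)
    """
    results = []
    for record in event["records"]:
        try:
            row = json.loads(base64.b64decode(record["data"]))
            device_id = str(row["ID"])          # assumed key: column name from DESTINATION_SQL_STREAM
            _average = row["COLVALUE"]          # assumed key; not included in the outgoing text
            athlete = lookup_athlete(device_id)
            if athlete is not None:
                # Keep the text free of sensitive details.
                notify(athlete["mobile"], f"Heart rate alert for monitor {device_id}. Please check on your athlete.")
            results.append({"recordId": record["recordId"], "result": "Ok"})
        except (KeyError, ValueError):
            # Undecodable or incomplete records are reported back as failed deliveries.
            results.append({"recordId": record["recordId"], "result": "DeliveryFailed"})
    return {"records": results}
```

A Lambda entry point would call process_records(event, ...) with functions that wrap DynamoDB GetItem and SNS Publish.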

Finally, the AWS Lambda function sends a mobile text notification (which does not contain any sensitive information) to the athletic trainer’s mobile number retrieved from the athlete data by using the Amazon SNS service.

To store the streaming data to an S3 bucket for further analysis and visualization using other tools, you can use Kinesis Data Firehose to connect the pipeline to Amazon S3 storage.  To learn more, see Create a Kinesis Data Firehose Delivery Stream.

Kinesis Data Firehose delivers the streaming data to the destination S3 bucket in batches. Batching is controlled by two buffering hints, an S3 buffer size and an S3 buffer interval, and Firehose delivers the buffered data when whichever limit is reached first. The data in the Kinesis Data Firehose delivery stream can be transformed, and Firehose also lets you back up the source records before any transformation is applied. The data can be encrypted, compressed in GZIP, ZIP, or Snappy format, or converted to a columnar format such as Apache Parquet or Apache ORC. Columnar formats improve query performance and reduce the storage footprint. You should enable error logging for operational and production troubleshooting.
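The "whichever limit is reached first" rule can be modeled in a few lines. The following Python class is a simplified sketch, not how Firehose is implemented: it treats the buffer size as MiB and only checks the interval when a record arrives, whereas the real service also flushes on a timer when no new records come in.

```python
class BufferSimulator:
    """Simplified model of Firehose buffering hints: flush when either the size or the interval is reached."""

    def __init__(self, size_mb: float, interval_s: float):
        self.size_bytes = size_mb * 1024 * 1024   # assumption: buffer size measured in MiB
        self.interval_s = interval_s
        self.buffer = []
        self.buffered_bytes = 0
        self.window_start = None                  # arrival time of the first buffered record

    def add(self, timestamp_s: float, payload: bytes) -> list:
        """Buffer one record; return the payloads delivered by a flush, or [] if nothing was flushed."""
        if self.window_start is None:
            self.window_start = timestamp_s
        self.buffer.append(payload)
        self.buffered_bytes += len(payload)
        size_hit = self.buffered_bytes >= self.size_bytes
        time_hit = timestamp_s - self.window_start >= self.interval_s
        if size_hit or time_hit:
            flushed, self.buffer = self.buffer, []
            self.buffered_bytes = 0
            self.window_start = None
            return flushed
        return []
```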

Conclusion

In part 1 of this blog series, we demonstrated how to build a data pipeline in support of a data lake. We used key AWS services such as Kinesis Data Streams, Kinesis Data Analytics, Kinesis Data Firehose, and Lambda. In part 2, we’ll discuss how to deploy a serverless data lake and use key analytics to create actionable insights from the data lake.

Additional Reading

If you found this post useful, be sure to check out Setting Up Just-in-Time Provisioning with AWS IoT Core, and Real-time Clickstream Anomaly Detection with Amazon Kinesis Analytics.

About the Authors

Saurabh Shrivastava is a partner solutions architect and big data specialist working with global systems integrators. He works with AWS partners and customers to provide them architectural guidance for building scalable architecture in hybrid and AWS environments.

Abhinav Krishna Vadlapatla is a Solutions Architect with Amazon Web Services. He supports startups and small businesses with their cloud adoption to build scalable and secure solutions using AWS. During his free time, he likes to cook and travel.

John Cupit is a partner solutions architect for AWS’ Global Telecom Alliance Team.  His passion is leveraging the cloud to transform the carrier industry.  He has a son and daughter who have both graduated from college. His daughter is gainfully employed, while his son is in his first year of law school at Tulane University.  As such, he has no spare money and no spare time to work a second job.

David Cowden is a partner solutions architect and IoT specialist working with AWS emerging partners. He works with customers to provide them architectural guidance for building scalable architecture in the IoT space.

Josh Ragsdale is an enterprise solutions architect at AWS.  His focus is on adapting to a cloud operating model at very large scale. He enjoys cycling and spending time with his family outdoors.

Pierre-Yves Aquilanti, Ph.D., is a senior specialized HPC solutions architect at AWS. He spent several years in the oil & gas industry optimizing R&D applications for large-scale HPC systems and enabling the potential of machine learning for the upstream. He and his family long to live in Singapore again for the human and cultural experience, and to eat fresh durians.

Manuel Puron is an enterprise solutions architect at AWS. He has been working in cloud security and IT service management for over 10 years. He is focused on the telecommunications industry. He enjoys video games and traveling to new destinations to discover new cultures.

Amazon Kinesis Data Streams Adds Enhanced Fan-Out and HTTP/2 for Faster Streaming

Post Syndicated from Randall Hunt original https://aws.amazon.com/blogs/aws/kds-enhanced-fanout/

A few weeks ago, we launched two significant performance improving features for Amazon Kinesis Data Streams (KDS): enhanced fan-out and an HTTP/2 data retrieval API. Enhanced fan-out allows developers to scale up the number of stream consumers (applications reading data from a stream in real-time) by offering each stream consumer its own read throughput. Meanwhile, the HTTP/2 data retrieval API allows data to be delivered from producers to consumers in 70 milliseconds or better (a 65% improvement) in typical scenarios. These new features enable developers to build faster, more reactive, highly parallel, and latency-sensitive applications on top of Kinesis Data Streams.

Kinesis actually refers to a family of streaming services: Kinesis Video Streams, Kinesis Data Firehose, Kinesis Data Analytics, and the topic of today’s blog post, Kinesis Data Streams (KDS). Kinesis Data Streams allows developers to easily and continuously collect, process, and analyze streaming data in real-time with a fully-managed and massively scalable service. KDS can capture gigabytes of data per second from hundreds of thousands of sources – everything from website clickstreams and social media feeds to financial transactions and location-tracking events.

Kinesis Data Streams are scaled using the concept of a shard. One shard provides an ingest capacity of 1MB/second or 1000 records/second and an output capacity of 2MB/second. It’s not uncommon for customers to have thousands or tens of thousands of shards supporting 10s of GB/sec of ingest and egress. Before the enhanced fan-out capability, that 2MB/second/shard output was shared between all of the applications consuming data from the stream. With enhanced fan-out, developers can register stream consumers that each receive their own 2MB/second pipe of read throughput per shard, and this throughput automatically scales with the number of shards in a stream. Prior to the launch of enhanced fan-out, customers would frequently fan their data out to multiple streams to get the read throughput their downstream applications needed. That sounds like undifferentiated heavy lifting to us, and that’s something we decided our customers shouldn’t need to worry about. Customers pay for enhanced fan-out based on the amount of data retrieved from the stream using enhanced fan-out and the number of consumers registered per shard. You can find additional info on the pricing page.
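The difference is easy to quantify. The following Python sketch compares the read throughput each consumer gets across a whole stream in the shared model and with enhanced fan-out, using the 2MB/second per-shard figure above. It is a simplification that assumes consumers split shared capacity evenly and ignores the separate per-shard limit on GetRecords calls per second.

```python
SHARD_READ_MB_S = 2.0  # per-shard read capacity quoted in the post


def per_consumer_read_mb_s(shards: int, consumers: int, enhanced_fan_out: bool) -> float:
    """Approximate read throughput available to each consumer across the whole stream."""
    total = SHARD_READ_MB_S * shards
    if enhanced_fan_out:
        return total              # each registered consumer gets its own 2 MB/s per shard
    return total / consumers      # shared model: all consumers split the same 2 MB/s per shard


if __name__ == "__main__":
    for efo in (False, True):
        print("enhanced fan-out" if efo else "shared", per_consumer_read_mb_s(10, 4, efo))
```

With 10 shards and 4 consuming applications, each application gets about 5MB/second when sharing, versus 20MB/second each with enhanced fan-out.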

Before we jump into a description of the new API, let’s cover a few quick notes about HTTP/2 and how we use that with the new SubscribeToShard API.

HTTP/2

HTTP/2 is a major revision to the HTTP network protocol that introduces a new method for framing and transporting data between clients and servers. It’s a binary protocol that enables many new features focused on decreasing latency and increasing throughput. The first gain is the use of HPACK to compress headers. Another useful feature is connection multiplexing, which allows us to use a single TCP connection for multiple parallel non-blocking requests. Additionally, instead of the traditional request-response semantics of HTTP, the communication pipe is bidirectional. A server using HTTP/2 can push multiple responses to a client without waiting for the client to request those resources. Kinesis’s SubscribeToShard API takes advantage of this server push feature to deliver new records, and it makes use of another HTTP/2 feature called flow control. Kinesis pushes data to the consumer and keeps track of the number of bytes that have not yet been acknowledged. The client acknowledges bytes received by sending WINDOW_UPDATE frames to the server. If the client can’t handle the rate of data, Kinesis pauses the flow of data until a new WINDOW_UPDATE frame is received or until the 5-minute subscription expires.
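The flow-control behavior described above can be pictured with a toy model. The following Python class is an illustration only, not the HTTP/2 implementation used by Kinesis or the SDKs: it works at whole-record granularity (real HTTP/2 splits data into frames) and simply pauses once the window of unacknowledged bytes is used up, resuming when a WINDOW_UPDATE reopens it.

```python
class FlowControlledSender:
    """Toy model of HTTP/2 flow control: the sender pauses once its window of unacknowledged bytes is used up."""

    def __init__(self, window_bytes: int):
        self.window = window_bytes   # bytes the sender may still push before it needs a WINDOW_UPDATE
        self.pending = []            # records waiting to be pushed
        self.delivered = []          # records pushed to the consumer, in order

    def enqueue(self, record: bytes):
        self.pending.append(record)
        self._push()

    def window_update(self, increment: int):
        """The consumer acknowledges `increment` bytes, reopening the window."""
        self.window += increment
        self._push()

    def _push(self):
        # Push whole records in order while they fit in the window; otherwise wait.
        while self.pending and len(self.pending[0]) <= self.window:
            record = self.pending.pop(0)
            self.window -= len(record)
            self.delivered.append(record)

    @property
    def paused(self) -> bool:
        return bool(self.pending)
```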

Now that we have a grasp on SubscribeToShard and HTTP/2 let’s cover how we use this to take advantage of enhanced fan-out!

Using Enhanced Fan-out

The easiest way to make use of enhanced fan-out is to use the updated Kinesis Client Library 2.0 (KCL). KCL will automatically register itself as a consumer of the stream. Then KCL will enumerate the shards and subscribe to them using the new SubscribeToShard API. It will also call SubscribeToShard again whenever the underlying connections are terminated. Under the hood, KCL handles checkpointing and state management of a distributed app with an Amazon DynamoDB table it creates in your AWS account. You can see an example of this in the documentation.

The general process for using enhanced fan-out is:

  1. Call RegisterStreamConsumer and provide the StreamARN and ConsumerName (commonly the application name). Save the ConsumerARN returned by this API call. As soon as the consumer is registered, enhanced fan-out is enabled and billing for consumer-shard-hours begins.
  2. Enumerate stream shards and call SubscribeToShard on each of them with the ConsumerARN returned by RegisterStreamConsumer. This establishes an HTTP/2 connection, and KDS will push SubscribeToShardEvents to the listening client. These connections are terminated by KDS every 5 minutes, so the client will need to call SubscribeToShard again if you want to continue receiving events. Bytes pushed to the client using enhanced fan-out are billed under enhanced fan-out data retrieval rates.
  3. Finally, remember to call DeregisterStreamConsumer when you’re no longer using the consumer since it does have an associated cost.
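The three steps can be sketched in Python against a client object that behaves like a boto3 Kinesis client (boto3.client("kinesis")), which exposes register_stream_consumer, describe_stream_consumer, list_shards, subscribe_to_shard, and deregister_stream_consumer. The client is passed in as a parameter, so the sketch can be exercised with a stub. It is a single pass under simplifying assumptions: shards are read one after another (each subscription can stay open for up to 5 minutes), list_shards pagination is ignored, and nothing resubscribes. A production consumer would run shards concurrently and resubscribe in a loop, which is what KCL 2.0 does for you.

```python
import time


def consume_with_enhanced_fan_out(client, stream_name, stream_arn, consumer_name, handle_record):
    """One pass over the register -> subscribe -> deregister lifecycle described above."""
    # Step 1: register the consumer and keep its ARN.
    consumer = client.register_stream_consumer(StreamARN=stream_arn, ConsumerName=consumer_name)
    consumer_arn = consumer["Consumer"]["ConsumerARN"]
    try:
        # The consumer must be ACTIVE before it can subscribe.
        status = ""
        while status != "ACTIVE":
            description = client.describe_stream_consumer(ConsumerARN=consumer_arn)
            status = description["ConsumerDescription"]["ConsumerStatus"]
            if status != "ACTIVE":
                time.sleep(1)
        # Step 2: enumerate shards and subscribe to each one (pagination omitted).
        for shard in client.list_shards(StreamName=stream_name)["Shards"]:
            response = client.subscribe_to_shard(
                ConsumerARN=consumer_arn,
                ShardId=shard["ShardId"],
                StartingPosition={"Type": "LATEST"},
            )
            # The event stream ends when KDS closes the subscription.
            for event in response["EventStream"]:
                for record in event.get("SubscribeToShardEvent", {}).get("Records", []):
                    handle_record(shard["ShardId"], record["Data"])
    finally:
        # Step 3: registered consumers are billed, so always deregister.
        client.deregister_stream_consumer(ConsumerARN=consumer_arn)
```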

You can see some example code walking through this process in the documentation.

You can view Amazon CloudWatch metrics and manage consumer applications in the console, including deregistering them.

Available Now

Enhanced fan-out and the new HTTP/2 SubscribeToShard API are both available now in all regions for new streams and existing streams. There’s a lot more information than what I’ve covered in this blog post in the documentation. There is a per-stream limit of 5 consumer applications (e.g., 5 different KCL applications) reading from all shards, but this can be increased with a support ticket. I’m excited to see customers take advantage of these new features to reduce the complexity of managing multiple stream consumers and to increase the speed and parallelism of their real-time applications.

As always, feel free to leave comments below or on Twitter.

Randall

Build a blockchain analytic solution with AWS Lambda, Amazon Kinesis, and Amazon Athena

Post Syndicated from Jonathan Shapiro-Ward original https://aws.amazon.com/blogs/big-data/build-a-blockchain-analytic-solution-with-aws-lambda-amazon-kinesis-and-amazon-athena/

There are many potential benefits to using a blockchain. A blockchain is a distributed data structure that can record transactions in a verifiable and immutable manner. Depending on the use case, a blockchain can offer opportunities to reduce costs, improve speed and efficiency, strengthen regulatory compliance, and increase resilience and scalability.

Early adopters are finding innovative ways of using blockchain in such areas as finance, healthcare, eGovernment, and non-profit organizations. Blockchain was initially pioneered as the key technology behind the cryptocurrency Bitcoin.

Many of the opportunities to use blockchains arise from their design. They are typically large-scale distributed systems that often consist of many thousands of nodes. It can be challenging to gain insight into user activity, events, anomalies, and other state changes on a blockchain. But AWS analytics services provide the ability to analyze blockchain applications and provide meaningful information about these areas.

Walkthrough

In this post, we’ll show you how to:

You can readily adapt this Ethereum deployment and the blockchain analytics for use with a wide range of blockchain scenarios.

Prerequisites

This post assumes that you are familiar with AWS and Ethereum. The following documentation provides background reading to help you perform the steps described in this post:

Additionally, it’s useful to be familiar with Amazon Kinesis, AWS Lambda, Amazon QuickSight, and Amazon Athena to get the most out of this blog post. For more information, see:

For an introduction to serverless computing with AWS Lambda, see Introduction to AWS Lambda – Serverless Compute on Amazon Web Services.

Blockchain 101

Before we proceed with the solution in this post, we’ll provide a short discussion regarding blockchains and Ethereum, which is the blockchain implementation used in this solution.

In short, blockchains are a means for achieving consensus. The original motivation behind blockchain was to allow the Bitcoin network to agree on the order of financial transactions while resisting malicious threats and omission errors. Other blockchain implementations are used to agree on the state of generic computation. This is achieved through a process called mining, whereby nodes solve a deliberately expensive computational puzzle so that falsifying transactions becomes computationally challenging.
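The asymmetry behind mining (expensive to solve, cheap to check) can be shown with a toy proof of work. The following Python sketch searches for a nonce whose SHA-256 digest of the data plus nonce starts with a given number of zero hex digits. It is an illustration only: Bitcoin compares a double SHA-256 of a block header against a numeric target, and Ethereum at the time of this post used a different algorithm (Ethash).

```python
import hashlib
from typing import Tuple


def _digest(block_data: str, nonce: int) -> str:
    return hashlib.sha256(f"{block_data}{nonce}".encode("utf-8")).hexdigest()


def mine(block_data: str, difficulty: int) -> Tuple[int, str]:
    """Brute-force a nonce whose digest starts with `difficulty` zero hex digits."""
    target = "0" * difficulty
    nonce = 0
    while True:
        digest = _digest(block_data, nonce)
        if digest.startswith(target):
            return nonce, digest
        nonce += 1


def verify(block_data: str, nonce: int, difficulty: int) -> bool:
    """Checking a claimed solution costs a single hash, unlike finding it."""
    return _digest(block_data, nonce).startswith("0" * difficulty)


if __name__ == "__main__":
    nonce, digest = mine("alice pays bob 5", 4)
    print(nonce, digest)
```

Each extra zero digit multiplies the expected search effort by 16, while verification stays a single hash.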

Ethereum is a major blockchain implementation. Unlike Bitcoin and other earlier blockchain systems, Ethereum is not solely a cryptocurrency platform, though it does have its own cryptocurrency called Ether. Ethereum extends the blockchain concept by building an Ethereum virtual machine (VM) that is Turing-complete on top of the blockchain. This allows for the development of smart contracts, which are programs that run on the blockchain. The appeal of smart contracts is the ability to translate natural language contracts, such as insurance contracts, into code that can run on Ethereum. This allows contractual agreements to be built without the need for a centralized authority, such as a bank or notary, potentially decreasing time to market and reducing costs.

An overview of the blockchain solution

The following is an overview of the solution provided in this post. The solution consists of:

  • An Ethereum blockchain running on Amazon Elastic Container Service (Amazon ECS) via the AWS Blockchain Template
  • An Application Load Balancer, providing access to the various Ethereum APIs.
  • A Lambda function, which deploys a smart contract to the blockchain
  • A Lambda function, which runs transactions against the smart contract
  • A Lambda function, which listens for events on the smart contract and pushes those events to Amazon Kinesis
  • An Amazon DynamoDB table used to share the blockchain state between Lambda functions
  • A blockchain analytics pipeline that uses Amazon Kinesis Data Firehose, Amazon Kinesis Data Analytics, Amazon Kinesis Data Streams, and Amazon Athena.
  • An analytics dashboard built using Amazon QuickSight

The solution is presented in the following architectural diagram:

As shown, the solution consists of two main parts:

  • The blockchain hosted on Amazon Elastic Compute Cloud (Amazon EC2) and the Lambda functions that interact with the blockchain.
  • The analytics pipeline based around Kinesis that consumes data from the blockchain.

The AWS CloudFormation template we provide deploys the left side of that architecture diagram up to and including Kinesis Data Streams. It is the right side of the diagram that we’re going to build in this post.

Create the initial resources

  1. First, download the AWS CloudFormation template from: https://s3.amazonaws.com/blockchainblog/blockchainblogpost.template
  2. Use AWS CloudFormation to launch the template. The AWS CloudFormation stack deploys a virtual private cloud (VPC), two subnets, and a series of Lambda functions, which interact with the blockchain. This provides a foundation on which to build the analytics pipeline. You can either provide your own CIDR blocks or use the default parameters. Each subnet must have at least eight IP addresses.
  3. Deploy the AWS Blockchain Templates. The AWS Blockchain Templates make it efficient to deploy Ethereum and Hyperledger blockchain networks on AWS. In this case, we’re deploying an Ethereum network into the VPC created by the AWS CloudFormation template in step 2.
  4. Launch the following AWS CloudFormation template: https://aws-blockchain-templates-us-east-1.s3.us-east-1.amazonaws.com/ethereum/templates/latest/ethereum-network.template.yaml This template requires a number of parameters:
  • Set the Initial List of Accounts to the following predefined accounts the Lambda functions use:
0x34db0A1D7FE9D482C389b191e703Bf0182E0baE3,0xB3Bbce5d76aF28EcE4318c28479565F802f96808,0x877108a8825222cf669Ca9bFA3397D6973fE1640,0xb272056E07C94C7E762F642685bE822df6d08D03,0x0c00e92343f7AA255e0BBC17b21a02f188b53D6C,0xaDf205a5fcb846C4f8D5e9f5228196e3c157e8E0,0x1373a92b9BEbBCda6B87a4B5F94137Bc64E47261,0x9038284431F878f17F4387943169d5263eA55650,0xe1cd3399F6b0A1Ef6ac8Cebe228D7609B601ca8a,0x0A67cCC3FD9d664D815D229CEA7EF215d4C00A0a
  • In VPC Network Configuration:
    • Set the VPC ID to the blockchainblog VPC created by the first AWS CloudFormation template.
    • Add the blockchainblog-public subnet to the list of subnets to use.
    • Add blockchainblog-public and blockchainblog-private to the list of ALB subnets.
  • In Security Configuration:
    • Choose your Amazon EC2 key pair.
    • Provide the blockchainblog security group.
    • Provide the blockchainblog-ec2-role for the Amazon EC2 role.
    • Provide the blockchainblog-ecs-role for the Amazon ECS role.
    • Set the ALB security group to the blockchainblog security group.
  5. Leave all other variables unchanged, create the template, and wait for all resources to be deployed. This deploys an Ethereum blockchain, starts the mining process, and exposes the Web3 API through an Application Load Balancer.

After the resources are created, move on to deploying the smart contract.

Deploy a smart contract

To use the blockchain, deploy a smart contract to it. This smart contract is not complex — it provides the functions for holding an auction.

The auction contract represents a public auction, which is an auction whereby all parties involved can be identified. The user offering the item to be auctioned deploys the contract and other users can bid using the contract. The auction is considered completed after a pre-defined number of blocks have been mined. When the auction ends, losing bids can then be withdrawn and the funds returned to the bidders. Later, the user who created the auction can withdraw the funds of the winning bid.

Note that the contract does nothing to ensure that the winner receives the commodity in question. In fact, this contract is entirely separate from what is being auctioned. The contract could be extended to provide this functionality, but for the scope of this post, we’re keeping the contract simple.

The auction contract is located at https://s3.amazonaws.com/blockchainblog/Auction.sol.

Examine the auction contract

The auction contract is automatically pulled by our Lambda function and deployed to our blockchain. This contract is written in a domain-specific language called Solidity. The syntax is inspired by the C family of languages; however, unlike C it doesn’t compile to object code. Instead, it compiles to bytecode, which runs on the Ethereum VM.

This smart contract has two functions: bid and withdraw. Bid allows users to bid in the auction, and withdraw allows users to withdraw funds from the contract when the auction has finished. The auction owner can obtain the winning bid and the losers can recoup their funds. Note that the data structure BidEvent is similar to a C struct, and is how we’ll trigger Solidity events. The Solidity events are captured and sent to our analytics pipeline.
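To make the auction rules concrete, the following Python class is a hypothetical model of the behavior described in this section. It is not a translation of Auction.sol, whose source isn't reproduced here, and details such as the exact end condition are assumptions. Bids are accepted until an end block, each outbid amount becomes a refund that losers can withdraw after the auction ends, the owner collects the winning bid once, and each bid is logged the way BidEvent feeds the analytics pipeline.

```python
class AuctionModel:
    """Hypothetical Python model of the auction rules described in the post (not Auction.sol itself)."""

    def __init__(self, owner: str, end_block: int):
        self.owner = owner
        self.end_block = end_block      # assumption: the auction closes once this block number is reached
        self.highest_bidder = None
        self.highest_bid = 0
        self.refunds = {}               # outbid funds, withdrawable after the auction ends
        self.owner_paid = False
        self.events = []                # stands in for the BidEvent log sent to the pipeline

    def bid(self, bidder: str, amount: int, block: int):
        if block >= self.end_block:
            raise ValueError("auction has ended")
        if amount <= self.highest_bid:
            raise ValueError("bid must exceed the current highest bid")
        if self.highest_bidder is not None:
            # The previous leader is now a losing bidder; credit the refund.
            self.refunds[self.highest_bidder] = self.refunds.get(self.highest_bidder, 0) + self.highest_bid
        self.highest_bidder, self.highest_bid = bidder, amount
        self.events.append({"bidder": bidder, "amount": amount, "block": block})

    def withdraw(self, caller: str, block: int) -> int:
        """Return the amount paid out to `caller` (0 if nothing is owed)."""
        if block < self.end_block:
            raise ValueError("auction still open")
        if caller == self.owner:
            if self.owner_paid:
                return 0
            self.owner_paid = True
            return self.highest_bid             # owner collects the winning bid once
        return self.refunds.pop(caller, 0)       # losers recoup their funds
```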

Now it’s time to deploy our smart contract, run transactions against it, and listen for events by using pre-built Lambda functions. The following diagram shows the interactions of these Lambda functions:

DeployContract is a Lambda function created by the AWS CloudFormation stack that we deployed earlier. This function takes our Solidity source code from the Amazon Simple Storage Service (Amazon S3) bucket, compiles it to EVM bytecode using the solc compiler, deploys that to our blockchain, and stores the blockchain address of the contract in a DynamoDB table. The function interacts with the Ethereum blockchain on our Amazon EC2 instance via the web3 1.0.0 API. You can see the source code for this function at https://s3.amazonaws.com/blockchainblog/DeployContract.zip.

After deploying the AWS CloudFormation template, wait about 5 minutes before deploying the contract to give the blockchain time to start the mining process. The majority of this time is the blockchain generating the initial directed acyclic graph (DAG).

DeployContract can be invoked in the Lambda console by testing it with an empty test event. Before invoking the function, provide it with the address of the blockchain. To do this, locate the outputs of the AWS Blockchain Template stack and copy the EthJSONRPCURL value. Then provide this value in an environment variable named BLOCKCHAIN_HOST for the DeployContract function, as shown in the following example:

Now invoke the DeployContract function. It should print various states, including the blockchain address of the deployed contract and the JSON ABI of the contract. After the function completes, the contract is deployed to our private blockchain and available for use. If the function produces an error, it’s likely because the blockchain has not yet been initialized. Wait a few minutes after creating the AWS CloudFormation stack before invoking DeployContract.
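One way to check whether the node is ready is to ask it for its latest block number over Ethereum’s JSON-RPC interface with the standard eth_blockNumber method; a number that keeps increasing suggests mining is under way. The following Python sketch assumes EthJSONRPCURL is a plain HTTP JSON-RPC endpoint reachable from where you run it (for example, from inside the VPC) and reads it from the BLOCKCHAIN_HOST environment variable; it makes no network call if that variable is unset.

```python
import json
import os
import urllib.request


def block_number_request(request_id: int = 1) -> bytes:
    """JSON-RPC 2.0 request body for eth_blockNumber."""
    body = {"jsonrpc": "2.0", "method": "eth_blockNumber", "params": [], "id": request_id}
    return json.dumps(body).encode("utf-8")


def parse_block_number(response_body: bytes) -> int:
    """Ethereum JSON-RPC returns quantities as hex strings such as "0x1b4"."""
    payload = json.loads(response_body)
    if "error" in payload:
        raise RuntimeError(payload["error"])
    return int(payload["result"], 16)


def current_block(host: str) -> int:
    """POST the request to the node (network call) and return the latest block number."""
    request = urllib.request.Request(
        host, data=block_number_request(), headers={"Content-Type": "application/json"}
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        return parse_block_number(response.read())


if __name__ == "__main__" and os.environ.get("BLOCKCHAIN_HOST"):
    print("Current block:", current_block(os.environ["BLOCKCHAIN_HOST"]))
```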

Execute Transactions

To have transaction data to analyze, we must first generate some transactions. To do this, we use a second Lambda function named ExecuteTransactions.

In the smart contract, an event is specified at the start of the file. Events are a useful mechanism in Solidity that can be used as a callback to code outside of the blockchain. The final Lambda function, ListenForTransactions, listens for events occurring against the contract and then sends those events to Kinesis for analysis.

Ethereum currently does not support sending events directly to Kinesis, so we run the ListenForTransactions function to pull events from the blockchain. We can do this manually by invoking the function with an empty test event. ListenForTransactions pulls all events from the blockchain since the last time it was run. However, if we want transactions to be pulled from the blockchain in real time, the function needs to run perpetually. In the following section, you can optionally schedule the Lambda function to run regularly. As with DeployContract, set the BLOCKCHAIN_HOST environment variable to the address of the Ethereum RPC endpoint for both ListenForTransactions and ExecuteTransactions.

Optional: Use an Amazon CloudWatch event to schedule ListenForTransactions

To have ListenForTransactions run continually, we’ll use Amazon CloudWatch Events as a trigger for our Lambda function. In the Lambda console, choose the Triggers tab and add a new Amazon CloudWatch Events trigger with the schedule expression rate(5 minutes). This ensures that the function is continually running and thus that all events are sent to Kinesis as soon as they occur. This allows us to do near real-time processing of events against our smart contract. However, if we want to reduce costs, or if real-time analytics isn’t a key objective, we could run our ListenForTransactions function less frequently. Running the function periodically fetches all events since the last time it was run; however, this is less timely than having it wait for events to occur.

To configure a CloudWatch event to trigger ListenForTransactions:

  1. In the Designer section of the Lambda console for ListenForTransactions, select CloudWatch Events.
  2. Choose Configure and scroll down to the CloudWatch Events configuration.
  3. Select Create New Rule from the Rule dropdown menu.
  4. Name the rule and provide a description.
  5. Select Schedule expression.
  6. Provide the expression: rate(5 minutes)
  7. Select Enable trigger.
  8. Choose Add.
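CloudWatch Events rate expressions take a value and a unit, and the unit is singular when the value is 1 (rate(1 minute)) and plural otherwise (rate(5 minutes)). The following Python sketch builds such an expression and assembles the keyword arguments for a PutRule request. It does not call AWS, and the rule name in the example is a hypothetical placeholder.

```python
from typing import Dict

# Units accepted by CloudWatch Events rate() expressions, in singular form.
VALID_UNITS = ("minute", "hour", "day")


def rate_expression(value: int, unit: str) -> str:
    """Build a rate() schedule expression, for example rate(5 minutes).

    The unit must be singular when value is 1 and plural otherwise.
    """
    if value < 1:
        raise ValueError("rate value must be a positive integer")
    if unit not in VALID_UNITS:
        raise ValueError(f"unit must be one of {VALID_UNITS}")
    return f"rate({value} {unit if value == 1 else unit + 's'})"


def schedule_rule_params(name: str, description: str, value: int, unit: str) -> Dict[str, str]:
    """Keyword arguments for a CloudWatch Events PutRule request (not sent here)."""
    return {
        "Name": name,
        "Description": description,
        "ScheduleExpression": rate_expression(value, unit),
        "State": "ENABLED",
    }


if __name__ == "__main__":
    params = schedule_rule_params(
        "ListenForTransactionsSchedule",  # hypothetical rule name
        "Runs ListenForTransactions every 5 minutes",
        5,
        "minute",
    )
    print(params["ScheduleExpression"])  # rate(5 minutes)
```

To create the rule programmatically, you could pass these arguments to the boto3 `events` client as `put_rule(**params)`, then attach the function with `put_targets` and allow CloudWatch Events to invoke it (for example, with the Lambda `add_permission` API). The console steps perform the equivalent wiring for you.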

After the function is scheduled, we can generate some events against our contract by running ExecuteTransactions with an empty test event. We can do this any number of times to send more transactions to our analytics pipeline; each run of ExecuteTransactions produces a batch of 10 transactions.

Analyze Transactions with Kinesis Data Analytics

Because our Lambda function listens for events on our smart contract, all bidding activity is sent to a Kinesis data stream that was already created by an AWS CloudFormation stack called BlockchainBlogEvents.

Right now, all events go to Kinesis but no further. We’ll persist our events for analysis with Athena later on. To do so, navigate to the Kinesis Data Streams console and choose the BlockchainBlog stream that was created for you.

  1. In the upper-right corner, choose Connect to Firehose. This forwards all events to a Kinesis Data Firehose delivery stream, which delivers them to an S3 bucket.
  2. Name the delivery stream, choose Next, and don't enable record transformation.
  3. Provide an S3 bucket in which to store your results. Make a note of the bucket so you can use it later with Athena.
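When you later point Athena at this bucket, it helps to know where the objects land. By default, Kinesis Data Firehose appends a UTC-based folder structure in the format YYYY/MM/DD/HH to the prefix you supply. This short Python sketch computes that prefix for a given delivery time; the prefix values in the example are hypothetical.

```python
from datetime import datetime, timezone


def default_firehose_prefix(user_prefix: str, delivered_at: datetime) -> str:
    """Return the default Firehose key prefix: <user_prefix>YYYY/MM/DD/HH/ in UTC.

    delivered_at should be timezone-aware; it is converted to UTC first.
    """
    utc = delivered_at.astimezone(timezone.utc)
    return f"{user_prefix}{utc:%Y/%m/%d/%H}/"


if __name__ == "__main__":
    delivered = datetime(2019, 2, 9, 13, 5, tzinfo=timezone.utc)
    print(default_firehose_prefix("blockchain-events/", delivered))
    # blockchain-events/2019/02/09/13/
```

Because Athena reads every object under a table's location, including nested folders, the table can point at the bucket and prefix you chose rather than at an individual hour folder.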

All events coming from the blockchain should now be persisted to Amazon S3.

Now that our events are being persisted, we’ll use Kinesis Data Analytics to perform a series of real-time analytics on the Kinesis Data Stream. Later, we’ll perform batch analytics on the data stored in Amazon S3 via Athena.

First, let's look at Kinesis Data Analytics. Our ListenForTransactions Lambda function sends a message to the stream each time a transaction is run against our Auction smart contract.

The message is a JSON object. It contains the address of the bidder who initiated the transaction, how much they bid, the contract they bid on, when the transaction was run, and which block the transaction was added to.
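To make the message shape concrete, here is a hypothetical event and a small Python parser for it. The field names mirror the Athena columns used in this post (Block, Blockhash, Bidder, Maxbidder, Contractowner, Amount, Auction, EventTimestamp); the exact key casing and all sample values are assumptions, not output captured from ListenForTransactions.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class AuctionEvent:
    block: int
    blockhash: str
    bidder: str
    maxbidder: str
    contractowner: str
    amount: int
    auction: str
    event_timestamp: str


def parse_event(raw: str) -> AuctionEvent:
    """Parse one JSON record from the stream, matching keys case-insensitively."""
    data = {key.lower(): value for key, value in json.loads(raw).items()}
    return AuctionEvent(
        block=int(data["block"]),
        blockhash=data["blockhash"],
        bidder=data["bidder"],
        maxbidder=data["maxbidder"],
        contractowner=data["contractowner"],
        amount=int(data["amount"]),
        auction=data["auction"],
        event_timestamp=str(data["eventtimestamp"]),
    )


# Hypothetical sample record; addresses and values are placeholders.
SAMPLE = json.dumps({
    "Block": 9201,
    "Blockhash": "0xblockhash",
    "Bidder": "0xbidder1",
    "Maxbidder": "0xbidder1",
    "Contractowner": "0xowner1",
    "Amount": 3,
    "Auction": "0xauction1",
    "EventTimestamp": "1528300000",
})

if __name__ == "__main__":
    print(parse_event(SAMPLE).bidder)  # 0xbidder1
```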

Kinesis Data Analytics processes each incoming message on the stream and lets us perform analysis over the stream. In this example, we use Kinesis Data Analytics to:

  1. Calculate the amount of Ether being bid in each block within a 15-second tumbling window.
  2. Detect the number of unique bidders within a 15-second tumbling window.

Our window is 15 seconds because this is the Ethereum target block time, that is, roughly how long it takes to add a block to the blockchain. The queries use tumbling windows: consecutive, non-overlapping 15-second intervals. By setting the window to 15 seconds, we gain insight into the transactions occurring within each mining interval. We could also use a longer window to study trends in our auction application over a longer period.

To start with our real-time analytics, we must create a Kinesis Data Analytics application. To do so:

  1. Navigate to the Kinesis Data Analytics console in the AWS Management Console.
  2. Create a new Kinesis Data Analytics application with an appropriate name and description, then specify the pre-made blockchainblog Kinesis data stream as the source.
  3. Run ExecuteTransactions to send a set of transactions to Kinesis so that the schema can be discovered automatically.
  4. Open the SQL editor for the application.

Next, we’re going to add SQL to our Kinesis data analytics application to find out the amount of Ether being sent in each block. This includes all bids sent to the contract and all funds withdrawn from a completed auction.

Copy the following SQL, paste it into the SQL editor in Kinesis Data Analytics, then execute it.

-- Output stream: one row per block per 15-second window.
CREATE OR REPLACE STREAM "SPEND_PER_BLOCK_STREAM" (block INTEGER, spend INTEGER);
-- STEP(... BY INTERVAL '15' SECOND) groups rows into non-overlapping (tumbling) 15-second windows.
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "SPEND_PER_BLOCK_STREAM"
    SELECT STREAM "Block", SUM("Amount") AS block_sum
    FROM "SOURCE_SQL_STREAM_001"
    GROUP BY "Block", STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '15' SECOND);

This simple piece of SQL provides some insight into our smart contract. Each row of SPEND_PER_BLOCK_STREAM contains a block number and the volume of funds that moved through our contract in that block. This output shows how much cryptocurrency is spent in relation to our smart contract and when it's spent.
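To see what the pump computes, the following Python sketch reproduces the same grouping offline. Each record is assigned to a 15-second window by rounding its arrival time down (the role ROWTIME and STEP play in the SQL), and amounts are summed per (window, block) pair. The (arrival_seconds, block, amount) tuples are a hypothetical simplification of the stream records.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple

WINDOW_SECONDS = 15  # matches INTERVAL '15' SECOND in the SQL


def window_start(arrival_seconds: int, width: int = WINDOW_SECONDS) -> int:
    """Round a timestamp down to the start of its tumbling window."""
    return arrival_seconds - (arrival_seconds % width)


def spend_per_block(records: Iterable[Tuple[int, int, int]]) -> Dict[Tuple[int, int], int]:
    """Sum amounts per (window_start, block), like GROUP BY "Block", STEP(ROWTIME ...)."""
    totals: Dict[Tuple[int, int], int] = defaultdict(int)
    for arrival_seconds, block, amount in records:
        totals[(window_start(arrival_seconds), block)] += amount
    return dict(totals)


if __name__ == "__main__":
    records = [(0, 100, 5), (7, 100, 3), (14, 101, 2), (16, 101, 4)]
    print(spend_per_block(records))
    # {(0, 100): 8, (0, 101): 2, (15, 101): 4}
```

Note that a block whose events straddle a window boundary (block 101 in the example) appears in two output rows, one per window.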

Make sure that there is data for the Kinesis data analytics application to process by running the ExecuteTransactions and ListenForTransactions functions. You can run these functions either with an Amazon CloudWatch event or manually.

Now, we’ll modify our application to detect the number of unique bidders placing bids within a 15-second window. This is about the time required to add a block to the blockchain. To do so, add the following code to our Kinesis data analytics application:

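CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
    NUMBER_OF_DISTINCT_ITEMS BIGINT
);

-- Use a distinct pump name so this does not replace the STREAM_PUMP created earlier.
CREATE OR REPLACE PUMP "DISTINCT_BIDDERS_PUMP" AS
   INSERT INTO "DESTINATION_SQL_STREAM"
      SELECT STREAM *
      FROM TABLE(COUNT_DISTINCT_ITEMS_TUMBLING(
          CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),
            'Bidder',   -- column whose distinct values are counted
            15          -- tumbling window size, in seconds
      )
);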

The output of this code is the count of unique bidders within each 15-second window. This helps us understand who is running transactions against our contract: for example, whether a large number of blockchain addresses are responsible for the bids or only a small number.
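The same count can be sketched in Python to make the tumbling-window semantics explicit. This version counts exactly with a set; the built-in Kinesis function may use an approximate algorithm for large inputs, so its results can differ slightly. The (arrival_seconds, bidder) tuples are a hypothetical simplification of the stream records, and lowercasing addresses reflects the fact that Ethereum hex addresses are case-insensitive.

```python
from collections import defaultdict
from typing import Dict, Iterable, Set, Tuple

WINDOW_SECONDS = 15


def distinct_bidders_per_window(
    records: Iterable[Tuple[int, str]], width: int = WINDOW_SECONDS
) -> Dict[int, int]:
    """Count unique bidder addresses in each tumbling window, keyed by window start."""
    bidders: Dict[int, Set[str]] = defaultdict(set)
    for arrival_seconds, bidder in records:
        start = arrival_seconds - (arrival_seconds % width)
        bidders[start].add(bidder.lower())  # hex addresses are case-insensitive
    return {start: len(addresses) for start, addresses in sorted(bidders.items())}


if __name__ == "__main__":
    sample = [(1, "0xA"), (5, "0xa"), (9, "0xB"), (20, "0xA")]
    print(distinct_bidders_per_window(sample))  # {0: 2, 15: 1}
```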

Finally, as denoted in our architectural diagram, we can add a destination stream to our Kinesis Data Analytics application. We'll send the output of our application to Kinesis Data Firehose to persist the results, so that they can be used in batch analytics with Athena or other tools. To do this, create a destination for the analytics output stream and point it at a Kinesis Data Firehose delivery stream.

This section has shown real-time analytics that can be run on blockchain transactions. The next section shows how to use Athena to run batch analytics against our stored transactions.

Analyze Transactions with Athena

In this section, we’ll create a table in Athena so we can query our transaction data in batch form.

  1. Create a database in Athena and then create a table, specifying the location that you provided earlier to Kinesis Data Firehose. Your configuration should look like the following example:

  2. Choose Next, choose JSON as the input format, then choose Next.
  3. In Columns, provide the data types for each of our columns. Choose Bulk add columns, then copy and paste the following column information:
Block int, Blockhash string, Bidder string, Maxbidder string, Contractowner string, Amount int, Auction string, EventTimestamp string

The columns are described in the following table:

Column          Description
Block           The block that this event pertains to
Auction         Which auction smart contract the event pertains to
ContractOwner   The address of the owner of the contract
Bidder          The address of the bidder
BlockHash       The hash of the block
Address         The address of the transaction
MaxBidder       The address of the currently winning bidder (as of when the event was generated)
Amount          The amount of the bid

  4. Choose Next, and then create the table.
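If you prefer DDL to the wizard, an equivalent table can be declared with a CREATE EXTERNAL TABLE statement that uses the OpenX JSON SerDe, which Athena supports for JSON data. The Python helper below only assembles that statement from the column list above (written in lowercase, as Athena stores column names). The database name `blockchain`, the table name `events`, and the S3 location in the example are placeholders, and the wizard may generate slightly different DDL.

```python
from typing import List, Tuple

# Column names and types from the "Bulk add columns" step.
COLUMNS: List[Tuple[str, str]] = [
    ("block", "int"), ("blockhash", "string"), ("bidder", "string"),
    ("maxbidder", "string"), ("contractowner", "string"), ("amount", "int"),
    ("auction", "string"), ("eventtimestamp", "string"),
]


def create_table_ddl(database: str, table: str, location: str) -> str:
    """Assemble an Athena CREATE EXTERNAL TABLE statement for JSON records."""
    if not location.startswith("s3://"):
        raise ValueError("location must be an s3:// URI")
    column_lines = ",\n".join(f"  `{name}` {col_type}" for name, col_type in COLUMNS)
    return (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS {database}.{table} (\n"
        f"{column_lines}\n"
        ")\n"
        "ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'\n"
        f"LOCATION '{location}';"
    )


if __name__ == "__main__":
    # Hypothetical bucket and prefix; use the location you gave Kinesis Data Firehose.
    print(create_table_ddl("blockchain", "events", "s3://your-bucket/your-prefix/"))
```

You would paste the printed statement into the Athena query editor.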

After you configure Athena, you can explore the data. First, check whether the user who created an auction has bid in their own auction. Most auctions disallow this, but our smart contract doesn't prohibit it. We could solve this by modifying the contract, but for now let's see if we can detect it with Athena. Run the following query:

SELECT * FROM events WHERE contractowner = bidder

The result should resemble the following:

You should see at least one instance where the contract owner has bid on their own contract. Note that the Lambda function that runs transactions does this at random. Depending on the rules of the auction, bidding on one's own contract could be permissible or could violate its terms; in the latter case, this query lets us detect the violation easily.
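The same compliance check can also run in application code, for example over records read back from Amazon S3. This is a minimal Python sketch: the dictionary keys follow the Athena column names, and unlike the plain `=` in the SQL it compares addresses case-insensitively, which is an assumption about how addresses might be formatted in your data.

```python
from typing import Dict, Iterable, List


def self_bids(events: Iterable[Dict[str, str]]) -> List[Dict[str, str]]:
    """Return events where the contract owner is also the bidder.

    Mirrors: SELECT * FROM events WHERE contractowner = bidder,
    except that addresses are compared case-insensitively.
    """
    return [e for e in events if e["contractowner"].lower() == e["bidder"].lower()]


if __name__ == "__main__":
    sample = [  # hypothetical rows
        {"contractowner": "0xowner1", "bidder": "0xOWNER1"},
        {"contractowner": "0xowner1", "bidder": "0xbidder2"},
    ]
    print(len(self_bids(sample)))  # 1
```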

This scenario is an example of using analytics to detect and enforce compliance in a blockchain-backed system. Compliance remains an open question for many blockchain users, as detecting regulatory and compliance issues involving smart contracts often involves significant complexity. Analytics is one way to gain insight and answer these regulatory questions.

Useful queries for analyzing transactions

This section provides some other queries that we can use to analyze our smart contract transactions.

Find the number of transactions per block

SELECT block, COUNT(amount) AS transactions FROM events GROUP BY block
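COUNT(amount) counts only rows where amount is not null, so the query reports the number of recorded transactions in each block. A Python equivalent over a list of event dictionaries (a hypothetical in-memory stand-in for the events table) makes that null handling explicit:

```python
from collections import Counter
from typing import Dict, Iterable


def transactions_per_block(events: Iterable[dict]) -> Dict[int, int]:
    """Count events per block, skipping rows whose amount is null, as COUNT(amount) does."""
    counts = Counter(e["block"] for e in events if e.get("amount") is not None)
    return dict(sorted(counts.items()))


if __name__ == "__main__":
    sample = [  # hypothetical rows from the events table
        {"block": 9201, "amount": 5},
        {"block": 9201, "amount": None},
        {"block": 9202, "amount": 3},
    ]
    print(transactions_per_block(sample))  # {9201: 1, 9202: 1}
```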

This query yields results similar to the following:

Find the winning bid for each auction

SELECT DISTINCT t.auction, t.amount
    FROM events t
        INNER JOIN (SELECT auction, MAX(amount) AS maxamount
                        FROM events
                        GROUP BY auction) q
            ON t.auction = q.auction
                AND t.amount = q.maxamount

This query yields a set of results such as the following:

The results show each auction that you’ve created on the blockchain and the resulting highest bid.
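The self-join keeps rows whose amount equals the auction's maximum, and DISTINCT collapses ties, so the query returns one (auction, highest amount) pair per auction; `SELECT auction, MAX(amount) FROM events GROUP BY auction` yields the same pairs. A minimal Python sketch of the same computation, over hypothetical event dictionaries:

```python
from typing import Dict, Iterable


def winning_bids(events: Iterable[dict]) -> Dict[str, int]:
    """Highest bid amount per auction, like MAX(amount) grouped by auction."""
    best: Dict[str, int] = {}
    for e in events:
        auction, amount = e["auction"], e["amount"]
        if auction not in best or amount > best[auction]:
            best[auction] = amount
    return best


if __name__ == "__main__":
    sample = [  # hypothetical rows
        {"auction": "a", "amount": 3},
        {"auction": "a", "amount": 7},
        {"auction": "b", "amount": 2},
    ]
    print(winning_bids(sample))  # {'a': 7, 'b': 2}
```

If you also wanted the winning bidder, you would need a rule for choosing among tied bids, which neither the query nor this sketch defines.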

Visualize queries with Amazon QuickSight

Instead of querying data in plain SQL, it is often beneficial to have a graphical representation of your analysis. You can do this with Amazon QuickSight, which can use Athena as a data source. As a result, with little effort we can build a dashboard solution on top of what we’ve already built. We’re going to use Amazon QuickSight to visualize data stored in Amazon S3, via Athena.

In Amazon QuickSight, we can create a new data source and use the Athena database and table that we created earlier.

To create a new data source

  1. Open the Amazon QuickSight console, then choose New Dataset.
  2. From the list of data sources, choose Athena, then name your data source.
  3. Choose the database and table in Athena that you created earlier.
  4. Import the data into SPICE, the in-memory calculation engine in Amazon QuickSight. SPICE enables faster querying and visualization without going directly to the source data each time. For more information about SPICE, see the Amazon QuickSight documentation.
  5. Choose Visualize to start investigating the data.

With Amazon QuickSight, we can visualize the behavior of our simulated blockchain users. We'll choose Amount as our measure and Auction as our dimension from the QuickSight side pane. This shows how much Ether has been bid in each auction. Doing so yields results similar to the following:

The amount depends on the number of times you ran the ExecuteTransactions function.

If we visualize MaxBidder as a pie chart, we can see which blockchain address (user) is most often our highest bidder. This looks like the following:

This sort of information can be challenging to obtain from within a blockchain-based application. But in Amazon QuickSight, with our analytics pipeline, getting the information can be easier.

Finally, we can look at the mining time in Amazon QuickSight by choosing EventTimestamp as the x-axis, choosing Block as the y-axis, and using the minimum aggregate function. This produces a line graph that resembles the following:

The graph shows that we start at around block 9200 and that mining proceeds at a steady rate, roughly consistent with a block time of 15 to 20 seconds. Note that the timestamp is in Unix time.
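A rough number behind that slope can be computed directly: take the earliest timestamp at which each block appears, then divide the time elapsed between the first and last block by the number of blocks advanced. This Python sketch is an estimate only. Events are recorded only for blocks that contain contract transactions, and EventTimestamp is stored as a string in our table, so convert it to an integer first. The sample points are hypothetical.

```python
from typing import Dict, List, Tuple


def average_block_time(points: List[Tuple[int, int]]) -> float:
    """Estimate seconds per block from (unix_timestamp, block) pairs."""
    first_seen: Dict[int, int] = {}
    for ts, block in points:
        # Keep the earliest timestamp observed for each block.
        if block not in first_seen or ts < first_seen[block]:
            first_seen[block] = ts
    if len(first_seen) < 2:
        raise ValueError("need events from at least two blocks")
    lo, hi = min(first_seen), max(first_seen)
    return (first_seen[hi] - first_seen[lo]) / (hi - lo)


if __name__ == "__main__":
    sample = [(1000, 9200), (1005, 9200), (1018, 9201), (1060, 9204)]
    print(average_block_time(sample))  # 15.0
```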

This section has shown analysis that can be performed on a blockchain event to understand the behavior of both the blockchain and the smart contracts deployed to it. Using the same methodology, you can build your own analytics pipelines that perform useful analytics that shed light on your blockchain-backed applications.

Conclusion

Blockchain is an emerging technology with a great deal of potential. AWS analytics services provide a means to gain insight into blockchain applications that run over thousands of nodes and deal with millions of transactions. This allows developers to better understand the complexities of blockchain applications and aid in the creation of new applications. Moreover, the analytics portion can all be done without provisioning servers, reducing the need for managing infrastructure. This allows you to focus on building the blockchain applications that you want.

Important: Remember to destroy the stacks created by AWS CloudFormation. Also delete the resources you deployed, including the scheduled Lambda function that listens for blockchain events.


Additional Reading

If you found this post useful, be sure to check out Analyze Apache Parquet optimized data using 10 visualizations to try in Amazon QuickSight with sample data and Analyzing Bitcoin Data: AWS CloudFormation Support for AWS Glue.
About the Author

Dr. Jonathan Shapiro-Ward is an AWS Solutions Architect based in Toronto. He helps customers across Canada to build industry leading cloud solutions. He has a background in distributed systems and big data and holds a PhD from the University of St Andrews.

AWS Online Tech Talks – June 2018

Post Syndicated from Devin Watson original https://aws.amazon.com/blogs/aws/aws-online-tech-talks-june-2018/

Join us this month to learn about AWS services and solutions. New this month, we have a fireside chat with the GM of Amazon WorkSpaces and the second episode of our "How to re:Invent" series. We'll also cover best practices, deep dives, use cases, and more! Join us and register today!

Note – All sessions are free and in Pacific Time.

Tech talks featured this month:

Analytics & Big Data

June 18, 2018 | 11:00 AM – 11:45 AM PT – Get Started with Real-Time Streaming Data in Under 5 Minutes – Learn how to use Amazon Kinesis to capture, store, and analyze streaming data in real time, including IoT device data, VPC flow logs, and clickstream data.

June 20, 2018 | 11:00 AM – 11:45 AM PT – Insights For Everyone – Deploying Data across your Organization – Learn how to deploy data at scale using AWS Analytics and QuickSight's new reader role and usage-based pricing.

AWS re:Invent

June 13, 2018 | 05:00 PM – 05:30 PM PT – Episode 2: AWS re:Invent Breakout Content Secret Sauce – Hear from one of our own AWS content experts as we dive deep into the re:Invent content strategy and how we maintain a high bar.

Compute

June 25, 2018 | 01:00 PM – 01:45 PM PT – Accelerating Containerized Workloads with Amazon EC2 Spot Instances – Learn how to efficiently deploy containerized workloads and easily manage clusters at any scale at a fraction of the cost with Spot Instances.

June 26, 2018 | 01:00 PM – 01:45 PM PT – Ensuring Your Windows Server Workloads Are Well-Architected – Get the benefits, best practices, and tools for running your Microsoft workloads on AWS with a well-architected approach.

Containers

June 25, 2018 | 09:00 AM – 09:45 AM PT – Running Kubernetes on AWS – Learn about the basics of running Kubernetes on AWS, including how to set up masters, networking, and security, and how to add auto scaling to your cluster.

Databases

June 18, 2018 | 01:00 PM – 01:45 PM PT – Oracle to Amazon Aurora Migration, Step by Step – Learn how to migrate your Oracle database to Amazon Aurora.

DevOps

June 20, 2018 | 09:00 AM – 09:45 AM PT – Set Up a CI/CD Pipeline for Deploying Containers Using the AWS Developer Tools – Learn how to set up a CI/CD pipeline for deploying containers using the AWS Developer Tools.

Enterprise & Hybrid

June 18, 2018 | 09:00 AM – 09:45 AM PT – De-risking Enterprise Migration with AWS Managed Services – Learn how enterprise customers are de-risking cloud adoption with AWS Managed Services.

June 19, 2018 | 11:00 AM – 11:45 AM PT – Launch AWS Faster using Automated Landing Zones – Learn how the AWS Landing Zone can automate the setup of best practice baselines when setting up new AWS environments.

June 21, 2018 | 11:00 AM – 11:45 AM PT – Leading Your Team Through a Cloud Transformation – Learn how you can help lead your organization through a cloud transformation.

June 21, 2018 | 01:00 PM – 01:45 PM PT – Enabling New Retail Customer Experiences with Big Data – Learn how AWS can help retailers realize actual value from their big data and deliver on differentiated retail customer experiences.

June 28, 2018 | 01:00 PM – 01:45 PM PT – Fireside Chat: End User Collaboration on AWS – Learn how End User Compute services can help you deliver access to desktops and applications anywhere, anytime, using any device.

IoT

June 27, 2018 | 11:00 AM – 11:45 AM PT – AWS IoT in the Connected Home – Learn how to use AWS IoT to build innovative Connected Home products.

Machine Learning

June 19, 2018 | 09:00 AM – 09:45 AM PT – Integrating Amazon SageMaker into your Enterprise – Learn how to integrate Amazon SageMaker and other AWS services within an enterprise environment.

June 21, 2018 | 09:00 AM – 09:45 AM PT – Building Text Analytics Applications on AWS using Amazon Comprehend – Learn how you can unlock the value of your unstructured data with NLP-based text analytics.

Management Tools

June 20, 2018 | 01:00 PM – 01:45 PM PT – Optimizing Application Performance and Costs with Auto Scaling – Learn how selecting the right scaling option can help optimize application performance and costs.

Mobile

June 25, 2018 | 11:00 AM – 11:45 AM PT – Drive User Engagement with Amazon Pinpoint – Learn how Amazon Pinpoint simplifies and streamlines effective user engagement.

Security, Identity & Compliance

June 26, 2018 | 09:00 AM – 09:45 AM PT – Understanding AWS Secrets Manager – Learn how AWS Secrets Manager helps you rotate and manage access to secrets centrally.

June 28, 2018 | 09:00 AM – 09:45 AM PT – Using Amazon Inspector to Discover Potential Security Issues – See how Amazon Inspector can be used to discover security issues on your instances.

Serverless

June 19, 2018 | 01:00 PM – 01:45 PM PT – Productionize Serverless Application Building and Deployments with AWS SAM – Learn expert tips and techniques for building and deploying serverless applications at scale with AWS SAM.

Storage

June 26, 2018 | 11:00 AM – 11:45 AM PT – Deep Dive: Hybrid Cloud Storage with AWS Storage Gateway – Learn how you can reduce your on-premises infrastructure by using AWS Storage Gateway to connect your applications to the scalable and reliable AWS storage services.

June 27, 2018 | 01:00 PM – 01:45 PM PT – Changing the Game: Extending Compute Capabilities to the Edge – Discover how to change the game for IIoT and edge analytics applications with AWS Snowball Edge plus enhanced Compute instances.

June 28, 2018 | 11:00 AM – 11:45 AM PT – Big Data and Analytics Workloads on Amazon EFS – Get best practices and deployment advice for running big data and analytics workloads on Amazon EFS.