Tag Archives: Amazon Kinesis

New – AWS SAM Local (Beta) – Build and Test Serverless Applications Locally

Post Syndicated from Randall Hunt original https://aws.amazon.com/blogs/aws/new-aws-sam-local-beta-build-and-test-serverless-applications-locally/

Today we’re releasing a beta of a new tool, SAM Local, that makes it easy to build and test your serverless applications locally. In this post we’ll use SAM local to build, debug, and deploy a quick application that allows us to vote on tabs or spaces by curling an endpoint. AWS introduced Serverless Application Model (SAM) last year to make it easier for developers to deploy serverless applications. If you’re not already familiar with SAM my colleague Orr wrote a great post on how to use SAM that you can read in about 5 minutes. At it’s core, SAM is a powerful open source specification built on AWS CloudFormation that makes it easy to keep your serverless infrastructure as code – and they have the cutest mascot.

SAM Local takes all the good parts of SAM and brings them to your local machine.

There are a couple of ways to install SAM Local but the easiest is through NPM. A quick npm install -g aws-sam-local should get us going but if you want the latest version you can always install straight from the source: go get github.com/awslabs/aws-sam-local (this will create a binary named aws-sam-local, not sam).

I like to vote on things so let’s write a quick SAM application to vote on Spaces versus Tabs. We’ll use a very simple, but powerful, architecture of API Gateway fronting a Lambda function and we’ll store our results in DynamoDB. In the end a user should be able to curl our API curl https://SOMEURL/ -d '{"vote": "spaces"}' and get back the number of votes.

Let’s start by writing a simple SAM template.yaml:

AWSTemplateFormatVersion : '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Resources:
  VotesTable:
    Type: "AWS::Serverless::SimpleTable"
  VoteSpacesTabs:
    Type: "AWS::Serverless::Function"
    Properties:
      Runtime: python3.6
      Handler: lambda_function.lambda_handler
      Policies: AmazonDynamoDBFullAccess
      Environment:
        Variables:
          TABLE_NAME: !Ref VotesTable
      Events:
        Vote:
          Type: Api
          Properties:
            Path: /
            Method: post

So we create a [dynamo_i] table that we expose to our Lambda function through an environment variable called TABLE_NAME.

To test that this template is valid I’ll go ahead and call sam validate to make sure I haven’t fat-fingered anything. It returns Valid! so let’s go ahead and get to work on our Lambda function.

import os
import os
import json
import boto3
votes_table = boto3.resource('dynamodb').Table(os.getenv('TABLE_NAME'))

def lambda_handler(event, context):
    print(event)
    if event['httpMethod'] == 'GET':
        resp = votes_table.scan()
        return {'body': json.dumps({item['id']: int(item['votes']) for item in resp['Items']})}
    elif event['httpMethod'] == 'POST':
        try:
            body = json.loads(event['body'])
        except:
            return {'statusCode': 400, 'body': 'malformed json input'}
        if 'vote' not in body:
            return {'statusCode': 400, 'body': 'missing vote in request body'}
        if body['vote'] not in ['spaces', 'tabs']:
            return {'statusCode': 400, 'body': 'vote value must be "spaces" or "tabs"'}

        resp = votes_table.update_item(
            Key={'id': body['vote']},
            UpdateExpression='ADD votes :incr',
            ExpressionAttributeValues={':incr': 1},
            ReturnValues='ALL_NEW'
        )
        return {'body': "{} now has {} votes".format(body['vote'], resp['Attributes']['votes'])}

So let’s test this locally. I’ll need to create a real DynamoDB database to talk to and I’ll need to provide the name of that database through the enviornment variable TABLE_NAME. I could do that with an env.json file or I can just pass it on the command line. First, I can call:
$ echo '{"httpMethod": "POST", "body": "{\"vote\": \"spaces\"}"}' |\
TABLE_NAME="vote-spaces-tabs" sam local invoke "VoteSpacesTabs"

to test the Lambda – it returns the number of votes for spaces so theoritically everything is working. Typing all of that out is a pain so I could generate a sample event with sam local generate-event api and pass that in to the local invocation. Far easier than all of that is just running our API locally. Let’s do that: sam local start-api. Now I can curl my local endpoints to test everything out.
I’ll run the command: $ curl -d '{"vote": "tabs"}' http://127.0.0.1:3000/ and it returns: “tabs now has 12 votes”. Now, of course I did not write this function perfectly on my first try. I edited and saved several times. One of the benefits of hot-reloading is that as I change the function I don’t have to do any additional work to test the new function. This makes iterative development vastly easier.

Let’s say we don’t want to deal with accessing a real DynamoDB database over the network though. What are our options? Well we can download DynamoDB Local and launch it with java -Djava.library.path=./DynamoDBLocal_lib -jar DynamoDBLocal.jar -sharedDb. Then we can have our Lambda function use the AWS_SAM_LOCAL environment variable to make some decisions about how to behave. Let’s modify our function a bit:

import os
import json
import boto3
if os.getenv("AWS_SAM_LOCAL"):
    votes_table = boto3.resource(
        'dynamodb',
        endpoint_url="http://docker.for.mac.localhost:8000/"
    ).Table("spaces-tabs-votes")
else:
    votes_table = boto3.resource('dynamodb').Table(os.getenv('TABLE_NAME'))

Now we’re using a local endpoint to connect to our local database which makes working without wifi a little easier.

SAM local even supports interactive debugging! In Java and Node.js I can just pass the -d flag and a port to immediately enable the debugger. For Python I could use a library like import epdb; epdb.serve() and connect that way. Then we can call sam local invoke -d 8080 "VoteSpacesTabs" and our function will pause execution waiting for you to step through with the debugger.

Alright, I think we’ve got everything working so let’s deploy this!

First I’ll call the sam package command which is just an alias for aws cloudformation package and then I’ll use the result of that command to sam deploy.

$ sam package --template-file template.yaml --s3-bucket MYAWESOMEBUCKET --output-template-file package.yaml
Uploading to 144e47a4a08f8338faae894afe7563c3  90570 / 90570.0  (100.00%)
Successfully packaged artifacts and wrote output template to file package.yaml.
Execute the following command to deploy the packaged template
aws cloudformation deploy --template-file package.yaml --stack-name 
$ sam deploy --template-file package.yaml --stack-name VoteForSpaces --capabilities CAPABILITY_IAM
Waiting for changeset to be created..
Waiting for stack create/update to complete
Successfully created/updated stack - VoteForSpaces

Which brings us to our API:
.

I’m going to hop over into the production stage and add some rate limiting in case you guys start voting a lot – but otherwise we’ve taken our local work and deployed it to the cloud without much effort at all. I always enjoy it when things work on the first deploy!

You can vote now and watch the results live! http://spaces-or-tabs.s3-website-us-east-1.amazonaws.com/

We hope that SAM Local makes it easier for you to test, debug, and deploy your serverless apps. We have a CONTRIBUTING.md guide and we welcome pull requests. Please tweet at us to let us know what cool things you build. You can see our What’s New post here and the documentation is live here.

Randall

AWS Encryption SDK: How to Decide if Data Key Caching Is Right for Your Application

Post Syndicated from June Blender original https://aws.amazon.com/blogs/security/aws-encryption-sdk-how-to-decide-if-data-key-caching-is-right-for-your-application/

AWS KMS image

Today, the AWS Crypto Tools team introduced a new feature in the AWS Encryption SDK: data key caching. Data key caching lets you reuse the data keys that protect your data, instead of generating a new data key for each encryption operation.

Data key caching can reduce latency, improve throughput, reduce cost, and help you stay within service limits as your application scales. In particular, caching might help if your application is hitting the AWS Key Management Service (KMS) requests-per-second limit and raising the limit does not solve the problem.

However, these benefits come with some security tradeoffs. Encryption best practices generally discourage extensive reuse of data keys.

In this blog post, I explore those tradeoffs and provide information that can help you decide whether data key caching is a good strategy for your application. I also explain how data key caching is implemented in the AWS Encryption SDK and describe the security thresholds that you can set to limit the reuse of data keys. Finally, I provide some practical examples of using the security thresholds to meet cost, performance, and security goals.

Introducing data key caching

The AWS Encryption SDK is a client-side encryption library that makes it easier for you to implement cryptography best practices in your application. It includes secure default behavior for developers who are not encryption experts, while being flexible enough to work for the most experienced users.

In the AWS Encryption SDK, by default, you generate a new data key for each encryption operation. This is the most secure practice. However, in some applications, the overhead of generating a new data key for each operation is not acceptable.

Data key caching saves the plaintext and ciphertext of the data keys you use in a configurable cache. When you need a key to encrypt or decrypt data, you can reuse a data key from the cache instead of creating a new data key. You can create multiple data key caches and configure each one independently. Most importantly, the AWS Encryption SDK provides security thresholds that you can set to determine how much data key reuse you will allow.

To make data key caching easier to implement, the AWS Encryption SDK provides LocalCryptoMaterialsCache, an in-memory, least-recently-used cache with a configurable size. The SDK manages the cache for you, including adding store, search, and match logic to all encryption and decryption operations.

We recommend that you use LocalCryptoMaterialsCache as it is, but you can customize it, or substitute a compatible cache. However, you should never store plaintext data keys on disk.

The AWS Encryption SDK documentation includes sample code in Java and Python for an application that uses data key caching to encrypt data sent to and from Amazon Kinesis Streams.

Balance cost and security

Your decision to use data key caching should balance cost—in time, money, and resources—against security. In every consideration, though, the balance should favor your security requirements. As a rule, use the minimal caching required to achieve your cost and performance goals.

Before implementing data key caching, consider the details of your applications, your security requirements, and the cost and frequency of your encryption operations. In general, your application can benefit from data key caching if each operation is slow or expensive, or if you encrypt and decrypt data frequently. If the cost and speed of your encryption operations are already acceptable or can be improved by other means, do not use a data key cache.

Data key caching can be the right choice for your application if you have high encryption and decryption traffic. For example, if you are hitting your KMS requests-per-second limit, caching can help because you get some of your data keys from the cache instead of calling KMS for every request.

However, you can also create a case in the AWS Support Center to raise the KMS limit for your account. If raising the limit solves the problem, you do not need data key caching.

Configure caching thresholds for cost and security

In the AWS Encryption SDK, you can configure data key caching to allow just enough data key reuse to meet your cost and performance targets while conforming to the security requirements of your application. The SDK enforces the thresholds so that you can use them with any compatible cache.

The data key caching security thresholds apply to each cache entry. The AWS Encryption SDK will not use the data key from a cache entry that exceeds any of the thresholds that you set.

  • Maximum age (required): Set the lifetime of each cached key to be long enough to get cache hits, but short enough to limit exposure of a plaintext data key in memory to a specific time period.

You can use the maximum age threshold like a key rotation policy. Use it to limit the reuse of data keys and minimize exposure of cryptographic materials. You can also use it to evict data keys when the type or source of data that your application is processing changes.

  • Maximum messages encrypted (optional; default is 232 messages): Set the number of messages protected by each cached data key to be large enough to get value from reuse, but small enough to limit the number of messages that might potentially be exposed.

The AWS Encryption SDK only caches data keys that use an algorithm suite with a key derivation function. This technique avoids the cryptographic limits on the number of bytes encrypted with a single key. However, the more data that a key encrypts, the more data that is exposed if the data key is compromised.

Limiting the number of messages, rather than the number of bytes, is particularly useful if your application encrypts many messages of a similar size or when potential exposure must be limited to very few messages. This threshold is also useful when you want to reuse a data key for a particular type of message and know in advance how many messages of that type you have. You can also use an encryption context to select particular cached data keys for your encryption requests.

  • Maximum bytes encrypted (optional; default is 263 – 1): Set the bytes protected by each cached data key to be large enough to allow the reuse you need, but small enough to limit the amount of data encrypted under the same key.

Limiting the number of bytes, rather than the number of messages, is preferable when your application encrypts messages of widely varying size or when possibly exposing large amounts of data is much more of a concern than exposing smaller amounts of data.

In addition to these security thresholds, the LocalCryptoMaterialsCache in the AWS Encryption SDK lets you set its capacity, which is the maximum number of entries the cache can hold.

Use the capacity value to tune the performance of your LocalCryptoMaterialsCache. In general, use the smallest value that will achieve the performance improvements that your application requires. You might want to test with a very small cache of 5–10 entries and expand if necessary. You will need a slightly larger cache if you are using the cache for both encryption and decryption requests, or if you are using encryption contexts to select particular cache entries.

Consider these cache configuration examples

After you determine the security and performance requirements of your application, consider the cache security thresholds carefully and adjust them to meet your needs. There are no magic numbers for these thresholds: the ideal settings are specific to each application, its security and performance requirements, and budget. Use the minimal amount of caching necessary to get acceptable performance and cost.

The following examples show ways you can use the LocalCryptoMaterialsCache capacity setting and the security thresholds to help meet your security requirements:

  • Slow master key operations: If your master key processes only 100 transactions per second (TPS) but your application needs to process 1,000 TPS, you can meet your application requirements by allowing a maximum of 10 messages to be protected under each data key.
  • High frequency and volume: If your master key costs $0.01 per operation and you need to process a consistent 1,000 TPS while staying within a budget of $100,000 per month, allow a maximum of 275 messages for each cache entry.
  • Burst traffic: If your application’s processing bursts to 100 TPS for five seconds in each minute but is otherwise zero, and your master key costs $0.01 per operation, setting maximum messages to 3 can achieve significant savings. To prevent data keys from being reused across bursts (55 seconds), set the maximum age of each cached data key to 20 seconds.
  • Expensive master key operations: If your application uses a low-throughput encryption service that costs as much as $1.00 per operation, you might want to minimize the number of operations. To do so, create a cache that is large enough to contain the data keys you need. Then, set the byte and message limits high enough to allow reuse while conforming to your security requirements. For example, if your security requirements do not permit a data key to encrypt more than 10 GB of data, setting bytes processed to 10 GB still significantly minimizes operations and conforms to your security requirements.

Learn more about data key caching

To learn more about data key caching, including how to implement it, how to set the security thresholds, and details about the caching components, see Data Key Caching in the AWS Encryption SDK. Also, see the AWS Encryption SDKs for Java and Python as well as the Javadoc and Python documentation.

If you have comments about this blog post, submit them in the “Comments” section below. If you have questions, file an issue in the GitHub repos for the Encryption SDK in Java or Python, or start a new thread on the KMS forum.

– June

AWS CloudFormation Supports Amazon Kinesis Analytics Applications

Post Syndicated from Ryan Nienhuis original https://aws.amazon.com/blogs/big-data/aws-cloudformation-supports-amazon-kinesis-analytics-applications/

You can now provision and manage resources for Amazon Kinesis Analytics applications using AWS CloudFormation.  Kinesis Analytics is the easiest way to process streaming data in real time with standard SQL, without having to learn new programming languages or processing frameworks. Kinesis Analytics enables you to query streaming data or build entire streaming applications using SQL. Using the service, you gain actionable insights and can respond to your business and customer needs promptly.

Customers can create CloudFormation templates that easily create or update Kinesis Analytics applications. Typically, a template is used as a way to manage code across different environments, or to prototype a new streaming data solution quickly.

We have created two sample templates using past AWS Big Data Blog posts that referenced Kinesis Analytics.

For more information about the new feature, see the AWS Cloudformation User Guide.

 

AWS Hot Startups – July 2017

Post Syndicated from Tina Barr original https://aws.amazon.com/blogs/aws/aws-hot-startups-july-2017/

Welcome back to another month of Hot Startups! Every day, startups are creating innovative and exciting businesses, applications, and products around the world. Each month we feature a handful of startups doing cool things using AWS.

July is all about learning! These companies are focused on providing access to tools and resources to expand knowledge and skills in different ways.

This month’s startups:

  • CodeHS – provides fun and accessible computer science curriculum for middle and high schools.
  • Insight – offers intensive fellowships to grow technical talent in Data Science.
  • iTranslate – enables people to read, write, and speak in over 90 languages, anywhere in the world.

CodeHS (San Francisco, CA)

In 2012, Stanford students Zach Galant and Jeremy Keeshin were computer science majors and TAs for introductory classes when they noticed a trend among their peers. Many wished that they had been exposed to computer science earlier in life. In their senior year, Zach and Jeremy launched CodeHS to give middle and high schools the opportunity to provide a fun, accessible computer science education to students everywhere. CodeHS is a web-based curriculum pathway complete with teacher resources, lesson plans, and professional development opportunities. The curriculum is supplemented with time-saving teacher tools to help with lesson planning, grading and reviewing student code, and managing their classroom.

CodeHS aspires to empower all students to meaningfully impact the future, and believe that coding is becoming a new foundational skill, along with reading and writing, that allows students to further explore any interest or area of study. At the time CodeHS was founded in 2012, only 10% of high schools in America offered a computer science course. Zach and Jeremy set out to change that by providing a solution that made it easy for schools and districts to get started. With CodeHS, thousands of teachers have been trained and are teaching hundreds of thousands of students all over the world. To use CodeHS, all that’s needed is the internet and a web browser. Students can write and run their code online, and teachers can immediately see what the students are working on and how they are doing.

Amazon EC2, Amazon RDS, Amazon ElastiCache, Amazon CloudFront, and Amazon S3 make it possible for CodeHS to scale their site to meet the needs of schools all over the world. CodeHS also relies on AWS to compile and run student code in the browser, which is extremely important when teaching server-side languages like Java that powers the AP course. Since usage rises and falls based on school schedules, Amazon CloudWatch and ELBs are used to easily scale up when students are running code so they have a seamless experience.

Be sure to visit the CodeHS website, and to learn more about bringing computer science to your school, click here!

Insight (Palo Alto, CA)

Insight was founded in 2012 to create a new educational model, optimize hiring for data teams, and facilitate successful career transitions among data professionals. Over the last 5 years, Insight has kept ahead of market trends and launched a series of professional training fellowships including Data Science, Health Data Science, Data Engineering, and Artificial Intelligence. Finding individuals with the right skill set, background, and culture fit is a challenge for big companies and startups alike, and Insight is focused on developing top talent through intensive 7-week fellowships. To date, Insight has over 1,000 alumni at over 350 companies including Amazon, Google, Netflix, Twitter, and The New York Times.

The Data Engineering team at Insight is well-versed in the current ecosystem of open source tools and technologies and provides mentorship on the best practices in this space. The technical teams are continually working with external groups in a variety of data advisory and mentorship capacities, but the majority of Insight partners participate in professional sessions. Companies visit the Insight office to speak with fellows in an informal setting and provide details on the type of work they are doing and how their teams are growing. These sessions have proved invaluable as fellows experience a significantly better interview process and companies yield engaged and enthusiastic new team members.

An important aspect of Insight’s fellowships is the opportunity for hands-on work, focusing on everything from building big-data pipelines to contributing novel features to industry-standard open source efforts. Insight provides free AWS resources for all fellows to use, in addition to mentorships from the Data Engineering team. Fellows regularly utilize Amazon S3, Amazon EC2, Amazon Kinesis, Amazon EMR, AWS Lambda, Amazon Redshift, Amazon RDS, among other services. The experience with AWS gives fellows a solid skill set as they transition into the industry. Fellowships are currently being offered in Boston, New York, Seattle, and the Bay Area.

Check out the Insight blog for more information on trends in data infrastructure, artificial intelligence, and cutting-edge data products.

 

iTranslate (Austria)

When the App Store was introduced in 2008, the founders of iTranslate saw an opportunity to be part of something big. The group of four fully believed that the iPhone and apps were going to change the world, and together they brainstormed ideas for their own app. The combination of translation and mobile devices seemed a natural fit, and by 2009 iTranslate was born. iTranslate’s mission is to enable travelers, students, business professionals, employers, and medical staff to read, write, and speak in all languages, anywhere in the world. The app allows users to translate text, voice, websites and more into nearly 100 languages on various platforms. Today, iTranslate is the leading player for conversational translation and dictionary apps, with more than 60 million downloads and 6 million monthly active users.

iTranslate is breaking language barriers through disruptive technology and innovation, enabling people to translate in real time. The app has a variety of features designed to optimize productivity including offline translation, website and voice translation, and language auto detection. iTranslate also recently launched the world’s first ear translation device in collaboration with Bragi, a company focused on smart earphones. The Dash Pro allows people to communicate freely, while having a personal translator right in their ear.

iTranslate started using Amazon Polly soon after it was announced. CEO Alexander Marktl said, “As the leading translation and dictionary app, it is our mission at iTranslate to provide our users with the best possible tools to read, write, and speak in all languages across the globe. Amazon Polly provides us with the ability to efficiently produce and use high quality, natural sounding synthesized speech.” The stable and simple-to-use API, low latency, and free caching allow iTranslate to scale as they continue adding features to their app. Customers also enjoy the option to change speech rate and change between male and female voices. To assure quality, speed, and reliability of their products, iTranslate also uses Amazon EC2, Amazon S3, and Amazon Route 53.

To get started with iTranslate, visit their website here.

—–

Thanks for reading!

-Tina

New: Server-Side Encryption for Amazon Kinesis Streams

Post Syndicated from Tara Walker original https://aws.amazon.com/blogs/aws/new-server-side-encryption-for-amazon-kinesis-streams/

In this age of smart homes, big data, IoT devices, mobile phones, social networks, chatbots, and game consoles, streaming data scenarios are everywhere. Amazon Kinesis Streams enables you to build custom applications that can capture, process, analyze, and store terabytes of data per hour from thousands of streaming data sources. Since Amazon Kinesis Streams allows applications to process data concurrently from the same Kinesis stream, you can build parallel processing systems. For example, you can emit processed data to Amazon S3, perform complex analytics with Amazon Redshift, and even build robust, serverless streaming solutions using AWS Lambda.

Kinesis Streams enables several streaming use cases for consumers, and now we are making the service more effective for securing your data in motion by adding server-side encryption (SSE) support for Kinesis Streams. With this new Kinesis Streams feature, you can now enhance the security of your data and/or meet any regulatory and compliance requirements for any of your organization’s data streaming needs.
In fact, Kinesis Streams is now one of the AWS Services in Scope for the Payment Card Industry Data Security Standard (PCI DSS) compliance program. PCI DSS is a proprietary information security standard administered by the PCI Security Standards Council founded by key financial institutions. PCI DSS compliance applies to all entities that store, process, or transmit cardholder data and/or sensitive authentication data which includes service providers. You can request the PCI DSS Attestation of Compliance and Responsibility Summary using AWS Artifact. But the good news about compliance with Kinesis Streams doesn’t stop there. Kinesis Streams is now also FedRAMP compliant in AWS GovCloud. FedRAMP stands for Federal Risk and Authorization Management Program and is a U.S. government-wide program that delivers a standard approach to the security assessment, authorization, and continuous monitoring for cloud products and services. You can learn more about FedRAMP compliance with AWS Services here.

Now are you ready to get into the keys? Get it, instead of get into the weeds. Okay a little corny, but it was the best I could do. Coming back to discussing SSE for Kinesis Streams, let me explain the flow of server-side encryption with Kinesis.  Each data record and partition key put into a Kinesis Stream using the PutRecord or PutRecords API is encrypted using an AWS Key Management Service (KMS) master key. With the AWS Key Management Service (KMS) master key, Kinesis Streams uses the 256-bit Advanced Encryption Standard (AES-256 GCM algorithm) to add encryption to the incoming data.

In order to enable server-side encryption with Kinesis Streams for new or existing streams, you can use the Kinesis management console or leverage one of the available AWS SDKs.  Additionally, you can audit the history of your stream encryption, validate the encryption status of a certain stream in the Kinesis Streams console, or check that the PutRecord or GetRecord transactions are encrypted using the AWS CloudTrail service.

 

Walkthrough: Kinesis Streams Server-Side Encryption

Let’s do a quick walkthrough of server-side encryption with Kinesis Streams. First, I’ll go to the Amazon Kinesis console and select the Streams console option.

Once in the Kinesis Streams console, I can add server-side encryption to one of my existing Kinesis streams or opt to create a new Kinesis stream.  For this walkthrough, I’ll opt to quickly create a new Kinesis stream, therefore, I’ll select the Create Kinesis stream button.

I’ll name my stream, KinesisSSE-stream, and allocate one shard for my stream. Remember that the data capacity of your stream is calculated based upon the number of shards specified for the stream.  You can use the Estimate the number of shards you’ll need dropdown within the console or read more calculations to estimate the number of shards in a stream here.  To complete the creation of my stream, now I click the Create Kinesis stream button.

 

With my KinesisSSE-stream created, I will select it in the dashboard and choose the Actions dropdown and select the Details option.


On the Details page of the KinesisSSE-stream, there is now a Server-side encryption section.  In this section, I will select the Edit button.

 

 

Now I can enable server-side encryption for my stream with an AWS KMS master key, by selecting the Enabled radio button. Once selected I can choose which AWS KMS master key to use for the encryption of  data in KinesisSSE-stream. I can either select the KMS master key generated by the Kinesis service, (Default) aws/kinesis, or select one of my own KMS master keys that I have previously generated.  I’ll select the default master key and all that is left is for me to click the Save button.


That’s it!  As you can see from my screenshots below, after only about 20 seconds, server-side encryption was added to my Kinesis stream and now any incoming data into my stream will be encrypted.  One thing to note is server-side encryption only encrypts incoming data after encryption has been enabled. Preexisting data that is in a Kinesis stream prior to server-side encryption being enabled will remain unencrypted.

 

Summary

Kinesis Streams with Server-side encryption using AWS KMS keys makes it easy for you to automatically encrypt the streaming data coming into your  stream. You can start, stop, or update server-side encryption for any Kinesis stream using the AWS management console or the AWS SDK. To learn more about Kinesis Server-Side encryption, AWS Key Management Service, or about Kinesis Streams review the Amazon Kinesis getting started guide, the AWS Key Management Service developer guide, or the Amazon Kinesis product page.

 

Enjoy streaming.

Tara

Analyze OpenFDA Data in R with Amazon S3 and Amazon Athena

Post Syndicated from Ryan Hood original https://aws.amazon.com/blogs/big-data/analyze-openfda-data-in-r-with-amazon-s3-and-amazon-athena/

One of the great benefits of Amazon S3 is the ability to host, share, or consume public data sets. This provides transparency into data to which an external data scientist or developer might not normally have access. By exposing the data to the public, you can glean many insights that would have been difficult with a data silo.

The openFDA project creates easy access to the high value, high priority, and public access data of the Food and Drug Administration (FDA). The data has been formatted and documented in consumer-friendly standards. Critical data related to drugs, devices, and food has been harmonized and can easily be called by application developers and researchers via API calls. OpenFDA has published two whitepapers that drill into the technical underpinnings of the API infrastructure as well as how to properly analyze the data in R. In addition, FDA makes openFDA data available on S3 in raw format.

In this post, I show how to use S3, Amazon EMR, and Amazon Athena to analyze the drug adverse events dataset. A drug adverse event is an undesirable experience associated with the use of a drug, including serious drug side effects, product use errors, product quality programs, and therapeutic failures.

Data considerations

Keep in mind that this data does have limitations. In addition, in the United States, these adverse events are submitted to the FDA voluntarily from consumers so there may not be reports for all events that occurred. There is no certainty that the reported event was actually due to the product. The FDA does not require that a causal relationship between a product and event be proven, and reports do not always contain the detail necessary to evaluate an event. Because of this, there is no way to identify the true number of events. The important takeaway to all this is that the information contained in this data has not been verified to produce cause and effect relationships. Despite this disclaimer, many interesting insights and value can be derived from the data to accelerate drug safety research.

Data analysis using SQL

For application developers who want to perform targeted searching and lookups, the API endpoints provided by the openFDA project are “ready to go” for software integration using a standard API powered by Elasticsearch, NodeJS, and Docker. However, for data analysis purposes, it is often easier to work with the data using SQL and statistical packages that expect a SQL table structure. For large-scale analysis, APIs often have query limits, such as 5000 records per query. This can cause extra work for data scientists who want to analyze the full dataset instead of small subsets of data.

To address the concern of requiring all the data in a single dataset, the openFDA project released the full 100 GB of harmonized data files that back the openFDA project onto S3. Athena is an interactive query service that makes it easy to analyze data in S3 using standard SQL. It’s a quick and easy way to answer your questions about adverse events and aspirin that does not require you to spin up databases or servers.

While you could point tools directly at the openFDA S3 files, you can find greatly improved performance and use of the data by following some of the preparation steps later in this post.

Architecture

This post explains how to use the following architecture to take the raw data provided by openFDA, leverage several AWS services, and derive meaning from the underlying data.

Steps:

  1. Load the openFDA /drug/event dataset into Spark and convert it to gzip to allow for streaming.
  2. Transform the data in Spark and save the results as a Parquet file in S3.
  3. Query the S3 Parquet file with Athena.
  4. Perform visualization and analysis of the data in R and Python on Amazon EC2.

Optimizing public data sets: A primer on data preparation

Those who want to jump right into preparing the files for Athena may want to skip ahead to the next section.

Transforming, or pre-processing, files is a common task for using many public data sets. Before you jump into the specific steps for transforming the openFDA data files into a format optimized for Athena, I thought it would be worthwhile to provide a quick exploration on the problem.

Making a dataset in S3 efficiently accessible with minimal transformation for the end user has two key elements:

  1. Partitioning the data into objects that contain a complete part of the data (such as data created within a specific month).
  2. Using file formats that make it easy for applications to locate subsets of data (for example, gzip, Parquet, ORC, etc.).

With these two key elements in mind, you can now apply transformations to the openFDA adverse event data to prepare it for Athena. You might find the data techniques employed in this post to be applicable to many of the questions you might want to ask of the public data sets stored in Amazon S3.

Before you get started, I encourage those who are interested in doing deeper healthcare analysis on AWS to make sure that you first read the AWS HIPAA Compliance whitepaper. This covers the information necessary for processing and storing patient health information (PHI).

Also, the adverse event analysis shown for aspirin is strictly for demonstration purposes and should not be used for any real decision or taken as anything other than a demonstration of AWS capabilities. However, there have been robust case studies published that have explored a causal relationship between aspirin and adverse reactions using OpenFDA data. If you are seeking research on aspirin or its risks, visit organizations such as the Centers for Disease Control and Prevention (CDC) or the Institute of Medicine (IOM).

Preparing data for Athena

For this walkthrough, you will start with the FDA adverse events dataset, which is stored as JSON files within zip archives on S3. You then convert it to Parquet for analysis. Why do you need to convert it? The original data download is stored in objects that are partitioned by quarter.

Here is a small sample of what you find in the adverse events (/drugs/event) section of the openFDA website.

If you were looking for events that happened in a specific quarter, this is not a bad solution. For most other scenarios, such as looking across the full history of aspirin events, it requires you to access a lot of data that you won’t need. The zip file format is not ideal for using data in place because zip readers must have random access to the file, which means the data can’t be streamed. Additionally, the zip files contain large JSON objects.

To read the data in these JSON files, a streaming JSON decoder must be used or a computer with a significant amount of RAM must decode the JSON. Opening up these files for public consumption is a great start. However, you still prepare the data with a few lines of Spark code so that the JSON can be streamed.

Step 1:  Convert the file types

Using Apache Spark on EMR, you can extract all of the zip files and pull out the events from the JSON files. To do this, use the Scala code below to deflate the zip file and create a text file. In addition, compress the JSON files with gzip to improve Spark’s performance and reduce your overall storage footprint. The Scala code can be run in either the Spark Shell or in an Apache Zeppelin notebook on your EMR cluster.

If you are unfamiliar with either Apache Zeppelin or the Spark Shell, the following posts serve as great references:

 

import scala.io.Source
import java.util.zip.ZipInputStream
import org.apache.spark.input.PortableDataStream
import org.apache.hadoop.io.compress.GzipCodec

// Input Directory
val inputFile = "s3://download.open.fda.gov/drug/event/2015q4/*.json.zip";

// Output Directory
val outputDir = "s3://{YOUR OUTPUT BUCKET HERE}/output/2015q4/";

// Extract zip files from 
val zipFiles = sc.binaryFiles(inputFile);

// Process zip file to extract the json as text file and save it
// in the output directory 
val rdd = zipFiles.flatMap((file: (String, PortableDataStream)) => {
    val zipStream = new ZipInputStream(file.2.open)
    val entry = zipStream.getNextEntry
    val iter = Source.fromInputStream(zipStream).getLines
    iter
}).map(.replaceAll("\s+","")).saveAsTextFile(outputDir, classOf[GzipCodec])

Step 2:  Transform JSON into Parquet

With just a few more lines of Scala code, you can use Spark’s abstractions to convert the JSON into a Spark DataFrame and then export the data back to S3 in Parquet format.

Spark requires the JSON to be in JSON Lines format to be parsed correctly into a DataFrame.

// Output Parquet directory
val outputDir = "s3://{YOUR OUTPUT BUCKET NAME}/output/drugevents"
// Input json file
val inputJson = "s3://{YOUR OUTPUT BUCKET NAME}/output/2015q4/*”
// Load dataframe from json file multiline 
val df = spark.read.json(sc.wholeTextFiles(inputJson).values)
// Extract results from dataframe
val results = df.select("results")
// Save it to Parquet
results.write.parquet(outputDir)

Step 3:  Create an Athena table

With the data cleanly prepared and stored in S3 using the Parquet format, you can now place an Athena table on top of it to get a better understanding of the underlying data.

Because the openFDA data structure incorporates several layers of nesting, it can be a complex process to try to manually derive the underlying schema in a Hive-compatible format. To shorten this process, you can load the top row of the DataFrame from the previous step into a Hive table within Zeppelin and then extract the “create  table” statement from SparkSQL.

results.createOrReplaceTempView("data")

val top1 = spark.sql("select * from data tablesample(1 rows)")

top1.write.format("parquet").mode("overwrite").saveAsTable("drugevents")

val show_cmd = spark.sql("show create table drugevents”).show(1, false)

This returns a “create table” statement that you can almost paste directly into the Athena console. Make some small modifications (adding the word “external” and replacing “using with “stored as”), and then execute the code in the Athena query editor. The table is created.

For the openFDA data, the DDL returns all string fields, as the date format used in your dataset does not conform to the yyy-mm-dd hh:mm:ss[.f…] format required by Hive. For your analysis, the string format works appropriately but it would be possible to extend this code to use a Presto function to convert the strings into time stamps.

CREATE EXTERNAL TABLE  drugevents (
   companynumb  string, 
   safetyreportid  string, 
   safetyreportversion  string, 
   receiptdate  string, 
   patientagegroup  string, 
   patientdeathdate  string, 
   patientsex  string, 
   patientweight  string, 
   serious  string, 
   seriousnesscongenitalanomali  string, 
   seriousnessdeath  string, 
   seriousnessdisabling  string, 
   seriousnesshospitalization  string, 
   seriousnesslifethreatening  string, 
   seriousnessother  string, 
   actiondrug  string, 
   activesubstancename  string, 
   drugadditional  string, 
   drugadministrationroute  string, 
   drugcharacterization  string, 
   drugindication  string, 
   drugauthorizationnumb  string, 
   medicinalproduct  string, 
   drugdosageform  string, 
   drugdosagetext  string, 
   reactionoutcome  string, 
   reactionmeddrapt  string, 
   reactionmeddraversionpt  string)
STORED AS parquet
LOCATION
  's3://{YOUR TARGET BUCKET}/output/drugevents'

With the Athena table in place, you can start to explore the data by running ad hoc queries within Athena or doing more advanced statistical analysis in R.

Using SQL and R to analyze adverse events

Using the openFDA data with Athena makes it very easy to translate your questions into SQL code and perform quick analysis on the data. After you have prepared the data for Athena, you can begin to explore the relationship between aspirin and adverse drug events, as an example. One of the most common metrics to measure adverse drug events is the Proportional Reporting Ratio (PRR). It is defined as:

PRR = (m/n)/( (M-m)/(N-n) )
Where
m = #reports with drug and event
n = #reports with drug
M = #reports with event in database
N = #reports in database

Gastrointestinal haemorrhage has the highest PRR of any reaction to aspirin when viewed in aggregate. One question you may want to ask is how the PRR has trended on a yearly basis for gastrointestinal haemorrhage since 2005.

Using the following query in Athena, you can see the PRR trend of “GASTROINTESTINAL HAEMORRHAGE” reactions with “ASPIRIN” since 2005:

with drug_and_event as 
(select rpad(receiptdate, 4, 'NA') as receipt_year
    , reactionmeddrapt
    , count(distinct (concat(safetyreportid,receiptdate,reactionmeddrapt))) as reports_with_drug_and_event 
from fda.drugevents
where rpad(receiptdate,4,'NA') 
     between '2005' and '2015' 
     and medicinalproduct = 'ASPIRIN'
     and reactionmeddrapt= 'GASTROINTESTINAL HAEMORRHAGE'
group by reactionmeddrapt, rpad(receiptdate, 4, 'NA') 
), reports_with_drug as 
(
select rpad(receiptdate, 4, 'NA') as receipt_year
    , count(distinct (concat(safetyreportid,receiptdate,reactionmeddrapt))) as reports_with_drug 
 from fda.drugevents 
 where rpad(receiptdate,4,'NA') 
     between '2005' and '2015' 
     and medicinalproduct = 'ASPIRIN'
group by rpad(receiptdate, 4, 'NA') 
), reports_with_event as 
(
   select rpad(receiptdate, 4, 'NA') as receipt_year
    , count(distinct (concat(safetyreportid,receiptdate,reactionmeddrapt))) as reports_with_event 
   from fda.drugevents
   where rpad(receiptdate,4,'NA') 
     between '2005' and '2015' 
     and reactionmeddrapt= 'GASTROINTESTINAL HAEMORRHAGE'
   group by rpad(receiptdate, 4, 'NA')
), total_reports as 
(
   select rpad(receiptdate, 4, 'NA') as receipt_year
    , count(distinct (concat(safetyreportid,receiptdate,reactionmeddrapt))) as total_reports 
   from fda.drugevents
   where rpad(receiptdate,4,'NA') 
     between '2005' and '2015' 
   group by rpad(receiptdate, 4, 'NA')
)
select  drug_and_event.receipt_year, 
(1.0 * drug_and_event.reports_with_drug_and_event/reports_with_drug.reports_with_drug)/ (1.0 * (reports_with_event.reports_with_event- drug_and_event.reports_with_drug_and_event)/(total_reports.total_reports-reports_with_drug.reports_with_drug)) as prr
, drug_and_event.reports_with_drug_and_event
, reports_with_drug.reports_with_drug
, reports_with_event.reports_with_event
, total_reports.total_reports
from drug_and_event
    inner join reports_with_drug on  drug_and_event.receipt_year = reports_with_drug.receipt_year   
    inner join reports_with_event on  drug_and_event.receipt_year = reports_with_event.receipt_year
    inner join total_reports on  drug_and_event.receipt_year = total_reports.receipt_year
order by  drug_and_event.receipt_year


One nice feature of Athena is that you can quickly connect to it via R or any other tool that can use a JDBC driver to visualize the data and understand it more clearly.

With this quick R script that can be run in R Studio either locally or on an EC2 instance, you can create a visualization of the PRR and Reporting Odds Ratio (RoR) for “GASTROINTESTINAL HAEMORRHAGE” reactions from “ASPIRIN” since 2005 to better understand these trends.

# connect to ATHENA
conn <- dbConnect(drv, '<Your JDBC URL>',s3_staging_dir="<Your S3 Location>",user=Sys.getenv(c("USER_NAME"),password=Sys.getenv(c("USER_PASSWORD"))

# Declare Adverse Event
adverseEvent <- "'GASTROINTESTINAL HAEMORRHAGE'"

# Build SQL Blocks
sqlFirst <- "SELECT rpad(receiptdate, 4, 'NA') as receipt_year, count(DISTINCT safetyreportid) as event_count FROM fda.drugsflat WHERE rpad(receiptdate,4,'NA') between '2005' and '2015'"
sqlEnd <- "GROUP BY rpad(receiptdate, 4, 'NA') ORDER BY receipt_year"

# Extract Aspirin with adverse event counts
sql <- paste(sqlFirst,"AND medicinalproduct ='ASPIRIN' AND reactionmeddrapt=",adverseEvent, sqlEnd,sep=" ")
aspirinAdverseCount = dbGetQuery(conn,sql)

# Extract Aspirin counts
sql <- paste(sqlFirst,"AND medicinalproduct ='ASPIRIN'", sqlEnd,sep=" ")
aspirinCount = dbGetQuery(conn,sql)

# Extract adverse event counts
sql <- paste(sqlFirst,"AND reactionmeddrapt=",adverseEvent, sqlEnd,sep=" ")
adverseCount = dbGetQuery(conn,sql)

# All Drug Adverse event Counts
sql <- paste(sqlFirst, sqlEnd,sep=" ")
allDrugCount = dbGetQuery(conn,sql)

# Select correct rows
selAll =  allDrugCount$receipt_year == aspirinAdverseCount$receipt_year
selAspirin = aspirinCount$receipt_year == aspirinAdverseCount$receipt_year
selAdverse = adverseCount$receipt_year == aspirinAdverseCount$receipt_year

# Calculate Numbers
m <- c(aspirinAdverseCount$event_count)
n <- c(aspirinCount[selAspirin,2])
M <- c(adverseCount[selAdverse,2])
N <- c(allDrugCount[selAll,2])

# Calculate proptional reporting ratio
PRR = (m/n)/((M-m)/(N-n))

# Calculate reporting Odds Ratio
d = n-m
D = N-M
ROR = (m/d)/(M/D)

# Plot the PRR and ROR
g_range <- range(0, PRR,ROR)
g_range[2] <- g_range[2] + 3
yearLen = length(aspirinAdverseCount$receipt_year)
axis(1,1:yearLen,lab=ax)
plot(PRR, type="o", col="blue", ylim=g_range,axes=FALSE, ann=FALSE)
axis(1,1:yearLen,lab=ax)
axis(2, las=1, at=1*0:g_range[2])
box()
lines(ROR, type="o", pch=22, lty=2, col="red")

As you can see, the PRR and RoR have both remained fairly steady over this time range. With the R Script above, all you need to do is change the adverseEvent variable from GASTROINTESTINAL HAEMORRHAGE to another type of reaction to analyze and compare those trends.

Summary

In this walkthrough:

  • You used a Scala script on EMR to convert the openFDA zip files to gzip.
  • You then transformed the JSON blobs into flattened Parquet files using Spark on EMR.
  • You created an Athena DDL so that you could query these Parquet files residing in S3.
  • Finally, you pointed the R package at the Athena table to analyze the data without pulling it into a database or creating your own servers.

If you have questions or suggestions, please comment below.


Next Steps

Take your skills to the next level. Learn how to optimize Amazon S3 for an architecture commonly used to enable genomic data analysis. Also, be sure to read more about running R on Amazon Athena.

 

 

 

 

 


About the Authors

Ryan Hood is a Data Engineer for AWS. He works on big data projects leveraging the newest AWS offerings. In his spare time, he enjoys watching the Cubs win the World Series and attempting to Sous-vide anything he can find in his refrigerator.

 

 

Vikram Anand is a Data Engineer for AWS. He works on big data projects leveraging the newest AWS offerings. In his spare time, he enjoys playing soccer and watching the NFL & European Soccer leagues.

 

 

Dave Rocamora is a Solutions Architect at Amazon Web Services on the Open Data team. Dave is based in Seattle and when he is not opening data, he enjoys biking and drinking coffee outside.

 

 

 

 

AWS Adds 12 More Services to Its PCI DSS Compliance Program

Post Syndicated from Sara Duffer original https://aws.amazon.com/blogs/security/aws-adds-12-more-services-to-its-pci-dss-compliance-program/

Twelve more AWS services have obtained Payment Card Industry Data Security Standard (PCI DSS) compliance, giving you more options, flexibility, and functionality to process and store sensitive payment card data in the AWS Cloud. The services were audited by Coalfire to ensure that they meet strict PCI DSS standards.

The newly compliant AWS services are:

AWS now offers 42 services that meet PCI DSS standards, putting administrators in better control of their frameworks and making workloads more efficient and cost effective.

For more information about the AWS PCI DSS compliance program, see Compliance Resources, AWS Services in Scope by Compliance Program, and PCI DSS Compliance.

– Sara

Perform Near Real-time Analytics on Streaming Data with Amazon Kinesis and Amazon Elasticsearch Service

Post Syndicated from Tristan Li original https://aws.amazon.com/blogs/big-data/perform-near-real-time-analytics-on-streaming-data-with-amazon-kinesis-and-amazon-elasticsearch-service/

Nowadays, streaming data is seen and used everywhere—from social networks, to mobile and web applications, IoT devices, instrumentation in data centers, and many other sources. As the speed and volume of this type of data increases, the need to perform data analysis in real time with machine learning algorithms and extract a deeper understanding from the data becomes ever more important. For example, you might want a continuous monitoring system to detect sentiment changes in a social media feed so that you can react to the sentiment in near real time.

In this post, we use Amazon Kinesis Streams to collect and store streaming data. We then use Amazon Kinesis Analytics to process and analyze the streaming data continuously. Specifically, we use the Kinesis Analytics built-in RANDOM_CUT_FOREST function, a machine learning algorithm, to detect anomalies in the streaming data. Finally, we use Amazon Kinesis Firehose to export the anomalies data to Amazon Elasticsearch Service (Amazon ES). We then build a simple dashboard in the open source tool Kibana to visualize the result.

Solution overview

The following diagram depicts a high-level overview of this solution.

Amazon Kinesis Streams

You can use Amazon Kinesis Streams to build your own streaming application. This application can process and analyze streaming data by continuously capturing and storing terabytes of data per hour from hundreds of thousands of sources.

Amazon Kinesis Analytics

Kinesis Analytics provides an easy and familiar standard SQL language to analyze streaming data in real time. One of its most powerful features is that there are no new languages, processing frameworks, or complex machine learning algorithms that you need to learn.

Amazon Kinesis Firehose

Kinesis Firehose is the easiest way to load streaming data into AWS. It can capture, transform, and load streaming data into Amazon S3, Amazon Redshift, and Amazon Elasticsearch Service.

Amazon Elasticsearch Service

Amazon ES is a fully managed service that makes it easy to deploy, operate, and scale Elasticsearch for log analytics, full text search, application monitoring, and more.

Solution summary

The following is a quick walkthrough of the solution that’s presented in the diagram:

  1. IoT sensors send streaming data into Kinesis Streams. In this post, you use a Python script to simulate an IoT temperature sensor device that sends the streaming data.
  2. By using the built-in RANDOM_CUT_FOREST function in Kinesis Analytics, you can detect anomalies in real time with the sensor data that is stored in Kinesis Streams. RANDOM_CUT_FOREST is also an appropriate algorithm for many other kinds of anomaly-detection use cases—for example, the media sentiment example mentioned earlier in this post.
  3. The processed anomaly data is then loaded into the Kinesis Firehose delivery stream.
  4. By using the built-in integration that Kinesis Firehose has with Amazon ES, you can easily export the processed anomaly data into the service and visualize it with Kibana.

Implementation steps

The following sections walk through the implementation steps in detail.

Creating the delivery stream

  1. Open the Amazon Kinesis Streams console.
  2. Create a new Kinesis stream. Give it a name that indicates it’s for raw incoming stream data—for example, RawStreamData. For Number of shards, type 1.
  3. The Python code provided below simulates a streaming application, such as an IoT device, and generates random data and anomalies into a Kinesis stream. The code generates two temperature ranges, where the first range is the hypothetical sensor’s normal operating temperature range (10–20), and the second is the anomaly temperature range (100–120).Make sure to change the stream name on line 16 and 20 and the Region on line 6 to match your configuration. Alternatively, you can download the Amazon Kinesis Data Generator from this repository and use it to generate the data.
    import json
    import datetime
    import random
    import testdata
    from boto import kinesis
    
    kinesis = kinesis.connect_to_region("us-east-1")
    
    def getData(iotName, lowVal, highVal):
       data = {}
       data["iotName"] = iotName
       data["iotValue"] = random.randint(lowVal, highVal) 
       return data
    
    while 1:
       rnd = random.random()
       if (rnd < 0.01):
          data = json.dumps(getData("DemoSensor", 100, 120))  
          kinesis.put_record("RawStreamData", data, "DemoSensor")
          print '***************************** anomaly ************************* ' + data
       else:
          data = json.dumps(getData("DemoSensor", 10, 20))  
          kinesis.put_record("RawStreamData", data, "DemoSensor")
          print data

  4. Open the Amazon Elasticsearch Service console and create a new domain.
    1. Give the domain a unique name. In the Configure cluster screen, use the default settings.
    2. In the Set up access policy screen, in the Set the domain access policy list, choose Allow access to the domain from specific IP(s).
    3. Enter the public IP address of your computer.
      Note: If you’re working behind a proxy or firewall, see the “Use a proxy to simplify request signing” section in this AWS Database blog post to learn how to work with a proxy. For additional information about securing access to your Amazon ES domain, see How to Control Access to Your Amazon Elasticsearch Domain in the AWS Security Blog.
  5. After the Amazon ES domain is up and running, you can set up and configure Kinesis Firehose to export results to Amazon ES:
    1. Open the Amazon Kinesis Firehose console and choose Create Delivery Stream.
    2. In the Destination dropdown list, choose Amazon Elasticsearch Service.
    3. Type a stream name, and choose the Amazon ES domain that you created in Step 4.
    4. Provide an index name and ES type. In the S3 bucket dropdown list, choose Create New S3 bucket. Choose Next.
    5. In the configuration, change the Elasticsearch Buffer size to 1 MB and the Buffer interval to 60s. Use the default settings for all other fields. This shortens the time for the data to reach the ES cluster.
    6. Under IAM Role, choose Create/Update existing IAM role.
      The best practice is to create a new role every time. Otherwise, the console keeps adding policy documents to the same role. Eventually the size of the attached policies causes IAM to reject the role, but it does it in a non-obvious way, where the console basically quits functioning.
    7. Choose Next to move to the Review page.
  6. Review the configuration, and then choose Create Delivery Stream.
  7. Run the Python file for 1–2 minutes, and then press Ctrl+C to stop the execution. This loads some data into the stream for you to visualize in the next step.

Analyzing the data

Now it’s time to analyze the IoT streaming data using Amazon Kinesis Analytics.

  1. Open the Amazon Kinesis Analytics console and create a new application. Give the application a name, and then choose Create Application.
  2. On the next screen, choose Connect to a source. Choose the raw incoming data stream that you created earlier. (Note the stream name Source_SQL_STREAM_001 because you will need it later.)
  3. Use the default settings for everything else. When the schema discovery process is complete, it displays a success message with the formatted stream sample in a table as shown in the following screenshot. Review the data, and then choose Save and continue.
  4. Next, choose Go to SQL editor. When prompted, choose Yes, start application.
  5. Copy the following SQL code and paste it into the SQL editor window.
    CREATE OR REPLACE STREAM "TEMP_STREAM" (
       "iotName"        varchar (40),
       "iotValue"   integer,
       "ANOMALY_SCORE"  DOUBLE);
    -- Creates an output stream and defines a schema
    CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
       "iotName"       varchar(40),
       "iotValue"       integer,
       "ANOMALY_SCORE"  DOUBLE,
       "created" TimeStamp);
     
    -- Compute an anomaly score for each record in the source stream
    -- using Random Cut Forest
    CREATE OR REPLACE PUMP "STREAM_PUMP_1" AS INSERT INTO "TEMP_STREAM"
    SELECT STREAM "iotName", "iotValue", ANOMALY_SCORE FROM
      TABLE(RANDOM_CUT_FOREST(
        CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001")
      )
    );
    
    -- Sort records by descending anomaly score, insert into output stream
    CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT STREAM "iotName", "iotValue", ANOMALY_SCORE, ROWTIME FROM "TEMP_STREAM"
    ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;

 

  1. Choose Save and run SQL.
    As the application is running, it displays the results as stream data arrives. If you don’t see any data coming in, run the Python script again to generate some fresh data. When there is data, it appears in a grid as shown in the following screenshot.Note that you are selecting data from the source stream name Source_SQL_STREAM_001 that you created previously. Also note the ANOMALY_SCORE column. This is the value that the Random_Cut_Forest function calculates based on the temperature ranges provided by the Python script. Higher (anomaly) temperature ranges have a higher score.Looking at the SQL code, note that the first two blocks of code create two new streams to store temporary data and the final result. The third block of code analyzes the raw source data (Stream_Pump_1) using the Random_Cut_Forest function. It calculates an anomaly score (ANOMALY_SCORE) and inserts it into the TEMP_STREAM stream. The final code block loads the result stored in the TEMP_STREAM into DESTINATION_SQL_STREAM.
  2. Choose Exit (done editing) next to the Save and run SQL button to return to the application configuration page.

Load processed data into the Kinesis Firehose delivery stream

Now, you can export the result from DESTINATION_SQL_STREAM into the Amazon Kinesis Firehose stream that you created previously.

  1. On the application configuration page, choose Connect to a destination.
  2. Choose the stream name that you created earlier, and use the default settings for everything else. Then choose Save and Continue.
  3. On the application configuration page, choose Exit to Kinesis Analytics applications to return to the Amazon Kinesis Analytics console.
  4. Run the Python script again for 4–5 minutes to generate enough data to flow through Amazon Kinesis Streams, Kinesis Analytics, Kinesis Firehose, and finally into the Amazon ES domain.
  5. Open the Kinesis Firehose console, choose the stream, and then choose the Monitoring
  6. As the processed data flows into Kinesis Firehose and Amazon ES, the metrics appear on the Delivery Stream metrics page. Keep in mind that the metrics page takes a few minutes to refresh with the latest data.
  7. Open the Amazon Elasticsearch Service dashboard in the AWS Management Console. The count in the Searchable documents column increases as shown in the following screenshot. In addition, the domain shows a cluster health of Yellow. This is because, by default, it needs two instances to deploy redundant copies of the index. To fix this, you can deploy two instances instead of one.

Visualize the data using Kibana

Now it’s time to launch Kibana and visualize the data.

  1. Use the ES domain link to go to the cluster detail page, and then choose the Kibana link as shown in the following screenshot.

    If you’re working behind a proxy or firewall, see the “Use a proxy to simplify request signing” section in this blog post to learn how to work with a proxy.
  2. In the Kibana dashboard, choose the Discover tab to perform a query.
  3. You can also visualize the data using the different types of charts offered by Kibana. For example, by going to the Visualize tab, you can quickly create a split bar chart that aggregates by ANOMALY_SCORE per minute.


Conclusion

In this post, you learned how to use Amazon Kinesis to collect, process, and analyze real-time streaming data, and then export the results to Amazon ES for analysis and visualization with Kibana. If you have comments about this post, add them to the “Comments” section below. If you have questions or issues with implementing this solution, please open a new thread on the Amazon Kinesis or Amazon ES discussion forums.


Next Steps

Take your skills to the next level. Learn real-time clickstream anomaly detection with Amazon Kinesis Analytics.

 


About the Author

Tristan Li is a Solutions Architect with Amazon Web Services. He works with enterprise customers in the US, helping them adopt cloud technology to build scalable and secure solutions on AWS.

 

 

 

 

Under the Hood of Server-Side Encryption for Amazon Kinesis Streams

Post Syndicated from Damian Wylie original https://aws.amazon.com/blogs/big-data/under-the-hood-of-server-side-encryption-for-amazon-kinesis-streams/

Customers are using Amazon Kinesis Streams to ingest, process, and deliver data in real time from millions of devices or applications. Use cases for Kinesis Streams vary, but a few common ones include IoT data ingestion and analytics, log processing, clickstream analytics, and enterprise data bus architectures.

Within milliseconds of data arrival, applications (KCL, Apache Spark, AWS Lambda, Amazon Kinesis Analytics) attached to a stream are continuously mining value or delivering data to downstream destinations. Customers are then scaling their streams elastically to match demand. They pay incrementally for the resources that they need, while taking advantage of a fully managed, serverless streaming data service that allows them to focus on adding value closer to their customers.

These benefits are great; however, AWS learned that many customers could not take advantage of Kinesis Streams unless their data-at-rest within a stream was encrypted. Many customers did not want to manage encryption on their own, so they asked for a fully managed, automatic, server-side encryption mechanism leveraging centralized AWS Key Management Service (AWS KMS) customer master keys (CMK).

Motivated by this feedback, AWS added another fully managed, low cost aspect to Kinesis Streams by delivering server-side encryption via KMS managed encryption keys (SSE-KMS) in the following regions:

  • US East (N. Virginia)
  • US West (Oregon)
  • US West (N. California)
  • EU (Ireland)
  • Asia Pacific (Singapore)
  • Asia Pacific (Tokyo)

In this post, I cover the mechanics of the Kinesis Streams server-side encryption feature. I also share a few best practices and considerations so that you can get started quickly.

Understanding the mechanics

The following section walks you through how Kinesis Streams uses CMKs to encrypt a message in the PutRecord or PutRecords path before it is propagated to the Kinesis Streams storage layer, and then decrypt it in the GetRecords path after it has been retrieved from the storage layer.

When server-side encryption is enabled—which takes just a few clicks in the console—the partition key and payload for every incoming record is encrypted automatically as it’s flowing into Kinesis Streams, using the selected CMK. When data is at rest within a stream, it’s encrypted.

When records are retrieved through a GetRecords request from the encrypted stream, they are decrypted automatically as they are flowing out of the service. That means your Kinesis Streams producers and consumers do not need to be aware of encryption. You have a fully managed data encryption feature at your fingertips, which can be enabled within seconds.

AWS also makes it easy to audit the application of server-side encryption. You can use the AWS Management Console for instant stream-level verification; the responses from PutRecord, PutRecords, and getRecords; or AWS CloudTrail.

Calling PutRecord or PutRecords

When server-side encryption is enabled for a particular stream, Kinesis Streams and KMS perform the following actions when your applications call PutRecord or PutRecords on a stream with server-side encryption enabled. The Amazon Kinesis Producer Library (KPL) uses PutRecords.

 

  1. Data is sent from a customer’s producer (client) to a Kinesis stream using TLS via HTTPS. Data in transit to a stream is encrypted by default.
  2. After data is received, it is momentarily stored in RAM within a front-end proxy layer.
  3. Kinesis Streams authenticates the producer, then impersonates the producer to request input keying material from KMS.
  4. KMS creates key material, encrypts it by using CMK, and sends both the plaintext and encrypted key material to the service, encrypted with TLS.
  5. The client uses the plaintext key material to derive data encryption keys (data keys) that are unique per-record.
  6. The client encrypts the payload and partition key using the data key in RAM within the front-end proxy layer and removes the plaintext data key from memory.
  7. The client appends the encrypted key material to the encrypted data.
  8. The plaintext key material is securely cached in memory within the front-end layer for reuse, until it expires after 5 minutes.
  9. The client delivers the encrypted message to a back-end store where it is stored at rest and fetchable by an authorized consumer through a GetRecords The Amazon Kinesis Client Library (KCL) calls GetRecords to retrieve records from a stream.

Calling getRecords

Kinesis Streams and KMS perform the following actions when your applications call GetRecords on a server-side encrypted stream.

 

  1. When a GeRecords call is made, the front-end proxy layer retrieves the encrypted record from its back-end store.
  2. The consumer (client) makes a request to KMS using a token generated by the customer’s request. KMS authorizes it.
  3. The client requests that KMS decrypt the encrypted key material.
  4. KMS decrypts the encrypted key material and sends the plaintext key material to the client.
  5. Kinesis Streams derives the per-record data keys from the decrypted key material.
  6. If the calling application is authorized, the client decrypts the payload and removes the plaintext data key from memory.
  7. The client delivers the payload over TLS and HTTPS to the consumer, requesting the records. Data in transit to a consumer is encrypted by default.

Verifying server-side encryption

Auditors or administrators often ask for proof that server-side encryption was or is enabled. Here are a few ways to do this.

To check if encryption is enabled now for your streams:

  • Use the AWS Management Console or the DescribeStream API operation. You can also see what CMK is being used for encryption.
  • See encryption in action by looking at responses from PutRecord, PutRecords, or GetRecords When encryption is enabled, the encryptionType parameter is set to “KMS”. If encryption is not enabled, encryptionType is not included in the response.

Sample PutRecord response

{
    "SequenceNumber": "49573959617140871741560010162505906306417380215064887298",
    "ShardId": "shardId-000000000000",
    "EncryptionType": "KMS"
}

Sample GetRecords response

{
    "Records": [
        {
            "Data": "aGVsbG8gd29ybGQ=", 
            "PartitionKey": "test", 
            "ApproximateArrivalTimestamp": 1498292565.825, 
            "EncryptionType": "KMS", 
            "SequenceNumber": "495735762417140871741560010162505906306417380215064887298"
        }, 
        {
            "Data": "ZnJvZG8gbGl2ZXMK", 
            "PartitionKey": "3d0d9301-3c30-4c48-a9a8-e485b2982b28", 
            "ApproximateArrivalTimestamp": 1498292801.747, 
            "EncryptionType": "KMS", 
            "SequenceNumber": "49573959617140871741560010162507115232237011062036103170"
        }
    ], 
    "NextShardIterator": "AAAAAAAAAAEvFypHZDx/4bJVAS34puwdiNcwssKqbh/XhRK7HSYRq3RS+YXJnVKJ8j0gQUt94bONdqQYHk9X9JHgefMUDKzDzndy5WbZWO4CS3hRdMdrbmJ/9KoR4lOfZvqTLt6JWQjDqXv0IaKs06/LHYcEA3oPcyQLOTJHdJl2EzplCTZnn/U295ovxvqF9g9DY8y2nVoMkdFLmdcEMVXjhCDKiRIt", 
    "MillisBehindLatest": 0
}

To check if encryption was enabled, use CloudTrail, which logs the StartStreamEncryption() and StopStreamEncryption() API calls made against a particular stream.

Getting started

It’s very easy to enable, disable, or modify server-side encryption for a particular stream.

  1. In the Kinesis Streams console, select a stream and choose Details.
  2. Select a CMK and select Enabled.
  3. Choose Save.

You can enable encryption only for a live stream, not upon stream creation.  Follow the same process to disable a stream. To use a different CMK, select it and choose Save.

Each of these tasks can also be accomplished using the StartStreamEncryption and StopStreamEncryption API operations.

Considerations

There are a few considerations you should be aware of when using server-side encryption for Kinesis Streams:

  • Permissions
  • Costs
  • Performance

Permissions

One benefit of using the “(Default) aws/kinesis” AWS managed key is that every producer and consumer with permissions to call PutRecord, PutRecords, or GetRecords inherits the right permissions over the “(Default) aws/kinesis” key automatically.

However, this is not necessarily the same case for a CMK. Kinesis Streams producers and consumers do not need to be aware of encryption. However, if you enable encryption using a custom master key but a producer or consumer doesn’t have IAM permissions to use it, PutRecord, PutRecords, or GetRecords requests fail.

This is a great security feature. On the other hand, it can effectively lead to data loss if you inadvertently apply a custom master key that restricts producers and consumers from interacting from the Kinesis stream. Take precautions when applying a custom master key. For more information about the minimum IAM permissions required for producers and consumers interacting with an encrypted stream, see Using Server-Side Encryption.

Costs

When you apply server-side encryption, you are subject to KMS API usage and key costs. Unlike custom KMS master keys, the “(Default) aws/kinesis” CMK is offered free of charge. However, you still need to pay for the API usage costs that Kinesis Streams incurs on your behalf.

API usage costs apply for every CMK, including custom ones. Kinesis Streams calls KMS approximately every 5 minutes when it is rotating the data key. In a 30-day month, the total cost of KMS API calls initiated by a Kinesis stream should be less than a few dollars.

Performance

During testing, AWS discovered that there was a slight increase (typically 0.2 millisecond or less per record) with put and get record latencies due to the additional overhead of encryption.

If you have questions or suggestions, please comment below.

Analysis of Top-N DynamoDB Objects using Amazon Athena and Amazon QuickSight

Post Syndicated from Rendy Oka original https://aws.amazon.com/blogs/big-data/analysis-of-top-n-dynamodb-objects-using-amazon-athena-and-amazon-quicksight/

If you run an operation that continuously generates a large amount of data, you may want to know what kind of data is being inserted by your application. The ability to analyze data intake quickly can be very valuable for business units, such as operations and marketing. For many operations, it’s important to see what is driving the business at any particular moment. For retail companies, for example, understanding which products are currently popular can aid in planning for future growth. Similarly, for PR companies, understanding the impact of an advertising campaign can help them market their products more effectively.

This post covers an architecture that helps you analyze your streaming data. You’ll build a solution using Amazon DynamoDB Streams, AWS Lambda, Amazon Kinesis Firehose, and Amazon Athena to analyze data intake at a frequency that you choose. And because this is a serverless architecture, you can use all of the services here without the need to provision or manage servers.

The data source

You’ll collect a random sampling of tweets via Twitter’s API and store a variety of attributes in your DynamoDB table, such as: Twitter handle, tweet ID, hashtags, location, and Time-To-Live (TTL) value.

In DynamoDB, the primary key is used as an input to an internal hash function. The output from this function determines the partition in which the data will be stored. When using a combination of primary key and sort key as a DynamoDB schema, you need to make sure that no single partition key contains many more objects than the other partition keys because this can cause partition level throttling. For the demonstration in this blog, the Twitter handle will be the primary key and the tweet ID will be the sort key. This allows you to group and sort tweets from each user.

To help you get started, I have written a script that pulls a live Twitter stream that you can use to generate your data. All you need to do is provide your own Twitter Apps credentials, and it should generate the data immediately. Alternatively, I have also provided a script that you can use to generate random Tweets with little effort.

You can find both scripts in the Github repository:

https://github.com/awslabs/aws-blog-dynamodb-analysis

There are some modules that you may need to install to run these scripts. You can find them in Python’s module repository:

To get your own Twitter credentials, go to https://www.twitter.com/ and sign up for a free account, if you don’t already have one. After your account is set up, go to https://apps.twitter.com/. On the main landing page, choose the Create New App button. After the application is created, go to Keys and Access Tokens to get your credentials to use the Twitter API. You’ll need to generate Customer Tokens/Secret and Access Token/Secret. All four keys will be used to authenticate your request.

Architecture overview

Before we begin, let’s take a look at the overall flow of information will look like, from data ingestion into DynamoDB to visualization of results in Amazon QuickSight.

As illustrated in the architecture diagram above, any changes made to the items in DynamoDB will be captured and processed using DynamoDB Streams. Next, a Lambda function will be invoked by a trigger that is configured to respond to events in DynamoDB Streams. The Lambda function processes the data prior to pushing to Amazon Kinesis Firehose, which will output to Amazon S3. Finally, you use Amazon Athena to analyze the streaming data landing in Amazon S3. The result can be explored and visualized in Amazon QuickSight for your company’s business analytics.

You’ll need to implement your custom Lambda function to help transform the raw <key, value> data stored in DynamoDB to a JSON format for Athena to digest, but I can help you with a sample code that you are free to modify.

Implementation

In the following sections, I’ll walk through how you can set up the architecture discussed earlier.

Create your DynamoDB table

First, let’s create a DynamoDB table and enable DynamoDB Streams. This will enable data to be copied out of this table. From the console, use the user_id as the partition key and tweet_id as the sort key:

After the table is ready, you can enable DynamoDB Streams. This process operates asynchronously, so there is no performance impact on the table when you enable this feature. The easiest way to manage DynamoDB Streams is also through the DynamoDB console.

In the Overview tab of your newly created table, click Manage Stream. In the window, choose the information that will be written to the stream whenever data in the table is added or modified. In this example, you can choose either New image or New and old images.

For more details on this process, check out our documentation:

http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html

Configure Kinesis Firehose

Before creating the Lambda function, you need to configure Kinesis Firehose delivery stream so that it’s ready to accept data from Lambda. Open the Firehose console and choose Create Firehose Delivery Stream. From here, choose S3 as the destination and use the following to information to configure the resource. Note the Delivery stream name because you will use it in the next step.

For more details on this process, check out our documentation:

http://docs.aws.amazon.com/firehose/latest/dev/basic-create.html#console-to-s3

Create your Lambda function

Now that Kinesis Firehose is ready to accept data, you can create your Lambda function.

From the AWS Lambda console, choose the Create a Lambda function button and use the Blank Function. Enter a name and description, and choose Python 2.7 as the Runtime. Note your Lambda function name because you’ll need it in the next step.

In the Lambda function code field, you can paste the script that I have written for this purpose. All this function needs is the name of your Firehose stream name set as an environment variable.

import boto3
import json
import os

# Initiate Firehose client
firehose_client = boto3.client('firehose')

def lambda_handler(event, context):
    records = []
    batch   = []
    try :
        for record in event['Records']:
            tweet = {}
            t_stats = '{ "table_name":"%s", "user_id":"%s", "tweet_id":"%s", "approx_post_time":"%d" }\n' \
                      % ( record['eventSourceARN'].split('/')[1], \
                          record['dynamodb']['Keys']['user_id']['S'], \
                          record['dynamodb']['Keys']['tweet_id']['N'], \
                          int(record['dynamodb']['ApproximateCreationDateTime']) )
            tweet["Data"] = t_stats
            records.append(tweet)
        batch.append(records)
        res = firehose_client.put_record_batch(
            DeliveryStreamName = os.environ['firehose_stream_name'],
            Records = batch[0]
        )
        return 'Successfully processed {} records.'.format(len(event['Records']))
    except Exception :
        pass

The handler should be set to lambda_function.lambda_handler and you can use the existing lambda_dynamodb_streams role that’s been created by default.

Enable DynamoDB trigger and start collecting data

Everything is ready to go. Open your table using the DynamoDB console and go to the Triggers tab. Select the Create trigger drop down list and choose Existing Lambda function. In the pop-up window, select the function that you just created, and choose the Create button.

At this point, you can start collecting data with the Python script that I’ve provided. The first one will create a script that will pull public Twitter data and the other will generate fake tweets using Lorem Ipsum text.

Configure Amazon Athena to read the data

Next, you will configure Amazon Athena so that it can read the data Kinesis Firehose outputs to Amazon S3 and allow you to analyze the data as needed. You can connect to Athena directly from the Athena console, and you can establish a connection using JDBC or the Athena API. In this example, I’m going to demonstrate what this looks like on the Athena console.

First, create a new database and a new table. You can do this by running the following two queries. The first query creates a new database:

CREATE DATABASE IF NOT EXISTS ddbtablestats

And the second query creates a new table:

CREATE EXTERNAL TABLE IF NOT EXISTS ddbtablestats.twitterfeed (
    `table_name` string,
    `user_id` string,
    `tweet_id` bigint,
    `approx_post_time` timestamp 
) PARTITIONED BY (
    year string,
    month string,
    day string,
    hour string 
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES ('serialization.format' = '1')
LOCATION 's3://myBucket/dynamodb/streams/transactions/'

Note that this table is created using partitions. Partitioning separates your data into logical parts based on certain criteria, such as date, location, language, etc. This allows Athena to selectively pull your data without needing to process the entire data set. This effectively minimizes the query execution time, and it also allows you to have greater control over the data that you want to query.

After the query has completed, you should be able to see the table in the left side pane of the Athena dashboard.

After the database and table have been created, execute the ALTER TABLE query to populate the partitions in your table. Replace the date with the current date when the script was executed.

ALTER TABLE ddbtablestats.TwitterFeed ADD IF NOT EXISTS
PARTITION (year='2017',month='05',day='17',hour='01') location 's3://myBucket/dynamodb/streams/transactions/2017/05/17/01/'

Using the Athena console, you’ll need to manually populate each partition for each additional partition that you’d like to analyze, however you can programmatically automate this process by using the JDBC driver or any AWS SDK of your choice.

For more information on partitioning in Athena, check out our documentation:

http://docs.aws.amazon.com/athena/latest/ug/partitions.html

Querying the data in Amazon Athena

This is it! Let’s run this query to see the top 10 most active Twitter users in the last 24 hours. You can do this from the Athena console:

SELECT user_id, COUNT(DISTINCT tweet_id) tweets FROM ddbTableStats.TwitterFeed
WHERE year='2017' AND month='05' AND day='17'
GROUP BY user_id
ORDER BY tweets DESC
LIMIT 10

The result should look similar to the following:

Linking Athena to Amazon QuickSight

Finally, to make this data available to a larger audience, let’s visualize this data in Amazon QuickSight. Amazon QuickSight provides native connectivity to AWS data sources such as Amazon Redshift, Amazon RDS, and Amazon Athena. Amazon QuickSight can also connect to on-premises databases, Excel, or CSV files, and it can connect to cloud data sources such as Salesforce.com. For this solution, we will connect Amazon QuickSight to the Athena table we just created.

Amazon QuickSight has a free tier that provides 1 user and 1GB of SPICE (Superfast Parallel In-memory Calculated Engine) capacity free. So you can sign up and use QuickSight free of charge.

When you are signing up for Amazon QuickSight, ensure that you grant permissions for QuickSight to connect to Athena and the S3 bucket where the data is stored.

After you’ve signed up, navigate to the new analysis button, and choose new data set, and then select the Athena data source option. Create a new name for your data source and proceed to the next prompt. At this point, you should see the Athena table you created earlier.

Choose the option to import the data to SPICE for a quicker analysis. SPICE is an in-memory optimized calculation engine that is designed for quick data visualization through parallel processing. SPICE also enables you to refresh your data sets at a regular interval or on-demand as you want.

In the dialog box, confirm this data set creation, and you’ll arrive on the landing page where you can start building your graph. The X-axis will represent the user_id and the Value will be used to represent the SUM total of the tweets from each user.

The Amazon QuickSight report looks like this:

Through this visualization, I can easily see that there are 3 users that tweeted over 20 times that day and that the majority of the users have fewer than 10 tweets that day. I can also set up a scheduled refresh of my SPICE dataset so that I have a dashboard that is regularly updated with the latest data.

Closing thoughts

Here are the benefits that you can gain from using this architecture:

  1. You can optimize the design of your DynamoDB schema that follows AWS best practice recommendations.
  1. You can run analysis and data intelligence in order to understand the current customer demands for your business.
  1. You can store incremental backup for future auditing.

The flexibility of our AWS services invites you to create and design the ideal workflow for your production at any scale, and, as always, if you ever need some guidance, don’t hesitate to reach out to us.I  hope this has been helpful to you! Please leave any questions and comments below.

 


Additional Reading

Learn how to analyze VPC Flow Logs with Amazon Kinesis Firehose, Amazon Athena, and Amazon QuickSight.


About the Author

Rendy Oka is a Big Data Support Engineer for Amazon Web Services. He provides consultations and architectural designs and partners with the TAMs, Solution Architects, and AWS product teams to help develop solutions for our customers. He is also a team lead for the big data support team in Seattle. Rendy has traveled to dozens of countries around the world and takes every opportunity to experience the local culture wherever he goes

 

 

 

 

New – Cross-Account Delivery of CloudWatch Events

Post Syndicated from Jeff Barr original https://aws.amazon.com/blogs/aws/new-cross-account-delivery-of-cloudwatch-events/

CloudWatch Events allow you to track and respond to changes in your AWS resources. You get a near real-time stream of events that you can route to one or more targets (AWS Lambda functions, Amazon Kinesis streams, Amazon SNS topics, and more) using rules. The events that are generated depend on the particular AWS service. For example, here are the events generated for EC2 instances:

Or for S3 (CloudTrail must be enabled in order to create rules that use these events):

See the CloudWatch Event Types list to see which services and events are available.

New Cross-Account Event Delivery
Our customers have asked us to extend CloudWatch Events to handle some interesting & powerful use cases that span multiple AWS accounts, and we are happy to oblige. Today we are adding support for controlled, cross-account delivery of CloudWatch Events. As you will see, you can now arrange to route events from one AWS account to another. As is the case with the existing event delivery model, you can use CloudWatch Events rules to specify which events you would like to send to another account.

Here are some of the use cases that have been shared with us:

Separation of Concerns – Customers would like to handle and respond to events in a separate account in order to implement advanced security schemes.

Rollup – Customers are using AWS Organizations and would like to track certain types of events across the entire organization, across a multitude of AWS accounts.

Each AWS account uses a resource event bus to distribute events. This object dates back to the introduction of CloudWatch Events, but has never been formally called out as such. AWS services, the PutEvents function, and other accounts can publish events to it.

The event bus (currently one per account, with plans to allow more in the future) now has an associated access policy. This policy specifies the set of AWS accounts that are allowed to send events to the bus. You can add one or more accounts, or you can specify that any account is allowed to send events.

You can create event distribution topologies that work on a fan-in or a fan-out basis. A fan-in model allows you to handle events from multiple accounts in one place. A fan-out model allows you to route different types of events to distinct locations and accounts.

In order to avoid the possibility of creating a loop, events that are sent from one account to another will not be sent to a third one. You should take this in to account when you are planning your cross-account implementation.

Using Cross-Account Event Delivery
In order to test this new feature, I made use of my work and my personal AWS accounts. I log in to my personal account and went to the CloudWatch Console. Then I select Event Buses, clicked on Add Permission, and enter the Account ID of my work account:

I can see all of my buses (just one is allowed right now) and permissions in one place:

Next, I log in to my work account and create a rule that will send events to the event bus in my personal account. In this case my personal account is interested in changes of state for EC2 instances running in my work account:

Back in my personal account, I create a rule that will fire on any EC2 event, targeting it at an SNS topic that is configured to send email:

After testing this rule with an EC2 instance launched in my personal account, I launch an instance in my work account and wait for the email message:

The account and resources fields in the message are from the source (work) account.

Things to Know
This functionality is available in all AWS Regions where CloudWatch Events is available and you can start using it today. It is also accessible from the CloudWatch Events APIs and the AWS Command Line Interface (CLI).

Events forwarded from one account to another are considered custom events. The sending account is charged $1 for every million events (see the CloudWatch Pricing page for more info).

Jeff;

PS – AWS CloudFormation support is in the works and coming soon!

Visualize and Monitor Amazon EC2 Events with Amazon CloudWatch Events and Amazon Kinesis Firehose

Post Syndicated from Karan Desai original https://aws.amazon.com/blogs/big-data/visualize-and-monitor-amazon-ec2-events-with-amazon-cloudwatch-events-and-amazon-kinesis-firehose/

Monitoring your AWS environment is important for security, performance, and cost control purposes. For example, by monitoring and analyzing API calls made to your Amazon EC2 instances, you can trace security incidents and gain insights into administrative behaviors and access patterns. The kinds of events you might monitor include console logins, Amazon EBS snapshot creation/deletion/modification, VPC creation/deletion/modification, and instance reboots, etc.

In this post, I show you how to build a near real-time API monitoring solution for EC2 events using Amazon CloudWatch Events and Amazon Kinesis Firehose. Please be sure to have Amazon CloudTrail enabled in your account.

  • CloudWatch Events offers a near real-time stream of system events that describe changes in AWS resources. CloudWatch Events now supports Kinesis Firehose as a target.
  • Kinesis Firehose is a fully managed service for continuously capturing, transforming, and delivering data in minutes to storage and analytics destinations such as Amazon S3, Amazon Kinesis Analytics, Amazon Redshift, and Amazon Elasticsearch Service.

Walkthrough

For this walkthrough, you create a CloudWatch event rule that matches specific EC2 events such as:

  • Starting, stopping, and terminating an instance
  • Creating and deleting VPC route tables
  • Creating and deleting a security group
  • Creating, deleting, and modifying instance volumes and snapshots

Your CloudWatch event target is a Kinesis Firehose delivery stream that delivers this data to an Elasticsearch cluster, where you set up Kibana for visualization. Using this solution, you can easily load and visualize EC2 events in minutes without setting up complicated data pipelines.

Set up the Elasticsearch cluster

Create the Amazon ES domain in the Amazon ES console, or by using the create-elasticsearch-domain command in the AWS CLI.

This example uses the following configuration:

  • Domain Name: esLogSearch
  • Elasticsearch Version: 1
  • Instance Count: 2
  • Instance type:elasticsearch
  • Enable dedicated master: true
  • Enable zone awareness: true
  • Restrict Amazon ES to an IP-based access policy

Other settings are left as the defaults.

Create a Kinesis Firehose delivery stream

In the Kinesis Firehose console, create a new delivery stream with Amazon ES as the destination. For detailed steps, see Create a Kinesis Firehose Delivery Stream to Amazon Elasticsearch Service.

Set up CloudWatch Events

Create a rule, and configure the event source and target. You can choose to configure multiple event sources with several AWS resources, along with options to specify specific or multiple event types.

In the CloudWatch console, choose Events.

For Service Name, choose EC2.

In Event Pattern Preview, choose Edit and copy the pattern below. For this walkthrough, I selected events that are specific to the EC2 API, but you can modify it to include events for any of your AWS resources.

 

{
	"source": [
		"aws.ec2"
	],
	"detail-type": [
		"AWS API Call via CloudTrail"
	],
	"detail": {
		"eventSource": [
			"ec2.amazonaws.com"
		],
		"eventName": [
			"RunInstances",
			"StopInstances",
			"StartInstances",
			"CreateFlowLogs",
			"CreateImage",
			"CreateNatGateway",
			"CreateVpc",
			"DeleteKeyPair",
			"DeleteNatGateway",
			"DeleteRoute",
			"DeleteRouteTable",
"CreateSnapshot",
"DeleteSnapshot",
			"DeleteVpc",
			"DeleteVpcEndpoints",
			"DeleteSecurityGroup",
			"ModifyVolume",
			"ModifyVpcEndpoint",
			"TerminateInstances"
		]
	}
}

The following screenshot shows what your event looks like in the console.

Next, choose Add target and select the delivery stream that you just created.

Set up Kibana on the Elasticsearch cluster

Amazon ES provides a default installation of Kibana with every Amazon ES domain. You can find the Kibana endpoint on your domain dashboard in the Amazon ES console. You can restrict Amazon ES access to an IP-based access policy.

In the Kibana console, for Index name or pattern, type log. This is the name of the Elasticsearch index.

For Time-field name, choose @time.

To view the events, choose Discover.

The following chart demonstrates the API operations and the number of times that they have been triggered in the past 12 hours.

Summary

In this post, you created a continuous, near real-time solution to monitor various EC2 events such as starting and shutting down instances, creating VPCs, etc. Likewise, you can build a continuous monitoring solution for all the API operations that are relevant to your daily AWS operations and resources.

With Kinesis Firehose as a new target for CloudWatch Events, you can retrieve, transform, and load system events to the storage and analytics destination of your choice in minutes, without setting up complicated data pipelines.

If you have any questions or suggestions, please comment below.


Additional Reading

Learn how to build a serverless architecture to analyze Amazon CloudFront access logs using AWS Lambda, Amazon Athena, and Amazon Kinesis Analytics

 

 

 

Event: AWS Serverless Roadshow – Hands-on Workshops

Post Syndicated from Tara Walker original https://aws.amazon.com/blogs/aws/event-aws-serverless-roadshow-hands-on-workshops/

Surely, some of you have contemplated how you would survive the possible Zombie apocalypse or how you would build your exciting new startup to disrupt the transportation industry when Unicorn haven is uncovered. Well, there is no need to worry; I know just the thing to get you prepared to handle both of those scenarios: the AWS Serverless Computing Workshop Roadshow.

With the roadshow’s serverless workshops, you can get hands-on experience building serverless applications and microservices so you can rebuild what remains of our great civilization after a widespread viral infection causes human corpses to reanimate around the world in the AWS Zombie Microservices Workshop. In addition, you can give your startup a jump on the competition with the Wild Rydes workshop in order to revolutionize the transportation industry; just in time for a pilot’s crash landing leading the way to the discovery of abundant Unicorn pastures found on the outskirts of the female Amazonian warrior inhabited island of Themyscira also known as Paradise Island.

These free, guided hands-on workshops will introduce the basics of building serverless applications and microservices for common and uncommon scenarios using services like AWS Lambda, Amazon API Gateway, Amazon DynamoDB, Amazon S3, Amazon Kinesis, AWS Step Functions, and more. Let me share some advice before you decide to tackle Zombies and mount Unicorns – don’t forget to bring your laptop to the workshop and make sure you have an AWS account established and available for use for the event.

Check out the schedule below and get prepared today by registering for an upcoming workshop in a city near you. Remember these are workshops are completely free, so participation is on a first come, first served basis. So register and get there early, we need Zombie hunters and Unicorn riders across the globe.  Learn more about AWS Serverless Computing Workshops here and register for your city using links below.

Event Location Date
Wild Rydes New York Thursday, June 8
Wild Rydes Austin Thursday, June 22
Wild Rydes Santa Monica Thursday, July 20
Zombie Apocalypse Chicago Thursday, July 20
Wild Rydes Atlanta Tuesday, September 12
Zombie Apocalypse Dallas Tuesday, September 19

 

I look forward to fighting zombies and riding unicorns with you all.

Tara

AWS Online Tech Talks – June 2017

Post Syndicated from Tara Walker original https://aws.amazon.com/blogs/aws/aws-online-tech-talks-june-2017/

As the sixth month of the year, June is significant in that it is not only my birth month (very special), but it contains the summer solstice in the Northern Hemisphere, the day with the most daylight hours, and the winter solstice in the Southern Hemisphere, the day with the fewest daylight hours. In the United States, June is also the month in which we celebrate our dads with Father’s Day and have month-long celebrations of music, heritage, and the great outdoors.

Therefore, the month of June can be filled with lots of excitement. So why not add even more delight to the month, by enhancing your cloud computing skills. This month’s AWS Online Tech Talks features sessions on Artificial Intelligence (AI), Storage, Big Data, and Compute among other great topics.

June 2017 – Schedule

Noted below are the upcoming scheduled live, online technical sessions being held during the month of June. Make sure to register ahead of time so you won’t miss out on these free talks conducted by AWS subject matter experts. All schedule times for the online tech talks are shown in the Pacific Time (PDT) time zone.

Webinars featured this month are:

Thursday, June 1

Storage

9:00 AM – 10:00 AM: Deep Dive on Amazon Elastic File System

Big Data

10:30 AM – 11:30 AM: Migrating Big Data Workloads to Amazon EMR

Serverless

12:00 Noon – 1:00 PM: Building AWS Lambda Applications with the AWS Serverless Application Model (AWS SAM)

 

Monday, June 5

Artificial Intelligence

9:00 AM – 9:40 AM: Exploring the Business Use Cases for Amazon Lex

 

Tuesday, June 6

Management Tools

9:00 AM – 9:40 AM: Automated Compliance and Governance with AWS Config and AWS CloudTrail

 

Wednesday, June 7

Storage

9:00 AM – 9:40 AM: Backing up Amazon EC2 with Amazon EBS Snapshots

Big Data

10:30 AM – 11:10 AM: Intro to Amazon Redshift Spectrum: Quickly Query Exabytes of Data in S3

DevOps

12:00 Noon – 12:40 PM: Introduction to AWS CodeStar: Quickly Develop, Build, and Deploy Applications on AWS

 

Thursday, June 8

Artificial Intelligence

9:00 AM – 9:40 AM: Exploring the Business Use Cases for Amazon Polly

10:30 AM – 11:10 AM: Exploring the Business Use Cases for Amazon Rekognition

 

Monday, June 12

Artificial Intelligence

9:00 AM – 9:40 AM: Exploring the Business Use Cases for Amazon Machine Learning

 

Tuesday, June 13

Compute

9:00 AM – 9:40 AM: DevOps with Visual Studio, .NET and AWS

IoT

10:30 AM – 11:10 AM: Create, with Intel, an IoT Gateway and Establish a Data Pipeline to AWS IoT

Big Data

12:00 Noon – 12:40 PM: Real-Time Log Analytics using Amazon Kinesis and Amazon Elasticsearch Service

 

Wednesday, June 14

Containers

9:00 AM – 9:40 AM: Batch Processing with Containers on AWS

Security & Identity

12:00 Noon – 12:40 PM: Using Microsoft Active Directory across On-premises and Cloud Workloads

 

Thursday, June 15

Big Data

12:00 Noon – 1:00 PM: Building Big Data Applications with Serverless Architectures

 

Monday, June 19

Artificial Intelligence

9:00 AM – 9:40 AM: Deep Learning for Data Scientists: Using Apache MxNet and R on AWS

 

Tuesday, June 20

Storage

9:00 AM – 9:40 AM: Cloud Backup & Recovery Options with AWS Partner Solutions

Artificial Intelligence

10:30 AM – 11:10 AM: An Overview of AI on the AWS Platform

 

The AWS Online Tech Talks series covers a broad range of topics at varying technical levels. These sessions feature live demonstrations & customer examples led by AWS engineers and Solution Architects. Check out the AWS YouTube channel for more on-demand webinars on AWS technologies.

Tara

Build a Serverless Architecture to Analyze Amazon CloudFront Access Logs Using AWS Lambda, Amazon Athena, and Amazon Kinesis Analytics

Post Syndicated from Rajeev Srinivasan original https://aws.amazon.com/blogs/big-data/build-a-serverless-architecture-to-analyze-amazon-cloudfront-access-logs-using-aws-lambda-amazon-athena-and-amazon-kinesis-analytics/

Nowadays, it’s common for a web server to be fronted by a global content delivery service, like Amazon CloudFront. This type of front end accelerates delivery of websites, APIs, media content, and other web assets to provide a better experience to users across the globe.

The insights gained by analysis of Amazon CloudFront access logs helps improve website availability through bot detection and mitigation, optimizing web content based on the devices and browser used to view your webpages, reducing perceived latency by caching of popular object closer to its viewer, and so on. This results in a significant improvement in the overall perceived experience for the user.

This blog post provides a way to build a serverless architecture to generate some of these insights. To do so, we analyze Amazon CloudFront access logs both at rest and in transit through the stream. This serverless architecture uses Amazon Athena to analyze large volumes of CloudFront access logs (on the scale of terabytes per day), and Amazon Kinesis Analytics for streaming analysis.

The analytic queries in this blog post focus on three common use cases:

  1. Detection of common bots using the user agent string
  2. Calculation of current bandwidth usage per Amazon CloudFront distribution per edge location
  3. Determination of the current top 50 viewers

However, you can easily extend the architecture described to power dashboards for monitoring, reporting, and trigger alarms based on deeper insights gained by processing and analyzing the logs. Some examples are dashboards for cache performance, usage and viewer patterns, and so on.

Following we show a diagram of this architecture.

Prerequisites

Before you set up this architecture, install the AWS Command Line Interface (AWS CLI) tool on your local machine, if you don’t have it already.

Setup summary

The following steps are involved in setting up the serverless architecture on the AWS platform:

  1. Create an Amazon S3 bucket for your Amazon CloudFront access logs to be delivered to and stored in.
  2. Create a second Amazon S3 bucket to receive processed logs and store the partitioned data for interactive analysis.
  3. Create an Amazon Kinesis Firehose delivery stream to batch, compress, and deliver the preprocessed logs for analysis.
  4. Create an AWS Lambda function to preprocess the logs for analysis.
  5. Configure Amazon S3 event notification on the CloudFront access logs bucket, which contains the raw logs, to trigger the Lambda preprocessing function.
  6. Create an Amazon DynamoDB table to look up partition details, such as partition specification and partition location.
  7. Create an Amazon Athena table for interactive analysis.
  8. Create a second AWS Lambda function to add new partitions to the Athena table based on the log delivered to the processed logs bucket.
  9. Configure Amazon S3 event notification on the processed logs bucket to trigger the Lambda partitioning function.
  10. Configure Amazon Kinesis Analytics application for analysis of the logs directly from the stream.

ETL and preprocessing

In this section, we parse the CloudFront access logs as they are delivered, which occurs multiple times in an hour. We filter out commented records and use the user agent string to decipher the browser name, the name of the operating system, and whether the request has been made by a bot. For more details on how to decipher the preceding information based on the user agent string, see user-agents 1.1.0 in the Python documentation.

We use the Lambda preprocessing function to perform these tasks on individual rows of the access log. On successful completion, the rows are pushed to an Amazon Kinesis Firehose delivery stream to be persistently stored in an Amazon S3 bucket, the processed logs bucket.

To create a Firehose delivery stream with a new or existing S3 bucket as the destination, follow the steps described in Create a Firehose Delivery Stream to Amazon S3 in the S3 documentation. Keep most of the default settings, but select an AWS Identity and Access Management (IAM) role that has write access to your S3 bucket and specify GZIP compression. Name the delivery stream CloudFrontLogsToS3.

Another pre-requisite for this setup is to create an IAM role that provides the necessary permissions our AWS Lambda function to get the data from S3, process it, and deliver it to the CloudFrontLogsToS3 delivery stream.

Let’s use the AWS CLI to create the IAM role using the following the steps:

  1. Create the IAM policy (lambda-exec-policy) for the Lambda execution role to use.
  2. Create the Lambda execution role (lambda-cflogs-exec-role) and assign the service to use this role.
  3. Attach the policy created in step 1 to the Lambda execution role.

To download the policy document to your local machine, type the following command.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/preprocessiong-lambda/lambda-exec-policy.json  <path_on_your_local_machine>

To download the assume policy document to your local machine, type the following command.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/preprocessiong-lambda/assume-lambda-policy.json  <path_on_your_local_machine>

Following is the lambda-exec-policy.json file, which is the IAM policy used by the Lambda execution role.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "CloudWatchAccess",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        },
        {
            "Sid": "S3Access",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject"
            ],
            "Resource": [
                "arn:aws:s3:::*"
            ]
        },
        {
            "Sid": "FirehoseAccess",
            "Effect": "Allow",
            "Action": [
                "firehose:ListDeliveryStreams",
                "firehose:PutRecord",
                "firehose:PutRecordBatch"
            ],
            "Resource": [
                "arn:aws:firehose:*:*:deliverystream/CloudFrontLogsToS3"
            ]
        }
    ]
}

To create the IAM policy used by Lambda execution role, type the following command.

aws iam create-policy --policy-name lambda-exec-policy --policy-document file://<path>/lambda-exec-policy.json

To create the AWS Lambda execution role and assign the service to use this role, type the following command.

aws iam create-role --role-name lambda-cflogs-exec-role --assume-role-policy-document file://<path>/assume-lambda-policy.json

Following is the assume-lambda-policy.json file, to grant Lambda permission to assume a role.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

To attach the policy (lambda-exec-policy) created to the AWS Lambda execution role (lambda-cflogs-exec-role), type the following command.

aws iam attach-role-policy --role-name lambda-cflogs-exec-role --policy-arn arn:aws:iam::<your-account-id>:policy/lambda-exec-policy

Now that we have created the CloudFrontLogsToS3 Firehose delivery stream and the lambda-cflogs-exec-role IAM role for Lambda, the next step is to create a Lambda preprocessing function.

This Lambda preprocessing function parses the CloudFront access logs delivered into the S3 bucket and performs a few transformation and mapping operations on the data. The Lambda function adds descriptive information, such as the browser and the operating system that were used to make this request based on the user agent string found in the logs. The Lambda function also adds information about the web distribution to support scenarios where CloudFront access logs are delivered to a centralized S3 bucket from multiple distributions. With the solution in this blog post, you can get insights across distributions and their edge locations.

Use the Lambda Management Console to create a new Lambda function with a Python 2.7 runtime and the s3-get-object-python blueprint. Open the console, and on the Configure triggers page, choose the name of the S3 bucket where the CloudFront access logs are delivered. Choose Put for Event type. For Prefix, type the name of the prefix, if any, for the folder where CloudFront access logs are delivered, for example cloudfront-logs/. To invoke Lambda to retrieve the logs from the S3 bucket as they are delivered, select Enable trigger.

Choose Next and provide a function name to identify this Lambda preprocessing function.

For Code entry type, choose Upload a file from Amazon S3. For S3 link URL, type https.amazonaws.com//preprocessing-lambda/pre-data.zip. In the section, also create an environment variable with the key KINESIS_FIREHOSE_STREAM and a value with the name of the Firehose delivery stream as CloudFrontLogsToS3.

Choose lambda-cflogs-exec-role as the IAM role for the Lambda function, and type prep-data.lambda_handler for the value for Handler.

Choose Next, and then choose Create Lambda.

Table creation in Amazon Athena

In this step, we will build the Athena table. Use the Athena console in the same region and create the table using the query editor.

CREATE EXTERNAL TABLE IF NOT EXISTS cf_logs (
  logdate date,
  logtime string,
  location string,
  bytes bigint,
  requestip string,
  method string,
  host string,
  uri string,
  status bigint,
  referrer string,
  useragent string,
  uriquery string,
  cookie string,
  resulttype string,
  requestid string,
  header string,
  csprotocol string,
  csbytes string,
  timetaken bigint,
  forwardedfor string,
  sslprotocol string,
  sslcipher string,
  responseresulttype string,
  protocolversion string,
  browserfamily string,
  osfamily string,
  isbot string,
  filename string,
  distribution string
)
PARTITIONED BY(year string, month string, day string, hour string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LOCATION 's3://<pre-processing-log-bucket>/prefix/';

Creation of the Athena partition

A popular website with millions of requests each day routed using Amazon CloudFront can generate a large volume of logs, on the order of a few terabytes a day. We strongly recommend that you partition your data to effectively restrict the amount of data scanned by each query. Partitioning significantly improves query performance and substantially reduces cost. The Lambda partitioning function adds the partition information to the Athena table for the data delivered to the preprocessed logs bucket.

Before delivering the preprocessed Amazon CloudFront logs file into the preprocessed logs bucket, Amazon Kinesis Firehose adds a UTC time prefix in the format YYYY/MM/DD/HH. This approach supports multilevel partitioning of the data by year, month, date, and hour. You can invoke the Lambda partitioning function every time a new processed Amazon CloudFront log is delivered to the preprocessed logs bucket. To do so, configure the Lambda partitioning function to be triggered by an S3 Put event.

For a website with millions of requests, a large number of preprocessed logs can be delivered multiple times in an hour—for example, at the interval of one each second. To avoid querying the Athena table for partition information every time a preprocessed log file is delivered, you can create an Amazon DynamoDB table for fast lookup.

Based on the year, month, data and hour in the prefix of the delivered log, the Lambda partitioning function checks if the partition specification exists in the Amazon DynamoDB table. If it doesn’t, it’s added to the table using an atomic operation, and then the Athena table is updated.

Type the following command to create the Amazon DynamoDB table.

aws dynamodb create-table --table-name athenapartitiondetails \
--attribute-definitions AttributeName=PartitionSpec,AttributeType=S \
--key-schema AttributeName=PartitionSpec,KeyType=HASH \
--provisioned-throughput ReadCapacityUnits=100,WriteCapacityUnits=100

Here the following is true:

  • PartitionSpec is the hash key and is a representation of the partition signature—for example, year=”2017”; month=”05”; day=”15”; hour=”10”.
  • Depending on the rate at which the processed log files are delivered to the processed log bucket, you might have to increase the ReadCapacityUnits and WriteCapacityUnits values, if these are throttled.

The other attributes besides PartitionSpec are the following:

  • PartitionPath – The S3 path associated with the partition.
  • PartitionType – The type of partition used (Hour, Month, Date, Year, or ALL). In this case, ALL is used.

Next step is to create the IAM role to provide permissions for the Lambda partitioning function. You require permissions to do the following:

  1. Look up and write partition information to DynamoDB.
  2. Alter the Athena table with new partition information.
  3. Perform Amazon CloudWatch logs operations.
  4. Perform Amazon S3 operations.

To download the policy document to your local machine, type following command.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/partitioning-lambda/lambda-partition-function-execution-policy.json  <path_on_your_local_machine>

To download the assume policy document to your local machine, type the following command.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/partitioning-lambda/assume-lambda-policy.json <path_on_your_local_machine>

To create the Lambda execution role and assign the service to use this role, type the following command.

aws iam create-role --role-name lambda-cflogs-exec-role --assume-role-policy-document file://<path>/assume-lambda-policy.json

Let’s use the AWS CLI to create the IAM role using the following three steps:

  1. Create the IAM policy(lambda-partition-exec-policy) used by the Lambda execution role.
  2. Create the Lambda execution role (lambda-partition-execution-role)and assign the service to use this role.
  3. Attach the policy created in step 1 to the Lambda execution role.

To create the IAM policy used by Lambda execution role, type the following command.

aws iam create-policy --policy-name lambda-partition-exec-policy --policy-document file://<path>/lambda-partition-function-execution-policy.json

To create the Lambda execution role and assign the service to use this role, type the following command.

aws iam create-role --role-name lambda-partition-execution-role --assume-role-policy-document file://<path>/assume-lambda-policy.json

To attach the policy (lambda-partition-exec-policy) created to the AWS Lambda execution role (lambda-partition-execution-role), type the following command.

aws iam attach-role-policy --role-name lambda-partition-execution-role --policy-arn arn:aws:iam::<your-account-id>:policy/lambda-partition-exec-policy

Following is the lambda-partition-function-execution-policy.json file, which is the IAM policy used by the Lambda execution role.

{
    "Version": "2012-10-17",
    "Statement": [
      	{
            	"Sid": "DDBTableAccess",
            	"Effect": "Allow",
            	"Action": "dynamodb:PutItem"
            	"Resource": "arn:aws:dynamodb*:*:table/athenapartitiondetails"
        	},
        	{
            	"Sid": "S3Access",
            	"Effect": "Allow",
            	"Action": [
                		"s3:GetBucketLocation",
                		"s3:GetObject",
                		"s3:ListBucket",
                		"s3:ListBucketMultipartUploads",
                		"s3:ListMultipartUploadParts",
                		"s3:AbortMultipartUpload",
                		"s3:PutObject"
            	],
          		"Resource":"arn:aws:s3:::*"
		},
	              {
		      "Sid": "AthenaAccess",
      		"Effect": "Allow",
      		"Action": [ "athena:*" ],
      		"Resource": [ "*" ]
	      },
        	{
            	"Sid": "CloudWatchLogsAccess",
            	"Effect": "Allow",
            	"Action": [
                		"logs:CreateLogGroup",
                		"logs:CreateLogStream",
             	   	"logs:PutLogEvents"
            	],
            	"Resource": "arn:aws:logs:*:*:*"
        	}
    ]
}

Download the .jar file containing the Java deployment package to your local machine.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/partitioning-lambda/aws-lambda-athena-1.0.0.jar <path_on_your_local_machine>

From the AWS Management Console, create a new Lambda function with Java8 as the runtime. Select the Blank Function blueprint.

On the Configure triggers page, choose the name of the S3 bucket where the preprocessed logs are delivered. Choose Put for the Event Type. For Prefix, type the name of the prefix folder, if any, where preprocessed logs are delivered by Firehose—for example, out/. For Suffix, type the name of the compression format that the Firehose stream (CloudFrontLogToS3) delivers the preprocessed logs —for example, gz. To invoke Lambda to retrieve the logs from the S3 bucket as they are delivered, select Enable Trigger.

Choose Next and provide a function name to identify this Lambda partitioning function.

Choose Java8 for Runtime for the AWS Lambda function. Choose Upload a .ZIP or .JAR file for the Code entry type, and choose Upload to upload the downloaded aws-lambda-athena-1.0.0.jar file.

Next, create the following environment variables for the Lambda function:

  • TABLE_NAME – The name of the Athena table (for example, cf_logs).
  • PARTITION_TYPE – The partition to be created based on the Athena table for the logs delivered to the sub folders in S3 bucket based on Year, Month, Date, Hour, or Set this to ALL to use Year, Month, Date, and Hour.
  • DDB_TABLE_NAME – The name of the DynamoDB table holding partition information (for example, athenapartitiondetails).
  • ATHENA_REGION – The current AWS Region for the Athena table to construct the JDBC connection string.
  • S3_STAGING_DIR – The Amazon S3 location where your query output is written. The JDBC driver asks Athena to read the results and provide rows of data back to the user (for example, s3://<bucketname>/<folder>/).

To configure the function handler and IAM, for Handler copy and paste the name of the handler: com.amazonaws.services.lambda.CreateAthenaPartitionsBasedOnS3EventWithDDB::handleRequest. Choose the existing IAM role, lambda-partition-execution-role.

Choose Next and then Create Lambda.

Interactive analysis using Amazon Athena

In this section, we analyze the historical data that’s been collected since we added the partitions to the Amazon Athena table for data delivered to the preprocessing logs bucket.

Scenario 1 is robot traffic by edge location.

SELECT COUNT(*) AS ct, requestip, location FROM cf_logs
WHERE isbot='True'
GROUP BY requestip, location
ORDER BY ct DESC;

Scenario 2 is total bytes transferred per distribution for each edge location for your website.

SELECT distribution, location, SUM(bytes) as totalBytes
FROM cf_logs
GROUP BY location, distribution;

Scenario 3 is the top 50 viewers of your website.

SELECT requestip, COUNT(*) AS ct  FROM cf_logs
GROUP BY requestip
ORDER BY ct DESC;

Streaming analysis using Amazon Kinesis Analytics

In this section, you deploy a stream processing application using Amazon Kinesis Analytics to analyze the preprocessed Amazon CloudFront log streams. This application analyzes directly from the Amazon Kinesis Stream as it is delivered to the preprocessing logs bucket. The stream queries in section are focused on gaining the following insights:

  • The IP address of the bot, identified by its Amazon CloudFront edge location, that is currently sending requests to your website. The query also includes the total bytes transferred as part of the response.
  • The total bytes served per distribution per population for your website.
  • The top 10 viewers of your website.

To download the firehose-access-policy.json file, type the following.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/kinesisanalytics/firehose-access-policy.json  <path_on_your_local_machine>

To download the kinesisanalytics-policy.json file, type the following.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/kinesisanalytics/assume-kinesisanalytics-policy.json <path_on_your_local_machine>

Before we create the Amazon Kinesis Analytics application, we need to create the IAM role to provide permission for the analytics application to access Amazon Kinesis Firehose stream.

Let’s use the AWS CLI to create the IAM role using the following three steps:

  1. Create the IAM policy(firehose-access-policy) for the Lambda execution role to use.
  2. Create the Lambda execution role (ka-execution-role) and assign the service to use this role.
  3. Attach the policy created in step 1 to the Lambda execution role.

Following is the firehose-access-policy.json file, which is the IAM policy used by Kinesis Analytics to read Firehose delivery stream.

{
    "Version": "2012-10-17",
    "Statement": [
      	{
    	"Sid": "AmazonFirehoseAccess",
    	"Effect": "Allow",
    	"Action": [
       	"firehose:DescribeDeliveryStream",
        	"firehose:Get*"
    	],
    	"Resource": [
              "arn:aws:firehose:*:*:deliverystream/CloudFrontLogsToS3”
       ]
     }
}

Following is the assume-kinesisanalytics-policy.json file, to grant Amazon Kinesis Analytics permissions to assume a role.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "kinesisanalytics.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

To create the IAM policy used by Analytics access role, type the following command.

aws iam create-policy --policy-name firehose-access-policy --policy-document file://<path>/firehose-access-policy.json

To create the Analytics execution role and assign the service to use this role, type the following command.

aws iam attach-role-policy --role-name ka-execution-role --policy-arn arn:aws:iam::<your-account-id>:policy/firehose-access-policy

To attach the policy (irehose-access-policy) created to the Analytics execution role (ka-execution-role), type the following command.

aws iam attach-role-policy --role-name ka-execution-role --policy-arn arn:aws:iam::<your-account-id>:policy/firehose-access-policy

To deploy the Analytics application, first download the configuration file and then modify ResourceARN and RoleARN for the Amazon Kinesis Firehose input configuration.

"KinesisFirehoseInput": { 
    "ResourceARN": "arn:aws:firehose:<region>:<account-id>:deliverystream/CloudFrontLogsToS3", 
    "RoleARN": "arn:aws:iam:<account-id>:role/ka-execution-role"
}

To download the Analytics application configuration file, type the following command.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis//kinesisanalytics/kinesis-analytics-app-configuration.json <path_on_your_local_machine>

To deploy the application, type the following command.

aws kinesisanalytics create-application --application-name "cf-log-analysis" --cli-input-json file://<path>/kinesis-analytics-app-configuration.json

To start the application, type the following command.

aws kinesisanalytics start-application --application-name "cf-log-analysis" --input-configuration Id="1.1",InputStartingPositionConfiguration={InputStartingPosition="NOW"}

SQL queries using Amazon Kinesis Analytics

Scenario 1 is a query for detecting bots for sending request to your website detection for your website.

-- Create output stream, which can be used to send to a destination
CREATE OR REPLACE STREAM "BOT_DETECTION" (requesttime TIME, destribution VARCHAR(16), requestip VARCHAR(64), edgelocation VARCHAR(64), totalBytes BIGINT);
-- Create pump to insert into output 
CREATE OR REPLACE PUMP "BOT_DETECTION_PUMP" AS INSERT INTO "BOT_DETECTION"
--
SELECT STREAM 
    STEP("CF_LOG_STREAM_001"."request_time" BY INTERVAL '1' SECOND) as requesttime,
    "distribution_name" as distribution,
    "request_ip" as requestip, 
    "edge_location" as edgelocation, 
    SUM("bytes") as totalBytes
FROM "CF_LOG_STREAM_001"
WHERE "is_bot" = true
GROUP BY "request_ip", "edge_location", "distribution_name",
STEP("CF_LOG_STREAM_001"."request_time" BY INTERVAL '1' SECOND),
STEP("CF_LOG_STREAM_001".ROWTIME BY INTERVAL '1' SECOND);

Scenario 2 is a query for total bytes transferred per distribution for each edge location for your website.

-- Create output stream, which can be used to send to a destination
CREATE OR REPLACE STREAM "BYTES_TRANSFFERED" (requesttime TIME, destribution VARCHAR(16), edgelocation VARCHAR(64), totalBytes BIGINT);
-- Create pump to insert into output 
CREATE OR REPLACE PUMP "BYTES_TRANSFFERED_PUMP" AS INSERT INTO "BYTES_TRANSFFERED"
-- Bytes Transffered per second per web destribution by edge location
SELECT STREAM 
    STEP("CF_LOG_STREAM_001"."request_time" BY INTERVAL '1' SECOND) as requesttime,
    "distribution_name" as distribution,
    "edge_location" as edgelocation, 
    SUM("bytes") as totalBytes
FROM "CF_LOG_STREAM_001"
GROUP BY "distribution_name", "edge_location", "request_date",
STEP("CF_LOG_STREAM_001"."request_time" BY INTERVAL '1' SECOND),
STEP("CF_LOG_STREAM_001".ROWTIME BY INTERVAL '1' SECOND);

Scenario 3 is a query for the top 50 viewers for your website.

-- Create output stream, which can be used to send to a destination
CREATE OR REPLACE STREAM "TOP_TALKERS" (requestip VARCHAR(64), requestcount DOUBLE);
-- Create pump to insert into output 
CREATE OR REPLACE PUMP "TOP_TALKERS_PUMP" AS INSERT INTO "TOP_TALKERS"
-- Top Ten Talker
SELECT STREAM ITEM as requestip, ITEM_COUNT as requestcount FROM TABLE(TOP_K_ITEMS_TUMBLING(
  CURSOR(SELECT STREAM * FROM "CF_LOG_STREAM_001"),
  'request_ip', -- name of column in single quotes
  50, -- number of top items
  60 -- tumbling window size in seconds
  )
);

Conclusion

Following the steps in this blog post, you just built an end-to-end serverless architecture to analyze Amazon CloudFront access logs. You analyzed these both in interactive and streaming mode, using Amazon Athena and Amazon Kinesis Analytics respectively.

By creating a partition in Athena for the logs delivered to a centralized bucket, this architecture is optimized for performance and cost when analyzing large volumes of logs for popular websites that receive millions of requests. Here, we have focused on just three common use cases for analysis, sharing the analytic queries as part of the post. However, you can extend this architecture to gain deeper insights and generate usage reports to reduce latency and increase availability. This way, you can provide a better experience on your websites fronted with Amazon CloudFront.

In this blog post, we focused on building serverless architecture to analyze Amazon CloudFront access logs. Our plan is to extend the solution to provide rich visualization as part of our next blog post.


About the Authors

Rajeev Srinivasan is a Senior Solution Architect for AWS. He works very close with our customers to provide big data and NoSQL solution leveraging the AWS platform and enjoys coding . In his spare time he enjoys riding his motorcycle and reading books.

 

Sai Sriparasa is a consultant with AWS Professional Services. He works with our customers to provide strategic and tactical big data solutions with an emphasis on automation, operations & security on AWS. In his spare time, he follows sports and current affairs.

 

 


Related

Analyzing VPC Flow Logs with Amazon Kinesis Firehose, Amazon Athena, and Amazon QuickSight

Build a Visualization and Monitoring Dashboard for IoT Data with Amazon Kinesis Analytics and Amazon QuickSight

Post Syndicated from Karan Desai original https://aws.amazon.com/blogs/big-data/build-a-visualization-and-monitoring-dashboard-for-iot-data-with-amazon-kinesis-analytics-and-amazon-quicksight/

Customers across the world are increasingly building innovative Internet of Things (IoT) workloads on AWS. With AWS, they can handle the constant stream of data coming from millions of new, internet-connected devices. This data can be a valuable source of information if it can be processed, analyzed, and visualized quickly in a scalable, cost-efficient manner. Engineers and developers can monitor performance and troubleshoot issues while sales and marketing can track usage patterns and statistics to base business decisions.

In this post, I demonstrate a sample solution to build a quick and easy monitoring and visualization dashboard for your IoT data using AWS serverless and managed services. There’s no need for purchasing any additional software or hardware. If you are already using AWS IoT, you can build this dashboard to tap into your existing device data. If you are new to AWS IoT, you can be up and running in minutes using sample data. Later, you can customize it to your needs, as your business grows to millions of devices and messages.

Architecture

The following is a high-level architecture diagram showing the serverless setup to configure.

 

AWS service overview

AWS IoT is a managed cloud platform that lets connected devices interact easily and securely with cloud applications and other devices. AWS IoT can process and route billions of messages to AWS endpoints and to other devices reliably and securely.

Amazon Kinesis Firehose is the easiest way to capture, transform, and load streaming data continuously into AWS from thousands of data sources, such as IoT devices. It is a fully managed service that automatically scales to match the throughput of your data and requires no ongoing administration.

Amazon Kinesis Analytics allows you to process streaming data coming from IoT devices in real time with standard SQL, without having to learn new programming languages or processing frameworks, providing actionable insights promptly.

The processed data is fed into Amazon QuickSight, which is a fast, cloud-powered business analytics service that makes it easy to build visualizations, perform ad-hoc analysis, and quickly get business insights from the data.

The most popular way for Internet-connected devices to send data is using MQTT messages. The AWS IoT gateway receives these messages from registered IoT devices. The solution in this post uses device data from AWS Simple Beer Service (SBS), a series of internet-connected kegerators sending sensor outputs such as temperature, humidity, and sound levels in a JSON payload. You can use any existing IoT data source that you may have.

The AWS IoT rules engine allows selecting data from message payloads, processing it, and sending it to other services. You forward the data to a Firehose delivery stream to consolidate the continuous data stream into batches for further processing. The batched data is also stored temporarily in an Amazon S3 bucket for later retrieval and can be set for deletion after a specified time using S3 Lifecycle Management rules.

The incoming data from the Firehose delivery stream is fed into an Analytics application that provides an easy way to process the data in real time using standard SQL queries. Analytics allows writing standard SQL queries to extract specific components from the incoming data stream and perform real-time ETL on it. In this post, you use this feature to aggregate minimum and maximum temperature values from the sensors per minute. You load it in Amazon QuickSight to create a monitoring dashboard and check if the devices are over-heating or cooling down during use. You also extract every device’s location, parameters such as temperature, sound levels, humidity, and the time stamp in Analytics to use on the visualization dashboard.

The processed data from the two queries is fed into two Firehose delivery streams, both of which batch the data into CSV files every minute and store it in S3. The batching time interval is configurable between 1 and 15 minutes in 1-second intervals.

Finally, you use Amazon QuickSight to ingest the processed CSV files from S3 as a data source to build visualizations. Amazon QuickSight’s super-fast, parallel, in-memory, calculation engine (SPICE) parses the ingested data and allows you to create a variety of visualizations with different graph types. You can also use the Amazon QuickSight built-in Story feature to combine visualizations into business dashboards that can be shared in a secure manner.

Implementation

AWS IoT, Amazon Kinesis, and Amazon QuickSight are all fully managed services, which means you can complete the entire setup in just a few steps using the AWS Management Console. Don’t worry about setting up any underlying hardware or installing any additional software. So, get started.

Step 1. Set up your AWS IoT data source

Do you currently use AWS IoT? If you have an existing IoT thing set up and running on AWS IoT, you can skip to Step 2.

If you have an AWS IoT button or other IoT devices that can publish MQTT messages and would like to use that for the setup, follow the Getting Started with AWS IoT topic to connect your thing to AWS IoT. Continue to Step 2.

If you do not have an existing IoT device, you can generate simulated device data using a script on your local machine and have it publish to AWS IoT. The following script lets you set up your AWS IoT environment and publish simulated data that mimics device data from Simple Beer Service.

Generate sample Data

Running the sbs.py Python script generates fictitious AWS IoT messages from multiple SBS devices. The IoT rule sends the message to Firehose for further processing.

The script requires access to AWS CLI credentials and boto3 installation on the machine running the script. Download and run the following Python script:

https://github.com/awslabs/sbs-iot-data-generator/blob/master/sbs.py

The script generates random data that looks like the following:

{"deviceParameter": "Temperature", "deviceValue": 33, "deviceId": "SBS01", "dateTime": "2017-02-03 11:29:37"}
{"deviceParameter": "Sound", "deviceValue": 140, "deviceId": "SBS03", "dateTime": "2017-02-03 11:29:38"}
{"deviceParameter": "Humidity", "deviceValue": 63, "deviceId": "SBS01", "dateTime": "2017-02-03 11:29:39"}
{"deviceParameter": "Flow", "deviceValue": 80, "deviceId": "SBS04", "dateTime": "2017-02-03 11:29:41"}

Run the script and keep it running for the duration of the project to generate sufficient data.

Tip: If you encounter any issues running the script from your local machine, launch an EC2 instance and run the script there as a root user. Remember to assign an appropriate IAM role to your instance at the time of launch that allows it to access AWS IoT.

Step 2. Create three Firehose delivery streams

For this post, you require three Firehose delivery streams:  one to batch raw data from AWS IoT, and two to batch output device data and aggregated data from Analytics.

  1. In the console, choose Firehose.
  2. Create all three Firehose delivery streams using the following field values.

Delivery stream 1:

Name IoT-Source-Stream
S3 bucket <your unique name>-kinesis
S3 prefix source/

Delivery stream 2:

Name IoT-Destination-Data-Stream
S3 bucket <your unique name>-kinesis
S3 prefix data/

Delivery stream 3:

Name IoT-Destination-Aggregate-Stream
S3 bucket <your unique name>-kinesis
S3 prefix aggregate/

Step 3. Set up AWS IoT to receive and forward incoming data

  1. In the console, choose IoT.
  2. Create a new AWS IoT rule with the following field values.
Name IoT_to_Firehose
Attribute *
Topic Filter /sbs/devicedata/#
Add Action Send messages to an Amazon Kinesis Firehose stream (select IoT-Source-Stream from dropdown)
Select Separator “\n (newline)”

A quick check before proceeding further: make sure that you have run the script to generate simulated IoT data or that your IoT Thing is running and delivering data. If not, set it up now. The Amazon Kinesis Analytics application you set up in the next step needs the data to process it further.

Step 4: Create an Analytics application to process data

  1. In the console, choose Kinesis.
  2. Create a new application.
  3. Enter a name of your choice, for example, SBS-IoT-Data.
  4. For the source, choose IoT-Source-Stream.

Analytics auto-discovers the schema on the data by sampling records from the input stream. It also includes an in-built SQL editor that allows you to write standard SQL queries to transform incoming data.

Tip: If Analytics is unable to discover your incoming data, it may be missing the appropriate IAM permissions. In the IAM console, select the role that you assigned to your IoT rule in Step 3. Make sure that it has the ARN of the IoT-Source-Data Firehose stream listed in the firehose:putRecord section.

Here is a sample SQL query that generates two output streams:

  • DESTINATION_SQL_BASIC_STREAM contains the device ID, device parameter, its value, and the time stamp from the incoming stream.
  • DESTINATION_SQL_AGGREGATE_STREAM aggregates the maximum and minimum values of temperatures from the sensors over a one-minute period from the incoming data.
-- Create an output stream with four columns, which is used to send IoT data to the destination
CREATE OR REPLACE STREAM "DESTINATION_SQL_BASIC_STREAM" (dateTime TIMESTAMP, deviceId VARCHAR(8), deviceParameter VARCHAR(16), deviceValue INTEGER);

-- Create a pump that continuously selects from the source stream and inserts it into the output data stream
CREATE OR REPLACE PUMP "STREAM_PUMP_1" AS INSERT INTO "DESTINATION_SQL_BASIC_STREAM"

-- Filter specific columns from the source stream
SELECT STREAM "dateTime", "deviceId", "deviceParameter", "deviceValue" FROM "SOURCE_SQL_STREAM_001";

-- Create a second output stream with three columns, which is used to send aggregated min/max data to the destination
CREATE OR REPLACE STREAM "DESTINATION_SQL_AGGREGATE_STREAM" (dateTime TIMESTAMP, highestTemp SMALLINT, lowestTemp SMALLINT);

-- Create a pump that continuously selects from a source stream 
CREATE OR REPLACE PUMP "STREAM_PUMP_2" AS INSERT INTO "DESTINATION_SQL_AGGREGATE_STREAM"

-- Extract time in minutes, plus the highest and lowest value of device temperature in that minute, into the destination aggregate stream, aggregated per minute
SELECT STREAM FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE) AS "dateTime", MAX("deviceValue") AS "highestTemp", MIN("deviceValue") AS "lowestTemp" FROM "SOURCE_SQL_STREAM_001" WHERE "deviceParameter"='Temperature' GROUP BY FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE);

Real-time analytics shows the results of the SQL query. If everything is working correctly, you see three streams listed, similar to the following screenshot.

Step 5: Connect the Analytics application to output Firehose delivery streams

You create two destinations for the two delivery streams that you created in the previous step. A single Analytics application can have multiple destinations defined; however, this needs to be set up using the AWS CLI, not from the console. If you do not already have it, install the AWS CLI on your local machine and configure it with your credentials.

Tip: If you are running the IoT script from an EC2 instance, it comes pre-installed with the AWS CLI.

Create the first destination delivery stream 

The AWS CLI command to create a new output Firehose delivery stream is as follows:

aws kinesisanalytics add-application-output --application-name <Name of Analytics Application> --current-application-version-id <number> --application-output 'Name=DESTINATION_SQL_BASIC_STREAM,KinesisFirehoseOutput={ResourceARN=<ARN of IoT-Data-Stream>,RoleARN=<ARN of Analytics application>,DestinationSchema={RecordFormatType=CSV}'

Do not copy this into the CLI just yet! Before entering this command, make the following four changes to personalize it:

  • For Name of Analytics Application, enter the value from Step 4, or from the Analytics console.
  • For current-application-version-ID, run the following command:
aws kinesisanalytics describe-application --application-name <application name from above>; | grep ApplicationVersionId
  • For ResourceARN, run the following command:
aws firehose describe-delivery-stream --delivery-stream-name IoT-Destination-Data-Stream | grep DeliveryStreamARN
  • For RoleARN, run the following command:
aws kinesisanalytics describe-application --application-name <application name from above>; | grep RoleARN

Now, paste the complete command in the AWS CLI and press Enter. If there are any errors, the response provides details. If everything goes well, a new destination delivery stream is created to send the first query (DESTINATION_SQL_BASIC_STREAM) to IoT-Destination-Data-Stream.

Create the second destination delivery stream

Following similar steps as above, create a second destination Firehose delivery stream with the following changes:

  • For Name of Analytics Application, enter the same name as the first delivery stream.
  • For current-application-version-ID, increment by 1 from the previous value (unless you made other changes in between these steps). If unsure, run the same command as above to get it again.
  • For ResourceARN, get the value by running the following CLI command:
aws firehose describe-delivery-stream --delivery-stream-name IoT-Destination-Aggregate-Stream | grep DeliveryStreamARN
  • For RoleArn, enter the same value as the first stream.

Run the aws kinesisanalytics CLI command, similar to the previous step but with the new parameters substituted. This creates the second output Firehose destination delivery stream.

Update the IAM role for Analytics to allow writing to both output streams.

  1. In the console, choose IAM, Roles.
  2. Select the role that you created with Analytics in Step 4.
  3. Choose Policy, JSON, and Edit.
  4. Find “Sid”: “WriteOutputFirehose” in the JSON document, go to the “Resource” section and make sure that it includes Resource ARNs of both streams that you found in the previous step.
  5. If it has only one ARN, add the second ARN and choose Save.

This completes the Amazon Kinesis setup. The incoming IoT data is processed by Analytics and delivered, using two output delivery streams, to two separate folders in your S3 bucket.

Step 6: Set up Amazon QuickSight to analyze the data

To build the visualization dashboard, ingest the processed CSV files from the S3 bucket into Amazon QuickSight.

  1. In the console, choose QuickSight.
  2. If this is your first time using Amazon QuickSight, you are asked to create a new account. Follow the prompts.
  3. When you are logged in to your account, choose New Analysis and enter a name of your choice.
  4. Choose New data set for the analysis or, if you have previously imported your data set, select one from the available data sets.
  5. You import two data sets: one with general device parameters information, and the other with aggregates of maximum and minimum temperatures for monitoring. For the first data set, choose S3 from the list of available data sources and enter a name, for example, IoT Device Data.
  6. The location of the S3 bucket and the objects to use are provided to Amazon QuickSight as a manifest file. Create a new manifest file following the supported formats for Amazon S3 manifest files.
  7. In the URIPrefixes section, provide your appropriate S3 bucket and folder location for the general device data. Hint: it should include <your unique name>-kinesis/data/.

Your manifest file should look similar to the following:

{ 
    "fileLocations": [                                                    
              {"URIPrefixes": ["https://s3.amazonaws.com/<YOUR_BUCKET_NAME>/data/<YEAR>/<MONTH>/<DATE>/<HOUR>/"]}
     ],
     "globalUploadSettings": { 
     "format": "CSV",  
     "delimiter": ","
    }
}

Amazon QuickSight imports and parses the data set, and provides available data fields that can be used for making graphs. The Edit/Preview data button allows you to format and transform the data, change data types, and filter or join your data. Make sure that the columns have the correct titles. If not, you can edit them and then save.

Tip: choose the downward arrow on the top right and unselect Files include headers to give each column appropriate headers. Choose Save. This takes you back to the data sets page.

Follow the same steps as above to import the second data set. This time, your manifest should include your aggregate data set folder on S3, which is named <your unique name>-kinesis/aggregate/. Update headers if necessary and choose Save & visualize.

Build an analysis

The visualization screen shows the data set that you last imported, which in this case is the aggregate data. To include the general device data as well, for Fields on the top left, choose Edit analysis data sets. Choose Add data set and select the other data set that you saved earlier.

Now both data sets are available on the analysis screen. For Visual Types at bottom left, select the type of graph to make. For Fields, select the fields to visualize. For example, drag Device ID, Device Parameter, and Value to Field wells, as shown in the screenshot below, to generate a visualization of average parameter values compared across devices.

You can create another visual by choose +Add. This time, select a line graph to show monitoring of the maximum temperature values of the sensors in any minute, from the aggregate data set.

If you would like to create an interactive story to present to your team or organization, you can choose the Story option on the left panel. Create a dashboard with multiple visualizations, to save and share securely with the intended audience. An example of a story is shown below.

Conclusion

Any data is valuable only when it can be actually put to use. In this post, you’ve seen how it’s possible to quickly build a simple Analytics application to ingest, process, and visualize IoT data in near real time entirely using AWS managed services. This solution is scalable and reliable, and costs a fraction of other business intelligence solutions. It is easy enough that anyone with an AWS account can build and use it without any special training.

If you have any questions or suggestions, please comment below.


About the Author

Karan Desai is a Solutions Architect with Amazon Web Services. He works with startups and small businesses in the US, helping them adopt cloud technology to build scalable and secure solutions using AWS. In his spare time, he likes to build personal IoT projects, travel to offbeat places and write about it.

 

 


Related

Visualize Big Data with Amazon QuickSight, Presto, and Apache Spark on Amazon EMR

 

 

 

 

 

 

 

Test Your Streaming Data Solution with the New Amazon Kinesis Data Generator

Post Syndicated from Allan MacInnis original https://aws.amazon.com/blogs/big-data/test-your-streaming-data-solution-with-the-new-amazon-kinesis-data-generator/

When building a streaming data solution, most customers want to test it with data that is similar to their production data. Creating this data and streaming it to your solution can often be the most tedious task in testing the solution.

Amazon Kinesis Streams and Amazon Kinesis Firehose enable you to continuously capture and store terabytes of data per hour from hundreds of thousands of sources. Amazon Kinesis Analytics gives you the ability to use standard SQL to analyze and aggregate this data in real-time. It’s easy to create an Amazon Kinesis stream or Firehose delivery stream with just a few clicks in the AWS Management Console (or a few commands using the AWS CLI or Amazon Kinesis API). However, to generate a continuous stream of test data, you must write a custom process or script that runs continuously, using the AWS SDK or CLI to send test records to Amazon Kinesis. Although this task is necessary to adequately test your solution, it means more complexity and longer development and testing times.

Wouldn’t it be great if there were a user-friendly tool to generate test data and send it to Amazon Kinesis? Well, now there is—the Amazon Kinesis Data Generator (KDG).

KDG overview

The KDG simplifies the task of generating data and sending it to Amazon Kinesis. The tool provides a user-friendly UI that runs directly in your browser. With the KDG, you can do the following:

  • Create templates that represent records for your specific use cases
  • Populate the templates with fixed data or random data
  • Save the templates for future use
  • Continuously send thousands of records per second to your Amazon Kinesis stream or Firehose delivery stream

The KDG is open source, and you can find the source code on the Amazon Kinesis Data Generator repo in GitHub. Because the tool is a collection of static HTML and JavaScript files that run directly in your browser, you can start using it immediately without downloading or cloning the project. It is enabled as a static site in GitHub, and we created a short URL to access it.

To get started immediately, check it out at http://amzn.to/datagen.

Using the KDG

Getting started with the KDG requires only three short steps:

  1. Create an Amazon Cognito user in your AWS account (first-time only).
  2. Use this user’s credentials to log in to the KDG.
  3. Create a record template for your data.

When you’ve completed these steps, you can then send data to Streams or Firehose.

Create an Amazon Cognito user

The KDG is a great example of a mobile application that uses Amazon Cognito for a user repository and user authentication, and the AWS JavaScript SDK to communicate with AWS services directly from your browser. For information about how to build your own JavaScript application that uses Amazon Cognito, see Use Amazon Cognito in your website for simple AWS authentication on the AWS Mobile Blog.

Before you can start sending data to your Amazon Kinesis stream, you must create an Amazon Cognito user in your account who can write to Streams and Firehose. When you create the user, you create a username and password for that user. You use those credentials to sign in to the KDG. To simplify creating the Amazon Cognito user in your account, we created a Lambda function and a CloudFormation template. For more information about creating the Amazon Cognito user in your AWS account, see Configure Your AWS Account.

Note:  It’s important that you use the URL provided by the output of the CloudFormation stack the first time that you access the KDG. This URL contains parameters needed by the KDG. The KDG stores the values of these parameters locally, so you can then access the tool using the short URL, http://amzn.to/datagen.

Log in to the KDG

After you create an Amazon Cognito user in your account, the next step is to log in to the KDG. To do this, provide the username and password that you created earlier.

On the main page, you can configure your data templates and send data to an Amazon Kinesis stream or Firehose delivery stream.

The basic configuration is simple enough. All fields on the page are required:

  • Region: Choose the AWS Region that contains the Amazon Kinesis stream or Firehose delivery stream to receive your streaming data.
  • Stream/firehose name: Choose the name of the stream or delivery stream to receive your streaming data.
  • Records per second: Enter the number of records to send to your stream or delivery stream each second.
  • Record template: Enter the raw data, or a template that represents your data structure, to be used for each record sent by the KDG. For information about creating templates for your data, see the “Creating Record Templates” section, later in this post.

When you set the Records per second value, consider that the KDG isn’t intended to be a data producer for load-testing your application. However, it can easily send several thousand records per second from a single tab in your browser, which is plenty of data for most applications. In testing, the KDG has produced 80,000 records per second to a single Amazon Kinesis stream, but your mileage may vary. The maximum rate at which it produces records depends on your computer’s specs and the complexity of your record template.

Ensure that your stream or delivery stream is scaled appropriately:

  • 1,000 records/second or 1 MB/second to an Amazon Kinesis stream
  • 5,000 records/second or 5 MB/second to a Firehose delivery stream

Otherwise, Amazon Kinesis may reject records, and you won’t achieve your desired throughput. For more information about adding capacity to a stream by adding more shards, see Resharding a Stream. For information about increasing the capacity of a delivery stream, see Amazon Kinesis Firehose Limits.

Create record templates

The Record Template field is a free-text field where you can enter any text that represents a single streaming data record. You can create a single line of static data, so that each record sent to Amazon Kinesis is identical. Or, you can format the text as a template.

In this case, the KDG substitutes portions of the template with fake or random data before sending the record. This lets you introduce randomness or variability in each record that is sent in your data stream. The KDG uses Faker.js, an open source library, to generate fake data. For more information, see the faker.js project page in GitHub. The easiest way to see how this works is to review an example.

To simulate records being sent from a weather sensor Internet of Things (IoT) device, you want each record to be formatted in JSON. The following is an example of what a final record must look like:

{
	"sensorId": 40,
	"currentTemperature": 76,
	"status": "OK"
} 

For this use case, you want to simulate sending data from one of 50 sensors, so the sensorID field can be an integer between 1 and 50. The temperature value can range between 10 and 150, so the currentTemperature field should contain a value in this range. Finally, the status value can be one of three possible values: OK, FAIL, and WARN. The KDG template format uses moustache syntax (double curly-braces) to enclose items that should be replaced before the record is sent to Amazon Kinesis. To model the record, the template looks like this:

{
    "sensorId": {{random.number(50)}},
    "currentTemperature": {{random.number(
        {
            "min":10,
            "max":150
        }
    )}},
    "status": "{{random.arrayElement(
        ["OK","FAIL","WARN"]
    )}}"
}

Take a look at one more example, simulating a stream of records that represent rows from an Apache access log. A single Apache access log entry might look like this:

76.0.56.179 - - [29/Apr/2017:16:32:11 -05:00] "GET /wp-admin" 200 8233 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_0 rv:6.0; CY) AppleWebKit/535.0.0 (KHTML, like Gecko) Version/4.0.3 Safari/535.0.0"

The following example shows how to create a template for the Apache access log:

{{internet.ip}} - - [{{date.now("DD/MMM/YYYY:HH:mm:ss Z")}}] "{{random.weightedArrayElement({"weights":[0.6,0.1,0.1,0.2],"data":["GET","POST","DELETE","PUT"]})}} {{random.arrayElement(["/list","/wp-content","/wp-admin","/explore","/search/tag/list","/app/main/posts","/posts/posts/explore"])}}" {{random.weightedArrayElement({"weights": [0.9,0.04,0.02,0.04], "data":["200","404","500","301"]})}} {{random.number(10000)}} "-" "{{internet.userAgent}}"

For more information about creating your own templates, see the Record Template section of the KDG documentation.

The KDG saves the templates that you create in your local browser storage. As long as you use the same browser on the same computer, you can reuse up to five templates.

Summary

Testing your streaming data solution has never been easier. Get started today by visiting the KDG hosted UI or its Amazon Kinesis Data Generator page in GitHub. The project is licensed under the Apache 2.0 license, so feel free to clone and modify it for your own use as necessary. And of course, please submit any issues or pull requests via GitHub.

If you have any questions or suggestions, please add them below.

 


About the Author

Allan MacInnis is a Solutions Architect at Amazon Web Services. He works with our customers to help them build streaming data solutions using Amazon Kinesis. In his spare time, he enjoys mountain biking and spending time with his family.

 

 


Related

Scale Your Amazon Kinesis Stream Capacity with UpdateShardCount

 

 

AWS Online Tech Talks – May 2017

Post Syndicated from Tara Walker original https://aws.amazon.com/blogs/aws/aws-online-tech-talks-may-2017/

Spring has officially sprung. As you enjoy the blossoming of May flowers, it may be worthy to also note some of the great tech talks blossoming online during the month of May. This month’s AWS Online Tech Talks features sessions on topics like AI, DevOps, Data, and Serverless just to name a few.

May 2017 – Schedule

Below is the upcoming schedule for the live, online technical sessions scheduled for the month of May. Make sure to register ahead of time so you won’t miss out on these free talks conducted by AWS subject matter experts. All schedule times for the online tech talks are shown in the Pacific Time (PDT) time zone.

Webinars featured this month are:

Monday, May 15

Artificial Intelligence

9:00 AM – 10:00 AM: Integrate Your Amazon Lex Chatbot with Any Messaging Service

 

Tuesday, May 16

Compute

10:30 AM – 11:30 AM: Deep Dive on Amazon EC2 F1 Instance

IoT

12:00 Noon – 1:00 PM: How to Connect Your Own Creations with AWS IoT

Wednesday, May 17

Management Tools

9:00 AM – 10:00 AM: OpsWorks for Chef Automate – Automation Made Easy!

Serverless

10:30 AM – 11:30 AM: Serverless Orchestration with AWS Step Functions

Enterprise & Hybrid

12:00 Noon – 1:00 PM: Moving to the AWS Cloud: An Overview of the AWS Cloud Adoption Framework

 

Thursday, May 18

Compute

9:00 AM – 10:00 AM: Scaling Up Tenfold with Amazon EC2 Spot Instances

Big Data

10:30 AM – 11:30 AM: Building Analytics Pipelines for Games on AWS

12:00 Noon – 1:00 PM: Serverless Big Data Analytics using Amazon Athena and Amazon QuickSight

 

Monday, May 22

Artificial Intelligence

9:00 AM – 10:00 AM: What’s New with Amazon Rekognition

Serverless

10:30 AM – 11:30 AM: Building Serverless Web Applications

 

Tuesday, May 23

Hands-On Lab

8:30 – 10:00 AM: Hands On Lab: Windows Workloads on AWS

Big Data

10:30 AM – 11:30 AM: Streaming ETL for Data Lakes using Amazon Kinesis Firehose

DevOps

12:00 Noon – 1:00 PM: Deep Dive: Continuous Delivery for AI Applications with ECS

 

Wednesday, May 24

Storage

9:00 – 10:00 AM: Moving Data into the Cloud with AWS Transfer Services

Containers

12:00 Noon – 1:00 PM: Building a CICD Pipeline for Container Deployment to Amazon ECS

 

Thursday, May 25

Mobile

9:00 – 10:00 AM: Test Your Android App with Espresso and AWS Device Farm

Security & Identity

10:30 AM – 11:30 AM: Advanced Techniques for Federation of the AWS Management Console and Command Line Interface (CLI)

 

Tuesday, May 30

Databases

9:00 – 10:00 AM: DynamoDB: Architectural Patterns and Best Practices for Infinitely Scalable Applications

Compute

10:30 AM – 11:30 AM: Deep Dive on Amazon EC2 Elastic GPUs

Security & Identity

12:00 Noon – 1:00 PM: Securing Your AWS Infrastructure with Edge Services

 

Wednesday, May 31

Hands-On Lab

8:30 – 10:00 AM: Hands On Lab: Introduction to Microsoft SQL Server in AWS

Enterprise & Hybrid

10:30 AM – 11:30 AM: Best Practices in Planning a Large-Scale Migration to AWS

Databases

12:00 Noon – 1:00 PM: Convert and Migrate Your NoSQL Database or Data Warehouse to AWS

 

The AWS Online Tech Talks series covers a broad range of topics at varying technical levels. These sessions feature live demonstrations & customer examples led by AWS engineers and Solution Architects. Check out the AWS YouTube channel for more on-demand webinars on AWS technologies.

Tara