Guest post by Pablo Giner, Head of BI, Wind Mobility.
Over the past few years, urban micro-mobility has become a trending topic. With pollution levels hitting historic highs, cities and companies worldwide have been introducing regulations and working on a wide spectrum of solutions to alleviate the situation.
We at Wind Mobility strive to make commuters’ lives more sustainable and convenient by bringing short-distance urban transportation to cities worldwide.
At Wind Mobility, we scale our services at the same pace as our users demand them, and we do it in an economically and environmentally viable way. We optimize our fleet distribution to avoid overcrowding cities with more scooters than will actually be used, and we position them just meters away from where our users need them, at the time of day when they want them.
How do we do that? By optimizing our operations to their fullest. To do so, we need to be very well informed about our users’ behavior under varying conditions and understand our fleet’s potential.
Scalability and flexibility for rapid growth
We knew that before we could solve this challenge, we needed to collect data from many different sources, such as user interactions with our application, user demand, IoT signals from our scooters, and operational metrics. To analyze the numerous datasets collected and extract actionable insights, we needed to build a data lake. While the high-level goal was clear, the scope was less so. We were working hard to scale our operation as we continued to launch new markets. The rapid growth and expansion made it very difficult to predict the volume of data we would need to consume. We were also launching new microservices to support our growth, which resulted in more data sources to ingest. We needed an architecture that allowed us to be agile and quickly adapt to meet our growth. It became clear that a serverless architecture was best positioned to meet those needs, so we started to design our 100% serverless infrastructure.
The first challenge was ingesting and storing data from our scooters in the field, events from our mobile app, operational metrics, and partner APIs. We use AWS Lambda to capture changes in our operational databases and mobile app and push the events to Amazon Kinesis Data Streams, which allows us to take action in real time. We also use Amazon Kinesis Data Firehose to write the data to Amazon Simple Storage Service (Amazon S3), which we use for analytics.
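To illustrate this ingestion pattern, the following is a minimal sketch of a change-capture Lambda function that forwards an event to Kinesis Data Streams. The stream name, event shape, and partition key are assumptions for the example, not our production code.

import json
import boto3

kinesis = boto3.client('kinesis')

def lambda_handler(event, context):
    # 'event' is assumed to be a change record from an operational database
    # or the mobile app; the stream name below is illustrative.
    kinesis.put_record(
        StreamName='scooter-events',
        Data=json.dumps(event),
        PartitionKey=event.get('scooter_id', 'unknown'),
    )
    return {'status': 'queued'}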
After the data was in Amazon S3, adequately partitioned for its most common use cases (we partition by date, region, and business line, depending on the data source), we had to find a way to query it for both data profiling (understanding structure, content, and interrelationships) and ad hoc analysis. For that we chose AWS Glue crawlers to catalog our data and Amazon Athena to read from the AWS Glue Data Catalog and run queries. However, ad hoc analysis and data profiling are relatively sporadic tasks in our team; most of the data processing compute hours are actually dedicated to transforming the multiple data sources into our data warehouse: consolidating the raw data, modeling it, adding new attributes, and selecting the data elements that constitute 95% of our analytics and predictive needs.
This is where all the heavy lifting takes place. We parse through millions of scooter and user events generated daily (over 300 events per second) to extract actionable insight. We selected AWS Glue to perform this task. Our primary ETL job reads the newly added raw event data from Amazon S3, processes it using Apache Spark, and writes the results to our Amazon Redshift data warehouse. AWS Glue plays a critical role in our ability to scale on demand. After careful evaluation and testing, we concluded that AWS Glue ETL jobs meet all our needs and free us from procuring and managing infrastructure.
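A minimal AWS Glue ETL script following this pattern might look like the sketch below. The catalog database, table name, Redshift connection, and S3 temporary directory are placeholders rather than our actual job.

import sys
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
glue_context = GlueContext(SparkContext.getOrCreate())
job = Job(glue_context)
job.init(args['JOB_NAME'], args)

# Read newly crawled raw events from the Data Catalog (placeholder names).
raw_events = glue_context.create_dynamic_frame.from_catalog(
    database='raw_events_db', table_name='scooter_events')

# ... transformations (consolidation, modeling, derived attributes) go here ...

# Write the curated result to the Amazon Redshift data warehouse.
glue_context.write_dynamic_frame.from_jdbc_conf(
    frame=raw_events,
    catalog_connection='redshift-connection',      # placeholder Glue connection
    connection_options={'dbtable': 'curated_events', 'database': 'analytics'},
    redshift_tmp_dir='s3://example-bucket/glue-temp/')

job.commit()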
Architecture overview
The following diagram represents our current data architecture, showing two serverless pipelines for data collection, processing, and reporting: one ingests IoT and application events and supports data profiling with Athena, and the other feeds Amazon Redshift for reporting.
Our data is curated and transformed multiple times a day using an automated pipeline running on AWS Glue. The team can now focus on analyzing the data and building machine learning (ML) applications.
We chose Amazon QuickSight as our business intelligence tool to help us visualize and better understand our operational KPIs. Additionally, we use Amazon Elastic Container Registry (Amazon ECR) to store our Docker images containing our custom ML algorithms and Amazon Elastic Container Service (Amazon ECS) where we train, evaluate, and host our ML models. We schedule our models to be trained and evaluated multiple times a day. Taking as input curated data about demand, conversion, and flow of scooters, we run the models to help us optimize fleet utilization for a particular city at any given time.
The following diagram represents how data from the data lake is incorporated into our ML training, testing, and serving system. First, our developers work on the application code and commit their changes, which are built into new Docker images by our CI/CD pipeline and stored in the Amazon ECR registry. These images are pushed into Amazon ECS and tested in DEV and UAT environments before moving to PROD (where they are triggered by the Amazon ECS task scheduler). During their execution, the Amazon ECS tasks (some train the demand and usage forecasting models, some produce the daily and hourly predictions, and others optimize the fleet distribution to satisfy the forecast) read their configuration and pull data from Amazon S3 (previously produced by scheduled AWS Glue jobs), and finally store their results back into Amazon S3. Executions of these pipelines are tracked via MLflow (on a dedicated Amazon Elastic Compute Cloud (Amazon EC2) server), and the final result, which indicates the required fleet operations, is fed into a Kepler map that is then consumed by the operators in the field.
Conclusion
We at Wind Mobility place data at the forefront of our operations. For that, we need our data infrastructure to be as flexible as the industry and the context we operate in, which is why we chose serverless. Over the course of a year, we have built a data lake, a data warehouse, a BI suite, and a variety of (production) data science applications. All of that with a very small team.
Also, within the last 12 months, we have scaled up several of our data pipelines by a factor of 10, without slowing our momentum or redesigning any part of our architecture. When we needed to double our fleet in one week and increase the frequency at which we capture data from scooters by a factor of 10, our serverless data architecture scaled with no issues. This allowed us to focus on adding value by simplifying our operation, reacting to changes quickly, and delighting our users.
We have measured our success in multiple dimensions:
Speed – Serverless is faster to deploy and expand; we believe we have reduced our time to market for the entire infrastructure by a factor of 2
Visibility – We have 360 degree visibility of our operations worldwide, accessible by our city managers, finance team, and management board
Optimized fleet deployment – We know, at any minute of the day, the number of scooters that our customers need over the next few hours, which reduces unsatisfied demand by more than 50%
If you face a similar challenge, our advice is clear: go fully serverless and use the spectrum of solutions available from AWS.
Pablo Giner is Head of BI at Wind Mobility. Pablo’s background is in wheels (motorcycle racing > vehicle engineering > collision insurance > eScooters sharing…) and for the last few years he has specialized in forming and developing data teams. At Wind Mobility, he leads the data function (data engineering + analytics + data science), and the project he is most proud of is what they call smart fleet rebalancing, an AI-backed solution to reposition their fleet in real time. “In God we trust. All others must bring data.” – W. Edwards Deming
Live business data and real-time analytics are critical to informed decision-making and customer service. For example, streaming services like Netflix process billions of traffic flows each day to help you binge-watch your favorite shows. And consumer audio specialists like Sonos monitor a billion events per week to improve listener experiences. These data-savvy businesses collect and analyze massive amounts of real-time data every day.
Kinesis Data Streams overview
To help ingest real-time data or streaming data at large scales, AWS customers turn to Amazon Kinesis Data Streams. Kinesis Data Streams can continuously capture gigabytes of data per second from hundreds of thousands of sources. The data collected is available in milliseconds, enabling real-time analytics.
To provide this massively scalable throughput, Kinesis Data Streams relies on shards, which are units of throughput and parallelism. One shard provides an ingest throughput of 1 MB/second or 1,000 records/second. A shard also has an outbound throughput of 2 MB/second. As you ingest more data, Kinesis Data Streams can add more shards. Customers often run thousands of shards in a single stream.
Enhanced fan-out
One of the main advantages of stream processing is that you can attach multiple unique applications, each consuming data from the same Kinesis data stream. For example, one application can aggregate the records in the data stream, batch them, and write the batch to S3 for long-term retention. Another application can enrich the records and write them into an Amazon DynamoDB table. At the same time, a third application can filter the stream and write a subset of the data into a different Kinesis data stream.
Before the adoption of enhanced fan-out technology, users consumed data from a Kinesis data stream with multiple AWS Lambda functions sharing the same 2 MB/second outbound throughput. Due to shared bandwidth constraints, no more than two or three functions could efficiently connect to the data stream at a time, as shown in the following diagram.
To achieve greater outbound throughput across multiple applications, you could spread data ingestion across multiple data streams. So, a developer seeking to achieve 10 GB/second of outbound throughput to support five separate applications might resort to math like the following table:
Stream | Shards | Input | Output
1 | 1 | 1,000 records/second or 1 MB/second | 2 MB/second
2 | 2,500 each | 5,000,000 records/second or 5,000 MB/second | 10,000 MB/second or 10 GB/second
Due to the practical limitation of two to three applications per stream, you must have at least two streams to support five individual applications. You could attach three applications to the first stream and two applications to the second. However, diverting data into two separate streams adds complexity.
In August of 2018, Kinesis Data Streams announced a solution: support for enhanced fan-out and HTTP/2 for faster streaming. The enhanced fan-out method is an option that you can use for consuming Kinesis data streams at a higher capacity. The enhanced capacity enables you to achieve higher outbound throughput without having to provision more streams or shards in the same stream.
When using the enhanced fan-out option, first create a Kinesis data stream consumer. A consumer is an isolated connection to the stream that provides a 2 MB/second outbound throughput. A Kinesis data stream can support up to five consumers, providing a combined outbound throughput capacity of 10 MB/second per shard. As the stream scales dynamically by adding shards, the throughput available through the consumers scales with it.
Consider again the requirement of 10 GB/second of output capacity, but rerun the math using enhanced fan-out.
Stream | Shards | Input | Consumers | Output
1 | 1 | 1,000 records/second or 1 MB/second | 5 | 10 MB/second
1 | 1,000 | 1,000,000 records/second or 1,000 MB/second | 5 | 10,000 MB/second or 10 GB/second
Enhanced fan-out with Lambda functions
Just before re:Invent 2018, AWS Lambda announced support for enhanced fan-out and HTTP/2. Lambda functions can now be triggered using the enhanced fan-out pattern to reduce latency. These improvements increase the amount of real-time data that can be processed in serverless applications, as seen in the following diagram.
In addition to using the enhanced fan-out option, you can still attach Lambda functions to the stream using the GetRecords API, as before. You can attach up to five consumers with Lambda functions, each with a dedicated 2 MB/second of outbound throughput, plus another two or three Lambda functions sharing a single 2 MB/second of outbound throughput. Thus, enhanced fan-out enables you to support up to eight Lambda functions simultaneously.
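As an illustration, the following sketch registers an enhanced fan-out consumer and maps it to a Lambda function using boto3; the stream ARN and function name are placeholders, not values from this walkthrough.

import boto3

kinesis = boto3.client('kinesis')
lambda_client = boto3.client('lambda')

# Register a dedicated-throughput consumer on the stream (placeholder ARN).
consumer = kinesis.register_stream_consumer(
    StreamARN='arn:aws:kinesis:us-east-1:123456789012:stream/example-stream',
    ConsumerName='example-efo-consumer')['Consumer']

# Point the Lambda event source mapping at the consumer ARN instead of the
# stream ARN, so the function reads over its own 2 MB/second connection.
lambda_client.create_event_source_mapping(
    EventSourceArn=consumer['ConsumerARN'],
    FunctionName='example-function',
    StartingPosition='LATEST',
    BatchSize=100)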
HTTP/2
The streaming technology in HTTP/2 increases the output ability of Kinesis data streams. In addition, it allows data delivery from producers to consumers in 70 milliseconds or better (a 65% improvement) in typical scenarios. These new features enable you to build faster, more reactive, highly parallel, and latency-sensitive applications on top of Kinesis Data Streams.
Comparing methods
To demonstrate the advantage of Kinesis Data Streams enhanced fan-out, I built an application with a single shard stream. It has three Lambda functions connected using the standard method and three Lambda functions connected using enhanced fan-out for consumers. I created roughly 76 KB of dummy data and inserted it into the stream at 1,000 records per second. After four seconds, I stopped the process, leaving a total of 4,000 records to be processed.
As seen in the following diagram, each of the enhanced fan-out functions processed the 4,000 records in under 2 seconds, averaging 1,852 ms each. Interestingly, the standard method got a jumpstart in the first function, processing 4,000 records in 1,732 ms. However, because of the shared resources, the other two functions took longer to process the data, at just over 2.5 seconds.
By Kinesis Data Streams standards, 4,000 records is a small dataset. But when processing millions of records in real time, the latency difference between the standard and enhanced fan-out methods becomes much more significant.
Cost
When using Kinesis Data Streams, a company incurs an hourly cost of $0.015 per shard and a PUT fee of $0.014 per one million units. You can purchase enhanced fan-out for a fee of $0.015 per consumer-shard hour, plus a data retrieval fee of $0.013 per GB. These fees are for the us-east-1 Region only. To see a full list of prices, see Kinesis Data Streams pricing.
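As a rough worked example using the us-east-1 prices above, the sketch below estimates a month of charges for an assumed workload; the shard count, consumer count, and usage figures are illustrative assumptions only.

# Assumed workload: 10 shards, 3 enhanced fan-out consumers,
# 500 million PUT units and 2,000 GB retrieved per month (~730 hours).
shards, consumers, hours = 10, 3, 730
put_units_millions, retrieved_gb = 500, 2000

shard_hours = shards * hours * 0.015                        # $0.015 per shard-hour
put_fee = put_units_millions * 0.014                        # $0.014 per million PUT units
efo_consumer_shard_hours = shards * consumers * hours * 0.015  # enhanced fan-out consumer-shard hours
efo_retrieval = retrieved_gb * 0.013                        # $0.013 per GB retrieved

print(shard_hours, put_fee, efo_consumer_shard_hours, efo_retrieval)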
Show me the code
To demonstrate the use of Kinesis Data Streams enhanced fan-out with Lambda functions, I built a simple application. It ingests simulated IoT sensor data and stores it in an Amazon DynamoDB table as well as in an Amazon S3 bucket for later use. I could have conceivably done this in a single Lambda function. However, to keep things simple, I broke it into two separate functions.
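As a rough sketch of what the DynamoDB-writing function does (the table name comes from this walkthrough, but the partition key and item shape are assumptions), a Kinesis-triggered handler might look like this:

import base64
import boto3

table = boto3.resource('dynamodb').Table('DBRecords')

def lambda_handler(event, context):
    # Records arrive base64-encoded from the Kinesis event source mapping.
    for record in event['Records']:
        payload = base64.b64decode(record['kinesis']['data']).decode('utf-8')
        table.put_item(Item={
            'id': record['kinesis']['sequenceNumber'],  # assumed partition key
            'data': payload,                            # sensor reading as a JSON string
        })
    return 'processed {} records'.format(len(event['Records']))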
On the deployment status page, you can monitor the resources being deployed, including policies and capabilities.
After all the resources deploy, you should see a green banner.
Exploring the application
Take a moment to examine the list of deployed resources. The two Lambda functions, DDBFunction and S3Function, receive data and write to DynamoDB and S3, respectively. Additionally, two roles have been created to allow the functions access to their respective targets.
There are also two consumers, DDBConsumer and S3Consumer, which provide isolated output at 2 MB/second throughput. Each consumer is connected to the KinesisStream stream and triggers the Lambda functions when data arrives.
Also, there is a DynamoDB table called DBRecords and an S3 bucket called S3Records.
Finally, there is a stream consumption app, as shown in the following diagram.
Testing the application
Now that you have your application installed, test it by putting data into the Kinesis data stream.
After you have the generator configured, you should have a custom URL to generate data for your Kinesis data stream. In your configuration steps, you created a username and password. Log in to the generator using those credentials.
When you are logged in, you can generate data for your stream test.
For Region, choose us-east-1.
For Stream/delivery stream, select your stream. It should start with serverlessrepo.
For Records per second, keep the default value of 100.
At this point, if all went according to plan, you should see data in both your DynamoDB table and S3 bucket. Use the following steps to verify that your enhanced fan-out process worked.
On the Lambda console, choose Applications.
Select the application that starts with serverlessrepo-.
Choose Resources, DDBFunction. This opens the DynamoDB console.
Choose Items.
The following screenshot shows the first 100 items that your database absorbed from the DDBFunction attached to KinesisStream through DDBConsumer.
Next, check your S3 bucket.
On the Lambda console, choose Applications.
Select the application that starts with serverlessrepo-.
Choose Resources, S3Records. This opens the S3 console.
As in DynamoDB, you should now see the fake IoT sensor data stored in your S3 bucket for later use.
Now that the demonstration is working, I want to point out the benefits of what you have just done. By using the enhanced fan-out method, you have increased your performance in the following ways.
HTTP/2 has decreased the time from data producers to consumers to 70 ms or less, a 65% improvement.
At the consumer level, each consumer has an isolated 2 MB/second outbound throughput speed. Because you are using two consumers, it works out to 2x the performance.
Conclusion
Using Lambda functions in concert with Kinesis Data Streams to collect and analyze massive amounts of data isn’t a new idea. However, the introduction of enhanced fan-out technology and HTTP/2 enables you to use more functions at the same time without losing throughput capacity.
If you only connect one or two Lambda functions to a data stream, then enhanced fan-out might not be a great fit. However, if you attach more than three Lambda functions to a stream for real-time manipulation and data routing, it makes sense to evaluate this option.
Join us this month to learn about AWS services and solutions. New this month, we have a fireside chat with the GM of Amazon WorkSpaces and our 2nd episode of the “How to re:Invent” series. We’ll also cover best practices, deep dives, use cases and more! Join us and register today!
AWS re:Invent
June 13, 2018 | 05:00 PM – 05:30 PM PT – Episode 2: AWS re:Invent Breakout Content Secret Sauce – Hear from one of our own AWS content experts as we dive deep into the re:Invent content strategy and how we maintain a high bar.
Containers
June 25, 2018 | 09:00 AM – 09:45 AM PT – Running Kubernetes on AWS – Learn about the basics of running Kubernetes on AWS, including how to set up masters, networking, and security, and how to add auto-scaling to your cluster.
June 19, 2018 | 11:00 AM – 11:45 AM PT – Launch AWS Faster using Automated Landing Zones – Learn how the AWS Landing Zone can automate the setup of best practice baselines when setting up new AWS environments.
June 21, 2018 | 01:00 PM – 01:45 PM PT – Enabling New Retail Customer Experiences with Big Data – Learn how AWS can help retailers realize actual value from their big data and deliver on differentiated retail customer experiences.
June 28, 2018 | 01:00 PM – 01:45 PM PT – Fireside Chat: End User Collaboration on AWS – Learn how End User Compute services can help you deliver access to desktops and applications anywhere, anytime, using any device.
IoT
June 27, 2018 | 11:00 AM – 11:45 AM PT – AWS IoT in the Connected Home – Learn how to use AWS IoT to build innovative Connected Home products.
Mobile
June 25, 2018 | 11:00 AM – 11:45 AM PT – Drive User Engagement with Amazon Pinpoint – Learn how Amazon Pinpoint simplifies and streamlines effective user engagement.
June 26, 2018 | 11:00 AM – 11:45 AM PT – Deep Dive: Hybrid Cloud Storage with AWS Storage Gateway – Learn how you can reduce your on-premises infrastructure by using the AWS Storage Gateway to connect your applications to the scalable and reliable AWS storage services.
June 27, 2018 | 01:00 PM – 01:45 PM PT – Changing the Game: Extending Compute Capabilities to the Edge – Discover how to change the game for IIoT and edge analytics applications with AWS Snowball Edge plus enhanced Compute instances.
June 28, 2018 | 11:00 AM – 11:45 AM PT – Big Data and Analytics Workloads on Amazon EFS – Get best practices and deployment advice for running big data and analytics workloads on Amazon EFS.
Last year, we released Amazon Connect, a cloud-based contact center service that enables any business to deliver better customer service at low cost. This service is built on the same technology that empowers Amazon customer service associates. Using this system, associates have millions of conversations with customers when they inquire about their shipping or order information. Because we made it available as an AWS service, you can now enable your contact center agents to make or receive calls in a matter of minutes. You can do this without having to provision any kind of hardware.
There are several advantages of building your contact center in the AWS Cloud, as described in our documentation. In addition, customers can extend Amazon Connect capabilities by using AWS products and the breadth of AWS services. In this blog post, we focus on how to get analytics out of the rich set of data published by Amazon Connect. We make use of an Amazon Connect data stream and create an end-to-end workflow to offer an analytical solution that can be customized based on need.
Solution overview
The following diagram illustrates the solution.
In this solution, Amazon Connect exports its contact trace records (CTRs) using Amazon Kinesis. CTRs are data streams in JSON format, and each has information about individual contacts. For example, this information might include the start and end time of a call, which agent handled the call, which queue the user chose, queue wait times, number of holds, and so on. You can enable this feature by reviewing our documentation.
In this architecture, we use Kinesis Data Firehose to capture Amazon Connect CTRs as raw data in an Amazon S3 bucket. We don’t use the recently added Kinesis Data Firehose feature that saves the data in S3 in Apache Parquet format. Instead, we use AWS Glue to automatically detect the schema on the fly from the Amazon Connect data stream.
The primary reason for this approach is that it allows us to use attributes and enables an Amazon Connect administrator to dynamically add more fields as needed. Also, by converting data to Parquet in batches (every couple of hours), we can achieve higher compression. However, if your requirement is to ingest the data in Parquet format in near real time, we recommend using the recently launched Kinesis Data Firehose feature. You can review this blog post for further information.
By default, Firehose puts these records in time-series format. To make it easy for AWS Glue crawlers to capture information from new records, we use AWS Lambda to move all new records to a single S3 prefix called flatfiles. Our Lambda function is configured using S3 event notification. To comply with AWS Glue and Athena best practices, the Lambda function also converts all column names to lowercase. Finally, we also use the Lambda function to start AWS Glue crawlers. AWS Glue crawlers identify the data schema and update the AWS Glue Data Catalog, which is used by extract, transform, load (ETL) jobs in AWS Glue in the latter half of the workflow.
You can see our approach in the following Lambda code.
from __future__ import print_function

import json
import urllib
import boto3
import os
import re

s3 = boto3.resource('s3')
client = boto3.client('s3')

def convert_column_to_lowercase(obj):
    # Recursively lowercase keys and strip non-alphanumeric characters so that
    # column names comply with AWS Glue and Athena naming best practices.
    for key in obj.keys():
        new_key = re.sub(r'[\W]+', '', key.lower())
        v = obj[key]
        if isinstance(v, dict):
            if len(v) > 0:
                convert_column_to_lowercase(v)
        if new_key != key:
            obj[new_key] = obj[key]
            del obj[key]
    return obj

def lambda_handler(event, context):
    # Triggered by an S3 event notification for each new raw CTR object.
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key'].encode('utf8'))
    try:
        client.download_file(bucket, key, '/tmp/file.json')
        with open('/tmp/out.json', 'w') as output, open('/tmp/file.json', 'rb') as file:
            i = 0
            for line in file:
                # Firehose concatenates JSON objects; split them into one record per line.
                for object in line.replace("}{", "}\n{").split("\n"):
                    record = json.loads(object, object_hook=convert_column_to_lowercase)
                    if i != 0:
                        output.write("\n")
                    output.write(json.dumps(record))
                    i += 1
        # Move the normalized file into the single flatfiles prefix and delete the original object.
        newkey = 'flatfiles/' + key.replace("/", "")
        client.upload_file('/tmp/out.json', bucket, newkey)
        s3.Object(bucket, key).delete()
        return "success"
    except Exception as e:
        print(e)
        print('Error copying object {} from bucket {}'.format(key, bucket))
        raise e
We trigger AWS Glue crawlers based on events because this approach lets us capture any new data elements that we want to remain dynamic. CTR attributes are designed to offer multiple custom options based on a particular call flow. Attributes are essentially key-value pairs in nested JSON format. With the help of event-based AWS Glue crawlers, you can easily identify newer attributes automatically.
We recommend setting up an S3 lifecycle policy on the flatfiles folder that keeps records only for 24 hours. Doing this optimizes AWS Glue ETL jobs to process a subset of files rather than the entire set of records.
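One way to set such a lifecycle policy programmatically is sketched below (the bucket name is a placeholder, and the console works just as well). S3 lifecycle expiration is expressed in whole days, so one day is the closest match to 24 hours.

import boto3

s3 = boto3.client('s3')
s3.put_bucket_lifecycle_configuration(
    Bucket='example-connect-ctr-bucket',          # placeholder bucket name
    LifecycleConfiguration={'Rules': [{
        'ID': 'expire-flatfiles',
        'Filter': {'Prefix': 'flatfiles/'},       # only the staging prefix
        'Status': 'Enabled',
        'Expiration': {'Days': 1},                # keep records roughly 24 hours
    }]})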
After we have data in the flatfiles folder, we use AWS Glue to catalog the data and transform it into Parquet format inside a folder called parquet/ctr/. The AWS Glue job performs the ETL that transforms the data from JSON to Parquet format. We use AWS Glue crawlers to capture any new fields inside the JSON data that we want to remain dynamic. What this means is that when you add new attributes to an Amazon Connect instance, the solution automatically recognizes them and incorporates them in the schema of the results.
After AWS Glue stores the results in Parquet format, you can perform analytics using Amazon Redshift Spectrum, Amazon Athena, or any third-party data warehouse platform. To keep this solution simple, we have used Amazon Athena for analytics. Amazon Athena allows us to query data without having to set up and manage any servers or data warehouse platforms. Additionally, we only pay for the queries that are executed.
Try it out!
You can get started with our sample AWS CloudFormation template. This template creates the components starting from the Kinesis stream and finishes up with S3 buckets, the AWS Glue job, and crawlers. To deploy the template, launch it in the AWS Management Console.
In the console, specify the following parameters:
BucketName: The name for the bucket to store all the solution files. This name must be unique; if it’s not, template creation fails.
etlJobSchedule: The schedule in cron format indicating how often the AWS Glue job runs. The default value is every hour.
KinesisStreamName: The name of the Kinesis stream to receive data from Amazon Connect. This name must be different from any other Kinesis stream created in your AWS account.
s3interval: The interval in seconds for Kinesis Data Firehose to save data inside the flatfiles folder on S3. The value must be between 60 and 900 seconds.
sampledata: When this parameter is set to true, sample CTR records are used. Doing this lets you try this solution without setting up an Amazon Connect instance. All examples in this walkthrough use this sample data.
Select the “I acknowledge that AWS CloudFormation might create IAM resources.” check box, and then choose Create. After the template finishes creating resources, you can see the stream name on the stack Outputs tab.
If you haven’t created your Amazon Connect instance, you can do so by following the Getting Started Guide. When you are done creating, choose your Amazon Connect instance in the console, which takes you to instance settings. Choose Data streaming to enable streaming for CTR records. Here, you can choose the Kinesis stream (defined in the KinesisStreamName parameter) that was created by the CloudFormation template.
Now it’s time to generate the data by making or receiving calls by using Amazon Connect. You can go to Amazon Connect Cloud Control Panel (CCP) to make or receive calls using a software phone or desktop phone. After a few minutes, we should see data inside the flatfiles folder. To make it easier to try this solution, we provide sample data that you can enable by setting the sampledata parameter to true in your CloudFormation template.
You can navigate to the AWS Glue console by choosing Jobs on the left navigation pane of the console. We can select our job here. In my case, the job created by CloudFormation is called glueJob-i3TULzVtP1W0; yours should be similar. You run the job by choosing Run job for Action.
After that, we wait for the AWS Glue job to run and to finish successfully. We can track the status of the job by checking the History tab.
When the job finishes running, we can check the Database section. There should be a new table created called ctr in Parquet format.
To query the data with Athena, we can select the ctr table, and for Action choose View data.
Doing this takes us to the Athena console. If you run a query, Athena shows a preview of the data.
Once we can query the data using Athena, we can visualize it using Amazon QuickSight. Before connecting Amazon QuickSight to Athena, we must make sure to grant Amazon QuickSight access to Athena and the associated S3 buckets in the account. For more information on doing this, see Managing Amazon QuickSight Permissions to AWS Resources in the Amazon QuickSight User Guide. We can then create a new data set in Amazon QuickSight based on the Athena table that was created.
After setting up permissions, we can create a new analysis in Amazon QuickSight by choosing New analysis.
Then we add a new data set.
We choose Athena as the source and give the data source a name (in this case, I named it connectctr).
Choose the name of the database and the table referencing the Parquet results.
Then choose Visualize.
After that, we should see the following screen.
Now we can create some visualizations. First, search for the agent.username column, and drag it to the AutoGraph section.
We can see the agents and the number of calls for each, so we can easily see which agents have taken the largest number of calls. If we want to see which queues the calls came from for each agent, we can add the queue.arn column to the visual.
After following all these steps, you can use Amazon QuickSight to add different columns from the call records and perform different types of visualizations. You can build dashboards that continuously monitor your Amazon Connect instance. You can share those dashboards with others in your organization who might need to see this data.
Conclusion
In this post, you see how you can use services like AWS Lambda, AWS Glue, and Amazon Athena to process Amazon Connect call records. The post also demonstrates how to use AWS Lambda to preprocess files in Amazon S3 and transform them into a format that is recognized by AWS Glue crawlers. Finally, the post shows how to use Amazon QuickSight to perform visualizations.
You can use the provided template to analyze your own contact center instance. Or you can take the CloudFormation template and modify it to process other data streams that can be ingested using Amazon Kinesis or stored on Amazon S3.
Luis Caro is a Big Data Consultant for AWS Professional Services. He works with our customers to provide guidance and technical assistance on big data projects, helping them improve the value of their solutions when using AWS.
Peter Dalbhanjan is a Solutions Architect for AWS based in Herndon, VA. Peter has a keen interest in evangelizing AWS solutions and has written multiple blog posts that focus on simplifying complex use cases. At AWS, Peter helps with designing and architecting a variety of customer workloads.
If you’ve used Amazon CloudWatch Events to schedule the invocation of a Lambda function at regular intervals, you may have noticed that the highest frequency possible is one invocation per minute. However, in some cases, you may need to invoke Lambda more often than that. In this blog post, I’ll cover invoking a Lambda function every 10 seconds, but with some simple math you can change it to whatever interval you like.
To achieve this, I’ll show you how to leverage Step Functions and Amazon Kinesis Data Streams.
The Solution
For this example, I’ve created a Step Functions state machine that invokes our Lambda function 6 times, 10 seconds apart. This state machine is then executed once per minute by an Amazon CloudWatch Events rule. Finally, the Kinesis Data Stream triggers our Lambda function for each record inserted. The result is our Lambda function being invoked every 10 seconds, indefinitely.
Below is a diagram illustrating how the various services work together.
Step 1: My sampleLambda function doesn’t actually do anything; it just simulates an execution for a few seconds. This is the (Python) code of my dummy function:
import time
import random

def lambda_handler(event, context):
    rand = random.randint(1, 3)
    print('Running for {} seconds'.format(rand))
    time.sleep(rand)
    return True
Step 2: The next step is to create a second Lambda function, which I called Iterator, and which has two duties:
It keeps track of the current number of iterations, since Step Functions doesn’t natively keep a counter we can use for this purpose.
It asynchronously invokes our Lambda function at every loop iteration.
This is the code of the Iterator, adapted from here.
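Because the linked code isn’t reproduced here, the following is a minimal sketch of such an iterator, matching the loop described next (an iterator object carrying index and count, and a target function named sampleLambda); treat it as illustrative rather than the exact original code.

import boto3

lambda_client = boto3.client('lambda')

def lambda_handler(event, context):
    index = event['iterator']['index'] + 1
    count = event['iterator']['count']

    # Fire-and-forget invocation of the function we want to run every 10 seconds.
    lambda_client.invoke(
        FunctionName='sampleLambda',        # placeholder target function name
        InvocationType='Event')             # asynchronous invocation

    # The state machine's choice state reads 'continue' to decide whether to loop again.
    return {'index': index, 'count': count, 'continue': index < count}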
The state machine starts and sets the index at 0 and the count at 6.
Iterator function is invoked.
If the iterator function reached the end of the loop, the IsCountReached state terminates the execution, otherwise the machine waits for 10 seconds.
The machine loops back to the iterator.
Step 3: Create an Amazon CloudWatch Events rule scheduled to trigger every minute and add the state machine as its target. I’ve actually prepared an Amazon CloudFormation template that creates the whole stack and starts the Lambda invocations; you can find it here.
Performance
Let’s have a look at a sample series of invocations and analyze how precise the timing is. The following chart reports the delay (in excess of the expected 10-second wait) for 30 consecutive invocations of my dummy function, with the Iterator configured with a memory size of 1024 MB.
Invocations Delay
Notice the delay increases by a few hundred milliseconds at every invocation. The good news is it accrues only within the same loop, 6 times; after that, a new CloudWatch Events execution kicks in and the delay resets.
This delay is due to the work that AWS Step Functions does outside of the Wait state, the main component of which is the Iterator function itself, which runs synchronously in the state machine and therefore adds its duration to the 10-second wait.
As we can easily imagine, the memory size of the Iterator Lambda function does make a difference. Here are the Average and Maximum duration of the function with 256MB, 512MB, 1GB and 2GB of memory.
Average Duration
Maximum Duration
Given those results, I’d say that a memory of 1024MB is a good compromise between costs and performance.
Caveats
As mentioned in our Amazon CloudWatch Events documentation, in rare cases a rule can be triggered twice, causing two parallel executions of the state machine. If that is a concern, we can add a task state at the beginning of the state machine that checks whether any other executions are currently running. If the outcome is positive, a choice state can immediately terminate the flow. Because the state machine is invoked every 60 seconds and runs for about 50, it is safe to assume that executions should all be sequential and that any parallel executions should be treated as duplicates. The task state that checks for currently running executions can be a Lambda function similar to the following:
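A possible sketch, assuming the state machine ARN is passed in the execution input:

import boto3

sfn = boto3.client('stepfunctions')

def lambda_handler(event, context):
    # The state machine ARN is assumed to be provided in the task input.
    running = sfn.list_executions(
        stateMachineArn=event['stateMachineArn'],
        statusFilter='RUNNING')['executions']

    # This execution is itself RUNNING, so anything beyond one indicates a duplicate.
    return {'alreadyRunning': len(running) > 1}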
Amazon Kinesis Data Firehose is the easiest way to capture and stream data into a data lake built on Amazon S3. This data can be anything—from AWS service logs like AWS CloudTrail log files, Amazon VPC Flow Logs, Application Load Balancer logs, and others. It can also be IoT events, game events, and much more. To efficiently query this data, a time-consuming ETL (extract, transform, and load) process is required to massage and convert the data to an optimal file format, which increases the time to insight. This situation is less than ideal, especially for real-time data that loses its value over time.
To solve this common challenge, Kinesis Data Firehose can now save data to Amazon S3 in Apache Parquet or Apache ORC format. These are optimized columnar formats that are highly recommended for best performance and cost-savings when querying data in S3. This feature directly benefits you if you use Amazon Athena, Amazon Redshift, AWS Glue, Amazon EMR, or any other big data tools that are available from the AWS Partner Network and through the open-source community.
Amazon Connect is a simple-to-use, cloud-based contact center service that makes it easy for any business to provide a great customer experience at a lower cost than common alternatives. Its open platform design enables easy integration with other systems. One of those systems is Amazon Kinesis—in particular, Kinesis Data Streams and Kinesis Data Firehose.
What’s really exciting is that you can now save events from Amazon Connect to S3 in Apache Parquet format. You can then perform analytics using Amazon Athena and Amazon Redshift Spectrum in real time, taking advantage of this key performance and cost optimization. Of course, Amazon Connect is only one example. This new capability opens the door for a great deal of opportunity, especially as organizations continue to build their data lakes.
Amazon Connect includes an array of analytics views in the Administrator dashboard. But you might want to run other types of analysis. In this post, I describe how to set up a data stream from Amazon Connect through Kinesis Data Streams and Kinesis Data Firehose and out to S3, and then perform analytics using Athena and Amazon Redshift Spectrum. I focus primarily on the Kinesis Data Firehose support for Parquet and its integration with the AWS Glue Data Catalog, Amazon Athena, and Amazon Redshift.
Solution overview
Here is how the solution is laid out:
The following sections walk you through each of these steps to set up the pipeline.
1. Define the schema
When Kinesis Data Firehose processes incoming events and converts the data to Parquet, it needs to know which schema to apply. The reason is that many times, incoming events contain all or some of the expected fields based on which values the producers are advertising. A typical process is to normalize the schema during a batch ETL job so that you end up with a consistent schema that can easily be understood and queried. Doing this introduces latency due to the nature of the batch process. To overcome this issue, Kinesis Data Firehose requires the schema to be defined in advance.
To see the available columns and structures, see Amazon Connect Agent Event Streams. For the purpose of simplicity, I opted to make all the columns of type String rather than create the nested structures. But you can definitely do that if you want.
The simplest way to define the schema is to create a table in the Amazon Athena console. Open the Athena console, and paste the following create table statement, substituting your own S3 bucket and prefix for where your event data will be stored. A Data Catalog database is a logical container that holds the different tables that you can create. The default database name shown here should already exist. If it doesn’t, you can create it or use another database that you’ve already created.
That’s all you have to do to prepare the schema for Kinesis Data Firehose.
2. Define the data streams
Next, you need to define the Kinesis data streams that will be used to stream the Amazon Connect events. Open the Kinesis Data Streams console and create two streams. You can configure them with only one shard each because you don’t have a lot of data right now.
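If you prefer to script this step, a sketch with boto3 might look like the following; the stream names are placeholders rather than required values.

import boto3

kinesis = boto3.client('kinesis')

# Placeholder names for the two streams used in this walkthrough.
for name in ('connect-agent-events', 'connect-ctrs'):
    kinesis.create_stream(StreamName=name, ShardCount=1)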
3. Define the Kinesis Data Firehose delivery stream for Parquet
Let’s configure the Data Firehose delivery stream using the data stream as the source and Amazon S3 as the output. Start by opening the Kinesis Data Firehose console and creating a new data delivery stream. Give it a name, and associate it with the Kinesis data stream that you created in Step 2.
As shown in the following screenshot, enable Record format conversion (1) and choose Apache Parquet (2). As you can see, Apache ORC is also supported. Scroll down and provide the AWS Glue Data Catalog database name (3) and table names (4) that you created in Step 1. Choose Next.
To make things easier, the output S3 bucket and prefix fields are automatically populated using the values that you defined in the LOCATION parameter of the create table statement from Step 1. Pretty cool. Additionally, you have the option to save the raw events into another location as defined in the Source record S3 backup section. Don’t forget to add a trailing forward slash “ / “ so that Data Firehose creates the date partitions inside that prefix.
On the next page, in the S3 buffer conditions section, there is a note about configuring a large buffer size. The Parquet file format is highly efficient in how it stores and compresses data. Increasing the buffer size allows you to pack more rows into each output file, which is preferred and gives you the most benefit from Parquet.
Compression using Snappy is automatically enabled for both Parquet and ORC. You can modify the compression algorithm by using the Kinesis Data Firehose API and update the OutputFormatConfiguration.
Be sure to also enable Amazon CloudWatch Logs so that you can debug any issues that you might run into.
Lastly, finalize the creation of the Firehose delivery stream, and continue on to the next section.
4. Set up the Amazon Connect contact center
After setting up the Kinesis pipeline, you now need to set up a simple contact center in Amazon Connect. The Getting Started page provides clear instructions on how to set up your environment, acquire a phone number, and create an agent to accept calls.
After setting up the contact center, in the Amazon Connect console, choose your Instance Alias, and then choose Data Streaming. Under Agent Event, choose the Kinesis data stream that you created in Step 2, and then choose Save.
At this point, your pipeline is complete. Agent events from Amazon Connect are generated as agents go about their day. Events are sent via Kinesis Data Streams to Kinesis Data Firehose, which converts the event data from JSON to Parquet and stores it in S3. Athena and Amazon Redshift Spectrum can simply query the data without any additional work.
So let’s generate some data. Go back into the Administrator console for your Amazon Connect contact center, and create an agent to handle incoming calls. In this example, I creatively named mine Agent One. After it is created, Agent One can get to work and log into their console and set their availability to Available so that they are ready to receive calls.
To make the data a bit more interesting, I also created a second agent, Agent Two. I then made some incoming and outgoing calls and caused some failures to occur, so I now have enough data available to analyze.
5. Analyze the data with Athena
Let’s open the Athena console and run some queries. One thing you’ll notice is that when we created the schema for the dataset, we defined some of the fields as Strings even though in the documentation they were complex structures. The reason for doing that was simply to show some of the flexibility of Athena to be able to parse JSON data. However, you can define nested structures in your table schema so that Kinesis Data Firehose applies the appropriate schema to the Parquet file.
Let’s run the first query to see which agents have logged into the system.
The query might look complex, but it’s fairly straightforward:
WITH dataset AS (
SELECT
from_iso8601_timestamp(eventtimestamp) AS event_ts,
eventtype,
-- CURRENT STATE
json_extract_scalar(
currentagentsnapshot,
'$.agentstatus.name') AS current_status,
from_iso8601_timestamp(
json_extract_scalar(
currentagentsnapshot,
'$.agentstatus.starttimestamp')) AS current_starttimestamp,
json_extract_scalar(
currentagentsnapshot,
'$.configuration.firstname') AS current_firstname,
json_extract_scalar(
currentagentsnapshot,
'$.configuration.lastname') AS current_lastname,
json_extract_scalar(
currentagentsnapshot,
'$.configuration.username') AS current_username,
json_extract_scalar(
currentagentsnapshot,
'$.configuration.routingprofile.defaultoutboundqueue.name') AS current_outboundqueue,
json_extract_scalar(
currentagentsnapshot,
'$.configuration.routingprofile.inboundqueues[0].name') as current_inboundqueue,
-- PREVIOUS STATE
json_extract_scalar(
previousagentsnapshot,
'$.agentstatus.name') as prev_status,
from_iso8601_timestamp(
json_extract_scalar(
previousagentsnapshot,
'$.agentstatus.starttimestamp')) as prev_starttimestamp,
json_extract_scalar(
previousagentsnapshot,
'$.configuration.firstname') as prev_firstname,
json_extract_scalar(
previousagentsnapshot,
'$.configuration.lastname') as prev_lastname,
json_extract_scalar(
previousagentsnapshot,
'$.configuration.username') as prev_username,
json_extract_scalar(
previousagentsnapshot,
'$.configuration.routingprofile.defaultoutboundqueue.name') as prev_outboundqueue,
json_extract_scalar(
previousagentsnapshot,
'$.configuration.routingprofile.inboundqueues[0].name') as prev_inboundqueue
from kfhconnectblog
where eventtype <> 'HEART_BEAT'
)
SELECT
current_status as status,
current_username as username,
event_ts
FROM dataset
WHERE eventtype = 'LOGIN' AND current_username <> ''
ORDER BY event_ts DESC
The query output looks something like this:
Here is another query that shows the sessions each of the agents engaged with. It tells us whether they were incoming or outgoing, whether they were completed, and where there were missed or failed calls.
WITH src AS (
SELECT
eventid,
json_extract_scalar(currentagentsnapshot, '$.configuration.username') as username,
cast(json_extract(currentagentsnapshot, '$.contacts') AS ARRAY(JSON)) as c,
cast(json_extract(previousagentsnapshot, '$.contacts') AS ARRAY(JSON)) as p
from kfhconnectblog
),
src2 AS (
SELECT *
FROM src CROSS JOIN UNNEST (c, p) AS contacts(c_item, p_item)
),
dataset AS (
SELECT
eventid,
username,
json_extract_scalar(c_item, '$.contactid') as c_contactid,
json_extract_scalar(c_item, '$.channel') as c_channel,
json_extract_scalar(c_item, '$.initiationmethod') as c_direction,
json_extract_scalar(c_item, '$.queue.name') as c_queue,
json_extract_scalar(c_item, '$.state') as c_state,
from_iso8601_timestamp(json_extract_scalar(c_item, '$.statestarttimestamp')) as c_ts,
json_extract_scalar(p_item, '$.contactid') as p_contactid,
json_extract_scalar(p_item, '$.channel') as p_channel,
json_extract_scalar(p_item, '$.initiationmethod') as p_direction,
json_extract_scalar(p_item, '$.queue.name') as p_queue,
json_extract_scalar(p_item, '$.state') as p_state,
from_iso8601_timestamp(json_extract_scalar(p_item, '$.statestarttimestamp')) as p_ts
FROM src2
)
SELECT
username,
c_channel as channel,
c_direction as direction,
p_state as prev_state,
c_state as current_state,
c_ts as current_ts,
c_contactid as id
FROM dataset
WHERE c_contactid = p_contactid
ORDER BY id DESC, current_ts ASC
The query output looks similar to the following:
6. Analyze the data with Amazon Redshift Spectrum
With Amazon Redshift Spectrum, you can query data directly in S3 using your existing Amazon Redshift data warehouse cluster. Because the data is already in Parquet format, Redshift Spectrum gets the same great benefits that Athena does.
Here is a simple query to show querying the same data from Amazon Redshift. Note that to do this, you need to first create an external schema in Amazon Redshift that points to the AWS Glue Data Catalog.
SELECT
eventtype,
json_extract_path_text(currentagentsnapshot,'agentstatus','name') AS current_status,
json_extract_path_text(currentagentsnapshot, 'configuration','firstname') AS current_firstname,
json_extract_path_text(currentagentsnapshot, 'configuration','lastname') AS current_lastname,
json_extract_path_text(
currentagentsnapshot,
'configuration','routingprofile','defaultoutboundqueue','name') AS current_outboundqueue
FROM default_schema.kfhconnectblog
The following shows the query output:
Summary
In this post, I showed you how to use Kinesis Data Firehose to ingest and convert data to columnar file format, enabling real-time analysis using Athena and Amazon Redshift. This great feature enables a level of optimization in both cost and performance that you need when storing and analyzing large amounts of data. This feature is equally important if you are investing in building data lakes on AWS.
Roy Hasson is a Global Business Development Manager for AWS Analytics. He works with customers around the globe to design solutions to meet their data processing, analytics, and business intelligence needs. Roy is a big Manchester United fan who enjoys cheering his team on and hanging out with his family.
The Internet of Things (IoT) has precipitated an influx of connected devices and data that can be mined to gain useful business insights. If you own an IoT device, you might want the data to be uploaded seamlessly from your connected devices to the cloud so that you can make use of cloud storage and processing power to perform sophisticated analysis of the data. To upload the data to the AWS Cloud, devices must pass authentication and authorization checks performed by the respective AWS services. The standard way of authenticating AWS requests is the Signature Version 4 algorithm, which requires the caller to have an access key ID and secret access key. Consequently, you need to hardcode the access key ID and the secret access key on your devices. Alternatively, you can use the built-in X.509 certificate as the unique device identity to authenticate AWS requests.
AWS IoT has introduced the credentials provider feature that allows a caller to authenticate AWS requests by having an X.509 certificate. The credentials provider authenticates a caller using an X.509 certificate, and vends a temporary, limited-privilege security token. The token can be used to sign and authenticate any AWS request. Thus, the credentials provider relieves you from having to manage and periodically refresh the access key ID and secret access key remotely on your devices.
In the process of retrieving a security token, you use AWS IoT to create a thing (a representation of a specific device or logical entity), register a certificate, and create AWS IoT policies. You also configure an AWS Identity and Access Management (IAM) role and attach appropriate IAM policies to the role so that the credentials provider can assume the role on your behalf. You also make an HTTP-over-Transport Layer Security (TLS) mutual authentication request to the credentials provider that uses your preconfigured thing, certificate, policies, and IAM role to authenticate and authorize the request, and obtain a security token on your behalf. You can then use the token to sign any AWS request using Signature Version 4.
In this blog post, I explain the AWS IoT credentials provider design and then demonstrate the end-to-end process of retrieving a security token from AWS IoT and using the token to write a temperature and humidity record to a specific Amazon DynamoDB table.
Note: This post assumes you are familiar with AWS IoT and IAM to perform steps using the AWS CLI and OpenSSL. Make sure you are running the latest version of the AWS CLI.
Overview of the credentials provider workflow
The following numbered diagram illustrates the credentials provider workflow. The diagram is followed by explanations of the steps.
To explain the steps of the workflow as illustrated in the preceding diagram:
The AWS IoT device uses the AWS SDK or custom client to make an HTTPS request to the credentials provider for a security token. The request includes the device X.509 certificate for authentication.
The credentials provider forwards the request to the AWS IoT authentication and authorization module to verify the certificate and the permission to request the security token.
If the certificate is valid and has permission to request a security token, the AWS IoT authentication and authorization module returns success. Otherwise, it returns failure, which goes back to the device with the appropriate exception.
The credentials provider invokes AWS Security Token Service (AWS STS) to assume the preconfigured IAM role.
If assuming the role succeeds, AWS STS returns a temporary, limited-privilege security token to the credentials provider.
The credentials provider returns the security token to the device.
The AWS SDK on the device uses the security token to sign an AWS request with AWS Signature Version 4.
The requested service invokes IAM to validate the signature and authorize the request against access policies attached to the preconfigured IAM role.
If IAM validates the signature successfully and authorizes the request, the request goes through.
In another solution, you could configure an AWS Lambda rule that ingests your device data and sends it to another AWS service. However, in applications that require the uploading of large files such as videos or aggregated telemetry to the AWS Cloud, you may want your devices to be able to authenticate and send data directly to the AWS service of your choice. The credentials provider enables you to do that.
Outline of the steps to retrieve and use security token
Perform the following steps as part of this solution:
Create an AWS IoT thing: Start by creating a thing that corresponds to your home thermostat in the AWS IoT thing registry database. This allows you to authenticate the request as a thing and use thing attributes as policy variables in AWS IoT and IAM policies.
Register a certificate: Create and register a certificate with AWS IoT, and attach it to the thing for successful device authentication.
Create and configure an IAM role: Create an IAM role to be assumed by the service on behalf of your device. I illustrate how to configure a trust policy and an access policy so that AWS IoT has permission to assume the role, and the token has necessary permission to make requests to DynamoDB.
Create a role alias: Create a role alias in AWS IoT. A role alias is an alternate data model pointing to an IAM role. The credentials provider request must include a role alias name to indicate which IAM role to assume for obtaining a security token from AWS STS. You may update the role alias on the server to point to a different IAM role and thus make your device obtain a security token with different permissions.
Attach a policy: Create an authorization policy with AWS IoT and attach it to the certificate to control which device can assume which role aliases.
Request a security token: Make an HTTPS request to the credentials provider and retrieve a security token that can be used to sign a DynamoDB request with Signature Version 4.
Use the security token to sign a request: Use the retrieved token to sign a request to DynamoDB and successfully write a temperature and humidity record from your home thermostat in a specific table. Thus, starting with an X.509 certificate on your home thermostat, you can successfully upload your thermostat record to DynamoDB and use it for further analysis. Before the availability of the credentials provider, you could not do this.
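To make these last two steps concrete, here is a rough device-side sketch. The credentials provider endpoint host, certificate file names, and the table’s key schema and item attributes are assumptions for illustration; the thing name and role alias match the examples used in this walkthrough.

import requests
import boto3

# The endpoint host is account-specific; retrieve it with
# `aws iot describe-endpoint --endpoint-type iot:CredentialProvider`.
ENDPOINT = 'your-endpoint.credentials.iot.us-east-1.amazonaws.com'  # placeholder
ROLE_ALIAS = 'Thermostat-dynamodb-access-role-alias'
THING_NAME = 'MyHomeThermostat'

# Mutual TLS: authenticate with the device certificate and private key.
response = requests.get(
    'https://{}/role-aliases/{}/credentials'.format(ENDPOINT, ROLE_ALIAS),
    headers={'x-amzn-iot-thingname': THING_NAME},
    cert=('device-cert.pem', 'device-private-key.pem'))  # placeholder file names
credentials = response.json()['credentials']

# Use the temporary credentials to sign a DynamoDB request with Signature Version 4.
dynamodb = boto3.client(
    'dynamodb',
    region_name='us-east-1',
    aws_access_key_id=credentials['accessKeyId'],
    aws_secret_access_key=credentials['secretAccessKey'],
    aws_session_token=credentials['sessionToken'])

# The table name matches the thing name, per the access policy in this walkthrough;
# the item attributes below are illustrative.
dynamodb.put_item(
    TableName=THING_NAME,
    Item={'timestamp': {'S': '2018-06-01T12:00:00Z'},
          'temperature': {'N': '72'},
          'humidity': {'N': '45'}})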
Deploy the solution
1. Create an AWS IoT thing
Register your home thermostat in the AWS IoT thing registry database by creating a thing type and a thing. You can use the AWS CLI with the following command to create a thing type. The thing type allows you to store description and configuration information that is common to a set of things.
2. Register a certificate
Now, you need to have a Certificate Authority (CA) certificate, sign a device certificate using the CA certificate, and register both certificates with AWS IoT before your device can authenticate to AWS IoT. If you do not already have a CA certificate, you can use OpenSSL to create a CA certificate, as described in Use Your Own Certificate. To register your CA certificate with AWS IoT, follow the steps on Registering Your CA Certificate.
You then have to create a device certificate signed by the CA certificate and register it with AWS IoT, which you can do by following the steps on Creating a Device Certificate Using Your CA Certificate. Save the certificate and the corresponding key pair; you will use them when you request a security token later. Also, remember the password you provide when you create the certificate.
Run the following command in the AWS CLI to attach the device certificate to your thing so that you can use thing attributes in policy variables.
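A sketch of the command, assuming the certificate ARN that was returned when you registered the device certificate:
aws iot attach-thing-principal --thing-name "MyHomeThermostat" \
  --principal "arn:aws:iot:us-east-1:<your_aws_account_id>:cert/<certificate_id>"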
If the attach-thing-principal command succeeds, the output is empty.
3. Configure an IAM role
Next, configure an IAM role in your AWS account that will be assumed by the credentials provider on behalf of your device. You are required to associate two policies with the role: a trust policy that controls who can assume the role, and an access policy that controls which actions can be performed on which resources by assuming the role.
The following trust policy grants the credentials provider permission to assume the role. Put it in a text document and save the document with the name, trustpolicyforiot.json.
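A minimal trust policy along these lines allows the credentials provider (the credentials.iot.amazonaws.com service principal) to assume the role:
{
  "Version": "2012-10-17",
  "Statement": {
    "Effect": "Allow",
    "Principal": {"Service": "credentials.iot.amazonaws.com"},
    "Action": "sts:AssumeRole"
  }
}
Then create the role with this trust policy; the role name dynamodb-access-role is reused in later steps:
aws iam create-role --role-name dynamodb-access-role \
  --assume-role-policy-document file://trustpolicyforiot.json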
The following access policy allows DynamoDB operations on the table that has the same name as the thing name that you created in Step 1, MyHomeThermostat, by using credentials-iot:ThingName as a policy variable. I explain after Step 5 about using thing attributes as policy variables. Put the following policy in a text document and save the document with the name, accesspolicyfordynamodb.json.
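A sketch of such an access policy, using the policy variable in the table ARN (the exact set of DynamoDB actions you allow may differ):
{
  "Version": "2012-10-17",
  "Statement": {
    "Effect": "Allow",
    "Action": [
      "dynamodb:GetItem",
      "dynamodb:PutItem",
      "dynamodb:UpdateItem"
    ],
    "Resource": "arn:aws:dynamodb:us-east-1:<your_aws_account_id>:table/${credentials-iot:ThingName}"
  }
}
Create the managed policy from this document so that it can be attached to the role in the next command:
aws iam create-policy --policy-name accesspolicyfordynamodb \
  --policy-document file://accesspolicyfordynamodb.json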
Finally, run the following command in the AWS CLI to attach the access policy to your role.
aws iam attach-role-policy --role-name dynamodb-access-role --policy-arn arn:aws:iam::<your_aws_account_id>:policy/accesspolicyfordynamodb
If the attach-role-policy command succeeds, the output is empty.
Configure the PassRole permissions
The IAM role that you have created must be passed to AWS IoT to create a role alias, as described in Step 4. The user who performs the operation requires iam:PassRole permission to authorize this action. You also should add permission for the iam:GetRole action to allow the user to retrieve information about the specified role. Create the following policy to grant iam:PassRole and iam:GetRole permissions. Name this policy, passrolepermission.json.
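A sketch of such a policy, scoped to the role created in Step 3:
{
  "Version": "2012-10-17",
  "Statement": {
    "Effect": "Allow",
    "Action": ["iam:GetRole", "iam:PassRole"],
    "Resource": "arn:aws:iam::<your_aws_account_id>:role/dynamodb-access-role"
  }
}
Create the managed policy so that it can be attached to the user:
aws iam create-policy --policy-name passrolepermission \
  --policy-document file://passrolepermission.json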
Now, run the following command to attach the policy to the user.
aws iam attach-user-policy --policy-arn arn:aws:iam::<your_aws_account_id>:policy/passrolepermission --user-name <user_name>
If the attach-user-policy command succeeds, the output is empty.
4. Create a role alias
Now that you have configured the IAM role, you will create a role alias with AWS IoT. You must provide the following pieces of information when creating a role alias:
RoleAlias: This is the primary key of the role alias data model and hence a mandatory attribute. It is a string; the minimum length is 1 character, and the maximum length is 128 characters.
RoleArn: This is the Amazon Resource Name (ARN) of the IAM role you have created. This is also a mandatory attribute.
CredentialDurationSeconds: This is an optional attribute specifying the validity (in seconds) of the security token. The minimum value is 900 seconds (15 minutes), and the maximum value is 3,600 seconds (60 minutes); the default value is 3,600 seconds, if not specified.
Run the following command in the AWS CLI to create a role alias. Use the credentials of the user to whom you have given the iam:PassRole permission.
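A sketch of the command, using the role alias name referenced in the following steps:
aws iot create-role-alias --role-alias Thermostat-dynamodb-access-role-alias \
  --role-arn arn:aws:iam::<your_aws_account_id>:role/dynamodb-access-role \
  --credential-duration-seconds 3600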
5. Attach a policy
You created and registered a certificate with AWS IoT earlier for successful authentication of your device. Now, you need to create and attach a policy to the certificate to authorize the request for the security token.
Let’s say you want to allow a thing to get credentials for the role alias, Thermostat-dynamodb-access-role-alias, with thing owner Alice, thing type thermostat, and the thing attached to a principal. The following policy, with thing attributes as policy variables, achieves these requirements. After this step, I explain more about using thing attributes as policy variables. Put the policy in a text document, and save it with the name, alicethermostatpolicy.json.
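A sketch of such a policy follows; it allows the iot:AssumeRoleWithCertificate action on the role alias only when the connecting thing has the expected owner attribute and thing type and is attached to the certificate (the exact condition keys you need may vary):
{
  "Version": "2012-10-17",
  "Statement": {
    "Effect": "Allow",
    "Action": "iot:AssumeRoleWithCertificate",
    "Resource": "arn:aws:iot:us-east-1:<your_aws_account_id>:rolealias/Thermostat-dynamodb-access-role-alias",
    "Condition": {
      "StringEquals": {
        "iot:Connection.Thing.Attributes[Owner]": "Alice",
        "iot:Connection.Thing.ThingTypeName": "thermostat"
      },
      "Bool": {"iot:Connection.Thing.IsAttached": "true"}
    }
  }
}
Create the AWS IoT policy and attach it to the device certificate (the policy name here is illustrative):
aws iot create-policy --policy-name AliceThermostatPolicy \
  --policy-document file://alicethermostatpolicy.json

aws iot attach-policy --policy-name AliceThermostatPolicy \
  --target "arn:aws:iot:us-east-1:<your_aws_account_id>:cert/<certificate_id>"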
If the attach-policy command succeeds, the output is empty.
You have completed all the necessary steps to request an AWS security token from the credentials provider!
Using thing attributes as policy variables
Before I show how to request a security token, I want to explain more about how to use thing attributes as policy variables and the advantage of using them. As a prerequisite, a device must provide a thing name in the credentials provider request.
Thing substitution variables in AWS IoT policies
AWS IoT Simplified Permission Management allows you to associate a connection with a specific thing, and allow the thing name, thing type, and other thing attributes to be available as substitution variables in AWS IoT policies. You can write a generic AWS IoT policy as in alicethermostatpolicy.json in Step 5, attach it to multiple certificates, and authorize the connection as a thing. For example, you could attach alicethermostatpolicy.json to certificates corresponding to each of the thermostats you have that you want to assume the role alias, Thermostat-dynamodb-access-role-alias, and allow operations only on the table with the name that matches the thing name. For more information, see the full list of thing policy variables.
Thing substitution variables in IAM policies
You also can use the following three substitution variables in the IAM role’s access policy (I used credentials-iot:ThingName in accesspolicyfordynamodb.json in Step 3):
credentials-iot:ThingName
credentials-iot:ThingTypeName
credentials-iot:AwsCertificateId
When the device provides the thing name in the request, the credentials provider fetches these three variables from the database and adds them as context variables to the security token. When the device uses the token to access DynamoDB, the variables in the role’s access policy are replaced with the corresponding values in the security token. Note that you also can use credentials-iot:AwsCertificateId as a policy variable; AWS IoT returns certificateId during registration.
6. Request a security token
Make an HTTPS request to the credentials provider to fetch a security token. You have to supply the following information:
Certificate and key pair: Because this is an HTTPS request with TLS mutual authentication, you have to provide the certificate and the corresponding key pair to your client while making the request. Use the same certificate and key pair that you used during certificate registration with AWS IoT.
RoleAlias: Provide the role alias (in this example, Thermostat-dynamodb-access-role-alias) to be assumed in the request.
ThingName: Provide the thing name that you created earlier in the AWS IoT thing registry database. This is passed as a header with the name, x-amzn-iot-thingname. Note that the thing name is mandatory only if you have thing attributes as policy variables in AWS IoT or IAM policies.
Run the following command in the AWS CLI to obtain your AWS account-specific endpoint for the credentials provider. See the DescribeEndpoint API documentation for further details.
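For example:
aws iot describe-endpoint --endpoint-type iot:CredentialProvider
The command returns an endpoint similar to <random_prefix>.credentials.iot.<region>.amazonaws.com, which you use as the host in the request below.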
Note that if you are on Mac OS X, you need to export your certificate to a .pfx or .p12 file before you can pass it in the HTTPS request. Use OpenSSL with the following command to convert the device certificate from .pem to .pfx format. Remember the password because you will need it subsequently in a curl command.
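A sketch of the conversion, assuming your device certificate and key file names:
openssl pkcs12 -export -in deviceCert.pem -inkey deviceCert.key -out deviceCert.pfx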
Now, make an HTTPS request to the credentials provider to fetch a security token. You may use your preferred HTTP client for the request. I use curl in the following examples.
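A sketch of the request, assuming the certificate and key files saved earlier and the endpoint returned by describe-endpoint (on Mac OS X, pass the .pfx/.p12 file with --cert and --cert-type P12 instead of the .pem files):
curl --cert deviceCert.pem --key deviceCert.key \
  -H "x-amzn-iot-thingname: MyHomeThermostat" \
  https://<your_credentials_provider_endpoint>/role-aliases/Thermostat-dynamodb-access-role-alias/credentials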
This command returns a security token object that has an accessKeyId, a secretAccessKey, a sessionToken, and an expiration. The following is sample output of the curl command.
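The response is shaped roughly as follows, with the actual values replaced by placeholders here:
{
  "credentials": {
    "accessKeyId": "<access key ID>",
    "secretAccessKey": "<secret access key>",
    "sessionToken": "<session token>",
    "expiration": "<expiration timestamp>"
  }
}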
7. Use the security token to sign a request
Create a DynamoDB table called MyHomeThermostat in your AWS account. You will have to choose the hash (partition key) and the range (sort key) while creating the table to uniquely identify a record. Make the hash the serial_number of the thermostat and the range the timestamp of the record. Create a text file with the following JSON to put a temperature and humidity record in the table. Name the file, item.json.
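One way to create the table and the item file; the serial number, timestamp, temperature, and humidity values are illustrative:
aws dynamodb create-table --table-name MyHomeThermostat \
  --attribute-definitions AttributeName=serial_number,AttributeType=S AttributeName=timestamp,AttributeType=S \
  --key-schema AttributeName=serial_number,KeyType=HASH AttributeName=timestamp,KeyType=RANGE \
  --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5

The contents of item.json could look like the following:
{
  "serial_number": {"S": "123456"},
  "timestamp": {"S": "2018-01-01T10:00:00Z"},
  "temperature": {"N": "22"},
  "humidity": {"N": "45"}
}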
You can use the accessKeyId, secretAccessKey, and sessionToken retrieved from the output of the curl command to sign a request that writes the temperature and humidity record to the DynamoDB table. Use the following commands to accomplish this.
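A sketch of those commands; export the temporary credentials so that the AWS CLI signs the request with Signature Version 4, then write the item:
export AWS_ACCESS_KEY_ID=<accessKeyId from the token>
export AWS_SECRET_ACCESS_KEY=<secretAccessKey from the token>
export AWS_SESSION_TOKEN=<sessionToken from the token>

aws dynamodb put-item --table-name MyHomeThermostat --item file://item.json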
In this blog post, I demonstrated how to retrieve a security token by using an X.509 certificate and then use the security token to write an item to a DynamoDB table. Similarly, you could run applications on surveillance cameras or sensor devices that exchange the X.509 certificate for an AWS security token and use the token to upload video streams to Amazon Kinesis or telemetry data to Amazon CloudWatch.
If you have comments about this blog post, submit them in the “Comments” section below. If you have questions about or issues implementing this solution, start a new thread on the AWS IoT forum.
This blog post was co-authored by Ujjwal Ratan, a senior AI/ML solutions architect on the global life sciences team.
Healthcare data is generated at an ever-increasing rate and is predicted to reach 35 zettabytes by 2020. Being able to cost-effectively and securely manage this data, whether for patient care, research, or legal reasons, is increasingly important for healthcare providers.
Healthcare providers must have the ability to ingest, store and protect large volumes of data including clinical, genomic, device, financial, supply chain, and claims. AWS is well-suited to this data deluge with a wide variety of ingestion, storage and security services (e.g. AWS Direct Connect, Amazon Kinesis Streams, Amazon S3, Amazon Macie) for customers to handle their healthcare data. In a recent Healthcare IT News article, healthcare thought-leader, John Halamka, noted, “I predict that five years from now none of us will have datacenters. We’re going to go out to the cloud to find EHRs, clinical decision support, analytics.”
I realize simply storing this data is challenging enough. Magnifying the problem is the fact that healthcare data is increasingly attractive to cyber attackers, making security a top priority. According to Mariya Yao in her Forbes column, it is estimated that individual medical records can be worth hundreds or even thousands of dollars on the black market.
In this first post of a two-part series, I will address the value that AWS can bring to customers for ingesting, storing, and protecting providers' healthcare data. I will describe key components of any cloud-based healthcare workload and the services AWS provides to meet these requirements. In part 2 of this series, we will dive deep into the AWS services used for advanced analytics, artificial intelligence, and machine learning.
The data tsunami is upon us
So where is this data coming from? In addition to the ubiquitous electronic health record (EHR), the sources of this data include:
genomic sequencers
devices such as MRIs, x-rays and ultrasounds
sensors and wearables for patients
medical equipment telemetry
mobile applications
Additional sources of data come from non-clinical, operational systems such as:
human resources
finance
supply chain
claims and billing
Data from these sources can be structured (e.g., claims data) as well as unstructured (e.g., clinician notes). Some data comes across in streams such as that taken from patient monitors, while some comes in batch form. Still other data comes in near-real time such as HL7 messages. All of this data has retention policies dictating how long it must be stored. Much of this data is stored in perpetuity as many systems in use today have no purge mechanism. AWS has services to manage all these data types as well as their retention, security and access policies.
Imaging is a significant contributor to this data tsunami. Increasing demand for early-stage diagnoses along with aging populations drive increasing demand for images from CT, PET, MRI, ultrasound, digital pathology, X-ray and fluoroscopy. For example, a thin-slice CT image can be hundreds of megabytes. Increasing demand and strict retention policies make storage costly.
Due to the plummeting cost of gene sequencing, molecular diagnostics (including liquid biopsy) is a large contributor to this data deluge. Many predict that as the value of molecular testing becomes more identifiable, the reimbursement models will change and it will increasingly become the standard of care. According to the Washington Post article “Sequencing the Genome Creates so Much Data We Don’t Know What to do with It,”
“Some researchers predict that up to one billion people will have their genome sequenced by 2025 generating up to 40 exabytes of data per year.”
Although genomics is primarily used for oncology diagnostics today, it's also used for other purposes, such as pharmacogenomics, which helps determine how an individual will metabolize a medication.
Reference Architecture
It is increasingly challenging for the typical hospital, clinic or physician practice to securely store, process and manage this data without cloud adoption.
Amazon has a variety of ingestion techniques depending on the nature of the data, including size, frequency, and structure. AWS Snowball and AWS Snowmobile are appropriate for extremely large, secure data transfers, whether one time or episodic. AWS Glue is a fully managed ETL service for securely moving data from on premises to AWS, and Amazon Kinesis can be used for ingesting streaming data.
Amazon S3, Amazon S3 IA, and Amazon Glacier are economical, data-storage services with a pay-as-you-go pricing model that expand (or shrink) with the customer’s requirements.
The above architecture has four distinct components: ingestion, storage, security, and analytics. In this post I will dive deeper into the first three components, namely ingestion, storage, and security. In part 2, I will look at how to use AWS analytics services to derive value from, and optimize, your healthcare data.
Ingestion
A typical provider data center will consist of many systems with varied datasets. AWS provides multiple tools and services to effectively and securely connect to these data sources and ingest data in various formats. Customers can choose from a range of services and use them in accordance with the use case.
For use cases involving one-time (or periodic), very large data migrations into AWS, customers can take advantage of AWS Snowball devices. These devices come in two sizes, 50 TB and 80 TB, and can be combined to create a petabyte-scale data transfer solution.
The devices are easy to connect and load, and they are shipped to AWS, avoiding the network bottlenecks associated with such large-scale data migrations. The devices are extremely secure, supporting 256-bit encryption, and come in a tamper-resistant enclosure. AWS Snowball imports data into Amazon S3, which can then interface with other AWS compute services to process that data in a scalable manner.
For use cases involving a need to store a portion of datasets on premises for active use and offload the rest to AWS, the AWS Storage Gateway service can be used. The service allows you to seamlessly integrate on-premises applications via standard storage protocols such as iSCSI or NFS mounted on a gateway appliance. It supports a file interface, a volume interface, and a tape interface, which can be utilized for a range of use cases such as disaster recovery, backup and archiving, cloud bursting, storage tiering, and migration.
The AWS Storage Gateway appliance can use the AWS Direct Connect service to establish a dedicated network connection from the on premises data center to AWS.
Specific Industry Use Cases
By using the AWS proposed reference architecture for disaster recovery, healthcare providers can ensure their data assets are securely stored on the cloud and are easily accessible in the event of a disaster. The “AWS Disaster Recovery” whitepaper includes details on options available to customers based on their desired recovery time objective (RTO) and recovery point objective (RPO).
AWS is an ideal destination for offloading large volumes of less-frequently-accessed data. These datasets are rarely used in active compute operations but are exceedingly important to retain for reasons such as compliance. By storing these datasets on AWS, customers can take advantage of the highly durable platform to securely store their data and also retrieve it easily when needed. For more details on how AWS enables customers to run backup and archival use cases on AWS, please refer to the following set of whitepapers.
A healthcare provider may have a variety of databases spread throughout the hospital system supporting critical applications such as EHR, PACS, finance and many more. These datasets often need to be aggregated to derive information and calculate metrics to optimize business processes. AWS Glue is a fully managed Extract, Transform, and Load (ETL) service that can read data from a JDBC-enabled, on-premises database and transfer the datasets into AWS services such as Amazon S3, Amazon Redshift, and Amazon RDS. This allows customers to create transformation workflows that integrate smaller datasets from multiple sources and aggregate them on AWS.
Healthcare providers deal with a variety of streaming datasets which often have to be analyzed in near real time. These datasets come from a variety of sources such as sensors, messaging buses, and social media, and often do not adhere to an industry standard. The Amazon Kinesis suite of services, which includes Amazon Kinesis Streams, Amazon Kinesis Firehose, and Amazon Kinesis Analytics, is an ideal set of services to accomplish the task of deriving value from streaming data.
Example: Using AWS Glue to de-identify and ingest healthcare data into S3
Let's consider a scenario in which a provider maintains patient records in a database they want to ingest into S3. The provider also wants to de-identify the data by stripping personally identifiable attributes and store the non-identifiable information in an S3 bucket. This bucket is different from the one that contains identifiable information. Doing this allows the healthcare provider to separate sensitive information with more restrictions set up via S3 bucket policies.
To ingest records into S3, we create an AWS Glue job that reads from the source database using a Glue connection. The connection is also used by a Glue crawler to populate the AWS Glue Data Catalog with the schema of the source database. We will use a Glue development endpoint and a Zeppelin notebook server on EC2 to develop and execute the job.
Step 1: Import the necessary libraries and also set a GlueContext, which is a wrapper around the SparkContext:
Step 2: Create a dataframe from the source data. I call the dataframe “readmissionsdata”. Here is what the schema would look like:
Step 3: Now select the columns that contain identifiable information and store them in a new dataframe. Call the new dataframe “phi”.
Step 4: Non-PHI columns are stored in a separate dataframe. Call this dataframe “nonphi”.
Step 5: Write the two dataframes into two separate S3 buckets
Once successfully executed, the PHI and non-PHI attributes are stored in two separate files in two separate buckets that can be individually maintained.
Storage
In 2016, 327 healthcare providers reported a protected health information (PHI) breach, affecting 16.4 million patient records.[1] In 2017, 342 data breaches were reported, involving 3.2 million patient records.[2]
To date, AWS has released 51 HIPAA-eligible services to help customers address security challenges and is in the process of making many more services HIPAA-eligible. These HIPAA-eligible services (along with all other AWS services) help customers build solutions that comply with HIPAA security and auditing requirements. A catalogue of HIPAA-eligible services can be found at AWS HIPAA-eligible services. It is important to note that AWS manages physical and logical access controls for the AWS boundary. However, the overall security of your workloads is a shared responsibility, where you are responsible for controlling user access to content on your AWS accounts.
AWS storage services allow you to store data efficiently while maintaining high durability and scalability. By using Amazon S3 as the central storage layer, you can take advantage of the Amazon S3 storage management features to get operational metrics on your data sets and transition them between various storage classes to save costs. By tagging objects on Amazon S3, you can build a governance layer on Amazon S3 to grant role-based access to objects using AWS IAM and Amazon S3 bucket policies.
To learn more about the Amazon S3 storage management features, see the following link.
Security
In the example above, we are storing the PHI information in a bucket named “phi.” Now, we want to protect this information to make sure it's encrypted, protected from unauthorized access, and that all access requests to the data are logged.
Encryption: S3 provides settings to enable default encryption on a bucket. This ensures any object in the bucket is encrypted by default.
Logging: S3 provides object-level logging that can be used to capture all API calls to the object. The API calls are logged in AWS CloudTrail for easy access and consolidation. Moreover, S3 also supports events to proactively alert customers of read and write operations.
Access control: Customers can use S3 bucket policies and IAM policies to restrict access to the phi bucket. They can also enforce multi-factor authentication on the bucket. For example, the following policy enforces multi-factor authentication on the phi bucket:
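A sketch of such a bucket policy, which denies all S3 actions on the phi bucket when the request is not authenticated with MFA (the bucket name and statement ID are illustrative):
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DenyAccessWithoutMFA",
      "Effect": "Deny",
      "Principal": "*",
      "Action": "s3:*",
      "Resource": [
        "arn:aws:s3:::phi",
        "arn:aws:s3:::phi/*"
      ],
      "Condition": {
        "BoolIfExists": {"aws:MultiFactorAuthPresent": "false"}
      }
    }
  ]
}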
In Part 1 of this blog, we detailed the ingestion, storage, security and management of healthcare data on AWS. Stay tuned for part two where we are going to dive deep into optimizing the data for analytics and machine learning.
This post courtesy of Massimiliano Angelino, AWS Solutions Architect
Different enterprise systems—ERP, CRM, BI, HR, etc.—need to exchange information but normally cannot do that natively because they are from different vendors. Enterprises have tried multiple ways to integrate heterogeneous systems, generally referred to as enterprise application integration (EAI).
Modern EAI systems are based on a message-oriented middleware (MoM), also known as enterprise service bus (ESB). An ESB provides data communication via a message bus, on top of which it also provides components to orchestrate, route, translate, and monitor the data exchange. Communication with the ESB is done via adapters or connectors provided by the ESB. In this way, the different applications do not have to have specific knowledge of the technology used to provide the integration.
Amazon MQ used with Apache Camel is an open-source alternative to commercial ESBs. With the launch of Amazon MQ, integration between on-premises applications and cloud services becomes much simpler. Amazon MQ provides a managed message broker service currently supporting Apache ActiveMQ 5.15.0.
In this post, I show how a simple integration between Amazon MQ and other AWS services can be achieved by using Apache Camel.
Apache Camel provides built-in connectors for integration with a wide variety of AWS services such as Amazon MQ, Amazon SQS, Amazon SNS, Amazon SWF, Amazon S3, AWS Lambda, Amazon DynamoDB, AWS Elastic Beanstalk, and Amazon Kinesis Streams. It also provides a broad range of other connectors including Cassandra, JDBC, Spark, and even Facebook and Slack.
EAI system architecture
Different applications use different data formats, hence the need for a translation/transformation service. Such services can be provided to or from a common “normalized” format, or specifically between two applications.
The use of normalized formats simplifies the integration process when multiple applications need to share the same data, because each application only needs a translation to and from the common format; the number of conversions is therefore proportional to the number of applications (N), rather than to the number of application pairs as in point-to-point integration. This comes at the cost of a more complex adaptation to a common format, which is required to cover all needs from the different applications, current and future.
Another characteristic of an EAI system is the support of distributed transactions to ensure data consistency across multiple applications.
EAI system architecture is normally composed of the following components:
A centralized broker that handles security, access control, and data communications. Amazon MQ provides these features through the support of multiple transport protocols (AMQP, OpenWire, MQTT, WebSocket), security (all communications are encrypted via SSL), and per-destination granular access control.
An independent data model, also known as the canonical data model. XML is the de facto standard for the data representation.
Connectors/agents that allow the applications to communicate with the broker.
A system model to allow a standardized way for all components to interface with the EAI. Java Message Service (JMS) and Windows Communication Foundation (WCF) are standard APIs to interact with constructs such as queues and topics to implement the different messaging patterns.
Walkthrough
This solution walks you through the following steps:
Creating the broker
Writing a simple application
Adding the dependencies
Triaging files into S3
Writing the Camel route
Sending files to the AMQP queue
Setting up AMQP
Testing the code
Creating the broker
To create a new broker, log in to your AWS account and choose Amazon MQ. Amazon MQ is currently available in six AWS Regions:
US East (N. Virginia)
US East (Ohio)
US West (Oregon)
EU (Ireland)
EU (Frankfurt)
Asia Pacific (Sydney)
Make sure that you have selected one of these Regions.
The master user name and password are used to access the monitoring console of the broker and can be also used to authenticate when connecting the clients to the broker. I recommend creating separate users, without console access, to authenticate the clients to the broker, after the broker has been created.
For this example, create a single broker without failover. If your application requires a higher availability level, check the Create standby in a different zone check box. If the primary broker instance fails, the standby takes over in seconds. To make the client aware of the standby, use the failover:// protocol in the connection configuration, pointing to both broker endpoints.
Leave the other settings as is. The broker takes a few minutes to be created. After it’s done, you can see the list of endpoints available for the different protocols.
After the broker has been created, modify the security group to add the allowed ports and sources for access.
For this example, you need access to the ActiveMQ admin page and to AMQP. Open up ports 8162 and 5671 to the public address of your laptop.
You can also create a new user for programmatic access to the broker. In the Users section, choose Create User and add a new user named sdk.
Writing a simple application
The complete code for this walkthrough is available from the aws-amazonmq-apachecamel-sample GitHub repo. Clone the repository on your local machine to have the fully functional example. The rest of this post offers step-by-step instructions to build this solution.
To write the application, use Apache Maven and the Camel archetypes provided by Maven. If you do not have Apache Maven installed on your machine, you can follow the instructions at Installing Apache Maven.
From a terminal, run the following command:
mvn archetype:generate
You get a list of archetypes. Type camel to get only the one related to camel. In this case, use the java8 example and type the following:
Maven now generates the skeleton code in a folder named after the artifactId. In this case:
camel-aws-simple
Next, test that the environment is configured correctly to run Camel. At the prompt, run the following commands:
cd camel-aws-simple
mvn install
mvn exec:java
You should see a log appearing in the console, printing the following:
[INFO] --- exec-maven-plugin:1.6.0:java (default-cli) @ camel-aws-test ---
[ com.angmas.MainApp.main()] DefaultCamelContext INFO Apache Camel 2.20.1 (CamelContext: camel-1) is starting
[ com.angmas.MainApp.main()] ManagedManagementStrategy INFO JMX is enabled
[ com.angmas.MainApp.main()] DefaultTypeConverter INFO Type converters loaded (core: 192, classpath: 0)
[ com.angmas.MainApp.main()] DefaultCamelContext INFO StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
[ com.angmas.MainApp.main()] DefaultCamelContext INFO Route: route1 started and consuming from: timer://simple?period=1000
[ com.angmas.MainApp.main()] DefaultCamelContext INFO Total 1 routes, of which 1 are started
[ com.angmas.MainApp.main()] DefaultCamelContext INFO Apache Camel 2.20.1 (CamelContext: camel-1) started in 0.419 seconds
[-1) thread #2 - timer://simple] route1 INFO Got a String body
[-1) thread #2 - timer://simple] route1 INFO Got an Integer body
[-1) thread #2 - timer://simple] route1 INFO Got a Double body
[-1) thread #2 - timer://simple] route1 INFO Got a String body
[-1) thread #2 - timer://simple] route1 INFO Got an Integer body
[-1) thread #2 - timer://simple] route1 INFO Got a Double body
[-1) thread #2 - timer://simple] route1 INFO Got a String body
[-1) thread #2 - timer://simple] route1 INFO Got an Integer body
[-1) thread #2 - timer://simple] route1 INFO Got a Double body
Adding the dependencies
Now that you have verified that the sample works, modify it to add the dependencies to interface to Amazon MQ/ActiveMQ and AWS.
For the following steps, you can use a normal text editor, such as vi, Sublime Text, or Visual Studio Code. Or, open the maven project in an IDE such as Eclipse or IntelliJ IDEA.
Open pom.xml and add the following lines inside the <dependencies> tag:
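At a minimum, you need the Camel AMQP and AWS components; a sketch of the dependencies, assuming the Camel version reported in the logs above (2.20.1):
<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-amqp</artifactId>
  <version>2.20.1</version>
</dependency>
<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-aws</artifactId>
  <version>2.20.1</version>
</dependency>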
The camel-aws component takes care of the interface with the supported AWS services without requiring any in-depth knowledge of the AWS Java SDK. For more information, see Camel Components for Amazon Web Services.
Triaging files into S3
Write a Camel route that receives files as message payloads from a queue and writes them to an S3 bucket, with different prefixes depending on the extension.
Because the broker that you created is exposed via a public IP address, you can execute the code from anywhere with an internet connection that allows communication on the specific ports. In this example, run the code from your own laptop. A broker can also be created without a public IP address, in which case it is only accessible from inside the VPC in which it has been created, or from any peered VPC or network connected via a virtual gateway (VPN or AWS Direct Connect).
First, look at the code created by Maven. The archetype you chose created a standalone Camel context run via the helper org.apache.camel.main.Main class. This provides an easy way to run Camel routes from an IDE or the command line without needing to deploy them inside a container. Apache Camel can also be run as an OSGi module, or as a Spring or Spring Boot bean.
package com.angmas;
import org.apache.camel.main.Main;
/**
* A Camel Application
*/
public class MainApp {
/**
* A main() so you can easily run these routing rules in your IDE
*/
public static void main(String... args) throws Exception {
Main main = new Main();
main.addRouteBuilder(new MyRouteBuilder());
main.run(args);
}
}
The main method instantiates the Camel Main helper class and the routes, and runs the Camel application. The MyRouteBuilder class creates a route using Java DSL. It is also possible to define routes in Spring XML and load them dynamically in the code.
public void configure() {
// this sample sets a random body then performs content-based
// routing on the message using method references
from("timer:simple?period=1000")
.process()
.message(m -> m.setHeader("index", index++ % 3))
.transform()
.message(this::randomBody)
.choice()
.when()
.body(String.class::isInstance)
.log("Got a String body")
.when()
.body(Integer.class::isInstance)
.log("Got an Integer body")
.when()
.body(Double.class::isInstance)
.log("Got a Double body")
.otherwise()
.log("Other type message");
}
Writing the Camel route
Replace the existing route with one that fetches messages from Amazon MQ over AMQP and routes the content to different prefixes in an S3 bucket, depending on the file name extension. The new route does the following:
Reads messages from the AMQP queue named filequeue.
Processes the message and sets a new ext header using the setExtensionHeader method (see below).
Checks the value of the ext header and writes the body of the message as an object in an S3 bucket, using different key prefixes and retaining the original name of the file.
The Amazon S3 component is configured with the bucket name, and a reference to an S3 client (amazonS3client=#s3Client) that you added to the Camel registry in the Main method of the app. Adding the object to the Camel registry allows Camel to find the object at runtime. Even though you could pass the region, accessKey, and secretKey parameters directly in the component URI, this way is more secure. It can make use of EC2 instance roles, so that you never need to pass the secrets.
Sending files to the AMQP queue
To send the files to the AMQP queue for testing, add another Camel route. In a real scenario, the messages to the AMQP queue are generated by another client. You are going to create a new route builder, but you could also add this route inside the existing MyRouteBuilder.
package com.angmas;
import org.apache.camel.builder.RouteBuilder;
/**
* A Camel Java8 DSL Router
*/
public class MessageProducerBuilder extends RouteBuilder {
/**
* Configure the Camel routing rules using Java code...
*/
public void configure() {
from("file://input?delete=false&noop=true")
.log("Content ${body} ${headers.CamelFileName}")
.to("amqp:filequeue");
}
}
The code reads files from the input folder in the work directory and publishes them to the queue. The route builder is added in the main class:
By default, Camel tries to connect to a local AMQP broker. Configure it to connect to your Amazon MQ broker.
Create an AMQPConnectionDetails object that is configured to connect to Amazon MQ broker with SSL and pass the user name and password that you set on the broker. Adding the object to the Camel registry allows Camel to find the object at runtime and use it as the default connection to AMQP.
public class MainApp {
public static String BROKER_URL = System.getenv("BROKER_URL");
public static String AMQP_URL = "amqps://"+BROKER_URL+":5671";
public static String BROKER_USERNAME = System.getenv("BROKER_USERNAME");
public static String BROKER_PASSWORD = System.getenv("BROKER_PASSWORD");
/**
* A main() so you can easily run these routing rules in your IDE
*/
public static void main(String... args) throws Exception {
Main main = new Main();
main.bind("amqp", getAMQPconnection());
main.bind("s3Client", AmazonS3ClientBuilder.standard().withRegion(Regions.US_EAST_1).build());
main.addRouteBuilder(new MyRouteBuilder());
main.addRouteBuilder(new MessageProducerBuilder());
main.run(args);
}
public static AMQPConnectionDetails getAMQPconnection() {
return new AMQPConnectionDetails(AMQP_URL, BROKER_USERNAME, BROKER_PASSWORD);
}
}
The AMQP_URL uses the amqps scheme, which indicates that you are using SSL. You then add the connection details to the registry with main.bind("amqp", getAMQPconnection());, as shown in the main method above; Camel finds the object by matching the class type.
Testing the code
Create an input folder in the project root, and create a few files with different extensions, such as txt, html, and csv.
Set the different environment variables required by the code, either in the shell or in your IDE as execution configuration.
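For example, in a shell (the broker endpoint host name is a placeholder; use the value shown in the Amazon MQ console for your broker):
export BROKER_URL="<your-broker-endpoint>.mq.us-east-1.amazonaws.com"
export BROKER_USERNAME="sdk"
export BROKER_PASSWORD="<password-for-the-sdk-user>"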
If you are running the example from an EC2 instance, ensure that the EC2 instance role has read permission on the S3 bucket.
If you are running this on your laptop, ensure that you have configured the AWS credentials in the environment, for example, by using the aws configure command.
From the command line, execute the code:
mvn exec:java
If you are using an IDE, execute the main class. Camel outputs logging information and you should see messages listing the content and names of the files in the input folder.
Keep adding some more files to the input folder. You see that they are triaged in S3 a few seconds later. You can open the S3 console to check that they have been created.
To stop Camel, press CTRL+C in the shell.
Conclusion
In this post, I showed you how to create a publicly accessible Amazon MQ broker, and how to use Apache Camel to easily integrate AWS services with the broker. In the example, you created a Camel route that reads messages containing files from the AMQP queue and triages them by file extension into an S3 bucket.
Camel supports several components and provides blueprints for several enterprise integration patterns. Used in combination with Amazon MQ, it provides a powerful and flexible solution to extend traditional enterprise solutions to the AWS Cloud, and integrate them seamlessly with cloud-native services, such as Amazon S3, Amazon SNS, Amazon SQS, Amazon CloudWatch, and AWS Lambda.
To learn more, see the Amazon MQ website. You can try Amazon MQ for free with the AWS Free Tier, which includes up to 750 hours of a single-instance mq.t2.micro broker and up to 1 GB of storage per month for one year.
Trying something new can often be a daunting task. Where do you start? What resources are available to help guide you through unfamiliar territory? Where can you go if you need additional help?
Whether you’ve just signed up for your first AWS account or you’ve been with us for some time, there’s always something new to learn as our services evolve to meet the ever-changing needs of our customers. To help ensure you’re set up for success as you build with AWS, we put together this quick reference guide for Big Data training and resources available here on the AWS site.
Here, you’ll find a round-up of all things Big Data, including comprehensive, step-by-step project guides to walk you through production-ready solutions, tutorials, labs and additional resources broken out by service.
This site is a fantastic resource, offering free and unlimited access to more than 100 digital training courses built by AWS experts. This link will take you directly to the Analytics category for all things Big Data on AWS, but you can always browse and search the site for other courses. Here are a few quick links for service-specific training:
If you prefer the interactive nature of a classroom environment, register for any of these courses (near you or online) to learn best practices, get live feedback, and receive answers to your questions in real-time from the class instructor.
Big Data on AWS (3 days) Introduces you to cloud-based big data solutions such as Amazon EMR, Amazon Redshift, Amazon Kinesis, and the rest of the AWS big data platform.
Data Warehousing on AWS (3 days) Introduces you to concepts, strategies, and best practices for designing a cloud-based data warehousing solution, and demonstrates how to collect, store, and prepare data for the data warehouse.
Building a Serverless Data Lake (1 day) Teaches you how to design, build, and operate a serverless data lake solution with AWS services. Includes topics such as ingesting data from any data source at large scale, storing the data securely and durably, using the right tool to process large volumes of data, and understanding the options available for analyzing the data in near-real time.
With the advent of AWS PrivateLink, you can provide services to AWS customers directly in their virtual private clouds (VPCs) by offering cross-account SaaS solutions on private IP addresses rather than over the Internet.
Traffic that flows to the services you provide does so over private AWS networking rather than over the Internet, offering security and performance enhancements, as well as convenience. PrivateLink can tie in with the AWS Marketplace, facilitating billing and providing a straightforward consumption model to your customers.
The use cases are myriad, but, for this blog post, we’ll demonstrate a fictional order-processing resource. The resource accepts JSON data over a RESTful API, simulating an interface. This could easily be an existing application being considered for a PrivateLink-based consumption model. Consumers of this resource send JSON payloads representing new orders and the system responds with order IDs corresponding to newly-created orders in the system. In a real-world scenario, additional APIs, such as authentication, might also represent critical aspects of the system. This example will not demonstrate these additional APIs because they could be consumed over PrivateLink in a similar fashion to the API constructed in the example.
I’ll demonstrate how to expose the resource on a private IP address in a customer’s VPC. I’ll also explain an architecture leveraging PrivateLink and provide detailed instructions for how to set up such a service. Finally, I’ll provide an example of how a customer might consume such a service. I’ll focus not only on how to architect the solution, but also the considerations that drive architectural choices.
Solution Overview
N.B.: Only two subnets and Availability Zones are shown per VPC for simplicity. Resources must cover all Availability Zones per Region, so that the application is available to all consumers in the region. The instructions in this post, which pertain to resources sitting in us-east-1, detail the deployment of subnets in all six Availability Zones for this region.
This solution exposes an application’s HTTP-based API over PrivateLink in a provider’s AWS account. The application is a stateless web server running on Amazon Elastic Compute Cloud (EC2) instances. The provider places instances within a virtual private cloud (VPC) consisting of one private subnet per Availability Zone (AZ). Instances populate each subnet inside of Auto Scaling Groups (ASGs), maintaining a desired count per subnet. There is one ASG per subnet to ensure that the service is available in each AZ. An internal Network Load Balancer (NLB) sits in front of the entire fleet of application instances, and an endpoint service is connected with the NLB.
In the customer’s AWS account, they create an endpoint that consumes the endpoint service from the provider’s account. The endpoint exposes an Elastic Network Interface (ENI) in each subnet the customer desires. Each ENI is assigned an IP address within the CIDR block associated with the subnet, for any number of subnets in any number of AZs within the region, for each customer.
PrivateLink facilitates cross-account access to services so the customer can use the provider’s service, feeding it data that exist within the customer’s account while using application logic and systems that run in the provider’s account. The routing between accounts is over private networking rather than over the Internet.
Though this example shows a simple, stateless service running on EC2 and sitting behind an NLB, many kinds of AWS services can be exposed through PrivateLink and can serve as pathways into a provider’s application, such as Amazon Kinesis Streams, Amazon EC2 Container Service, Amazon EC2 Systems Manager, and more.
Using PrivateLink to Establish a Service for Consumption
Building a service to be consumed through PrivateLink involves a few steps:
Build a VPC covering all AZs in region with private subnets
Create a NLB, listener, and target group for instances
Create a launch configuration and ASGs to manage the deployment of Amazon EC2 instances in each subnet
Launch an endpoint service and connect it with the NLB
Tie endpoint-request approval with billing systems or the AWS Marketplace
Provide the endpoint service in multiple regions
Step 1: Build a VPC and private subnets
Start by determining the network you will need to serve the application. Keep in mind that you will need to serve the application out of each AZ within any region you choose. Customers will expect to consume your service in multiple AZs because AWS recommends they architect their own applications to span across AZs for fault-tolerance purposes.
Additionally, anything less than full coverage across all AZs in a single region will not facilitate straightforward consumption of your service because AWS does not guarantee that a single AZ will carry the same name across accounts. In fact, AWS randomizes AZ names across accounts to ensure even distribution of independent workloads. Telling customers, for example, that you provide a service in us-east-1a may not give them sufficient information to connect with your service.
The solution is to serve your application in all AZs within a region because this guarantees that no matter what AZs a customer chooses for endpoint creation, that customer is guaranteed to find a running instance of your application with which to connect.
You can lay the foundations for doing this by creating a subnet in each AZ within the region of your choice. The subnets can be private because the service, exposed via PrivateLink, will not provide any publicly routable APIs.
This example uses the us-east-1 region. If you use a different region, the number of AZs may vary, which will change the number of subnets required, and thus the size of the IP address range for your VPC may require adjustments.
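A sketch of the VPC creation, using a /25 CIDR block:
aws ec2 create-vpc --cidr-block "10.3.0.0/25"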
The example above creates a VPC with 128 IP addresses starting at 10.3.0.0. Each subnet will contain 16 IP addresses, using a total of 96 addresses in the space. Allocating a sufficient block of addresses requires some planning (though you can make adjustments later if needed). I’d suggest an equally-sized address space in each subnet because the provided service should embody the same performance, availability, and functionality regardless of which AZ your customers choose. Each subnet will need a sufficient address space to accommodate the number of instances you run within it. Additionally, you will need enough space to allow for one IP address per subnet to assign to that subnet’s NLB node’s Elastic Network Interface (ENI).
In this simple example, 16 IP addresses per subnet are enough because we will configure ASGs to maintain two instances each and the NLB requires one ENI. Each subnet reserves five IP addresses for internal purposes, for a total of eight IP addresses needed in each subnet to support the service.
Next, create the private subnets for each Availability Zone. The following demonstrates the creation of the first subnet, which sits in the us-east-1a AZ:
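A sketch of the command, assuming the VPC ID returned when you created the VPC (the same placeholder ID is used in later commands in this post):
aws ec2 create-subnet --vpc-id "vpc-2cadab54" \
  --cidr-block "10.3.0.0/28" \
  --availability-zone "us-east-1a"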
Repeat this step for each remaining AZ. If using the us-east-1 region, you will need to create private subnets in all AZs as follows:
For the purpose of this example, the subnets can leverage the default route table, as it contains a single rule for routing requests to private IP addresses in the VPC, as follows:
In a real-world case, additional routing may be required. For example, you may need additional routes to support VPC peering to access dependencies in other VPCs, connectivity to on-premises resources over DirectConnect or VPN, Internet-accessible dependencies via NAT, or other scenarios.
Security Group Creation
Instances will need to be placed in a security group that allows traffic from the NLB nodes that sit in each subnet.
All instances running the service should be in a security group accepting TCP traffic on the traffic port from any other IP address in the VPC. This will allow the NLB to forward traffic to those instances because the NLB nodes sit in the VPC and are assigned IP addresses in the subnets. In this example, the order processing server running on each instance exposes a service on port 3000, so the security group rule covers this port.
Create a security group for instances:
aws ec2 create-security-group \
--group-name "service-sg" \
--description "Security group for service instances" \
--vpc-id "vpc-2cadab54"
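Then allow inbound traffic on the service port from the VPC’s address range; the security group ID is a placeholder for the GroupId returned by the previous command:
aws ec2 authorize-security-group-ingress \
  --group-id "<service-sg-id>" \
  --protocol tcp --port 3000 \
  --cidr "10.3.0.0/25"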
Step 2: Create a Network Load Balancer, Listener, and Target Group
The service integrates with PrivateLink using an internal NLB which sits in front of instances that run the service.
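A sketch of creating the internal NLB, a TCP target group on the service port, and a listener that forwards to it (names, ARNs, and subnet IDs are placeholders):
aws elbv2 create-load-balancer --name "service-nlb" \
  --type network --scheme internal \
  --subnets "<subnet-id-1>" "<subnet-id-2>" "<subnet-id-3>" "<subnet-id-4>" "<subnet-id-5>" "<subnet-id-6>"

aws elbv2 create-target-group --name "service-targets" \
  --protocol TCP --port 3000 \
  --vpc-id "vpc-2cadab54"

aws elbv2 create-listener --load-balancer-arn "<nlb-arn>" \
  --protocol TCP --port 3000 \
  --default-actions "Type=forward,TargetGroupArn=<target-group-arn>"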
Step 3: Create a Launch Configuration and Auto Scaling Groups
Each private subnet in the VPC will require its own ASG in order to ensure that there is always a minimum number of instances in each subnet.
A single ASG spanning all subnets will not guarantee that every subnet contains the appropriate number of instances. For example, while a single ASG could be configured to work across six subnets and maintain twelve instances, there is no guarantee that each of the six subnets will contain two instances. To guarantee the appropriate number of instances on a per-subnet basis, each subnet must be configured with its own ASG.
New instances should be automatically created within each ASG based on a single launch configuration. The launch configuration should be set up to use an existing Amazon Machine Image (AMI).
This post presupposes you have an AMI that can be used to create new instances that serve the application. There are only a few basic assumptions about how this image is configured:
1. The image contains a web server that serves traffic (in this case, on port 3000).
2. The image is configured to automatically launch the web server as a daemon when the instance starts.
Create a launch configuration for the ASGs, providing the AMI ID, the ID of the security group created in previous steps (above), a key for access, and an instance type:
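A sketch of the launch configuration and the first ASG; the AMI ID, security group ID, key name, subnet ID, and target group ARN are placeholders:
aws autoscaling create-launch-configuration \
  --launch-configuration-name "service-launch-configuration" \
  --image-id "<ami-id>" \
  --security-groups "<service-sg-id>" \
  --key-name "<key-pair-name>" \
  --instance-type "t2.micro"

aws autoscaling create-auto-scaling-group \
  --auto-scaling-group-name "service-asg-us-east-1a" \
  --launch-configuration-name "service-launch-configuration" \
  --min-size 2 --max-size 2 --desired-capacity 2 \
  --vpc-zone-identifier "<subnet-id-us-east-1a>" \
  --target-group-arns "<target-group-arn>"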
Repeat this process to create an ASG in each remaining subnet, using the same launch configuration and target group.
In this example, only two instances are created in each subnet. In a real-world scenario, additional instances would likely be recommended for both availability and scale. The ASGs use the provided launch configuration as a template for creating new instances.
When creating the ASGs, the ARN of the target group for the NLB is provided. This way, the ASGs automatically register newly-created instances with the target group so that the NLB can begin sending traffic to them.
Step 4: Launch an endpoint service and connect with NLB
Now, expose the service via PrivateLink with an endpoint service, providing the ARN of the NLB:
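A sketch of the command (the NLB ARN is a placeholder):
aws ec2 create-vpc-endpoint-service-configuration \
  --network-load-balancer-arns "<nlb-arn>" \
  --acceptance-required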
This endpoint service is configured to require acceptance. This means that new consumers who attempt to add endpoints that consume it will have to wait for the provider to allow access. This provides an opportunity to control access and integrate with billing systems that monetize the provided service.
Step 5: Tie endpoint request approval with billing system or the AWS Marketplace
If you’re maintaining your service as a private service, any account that is intended to have access must be whitelisted before it can find the endpoint service and create an endpoint to consume it.
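A sketch of whitelisting a customer account, assuming the service ID returned when the endpoint service was created:
aws ec2 modify-vpc-endpoint-service-permissions \
  --service-id "<endpoint-service-id>" \
  --add-allowed-principals "arn:aws:iam::<customer_account_id>:root"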
For more information on listing a PrivateLink service in the AWS Marketplace, see How to List Your Product in AWS Marketplace (https://aws.amazon.com/blogs/apn/how-to-list-your-product-in-aws-marketplace/).
Most production-ready services offered through PrivateLink will require acceptance of Endpoint requests before customers can consume them. Typically, some level of automation around processing approvals is helpful. PrivateLink can publish on a Simple Notification Service (SNS) topic when customers request approval.
Setting this up requires two steps:
1. Create a new SNS topic.
2. Create an endpoint connection notification that publishes to the SNS topic.
Each is discussed below.
Create an SNS Topic
First, create a new SNS Topic that can receive messages relating to endpoint service access requests:
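For example:
aws sns create-topic --name "service-notification-topic"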
This creates a new topic with the name “service-notification-topic”. Endpoint request approval systems can subscribe to this Topic so that acceptance can be automated.
Next, create a new Endpoint Connection Notification, so that messages will be published to the topic when new Endpoints connect and need to have access requests approved:
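A sketch of the command; the topic ARN and service ID are placeholders for the values returned by the previous commands:
aws ec2 create-vpc-endpoint-connection-notification \
  --service-id "<endpoint-service-id>" \
  --connection-notification-arn "<sns-topic-arn>" \
  --connection-events Connect Accept Reject Delete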
A billing system may ultimately tie in with request approval. This can also be done manually, which may be less useful, but is illustrative. As an example, assume that a customer account has already requested an endpoint to consume the service. The customer can be accepted manually, as follows:
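A sketch of the manual acceptance, assuming the endpoint ID from the customer’s pending request:
aws ec2 accept-vpc-endpoint-connections \
  --service-id "<endpoint-service-id>" \
  --vpc-endpoint-ids "<customer-endpoint-id>"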
At this point, the consumer can begin consuming the service.
Step 6: Take the Service Across Regions
In distributing SaaS via PrivateLink, providers may have to think about how to make their services available in different regions because endpoint services are only available within the region where they are created. Customers who attempt to consume endpoint services will not be able to create endpoints across regions.
Rather than saddling consumers with the responsibility of making the jump across regions, we recommend providers work to make services available wherever their customers consume them. Providers are in a better position to adapt their architectures to multiple regions than customers who do not know the internals of how providers have designed their services.
There are several architectural options that can support multi-region adaptation. Selection among them will depend on a number of factors, including read-to-write ratio, latency requirements, budget, amenability to re-architecture, and preference for simplicity.
Generally, the challenge in providing multi-region SaaS is in instantiating stateful components in multiple regions because the data on which such components depend are hard to replicate, synchronize, and access with low latency over large geographical distances.
Of all stateful components, perhaps the most frequently encountered will be databases. Some solutions for overcoming this challenge with respect to databases are as follows:
1. Provide a master in a single region; provide read replicas in every region.
2. Provide a master in every region; assign each tenant to one master only.
3. Create a full multi-master architecture; replicate data efficiently.
4. Rely on a managed service for replicating data cross-regionally (e.g., DynamoDB Global Tables).
Stateless components can be provisioned in multiple regions more easily. In this example, you will have to re-create all of the VPC resources—including subnets, Routing Tables, Security Groups, and Endpoint Services—as well as all EC2 resources—including instances, NLBs, Listeners, Target Groups, ASGs, and Launch Configurations—in each additional region. Because of the complexity in doing so, in addition to the significant need to keep regional configurations in-sync, you may wish to explore an orchestration tool such as CloudFormation, rather than the command line.
Regardless of what orchestration tooling you choose, you will need to copy your AMI to each region in which you wish to deploy it. Once available, you can build out your service in that region much as you did in the first one.
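A sketch of copying the AMI from us-east-1 to us-east-2; the source AMI ID and image name are placeholders:
aws ec2 copy-image --source-region "us-east-1" \
  --source-image-id "<ami-id>" \
  --region "us-east-2" \
  --name "order-processor-service"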
This copies the AMI used to provide the service in us-east-1 into us-east-2. Repeat the process for each region you wish to enable.
Consuming a Service via PrivateLink
To consume a service over PrivateLink, a customer must create a new Endpoint in their VPC within a Security Group that allows traffic on the traffic port.
Start by creating a Security Group to apply to a new Endpoint:
aws ec2 create-security-group \
  --group-name "consumer-sg" \
  --description "Security group for service consumers" \
  --vpc-id "vpc-2cadab54"
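Then allow inbound traffic on the service port; the group ID is a placeholder for the GroupId returned above, and the CIDR should match the consumer VPC’s address range:
aws ec2 authorize-security-group-ingress \
  --group-id "<consumer-sg-id>" \
  --protocol tcp --port 3000 \
  --cidr "<consumer-vpc-cidr>"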
Next, create an endpoint in the VPC, placing it in the Security Group:
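A sketch of the command; the service name is the one generated for the provider’s endpoint service, and the subnet and security group IDs are placeholders:
aws ec2 create-vpc-endpoint --vpc-endpoint-type Interface \
  --vpc-id "vpc-2cadab54" \
  --service-name "com.amazonaws.vpce.us-east-1.vpce-svc-<endpoint-service-id>" \
  --subnet-ids "<subnet-id-1>" "<subnet-id-2>" \
  --security-group-ids "<consumer-sg-id>"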
The response will include an attribute called VpcEndpoint.DnsEntries. The service can be accessed at each of the DNS names in the output under any of the entries there. Before the consumer can access the endpoint service, the provider has to accept the Endpoint.
Access Endpoint Via Custom DNS Names
When creating a new Endpoint, the consumer will receive named endpoint addresses in each AZ where the Endpoint is created, plus a named endpoint that is AZ-agnostic. For example:
The consumer can use Route53 to provide a custom DNS name for the service. This not only allows for using cleaner service names, but also enables the consumer to leverage the traffic management features of Route53, such as fail-over routing.
First, the consumer must enable DNS Hostnames and DNS Support on the VPC within which the Endpoint was created. The consumer should start by enabling DNS Hostnames:
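A sketch of the commands (one attribute per call):
aws ec2 modify-vpc-attribute --vpc-id "vpc-2cadab54" --enable-dns-hostnames "{\"Value\":true}"

aws ec2 modify-vpc-attribute --vpc-id "vpc-2cadab54" --enable-dns-support "{\"Value\":true}"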
After the VPC is properly configured to work with Route53, the consumer should either select an existing hosted zone or create a new one. Assuming one has not already been created, the consumer should create one as follows:
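A sketch of the command; the domain name and caller reference are illustrative:
aws route53 create-hosted-zone --name "endpoints.internal" \
  --vpc "VPCRegion=us-east-1,VPCId=vpc-2cadab54" \
  --hosted-zone-config "PrivateZone=true" \
  --caller-reference "order-processor-zone-001"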
In the request, the consumer specifies the DNS name, VPC ID, region, and flags the hosted zone as private. Additionally, the consumer must provide a “caller reference” which is a unique ID of the request that can be used to identify it in subsequent actions if the request fails.
Next, the consumer should create a JSON file corresponding to a batch of record change requests. In this file, the consumer can specify the name of the endpoint, as well as a CNAME pointing to the AZ-agnostic DNS name of the Endpoint:
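A sketch of the change batch; the record name matches the custom domain used below, and the CNAME value is a placeholder for the AZ-agnostic DNS name returned when the Endpoint was created. Save it as, for example, change-batch.json and submit it with change-resource-record-sets:
{
  "Changes": [
    {
      "Action": "CREATE",
      "ResourceRecordSet": {
        "Name": "order-processor.endpoints.internal",
        "Type": "CNAME",
        "TTL": 300,
        "ResourceRecords": [
          {"Value": "<AZ-agnostic endpoint DNS name>"}
        ]
      }
    }
  ]
}

aws route53 change-resource-record-sets \
  --hosted-zone-id "<hosted-zone-id>" \
  --change-batch file://change-batch.json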
At this point, the Endpoint can be consumed at http://order-processor.endpoints.internal.
Conclusion
AWS PrivateLink is an exciting way to expose SaaS services to customers. This article demonstrated how to expose an existing application on EC2 via PrivateLink in a customer’s VPC, along with a recommended architecture. Finally, it walked through the steps that a customer would have to go through to consume the service.
Today we’re releasing a new machine learning feature in Amazon Kinesis Data Analytics for detecting “hotspots” in your streaming data. We launched Kinesis Data Analytics in August of 2016 and we’ve continued to add features since. As you may already know, Kinesis Data Analytics is a fully managed real-time processing engine for streaming data that lets you write SQL queries to derive meaning from your data and output the results to Kinesis Data Firehose, Kinesis Data Streams, or even an AWS Lambda function. The new HOTSPOTS function adds to the existing machine learning capabilities in Kinesis Data Analytics that allow customers to leverage unsupervised, streaming-based machine learning algorithms. Customers don’t need to be experts in data science or machine learning to take advantage of these capabilities.
Hotspots
The HOTSPOTS function is a new Kinesis Data Analytics SQL function you can use to identify relatively dense regions in your data without having to explicitly build and train complicated machine learning models. You can identify subsections of your data that need immediate attention and take action programmatically by streaming the hotspots out to a Kinesis data stream, to a Firehose delivery stream, or by invoking an AWS Lambda function.
There are a ton of really cool scenarios where this could make your operations easier. Imagine a ride-share program or autonomous vehicle fleet communicating spatiotemporal data about traffic jams and congestion, or a datacenter where a number of servers start to overheat indicating an HVAC issue. HOTSPOTS is not limited to spatiotemporal data and you could apply it across many problem domains.
The function follows some simple syntax and accepts the DOUBLE, INTEGER, FLOAT, TINYINT, SMALLINT, REAL, and BIGINT data types.
The HOTSPOTS function takes a cursor as input and returns a JSON string describing the hotspot. This will be easier to understand with an example.
Using Kinesis Data Analytics to Detect Hotspots
Let’s take a simple dataset from the New York City Taxi and Limousine Commission that tracks yellow cab pickup and dropoff locations. Most of this data is already on S3 and publicly accessible at s3://nyc-tlc/. We will create a small Python script to load our Kinesis data stream with taxi records, which will feed our Kinesis Data Analytics application. Finally, we’ll output all of this to a Kinesis Data Firehose delivery stream connected to an Amazon Elasticsearch Service cluster for visualization with Kibana. I know from living in New York for 5 years that we’ll probably find a hotspot or two in this data.
First, we’ll create an input Kinesis stream and start sending our NYC taxi ride data into it. I just wrote a quick Python script to read from one of the CSV files and used boto3 to push the records into Kinesis. You can put the records in whatever way works for you.
import csv
import json
import boto3

def chunkit(l, n):
    """Yield successive n-sized chunks from l."""
    for i in range(0, len(l), n):
        yield l[i:i + n]

kinesis = boto3.client("kinesis")

with open("taxidata2.csv") as f:
    reader = csv.DictReader(f)
    records = chunkit([{"PartitionKey": "taxis", "Data": json.dumps(row)} for row in reader], 500)
    for chunk in records:
        kinesis.put_records(StreamName="TaxiData", Records=chunk)
Next, we’ll create the Kinesis Data Analytics application and add our input stream with our taxi data as the source.
Next we’ll automatically detect the schema.
Now we’ll create a quick SQL Script to detect our hotspots and add that to the Real Time Analytics section of our application.
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
"pickup_longitude" DOUBLE,
"pickup_latitude" DOUBLE,
HOTSPOTS_RESULT VARCHAR(10000)
);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT "pickup_longitude", "pickup_latitude", "HOTSPOTS_RESULT" FROM
TABLE(HOTSPOTS(
CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),
1000,
0.013,
20
)
);
Our HOTSPOTS function takes an input stream, a window size, scan radius, and a minimum number of points to count as a hotspot. The values for these are application dependent but you can tinker with them in the console easily until you get the results you want. There are more details about the parameters themselves in the documentation. The HOTSPOTS_RESULT returns some useful JSON that would let us plot bounding boxes around our hotspots:
When we have our desired results, we can save the script and connect our application to our Amazon Elasticsearch Service Firehose delivery stream. We can run an intermediate Lambda function in the Firehose delivery stream to transform our records into a format more suitable for geographic work. Then we can update our mapping in Elasticsearch to index the hotspot objects as geo-shapes.
Finally, we can connect to Kibana and visualize the results.
Looks like Manhattan is pretty busy!
Available Now
This feature is available now in all existing regions with Kinesis Data Analytics. I think this is a really interesting new feature of Kinesis Data Analytics that can bring immediate value to many applications. Let us know what you build with it on Twitter or in the comments!
Data that describe processes in a spatial context are everywhere in our day-to-day lives and they dominate big data problems. Map data, for instance, whether describing networks of roads or remote sensing data from satellites, get us where we need to go. Atmospheric data from simulations and sensors underlie our weather forecasts and climate models. Devices and sensors with GPS can provide a spatial context to nearly all mobile data.
In this post, we introduce the WIND toolkit, a huge (500 TB), open weather model dataset that’s available to the world on Amazon’s cloud services. We walk through how to access this data and some of the open-source software developed to make it easily accessible. Our solution considers a subset of geospatial data that exist on a grid (raster) and explores ways to provide access to large-scale raster data from weather models. The solution uses foundational AWS services and the Hierarchical Data Format (HDF), a well adopted format for scientific data.
The approach developed here can be extended to any data that fit in an HDF5 file, which can describe sparse and dense vectors and matrices of arbitrary dimensions. This format is already popular within the physical sciences for both experimental and simulation data. We discuss solutions to gridded data storage for a massive dataset of public weather model outputs called the Wind Integration National Dataset (WIND) toolkit. We also highlight strategies that are general to other large geospatial data management problems.
Wind Integration National Dataset
As variable renewable power penetration levels increase in power systems worldwide, the importance of renewable integration studies to ensure continued economic and reliable operation of the power grid is also increasing. The WIND toolkit is the largest freely available grid integration dataset to date.
The WIND toolkit was developed by 3TIER by Vaisala. They were under a subcontract to the National Renewable Energy Laboratory (NREL) to support studies on integration of wind energy into the existing US grid. NREL is a part of a network of national laboratories for the US Department of Energy and has a mission to advance the science and engineering of energy efficiency, sustainable transportation, and renewable power technologies.
The toolkit has been used by consultants, research groups, and universities worldwide to support grid integration studies. Less traditional uses also include resource assessments for wind plants (such as those powering Amazon data centers), and studying the effects of weather on California condor migrations in the Baja peninsula.
The diversity of applications highlights the value of accessible, open public data. Yet, there’s a catch: the dataset is huge. The WIND toolkit provides simulated atmospheric (weather) data at a two-km spatial resolution and five-minute temporal resolution at multiple heights for seven years. The entire dataset is half a petabyte (500 TB) in size and is stored in the NREL High Performance Computing data center in Golden, Colorado. Making this dataset publicly available easily and in a cost-effective manner is a major challenge.
As other laboratories and public institutions work to release their data to the world, they may face similar challenges to those that we experienced. Some prior, well-intentioned efforts to release huge datasets as-is have resulted in data resources that are technically available but fundamentally unusable. They may be stored in an unintuitive format or indexed and organized to support only a subset of potential uses. Downloading hundreds of terabytes of data is often impractical. Most users don’t have access to a big data cluster (or super computer) to slice and dice the data as they need after it’s downloaded.
We aim to provide a large amount of data (50 terabytes) to the public in a way that is efficient, scalable, and easy to use. In many cases, researchers can access these huge cloud-located datasets using the same software and algorithms they have developed for smaller datasets stored locally. Only the pieces of data they need for their individual analysis must be downloaded. To make this work in practice, we worked with the HDF Group and have built upon their forthcoming Highly Scalable Data Service (HSDS).
In the rest of this post, we discuss how the HSDS software was developed to use Amazon EC2 and Amazon S3 resources to provide convenient and scalable access to these huge geospatial datasets. We describe how the HSDS service has been put to work for the WIND Toolkit dataset and demonstrate how to access it using the h5pyd Python library and the REST API. We conclude with information about our ongoing work to release more ‘open’ datasets to the public using AWS services, and ways to improve and extend the HSDS with newer Amazon services like Amazon ECS and AWS Lambda.
Developing a scalable service for big geospatial data
The HDF5 file format and API have been used for many years and are an effective means of storing large scientific datasets. For example, NASA’s Earth Observing System (EOS) satellites collect more than 16 TB of data per day using HDF5.
With the rise of the cloud, there are new challenges and opportunities to rethink how HDF5 can be enhanced to work effectively as a component in a cloud-native architecture. For the HDF Group, working with NREL has been a great opportunity to put ideas into practice with a production-size dataset.
An HDF5 file consists of a directed graph of group and dataset objects. Datasets can be thought of as a multidimensional array with support for user-defined metadata tags and compression. Typical operations on datasets would be reading or writing data to a regular subregion (a hyperslab) or reading and writing individual elements (a point selection). Also, group and dataset objects may each contain an arbitrary number of the user-defined metadata elements known as attributes.
Many people have used the HDF library in applications developed or ported to run on EC2 instances, but there are a number of constraints that often prove problematic:
The HDF5 library can’t read directly from HDF5 files stored as S3 objects. The entire file (often many GB in size) would need to be copied to local storage before the first byte can be read. Also, the instance must be configured with an appropriately sized EBS volume.
The HDF library only has access to the computational resources of the instance itself (as opposed to a cluster of instances), so many operations are bottlenecked by the library.
Any modifications to the HDF5 file would somehow have to be synchronized with changes that other instances have made to the same file before writing back to S3.
Using a pattern common to many offerings from AWS, the solution to these constraints is to develop a service framework around the HDF data model. Using this model, the HDF Group has created the Highly Scalable Data Service (HSDS) that provides all the functionality that traditionally was provided by the HDF5 library. By using the service, you don’t need to manage your own file volumes, but can just read and write whatever data that you need.
Because the service manages the actual data persistence to a durable medium (S3, in this case), you don’t need to worry about disk management. Simply stream the data you need from the service as you need it. Secondly, putting the functionality behind a service allows some tricks to increase performance (described in more detail later). And lastly, HSDS allows any number of clients to access the data at the same time, enabling HDF5 to be used as a coordination mechanism for multiple readers and writers.
In designing the HSDS architecture, we gave much thought to how to achieve scalability of the HSDS service. For accessing HDF5 data, there are two different types of scaling to consider:
Multiple clients making many requests to the service
Single requests that require a significant amount of data processing
To deal with the first scaling challenge, as with most services, we considered how the service responds as the request rate increases. AWS provides some great tools that help in this regard:
Auto Scaling groups
Elastic Load Balancing load balancers
The ability of S3 to handle large aggregate throughput rates
By using a cluster of EC2 instances behind a load balancer, you can handle different client loads in a cost-effective manner.
The second scaling challenge concerns single requests that would take significant processing time with just one compute node. One example of this from the WIND toolkit would be extracting all the values in the seven-year time span for a given geographic point and dataset.
In HDF5, large datasets are typically stored as “chunks”; that is, a regular partition of the array. In HSDS, each chunk is stored as a binary object in S3. The sequential approach to retrieving the time series values would be for the service to read each chunk needed from S3, extract the needed elements, and go on to the next chunk. In this case, that would involve processing 2557 chunks, and would be quite slow.
Fortunately, with HSDS, you can speed this up quite a bit by exploiting the compute and I/O capabilities of the cluster. Upon receiving the request, the receiving node can use other nodes in the cluster to read different portions of the selection. With multiple nodes reading from S3 in parallel, performance improves as the cluster size increases.
The diagram below illustrates how this works in a simplified case of four chunks and four nodes.
This architecture has worked well in practice. In testing with the WIND toolkit and time series extraction, we observed a request latency of ~60 seconds using four nodes vs. ~5 seconds with 40 nodes. Performance roughly scales with the size of the cluster.
A planned enhancement to this is to use AWS Lambda for the worker processing. This enables 1000-way parallel reads at a reasonable cost, as you only pay for the milliseconds of CPU time used with AWS Lambda.
Public access to atmospheric data using HSDS and AWS
An early challenge in releasing the WIND toolkit data was in deciding how to subset the data for different use cases. In general, few researchers need access to the entire 0.5 PB of data, and a great deal of efficiency and cost reduction can be gained by creating targeted constituent datasets.
NREL grid integration researchers initially extracted a 2-TB subset by selecting 120,000 points where the wind resource seemed appropriate for development (the locations most interesting to those performing grid studies), and keeping only the data important for wind applications (100-m wind speed, converted to power). To support the remaining users who needed more of the data, we down-sampled it to a 60-minute temporal resolution, keeping all the other variables and the spatial resolution intact. This reduced dataset is 50 TB and describes 30+ atmospheric variables for 7 years at a 60-minute temporal resolution.
The WindViz browser-based Gridded Wind Toolkit Visualizer was created as an example implementation of the HSDS REST API in JavaScript. The visualizer is written in the style of ECMAScript 2016 using a modern development toolchain that includes webpack and Babel. The source code is available through our GitHub repository. The demo page is hosted via GitHub pages, and we use a cross-origin AJAX request to fetch data from the HSDS service running on the EC2 infrastructure. The visualizer can be used to explore the gridded wind toolkit data on a map. Achieve full spatial resolution by zooming in to a specific region.
Programmatic access is possible using the h5pyd Python library, a distributed analog to the widely used h5py library. Users interact with the datasets (variables) and slice the data from its (time x longitude x latitude) cube form as they see fit.
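A minimal sketch of that pattern with h5pyd follows. The domain path (/nrel/wtk-us.h5), dataset name, and grid indices are assumptions for illustration; the notebooks referenced below document the exact names and the HSDS endpoint configuration:
import h5pyd

# Open the WIND Toolkit domain served by HSDS (the endpoint and credentials
# come from the local .hscfg file or the HS_ENDPOINT environment variable).
f = h5pyd.File("/nrel/wtk-us.h5", "r")

# Dataset name and indices are illustrative placeholders.
wspd = f["windspeed_100m"]
print(wspd.shape, wspd.dtype)

# Slice the full time series for one grid point; HSDS reads only the chunks
# needed to satisfy this selection.
timeseries = wspd[:, 500, 500]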
Examples and use cases are described in a set of Jupyter notebooks and available on GitHub:
Once the notebooks are set up, you have a Jupyter notebook server running on your EC2 instance.
From your laptop, create an SSH tunnel:
$ ssh -L 8888:localhost:8888 (IP address of the EC2 server)
Now, you can browse to localhost:8888 using the correct token, and interact with the notebooks as if they were local. Within the directory, there are examples for accessing the HSDS API and plotting wind and weather data using matplotlib.
Controlling access and defraying costs
A final concern is rate limiting and access control. Although the HSDS service is scalable and relatively robust, we had a few practical concerns:
How can we protect from malicious or accidental use that may lead to high egress fees (for example, someone who attempts to repeatedly download the entire dataset from S3)?
How can we keep track of who is using the data both to document the value of the data resource and to justify the costs?
If costs become too high, can we charge for some or all API use to help cover the costs?
To approach these problems, we investigated using Amazon API Gateway and its simplified integration with the AWS Marketplace for SaaS monetization as well as third-party API proxies.
In the end, we chose to use API Umbrella due to its close involvement with http://data.gov. While AWS Marketplace is a compelling option for future datasets, the decision was made to keep this dataset entirely open, at least for now. As community use and associated costs grow, we’ll likely revisit Marketplace. Meanwhile, API Umbrella provides controls for rate limiting and API key registration out of the box and was simple to implement as a front-end proxy to HSDS. Those applications that may want to charge for API use can accomplish a similar strategy using Amazon API Gateway and AWS Marketplace.
Ongoing work and other resources
As NREL and other government research labs, municipalities, and organizations try to share data with the public, we expect many of you will face similar challenges to those we have tried to approach with the architecture described in this post. Providing large datasets is one challenge. Doing so in a way that is affordable and convenient for users is an entirely more difficult goal. Using AWS cloud-native services and the existing foundation of the HDF file format has allowed us to tackle that challenge in a meaningful way.
Dr. Caleb Phillips is a senior scientist with the Data Analysis and Visualization Group within the Computational Sciences Center at the National Renewable Energy Laboratory. Caleb comes from a background in computer science systems, applied statistics, computational modeling, and optimization. His work at NREL spans the breadth of renewable energy technologies and focuses on applying modern data science techniques to data problems at scale.
Dr. Caroline Draxl is a senior scientist at NREL. She supports the research and modeling activities of the US Department of Energy from mesoscale to wind plant scale. Caroline uses mesoscale models to research wind resources in various countries, and participates in on- and offshore boundary layer research and in the coupling of the mesoscale flow features (kilometer scale) to the microscale (tens of meters). She holds a M.S. degree in Meteorology and Geophysics from the University of Innsbruck, Austria, and a PhD in Meteorology from the Technical University of Denmark.
John Readey has been a Senior Architect at The HDF Group since he joined in June 2014. His interests include web services related to HDF, applications that support the use of HDF and data visualization.Before joining The HDF Group, John worked at Amazon.com from 2006–2014 where he developed service-based systems for eCommerce and AWS.
Jordan Perr-Sauer is an RPP intern with the Data Analysis and Visualization Group within the Computational Sciences Center at the National Renewable Energy Laboratory. Jordan hopes to use his professional background in software engineering and his academic training in applied mathematics to solve the challenging problems facing America and the world.
This post was written in partnership with Intuit to share learnings, best practices, and recommendations for running an Apache Kafka cluster on AWS. Thanks to Vaishak Suresh and his colleagues at Intuit for their contribution and support.
Intuit, in their own words: Intuit, a leading enterprise customer for AWS, is a creator of business and financial management solutions. For more information on how Intuit partners with AWS, see our previous blog post, Real-time Stream Processing Using Apache Spark Streaming and Apache Kafka on AWS. Apache Kafka is an open-source, distributed streaming platform that enables you to build real-time streaming applications.
The best practices described in this post are based on our experience in running and operating large-scale Kafka clusters on AWS for more than two years. Our intent for this post is to help AWS customers who are currently running Kafka on AWS, and also customers who are considering migrating on-premises Kafka deployments to AWS.
Running your Kafka deployment on Amazon EC2 provides a high performance, scalable solution for ingesting streaming data. AWS offers many different instance types and storage option combinations for Kafka deployments. However, given the number of possible deployment topologies, it’s not always trivial to select the most appropriate strategy suitable for your use case.
In this blog post, we cover the following aspects of running Kafka clusters on AWS:
Deployment considerations and patterns
Storage options
Instance types
Networking
Upgrades
Performance tuning
Monitoring
Security
Backup and restore
Note: While implementing Kafka clusters in a production environment, make sure also to consider factors like your number of messages, message size, monitoring, failure handling, and any operational issues.
Deployment considerations and patterns
In this section, we discuss various deployment options available for Kafka on AWS, along with pros and cons of each option. A successful deployment starts with thoughtful consideration of these options. Considering availability, consistency, and operational overhead of the deployment helps when choosing the right option.
Single AWS Region, Three Availability Zones, All Active
One typical deployment pattern (all active) is in a single AWS Region with three Availability Zones (AZs). One Kafka cluster is deployed in each AZ along with Apache ZooKeeper and Kafka producer and consumer instances as shown in the illustration following.
In this pattern, this is the Kafka cluster deployment:
Kafka producers and Kafka cluster are deployed on each AZ.
Data is distributed evenly across three Kafka clusters by using Elastic Load Balancer.
Kafka consumers aggregate data from all three Kafka clusters.
Kafka cluster failover occurs this way:
Mark down all Kafka producers
Stop consumers
Debug and restack Kafka
Restart consumers
Restart Kafka producers
Following are the pros and cons of this pattern.
Pros:
Highly available
Can sustain the failure of two AZs
No message loss during failover
Simple deployment
Cons:
Very high operational overhead:
All changes need to be deployed three times, one for each Kafka cluster
Maintaining and monitoring three Kafka clusters
Maintaining and monitoring three consumer clusters
A restart is required for patching and upgrading brokers in a Kafka cluster. In this approach, a rolling upgrade is done separately for each cluster.
Single Region, Three Availability Zones, Active-Standby
Another typical deployment pattern (active-standby) is in a single AWS Region with a single Kafka cluster and Kafka brokers and Zookeepers distributed across three AZs. Another similar Kafka cluster acts as a standby as shown in the illustration following. You can use Kafka mirroring with MirrorMaker to replicate messages between any two clusters.
In this pattern, this is the Kafka cluster deployment:
Kafka producers are deployed on all three AZs.
Only one Kafka cluster is deployed across three AZs (active).
ZooKeeper instances are deployed on each AZ.
Brokers are spread evenly across all three AZs.
Kafka consumers can be deployed across all three AZs.
Standby Kafka producers and a Multi-AZ Kafka cluster are part of the deployment.
Kafka cluster failover occurs this way:
Switch traffic to standby Kafka producers cluster and Kafka cluster.
Restart consumers to consume from standby Kafka cluster.
Following are the pros and cons of this pattern.
Pros:
Less operational overhead when compared to the first option
Only one Kafka cluster to manage and consume data from
Can handle single AZ failures without activating a standby Kafka cluster
Cons:
Added latency due to cross-AZ data transfer among Kafka brokers
For Kafka versions before 0.10, replicas for topic partitions have to be assigned so they’re distributed to the brokers on different AZs (rack-awareness)
The cluster can become unavailable in case of a network glitch, where ZooKeeper does not see Kafka brokers
Possibility of in-transit message loss during failover
Intuit recommends using a single Kafka cluster in one AWS Region, with brokers distributed across three AZs (single region, three AZs). This approach offers stronger fault tolerance, because a failed AZ won’t cause Kafka downtime.
Storage options
There are two storage options for file storage in Amazon EC2: ephemeral (instance) storage and Amazon Elastic Block Store (Amazon EBS).
Ephemeral storage is local to the Amazon EC2 instance. It can provide high IOPS based on the instance type. On the other hand, Amazon EBS volumes offer higher resiliency and you can configure IOPS based on your storage needs. EBS volumes also offer some distinct advantages in terms of recovery time. Your choice of storage is closely related to the type of workload supported by your Kafka cluster.
Kafka provides built-in fault tolerance by replicating data partitions across a configurable number of instances. If a broker fails, you can recover it by fetching all the data from other brokers in the cluster that host the other replicas. Depending on the size of the data transfer, it can affect recovery process and network traffic. These in turn eventually affect the cluster’s performance.
The following comparison contrasts the benefits of using an instance store versus using EBS for storage.
Instance store
Instance storage is recommended for large- and medium-sized Kafka clusters. In a large cluster, read/write traffic is distributed across a high number of brokers, so the loss of a broker has less of an impact. In smaller clusters, however, quickly recovering a failed node matters more, yet recovering a failed broker takes longer and generates more network traffic.
Storage-optimized instances like h1, i3, and d2 are an ideal choice for distributed applications like Kafka.
EBS
The primary advantage of using EBS in a Kafka deployment is that it significantly reduces data-transfer traffic when a broker fails or must be replaced. The replacement broker joins the cluster much faster.
Data stored on EBS is persisted in case of an instance failure or termination. The broker’s data stored on an EBS volume remains intact, and you can mount the EBS volume to a new EC2 instance. Most of the replicated data for the replacement broker is already available in the EBS volume and need not be copied over the network from another broker. Only the changes made after the original broker failure need to be transferred across the network, which makes this process much faster.
Intuit chose EBS because of their frequent instance restacking requirements and also other benefits provided by EBS.
Generally, Kafka deployments use a replication factor of three. EBS offers replication within its own service, so Intuit chose a replication factor of two instead of three.
Instance types
The choice of instance types is generally driven by the type of storage required for your streaming applications on a Kafka cluster. If your application requires ephemeral storage, h1, i3, and d2 instances are your best option.
Intuit used r3.xlarge instances for their brokers and r3.large for ZooKeeper, with ST1 (throughput optimized HDD) EBS for their Kafka cluster.
Here are sample benchmark numbers from Intuit tests.
Configuration: r3.xlarge instances, ST1 EBS, 12 brokers, 12 partitions
Aggregate broker throughput: 346.9 MB/s
If you need EBS storage, then AWS has a newer-generation r4 instance. The r4 instance is superior to R3 in many ways:
It has a faster processor (Broadwell).
EBS is optimized by default.
It features networking based on Elastic Network Adapter (ENA), with up to 10 Gbps on smaller sizes.
Networking
The network plays a very important role in a distributed system like Kafka. A fast and reliable network ensures that nodes can communicate with each other easily. The available network throughput controls the maximum amount of traffic that Kafka can handle. Network throughput, combined with disk storage, is often the governing factor for cluster sizing.
If you expect your cluster to receive high read/write traffic, select an instance type that offers 10-Gb/s performance.
In addition, choose an option that keeps interbroker network traffic on the private subnet, because this approach allows clients to connect to the brokers. Communication between brokers and clients uses the same network interface and port. For more details, see the documentation about IP addressing for EC2 instances.
If you are deploying in more than one AWS Region, you can connect the two VPCs in the two AWS Regions using cross-region VPC peering. However, be aware of the networking costs associated with cross-AZ deployments.
Upgrades
Kafka has a history of not being backward compatible, but its support of backward compatibility is getting better. During a Kafka upgrade, you should keep your producer and consumer clients on a version equal to or lower than the version you are upgrading from. After the upgrade is finished, you can start using a new protocol version and any new features it supports. There are three upgrade approaches available, discussed following.
Rolling or in-place upgrade
In a rolling or in-place upgrade scenario, upgrade one Kafka broker at a time. Take into consideration the recommendations for doing rolling restarts to avoid downtime for end users.
Downtime upgrade
If you can afford the downtime, you can take your entire cluster down, upgrade each Kafka broker, and then restart the cluster.
Blue/green upgrade
Intuit followed the blue/green deployment model for their workloads, as described following.
If you can afford to create a separate Kafka cluster and upgrade it, we highly recommend the blue/green upgrade scenario. In this scenario, we recommend that you keep your clusters up-to-date with the latest Kafka version. For additional details on Kafka version upgrades, see the Kafka upgrade documentation.
The following illustration shows a blue/green upgrade.
In this scenario, the upgrade plan works like this:
Create a new Kafka cluster on AWS.
Create a new Kafka producers stack to point to the new Kafka cluster.
Create topics on the new Kafka cluster.
Test the green deployment end to end (sanity check).
Using Amazon Route 53, change the new Kafka producers stack on AWS to point to the new green Kafka environment that you have created.
The roll-back plan works like this:
Switch Amazon Route 53 to the old Kafka producers stack on AWS to point to the old Kafka environment.
Performance tuning
You can tune Kafka performance in multiple dimensions. Following are some best practices for performance tuning.
These are some general performance tuning techniques:
If throughput is less than network capacity, try the following:
Add more threads
Increase batch size
Add more producer instances
Add more partitions
To improve latency when acks = -1, increase your num.replica.fetchers value.
For cross-AZ data transfer, tune your buffer settings for sockets and for OS TCP.
Make sure that num.io.threads is greater than the number of disks dedicated for Kafka.
Adjust num.network.threads based on the number of producers plus the number of consumers plus the replication factor.
Your message size affects your network bandwidth. To get higher performance from a Kafka cluster, select an instance type that offers 10 Gb/s performance.
For Java and JVM tuning, try the following:
Minimize GC pauses by using the Oracle JDK, which uses the new G1 garbage-first collector.
Try to keep the Kafka heap size below 4 GB.
Monitoring
Knowing whether a Kafka cluster is working correctly in a production environment is critical. Sometimes, just knowing that the cluster is up is enough, but Kafka applications have many moving parts to monitor. In fact, it can easily become confusing to understand what’s important to watch and what you can set aside. Items to monitor range from simple metrics about the overall rate of traffic, to producers, consumers, brokers, controller, ZooKeeper, topics, partitions, messages, and so on.
For monitoring, Intuit used several tools, including New Relic, Wavefront, Amazon CloudWatch, and AWS CloudTrail. Our recommended monitoring approach follows.
For system metrics, we recommend that you monitor:
CPU load
Network metrics
File handle usage
Disk space
Disk I/O performance
Garbage collection
ZooKeeper
For producers, we recommend that you monitor:
Batch-size-avg
Compression-rate-avg
Waiting-threads
Buffer-available-bytes
Record-queue-time-max
Record-send-rate
Records-per-request-avg
For consumers, we recommend that you monitor:
Batch-size-avg
Compression-rate-avg
Waiting-threads
Buffer-available-bytes
Record-queue-time-max
Record-send-rate
Records-per-request-avg
Security
Like most distributed systems, Kafka provides the mechanisms to transfer data with relatively high security across the components involved. Depending on your setup, security might involve different services such as encryption, Kerberos, Transport Layer Security (TLS) certificates, and advanced access control list (ACL) setup in brokers and ZooKeeper. The following tells you more about the Intuit approach. For details on Kafka security not covered in this section, see the Kafka documentation.
Encryption at rest
For EBS-backed EC2 instances, you can enable encryption at rest by using Amazon EBS volumes with encryption enabled. Amazon EBS uses AWS Key Management Service (AWS KMS) for encryption. For more details, see Amazon EBS Encryption in the EBS documentation. For instance store–backed EC2 instances, you can enable encryption at rest by using Amazon EC2 instance store encryption.
Encryption in transit
Kafka can use TLS for client and internode communications.
Authentication
Authentication of connections to brokers, whether from clients (producers and consumers), from other brokers, or from tools, uses either Secure Sockets Layer (SSL) or Simple Authentication and Security Layer (SASL).
Kafka supports Kerberos authentication. If you already have a Kerberos server, you can add Kafka to your current configuration.
Authorization
In Kafka, authorization is pluggable and integration with external authorization services is supported.
Backup and restore
The type of storage used in your deployment dictates your backup and restore strategy.
The best way to back up a Kafka cluster based on instance storage is to set up a second cluster and replicate messages using MirrorMaker. Kafka’s mirroring feature makes it possible to maintain a replica of an existing Kafka cluster. Depending on your setup and requirements, your backup cluster might be in the same AWS Region as your main cluster or in a different one.
For EBS-based deployments, you can enable automatic snapshots of EBS volumes to back up volumes. You can easily create new EBS volumes from these snapshots to restore. We recommend storing backup files in Amazon S3.
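As a minimal boto3 sketch of that approach (the volume ID, Availability Zone, and volume type are placeholders; in practice you would schedule snapshots rather than take them ad hoc):
import boto3

ec2 = boto3.client("ec2", region_name="us-east-1")

# Snapshot the EBS volume that holds a broker's log directories.
snapshot = ec2.create_snapshot(
    VolumeId="vol-0123456789abcdef0",
    Description="Kafka broker data backup",
)

# To restore, create a new volume from the snapshot and attach it to the
# replacement broker instance.
restored = ec2.create_volume(
    SnapshotId=snapshot["SnapshotId"],
    AvailabilityZone="us-east-1a",
    VolumeType="st1",
)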
For more information on how to back up in Kafka, see the Kafka documentation.
Conclusion
In this post, we discussed several patterns for running Kafka in the AWS Cloud. AWS also provides an alternative managed solution with Amazon Kinesis Data Streams: there are no servers to manage or scaling cliffs to worry about, you can scale the size of your streaming pipeline in seconds without downtime, data replication across Availability Zones is automatic, and you benefit from security out of the box. Kinesis Data Streams is tightly integrated with a wide variety of AWS services, such as Lambda, Amazon Redshift, and Amazon Elasticsearch Service, and it supports open-source frameworks like Storm, Spark, and Flink. You can also refer to the kafka-kinesis connector.
If you have questions or suggestions, please comment below.
Prasad Alle is a Senior Big Data Consultant with AWS Professional Services. He spends his time leading and building scalable, reliable Big data, Machine learning, Artificial Intelligence and IoT solutions for AWS Enterprise and Strategic customers. His interests extend to various technologies such as Advanced Edge Computing, Machine learning at Edge. In his spare time, he enjoys spending time with his family.
This is a customer post by Ajay Rathod, a Staff Data Engineer at Realtor.com.
Realtor.com, in their own words: Realtor.com®, operated by Move, Inc., is a trusted resource for home buyers, sellers, and dreamers. It offers the most comprehensive database of for-sale properties, among competing national sites, and the information, tools, and professional expertise to help people move confidently through every step of their home journey.
Move, Inc. processes hundreds of terabytes of data partitioned by day and hour. Various teams run hundreds of queries on this data. Using AWS services, Move, Inc. has built an infrastructure for gathering and analyzing data:
To increase the effectiveness of the storage and subsequent querying, the data is converted into a Parquet format, and stored again in S3.
Amazon Athena is used as the SQL (Structured Query Language) engine to query the data in S3. Athena is easy to use and is often quickly adopted by various teams.
Teams visualize query results in Amazon QuickSight. Amazon QuickSight is a business analytics service that allows you to quickly and easily visualize data and collaborate with other users in your account.
This architecture is known as the data platform and is shared by the data science, data engineering, and the data operations teams within the organization. Move, Inc. also enables other cross-functional teams to use Athena. When many users use Athena, it helps to monitor its usage to ensure cost-effectiveness. This leads to a strong need for Athena metrics that can give details about the following:
Users
Amount of data scanned (to monitor the cost of AWS service usage)
The databases used for queries
Actual queries that teams run
Currently, the Move, Inc. team does not have an easy way of obtaining all these metrics from a single tool. Having a way to do this would greatly simplify monitoring efforts. For example, the data operations team wants to collect several metrics every day obtained from queries run on Athena for their data. They require the following metrics:
Amount of data scanned by each user
Number of queries by each user
Databases accessed by each user
In this post, I discuss how to build a solution for monitoring Athena usage. To build this solution, you rely on AWS CloudTrail. CloudTrail is a web service that records AWS API calls for your AWS account and delivers log files to an S3 bucket.
Solution
Here is the high-level overview:
Use the CloudTrail API to audit the user queries, and then use Athena to create a table from the CloudTrail logs.
Query the Athena API with the AWS CLI to gather metrics about the data scanned by the user queries and put this information into another table in Athena.
Combine the information from these two sources by joining the two tables.
Use the resulting data to analyze, build insights, and create a dashboard that shows the usage of Athena by users within different teams in the organization.
The architecture of this solution is shown in the following diagram.
Take a look at this solution step by step.
IAM and permissions setup
This solution uses CloudTrail, Athena, and S3. Make sure that the users who run the following scripts and steps have the appropriate IAM roles and policies. For more information, see Tutorial: Delegate Access Across AWS Accounts Using IAM Roles.
Step 1: Create a table in Athena for data in CloudTrail
The CloudTrail API records all Athena queries run by different teams within the organization. These logs are saved in S3. The fields of most interest are:
User identity
Start time of the API call
Source IP address
Request parameters
Response elements returned by the service
When end users make queries in Athena, these queries are recorded by CloudTrail as responses from Athena web service calls. In these responses, each query is represented as a JSON (JavaScript Object Notation) string.
You can use the following CREATE TABLE statement to create the cloudtrail_logs table in Athena. For more information, see Querying CloudTrail Logs in the Athena documentation.
Step 2: Create a table in Amazon Athena for data from API output
Athena provides an API that can be queried to obtain information of a specific query ID. It also provides an API to obtain information of a batch of query IDs, with a batch size of up to 50 query IDs.
You can use this API call to obtain information about the Athena queries that you are interested in and store this information in an S3 location. Create an Athena table to represent this data in S3. For the purpose of this post, the response fields that are of interest are as follows:
QueryExecutionId
Database
EngineExecutionTimeInMillis
DataScannedInBytes
Status
SubmissionDateTime
CompletionDateTime
The CREATE TABLE statement for athena_api_output, is as follows:
CREATE EXTERNAL TABLE IF NOT EXISTS athena_api_output(
queryid string,
querydatabase string,
executiontime bigint,
datascanned bigint,
status string,
submissiondatetime string,
completiondatetime string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = ',',
'field.delim' = ','
) LOCATION 's3://<s3 location of the output from the API calls>'
TBLPROPERTIES ('has_encrypted_data'='false')
You can inspect the query IDs and user information for the last day. The query is as follows:
with data AS (
SELECT
json_extract(responseelements,
'$.queryExecutionId') AS query_id,
(useridentity.arn) AS uid,
(useridentity.sessioncontext.sessionIssuer.userName) AS role,
from_iso8601_timestamp(eventtime) AS dt
FROM cloudtrail_logs
WHERE eventsource='athena.amazonaws.com'
AND eventname='StartQueryExecution'
AND json_extract(responseelements, '$.queryExecutionId') is NOT null)
SELECT *
FROM data
WHERE dt > date_add('day',-1,now() )
Step 3: Obtain Query Statistics from Athena API
You can write a simple Python script to loop through queries in batches of 50 and query the Athena API for query statistics. You can use the Boto3 library for these lookups. Boto3 is the AWS SDK for Python, which provides an easy way to interact with and automate AWS services. The response from the Boto3 API can be parsed to extract the fields that you need as described in Step 2.
An example Python script is available in the AthenaMetrics GitHub repo.
Format these fields, for each query ID, as CSV strings and store them for the entire batch response in an S3 bucket. This S3 bucket is represented by the table created in Step 2, athena_api_output.
In your Python code, create a variable named sql_query, and assign it a string representing the SQL query defined in Step 2. The s3_query_folder is the location in S3 that is used by Athena for storing results of the query. The code is as follows:
import uuid
import boto3

sql_query = """
with data AS (
    SELECT
        json_extract(responseelements,
                     '$.queryExecutionId') AS query_id,
        (useridentity.arn) AS uid,
        (useridentity.sessioncontext.sessionIssuer.userName) AS role,
        from_iso8601_timestamp(eventtime) AS dt
    FROM cloudtrail_logs
    WHERE eventsource='athena.amazonaws.com'
        AND eventname='StartQueryExecution'
        AND json_extract(responseelements, '$.queryExecutionId') is NOT null)
SELECT *
FROM data
WHERE dt > date_add('day',-1,now() )
"""

# s3_query_folder is the S3 location where Athena stores the query results,
# as described above (for example, 's3://your-bucket/athena-results/').
athena_client = boto3.client('athena')
query_execution = athena_client.start_query_execution(
    QueryString=sql_query,
    ClientRequestToken=str(uuid.uuid4()),
    ResultConfiguration={
        'OutputLocation': s3_query_folder,
    }
)
query_execution_id = query_execution['QueryExecutionId']

# Allow the query to complete; poll get_query_execution and check
# response["QueryExecution"]["Status"]["State"] until it is no longer QUEUED or RUNNING.
response = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
if response["QueryExecution"]["Status"]["State"] == "SUCCEEDED":
    results = athena_client.get_query_results(QueryExecutionId=query_execution_id)
You can iterate through the results in the response object and consolidate them in batches of 50 results. For each batch, you can invoke the Athena API, batch-get-query-execution.
Store the output in the S3 location pointed to by the CREATE TABLE definition for the table athena_api_output, in Step 2. The SQL statement above returns only queries run in the last 24 hours. You may want to increase that to get usage over a longer period of time. The code snippet for this API call is as follows:
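A minimal boto3 sketch of that call is shown here; it assumes the athena_client from the earlier script and a batchqueryids list of up to 50 query ID strings gathered from the SELECT results:
# batchqueryids is a list of up to 50 query IDs from the SELECT results above.
response = athena_client.batch_get_query_execution(
    QueryExecutionIds=batchqueryids
)

for execution in response["QueryExecutions"]:
    stats = execution.get("Statistics", {})
    status = execution.get("Status", {})
    row = [
        execution["QueryExecutionId"],
        execution.get("QueryExecutionContext", {}).get("Database", ""),
        str(stats.get("EngineExecutionTimeInMillis", "")),
        str(stats.get("DataScannedInBytes", "")),
        status.get("State", ""),
        str(status.get("SubmissionDateTime", "")),
        str(status.get("CompletionDateTime", "")),
    ]
    # Each CSV line is appended to the batch output that is written to the
    # S3 location backing the athena_api_output table.
    print(",".join(row))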
The batchqueryids value is an array of 50 query IDs extracted from the result set of the SELECT query. This script creates the data needed by your second table, athena_api_output, and you are now ready to join both tables in Athena.
Step 4: Join the CloudTrail and Athena API data
Now that the two tables are available with the data that you need, you can run the following Athena query to look at the usage by user. You can limit the output of this query to the most recent five days.
SELECT
c.useridentity.arn,
json_extract(c.responseelements, '$.queryExecutionId') qid,
a.datascanned,
a.querydatabase,
a.executiontime,
a.submissiondatetime,
a.completiondatetime,
a.status
FROM cloudtrail_logs c
JOIN athena_api_output a
ON cast(json_extract(c.responseelements, '$.queryExecutionId') as varchar) = a.queryid
WHERE eventsource = 'athena.amazonaws.com'
AND eventname = 'StartQueryExecution'
AND from_iso8601_timestamp(eventtime) > date_add('day',-5 ,now() )
Step 5: Analyze and visualize the results
In this step, using QuickSight, you can create a dashboard that shows the following metrics:
Average amount of data scanned (MB) by a user and database
Using the solution described in this post, you can continuously monitor the usage of Athena by various teams. Taking this a step further, you can automate and set user limits for how much data the Athena users in your team can query within a given period of time. You may also choose to add notifications when the usage by a particular user crosses a specified threshold. This helps you manage costs incurred by different teams in your organization.
Realtor.com would like to acknowledge the tremendous support and guidance provided by Hemant Borole, Senior Consultant, Big Data & Analytics with AWS Professional Services in helping to author this post.
Ajay Rathod is Staff Data Engineer at Realtor.com. With a deep background in AWS Cloud Platform and Data Infrastructure, Ajay leads the Data Engineering and automation aspect of Data Operations at Realtor.com. He has designed and deployed many ETL pipelines and workflows for the Realtor Data Analytics Platform using AWS services like Data Pipeline, Athena, Batch, Glue and Boto3. He has created various operational metrics to monitor ETL Pipelines and Resource Usage.
Over the past year, we’ve released several features that make it easier to track the metrics that are associated with your Amazon SES account. The first of these features, launched in November of last year, was event publishing.
Initially, event publishing let you capture basic metrics related to your email sending and publish them to other AWS services, such as Amazon CloudWatch and Amazon Kinesis Data Firehose. Some examples of these basic metrics include the number of emails that were sent and delivered, as well as the number that bounced or received complaints. A few months ago, we expanded this feature by adding engagement metrics—specifically, information about the number of emails that your customers opened or engaged with by clicking links.
As a former Cloud Support Engineer, I’ve seen Amazon SES customers do some amazing things with event publishing, but I’ve also seen some common issues. In this article, we look at some of these issues, and discuss the steps you can take to resolve them.
Before we begin
This post assumes that your Amazon SES account is already out of the sandbox, that you’ve verified an identity (such as an email address or domain), and that you have the necessary permissions to use Amazon SES and the service that you’ll publish event data to (such as Amazon SNS, CloudWatch, or Kinesis Data Firehose).
We also assume that you’re familiar with the process of creating configuration sets and specifying event destinations for those configuration sets. For more information, see Using Amazon SES Configuration Sets in the Amazon SES Developer Guide.
Amazon SNS event destinations
If you want to receive notifications when events occur—such as when recipients click a link in an email, or when they report an email as spam—you can use Amazon SNS as an event destination.
Occasionally, customers ask us why they’re not receiving notifications when they use an Amazon SNS topic as an event destination. One of the most common reasons for this issue is that they haven’t configured subscriptions for their Amazon SNS topic yet.
A single topic in Amazon SNS can have one or more subscriptions. When you subscribe to a topic, you tell that topic which endpoints (such as email addresses or mobile phone numbers) to contact when it receives a notification. If you haven’t set up any subscriptions, nothing will happen when an email event occurs.
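As a quick illustration, you can add an email subscription with boto3 (the topic ARN and address are placeholders; the recipient must confirm the subscription before notifications are delivered):
import boto3

sns = boto3.client("sns")

# Subscribe an email endpoint to the topic configured as the SES event destination.
sns.subscribe(
    TopicArn="arn:aws:sns:us-east-1:123456789012:ses-event-topic",
    Protocol="email",
    Endpoint="alerts@example.com",
)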
Kinesis Data Firehose event destinations
If you want to store your Amazon SES event data for the long term, choose Amazon Kinesis Data Firehose as a destination for Amazon SES events. With Kinesis Data Firehose, you can stream data to Amazon S3 or Amazon Redshift for storage and analysis.
The process of setting up Kinesis Data Firehose as an event destination is similar to the process for setting up Amazon SNS: you choose the types of events (such as deliveries, opens, clicks, or bounces) that you want to export, and the name of the Kinesis Data Firehose stream that you want to export to. However, there’s one important difference. When you set up a Kinesis Data Firehose event destination, you must also choose the IAM role that Amazon SES uses to send event data to Kinesis Data Firehose.
When you set up the Kinesis Data Firehose event destination, you can choose to have Amazon SES create the IAM role for you automatically. For many users, this is the best solution—it ensures that the IAM role has the appropriate permissions to move event data from Amazon SES to Kinesis Data Firehose.
Customers occasionally run into issues with the Kinesis Data Firehose event destination when they use an existing IAM role. If you use an existing IAM role, or create a new role for this purpose, make sure that the role includes the firehose:PutRecord and firehose:PutRecordBatch permissions. If the role doesn’t include these permissions, then the Amazon SES event data isn’t published to Kinesis Data Firehose. For more information, see Controlling Access with Amazon Kinesis Data Firehose in the Amazon Kinesis Data Firehose Developer Guide.
CloudWatch event destinations
By publishing your Amazon SES event data to Amazon CloudWatch, you can create dashboards that track your sending statistics in real time, as well as alarms that notify you when your event metrics reach certain thresholds.
The amount that you’re charged for using CloudWatch is based on several factors, including the number of metrics you use. In order to give you more control over the specific metrics you send to CloudWatch—and to help you avoid unexpected charges—you can limit the email sending events that are sent to CloudWatch.
When you choose CloudWatch as an event destination, you must choose a value source. The value source can be one of three options: a message tag, a link tag, or an email header. After you choose a value source, you then specify a name and a value. When you send an email using a configuration set that refers to a CloudWatch event destination, it only sends the metrics for that email to CloudWatch if the email contains the name and value that you specified as the value source. This requirement is commonly overlooked.
For example, assume that you chose Message Tag as the value source, and specified “CategoryId” as the dimension name and “31415” as the dimension value. When you want to send events for an email to CloudWatch, you must specify the name of the configuration set that uses the CloudWatch destination. You must also include a tag in your message. The name of the tag must be “CategoryId” and the value must be “31415”.
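A minimal boto3 sketch of such a send follows. The configuration set name, addresses, and message content are placeholders; only the CategoryId name and 31415 value come from the example above:
import boto3

ses = boto3.client("ses", region_name="us-east-1")

# Send through a configuration set that has a CloudWatch event destination,
# tagging the message so that its metrics are published with the chosen dimension.
ses.send_email(
    Source="sender@example.com",
    Destination={"ToAddresses": ["recipient@example.com"]},
    Message={
        "Subject": {"Data": "Hello"},
        "Body": {"Html": {"Data": "<p>Hello from Amazon SES</p>"}},
    },
    ConfigurationSetName="my-configuration-set",
    Tags=[{"Name": "CategoryId", "Value": "31415"}],
)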
Troubleshooting event publishing for open and click data
Occasionally, customers ask why they’re not seeing open and click data for their emails. This issue most often occurs when the customer only sends text versions of their emails. Because of the way Amazon SES tracks open and click events, you can only see open and click data for emails that are sent as HTML. For more information about how Amazon SES modifies your emails when you enable open and click tracking, see Amazon SES Email Sending Metrics FAQs in the Amazon SES Developer Guide.
The process that you use to send HTML emails varies based on the email sending method you use. The Code Examples section of the Amazon SES Developer Guide contains examples of several methods of sending email by using the Amazon SES SMTP interface or an AWS SDK. All of the examples in this section include methods for sending HTML (as well as text-only) emails.
If you encounter any issues that weren’t covered in this post, please open a case in the Support Center and we’d be more than happy to assist.
This is a customer post by Stephen Borg, the Head of Big Data and BI at Cerberus Technologies.
Cerberus Technologies, in their own words: Cerberus is a company founded in 2017 by a team of visionary iGaming veterans. Our mission is simple – to offer the best tech solutions through a data-driven and a customer-first approach, delivering innovative solutions that go against traditional forms of working and process. This mission is based on the solid foundations of reliability, flexibility and security, and we intend to fundamentally change the way iGaming and other industries interact with technology.
Over the years, I have developed and created a number of data warehouses from scratch. Recently, I built a data warehouse for the iGaming industry single-handedly. To do it, I used the power and flexibility of Amazon Redshift and the wider AWS data management ecosystem. In this post, I explain how I was able to build a robust and scalable data warehouse without the large team of experts typically needed.
In two of my recent projects, I ran into challenges when scaling our data warehouse using on-premises infrastructure. Data was growing at many tens of gigabytes per day, and query performance was suffering. Scaling required major capital investment for hardware and software licenses, and also significant operational costs for maintenance and technical staff to keep it running and performing well. Unfortunately, I couldn’t get the resources needed to scale the infrastructure with data growth, and these projects were abandoned. Thanks to cloud data warehousing, the bottlenecks of infrastructure resources, capital expense, and operational costs have been significantly reduced or have gone away entirely. There is no longer any excuse for letting obstacles of the past delay the delivery of timely insights to decision makers, no matter how much data you have.
With Amazon Redshift and AWS, I delivered a cloud data warehouse to the business very quickly, and with a small team: me. I didn’t have to order hardware or software, and I no longer needed to install, configure, tune, or keep up with patches and version updates. Instead, I easily set up a robust data processing pipeline and we were quickly ingesting and analyzing data. Now, my data warehouse team can be extremely lean, and focus more time on bringing in new data and delivering insights. In this post, I show you the AWS services and the architecture that I used.
Handling data feeds
I have several different data sources that provide everything needed to run the business. The data includes activity from our iGaming platform, social media posts, clickstream data, marketing and campaign performance, and customer support engagements.
To handle the diversity of data feeds, I developed abstract integration applications using Docker that run on Amazon EC2 Container Service (Amazon ECS) and feed data to Amazon Kinesis Data Streams. These data streams can be used for real-time analytics. In my system, each record in Kinesis is preprocessed by an AWS Lambda function to cleanse and aggregate information, and Amazon Kinesis Data Firehose then delivers the processed records to the appropriate locations in Amazon S3. Suppose that you used an on-premises architecture to accomplish the same task. A team of data engineers would be required to maintain and monitor a Kafka cluster, develop applications to stream data, and maintain a Hadoop cluster and the underlying infrastructure for data storage. With my stream processing architecture, there are no servers to manage, no disk drives to replace, and no service monitoring to write.
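To make the Lambda preprocessing step concrete, here is a minimal sketch (not the production code from this project) of a handler that decodes Kinesis records, drops malformed events, and forwards the cleansed records to a Kinesis Data Firehose delivery stream. The delivery stream name and the field names are hypothetical.

```python
# A minimal sketch of a Lambda handler for Kinesis events; the delivery
# stream name and record fields are hypothetical.
import base64
import json

import boto3

firehose = boto3.client("firehose")
DELIVERY_STREAM = "events-to-s3"  # placeholder name


def handler(event, context):
    cleaned = []
    for record in event["Records"]:
        # Kinesis record payloads arrive base64-encoded.
        payload = json.loads(base64.b64decode(record["kinesis"]["data"]))

        # Drop records that are missing required fields.
        if "event_type" not in payload or "timestamp" not in payload:
            continue

        # Normalize a field before storage.
        payload["event_type"] = payload["event_type"].lower()
        cleaned.append({"Data": (json.dumps(payload) + "\n").encode("utf-8")})

    if cleaned:
        # put_record_batch accepts up to 500 records per call.
        firehose.put_record_batch(DeliveryStreamName=DELIVERY_STREAM, Records=cleaned)

    return {"processed": len(cleaned)}
```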
Setting up a Kinesis data stream takes only a few clicks, and the same is true for Kinesis Data Firehose. Firehose can be configured to automatically consume data from a Kinesis data stream and write compressed data to Amazon S3 every N minutes. When I want to process a Kinesis data stream, it’s very easy to set up a Lambda function to be executed on each message received; I can set the trigger directly from the AWS Lambda console.
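If you prefer to script this setup rather than use the console, the same pieces can be created with a few API calls. The following boto3 sketch is only illustrative; the stream names, role ARNs, bucket ARN, function name, and buffer settings are placeholders.

```python
# An illustrative boto3 sketch; names, ARNs, and buffer settings are
# placeholders, not values from this post.
import boto3

kinesis = boto3.client("kinesis")
firehose = boto3.client("firehose")
lambda_client = boto3.client("lambda")

# 1. The data stream that the integration applications write to.
kinesis.create_stream(StreamName="partner-events", ShardCount=2)

# 2. A Firehose delivery stream that reads from the data stream and writes
#    compressed batches to S3 every few minutes.
firehose.create_delivery_stream(
    DeliveryStreamName="partner-events-to-s3",
    DeliveryStreamType="KinesisStreamAsSource",
    KinesisStreamSourceConfiguration={
        "KinesisStreamARN": "arn:aws:kinesis:eu-west-1:123456789012:stream/partner-events",
        "RoleARN": "arn:aws:iam::123456789012:role/firehose-read-kinesis",
    },
    ExtendedS3DestinationConfiguration={
        "RoleARN": "arn:aws:iam::123456789012:role/firehose-write-s3",
        "BucketARN": "arn:aws:s3:::my-data-lake",
        "BufferingHints": {"IntervalInSeconds": 300, "SizeInMBs": 128},
        "CompressionFormat": "GZIP",
    },
)

# 3. Trigger a Lambda function for each batch of records on the stream.
lambda_client.create_event_source_mapping(
    EventSourceArn="arn:aws:kinesis:eu-west-1:123456789012:stream/partner-events",
    FunctionName="cleanse-and-aggregate",
    StartingPosition="LATEST",
    BatchSize=100,
)
```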
Regardless of the format in which our partners deliver the data, I can convert it to JSON with my own formatters and send it to Kinesis. After Firehose writes it to Amazon S3, I have everything in nearly the same structure I received it in, but compressed, encrypted, and optimized for reading.
This data is automatically crawled by AWS Glue and placed into the AWS Glue Data Catalog. This means that I can immediately query the data directly on S3 using Amazon Athena or through Amazon Redshift Spectrum. Previously, I used Amazon EMR and an Amazon RDS–based metastore in Apache Hive for catalog management. Now I can avoid the complexity of maintaining Hive Metastore catalogs. Glue takes care of high availability and the operations side so that I know that end users can always be productive.
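As a rough sketch of how this cataloging and querying can be wired up with boto3, the snippet below creates a Glue crawler over the Firehose output prefix and then runs an Athena query against the resulting Data Catalog table. The database, crawler, role, bucket, and table names are all placeholders.

```python
# A hedged boto3 sketch; database, crawler, role, bucket, and table names
# are placeholders.
import boto3

glue = boto3.client("glue")
athena = boto3.client("athena")

# Crawl the Firehose output prefix and register the tables it finds.
glue.create_crawler(
    Name="partner-events-crawler",
    Role="arn:aws:iam::123456789012:role/glue-crawler-role",
    DatabaseName="events_db",
    Targets={"S3Targets": [{"Path": "s3://my-data-lake/partner-events/"}]},
    Schedule="cron(0 * * * ? *)",  # hourly
)
glue.start_crawler(Name="partner-events-crawler")

# Once the tables are in the Data Catalog, Athena can query them directly on S3.
athena.start_query_execution(
    QueryString="SELECT event_type, count(*) FROM events GROUP BY event_type",
    QueryExecutionContext={"Database": "events_db"},
    ResultConfiguration={"OutputLocation": "s3://my-athena-results/"},
)
```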
Working with Amazon Athena and Amazon Redshift for analysis
I found Amazon Athena extremely useful out of the box for ad hoc analysis. Our engineers (me) use Athena to explore new datasets as we receive them and to determine what transformations will be needed for long-term query efficiency.
For our data analysts and data scientists, we’ve selected Amazon Redshift. Amazon Redshift has proven to be the right tool for us over and over again. It easily processes 20+ million transactions per day, regardless of the footprint of the tables and the type of analytics required by the business. Latency is low and query performance expectations have been more than met. We use Redshift Spectrum for long-term data retention, which enables me to extend the analytic power of Amazon Redshift beyond local data to anything stored in S3, and without requiring me to load any data. Redshift Spectrum gives me the freedom to store data where I want, in the format I want, and have it available for processing when I need it.
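For readers unfamiliar with Redshift Spectrum, the following sketch shows one way to expose the Glue Data Catalog tables to the cluster and join them with locally loaded data. The connection details, IAM role, schema, and table names are placeholders, and psycopg2 is simply one of several ways to run SQL against Amazon Redshift; none of this is taken from the project's actual code.

```python
# An illustrative sketch; connection details, IAM role, and table names are
# placeholders, and psycopg2 is just one way to run SQL against the cluster.
import psycopg2

conn = psycopg2.connect(
    host="my-cluster.abc123.eu-west-1.redshift.amazonaws.com",
    port=5439,
    dbname="analytics",
    user="admin",
    password="...",  # placeholder
)

with conn, conn.cursor() as cur:
    # Map the Glue Data Catalog database as an external schema.
    cur.execute("""
        CREATE EXTERNAL SCHEMA IF NOT EXISTS spectrum
        FROM DATA CATALOG
        DATABASE 'events_db'
        IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-spectrum-role';
    """)

    # Join long-term data on S3 with data loaded into Redshift.
    cur.execute("""
        SELECT u.user_id, count(*) AS sessions
        FROM spectrum.events e
        JOIN public.users u ON u.user_id = e.user_id
        GROUP BY u.user_id;
    """)
    rows = cur.fetchall()
```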
To load data directly into Amazon Redshift, I use AWS Data Pipeline to orchestrate data workflows. I create Amazon EMR clusters on an intra-day basis, which I can easily adjust to run more or less frequently as needed throughout the day. EMR clusters are used together with Amazon RDS, Apache Spark 2.0, and S3 storage. The data pipeline application loads ETL configurations from Spring RESTful services hosted on AWS Elastic Beanstalk. The application then loads data from S3 into memory, aggregates and cleans the data, and then writes the final version of the data to Amazon Redshift. This data is then ready to use for analysis. Spark on EMR also supports recommendation and personalization use cases for various business users, and I find it easy to set up and deliver what users want. Finally, business users use Amazon QuickSight for self-service BI to slice, dice, and visualize the data depending on their requirements.
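A simplified PySpark sketch of this kind of transform-and-load step is shown below. The bucket names, prefixes, and column names are hypothetical, and staging the output on S3 for a Redshift COPY is only one way to perform the final load.

```python
# A simplified PySpark sketch of the transform-and-load step described above,
# run on EMR; buckets, prefixes, and column names are hypothetical.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("daily-etl").getOrCreate()

# Read the raw, compressed JSON events that Firehose delivered to S3.
events = spark.read.json("s3://my-data-lake/partner-events/2018/02/01/")

# Cleanse and aggregate before loading into the warehouse.
daily = (
    events
    .filter(F.col("user_id").isNotNull())
    .withColumn("event_date", F.to_date("timestamp"))
    .groupBy("event_date", "user_id", "event_type")
    .agg(F.count("*").alias("event_count"))
)

# Stage the results on S3; a Redshift COPY command then loads this prefix, e.g.:
#   COPY analytics.daily_events FROM 's3://my-etl-staging/daily_events/'
#   IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-copy-role' CSV GZIP;
daily.write.mode("overwrite").option("compression", "gzip").csv(
    "s3://my-etl-staging/daily_events/"
)
```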
Each AWS service in this architecture plays its part in saving precious time that’s crucial for delivery and for getting different departments in the business on board. I found the services easy to set up and use, and all have proven highly reliable in our production environments. When the architecture was in place, scaling out was either completely handled by the service or a matter of a simple API call, and crucially didn’t require me to change a single line of code. Increasing shards for Kinesis can be done in a minute by editing a stream. Increasing capacity for Lambda functions can be accomplished by editing the megabytes allocated for processing, and concurrency is handled automatically. EMR cluster capacity can easily be increased by changing the master and slave node types in Data Pipeline, or by using Auto Scaling. Lastly, RDS and Amazon Redshift can be easily upgraded without any major tasks to be performed by our team (again, me).
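To give a flavor of how simple these scaling operations are when scripted, here are two illustrative boto3 calls; the stream name, function name, and capacity values are placeholders.

```python
# Illustrative boto3 calls for the scaling operations mentioned above;
# names and capacity values are placeholders.
import boto3

# Double the shard count of a Kinesis data stream.
boto3.client("kinesis").update_shard_count(
    StreamName="partner-events",
    TargetShardCount=4,
    ScalingType="UNIFORM_SCALING",
)

# Give a Lambda function more memory (allocated CPU scales with it).
boto3.client("lambda").update_function_configuration(
    FunctionName="cleanse-and-aggregate",
    MemorySize=1024,
)
```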
In the end, using AWS services including Kinesis, Lambda, Data Pipeline, and Amazon Redshift allows me to keep my team lean and highly productive. I eliminated the cost and delays of capital infrastructure, as well as the late night and weekend calls for support. I can now give maximum value to the business while keeping operational costs down. My team pushed out an agile and highly responsive data warehouse solution in record time and we can handle changing business requirements rapidly, and quickly adapt to new data and new user requests.
Stephen Borg is the Head of Big Data and BI at Cerberus Technologies. He has a background in platform software engineering, and first became involved in data warehousing using the typical RDBMS, SQL, ETL, and BI tools. He quickly became passionate about providing insight to help others optimize their businesses and add personalization to their products.