Brotli is a state of the art lossless compression format, supported by all major browsers. It is capable of achieving considerably better compression ratios than the ubiquitous gzip, and is rapidly gaining in popularity. Cloudflare uses the Google brotli library to dynamically compress web content whenever possible. In 2015, we took an in-depth look at how brotli works and its compression advantages.
One of the more interesting features of the brotli file format, in the context of textual web content compression, is the inclusion of a built-in static dictionary. The dictionary is quite large, and in addition to containing various strings in multiple languages, it also supports the option to apply multiple transformations to those words, increasing its versatility.
The open-source brotli library, which implements an encoder and decoder for brotli, has 11 predefined quality levels for the encoder, with higher quality levels demanding more CPU in exchange for a better compression ratio. The static dictionary feature is used to a limited extent starting with level 5, and to the full extent only at levels 10 and 11, due to the high CPU cost of this feature.
We improve on the limited dictionary use approach and add optimizations to improve the compression at levels 5 through 9 at a negligible performance impact when compressing web content.
Brotli Static Dictionary
Brotli primarily uses the LZ77 algorithm to compress its data. Our previous blog post about brotli provides an introduction.
To improve compression on text files and web content, brotli also includes a static, predefined dictionary. If a byte sequence cannot be matched with an earlier sequence using LZ77 the encoder will try to match the sequence with a reference to the static dictionary, possibly using one of the multiple transforms. For example, every HTML file contains the opening <html> tag that cannot be compressed with LZ77, as it is unique, but it is contained in the brotli static dictionary and will be replaced by a reference to it. The reference generally takes less space than the sequence itself, which decreases the compressed file size.
The dictionary contains 13,504 words in six languages, with lengths from 4 to 24 characters. To improve the compression of real-world text and web data, some dictionary words are common phrases (“The current”) or strings common in web content (‘type="text/javascript"’). Unlike usual LZ77 compression, a word from the dictionary can only be matched as a whole. Starting a match in the middle of a dictionary word, ending it before the end of a word or even extending into the next word is not supported by the brotli format.
Instead, the dictionary supports 120 transforms of dictionary words to support a larger number of matches and find longer matches. The transforms include adding suffixes (“work” => “working”), adding prefixes (“book” => “ the book”), making the first character uppercase (“process” => “Process”), or converting the whole word to uppercase (“html” => “HTML”). In addition to transforms that make words longer or capitalize them, the cut transform allows a shortened match (“consistently” => “consistent”), which makes it possible to find even more matches.
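To make the transform idea concrete, here is a minimal sketch that models a transform as a prefix, an operation on the word, and a suffix. The function name and the operation labels are made up for this illustration and are not the identifiers used by the brotli library, and the uppercase handling below assumes ASCII input only.

```python
def apply_transform(word, prefix="", op="identity", suffix=""):
    """Apply one hypothetical (prefix, operation, suffix) transform to a word."""
    if op == "uppercase_first":
        word = word[:1].upper() + word[1:]
    elif op == "uppercase_all":
        word = word.upper()
    elif op.startswith("omit_last_"):
        # The "cut" transform: drop the last n characters of the word.
        n = int(op[len("omit_last_"):])
        word = word[:len(word) - n]
    elif op != "identity":
        raise ValueError("unknown operation: " + op)
    return prefix + word + suffix


# The examples from the text, expressed with this sketch.
print(apply_transform("work", suffix="ing"))
print(apply_transform("book", prefix=" the "))
print(apply_transform("process", op="uppercase_first"))
print(apply_transform("html", op="uppercase_all"))
print(apply_transform("consistently", op="omit_last_2"))
```

Because each transformed form is derived from the base word on the fly, the format only has to store the base words plus a small table of transforms.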
Methods
With the transforms included, the static dictionary contains 1,633,984 different words (13,504 base words × 121 forms: the 120 transforms plus the unchanged word) – too many for exhaustive search, except when used with the slow brotli compression levels 10 and 11. When used at a lower compression level, brotli either disables the dictionary or only searches through a subset of roughly 5,500 words to find matches in an acceptable time frame. It also only considers matches at positions where no LZ77 match can be found and only uses the cut transform.
Our approach to the brotli dictionary uses a larger but more specialized subset of the dictionary than the default, with more aggressive heuristics to improve the compression ratio at negligible cost to performance. To provide a more specialized dictionary, we give the compressor a content type hint from our servers, relying on the Content-Type header to tell the compressor whether it should use a dictionary for HTML, JavaScript or CSS. In the future, the dictionaries could be refined further by colocation language.
Fast dictionary lookup
To improve compression without sacrificing performance, we needed a fast way to find matches so that we could search the dictionary more thoroughly than brotli does by default. Our approach uses three data structures to find a matching word directly. The radix trie is responsible for finding the word while the hash table and bloom filter are used to speed up the radix trie and quickly eliminate many words that can’t be matched using the dictionary.
Lookup for a position starting with “type”
The radix trie easily finds the longest matching word without having to try matching several words. To find the match, we traverse the graph based on the text at the current position and remember the last node with a matching word. The radix trie supports compressed nodes (having more than one character as an edge label), which greatly reduces the number of nodes that need to be traversed for typical dictionary words.
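The traversal can be sketched as follows. This is a simplified illustration, not the production code: it uses a plain trie with one character per edge rather than a radix trie with compressed multi-character edges, and the class and function names are invented for the example.

```python
class TrieNode:
    def __init__(self):
        self.children = {}    # next character -> TrieNode
        self.is_word = False  # True if a dictionary word ends at this node


def build_trie(words):
    root = TrieNode()
    for word in words:
        node = root
        for ch in word:
            node = node.children.setdefault(ch, TrieNode())
        node.is_word = True
    return root


def longest_match(root, text, pos):
    """Return the length of the longest dictionary word starting at text[pos]."""
    node = root
    best = 0
    i = pos
    while i < len(text):
        node = node.children.get(text[i])
        if node is None:
            break  # no dictionary word continues with this character
        i += 1
        if node.is_word:
            best = i - pos  # remember the last node that completed a word
    return best


words = ["type", "types", 'type="text/javascript"']
trie = build_trie(words)
page = '<script type="text/javascript">'
print(longest_match(trie, page, page.index("type")))
```

A single pass over the input finds the longest word; there is no need to test each candidate word separately.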
The radix trie is slowed down by the large number of positions where we can’t find a match. An important finding is that most mismatching strings have a mismatching character in the first four bytes. Even for positions where a match exists, a lot of time is spent traversing nodes for the first four bytes since the nodes close to the tree root usually have many children.
Luckily, we can use a hash table to look up the trie node that corresponds to the first four bytes, or to reject the possibility of a match if no such node exists. We thus look up the first four bytes of the string. If there is a matching node, we traverse the trie from there, which will be fast as each four-byte prefix usually only has a few corresponding dictionary words. If there is no matching node, there will not be a matching word at this position and we do not need to consider it further.
While the hash table is designed to reject mismatches quickly and avoid cache misses and high search costs in the trie, it still suffers from similar problems: We might search through several 4-byte prefixes with the hash value of the given position, only to learn that no match can be found. Additionally, hash lookups can be expensive due to cache misses.
To quickly reject positions that do not match the dictionary, but might still cause cache misses, we use a k=1 bloom filter to rule out most non-matching positions. In the k=1 case, the filter is simply a lookup table with one bit indicating whether any matching 4-byte prefixes exist for a given hash value. If the bit for the given hash value is 0, there won’t be a match. Since the bloom filter uses at most one bit for each four-byte prefix while the hash table requires 16 bytes, cache misses are much less likely. (The actual size of the structures is a bit different since there are many empty spaces in both structures and the bloom filter has twice as many elements to reject more non-matching positions.)
This is very useful for performance as a bloom filter lookup requires a single memory access. The bloom filter is designed to be fast and simple, but still rejects more than half of all non-matching positions and thus allows us to save a full hash lookup, which would often mean a cache miss.
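The order of the checks can be sketched like this. It is an illustration under simplifying assumptions rather than our implementation: the hash is CRC32 from the standard library, the filter size is arbitrary, a Python dict stands in for the hash table, and a short scan over the candidate words stands in for the trie traversal.

```python
import zlib

PREFIX_LEN = 4
BLOOM_BITS = 1 << 12  # arbitrary size chosen for the example


def prefix_hash(prefix):
    return zlib.crc32(prefix) % BLOOM_BITS


def build_index(words):
    bloom = bytearray(BLOOM_BITS // 8)  # k=1 bloom filter: one bit per hash value
    table = {}                          # 4-byte prefix -> words with that prefix
    for word in words:
        prefix = word[:PREFIX_LEN]
        h = prefix_hash(prefix)
        bloom[h >> 3] |= 1 << (h & 7)
        table.setdefault(prefix, []).append(word)
    return bloom, table


def find_match(bloom, table, data, pos):
    """Return the longest dictionary word starting at data[pos], or None."""
    prefix = data[pos:pos + PREFIX_LEN]
    if len(prefix) < PREFIX_LEN:
        return None
    h = prefix_hash(prefix)
    if not bloom[h >> 3] & (1 << (h & 7)):
        return None  # bit is 0: no word can start with this prefix
    candidates = table.get(prefix)
    if candidates is None:
        return None  # bloom filter false positive, rejected by the table
    best = None
    for word in candidates:  # stand-in for traversing the trie from this node
        if data.startswith(word, pos) and (best is None or len(word) > len(best)):
            best = word
    return best


bloom, table = build_index([b"<html>", b"type", b'type="text/javascript"'])
data = b'<html><script type="text/javascript">'
print(find_match(bloom, table, data, 0))
print(find_match(bloom, table, data, data.index(b"type")))
```

Most positions in real input fail at the first check, so the more expensive structures are touched only rarely.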
Heuristics
To improve the compression ratio without sacrificing performance, we employed a number of heuristics:
Only search the dictionary at some positions: This is also done using the stock dictionary, but we search more aggressively. While the stock dictionary only considers positions where the LZ77 match finder did not find a match, we also consider positions that have a bad match according to the brotli cost model: LZ77 matches that are short or have a long distance between the current position and the reference usually only offer a small compression improvement, so it is worth trying to find a better match in the static dictionary.
Only consider the longest match and then transform it: Instead of finding and transforming all matches at a position, the radix trie only gives us the longest match which we then transform. This approach results in a vast performance improvement. In most cases, this results in finding the best match.
Only include some transforms: While all transformations can improve the compression ratio, we only included those that work well with the data structures. The suffix transforms can easily be applied after finding a non-transformed match. For the upper case transforms, we include both the non-transformed and the upper case version of a word in the radix trie. The prefix and cut transforms do not play well with the radix trie, therefore a cut of more than 1 byte and prefix transforms are not supported.
Generating the reduced dictionary
At low compression levels, brotli searches a subset of ~5,500 out of 13,504 words of the dictionary, negatively impacting compression. To store the entire dictionary, we would need to store ~31,700 words in the trie, considering the upper case transformed output of ASCII sequences, and ~11,000 four-byte prefixes in the hash table. This would slow down both the hash table and the radix trie, so we needed to find a different subset of the dictionary that works well for web content.
For this purpose, we used a large data set containing representative content. We made sure to use web content from several world regions to reflect language diversity and optimize compression. Based on this data set, we identified which words are most common and result in the largest compression improvement according to the brotli cost model. We only include the most useful words based on this calculation. Additionally, we remove some words if they slow down hash table lookups of other, more common words based on their hash value.
We have generated separate dictionaries for HTML, CSS and JavaScript content and use the MIME type to identify the right dictionary to use. The dictionaries we currently use include about 15-35% of the entire dictionary including uppercase transforms. Depending on the type of data and the desired compression/speed tradeoff, different options for the size of the dictionary can be useful. We have also developed code that automatically gathers statistics about matches and generates a reduced dictionary based on this, which makes it easy to extend this to other textual formats, perhaps data that is majority non-English or XML data and achieve better results for this type of data.
Results
We tested the reduced dictionary on a large data set of HTML, CSS and JavaScript files.
The improvement is especially big for small files as the LZ77 compression is less effective on them. Since the improvement on large files is a lot smaller, we only tested files up to 256KB. We used compression level 5, the same compression level we currently use for dynamic compression on our edge, and tested on an Intel Core i7-7820HQ CPU.
Compression improvement is defined as 1 – (compressed size using the reduced dictionary / compressed size without dictionary). This ratio is then averaged for each input size range. We also provide an average value weighted by file size. Our data set mirrors typical web traffic, covering a wide range of file sizes with small files being more common, which explains the large difference between the weighted and unweighted average.
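As a small worked example of this definition, the sketch below computes the per-file improvement and both averages. The sample sizes are invented purely to show the arithmetic and are not measurements, and weighting by the original file size is an assumption about what "weighted by file size" means here.

```python
def improvement(size_with_dict, size_without_dict):
    """1 - (compressed size with reduced dictionary / compressed size without)."""
    return 1 - size_with_dict / size_without_dict


def averages(samples):
    """samples: list of (original_size, size_with_dict, size_without_dict)."""
    ratios = [improvement(with_d, without_d) for _, with_d, without_d in samples]
    unweighted = sum(ratios) / len(ratios)
    total_size = sum(original for original, _, _ in samples)
    weighted = sum(s[0] * r for s, r in zip(samples, ratios)) / total_size
    return unweighted, weighted


# Invented numbers: one small file with a 10% gain, one large file with a 1% gain.
samples = [(1_000, 450, 500), (100_000, 19_800, 20_000)]
unweighted, weighted = averages(samples)
print(f"unweighted: {unweighted:.3f}, weighted by size: {weighted:.3f}")
```

The small file dominates the unweighted average while the large file dominates the weighted one, which is the effect described above.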
With the improved dictionary approach, we are now able to compress HTML, JavaScript and CSS files as well as, or sometimes even better than, a higher compression level would allow, all while using only 1% to 3% more CPU. For reference, using compression level 6 over 5 would increase CPU usage by up to 12%.
I’d like to share something very brief and very obvious – that compression works better with large amounts of data. That is, if you have to compress 100 sentences you’d better compress them in bulk rather than one sentence at a time. Let me illustrate that:
public static void main(String[] args) throws Exception {
    // Build 100 sentences, each made of 100 random 10-letter words
    List<String> sentences = new ArrayList<>();
    for (int i = 0; i < 100; i++) {
        StringBuilder sentence = new StringBuilder();
        for (int j = 0; j < 100; j++) {
            sentence.append(RandomStringUtils.randomAlphabetic(10)).append(" ");
        }
        sentences.add(sentence.toString());
    }
    // Size when all sentences are compressed together
    byte[] compressed = compress(StringUtils.join(sentences, ". "));
    System.out.println(compressed.length);
    // Total size when each sentence is compressed on its own
    System.out.println(sentences.stream().collect(Collectors.summingInt(sentence -> compress(sentence).length)));
}
The compress method is using commons-compress to easily generate results for multiple compression algorithms:
public static byte[] compress(String str) {
    if (str == null || str.length() == 0) {
        return new byte[0];
    }
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (CompressorOutputStream gzip = new CompressorStreamFactory()
            .createCompressorOutputStream(CompressorStreamFactory.GZIP, out)) {
        gzip.write(str.getBytes("UTF-8"));
        gzip.close();
        return out.toByteArray();
    } catch (Exception ex) {
        throw new RuntimeException(ex);
    }
}
The results are as follows, in bytes (note that there’s some randomness, so algorithms are not directly comparable):
Algorithm     Bulk    Individual
GZIP          6590    10596
LZ4_FRAMED    9214    10900
BZIP2         6663    12451
Why is that an obvious result? Because of the way most compression algorithms work – they look for patterns in the raw data and create a map of those patterns (a very rough description).
How is that useful? In big data scenarios where the underlying store supports per-record compression (e.g. a database or search engine), you may save a significant amount of disk space if you bundle multiple records into one stored/indexed record.
This is not generically useful advice, though. You should check the particular datastore implementation. For example, MS SQL Server supports both row and page compression. Cassandra does compression on an SSTable level, so it may not matter how you structure your rows. Certainly, if storing data in files, storing it in one file and compressing it is more efficient than compressing multiple files separately.
Disk space is cheap so playing with data bundling and compression may be seen as premature optimization. But in systems that operate on large datasets it’s a decision that can save you a lot of storage costs.
Last year, we released Amazon Connect, a cloud-based contact center service that enables any business to deliver better customer service at low cost. This service is built based on the same technology that empowers Amazon customer service associates. Using this system, associates have millions of conversations with customers when they inquire about their shipping or order information. Because we made it available as an AWS service, you can now enable your contact center agents to make or receive calls in a matter of minutes. You can do this without having to provision any kind of hardware.
There are several advantages of building your contact center in the AWS Cloud, as described in our documentation. In addition, customers can extend Amazon Connect capabilities by using AWS products and the breadth of AWS services. In this blog post, we focus on how to get analytics out of the rich set of data published by Amazon Connect. We make use of an Amazon Connect data stream and create an end-to-end workflow to offer an analytical solution that can be customized based on need.
Solution overview
The following diagram illustrates the solution.
In this solution, Amazon Connect exports its contact trace records (CTRs) using Amazon Kinesis. CTRs are data streams in JSON format, and each has information about individual contacts. For example, this information might include the start and end time of a call, which agent handled the call, which queue the user chose, queue wait times, number of holds, and so on. You can enable this feature by reviewing our documentation.
In this architecture, we use Kinesis Firehose to capture Amazon Connect CTRs as raw data in an Amazon S3 bucket. We don’t use the recent feature added by Kinesis Firehose to save the data in S3 as Apache Parquet format. We use AWS Glue functionality to automatically detect the schema on the fly from an Amazon Connect data stream.
The primary reason for this approach is that it allows us to use attributes and enables an Amazon Connect administrator to dynamically add more fields as needed. Also, by converting data to Parquet in batch (every couple of hours), compression can be higher. However, if your requirement is to ingest the data in Parquet format in real time, we recommend using the recently launched Kinesis Firehose feature. You can review this blog post for further information.
By default, Firehose puts these records in time-series format. To make it easy for AWS Glue crawlers to capture information from new records, we use AWS Lambda to move all new records to a single S3 prefix called flatfiles. Our Lambda function is configured using S3 event notification. To comply with AWS Glue and Athena best practices, the Lambda function also converts all column names to lowercase. Finally, we also use the Lambda function to start AWS Glue crawlers. AWS Glue crawlers identify the data schema and update the AWS Glue Data Catalog, which is used by extract, transform, load (ETL) jobs in AWS Glue in the latter half of the workflow.
You can see our approach in the Lambda code following.
import json
import re
import urllib.parse

import boto3

s3 = boto3.resource('s3')
client = boto3.client('s3')


def convertColumntoLowwerCaps(obj):
    # Lowercase every key and strip non-word characters, including in nested objects.
    # Iterate over a copy of the keys because the dict is modified inside the loop.
    for key in list(obj.keys()):
        new_key = re.sub(r'[\W]+', '', key.lower())
        v = obj[key]
        if isinstance(v, dict):
            if len(v) > 0:
                convertColumntoLowwerCaps(v)
        if new_key != key:
            obj[new_key] = obj[key]
            del obj[key]
    return obj


def lambda_handler(event, context):
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'])
    try:
        client.download_file(bucket, key, '/tmp/file.json')
        with open('/tmp/out.json', 'w') as output, open('/tmp/file.json', 'r') as file:
            i = 0
            for line in file:
                # Records arrive concatenated as "}{", so split them onto separate lines.
                for object in line.replace("}{", "}\n{").split("\n"):
                    if not object.strip():
                        continue  # skip the empty piece left by a trailing newline
                    record = json.loads(object, object_hook=convertColumntoLowwerCaps)
                    if i != 0:
                        output.write("\n")
                    output.write(json.dumps(record))
                    i += 1
        # Flatten the time-series key into a single name under the flatfiles prefix.
        newkey = 'flatfiles/' + key.replace("/", "")
        client.upload_file('/tmp/out.json', bucket, newkey)
        s3.Object(bucket, key).delete()
        return "success"
    except Exception as e:
        print(e)
        print('Error copying object {} from bucket {}'.format(key, bucket))
        raise e
We trigger AWS Glue crawlers based on events because this approach lets us capture any new data frame that we want to be dynamic in nature. CTR attributes are designed to offer multiple custom options based on a particular call flow. Attributes are essentially key-value pairs in nested JSON format. With the help of event-based AWS Glue crawlers, you can easily identify newer attributes automatically.
We recommend setting up an S3 lifecycle policy on the flatfiles folder that keeps records only for 24 hours. Doing this optimizes AWS Glue ETL jobs to process a subset of files rather than the entire set of records.
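One way to express such a rule is sketched below. The helper name and the rule ID are made up for this example, and the bucket name in the commented call is a placeholder. S3 expiration is specified in whole days, so 24 hours is written as one day.

```python
def flatfiles_lifecycle(prefix="flatfiles/", days=1):
    """Build a lifecycle configuration that expires objects under the given prefix."""
    return {
        "Rules": [
            {
                "ID": "expire-flatfiles",       # hypothetical rule name
                "Filter": {"Prefix": prefix},
                "Status": "Enabled",
                "Expiration": {"Days": days},   # expiration granularity is days
            }
        ]
    }


config = flatfiles_lifecycle()
print(config)

# Applying it needs AWS credentials and your own bucket name (placeholder below):
# import boto3
# boto3.client("s3").put_bucket_lifecycle_configuration(
#     Bucket="your-bucket-name", LifecycleConfiguration=config)
```

The same rule can also be added in the S3 console or in the CloudFormation template instead of through code.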
After we have data in the flatfiles folder, we use AWS Glue to catalog the data and transform it into Parquet format inside a folder called parquet/ctr/. The AWS Glue job performs the ETL that transforms the data from JSON to Parquet format. We use AWS Glue crawlers to capture any new data frame inside the JSON code that we want to be dynamic in nature. What this means is that when you add new attributes to an Amazon Connect instance, the solution automatically recognizes them and incorporates them in the schema of the results.
After AWS Glue stores the results in Parquet format, you can perform analytics using Amazon Redshift Spectrum, Amazon Athena, or any third-party data warehouse platform. To keep this solution simple, we have used Amazon Athena for analytics. Amazon Athena allows us to query data without having to set up and manage any servers or data warehouse platforms. Additionally, we only pay for the queries that are executed.
Try it out!
You can get started with our sample AWS CloudFormation template. This template creates the components starting from the Kinesis stream and finishes up with S3 buckets, the AWS Glue job, and crawlers. To deploy the template, open the AWS Management Console by clicking the following link.
In the console, specify the following parameters:
BucketName: The name for the bucket to store all the solution files. This name must be unique; if it’s not, template creation fails.
etlJobSchedule: The schedule in cron format indicating how often the AWS Glue job runs. The default value is every hour.
KinesisStreamName: The name of the Kinesis stream to receive data from Amazon Connect. This name must be different from any other Kinesis stream created in your AWS account.
s3interval: The interval in seconds for Kinesis Firehose to save data inside the flatfiles folder on S3. The value must be between 60 and 900 seconds.
sampledata: When this parameter is set to true, sample CTR records are used. Doing this lets you try this solution without setting up an Amazon Connect instance. All examples in this walkthrough use this sample data.
Select the “I acknowledge that AWS CloudFormation might create IAM resources.” check box, and then choose Create. After the template finishes creating resources, you can see the stream name on the stack Outputs tab.
If you haven’t created your Amazon Connect instance, you can do so by following the Getting Started Guide. When you are done creating, choose your Amazon Connect instance in the console, which takes you to instance settings. Choose Data streaming to enable streaming for CTR records. Here, you can choose the Kinesis stream (defined in the KinesisStreamName parameter) that was created by the CloudFormation template.
Now it’s time to generate the data by making or receiving calls by using Amazon Connect. You can go to Amazon Connect Cloud Control Panel (CCP) to make or receive calls using a software phone or desktop phone. After a few minutes, we should see data inside the flatfiles folder. To make it easier to try this solution, we provide sample data that you can enable by setting the sampledata parameter to true in your CloudFormation template.
You can navigate to the AWS Glue console by choosing Jobs on the left navigation pane of the console. We can select our job here. In my case, the job created by CloudFormation is called glueJob-i3TULzVtP1W0; yours should be similar. You run the job by choosing Run job for Action.
After that, we wait for the AWS Glue job to run and to finish successfully. We can track the status of the job by checking the History tab.
When the job finishes running, we can check the Database section. There should be a new table created called ctr in Parquet format.
To query the data with Athena, we can select the ctr table, and for Action choose View data.
Doing this takes us to the Athena console. If you run a query, Athena shows a preview of the data.
When we can query the data using Athena, we can visualize it using Amazon QuickSight. Before connecting Amazon QuickSight to Athena, we must make sure to grant Amazon QuickSight access to Athena and the associated S3 buckets in the account. For more information on doing this, see Managing Amazon QuickSight Permissions to AWS Resources in the Amazon QuickSight User Guide. We can then create a new data set in Amazon QuickSight based on the Athena table that was created.
After setting up permissions, we can create a new analysis in Amazon QuickSight by choosing New analysis.
Then we add a new data set.
We choose Athena as the source and give the data source a name (in this case, I named it connectctr).
Choose the name of the database and the table referencing the Parquet results.
Then choose Visualize.
After that, we should see the following screen.
Now we can create some visualizations. First, search for the agent.username column, and drag it to the AutoGraph section.
We can see the agents and the number of calls for each, so we can easily see which agents have taken the largest number of calls. If we want to see which queues the calls came from for each agent, we can add the queue.arn column to the visual.
After following all these steps, you can use Amazon QuickSight to add different columns from the call records and perform different types of visualizations. You can build dashboards that continuously monitor your connect instance. You can share those dashboards with others in your organization who might need to see this data.
Conclusion
In this post, you see how you can use services like AWS Lambda, AWS Glue, and Amazon Athena to process Amazon Connect call records. The post also demonstrates how to use AWS Lambda to preprocess files in Amazon S3 and transform them into a format that is recognized by AWS Glue crawlers. Finally, the post shows how to use Amazon QuickSight to perform visualizations.
You can use the provided template to analyze your own contact center instance. Or you can take the CloudFormation template and modify it to process other data streams that can be ingested using Amazon Kinesis or stored on Amazon S3.
Luis Caro is a Big Data Consultant for AWS Professional Services. He works with our customers to provide guidance and technical assistance on big data projects, helping them improve the value of their solutions when using AWS.
Peter Dalbhanjan is a Solutions Architect for AWS based in Herndon, VA. Peter has a keen interest in evangelizing AWS solutions and has written multiple blog posts that focus on simplifying complex use cases. At AWS, Peter helps with designing and architecting a variety of customer workloads.
Amazon Kinesis Data Firehose is the easiest way to capture and stream data into a data lake built on Amazon S3. This data can be anything—from AWS service logs like AWS CloudTrail log files, Amazon VPC Flow Logs, Application Load Balancer logs, and others. It can also be IoT events, game events, and much more. To efficiently query this data, a time-consuming ETL (extract, transform, and load) process is required to massage and convert the data to an optimal file format, which increases the time to insight. This situation is less than ideal, especially for real-time data that loses its value over time.
To solve this common challenge, Kinesis Data Firehose can now save data to Amazon S3 in Apache Parquet or Apache ORC format. These are optimized columnar formats that are highly recommended for best performance and cost-savings when querying data in S3. This feature directly benefits you if you use Amazon Athena, Amazon Redshift, AWS Glue, Amazon EMR, or any other big data tools that are available from the AWS Partner Network and through the open-source community.
Amazon Connect is a simple-to-use, cloud-based contact center service that makes it easy for any business to provide a great customer experience at a lower cost than common alternatives. Its open platform design enables easy integration with other systems. One of those systems is Amazon Kinesis—in particular, Kinesis Data Streams and Kinesis Data Firehose.
What’s really exciting is that you can now save events from Amazon Connect to S3 in Apache Parquet format. You can then perform analytics using Amazon Athena and Amazon Redshift Spectrum in real time, taking advantage of this key performance and cost optimization. Of course, Amazon Connect is only one example. This new capability opens the door for a great deal of opportunity, especially as organizations continue to build their data lakes.
Amazon Connect includes an array of analytics views in the Administrator dashboard. But you might want to run other types of analysis. In this post, I describe how to set up a data stream from Amazon Connect through Kinesis Data Streams and Kinesis Data Firehose and out to S3, and then perform analytics using Athena and Amazon Redshift Spectrum. I focus primarily on the Kinesis Data Firehose support for Parquet and its integration with the AWS Glue Data Catalog, Amazon Athena, and Amazon Redshift.
Solution overview
Here is how the solution is laid out:
The following sections walk you through each of these steps to set up the pipeline.
1. Define the schema
When Kinesis Data Firehose processes incoming events and converts the data to Parquet, it needs to know which schema to apply. The reason is that many times, incoming events contain all or some of the expected fields based on which values the producers are advertising. A typical process is to normalize the schema during a batch ETL job so that you end up with a consistent schema that can easily be understood and queried. Doing this introduces latency due to the nature of the batch process. To overcome this issue, Kinesis Data Firehose requires the schema to be defined in advance.
To see the available columns and structures, see Amazon Connect Agent Event Streams. For the purpose of simplicity, I opted to make all the columns of type String rather than create the nested structures. But you can definitely do that if you want.
The simplest way to define the schema is to create a table in the Amazon Athena console. Open the Athena console, and paste the following create table statement, substituting your own S3 bucket and prefix for where your event data will be stored. A Data Catalog database is a logical container that holds the different tables that you can create. The default database name shown here should already exist. If it doesn’t, you can create it or use another database that you’ve already created.
That’s all you have to do to prepare the schema for Kinesis Data Firehose.
2. Define the data streams
Next, you need to define the Kinesis data streams that will be used to stream the Amazon Connect events. Open the Kinesis Data Streams console and create two streams. You can configure them with only one shard each because you don’t have a lot of data right now.
3. Define the Kinesis Data Firehose delivery stream for Parquet
Let’s configure the Data Firehose delivery stream using the data stream as the source and Amazon S3 as the output. Start by opening the Kinesis Data Firehose console and creating a new data delivery stream. Give it a name, and associate it with the Kinesis data stream that you created in Step 2.
As shown in the following screenshot, enable Record format conversion (1) and choose Apache Parquet (2). As you can see, Apache ORC is also supported. Scroll down and provide the AWS Glue Data Catalog database name (3) and table names (4) that you created in Step 1. Choose Next.
To make things easier, the output S3 bucket and prefix fields are automatically populated using the values that you defined in the LOCATION parameter of the create table statement from Step 1. Pretty cool. Additionally, you have the option to save the raw events into another location as defined in the Source record S3 backup section. Don’t forget to add a trailing forward slash (“/”) so that Data Firehose creates the date partitions inside that prefix.
On the next page, in the S3 buffer conditions section, there is a note about configuring a large buffer size. The Parquet file format is highly efficient in how it stores and compresses data. Increasing the buffer size allows you to pack more rows into each output file, which is preferred and gives you the most benefit from Parquet.
Compression using Snappy is automatically enabled for both Parquet and ORC. You can modify the compression algorithm by using the Kinesis Data Firehose API and update the OutputFormatConfiguration.
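As a rough sketch, the fragment of the request that controls Parquet compression could be built like this. The helper name is invented, and the nesting and allowed values are written from memory of the Firehose API shape, so check them against the current API reference before relying on them.

```python
def parquet_output_format(compression="GZIP"):
    """Build an OutputFormatConfiguration fragment for Parquet output."""
    allowed = {"UNCOMPRESSED", "GZIP", "SNAPPY"}
    if compression not in allowed:
        raise ValueError("compression must be one of " + ", ".join(sorted(allowed)))
    return {"Serializer": {"ParquetSerDe": {"Compression": compression}}}


print(parquet_output_format("GZIP"))

# This fragment is meant to sit under DataFormatConversionConfiguration ->
# OutputFormatConfiguration in the ExtendedS3DestinationUpdate argument of the
# Firehose update_destination call (not executed here; it needs AWS credentials
# and the identifiers of your own delivery stream).
```

Snappy remains a reasonable default for query engines, so change it only if storage size matters more to you than decompression speed.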
Be sure to also enable Amazon CloudWatch Logs so that you can debug any issues that you might run into.
Lastly, finalize the creation of the Firehose delivery stream, and continue on to the next section.
4. Set up the Amazon Connect contact center
After setting up the Kinesis pipeline, you now need to set up a simple contact center in Amazon Connect. The Getting Started page provides clear instructions on how to set up your environment, acquire a phone number, and create an agent to accept calls.
After setting up the contact center, in the Amazon Connect console, choose your Instance Alias, and then choose Data Streaming. Under Agent Event, choose the Kinesis data stream that you created in Step 2, and then choose Save.
At this point, your pipeline is complete. Agent events from Amazon Connect are generated as agents go about their day. Events are sent via Kinesis Data Streams to Kinesis Data Firehose, which converts the event data from JSON to Parquet and stores it in S3. Athena and Amazon Redshift Spectrum can simply query the data without any additional work.
So let’s generate some data. Go back into the Administrator console for your Amazon Connect contact center, and create an agent to handle incoming calls. In this example, I creatively named mine Agent One. After it is created, Agent One can get to work and log into their console and set their availability to Available so that they are ready to receive calls.
To make the data a bit more interesting, I also created a second agent, Agent Two. I then made some incoming and outgoing calls and caused some failures to occur, so I now have enough data available to analyze.
5. Analyze the data with Athena
Let’s open the Athena console and run some queries. One thing you’ll notice is that when we created the schema for the dataset, we defined some of the fields as Strings even though in the documentation they were complex structures. The reason for doing that was simply to show some of the flexibility of Athena to be able to parse JSON data. However, you can define nested structures in your table schema so that Kinesis Data Firehose applies the appropriate schema to the Parquet file.
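To make the idea of parsing JSON stored in a String column more concrete, here is a small Python sketch that mimics what json_extract_scalar does for simple paths. It is only an analogue for illustration, not Athena's implementation, and the sample snapshot values are made up.

```python
import json
import re

# Matches ".name" steps and "[0]" index steps in a simple JSON path.
_STEP = re.compile(r"\.([A-Za-z_][A-Za-z0-9_]*)|\[(\d+)\]")


def json_extract_scalar(document, path):
    """Return the scalar at `path` as a string, or None if missing or not a scalar."""
    try:
        node = json.loads(document)
    except (TypeError, ValueError):
        return None
    if not path.startswith("$"):
        return None
    for key, index in _STEP.findall(path[1:]):
        try:
            node = node[key] if key else node[int(index)]
        except (KeyError, IndexError, TypeError):
            return None
    if node is None or isinstance(node, (dict, list)):
        return None
    if isinstance(node, bool):
        return "true" if node else "false"
    return str(node)


# Hypothetical agent snapshot, shaped like the fields used in the queries.
snapshot = json.dumps({
    "agentstatus": {"name": "Available"},
    "configuration": {
        "username": "agentone",
        "routingprofile": {"inboundqueues": [{"name": "BasicQueue"}]},
    },
})

print(json_extract_scalar(snapshot, "$.agentstatus.name"))
print(json_extract_scalar(snapshot, "$.configuration.routingprofile.inboundqueues[0].name"))
```

A path that points at an object or array, such as '$.configuration', yields no value here, which matches the "scalar only" behavior that the queries rely on.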
Let’s run the first query to see which agents have logged into the system.
The query might look complex, but it’s fairly straightforward:
WITH dataset AS (
SELECT
from_iso8601_timestamp(eventtimestamp) AS event_ts,
eventtype,
-- CURRENT STATE
json_extract_scalar(
currentagentsnapshot,
'$.agentstatus.name') AS current_status,
from_iso8601_timestamp(
json_extract_scalar(
currentagentsnapshot,
'$.agentstatus.starttimestamp')) AS current_starttimestamp,
json_extract_scalar(
currentagentsnapshot,
'$.configuration.firstname') AS current_firstname,
json_extract_scalar(
currentagentsnapshot,
'$.configuration.lastname') AS current_lastname,
json_extract_scalar(
currentagentsnapshot,
'$.configuration.username') AS current_username,
json_extract_scalar(
currentagentsnapshot,
'$.configuration.routingprofile.defaultoutboundqueue.name') AS current_outboundqueue,
json_extract_scalar(
currentagentsnapshot,
'$.configuration.routingprofile.inboundqueues[0].name') as current_inboundqueue,
-- PREVIOUS STATE
json_extract_scalar(
previousagentsnapshot,
'$.agentstatus.name') as prev_status,
from_iso8601_timestamp(
json_extract_scalar(
previousagentsnapshot,
'$.agentstatus.starttimestamp')) as prev_starttimestamp,
json_extract_scalar(
previousagentsnapshot,
'$.configuration.firstname') as prev_firstname,
json_extract_scalar(
previousagentsnapshot,
'$.configuration.lastname') as prev_lastname,
json_extract_scalar(
previousagentsnapshot,
'$.configuration.username') as prev_username,
json_extract_scalar(
previousagentsnapshot,
'$.configuration.routingprofile.defaultoutboundqueue.name') as prev_outboundqueue,
json_extract_scalar(
previousagentsnapshot,
'$.configuration.routingprofile.inboundqueues[0].name') as prev_inboundqueue
from kfhconnectblog
where eventtype <> 'HEART_BEAT'
)
SELECT
current_status as status,
current_username as username,
event_ts
FROM dataset
WHERE eventtype = 'LOGIN' AND current_username <> ''
ORDER BY event_ts DESC
The query output looks something like this:
Here is another query that shows the sessions each of the agents engaged in. It tells us whether they were incoming or outgoing, whether they were completed, and whether there were missed or failed calls.
WITH src AS (
SELECT
eventid,
json_extract_scalar(currentagentsnapshot, '$.configuration.username') as username,
cast(json_extract(currentagentsnapshot, '$.contacts') AS ARRAY(JSON)) as c,
cast(json_extract(previousagentsnapshot, '$.contacts') AS ARRAY(JSON)) as p
from kfhconnectblog
),
src2 AS (
SELECT *
FROM src CROSS JOIN UNNEST (c, p) AS contacts(c_item, p_item)
),
dataset AS (
SELECT
eventid,
username,
json_extract_scalar(c_item, '$.contactid') as c_contactid,
json_extract_scalar(c_item, '$.channel') as c_channel,
json_extract_scalar(c_item, '$.initiationmethod') as c_direction,
json_extract_scalar(c_item, '$.queue.name') as c_queue,
json_extract_scalar(c_item, '$.state') as c_state,
from_iso8601_timestamp(json_extract_scalar(c_item, '$.statestarttimestamp')) as c_ts,
json_extract_scalar(p_item, '$.contactid') as p_contactid,
json_extract_scalar(p_item, '$.channel') as p_channel,
json_extract_scalar(p_item, '$.initiationmethod') as p_direction,
json_extract_scalar(p_item, '$.queue.name') as p_queue,
json_extract_scalar(p_item, '$.state') as p_state,
from_iso8601_timestamp(json_extract_scalar(p_item, '$.statestarttimestamp')) as p_ts
FROM src2
)
SELECT
username,
c_channel as channel,
c_direction as direction,
p_state as prev_state,
c_state as current_state,
c_ts as current_ts,
c_contactid as id
FROM dataset
WHERE c_contactid = p_contactid
ORDER BY id DESC, current_ts ASC
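The pairing step in the query above (CROSS JOIN UNNEST over the current and previous contact arrays, followed by the filter on matching contact IDs) can be sketched in Python as follows. This is an illustrative analogue under the assumption that the two arrays are paired by position and the shorter one is padded with NULLs; the sample contacts are invented.

```python
from itertools import zip_longest


def contact_transitions(current, previous):
    """Pair contacts by position and keep pairs that refer to the same contact."""
    rows = []
    for c_item, p_item in zip_longest(current or [], previous or []):
        if c_item is None or p_item is None:
            continue  # a NULL side never satisfies c_contactid = p_contactid
        contact_id = c_item.get("contactid")
        if contact_id is None or contact_id != p_item.get("contactid"):
            continue
        rows.append({
            "id": contact_id,
            "direction": c_item.get("initiationmethod"),
            "prev_state": p_item.get("state"),
            "current_state": c_item.get("state"),
        })
    return rows


# Hypothetical snapshots for one agent event.
current = [{"contactid": "abc", "initiationmethod": "INBOUND", "state": "CONNECTED"}]
previous = [{"contactid": "abc", "initiationmethod": "INBOUND", "state": "CONNECTING"}]

for row in contact_transitions(current, previous):
    print(row)
```

Each resulting row describes one state change of a contact, which is what the final SELECT reports per agent.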
The query output looks similar to the following:
6. Analyze the data with Amazon Redshift Spectrum
With Amazon Redshift Spectrum, you can query data directly in S3 using your existing Amazon Redshift data warehouse cluster. Because the data is already in Parquet format, Redshift Spectrum gets the same great benefits that Athena does.
Here is a simple query to show querying the same data from Amazon Redshift. Note that to do this, you need to first create an external schema in Amazon Redshift that points to the AWS Glue Data Catalog.
SELECT
eventtype,
json_extract_path_text(currentagentsnapshot,'agentstatus','name') AS current_status,
json_extract_path_text(currentagentsnapshot, 'configuration','firstname') AS current_firstname,
json_extract_path_text(currentagentsnapshot, 'configuration','lastname') AS current_lastname,
json_extract_path_text(
currentagentsnapshot,
'configuration','routingprofile','defaultoutboundqueue','name') AS current_outboundqueue
FROM default_schema.kfhconnectblog
The following shows the query output:
Summary
In this post, I showed you how to use Kinesis Data Firehose to ingest and convert data to columnar file format, enabling real-time analysis using Athena and Amazon Redshift. This great feature enables a level of optimization in both cost and performance that you need when storing and analyzing large amounts of data. This feature is equally important if you are investing in building data lakes on AWS.
Roy Hasson is a Global Business Development Manager for AWS Analytics. He works with customers around the globe to design solutions to meet their data processing, analytics and business intelligence needs. Roy is a big Manchester United fan who enjoys cheering his team on and hanging out with his family.
Many companies across the globe use Amazon DynamoDB to store and query historical user-interaction data. DynamoDB is a fast NoSQL database used by applications that need consistent, single-digit millisecond latency.
Often, customers want to turn their valuable data in DynamoDB into insights by analyzing a copy of their table stored in Amazon S3. Doing this separates their analytical queries from their low-latency critical paths. This data can be the primary source for understanding customers’ past behavior, predicting future behavior, and generating downstream business value. Customers often turn to DynamoDB because of its great scalability and high availability. After a successful launch, many customers want to use the data in DynamoDB to predict future behaviors or provide personalized recommendations.
DynamoDB is a good fit for low-latency reads and writes, but it’s not practical to scan all data in a DynamoDB database to train a model. In this post, I demonstrate how you can use DynamoDB table data copied to Amazon S3 by AWS Data Pipeline to predict customer behavior. I also demonstrate how you can use this data to provide personalized recommendations for customers using Amazon SageMaker. You can also run ad hoc queries using Amazon Athena against the data. DynamoDB recently released on-demand backups to create full table backups with no performance impact. However, that feature is not suitable for our purposes in this post, so I chose AWS Data Pipeline instead to create managed backups that are accessible from other services.
To do this, I describe how to read the DynamoDB backup file format in Data Pipeline. I also describe how to convert the objects in S3 to a CSV format that Amazon SageMaker can read. In addition, I show how to schedule regular exports and transformations using Data Pipeline. The sample data used in this post is from the Bank Marketing Data Set of UCI.
The solution that I describe provides the following benefits:
Separates analytical queries from production traffic on your DynamoDB table, preserving your DynamoDB read capacity units (RCUs) for important production requests
Automatically updates your model to get real-time predictions
Optimizes for performance (so it doesn’t compete with DynamoDB RCUs after the export) and for cost (using data you already have)
Makes it easier for developers of all skill levels to use Amazon SageMaker
All code and the data set used in this post are available in this .zip file.
Solution architecture
The following diagram shows the overall architecture of the solution.
The steps that data follows through the architecture are as follows:
Data Pipeline regularly copies the full contents of a DynamoDB table as JSON into an S3 bucket.
Exported JSON files are converted to comma-separated value (CSV) format to use as a data source for Amazon SageMaker.
Amazon SageMaker renews the model artifact and updates the endpoint.
The converted CSV is available for ad hoc queries with Amazon Athena.
Data Pipeline controls this flow and repeats the cycle based on the schedule defined by customer requirements.
Building the auto-updating model
This section discusses details about how to read the DynamoDB exported data in Data Pipeline and build automated workflows for real-time prediction with a regularly updated model.
Find the automation_script.sh file and edit it for your environment. For example, you need to replace 's3://<your bucket>/<datasource path>/' with your own S3 path to the data source for Amazon ML. In the script, the text enclosed by angle brackets—< and >—should be replaced with your own path.
Upload the json-serde-1.3.6-SNAPSHOT-jar-with-dependencies.jar file to your S3 path so that the ADD jar command in Apache Hive can refer to it.
For this solution, the banking.csv should be imported into a DynamoDB table.
Export a DynamoDB table
To export the DynamoDB table to S3, open the Data Pipeline console and choose the Export DynamoDB table to S3 template. In this template, Data Pipeline creates an Amazon EMR cluster and performs an export in the EMRActivity activity. Set proper intervals for backups according to your business requirements.
One core node (m3.xlarge) provides the default capacity for the EMR cluster and should be suitable for the solution in this post. Leave the option to resize the cluster before running enabled in the TableBackupActivity activity to let Data Pipeline scale the cluster to match the table size. The process of converting to CSV format and renewing models happens in this EMR cluster.
For a more in-depth look at how to export data from DynamoDB, see Export Data from DynamoDB in the Data Pipeline documentation.
Add the script to an existing pipeline
After you export your DynamoDB table, you add an additional EMR step to EMRActivity by following these steps:
Open the Data Pipeline console and choose the ID for the pipeline that you want to add the script to.
For Actions, choose Edit.
In the editing console, choose the Activities category and add an EMR step using the custom script downloaded in the previous section, as shown below.
Paste the following command into the new step after the data upload step:
The element #{output.directoryPath} references the S3 path where the data pipeline exports DynamoDB data as JSON. The path should be passed to the script as an argument.
The bash script has two goals, converting data formats and renewing the Amazon SageMaker model. Subsequent sections discuss the contents of the automation script.
Automation script: Convert JSON data to CSV with Hive
We use Apache Hive to transform the data into a new format. The Hive QL script to create an external table and transform the data is included in the custom script that you added to the Data Pipeline definition.
When you run the Hive scripts, do so with the -e option. Also, define the Hive table with the 'org.openx.data.jsonserde.JsonSerDe' row format to parse and read JSON format. The SQL creates a Hive EXTERNAL table, and it reads the DynamoDB backup data on the S3 path passed to it by Data Pipeline.
Note: You should create the table with the “EXTERNAL” keyword to avoid the backup data being accidentally deleted from S3 if you drop the table.
The full automation script for converting follows. Add your own bucket name and data source path in the highlighted areas.
After creating an external table, you need to read data. You then use the INSERT OVERWRITE DIRECTORY ~ SELECT command to write CSV data to the S3 path that you designated as the data source for Amazon SageMaker.
Depending on your requirements, you can eliminate or process the columns in the SELECT clause in this step to optimize data analysis. For example, you might remove some columns that have unpredictable correlations with the target value because keeping the wrong columns might expose your model to “overfitting” during the training. In this post, the customer_id column is removed. Overfitting can make your prediction weak. More information about overfitting can be found in the topic Model Fit: Underfitting vs. Overfitting in the Amazon ML documentation.
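The post performs this conversion with Hive on the EMR cluster. Purely to illustrate the transformation itself, the following Python sketch converts exported items to CSV while keeping only selected columns. It assumes that each line of the export is one JSON item whose attributes are one-entry mappings of a type tag to a value (for example {"n": "30"}); the sample line and the column choice are hypothetical.

```python
import csv
import io
import json


def export_lines_to_csv(lines, columns):
    """Convert exported DynamoDB items (one JSON object per line) to CSV text.

    Only the attributes named in `columns` are kept, in that order.
    """
    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    for line in lines:
        line = line.strip()
        if not line:
            continue
        item = json.loads(line)
        row = []
        for name in columns:
            typed_value = item.get(name, {})
            # Take the single value regardless of its type tag; empty if absent.
            row.append(next(iter(typed_value.values()), ""))
        writer.writerow(row)
    return out.getvalue()


# Hypothetical exported item; customer_id is dropped by not listing it.
sample = ['{"customer_id":{"s":"c-001"},"age":{"n":"30"},"job":{"s":"admin."},"y":{"n":"0"}}']
print(export_lines_to_csv(sample, ["age", "job", "y"]), end="")
```

Dropping a column is then just a matter of leaving it out of the column list, which mirrors omitting it from the SELECT clause.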
Automation script: Renew the Amazon SageMaker model
After the CSV data is replaced and ready to use, create a new model artifact for Amazon SageMaker with the updated dataset on S3. To renew the model artifact, you must create a new training job. Training jobs can be run using the AWS SDK (for example, the Amazon SageMaker client in boto3), the Amazon SageMaker Python SDK (which can be installed with the “pip install sagemaker” command), or the AWS CLI for Amazon SageMaker, which is used in this post.
In addition, consider how to smoothly renew your existing model without service impact, because your model is called by applications in real time. To do this, you need to create a new endpoint configuration first and then update the current endpoint with the newly created endpoint configuration.
#!/bin/bash
## Define variable
REGION=$2
DTTIME=`date +%Y-%m-%d-%H-%M-%S`
ROLE="<your AmazonSageMaker-ExecutionRole>"
# Select containers image based on region.
case "$REGION" in
"us-west-2" )
IMAGE="174872318107.dkr.ecr.us-west-2.amazonaws.com/linear-learner:latest"
;;
"us-east-1" )
IMAGE="382416733822.dkr.ecr.us-east-1.amazonaws.com/linear-learner:latest"
;;
"us-east-2" )
IMAGE="404615174143.dkr.ecr.us-east-2.amazonaws.com/linear-learner:latest"
;;
"eu-west-1" )
IMAGE="438346466558.dkr.ecr.eu-west-1.amazonaws.com/linear-learner:latest"
;;
*)
echo "Invalid Region Name"
exit 1 ;
esac
# Start training job and creating model artifact
TRAINING_JOB_NAME=TRAIN-${DTTIME}
S3OUTPUT="s3://<your bucket name>/model/"
INSTANCETYPE="ml.m4.xlarge"
INSTANCECOUNT=1
VOLUMESIZE=5
aws sagemaker create-training-job --training-job-name ${TRAINING_JOB_NAME} --region ${REGION} --algorithm-specification TrainingImage=${IMAGE},TrainingInputMode=File --role-arn ${ROLE} --input-data-config '[{ "ChannelName": "train", "DataSource": { "S3DataSource": { "S3DataType": "S3Prefix", "S3Uri": "s3://<your bucket name>/<datasource path>/", "S3DataDistributionType": "FullyReplicated" } }, "ContentType": "text/csv", "CompressionType": "None" , "RecordWrapperType": "None" }]' --output-data-config S3OutputPath=${S3OUTPUT} --resource-config InstanceType=${INSTANCETYPE},InstanceCount=${INSTANCECOUNT},VolumeSizeInGB=${VOLUMESIZE} --stopping-condition MaxRuntimeInSeconds=120 --hyper-parameters feature_dim=20,predictor_type=binary_classifier
# Wait until job completed
aws sagemaker wait training-job-completed-or-stopped --training-job-name ${TRAINING_JOB_NAME} --region ${REGION}
# Get newly created model artifact and create model
MODELARTIFACT=`aws sagemaker describe-training-job --training-job-name ${TRAINING_JOB_NAME} --region ${REGION} --query 'ModelArtifacts.S3ModelArtifacts' --output text `
MODELNAME=MODEL-${DTTIME}
aws sagemaker create-model --region ${REGION} --model-name ${MODELNAME} --primary-container Image=${IMAGE},ModelDataUrl=${MODELARTIFACT} --execution-role-arn ${ROLE}
# create a new endpoint configuration
CONFIGNAME=CONFIG-${DTTIME}
aws sagemaker create-endpoint-config --region ${REGION} --endpoint-config-name ${CONFIGNAME} --production-variants VariantName=Users,ModelName=${MODELNAME},InitialInstanceCount=1,InstanceType=ml.m4.xlarge
# create or update the endpoint
STATUS=`aws sagemaker describe-endpoint --endpoint-name ServiceEndpoint --query 'EndpointStatus' --output text --region ${REGION} `
if [[ "$STATUS" != "InService" ]] ;
then
aws sagemaker create-endpoint --endpoint-name ServiceEndpoint --endpoint-config-name ${CONFIGNAME} --region ${REGION}
else
aws sagemaker update-endpoint --endpoint-name ServiceEndpoint --endpoint-config-name ${CONFIGNAME} --region ${REGION}
fi
Grant permission
Before you execute the script, you must grant proper permission to Data Pipeline. Data Pipeline uses the DataPipelineDefaultResourceRole role by default. I added the following policy to DataPipelineDefaultResourceRole to allow Data Pipeline to create, delete, and update the Amazon SageMaker model and data source in the script.
After you deploy a model into production using Amazon SageMaker hosting services, your client applications use this API to get inferences from the model hosted at the specified endpoint. This approach is useful for interactive web, mobile, or desktop applications.
The following simple Python code example queries the Amazon SageMaker endpoint by its name (“ServiceEndpoint”) and then uses the results for real-time prediction.
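A minimal sketch of such a client is shown here; it is not the original listing from the post. It assumes the runtime client is a boto3 "sagemaker-runtime" client, that the feature values are already numerically encoded in the same order used for training, and that the linear learner returns JSON with a "predictions" list. The response shape in particular is an assumption to check against your own endpoint.

```python
import json


def to_csv_row(features):
    """Serialize one feature vector as a single CSV line (no header, no label)."""
    return ",".join(str(value) for value in features)


def predict(runtime_client, features, endpoint_name="ServiceEndpoint"):
    """Send one record to the endpoint and return the first prediction."""
    response = runtime_client.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType="text/csv",
        Accept="application/json",
        Body=to_csv_row(features),
    )
    payload = json.loads(response["Body"].read())
    # Assumed response shape: {"predictions": [{"score": ..., "predicted_label": ...}]}
    return payload["predictions"][0]


# With boto3 installed and credentials configured, usage could look like this
# (the feature vector must match the feature_dim the model was trained with):
#   import boto3
#   client = boto3.client("sagemaker-runtime")
#   print(predict(client, [30, 1, 0, 1]))
```

Passing the client in as a parameter keeps the function easy to exercise with a stand-in object before pointing it at a live endpoint.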
Data Pipeline exports DynamoDB table data into S3. The original JSON data should be kept to recover the table in the rare event that this is needed. Data Pipeline then converts JSON to CSV so that Amazon SageMaker can read the data. Note: You should select only meaningful attributes when you convert to CSV. For example, if you judge that the “campaign” attribute is not correlated, you can eliminate this attribute from the CSV.
Train the Amazon SageMaker model with the new data source.
When a new customer comes to your site, you can judge how likely it is for this customer to subscribe to your new product based on “predictedScores” provided by Amazon SageMaker.
If the new user subscribes to your new product, your application must update the attribute “y” to the value 1 (for yes). This updated data is provided for the next model renewal as a new data source. It serves to improve the accuracy of your prediction. With each new entry, your application can become smarter and deliver better predictions.
Running ad hoc queries using Amazon Athena
Amazon Athena is a serverless query service that makes it easy to analyze large amounts of data stored in Amazon S3 using standard SQL. Athena is useful for examining data and collecting statistics or informative summaries about data. You can also use the powerful analytic functions of Presto, as described in the topic Aggregate Functions of Presto in the Presto documentation.
With the Data Pipeline scheduled activity, recent CSV data is always located in S3 so that you can run ad hoc queries against the data using Amazon Athena. I show this with example SQL statements following. For an in-depth description of this process, see the post Interactive SQL Queries for Data in Amazon S3 on the AWS News Blog.
Creating an Amazon Athena table and running it
You can simply create an EXTERNAL table for the CSV data on S3 in the Amazon Athena console.
=== Table Creation ===
CREATE EXTERNAL TABLE datasource (
age int,
job string,
marital string ,
education string,
default string,
housing string,
loan string,
contact string,
month string,
day_of_week string,
duration int,
campaign int,
pdays int ,
previous int ,
poutcome string,
emp_var_rate double,
cons_price_idx double,
cons_conf_idx double,
euribor3m double,
nr_employed double,
y int
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',' ESCAPED BY '\\' LINES TERMINATED BY '\n'
LOCATION 's3://<your bucket name>/<datasource path>/';
The following query calculates the correlation coefficient between the target attribute and other attributes using Amazon Athena.
=== Sample Query ===
SELECT corr(age,y) AS correlation_age_and_target,
corr(duration,y) AS correlation_duration_and_target,
corr(campaign,y) AS correlation_campaign_and_target,
corr(contact,y) AS correlation_contact_and_target
FROM ( SELECT age , duration , campaign , y ,
CASE WHEN contact = 'telephone' THEN 1 ELSE 0 END AS contact
FROM datasource
) datasource ;
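To show what the sample query computes, the following Python sketch calculates a Pearson correlation coefficient by hand, including the same 0/1 encoding of the contact attribute that the CASE expression produces. The rows are invented for illustration, and returning None when a column has no variance is a choice made for this sketch.

```python
import math


def pearson(xs, ys):
    """Pearson correlation coefficient of two equally long numeric sequences."""
    n = len(xs)
    if n != len(ys) or n < 2:
        raise ValueError("need two sequences of equal length with at least 2 values")
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    if var_x == 0 or var_y == 0:
        return None  # correlation is undefined for a constant column
    return cov / math.sqrt(var_x * var_y)


# Hypothetical (contact, y) rows.
rows = [("telephone", 0), ("cellular", 1), ("cellular", 1), ("telephone", 0)]

# Same encoding as: CASE WHEN contact = 'telephone' THEN 1 ELSE 0 END
contact_flag = [1 if contact == "telephone" else 0 for contact, _ in rows]
target = [y for _, y in rows]

print(pearson(contact_flag, target))
```

A value close to 0 suggests that an attribute carries little linear signal about the target, which is one input for deciding which columns to keep in the CSV.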
Conclusion
In this post, I introduce an example of how to analyze data in DynamoDB by using table data in Amazon S3 to optimize DynamoDB table read capacity. You can then use the analyzed data as a new data source to train an Amazon SageMaker model for accurate real-time prediction. In addition, you can run ad hoc queries against the data on S3 using Amazon Athena. I also present how to automate these procedures by using Data Pipeline.
You can adapt this example to your specific use case at hand, and hopefully this post helps you accelerate your development. You can find more examples and use cases for Amazon SageMaker in the video AWS 2017: Introducing Amazon SageMaker on the AWS website.
Yong Seong Lee is a Cloud Support Engineer for AWS Big Data Services. He is interested in every technology related to data/databases and helping customers who have difficulties in using AWS services. His motto is “Enjoy life, be curious and have maximum experience.”
Last week, Oracle announced the general availability of MySQL 8.0. This is good news for database users, as it means Oracle is still developing MySQL.
I decided to celebrate the event by doing a quick test of MySQL 8.0. Here follows a step-by-step description of my first experience with MySQL 8.0. Note that I did the following without reading the release notes, as I have done with every MySQL / MariaDB release to date; in this case it was not the right thing to do.
I pulled MySQL 8.0 from git@github.com:mysql/mysql-server.git. I was pleasantly surprised that ‘cmake . ; make’ worked without any compiler warnings! I even checked the compiler options used and noticed that MySQL was compiled with -Wall + several other warning flags. Good job MySQL team!
I did have a little trouble finding the mysqld binary as Oracle had moved it to ‘runtime_output_directory’; Unexpected, but no big thing.
Now it was time to install MySQL 8.0.
I knew that MySQL 8.0 has removed mysql_install_db, so I had to use the mysqld binary directly to install the default databases (I had specified datadir=/my/data3 in the /tmp/my.cnf file):
> cd runtime_output_directory
> mkdir /my/data3
> ./mysqld --defaults-file=/tmp/my.cnf --install
2018-04-22T12:38:18.332967Z 1 [ERROR] [MY-011011] [Server] Failed to find valid data directory.
2018-04-22T12:38:18.333109Z 0 [ERROR] [MY-010020] [Server] Data Dictionary initialization failed.
2018-04-22T12:38:18.333135Z 0 [ERROR] [MY-010119] [Server] Aborting
A quick look at the output of mysqld --help --verbose showed that the right command option is --initialize. My bad, let’s try again:
> ./mysqld --defaults-file=/tmp/my.cnf --initialize
2018-04-22T12:39:31.910509Z 0 [ERROR] [MY-010457] [Server] --initialize specified but the data directory has files in it. Aborting.
2018-04-22T12:39:31.910578Z 0 [ERROR] [MY-010119] [Server] Aborting
Now I used the right options, but it still didn’t work. I took a quick look around:
> ls /my/data3/
binlog.index
So even though mysqld noticed that the data3 directory was wrong, it still wrote things into it. This happened even though I didn’t have --log-binlog enabled in the my.cnf file. Strange, but easy to fix:
> rm /my/data3/binlog.index
> ./mysqld --defaults-file=/tmp/my.cnf --initialize
2018-04-22T12:40:45.633637Z 0 [ERROR] [MY-011071] [Server] unknown variable 'max-tmp-tables=100'
2018-04-22T12:40:45.633657Z 0 [Warning] [MY-010952] [Server] The privilege system failed to initialize correctly. If you have upgraded your server, make sure you're executing mysql_upgrade to correct the issue.
2018-04-22T12:40:45.633663Z 0 [ERROR] [MY-010119] [Server] Aborting
The warning about the privilege system confused me a bit, but I ignored it for the time being and removed from my configuration files the variables that MySQL 8.0 doesn’t support anymore. I couldn’t find a list of the removed variables anywhere so this was done with the trial and error method.
> ./mysqld --defaults-file=/tmp/my.cnf
2018-04-22T12:42:56.626583Z 0 [ERROR] [MY-010735] [Server] Can't open the mysql.plugin table. Please run mysql_upgrade to create it.
2018-04-22T12:42:56.827685Z 0 [Warning] [MY-010015] [Repl] Gtid table is not ready to be used. Table 'mysql.gtid_executed' cannot be opened.
2018-04-22T12:42:56.838501Z 0 [Warning] [MY-010068] [Server] CA certificate ca.pem is self signed.
2018-04-22T12:42:56.848375Z 0 [Warning] [MY-010441] [Server] Failed to open optimizer cost constant tables
2018-04-22T12:42:56.848863Z 0 [ERROR] [MY-013129] [Server] A message intended for a client cannot be sent there as no client-session is attached. Therefore, we're sending the information to the error-log instead: MY-001146 - Table 'mysql.component' doesn't exist
2018-04-22T12:42:56.848916Z 0 [Warning] [MY-013129] [Server] A message intended for a client cannot be sent there as no client-session is attached. Therefore, we're sending the information to the error-log instead: MY-003543 - The mysql.component table is missing or has an incorrect definition.
….
2018-04-22T12:42:56.854141Z 0 [System] [MY-010931] [Server] /home/my/mysql-8.0/runtime_output_directory/mysqld: ready for connections. Version: '8.0.11' socket: '/tmp/mysql.sock' port: 3306 Source distribution.
I figured out that if there is a single wrong variable in the configuration file, running mysqld --initialize will leave the database in an inconsistent state. NOT GOOD! I am happy I didn’t try this in a production system!
2018-04-22T12:44:45.548960Z 5 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: px)NaaSp?6um
2018-04-22T12:44:51.221751Z 0 [System] [MY-013170] [Server] /home/my/mysql-8.0/runtime_output_directory/mysqld (mysqld 8.0.11) initializing of server has completed
Success!
I wonder why the temporary password is so complex; it could have been something that one could easily remember without decreasing security, as it’s temporary after all. No big deal, one can always paste it from the logs. (Side note: MariaDB uses socket authentication on many systems and thus doesn’t need temporary installation passwords.)
Now let’s start the MySQL server for real to do some testing:
> ./client/mysql --socket=/tmp/mysql.sock --user=root --password="px)NaaSp?6um"
ERROR 2059 (HY000): Plugin caching_sha2_password could not be loaded: /usr/local/mysql/lib/plugin/caching_sha2_password.so: cannot open shared object file: No such file or directory
Apparently MySQL 8.0 doesn’t work with old MySQL / MariaDB clients by default 🙁
I was testing this on a system with MariaDB installed, like all modern Linux systems today, and didn’t want to use the MySQL clients or libraries.
I decided to try to fix this by changing the authentication to the native (original) MySQL authentication method.
> mysqld --skip-grant-tables
> ./client/mysql --socket=/tmp/mysql.sock --user=root
ERROR 1045 (28000): Access denied for user 'root'@'localhost' (using password: NO)
Apparently --skip-grant-tables is not good enough anymore. Let’s try again with:
> ./client/mysql --socket=/tmp/mysql.sock --user=root mysql
Welcome to the MariaDB monitor. Commands end with ; or \g.
Your MySQL connection id is 7
Server version: 8.0.11 Source distribution
Great, we are getting somewhere. Now let’s fix “root” to work with the old authentication:
MySQL [mysql]> update mysql.user set plugin="mysql_native_password",authentication_string=password("test") where user="root";
ERROR 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '("test") where user="root"' at line 1
A quick look in the MySQL 8.0 release notes told me that the PASSWORD() function is removed in 8.0. Why???? I don’t know how one in MySQL 8.0 is supposed to generate passwords compatible with old installations of MySQL. One could of course start an old MySQL or MariaDB version, execute the password() function and copy the result.
I decided to fix this the easy way and use an empty password:
(Update: I later discovered that the right way would have been to use: FLUSH PRIVILEGES; ALTER USER 'root'@'localhost' identified by 'test'; However, I dislike this syntax, as it has the password in clear text, which is easy to grab, and the command can’t be used to easily update the mysql.user table. One must also disable the --skip-grant mode to use this.)
MySQL [mysql]> update mysql.user set plugin="mysql_native_password",authentication_string="" where user="root";
Query OK, 1 row affected (0.077 sec)
Rows matched: 1 Changed: 1 Warnings: 0
I restarted mysqld:
> mysqld --default_authentication_plugin=mysql_native_password
> ./client/mysql --user=root --password="" mysql
ERROR 1862 (HY000): Your password has expired. To log in you must change it using a client that supports expired passwords.
2018-04-22T13:18:06.629548Z 5 [Warning] [MY-010453] [Server] root@localhost is created with an empty password ! Please consider switching off the --initialize-insecure option.
Hm. I don’t understand the warning, as --initialize-insecure is not an option that one would use more than once, and thus nothing one would ‘switch off’.
> ./mysqld --defaults-file=/tmp/my.cnf
> ./client/mysql --user=root --password="" mysql
ERROR 2059 (HY000): Plugin caching_sha2_password could not be loaded: /usr/local/mysql/lib/plugin/caching_sha2_password.so: cannot open shared object file: No such file or directory
Back to the beginning 🙁
To get things to work with old clients, one has to initialize the database with:
> ./mysqld --defaults-file=/tmp/my.cnf --initialize-insecure --default_authentication_plugin=mysql_native_password
Now I finally had MySQL 8.0 up and running and thought I would take it up for a spin by running the “standard” MySQL/MariaDB sql-bench test suite. This was removed in MySQL 5.7, but as I happened to have MariaDB 10.3 installed, I decided to run it from there.
sql-bench is a single threaded benchmark that measures the “raw” speed for some common operations. It gives you the ‘maximum’ performance for a single query. It’s different from other benchmarks that measure the maximum throughput when you have a lot of users, but sql-bench still tells you a lot about what kind of performance to expect from the database.
I tried first to be clever and create the “test” database, which I needed for sql-bench, with:
> mkdir /my/data3/test
but when I tried to run the benchmark, MySQL 8.0 complained that the test database didn’t exist.
MySQL 8.0 has moved away from the original concept of MySQL where the user can easily create directories and copy databases into the database directory. This may have serious implications for anyone doing backups of databases and/or trying to restore a backup with normal OS commands.
I created the ‘test’ database with mysqladmin and then tried to run sql-bench:
> ./run-all-tests --user=root
The first run failed in test-ATIS:
Can’t execute command ‘create table class_of_service (class_code char(2) NOT NULL,rank tinyint(2) NOT NULL,class_description char(80) NOT NULL,PRIMARY KEY (class_code))’ Error: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near ‘rank tinyint(2) NOT NULL,class_description char(80) NOT NULL,PRIMARY KEY (class_’ at line 1
This happened because ‘rank‘ is now a reserved word in MySQL 8.0. This is also reserved in ANSI SQL, but I don’t know of any other database that has failed to run test-ATIS before. I have in the past run it against Oracle, PostgreSQL, Mimer, MSSQL etc without any problems.
MariaDB also has ‘rank’ as a keyword in 10.2 and 10.3 but one can still use it as an identifier.
I fixed test-ATIS and then managed to run all tests on MySQL 8.0.
I ran the tests with both MySQL 8.0 and MariaDB 10.3, using the InnoDB storage engine and identical values for all InnoDB variables, table-definition-cache and table-open-cache. I turned off performance schema for both databases. All tests were run with a user with an empty password (to keep things comparable and because it was too complex to generate a password in MySQL 8.0).
The results are as follows.
Results per test in seconds:
Operation |MariaDB|MySQL-8|
This is of course just a first view of the performance of MySQL 8.0 in a single user environment. Some reflections about the results:
Alter-table test is slower (as expected) in 8.0, as some of the alter tests benefit from the instant add column in MariaDB 10.3.
connect test is also better for MariaDB, as we put a lot of effort into speeding this up in MariaDB 10.2.
table-elimination shows an optimization in MariaDB for the Anchor table model, which MySQL doesn’t have.
CREATE and DROP TABLE is almost 8 times slower in MySQL 8.0 than in MariaDB 10.3. I assume this is the cost of ‘atomic DDL’. This may also cause performance problems for any thread using the data dictionary when another thread is creating/dropping tables.
When looking at the individual test results, MySQL 8.0 was slower in almost every test, in many significantly slower.
The only test where MySQL was faster was “update_with_key_prefix”. I checked this and noticed that there was a bug in the test: the columns were updated to their original values (which should be instant with any storage engine). This is an old bug that MySQL has found and fixed and that we have not been aware of in the test or in MariaDB.
While writing this, I noticed that MySQL 8.0 is now using utf8mb4 as the default character set instead of latin1. This may affect some of the benchmarks slightly (not much, as most tests work with numbers and Oracle claims that utf8mb4 is only 20% slower than latin1), but this needs to be verified.
Oracle claims that MySQL 8.0 is much faster on multi user benchmarks. The above test indicates that they may have done this by sacrificing single user performance.
We need to do more and many different benchmarks to better understand exactly what is going on. Stay tuned!
Short summary of my first run with MySQL 8.0:
Using the new caching_sha2_password authentication as default for new installation is likely to cause a lot of problems for users. No old application will be able to use MySQL 8.0, installed with default options, without moving to MySQL’s client libraries. While working on this blog I saw MySQL users complain on IRC that not even MySQL Workbench can authenticate with MySQL 8.0. This is the first time in MySQL’s history where such an incompatible change has ever been done!
Atomic DDL is a good thing (We plan to have this in MariaDB 10.4), but it should not have such a drastic impact on performance. I am also a bit skeptical of MySQL 8.0 having just one copy of the data dictionary as if this gets corrupted you will lose all your data. (Single point of failure)
MySQL 8.0 has several new reserved words and has removed a lot of variables, which makes upgrades hard. Before upgrading to MySQL 8.0 one has to check all one’s databases and applications to ensure that there are no conflicts.
As my test above shows, if you have a single deprecated variable in your configuration files, the installation of MySQL will abort and can leave the database in an inconsistent state. I did of course do my tests by installing into an empty data dictionary, but one can assume that some of the problems may also happen when upgrading an old installation.
Conclusions: In many ways, MySQL 8.0 has caught up with some earlier versions of MariaDB. For instance, in MariaDB 10.0, we introduced roles (four years ago). In MariaDB 10.1, we introduced encrypted redo/undo logs (three years ago). In MariaDB 10.2, we introduced window functions and CTEs (a year ago). However, some catch-up of MariaDB Server 10.2 features still remains for MySQL (such as check constraints, binlog compression, and log-based rollback).
MySQL 8.0 has a few new interesting features (mostly Atomic DDL and JSON TABLE functions), but at the same time MySQL has strayed away from some of the fundamental cornerstone principles of MySQL:
From the start of the first version of MySQL in 1995, all development has been focused around 3 core principles:
Ease of use
Performance
Stability
With MySQL 8.0, Oracle has sacrificed 2 of 3 of these.
In addition (as part of ease of use), while I was working on MySQL, we did our best to ensure that the following should hold:
Upgrades should be trivial
Things should be kept compatible, if possible (don’t remove features/options/functions that are used)
Minimize reserved words, don’t remove server variables
One should be able to use normal OS commands to create and drop databases, copy and move tables around within the same system or between different systems. With 8.0 and its data dictionary, taking backups of specific tables will be hard, even if the server is not running.
mysqldump should always be usable for backups and for moving to new releases
Old clients and applications should be able to use ‘any’ MySQL server version unchanged. (Some Oracle client libraries, like C++, by default only support the new X protocol and can thus not be used with older MySQL or any MariaDB version)
We plan to add a data dictionary to MariaDB 10.4 or MariaDB 10.5, but in a way to not sacrifice any of the above principles!
The competition between MySQL and MariaDB is not just about a tactical arms race on features. It’s about design philosophy, or strategic vision, if you will.
This shows in two main ways: our respective view of the Storage Engine structure, and of the top-level direction of the roadmap.
On the Storage Engine side, MySQL is converging on InnoDB, even for clustering and partitioning. In doing so, they are abandoning the advantages of multiple ways of storing data. By contrast, MariaDB sees lots of value in the Storage Engine architecture: MariaDB Server 10.3 will see the general availability of MyRocks (for write-intensive workloads) and Spider (for scalable workloads). On top of that, we have ColumnStore for analytical workloads. One can use the CONNECT engine to join with other databases. The use of different storage engines for different workloads and different hardware is a competitive differentiator, now more than ever.
On the roadmap side, MySQL is carefully steering clear of features that close the gap between MySQL and Oracle. MariaDB has no such constraints. With MariaDB 10.3, we are introducing PL/SQL compatibility (Oracle’s stored procedures) and AS OF (built-in system versioned tables with point-in-time querying). For both of those features, MariaDB is the first Open Source database doing so. I don’t expect Oracle to provide any of the above features in MySQL!
Also on the roadmap side, MySQL is not working with the ecosystem in extending the functionality. In 2017, MariaDB accepted more code contributions in one year, than MySQL has done during its entire lifetime, and the rate is increasing!
I am sure that the experience I had with testing MySQL 8.0 would have been significantly better if MySQL had an open development model where the community could easily participate in developing and testing MySQL continuously. Most of the confusing error messages and strange behavior would have been found and fixed long before the GA release.
Before upgrading to MySQL 8.0 please read https://dev.mysql.com/doc/refman/8.0/en/upgrading-from-previous-series.html to see what problems you can run into! Don’t expect that old installations or applications will work out of the box without testing, as a lot of features and options have been removed (query cache, partition of myisam tables etc)! You probably also have to revise your backup methods, especially if you ever want to restore just a few tables. (With 8.0, I don’t know how this can be easily done).
According to the MySQL 8.0 release notes, one can’t use mysqldump to copy a database to MySQL 8.0. One first has to move to a MySQL 5.7 GA version (with mysqldump, as recommended by Oracle) and then to MySQL 8.0 with an in-place update. I assume this means that all old mysqldump backups are useless for MySQL 8.0?
MySQL 8.0 seems to be a one way street to an unknown future. Up to MySQL 5.7 it has been trivial to move to MariaDB and one could always move back to MySQL with mysqldump. All MySQL client libraries have worked with MariaDB and all MariaDB client libraries have worked with MySQL. With MySQL 8.0 this has changed in the wrong direction.
As long as you are using MySQL 5.7 and below you have choices for your future, after MySQL 8.0 you have very little choice. But don’t despair, as MariaDB will always be able to load a mysqldump file and it’s very easy to upgrade your old MySQL installation to MariaDB 🙂
I wish you good luck trying MySQL 8.0 (and also the upcoming MariaDB 10.3)!
Version 1.10 of the Subversion version-control system is out. Improvements include a new interactive resolver for merge conflicts, better path-based authorization, LZ4 compression, and more; see the release notes for details.
New S3 One Zone-IA Storage Class – This new storage class is 20% less expensive than the existing Standard-IA storage class. It is designed to be used to store data that does not need the extra level of protection provided by geographic redundancy.
General Availability of S3 Select – This unique retrieval option lets you retrieve subsets of data from S3 objects using simple SQL expressions, with the possibility for a 400% performance improvement in the process.
Let’s take a look at both!
S3 One Zone-IA (Infrequent Access) Storage Class
This new storage class stores data in a single AWS Availability Zone and is designed to provide eleven 9’s (99.999999999%) of data durability, just like the other S3 storage classes. Unlike those other classes, it is not designed to be resilient to the physical loss of an AZ due to a major event such as an earthquake or a flood, and data could be lost in the unlikely event that an AZ is destroyed. S3 One Zone-IA storage gives you a lower cost option for secondary backups of on-premises data and for data that can be easily re-created. You can also use it as the target of S3 Cross-Region Replication from another AWS region.
You can specify the use of S3 One Zone-IA storage when you upload a new object to S3:
You can also make use of it as part of an S3 lifecycle rule:
You can set up a lifecycle rule that moves previous versions of an object to S3 One Zone-IA after 30 or more days:
And you can modify the storage class of an existing object:
You can also manage storage classes using the S3 API, CLI, and CloudFormation templates.
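As a rough sketch of the API route, the three operations above (upload, lifecycle rule, changing an existing object) might look like this with boto3. The bucket names, keys, rule ID, and helper function names are hypothetical placeholders; the boto3 import is deferred so that the rule builder can be used without AWS credentials.

```python
def noncurrent_to_one_zone_ia_rule(days=30, prefix=""):
    """Build a lifecycle rule that moves previous object versions to One Zone-IA."""
    if days < 30:
        # The text above describes transitions after 30 or more days.
        raise ValueError("days must be 30 or more")
    return {
        "ID": "previous-versions-to-onezone-ia",  # hypothetical rule name
        "Filter": {"Prefix": prefix},
        "Status": "Enabled",
        "NoncurrentVersionTransitions": [
            {"NoncurrentDays": days, "StorageClass": "ONEZONE_IA"}
        ],
    }


def upload_to_one_zone_ia(bucket, key, body):
    """Upload a new object directly into the One Zone-IA storage class."""
    import boto3  # deferred: only needed when actually talking to S3
    s3 = boto3.client("s3")
    return s3.put_object(Bucket=bucket, Key=key, Body=body,
                         StorageClass="ONEZONE_IA")


def apply_lifecycle_rule(bucket, rule):
    """Attach a single lifecycle rule to a bucket (replaces existing rules)."""
    import boto3
    s3 = boto3.client("s3")
    return s3.put_bucket_lifecycle_configuration(
        Bucket=bucket, LifecycleConfiguration={"Rules": [rule]})


def move_existing_object(bucket, key):
    """Change the storage class of an existing object by copying it onto itself."""
    import boto3
    s3 = boto3.client("s3")
    return s3.copy_object(Bucket=bucket, Key=key,
                          CopySource={"Bucket": bucket, "Key": key},
                          StorageClass="ONEZONE_IA",
                          MetadataDirective="COPY")
```

Note that `put_bucket_lifecycle_configuration` replaces the bucket's whole lifecycle configuration, so a real deployment would merge the new rule with any existing ones first.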
The S3 One Zone-IA storage class can be used in all public AWS regions. As I noted earlier, pricing is 20% lower than for the S3 Standard-IA storage class (see the S3 Pricing page for more info). There’s a 30 day minimum retention period, and a 128 KB minimum object size.
General Availability of S3 Select
Randall wrote a detailed introduction to S3 Select last year and showed you how you can use it to retrieve selected data from within S3 objects. During the preview we added support for server-side encryption and the ability to run queries from the S3 Console.
I used a CSV file of airport codes to exercise the new console functionality:
This file contains listings for over 9100 airports, so it makes for useful test data but it definitely does not test the limits of S3 Select in any way. I select the file, open the More menu, and choose Select from:
The console sets the file format and compression according to the file name and the encryption status. I set the delimiter and click Show file preview to verify that my settings are correct. Then I click Next to proceed:
I type SQL expressions in the SQL editor and click Run SQL to issue the query:
Or:
I can also issue queries from the AWS SDKs. I initiate the select operation:
import boto3

s3 = boto3.client('s3', region_name='us-west-2')
r = s3.select_object_content(
    Bucket='jbarr-us-west-2',
    Key='sample-data/airportCodes.csv',
    ExpressionType='SQL',
    Expression="select * from s3object s where s.\"Country (Name)\" like '%United States%'",
    InputSerialization={'CSV': {"FileHeaderInfo": "Use"}},
    OutputSerialization={'CSV': {}},
)
And then I process the stream of results:
for event in r['Payload']:
    if 'Records' in event:
        records = event['Records']['Payload'].decode('utf-8')
        print(records)
    elif 'Stats' in event:
        statsDetails = event['Stats']['Details']
        print("Stats details bytesScanned: ")
        print(statsDetails['BytesScanned'])
        print("Stats details bytesProcessed: ")
        print(statsDetails['BytesProcessed'])
S3 Select is available in all public regions and you can start using it today. Pricing is based on the amount of data scanned and the amount of data returned.
Designing a cloud storage solution to accommodate traditional enterprise software such as Microsoft SharePoint can be challenging. Microsoft SharePoint is complex and demands a lot of the underlying storage that’s used for its many databases and content repositories. To ensure that the selected storage platform can accommodate the availability, connectivity, and performance requirements recommended by Microsoft, you need to use third-party storage solutions that build on and extend the functionality and performance of AWS storage services.
An appropriate storage solution for Microsoft SharePoint needs to provide data redundancy, high availability, fault tolerance, strong encryption, standard connectivity protocols, point-in-time data recovery, compression, ease of management, directory integration, and support.
AWS Marketplace is uniquely positioned as a procurement channel to find a third-party storage product that provides the additional technology layered on top of AWS storage services. The third-party storage products are provided and maintained by industry newcomers with born-in-the-cloud solutions as well as existing industry leaders. They include many mainstream storage products that are already familiar and commonly deployed in enterprises.
We recently released the “Leveraging AWS Marketplace Storage Solutions for Microsoft SharePoint” whitepaper to walk through the deployment and configuration of SoftNAS Cloud NAS, an AWS Marketplace third-party storage product that provides secure, highly available, redundant, and fault-tolerant storage to the Microsoft SharePoint collaboration suite.
About the Author
Israel Lawson is a senior solutions architect on the AWS Marketplace team.
Here’s the second part of Daniel Stone’s series on recent improvements in low-level graphics support. “The end result of all this work is that we have been able to eliminate the magic side channels which used to proliferate, and lay the groundwork for properly communicating this information across multiple devices as well. Devices supporting ARM’s AFBC compression format are just beginning to hit the market, which share a single compression format between video decoder, GPU, and display controller. We are also beginning to see GPUs from different vendors share tiling formats, in order to squeeze the most performance possible from hybrid GPU systems.”
Data that describe processes in a spatial context are everywhere in our day-to-day lives and they dominate big data problems. Map data, for instance, whether describing networks of roads or remote sensing data from satellites, get us where we need to go. Atmospheric data from simulations and sensors underlie our weather forecasts and climate models. Devices and sensors with GPS can provide a spatial context to nearly all mobile data.
In this post, we introduce the WIND toolkit, a huge (500 TB), open weather model dataset that’s available to the world on Amazon’s cloud services. We walk through how to access this data and some of the open-source software developed to make it easily accessible. Our solution considers a subset of geospatial data that exist on a grid (raster) and explores ways to provide access to large-scale raster data from weather models. The solution uses foundational AWS services and the Hierarchical Data Format (HDF), a well adopted format for scientific data.
The approach developed here can be extended to any data that fit in an HDF5 file, which can describe sparse and dense vectors and matrices of arbitrary dimensions. This format is already popular within the physical sciences for both experimental and simulation data. We discuss solutions to gridded data storage for a massive dataset of public weather model outputs called the Wind Integration National Dataset (WIND) toolkit. We also highlight strategies that are general to other large geospatial data management problems.
Wind Integration National Dataset
As variable renewable power penetration levels increase in power systems worldwide, the importance of renewable integration studies to ensure continued economic and reliable operation of the power grid is also increasing. The WIND toolkit is the largest freely available grid integration dataset to date.
The WIND toolkit was developed by 3TIER by Vaisala. They were under a subcontract to the National Renewable Energy Laboratory (NREL) to support studies on integration of wind energy into the existing US grid. NREL is a part of a network of national laboratories for the US Department of Energy and has a mission to advance the science and engineering of energy efficiency, sustainable transportation, and renewable power technologies.
The toolkit has been used by consultants, research groups, and universities worldwide to support grid integration studies. Less traditional uses also include resource assessments for wind plants (such as those powering Amazon data centers), and studying the effects of weather on California condor migrations in the Baja peninsula.
The diversity of applications highlights the value of accessible, open public data. Yet, there’s a catch: the dataset is huge. The WIND toolkit provides simulated atmospheric (weather) data at a two-km spatial resolution and five-minute temporal resolution at multiple heights for seven years. The entire dataset is half a petabyte (500 TB) in size and is stored in the NREL High Performance Computing data center in Golden, Colorado. Making this dataset publicly available easily and in a cost-effective manner is a major challenge.
As other laboratories and public institutions work to release their data to the world, they may face similar challenges to those that we experienced. Some prior, well-intentioned efforts to release huge datasets as-is have resulted in data resources that are technically available but fundamentally unusable. They may be stored in an unintuitive format or indexed and organized to support only a subset of potential uses. Downloading hundreds of terabytes of data is often impractical. Most users don’t have access to a big data cluster (or super computer) to slice and dice the data as they need after it’s downloaded.
We aim to provide a large amount of data (50 terabytes) to the public in a way that is efficient, scalable, and easy to use. In many cases, researchers can access these huge cloud-located datasets using the same software and algorithms they have developed for smaller datasets stored locally. Only the pieces of data they need for their individual analysis must be downloaded. To make this work in practice, we worked with the HDF Group and have built upon their forthcoming Highly Scalable Data Service.
In the rest of this post, we discuss how the HSDS software was developed to use Amazon EC2 and Amazon S3 resources to provide convenient and scalable access to these huge geospatial datasets. We describe how the HSDS service has been put to work for the WIND Toolkit dataset and demonstrate how to access it using the h5pyd Python library and the REST API. We conclude with information about our ongoing work to release more ‘open’ datasets to the public using AWS services, and ways to improve and extend the HSDS with newer Amazon services like Amazon ECS and AWS Lambda.
Developing a scalable service for big geospatial data
The HDF5 file format and API have been used for many years and are an effective means of storing large scientific datasets. For example, NASA’s Earth Observing System (EOS) satellites collect more than 16 TB of data per day using HDF5.
With the rise of the cloud, there are new challenges and opportunities to rethink how HDF5 can be enhanced to work effectively as a component in a cloud-native architecture. For the HDF Group, working with NREL has been a great opportunity to put ideas into practice with a production-size dataset.
An HDF5 file consists of a directed graph of group and dataset objects. Datasets can be thought of as a multidimensional array with support for user-defined metadata tags and compression. Typical operations on datasets would be reading or writing data to a regular subregion (a hyperslab) or reading and writing individual elements (a point selection). Also, group and dataset objects may each contain an arbitrary number of the user-defined metadata elements known as attributes.
Many people have used the HDF library in applications developed or ported to run on EC2 instances, but there are a number of constraints that often prove problematic:
The HDF5 library can’t read directly from HDF5 files stored as S3 objects. The entire file (often many GB in size) would need to be copied to local storage before the first byte can be read. Also, the instance must be configured with an appropriately sized EBS volume.
The HDF library only has access to the computational resources of the instance itself (as opposed to a cluster of instances), so many operations are bottlenecked by the library.
Any modifications to the HDF5 file would somehow have to be synchronized with changes that other instances have made to the same file before writing back to S3.
Using a pattern common to many offerings from AWS, the solution to these constraints is to develop a service framework around the HDF data model. Using this model, the HDF Group has created the Highly Scalable Data Service (HSDS) that provides all the functionality that traditionally was provided by the HDF5 library. By using the service, you don’t need to manage your own file volumes, but can just read and write whatever data that you need.
Because the service manages the actual data persistence to a durable medium (S3, in this case), you don’t need to worry about disk management. Simply stream the data you need from the service as you need it. Secondly, putting the functionality behind a service allows some tricks to increase performance (described in more detail later). And lastly, HSDS allows any number of clients to access the data at the same time, enabling HDF5 to be used as a coordination mechanism for multiple readers and writers.
In designing the HSDS architecture, we gave much thought to how to achieve scalability of the HSDS service. For accessing HDF5 data, there are two different types of scaling to consider:
Multiple clients making many requests to the service
Single requests that require a significant amount of data processing
To deal with the first scaling challenge, as with most services, we considered how the service responds as the request rate increases. AWS provides some great tools that help in this regard:
Auto Scaling groups
Elastic Load Balancing load balancers
The ability of S3 to handle large aggregate throughput rates
By using a cluster of EC2 instances behind a load balancer, you can handle different client loads in a cost-effective manner.
The second scaling challenge concerns single requests that would take significant processing time with just one compute node. One example of this from the WIND toolkit would be extracting all the values in the seven-year time span for a given geographic point and dataset.
In HDF5, large datasets are typically stored as “chunks”; that is, a regular partition of the array. In HSDS, each chunk is stored as a binary object in S3. The sequential approach to retrieving the time series values would be for the service to read each chunk needed from S3, extract the needed elements, and go on to the next chunk. In this case, that would involve processing 2557 chunks, and would be quite slow.
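To make the chunk arithmetic concrete, here is a small sketch that counts how many chunks a rectangular selection overlaps. The dataset and chunk shapes are assumptions chosen only so that a full time series at one grid point lands on the 2557 figure quoted above; the real WIND toolkit chunk layout may differ.

```python
def chunks_touched(selection, chunk_shape):
    """Count the chunks overlapped by a rectangular (hyperslab) selection.

    selection   -- one half-open (start, stop) index range per dimension
    chunk_shape -- chunk extent along each dimension
    """
    count = 1
    for (start, stop), size in zip(selection, chunk_shape):
        first = start // size          # index of the first chunk touched
        last = (stop - 1) // size      # index of the last chunk touched
        count *= last - first + 1
    return count


# Hypothetical layout: 61368 hourly time steps (2557 days x 24 hours),
# chunked as 24 time steps x 50 x 50 grid cells.
time_steps = 2557 * 24
one_point_all_time = [(0, time_steps), (100, 101), (200, 201)]
print(chunks_touched(one_point_all_time, (24, 50, 50)))
```

Under these assumed shapes, a single grid point read over the full period crosses one chunk per 24 time steps, so every chunk along the time axis has to be fetched even though only one value per time step is kept.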
Fortunately, with HSDS, you can speed this up quite a bit by exploiting the compute and I/O capabilities of the cluster. Upon receiving the request, the receiving node can use other nodes in the cluster to read different portions of the selection. With multiple nodes reading from S3 in parallel, performance improves as the cluster size increases.
The diagram below illustrates how this works in a simplified case of four chunks and four nodes.
This architecture has worked well in practice. In testing with the WIND toolkit and time series extraction, we observed a request latency of ~60 seconds using four nodes vs. ~5 seconds with 40 nodes. Performance roughly scales with the size of the cluster.
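The fan-out idea can be sketched in a few lines. This is not HSDS code: threads in one process stand in for cluster nodes, and `read_chunk` is a hypothetical placeholder for fetching one chunk from S3 and extracting the needed elements.

```python
from concurrent.futures import ThreadPoolExecutor


def read_chunk(chunk_id):
    """Placeholder for reading one chunk and extracting the needed values."""
    return [chunk_id * 10 + i for i in range(3)]


def read_selection(chunk_ids, workers=4):
    """Read all chunks in parallel and stitch the results back in order."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # Executor.map returns results in the order of the inputs,
        # regardless of which worker finishes first.
        parts = list(pool.map(read_chunk, chunk_ids))
    return [value for part in parts for value in part]


# Four chunks handled by four workers, as in the simplified case above.
print(read_selection(range(4), workers=4))
```

Because each chunk read is independent and dominated by I/O, adding workers shortens the total wall-clock time until some other limit, such as network bandwidth, is reached.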
A planned enhancement to this is to use AWS Lambda for the worker processing. This enables 1000-way parallel reads at a reasonable cost, as you only pay for the milliseconds of CPU time used with AWS Lambda.
Public access to atmospheric data using HSDS and AWS
An early challenge in releasing the WIND toolkit data was in deciding how to subset the data for different use cases. In general, few researchers need access to the entire 0.5 PB of data and a great deal of efficiency and cost reduction can be gained by making directed constituent datasets.
NREL grid integration researchers initially extracted a 2-TB subset by selecting 120,000 points where the wind resource seemed appropriate for development. They also kept only the data important for wind applications (100-m wind speed, converted to power) at the locations of most interest to those performing grid studies. To support the remaining users who needed more data resolution, we down-sampled the data to a 60-minute temporal resolution, keeping all the other variables and spatial resolution intact. This reduced dataset is 50 TB of data describing 30+ atmospheric variables for 7 years at a 60-minute temporal resolution.
The WindViz browser-based Gridded Wind Toolkit Visualizer was created as an example implementation of the HSDS REST API in JavaScript. The visualizer is written in the style of ECMAScript 2016 using a modern development toolchain that includes webpack and Babel. The source code is available through our GitHub repository. The demo page is hosted via GitHub pages, and we use a cross-origin AJAX request to fetch data from the HSDS service running on the EC2 infrastructure. The visualizer can be used to explore the gridded wind toolkit data on a map. Achieve full spatial resolution by zooming in to a specific region.
Programmatic access is possible using the h5pyd Python library, a distributed analog to the widely used h5py library. Users interact with the datasets (variables) and slice the data from its (time x longitude x latitude) cube form as they see fit.
Examples and use cases are described in a set of Jupyter notebooks and available on GitHub:
Now you have a Jupyter notebook server running on your EC2 server.
From your laptop, create an SSH tunnel:
$ ssh -L 8888:localhost:8888 (IP address of the EC2 server)
Now, you can browse to localhost:8888 using the correct token, and interact with the notebooks as if they were local. Within the directory, there are examples for accessing the HSDS API and plotting wind and weather data using matplotlib.
Controlling access and defraying costs
A final concern is rate limiting and access control. Although the HSDS service is scalable and relatively robust, we had a few practical concerns:
How can we protect from malicious or accidental use that may lead to high egress fees (for example, someone who attempts to repeatedly download the entire dataset from S3)?
How can we keep track of who is using the data both to document the value of the data resource and to justify the costs?
If costs become too high, can we charge for some or all API use to help cover the costs?
To approach these problems, we investigated using Amazon API Gateway and its simplified integration with the AWS Marketplace for SaaS monetization as well as third-party API proxies.
In the end, we chose to use API Umbrella due to its close involvement with http://data.gov. While AWS Marketplace is a compelling option for future datasets, the decision was made to keep this dataset entirely open, at least for now. As community use and associated costs grow, we’ll likely revisit Marketplace. Meanwhile, API Umbrella provides controls for rate limiting and API key registration out of the box and was simple to implement as a front-end proxy to HSDS. Those applications that may want to charge for API use can accomplish a similar strategy using Amazon API Gateway and AWS Marketplace.
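For readers unfamiliar with per-key rate limiting, the general idea can be sketched as a fixed-window counter. This is only an illustration of the concept and makes no claim about how API Umbrella or Amazon API Gateway implement it; the class name and parameters are hypothetical.

```python
import time


class FixedWindowLimiter:
    """Allow at most `limit` requests per API key in each time window."""

    def __init__(self, limit, window_seconds, clock=time.monotonic):
        self.limit = limit
        self.window = window_seconds
        self.clock = clock          # injectable so the limiter can be tested
        self._windows = {}          # api_key -> (window_start, request_count)

    def allow(self, api_key):
        now = self.clock()
        start, count = self._windows.get(api_key, (now, 0))
        if now - start >= self.window:
            start, count = now, 0   # the previous window expired; start a new one
        if count >= self.limit:
            self._windows[api_key] = (start, count)
            return False            # over the limit for this window
        self._windows[api_key] = (start, count + 1)
        return True
```

Counting requests per key also gives a usage record for each registered user, which speaks to the second concern listed above.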
Ongoing work and other resources
As NREL and other government research labs, municipalities, and organizations try to share data with the public, we expect many of you will face similar challenges to those we have tried to approach with the architecture described in this post. Providing large datasets is one challenge. Doing so in a way that is affordable and convenient for users is an entirely more difficult goal. Using AWS cloud-native services and the existing foundation of the HDF file format has allowed us to tackle that challenge in a meaningful way.
Dr. Caleb Phillips is a senior scientist with the Data Analysis and Visualization Group within the Computational Sciences Center at the National Renewable Energy Laboratory. Caleb comes from a background in computer science systems, applied statistics, computational modeling, and optimization. His work at NREL spans the breadth of renewable energy technologies and focuses on applying modern data science techniques to data problems at scale.
Dr. Caroline Draxl is a senior scientist at NREL. She supports the research and modeling activities of the US Department of Energy from mesoscale to wind plant scale. Caroline uses mesoscale models to research wind resources in various countries, and participates in on- and offshore boundary layer research and in the coupling of the mesoscale flow features (kilometer scale) to the microscale (tens of meters). She holds a M.S. degree in Meteorology and Geophysics from the University of Innsbruck, Austria, and a PhD in Meteorology from the Technical University of Denmark.
John Readey has been a Senior Architect at The HDF Group since he joined in June 2014. His interests include web services related to HDF, applications that support the use of HDF and data visualization. Before joining The HDF Group, John worked at Amazon.com from 2006–2014 where he developed service-based systems for eCommerce and AWS.
Jordan Perr-Sauer is an RPP intern with the Data Analysis and Visualization Group within the Computational Sciences Center at the National Renewable Energy Laboratory. Jordan hopes to use his professional background in software engineering and his academic training in applied mathematics to solve the challenging problems facing America and the world.
This post was written in partnership with Intuit to share learnings, best practices, and recommendations for running an Apache Kafka cluster on AWS. Thanks to Vaishak Suresh and his colleagues at Intuit for their contribution and support.
Intuit, in their own words: Intuit, a leading enterprise customer for AWS, is a creator of business and financial management solutions. For more information on how Intuit partners with AWS, see our previous blog post, Real-time Stream Processing Using Apache Spark Streaming and Apache Kafka on AWS. Apache Kafka is an open-source, distributed streaming platform that enables you to build real-time streaming applications.
The best practices described in this post are based on our experience in running and operating large-scale Kafka clusters on AWS for more than two years. Our intent for this post is to help AWS customers who are currently running Kafka on AWS, and also customers who are considering migrating on-premises Kafka deployments to AWS.
Running your Kafka deployment on Amazon EC2 provides a high performance, scalable solution for ingesting streaming data. AWS offers many different instance types and storage option combinations for Kafka deployments. However, given the number of possible deployment topologies, it’s not always trivial to select the most appropriate strategy for your use case.
In this blog post, we cover the following aspects of running Kafka clusters on AWS:
Deployment considerations and patterns
Storage options
Instance types
Networking
Upgrades
Performance tuning
Monitoring
Security
Backup and restore
Note: While implementing Kafka clusters in a production environment, make sure also to consider factors like your number of messages, message size, monitoring, failure handling, and any operational issues.
Deployment considerations and patterns
In this section, we discuss various deployment options available for Kafka on AWS, along with pros and cons of each option. A successful deployment starts with thoughtful consideration of these options. Considering availability, consistency, and operational overhead of the deployment helps when choosing the right option.
Single AWS Region, Three Availability Zones, All Active
One typical deployment pattern (all active) is in a single AWS Region with three Availability Zones (AZs). One Kafka cluster is deployed in each AZ along with Apache ZooKeeper and Kafka producer and consumer instances as shown in the illustration following.
In this pattern, this is the Kafka cluster deployment:
Kafka producers and Kafka cluster are deployed on each AZ.
Data is distributed evenly across three Kafka clusters by using Elastic Load Balancer.
Kafka consumers aggregate data from all three Kafka clusters.
Kafka cluster failover occurs this way:
Mark down all Kafka producers
Stop consumers
Debug and restack Kafka
Restart consumers
Restart Kafka producers
Following are the pros and cons of this pattern.
Pros:
Highly available
Can sustain the failure of two AZs
No message loss during failover
Simple deployment
Cons:
Very high operational overhead:
All changes need to be deployed three times, one for each Kafka cluster
Maintaining and monitoring three Kafka clusters
Maintaining and monitoring three consumer clusters
A restart is required for patching and upgrading brokers in a Kafka cluster. In this approach, a rolling upgrade is done separately for each cluster.
Single Region, Three Availability Zones, Active-Standby
Another typical deployment pattern (active-standby) is in a single AWS Region with a single Kafka cluster, with Kafka brokers and ZooKeeper nodes distributed across three AZs. Another similar Kafka cluster acts as a standby as shown in the illustration following. You can use Kafka mirroring with MirrorMaker to replicate messages between any two clusters.
In this pattern, this is the Kafka cluster deployment:
Kafka producers are deployed on all three AZs.
Only one Kafka cluster is deployed across three AZs (active).
ZooKeeper instances are deployed on each AZ.
Brokers are spread evenly across all three AZs.
Kafka consumers can be deployed across all three AZs.
Standby Kafka producers and a Multi-AZ Kafka cluster are part of the deployment.
Kafka cluster failover occurs this way:
Switch traffic to standby Kafka producers cluster and Kafka cluster.
Restart consumers to consume from standby Kafka cluster.
Following are the pros and cons of this pattern.
Pros:
Less operational overhead when compared to the first option
Only one Kafka cluster to manage and consume data from
Can handle single AZ failures without activating a standby Kafka cluster
Cons:
Added latency due to cross-AZ data transfer among Kafka brokers
For Kafka versions before 0.10, replicas for topic partitions have to be assigned so they’re distributed to the brokers on different AZs (rack-awareness)
The cluster can become unavailable in case of a network glitch, where ZooKeeper does not see Kafka brokers
Possibility of in-transit message loss during failover
Intuit recommends using a single Kafka cluster in one AWS Region, with brokers distributed across three AZs (single region, three AZs). This approach offers strong fault tolerance, because a failed AZ won’t cause Kafka downtime.
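The rack-awareness point in the cons list can be sketched in code. The following is a minimal illustration, not Intuit's tooling, of assigning each partition's replicas to brokers in different AZs. The function name, the broker IDs, and the AZ labels are all hypothetical.

```python
def assign_replicas(brokers_by_az, num_partitions, replication_factor):
    """Return {partition: [broker_id, ...]} with each replica in a distinct AZ.

    brokers_by_az maps a (hypothetical) AZ label to the broker IDs it hosts.
    """
    azs = sorted(brokers_by_az)
    if replication_factor > len(azs):
        raise ValueError("replication factor exceeds the number of AZs")

    assignment = {}
    for partition in range(num_partitions):
        replicas = []
        for replica in range(replication_factor):
            # Walk the AZs round-robin so consecutive replicas never share an AZ.
            az = azs[(partition + replica) % len(azs)]
            brokers = brokers_by_az[az]
            # Rotate through the brokers inside the AZ to spread the load.
            replicas.append(brokers[(partition // len(azs)) % len(brokers)])
        assignment[partition] = replicas
    return assignment


layout = {"az-a": [1, 4], "az-b": [2, 5], "az-c": [3, 6]}
plan = assign_replicas(layout, num_partitions=6, replication_factor=2)
```

With this layout, partition 0 lands on brokers 1 and 2, and partition 3 on brokers 4 and 5, so losing one AZ leaves at least one replica of every partition available.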
Storage options
There are two storage options for file storage in Amazon EC2: ephemeral storage (instance store) and Amazon Elastic Block Store (Amazon EBS).
Ephemeral storage is local to the Amazon EC2 instance. It can provide high IOPS based on the instance type. On the other hand, Amazon EBS volumes offer higher resiliency and you can configure IOPS based on your storage needs. EBS volumes also offer some distinct advantages in terms of recovery time. Your choice of storage is closely related to the type of workload supported by your Kafka cluster.
Kafka provides built-in fault tolerance by replicating data partitions across a configurable number of instances. If a broker fails, you can recover it by fetching all the data from other brokers in the cluster that host the other replicas. Depending on the size of the data transfer, it can affect the recovery process and network traffic. These in turn eventually affect the cluster’s performance.
The following table contrasts the benefits of using an instance store versus using EBS for storage.
Instance store:
Instance storage is recommended for large- and medium-sized Kafka clusters. For a large cluster, read/write traffic is distributed across a high number of brokers, so the loss of a broker has less of an impact. For smaller clusters, however, a quick recovery of the failed node is important, but recovering a failed broker takes longer and requires more network traffic in a smaller Kafka cluster.
Storage-optimized instances like h1, i3, and d2 are an ideal choice for distributed applications like Kafka.
EBS:
The primary advantage of using EBS in a Kafka deployment is that it significantly reduces data-transfer traffic when a broker fails or must be replaced. The replacement broker joins the cluster much faster.
Data stored on EBS is persisted in case of an instance failure or termination. The broker’s data stored on an EBS volume remains intact, and you can mount the EBS volume to a new EC2 instance. Most of the replicated data for the replacement broker is already available in the EBS volume and need not be copied over the network from another broker. Only the changes made after the original broker failure need to be transferred across the network. That makes this process much faster.
Intuit chose EBS because of their frequent instance restacking requirements and also other benefits provided by EBS.
Generally, Kafka deployments use a replication factor of three. EBS offers replication within its service, so Intuit chose a replication factor of two instead of three.
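To make the recovery trade-off concrete, here is a rough back-of-the-envelope sketch. It assumes recovery time is dominated by the network copy, and every number in it is made up for illustration rather than measured.

```python
def bytes_to_copy(broker_data_bytes, storage, bytes_written_during_outage=0):
    """Estimate how much data a replacement broker must fetch over the network."""
    if storage == "instance_store":
        # Local disks are lost with the instance, so everything is re-replicated.
        return broker_data_bytes
    if storage == "ebs":
        # The volume is re-attached, so only the missed writes are fetched.
        return bytes_written_during_outage
    raise ValueError(f"unknown storage type: {storage}")


def recovery_seconds(num_bytes, network_bytes_per_second):
    """Time to copy num_bytes at a sustained network rate (assumed constant)."""
    return num_bytes / network_bytes_per_second


GB = 1024 ** 3
# Hypothetical broker: 500 GB on disk, 5 GB written during the outage, 100 MB/s link.
link = 100 * 1024 ** 2
full_copy = recovery_seconds(bytes_to_copy(500 * GB, "instance_store"), link)
catch_up = recovery_seconds(bytes_to_copy(500 * GB, "ebs", 5 * GB), link)
```

Under these assumed numbers the full copy takes 100 times longer than the EBS catch-up, which is the effect the EBS notes above describe.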
Instance types
The choice of instance types is generally driven by the type of storage required for your streaming applications on a Kafka cluster. If your application requires ephemeral storage, h1, i3, and d2 instances are your best option.
Intuit used r3.xlarge instances for their brokers and r3.large for ZooKeeper, with ST1 (throughput optimized HDD) EBS for their Kafka cluster.
Here are sample benchmark numbers from Intuit tests.
Configuration: r3.xlarge, ST1 EBS, 12 brokers, 12 partitions
Broker bytes (MB/s): aggregate 346.9
If you need EBS storage, then AWS has a newer-generation r4 instance. The r4 instance is superior to r3 in many ways:
It has a faster processor (Broadwell).
EBS is optimized by default.
It features networking based on Elastic Network Adapter (ENA), with up to 10 Gbps on smaller sizes.
Networking
The network plays a very important role in a distributed system like Kafka. A fast and reliable network ensures that nodes can communicate with each other easily. The available network throughput controls the maximum amount of traffic that Kafka can handle. Network throughput, combined with disk storage, is often the governing factor for cluster sizing.
If you expect your cluster to receive high read/write traffic, select an instance type that offers 10-Gb/s performance.
In addition, choose an option that keeps interbroker network traffic on the private subnet while still allowing clients to connect to the brokers. Communication between brokers and clients uses the same network interface and port. For more details, see the documentation about IP addressing for EC2 instances.
If you are deploying in more than one AWS Region, you can connect the two VPCs in the two AWS Regions using cross-region VPC peering. However, be aware of the networking costs associated with cross-AZ deployments.
Upgrades
Kafka has a history of not being backward compatible, but its support of backward compatibility is getting better. During a Kafka upgrade, you should keep your producer and consumer clients on a version equal to or lower than the version you are upgrading from. After the upgrade is finished, you can start using a new protocol version and any new features it supports. There are three upgrade approaches available, discussed in the following sections.
Rolling or in-place upgrade
In a rolling or in-place upgrade scenario, upgrade one Kafka broker at a time. Take into consideration the recommendations for doing rolling restarts to avoid downtime for end users.
Downtime upgrade
If you can afford the downtime, you can take your entire cluster down, upgrade each Kafka broker, and then restart the cluster.
Blue/green upgrade
Intuit followed the blue/green deployment model for their workloads, as described in this section.
If you can afford to create a separate Kafka cluster and upgrade it, we highly recommend the blue/green upgrade scenario. In this scenario, we recommend that you keep your clusters up-to-date with the latest Kafka version. For additional details on Kafka version upgrades, see the Kafka upgrade documentation.
The following illustration shows a blue/green upgrade.
In this scenario, the upgrade plan works like this:
Create a new Kafka cluster on AWS.
Create a new Kafka producers stack to point to the new Kafka cluster.
Create topics on the new Kafka cluster.
Test the green deployment end to end (sanity check).
Using Amazon Route 53, change the new Kafka producers stack on AWS to point to the new green Kafka environment that you have created.
The roll-back plan works like this:
Switch Amazon Route 53 to the old Kafka producers stack on AWS to point to the old Kafka environment.
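The Route 53 switch in the upgrade and roll-back plans can be sketched as a single DNS record update. The example below only builds the change batch. The record name, the target DNS names, and the hosted zone ID in the comment are hypothetical, and a CNAME record is assumed (an alias record would need a different shape).

```python
def build_cutover_change(record_name, target_dns_name, ttl=60):
    """Build a Route 53 ChangeBatch that points record_name at target_dns_name."""
    return {
        "Comment": f"Point {record_name} at {target_dns_name}",
        "Changes": [
            {
                # UPSERT creates the record or overwrites the existing one.
                "Action": "UPSERT",
                "ResourceRecordSet": {
                    "Name": record_name,
                    "Type": "CNAME",
                    "TTL": ttl,
                    "ResourceRecords": [{"Value": target_dns_name}],
                },
            }
        ],
    }


# Cut over to green, and keep the blue change ready as the roll-back.
to_green = build_cutover_change("producers.kafka.example.com", "green-producers.example.com")
to_blue = build_cutover_change("producers.kafka.example.com", "blue-producers.example.com")

# Applying it with boto3 would look like this (requires credentials, not run here):
#   import boto3
#   boto3.client("route53").change_resource_record_sets(
#       HostedZoneId="ZEXAMPLE123",  # hypothetical hosted zone ID
#       ChangeBatch=to_green,
#   )
```

A short TTL keeps the window in which clients still resolve the old stack small, for both the cutover and the roll-back.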
Performance tuning
You can tune Kafka performance in multiple dimensions. Following are some best practices for performance tuning.
These are some general performance tuning techniques:
If throughput is less than network capacity, try the following:
Add more threads
Increase batch size
Add more producer instances
Add more partitions
To improve latency when acks=-1, increase your num.replica.fetchers value.
For cross-AZ data transfer, tune your buffer settings for sockets and for OS TCP.
Make sure that num.io.threads is greater than the number of disks dedicated for Kafka.
Adjust num.network.threads based on the number of producers plus the number of consumers plus the replication factor.
Your message size affects your network bandwidth. To get higher performance from a Kafka cluster, select an instance type that offers 10 Gb/s performance.
For Java and JVM tuning, try the following:
Minimize GC pauses by using the Oracle JDK, which uses the new G1 (garbage-first) collector.
Try to keep the Kafka heap size below 4 GB.
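As a rough illustration of the two thread-count guidelines above, the sketch below turns them into starting values for the broker's server.properties. Taking "greater than the number of disks" as disks plus one, and using the plain sum for num.network.threads, are assumptions made for this example. Treat the results as a starting point to validate under load, not as recommended settings.

```python
def suggest_broker_threads(num_disks, num_producers, num_consumers, replication_factor):
    """Derive starting values for two broker thread settings."""
    return {
        # Guideline: more I/O threads than disks dedicated to Kafka.
        "num.io.threads": num_disks + 1,
        # Guideline: scale network threads with clients plus replication.
        "num.network.threads": num_producers + num_consumers + replication_factor,
    }


def to_properties(settings):
    """Render settings as key=value lines for server.properties."""
    return "\n".join(f"{key}={value}" for key, value in sorted(settings.items()))


settings = suggest_broker_threads(
    num_disks=4, num_producers=3, num_consumers=2, replication_factor=2
)
print(to_properties(settings))
```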
Monitoring
Knowing whether a Kafka cluster is working correctly in a production environment is critical. Sometimes, just knowing that the cluster is up is enough, but Kafka applications have many moving parts to monitor. In fact, it can easily become confusing to understand what’s important to watch and what you can set aside. Items to monitor range from simple metrics about the overall rate of traffic, to producers, consumers, brokers, controller, ZooKeeper, topics, partitions, messages, and so on.
For monitoring, Intuit used several tools, including New Relic, Wavefront, Amazon CloudWatch, and AWS CloudTrail. Our recommended monitoring approach follows.
For system metrics, we recommend that you monitor:
CPU load
Network metrics
File handle usage
Disk space
Disk I/O performance
Garbage collection
ZooKeeper
For producers, we recommend that you monitor:
Batch-size-avg
Compression-rate-avg
Waiting-threads
Buffer-available-bytes
Record-queue-time-max
Record-send-rate
Records-per-request-avg
For consumers, we recommend that you monitor:
Batch-size-avg
Compression-rate-avg
Waiting-threads
Buffer-available-bytes
Record-queue-time-max
Record-send-rate
Records-per-request-avg
Security
Like most distributed systems, Kafka provides the mechanisms to transfer data with relatively high security across the components involved. Depending on your setup, security might involve different services such as encryption, Kerberos, Transport Layer Security (TLS) certificates, and advanced access control list (ACL) setup in brokers and ZooKeeper. The following tells you more about the Intuit approach. For details on Kafka security not covered in this section, see the Kafka documentation.
Encryption at rest
For EBS-backed EC2 instances, you can enable encryption at rest by using Amazon EBS volumes with encryption enabled. Amazon EBS uses AWS Key Management Service (AWS KMS) for encryption. For more details, see Amazon EBS Encryption in the EBS documentation. For instance store–backed EC2 instances, you can enable encryption at rest by using Amazon EC2 instance store encryption.
Encryption in transit
Kafka uses TLS for client and internode communications.
Authentication
Authentication of connections to brokers from clients (producers and consumers), other brokers, and tools uses either Secure Sockets Layer (SSL) or Simple Authentication and Security Layer (SASL).
Kafka supports Kerberos authentication. If you already have a Kerberos server, you can add Kafka to your current configuration.
Authorization
In Kafka, authorization is pluggable and integration with external authorization services is supported.
Backup and restore
The type of storage used in your deployment dictates your backup and restore strategy.
The best way to back up a Kafka cluster based on instance storage is to set up a second cluster and replicate messages using MirrorMaker. Kafka’s mirroring feature makes it possible to maintain a replica of an existing Kafka cluster. Depending on your setup and requirements, your backup cluster might be in the same AWS Region as your main cluster or in a different one.
For EBS-based deployments, you can enable automatic snapshots of EBS volumes to back up volumes. You can easily create new EBS volumes from these snapshots to restore. We recommend storing backup files in Amazon S3.
For more information on how to back up in Kafka, see the Kafka documentation.
Conclusion
In this post, we discussed several patterns for running Kafka in the AWS Cloud. AWS also provides an alternative managed solution with Amazon Kinesis Data Streams. With Kinesis Data Streams, there are no servers to manage or scaling cliffs to worry about, and you can scale the size of your streaming pipeline in seconds without downtime. Data replication across Availability Zones is automatic, and you benefit from security out of the box. Kinesis Data Streams is tightly integrated with a wide variety of AWS services like Lambda, Redshift, and Elasticsearch, and it supports open source frameworks like Storm, Spark, Flink, and more. For more information, see the kafka-kinesis connector.
If you have questions or suggestions, please comment below.
Prasad Alle is a Senior Big Data Consultant with AWS Professional Services. He spends his time leading and building scalable, reliable big data, machine learning, artificial intelligence, and IoT solutions for AWS Enterprise and Strategic customers. His interests extend to various technologies such as advanced edge computing and machine learning at the edge. In his spare time, he enjoys spending time with his family.
With the explosion in virtual reality (VR) technologies over the past few years, we’ve had an increasing number of customers ask us for advice and best practices around deploying their VR-based products and service offerings on the AWS Cloud. It soon became apparent that while the VR ecosystem is large in both scope and depth of types of workloads (gaming, e-medicine, security analytics, live streaming events, etc.), many of the workloads followed repeatable patterns, with storage and delivery of live and on-demand immersive video at the top of the list.
Looking at consumer trends, the desire for live and on-demand immersive video is fairly self-explanatory. VR has ushered in convenient and low-cost access for consumers and businesses to a wide variety of options for consuming content, ranging from browser playback of live and on-demand 360º video, all the way up to positional tracking systems with a high degree of immersion. All of these scenarios contain one lowest common denominator: video.
Which brings us to the topic of this post. We set out to build a solution that could support both live and on-demand events, bring with it a high degree of scalability, be flexible enough to support transformation of video if required, run at a low cost, and use open-source software to every extent possible.
In this post, we describe the reference architecture we created to solve this challenge, using Amazon EC2 Spot Instances, Amazon S3, Elastic Load Balancing, Amazon CloudFront, AWS CloudFormation, and Amazon CloudWatch, with open-source software such as NGINX, FFMPEG, and JavaScript-based client-side playback technologies. We step you through deployment of the solution and how the components work, as well as the capture, processing, and playback of the underlying live and on-demand immersive media streams.
This GitHub repository includes the source code necessary to follow along. We’ve also provided a self-paced workshop from AWS re:Invent 2017 that breaks down this architecture even further. If you experience any issues or would like to suggest an enhancement, please use the GitHub issue tracker.
Prerequisites
As a side note, you’ll also need a few additional components to take best advantage of the infrastructure:
A camera/capture device capable of encoding and streaming RTMP video
A browser to consume the content.
You’re going to generate HTML5-compatible video (Apple HLS to be exact), but there are many other native iOS and Android options for consuming the media that you create. It’s also worth noting that your playback device should support projection of your input stream. We’ll talk more about that in the next section.
How does immersive media work?
At its core, any flavor of media, be that audio or video, can be viewed with some level of immersion. The ability to interact passively or actively with the content brings with it a further level of immersion. When you look at VR devices with rotational and positional tracking, you naturally need more than an ability to interact with a flat plane of video. The challenge for any creative thus becomes a tradeoff between immersion features (degrees of freedom, monoscopic 2D or stereoscopic 3D, resolution, framerate) and overall complexity.
Where is a simple and effective place to start, one that lets you build out a fairly modular solution and test it? There are a few areas where we chose to be prescriptive with our solution.
Source capture from the Ricoh Theta S
First, monoscopic 360-degree video is currently one of the most commonly consumed formats on consumer devices. We explicitly chose to focus on this format, although the infrastructure is not limited to it. More on this later.
Second, if you look at most consumer-level cameras that provide live streaming ability, and even many professional rigs, there are at least two lenses or cameras at a minimum. The figure above illustrates a single capture from a Ricoh Theta S in monoscopic 2D. The left image captures 180 degrees of the field of view, and the right image captures the other 180 degrees.
For this post, we chose a typical midlevel camera (the Ricoh Theta S), and used a laptop with open-source software (Open Broadcaster Software) to encode and stream the content. Again, the solution infrastructure is not limited to this particular brand of camera. Any camera or encoder that outputs 360º video and encodes to H264+AAC with an RTMP transport will work.
Third, capturing and streaming multiple camera feeds brings additional requirements around stream synchronization and cost of infrastructure. There is also a requirement to stitch media in real time, which can be CPU and GPU-intensive. Many devices and platforms do this either on the device, or via outboard processing that sits close to the camera location. If you stitch and deliver a single stream, you can save the costs of infrastructure and bitrate/connectivity requirements. We chose to keep these aspects on the encoder side to save on cost and reduce infrastructure complexity.
Last, the most common delivery format that requires little to no processing on the infrastructure side is equirectangular projection, as per the above figure. By stitching and unwrapping the spherical coordinates into a flat plane, you can easily deliver the video exactly as you would with any other live or on-demand stream. The only caveat is that resolution and bit rate are of utmost importance. The higher you can push these (high bit rate @ 4K resolution), the more immersive the experience is for viewers. This is due to the increase in sharpness and reduction of compression artifacts.
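To illustrate what unwrapping spherical coordinates into a flat plane means, here is a minimal sketch of the equirectangular mapping from a viewing direction to a pixel in the frame. The conventions are assumptions for this example: yaw runs from -180 to 180 degrees with 0 at the center of the frame, and pitch runs from -90 to 90 degrees with positive values looking up.

```python
def equirect_pixel(yaw_deg, pitch_deg, width, height):
    """Map a viewing direction to (x, y) in an equirectangular frame."""
    # Longitude maps linearly to the horizontal axis.
    u = (yaw_deg + 180.0) / 360.0
    # Latitude maps linearly to the vertical axis, with the top row at +90.
    v = (90.0 - pitch_deg) / 180.0
    # Clamp so the far edges stay inside the frame.
    x = min(int(u * width), width - 1)
    y = min(int(v * height), height - 1)
    return x, y


# Looking straight ahead lands in the middle of a 3840x1920 frame.
center = equirect_pixel(0, 0, 3840, 1920)  # (1920, 960)
```

Because the mapping is linear in both angles, the frame can be delivered like any other video and the player does the inverse lookup when it renders the sphere.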
Knowing that we would be transcoding potentially at 4K on the source camera, but in a format that could be transmuxed without an encoding penalty on the origin servers, we implemented a pass-through for the highest bit rate, and elected to only transcode lower bitrates. This requires some level of configuration on the source encoder, but saves on cost and infrastructure. Because you can conform the source stream, you may as well take advantage of that!
For this post, we chose not to focus on ways to optimize projection. However, the reference architecture does support this with additional open source components compiled into the FFMPEG toolchain. A number of options are available to this end, such as open source equirectangular to cubic transformation filters. There is a tradeoff, however, in that reprojection implies that all streams must be transcoded.
Processing and origination stack
To get started, we’ve provided a CloudFormation template that you can launch directly into your own AWS account. We quickly review how it works, the solution’s components, key features, processing steps, and examine the main configuration files. Following this, you launch the stack, and then proceed with camera and encoder setup.
Immersive streaming reference architecture
The event encoder publishes the RTMP source to multiple origin elastic IP addresses for packaging into the HLS adaptive bitrate.
The client requests the live stream through the CloudFront CDN.
The origin responds with the appropriate HLS stream.
The edge fleet caches media requests from clients and elastically scales across both Availability Zones to meet peak demand.
CloudFront caches media at local edge PoPs to improve performance for users and reduce the origin load.
When the live event is finished, the VOD asset is published to S3. An S3 event is then published to SQS.
The encoding fleet reads messages from the SQS queue, processes the VOD clips, and stores them in the S3 bucket.
How it works
A camera captures content, and with the help of a contribution encoder, publishes a live stream in equirectangular format. The stream is encoded at a high bit rate (at least 2.5 Mbps, but typically 16+ Mbps for 4K) using H264 video and AAC audio compression codecs, and delivered to a primary origin via the RTMP protocol. Streams may transit over the internet or dedicated links to the origins. Typically, for live events in the field, internet or bonded cellular are the most widely used.
The encoder is typically configured to push the live stream to a primary URI, with the ability (depending on the source encoding software/hardware) to roll over to a backup publishing point origin if the primary fails. Because you run across multiple Availability Zones, this architecture could handle an entire zone outage with minor disruption to live events. The primary and backup origins handle the ingestion of the live stream as well as transcoding to H264+AAC-based adaptive bit rate sets. After transcode, they package the streams into HLS for delivery and create a master-level manifest that references all adaptive bit rates.
The edge cache fleet pulls segments and manifests from the active origin on demand, and supports failover from primary to backup if the primary origin fails. By adding this caching tier, you effectively separate the encoding backend tier from the cache tier that responds to client or CDN requests. In addition to origin protection, this separation allows you to independently monitor, configure, and scale these components.
Viewers can use the sample HTML5 player (or compatible desktop, iOS or Android application) to view the streams. Navigation in the 360-degree view is handled either natively via device-based gyroscope, positionally via more advanced devices such as a head-mounted display, or via mouse drag on the desktop. Adaptive bit rate is key here, as this allows you to target multiple device types, giving the player on each device the option of selecting an optimum stream based on network conditions or device profile.
Solution components
When you deploy the CloudFormation template, all the architecture services referenced above are created and launched. This includes:
The compute tier running on Spot Instances for the corresponding components:
the primary and backup ingest origins
the edge cache fleet
the transcoding fleet
the test source
The CloudFront distribution
S3 buckets for storage of on-demand VOD assets
An Application Load Balancer for load balancing the service
An Amazon ECS cluster and container for the test source
The template also provisions the underlying dependencies:
A VPC
Security groups
IAM policies and roles
Elastic network interfaces
Elastic IP addresses
The edge cache fleet instances need some way to discover the primary and backup origin locations. You use elastic network interfaces and elastic IP addresses for this purpose.
As each component of the infrastructure is provisioned, software required to transcode and process the streams across the Spot Instances is automatically deployed. This includes NGINX-RTMP for ingest of live streams, FFMPEG for transcoding, NGINX for serving, and helper scripts to handle various tasks (potential Spot Instance interruptions, queueing, moving content to S3). Metrics and logs are available through CloudWatch and you can manage the deployment using the CloudFormation console or AWS CLI.
Key features include:
Live and video-on-demand recording
You’re supporting both live and on-demand. On-demand content is created automatically when the encoder stops publishing to the origin.
Cost-optimization and operating at scale using Spot Instances
Spot Instances are used exclusively for infrastructure to optimize cost and scale throughput.
Midtier caching
To protect the origin servers, the midtier cache fleet pulls, caches, and delivers to downstream CDNs.
Distribution via CloudFront or multi-CDN
The Application Load Balancer endpoint allows CloudFront or any third-party CDN to source content from the edge fleet and, indirectly, the origin.
FFMPEG + NGINX + NGINX-RTMP
These three components form the core of the stream ingest, transcode, packaging, and delivery infrastructure, as well as the VOD-processing component for creating transcoded VOD content on-demand.
Simple deployment using a CloudFormation template
All infrastructure can be easily created and modified using CloudFormation.
Prototype player page
To provide an end-to-end experience right away, we’ve included a test player page hosted as a static site on S3. This page uses A-Frame, a cross-platform, open-source framework for building VR experiences in the browser. Though A-Frame provides many features, it’s used here to render a sphere that acts as a 3D canvas for your live stream.
Spot Instance considerations
At this stage, and before we discuss processing, it is important to understand how the architecture operates with Spot Instances.
Spot Instances are spare compute capacity in the AWS Cloud available to you at steep discounts compared to On-Demand prices. Spot Instances enable you to optimize your costs on the AWS Cloud and scale your application’s throughput up to 10X for the same budget. By selecting Spot Instances, you can save up to 90% on On-Demand prices. This allows you to greatly reduce the cost of running the solution because, outside of S3 for storage and CloudFront for delivery, this solution is almost entirely dependent on Spot Instances for infrastructure requirements.
We also know that customers running events look to deploy streaming infrastructure at the lowest price point, so it makes sense to take advantage of it wherever possible. A potential challenge when using Spot Instances for live streaming and on-demand processing is that you need to proactively deal with potential Spot Instance interruptions. How can you best deal with this?
First, the origin is deployed in a primary/backup deployment. If a Spot Instance interruption happens on the primary origin, you can fail over to the backup with a brief interruption. Should a potential interruption not be acceptable, then either Reserved Instances or On-Demand options (or a combination) can be used at this tier.
Second, the edge cache fleet runs a job (started automatically at system boot) that periodically queries the local instance metadata to detect if an interruption is scheduled to occur. Spot Instance Interruption Notices provide a two-minute warning of a pending interruption. If you poll every 5 seconds, you have almost 2 full minutes to detach from the Load Balancer and drain or stop any traffic directed to your instance.
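The polling job described above could look roughly like the following sketch. It is not the script shipped with the solution. It assumes the instance metadata service is reachable without a session token (IMDSv1 style) and uses the spot/instance-action metadata path, which returns a small JSON document once an interruption is scheduled and a 404 otherwise. The function names and the callback are hypothetical.

```python
import json
import time
import urllib.error
import urllib.request

METADATA_URL = "http://169.254.169.254/latest/meta-data/spot/instance-action"


def fetch_instance_action(url=METADATA_URL, timeout=1.0):
    """Return the metadata response body, or None if no notice is posted."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return response.read().decode("utf-8")
    except (urllib.error.URLError, OSError):
        # A 404 (no interruption scheduled) or a network error ends up here.
        return None


def interruption_time(fetch=fetch_instance_action):
    """Return the scheduled interruption time string, or None."""
    body = fetch()
    if not body:
        return None
    try:
        notice = json.loads(body)
    except ValueError:
        return None
    return notice.get("time") if isinstance(notice, dict) else None


def watch(on_interrupt, fetch=fetch_instance_action, interval=5.0,
          sleep=time.sleep, max_polls=None):
    """Poll every `interval` seconds and call on_interrupt once a notice appears."""
    polls = 0
    while max_polls is None or polls < max_polls:
        when = interruption_time(fetch)
        if when is not None:
            on_interrupt(when)  # e.g. deregister from the load balancer and drain
            return when
        polls += 1
        sleep(interval)
    return None
```

The `fetch` and `sleep` parameters exist only so the loop can be exercised without a real instance. On an edge node you would call `watch(drain_and_deregister)`, where `drain_and_deregister` is your own callback.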
Lastly, use an SQS queue when transcoding. If a transcode on a Spot Instance is interrupted, the stale item falls back into the SQS queue and is eventually re-surfaced into the processing pipeline. Only remove items from the queue after the transcoded files have been successfully moved to the destination S3 bucket.
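The delete-only-after-success rule can be sketched as follows. To keep the example self-contained, it uses a small in-memory stand-in for the queue rather than the SQS API, and the `transcode` and `upload` callables are hypothetical placeholders for the FFMPEG job and the copy to S3.

```python
class InMemoryQueue:
    """Stand-in for SQS: a received message stays queued until it is deleted."""

    def __init__(self, bodies):
        self._messages = {str(i): body for i, body in enumerate(bodies)}

    def receive(self):
        for handle, body in self._messages.items():
            return {"receipt_handle": handle, "body": body}
        return None

    def delete(self, receipt_handle):
        self._messages.pop(receipt_handle, None)

    def __len__(self):
        return len(self._messages)


def process_one(queue, transcode, upload):
    """Handle one message, deleting it only after the upload has succeeded."""
    message = queue.receive()
    if message is None:
        return False
    output = transcode(message["body"])  # may raise, or the instance may vanish
    upload(output)                       # may raise
    # Reached only on success. Otherwise the message stays queued for a retry.
    queue.delete(message["receipt_handle"])
    return True
```

With real SQS, an undeleted message becomes visible to other workers again once its visibility timeout expires, which is what lets an interrupted transcode be retried on another instance.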
Processing
As discussed in the previous sections, you pass through the video for the highest bit rate to save on having to increase the instance size to transcode the 4K or similar high bit rate or resolution content.
We’ve selected a handful of bitrates for the adaptive bit rate stack. You can customize any of these to suit the requirements for your event. The default ABR stack includes:
2160p (4K)
1080p
540p
480p
These can be modified by editing the /etc/nginx/rtmp.d/rtmp.conf NGINX configuration file on the origin or the CloudFormation template.
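To show how an ABR stack like this is advertised to players, the sketch below renders an HLS master playlist that references one variant per rendition. The bandwidth figures, pixel dimensions, and variant URIs are illustrative assumptions, not the values used in the solution's configuration.

```python
def master_playlist(variants):
    """Render an HLS master playlist referencing each variant stream."""
    lines = ["#EXTM3U", "#EXT-X-VERSION:3"]
    for variant in variants:
        # BANDWIDTH is expressed in bits per second.
        lines.append(
            "#EXT-X-STREAM-INF:BANDWIDTH={bandwidth},RESOLUTION={width}x{height}".format(**variant)
        )
        lines.append(variant["uri"])
    return "\n".join(lines) + "\n"


# Hypothetical ladder matching the four default renditions.
LADDER = [
    {"bandwidth": 16000000, "width": 3840, "height": 2160, "uri": "event_2160p/index.m3u8"},
    {"bandwidth": 6000000, "width": 1920, "height": 1080, "uri": "event_1080p/index.m3u8"},
    {"bandwidth": 2000000, "width": 960, "height": 540, "uri": "event_540p/index.m3u8"},
    {"bandwidth": 1000000, "width": 854, "height": 480, "uri": "event_480p/index.m3u8"},
]

print(master_playlist(LADDER))
```

The player reads this manifest once and then switches between the variant playlists as network conditions change.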
It’s important to understand where and how streams are transcoded. When the source high bit rate stream enters the primary or backup origin at the /live RTMP application entry point, recording begins when publishing starts and ends when publishing stops. On completion, the recording is moved to S3 by a cleanup script, and a message is placed in your SQS queue for workers to use. These workers transcode the media and push it to a playout location bucket.
This solution uses Spot Fleet with automatic scaling to drive the fleet size. You can customize it based on CloudWatch metrics, such as simple utilization metrics to drive the size of the fleet. Why use Spot Instances for the transcode option instead of Amazon Elastic Transcoder? This allows you to implement reprojection of the input stream via FFMPEG filters in the future.
The origins handle all the heavy live streaming work. Edges only store and forward the segments and manifests, and provide scaling plus reduction of burden on the origin. This lets you customize the origin to the right compute capacity without having to rely on a ‘high watermark’ for compute sizing, thus saving additional costs.
Loopback is an important concept for the live origins. The incoming stream entering /live is transcoded by FFMPEG to multiple bit rates, which are streamed back to the same host via RTMP, on a secondary publishing point /show. The secondary publishing point is transparent to the user and encoder, but handles HLS segment generation and cleanup, and keeps a sliding window of live segments and constantly updating manifests.
Configuration
Our solution provides two key points of configuration that can be used to customize the solution to accommodate ingest, recording, transcoding, and delivery, all controlled via origin and edge configuration files, which are described later. In addition, a number of job scripts run on the instances to provide hooks into Spot Instance interruption events and the VOD SQS-based processing queue.
Origin instances
The rtmp.conf excerpt below also shows additional parameters that can be customized, such as maximum recording file size in Kbytes, HLS Fragment length, and Playlist sizes. We’ve created these in accordance with general industry best practices to ensure the reliable streaming and delivery of your content.
rtmp {
    server {
        listen 1935;
        chunk_size 4000;
        application live {
            live on;
            record all;
            record_path /var/lib/nginx/rec;
            record_max_size 128000K;
            exec_record_done /usr/local/bin/record-postprocess.sh $path $basename;
            exec /usr/local/bin/ffmpeg <…parameters…>;
        }
        application show {
            live on;
            hls on;
            ...
            hls_type live;
            hls_fragment 10s;
            hls_playlist_length 60s;
            ...
        }
    }
}
This exposes a few URL endpoints for debugging and general status. In production, you would most likely turn these off:
/stat provides a statistics endpoint accessible via any standard web browser.
/control enables control of RTMP streams and publishing points.
You also control the TTLs, as previously discussed. It’s important to note here that you are setting TTLs explicitly at the origin, instead of in CloudFront’s distribution configuration. While both are valid, this approach allows you to reconfigure and restart the service on the fly without having to push changes through CloudFront. This is useful for debugging any caching or playback issues.
record-postprocess.sh – Ensures that recorded files on the origin are well-formed, and transfers them to S3 for processing.
ffmpeg.sh – Transcodes content on the encoding fleet, pulling source media from your S3 ingress bucket, based on SQS queue entries, and pushing transcoded adaptive bit rate segments and manifests to your VOD playout egress bucket.
For more details, see the Delivery and Playback section later in this post.
Camera source
With the processing and origination infrastructure running, you need to configure your camera and encoder.
As discussed, we chose to use a Ricoh Theta S camera and Open Broadcaster Software (OBS) to stitch and deliver a stream into the infrastructure. Ricoh provides a free ‘blender’ driver, which allows you to transform, stitch, encode, and deliver both transformed equirectangular (used for this post) video as well as spherical (two camera) video. The Theta provides an easy way to get capturing for under $300, and OBS is a free and open-source software application for capturing and live streaming on a budget. It is quick, cheap, and enjoys wide use by the gaming community. OBS lowers the barrier to getting started with immersive streaming.
While the resolution and bit rate of the Theta may not be 4K, it still provides us with a way to test the functionality of the entire pipeline end to end, without having to invest in a more expensive camera rig. One could also use this type of model to target smaller events, which may involve mobile devices with smaller display profiles, such as phones and potentially smaller sized tablets.
Looking for a more professional solution? Nokia, GoPro, Samsung, and many others have options ranging from $500 to $50,000. This solution is based around the Theta S capabilities, but we’d encourage you to extend it to meet your specific needs.
If your device can support equirectangular RTMP, then it can deliver media through the reference architecture (dependent on instance sizing for higher bit rate sources, of course). If additional features are required such as camera stitching, mixing, or device bonding, we’d recommend exploring a commercial solution such as Teradek Sphere.
Teradek Rig (Teradek)
Ricoh Theta (CNET)
All cameras have varied PC connectivity support. We chose the Ricoh Theta S due to the real-time video connectivity that it provides through software drivers on macOS and PC. If you plan to purchase a camera to use with a PC, confirm that it supports real-time capabilities as a peripheral device.
Encoding and publishing
Now that you have a camera, encoder, and AWS stack running, you can finally publish a live stream.
To start streaming with OBS, configure the source camera and set a publishing point. Use the RTMP application name /live on port 1935 to ingest into the primary origin’s Elastic IP address provided as the CloudFormation output: primaryOriginElasticIp.
You also need to choose a stream name or stream key in OBS. You can use any stream name, but keep the naming short and lowercase, and use only alphanumeric characters. This avoids any parsing issues on client-side player frameworks. There’s no publish point protection in your deployment, so any stream key works with the default NGINX-RTMP configuration. For more information about stream keys, publishing point security, and extending the NGINX-RTMP module, see the NGINX-RTMP Wiki.
You should end up with a configuration similar to the following:
OBS Stream Settings
The Output settings dialog allows us to rescale the Video canvas and encode it for delivery to our AWS infrastructure. In the dialog below, we’ve set the Theta to encode at 5 Mbps in CBR mode using a preset optimized for low CPU utilization. We chose these settings in accordance with best practices for the stream pass-through at the origin for the initial incoming bit rate. You may notice that they largely match the FFMPEG encoding settings we use on the origin – namely constant bit rate, a single audio track, and x264 encoding with the ‘veryfast’ encoding profile.
OBS Output Settings
Live to On-Demand
As you may have noticed, an on-demand component is included in the solution architecture. When talking to customers, one frequent request that we see is that they would like to record the incoming stream with as little effort as possible.
NGINX-RTMP’s recording directives provide an easy way to accomplish this. We record any newly published stream on stream start at the primary or backup origins, using the incoming source stream, which also happens to be the highest bit rate. When the encoder stops broadcasting, NGINX-RTMP executes an exec_record_done script – record-postprocess.sh (described in the Configuration section earlier), which ensures that the content is well-formed, and then moves it to an S3 ingest bucket for processing.
Transcoding content into adaptive bit rate renditions for VOD is a multi-step pipeline. First, Spot Instances in the transcoding cluster periodically poll the SQS queue for new jobs. Next, processing instances pull items off the queue on demand and transcode them via FFMPEG into adaptive bit rate HLS. This also allows you to extend FFMPEG using filters for cubic and other bit rate-optimizing 360-specific transforms. Finally, transcoded content is moved from the ingest bucket to an egress bucket, making it ready for playback via your CloudFront distribution.
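The key property of this pipeline is that a job is deleted from the queue only after its output has been uploaded. The sketch below illustrates that ordering with an in-memory stand-in for SQS, so that it runs without AWS credentials. InMemoryQueue, process_one, and the transcode and upload callables are hypothetical names; a real worker would more likely call the SQS receive and delete operations through an AWS SDK and rely on the visibility timeout to re-surface unfinished jobs.

```python
from typing import Callable, Dict, Iterable, List, Optional, Tuple


class InMemoryQueue:
    """Stand-in for SQS: a received message stays queued until it is deleted.

    Unlike SQS, it does not hide a message during a visibility timeout.
    """

    def __init__(self, bodies: Iterable[str]) -> None:
        self._messages: Dict[str, str] = {
            f"receipt-{i}": body for i, body in enumerate(bodies)
        }

    def receive(self) -> Optional[Tuple[str, str]]:
        """Return (receipt, body) for the oldest message, or None if empty."""
        for receipt, body in self._messages.items():
            return receipt, body
        return None

    def delete(self, receipt: str) -> None:
        self._messages.pop(receipt, None)

    def __len__(self) -> int:
        return len(self._messages)


def process_one(
    queue: InMemoryQueue,
    transcode: Callable[[str], List[str]],
    upload: Callable[[List[str]], None],
) -> bool:
    """Handle at most one job; return True only if it finished and was deleted."""
    message = queue.receive()
    if message is None:
        return False
    receipt, source_key = message
    try:
        outputs = transcode(source_key)  # e.g. run FFMPEG on the source recording
        upload(outputs)                  # e.g. copy renditions to the egress bucket
    except Exception:
        # Leave the message in place so another worker can retry it. A Spot
        # interruption that kills the process has the same effect, because
        # the delete below is never reached.
        return False
    queue.delete(receipt)
    return True
```

Because the delete happens last, an interrupted worker can at worst cause a job to be transcoded twice; it cannot cause a job to be lost.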
Separate ingest and egress by bucket to provide hard security boundaries between source recordings (which are highest quality and unencrypted), and destination derivatives (which may be lower quality and potentially require encryption). Bucket separation also allows you to order and archive input and output content using different taxonomies, which is common when moving content from an asset management and archival pipeline (the ingest bucket) to a consumer-facing playback pipeline (the egress bucket, and any other attached infrastructure or services, such as CMS, Mobile applications, and so forth).
Because streams are pushed over the internet, there is always the chance that an interruption could occur in the network path, or even at the origin side of the equation (primary to backup roll-over). Both of these scenarios could result in malformed or partial recordings being created. For the best level of reliability, encoding should always be recorded locally on-site as a precaution to deal with potential stream interruptions.
Delivery and playback
With the camera turned on and OBS streaming to AWS, the final step is to play the live stream. We’ve primarily tested the prototype player on the latest Chrome and Firefox browsers on macOS, so your mileage may vary on different browsers or operating systems. For those looking to try the livestream on Google Cardboard, or similar headsets, native apps for iOS (VRPlayer) and Android exist that can play back HLS streams.
The prototype player is hosted in an S3 bucket and can be found from the CloudFormation output clientWebsiteUrl. It requires a stream URL provided as a query parameter ?url=<stream_url> to begin playback. This stream URL is determined by the RTMP stream configuration in OBS. For example, if OBS is publishing to rtmp://x.x.x.x:1935/live/foo, the resulting playback URL would be:
https://<cloudFrontDistribution>/hls/foo.m3u8
The combined player URL and playback URL results in a path like this one:
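As a sketch of how the two URLs combine, the helper below derives the HLS playback URL from the RTMP publishing URL and appends it to the player URL as the url query parameter. The function names and the example host names are hypothetical, and percent-encoding the parameter is an assumption about what the prototype player accepts.

```python
from urllib.parse import urlencode, urlparse


def playback_url(rtmp_url: str, cloudfront_domain: str) -> str:
    """Map rtmp://<host>:1935/live/<stream> to https://<domain>/hls/<stream>.m3u8."""
    parts = urlparse(rtmp_url)
    segments = [s for s in parts.path.split("/") if s]
    if parts.scheme != "rtmp" or len(segments) != 2 or segments[0] != "live":
        raise ValueError(f"expected rtmp://<host>:1935/live/<stream>, got {rtmp_url!r}")
    stream_name = segments[1]
    return f"https://{cloudfront_domain}/hls/{stream_name}.m3u8"


def player_url(client_website_url: str, stream_url: str) -> str:
    """Append the stream URL to the player URL as the `url` query parameter."""
    separator = "&" if "?" in client_website_url else "?"
    return f"{client_website_url}{separator}{urlencode({'url': stream_url})}"


# Example with placeholder host names:
hls = playback_url("rtmp://203.0.113.10:1935/live/foo", "d111111abcdef8.cloudfront.net")
print(hls)
print(player_url("http://player.example.com/index.html", hls))
```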
To assist in setup/debugging, we’ve provided a test source as part of the CloudFormation template. A color bar pattern with timecode and audio is being generated by FFmpeg running as an ECS task. Much like OBS, FFmpeg is streaming the test pattern to the primary origin over the RTMP protocol. The prototype player and test HLS stream can be accessed by opening the clientTestPatternUrl CloudFormation output link.
Test Stream Playback
What’s next?
In this post, we walked you through the design and implementation of a full end-to-end immersive streaming solution architecture. As you may have noticed, there are a number of areas this could expand into, and we intend to do this in follow-up posts around the topic of virtual reality media workloads in the cloud. We’ve identified a number of topics such as load testing, content protection, client-side metrics and analytics, and CI/CD infrastructure for 24/7 live streams. If you have any requests, please drop us a line.
We would like to extend extra-special thanks to Scott Malkie and Chad Neal for their help and contributions to this post and reference architecture.
New WordPress Plugin
Today we are launching a WordPress plugin that uses Polly to create high-quality audio versions of your blog posts. You can access the audio from within the post or in podcast form using a feature that we call Amazon Pollycast! Both options make your content more accessible and can help you to reach a wider audience. This plugin was a joint effort between the AWS team and our friends at AWS Advanced Technology Partner WP Engine.
As you will see, the plugin is easy to install and configure. You can use it with installations of WordPress that you run on your own infrastructure or on AWS. Either way, you have access to all of Polly’s voices along with a wide variety of configuration options. The generated audio (an MP3 file for each post) can be stored alongside your WordPress content, or in Amazon Simple Storage Service (S3), with optional support for content distribution via Amazon CloudFront.
Installing the Plugin
I do not have an existing WordPress-powered blog, so I begin by launching a Lightsail instance using the WordPress 4.8.1 blueprint:
Credentials in hand, I log in to the WordPress Dashboard:
The plugin makes calls to AWS, and needs to have credentials in order to do so. I hop over to the IAM Console and create a new policy. The policy allows the plugin to access a carefully selected set of S3 and Polly functions (find the full policy in the README):
Then I create an IAM user (wp-polly-user). I enter the name and indicate that it will be used for Programmatic Access:
Then I attach the policy that I just created, and click on Review:
I review my settings (not shown) and then click on Create User. Then I copy the two values (Access Key ID and Secret Access Key) into a secure location. Possession of these keys allows the bearer to make calls to AWS so I take care not to leave them lying around.
Now I am ready to install the plugin! I go back to the WordPress Dashboard and click on Add New in the Plugins menu:
Then I click on Upload Plugin and locate the ZIP file that I downloaded from the WordPress Plugins site. After I find it I click on Install Now to proceed:
WordPress uploads and installs the plugin. Now I click on Activate Plugin to move ahead:
With the plugin installed, I click on Settings to set it up:
I enter my keys and click on Save Changes:
The General settings let me control the sample rate, voice, player position, the default setting for new posts, and the autoplay option. I can leave all of the settings as-is to get started:
The Cloud Storage settings let me store audio in S3 and use CloudFront to distribute the audio:
The Amazon Pollycast settings give me control over the iTunes parameters that are included in the generated RSS feed:
Finally, the Bulk Update button lets me regenerate all of the audio files after I change any of the other settings:
With the plugin installed and configured, I can create a new post. As you can see, the plugin can be enabled and customized for each post:
I can see how much it will cost to convert to audio with a click:
When I click on Publish, the plugin breaks the text into multiple blocks on sentence boundaries, calls the Polly SynthesizeSpeech API for each block, and accumulates the resulting audio in a single MP3 file. The published blog post references the file using the <audio> tag. Here’s the post:
I can’t seem to use an <audio> tag in this post, but you can download and play the MP3 file yourself if you’d like.
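To illustrate the splitting step described above, here is a minimal Python sketch that groups sentences into blocks that stay under a size limit. It is not the plugin's own code: the function name is hypothetical, the sentence detection is a simple punctuation-based regular expression, and the 1500-character default is an assumed placeholder rather than a documented Polly limit.

```python
import re
from typing import List

# A sentence ends at '.', '!' or '?' followed by whitespace (a simplification).
_SENTENCE_END = re.compile(r"(?<=[.!?])\s+")


def split_into_blocks(text: str, max_chars: int = 1500) -> List[str]:
    """Group whole sentences into blocks of at most max_chars characters."""
    sentences = [s for s in _SENTENCE_END.split(text.strip()) if s]
    blocks: List[str] = []
    current = ""
    for sentence in sentences:
        candidate = f"{current} {sentence}" if current else sentence
        if len(candidate) <= max_chars:
            current = candidate
            continue
        if current:
            blocks.append(current)
            current = ""
        # A single sentence longer than the limit is split at the limit.
        while len(sentence) > max_chars:
            blocks.append(sentence[:max_chars])
            sentence = sentence[max_chars:]
        current = sentence
    if current:
        blocks.append(current)
    return blocks


# Each block would then be sent to the speech API in turn, and the returned
# audio appended to a single MP3 file.
for block in split_into_blocks("One. Two! Three? Four.", max_chars=10):
    print(block)
```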
The Pollycast feature generates an RSS file with links to an MP3 file for each post:
Pricing
The plugin will make calls to Amazon Polly each time the post is saved or updated. Pricing is based on the number of characters in the speech requests, as described on the Polly Pricing page. Also, the AWS Free Tier lets you process up to 5 million characters per month at no charge, for a period of one year that starts when you make your first call to Polly.
Going Further
The plugin is available on GitHub in source code form and we are looking forward to your pull requests! Here are a couple of ideas to get you started:
Voice Per Author – Allow selection of a distinct Polly voice for each author.
Quoted Text – For blogs that make frequent use of embedded quotes, use a distinct voice for the quotes.
Translation – Use Amazon Translate to translate the texts into another language, and then use Polly to generate audio in that language.
Other Blogging Engines – Build a similar plugin for your favorite blogging engine.
SSML Support – Figure out an interesting way to use Polly’s SSML tags to add additional character to the audio.
An ETL (Extract, Transform, Load) process enables you to load data from source systems into your data warehouse. This is typically executed as a batch or near-real-time ingest process to keep the data warehouse current and provide up-to-date analytical data to end users.
Amazon Redshift is a fast, petabyte-scale data warehouse that enables you to easily make data-driven decisions. With Amazon Redshift, you can get insights into your big data in a cost-effective fashion using standard SQL. You can set up any type of data model, from star and snowflake schemas, to simple de-normalized tables for running any analytical queries.
To operate a robust ETL platform and deliver data to Amazon Redshift in a timely manner, design your ETL processes to take account of Amazon Redshift’s architecture. When migrating from a legacy data warehouse to Amazon Redshift, it is tempting to adopt a lift-and-shift approach, but this can result in performance and scale issues long term. This post guides you through the following best practices for ensuring optimal, consistent runtimes for your ETL processes:
COPY data from multiple, evenly sized files.
Use workload management to improve ETL runtimes.
Perform table maintenance regularly.
Perform multiple steps in a single transaction.
Load data in bulk.
Use UNLOAD to extract large result sets.
Use Amazon Redshift Spectrum for ad hoc ETL processing.
Monitor daily ETL health using diagnostic queries.
1. COPY data from multiple, evenly sized files
Amazon Redshift is an MPP (massively parallel processing) database, where all the compute nodes divide and parallelize the work of ingesting data. Each node is further subdivided into slices, with each slice having one or more dedicated cores, equally dividing the processing capacity. The number of slices per node depends on the node type of the cluster. For example, each DS2.XLARGE compute node has two slices, whereas each DS2.8XLARGE compute node has 16 slices.
When you load data into Amazon Redshift, you should aim to have each slice do an equal amount of work. When you load the data from a single large file or from files split into uneven sizes, some slices do more work than others. As a result, the process runs only as fast as the slowest, or most heavily loaded, slice. In the example shown below, a single large file is loaded into a two-node cluster, resulting in only one of the nodes, “Compute-0”, performing all the data ingestion:
When splitting your data files, ensure that they are of approximately equal size – between 1 MB and 1 GB after compression. The number of files should be a multiple of the number of slices in your cluster. Also, I strongly recommend that you individually compress the load files using gzip, lzop, or bzip2 to efficiently load large datasets.
When loading multiple files into a single table, use a single COPY command for the table, rather than multiple COPY commands. Amazon Redshift automatically parallelizes the data ingestion. Using a single COPY command to bulk load data into a table ensures optimal use of cluster resources, and quickest possible throughput.
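As a rough illustration of the sizing guidance above, the following sketch picks a file count that is a multiple of the number of slices while aiming for a target file size inside the 1 MB to 1 GB range. The function name and the 128 MB default target are assumptions made for this example, not Amazon Redshift recommendations.

```python
import math

MB = 1024 ** 2
GB = 1024 ** 3


def plan_file_count(total_compressed_bytes: int, slices: int,
                    target_bytes: int = 128 * MB) -> int:
    """Return a file count that is a multiple of `slices`.

    Files come out at or just under the target size. Very small datasets
    can still yield files below 1 MB, because at least one file per slice
    is always produced.
    """
    if total_compressed_bytes <= 0 or slices <= 0:
        raise ValueError("total size and slice count must be positive")
    # Keep the target inside the 1 MB to 1 GB range.
    target = min(max(target_bytes, 1 * MB), 1 * GB)
    wanted = math.ceil(total_compressed_bytes / target)
    # Round up to the next multiple of the slice count.
    return max(slices, math.ceil(wanted / slices) * slices)


# Two DS2.XLARGE nodes have 2 slices each, so 4 slices in total.
count = plan_file_count(10 * GB, slices=4)
print(count, "files of about", round(10 * GB / count / MB), "MB each")
```

The resulting files would then be loaded with a single COPY command, as described above.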
2. Use workload management to improve ETL runtimes
Use Amazon Redshift’s workload management (WLM) to define multiple queues dedicated to different workloads (for example, ETL versus reporting) and to manage the runtimes of queries. As you migrate more workloads into Amazon Redshift, your ETL runtimes can become inconsistent if WLM is not appropriately set up.
I recommend limiting the overall concurrency of WLM across all queues to around 15 or less. This WLM guide helps you organize and monitor the different queues for your Amazon Redshift cluster.
When managing different workloads on your Amazon Redshift cluster, consider the following for the queue setup:
Create a queue dedicated to your ETL processes. Configure this queue with a small number of slots (5 or fewer). Amazon Redshift is designed for analytics queries, rather than transaction processing. The cost of COMMIT is relatively high, and excessive use of COMMIT can result in queries waiting for access to the commit queue. Because ETL is a commit-intensive process, having a separate queue with a small number of slots helps mitigate this issue.
Claim extra memory available in a queue. When executing an ETL query, you can take advantage of the wlm_query_slot_count to claim the extra memory available in a particular queue. For example, a typical ETL process might involve COPYing raw data into a staging table so that downstream ETL jobs can run transformations that calculate daily, weekly, and monthly aggregates. To speed up the COPY process (so that the downstream tasks can start in parallel sooner), the wlm_query_slot_count can be increased for this step.
Create a separate queue for reporting queries. Configure query monitoring rules on this queue to further manage long-running and expensive queries.
Take advantage of the dynamic memory parameters. They swap the memory from your ETL to your reporting queue after the ETL job has completed.
3. Perform table maintenance regularly
Amazon Redshift is a columnar database, which enables fast transformations for aggregating data. Performing regular table maintenance ensures that transformation ETLs are predictable and performant. To get the best performance from your Amazon Redshift database, you must ensure that database tables are regularly VACUUMed and ANALYZEd. The Analyze & Vacuum schema utility helps you automate the table maintenance task and have VACUUM and ANALYZE executed on a regular schedule.
Use VACUUM to sort tables and remove deleted blocks
During a typical ETL refresh process, tables receive new incoming records using COPY, and unneeded data (cold data) is removed using DELETE. New rows are added to the unsorted region in a table. Deleted rows are simply marked for deletion.
DELETE does not automatically reclaim the space occupied by the deleted rows. Adding and removing large numbers of rows can therefore cause the unsorted region and the number of deleted blocks to grow. This can degrade the performance of queries executed against these tables.
After an ETL process completes, perform VACUUM to ensure that user queries execute in a consistent manner. The complete list of tables that need VACUUMing can be found using the Amazon Redshift Utils table_info script.
Use the following approaches to ensure that VACUUM is completed in a timely manner:
Use wlm_query_slot_count to claim all the memory allocated in the ETL WLM queue during the VACUUM process.
DROP or TRUNCATE intermediate or staging tables, thereby eliminating the need to VACUUM them.
If your table has a compound sort key with only one sort column, try to load your data in sort key order. This helps reduce or eliminate the need to VACUUM the table.
Consider using time series tables. This helps reduce the amount of data you need to VACUUM.
Use ANALYZE to update database statistics
Amazon Redshift uses a cost-based query planner and optimizer that relies on statistics about tables to make good decisions about the query plan for SQL statements. Regular statistics collection after the ETL completion ensures that user queries run fast, and that daily ETL processes are performant. The Amazon Redshift utility table_info script provides insights into the freshness of the statistics. Keeping the percentage of stale statistics (pct_stats_off) below 20% ensures effective query plans for the SQL queries.
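The maintenance decision can be scripted. The sketch below takes rows shaped like the output of a table statistics query and emits the VACUUM and ANALYZE statements to run. The dictionary keys (table, unsorted, stats_off) are assumptions about how you name the columns in your own query result, and the 10% unsorted threshold is an illustrative value; only the 20% statistics threshold comes from this post.

```python
from typing import Dict, Iterable, List, Optional, Union

Row = Dict[str, Union[str, float, None]]


def maintenance_statements(
    tables: Iterable[Row],
    max_unsorted_pct: float = 10.0,   # assumed threshold, tune for your workload
    max_stats_off_pct: float = 20.0,  # threshold suggested in this post
) -> List[str]:
    """Return the VACUUM / ANALYZE statements needed for the given tables."""
    statements: List[str] = []
    for row in tables:
        name = row["table"]
        unsorted: Optional[float] = row.get("unsorted")    # may be NULL / None
        stats_off: Optional[float] = row.get("stats_off")
        if (unsorted or 0) > max_unsorted_pct:
            statements.append(f"VACUUM {name};")
        if (stats_off or 0) > max_stats_off_pct:
            statements.append(f"ANALYZE {name};")
    return statements


rows = [
    {"table": "daily_tbl", "unsorted": 35.0, "stats_off": 5.0},
    {"table": "weekly_tbl", "unsorted": None, "stats_off": 42.0},
    {"table": "stage_tbl", "unsorted": 0.0, "stats_off": 0.0},
]
for statement in maintenance_statements(rows):
    print(statement)
```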
4. Perform multiple steps in a single transaction
ETL transformation logic often spans multiple steps. Because commits in Amazon Redshift are expensive, if each ETL step performs a commit, multiple concurrent ETL processes can take a long time to execute.
To minimize the number of commits in a process, wrap the steps of an ETL script in a single BEGIN…END transaction block so that one commit is performed only after all the transformation logic has been executed. For example, here is a multi-step ETL script that performs one commit at the end:
BEGIN;
CREATE TEMPORARY TABLE staging_table (..);
INSERT INTO staging_table SELECT .. FROM source; -- transformation logic
DELETE FROM daily_table WHERE dataset_date = ?;
INSERT INTO daily_table SELECT .. FROM staging_table; -- daily aggregate
DELETE FROM weekly_table WHERE weekending_date = ?;
INSERT INTO weekly_table SELECT .. FROM staging_table; -- weekly aggregate
COMMIT;
5. Load data in bulk
Amazon Redshift is designed to store and query petabyte-scale datasets. Using Amazon S3 you can stage and accumulate data from multiple source systems before executing a bulk COPY operation. The following methods allow efficient and fast transfer of these bulk datasets into Amazon Redshift:
Use a manifest file to ingest large datasets that span multiple files. The manifest file is a JSON file that lists all the files to be loaded into Amazon Redshift. Using a manifest file ensures that Amazon Redshift has a consistent view of the data to be loaded from S3, while also ensuring that duplicate files do not result in the same data being loaded more than one time.
Use temporary staging tables to hold the data for transformation. These tables are automatically dropped after the ETL session is complete. Temporary tables can be created using the CREATE TEMPORARY TABLE syntax, or by issuing a SELECT … INTO #TEMP_TABLE query. Explicitly specifying the CREATE TEMPORARY TABLE statement allows you to control the DISTRIBUTION KEY, SORT KEY, and compression settings to further improve performance.
Use ALTER TABLE APPEND to swap data from the staging tables to the target table. Data in the source table is moved to matching columns in the target table. Column order doesn’t matter. After data is successfully appended to the target table, the source table is empty. ALTER TABLE APPEND is much faster than a similar CREATE TABLE AS or INSERT INTO operation because it doesn’t involve copying or moving data.
6. Use UNLOAD to extract large result sets
Fetching a large number of rows using SELECT is expensive and takes a long time. When a large amount of data is fetched from the Amazon Redshift cluster, the leader node has to hold the data temporarily until the fetches are complete. Further, data is streamed out sequentially, which results in longer elapsed time. As a result, the leader node can become hot, which not only affects the SELECT that is being executed, but also throttles resources for creating execution plans and managing the overall cluster resources. Here is an example of a large SELECT statement. Notice that the leader node is doing most of the work to stream out the rows:
Use UNLOAD to extract large result sets directly to S3. After it’s in S3, the data can be shared with multiple downstream systems. By default, UNLOAD writes data in parallel to multiple files according to the number of slices in the cluster. All the compute nodes participate to quickly offload the data into S3.
If you are extracting data for use with Amazon Redshift Spectrum, use the MAXFILESIZE parameter to keep files at around 150 MB. As with item 1 above, having many evenly sized files ensures that Redshift Spectrum can do the maximum amount of work in parallel.
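One practical detail when scripting UNLOAD is that the query is passed as a quoted string, so any single quotes inside it must be doubled. The sketch below builds such a statement with a MAXFILESIZE option. The function name, bucket, role ARN, and the region column are hypothetical, and the 150 MB default simply mirrors the figure mentioned above.

```python
def build_unload(select_sql: str, s3_prefix: str, iam_role_arn: str,
                 max_file_size_mb: int = 150) -> str:
    """Build an UNLOAD statement, doubling single quotes inside the query."""
    if not s3_prefix.startswith("s3://"):
        raise ValueError("s3_prefix must start with s3://")
    # The query is wrapped in single quotes, so embedded quotes are doubled.
    escaped = select_sql.replace("'", "''")
    return (
        f"UNLOAD ('{escaped}') "
        f"TO '{s3_prefix}' "
        f"IAM_ROLE '{iam_role_arn}' "
        f"MAXFILESIZE {max_file_size_mb} MB;"
    )


print(build_unload(
    "SELECT * FROM weekly_tbl WHERE region = 'EU'",
    "s3://example-bucket/datalake/weekly/",
    "arn:aws:iam::123456789012:role/ExampleRole",
))
```

This helper only handles quoting for the embedded query; the S3 prefix and role ARN are assumed to come from trusted configuration rather than user input.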
7. Use Redshift Spectrum for ad hoc ETL processing
Events such as data backfill, promotional activity, and special calendar days can trigger additional data volumes that affect the data refresh times in your Amazon Redshift cluster. To help address these spikes in data volumes and throughput, I recommend staging data in S3. After data is organized in S3, Redshift Spectrum enables you to query it directly using standard SQL. In this way, you gain the benefits of additional capacity without having to resize your cluster.
8. Monitor daily ETL health using diagnostic queries
Monitoring the health of your ETL processes on a regular basis helps identify the early onset of performance issues before they have a significant impact on your cluster. The following monitoring scripts can be used to provide insights into the health of your ETL processes:
• Follow the best practices for the COPY command.
• Analyze data growth with the incoming datasets and consider a cluster resize to meet the expected SLA.
• Set up regular VACUUM jobs to address unsorted rows and reclaim the deleted blocks so that transformation SQL executes optimally.
• Consider a table redesign to avoid data skewness.
INSERT/UPDATE/COPY/DELETE operations on particular tables do not respond in a timely manner, compared to when run after the ETL
Multiple DML statements are operating on the same target table at the same moment from different transactions. Set up ETL job dependency so that they execute serially for the same target table.
Amazon Redshift data warehouse space growth is trending upwards more than normal
Analyze the individual tables that are growing at higher rate than normal. Consider data archival using UNLOAD to S3 and Redshift Spectrum for later analysis.
Analyze the top transformation SQL and use EXPLAIN to find opportunities for tuning the query plan.
There are several other useful scripts available in the amazon-redshift-utils repository. The AWS Lambda Utility Runner runs a subset of these scripts on a scheduled basis, allowing you to automate much of the monitoring of your ETL processes.
Example ETL process
The following ETL process reinforces some of the best practices discussed in this post. Consider the following four-step daily ETL workflow where data from an RDBMS source system is staged in S3 and then loaded into Amazon Redshift. Amazon Redshift is used to calculate daily, weekly, and monthly aggregations, which are then unloaded to S3, where they can be further processed and made available for end-user reporting using a number of different tools, including Redshift Spectrum and Amazon Athena.
Step 1: Extract from the RDBMS source to an S3 bucket
In this ETL process, the data extract job fetches change data every hour and stages it into multiple hourly files. For example, the staged S3 folder looks like the following:
Organizing the data into multiple, evenly sized files enables the COPY command to ingest this data using all available resources in the Amazon Redshift cluster. Further, the files are compressed (gzipped) to further reduce COPY times.
Step 2: Stage data to the Amazon Redshift table for cleansing
Ingesting the data can be accomplished using a JSON-based manifest file. Using the manifest file ensures that S3 eventual consistency issues can be eliminated and also provides an opportunity to dedupe any files if needed. A sample manifest20170702.json file looks like the following:
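As an illustration, a manifest can be generated from the list of staged files with a few lines of Python. The sketch below drops duplicate entries and writes the entries/url/mandatory structure that COPY expects from a manifest; the bucket name and file keys are placeholders, not the actual files from this workflow.

```python
import json
from typing import Dict, Iterable, List


def build_manifest(bucket: str, keys: Iterable[str], mandatory: bool = True) -> Dict[str, List[dict]]:
    """Build a COPY manifest, keeping only the first occurrence of each file."""
    seen = set()
    entries: List[dict] = []
    for key in keys:
        url = f"s3://{bucket}/{key.lstrip('/')}"
        if url in seen:
            continue  # dedupe so the same file is not loaded twice
        seen.add(url)
        entries.append({"url": url, "mandatory": mandatory})
    return {"entries": entries}


manifest = build_manifest(
    "example-bucket",
    [
        "batch/2017/07/02/part-00.gz",
        "batch/2017/07/02/part-01.gz",
        "batch/2017/07/02/part-00.gz",  # duplicate, dropped
    ],
)
print(json.dumps(manifest, indent=2))
```

Setting mandatory to true makes the COPY fail if a listed file is missing, rather than silently loading a partial batch.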
The data can be ingested using the following command:
SET wlm_query_slot_count TO <<max available concurrency in the ETL queue>>;
COPY stage_tbl FROM 's3://<<S3 Bucket>>/batch/manifest20170702.json' iam_role 'arn:aws:iam::0123456789012:role/MyRedshiftRole' manifest;
Because the downstream ETL processes depend on this COPY command to complete, the wlm_query_slot_count is used to claim all the memory available to the queue. This helps the COPY command complete as quickly as possible.
Step 3: Transform data to create daily, weekly, and monthly datasets and load into target tables
Data is staged in the “stage_tbl” from where it can be transformed into the daily, weekly, and monthly aggregates and loaded into target tables. The following job illustrates a typical weekly process:
BEGIN;
INSERT INTO ETL_LOG (..) VALUES (..);
DELETE FROM weekly_tbl WHERE dataset_week = <<current week>>;
INSERT INTO weekly_tbl (..)
SELECT date_trunc('week', dataset_day) AS week_begin_dataset_date, SUM(C1) AS C1, SUM(C2) AS C2
FROM stage_tbl
GROUP BY date_trunc('week', dataset_day);
INSERT INTO AUDIT_LOG VALUES (..);
COMMIT;
As shown above, multiple steps are combined into one transaction to perform a single commit, reducing contention on the commit queue.
Step 4: Unload the daily dataset to populate the S3 data lake bucket
The transformed results are now unloaded into another S3 bucket, where they can be further processed and made available for end-user reporting using a number of different tools, including Redshift Spectrum and Amazon Athena.
unload ('SELECT * FROM weekly_tbl WHERE dataset_week = <<current week>>') TO 's3://<<S3 Bucket>>/datalake/weekly/20170526/' iam_role 'arn:aws:iam::0123456789012:role/MyRedshiftRole';
Summary
Amazon Redshift lets you easily operate petabyte-scale data warehouses on the cloud. This post summarized the best practices for operating scalable ETL natively within Amazon Redshift. I demonstrated efficient ways to ingest and transform data, along with close monitoring. I also demonstrated the best practices being used in a typical sample ETL workload to transform the data into Amazon Redshift.
If you have questions or suggestions, please comment below.
About the Author
Thiyagarajan Arumugam is a Big Data Solutions Architect at Amazon Web Services and designs customer architectures to process data at scale. Prior to AWS, he built data warehouse solutions at Amazon.com. In his free time, he enjoys all outdoor sports and practices the Indian classical drum mridangam.
Researchers at Ben Gurion University in Beer Sheva, Israel have built a proof-of-concept system for counter-surveillance against spy drones that demonstrates a clever, if not exactly simple, way to determine whether a certain person or object is under aerial surveillance. They first generate a recognizable pattern on whatever subject — a window, say — someone might want to guard from potential surveillance. Then they remotely intercept a drone’s radio signals to look for that pattern in the streaming video the drone sends back to its operator. If they spot it, they can determine that the drone is looking at their subject.
In other words, they can see what the drone sees, pulling out their recognizable pattern from the radio signal, even without breaking the drone’s encrypted video.
The details have to do with the way drone video is compressed:
The researchers’ technique takes advantage of an efficiency feature streaming video has used for years, known as “delta frames.” Instead of encoding video as a series of raw images, it’s compressed into a series of changes from the previous image in the video. That means when a streaming video shows a still object, it transmits fewer bytes of data than when it shows one that moves or changes color.
That compression feature can reveal key information about the content of the video to someone who’s intercepting the streaming data, security researchers have shown in recent research, even when the data is encrypted.
With Amazon Redshift, you can build petabyte-scale data warehouses that unify data from a variety of internal and external sources. Because Amazon Redshift is optimized for complex queries (often involving multiple joins) across large tables, it can handle large volumes of retail, inventory, and financial data without breaking a sweat.
In this post, we describe how to combine data in Aurora with data in Amazon Redshift. Here’s an overview of the solution:
Use AWS Lambda functions with Amazon Aurora to capture data changes in a table.
Serverless architecture for capturing and analyzing Aurora data changes
Consider a scenario in which an e-commerce web application uses Amazon Aurora for a transactional database layer. The company has a sales table that captures every single sale, along with a few corresponding data items. This information is stored as immutable data in a table. Business users want to monitor the sales data and then analyze and visualize it.
In this example, you take the changes in data in an Aurora database table and save it in Amazon S3. After the data is captured in Amazon S3, you combine it with data in your existing Amazon Redshift cluster for analysis.
By the end of this post, you will understand how to capture data events in an Aurora table and push them out to other AWS services using AWS Lambda.
The following diagram shows the flow of data as it occurs in this tutorial:
The starting point in this architecture is a database insert operation in Amazon Aurora. When the insert statement is executed, a custom trigger calls a Lambda function and forwards the inserted data. Lambda writes the data that it received from Amazon Aurora to a Kinesis data delivery stream. Kinesis Data Firehose writes the data to an Amazon S3 bucket. Once the data is in an Amazon S3 bucket, it is queried in place using Amazon Redshift Spectrum.
Creating an Aurora database
First, create a database by following these steps in the Amazon RDS console:
Sign in to the AWS Management Console, and open the Amazon RDS console.
Choose Launch a DB instance, and choose Next.
For Engine, choose Amazon Aurora.
Choose a DB instance class. This example uses a small instance class, since this is not a production database.
In Multi-AZ deployment, choose No.
Configure DB instance identifier, Master username, and Master password.
Launch the DB instance.
After you create the database, use MySQL Workbench to connect to the database using the CNAME from the console. For information about connecting to an Aurora database, see Connecting to an Amazon Aurora DB Cluster.
The following screenshot shows the MySQL Workbench configuration:
Next, create a table in the database by running the following SQL statement:
Create Table
CREATE TABLE Sales (
InvoiceID int NOT NULL AUTO_INCREMENT,
ItemID int NOT NULL,
Category varchar(255),
Price double(10,2),
Quantity int not NULL,
OrderDate timestamp,
DestinationState varchar(2),
ShippingType varchar(255),
Referral varchar(255),
PRIMARY KEY (InvoiceID)
);
You can now populate the table with some sample data. To generate sample data in your table, copy and run the following script. Ensure that the highlighted (bold) variables are replaced with appropriate values.
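The script itself is not reproduced here. As a hedged stand-in, the sketch below generates INSERT statements for the Sales table that you could run from MySQL Workbench. The category, state, shipping, and referral values, the 2016 date range, and the value ranges are all made up for illustration.

```python
import random
from datetime import datetime, timedelta

# Hypothetical sample values; replace with whatever suits your test data.
CATEGORIES = ["Books", "Electronics", "Garden", "Toys"]
STATES = ["CA", "NY", "TX", "WA"]
SHIPPING = ["Standard", "Expedited", "Overnight"]
REFERRALS = ["Search", "Email", "Direct"]


def make_insert(rng, start):
    """Build one INSERT statement with random values for the Sales table."""
    # Pick a random minute within 365 days of the start date.
    order_date = start + timedelta(minutes=rng.randrange(0, 365 * 24 * 60))
    return (
        "INSERT INTO Sales (ItemID, Category, Price, Quantity, OrderDate, "
        "DestinationState, ShippingType, Referral) VALUES "
        f"({rng.randint(1, 1000)}, '{rng.choice(CATEGORIES)}', "
        f"{rng.uniform(1, 500):.2f}, {rng.randint(1, 10)}, "
        f"'{order_date:%Y-%m-%d %H:%M:%S}', '{rng.choice(STATES)}', "
        f"'{rng.choice(SHIPPING)}', '{rng.choice(REFERRALS)}');"
    )


def make_inserts(count, seed=0):
    """Return `count` INSERT statements; the seed makes runs repeatable."""
    rng = random.Random(seed)
    start = datetime(2016, 1, 1)
    return [make_insert(rng, start) for _ in range(count)]


if __name__ == "__main__":
    print("\n".join(make_inserts(10)))
```

InvoiceID is omitted from the column list because the table defines it as AUTO_INCREMENT.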
The following screenshot shows how the table appears with the sample data:
Sending data from Amazon Aurora to Amazon S3
There are two methods available to send data from Amazon Aurora to Amazon S3:
Using a Lambda function
Using SELECT INTO OUTFILE S3
To demonstrate the ease of setting up integration between multiple AWS services, we use a Lambda function to send data to Amazon S3 using Amazon Kinesis Data Firehose.
Alternatively, you can use a SELECT INTO OUTFILE S3 statement to query data from an Amazon Aurora DB cluster and save it directly in text files that are stored in an Amazon S3 bucket. However, with this method, there is a delay between the time that the database transaction occurs and the time that the data is exported to Amazon S3 because the default file size threshold is 6 GB.
Creating a Kinesis data delivery stream
The next step is to create a Kinesis data delivery stream, since it’s a dependency of the Lambda function.
To create a delivery stream:
Open the Kinesis Data Firehose console.
Choose Create delivery stream.
For Delivery stream name, type AuroraChangesToS3.
For Source, choose Direct PUT.
For Record transformation, choose Disabled.
For Destination, choose Amazon S3.
In the S3 bucket drop-down list, choose an existing bucket, or create a new one.
Enter a prefix if needed, and choose Next.
For Data compression, choose GZIP.
In IAM role, choose either an existing role that has access to write to Amazon S3, or choose to generate one automatically. Choose Next.
Review all the details on the screen, and choose Create delivery stream when you’re finished.
Creating a Lambda function
Now you can create a Lambda function that is called every time there is a change that needs to be tracked in the database table. This Lambda function passes the data to the Kinesis data delivery stream that you created earlier.
To create the Lambda function:
Open the AWS Lambda console.
Ensure that you are in the AWS Region where your Amazon Aurora database is located.
If you have no Lambda functions yet, choose Get started now. Otherwise, choose Create function.
Choose Author from scratch.
Give your function a name, and select Python 3.6 for Runtime.
Choose an existing role or create a new one. The role needs permission to call firehose:PutRecord.
Choose Next on the trigger selection screen.
Paste the following code in the code window. Change the stream_name variable to the Kinesis data delivery stream that you created in the previous step.
Choose File -> Save in the code editor and then choose Save.
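The function code is not reproduced here, so the following is a minimal sketch of what such a handler might look like rather than the original. It assumes the event carries the eight fields that the stored procedure later in this post sends, and it writes each event as one comma-separated line so that the output matches the comma-delimited external table defined later. The optional client argument is an addition for local testing and is not part of the Lambda handler contract.

```python
stream_name = "AuroraChangesToS3"  # the delivery stream created earlier

# Column order must match the external table defined over the S3 data.
FIELDS = ["ItemID", "Category", "Price", "Quantity", "OrderDate",
          "DestinationState", "ShippingType", "Referral"]


def to_csv_line(event):
    """Flatten the event into one comma-separated, newline-terminated line."""
    # Naive CSV: assumes field values contain no commas or newlines.
    return ",".join(str(event[field]) for field in FIELDS) + "\n"


def lambda_handler(event, context, client=None):
    """Forward one change record to the Kinesis Data Firehose stream."""
    if client is None:
        import boto3  # bundled with the Lambda Python runtime
        client = boto3.client("firehose")
    record = {"Data": to_csv_line(event).encode("utf-8")}
    return client.put_record(DeliveryStreamName=stream_name, Record=record)
```

If your values can contain commas, consider writing the line with the csv module or switching the table to a different delimiter.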
Once you are finished, the Amazon Aurora database has access to invoke a Lambda function.
Creating a stored procedure and a trigger in Amazon Aurora
Now, go back to MySQL Workbench, and run the following command to create a new stored procedure. When this stored procedure is called, it invokes the Lambda function you created. Change the ARN in the following code to your Lambda function’s ARN.
DROP PROCEDURE IF EXISTS CDC_TO_FIREHOSE;
DELIMITER ;;
CREATE PROCEDURE CDC_TO_FIREHOSE (IN ItemID VARCHAR(255),
IN Category varchar(255),
IN Price double(10,2),
IN Quantity int(11),
IN OrderDate timestamp,
IN DestinationState varchar(2),
IN ShippingType varchar(255),
IN Referral varchar(255)) LANGUAGE SQL
BEGIN
CALL mysql.lambda_async('arn:aws:lambda:us-east-1:XXXXXXXXXXXXX:function:CDCFromAuroraToKinesis',
CONCAT('{ "ItemID" : "', ItemID,
'", "Category" : "', Category,
'", "Price" : "', Price,
'", "Quantity" : "', Quantity,
'", "OrderDate" : "', OrderDate,
'", "DestinationState" : "', DestinationState,
'", "ShippingType" : "', ShippingType,
'", "Referral" : "', Referral, '"}')
);
END
;;
DELIMITER ;
Create a trigger TR_Sales_CDC on the Sales table. When a new record is inserted, this trigger calls the CDC_TO_FIREHOSE stored procedure.
DROP TRIGGER IF EXISTS TR_Sales_CDC;
DELIMITER ;;
CREATE TRIGGER TR_Sales_CDC
AFTER INSERT ON Sales
FOR EACH ROW
BEGIN
SELECT NEW.ItemID , NEW.Category, New.Price, New.Quantity, New.OrderDate
, New.DestinationState, New.ShippingType, New.Referral
INTO @ItemID , @Category, @Price, @Quantity, @OrderDate
, @DestinationState, @ShippingType, @Referral;
CALL CDC_TO_FIREHOSE(@ItemID , @Category, @Price, @Quantity, @OrderDate
, @DestinationState, @ShippingType, @Referral);
END
;;
DELIMITER ;
If a new row is inserted in the Sales table, the Lambda function that is mentioned in the stored procedure is invoked.
Verify that data is being sent from the Lambda function to Kinesis Data Firehose to Amazon S3 successfully. You might have to insert a few records, depending on the size of your data, before new records appear in Amazon S3. This is due to Kinesis Data Firehose buffering. To learn more about Kinesis Data Firehose buffering, see the “Amazon S3” section in Amazon Kinesis Data Firehose Data Delivery.
Every time a new record is inserted in the sales table, a stored procedure is called, and it updates data in Amazon S3.
Querying data in Amazon Redshift
In this section, you use the data you produced from Amazon Aurora and consume it as-is in Amazon Redshift. In order to allow you to process your data as-is, where it is, while taking advantage of the power and flexibility of Amazon Redshift, you use Amazon Redshift Spectrum. You can use Redshift Spectrum to run complex queries on data stored in Amazon S3, with no need for loading or other data prep.
Just create a data source and issue your queries to your Amazon Redshift cluster as usual. Behind the scenes, Redshift Spectrum scales to thousands of instances on a per-query basis, ensuring that you get fast, consistent performance even as your dataset grows to beyond an exabyte! Being able to query data that is stored in Amazon S3 means that you can scale your compute and your storage independently. You have the full power of the Amazon Redshift query model and all the reporting and business intelligence tools at your disposal. Your queries can reference any combination of data stored in Amazon Redshift tables and in Amazon S3.
Redshift Spectrum supports open, common data formats, including CSV/TSV, Apache Parquet, SequenceFile, and RCFile. Files can be compressed using gzip or Snappy, with other data formats and compression methods in the works.
Next, create an IAM role that has access to Amazon S3 and Athena. By default, Amazon Redshift Spectrum uses the Amazon Athena data catalog. Your cluster needs authorization to access your external data catalog in AWS Glue or Athena and your data files in Amazon S3.
In the demo setup, I attached the AmazonS3FullAccess and AmazonAthenaFullAccess policies. In a production environment, the IAM roles should follow the standard security advice of granting least privilege. For more information, see IAM Policies for Amazon Redshift Spectrum.
create external schema if not exists spectrum_schema
from data catalog
database 'spectrum_db'
region 'us-east-1'
IAM_ROLE 'arn:aws:iam::XXXXXXXXXXXX:role/RedshiftSpectrumRole'
create external database if not exists;
Don’t forget to replace the IAM role in the statement.
Then create an external table within the database:
CREATE EXTERNAL TABLE IF NOT EXISTS spectrum_schema.ecommerce_sales(
ItemID int,
Category varchar,
Price DOUBLE PRECISION,
Quantity int,
OrderDate TIMESTAMP,
DestinationState varchar,
ShippingType varchar,
Referral varchar)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
LOCATION 's3://{BUCKET_NAME}/CDC/';
Query the table, and it should contain data. This is a fact table.
select top 10 * from spectrum_schema.ecommerce_sales;
Next, create a dimension table. For this example, we create a date/time dimension table. Create the table:
CREATE TABLE date_dimension (
d_datekey integer not null sortkey,
d_dayofmonth integer not null,
d_monthnum integer not null,
d_dayofweek varchar(10) not null,
d_prettydate date not null,
d_quarter integer not null,
d_half integer not null,
d_year integer not null,
d_season varchar(10) not null,
d_fiscalyear integer not null)
diststyle all;
Populate the table with data:
copy date_dimension from 's3://reparmar-lab/2016dates'
iam_role 'arn:aws:iam::XXXXXXXXXXXX:role/redshiftspectrum'
DELIMITER ','
dateformat 'auto';
The date dimension table should look like the following:
Querying data in local and external tables using Amazon Redshift
Now that you have the fact and dimension tables populated with data, you can combine the two and run analysis. For example, if you want to query the total sales amount by season, you can run the following:
select sum(quantity*price) as total_sales, date_dimension.d_season
from spectrum_schema.ecommerce_sales
join date_dimension on spectrum_schema.ecommerce_sales.orderdate = date_dimension.d_prettydate
group by date_dimension.d_season
You get the following results:
Similarly, you can replace d_season with d_dayofweek to get sales figures by weekday:
With Amazon Redshift Spectrum, you pay only for the queries you run against the data that you actually scan. We encourage you to use file partitioning, columnar data formats, and data compression to significantly minimize the amount of data scanned in Amazon S3. This is important for data warehousing because it dramatically improves query performance and reduces cost.
Partitioning your data in Amazon S3 by date, time, or any other custom keys enables Amazon Redshift Spectrum to dynamically prune nonrelevant partitions to minimize the amount of data processed. If you store data in a columnar format, such as Parquet, Amazon Redshift Spectrum scans only the columns needed by your query, rather than processing entire rows. Similarly, if you compress your data using one of the supported compression algorithms in Amazon Redshift Spectrum, less data is scanned.
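To make the partitioning idea concrete, here is a small sketch that derives a Hive-style date partition prefix (year=/month=/day=) from an order timestamp. The base prefix and the key layout are assumptions for illustration; this post's delivery stream does not write keys in this layout, and each partition would still need to be registered on the external table before Redshift Spectrum can prune it.

```python
from datetime import datetime


def partition_prefix(base, order_date):
    """Map 'YYYY-MM-DD HH:MM:SS' to a prefix like base/year=2016/month=03/day=01/."""
    ts = datetime.strptime(order_date, "%Y-%m-%d %H:%M:%S")
    return f"{base}/year={ts:%Y}/month={ts:%m}/day={ts:%d}/"


if __name__ == "__main__":
    # Hypothetical base prefix under the bucket that holds the change data.
    print(partition_prefix("CDC", "2016-03-01 10:00:00"))
```

A query that filters on the partition columns then reads only the objects under the matching prefixes.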
Analyzing and visualizing Amazon Redshift data in Amazon QuickSight
After modifying the Amazon Redshift security group, go to Amazon QuickSight. Create a new analysis, and choose Amazon Redshift as the data source.
Enter the database connection details, validate the connection, and create the data source.
Choose the schema to be analyzed. In this case, choose spectrum_schema, and then choose the ecommerce_sales table.
Next, we add a custom field for Total Sales = Price*Quantity. In the drop-down list for the ecommerce_sales table, choose Edit analysis data sets.
On the next screen, choose Edit.
In the data prep screen, choose New Field. Add a new calculated field Total Sales $, which is the product of the Price*Quantity fields. Then choose Create. Save and visualize it.
Next, to visualize total sales figures by month, create a graph with Total Sales on the x-axis and OrderDate formatted as month on the y-axis.
After you’ve finished, you can use Amazon QuickSight to add different columns from your Amazon Redshift tables and perform different types of visualizations. You can build operational dashboards that continuously monitor your transactional and analytical data. You can publish these dashboards and share them with others.
Final notes
Amazon QuickSight can also read data in Amazon S3 directly. However, with the method demonstrated in this post, you have the option to manipulate, filter, and combine data from multiple sources or Amazon Redshift tables before visualizing it in Amazon QuickSight.
In this example, we dealt with data being inserted, but triggers can be activated in response to an INSERT, UPDATE, or DELETE statement.
Keep the following in mind:
Be careful when invoking a Lambda function from triggers on tables that experience high write traffic. This would result in a large number of calls to your Lambda function. Although calls to the lambda_async procedure are asynchronous, triggers are synchronous.
A statement that results in a large number of trigger activations does not wait for the call to the AWS Lambda function to complete. But it does wait for the triggers to complete before returning control to the client.
Similarly, you must account for Amazon Kinesis Data Firehose limits. By default, Kinesis Data Firehose is limited to a maximum of 5,000 records/second. For more information, see Monitoring Amazon Kinesis Data Firehose.
In certain cases, it may be optimal to use AWS Database Migration Service (AWS DMS) to capture data changes in Aurora and use Amazon S3 as a target. For example, AWS DMS might be a good option if you don’t need to transform data from Amazon Aurora. The method used in this post gives you the flexibility to transform data from Aurora using Lambda before sending it to Amazon S3. Additionally, the architecture has the benefits of being serverless, whereas AWS DMS requires an Amazon EC2 instance for replication.
Re Alvarez-Parmar is a solutions architect for Amazon Web Services. He helps enterprises achieve success through technical guidance and thought leadership. In his spare time, he enjoys spending time with his two kids and exploring outdoors.
In late September, during the annual Splunk .conf, Splunk and Amazon Web Services (AWS) jointly announced that Amazon Kinesis Data Firehose now supports Splunk Enterprise and Splunk Cloud as a delivery destination. This native integration between Splunk Enterprise, Splunk Cloud, and Amazon Kinesis Data Firehose is designed to make AWS data ingestion setup seamless, while offering a secure and fault-tolerant delivery mechanism. We want to enable customers to monitor and analyze machine data from any source and use it to deliver operational intelligence and optimize IT, security, and business performance.
With Kinesis Data Firehose, customers can use a fully managed, reliable, and scalable data streaming solution to Splunk. In this post, we tell you a bit more about the Kinesis Data Firehose and Splunk integration. We also show you how to ingest large amounts of data into Splunk using Kinesis Data Firehose.
Push vs. Pull data ingestion
Presently, customers use a combination of two ingestion patterns, primarily based on data source and volume, in addition to existing company infrastructure and expertise:
Push-based approach: Streaming data directly from AWS to Splunk HTTP Event Collector (HEC) by using AWS Lambda. Examples of applicable data sources include CloudWatch Logs and Amazon Kinesis Data Streams.
The pull-based approach offers data delivery guarantees such as retries and checkpointing out of the box. However, it requires more ops to manage and orchestrate the dedicated pollers, which are commonly running on Amazon EC2 instances. With this setup, you pay for the infrastructure even when it’s idle.
On the other hand, the push-based approach offers a low-latency scalable data pipeline made up of serverless resources like AWS Lambda sending directly to Splunk indexers (by using Splunk HEC). This approach translates into lower operational complexity and cost. However, if you need guaranteed data delivery then you have to design your solution to handle issues such as a Splunk connection failure or Lambda execution failure. To do so, you might use, for example, AWS Lambda Dead Letter Queues.
How about getting the best of both worlds?
Let’s go over the new integration’s end-to-end solution and examine how Kinesis Data Firehose and Splunk together expand the push-based approach into a native AWS solution for applicable data sources.
By using a managed service like Kinesis Data Firehose for data ingestion into Splunk, we provide out-of-the-box reliability and scalability. One of the pain points of the old approach was the overhead of managing the data collection nodes (Splunk heavy forwarders). With the new Kinesis Data Firehose to Splunk integration, there are no forwarders to manage or set up. Data producers (1) are configured through the AWS Management Console to drop data into Kinesis Data Firehose.
You can also create your own data producers. For example, you can drop data into a Firehose delivery stream by using Amazon Kinesis Agent, or by using the Firehose API (PutRecord(), PutRecordBatch()), or by writing to a Kinesis Data Stream configured to be the data source of a Firehose delivery stream. For more details, refer to Sending Data to an Amazon Kinesis Data Firehose Delivery Stream.
You might need to transform the data before it goes into Splunk for analysis. For example, you might want to enrich it or filter or anonymize sensitive data. You can do so using AWS Lambda. In this scenario, Kinesis Data Firehose buffers data from the incoming source data, sends it to the specified Lambda function (2), and then rebuffers the transformed data to the Splunk Cluster. Kinesis Data Firehose provides the Lambda blueprints that you can use to create a Lambda function for data transformation.
Systems fail all the time. Let’s see how this integration handles outside failures to guarantee data durability. In cases when Kinesis Data Firehose can’t deliver data to the Splunk Cluster, data is automatically backed up to an S3 bucket. You can configure this feature while creating the Firehose delivery stream (3). You can choose to back up all data or only the data that’s failed during delivery to Splunk.
In addition to using S3 for data backup, this Firehose integration with Splunk supports Splunk Indexer Acknowledgments to guarantee event delivery. This feature is configured on Splunk’s HTTP Event Collector (HEC) (4). It ensures that HEC returns an acknowledgment to Kinesis Data Firehose only after data has been indexed and is available in the Splunk cluster (5).
Now let’s look at a hands-on exercise that shows how to forward VPC flow logs to Splunk.
How-to guide
To process VPC flow logs, we implement the following architecture.
Amazon Virtual Private Cloud (Amazon VPC) delivers flow log files into an Amazon CloudWatch Logs group. Using a CloudWatch Logs subscription filter, we set up real-time delivery of CloudWatch Logs to a Kinesis Data Firehose stream.
Data coming from CloudWatch Logs is compressed with gzip compression. To work with this compression, we need to configure a Lambda-based data transformation in Kinesis Data Firehose to decompress the data and deposit it back into the stream. Firehose then delivers the raw logs to the Splunk HTTP Event Collector (HEC).
If delivery to the Splunk HEC fails, Firehose deposits the logs into an Amazon S3 bucket. You can then ingest the events from S3 using an alternate mechanism such as a Lambda function.
When data reaches Splunk (Enterprise or Cloud), Splunk parsing configurations (packaged in the Splunk Add-on for Kinesis Data Firehose) extract and parse all fields. They make data ready for querying and visualization using Splunk Enterprise and Splunk Cloud.
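The decompression step in this architecture can be sketched as a small Lambda transformation function. This is a simplified illustration, not the blueprint used in the walkthrough: it assumes each incoming record is a base64-encoded, gzip-compressed CloudWatch Logs payload with "messageType" and "logEvents" fields, and it leaves out the size checks and re-ingestion logic that a production function needs.

```python
import base64
import gzip
import json


def transform_record(record):
    """Decompress one CloudWatch Logs record and return its raw log lines."""
    payload = json.loads(gzip.decompress(base64.b64decode(record["data"])))
    if payload.get("messageType") != "DATA_MESSAGE":
        # Control messages carry no log events, so drop them.
        return {"recordId": record["recordId"], "result": "Dropped"}
    # One log event per line, newline-terminated.
    lines = "".join(event["message"] + "\n" for event in payload["logEvents"])
    return {
        "recordId": record["recordId"],
        "result": "Ok",
        "data": base64.b64encode(lines.encode("utf-8")).decode("utf-8"),
    }


def lambda_handler(event, context):
    """Return every record with the same recordId and a result status."""
    return {"records": [transform_record(r) for r in event["records"]]}
```

Each returned record keeps its original recordId and carries a result of Ok or Dropped, which is how Firehose matches the transformed output back to its input.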
Walkthrough
Install the Splunk Add-on for Amazon Kinesis Data Firehose
The Splunk Add-on for Amazon Kinesis Data Firehose enables Splunk (be it Splunk Enterprise, Splunk App for AWS, or Splunk Enterprise Security) to use data ingested from Amazon Kinesis Data Firehose. Install the Add-on on all the indexers with an HTTP Event Collector (HEC). The Add-on is available for download from Splunkbase.
HTTP Event Collector (HEC)
Before you can use Kinesis Data Firehose to deliver data to Splunk, set up the Splunk HEC to receive the data. From Splunk Web, go to the Settings menu, choose Data Inputs, and choose HTTP Event Collector. Choose Global Settings, ensure All tokens is enabled, and then choose Save. Then choose New Token to create a new HEC endpoint and token. When you create a new token, make sure that Enable indexer acknowledgment is checked.
When prompted to select a source type, select aws:cloudwatch:vpcflow.
Create an S3 backsplash bucket
To provide for situations in which Kinesis Data Firehose can’t deliver data to the Splunk Cluster, we use an S3 bucket to back up the data. You can configure this feature to back up all data or only the data that’s failed during delivery to Splunk.
Note: Bucket names are unique. Thus, you can’t use tmak-backsplash-bucket.
Create an IAM role for the Lambda transform function
Firehose triggers an AWS Lambda function that transforms the data in the delivery stream. Let’s first create a role for the Lambda function called LambdaBasicRole.
Note: You can also set this role up when creating your Lambda function.
$ aws iam create-role --role-name LambdaBasicRole --assume-role-policy-document file://TrustPolicyForLambda.json
After the role is created, attach the managed Lambda basic execution policy to it.
$ aws iam attach-role-policy \
    --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole \
    --role-name LambdaBasicRole
Create a Firehose Stream
On the AWS console, open the Amazon Kinesis service, go to the Firehose console, and choose Create Delivery Stream.
In the next section, you can specify whether you want to use an inline Lambda function for transformation. Because incoming CloudWatch Logs are gzip compressed, choose Enabled for Record transformation, and then choose Create new.
From the list of the available blueprint functions, choose Kinesis Data Firehose CloudWatch Logs Processor. This function unzips data and places it back into the Firehose stream in compliance with the record transformation output model.
Enter a name for the Lambda function, choose Choose an existing role, and then choose the role you created earlier. Then choose Create Function.
Go back to the Firehose Stream wizard, choose the Lambda function you just created, and then choose Next.
Select Splunk as the destination, and enter your Splunk HTTP Event Collector information.
Note: Amazon Kinesis Data Firehose requires the Splunk HTTP Event Collector (HEC) endpoint to be terminated with a valid CA-signed certificate matching the DNS hostname used to connect to your HEC endpoint. You receive delivery errors if you are using a self-signed certificate.
In this example, we only back up logs that fail during delivery.
To monitor your Firehose delivery stream, enable error logging. Doing this means that you can monitor record delivery errors.
Create an IAM role for the Firehose stream by choosing Create new, or Choose. Doing this brings you to a new screen. Choose Create a new IAM role, give the role a name, and then choose Allow.
If you look at the policy document, you can see that the role gives Kinesis Data Firehose permission to publish error logs to CloudWatch, execute your Lambda function, and put records into your S3 backup bucket.
You now get a chance to review and adjust the Firehose stream settings. When you are satisfied, choose Create Stream. You get a confirmation once the stream is created and active.
Create a VPC Flow Log
To send events from Amazon VPC, you need to set up a VPC flow log. If you already have a VPC flow log you want to use, you can skip to the “Publish CloudWatch to Kinesis Data Firehose” section.
On the AWS console, open the Amazon VPC service. Then choose VPC, Your VPC, and choose the VPC you want to send flow logs from. Choose Flow Logs, and then choose Create Flow Log. If you don’t have an IAM role that allows your VPC to publish logs to CloudWatch, choose Set Up Permissions and Create new role. Use the defaults when presented with the screen to create the new IAM role.
Once active, your VPC flow log should look like the following.
Publish CloudWatch to Kinesis Data Firehose
When you generate traffic to or from your VPC, the log group is created in Amazon CloudWatch. The new log group has no subscription filter, so set up a subscription filter. Setting this up establishes a real-time data feed from the log group to your Firehose delivery stream.
At present, you have to use the AWS Command Line Interface (AWS CLI) to create a CloudWatch Logs subscription to a Kinesis Data Firehose stream. However, you can use the AWS console to create subscriptions to Lambda and Amazon Elasticsearch Service.
To allow CloudWatch to publish to your Firehose stream, you need to give it permissions.
$ aws iam create-role --role-name CWLtoKinesisFirehoseRole --assume-role-policy-document file://TrustPolicyForCWLToFireHose.json
Here is the content for TrustPolicyForCWLToFireHose.json.
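The file content is not reproduced here. As a hedged sketch, a trust policy of this kind lets a service principal assume the role, and the following script generates one. The Region-specific CloudWatch Logs principal (logs.us-east-1.amazonaws.com) is an assumption, so adjust it to the Region you work in.

```python
import json


def trust_policy(service_principal):
    """Return a trust policy that lets the given AWS service assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": service_principal},
                "Action": "sts:AssumeRole",
            }
        ],
    }


if __name__ == "__main__":
    # Assumed Region: us-east-1. Redirect the output to
    # TrustPolicyForCWLToFireHose.json before running create-role.
    print(json.dumps(trust_policy("logs.us-east-1.amazonaws.com"), indent=2))
```

The same helper with lambda.amazonaws.com as the principal yields a trust policy suitable for a Lambda execution role.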
When you run the preceding AWS CLI command, you don’t get any acknowledgment. To validate that your CloudWatch Log Group is subscribed to your Firehose stream, check the CloudWatch console.
As soon as the subscription filter is created, the real-time log data from the log group goes into your Firehose delivery stream. Your stream then delivers it to your Splunk Enterprise or Splunk Cloud environment for querying and visualization. The following screenshot is from Splunk Enterprise.
In addition, you can monitor and view metrics associated with your delivery stream using the AWS console.
Conclusion
Although our walkthrough uses VPC Flow Logs, the pattern can be used in many other scenarios. These include ingesting data from AWS IoT, other CloudWatch logs and events, Kinesis Streams or other data sources using the Kinesis Agent or Kinesis Producer Library. We also used Lambda blueprint Kinesis Data Firehose CloudWatch Logs Processor to transform streaming records from Kinesis Data Firehose. However, you might need to use a different Lambda blueprint or disable record transformation entirely depending on your use case. For an additional use case using Kinesis Data Firehose, check out This is My Architecture Video, which discusses how to securely centralize cross-account data analytics using Kinesis and Splunk.
Tarik Makota is a solutions architect with the Amazon Web Services Partner Network. He provides technical guidance, design advice and thought leadership to AWS’ most strategic software partners. His career includes work in an extremely broad range of software development and architecture roles across ERP, financial printing, benefit delivery and administration and financial services. He holds an M.S. in Software Development and Management from Rochester Institute of Technology.
Roy Arsan is a solutions architect in the Splunk Partner Integrations team. He has a background in product development, cloud architecture, and building consumer and enterprise cloud applications. More recently, he has architected Splunk solutions on major cloud providers, including an AWS Quick Start for Splunk that enables AWS users to easily deploy distributed Splunk Enterprise straight from their AWS console. He’s also the co-author of the AWS Lambda blueprints for Splunk. He holds an M.S. in Computer Science Engineering from the University of Michigan.
Genomics analysis has taken off in recent years as organizations continue to adopt the cloud for its elasticity, durability, and cost. With the AWS Cloud, customers have a number of performant options to choose from. These options include AWS Batch in conjunction with AWS Lambda and AWS Step Functions; AWS Glue, a serverless extract, transform, and load (ETL) service; and of course, the AWS big data and machine learning workhorse Amazon EMR.
For this task, we use Hail, an open source framework for exploring and analyzing genomic data that uses the Apache Spark framework. In this post, we use Amazon EMR to run Hail. We walk through the setup, configuration, and data processing. Finally, we generate an Apache Parquet–formatted variant dataset and explore it using Amazon Athena.
Compiling Hail
Because Hail is still under active development, you must compile it before you can start using it. To help simplify the process, you can launch the following AWS CloudFormation template that creates an EMR cluster, compiles Hail, and installs a Jupyter Notebook so that you’re ready to go with Hail.
There are a few things to note about the AWS CloudFormation template. You must provide a password for the Jupyter Notebook. Also, you must provide a virtual private cloud (VPC) to launch Amazon EMR in, and make sure that you select a subnet from within that VPC. Next, update the cluster resources to fit your needs. Lastly, the HailBuildOutputS3Path parameter should be an Amazon S3 bucket/prefix, where you should save the compiled Hail binaries for later use. Leave the Hail and Spark versions as is, unless you’re comfortable experimenting with more recent versions.
When you’ve completed these steps, the following files are saved locally on the cluster to be used when running the Apache Spark Python API (PySpark) shell.
The files are also copied to the Amazon S3 location defined by the AWS CloudFormation template so that you can include them when running jobs using the Amazon EMR Step API.
Collecting genome data
To get started with Hail, use the 1000 Genome Project dataset available on AWS. The data that you will use is located at s3://1000genomes/release/20130502/.
For Hail to process these files in an efficient manner, they need to be block compressed. In many cases, files that use gzip compression are already compressed in blocks, so you don’t need to recompress; you can just rename the file extension from “.gz” to “.bgz”. Hail can process .gz files, but it’s much slower and not recommended. The simple way to accomplish this is to copy the data files from the public S3 bucket to your own and rename them.
The following is the Bash command line to copy the first five genome Variant Call Format (VCF) files and rename them appropriately using the AWS CLI.
for i in $(seq 5); do aws s3 cp s3://1000genomes/release/20130502/ALL.chr$i.phase3_shapeit2_mvncall_integrated_v5a.20130502.genotypes.vcf.gz s3://your_bucket/prefix/ALL.chr$i.phase3_shapeit2_mvncall_integrated_v5a.20130502.genotypes.vcf.bgz; done
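Renaming only helps if the file really is block compressed. The following is a minimal sketch of how you might check that, assuming the block format in question is BGZF (the blocked gzip variant defined in the SAM/BAM specification), whose blocks carry a `BC` subfield in the gzip extra header. The function name `is_bgzf` is illustrative and is not part of Hail or the AWS CLI.

```python
import gzip
import struct

def is_bgzf(header: bytes) -> bool:
    """Return True if the bytes start with a BGZF-style gzip block header."""
    if len(header) < 12:
        return False
    # Gzip magic bytes, deflate method, and the FEXTRA flag (0x04) must be set.
    if header[0] != 0x1F or header[1] != 0x8B or header[2] != 8:
        return False
    if not header[3] & 0x04:
        return False
    # XLEN (length of the extra field) is a little-endian uint16 at offset 10.
    (xlen,) = struct.unpack_from("<H", header, 10)
    extra = header[12:12 + xlen]
    pos = 0
    # Walk the extra subfields looking for SI1='B', SI2='C' with SLEN=2.
    while pos + 4 <= len(extra):
        si1, si2 = extra[pos], extra[pos + 1]
        (slen,) = struct.unpack_from("<H", extra, pos + 2)
        if (si1, si2, slen) == (ord("B"), ord("C"), 2):
            return True
        pos += 4 + slen
    return False

# The 28-byte empty BGZF block that terminates a BGZF file.
BGZF_EOF = bytes.fromhex(
    "1f8b08040000000000ff0600424302001b0003000000000000000000"
)

print(is_bgzf(BGZF_EOF))                 # a BGZF block header
print(is_bgzf(gzip.compress(b"hello")))  # ordinary gzip output, no extra field
```

To check a real file, you could read its first few dozen bytes (for example, with a ranged S3 GET or `open(path, "rb").read(64)`) and pass them to the function.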
Now that you have some data files containing variants in the Variant Call Format, you need to get the sample annotations that go along with them. These annotations provide more information about each sample, such as the population they are a part of.
In this section, you use the data collected in the previous section to explore genome variations interactively using a Jupyter Notebook. You then create a simple ETL job to convert these variations into Parquet format. Finally, you query it using Amazon Athena.
Let’s open the Jupyter notebook. To start, sign in to the AWS Management Console, and open the AWS CloudFormation console. Choose the stack that you created, and then choose the Output tab. There you see the JupyterURL. Open this URL in your browser.
Go ahead and download the Jupyter Notebook that is provided to your local machine. Log in to Jupyter with the password that you provided during stack creation. Choose Upload on the right side, and then choose the notebook from your local machine.
After the notebook is uploaded, choose it from the list on the left to open it.
Select the first cell, update the S3 bucket location to point to the bucket where you saved the compiled Hail libraries, and then choose Run. This code imports the Hail modules that you compiled at the beginning. When the cell is executing, you will see In [*]. When the process is complete, the asterisk (*) is replaced by a number, for example, In [1].
Next, run the subsequent two cells, which import the Hail module into PySpark and initiate the Hail context.
The next cell imports a single VCF file from the bucket where you saved your data in the previous section. If you change the Amazon S3 path to not include a file name, it imports all the VCF files in that directory. Depending on your cluster size, it might take a few minutes.
Remember that in the previous section, you also copied an annotation file. Now you use it to annotate the VCF files that you’ve loaded with Hail. Execute the next cell—as a shortcut, you can select the cell and press Shift+Enter.
The import_table API takes a path to the annotation file in TSV (tab-separated values) format and a parameter named impute that attempts to infer the schema of the file, as shown in the output below the cell.
At this point, you can interactively explore the data. For example, you can count the number of samples you have and group them by population.
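To make the grouping concrete, here is a plain-Python sketch of the same idea applied directly to a tab-separated annotation file, outside of Hail. The column names `Sample` and `Population` and the rows shown are made-up placeholders, not the actual layout of the 1000 Genomes annotation file, so adjust them to match your data.

```python
import csv
import io
from collections import Counter

def count_by_population(tsv_text, population_column="Population"):
    """Count rows per population in tab-separated annotation text."""
    reader = csv.DictReader(io.StringIO(tsv_text), delimiter="\t")
    return Counter(row[population_column] for row in reader)

# Synthetic rows for illustration only.
example = "Sample\tPopulation\nS1\tPOP_A\nS2\tPOP_B\nS3\tPOP_A\n"
counts = count_by_population(example)
for population, n in counts.most_common():
    print(population, n)
```

In the notebook itself, Hail performs this aggregation across the cluster; the sketch only shows what the result represents.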
You can also calculate the standard quality control (QC) metrics on your variants and samples.
What if you want to query this data outside of Hail and Spark, for example, using Amazon Athena? To start, you need to change the column names to lowercase because Athena currently supports only lowercase names. To do that, use the two functions provided in the notebook and call them on your variant dataset (VDS), as shown in the following image. Note that you’re only changing the case of the variants and samples schemas. If you’ve further augmented your VDS, you might need to modify the lowercase functions to do the same for those schemas.
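The notebook's own functions are not reproduced here. As a rough illustration of what lowercasing a nested schema involves, the following sketch walks the JSON form of a Spark schema (the shape that `StructType.jsonValue()` returns) and lowercases every field name, including those inside nested structs and arrays. The function name and the sample field names are hypothetical.

```python
def lowercase_field_names(dtype):
    """Return a copy of a Spark-style schema JSON value with lowercased field names."""
    if isinstance(dtype, dict):
        kind = dtype.get("type")
        if kind == "struct":
            fields = [
                {**f, "name": f["name"].lower(),
                 "type": lowercase_field_names(f["type"])}
                for f in dtype["fields"]
            ]
            return {**dtype, "fields": fields}
        if kind == "array":
            return {**dtype,
                    "elementType": lowercase_field_names(dtype["elementType"])}
        if kind == "map":
            return {**dtype,
                    "valueType": lowercase_field_names(dtype["valueType"])}
    # Primitive types are plain strings such as "string" or "double".
    return dtype

# Hypothetical schema fragment with mixed-case names.
schema = {
    "type": "struct",
    "fields": [
        {"name": "QC", "nullable": True, "metadata": {}, "type": {
            "type": "struct",
            "fields": [
                {"name": "callRate", "type": "double",
                 "nullable": True, "metadata": {}},
            ],
        }},
        {"name": "Alleles", "nullable": True, "metadata": {}, "type": {
            "type": "array", "elementType": "string", "containsNull": True,
        }},
    ],
}

lowered = lowercase_field_names(schema)
print([f["name"] for f in lowered["fields"]])
```

With PySpark available, the resulting dictionary could be turned back into a schema object with `StructType.fromJson`. How the renamed schema is applied to the data depends on the Hail and Spark versions in use.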
In the current version of Hail, the sample annotations are not stored in the exported Parquet VDS, so you need to save them separately. As noted by the Hail maintainers, in future versions, the data represented by the VDS Parquet output will change, and it is recommended that you also export the variant annotations. So let’s do that.
Note that both of these lines are similar in that they export a table representation of the sample and variant annotations, convert them to a Spark DataFrame, and finally save them to Amazon S3 in Parquet file format.
Finally, it is beneficial to save the VDS file back to Amazon S3 so that next time you need to analyze your data, you can load it without having to start from the raw VCF. Note that when Hail saves your data, it requires a path and a file name.
After you run these cells, expect it to take some time as it writes out the data.
Discovering table metadata
Before you can query your data, you need to tell Athena the schema of your data. You have a couple of options. The first is to use AWS Glue to crawl the S3 bucket, infer the schema from the data, and create the appropriate table. Before proceeding, you might need to migrate your Athena database to use the AWS Glue Data Catalog.
Creating tables in AWS Glue
To use the AWS Glue crawler, open the AWS Glue console and choose Crawlers in the left navigation pane.
Then choose Add crawler to create a new crawler.
Next, give your crawler a name and assign the appropriate IAM role. Leave Amazon S3 as the data source, and select the S3 bucket where you saved the data and the sample annotations. When you set the crawler’s Include path, be sure to include the entire path, for example: s3://output_bucket/hail_data/sample_annotations/
Under the Exclusion Paths, type _SUCCESS, so that you don’t crawl that particular file.
Continue forward with the default settings until you are asked if you want to add another source. Choose Yes, and add the Amazon S3 path where you saved the variant annotations (not the sample_annotations path that you entered for the first source) so that the crawler can build your variant annotation table. Give it an existing database name, or create a new one.
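If you prefer to script the crawler instead of using the console, the same settings map onto the AWS Glue `create_crawler` API. The following sketch only builds the request; the crawler name, role, database, table prefix, and the variant annotation path are placeholders that you would replace with your own values.

```python
def crawler_request(name, role, database, include_paths, table_prefix=""):
    """Build keyword arguments for the AWS Glue create_crawler call."""
    return {
        "Name": name,
        "Role": role,
        "DatabaseName": database,
        "TablePrefix": table_prefix,
        "Targets": {
            "S3Targets": [
                # Exclude the _SUCCESS marker file, as in the console steps.
                {"Path": path, "Exclusions": ["_SUCCESS"]}
                for path in include_paths
            ]
        },
    }

request = crawler_request(
    name="hail-annotations",            # placeholder crawler name
    role="MyGlueCrawlerRole",           # placeholder IAM role
    database="hail",                    # placeholder database name
    include_paths=[
        "s3://output_bucket/hail_data/sample_annotations/",
        "s3://output_bucket/hail_data/<variant-annotations-prefix>/",  # placeholder
    ],
    table_prefix="hail_",               # placeholder table prefix
)
print(request["Targets"])

# With boto3 installed and credentials configured, the request could be sent as:
#   import boto3
#   glue = boto3.client("glue")
#   glue.create_crawler(**request)
#   glue.start_crawler(Name=request["Name"])
```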
Provide a table prefix and choose Next. Then choose Finish. At this point, assuming that the data is finished writing, you can go ahead and run the crawler. When it finishes, you have two new tables in the database you created that should look something like the following:
You can explore the schema of these tables by choosing their name and then choosing Edit Schema on the right side of the table view; for example:
Creating tables in Amazon Athena
If you cannot or do not want to use AWS Glue crawlers, you can add the tables via the Athena console by typing the following statements:
In the Amazon Athena console, choose the database in which your tables were created. In this case, it looks something like the following:
To verify that you have data, choose the three dots on the right, and then choose Preview table.
Indeed, you can see some data.
You can further explore the sample and variant annotations along with the QC metrics that you calculated previously using Hail.
Summary
To summarize, this post demonstrated the ease with which you can install, configure, and use Hail, an open source, highly scalable framework for exploring and analyzing genomics data, on Amazon EMR. We demonstrated setting up a Jupyter Notebook to make our exploration easy. We also used the power of Hail to calculate quality control metrics for variants and samples. We exported them to Amazon S3 and allowed a broader range of users and analysts to explore them on-demand in a serverless environment using Amazon Athena.
Roy Hasson is a Global Business Development Manager for AWS Analytics. He works with customers around the globe to design solutions to meet their data processing, analytics and business intelligence needs. Roy is a big Manchester United fan who enjoys cheering his team on and hanging out with his family.
Today AWS announced contributions to the milestone 1.0 release of the Apache MXNet deep learning engine including the introduction of a new model-serving capability for MXNet. The new capabilities in MXNet provide the following benefits to users:
1) MXNet is easier to use: The model server for MXNet is a new capability introduced by AWS, and it packages, runs, and serves deep learning models in seconds with just a few lines of code, making them accessible over the internet via an API endpoint and thus easy to integrate into applications. The 1.0 release also includes an advanced indexing capability that enables users to perform matrix operations in a more intuitive manner.
Model Serving enables set up of an API endpoint for prediction: It saves developers time and effort by condensing the task of setting up an API endpoint for running and integrating prediction functionality into an application to just a few lines of code. It bridges the barrier between Python-based deep learning frameworks and production systems through a Docker container-based deployment model.
Advanced indexing for array operations in MXNet: It is now more intuitive for developers to leverage the powerful array operations in MXNet. They can use the advanced indexing capability by leveraging existing knowledge of NumPy/SciPy arrays. For example, it accepts both an MXNet NDArray and a NumPy ndarray as an index, e.g. a[mx.nd.array([1,2], dtype='int32')].
2) MXNet is faster: The 1.0 release includes implementation of cutting-edge features that optimize the performance of training and inference. Gradient compression enables users to train models up to five times faster by reducing communication bandwidth between compute nodes without loss in convergence rate or accuracy. For speech recognition acoustic modeling like the Alexa voice, this feature can reduce network bandwidth by up to three orders of magnitude during training. With the support of NVIDIA Collective Communication Library (NCCL), users can train a model 20% faster on multi-GPU systems.
Optimize network bandwidth with gradient compression: In distributed training, each machine must communicate frequently with others to update the weight vectors and thereby collectively build a single model, leading to high network traffic. The gradient compression algorithm enables users to train models up to five times faster by compressing the model changes communicated by each instance.
Optimize the training performance by taking advantage of NCCL: NCCL implements multi-GPU and multi-node collective communication primitives that are performance optimized for NVIDIA GPUs. NCCL provides communication routines that are optimized to achieve high bandwidth over the interconnects between multiple GPUs. MXNet supports NCCL to train models about 20% faster on multi-GPU systems.
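To give a feel for how compressing gradients can work, the following is a plain-Python sketch of one common scheme: threshold quantization with a residual. Each gradient value is sent as +threshold, -threshold, or zero, and the rounding error is carried forward to the next step so that small updates are delayed and not lost. This illustrates the general technique under our own assumptions; it is not MXNet code, and the function name and threshold value are chosen for the example.

```python
def compress_step(grad, residual, threshold=0.5):
    """Quantize one gradient vector and return (quantized, new_residual)."""
    quantized = []
    new_residual = []
    for g, r in zip(grad, residual):
        total = g + r  # add back the error left over from earlier steps
        if total >= threshold:
            q = threshold
        elif total <= -threshold:
            q = -threshold
        else:
            q = 0.0
        quantized.append(q)
        new_residual.append(total - q)  # error carried to the next step
    return quantized, new_residual

residual = [0.0, 0.0, 0.0]
sent_1, residual = compress_step([0.7, -0.2, 0.3], residual)
sent_2, residual = compress_step([0.0, -0.4, 0.3], residual)
print(sent_1)
print(sent_2)
```

Because every transmitted value is one of only three levels, it can be encoded in two bits instead of a 32-bit float, which is where the bandwidth saving comes from. In the second step, the values that were too small to send on their own cross the threshold once the carried-over residual is added.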
3) MXNet provides easy interoperability: MXNet now includes a tool for converting neural network code written with the Caffe framework to MXNet code, making it easier for users to take advantage of MXNet’s scalability and performance.
Migrate Caffe models to MXNet: It is now possible to easily migrate Caffe code to MXNet, using the new source code translation tool for converting Caffe code to MXNet code.
MXNet has helped developers and researchers make progress with everything from language translation to autonomous vehicles and behavioral biometric security. We are excited to see the broad base of users that are building production artificial intelligence applications powered by neural network models developed and trained with MXNet. For example, the autonomous driving company TuSimple recently piloted a self-driving truck on a 200-mile journey from Yuma, Arizona to San Diego, California using MXNet. This release also includes a full-featured and performance-optimized version of the Gluon programming interface. Its ease of use, combined with the extensive set of tutorials, has led to significant adoption among developers new to deep learning. The flexibility of the interface has driven interest within the research community, especially in the natural language processing domain.
Getting started with MXNet
Getting started with MXNet is simple. To learn more about the Gluon interface and deep learning, you can reference this comprehensive set of tutorials, which covers everything from an introduction to deep learning to how to implement cutting-edge neural network models. If you’re a contributor to a machine learning framework, check out the interface specs on GitHub.
To get started with the Model Server for Apache MXNet, install the library with the following command:
$ pip install mxnet-model-server
The Model Server library has a Model Zoo with 10 pre-trained deep learning models, including the SqueezeNet 1.1 object classification model. You can start serving the SqueezeNet model with just the following command: