Join us this month to learn about AWS services and solutions. New this month, we have a fireside chat with the GM of Amazon WorkSpaces and the second episode of our “How to re:Invent” series. We’ll also cover best practices, deep dives, use cases, and more. Register today!
AWS re:Invent
June 13, 2018 | 05:00 PM – 05:30 PM PT – Episode 2: AWS re:Invent Breakout Content Secret Sauce – Hear from one of our own AWS content experts as we dive deep into the re:Invent content strategy and how we maintain a high bar.

Compute

Containers
June 25, 2018 | 09:00 AM – 09:45 AM PT – Running Kubernetes on AWS – Learn about the basics of running Kubernetes on AWS, including how to set up masters, configure networking and security, and add auto-scaling to your cluster.
June 19, 2018 | 11:00 AM – 11:45 AM PT – Launch AWS Faster using Automated Landing Zones – Learn how the AWS Landing Zone can automate the setup of best-practice baselines when setting up new AWS environments.
June 21, 2018 | 01:00 PM – 01:45 PM PT – Enabling New Retail Customer Experiences with Big Data – Learn how AWS can help retailers realize actual value from their big data and deliver on differentiated retail customer experiences.
June 28, 2018 | 01:00 PM – 01:45 PM PT – Fireside Chat: End User Collaboration on AWS – Learn how End User Compute services can help you deliver access to desktops and applications anywhere, anytime, using any device.

IoT
June 27, 2018 | 11:00 AM – 11:45 AM PT – AWS IoT in the Connected Home – Learn how to use AWS IoT to build innovative Connected Home products.
Mobile
June 25, 2018 | 11:00 AM – 11:45 AM PT – Drive User Engagement with Amazon Pinpoint – Learn how Amazon Pinpoint simplifies and streamlines effective user engagement.
June 26, 2018 | 11:00 AM – 11:45 AM PT – Deep Dive: Hybrid Cloud Storage with AWS Storage Gateway – Learn how you can reduce your on-premises infrastructure by using the AWS Storage Gateway to connect your applications to the scalable and reliable AWS storage services.
June 27, 2018 | 01:00 PM – 01:45 PM PT – Changing the Game: Extending Compute Capabilities to the Edge – Discover how to change the game for IIoT and edge analytics applications with AWS Snowball Edge plus enhanced Compute instances.
June 28, 2018 | 11:00 AM – 11:45 AM PT – Big Data and Analytics Workloads on Amazon EFS – Get best practices and deployment advice for running big data and analytics workloads on Amazon EFS.
Last year, we released Amazon Connect, a cloud-based contact center service that enables any business to deliver better customer service at low cost. The service is built on the same technology that empowers Amazon customer service associates, who use it to have millions of conversations with customers inquiring about their shipping or order information. Because we made it available as an AWS service, you can now enable your contact center agents to make or receive calls in a matter of minutes, without having to provision any hardware.
There are several advantages to building your contact center in the AWS Cloud, as described in our documentation. In addition, customers can extend Amazon Connect capabilities by using the breadth of other AWS services. In this blog post, we focus on how to get analytics out of the rich set of data published by Amazon Connect. We make use of an Amazon Connect data stream and create an end-to-end workflow to offer an analytical solution that can be customized based on your needs.
Solution overview
The following diagram illustrates the solution.
In this solution, Amazon Connect exports its contact trace records (CTRs) using Amazon Kinesis. CTRs are JSON-formatted records delivered as a data stream, and each one contains information about an individual contact. For example, this information might include the start and end time of a call, which agent handled the call, which queue the user chose, queue wait times, number of holds, and so on. You can enable this feature by following our documentation.
In this architecture, we use Kinesis Firehose to capture Amazon Connect CTRs as raw data in an Amazon S3 bucket. We don’t use the recently added Kinesis Firehose feature that saves data to S3 in Apache Parquet format. Instead, we use AWS Glue to automatically detect the schema on the fly from the Amazon Connect data stream.
The primary reason for this approach is that it lets us use attributes and enables an Amazon Connect administrator to dynamically add more fields as needed. Also, converting the data to Parquet in batches (every couple of hours) yields better compression. However, if your requirement is to ingest the data in Parquet format in real time, we recommend using the recently launched Kinesis Firehose feature. You can review this blog post for further information.
By default, Firehose puts these records in time-series format. To make it easy for AWS Glue crawlers to capture information from new records, we use AWS Lambda to move all new records to a single S3 prefix called flatfiles. Our Lambda function is configured using S3 event notification. To comply with AWS Glue and Athena best practices, the Lambda function also converts all column names to lowercase. Finally, we also use the Lambda function to start AWS Glue crawlers. AWS Glue crawlers identify the data schema and update the AWS Glue Data Catalog, which is used by extract, transform, load (ETL) jobs in AWS Glue in the latter half of the workflow.
You can see our approach in the following Lambda code.
from __future__ import print_function

import json
import urllib
import boto3
import os
import re

s3 = boto3.resource('s3')
client = boto3.client('s3')


def convertColumntoLowwerCaps(obj):
    # Recursively lowercase keys and strip non-alphanumeric characters so the
    # column names comply with AWS Glue and Athena naming rules.
    for key in obj.keys():
        new_key = re.sub(r'[\W]+', '', key.lower())
        v = obj[key]
        if isinstance(v, dict):
            if len(v) > 0:
                convertColumntoLowwerCaps(v)
        if new_key != key:
            obj[new_key] = obj[key]
            del obj[key]
    return obj


def lambda_handler(event, context):
    # Triggered by the S3 event notification for each new raw CTR object.
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key'].encode('utf8'))
    try:
        client.download_file(bucket, key, '/tmp/file.json')
        with open('/tmp/out.json', 'w') as output, open('/tmp/file.json', 'rb') as file:
            i = 0
            for line in file:
                # Firehose concatenates JSON documents, so split them apart
                # before parsing and normalizing the column names.
                for record_str in line.replace("}{", "}\n{").split("\n"):
                    record = json.loads(record_str, object_hook=convertColumntoLowwerCaps)
                    if i != 0:
                        output.write("\n")
                    output.write(json.dumps(record))
                    i += 1
        # Move the normalized records to the single flatfiles/ prefix and
        # delete the original time-series object.
        newkey = 'flatfiles/' + key.replace("/", "")
        client.upload_file('/tmp/out.json', bucket, newkey)
        s3.Object(bucket, key).delete()
        return "success"
    except Exception as e:
        print(e)
        print('Error copying object {} from bucket {}'.format(key, bucket))
        raise e
We trigger AWS Glue crawlers based on events because this approach lets us capture any new fields that appear in the data. CTR attributes are designed to offer multiple custom options based on a particular call flow. Attributes are essentially key-value pairs in nested JSON format. With the help of event-based AWS Glue crawlers, you can easily identify newer attributes automatically.
We recommend setting up an S3 lifecycle policy on the flatfiles folder that keeps records only for 24 hours. Doing this optimizes AWS Glue ETL jobs to process a subset of files rather than the entire set of records.
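One way to set up that lifecycle rule programmatically is with a short boto3 call. The following is a minimal sketch, not the exact configuration the CloudFormation template creates; the bucket name is a placeholder.

import boto3

s3 = boto3.client('s3')

# Expire objects under the flatfiles/ prefix after one day so that the
# AWS Glue ETL job only ever sees a small, recent subset of the raw records.
s3.put_bucket_lifecycle_configuration(
    Bucket='connect-ctr-bucket',  # placeholder; use your solution bucket
    LifecycleConfiguration={
        'Rules': [
            {
                'ID': 'expire-flatfiles',
                'Filter': {'Prefix': 'flatfiles/'},
                'Status': 'Enabled',
                'Expiration': {'Days': 1},
            }
        ]
    },
)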
After we have data in the flatfiles folder, we use AWS Glue to catalog the data and transform it into Parquet format inside a folder called parquet/ctr/. The AWS Glue job performs the ETL that transforms the data from JSON to Parquet format. We use AWS Glue crawlers to capture any new fields inside the JSON data. What this means is that when you add new attributes to an Amazon Connect instance, the solution automatically recognizes them and incorporates them into the schema of the results.
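The heart of such a Glue job is only a few lines of PySpark. The following is a minimal sketch rather than the exact job the template deploys; the catalog database, table, and output path are placeholders.

import sys
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
glueContext = GlueContext(SparkContext.getOrCreate())
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read the JSON records that the crawler cataloged from the flatfiles/ prefix.
ctr_frame = glueContext.create_dynamic_frame.from_catalog(
    database='connectctr_db',   # placeholder catalog database name
    table_name='flatfiles')     # placeholder table created by the crawler

# Write the same records back out as Parquet under parquet/ctr/.
glueContext.write_dynamic_frame.from_options(
    frame=ctr_frame,
    connection_type='s3',
    connection_options={'path': 's3://connect-ctr-bucket/parquet/ctr/'},
    format='parquet')

job.commit()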
After AWS Glue stores the results in Parquet format, you can perform analytics using Amazon Redshift Spectrum, Amazon Athena, or any third-party data warehouse platform. To keep this solution simple, we have used Amazon Athena for analytics. Amazon Athena allows us to query data without having to set up and manage any servers or data warehouse platforms. Additionally, we only pay for the queries that are executed.
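If you prefer to run Athena queries programmatically instead of from the console, a minimal boto3 sketch looks like the following; the database name and output location are placeholders, and the query itself is only an illustration, so adjust it to the schema the crawler produces.

import boto3

athena = boto3.client('athena')

# Count the contact records in the Parquet-backed ctr table.
response = athena.start_query_execution(
    QueryString='SELECT COUNT(*) AS total_contacts FROM ctr',
    QueryExecutionContext={'Database': 'connectctr_db'},  # placeholder database
    ResultConfiguration={'OutputLocation': 's3://connect-ctr-bucket/athena-results/'},
)
print(response['QueryExecutionId'])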
Try it out!
You can get started with our sample AWS CloudFormation template. This template creates the components starting from the Kinesis stream and finishes up with S3 buckets, the AWS Glue job, and crawlers. To deploy the template, open the AWS Management Console by clicking the following link.
In the console, specify the following parameters:
BucketName: The name for the bucket to store all the solution files. This name must be unique; if it’s not, template creation fails.
etlJobSchedule: The schedule in cron format indicating how often the AWS Glue job runs. The default value is every hour.
KinesisStreamName: The name of the Kinesis stream to receive data from Amazon Connect. This name must be different from any other Kinesis stream created in your AWS account.
s3interval: The interval in seconds for Kinesis Firehose to save data inside the flatfiles folder on S3. The value must be between 60 and 900 seconds.
sampledata: When this parameter is set to true, sample CTR records are used. Doing this lets you try this solution without setting up an Amazon Connect instance. All examples in this walkthrough use this sample data.
Select the “I acknowledge that AWS CloudFormation might create IAM resources.” check box, and then choose Create. After the template finishes creating resources, you can see the stream name on the stack Outputs tab.
If you haven’t created your Amazon Connect instance, you can do so by following the Getting Started Guide. When you are done creating, choose your Amazon Connect instance in the console, which takes you to instance settings. Choose Data streaming to enable streaming for CTR records. Here, you can choose the Kinesis stream (defined in the KinesisStreamName parameter) that was created by the CloudFormation template.
Now it’s time to generate data by making or receiving calls using Amazon Connect. You can go to the Amazon Connect Contact Control Panel (CCP) to make or receive calls using a software phone or desktop phone. After a few minutes, we should see data inside the flatfiles folder. To make it easier to try this solution, we provide sample data that you can enable by setting the sampledata parameter to true in your CloudFormation template.
You can navigate to the AWS Glue console by choosing Jobs on the left navigation pane of the console. We can select our job here. In my case, the job created by CloudFormation is called glueJob-i3TULzVtP1W0; yours should be similar. You run the job by choosing Run job from the Action menu.
After that, we wait for the AWS Glue job to run and to finish successfully. We can track the status of the job by checking the History tab.
When the job finishes running, we can check the Database section. There should be a new table created called ctr in Parquet format.
To query the data with Athena, we can select the ctr table, and for Action choose View data.
Doing this takes us to the Athena console. If you run a query, Athena shows a preview of the data.
When we can query the data using Athena, we can visualize it using Amazon QuickSight. Before connecting Amazon QuickSight to Athena, we must make sure to grant Amazon QuickSight access to Athena and the associated S3 buckets in the account. For more information on doing this, see Managing Amazon QuickSight Permissions to AWS Resources in the Amazon QuickSight User Guide. We can then create a new data set in Amazon QuickSight based on the Athena table that was created.
After setting up permissions, we can create a new analysis in Amazon QuickSight by choosing New analysis.
Then we add a new data set.
We choose Athena as the source and give the data source a name (in this case, I named it connectctr).
Choose the name of the database and the table referencing the Parquet results.
Then choose Visualize.
After that, we should see the following screen.
Now we can create some visualizations. First, search for the agent.username column, and drag it to the AutoGraph section.
We can see the agents and the number of calls for each, so we can easily see which agents have taken the largest number of calls. If we want to see which queues the calls came from for each agent, we can add the queue.arn column to the visual.
After following all these steps, you can use Amazon QuickSight to add different columns from the call records and perform different types of visualizations. You can build dashboards that continuously monitor your Amazon Connect instance. You can share those dashboards with others in your organization who might need to see this data.
Conclusion
In this post, you saw how you can use services like AWS Lambda, AWS Glue, and Amazon Athena to process Amazon Connect call records. The post also demonstrated how to use AWS Lambda to preprocess files in Amazon S3 and transform them into a format that is recognized by AWS Glue crawlers. Finally, the post showed how to use Amazon QuickSight to perform visualizations.
You can use the provided template to analyze your own contact center instance. Or you can take the CloudFormation template and modify it to process other data streams that can be ingested using Amazon Kinesis or stored on Amazon S3.
Luis Caro is a Big Data Consultant for AWS Professional Services. He works with our customers to provide guidance and technical assistance on big data projects, helping them improve the value of their solutions when using AWS.
Peter Dalbhanjan is a Solutions Architect for AWS based in Herndon, VA. Peter has a keen interest in evangelizing AWS solutions and has written multiple blog posts that focus on simplifying complex use cases. At AWS, Peter helps with designing and architecting a variety of customer workloads.
Amazon QuickSight is a fully managed cloud business intelligence system that gives you fast, easy-to-use business analytics for big data. QuickSight makes business analytics available to organizations of all shapes and sizes, with the ability to access data that is stored in your Amazon Redshift data warehouse, your Amazon Relational Database Service (RDS) relational databases, flat files in S3, and (via connectors) data stored in on-premises MySQL, PostgreSQL, and SQL Server databases. QuickSight scales to accommodate tens, hundreds, or thousands of users per organization.
Today we are launching a new, session-based pricing option for QuickSight, along with additional region support and other important new features. Let’s take a look at each one:
Pay-per-Session Pricing Our customers are making great use of QuickSight and take full advantage of the power it gives them to connect to data sources, create reports, and explore visualizations.
However, not everyone in an organization needs or wants such powerful authoring capabilities. Having access to curated data in dashboards and being able to interact with the data by drilling down, filtering, or slicing-and-dicing is more than adequate for their needs. Subscribing them to a monthly or annual plan can be seen as an unwarranted expense, so a lot of such casual users end up not having access to interactive data or BI.
In order to allow customers to provide all of their users with interactive dashboards and reports, the Enterprise Edition of Amazon QuickSight now allows Reader access to dashboards on a Pay-per-Session basis. QuickSight users are now classified as Admins, Authors, or Readers, with distinct capabilities and prices:
Authors have access to the full power of QuickSight; they can establish database connections, upload new data, create ad hoc visualizations, and publish dashboards, all for $9 per month (Standard Edition) or $18 per month (Enterprise Edition).
Readers can view dashboards, slice and dice data using drill downs, filters and on-screen controls, and download data in CSV format, all within the secure QuickSight environment. Readers pay $0.30 for 30 minutes of access, with a monthly maximum of $5 per reader.
Admins have all authoring capabilities, and can manage users and purchase SPICE capacity in the account. The QuickSight admin now has the ability to set the desired option (Author or Reader) when they invite members of their organization to use QuickSight. They can extend Reader invites to their entire user base without incurring any up-front or monthly costs, paying only for the actual usage.
A New Region QuickSight is now available in the Asia Pacific (Tokyo) Region:
The UI is in English, with a localized version in the works.
Hourly Data Refresh Enterprise Edition SPICE data sets can now be set to refresh as frequently as every hour. In the past, each data set could be refreshed up to 5 times a day. To learn more, read Refreshing Imported Data.
Access to Data in Private VPCs This feature was launched in preview form late last year, and is now available in production form to users of the Enterprise Edition. As I noted at the time, you can use it to implement secure, private communication with data sources that do not have public connectivity, including on-premises data in Teradata or SQL Server, accessed over an AWS Direct Connect link. To learn more, read Working with AWS VPC.
Parameters with On-Screen Controls QuickSight dashboards can now include parameters that are set using on-screen dropdown, text box, numeric slider or date picker controls. The default value for each parameter can be set based on the user name (QuickSight calls this a dynamic default). You could, for example, set an appropriate default based on each user’s office location, department, or sales territory. Here’s an example:
URL Actions for Linked Dashboards You can now connect your QuickSight dashboards to external applications by defining URL actions on visuals. The actions can include parameters, and become available in the Details menu for the visual. URL actions are defined like this:
You can use this feature to link QuickSight dashboards to third party applications (e.g. Salesforce) or to your own internal applications. Read Custom URL Actions to learn how to use this feature.
Dashboard Sharing You can now share QuickSight dashboards across every user in an account.
Larger SPICE Tables The per-data set limit for SPICE tables has been raised from 10 GB to 25 GB.
Upgrade to Enterprise Edition The QuickSight administrator can now upgrade an account from Standard Edition to Enterprise Edition with a click. This enables provisioning of Readers with pay-per-session pricing, private VPC access, row-level security for dashboards and data sets, and hourly refresh of data sets. Enterprise Edition pricing applies after the upgrade.
Available Now Everything I listed above is available now and you can start using it today!
Amazon Kinesis Data Firehose is the easiest way to capture and stream data into a data lake built on Amazon S3. This data can be anything—from AWS service logs like AWS CloudTrail log files, Amazon VPC Flow Logs, Application Load Balancer logs, and others. It can also be IoT events, game events, and much more. To efficiently query this data, a time-consuming ETL (extract, transform, and load) process is required to massage and convert the data to an optimal file format, which increases the time to insight. This situation is less than ideal, especially for real-time data that loses its value over time.
To solve this common challenge, Kinesis Data Firehose can now save data to Amazon S3 in Apache Parquet or Apache ORC format. These are optimized columnar formats that are highly recommended for best performance and cost-savings when querying data in S3. This feature directly benefits you if you use Amazon Athena, Amazon Redshift, AWS Glue, Amazon EMR, or any other big data tools that are available from the AWS Partner Network and through the open-source community.
Amazon Connect is a simple-to-use, cloud-based contact center service that makes it easy for any business to provide a great customer experience at a lower cost than common alternatives. Its open platform design enables easy integration with other systems. One of those systems is Amazon Kinesis—in particular, Kinesis Data Streams and Kinesis Data Firehose.
What’s really exciting is that you can now save events from Amazon Connect to S3 in Apache Parquet format. You can then perform analytics using Amazon Athena and Amazon Redshift Spectrum in real time, taking advantage of this key performance and cost optimization. Of course, Amazon Connect is only one example. This new capability opens the door for a great deal of opportunity, especially as organizations continue to build their data lakes.
Amazon Connect includes an array of analytics views in the Administrator dashboard. But you might want to run other types of analysis. In this post, I describe how to set up a data stream from Amazon Connect through Kinesis Data Streams and Kinesis Data Firehose and out to S3, and then perform analytics using Athena and Amazon Redshift Spectrum. I focus primarily on the Kinesis Data Firehose support for Parquet and its integration with the AWS Glue Data Catalog, Amazon Athena, and Amazon Redshift.
Solution overview
Here is how the solution is laid out:
The following sections walk you through each of these steps to set up the pipeline.
1. Define the schema
When Kinesis Data Firehose processes incoming events and converts the data to Parquet, it needs to know which schema to apply. The reason is that incoming events often contain only some of the expected fields, depending on which values the producers send. A typical process is to normalize the schema during a batch ETL job so that you end up with a consistent schema that can easily be understood and queried. Doing this introduces latency because of the nature of batch processing. To overcome this issue, Kinesis Data Firehose requires the schema to be defined in advance.
To see the available columns and structures, see Amazon Connect Agent Event Streams. For the purpose of simplicity, I opted to make all the columns of type String rather than create the nested structures. But you can definitely do that if you want.
The simplest way to define the schema is to create a table in the Amazon Athena console. Open the Athena console, and paste the following create table statement, substituting your own S3 bucket and prefix for where your event data will be stored. A Data Catalog database is a logical container that holds the different tables that you can create. The default database name shown here should already exist. If it doesn’t, you can create it or use another database that you’ve already created.
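The original statement isn’t reproduced here, so the following is a sketch of what such a DDL could look like, submitted through boto3 for convenience. The column list is an assumption based only on the agent event fields queried later in this post (add the other fields you need), and the bucket and prefix are placeholders.

import boto3

athena = boto3.client('athena')

# All columns are Strings for simplicity, as described above.
ddl = """
CREATE EXTERNAL TABLE IF NOT EXISTS default.kfhconnectblog (
  eventid string,
  eventtimestamp string,
  eventtype string,
  currentagentsnapshot string,
  previousagentsnapshot string
)
STORED AS PARQUET
LOCATION 's3://your-bucket/kfhconnectblog/'
"""

athena.start_query_execution(
    QueryString=ddl,
    QueryExecutionContext={'Database': 'default'},
    ResultConfiguration={'OutputLocation': 's3://your-bucket/athena-results/'},
)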
That’s all you have to do to prepare the schema for Kinesis Data Firehose.
2. Define the data streams
Next, you need to define the Kinesis data streams that will be used to stream the Amazon Connect events. Open the Kinesis Data Streams console and create two streams. You can configure them with only one shard each because you don’t have a lot of data right now.
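If you’d rather script this step, a minimal boto3 sketch looks like this; the stream names are placeholders.

import boto3

kinesis = boto3.client('kinesis')

# One stream for agent events and one for contact trace records,
# each with a single shard because the data volume is small for now.
for name in ['kfhconnect-agent-events', 'kfhconnect-ctr']:
    kinesis.create_stream(StreamName=name, ShardCount=1)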
3. Define the Kinesis Data Firehose delivery stream for Parquet
Let’s configure the Data Firehose delivery stream using the data stream as the source and Amazon S3 as the output. Start by opening the Kinesis Data Firehose console and creating a new data delivery stream. Give it a name, and associate it with the Kinesis data stream that you created in Step 2.
As shown in the following screenshot, enable Record format conversion (1) and choose Apache Parquet (2). As you can see, Apache ORC is also supported. Scroll down and provide the AWS Glue Data Catalog database name (3) and table names (4) that you created in Step 1. Choose Next.
To make things easier, the output S3 bucket and prefix fields are automatically populated using the values that you defined in the LOCATION parameter of the create table statement from Step 1. Pretty cool. Additionally, you have the option to save the raw events into another location as defined in the Source record S3 backup section. Don’t forget to add a trailing forward slash “ / “ so that Data Firehose creates the date partitions inside that prefix.
On the next page, in the S3 buffer conditions section, there is a note about configuring a large buffer size. The Parquet file format is highly efficient in how it stores and compresses data. Increasing the buffer size allows you to pack more rows into each output file, which is preferred and gives you the most benefit from Parquet.
Compression using Snappy is automatically enabled for both Parquet and ORC. You can modify the compression algorithm by using the Kinesis Data Firehose API and update the OutputFormatConfiguration.
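The same configuration can be expressed through the API. The following is a rough boto3 sketch, not a definitive setup: the account ID, role ARNs, stream ARN, bucket, and names are placeholders, and GZIP is shown only to illustrate overriding the default Snappy compression.

import boto3

firehose = boto3.client('firehose')

firehose.create_delivery_stream(
    DeliveryStreamName='kfhconnect-to-parquet',
    DeliveryStreamType='KinesisStreamAsSource',
    KinesisStreamSourceConfiguration={
        'KinesisStreamARN': 'arn:aws:kinesis:us-east-1:111111111111:stream/kfhconnect-agent-events',
        'RoleARN': 'arn:aws:iam::111111111111:role/firehose-delivery-role',
    },
    ExtendedS3DestinationConfiguration={
        'RoleARN': 'arn:aws:iam::111111111111:role/firehose-delivery-role',
        'BucketARN': 'arn:aws:s3:::your-bucket',
        'Prefix': 'kfhconnectblog/',
        # A large buffer packs more rows into each Parquet file, as noted above.
        'BufferingHints': {'SizeInMBs': 128, 'IntervalInSeconds': 300},
        'DataFormatConversionConfiguration': {
            'Enabled': True,
            'InputFormatConfiguration': {'Deserializer': {'OpenXJsonSerDe': {}}},
            # Override the default Snappy compression here if you want to.
            'OutputFormatConfiguration': {'Serializer': {'ParquetSerDe': {'Compression': 'GZIP'}}},
            'SchemaConfiguration': {
                'DatabaseName': 'default',
                'TableName': 'kfhconnectblog',
                'RoleARN': 'arn:aws:iam::111111111111:role/firehose-delivery-role',
            },
        },
    },
)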
Be sure to also enable Amazon CloudWatch Logs so that you can debug any issues that you might run into.
Lastly, finalize the creation of the Firehose delivery stream, and continue on to the next section.
4. Set up the Amazon Connect contact center
After setting up the Kinesis pipeline, you now need to set up a simple contact center in Amazon Connect. The Getting Started page provides clear instructions on how to set up your environment, acquire a phone number, and create an agent to accept calls.
After setting up the contact center, in the Amazon Connect console, choose your Instance Alias, and then choose Data Streaming. Under Agent Event, choose the Kinesis data stream that you created in Step 2, and then choose Save.
At this point, your pipeline is complete. Agent events from Amazon Connect are generated as agents go about their day. Events are sent via Kinesis Data Streams to Kinesis Data Firehose, which converts the event data from JSON to Parquet and stores it in S3. Athena and Amazon Redshift Spectrum can simply query the data without any additional work.
So let’s generate some data. Go back into the Administrator console for your Amazon Connect contact center, and create an agent to handle incoming calls. In this example, I creatively named mine Agent One. After it is created, Agent One can get to work and log into their console and set their availability to Available so that they are ready to receive calls.
To make the data a bit more interesting, I also created a second agent, Agent Two. I then made some incoming and outgoing calls and caused some failures to occur, so I now have enough data available to analyze.
5. Analyze the data with Athena
Let’s open the Athena console and run some queries. One thing you’ll notice is that when we created the schema for the dataset, we defined some of the fields as Strings even though in the documentation they were complex structures. The reason for doing that was simply to show some of the flexibility of Athena to be able to parse JSON data. However, you can define nested structures in your table schema so that Kinesis Data Firehose applies the appropriate schema to the Parquet file.
Let’s run the first query to see which agents have logged into the system.
The query might look complex, but it’s fairly straightforward:
WITH dataset AS (
SELECT
from_iso8601_timestamp(eventtimestamp) AS event_ts,
eventtype,
-- CURRENT STATE
json_extract_scalar(
currentagentsnapshot,
'$.agentstatus.name') AS current_status,
from_iso8601_timestamp(
json_extract_scalar(
currentagentsnapshot,
'$.agentstatus.starttimestamp')) AS current_starttimestamp,
json_extract_scalar(
currentagentsnapshot,
'$.configuration.firstname') AS current_firstname,
json_extract_scalar(
currentagentsnapshot,
'$.configuration.lastname') AS current_lastname,
json_extract_scalar(
currentagentsnapshot,
'$.configuration.username') AS current_username,
json_extract_scalar(
currentagentsnapshot,
'$.configuration.routingprofile.defaultoutboundqueue.name') AS current_outboundqueue,
json_extract_scalar(
currentagentsnapshot,
'$.configuration.routingprofile.inboundqueues[0].name') as current_inboundqueue,
-- PREVIOUS STATE
json_extract_scalar(
previousagentsnapshot,
'$.agentstatus.name') as prev_status,
from_iso8601_timestamp(
json_extract_scalar(
previousagentsnapshot,
'$.agentstatus.starttimestamp')) as prev_starttimestamp,
json_extract_scalar(
previousagentsnapshot,
'$.configuration.firstname') as prev_firstname,
json_extract_scalar(
previousagentsnapshot,
'$.configuration.lastname') as prev_lastname,
json_extract_scalar(
previousagentsnapshot,
'$.configuration.username') as prev_username,
json_extract_scalar(
previousagentsnapshot,
'$.configuration.routingprofile.defaultoutboundqueue.name') as prev_outboundqueue,
json_extract_scalar(
previousagentsnapshot,
'$.configuration.routingprofile.inboundqueues[0].name') as prev_inboundqueue
from kfhconnectblog
where eventtype <> 'HEART_BEAT'
)
SELECT
current_status as status,
current_username as username,
event_ts
FROM dataset
WHERE eventtype = 'LOGIN' AND current_username <> ''
ORDER BY event_ts DESC
The query output looks something like this:
Here is another query that shows the sessions each of the agents engaged with. It tells us whether they were incoming or outgoing, whether they completed, and where there were missed or failed calls.
WITH src AS (
SELECT
eventid,
json_extract_scalar(currentagentsnapshot, '$.configuration.username') as username,
cast(json_extract(currentagentsnapshot, '$.contacts') AS ARRAY(JSON)) as c,
cast(json_extract(previousagentsnapshot, '$.contacts') AS ARRAY(JSON)) as p
from kfhconnectblog
),
src2 AS (
SELECT *
FROM src CROSS JOIN UNNEST (c, p) AS contacts(c_item, p_item)
),
dataset AS (
SELECT
eventid,
username,
json_extract_scalar(c_item, '$.contactid') as c_contactid,
json_extract_scalar(c_item, '$.channel') as c_channel,
json_extract_scalar(c_item, '$.initiationmethod') as c_direction,
json_extract_scalar(c_item, '$.queue.name') as c_queue,
json_extract_scalar(c_item, '$.state') as c_state,
from_iso8601_timestamp(json_extract_scalar(c_item, '$.statestarttimestamp')) as c_ts,
json_extract_scalar(p_item, '$.contactid') as p_contactid,
json_extract_scalar(p_item, '$.channel') as p_channel,
json_extract_scalar(p_item, '$.initiationmethod') as p_direction,
json_extract_scalar(p_item, '$.queue.name') as p_queue,
json_extract_scalar(p_item, '$.state') as p_state,
from_iso8601_timestamp(json_extract_scalar(p_item, '$.statestarttimestamp')) as p_ts
FROM src2
)
SELECT
username,
c_channel as channel,
c_direction as direction,
p_state as prev_state,
c_state as current_state,
c_ts as current_ts,
c_contactid as id
FROM dataset
WHERE c_contactid = p_contactid
ORDER BY id DESC, current_ts ASC
The query output looks similar to the following:
6. Analyze the data with Amazon Redshift Spectrum
With Amazon Redshift Spectrum, you can query data directly in S3 using your existing Amazon Redshift data warehouse cluster. Because the data is already in Parquet format, Redshift Spectrum gets the same great benefits that Athena does.
Here is a simple query that shows querying the same data from Amazon Redshift. Note that to do this, you first need to create an external schema in Amazon Redshift that points to the AWS Glue Data Catalog, as sketched below.
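As a reference, here is a minimal sketch of creating that external schema, run through psycopg2; the connection details, IAM role, and region are placeholders, and the schema name matches the query that follows. With the schema in place, the query below runs as-is.

import psycopg2

# Connect to the Amazon Redshift cluster; all connection details are placeholders.
conn = psycopg2.connect(
    host='my-cluster.abc123.us-east-1.redshift.amazonaws.com',
    port=5439, dbname='dev', user='awsuser', password='...')

with conn.cursor() as cur:
    # Map the AWS Glue Data Catalog database that holds kfhconnectblog to a
    # Redshift schema named default_schema, as referenced in the query below.
    cur.execute("""
        CREATE EXTERNAL SCHEMA default_schema
        FROM DATA CATALOG
        DATABASE 'default'
        IAM_ROLE 'arn:aws:iam::111111111111:role/redshift-spectrum-role'
        REGION 'us-east-1'
    """)
conn.commit()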
SELECT
eventtype,
json_extract_path_text(currentagentsnapshot,'agentstatus','name') AS current_status,
json_extract_path_text(currentagentsnapshot, 'configuration','firstname') AS current_firstname,
json_extract_path_text(currentagentsnapshot, 'configuration','lastname') AS current_lastname,
json_extract_path_text(
currentagentsnapshot,
'configuration','routingprofile','defaultoutboundqueue','name') AS current_outboundqueue
FROM default_schema.kfhconnectblog
The following shows the query output:
Summary
In this post, I showed you how to use Kinesis Data Firehose to ingest and convert data to columnar file format, enabling real-time analysis using Athena and Amazon Redshift. This great feature enables a level of optimization in both cost and performance that you need when storing and analyzing large amounts of data. This feature is equally important if you are investing in building data lakes on AWS.
Roy Hasson is a Global Business Development Manager for AWS Analytics. He works with customers around the globe to design solutions to meet their data processing, analytics, and business intelligence needs. Roy is a big Manchester United fan who enjoys cheering his team on and hanging out with his family.
Yesterday, I received an email from someone called Mayank Sinha, showing us the Raspberry Pi home security project he’s been working on. He got in touch particularly because, he writes, the Raspberry Pi community has given him “immense support” with his build, and he wanted to dedicate it to the community as thanks.
Mayank’s project is named Asfaleia, a Greek word that means safety, certainty, or security against threats. It’s part of an honourable tradition dating all the way back to 2012: it’s a prototype housed in a polystyrene box, using breadboards and jumper leads and sticky tape. And it’s working! Take a look.
An IOT based home security system. The link to the code: https://github.com/mayanksinha11/Asfaleia
Home security with Asfaleia
Asfaleia has a PIR (passive infrared) motion sensor, an IR break beam sensor, and a gas sensor. All are connected to a Raspberry Pi 3 Model B, the latter two via a NodeMCU board. Mayank currently has them set up in a box that’s divided into compartments to model different rooms in a house.
All the best prototypes have sticky tape or rubber bands
If the IR sensors detect motion or a broken beam, the webcam takes a photo and emails it to the build’s owner, and the build also calls their phone (I like your ringtone, Mayank). If the gas sensor detects a leak, the system activates an exhaust fan via a small relay board, and again the owner receives a phone call. The build can also authenticate users via face and fingerprint recognition. The software that runs it all is written in Python, and you can see Mayank’s code on GitHub.
Of prototypes and works-in-progress
Reading Mayank’s email made me very happy yesterday. We know that thousands of people in our community give a great deal of time and effort to help others learn and make things, and it is always wonderful to see an example of how that support is helping someone turn their ideas into reality. It’s great, too, to see people sharing works-in-progress, as well as polished projects! After all, the average build is more likely to feature rubber bands and Tupperware boxes than meticulously designed laser-cut parts or expert joinery. Mayank’s YouTube channel shows earlier work on this and another Pi project, and I hope he’ll continue to document his builds.
So here’s to Raspberry Pi projects big, small, beginner, professional, endlessly prototyped, unashamedly bodged, unfinished or fully working, shonky or shiny. Please keep sharing them all!
Since our last System and Organization Control (SOC) audit, our service and compliance teams have been working to increase the number of AWS services in scope, prioritized based on customer requests. Today, we’re happy to report that 11 services are newly SOC compliant, which is a 21 percent increase in the last six months.
With the addition of the following 11 new services, you can now select from a total of 62 SOC-compliant services. To see the full list, go to our Services in Scope by Compliance Program page:
Our latest SOC 1, 2, and 3 reports covering the period from October 1, 2017 to March 31, 2018 are now available. The SOC 1 and 2 reports are available on-demand through AWS Artifact by logging into the AWS Management Console. The SOC 3 report can be downloaded here.
Finally, prospective customers can read our SOC 1 and 2 reports by reaching out to AWS Compliance.
Want more AWS Security news? Follow us on Twitter.
I ran into an amusing problem with Python and multiprocessing, and at the moment I still can’t decide whose problem it really is (in the end it will turn out to be mine). It took me a decent amount of time to catch it, and it’s worth telling the story.
A bit of background: we use InfluxDB, into which we stuff a lot of per-second data that we then chew down into per-minute data. InfluxDB has continuous queries that do this job – at some interval they pick up the new data and roll it up. These queries had a few problems:
– they can’t handle backfilling old data;
– they run rarely, so the per-minute data lags behind;
– they run more or less in a single thread, which makes the per-minute data lag even more (in our case, by about 12 hours before we replaced them).
I got fed up and wrote a simple little daemon in Python that gathers information about which data in the various databases can be rolled up, and fills in the data in parallel. Roughly, it works like this:
– it gets the list of databases;
– through multiprocessing, it collects, for each database, which queries need to be run, based on which measurements it has and how far the minute and second data in them go;
– through multiprocessing, it runs the queries collected in the previous pass;
– and so on until the end of the world (or until it hangs).
After catching up over a few hours, it managed to keep the per-minute data within a few minutes of the latest per-second data, which was a serious improvement of the situation. The only problem was that from time to time it stopped processing and hung.
Today I found time to take a careful look at what was happening to it. The process looks like one parent and five fork()-ed children:
– the parent is sleeping in futex 0x22555a0;
– child 18455 is in futex 0x7fdbfa366000;
– child 18546 is in read;
– child 18457 is in futex 0x7fdbfa366000;
– child 18461 is in futex 0x7fdbfa366000;
– child 18462 is in futex 0x7fdbfa366000;
– child 18465 is in futex 0x7fdbf908c2c0.
That wasn’t particularly useful, and it turned out that the standard Python debugger (pdb) can’t attach to existing processes – but gdb with the right debug symbols can, and it can give quite useful information. That’s how I found out that the parent is waiting for one child to finish its work:
#11 PyEval_EvalFrameEx (f@entry=Frame 0x235fb80, for file /usr/lib64/python2.7/multiprocessing/pool.py, line 543, in wait (self== 1525137960000000000 AND time < 1525138107000000000 GROUP BY time(1m), * fill(linear)\' in a read only context, please use a POST request instead', u'level': u'warning'}], u'statement_id': 0}]}, None], _callback=None, _chunksize=1, _number_left=1, _ready=False, _success=True, _cond=<_Condition(_Verbose__verbose=False, _Condition__lock=, acquire=, _Condition__waiters=[], release=) at remote 0x7fdbe0015310>, _job=45499, _cache={45499: < ...>}) a...(truncated), throwflag@entry=0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3040
And in pool.py, around line 543, there is the following:
class ApplyResult(object):
    ...

    def wait(self, timeout=None):
        self._cond.acquire()
        try:
            if not self._ready:
                self._cond.wait(timeout)
        finally:
            self._cond.release()
At first I thought that 18546 was waiting to read something from the wrong place, but it turned out that this is the child that won the race to execute the next task and is waiting to be handed it (which appears to be handed out via futex 0x7fdbfa366000). One of the children, however, is waiting on a different lock:
(gdb) bt
#0  __lll_lock_wait () at ../nptl/sysdeps/unix/sysv/linux/x86_64/lowlevellock.S:135
#1  0x00007fdbf9b68dcb in _L_lock_812 () from /lib64/libpthread.so.0
#2  0x00007fdbf9b68c98 in __GI___pthread_mutex_lock (mutex@entry=0x7fdbf908c2c0) at ../nptl/pthread_mutex_lock.c:79
#3  0x00007fdbf8e846ea in _nss_files_gethostbyname4_r (name@entry=0x233fa44 "localhost", pat@entry=0x7fdbecfcb8e0, buffer@entry=0x7fdbecfcb340 "hZ \372\333\177", buflen@entry=1064, errnop@entry=0x7fdbecfcb8b0, herrnop@entry=0x7fdbecfcb910, ttlp@entry=0x0) at nss_files/files-hosts.c:381
#4  0x00007fdbf9170ed8 in gaih_inet (name=<optimized out>, name@entry=0x233fa44 "localhost", service=<optimized out>, req@entry=0x7fdbecfcbb90, pai@entry=0x7fdbecfcb9f0, naddrs@entry=0x7fdbecfcb9e0) at ../sysdeps/posix/getaddrinfo.c:877
#5  0x00007fdbf91745cd in __GI_getaddrinfo (name@entry=0x233fa44 "localhost", service@entry=0x7fdbecfcbbc0 "8086", hints@entry=0x7fdbecfcbb90, pai@entry=0x7fdbecfcbb78) at ../sysdeps/posix/getaddrinfo.c:2431
#6  0x00007fdbeed8760d in socket_getaddrinfo (self=<optimized out>, args=<optimized out>) at /usr/src/debug/Python-2.7.5/Modules/socketmodule.c:4193
#7  0x00007fdbf9e5fbb0 in call_function (oparg=<optimized out>, pp_stack=0x7fdbecfcbd10) at /usr/src/debug/Python-2.7.5/Python/ceval.c:4408
#8  PyEval_EvalFrameEx (f@entry=Frame 0x7fdbe8013350, for file /usr/lib/python2.7/site-packages/urllib3/util/connection.py, line 64, in create_connection (address=('localhost', 8086), timeout=3000, source_address=None, socket_options=[(6, 1, 1)], host='localhost', port=8086, err=None), throwflag@entry=0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3040
(gdb) frame 3
#3  0x00007fdbf8e846ea in _nss_files_gethostbyname4_r (name@entry=0x233fa44 "localhost", pat@entry=0x7fdbecfcb8e0, buffer@entry=0x7fdbecfcb340 "hZ \372\333\177", buflen@entry=1064, errnop@entry=0x7fdbecfcb8b0, herrnop@entry=0x7fdbecfcb910, ttlp@entry=0x0) at nss_files/files-hosts.c:381
381       __libc_lock_lock (lock);
(gdb) list
376     enum nss_status
377     _nss_files_gethostbyname4_r (const char *name, struct gaih_addrtuple **pat,
378                                  char *buffer, size_t buflen, int *errnop,
379                                  int *herrnop, int32_t *ttlp)
380     {
381       __libc_lock_lock (lock);
382
383       /* Reset file pointer to beginning or open file.  */
384       enum nss_status status = internal_setent (keep_stream);
385
Or, in translation: we are trying to take the standard lock that libc uses to protect its reentrant functions, and somebody is holding it. Who could that be?
… which is entirely to be expected, given that these are two separate processes and that memory is not shared.
So apparently what happened is that while the parent was doing the fork(), somebody was holding this lock, and the child effectively cannot touch anything related to it (which means no reentrant functions in glibc, which everybody uses (and should be using)). The thing is, in principle this shouldn’t be possible, because there is nothing around fork() that takes this lock, and glibc should release the lock when it leaves its functions.
My initial idiotic assumption was that the multiprocessing module creates the new children in the SIGCHLD signal handler, so that while something else holds the lock a signal arrives, a new process is created, and it “inherits” the lock in its locked state. That was too stupid to be true, and it turned out it wasn’t…
While researching the lock questions, my searching had led me to two things – issue 127 in gperftools and Debian bug 657835. The first said that my problem might be caused by another lock that somebody else holds before the fork (which made me look more carefully at which locks were being held), and the second that, in general, if you fork a threaded application, the only thing you can safely do afterwards is execve(), because it’s unclear how well anything else will work.
And in the end it turned out that if you use the multiprocessing module, it starts several threads in the main process that handle monitoring and spawning the worker children. So here is what actually happens:
– a child works through its allotted number of operations and exits;
– the parent receives SIGCHLD and makes a note that it needs to see what’s going on;
– the parent’s main thread starts collecting the list of databases and at some point calls _nss_files_gethostbyname4_r, which takes the lock;
– at that moment the other thread says “hm, I don’t have enough children, fork()”;
– profit.
My current dumb solution is to not do anything in the main thread that might take this lock, and to hope that there isn’t another lock like it. My future solution is either to rewrite it in Python 3 with some other module for this, or in Go (which I’ll have to learn).
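For reference, a minimal Python 3 sketch of the direction I have in mind: the spawn start method starts each worker as a fresh interpreter instead of a bare fork(), so a worker cannot inherit a glibc lock that happened to be held by one of the parent’s threads. The function body here is just a stand-in for the real query logic.

import multiprocessing as mp

def process_database(dbname):
    # ... collect and run the roll-up queries for one InfluxDB database ...
    return dbname

if __name__ == '__main__':
    # 'spawn' launches workers as fresh Python interpreters rather than
    # fork()-ing the (threaded) parent, avoiding inherited locks.
    ctx = mp.get_context('spawn')
    with ctx.Pool(5) as pool:
        print(pool.map(process_database, ['db1', 'db2', 'db3']))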
Today, I’m pleased to announce that, as of April 24th, 2018, the AWS IoT Analytics service is generally available. Customers can use IoT Analytics to clean, process, enrich, store, and analyze their connected device data at scale. AWS IoT Analytics is now available in US East (N. Virginia), US West (Oregon), US East (Ohio), and EU (Ireland). In November of last year, my colleague Tara Walker wrote an excellent post that walks through some of the features of the AWS IoT Analytics service, and Ben Kehoe (an AWS Community Hero and Research Scientist at iRobot) spoke at AWS re:Invent about replacing iRobot’s existing “Rube Goldberg machine” for forwarding data into an Elasticsearch cluster with AWS IoT Analytics.
Iterating on customer feedback received during the service preview, the AWS IoT Analytics team has added a number of new features, including the ability to ingest data from external sources using the BatchPutMessage API, set a data retention policy on stored data, reprocess existing data, preview pipeline results, and preview messages from channels with the SampleChannelData API.
Let’s cover the core concepts of IoT Analytics and then walk through an example.
AWS IoT Analytics Concepts
AWS IoT Analytics can be broken down into a few simple concepts. For data preparation customers have: Channels, Pipelines, and Data Stores. For analyzing data customers have: Datasets and Notebooks.
Data Preparation
Channels are the entry point into IoT Analytics. They collect data from an existing IoT Core MQTT topic or from external sources that send messages to the channel using the ingestion API. Channels are elastically scalable and consume messages in binary or JSON format. Channels also immutably store raw device data for easy reprocessing with different logic if your needs change.
Pipelines consume messages from channels and let you process messages with steps, called activities, such as filtering on attributes, transforming the content of the message by adding or removing fields, invoking Lambda functions for complex transformations and adding data from external data sources, or even enriching the messages with data from IoT Core. Pipelines output their data to a data store.
Data Stores are a queryable IoT-optimized data storage solution for the output of your pipelines. Data stores support custom retention periods to optimize costs. When a customer queries a Data Store the result is put into a Dataset.
Data Analytics
Datasets are similar to a view in a SQL database. Customers create a dataset by running a query against a data store. Data sets can be generated manually or on a recurring schedule.
Notebooks are Amazon SageMaker hosted Jupyter notebooks that let customers analyze their data with custom code and even build or train ML models on the data. IoT Analytics offers several notebook templates with pre-authored models for common IoT use cases such as Predictive Maintenance, Anomaly Detection, Fleet Segmentation, and Forecasting.
Additionally, you can use IoT analytics as a data source for Amazon QuickSight for easy visualizations of your data. You can find pricing information for each of these services on the AWS IoT Analytics Pricing Page.
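To make the concepts concrete, here is a rough boto3 sketch that wires a channel, a pipeline, a data store, and a scheduled dataset together. All names, the filter expression, and the query are placeholders; the console walkthrough below does the same thing interactively.

import boto3

iota = boto3.client('iotanalytics')

iota.create_channel(channelName='device_channel')
iota.create_datastore(datastoreName='device_datastore')

# A pipeline that reads from the channel, drops low-temperature readings,
# and writes the remaining messages to the data store.
iota.create_pipeline(
    pipelineName='device_pipeline',
    pipelineActivities=[
        {'channel': {'name': 'source', 'channelName': 'device_channel', 'next': 'filter_temp'}},
        {'filter': {'name': 'filter_temp', 'filter': 'temperature > 40', 'next': 'store'}},
        {'datastore': {'name': 'store', 'datastoreName': 'device_datastore'}},
    ],
)

# A dataset that re-runs a SQL query against the data store every hour.
iota.create_dataset(
    datasetName='device_dataset',
    actions=[{'actionName': 'query_all',
              'queryAction': {'sqlQuery': 'SELECT * FROM device_datastore'}}],
    triggers=[{'schedule': {'expression': 'cron(0 * * * ? *)'}}],
)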
IoT Analytics Walkthrough
While this walkthrough uses the console, everything shown here is equally easy to do with the CLI. When we first navigate to the console, we get a helpful guide telling us to build a channel, a pipeline, and a data store. Our first step is to create a channel. I already have some data arriving on an MQTT topic in IoT Core, so I’ll use that as my source. First we’ll name the channel and select a retention period.
Now, I’ll select my IoT Core topic and grab the data. I can also post messages directly into the channel with the BatchPutMessage API.
Now that I have a channel my next step is to create a pipeline. To do this I’ll select “Create a pipeline from this channel” from the “Actions” drop down.
Now, I’ll walk through the pipeline wizard giving my pipeline a name and a source.
I’ll select which of the message attributes the pipeline should expect. This can draw from the channel with the sampling API and guess at which attributes are needed or I could upload a specification in JSON.
Next I define the pipeline activities. If I’m dealing with binary data, I need a Lambda function to first deserialize the message into JSON so that the other filter functions can operate on it. I can create filters, calculate attributes based on other attributes, and enrich the message with metadata from the IoT Core registry.
For now I just want to filter out some messages and make a small transform with a Lambda function.
Finally, I choose or create a data store to output the results of my pipeline.
Now that I have a data store, I can create a view of that data by creating a data set.
I’ll just select all the data from the data store for this dataset but I could also select individual attributes as needed.
I have a data set! I can adjust the cron expression in the schedule to re-run this as frequently or infrequently as I wish.
If I want to create a model from my data I can create a SageMaker powered Jupyter notebook. There are a few templates that are great starting points like anomaly detection or output forecasting.
Here you can see an example of the anomaly detection notebook.
Finally, if I want to create simple visualizations of my data I can use QuickSight to bring in an IoT Analytics data set.
Let Us Know
I’m excited to see what customers build with AWS IoT Analytics. My colleagues on the IoT teams are eager to hear your feedback about the service so please let us know in the comments or on Twitter what features you want to see.
If you’re not already familiar with building visualizations for quick access to business insights using Amazon QuickSight, consider this your introduction. In this post, we’ll walk through some common scenarios with sample datasets to provide an overview of how you can connect your data, perform advanced analysis, and access the results from any web browser or mobile device.
The following visualizations are built from the public datasets available in the links below. Before we jump into that, let’s take a look at the supported data sources, file formats and a typical QuickSight workflow to build any visualization.
Which data sources does Amazon QuickSight support?
At the time of publication, you can use the following data methods:
Connect to AWS data sources, including:
Amazon RDS
Amazon Aurora
Amazon Redshift
Amazon Athena
Amazon S3
Upload Excel spreadsheets or flat files (CSV, TSV, CLF, and ELF)
Connect to on-premises databases like Teradata, SQL Server, MySQL, and PostgreSQL
Import data from SaaS applications like Salesforce and Snowflake
Use big data processing engines like Spark and Presto
SPICE is the Amazon QuickSight super-fast, parallel, in-memory calculation engine, designed specifically for ad hoc data visualization. SPICE stores your data in a system architected for high availability, where it is saved until you choose to delete it. Improve the performance of database datasets by importing the data into SPICE instead of using a direct database query. To calculate how much SPICE capacity your dataset needs, see Managing SPICE Capacity.
Typical Amazon QuickSight workflow
When you create an analysis, the typical workflow is as follows:
Connect to a data source, and then create a new dataset or choose an existing dataset.
(Optional) If you created a new dataset, prepare the data (for example, by changing field names or data types).
Create a new analysis.
Add a visual to the analysis by choosing the fields to visualize. Choose a specific visual type, or use AutoGraph and let Amazon QuickSight choose the most appropriate visual type, based on the number and data types of the fields that you select.
(Optional) Modify the visual to meet your requirements (for example, by adding a filter or changing the visual type).
(Optional) Add more visuals to the analysis.
(Optional) Add scenes to the default story to provide a narrative about some aspect of the analysis data.
(Optional) Publish the analysis as a dashboard to share insights with other users.
The following graphic illustrates a typical Amazon QuickSight workflow.
Visualizations created in Amazon QuickSight with sample datasets
Data catalog: The World Bank invests in multiple development projects at the national, regional, and global levels. It’s a great source of information for data analysts.
The following graph shows the percentage of the population that has access to electricity (rural and urban) during 2000 in Asia, Africa, the Middle East, and Latin America.
The following graph shows the share of healthcare costs that are paid out-of-pocket (private vs. public). You can also hover over the graph to get detailed statistics at a glance.
Data catalog: The DBG PDS project makes real-time data derived from Deutsche Börse’s trading market systems available to the public for free. This is the first time that such detailed financial market data has been shared freely and continually from the source provider.
The following graph shows the market trend of max trade volume for different EU banks. It builds on the data available on XETRA engines, which is made up of a variety of equities, funds, and derivative securities. This graph can be scrolled to visualize trade for a period of an hour or more.
The following graph shows common stock leading the other security types in maximum trade volume over a period of time.
Data catalog: Data derived from different sensor stations placed on the city’s bridges and surface streets is a core information source. The road weather information station has a temperature sensor that measures the temperature of the street surface. It also has a sensor that measures the ambient air temperature at the station each second.
The following graph shows the present max air temperature in Seattle from different RWI station sensors.
The following graph shows the minimum temperature of the road surface at different times, which helps predicts road conditions at a particular time of the year.
Data catalog: Kaggle has come up with a platform where people can donate open datasets. Data engineers and other community members can have open access to these datasets and can contribute to the open data movement. They have more than 350 datasets in total, with more than 200 as featured datasets. It has a few interesting datasets on the platform that are not present at other places, and it’s a platform to connect with other data enthusiasts.
The following graph shows the trending YouTube videos and presents the max likes for the top 20 channels. This is one of the most popular datasets for data engineers.
The following graph shows the YouTube daily statistics for the max views of video titles published during a specific time period.
Data catalog: NYC Open Data hosts some very popular open datasets for all New Yorkers. This platform allows you to get involved and dive deep into the datasets to pull together some useful visualizations. The 2016 Green Taxi Trip dataset includes trip records from all trips completed in green taxis in NYC in 2016. Records include fields capturing pick-up and drop-off dates/times, pick-up and drop-off locations, trip distances, itemized fares, rate types, payment types, and driver-reported passenger counts.
The following graph presents the maximum fare amount grouped by passenger count at different times of the day. This can be further expanded to follow different days of the month based on business need.
The following graph shows New York taxi data from January 2016, highlighting the dip in the number of taxi rides on January 23, 2016, across all types of taxis.
A quick search for that date and location shows you the following news report:
Summary
Using Amazon QuickSight, you can see patterns across time-series data by building visualizations, performing ad hoc analysis, and quickly generating insights. We hope you’ll give it a try today!
Karthik Odapally is a Sr. Solutions Architect in AWS. His passion is to build cost effective and highly scalable solutions on the cloud. In his spare time, he bakes cookies and cupcakes for family and friends here in the PNW. He loves vintage racing cars.
Pranabesh Mandal is a Solutions Architect in AWS. He has over a decade of IT experience. He is passionate about cloud technology and focuses on analytics. In his spare time, he likes to hike and explore the beautiful nature and wildlife of national parks around the United States with his wife.
Amazon Redshift is a data warehouse service that logs the history of the system in STL log tables. The STL log tables manage disk space by retaining only two to five days of log history, depending on log usage and available disk space.
To retain STL tables’ data for an extended period, you usually have to create a replica table for every system table. Then, for each replica, you load the data from the corresponding system table at regular intervals. By maintaining replica tables for STL tables, you can run diagnostic queries on historical data from the STL tables. You then can derive insights from query execution times, query plans, and disk-spill patterns, and make better cluster-sizing decisions. However, refreshing replica tables with live data from STL tables at regular intervals requires schedulers such as Cron or AWS Data Pipeline. Also, these tables are specific to one cluster, and they are not accessible after the cluster is terminated. This is especially true for transient Amazon Redshift clusters that last for only a finite period of ad hoc query execution.
In this blog post, I present a solution that exports system tables from multiple Amazon Redshift clusters into an Amazon S3 bucket. This solution is serverless, and you can schedule it as frequently as every five minutes. The AWS CloudFormation deployment template that I provide automates the solution setup in your environment. The system tables’ data in the Amazon S3 bucket is partitioned by cluster name and query execution date to enable efficient joins in cross-cluster diagnostic queries.
I also provide another CloudFormation template later in this post. This second template helps to automate the creation of tables in the AWS Glue Data Catalog for the system tables’ data stored in Amazon S3. After the system tables are exported to Amazon S3, you can run cross-cluster diagnostic queries on the system tables’ data and derive insights about query executions in each Amazon Redshift cluster. You can do this using Amazon QuickSight, Amazon Athena, Amazon EMR, or Amazon Redshift Spectrum.
You can find all the code examples in this post, including the CloudFormation templates, AWS Glue extract, transform, and load (ETL) scripts, and the resolution steps for common errors you might encounter in this GitHub repository.
Solution overview
The solution in this post uses AWS Glue to export system tables’ log data from Amazon Redshift clusters into Amazon S3. The AWS Glue ETL jobs are invoked at a scheduled interval by AWS Lambda. AWS Systems Manager, which provides secure, hierarchical storage for configuration data management and secrets management, maintains the details of Amazon Redshift clusters for which the solution is enabled. The last-fetched time stamp values for the respective cluster-table combination are maintained in an Amazon DynamoDB table.
The following diagram covers the key steps involved in this solution.
The solution as illustrated in the preceding diagram flows like this:
The Lambda function, invoke_rs_stl_export_etl, is triggered at regular intervals, as controlled by Amazon CloudWatch. It’s triggered to look up the AWS Systems Manager parameter store to get the details of the Amazon Redshift clusters for which the system table export is enabled.
The same Lambda function, based on the Amazon Redshift cluster details obtained in step 1, invokes the AWS Glue ETL job designated for the Amazon Redshift cluster. If an ETL job for the cluster is not found, the Lambda function creates one.
The ETL job invoked for the Amazon Redshift cluster gets the cluster credentials from the parameter store. It gets from the DynamoDB table the last exported time stamp of when each of the system tables was exported from the respective Amazon Redshift cluster.
The ETL job unloads the system tables’ data from the Amazon Redshift cluster into an Amazon S3 bucket.
The ETL job updates the DynamoDB table with the last exported time stamp value for each system table exported from the Amazon Redshift cluster.
The Amazon Redshift cluster system tables’ data is available in Amazon S3 and is partitioned by cluster name and date for running cross-cluster diagnostic queries.
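As a rough sketch of how the first two steps fit together, the following Lambda handler (a simplified illustration, not the exact code in the GitHub repository) reads the enabled cluster list from the parameter store and starts the corresponding AWS Glue ETL job for each cluster; the parameter and job names follow the conventions described later in this post.

import boto3

ssm = boto3.client('ssm')
glue = boto3.client('glue')

def lambda_handler(event, context):
    # Look up the clusters for which system table export is enabled (step 1).
    param = ssm.get_parameter(Name='redshift_query_logs.global.enabled_cluster_list')
    clusters = [c.strip() for c in param['Parameter']['Value'].split(',') if c.strip()]

    # Invoke the Glue ETL job designated for each cluster (step 2).
    for cluster in clusters:
        job_name = '{}_extract_rs_query_logs'.format(cluster)
        try:
            glue.start_job_run(JobName=job_name)
        except glue.exceptions.EntityNotFoundException:
            # The real solution creates the job when it doesn't exist; that logic is omitted here.
            print('AWS Glue job {} not found, skipping'.format(job_name))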
Understanding the configuration data
This solution uses AWS Systems Manager parameter store to store the Amazon Redshift cluster credentials securely. The parameter store also securely stores other configuration information that the AWS Glue ETL job needs for extracting and storing system tables’ data in Amazon S3. Systems Manager comes with a default AWS Key Management Service (AWS KMS) key that it uses to encrypt the password component of the Amazon Redshift cluster credentials.
The following table explains the global parameters and cluster-specific parameters required in this solution. The global parameters are defined once and applicable at the overall solution level. The cluster-specific parameters are specific to an Amazon Redshift cluster and repeat for each cluster for which you enable this post’s solution. The CloudFormation template explained later in this post creates these parameters as part of the deployment process.
Global parameters (defined once and applied to all jobs):

redshift_query_logs.global.s3_prefix (String): The Amazon S3 path where the query logs are exported. Under this path, each exported table is partitioned by cluster name and date.
redshift_query_logs.global.tempdir (String): The Amazon S3 path that AWS Glue ETL jobs use for temporarily staging the data.
redshift_query_logs.global.role (String): The name of the role that the AWS Glue ETL jobs assume. Just the role name is sufficient; the complete Amazon Resource Name (ARN) is not required.
redshift_query_logs.global.enabled_cluster_list (StringList): A comma-separated list of cluster names for which system tables’ data export is enabled. This gives a user the flexibility to exclude certain clusters.

Cluster-specific parameters (for each cluster specified in the enabled_cluster_list parameter):

redshift_query_logs.<<cluster_name>>.connection (String): The name of the AWS Glue Data Catalog connection to the Amazon Redshift cluster. For example, if the cluster name is product_warehouse, the entry is redshift_query_logs.product_warehouse.connection.
redshift_query_logs.<<cluster_name>>.user (String): The user name that AWS Glue uses to connect to the Amazon Redshift cluster.
redshift_query_logs.<<cluster_name>>.password (SecureString): The password that AWS Glue uses to connect to the Amazon Redshift cluster, encrypted with a key that is managed in AWS KMS.
For example, suppose that you have two Amazon Redshift clusters, product-warehouse and category-management, for which the solution described in this post is enabled. In this case, the parameters shown in the following screenshot are created by the solution deployment CloudFormation template in the AWS Systems Manager parameter store.
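To illustrate how an ETL job might read these parameters at run time, here is a minimal sketch using Boto3 (the cluster name is a placeholder and error handling is omitted). Passing WithDecryption=True lets Systems Manager return the SecureString password decrypted with the AWS KMS key.

import boto3

ssm = boto3.client('ssm')
cluster = 'product-warehouse'  # placeholder cluster name

user = ssm.get_parameter(
    Name='redshift_query_logs.{}.user'.format(cluster))['Parameter']['Value']
password = ssm.get_parameter(
    Name='redshift_query_logs.{}.password'.format(cluster),
    WithDecryption=True)['Parameter']['Value']

# The Glue connection name for this cluster follows the same naming convention.
connection_name = ssm.get_parameter(
    Name='redshift_query_logs.{}.connection'.format(cluster))['Parameter']['Value']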
Solution deployment
To make it easier for you to get started, I created a CloudFormation template that automatically configures and deploys the solution—only one step is required after deployment.
Prerequisites
To deploy the solution, you must have one or more Amazon Redshift clusters in a private subnet. This subnet must have a network address translation (NAT) gateway or a NAT instance configured, and also a security group with a self-referencing inbound rule for all TCP ports. For more information about why AWS Glue ETL needs the configuration it does, described previously, see Connecting to a JDBC Data Store in a VPC in the AWS Glue documentation.
To start the deployment, launch the CloudFormation template:
CloudFormation stack parameters
The following table lists and describes the parameters for deploying the solution to export query logs from multiple Amazon Redshift clusters.
S3Bucket (default: mybucket): The bucket this solution uses to store the exported query logs, stage code artifacts, and perform unloads from Amazon Redshift. For example, the mybucket/extract_rs_logs/data prefix is used for storing all the exported query logs for each system table, partitioned by cluster. The mybucket/extract_rs_logs/temp/ prefix is used for temporarily staging the unloaded data from Amazon Redshift. The mybucket/extract_rs_logs/code prefix is used for storing all the code artifacts required for Lambda and the AWS Glue ETL jobs.
ExportEnabledRedshiftClusters (requires input): A comma-separated list of cluster names from which the system table logs need to be exported.
DataStoreSecurityGroups (requires input): A list of security groups with an inbound rule to the Amazon Redshift clusters provided in the ExportEnabledRedshiftClusters parameter. These security groups should also have a self-referencing inbound rule on all TCP ports, as explained in Connecting to a JDBC Data Store in a VPC.
After you launch the template and create the stack, you see that the following resources have been created:
AWS Glue connections for each Amazon Redshift cluster you provided in the CloudFormation stack parameter, ExportEnabledRedshiftClusters.
All parameters required for this solution created in the parameter store.
The Lambda function that invokes the AWS Glue ETL jobs for each configured Amazon Redshift cluster at a regular interval of five minutes.
The DynamoDB table that captures the last exported time stamps for each exported cluster-table combination.
The AWS Glue ETL jobs to export query logs from each Amazon Redshift cluster provided in the CloudFormation stack parameter, ExportEnabledRedshiftClusters.
The IAM roles and policies required for the Lambda function and AWS Glue ETL jobs.
After the deployment
For each Amazon Redshift cluster for which you enabled the solution through the CloudFormation stack parameter, ExportEnabledRedshiftClusters, the automated deployment includes temporary credentials that you must update after the deployment:
Note the parameters redshift_query_logs.<<cluster_name>>.user and redshift_query_logs.<<cluster_name>>.password that correspond to each Amazon Redshift cluster for which you enabled this solution. Edit these parameters to replace the placeholder values with the right credentials.
For example, if product-warehouse is one of the clusters for which you enabled system table export, you edit these two parameters with the right user name and password and choose Save parameter.
Querying the exported system tables
Within a few minutes after the solution deployment, you should see Amazon Redshift query logs being exported to the Amazon S3 location, <<S3Bucket_you_provided>>/extract_redshift_query_logs/data/. In that bucket, you should see the eight system tables partitioned by cluster name and date: stl_alert_event_log, stl_ddltext, stl_explain, stl_query, stl_querytext, stl_scan, stl_utilitytext, and stl_wlm_query.
To run cross-cluster diagnostic queries on the exported system tables, create external tables in the AWS Glue Data Catalog. To make it easier for you to get started, I provide a CloudFormation template that creates an AWS Glue crawler, which crawls the exported system tables stored in Amazon S3 and builds the external tables in the AWS Glue Data Catalog.
Launch this CloudFormation template to create external tables that correspond to the Amazon Redshift system tables. S3Bucket is the only input parameter required for this stack deployment. Provide the same Amazon S3 bucket name where the system tables’ data is being exported. After you successfully create the stack, you can see the eight tables in the database, redshift_query_logs_db, as shown in the following screenshot.
Now, navigate to the Athena console to run cross-cluster diagnostic queries. The following screenshot shows a diagnostic query executed in Athena that retrieves query alerts logged across multiple Amazon Redshift clusters.
You can build the following example Amazon QuickSight dashboard by running cross-cluster diagnostic queries on Athena to identify the hourly query count and the key query alert events across multiple Amazon Redshift clusters.
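As a rough illustration, the following sketch submits one such cross-cluster diagnostic query through the Athena API. The database name comes from the Data Catalog stack described above, but the cluster partition column name and the output location are assumptions you would adjust for your environment.

import boto3

athena = boto3.client('athena')

# Hourly query count per cluster from the exported stl_query data.
# 'cluster' is an assumed partition column name; adjust it to your catalog.
sql = """
SELECT cluster, date_trunc('hour', starttime) AS hour, count(*) AS query_count
FROM redshift_query_logs_db.stl_query
GROUP BY cluster, date_trunc('hour', starttime)
ORDER BY hour
"""

athena.start_query_execution(
    QueryString=sql,
    QueryExecutionContext={'Database': 'redshift_query_logs_db'},
    ResultConfiguration={'OutputLocation': 's3://<your-athena-results-bucket>/'})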
How to extend the solution
You can extend this post’s solution in two ways:
Add any new Amazon Redshift clusters that you spin up after you deploy the solution.
Add other system tables or custom query results to the list of exports from an Amazon Redshift cluster.
Extend the solution to other Amazon Redshift clusters
To extend the solution to more Amazon Redshift clusters, add the three cluster-specific parameters in the AWS Systems Manager parameter store following the guidelines earlier in this post. Modify the redshift_query_logs.global.enabled_cluster_list parameter to append the new cluster to the comma-separated string.
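If you prefer to script this step, a small sketch like the following (the cluster name and values are placeholders) registers the cluster-specific parameters and appends the new cluster to the enabled list.

import boto3

ssm = boto3.client('ssm')
new_cluster = 'sales-warehouse'  # placeholder cluster name

# The three cluster-specific parameters (placeholder values).
ssm.put_parameter(Name='redshift_query_logs.{}.connection'.format(new_cluster),
                  Type='String', Value='<glue-connection-name>', Overwrite=True)
ssm.put_parameter(Name='redshift_query_logs.{}.user'.format(new_cluster),
                  Type='String', Value='<redshift-user>', Overwrite=True)
ssm.put_parameter(Name='redshift_query_logs.{}.password'.format(new_cluster),
                  Type='SecureString', Value='<redshift-password>', Overwrite=True)

# Append the new cluster to the comma-separated enabled list.
current = ssm.get_parameter(
    Name='redshift_query_logs.global.enabled_cluster_list')['Parameter']['Value']
ssm.put_parameter(Name='redshift_query_logs.global.enabled_cluster_list',
                  Type='StringList', Value=current + ',' + new_cluster, Overwrite=True)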
Extend the solution to add other tables or custom queries to an Amazon Redshift cluster
The current solution ships with the export functionality for the following Amazon Redshift system tables:
stl_alert_event_log
stl_ddltext
stl_explain
stl_query
stl_querytext
stl_scan
stl_utilitytext
stl_wlm_query
You can easily add another system table or custom query by adding a few lines of code to the AWS Glue ETL job, <<cluster-name>>_extract_rs_query_logs. For example, suppose that from the product-warehouse Amazon Redshift cluster you want to export orders greater than $2,000. To do so, add code along the following lines to the AWS Glue ETL job product-warehouse_extract_rs_query_logs, where product-warehouse is your cluster name:
Get the last-processed time-stamp value. The function creates a value if it doesn’t already exist.
# The join key and the sales/orders table names below are placeholders; adjust them to your schema.
returnDF = functions.runQuery(query="select * from sales s join orders o on s.order_id = o.order_id where o.order_amnt > 2000 and s.sale_timestamp > '{}'".format(salesLastProcessTSValue), tableName="mydb.sales_2000", job_configs=job_configs)
In this post, I demonstrate a serverless solution to retain the system tables’ log data across multiple Amazon Redshift clusters. By using this solution, you can incrementally export the data from system tables into Amazon S3. By performing this export, you can build cross-cluster diagnostic queries, build audit dashboards, and derive insights into capacity planning by using services such as Athena. I also demonstrate how you can extend this solution to other ad hoc query use cases or tables other than system tables by adding a few lines of code.
Karthik Sonti is a senior big data architect at Amazon Web Services. He helps AWS customers build big data and analytical solutions and provides guidance on architecture and best practices.
Trying something new can often be a daunting task. Where do you start? What resources are available to help guide you through unfamiliar territory? Where can you go if you need additional help?
Whether you’ve just signed up for your first AWS account or you’ve been with us for some time, there’s always something new to learn as our services evolve to meet the ever-changing needs of our customers. To help ensure you’re set up for success as you build with AWS, we put together this quick reference guide for Big Data training and resources available here on the AWS site.
Here, you’ll find a round-up of all things Big Data, including comprehensive, step-by-step project guides to walk you through production-ready solutions, tutorials, labs and additional resources broken out by service.
This site is a fantastic resource, offering free and unlimited access to more than 100 digital training courses built by AWS experts. This link will take you directly to the Analytics category for all things Big Data on AWS, but you can always browse and search the site for other courses. Here are a few quick links for service-specific training:
If you prefer the interactive nature of a classroom environment, register for any of these courses (near you or online) to learn best practices, get live feedback, and receive answers to your questions in real-time from the class instructor.
Big Data on AWS (3 days) Introduces you to cloud-based big data solutions such as Amazon EMR, Amazon Redshift, Amazon Kinesis, and the rest of the AWS big data platform.
Data Warehousing on AWS (3 days) Introduces you to concepts, strategies, and best practices for designing a cloud-based data warehousing solution, and demonstrates how to collect, store, and prepare data for the data warehouse.
Building a Serverless Data Lake (1 day) Teaches you how to design, build, and operate a serverless data lake solution with AWS services. Includes topics such as ingesting data from any data source at large scale, storing the data securely and durably, using the right tool to process large volumes of data, and understanding the options available for analyzing the data in near-real time.
Apache Cassandra is a commonly used, high performance NoSQL database. AWS customers that currently maintain Cassandra on-premises may want to take advantage of the scalability, reliability, security, and economic benefits of running Cassandra on Amazon EC2.
Amazon EC2 and Amazon Elastic Block Store (Amazon EBS) provide secure, resizable compute capacity and storage in the AWS Cloud. Combined, they let you deploy Cassandra and scale capacity according to your requirements. Given the number of possible deployment topologies, it’s not always trivial to select the strategy most suitable for your use case.
In this post, we outline three Cassandra deployment options, as well as provide guidance about determining the best practices for your use case in the following areas:
Cassandra resource overview
Deployment considerations
Storage options
Networking
High availability and resiliency
Maintenance
Security
Before we jump into best practices for running Cassandra on AWS, we should mention that we have many customers who decided to use DynamoDB instead of managing their own Cassandra cluster. DynamoDB is fully managed, serverless, and provides multi-master cross-region replication, encryption at rest, and managed backup and restore. Integration with AWS Identity and Access Management (IAM) enables DynamoDB customers to implement fine-grained access control for their data security needs.
Several customers who have been using large Cassandra clusters for many years have moved to DynamoDB to eliminate the complications of administering Cassandra clusters and maintaining high availability and durability themselves. Gumgum.com is one customer who migrated to DynamoDB and observed significant savings. For more information, see Moving to Amazon DynamoDB from Hosted Cassandra: A Leap Towards 60% Cost Saving per Year.
AWS provides options, so you’re covered whether you want to run your own NoSQL Cassandra database, or move to a fully managed, serverless DynamoDB database.
Cassandra resource overview
Here’s a short introduction to standard Cassandra resources and how they are implemented with AWS infrastructure. If you’re already familiar with Cassandra or AWS deployments, this can serve as a refresher.
Resource: Cluster
Cassandra: A single Cassandra deployment. This typically consists of multiple physical locations, keyspaces, and physical servers.
AWS: A logical deployment construct in AWS that maps to an AWS CloudFormation StackSet, which consists of one or many CloudFormation stacks to deploy Cassandra.
Resource: Datacenter
Cassandra: A group of nodes configured as a single replication group.
AWS: A logical deployment construct in AWS. A datacenter is deployed with a single CloudFormation stack consisting of Amazon EC2 instances, networking, storage, and security resources.
Resource: Rack
Cassandra: A collection of servers. A datacenter consists of at least one rack. Cassandra tries to place the replicas on different racks.
AWS: A single Availability Zone.
Resource: Server/node
Cassandra: A physical or virtual machine running Cassandra software.
AWS: An EC2 instance.
Resource: Token
Cassandra: Conceptually, the data managed by a cluster is represented as a ring. The ring is then divided into ranges equal to the number of nodes, with each node responsible for one or more ranges of the data. Each node is assigned a token, which is essentially a random number from the range. The token value determines the node’s position in the ring and its range of data.
AWS: Managed within Cassandra.
Resource: Virtual node (vnode)
Cassandra: Responsible for storing a range of data. Each vnode receives one token in the ring. A cluster (by default) consists of 256 tokens, which are uniformly distributed across all servers in the Cassandra datacenter.
AWS: Managed within Cassandra.
Resource: Replication factor
Cassandra: The total number of replicas across the cluster.
AWS: Managed within Cassandra.
Deployment considerations
One of the many benefits of deploying Cassandra on Amazon EC2 is that you can automate many deployment tasks. In addition, AWS includes services, such as CloudFormation, that allow you to describe and provision all your infrastructure resources in your cloud environment.
We recommend orchestrating each Cassandra ring with one CloudFormation template. If you are deploying in multiple AWS Regions, you can use a CloudFormation StackSet to manage those stacks. All the maintenance actions (scaling, upgrading, and backing up) should be scripted with an AWS SDK. These may live as standalone AWS Lambda functions that can be invoked on demand during maintenance.
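As an example of such a standalone function, the following sketch is a Lambda handler that snapshots the EBS data volumes of Cassandra nodes as a backup task. The tag key and value used to find the volumes are assumptions; use whatever tagging convention your CloudFormation stacks apply, and note that this applies only to EBS-backed deployments.

import boto3

ec2 = boto3.client('ec2')

def lambda_handler(event, context):
    # Find the EBS data volumes tagged as belonging to the Cassandra ring
    # (the tag key/value below is an assumed convention).
    volumes = ec2.describe_volumes(
        Filters=[{'Name': 'tag:cassandra-ring', 'Values': ['ring-1']}])['Volumes']
    for volume in volumes:
        # Snapshot each volume; snapshots are stored durably and can seed restores.
        ec2.create_snapshot(
            VolumeId=volume['VolumeId'],
            Description='Scheduled Cassandra backup of {}'.format(volume['VolumeId']))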
You can get started by following the Cassandra Quick Start deployment guide. Keep in mind that this guide does not address the requirements to operate a production deployment and should be used only for learning more about Cassandra.
Deployment patterns
In this section, we discuss various deployment options available for Cassandra in Amazon EC2. A successful deployment starts with thoughtful consideration of these options. Consider the amount of data, network environment, throughput, and availability.
Single AWS Region, 3 Availability Zones
Active-active, multi-Region
Active-standby, multi-Region
Single AWS Region, 3 Availability Zones
In this pattern, you deploy the Cassandra cluster in one AWS Region and three Availability Zones. There is only one ring in the cluster. By using EC2 instances in three zones, you ensure that the replicas are distributed uniformly in all zones.
To ensure the even distribution of data across all Availability Zones, we recommend that you distribute the EC2 instances evenly in all three Availability Zones. The number of EC2 instances in the cluster is a multiple of three (the replication factor).
This pattern is suitable in situations where the application is deployed in one Region, or where deployments must be constrained to a single Region because of data privacy or other legal requirements.
Pros:
● Highly available; can sustain failure of one Availability Zone.
● Simple deployment.
Cons:
● Does not protect in a situation when many of the resources in a Region are experiencing intermittent failure.
Active-active, multi-Region
In this pattern, you deploy two rings in two different Regions and link them. The VPCs in the two Regions are peered so that data can be replicated between two rings.
We recommend that the two rings in the two Regions be identical in nature, having the same number of nodes, instance types, and storage configuration.
This pattern is most suitable when the applications using the Cassandra cluster are deployed in more than one Region.
Pros:
● No data loss during failover.
● Highly available; can sustain a situation when many of the resources in a Region are experiencing intermittent failures.
● Read/write traffic can be localized to the closest Region for the user, for lower latency and higher performance.
Cons:
● High operational overhead.
● The second Region effectively doubles the cost.
Active-standby, multi-Region
In this pattern, you deploy two rings in two different Regions and link them. The VPCs in the two Regions are peered so that data can be replicated between two rings.
However, the second Region does not receive traffic from the applications. It only functions as a secondary location for disaster recovery reasons. If the primary Region is not available, the second Region receives traffic.
We recommend that the two rings in the two Regions be identical in nature, having the same number of nodes, instance types, and storage configuration.
This pattern is most suitable when the applications using the Cassandra cluster require low recovery point objective (RPO) and recovery time objective (RTO).
Pros:
● No data loss during failover.
● Highly available; can sustain failure or partitioning of one whole Region.
Cons:
● High operational overhead.
● High latency for writes due to eventual consistency.
● The second Region effectively doubles the cost.
Storage options
In on-premises deployments, Cassandra uses local disks to store data. There are two storage options for EC2 instances:
Instance store (ephemeral storage)
Amazon EBS
Your choice of storage is closely related to the type of workload supported by the Cassandra cluster. Instance store works best for most general purpose Cassandra deployments. However, in certain read-heavy clusters, Amazon EBS is a better choice.
The choice of instance type is generally driven by the type of storage:
If ephemeral storage is required for your application, a storage-optimized (I3) instance is the best option.
If your workload requires Amazon EBS, it is best to go with compute-optimized (C5) instances.
Burstable instance types (T2) don’t offer good performance for Cassandra deployments.
Instance store
Ephemeral storage is local to the EC2 instance. It may provide high input/output operations per second (IOPS) based on the instance type. An SSD-based instance store can support up to 3.3M IOPS in I3 instances. This high performance makes it an ideal choice for transactional or write-intensive applications such as Cassandra.
In general, instance storage is recommended for transactional, large, and medium-size Cassandra clusters. For a large cluster, read/write traffic is distributed across a higher number of nodes, so the loss of one node has less of an impact. However, for smaller clusters, a quick recovery for the failed node is important.
As an example, for a cluster with 100 nodes, the loss of 1 node is a 3.33% loss (with a replication factor of 3). Similarly, for a cluster with 10 nodes, the loss of 1 node is a 33% loss of capacity (with a replication factor of 3).
IOPS (translates to higher query performance)
Ephemeral storage: Up to 3.3M on I3.
Amazon EBS: 80K per instance; 10K per gp2 volume; 32K per io1 volume.
Comments: Higher IOPS results in higher query performance on each host. However, Cassandra scales well horizontally, so in general we recommend scaling horizontally first and then scaling vertically to mitigate specific issues. Note: 3.3M IOPS is observed with 100% random reads with a 4-KB block size on Amazon Linux.

AWS instance types
Ephemeral storage: I3.
Amazon EBS: Compute optimized, C5.
Comments: Being able to choose between different instance types is an advantage in terms of CPU, memory, and so on, for horizontal and vertical scaling.

Backup/recovery
Ephemeral storage: Custom.
Amazon EBS: Basic building blocks are available from AWS.
Comments: Amazon EBS offers a distinct advantage here. It takes only a small engineering effort to establish a backup/restore strategy: a) in case of an instance failure, the EBS volumes from the failing instance are attached to a new instance; b) in case of an EBS volume failure, the data is restored by creating a new EBS volume from the last snapshot.
Amazon EBS
EBS volumes offer higher resiliency, and IOPS can be configured based on your storage needs. EBS volumes also offer some distinct advantages in terms of recovery time. EBS volumes can support up to 32K IOPS per volume and up to 80K IOPS per instance in RAID configuration. They have an annualized failure rate (AFR) of 0.1–0.2%, which makes EBS volumes 20 times more reliable than typical commodity disk drives.
The primary advantage of using Amazon EBS in a Cassandra deployment is that it reduces data-transfer traffic significantly when a node fails or must be replaced. The replacement node joins the cluster much faster. However, Amazon EBS could be more expensive, depending on your data storage needs.
Cassandra has built-in fault tolerance by replicating data to partitions across a configurable number of nodes. It can not only withstand node failures but if a node fails, it can also recover by copying data from other replicas into a new node. Depending on your application, this could mean copying tens of gigabytes of data. This adds additional delay to the recovery process, increases network traffic, and could possibly impact the performance of the Cassandra cluster during recovery.
Data stored on Amazon EBS is persisted in case of an instance failure or termination. The node’s data stored on an EBS volume remains intact and the EBS volume can be mounted to a new EC2 instance. Most of the replicated data for the replacement node is already available in the EBS volume and won’t need to be copied over the network from another node. Only the changes made after the original node failed need to be transferred across the network. That makes this process much faster.
EBS volumes are snapshotted periodically. So, if a volume fails, a new volume can be created from the last known good snapshot and be attached to a new instance. This is faster than creating a new volume and copying all the data to it.
Most Cassandra deployments use a replication factor of three. However, Amazon EBS does its own replication under the covers for fault tolerance. In practice, EBS volumes are about 20 times more reliable than typical disk drives. So, it is possible to go with a replication factor of two. This not only saves cost, but also enables deployments in a region that has two Availability Zones.
EBS volumes are recommended in case of read-heavy, small clusters (fewer nodes) that require storage of a large amount of data. Keep in mind that the Amazon EBS provisioned IOPS could get expensive. General purpose EBS volumes work best when sized for required performance.
Networking
If your cluster is expected to receive high read/write traffic, select an instance type that offers 10 Gb/s performance. As an example, i3.8xlarge and c5.9xlarge both offer 10 Gb/s networking performance. A smaller instance type in the same family leads to a relatively lower networking throughput.
Cassandra generates a universally unique identifier (UUID) for each node based on the IP address of the instance. This UUID is used for distributing vnodes on the ring.
In an AWS deployment, an IP address is assigned automatically to an EC2 instance when it is created. If a node is replaced with a new instance, it gets a new IP address; the data distribution changes and the whole ring has to be rebalanced. This is not desirable.
To preserve the assigned IP address, use a secondary elastic network interface with a fixed IP address. Before swapping an EC2 instance with a new one, detach the secondary network interface from the old instance and attach it to the new one. This way, the UUID remains the same and there is no change in the way that data is distributed in the cluster.
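A minimal sketch of that swap with Boto3 follows; the interface, attachment, and instance IDs are placeholders that your automation would look up.

import boto3

ec2 = boto3.client('ec2')

eni_id = 'eni-0123456789abcdef0'                    # secondary interface holding the fixed IP
old_attachment_id = 'eni-attach-0123456789abcdef0'  # its attachment to the old instance
new_instance_id = 'i-0123456789abcdef0'             # replacement Cassandra node

# Detach the secondary network interface from the old instance...
ec2.detach_network_interface(AttachmentId=old_attachment_id)
ec2.get_waiter('network_interface_available').wait(NetworkInterfaceIds=[eni_id])

# ...and attach it to the replacement instance as a secondary interface.
ec2.attach_network_interface(NetworkInterfaceId=eni_id,
                             InstanceId=new_instance_id,
                             DeviceIndex=1)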
If you are deploying in more than one Region, you can connect the two VPCs in the two Regions using cross-Region VPC peering.
High availability and resiliency
Cassandra is designed to be fault-tolerant and highly available during multiple node failures. In the patterns described earlier in this post, you deploy Cassandra to three Availability Zones with a replication factor of three. Even though it limits the AWS Region choices to the Regions with three or more Availability Zones, it offers protection for the cases of one-zone failure and network partitioning within a single Region. The multi-Region deployments described earlier in this post protect when many of the resources in a Region are experiencing intermittent failure.
Resiliency is ensured through infrastructure automation. The deployment patterns all require a quick replacement of the failing nodes. In the case of a regionwide failure, when you deploy with the multi-Region option, traffic can be directed to the other active Region while the infrastructure is recovering in the failing Region. In the case of unforeseen data corruption, the standby cluster can be restored with point-in-time backups stored in Amazon S3.
Maintenance
In this section, we look at ways to ensure that your Cassandra cluster is healthy:
Scaling
Upgrades
Backup and restore
Scaling
Cassandra is horizontally scaled by adding more instances to the ring. We recommend doubling the number of nodes in a cluster to scale up in one scale operation. This leaves the data homogeneously distributed across Availability Zones. Similarly, when scaling down, it’s best to halve the number of instances to keep the data homogeneously distributed.
Cassandra is vertically scaled by increasing the compute power of each node. Larger instance types have proportionally bigger memory. Use deployment automation to swap instances for bigger instances without downtime or data loss.
Upgrades
All three types of upgrades (Cassandra, operating system patching, and instance type changes) follow the same rolling upgrade pattern.
In this process, you start with a new EC2 instance and install software and patches on it. Thereafter, remove one node from the ring. For more information, see Cassandra cluster Rolling upgrade. Then, you detach the secondary network interface from one of the EC2 instances in the ring and attach it to the new EC2 instance. Restart the Cassandra service and wait for it to sync. Repeat this process for all nodes in the cluster.
Backup and restore
Your backup and restore strategy is dependent on the type of storage used in the deployment. Cassandra supports snapshots and incremental backups. When using instance store, a file-based backup tool works best. Customers use rsync or other third-party products to copy data backups from the instance to long-term storage. For more information, see Backing up and restoring data in the DataStax documentation. This process has to be repeated for all instances in the cluster for a complete backup. These backup files are copied back to new instances to restore. We recommend using S3 to durably store backup files for long-term storage.
For Amazon EBS based deployments, you can enable automated snapshots of EBS volumes to back up volumes. New EBS volumes can be easily created from these snapshots for restoration.
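A restore from the most recent snapshot can be scripted along these lines (a sketch with placeholder IDs, not a complete runbook):

import boto3

ec2 = boto3.client('ec2')

snapshot_id = 'snap-0123456789abcdef0'        # latest known good snapshot
availability_zone = 'us-east-1a'              # must match the replacement instance's zone
replacement_instance_id = 'i-0123456789abcdef0'

# Create a new volume from the snapshot and wait until it is available.
volume = ec2.create_volume(SnapshotId=snapshot_id,
                           AvailabilityZone=availability_zone,
                           VolumeType='gp2')
ec2.get_waiter('volume_available').wait(VolumeIds=[volume['VolumeId']])

# Attach the restored volume to the replacement Cassandra node.
ec2.attach_volume(VolumeId=volume['VolumeId'],
                  InstanceId=replacement_instance_id,
                  Device='/dev/xvdf')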
Security
We recommend that you think about security in all aspects of deployment. The first step is to ensure that the data is encrypted at rest and in transit. The second step is to restrict access to unauthorized users. For more information about security, see the Cassandra documentation.
Encryption at rest
Encryption at rest can be achieved by using EBS volumes with encryption enabled. Amazon EBS uses AWS KMS for encryption. For more information, see Amazon EBS Encryption.
Instance store–based deployments require using an encrypted file system or an AWS partner solution. If you are using DataStax Enterprise, it supports transparent data encryption.
Encryption in transit
Cassandra uses Transport Layer Security (TLS) for client and internode communications.
Authentication
The security mechanism is pluggable, which means that you can easily swap out one authentication method for another. You can also provide your own method of authenticating to Cassandra, such as a Kerberos ticket, or if you want to store passwords in a different location, such as an LDAP directory.
Authorization
The authorizer that’s plugged in by default is org.apache.cassandra.auth.AllowAllAuthorizer. Cassandra also provides a role-based access control (RBAC) capability, which allows you to create roles and assign permissions to these roles.
Conclusion
In this post, we discussed several patterns for running Cassandra in the AWS Cloud. This post describes how you can manage Cassandra databases running on Amazon EC2. AWS also provides managed offerings for a number of databases. To learn more, see Purpose-built databases for all your application needs.
If you have questions or suggestions, please comment below.
Prasad Alle is a Senior Big Data Consultant with AWS Professional Services. He spends his time leading and building scalable, reliable big data, machine learning, artificial intelligence, and IoT solutions for AWS Enterprise and Strategic customers. His interests extend to technologies such as advanced edge computing and machine learning at the edge. In his spare time, he enjoys spending time with his family.
Provanshu Dey is a Senior IoT Consultant with AWS Professional Services. He works on highly scalable and reliable IoT, data and machine learning solutions with our customers. In his spare time, he enjoys spending time with his family and tinkering with electronics & gadgets.
This is a customer post by Ajay Rathod, a Staff Data Engineer at Realtor.com.
Realtor.com, in their own words: Realtor.com®, operated by Move, Inc., is a trusted resource for home buyers, sellers, and dreamers. It offers the most comprehensive database of for-sale properties, among competing national sites, and the information, tools, and professional expertise to help people move confidently through every step of their home journey.
Move, Inc. processes hundreds of terabytes of data partitioned by day and hour. Various teams run hundreds of queries on this data. Using AWS services, Move, Inc. has built an infrastructure for gathering and analyzing data:
To increase the effectiveness of storage and subsequent querying, the data is converted into Parquet format and stored again in S3.
Amazon Athena is used as the SQL (Structured Query Language) engine to query the data in S3. Athena is easy to use and is often quickly adopted by various teams.
Teams visualize query results in Amazon QuickSight. Amazon QuickSight is a business analytics service that allows you to quickly and easily visualize data and collaborate with other users in your account.
This architecture is known as the data platform and is shared by the data science, data engineering, and the data operations teams within the organization. Move, Inc. also enables other cross-functional teams to use Athena. When many users use Athena, it helps to monitor its usage to ensure cost-effectiveness. This leads to a strong need for Athena metrics that can give details about the following:
Users
Amount of data scanned (to monitor the cost of AWS service usage)
The databases used for queries
Actual queries that teams run
Currently, the Move, Inc. team does not have an easy way of obtaining all these metrics from a single tool. Having a way to do this would greatly simplify monitoring efforts. For example, the data operations team wants to collect several metrics every day obtained from queries run on Athena for their data. They require the following metrics:
Amount of data scanned by each user
Number of queries by each user
Databases accessed by each user
In this post, I discuss how to build a solution for monitoring Athena usage. To build this solution, you rely on AWS CloudTrail. CloudTrail is a web service that records AWS API calls for your AWS account and delivers log files to an S3 bucket.
Solution
Here is the high-level overview:
Use the CloudTrail API to audit the user queries, and then use Athena to create a table from the CloudTrail logs.
Query the Athena API with the AWS CLI to gather metrics about the data scanned by the user queries and put this information into another table in Athena.
Combine the information from these two sources by joining the two tables.
Use the resulting data to analyze, build insights, and create a dashboard that shows the usage of Athena by users within different teams in the organization.
The architecture of this solution is shown in the following diagram.
Take a look at this solution step by step.
IAM and permissions setup
This solution uses CloudTrail, Athena, and S3. Make sure that the users who run the following scripts and steps have the appropriate IAM roles and policies. For more information, see Tutorial: Delegate Access Across AWS Accounts Using IAM Roles.
Step 1: Create a table in Athena for data in CloudTrail
The CloudTrail API records all Athena queries run by different teams within the organization. These logs are saved in S3. The fields of most interest are:
User identity
Start time of the API call
Source IP address
Request parameters
Response elements returned by the service
When end users make queries in Athena, these queries are recorded by CloudTrail as responses from Athena web service calls. In these responses, each query is represented as a JSON (JavaScript Object Notation) string.
You can use the following CREATE TABLE statement to create the cloudtrail_logs table in Athena. For more information, see Querying CloudTrail Logs in the Athena documentation.
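The full statement is in the Athena documentation; the following is a simplified sketch that declares only the columns used later in this post, submitted through the Athena API. The S3 locations are placeholders for your CloudTrail and query-results buckets.

import boto3

athena = boto3.client('athena')

# Simplified sketch; see "Querying CloudTrail Logs" for the complete column list.
ddl = """
CREATE EXTERNAL TABLE IF NOT EXISTS cloudtrail_logs (
    useridentity STRUCT<
        arn: STRING,
        sessioncontext: STRUCT<sessionissuer: STRUCT<username: STRING>>>,
    eventtime STRING,
    eventsource STRING,
    eventname STRING,
    sourceipaddress STRING,
    requestparameters STRING,
    responseelements STRING
)
ROW FORMAT SERDE 'com.amazon.emr.hive.serde.CloudTrailSerde'
STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://<your-cloudtrail-bucket>/AWSLogs/<account-id>/CloudTrail/'
"""

athena.start_query_execution(
    QueryString=ddl,
    ResultConfiguration={'OutputLocation': 's3://<your-athena-results-bucket>/'})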
Step 2: Create a table in Amazon Athena for data from API output
Athena provides an API that can be queried to obtain information about a specific query ID. It also provides an API to obtain information about a batch of query IDs, with a batch size of up to 50 query IDs.
You can use this API call to obtain information about the Athena queries that you are interested in and store this information in an S3 location. Create an Athena table to represent this data in S3. For the purpose of this post, the response fields that are of interest are as follows:
QueryExecutionId
Database
EngineExecutionTimeInMillis
DataScannedInBytes
Status
SubmissionDateTime
CompletionDateTime
The CREATE TABLE statement for athena_api_output is as follows:
CREATE EXTERNAL TABLE IF NOT EXISTS athena_api_output(
queryid string,
querydatabase string,
executiontime bigint,
datascanned bigint,
status string,
submissiondatetime string,
completiondatetime string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = ',',
'field.delim' = ','
) LOCATION 's3://<s3 location of the output from the API calls>'
TBLPROPERTIES ('has_encrypted_data'='false')
You can inspect the query IDs and user information for the last day. The query is as follows:
with data AS (
SELECT
json_extract(responseelements,
'$.queryExecutionId') AS query_id,
(useridentity.arn) AS uid,
(useridentity.sessioncontext.sessionIssuer.userName) AS role,
from_iso8601_timestamp(eventtime) AS dt
FROM cloudtrail_logs
WHERE eventsource='athena.amazonaws.com'
AND eventname='StartQueryExecution'
AND json_extract(responseelements, '$.queryExecutionId') is NOT null)
SELECT *
FROM data
WHERE dt > date_add('day',-1,now() )
Step 3: Obtain Query Statistics from Athena API
You can write a simple Python script to loop through queries in batches of 50 and query the Athena API for query statistics. You can use the Boto library for these lookups. Boto is a library that provides you with an easy way to interact with and automate your AWS development. The response from the Boto API can be parsed to extract the fields that you need as described in Step 2.
An example Python script is available in the AthenaMetrics GitHub repo.
Format these fields, for each query ID, as CSV strings and store them for the entire batch response in an S3 bucket. This S3 location is represented by the table created in Step 2, athena_api_output.
In your Python code, create a variable named sql_query, and assign it a string representing the SQL query defined in Step 2. The s3_staging_folder is the location in S3 that Athena uses for storing the results of the query. The code is as follows:
sql_query = """
with data AS (
SELECT
json_extract(responseelements,
'$.queryExecutionId') AS query_id,
(useridentity.arn) AS uid,
(useridentity.sessioncontext.sessionIssuer.userName) AS role,
from_iso8601_timestamp(eventtime) AS dt
FROM cloudtrail_logs
WHERE eventsource='athena.amazonaws.com'
AND eventname='StartQueryExecution'
AND json_extract(responseelements, '$.queryExecutionId') is NOT null)
SELECT *
FROM data
WHERE dt > date_add('day',-1,now() )
"""
athena_client = boto3.client('athena')
query_execution = athena_client.start_query_execution(
QueryString=sql_query,
ClientRequestToken=str(uuid.uuid4()),
ResultConfiguration={
'OutputLocation': s3_staging_folder,
}
)
query_execution_id = query_execution['QueryExecutionId']
### Allow query to complete, check for status response["QueryExecution"]["Status"]["State"]
response = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
if response["QueryExecution"]["Status"]["State"] == "SUCCEEDED":
    results = athena_client.get_query_results(QueryExecutionId=query_execution_id)
You can iterate through the results in the response object and consolidate them in batches of 50 results. For each batch, you can invoke the Athena API, batch-get-query-execution.
Store the output in the S3 location pointed to by the CREATE TABLE definition for the table athena_api_output, in Step 2. The SQL statement above returns only queries run in the last 24 hours. You may want to increase that to get usage over a longer period of time. The code snippet for this API call is as follows:
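A sketch of that call with Boto3 follows; it assumes the athena_client and the batchqueryids list from the script above, and the bucket and key names are placeholders.

import boto3

response = athena_client.batch_get_query_execution(QueryExecutionIds=batchqueryids)

csv_rows = []
for execution in response['QueryExecutions']:
    stats = execution.get('Statistics', {})
    status = execution.get('Status', {})
    # Build one CSV row per query, matching the athena_api_output columns.
    csv_rows.append(','.join([
        execution['QueryExecutionId'],
        execution.get('QueryExecutionContext', {}).get('Database', ''),
        str(stats.get('EngineExecutionTimeInMillis', '')),
        str(stats.get('DataScannedInBytes', '')),
        status.get('State', ''),
        str(status.get('SubmissionDateTime', '')),
        str(status.get('CompletionDateTime', '')),
    ]))

# Write the batch as CSV to the S3 location backing the athena_api_output table.
boto3.client('s3').put_object(
    Bucket='<your-output-bucket>',
    Key='athena_api_output/batch_output.csv',
    Body='\n'.join(csv_rows).encode('utf-8'))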
The batchqueryids value is an array of 50 query IDs extracted from the result set of the SELECT query. This script creates the data needed by your second table, athena_api_output, and you are now ready to join both tables in Athena.
Step 4: Join the CloudTrail and Athena API data
Now that the two tables are available with the data that you need, you can run the following Athena query to look at the usage by user. You can limit the output of this query to the most recent five days.
SELECT
c.useridentity.arn,
json_extract(c.responseelements, '$.queryExecutionId') qid,
a.datascanned,
a.querydatabase,
a.executiontime,
a.submissiondatetime,
a.completiondatetime,
a.status
FROM cloudtrail_logs c
JOIN athena_api_output a
ON cast(json_extract(c.responseelements, '$.queryExecutionId') as varchar) = a.queryid
WHERE eventsource = 'athena.amazonaws.com'
AND eventname = 'StartQueryExecution'
AND from_iso8601_timestamp(eventtime) > date_add('day',-5 ,now() )
Step 5: Analyze and visualize the results
In this step, using QuickSight, you can create a dashboard that shows the following metrics:
Average amount of data scanned (MB) by a user and database
Using the solution described in this post, you can continuously monitor the usage of Athena by various teams. Taking this a step further, you can automate and set user limits for how much data the Athena users in your team can query within a given period of time. You may also choose to add notifications when the usage by a particular user crosses a specified threshold. This helps you manage costs incurred by different teams in your organization.
Realtor.com would like to acknowledge the tremendous support and guidance provided by Hemant Borole, Senior Consultant, Big Data & Analytics with AWS Professional Services in helping to author this post.
Ajay Rathod is Staff Data Engineer at Realtor.com. With a deep background in AWS Cloud Platform and Data Infrastructure, Ajay leads the Data Engineering and automation aspect of Data Operations at Realtor.com. He has designed and deployed many ETL pipelines and workflows for the Realtor Data Analytics Platform using AWS services like Data Pipeline, Athena, Batch, Glue and Boto3. He has created various operational metrics to monitor ETL Pipelines and Resource Usage.
This is a customer post by Stephen Borg, the Head of Big Data and BI at Cerberus Technologies.
Cerberus Technologies, in their own words: Cerberus is a company founded in 2017 by a team of visionary iGaming veterans. Our mission is simple – to offer the best tech solutions through a data-driven and a customer-first approach, delivering innovative solutions that go against traditional forms of working and process. This mission is based on the solid foundations of reliability, flexibility and security, and we intend to fundamentally change the way iGaming and other industries interact with technology.
Over the years, I have developed and created a number of data warehouses from scratch. Recently, I built a data warehouse for the iGaming industry single-handedly. To do it, I used the power and flexibility of Amazon Redshift and the wider AWS data management ecosystem. In this post, I explain how I was able to build a robust and scalable data warehouse without the large team of experts typically needed.
In two of my recent projects, I ran into challenges when scaling our data warehouse using on-premises infrastructure. Data was growing at many tens of gigabytes per day, and query performance was suffering. Scaling required major capital investment for hardware and software licenses, and also significant operational costs for maintenance and technical staff to keep it running and performing well. Unfortunately, I couldn’t get the resources needed to scale the infrastructure with data growth, and these projects were abandoned. Thanks to cloud data warehousing, the bottlenecks of infrastructure resources, capital expense, and operational costs have been significantly reduced or have gone away entirely. There is no more excuse for allowing obstacles of the past to delay delivering timely insights to decision makers, no matter how much data you have.
With Amazon Redshift and AWS, I delivered a cloud data warehouse to the business very quickly, and with a small team: me. I didn’t have to order hardware or software, and I no longer needed to install, configure, tune, or keep up with patches and version updates. Instead, I easily set up a robust data processing pipeline and we were quickly ingesting and analyzing data. Now, my data warehouse team can be extremely lean, and focus more time on bringing in new data and delivering insights. In this post, I show you the AWS services and the architecture that I used.
Handling data feeds
I have several different data sources that provide everything needed to run the business. The data includes activity from our iGaming platform, social media posts, clickstream data, marketing and campaign performance, and customer support engagements.
To handle the diversity of data feeds, I developed abstract integration applications using Docker that run on Amazon EC2 Container Service (Amazon ECS) and feed data to Amazon Kinesis Data Streams. These data streams can be used for real-time analytics. In my system, each record in Kinesis is preprocessed by an AWS Lambda function to cleanse and aggregate information. My system then routes it through Amazon Kinesis Data Firehose to be stored where I need it on Amazon S3. Suppose that you used an on-premises architecture to accomplish the same task. A team of data engineers would be required to maintain and monitor a Kafka cluster, develop applications to stream data, and maintain a Hadoop cluster and the infrastructure underneath it for data storage. With my stream processing architecture, there are no servers to manage, no disk drives to replace, and no service monitoring to write.
Setting up a Kinesis stream can be done with a few clicks, and the same for Kinesis Firehose. Firehose can be configured to automatically consume data from a Kinesis Data Stream, and then write compressed data every N minutes to Amazon S3. When I want to process a Kinesis data stream, it’s very easy to set up a Lambda function to be executed on each message received. I can just set a trigger from the AWS Lambda Management Console, as shown following.
Regardless of the format in which I receive the data from our partners, I can send it to Kinesis as JSON data using my own formatters. After Firehose writes this to Amazon S3, I have everything in nearly the same structure I received, but compressed, encrypted, and optimized for reading.
This data is automatically crawled by AWS Glue and placed into the AWS Glue Data Catalog. This means that I can immediately query the data directly on S3 using Amazon Athena or through Amazon Redshift Spectrum. Previously, I used Amazon EMR and an Amazon RDS–based metastore in Apache Hive for catalog management. Now I can avoid the complexity of maintaining Hive Metastore catalogs. Glue takes care of high availability and the operations side so that I know that end users can always be productive.
Working with Amazon Athena and Amazon Redshift for analysis
I found Amazon Athena extremely useful out of the box for ad hoc analysis. Our engineers (me) use Athena to understand new datasets that we receive and to understand what transformations will be needed for long-term query efficiency.
For our data analysts and data scientists, we’ve selected Amazon Redshift. Amazon Redshift has proven to be the right tool for us over and over again. It easily processes 20+ million transactions per day, regardless of the footprint of the tables and the type of analytics required by the business. Latency is low and query performance expectations have been more than met. We use Redshift Spectrum for long-term data retention, which enables me to extend the analytic power of Amazon Redshift beyond local data to anything stored in S3, and without requiring me to load any data. Redshift Spectrum gives me the freedom to store data where I want, in the format I want, and have it available for processing when I need it.
To load data directly into Amazon Redshift, I use AWS Data Pipeline to orchestrate data workflows. I create Amazon EMR clusters on an intra-day basis, which I can easily adjust to run more or less frequently as needed throughout the day. EMR clusters are used together with Amazon RDS, Apache Spark 2.0, and S3 storage. The data pipeline application loads ETL configurations from Spring RESTful services hosted on AWS Elastic Beanstalk. The application then loads data from S3 into memory, aggregates and cleans the data, and then writes the final version of the data to Amazon Redshift. This data is then ready to use for analysis. Spark on EMR also helps with recommendations and personalization use cases for various business users, and I find this easy to set up and deliver what users want. Finally, business users use Amazon QuickSight for self-service BI to slice, dice, and visualize the data depending on their requirements.
Each AWS service in this architecture plays its part in saving precious time that’s crucial for delivery and getting different departments in the business on board. I found the services easy to set up and use, and all have proven to be highly reliable in our production environments. When the architecture was in place, scaling out was either completely handled by the service or a matter of a simple API call, and crucially didn’t require me to change one line of code. Increasing shards for Kinesis can be done in a minute by editing a stream. Increasing capacity for Lambda functions can be accomplished by editing the megabytes allocated for processing, and concurrency is handled automatically. EMR cluster capacity can easily be increased by changing the master and slave node types in Data Pipeline, or by using Auto Scaling. Lastly, RDS and Amazon Redshift can be easily upgraded without any major tasks to be performed by our team (again, me).
In the end, using AWS services including Kinesis, Lambda, Data Pipeline, and Amazon Redshift allows me to keep my team lean and highly productive. I eliminated the cost and delays of capital infrastructure, as well as the late night and weekend calls for support. I can now give maximum value to the business while keeping operational costs down. My team pushed out an agile and highly responsive data warehouse solution in record time and we can handle changing business requirements rapidly, and quickly adapt to new data and new user requests.
Stephen Borg is the Head of Big Data and BI at Cerberus Technologies. He has a background in platform software engineering, and first became involved in data warehousing using the typical RDBMS, SQL, ETL, and BI tools. He quickly became passionate about providing insight to help others optimize the business and add personalization to products. He is now the Head of Big Data and BI at Cerberus Technologies.
AWS has added 16 more AWS services to its Payment Card Industry Data Security Standard (PCI DSS) compliance program, giving you more options, flexibility, and functionality to process and store sensitive payment card data in the AWS Cloud. The services were audited by Coalfire to ensure that they meet strict PCI DSS standards.
AWS now offers 58 services that are officially PCI DSS compliant, giving administrators more service options for implementing a PCI-compliant cardholder environment.
With Amazon Redshift, you can build petabyte-scale data warehouses that unify data from a variety of internal and external sources. Because Amazon Redshift is optimized for complex queries (often involving multiple joins) across large tables, it can handle large volumes of retail, inventory, and financial data without breaking a sweat.
In this post, we describe how to combine data in Amazon Aurora with data in Amazon Redshift. Here’s an overview of the solution:
Use AWS Lambda functions with Amazon Aurora to capture data changes in a table.
Serverless architecture for capturing and analyzing Aurora data changes
Consider a scenario in which an e-commerce web application uses Amazon Aurora for a transactional database layer. The company has a sales table that captures every single sale, along with a few corresponding data items. This information is stored as immutable data in a table. Business users want to monitor the sales data and then analyze and visualize it.
In this example, you take the changes in data in an Aurora database table and save it in Amazon S3. After the data is captured in Amazon S3, you combine it with data in your existing Amazon Redshift cluster for analysis.
By the end of this post, you will understand how to capture data events in an Aurora table and push them out to other AWS services using AWS Lambda.
The following diagram shows the flow of data as it occurs in this tutorial:
The starting point in this architecture is a database insert operation in Amazon Aurora. When the insert statement is executed, a custom trigger calls a Lambda function and forwards the inserted data. Lambda writes the data that it received from Amazon Aurora to a Kinesis data delivery stream. Kinesis Data Firehose writes the data to an Amazon S3 bucket. Once the data is in an Amazon S3 bucket, it is queried in place using Amazon Redshift Spectrum.
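Before walking through the setup, here is a rough sketch of what the Lambda function in this flow might look like: it receives the inserted row from the Aurora trigger and forwards it to the Kinesis Data Firehose delivery stream created later in this post. The event shape is an assumption that depends on how your trigger builds the payload.

import json
import boto3

firehose = boto3.client('firehose')

def lambda_handler(event, context):
    # The Aurora trigger passes the inserted row as the invocation payload.
    record = json.dumps(event) + '\n'
    firehose.put_record(
        DeliveryStreamName='AuroraChangesToS3',
        Record={'Data': record.encode('utf-8')})
    return 'OK'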
Creating an Aurora database
First, create a database by following these steps in the Amazon RDS console:
Sign in to the AWS Management Console, and open the Amazon RDS console.
Choose Launch a DB instance, and choose Next.
For Engine, choose Amazon Aurora.
Choose a DB instance class. This example uses a small instance class, because this is not a production database.
In Multi-AZ deployment, choose No.
Configure DB instance identifier, Master username, and Master password.
Launch the DB instance.
After you create the database, use MySQL Workbench to connect to the database using the CNAME from the console. For information about connecting to an Aurora database, see Connecting to an Amazon Aurora DB Cluster.
The following screenshot shows the MySQL Workbench configuration:
Next, create a table in the database by running the following SQL statement:
Create Table
CREATE TABLE Sales (
InvoiceID int NOT NULL AUTO_INCREMENT,
ItemID int NOT NULL,
Category varchar(255),
Price double(10,2),
Quantity int not NULL,
OrderDate timestamp,
DestinationState varchar(2),
ShippingType varchar(255),
Referral varchar(255),
PRIMARY KEY (InvoiceID)
)
You can now populate the table with some sample data. To generate sample data in your table, copy and run a script like the one shown below, replacing the connection variables with values appropriate for your environment.
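Because the original script isn’t reproduced here, the following is a minimal Python sketch of what it might look like. It assumes the pymysql client and placeholder host, user, password, and database values, all of which you would replace with your own:

# Minimal sketch: populate the Sales table with random sample rows.
# HOST, USER, PASSWORD, and DATABASE are placeholders -- replace them with
# your Aurora cluster endpoint, credentials, and schema name.
import random
import pymysql

HOST = "your-aurora-cluster.cluster-xxxxxxxx.us-east-1.rds.amazonaws.com"
USER = "admin"
PASSWORD = "your-password"
DATABASE = "salesdb"

categories = ["Books", "Electronics", "Apparel", "Garden"]
states = ["NY", "CA", "TX", "WA"]
shipping = ["Standard", "Expedited", "Priority"]
referrals = ["Search", "Email", "Social", "Direct"]

connection = pymysql.connect(host=HOST, user=USER, password=PASSWORD, database=DATABASE)
try:
    with connection.cursor() as cursor:
        for _ in range(100):
            cursor.execute(
                "INSERT INTO Sales (ItemID, Category, Price, Quantity, OrderDate, "
                "DestinationState, ShippingType, Referral) "
                "VALUES (%s, %s, %s, %s, NOW(), %s, %s, %s)",
                (
                    random.randint(1, 1000),
                    random.choice(categories),
                    round(random.uniform(5, 500), 2),
                    random.randint(1, 10),
                    random.choice(states),
                    random.choice(shipping),
                    random.choice(referrals),
                ),
            )
    connection.commit()
finally:
    connection.close()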
The following screenshot shows how the table appears with the sample data:
Sending data from Amazon Aurora to Amazon S3
There are two methods available to send data from Amazon Aurora to Amazon S3:
Using a Lambda function
Using SELECT INTO OUTFILE S3
To demonstrate the ease of setting up integration between multiple AWS services, we use a Lambda function to send data to Amazon S3 using Amazon Kinesis Data Firehose.
Alternatively, you can use a SELECT INTO OUTFILE S3 statement to query data from an Amazon Aurora DB cluster and save it directly in text files that are stored in an Amazon S3 bucket. However, with this method, there is a delay between the time that the database transaction occurs and the time that the data is exported to Amazon S3 because the default file size threshold is 6 GB.
Creating a Kinesis data delivery stream
The next step is to create a Kinesis data delivery stream, since it’s a dependency of the Lambda function.
To create a delivery stream:
Open the Kinesis Data Firehose console.
Choose Create delivery stream.
For Delivery stream name, type AuroraChangesToS3.
For Source, choose Direct PUT.
For Record transformation, choose Disabled.
For Destination, choose Amazon S3.
In the S3 bucket drop-down list, choose an existing bucket, or create a new one.
Enter a prefix if needed, and choose Next.
For Data compression, choose GZIP.
In IAM role, choose either an existing role that has access to write to Amazon S3, or choose to generate one automatically. Choose Next.
Review all the details on the screen, and choose Create delivery stream when you’re finished.
Creating a Lambda function
Now you can create a Lambda function that is called every time there is a change that needs to be tracked in the database table. This Lambda function passes the data to the Kinesis data delivery stream that you created earlier.
To create the Lambda function:
Open the AWS Lambda console.
Ensure that you are in the AWS Region where your Amazon Aurora database is located.
If you have no Lambda functions yet, choose Get started now. Otherwise, choose Create function.
Choose Author from scratch.
Give your function a name, and select Python 3.6 for Runtime.
Choose an existing role or create a new one. The role needs permission to call firehose:PutRecord.
Choose Next on the trigger selection screen.
Paste your function code in the code window (a minimal sketch is shown after these steps). Change the stream_name variable to the Kinesis data delivery stream that you created in the previous step.
Choose File -> Save in the code editor and then choose Save.
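The function body isn’t reproduced above; the following is a minimal Python sketch of it, assuming the boto3 SDK and the AuroraChangesToS3 delivery stream created earlier. It flattens the JSON document sent by the stored procedure into a comma-delimited record so that it matches the external table defined later in this post:

# Minimal sketch of the Lambda handler: flatten the JSON document received
# from Aurora (via mysql.lambda_async) into a comma-delimited record and
# forward it to the Kinesis Data Firehose delivery stream.
import boto3

firehose = boto3.client("firehose")
stream_name = "AuroraChangesToS3"  # set this to your delivery stream name

def lambda_handler(event, context):
    # Order the fields to match the external table used for Redshift Spectrum.
    fields = [event["ItemID"], event["Category"], event["Price"],
              event["Quantity"], event["OrderDate"], event["DestinationState"],
              event["ShippingType"], event["Referral"]]
    record = ",".join(str(field) for field in fields) + "\n"
    firehose.put_record(
        DeliveryStreamName=stream_name,
        Record={"Data": record},
    )
    return "Record pushed to " + stream_name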
Before moving on, ensure that the Amazon Aurora database has permission to invoke the Lambda function; for Aurora MySQL, this is done by associating an IAM role that allows lambda:InvokeFunction with the DB cluster.
Creating a stored procedure and a trigger in Amazon Aurora
Now, go back to MySQL Workbench, and run the following command to create a new stored procedure. When this stored procedure is called, it invokes the Lambda function you created. Change the ARN in the following code to your Lambda function’s ARN.
DROP PROCEDURE IF EXISTS CDC_TO_FIREHOSE;
DELIMITER ;;
CREATE PROCEDURE CDC_TO_FIREHOSE (IN ItemID VARCHAR(255),
IN Category varchar(255),
IN Price double(10,2),
IN Quantity int(11),
IN OrderDate timestamp,
IN DestinationState varchar(2),
IN ShippingType varchar(255),
IN Referral varchar(255)) LANGUAGE SQL
BEGIN
CALL mysql.lambda_async('arn:aws:lambda:us-east-1:XXXXXXXXXXXXX:function:CDCFromAuroraToKinesis',
CONCAT('{ "ItemID" : "', ItemID,
'", "Category" : "', Category,
'", "Price" : "', Price,
'", "Quantity" : "', Quantity,
'", "OrderDate" : "', OrderDate,
'", "DestinationState" : "', DestinationState,
'", "ShippingType" : "', ShippingType,
'", "Referral" : "', Referral, '"}')
);
END
;;
DELIMITER ;
Create a trigger TR_Sales_CDC on the Sales table. When a new record is inserted, this trigger calls the CDC_TO_FIREHOSE stored procedure.
DROP TRIGGER IF EXISTS TR_Sales_CDC;
DELIMITER ;;
CREATE TRIGGER TR_Sales_CDC
AFTER INSERT ON Sales
FOR EACH ROW
BEGIN
SELECT NEW.ItemID , NEW.Category, New.Price, New.Quantity, New.OrderDate
, New.DestinationState, New.ShippingType, New.Referral
INTO @ItemID , @Category, @Price, @Quantity, @OrderDate
, @DestinationState, @ShippingType, @Referral;
CALL CDC_TO_FIREHOSE(@ItemID , @Category, @Price, @Quantity, @OrderDate
, @DestinationState, @ShippingType, @Referral);
END
;;
DELIMITER ;
If a new row is inserted in the Sales table, the Lambda function that is mentioned in the stored procedure is invoked.
Verify that data is being sent from the Lambda function to Kinesis Data Firehose to Amazon S3 successfully. You might have to insert a few records, depending on the size of your data, before new records appear in Amazon S3. This is due to Kinesis Data Firehose buffering. To learn more about Kinesis Data Firehose buffering, see the “Amazon S3” section in Amazon Kinesis Data Firehose Data Delivery.
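One quick way to perform this check is sketched below; it assumes boto3 and the bucket and CDC/ prefix that you configured for the delivery stream:

# Sketch: list the objects that Kinesis Data Firehose has delivered to S3.
# Replace the bucket name and prefix with the values used by your stream.
import boto3

s3 = boto3.client("s3")
response = s3.list_objects_v2(Bucket="your-bucket-name", Prefix="CDC/")

for obj in response.get("Contents", []):
    print(obj["Key"], obj["Size"], obj["LastModified"])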
Every time a new record is inserted in the sales table, a stored procedure is called, and it updates data in Amazon S3.
Querying data in Amazon Redshift
In this section, you use the data you produced from Amazon Aurora and consume it as-is in Amazon Redshift. In order to allow you to process your data as-is, where it is, while taking advantage of the power and flexibility of Amazon Redshift, you use Amazon Redshift Spectrum. You can use Redshift Spectrum to run complex queries on data stored in Amazon S3, with no need for loading or other data prep.
Just create a data source and issue your queries to your Amazon Redshift cluster as usual. Behind the scenes, Redshift Spectrum scales to thousands of instances on a per-query basis, ensuring that you get fast, consistent performance even as your dataset grows to beyond an exabyte! Being able to query data that is stored in Amazon S3 means that you can scale your compute and your storage independently. You have the full power of the Amazon Redshift query model and all the reporting and business intelligence tools at your disposal. Your queries can reference any combination of data stored in Amazon Redshift tables and in Amazon S3.
Redshift Spectrum supports open, common data formats, including CSV/TSV, Apache Parquet, SequenceFile, and RCFile. Files can be compressed using gzip or Snappy, with other data formats and compression methods in the works.
Next, create an IAM role that has access to Amazon S3 and Athena. By default, Amazon Redshift Spectrum uses the Amazon Athena data catalog. Your cluster needs authorization to access your external data catalog in AWS Glue or Athena and your data files in Amazon S3.
In the demo setup, I attached AmazonS3FullAccess and AmazonAthenaFullAccess. In a production environment, the IAM roles should follow the standard security of granting least privilege. For more information, see IAM Policies for Amazon Redshift Spectrum.
create external schema if not exists spectrum_schema
from data catalog
database 'spectrum_db'
region 'us-east-1'
IAM_ROLE 'arn:aws:iam::XXXXXXXXXXXX:role/RedshiftSpectrumRole'
create external database if not exists;
Don’t forget to replace the IAM role in the statement.
Then create an external table within the database:
CREATE EXTERNAL TABLE IF NOT EXISTS spectrum_schema.ecommerce_sales(
ItemID int,
Category varchar,
Price DOUBLE PRECISION,
Quantity int,
OrderDate TIMESTAMP,
DestinationState varchar,
ShippingType varchar,
Referral varchar)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
LOCATION 's3://{BUCKET_NAME}/CDC/'
Query the table, and it should contain data. This is a fact table.
select top 10 * from spectrum_schema.ecommerce_sales
Next, create a dimension table. For this example, we create a date/time dimension table. Create the table:
CREATE TABLE date_dimension (
d_datekey integer not null sortkey,
d_dayofmonth integer not null,
d_monthnum integer not null,
d_dayofweek varchar(10) not null,
d_prettydate date not null,
d_quarter integer not null,
d_half integer not null,
d_year integer not null,
d_season varchar(10) not null,
d_fiscalyear integer not null)
diststyle all;
Populate the table with data:
copy date_dimension from 's3://reparmar-lab/2016dates'
iam_role 'arn:aws:iam::XXXXXXXXXXXX:role/redshiftspectrum'
DELIMITER ','
dateformat 'auto';
The date dimension table should look like the following:
Querying data in local and external tables using Amazon Redshift
Now that you have the fact and dimension tables populated with data, you can combine the two and run analysis. For example, if you want to query the total sales amount by season, you can run the following:
select sum(quantity*price) as total_sales, date_dimension.d_season
from spectrum_schema.ecommerce_sales
join date_dimension on spectrum_schema.ecommerce_sales.orderdate = date_dimension.d_prettydate
group by date_dimension.d_season
You get the following results:
Similarly, you can replace d_season with d_dayofweek to get sales figures by weekday:
With Amazon Redshift Spectrum, you pay only for the queries you run against the data that you actually scan. We encourage you to use file partitioning, columnar data formats, and data compression to significantly minimize the amount of data scanned in Amazon S3. This is important for data warehousing because it dramatically improves query performance and reduces cost.
Partitioning your data in Amazon S3 by date, time, or any other custom keys enables Amazon Redshift Spectrum to dynamically prune nonrelevant partitions to minimize the amount of data processed. If you store data in a columnar format, such as Parquet, Amazon Redshift Spectrum scans only the columns needed by your query, rather than processing entire rows. Similarly, if you compress your data using one of the supported compression algorithms in Amazon Redshift Spectrum, less data is scanned.
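As one possible way to apply this advice (an illustration rather than part of the original walkthrough), the following Python sketch rewrites a delivered object as a Parquet dataset partitioned by order date. It assumes the pandas, pyarrow, and s3fs packages, plus placeholder bucket and object names:

# Sketch: rewrite a Firehose-delivered CSV object as a Parquet dataset
# partitioned by order date. The S3 paths are placeholders.
import pandas as pd

columns = ["ItemID", "Category", "Price", "Quantity",
           "OrderDate", "DestinationState", "ShippingType", "Referral"]

# Read one delivered object: GZIP-compressed, newline-delimited,
# comma-separated records with no header row.
df = pd.read_csv(
    "s3://your-bucket-name/CDC/one-delivered-object",
    names=columns,
    compression="gzip",
)

# Derive a partition key and write a compressed, partitioned Parquet dataset.
df["order_date"] = pd.to_datetime(df["OrderDate"]).dt.date.astype(str)
df.to_parquet(
    "s3://your-bucket-name/CDC-parquet/",
    partition_cols=["order_date"],
    compression="snappy",
    index=False,
)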
Analyzing and visualizing Amazon Redshift data in Amazon QuickSight
After modifying the Amazon Redshift security group, go to Amazon QuickSight. Create a new analysis, and choose Amazon Redshift as the data source.
Enter the database connection details, validate the connection, and create the data source.
Choose the schema to be analyzed. In this case, choose spectrum_schema, and then choose the ecommerce_sales table.
Next, we add a custom field for Total Sales = Price*Quantity. In the drop-down list for the ecommerce_sales table, choose Edit analysis data sets.
On the next screen, choose Edit.
In the data prep screen, choose New Field. Add a new calculated field Total Sales $, which is the product of the Price and Quantity fields. Then choose Create. Save and visualize it.
Next, to visualize total sales figures by month, create a graph with Total Sales on the x-axis and Order Date formatted as month on the y-axis.
After you’ve finished, you can use Amazon QuickSight to add different columns from your Amazon Redshift tables and perform different types of visualizations. You can build operational dashboards that continuously monitor your transactional and analytical data. You can publish these dashboards and share them with others.
Final notes
Amazon QuickSight can also read data in Amazon S3 directly. However, with the method demonstrated in this post, you have the option to manipulate, filter, and combine data from multiple sources or Amazon Redshift tables before visualizing it in Amazon QuickSight.
In this example, we dealt with data being inserted, but triggers can be activated in response to INSERT, UPDATE, or DELETE statements.
Keep the following in mind:
Be careful when invoking a Lambda function from triggers on tables that experience high write traffic. This would result in a large number of calls to your Lambda function. Although calls to the lambda_async procedure are asynchronous, triggers are synchronous.
A statement that results in a large number of trigger activations does not wait for the call to the AWS Lambda function to complete. But it does wait for the triggers to complete before returning control to the client.
Similarly, you must account for Amazon Kinesis Data Firehose limits. By default, Kinesis Data Firehose is limited to a maximum of 5,000 records/second. For more information, see Monitoring Amazon Kinesis Data Firehose; a sketch for checking the incoming record rate appears after this list.
In certain cases, it may be optimal to use AWS Database Migration Service (AWS DMS) to capture data changes in Aurora and use Amazon S3 as a target. For example, AWS DMS might be a good option if you don’t need to transform data from Amazon Aurora. The method used in this post gives you the flexibility to transform data from Aurora using Lambda before sending it to Amazon S3. Additionally, the architecture has the benefits of being serverless, whereas AWS DMS requires an Amazon EC2 instance for replication.
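Regarding the Kinesis Data Firehose limits noted above, the following sketch (assuming boto3 and the AuroraChangesToS3 stream name used in this post) queries CloudWatch for the incoming record counts over the last hour:

# Sketch: check the incoming record rate for the delivery stream via CloudWatch.
from datetime import datetime, timedelta

import boto3

cloudwatch = boto3.client("cloudwatch")

stats = cloudwatch.get_metric_statistics(
    Namespace="AWS/Firehose",
    MetricName="IncomingRecords",
    Dimensions=[{"Name": "DeliveryStreamName", "Value": "AuroraChangesToS3"}],
    StartTime=datetime.utcnow() - timedelta(hours=1),
    EndTime=datetime.utcnow(),
    Period=300,
    Statistics=["Sum"],
)

for point in sorted(stats["Datapoints"], key=lambda p: p["Timestamp"]):
    print(point["Timestamp"], point["Sum"])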
Re Alvarez-Parmar is a solutions architect for Amazon Web Services. He helps enterprises achieve success through technical guidance and thought leadership. In his spare time, he enjoys spending time with his two kids and exploring outdoors.
AWS Training and Certification recently released free digital training courses that will make it easier for you to build your cloud skills and learn about using AWS Big Data services. This training includes courses like Introduction to Amazon EMR and Introduction to Amazon Athena.
You can get free and unlimited access to more than 100 new digital training courses built by AWS experts at aws.training. It’s easy to access training related to big data. Just choose the Analytics category on our Find Training page to browse through the list of courses. You can also use the keyword filter to search for training for specific AWS offerings.
Recommended training
Just getting started, or looking to learn about a new service? Check out the following digital training courses:
Introduction to Amazon EMR (15 minutes) Covers the available tools that can be used with Amazon EMR and the process of creating a cluster. It includes a demonstration of how to create an EMR cluster.
Introduction to Amazon Athena (10 minutes) Introduces the Amazon Athena service along with an overview of its operating environment. It covers the basic steps in implementing Athena and provides a brief demonstration.
Introduction to Amazon QuickSight (10 minutes) Discusses the benefits of using Amazon QuickSight and how the service works. It also includes a demonstration so that you can see Amazon QuickSight in action.
Introduction to Amazon Redshift (10 minutes) Walks you through Amazon Redshift and its core features and capabilities. It also includes a quick overview of relevant use cases and a short demonstration.
Introduction to AWS Lambda (10 minutes) Discusses the rationale for using AWS Lambda, how the service works, and how you can get started using it.
Introduction to Amazon Kinesis Analytics (10 minutes) Discusses how Amazon Kinesis Analytics collects, processes, and analyzes streaming data in real time. It discusses how to use and monitor the service and explores some use cases.
Introduction to Amazon Kinesis Streams (15 minutes) Covers how Amazon Kinesis Streams is used to collect, process, and analyze real-time streaming data to create valuable insights.
Introduction to AWS IoT (10 minutes) Describes how the AWS Internet of Things (IoT) communication architecture works, and the components that make up AWS IoT. It discusses how AWS IoT works with other AWS services and reviews a case study.
Introduction to AWS Data Pipeline (10 minutes) Covers components like tasks, task runner, and pipeline. It also discusses what a pipeline definition is, and reviews the AWS services that are compatible with AWS Data Pipeline.
Go deeper with classroom training
Want to learn more? Enroll in classroom training to learn best practices, get live feedback, and hear answers to your questions from an instructor.
Big Data on AWS (3 days) Introduces you to cloud-based big data solutions such as Amazon EMR, Amazon Redshift, Amazon Kinesis, and the rest of the AWS big data platform.
Data Warehousing on AWS (3 days) Introduces you to concepts, strategies, and best practices for designing a cloud-based data warehousing solution, and demonstrates how to collect, store, and prepare data for the data warehouse.
Building a Serverless Data Lake (1 day) Teaches you how to design, build, and operate a serverless data lake solution with AWS services. Includes topics such as ingesting data from any data source at large scale, storing the data securely and durably, using the right tool to process large volumes of data, and understanding the options available for analyzing the data in near-real time.
More training coming in 2018
We’re always evaluating and expanding our training portfolio, so stay tuned for more training options in the new year. You can always visit us at aws.training to explore our latest offerings.
AWS has updated its certifications against ISO 9001, ISO 27001, ISO 27017, and ISO 27018 standards, bringing the total to 67 services now under ISO compliance. We added the following 29 services this cycle:
AWS maintains certifications through extensive audits of its controls to ensure that information security risks that affect the confidentiality, integrity, and availability of company and customer information are appropriately managed.
You can download copies of the AWS ISO certificates that contain AWS’s in-scope services and Regions, and use these certificates to jump-start your own certification efforts:
Tim Wu, who coined the term “net neutrality”, has written an op-ed in The New York Times called “The Bitcoin Boom: In Code We Trust”. He is wrong about “code”.
The wrong “trust”
Wu builds a big manifesto about how real-world institutions can’t be trusted. Certainly, this reflects the rhetoric from a vocal wing of Bitcoin fanatics, but it’s not the Bitcoin manifesto.
Instead, the word “trust” in the Bitcoin paper is much narrower, referring to how online merchants can’t trust credit cards (for example). When I bought school supplies for my niece while she was studying in Canada, the online site wouldn’t accept my U.S. credit card. They didn’t trust my credit card. However, they trusted my Bitcoin, so I used that payment method instead and succeeded in the purchase.
Real-world currencies like dollars are tethered to the real-world, which means no single transaction can be trusted, because “they” (the credit-card company, the courts, etc.) may decide to reverse the transaction. The manifesto behind Bitcoin is that a transaction cannot be reversed — and thus, can always be trusted.
Deliberately confusing the micro-trust in a transaction and macro-trust in banks and governments is a sort of bait-and-switch.
The wrong inspiration
Wu claims:
“It was, after all, a carnival of human errors and misfeasance that inspired the invention of Bitcoin in 2009, namely, the financial crisis.”
Not true. Bitcoin did not appear fully formed out of the void, but was instead based upon a series of innovations that predate the financial crisis by a decade. Moreover, the financial crisis had little to do with “currency”. The value of the dollar and other major currencies was essentially unscathed by the crisis. Certainly, enthusiasts looking backward like to cherry-pick the financial crisis as yet one more reason why the offline world sucks, but it had little to do with Bitcoin.
In crypto we trust
It’s not in code that Bitcoin trusts, but in crypto. Satoshi makes that clear in one of his posts on the subject:
A generation ago, multi-user time-sharing computer systems had a similar problem. Before strong encryption, users had to rely on password protection to secure their files, placing trust in the system administrator to keep their information private. Privacy could always be overridden by the admin based on his judgment call weighing the principle of privacy against other concerns, or at the behest of his superiors. Then strong encryption became available to the masses, and trust was no longer required. Data could be secured in a way that was physically impossible for others to access, no matter for what reason, no matter how good the excuse, no matter what.
You don’t possess Bitcoins. Instead, all the coins are on the public blockchain under your “address”. What you possess is the secret, private key that matches the address. Transferring Bitcoin means using your private key to unlock your coins and transfer them to another. If you print out your private key on paper, and delete it from the computer, it can never be hacked.
Trust is in this crypto operation. Trust is in your private crypto key.
We don’t trust the code
The manifesto “in code we trust” has been proven wrong again and again. We don’t trust computer code (software) in the cryptocurrency world.
The most profound example is something known as the “DAO” on top of Ethereum, Bitcoin’s major competitor. Ethereum allows “smart contracts” containing code. The quasi-religious manifesto of the DAO smart-contract is that the “code is the contract”, that all the terms and conditions are specified within the smart-contract code, completely untethered from real-world terms-and-conditions.
Then a hacker found a bug in the DAO smart-contract and stole most of the money.
In principle, this is perfectly legal, because “the code is the contract”, and the hacker just used the code. In practice, the system didn’t live up to this. The Ethereum core developers, acting as central bankers, rewrote the Ethereum code to fix this one contract, returning the money back to its original owners. They did this because those core developers were themselves heavily invested in the DAO and got their money back.
Similar things happen with the original Bitcoin code. A disagreement has arisen about how to expand Bitcoin to handle more transactions. One group wants smaller blocks and “off-chain” transactions. Another group wants a larger blocksize. This caused a “fork” in Bitcoin with two versions, “Bitcoin” and “Bitcoin Cash”. The fork championed by the core developers (central bankers) is worth around $20,000 right now, while the other fork is worth around $2,000.
So it’s still “in central bankers we trust”, it’s just that now these central bankers are mostly online instead of offline institutions. They have proven to be even more corrupt than real-world central bankers. It’s certainly not the code that is trusted.
The bubble
Wu repeats the well-known reference to Amazon during the dot-com bubble. If you had bought Amazon’s stock at $107 right before the dot-com crash, it would still be one of the wisest investments you could’ve made. Amazon shares are now worth around $1,200 each.
The implication is that Bitcoin, too, may have such long term value. Even if you buy it today and it crashes tomorrow, it may still be worth ten-times its current value in another decade or two.
This is a poor analogy, for three reasons.
The first reason is that we knew the Internet had fundamentally transformed commerce. We knew there were going to be winners in the long run, it was just a matter of picking who would win (Amazon) and who would lose (Pets.com). We have yet to prove Bitcoin will be similarly transformative.
The second reason is that businesses are real: they generate real income. While the stock price may include some irrational exuberance, it’s ultimately still based on rational expectations of how much the business will earn. With Bitcoin, it’s almost entirely irrational exuberance; there are no long-term returns.
The third flaw in the analogy is that there is an essentially infinite number of cryptocurrencies. We saw this today as Coinbase started trading Bitcoin Cash, a fork of Bitcoin. The two are nearly identical, so there’s little reason one should be so much more valuable than the other. It’s only a fickle fad that makes one more valuable than the other, not business fundamentals. The successful future cryptocurrency is unlikely to exist today, but will be invented in the future.
The lesson of the dot-com bubble is not that Bitcoin will have long-term value, but that cryptocurrency companies like Coinbase and BitPay will have long-term value. Or, the lesson is that “old” companies like JPMorgan that are early adopters of the technology will grow faster than their competitors.
Conclusion
The point of Wu’s op-ed is to distinguish trust in traditional real-world institutions from trust in computer software code. This is an inaccurate reading of the situation.
Bitcoin is not about replacing real-world institutions but about untethering online transactions.
The trust in Bitcoin is in crypto — the power crypto gives individuals instead of third-parties.
The trust is not in the code. Bitcoin is a “cryptocurrency” not a “codecurrency”.
We find that AWS customers often require that every query submitted to Presto running on Amazon EMR is logged. They want to track what query was submitted, when it was submitted and who submitted it.
In this blog post, we will demonstrate how to implement and install a Presto event listener for purposes of custom logging, debugging and performance analysis for queries executed on an EMR cluster. An event listener is a plugin to Presto that is invoked when an event such as query creation, query completion, or split completion occurs.
Presto also provides a system connector to access metrics and details about a running cluster. In particular, the system connector gets information about currently running and recently run queries by using the system.runtime.queries table. From the Presto command line interface (CLI), you can retrieve this data by running select * from system.runtime.queries; and select * from system.runtime.tasks;. This connector is available out of the box and exposes information and metrics through SQL.
In addition to providing custom logging and debugging, the Presto event listener allows you to enhance the information provided by the system connector by providing a mechanism to capture detailed query context, query statistics and failure information during query creation or completion, or a split completion.
We will begin by providing a detailed walkthrough of the implementation of the Presto event listener in Java followed by its deployment on the EMR cluster.
Implementation
We use the Eclipse IDE to create a Maven Project, as shown below:
Once you have created the Maven Project, modify the pom.xml file to add the dependency for Presto, as shown following:
After you add the Presto dependency to our pom.xml file, create a Java package under the src/main/java folder. In our project, we have named the package com.amazonaws.QueryEventListener. You can choose the naming convention that best fits your organization. Within this package, create three Java files for the EventListener, the EventListenerFactory, and the EventListenerPlugin.
As the Presto website says: “EventListenerFactory is responsible for creating an EventListener instance. It also defines an EventListener name, which is used by the administrator in a Presto configuration. Implementations of EventListener implement methods for the event types they are interested in handling. The implementation of EventListener and EventListenerFactory must be wrapped as a plugin and installed on the Presto cluster.”
In our project, we have named these Java files QueryEventListener, QueryEventListenerFactory, and QueryEventListenerPlugin:
Now we write our code for the Java files.
QueryEventListener – QueryEventListener implements the Presto EventListener interface. It has a constructor that creates five rotating log files of 524 MB each. After creating QueryEventListener, we implement the query creation, query completion, and split completion methods and log the events relevant to us. You can choose to include more events based on your needs.
QueryEventListenerFactory – The QueryEventListenerFactory class implements the Presto EventListenerFactory interface. We implement the method getName, which provides a registered EventListenerFactory name to Presto. We also implement the create method, which creates an EventListener instance.
You can find the code for the QueryEventListenerFactory class in this Amazon repository.
QueryEventListenerPlugin – The QueryEventListenerPlugin class implements the Presto EventListenerPlugin interface. This class has a getEventListenerFactories method that returns an immutable list containing the EventListenerFactory. Basically, in this class we are wrapping QueryEventListener and QueryEventListenerFactory.
Finally, in our project we add the META-INF folder and a services subfolder within the META-INF folder. In the services subfolder, you create a file called com.facebook.presto.spi.Plugin:
As the Presto documentation describes: “Each plugin identifies an entry point: an implementation of the plugin interface. This class name is provided to Presto via the standard Java ServiceLoader interface: the classpath contains a resource file named com.facebook.presto.spi.Plugin in the META-INF/services directory”.
We add the name of our plugin class com.amazonaws.QueryEventListener.QueryEventListenerPlugin to the com.facebook.presto.spi.Plugin file, as shown below:
Next we will show you how to deploy the Presto plugin we created to Amazon EMR for custom logging.
Presto logging overview on Amazon EMR
By default, Presto produces three log files that capture the configuration properties and overall operational events of the components that make up Presto, as well as end-user access to the Presto UI.
On Amazon EMR, these log files are written to /var/log/presto, and the log files in this directory are pushed to Amazon S3. The custom log files created by our plugin are written to the same directory, so they are delivered to the same S3 location.
Steps to deploy Presto on Amazon EMR with custom logging
To deploy Presto on Amazon EMR with custom logging, we use a bootstrap action. The bootstrap script is available in this Amazon repository.
#!/bin/bash
IS_MASTER=true
if [ -f /mnt/var/lib/info/instance.json ]
then
if grep isMaster /mnt/var/lib/info/instance.json | grep true;
then
IS_MASTER=true
else
IS_MASTER=false
fi
fi
sudo mkdir -p /usr/lib/presto/plugin/queryeventlistener
sudo /usr/bin/aws s3 cp s3://replace-with-your-bucket/QueryEventListener.jar /tmp
sudo cp /tmp/QueryEventListener.jar /usr/lib/presto/plugin/queryeventlistener/
if [ "$IS_MASTER" = true ]; then
sudo mkdir -p /usr/lib/presto/etc
sudo bash -c 'cat <<EOT >> /usr/lib/presto/etc/event-listener.properties
event-listener.name=event-listener
EOT'
fi
First, upload the JAR file created in the last section, and update the S3 location in the bootstrap script, s3://replace-with-your-bucket/QueryEventListener.jar, with the bucket name where the JAR was placed.
After updating the bootstrap with the S3 location for your JAR, upload that bootstrap to your own bucket.
The bootstrap action copies the JAR file with the custom EventListener implementation onto all machines in the cluster. It also creates a file named event-listener.properties on the Amazon EMR master node, which configures the coordinator to enable the custom logging plugin through the event-listener.name property. In this file, event-listener.name is set to event-listener; per the Presto documentation, Presto uses this property to find a registered EventListenerFactory based on the name returned by EventListenerFactory.getName().
Now that the bootstrap is ready, you can create a new EMR cluster that uses it, for example with the aws emr create-cluster CLI command and its --bootstrap-actions option, or with the equivalent SDK call.
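The original command isn’t reproduced here; as a rough equivalent, the following boto3 sketch creates a cluster with Presto, Hive, and the bootstrap action. The release label, instance types, key pair, and bootstrap script path are placeholders rather than values from the original post:

# Sketch: create an EMR cluster with Presto, Hive, and the custom-logging
# bootstrap action. Replace the placeholder names, types, and S3 path.
import boto3

emr = boto3.client("emr", region_name="us-east-1")

response = emr.run_job_flow(
    Name="presto-custom-logging",
    ReleaseLabel="emr-5.11.0",
    Applications=[{"Name": "Presto"}, {"Name": "Hive"}],
    Instances={
        "MasterInstanceType": "m4.large",
        "SlaveInstanceType": "m4.large",
        "InstanceCount": 3,
        "KeepJobFlowAliveWhenNoSteps": True,
        "Ec2KeyName": "your-key-pair",
    },
    BootstrapActions=[
        {
            "Name": "InstallQueryEventListener",
            "ScriptBootstrapAction": {
                "Path": "s3://replace-with-your-bucket/presto_bootstrap.sh"
            },
        }
    ],
    JobFlowRole="EMR_EC2_DefaultRole",
    ServiceRole="EMR_DefaultRole",
    VisibleToAllUsers=True,
)
print(response["JobFlowId"])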
After we have implemented the Presto custom logging plugin and deployed it, we can run a test and see the output of the log files.
We first create a Hive table with the DDL below from the Hive command line (simply type hive in the SSH terminal to the EMR master node to access the Hive command line):
CREATE EXTERNAL TABLE wikistats (
language STRING,
page_title STRING,
hits BIGINT,
retrived_size BIGINT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ' '
LINES TERMINATED BY '\n'
LOCATION 's3://support.elasticmapreduce/training/datasets/wikistats/';
Then, we access the Presto command line by running presto-cli from the terminal and execute a query against the wikistats table so that query events are generated.
We then go to the /var/log/presto directory and look at the contents of the log file queries-YYYY-MM-DDTHH\:MM\:SS.0.log. As depicted in the screenshot below, our QueryEventListener plugin captures the fields shown for the Query Created and Query Completed events. Moreover, if there are splits, the plugin will also capture split events.
Note: If you want to include the query text executed by the user for auditing and debugging purposes, add the field appropriately in the QueryEventListener class methods, as shown below:
Because this is custom logging, you can capture as many fields as are available for the particular events. To find out the fields available for each of the events, see the Java Classes provided by Presto at this GitHub location.
Summary
In this post, you learned how to add custom logging to Presto on EMR to enhance your organization’s auditing capabilities and provide insights into performance.
If you have questions or suggestions, please leave a comment below.
Zafar Kapadia is a Cloud Application Architect for AWS. He works on Application Development and Optimization projects. He is also an avid cricketer and plays in various local leagues.
Francisco Oliveira is a Big Data Engineer with AWS Professional Services. He focuses on building big data solutions with open source technology and AWS. In his free time, he likes to try new sports, travel and explore national parks.