Regardless of your career path, there’s no denying that attending industry events can provide helpful career development opportunities — not only for improving and expanding your skill sets, but for networking as well. According to this article from PayScale.com, experts estimate that between 70 and 85 percent of new positions are landed through networking.
If you want to network with cloud computing professionals who are tackling some of today’s most innovative and exciting big data solutions, attending the big data-focused sessions at an AWS Global Summit is a great place to start.
AWS Global Summits are free events that bring the cloud computing community together to connect, collaborate, and learn about AWS. As the name suggests, these summits are held in major cities around the world, and they attract technologists from all industries and skill levels who are interested in hearing from AWS leaders, experts, partners, and customers.
In addition to networking with top cloud technology providers, consultants, and your peers in our Partner and Solutions Expo, you can hone your AWS skills by attending and participating in a multitude of education and training opportunities.
Here’s a brief sampling of some of the upcoming sessions relevant to big data professionals:
Be sure to check out the main page for AWS Global Summits, where you can see which cities have AWS Summits planned for 2018, register to attend an upcoming event, or provide your information to be notified when registration opens for a future event.
Applying technology to healthcare data has the potential to produce many exciting and important outcomes. Analysis of healthcare data can empower clinicians to make better decisions that enhance the care they provide, improving the health of both individuals and populations.
The Observational Health Data Sciences and Informatics (OHDSI, pronounced “Odyssey”) program and community is working toward this goal by producing data standards and open-source solutions to store and analyze observational health data. Using the OHDSI tools, you can visualize the health of your entire population. You can build cohorts of patients, analyze incidence rates for various conditions, and estimate the effect of treatments on patients with certain conditions. You can also model health outcome predictions using machine learning algorithms.
One of the challenges often faced when working with big data tools is the expense of the infrastructure required to run them. Another challenge is the learning curve to implement and begin using these tools. Amazon Web Services has enabled us to address many of the classic IT challenges by making enterprise class infrastructure and technology available in an affordable, elastic, and automated way. This blog post demonstrates how to combine some of the OHDSI projects (Atlas, Achilles, WebAPI, and the OMOP Common Data Model) with AWS technologies. By doing so, you can quickly and inexpensively implement a health data science and informatics environment.
Shown following is just one example of the population health analysis that is possible with the OHDSI tools. This visualization shows the prevalence of various drugs within the given population of people. This information helps researchers and clinicians discover trends and make better informed decisions about patient health.
OHDSI application architecture on AWS
Before deploying an application on AWS that transmits, processes, or stores protected health information (PHI) or personally identifiable information (PII), address your organization’s compliance concerns. Make sure that you have worked with your internal compliance and legal team to ensure compliance with the laws and regulations that govern your organization. To understand how you can use AWS services as a part of your overall compliance program, see the AWS HIPAA Compliance whitepaper. With that said, we paid careful attention to the HIPAA control set during the design of this solution.
This blog post presents a complete OHDSI application environment, including a data warehouse with sample data. It has the following features:
Following, you can see a block diagram of how the OHDSI tools map to the services provided by AWS.
Atlas is the web application that researchers interact with to perform analysis. Atlas interacts with the underlying databases through a web services application named WebAPI. In this example, both Atlas and WebAPI are deployed and managed by AWS Elastic Beanstalk. Elastic Beanstalk is an easy-to-use service for deploying and scaling web applications. Simply upload the Atlas and WebAPI code and Elastic Beanstalk automatically handles the deployment. It covers everything from capacity provisioning, load balancing, autoscaling, and high availability, to application health monitoring. Using a feature of Elastic Beanstalk called ebextensions, the Atlas and WebAPI servers are customized to use an encrypted storage volume for the middleware application logs.
Atlas stores the state of the various patient cohorts that are analyzed in a dedicated database separate from your observational health data. This database is provided by Amazon Aurora with PostgreSQL compatibility.
Amazon Aurora is a relational database built for the cloud that combines the performance and availability of high-end commercial databases with the simplicity and cost-effectiveness of open-source databases. It provides cost-efficient and resizable capacity while automating time-consuming administration tasks such as hardware provisioning, database setup, patching, and backups. It is configured for high availability and uses encryption at rest for the database and backups, and encryption in flight for the JDBC connections.
All of your observational health data is stored inside the OHDSI Observational Medical Outcomes Partnership Common Data Model (OMOP CDM). This model also stores useful vocabulary tables that help to translate values from various data sources (like EHR systems and claims data).
The OMOP CDM schema is deployed onto Amazon Redshift. Amazon Redshift is a fast, fully managed data warehouse that allows you to run complex analytic queries against petabytes of structured data. It uses sophisticated query optimization, columnar storage on high-performance local disks, and massively parallel query execution. You can also resize an Amazon Redshift cluster as your requirements for it change.
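If you do need to resize the warehouse later, you can do so programmatically as well as through the console. Here is a minimal boto3 sketch; the cluster identifier is a placeholder for the one created in your account.

import boto3

redshift = boto3.client("redshift")

# Placeholder identifier; use the cluster that the CloudFormation template created.
redshift.modify_cluster(
    ClusterIdentifier="ohdsi-redshift-cluster",
    ClusterType="multi-node",
    NumberOfNodes=4,  # scale out as your data volume and query load grow
)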
The solution in this blog post automatically loads de-identified sample data of 1,000 people from the CMS 2008–2010 Data Entrepreneurs’ Synthetic Public Use File (DE-SynPUF). The data has helpful formatting from LTS Computing LLC. Vocabulary data from the OHDSI Athena project is also loaded into the OMOP CDM, and a results set is computed by OHDSI Achilles.
Following is a detailed technical diagram showing the configuration of the architecture to be deployed.
Deploying OHDSI on AWS
Everything just described is automatically deployed by using an AWS CloudFormation template. Using this template, you can quickly get started with the OHDSI project. The CloudFormation templates for this deployment as well as all of the supporting scripts and source code can be found in the AWS Labs GitHub repo.
From your AWS account, open the CloudFormation Management Console and choose Create Stack. From there, copy and paste the following URL in the Specify an Amazon S3 template URL box, and choose Next.
On the next screen, you provide a Stack Name (this can be anything you like) and a few other parameters for your OHDSI environment.
You use the DatabasePassword parameter to set the password for the master user account of the Amazon Redshift and Aurora databases.
You use the EBEndpoint name to generate a unique URL for Atlas to access the OHDSI environment. It is http://EBEndpoint.AWS-Region.elasticbeanstalk.com, where EBEndpoint.AWS-Region indicates the Elastic Beanstalk endpoint and AWS Region. You can configure this URL through Elastic Beanstalk if you want to change it in the future.
You use the KPair option to choose one of your existing Amazon EC2 key pairs to use with the instances that Elastic Beanstalk deploys. By doing this, you can gain administrative access to these instances in the future if you need to. If you don’t already have an Amazon EC2 key pair, you can generate one for free. You do this by going to the Key Pairs section of the EC2 console and choosing Generate Key Pair.
Finally, you use the UserIPRange parameter to specify a CIDR IP address range from which to access your OHDSI environment. By default, your OHDSI environment is accessible over the public internet. Use UserIPRange to limit access over the Internet to a single IP address or a range of IP addresses that represent users you want to have access. Through additional configuration, you can also make your OHDSI environment completely private and accessible only through a VPN or AWS Direct Connect private circuit.
When you’ve provided all Parameters, choose Next.
On the next screen, you can provide some other optional information like tags at your discretion, or just choose Next.
On the next screen, you can review what will be deployed. At the bottom of the screen, there is a check box for you to acknowledge that AWS CloudFormation might create IAM resources with custom names. This is correct; the template being deployed creates four custom roles that give permission for the AWS services involved to communicate with each other. Details of these permissions are inside the CloudFormation template referenced in the URL given in the first step. Check the box acknowledging this and choose Next.
You can watch as CloudFormation builds out your OHDSI architecture. A CloudFormation deployment is called a stack. The parent stack creates two child stacks, one containing the VPC and IAM roles and another created by Elastic Beanstalk with the Atlas and WebAPI servers. When all three stacks have reached the green CREATE_COMPLETE status, as shown in the screenshot following, then the OHDSI architecture has been deployed.
There is still some work going on behind the scenes, though. To watch the progress, browse to the Amazon Redshift section of your AWS Management Console and choose the Amazon Redshift cluster that was created for your OHDSI architecture. After you do so, you can observe the Loads and Queries tabs.
First, on the Loads tab, you can see the CMS De-SynPUF sample data and Athena vocabulary data being loaded into the OMOP Common Data Model. After you see the VOCABULARY table reach the COMPLETED status (as shown following), all of the sample and vocabulary data has been loaded.
After the data loads, the Achilles computation starts. On the Queries tab, you can watch Achilles running queries against your database to build out the Results schema. Achilles runs a large number of queries, and the entire process can take quite some time (about 20 minutes for the sample data we’ve loaded). Eventually, no new queries show up in the Queries tab, which shows that the Achilles computation is completed. The entire process from the time you executed the CloudFormation template until the Achilles computation is completed usually takes about an hour and 15 minutes.
At this point, you can browse to the Elastic Beanstalk section of the AWS Management Console. There, you can choose the OHDSI Application and Environment (green box) that was deployed by the CloudFormation template. At the top of the dashboard, as shown following, you see a link to a URL. This URL matches the name you provided in the EBEndpoint parameter of the CloudFormation template. Choose this URL, and you can start using Atlas to explore the CMS DE-SynPUF sample data!
Cost of deploying this environment
It used to be common to see healthcare data analytics environments deployed in an on-premises data center with expensive data warehouse appliances and virtualized environments. The cloud era has democratized the availability of the infrastructure required to do this type of data analysis, putting it within reach of even small organizations. This environment can expand to analyze petabyte-scale health data, and you pay only for what you need. You can see an estimated breakdown of the monthly cost components for this environment, as deployed, on the AWS Solution Calculator.
It’s also worth noting that this environment does not have to be run all of the time. If you are only performing analyses periodically, you can terminate the environment when you are finished and restore it from the database backups when you want to continue working. This would reduce the cost of operation even further.
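For the Amazon Redshift portion of the environment, for example, you can take a final snapshot when you delete the cluster and restore from it later. The following is a minimal boto3 sketch; the cluster and snapshot identifiers are placeholders.

import boto3

redshift = boto3.client("redshift")

# Pause work: delete the cluster but keep a final snapshot (placeholder identifiers).
redshift.delete_cluster(
    ClusterIdentifier="ohdsi-redshift-cluster",
    SkipFinalClusterSnapshot=False,
    FinalClusterSnapshotIdentifier="ohdsi-paused-snapshot",
)

# Resume work later: restore the warehouse from that snapshot.
redshift.restore_from_cluster_snapshot(
    ClusterIdentifier="ohdsi-redshift-cluster",
    SnapshotIdentifier="ohdsi-paused-snapshot",
)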
Summary
Now that you have a fully functional OHDSI environment with sample data, you can use this to explore and learn the toolset and its capabilities. After learning with the sample data, you can begin gaining insights by analyzing your own organization’s health data. You can do this using an extract, transform, load (ETL) process from one or more of your health data sources.
James Wiggins is a senior healthcare solutions architect at AWS. He is passionate about using technology to help organizations positively impact world health. He also loves spending time with his wife and three children.
Amazon EMR enables data analysts and scientists to deploy a cluster of any size running popular frameworks such as Spark, HBase, Presto, and Flink in minutes. When you launch a cluster, Amazon EMR automatically configures the underlying Amazon EC2 instances with the frameworks and applications that you choose for your cluster. This can include popular web interfaces such as the Hue workbench, Zeppelin notebooks, and Ganglia monitoring dashboards and tools.
These web interfaces are hosted on the EMR master node and must be accessed using the public DNS name of the master node (the master public DNS value). The master public DNS value is dynamically created, not very user friendly, and hard to remember. It looks something like ip-###-###-###-###.us-west-2.compute.internal. Not having a friendly URL for the popular workbench and notebook interfaces can slow your workflow and erode the agility that EMR otherwise gives you.
Some customers have addressed this challenge through custom bootstrap actions, steps, or external scripts that periodically check for new clusters and register a friendlier name in DNS. These approaches either put additional burden on the data practitioners or require additional resources to execute the scripts. In addition, there is typically some lag time associated with such scripts. They often don’t do a great job cleaning up the DNS records after the cluster has terminated, potentially resulting in a security risk.
The solution in this post provides an automated, serverless approach to registering a friendly master node name for easy access to the web interfaces.
Before I dive deeper, let’s review the key services and how they fit into this solution.
CloudWatch Events
CloudWatch Events delivers a near real-time stream of system events that describe changes in AWS resources. Using simple rules, you can match events and route them to one or more target functions or streams. An event can be generated in one of four ways:
From an AWS service when resources change state
From API calls that are delivered via AWS CloudTrail
From your own code that can generate application-level events
On a schedule that you set, such as a periodic cron or rate expression
In this solution, I cover the first type of event, which EMR emits automatically when the cluster state changes. Based on the state in the event, the solution either creates or updates the DNS record in Route 53 when the cluster state changes to STARTING, or deletes the DNS record when the cluster is no longer needed and the state changes to TERMINATED. For more information about all EMR event details, see Monitor CloudWatch Events.
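For reference, the event delivered for an EMR cluster state change looks roughly like the following, shown here as the Python dictionary a Lambda handler would receive (abridged, with placeholder identifiers).

# Abridged EMR "Cluster State Change" event as a Lambda handler receives it.
# The cluster ID and name are placeholders.
sample_event = {
    "version": "0",
    "source": "aws.emr",
    "detail-type": "EMR Cluster State Change",
    "region": "us-west-2",
    "detail": {
        "severity": "INFO",
        "clusterId": "j-XXXXXXXXXXXXX",
        "state": "STARTING",
        "name": "Analytics cluster",
        "message": "Amazon EMR cluster j-XXXXXXXXXXXXX is being created.",
    },
}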
Route 53 private hosted zones
A private hosted zone is a container that holds information about how to route traffic for a domain and its subdomains within one or more VPCs. Private hosted zones enable you to use custom DNS names for your internal resources without exposing the names or IP addresses to the internet.
Route 53 supports resource record sets with a wide range of record types. In this solution, you use a CNAME record, which specifies a domain name as an alias for another domain (the ‘canonical’ domain). You use a friendly name for the cluster as the CNAME for the EMR master public DNS value.
You are using private hosted zones because an EMR cluster is typically deployed within a private subnet and is accessed either from within the VPC or from on-premises resources over VPN or AWS Direct Connect. To resolve domain names in private hosted zones from your on-premises network, configure a DNS forwarder, as described in How can I resolve Route 53 private hosted zones from an on-premises network via an Ubuntu instance?.
Lambda
Lambda is a compute service that lets you run code without provisioning or managing servers. Lambda executes your code only when needed and scales automatically to thousands of requests per second. Lambda takes care of high availability, and server and OS maintenance and patching. You pay only for the consumed compute time. There is no charge when your code is not running.
Lambda provides the ability to invoke your code in response to events, such as when an object is put to an Amazon S3 bucket or as in this case, when a CloudWatch event is emitted. As part of this solution, you deploy a Lambda function as a target that is invoked by CloudWatch Events when the event matches your rule. You also configure the necessary permissions based on the Lambda permissions model, including a Lambda function policy and Lambda execution role.
Putting it all together
Now that you have all of the pieces, you can put together a complete solution. The following diagram illustrates how the solution works:
Start with a user activity such as launching or terminating an EMR cluster.
EMR automatically sends events to the CloudWatch Events stream.
A CloudWatch Events rule matches the specified event and routes it to a target, which in this case is a Lambda function. Here, the rule matches the EMR Cluster State Change event.
The Lambda function performs the following key steps (a minimal sketch of such a handler follows this list):
Get the clusterId value from the event detail and use it to call the EMR DescribeCluster API to retrieve the following data points:
MasterPublicDnsName – public DNS name of the master node
Tags – locate the tag containing the friendly name to use as the CNAME for the cluster. The tag key should be cluster_name, and the value should be specified as host.domain.com, where domain is the private hosted zone in which to update the DNS record.
Update DNS based on the state in the event detail.
If the state is STARTING, the function calls the Route 53 API to create or update a resource record set in the private hosted zone specified by the domain tag. This is a CNAME record mapped to MasterPublicDnsName.
Conversely, if the state is TERMINATED, the function calls the Route 53 API to delete the associated resource record set from the private hosted zone.
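Here is a minimal sketch of such a handler in Python with boto3. It assumes the friendly name is carried in a cluster_name tag, derives the hosted zone from the domain portion of that name, and omits error handling; treat it as an illustration of the steps above rather than the exact function that the template deploys.

import boto3

emr = boto3.client("emr")
route53 = boto3.client("route53")

def lambda_handler(event, context):
    detail = event["detail"]
    state = detail["state"]

    # Step 1: look up the cluster and its tags.
    cluster = emr.describe_cluster(ClusterId=detail["clusterId"])["Cluster"]
    tags = {t["Key"]: t["Value"] for t in cluster.get("Tags", [])}
    friendly_name = tags.get("cluster_name")  # e.g., mycluster.example.com
    if not friendly_name:
        return  # nothing to do for untagged clusters

    # Step 2: treat everything after the first label as the private hosted zone name.
    zone_name = friendly_name.split(".", 1)[1]
    zone_id = route53.list_hosted_zones_by_name(DNSName=zone_name)["HostedZones"][0]["Id"]

    # Step 3: UPSERT the CNAME on STARTING, DELETE it on TERMINATED.
    if state == "STARTING":
        action = "UPSERT"
    elif state == "TERMINATED":
        action = "DELETE"
    else:
        return

    route53.change_resource_record_sets(
        HostedZoneId=zone_id,
        ChangeBatch={
            "Changes": [{
                "Action": action,
                "ResourceRecordSet": {
                    "Name": friendly_name,
                    "Type": "CNAME",
                    "TTL": 300,
                    "ResourceRecords": [{"Value": cluster["MasterPublicDnsName"]}],
                },
            }]
        },
    )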
Deploying the solution
Because all of the components of this solution are serverless, you can use an AWS Serverless Application Model (AWS SAM) template to deploy it. AWS SAM is natively supported by AWS CloudFormation and provides a simplified syntax for expressing serverless resources, resulting in fewer lines of code.
Overview of the SAM template
For this solution, the SAM template has 76 lines of text, compared to 142 lines without the SAM resource types (and writing the template in YAML would make it slightly smaller still). The solution can be deployed using the AWS Management Console, AWS Command Line Interface (AWS CLI), or AWS SAM Local.
CloudFormation transforms help simplify template authoring by condensing a multiple-line resource declaration into a single line in your template. To inform CloudFormation that your template defines a serverless application, add the SAM transform declaration, Transform: AWS::Serverless-2016-10-31, on a line under the template format version.
Before SAM, you would use the AWS::Lambda::Function resource type to define your Lambda function. You would then need a resource to define the permissions for the function (AWS::Lambda::Permission), another resource to define a Lambda execution role (AWS::IAM::Role), and finally a CloudWatch Events resource (Events::Rule) that triggers this function.
With SAM, you need to define just a single resource for your function, AWS::Serverless::Function. Using this single resource type, you can define everything that you need, including function properties such as function handler, runtime, and code URI, as well as the required IAM policies and the CloudWatch event.
A few additional things to note in the code example:
CodeUri – Before you can deploy a SAM template, first upload your Lambda function code .zip file to S3. You can do this manually, or use the aws cloudformation package CLI command to automate the task of uploading local artifacts to an S3 bucket, as shown later.
Lambda execution role and permissions – You are not specifying a Lambda execution role in the template. Rather, you are providing the required permissions as IAM policy documents. When the template is submitted, CloudFormation expands the AWS::Serverless::Function resource, declaring a Lambda function and an execution role. The created role has two attached policies: a default AWSLambdaBasicExecutionRole and the inline policy specified in the template.
CloudWatch Events rule – Instead of specifying a CloudWatch Events resource type, you are defining an event source object as a property of the function itself. When the template is submitted, CloudFormation expands this into a CloudWatch Events rule resource and automatically creates the Lambda resource-based permissions to allow the CloudWatch Events rule to trigger the function.
NOTE: If you are trying this solution outside of us-east-1, download the necessary files, upload them to buckets in your AWS Region, edit the script as appropriate, and then run it. Alternatively, use the CLI deployment method described below.
3.) Choose Next.
4.) On the Specify Details page, keep or modify the stack name and choose Next.
5.) On the Options page, choose Next.
6.) On the Review page, take the following steps:
Acknowledge the two Transform access capabilities. This allows the CloudFormation transform to create the required IAM resources with custom names.
Under Transforms, choose Create Change Set.
Wait a few seconds for the change set to be created before proceeding. The change set should look as follows:
7.) Choose Execute to deploy the template.
After the template is deployed, you should see four resources created:
After the package is successfully uploaded, the output should look as follows:
Uploading to 0f6d12c7872b50b37dbfd5a60385b854 1872 / 1872.0 (100.00%)
Successfully packaged artifacts and wrote output template to file serverless-output.template.
The CodeUri property in serverless-output.template is now referencing the packaged artifacts in the S3 bucket that you specified:
s3://<bucket>/0f6d12c7872b50b37dbfd5a60385b854
Use the aws cloudformation deploy CLI command to deploy the stack:
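A typical invocation looks like the following; the template file comes from the package step, the stack name matches the output shown below, and the CAPABILITY_IAM flag is required because the stack creates IAM resources:

aws cloudformation deploy --template-file serverless-output.template --stack-name EmrDnsSetterCli --capabilities CAPABILITY_IAM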
You should see the following output after the stack has been successfully created:
Waiting for changeset to be created...
Waiting for stack create/update to complete
Successfully created/updated stack – EmrDnsSetterCli
Validating results
To test the solution, launch an EMR cluster. The Lambda function looks for the cluster_name tag associated with the EMR cluster. Make sure to specify the friendly name of your cluster as host.domain.com where the domain is the private hosted zone in which to create the CNAME record.
Here is a sample CLI command to launch a cluster within a specific subnet in a VPC with the required tag cluster_name.
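One possible form of that command is shown below; the release label, applications, subnet, key pair, instance settings, and tag value are placeholders that you should replace with your own values.

aws emr create-cluster --name "emr-web-ui-demo" \
  --release-label emr-5.11.0 \
  --applications Name=Hadoop Name=Spark Name=Hue Name=Zeppelin \
  --use-default-roles \
  --ec2-attributes KeyName=my-key-pair,SubnetId=subnet-0123456789abcdef0 \
  --instance-type m4.large --instance-count 3 \
  --tags cluster_name=mycluster.example.com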
After the cluster is launched, log in to the Route 53 console. In the left navigation pane, choose Hosted Zones to view the list of private and public zones currently configured in Route 53. Select the hosted zone that you specified in the ZONE tag when you launched the cluster. Verify that the resource records were created.
You can also monitor the CloudWatch Events metrics that are published to CloudWatch every minute, such as the number of TriggeredRules and Invocations.
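You can also pull these metrics programmatically. The following is a minimal boto3 sketch; the rule name is a placeholder for the rule that the stack created.

import datetime
import boto3

cloudwatch = boto3.client("cloudwatch")

# Placeholder rule name; use the CloudWatch Events rule created by your stack.
stats = cloudwatch.get_metric_statistics(
    Namespace="AWS/Events",
    MetricName="TriggeredRules",
    Dimensions=[{"Name": "RuleName", "Value": "emr-dns-setter-rule"}],
    StartTime=datetime.datetime.utcnow() - datetime.timedelta(hours=1),
    EndTime=datetime.datetime.utcnow(),
    Period=60,
    Statistics=["Sum"],
)
print(stats["Datapoints"])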
Now that you’ve verified that the Lambda function successfully updated the Route 53 resource records in the zone file, terminate the EMR cluster and verify that the records are removed by the same function.
Conclusion
This solution provides a serverless approach to automatically assigning a friendly name for your EMR cluster for easy access to popular notebooks and other web interfaces. CloudWatch Events also supports cross-account event delivery, so if you are running EMR clusters in multiple AWS accounts, all cluster state events across accounts can be consolidated into a single account.
I hope that this solution provides a small glimpse into the power of CloudWatch Events and Lambda and how they can be leveraged with EMR and other AWS big data services. For example, by using the EMR step state change event, you can chain various pieces of your analytics pipeline. You may have a transient cluster perform data ingest and, when the task successfully completes, spin up an ETL cluster for transformation and upload to Amazon Redshift. The possibilities are truly endless.
A data lake is an increasingly popular way to store and analyze data that addresses the challenges of dealing with massive volumes of heterogeneous data. A data lake allows organizations to store all their data—structured and unstructured—in one centralized repository. Because data can be stored as-is, there is no need to convert it to a predefined schema.
Many organizations understand the benefits of using AWS as their data lake. For example, Amazon S3 is a highly durable, cost-effective object store that supports open data formats while decoupling storage from compute, and it works with all the AWS analytic services. Although Amazon S3 provides the foundation of a data lake, you can add other services to tailor the data lake to your business needs. For more information about building data lakes on AWS, see What is a Data Lake?
Because one of the main challenges of using a data lake is finding the data and understanding the schema and data format, Amazon recently introduced AWS Glue. AWS Glue significantly reduces the time and effort that it takes to derive business insights quickly from an Amazon S3 data lake by discovering the structure and form of your data. AWS Glue automatically crawls your Amazon S3 data, identifies data formats, and then suggests schemas for use with other AWS analytic services.
This post walks you through the process of using AWS Glue to crawl your data on Amazon S3 and build a metadata store that can be used with other AWS offerings.
AWS Glue features
AWS Glue is a fully managed data catalog and ETL (extract, transform, and load) service that simplifies and automates the difficult and time-consuming tasks of data discovery, conversion, and job scheduling. AWS Glue crawls your data sources and constructs a data catalog using pre-built classifiers for popular data formats and data types, including CSV, Apache Parquet, JSON, and more.
The AWS Glue Data Catalog is compatible with Apache Hive Metastore and supports popular tools such as Hive, Presto, Apache Spark, and Apache Pig. It also integrates directly with Amazon Athena, Amazon EMR, and Amazon Redshift Spectrum.
In addition, the AWS Glue Data Catalog features the following extensions for ease-of-use and data-management functionality:
AWS Glue is an essential component of an Amazon S3 data lake, providing the data catalog and transformation services for modern data analytics.
In the preceding figure, data is staged for different analytic use cases. Initially, the data is ingested in its raw format, which is the immutable copy of the data. The data is then transformed and enriched to make it more valuable for each use case. In this example, the raw CSV files are transformed into Apache Parquet for use by Amazon Athena to improve performance and reduce cost.
The data can also be enriched by blending it with other datasets to provide additional insights. An AWS Glue crawler creates a table for each stage of the data based on a job trigger or a predefined schedule. In this example, an AWS Lambda function is used to trigger the ETL process every time a new file is added to the Raw Data S3 bucket. The tables can be used by Amazon Athena, Amazon Redshift Spectrum, and Amazon EMR to query the data at any stage using standard SQL or Apache Hive. This configuration is a popular design pattern that delivers Agile Business Intelligence to derive business value from a variety of data quickly and easily.
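A trigger function of this kind can be very small. The following is a minimal sketch, assuming boto3, an S3 event notification configured on the Raw Data bucket, and a hypothetical Glue job named raw-to-parquet; the job argument is illustrative, not something the generated script reads by default.

import boto3

glue = boto3.client("glue")

def lambda_handler(event, context):
    # Invoked by S3 when a new object lands in the Raw Data bucket.
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        key = record["s3"]["object"]["key"]
        # Start the (hypothetical) ETL job that converts the new file to Parquet.
        glue.start_job_run(
            JobName="raw-to-parquet",
            Arguments={"--source_path": "s3://{}/{}".format(bucket, key)},
        )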
Walkthrough
In this walkthrough, you define a database, configure a crawler to explore data in an Amazon S3 bucket, create a table, transform the CSV file into Parquet, create a table for the Parquet data, and query the data with Amazon Athena.
Discover the data
Sign in to the AWS Management Console and open the AWS Glue console. You can find AWS Glue in the Analytics section. AWS Glue is currently available in US East (N. Virginia), US East (Ohio), and US West (Oregon). Additional AWS Regions are added frequently.
The first step to discovering the data is to add a database. A database is a collection of tables.
In the console, choose Add database. In Database name, type nycitytaxi, and choose Create.
Choose Tables in the navigation pane. A table consists of the names of columns, data type definitions, and other metadata about a dataset.
Add a table to the database nycitytaxi. You can add a table manually or by using a crawler. A crawler is a program that connects to a data store and progresses through a prioritized list of classifiers to determine the schema for your data. AWS Glue provides classifiers for common file types like CSV, JSON, Avro, and others. You can also write your own classifier using a grok pattern.
To add a crawler, enter the data source: an Amazon S3 bucket named s3://aws-bigdata-blog/artifacts/glue-data-lake/data/. This S3 bucket contains the data file consisting of all the rides for the green taxis for the month of January 2017.
Choose Next.
For IAM role, choose the default role AWSGlueServiceRoleDefault in the drop-down list.
For Frequency, choose Run on demand. The crawler can be run on demand or set to run on a schedule.
For Database, choose nycitytaxi. It is important to understand how AWS Glue deals with schema changes so that you can select the appropriate method. In this example, the table is updated with any change. For more information about schema changes, see Cataloging Tables with a Crawler in the AWS Glue Developer Guide.
Review the steps, and choose Finish. The crawler is ready to run. Choose Run it now. When the crawler has finished, one table has been added.
Choose Tables in the left navigation pane, and then choose data. This screen describes the table, including schema, properties, and other valuable information.
Transform the data from CSV to Parquet format
Now you can configure and run a job to transform the data from CSV to Parquet. Parquet is a columnar format that is well suited for AWS analytics services like Amazon Athena and Amazon Redshift Spectrum.
Under ETL in the left navigation pane, choose Jobs, and then choose Add job.
For the Name, type nytaxi-csv-parquet.
For the IAM role, choose AWSGlueServiceRoleDefault.
For This job runs, choose A proposed script generated by AWS Glue.
Provide a unique Amazon S3 path to store the scripts.
Provide a unique Amazon S3 directory for a temporary directory.
Choose Next.
Choose data as the data source.
Choose Create tables in your data target.
Choose Parquet as the format.
Choose a new location (a new prefix location without any existing objects) to store the results.
Verify the schema mapping, and choose Finish.
View the job. This screen provides a complete view of the job and allows you to edit, save, and run the job. AWS Glue created this script. However, if required, you can create your own (a simplified sketch of such a script follows these steps).
Choose Save, and then choose Run job.
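If you are curious what such a script looks like, the following is a simplified sketch of a Glue ETL job that reads the crawled CSV table and writes Parquet. The output path is a placeholder, and the script that AWS Glue generates also includes mapping and resolve-choice steps that are omitted here.

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glueContext = GlueContext(SparkContext())
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Read the table that the crawler created from the raw CSV data.
source = glueContext.create_dynamic_frame.from_catalog(
    database="nycitytaxi", table_name="data"
)

# Write the same records out as Parquet to a placeholder S3 location.
glueContext.write_dynamic_frame.from_options(
    frame=source,
    connection_type="s3",
    connection_options={"path": "s3://your-bucket/nycitytaxi-parquet/"},
    format="parquet",
)
job.commit()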
Add the Parquet table and crawler
When the job has finished, add a new table for the Parquet data using a crawler.
For Crawler name, type nytaxiparquet.
Choose S3 as the Data store.
Include the Amazon S3 path chosen in the ETL job.
For the IAM role, choose AWSGlueServiceRoleDefault.
For Database, choose nycitytaxi.
For Frequency, choose Run on demand.
After the crawler has finished, there are two tables in the nycitytaxi database: a table for the raw CSV data and a table for the transformed Parquet data.
Analyze the data with Amazon Athena
Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is capable of querying CSV data. However, the Parquet file format significantly reduces the time and cost of querying the data. For more information, see the blog post Analyzing Data in Amazon S3 using Amazon Athena.
To use AWS Glue with Amazon Athena, you must upgrade your Athena data catalog to the AWS Glue Data Catalog. For more information about upgrading your Athena data catalog, see this step-by-step guide.
Open the AWS Management Console for Athena. The Query Editor displays both tables in the nycitytaxi database.
You can query the data using standard SQL.
Choose the nytaxigreenparquet table.
Type Select * From "nycitytaxi"."data" limit 10;
Choose Run Query.
Conclusion
This post demonstrates how easy it is to build the foundation of a data lake using AWS Glue and Amazon S3. By using AWS Glue to crawl your data on Amazon S3 and build an Apache Hive-compatible metadata store, you can use the metadata across the AWS analytic services and popular Hadoop ecosystem tools. This combination of AWS services is powerful and easy to use, allowing you to get to business insights faster.
If you have questions or suggestions, please comment below.
Additional reading
See the following blog posts for more information:
Gordon Heinrich is a Solutions Architect working with global systems integrators. He works with our partners and customers to provide them architectural guidance for building data lakes and using AWS analytic services. In his spare time, he enjoys spending time with his family, skiing, hiking, and mountain biking in Colorado.
Success in the popular music industry is typically measured in terms of the number of Top 10 hits artists have to their credit. The music industry is a highly competitive multi-billion dollar business, and record labels incur various costs in exchange for a percentage of the profits from sales and concert tickets.
Predicting the success of an artist’s release in the popular music industry can be difficult. One release may be extremely popular, resulting in widespread play on TV, radio, and social media, while another single may turn out quite unpopular, and therefore unprofitable. Record labels need to be selective in their decision making, and predictive analytics can help them decide which types of songs and artists to promote.
In this walkthrough, you leverage H2O.ai, Amazon Athena, and RStudio to make predictions on whether a song might make it to the Top 10 Billboard charts. You explore the GLM, GBM, and deep learning modeling techniques using H2O’s rapid, distributed and easy-to-use open source parallel processing engine. RStudio is a popular IDE, licensed either commercially or under AGPLv3, for working with R. This is ideal if you don’t want to connect to a server via SSH and use code editors such as vi to do analytics. RStudio is available in a desktop version, or a server version that allows you to access R via a web browser. RStudio’s Notebooks feature is used to demonstrate the execution of code and output. In addition, this post showcases how you can leverage Athena for query and interactive analysis during the modeling phase. A working knowledge of statistics and machine learning would be helpful to interpret the analysis being performed in this post.
Walkthrough
Your goal is to predict whether a song will make it to the Top 10 Billboard charts. For this purpose, you will be using multiple modeling techniques―namely GLM, GBM and deep learning―and choose the model that is the best fit.
This solution involves the following steps:
Install and configure RStudio with Athena
Log in to RStudio
Install R packages
Connect to Athena
Create a dataset
Create models
Install and configure RStudio with Athena
Use the following AWS CloudFormation stack to install, configure, and connect RStudio on an Amazon EC2 instance with Athena.
Launching this stack creates all required resources and prerequisites:
Amazon EC2 instance with Amazon Linux (minimum size of t2.large is recommended)
Provisioning of the EC2 instance in an existing VPC and public subnet
Installation of Java 8
Assignment of an IAM role to the EC2 instance with the required permissions for accessing Athena and Amazon S3
Security group allowing access to the RStudio and SSH ports from the internet (I recommend restricting access to these ports)
S3 staging bucket required for Athena (referenced within RStudio as ATHENABUCKET)
RStudio username and password
Setup logs in Amazon CloudWatch Logs (if needed for additional troubleshooting)
Amazon EC2 Systems Manager agent, which makes it easy to manage and patch the instance
All AWS resources are created in the US-East-1 Region. To avoid cross-region data transfer fees, launch the CloudFormation stack in the same region. To check the availability of Athena in other regions, see Region Table.
Log in to RStudio
The instance security group has been automatically configured to allow incoming connections on the RStudio port 8787 from any source internet address. You can edit the security group to restrict source IP access. If you have trouble connecting, ensure that port 8787 isn’t blocked by subnet network ACLs or by your outgoing proxy/firewall.
In the CloudFormation stack, choose Outputs, Value, and then open the RStudio URL. You might need to wait for a few minutes until the instance has been launched.
Log in to RStudio with the username and password you provided during setup.
Install R packages
Next, install the required R packages from the RStudio console. You can download the R notebook file containing just the code.
#install pacman – a handy package manager for managing installs
if("pacman" %in% rownames(installed.packages()) == FALSE)
{install.packages("pacman")}
library(pacman)
p_load(h2o,rJava,RJDBC,awsjavasdk)
h2o.init(nthreads = -1)
## Connection successful!
##
## R is connected to the H2O cluster:
## H2O cluster uptime: 2 hours 42 minutes
## H2O cluster version: 3.10.4.6
## H2O cluster version age: 4 months and 4 days !!!
## H2O cluster name: H2O_started_from_R_rstudio_hjx881
## H2O cluster total nodes: 1
## H2O cluster total memory: 3.30 GB
## H2O cluster total cores: 4
## H2O cluster allowed cores: 4
## H2O cluster healthy: TRUE
## H2O Connection ip: localhost
## H2O Connection port: 54321
## H2O Connection proxy: NA
## H2O Internal Security: FALSE
## R Version: R version 3.3.3 (2017-03-06)
## Warning in h2o.clusterInfo():
## Your H2O cluster version is too old (4 months and 4 days)!
## Please download and install the latest version from http://h2o.ai/download/
#install aws sdk if not present (pre-requisite for using Athena with an IAM role)
if (!aws_sdk_present()) {
install_aws_sdk()
}
load_sdk()
## NULL
Connect to Athena
Next, establish a connection to Athena from RStudio, using an IAM role associated with your EC2 instance. Use ATHENABUCKET to specify the S3 staging directory.
URL <- 'https://s3.amazonaws.com/athena-downloads/drivers/AthenaJDBC41-1.0.1.jar'
fil <- basename(URL)
#download the file into current working directory
if (!file.exists(fil)) download.file(URL, fil)
#verify that the file has been downloaded successfully
list.files()
## [1] "AthenaJDBC41-1.0.1.jar"
drv <- JDBC(driverClass="com.amazonaws.athena.jdbc.AthenaDriver", fil, identifier.quote="'")
con <- jdbcConnection <- dbConnect(drv, 'jdbc:awsathena://athena.us-east-1.amazonaws.com:443/',
s3_staging_dir=Sys.getenv("ATHENABUCKET"),
aws_credentials_provider_class="com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
Verify the connection. The results returned depend on your specific Athena setup.
For this analysis, you use a sample dataset combining information from Billboard and Wikipedia with Echo Nest data in the Million Songs Dataset. Upload this dataset into your own S3 bucket. The table below provides a description of the fields used in this dataset.
year – Year that song was released
songtitle – Title of the song
artistname – Name of the song artist
songid – Unique identifier for the song
artistid – Unique identifier for the song artist
timesignature – Variable estimating the time signature of the song
timesignature_confidence – Confidence in the estimate for the timesignature
loudness – Continuous variable indicating the average amplitude of the audio in decibels
tempo – Variable indicating the estimated beats per minute of the song
tempo_confidence – Confidence in the estimate for tempo
key – Variable with twelve levels indicating the estimated key of the song (C, C#, B)
key_confidence – Confidence in the estimate for key
energy – Variable that represents the overall acoustic energy of the song, using a mix of features such as loudness
pitch – Continuous variable that indicates the pitch of the song
timbre_0_min thru timbre_11_min – Variables that indicate the minimum values over all segments for each of the twelve values in the timbre vector
timbre_0_max thru timbre_11_max – Variables that indicate the maximum values over all segments for each of the twelve values in the timbre vector
top10 – Indicator for whether or not the song made it to the Top 10 of the Billboard charts (1 if it was in the top 10, and 0 if not)
Create an Athena table based on the dataset
In the Athena console, select the default database, sampledb, or create a new database.
Run the following create table statement.
create external table if not exists billboard
(
year int,
songtitle string,
artistname string,
songID string,
artistID string,
timesignature int,
timesignature_confidence double,
loudness double,
tempo double,
tempo_confidence double,
key int,
key_confidence double,
energy double,
pitch double,
timbre_0_min double,
timbre_0_max double,
timbre_1_min double,
timbre_1_max double,
timbre_2_min double,
timbre_2_max double,
timbre_3_min double,
timbre_3_max double,
timbre_4_min double,
timbre_4_max double,
timbre_5_min double,
timbre_5_max double,
timbre_6_min double,
timbre_6_max double,
timbre_7_min double,
timbre_7_max double,
timbre_8_min double,
timbre_8_max double,
timbre_9_min double,
timbre_9_max double,
timbre_10_min double,
timbre_10_max double,
timbre_11_min double,
timbre_11_max double,
Top10 int
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 's3://aws-bigdata-blog/artifacts/predict-billboard/data'
;
Inspect the table definition for the ‘billboard’ table that you have created. If you chose a database other than sampledb, replace that value with your choice.
Next, run a sample query to obtain a list of all songs from Janet Jackson that made it to the Billboard Top 10 charts.
dbGetQuery(con, " SELECT songtitle,artistname,top10 FROM sampledb.billboard WHERE lower(artistname) = 'janet jackson' AND top10 = 1")
## songtitle artistname top10
## 1 Runaway Janet Jackson 1
## 2 Because Of Love Janet Jackson 1
## 3 Again Janet Jackson 1
## 4 If Janet Jackson 1
## 5 Love Will Never Do (Without You) Janet Jackson 1
## 6 Black Cat Janet Jackson 1
## 7 Come Back To Me Janet Jackson 1
## 8 Alright Janet Jackson 1
## 9 Escapade Janet Jackson 1
## 10 Rhythm Nation Janet Jackson 1
Determine how many songs in this dataset are specifically from the year 2010.
dbGetQuery(con, " SELECT count(*) FROM sampledb.billboard WHERE year = 2010")
## _col0
## 1 373
The sample dataset provides certain song properties of interest that can be analyzed to gauge the impact to the song’s overall popularity. Look at one such property, timesignature, and determine the value that is the most frequent among songs in the database. Timesignature is a measure of the number of beats and the type of note involved.
Running the query directly may result in an error, as shown in the commented lines below. This error is a result of trying to retrieve a large result set over a JDBC connection, which can cause out-of-memory issues at the client level. To address this, reduce the fetch size and run again.
#t<-dbGetQuery(con, " SELECT timesignature FROM sampledb.billboard")
#Note: Running the preceding query results in the following error:
#Error in .jcall(rp, "I", "fetch", stride, block): java.sql.SQLException: The requested #fetchSize is more than the allowed value in Athena. Please reduce the fetchSize and try #again. Refer to the Athena documentation for valid fetchSize values.
# Use the dbSendQuery function, reduce the fetch size, and run again
r <- dbSendQuery(con, " SELECT timesignature FROM sampledb.billboard")
dftimesignature<- fetch(r, n=-1, block=100)
dbClearResult(r)
## [1] TRUE
table(dftimesignature)
## dftimesignature
## 0 1 3 4 5 7
## 10 143 503 6787 112 19
nrow(dftimesignature)
## [1] 7574
From the results, observe that 6787 songs have a timesignature of 4.
Next, determine the song with the highest tempo.
dbGetQuery(con, " SELECT songtitle,artistname,tempo FROM sampledb.billboard WHERE tempo = (SELECT max(tempo) FROM sampledb.billboard) ")
## songtitle artistname tempo
## 1 Wanna Be Startin' Somethin' Michael Jackson 244.307
Create the training dataset
Your model needs to be trained such that it can learn and make accurate predictions. Split the data into training and test datasets, and create the training dataset first. This dataset contains all observations from the year 2009 and earlier. You may face the same JDBC connection issue pointed out earlier, so this query uses a fetch size.
#BillboardTrain <- dbGetQuery(con, "SELECT * FROM sampledb.billboard WHERE year <= 2009")
#Running the preceding query results in the following error:-
#Error in .verify.JDBC.result(r, "Unable to retrieve JDBC result set for ", : Unable to retrieve #JDBC result set for SELECT * FROM sampledb.billboard WHERE year <= 2009 (Internal error)
#Follow the same approach as before to address this issue.
r <- dbSendQuery(con, "SELECT * FROM sampledb.billboard WHERE year <= 2009")
BillboardTrain <- fetch(r, n=-1, block=100)
dbClearResult(r)
## [1] TRUE
BillboardTrain[1:2,c(1:3,6:10)]
## year songtitle artistname timesignature
## 1 2009 The Awkward Goodbye Athlete 3
## 2 2009 Rubik's Cube Athlete 3
## timesignature_confidence loudness tempo tempo_confidence
## 1 0.732 -6.320 89.614 0.652
## 2 0.906 -9.541 117.742 0.542
nrow(BillboardTrain)
## [1] 7201
Create the test dataset
BillboardTest <- dbGetQuery(con, "SELECT * FROM sampledb.billboard where year = 2010")
BillboardTest[1:2,c(1:3,11:15)]
## year songtitle artistname key
## 1 2010 This Is the House That Doubt Built A Day to Remember 11
## 2 2010 Sticks & Bricks A Day to Remember 10
## key_confidence energy pitch timbre_0_min
## 1 0.453 0.9666556 0.024 0.002
## 2 0.469 0.9847095 0.025 0.000
nrow(BillboardTest)
## [1] 373
Convert the training and test datasets into H2O dataframes
You need to designate the independent and dependent variables prior to applying your modeling algorithms. Because you’re trying to predict the ‘top10’ field, this would be your dependent variable and everything else would be independent.
Create your first model using GLM. Because GLM works best with numeric data, you create your model by dropping non-numeric variables. You only use the variables in the dataset that describe the numerical attributes of the song in the logistic regression model. You won’t use these variables: “year”, “songtitle”, “artistname”, “songid”, or “artistid”.
Create Model 1 with the training dataset, using GLM as the modeling algorithm and H2O’s built-in h2o.glm function.
modelh1 <- h2o.glm( y = y.dep, x = x.indep, training_frame = train.h2o, family = "binomial")
##
|
| | 0%
|
|===== | 8%
|
|=================================================================| 100%
Measure the performance of Model 1, using H2O’s built-in performance function.
h2o.performance(model=modelh1,newdata=test.h2o)
## H2OBinomialMetrics: glm
##
## MSE: 0.09924684
## RMSE: 0.3150347
## LogLoss: 0.3220267
## Mean Per-Class Error: 0.2380168
## AUC: 0.8431394
## Gini: 0.6862787
## R^2: 0.254663
## Null Deviance: 326.0801
## Residual Deviance: 240.2319
## AIC: 308.2319
##
## Confusion Matrix (vertical: actual; across: predicted) for F1-optimal threshold:
## 0 1 Error Rate
## 0 255 59 0.187898 =59/314
## 1 17 42 0.288136 =17/59
## Totals 272 101 0.203753 =76/373
##
## Maximum Metrics: Maximum metrics at their respective thresholds
## metric threshold value idx
## 1 max f1 0.192772 0.525000 100
## 2 max f2 0.124912 0.650510 155
## 3 max f0point5 0.416258 0.612903 23
## 4 max accuracy 0.416258 0.879357 23
## 5 max precision 0.813396 1.000000 0
## 6 max recall 0.037579 1.000000 282
## 7 max specificity 0.813396 1.000000 0
## 8 max absolute_mcc 0.416258 0.455251 23
## 9 max min_per_class_accuracy 0.161402 0.738854 125
## 10 max mean_per_class_accuracy 0.124912 0.765006 155
##
## Gains/Lift Table: Extract with `h2o.gainsLift(<model>, <data>)` or `
h2o.auc(h2o.performance(modelh1,test.h2o))
## [1] 0.8431394
The AUC metric provides insight into how well the classifier is able to separate the two classes. In this case, the value of 0.8431394 indicates that the classification is good. (A value of 0.5 indicates a worthless test, while a value of 1.0 indicates a perfect test.)
Next, inspect the coefficients of the variables in the dataset.
Typically, songs with heavier instrumentation tend to be louder (have higher values in the variable “loudness”) and more energetic (have higher values in the variable “energy”). This knowledge is helpful for interpreting the modeling results.
You can make the following observations from the results:
The coefficient estimates for the confidence values associated with the time signature, key, and tempo variables are positive. This suggests that higher confidence leads to a higher predicted probability of a Top 10 hit.
The coefficient estimate for loudness is positive, meaning that mainstream listeners prefer louder songs with heavier instrumentation.
The coefficient estimate for energy is negative, meaning that mainstream listeners prefer songs that are less energetic, which are those songs with light instrumentation.
These coefficients lead to contradictory conclusions for Model 1. This could be due to multicollinearity issues. Inspect the correlation between the variables “loudness” and “energy” in the training set.
This number indicates that these two variables are highly correlated, and Model 1 does indeed suffer from multicollinearity. Typically, you associate a value of -1.0 to -0.5 or 0.5 to 1.0 with strong correlation, and a value of -0.1 to 0.1 with weak correlation. To avoid this correlation issue, omit one of these two variables and re-create the models.
You build two variations of the original model:
Model 2, in which you keep “energy” and omit “loudness”
Model 3, in which you keep “loudness” and omit “energy”
You compare these two models and choose the model with a better fit for this use case.
Inspecting the coefficient of the variable energy, Model 2 suggests that songs with high energy levels tend to be more popular. This is in line with expectations.
As H2O orders variables by significance, the variable energy is not significant in this model.
You can conclude that Model 2 is not ideal for this use case, as energy is not significant.
From the confusion matrix, the model correctly predicts that 33 songs will be top 10 hits (true positives). However, it has 26 false positives (songs that the model predicted would be Top 10 hits, but ended up not being Top 10 hits).
Loudness has a positive coefficient estimate, meaning that this model predicts that songs with heavier instrumentation tend to be more popular. This is the same conclusion from Model 2.
Loudness is significant in this model.
Overall, Model 3 predicts a higher number of top 10 hits with an accuracy rate that is acceptable. To choose the best fit for production runs, record labels should consider the following factors:
Desired model accuracy at a given threshold
Number of correct predictions for top10 hits
Tolerable number of false positives or false negatives
Next, make predictions using Model 3 on the test dataset.
The first set of output results specifies the probabilities associated with each predicted observation. For example, observation 1 is 96.54739% likely to not be a Top 10 hit, and 3.4526052% likely to be a Top 10 hit (predict=1 indicates a Top 10 hit, and predict=0 indicates not a Top 10 hit). The second set of results lists the actual predictions made. From the third set of results, this model predicts that 61 songs will be Top 10 hits.
Compute the baseline accuracy, by assuming that the baseline predicts the most frequent outcome, which is that most songs are not Top 10 hits.
table(BillboardTest$top10)
##
## 0 1
## 314 59
Now observe that the baseline model would get 314 observations correct, and 59 wrong, for an accuracy of 314/(314+59) = 0.8418231.
It seems that Model 3, with an accuracy of 0.8552, provides you with a small improvement over the baseline model. But is this model useful for record labels?
View the two models from an investment perspective:
A production company is interested in investing in songs that are more likely to make it to the Top 10. The company’s objective is to minimize the risk of financial losses attributed to investing in songs that end up unpopular.
How many songs does Model 3 correctly predict as a Top 10 hit in 2010? Looking at the confusion matrix, you see that it predicts 33 Top 10 hits correctly at an optimal threshold, which is more than half the number of songs (59) that actually made it to the Top 10 in 2010.
It will be more useful to the record label if you can provide the production company with a list of songs that are highly likely to end up in the Top 10.
The baseline model is not useful, as it simply does not label any song as a hit.
Considering the three models built so far, you can conclude that Model 3 proves to be the best investment choice for the record label.
GBM model
H2O provides you with the ability to explore other learning models, such as GBM and deep learning. Explore building a model using the GBM technique, using the built-in h2o.gbm function.
Before you do this, you need to convert the target variable to a factor for multinomial classification techniques.
train.h2o$top10=as.factor(train.h2o$top10)
gbm.modelh <- h2o.gbm(y=y.dep, x=x.indep, training_frame = train.h2o, ntrees = 500, max_depth = 4, learn_rate = 0.01, seed = 1122,distribution="multinomial")
##
|
| | 0%
|
|=== | 5%
|
|===== | 7%
|
|====== | 9%
|
|======= | 10%
|
|====================== | 33%
|
|===================================== | 56%
|
|==================================================== | 79%
|
|================================================================ | 98%
|
|=================================================================| 100%
perf.gbmh<-h2o.performance(gbm.modelh,test.h2o)
perf.gbmh
## H2OBinomialMetrics: gbm
##
## MSE: 0.09860778
## RMSE: 0.3140188
## LogLoss: 0.3206876
## Mean Per-Class Error: 0.2120263
## AUC: 0.8630573
## Gini: 0.7261146
##
## Confusion Matrix (vertical: actual; across: predicted) for F1-optimal threshold:
## 0 1 Error Rate
## 0 266 48 0.152866 =48/314
## 1 16 43 0.271186 =16/59
## Totals 282 91 0.171582 =64/373
##
## Maximum Metrics: Maximum metrics at their respective thresholds
## metric threshold value idx
## 1 max f1 0.189757 0.573333 90
## 2 max f2 0.130895 0.693717 145
## 3 max f0point5 0.327346 0.598802 26
## 4 max accuracy 0.442757 0.876676 14
## 5 max precision 0.802184 1.000000 0
## 6 max recall 0.049990 1.000000 284
## 7 max specificity 0.802184 1.000000 0
## 8 max absolute_mcc 0.169135 0.496486 104
## 9 max min_per_class_accuracy 0.169135 0.796610 104
## 10 max mean_per_class_accuracy 0.169135 0.805948 104
##
## Gains/Lift Table: Extract with `h2o.gainsLift(<model>, <data>)` or `
h2o.sensitivity(perf.gbmh,0.5)
## Warning in h2o.find_row_by_threshold(object, t): Could not find exact
## threshold: 0.5 for this set of metrics; using closest threshold found:
## 0.501205344484314. Run `h2o.predict` and apply your desired threshold on a
## probability column.
## [[1]]
## [1] 0.1355932
h2o.auc(perf.gbmh)
## [1] 0.8630573
This model correctly predicts 43 top 10 hits, which is 10 more than the number predicted by Model 3. Moreover, the AUC metric is higher than the one obtained from Model 3.
As seen above, H2O’s API provides the ability to obtain key statistical measures required to analyze the models easily, using several built-in functions. The record label can experiment with different parameters to arrive at the model that predicts the maximum number of Top 10 hits at the desired level of accuracy and threshold.
H2O also allows you to experiment with deep learning models. Deep learning models have the ability to learn features implicitly, but can be more expensive computationally.
Now, create a deep learning model with the h2o.deeplearning function, using the same training and test datasets created before. The time taken to run this model depends on the type of EC2 instance chosen for this purpose. For models that require more computation, consider using accelerated computing instances such as the P2 instance type.
system.time(
dlearning.modelh <- h2o.deeplearning(y = y.dep,
x = x.indep,
training_frame = train.h2o,
epochs = 250,
hidden = c(250,250),
activation = "Rectifier",
seed = 1122,
distribution="multinomial"
)
)
## user system elapsed
## 1.216 0.020 166.508
perf.dl<-h2o.performance(model=dlearning.modelh,newdata=test.h2o)
perf.dl
## H2OBinomialMetrics: deeplearning
##
## MSE: 0.1678359
## RMSE: 0.4096778
## LogLoss: 1.86509
## Mean Per-Class Error: 0.3433013
## AUC: 0.7568822
## Gini: 0.5137644
##
## Confusion Matrix (vertical: actual; across: predicted) for F1-optimal threshold:
## 0 1 Error Rate
## 0 290 24 0.076433 =24/314
## 1 36 23 0.610169 =36/59
## Totals 326 47 0.160858 =60/373
##
## Maximum Metrics: Maximum metrics at their respective thresholds
## metric threshold value idx
## 1 max f1 0.826267 0.433962 46
## 2 max f2 0.000000 0.588235 239
## 3 max f0point5 0.999929 0.511811 16
## 4 max accuracy 0.999999 0.865952 10
## 5 max precision 1.000000 1.000000 0
## 6 max recall 0.000000 1.000000 326
## 7 max specificity 1.000000 1.000000 0
## 8 max absolute_mcc 0.999929 0.363219 16
## 9 max min_per_class_accuracy 0.000004 0.662420 145
## 10 max mean_per_class_accuracy 0.000000 0.685334 224
##
## Gains/Lift Table: Extract with `h2o.gainsLift(<model>, <data>)` or `h2o.gainsLift(<model>, valid=<T/F>, xval=<T/F>)`
h2o.sensitivity(perf.dl,0.5)
## Warning in h2o.find_row_by_threshold(object, t): Could not find exact
## threshold: 0.5 for this set of metrics; using closest threshold found:
## 0.496293348880151. Run `h2o.predict` and apply your desired threshold on a
## probability column.
## [[1]]
## [1] 0.3898305
h2o.auc(perf.dl)
## [1] 0.7568822
The AUC metric for this model is 0.7568822, which is lower than the values obtained from the earlier models. I recommend further experimentation using different hyperparameters, such as the learning rate, the number of epochs, or the number of hidden layers.
H2O’s built-in functions provide many key statistical measures that can help measure model performance. Here are some of these key terms.
Sensitivity: The proportion of actual positives that are correctly identified. It is also called the true positive rate, or recall.
Specificity: The proportion of actual negatives that are correctly identified. It is also called the true negative rate.
Threshold: The cutoff point that maximizes specificity and sensitivity. While the model may not provide its highest accuracy at this point, it is not biased toward positives or negatives.
Precision: The fraction of records classified as positive that are actually positive; in other words, how many of the predicted Top 10 hits are real hits.
AUC: Indicates how well the classifier is able to separate the two classes. It is especially useful when the sample distribution is highly skewed and a model is prone to overfitting to a single class. A common grading scale is:
0.9 – 1.0 = excellent (A)
0.8 – 0.9 = good (B)
0.7 – 0.8 = fair (C)
0.6 – 0.7 = poor (D)
0.5 – 0.6 = fail (F)
(A short Python sketch after this list shows how the threshold-based metrics are computed from a confusion matrix.)
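To make these definitions concrete, here is a small Python sketch, added for illustration (it is not part of the post's R workflow), that derives the threshold-based metrics from the four cells of a confusion matrix. It uses the GBM confusion matrix shown earlier as the example.
def confusion_metrics(tp, fp, tn, fn):
    # Derive the metrics described above from confusion-matrix counts.
    return {
        "sensitivity (recall)": tp / (tp + fn),   # proportion of actual positives identified
        "specificity": tn / (tn + fp),            # proportion of actual negatives identified
        "precision": tp / (tp + fp),              # fraction of predicted positives that are real
        "accuracy": (tp + tn) / (tp + fp + tn + fn),
    }

# GBM confusion matrix at the F1-optimal threshold: tp=43, fp=48, tn=266, fn=16
print(confusion_metrics(tp=43, fp=48, tn=266, fn=16))
# sensitivity ~0.729, specificity ~0.847, precision ~0.473, accuracy ~0.828
AUC, by contrast, is computed across all thresholds rather than at a single cutoff, which is why H2O reports it separately through h2o.auc().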
Here’s a summary of the metrics generated from H2O’s built-in functions for the three models that produced useful results.
Metric               Model 3                  GBM Model                Deep Learning Model
Accuracy (max)       0.882038 (t=0.435479)    0.876676 (t=0.442757)    0.865952 (t=0.999999)
Precision (max)      1.0 (t=0.821606)         1.0 (t=0.802184)         1.0 (t=1.0)
Recall (max)         1.0                      1.0                      1.0 (t=0)
Specificity (max)    1.0                      1.0                      1.0 (t=1)
Sensitivity (t=0.5)  0.2033898                0.1355932                0.3898305
AUC                  0.8492389                0.8630573                0.756882
Note: 't' denotes threshold.
Your options at this point could be narrowed down to Model 3 and the GBM model, based on the AUC and accuracy metrics observed earlier. If the slightly lower accuracy of the GBM model is deemed acceptable, the record label can choose to go to production with the GBM model, as it can predict a higher number of Top 10 hits. The AUC metric for the GBM model is also higher than that of Model 3.
Record labels can experiment with different learning techniques and parameters before arriving at a model that proves to be the best fit for their business. Because deep learning models can be computationally expensive, record labels can choose more powerful EC2 instances on AWS to run their experiments faster.
Conclusion
In this post, I showed how the popular music industry can use analytics to predict the type of songs that make the Top 10 Billboard charts. By running H2O’s scalable machine learning platform on AWS, data scientists can easily experiment with multiple modeling techniques and interactively query the data using Amazon Athena, without having to manage the underlying infrastructure. This helps record labels make critical decisions on the type of artists and songs to promote in a timely fashion, thereby increasing sales and revenue.
If you have questions or suggestions, please comment below.
Gopal Wunnava is a Partner Solution Architect with the AWS GSI Team. He works with partners and customers on big data engagements, and is passionate about building analytical solutions that drive business capabilities and decision making. In his spare time, he loves all things sports and movies, and is fond of old classics like Asterix and Obelix comics and Hitchcock movies.
Bob Strahan, a Senior Consultant with AWS Professional Services, contributed to this post.
Amazon Cognito user pools are full-fledged identity providers (IdPs) that you can use to maintain a user directory. The directory can scale to hundreds of millions of users and also adds sign-up and sign-in support to your mobile or web applications.
You can provide access from an Amazon Cognito user pool to Amazon QuickSight with IAM identity federation in a serverless way. This solution uses Amazon API Gateway and AWS Lambda as a backend, fronted by a simple web app. New features such as Amazon Cognito user pool app integration make it even easier to add sign-in and sign-up logic to your application and support federation use cases.
Solution architecture
Here's how the solution fits together.
In this scenario, your web app hosted on Amazon S3 integrates with Amazon Cognito User Pools to authenticate users. It uses Amazon Cognito Federated Identities to authorize access to Amazon QuickSight on behalf of the authenticated user, with temporary AWS credentials and appropriate permissions. The app then uses an ID token generated by Amazon Cognito to call API Gateway and Lambda to obtain a sign-in token for Amazon QuickSight from AWS Sign-In Federation. With this token, the app redirects access to Amazon QuickSight:
The Amazon Cognito hosted UI provided by the app integration domain performs all sign-in, sign-up, verification, and authentication logic for the web app. This allows you to register and authenticate users.
After a user is authenticated with a valid user name and password, an OpenID Connect token (ID token) is sent to Amazon Cognito Federated Identities. The token retrieves temporary AWS credentials based on an IAM role with “quickSight:CreateUser” permissions. These credentials are used to build a session string that is encoded into the URL https://signin.aws.amazon.com/federation?Action=getSigninToken.
The ID token, along with the encoded URL, is sent to API Gateway, which in turn verifies the token with a user pool authorizer to authorize the API call.
The URL is passed on to a Lambda function that calls the AWS SSO federation endpoint to retrieve a sign-in token (a Python sketch of this exchange appears after these steps).
AWS SSO processes the federation request, authenticates the user, and forwards the authentication token to Amazon QuickSight, which then uses the authentication token and authorizes user access.
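The post's Lambda function isn't reproduced here, but the call it makes to the sign-in federation endpoint is straightforward. The following Python sketch (names, issuer URL, and structure are illustrative assumptions) shows how temporary credentials from Amazon Cognito can be exchanged for a sign-in token and turned into an Amazon QuickSight redirect URL.
import json
import urllib.parse
import urllib.request

FEDERATION_ENDPOINT = "https://signin.aws.amazon.com/federation"

def build_quicksight_url(access_key, secret_key, session_token):
    # Package the temporary credentials the way the federation endpoint expects.
    session = json.dumps({
        "sessionId": access_key,
        "sessionKey": secret_key,
        "sessionToken": session_token,
    })
    # Exchange the credentials for a short-lived sign-in token.
    token_url = (FEDERATION_ENDPOINT + "?Action=getSigninToken&Session="
                 + urllib.parse.quote_plus(session))
    with urllib.request.urlopen(token_url) as resp:
        signin_token = json.loads(resp.read())["SigninToken"]
    # Build the login URL that redirects the browser to Amazon QuickSight.
    return (FEDERATION_ENDPOINT + "?Action=login"
            + "&Issuer=" + urllib.parse.quote_plus("https://example.com")  # placeholder issuer
            + "&Destination=" + urllib.parse.quote_plus("https://quicksight.aws.amazon.com/")
            + "&SigninToken=" + urllib.parse.quote_plus(signin_token))
The SAM template described below deploys the actual implementation; this sketch only illustrates the getSigninToken and login exchange described in the preceding steps.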
How can you use, configure and test this serverless solution in your own AWS account? I created a simple SAM (Serverless Application Model) template that can be used to spin up all the resources needed for the solution.
Implementation steps
Using the AWS CLI, create an S3 bucket in the AWS Region where you plan to deploy all the resources:
Then, execute a couple of commands in a terminal or command prompt from inside the uncompressed folder where all files are located, referring to the S3 bucket created earlier:
AWS CloudFormation automatically creates and configures the following resources in your account:
Amazon CloudFront distribution
S3 static website
Amazon Cognito user pool
Amazon Cognito identity pool
IAM role for authenticated users
API Gateway API
Lambda function
You can follow the progress of the stack creation from the CloudFormation console. View the Outputs tab for the completed stack to get the identifiers of all created resources. You could also execute the following command with the AWS CLI:
aws cloudformation describe-stacks --query 'Stacks[0].[Outputs[].[OutputKey,OutputValue]]|[]' --output text --stack-name CognitoQuickSight
Use the information from the console or CLI command to replace the related resource identifiers in the file “auth.js”.
In the Amazon Cognito User Pools console, select the pool named QuickSightUsers generated by CloudFormation.
Under App integration, choose Domain name and create a domain. Domain names must be unique to the region. Add the domain to the “auth.js” file accordingly:
Choose App integration, App client settings and then select the option Cognito User Pool. Add the CloudFront distribution address (with https://, as SSL is a requirement for the callback/sign out URLs) and make sure that the address matches the related settings in the “auth.js” file exactly. For Allowed OAuth Flows, select implicit grant. For Allowed OAuth Scopes, select openid.
The app integration configuration is now done. Your “auth.js” file should look like the following:
(The resource identifiers shown in the example no longer exist; the CloudFormation stack that generated them has been deleted. I recommend that you delete your own stack after testing, for cleanup purposes. Deleting the stack also deletes all the resources it created.)
Next, upload the four JS and HTML files to the S3 bucket named “cognitoquicksight-s3website-xxxxxxxxx”. Make sure that all files are publicly readable:
Congratulations, the configuration part is now finished!
It’s time to create your first user. Access your CloudFront distribution address in a browser and choose SIGN IN / SIGN UP.
On the Amazon Cognito hosted UI, choose SIGN UP and provide a user name, password and a valid email.
You receive a verification code in email to confirm the user.
In a production system, you might not want to allow open access to your dashboards. As you now have a confirmed user, you can disable the sign-up functionality altogether to avoid letting other users sign themselves up.
In the Amazon Cognito User Pools console, choose General settings, Policies and select Only allow administrators to create users.
In the web app, you can now sign in as the Amazon Cognito user to access the Amazon QuickSight console. Because this is the first time this user is accessing Amazon QuickSight with an IAM role, provide your email address and sign up as an Amazon QuickSight user.
Enjoy your federated access to Amazon QuickSight!
After you’re done testing, go to the CloudFormation console and delete the CognitoQuickSight stack to remove all the resources.
Extending and customizing the solution
Additionally, you could configure SAML federation for your user pool with a couple of clicks, following the instructions in the Amazon Cognito User Pools supports federation with SAML post. If you add more than one SAML IdP, Amazon Cognito can identify the provider to which to redirect the authentication request, based on the user’s corporate email address.
It’s important to understand that while Amazon Cognito User Pools is authenticating (AuthN) the user, the IAM role created for the identity pool is authorizing (AuthZ) the user to perform actions on specific resources. As it is configured, the role only allows “quickSight:CreateUser” permissions. For additional permissions, modify the role accordingly, as in Setting Your IAM Policy. If your users create datasets, remember to add access to data sources such as Amazon S3.
You can customize this solution further by adding multiple groups to your user pool and associating each group with different IAM roles. For more information, see Amazon Cognito Groups and Fine-Grained Role-Based Access Control. For instance, with role-based access control, it’s possible to have a group for Amazon QuickSight administrators and one for users.
You can also modify the sign-in URL (Step 6) to redirect your user to any other service console, provided that the IAM role has appropriate permissions. After receiving a valid sign-in token from the SSO federation endpoint, the user is redirected to https://quicksight.aws.amazon.com. However, you can change the redirection to the main AWS Management Console at https://console.aws.amazon.com, or to specific services such as Amazon Redshift, Amazon EMR, Amazon Kinesis, or AWS Lambda. For Amazon Elasticsearch Service and Kibana, the application instead needs to return an AWS Signature Version 4 (SigV4) signed URL based on the temporary credentials received from Amazon Cognito, rather than calling the AWS sign-in federation endpoint. You can even build a frontend portal with separate links to multiple specific services and resources, as long as the assumed IAM role has access to those services and resources.
Conclusion
With the power, flexibility, security, scalability, and the new federation and application integration features of Amazon Cognito user pools, there’s no need to worry about the undifferentiated heavy lifting of maintaining your own identity servers. This allows you to focus on application logic and secure access to other great AWS services, such as Amazon QuickSight.
If you have questions or suggestions, please comment below.
About the Author
Ed Lima is a Solutions Architect who helps AWS customers with their journey in the cloud. He has provided thought leadership to define and drive strategic direction for the adoption of Amazon platforms and technologies, skillfully blending business requirements with technical aspects to achieve the best outcomes and help implement well-architected solutions. In his spare time, he enjoys snowboarding.
The Big Data Lambda Architecture seeks to provide data engineers and architects with a scalable, fault-tolerant data processing architecture and framework using loosely coupled, distributed systems. At a high level, the Lambda Architecture is designed to handle both real-time and historically aggregated batched data in an integrated fashion. It separates the duties of real-time and batch processing so purpose-built engines, processes, and storage can be used for each, while serving and query layers present a unified view of all of the data.
Historically, the Lambda Architecture demanded the use of various complex systems to achieve the outcomes of uniting batch and real-time views. Data platform engineers and architects were required to implement services running on Amazon EC2 for data collection and ingestion, batch processing, stream processing, serving layers, and dashboards/reporting. As time has gone on, AWS customers have continued to ask for managed solutions that scale seamlessly and put less focus on infrastructure, allowing teams to focus on what really matters: the data and the resulting insights.
In this post, I show you how you can use AWS services like AWS Glue to build a Lambda Architecture completely without servers. I use a practical demonstration to examine the tight integration between serverless services on AWS and create a robust data processing Lambda Architecture system.
New: AWS Glue
With the launch of AWS Glue, AWS provides a portfolio of services to architect a Big Data platform without managing any servers or clusters. AWS Glue is a fully managed extract, transform, and load (ETL) service that makes it easy for customers to prepare and load their data for analytics. You can create and run an ETL job with a few clicks in the AWS Management Console. You simply point AWS Glue to your data stored on AWS, and AWS Glue discovers your data and stores the associated metadata (for example, the table definition and schema) in the AWS Glue Data Catalog. After it’s cataloged, your data is immediately searchable, queryable, and available for ETL.
AWS Glue generates the code to execute your data transformations and data loading processes. Furthermore, AWS Glue provides a managed Spark execution environment to run ETL jobs against a data lake in Amazon S3. In short, you can now run a Lambda Architecture on AWS in a completely serverless fashion!
“Serverless” applications allow you to build and run applications without thinking about servers. What this means is that you can now stream data in real-time, process huge volumes of data in S3, and run SQL queries and visualizations against that data without managing server provisioning, installation, patching, or capacity scaling. This frees up your users to spend more time interpreting the data and deriving business value for your organization.
Solution overview
The AWS services involved in this solution include Amazon Kinesis Firehose, Amazon Kinesis Analytics, AWS Glue, Amazon S3, Amazon Athena, and Amazon QuickSight.
The following diagram explains how the services work together:
None of the included services require the creation, configuration, or installation of servers, clusters, and databases.
In this example, you use these services to send and process simulated streaming data of sensor devices to Kinesis Firehose and store the raw data in S3. Using AWS Glue, you analyze the raw data from S3 in batch-oriented fashion to look at the thermostat efficiency over time against the historical data, and store results back in S3.
Using Amazon Kinesis Analytics, you analyze and filter the data to detect inefficient sensors in real time. In this example, you detect inefficiencies in thermostat devices by comparing their temperature settings against the temperature they are reading (where thermostats are typically set between 70-72º F).
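The specific measure used later in this post is the device's "percent inefficiency": the absolute value of the temperature reading minus the temperature setting, divided by the temperature setting. As a quick plain-Python illustration (the actual calculation is done at scale by Kinesis Analytics and the AWS Glue job):
def pct_inefficiency(temp_reading, temp_setting):
    # Absolute difference between the reading and the set temperature,
    # expressed as a fraction of the set temperature.
    return abs(temp_reading - temp_setting) / float(temp_setting)

# A thermostat set to 71 degrees F that reads 85 degrees F is about 19.7% inefficient.
print(round(pct_inefficiency(85, 71) * 100, 1))   # 19.7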
Finally, you use Athena and Amazon QuickSight to query and visualize the data and build a dashboard that can be shared with other users of Amazon QuickSight in your organization.
Walkthrough
You use the AWS CLI and AWS CloudFormation throughout this tutorial, therefore a basic understanding of these services is assumed.
At the time of writing this post, AWS Glue is only available in the US East (N. Virginia) AWS Region. Complete all the steps in that region.
Step 1: Set up baseline AWS resources
I’ve set up a CloudFormation template to set up the Kinesis streaming resources for you.
Before you continue with the rest of the walkthrough, make sure that the Status value is CREATE_COMPLETE. This can take anywhere between 3 and 5 minutes, so grab coffee or take a moment to prepare and review the next steps.
Step 2: Set up the Amazon Kinesis Data Generator
You will be leveraging the Amazon Kinesis Data Generator (KDG) to simulate devices pushing data into Kinesis Firehose.
Make sure that you can log in to the KDG producer page before you continue.
Step 3: Send data to Kinesis Firehose using the KDG
In this step, you configure the KDG to send simulated records to simulate devices streaming data into Kinesis Firehose. In the data generator template below, notice the complex weighting. This demonstrates outliers in device temperature readings later on. Kinesis Firehose sends all the raw data to S3 in addition to Kinesis Analytics for real-time processing.
On the KDG producer page, for Region, select “us-east-1”. For Stream/delivery stream, select AWSBigDataBlog-RawDeliveryStream. For Records per second, type 100.
Paste the template shown below into KDG template window:
Without leaving the KDG page, navigate to the Kinesis Analytics console to view the status of the application processing the data in real time. When you are happy with the amount of data sent, choose Stop Sending Data to Kinesis. I recommend waiting until at least 20,000 records are sent.
Step 4: Submit AWS Glue crawlers to interpret the table definition for Kinesis Firehose outputs in S3
In this step, you use AWS Glue crawlers to crawl and generate table definitions against the produced data in S3. This allows you to analyze data in aggregate over a historical context instead of just using the latest data.
Use CLI commands to define and run AWS Glue crawlers (a boto3 equivalent for defining the crawler is sketched after the notes below):
Replace the <bucketName> text in the command with the bucket name that you specified in the CloudFormation template in step 1.
Replace the <RoleArn> text in the command with an IAM role ARN with permissions to use AWS Glue. For more information, see Setting up IAM Permissions for AWS Glue.
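The original CLI commands aren't reproduced here. If you prefer Python, a rough boto3 equivalent for defining the crawler looks like the following. The role and bucket placeholders are the same ones noted above, and the database name matches the one used in the Athena queries later in this post.
import boto3

glue = boto3.client("glue", region_name="us-east-1")

# Define a crawler that catalogs the data the KDG and Kinesis Analytics wrote to S3.
glue.create_crawler(
    Name="thermostat-data-crawler",
    Role="<RoleArn>",                     # IAM role ARN with permissions to use AWS Glue
    DatabaseName="awsblogsgluedemo",      # Data Catalog database referenced by the Athena queries below
    Targets={"S3Targets": [{"Path": "s3://<bucketName>/"}]},
)
You can then run the crawler from the AWS Glue console as described next, or start it programmatically with glue.start_crawler(Name="thermostat-data-crawler").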
Select the crawler that you created earlier, “thermostat-data-crawler”, and choose Run crawler.
After just a few seconds, the crawler shows as “Ready”, and will have automatically inferred the schema of three tables:
“raw”—Data being sent from KDG to Kinesis Firehose, delivered to S3.
“results”—Kinesis Analytics results data on real-time processing for percent inefficiency of devices, stored in S3.
“sampledata”—Simulated user device settings table for 1,000 thermostats in S3. This data is joined to the raw data to calculate the “percent inefficiency” (absolute value of temperature reading minus temperature setting, divided by the temperature setting).
Zooming in on the architecture diagram, I’ve included the names of these tables. Take a moment to see how the tables fit in the overall Lambda Architecture:
Step 5: Submit an AWS Glue job to pre-process daily thermostat efficiency
In this step, you use an AWS Glue job to join data and pre-process calculated views to assess the efficiency of the thermostat devices. The results are stored in S3 to be queried using Athena.
Replace the <bucketName> text in the command with the bucket name that you specified in the CloudFormation template in step 1.
Replace the <RoleName> text in the command with an IAM role with permissions to use AWS Glue. For more information, see Setting up IAM Permissions for AWS Glue.
Select the job named “daily_avg”, and choose Action, Run job.
In the Parameters (optional) dialog box, choose Run job.
This job can take between 10–14 minutes to run. Take a moment to view the script definition in the console. Take care not to save any changes to the script at this time:
Step 6: Analyze the data with Athena
Try out each of the individual queries below to analyze the data in Athena.
Query and analyze the raw time-series data in the “master dataset” Lambda Architecture layer
--"Events by Device ID"
SELECT uuid,
devicets,
deviceid,
temp
FROM awsblogsgluedemo."raw"
WHERE deviceid = 1
ORDER BY devicets DESC;
--"Top 20 most active thermostats"
SELECT deviceid,
COUNT(*) AS num_events
FROM awsblogsgluedemo."raw"
GROUP BY deviceid
ORDER BY num_events DESC LIMIT 20;
Query and analyze the data processed by AWS Glue at the “Serving” layer:
--"Top 10 Most inefficient devices"
SELECT deviceid,
daily_avg_inefficiency
FROM awsblogsgluedemo.daily_avg_inefficiency
ORDER BY daily_avg_inefficiency DESC LIMIT 10;
--"KPI - Overall device daily inefficiency"
SELECT ( SUM(daily_avg_inefficiency)/COUNT(*) ) AS all_device_avg_inefficiency,
date
FROM awsblogsgluedemo.daily_avg_inefficiency
GROUP BY date;
Query and analyze the data filtered by Kinesis Analytics, the outputs of the “speed” layer:
--"Top 10 most inefficient devices - event-level granularity"
SELECT col0 AS "uuid",
col1 AS "deviceid",
col2 AS "devicets",
col3 AS "temp",
col4 AS "settemp",
col5 AS "pct_inefficiency"
FROM awsblogsgluedemo.results
ORDER BY pct_inefficiency DESC limit 10;
Step 7: Visualize with Amazon QuickSight
Now you have the data in S3 and table definitions in the AWS Glue Data Catalog, and you can query the data directly in S3 with Athena. You are now ready to begin creating visualizations with Amazon QuickSight.
In these steps, I demonstrate how you can visualize your data in S3, again without provisioning any infrastructure for databases, clusters, or BI tools.
Tie in all the data sources into Amazon QuickSight
Choose Connect to another data source or upload a file, Athena.
Give the data source a name, like awsblogsgluedemo
Choose daily_avg_inefficiency, Select.
Choose Directly query your data, Edit/Preview data. On the next screen, leave the settings unchanged and choose Save.
Choose New data set in the top right corner.
Under From existing data sources, choose awsblogsgluedemo.
Choose Create data set, raw, and Select.
Choose Directly query your data, Edit/Preview data. On the next screen, leave the settings unchanged and choose Save.
Repeat steps 6–9 to add the “results” dataset. At step 9, rename the column “col1” to “deviceid” by choosing the pencil icon.
Create an analysis that includes all data sources (raw, processed, real-time results)
Return to the Amazon QuickSight main page by choosing QuickSight.
Choose New analysis, daily_avg_inefficiency, and Create analysis.
Choose Fields, Edit analysis data sets.
Choose Add data set, results, and Select.
Repeat steps 3 and 4 to add the “raw” dataset.
Create a KPI chart on AWS Glue processed data
Choose Fields, daily_avg_inefficiency.
In the Visual types pane, choose the Key Performance Indicator (KPI) icon, shown below.
Drag daily_avg_inefficiency from Fields into the Value well.
Drag date into the Trend group well.
For daily_avg_inefficiency (Sum), choose the down arrow, hover over Aggregate: Sum, and choose Average. Choose the down arrow again, hover over Show as: Number, and choose Percent.
Choose daily_avg_inefficiency (Average) in the Value well to change the presentation to fewer decimal places.
You should have a visualization similar to the following:
To create a scatter plot of all devices:
At the top right of the screen, choose Add, Add visual.
For Visual type, choose Scatter plot.
Drag deviceid into the X axis well and the Group/Color well.
Drag daily_avg_inefficiency into the Y axis well and the Size well.
You should now have a scatter plot visualization similar to the following figure. See if you can find a poorly functioning device with a high percentage inefficiency!
To create a line chart against the raw time-series dataset
Choose Fields, raw.
Choose Add, Add visual.
For Visual type, choose Line chart.
Drag devicets into the X axis well. Drag temp into the Value well.
For temp (Sum), choose the down arrow, hover over Aggregate: Sum, and then choose Average.
Choose the down arrow at the top right, and choose Format visual.
Expand the Y-Axis menu on the left, and choose Range, Auto (based on data range).
Drag the slider to expand the time-series line chart.
You should now have a line chart similar to the following image. The spikes represent an increase in average temperature readings across the devices.
To create a heat map of the Kinesis Analytics results dataset
Choose Fields, results.
Choose Add, Add visual.
For Visual type, choose Heat map.
Drag deviceid into the Columns and Values wells. For Aggregate, choose Count.
Choose deviceid (Count). For Sort, choose the descending icon.
You should have a visualization like the following graph, which shows deviceid values on the far top left that have the most occurrences of an inefficient temperature reading. Notice that the heat map also colors them darker shades for easy visualization. Look for the devices at the top right that have a high frequency of inefficiency readings.
Finally, you can resize the charts and save them as a dashboard to share to other Amazon QuickSight users in your organization:
Resize the charts by choosing the bottom right corner of a visual.
Choose Share, Create dashboard.
For Create new dashboard as, type a dashboard name, such as “Thermostats Performance.” Choose Create dashboard.
You should now have a dashboard containing all your visuals, similar to the following:
Conclusion
In this post, you saw how to process and analyze data from both streaming and batch sources together in a 100% serverless fashion. The Lambda Architecture principles guided a system that separates the ingestion and processing mechanisms, built entirely on fully managed, serverless AWS services.
You went from collecting real-time data to processing and joining it with a user settings file, and then to querying and visualizing it, all without provisioning servers.
I hope you found this post useful. In the comments, please share your thoughts on how you might extend this architecture to suit your needs.
Laith Al-Saadoon is a Solutions Architect with a focus on data analytics at Amazon Web Services. He spends his days obsessing over designing customer architectures to process enormous amounts of data at scale. In his free time, he follows the latest trends in artificial intelligence.
When you develop Apache Spark–based applications, you might face some additional challenges when dealing with continuous integration and deployment pipelines, such as the following common issues:
Applications must be tested on real clusters using automation tools (live tests).
Any user or developer must be able to easily deploy and use different versions of both the application and infrastructure to be able to debug, experiment on, and test different functionality.
Infrastructure needs to be evaluated and tested along with the application that uses it.
In this post, we walk you through a solution that implements a continuous integration and deployment pipeline supported by AWS services. The pipeline offers the following workflow:
Deploy the application to a QA stage after a commit is performed to the source code.
Perform a unit test using Spark local mode.
Deploy to a dynamically provisioned Amazon EMR cluster and test the Spark application on it.
Update the application as an AWS Service Catalog product version, allowing a user to deploy any version (commit) of the application on demand.
Solution overview
The following diagram shows the pipeline workflow.
The solution uses AWS CodePipeline, which allows users to orchestrate and automate the build, test, and deploy stages for application source code. The solution consists of a pipeline that contains the following stages:
Source: Both the Spark application source code and the AWS CloudFormation template file for deploying the application are committed to version control. In this example, we use AWS CodeCommit. For an example of the application source code, see the sample application .zip package referenced in this post.
Build: In this stage, you use Apache Maven both to generate the application .jar binaries and to execute all of the application unit tests that end with *Spec.scala. In this example, we use AWS CodeBuild, which runs the unit tests given that they are designed to use Spark local mode.
QADeploy: In this stage, the .jar file built previously is deployed using the CloudFormation template included with the application source code. All the resources are created in this stage, such as networks, EMR clusters, and so on.
LiveTest: In this stage, you use Apache Maven to execute all the application tests that end with *SpecLive.scala. The tests submit EMR steps to the cluster created as part of the QADeploy step. The tests verify that the steps ran successfully and their results.
LiveTestApproval: This stage is included in case a pipeline administrator approval is required to deploy the application to the next stages. The pipeline pauses in this stage until an administrator manually approves the release.
QACleanup: In this stage, you use an AWS Lambda function to delete the CloudFormation template deployed as part of the QADeploy stage. The function does not affect any resources other than those deployed as part of the QADeploy stage.
DeployProduct: In this stage, you use a Lambda function that creates or updates an AWS Service Catalog product and portfolio. Every time the pipeline releases a change to the application, the AWS Service Catalog product gets a new version, with the commit of the change as the version description (a sketch of this call appears after these stages).
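The Lambda function behind the DeployProduct stage isn't shown in this post. As a rough sketch in Python with boto3 (product ID, template URL, and commit ID are placeholders), the AWS Service Catalog call it needs to make to register a new product version for each released commit could look like this.
import boto3

servicecatalog = boto3.client("servicecatalog")

def add_product_version(product_id, template_url, commit_id):
    # Register a new version (provisioning artifact) of the Service Catalog product.
    return servicecatalog.create_provisioning_artifact(
        ProductId=product_id,
        Parameters={
            "Name": commit_id[:8],                          # short commit as the version name
            "Description": "Commit " + commit_id,           # commit of the change as the description
            "Info": {"LoadTemplateFromURL": template_url},  # CloudFormation template for this version
            "Type": "CLOUD_FORMATION_TEMPLATE",
        },
        IdempotencyToken=commit_id,
    )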
Try it out!
Use the provided sample template to get started using this solution. This template creates the pipeline described earlier with all of its stages. It performs an initial commit of the sample Spark application in order to trigger the first release change. To deploy the template, use the following AWS CLI command:
After the template finishes creating resources, you see the pipeline name on the stack Outputs tab. After that, open the AWS CodePipeline console and select the newly created pipeline.
After a couple of minutes, AWS CodePipeline detects the initial commit applied by the CloudFormation stack and starts the first release.
You can watch how the pipeline goes through the Build, QADeploy, and LiveTest stages until it finally reaches the LiveTestApproval stage.
At this point, you can check the results of the test in the log files of the Build and LiveTest stage jobs on AWS CodeBuild. If you check the CloudFormation console, you see that a new template has been deployed as part of the QADeploy stage.
You can also visit the EMR console and view how the LiveTest stage submitted steps to the EMR cluster.
After performing the review, manually approve the revision on the LiveTestApproval stage by using the AWS CodePipeline console.
After the revision is approved, the pipeline proceeds to use a Lambda function that destroys the resources deployed in the QADeploy stage. Finally, it creates or updates a product and portfolio in AWS Service Catalog. After the final stage of the pipeline is complete, you can check that the product is created successfully on the AWS Service Catalog console.
You can check the product versions and notice that the first version is the initial commit performed by the CloudFormation template.
You can proceed to share the created portfolio with any users in your AWS account and allow them to deploy any version of the Spark application. You can also perform a commit on the AWS CodeCommit repository. The pipeline is triggered automatically and repeats the pipeline execution to deploy a new version of the product.
To destroy all of the resources created by the stack, make sure that all the stacks deployed through AWS Service Catalog or the QADeploy stage are destroyed. Then, destroy the pipeline template using the following AWS CLI command:
You can use the sample template and Spark application shared in this post and adapt them for the specific needs of your own application. The pipeline can have as many stages as needed and it can be used to automatically deploy to AWS Service Catalog or a production environment using CloudFormation.
If you have questions or suggestions, please comment below.
Luis Caro is a Big Data Consultant for AWS Professional Services. He works with our customers to provide guidance and technical assistance on big data projects, helping them improve the value of their solutions when using AWS.
Samuel Schmidt is a Big Data Consultant for AWS Professional Services. He works with our customers to provide guidance and technical assistance on big data projects, helping them improve the value of their solutions when using AWS.
Apache Hive is one of the most popular tools for analyzing large datasets stored in a Hadoop cluster using SQL. Data analysts and scientists use Hive to query, summarize, explore, and analyze big data.
With the introduction of Hive LLAP (Low Latency Analytical Processing), the notion of Hive being just a batch processing tool has changed. LLAP uses long-lived daemons with intelligent in-memory caching to circumvent batch-oriented latency and provide sub-second query response times.
This post provides an overview of Hive LLAP, including its architecture and common use cases for boosting query performance. You will learn how to install and configure Hive LLAP on an Amazon EMR cluster and run queries on LLAP daemons.
What is Hive LLAP?
Hive LLAP was introduced in Apache Hive 2.0 and provides very fast processing of queries. It uses persistent daemons that are deployed on a Hadoop YARN cluster using Apache Slider. These daemons are long-running and provide functionality such as I/O with DataNodes, in-memory caching, query processing, and fine-grained access control. Because the daemons are always running in the cluster, they save the substantial overhead of launching new YARN containers for every new Hive session, thereby avoiding long startup times.
When Hive is configured in hybrid execution mode, small and short queries execute directly on LLAP daemons. Heavy lifting (like large shuffles in the reduce stage) is performed in YARN containers that belong to the application. Resources (CPU, memory, etc.) are obtained in a traditional fashion using YARN. After the resources are obtained, the execution engine can decide which resources are to be allocated to LLAP, or it can launch Apache Tez processors in separate YARN containers. You can also configure Hive to run all the processing workloads on LLAP daemons for querying small datasets at lightning fast speeds.
LLAP daemons are launched under YARN management to ensure that the nodes don’t get overloaded with the compute resources of these daemons. You can use scheduling queues to make sure that there is enough compute capacity for other YARN applications to run.
Why use Hive LLAP?
With many options available in the market (Presto, Spark SQL, etc.) for doing interactive SQL over data that is stored in Amazon S3 and HDFS, there are several reasons why using Hive and LLAP might be a good choice:
For those who are heavily invested in the Hive ecosystem and have external BI tools that connect to Hive over JDBC/ODBC connections, LLAP plugs in to their existing architecture without a steep learning curve.
It’s compatible with existing Hive SQL and other Hive tools, like HiveServer2, and JDBC drivers for Hive.
It has native support for security features with authentication and authorization (SQL standards-based authorization) using HiveServer2.
LLAP daemons are aware of the columns and records that are being processed, which enables you to enforce fine-grained access control.
It can use Hive’s vectorization capabilities to speed up queries, and Hive has better support for Parquet file format when vectorization is enabled.
It can take advantage of a number of Hive optimizations like merging multiple small files for query results, automatically determining the number of reducers for joins and groupbys, etc.
It's optional and modular, so it can be turned on or off depending on the compute and resource requirements of the cluster. This lets you run other YARN applications concurrently without reserving a cluster specifically for LLAP.
How do you install Hive LLAP in Amazon EMR?
To install and configure LLAP on an EMR cluster, use the following bootstrap action (BA):
This BA downloads and installs Apache Slider on the cluster and configures LLAP so that it works with EMR Hive. For LLAP to work, the EMR cluster must have Hive, Tez, and Apache Zookeeper installed.
You can pass the following arguments to the BA; a sketch showing how they might be supplied when launching a cluster follows the table.
Argument       Definition                           Default value
--instances    Number of instances of LLAP daemon   Number of core/task nodes of the cluster
--cache        Cache size per instance              20% of physical memory of the node
--executors    Number of executors per instance     Number of CPU cores of the node
--iothreads    Number of IO threads per instance    Number of CPU cores of the node
--size         Container size per instance          50% of physical memory of the node
--xmx          Working memory size                  50% of container size
--log-level    Log levels for the LLAP instance     INFO
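The example that follows launches the cluster with an AWS CLI command. As a rough boto3 equivalent (the bootstrap action script path and the argument values shown are placeholders), the arguments from the table above are passed to the BA like this.
import boto3

emr = boto3.client("emr", region_name="us-east-1")

emr.run_job_flow(
    Name="hive-llap-demo",
    ReleaseLabel="emr-5.6.0",
    Applications=[{"Name": "Hive"}, {"Name": "Tez"}, {"Name": "ZooKeeper"}],
    Instances={
        "MasterInstanceType": "m4.xlarge",
        "SlaveInstanceType": "m4.xlarge",
        "InstanceCount": 4,                     # 1 master + 3 core/task nodes
        "KeepJobFlowAliveWhenNoSteps": True,
    },
    BootstrapActions=[{
        "Name": "install-hive-llap",
        "ScriptBootstrapAction": {
            "Path": "s3://<your-bucket>/<llap-bootstrap-action>.sh",  # placeholder for the BA script
            "Args": ["--instances", "3"],       # any of the arguments from the table; values are illustrative
        },
    }],
    JobFlowRole="EMR_EC2_DefaultRole",
    ServiceRole="EMR_DefaultRole",
)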
LLAP example
This section describes how you can try the faster Hive queries with LLAP using the TPC-DS testbench for Hive on Amazon EMR.
Use the following AWS command line interface (AWS CLI) command to launch a 1+3 nodes m4.xlarge EMR 5.6.0 cluster with the bootstrap action to install LLAP:
After the cluster is launched, log in to the master node using SSH, and do the following:
Open the hive-tpcds folder: cd /home/hadoop/hive-tpcds/
Start Hive CLI using the testbench configuration, create the required tables, and run the sample query:
hive -i testbench.settings
hive> source create_tables.sql;
hive> source query55.sql;
This sample query runs on a 40 GB dataset that is stored on Amazon S3. The dataset is generated using the data generation tool in the TPC-DS testbench for Hive. It results in output like the following:
This screenshot shows that the query finished in about 47 seconds for LLAP mode. Now, to compare this to the execution time without LLAP, you can run the same workload using only Tez containers:
hive> set hive.llap.execution.mode=none;
hive> source query55.sql;
This query finished in about 80 seconds.
Query execution takes almost 1.7 times longer when using just YARN containers than when running the query on LLAP daemons. And with every rerun of the query, you notice that the execution time decreases substantially by virtue of the in-memory caching performed by the LLAP daemons.
Conclusion
In this post, I introduced Hive LLAP as a way to boost Hive query performance. I discussed its architecture and described several use cases for the component. I showed how you can install and configure Hive LLAP on an Amazon EMR cluster and how you can run queries on LLAP daemons.
If you have questions about using Hive LLAP on Amazon EMR or would like to share your use cases, please leave a comment below.
Jigar Mistry is a Hadoop Systems Engineer with Amazon Web Services. He works with customers to provide architectural guidance and technical support for processing large datasets in the cloud using open-source applications. In his spare time, he enjoys camping and exploring different restaurants in the Seattle area.
In the world of data science, users must often sacrifice cluster set-up time to allow for complex usability scenarios. Amazon EMR allows data scientists to spin up complex cluster configurations easily, and to be up and running with complex queries in a matter of minutes.
Data scientists often use scheduling applications such as Oozie to run jobs overnight. However, Oozie can be difficult to configure when you are trying to use popular Python packages (such as “pandas,” “numpy,” and “statsmodels”), which are not included by default.
One such popular platform that contains these types of packages (and more) is Anaconda. This post focuses on setting up the Anaconda platform on EMR so that its packages can be used with Oozie, and describes how to run jobs using this popular open-source scheduler.
Walkthrough
For this post, you walk through the following tasks:
Create an EMR cluster.
Download Anaconda on your master node.
Configure Oozie.
Test the steps.
Create an EMR cluster
Spin up an Amazon EMR cluster using the console or the AWS CLI. Use the latest release, and include Apache Hadoop, Apache Spark, Apache Hive, and Oozie.
To create a three-node cluster in the us-east-1 region, issue an AWS CLI command such as the following. The command must be entered as a single line; it is shown here split across multiple lines for readability only.
Download Anaconda on your master node
At the time of publication, Anaconda 4.4 is the most current version available. For the download link location for the latest Python 2.7 version (Python 3.6 may encounter issues), see https://www.continuum.io/downloads. Open the context (right-click) menu for the Python 2.7 download link, choose Copy Link Location, and use this value in the previous wget command.
This post used the Anaconda 4.4 installation. If you have a later version, it is shown in the downloaded filename: “anaconda2-<version number>-Linux-x86_64.sh”.
Run this downloaded script and follow the on-screen installer prompts.
For an installation directory, select somewhere with enough space on your cluster, such as “/mnt/anaconda/”.
The process should take approximately 1–2 minutes to install. When prompted if you “wish the installer to prepend the Anaconda2 install location”, select the default option of [no].
After you are done, export the PATH to include this new Anaconda installation:
export PATH=/mnt/anaconda/bin:$PATH
Zip up the Anaconda installation:
cd /mnt/anaconda/
zip -r anaconda.zip .
The zip process may take 4–5 minutes to complete.
(Optional) Upload this anaconda.zip file to your S3 bucket for easier inclusion into future EMR clusters. This removes the need to repeat the previous steps for future EMR clusters.
Configure Oozie
Next, you configure Oozie to use Pyspark and the Anaconda platform.
Get the location of your Oozie sharelibupdate folder. Issue the following command and take note of the “sharelibDirNew” value:
oozie admin -sharelibupdate
For this post, this value is “hdfs://ip-192-168-4-200.us-east-1.compute.internal:8020/user/oozie/share/lib/lib_20170616133136”.
Pass the required Pyspark files into Oozie's sharelibupdate location. The following files are required for Oozie to be able to run Pyspark commands:
pyspark.zip
py4j-0.10.4-src.zip
These are located on the EMR master instance in the location “/usr/lib/spark/python/lib/”, and must be put into the Oozie sharelib spark directory. This location is the value of the sharelibDirNew parameter value (shown above) with “/spark/” appended, that is, “hdfs://ip-192-168-4-200.us-east-1.compute.internal:8020/user/oozie/share/lib/lib_20170616133136/spark/”.
(Optional) Verify that the files were transferred successfully with the following command:
hdfs dfs -ls /tmp/myLocation/
On your master node, execute the following command:
export PYSPARK_PYTHON=/mnt/anaconda/bin/python
Set the PYSPARK_PYTHON environment variable on the executor nodes. Put the following configurations in your “spark-opts” values in your Oozie workflow.xml file:
Put the “myPysparkProgram.py” into the location mentioned between the “<jar>xxxxx</jar>” tags in your workflow.xml. In this example, the location is “hdfs:///user/oozie/apps/”. Use the following command to move the “myPysparkProgram.py” file to the correct location:
This is a sentence. 12
So is this 12
This is also a sentence 12
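The myPysparkProgram.py script itself isn't included in this post. A hypothetical minimal stand-in that exercises numpy from the Anaconda installation and prints output similar to the lines above might look like the following.
# myPysparkProgram.py (hypothetical stand-in for the script referenced in this post)
from pyspark import SparkContext
import numpy as np

sc = SparkContext(appName="AnacondaOozieDemo")

sentences = ["This is a sentence.", "So is this", "This is also a sentence"]
value = int(np.sqrt(144))   # any numpy call proves the Anaconda import works

for line in sc.parallelize(sentences).map(lambda s: "%s %d" % (s, value)).collect():
    print(line)

sc.stop()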
Summary
The myPysparkProgram.py script has successfully imported the numpy library from the Anaconda platform and has produced some output with it. If you tried to run this using standard Python, you'd encounter the following error:
Now when your Python job runs in Oozie, any imported packages that are implicitly imported by your Pyspark script are imported into your job within Oozie directly from the Anaconda platform. Simple!
If you have questions or suggestions, please leave a comment below.
John Ohle is an AWS BigData Cloud Support Engineer II for the BigData team in Dublin. He works to provide advice and solutions to our customers on their Big Data projects and workflows on AWS. In his spare time, he likes to play music, learn, develop tools and write documentation to further help others – both colleagues and customers alike.
Nowadays, it’s common for a web server to be fronted by a global content delivery service, like Amazon CloudFront. This type of front end accelerates delivery of websites, APIs, media content, and other web assets to provide a better experience to users across the globe.
The insights gained by analyzing Amazon CloudFront access logs help improve website availability through bot detection and mitigation, optimizing web content based on the devices and browsers used to view your webpages, reducing perceived latency by caching popular objects closer to their viewers, and so on. This results in a significant improvement in the overall perceived experience for the user.
This blog post provides a way to build a serverless architecture to generate some of these insights. To do so, we analyze Amazon CloudFront access logs both at rest and in transit through the stream. This serverless architecture uses Amazon Athena to analyze large volumes of CloudFront access logs (on the scale of terabytes per day), and Amazon Kinesis Analytics for streaming analysis.
The analytic queries in this blog post focus on three common use cases:
Detection of common bots using the user agent string
Calculation of current bandwidth usage per Amazon CloudFront distribution per edge location
Determination of the current top 50 viewers
However, you can easily extend the architecture described here to power dashboards for monitoring and reporting, and to trigger alarms based on deeper insights gained by processing and analyzing the logs. Some examples are dashboards for cache performance, usage, and viewer patterns.
Following we show a diagram of this architecture.
Prerequisites
Before you set up this architecture, install the AWS Command Line Interface (AWS CLI) tool on your local machine, if you don’t have it already.
Setup summary
The following steps are involved in setting up the serverless architecture on the AWS platform:
Create an Amazon S3 bucket for your Amazon CloudFront access logs to be delivered to and stored in.
Create a second Amazon S3 bucket to receive processed logs and store the partitioned data for interactive analysis.
Create an Amazon Kinesis Firehose delivery stream to batch, compress, and deliver the preprocessed logs for analysis.
Create an AWS Lambda function to preprocess the logs for analysis.
Configure Amazon S3 event notification on the CloudFront access logs bucket, which contains the raw logs, to trigger the Lambda preprocessing function.
Create an Amazon DynamoDB table to look up partition details, such as partition specification and partition location.
Create an Amazon Athena table for interactive analysis.
Create a second AWS Lambda function to add new partitions to the Athena table based on the log delivered to the processed logs bucket.
Configure Amazon S3 event notification on the processed logs bucket to trigger the Lambda partitioning function.
Configure Amazon Kinesis Analytics application for analysis of the logs directly from the stream.
ETL and preprocessing
In this section, we parse the CloudFront access logs as they are delivered, which occurs multiple times in an hour. We filter out commented records and use the user agent string to decipher the browser name, the name of the operating system, and whether the request has been made by a bot. For more details on how to decipher the preceding information based on the user agent string, see the documentation for the user-agents 1.1.0 Python package.
We use the Lambda preprocessing function to perform these tasks on individual rows of the access log. On successful completion, the rows are pushed to an Amazon Kinesis Firehose delivery stream to be persistently stored in an Amazon S3 bucket, the processed logs bucket.
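The full preprocessing function ships in the pre-data.zip package used later in this walkthrough and isn't reproduced here. The following Python sketch (field positions and helper names are assumptions) shows the core idea: enrich each log line using the user-agents package and forward it to the Firehose delivery stream (CloudFrontLogsToS3, created in the next step).
import urllib.parse

import boto3
from user_agents import parse as parse_ua

firehose = boto3.client("firehose")

def enrich_and_forward(log_line, delivery_stream="CloudFrontLogsToS3"):
    # Skip the comment lines (#Version, #Fields) at the top of each CloudFront log file.
    if log_line.startswith("#"):
        return
    fields = log_line.rstrip("\n").split("\t")
    # cs(User-Agent) is URL-encoded in CloudFront logs; index 10 is an assumption,
    # so confirm it against the #Fields header of your log files.
    ua = parse_ua(urllib.parse.unquote(fields[10]))
    enriched = fields + [ua.browser.family, ua.os.family, str(ua.is_bot)]
    firehose.put_record(
        DeliveryStreamName=delivery_stream,
        Record={"Data": "\t".join(enriched) + "\n"},
    )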
To create a Firehose delivery stream with a new or existing S3 bucket as the destination, follow the steps described in Create a Firehose Delivery Stream to Amazon S3 in the S3 documentation. Keep most of the default settings, but select an AWS Identity and Access Management (IAM) role that has write access to your S3 bucket and specify GZIP compression. Name the delivery stream CloudFrontLogsToS3.
Another prerequisite for this setup is to create an IAM role that provides the necessary permissions for our AWS Lambda function to get the data from S3, process it, and deliver it to the CloudFrontLogsToS3 delivery stream.
Let's use the AWS CLI to create the IAM role using the following steps:
Create the IAM policy (lambda-exec-policy) for the Lambda execution role to use.
Create the Lambda execution role (lambda-cflogs-exec-role) and assign the service to use this role.
Attach the policy created in step 1 to the Lambda execution role.
To download the policy document to your local machine, type the following command.
To attach the policy (lambda-exec-policy) created to the AWS Lambda execution role (lambda-cflogs-exec-role), type the following command.
aws iam attach-role-policy --role-name lambda-cflogs-exec-role --policy-arn arn:aws:iam::<your-account-id>:policy/lambda-exec-policy
Now that we have created the CloudFrontLogsToS3 Firehose delivery stream and the lambda-cflogs-exec-role IAM role for Lambda, the next step is to create a Lambda preprocessing function.
This Lambda preprocessing function parses the CloudFront access logs delivered into the S3 bucket and performs a few transformation and mapping operations on the data. The Lambda function adds descriptive information, such as the browser and the operating system that were used to make this request based on the user agent string found in the logs. The Lambda function also adds information about the web distribution to support scenarios where CloudFront access logs are delivered to a centralized S3 bucket from multiple distributions. With the solution in this blog post, you can get insights across distributions and their edge locations.
Use the Lambda Management Console to create a new Lambda function with a Python 2.7 runtime and the s3-get-object-python blueprint. Open the console, and on the Configure triggers page, choose the name of the S3 bucket where the CloudFront access logs are delivered. Choose Put for Event type. For Prefix, type the name of the prefix, if any, for the folder where CloudFront access logs are delivered, for example cloudfront-logs/. To invoke Lambda to retrieve the logs from the S3 bucket as they are delivered, select Enable trigger.
Choose Next and provide a function name to identify this Lambda preprocessing function.
For Code entry type, choose Upload a file from Amazon S3. For S3 link URL, type the S3 URL of the preprocessing Lambda package, for example https://s3.amazonaws.com/<your-bucket>/preprocessing-lambda/pre-data.zip. In the same section, also create an environment variable with the key KINESIS_FIREHOSE_STREAM and, as its value, the name of the Firehose delivery stream, CloudFrontLogsToS3.
Choose lambda-cflogs-exec-role as the IAM role for the Lambda function, and type prep-data.lambda_handler for the value for Handler.
Choose Next, and then choose Create Lambda.
Table creation in Amazon Athena
In this step, we will build the Athena table. Use the Athena console in the same region and create the table using the query editor.
A popular website with millions of requests each day routed using Amazon CloudFront can generate a large volume of logs, on the order of a few terabytes a day. We strongly recommend that you partition your data to effectively restrict the amount of data scanned by each query. Partitioning significantly improves query performance and substantially reduces cost. The Lambda partitioning function adds the partition information to the Athena table for the data delivered to the preprocessed logs bucket.
Before delivering the preprocessed Amazon CloudFront logs file into the preprocessed logs bucket, Amazon Kinesis Firehose adds a UTC time prefix in the format YYYY/MM/DD/HH. This approach supports multilevel partitioning of the data by year, month, date, and hour. You can invoke the Lambda partitioning function every time a new processed Amazon CloudFront log is delivered to the preprocessed logs bucket. To do so, configure the Lambda partitioning function to be triggered by an S3 Put event.
For a website with millions of requests, a large number of preprocessed log files can be delivered many times in an hour—for example, one every second. To avoid querying the Athena table for partition information every time a preprocessed log file is delivered, you can create an Amazon DynamoDB table for fast lookup.
Based on the year, month, day, and hour in the prefix of the delivered log, the Lambda partitioning function checks whether the partition specification exists in the Amazon DynamoDB table. If it doesn't, the partition is added to the table using an atomic operation, and then the Athena table is updated.
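The packaged partitioning function is written in Java and talks to Athena through its JDBC driver, as described later in this section. Purely to illustrate the flow just described, a rough Python equivalent that uses the Athena API instead might look like the following; the environment variable names and DynamoDB attributes follow the ones introduced in this section, and everything else is an assumption.

# Illustrative sketch only -- the deployed function is the Java package referenced below.
import os
import boto3
from botocore.exceptions import ClientError

dynamodb = boto3.client('dynamodb')
athena = boto3.client('athena')

DDB_TABLE = os.environ['DDB_TABLE_NAME']      # e.g. athenapartitiondetails
ATHENA_TABLE = os.environ['TABLE_NAME']       # e.g. cf_logs
STAGING_DIR = os.environ['S3_STAGING_DIR']    # e.g. s3://<bucket>/athena-query-results/

def lambda_handler(event, context):
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']    # e.g. out/2017/05/15/10/<file>.gz
        year, month, day, hour = key.split('/')[-5:-1]
        spec = 'year="%s";month="%s";day="%s";hour="%s"' % (year, month, day, hour)
        path = 's3://%s/%s/' % (bucket, '/'.join(key.split('/')[:-1]))
        try:
            # Atomic "add if absent": fails if the partition is already registered
            dynamodb.put_item(
                TableName=DDB_TABLE,
                Item={'PartitionSpec': {'S': spec},
                      'PartitionPath': {'S': path},
                      'PartitionType': {'S': 'ALL'}},
                ConditionExpression='attribute_not_exists(PartitionSpec)')
        except ClientError as e:
            if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
                continue                        # partition already added
            raise
        # Register the new partition with the Athena table
        ddl = ("ALTER TABLE %s ADD IF NOT EXISTS PARTITION "
               "(year='%s', month='%s', day='%s', hour='%s') LOCATION '%s'"
               % (ATHENA_TABLE, year, month, day, hour, path))
        athena.start_query_execution(
            QueryString=ddl,
            ResultConfiguration={'OutputLocation': STAGING_DIR})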
Type the following command to create the Amazon DynamoDB table.
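As a hedged sketch of what that table looks like, the following boto3 call creates an equivalent DynamoDB table. The table name follows the DDB_TABLE_NAME example used later in this walkthrough, and the capacity values are placeholders that you should tune to your delivery rate.

import boto3

dynamodb = boto3.client('dynamodb')

dynamodb.create_table(
    TableName='athenapartitiondetails',        # matches the example used later in this post
    AttributeDefinitions=[{'AttributeName': 'PartitionSpec', 'AttributeType': 'S'}],
    KeySchema=[{'AttributeName': 'PartitionSpec', 'KeyType': 'HASH'}],
    ProvisionedThroughput={'ReadCapacityUnits': 5, 'WriteCapacityUnits': 5})   # placeholder capacity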
PartitionSpec is the hash key and is a representation of the partition signature—for example, year=”2017”; month=”05”; day=”15”; hour=”10”.
Depending on the rate at which the processed log files are delivered to the preprocessed logs bucket, you might have to increase the ReadCapacityUnits and WriteCapacityUnits values if requests are throttled.
The other attributes besides PartitionSpec are the following:
PartitionPath – The S3 path associated with the partition.
PartitionType – The type of partition used (Hour, Month, Date, Year, or ALL). In this case, ALL is used.
The next step is to create the IAM role that provides permissions for the Lambda partitioning function. The role requires permissions to do the following:
Look up and write partition information to DynamoDB.
Alter the Athena table with new partition information.
Perform Amazon CloudWatch logs operations.
Perform Amazon S3 operations.
Let's use the AWS CLI to create the IAM role using the following three steps:
Create the IAM policy (lambda-partition-exec-policy) used by the Lambda execution role.
Create the Lambda execution role (lambda-partition-execution-role) and assign the service to use this role.
Attach the policy created in step 1 to the Lambda execution role.
To download the policy document to your local machine, type the following command.
To create the IAM policy used by the Lambda execution role, type the following command.
aws iam create-policy --policy-name lambda-partition-exec-policy --policy-document file://<path>/lambda-partition-function-execution-policy.json
To create the Lambda execution role and assign the service to use this role, type the following command.
aws iam create-role --role-name lambda-partition-execution-role --assume-role-policy-document file://<path>/assume-lambda-policy.json
To attach the policy (lambda-partition-exec-policy) created in step 1 to the AWS Lambda execution role (lambda-partition-execution-role), type the following command.
aws iam attach-role-policy --role-name lambda-partition-execution-role --policy-arn arn:aws:iam::<your-account-id>:policy/lambda-partition-exec-policy
Following is the lambda-partition-function-execution-policy.json file, which is the IAM policy used by the Lambda execution role.
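The policy document itself will vary by account. As a hedged sketch only, a policy covering the four permission areas listed above might look like the following, created here with boto3 instead of the CLI; the specific actions and resource ARNs are assumptions, so review and tighten them for your environment before use.

# Illustrative sketch of a least-privilege policy for the partitioning function.
# The action list and resource ARNs below are assumptions, not the contents of
# the lambda-partition-function-execution-policy.json file from this post.
import json
import boto3

iam = boto3.client('iam')

policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {"Effect": "Allow",   # look up and write partition information
         "Action": ["dynamodb:GetItem", "dynamodb:PutItem"],
         "Resource": "arn:aws:dynamodb:*:<your-account-id>:table/athenapartitiondetails"},
        {"Effect": "Allow",   # alter the Athena table with new partition information
         "Action": ["athena:StartQueryExecution", "athena:GetQueryExecution", "athena:GetQueryResults"],
         "Resource": "*"},
        {"Effect": "Allow",   # Amazon CloudWatch Logs operations
         "Action": ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"],
         "Resource": "arn:aws:logs:*:*:*"},
        {"Effect": "Allow",   # Amazon S3 operations on the log and query-result buckets
         "Action": ["s3:GetObject", "s3:PutObject", "s3:GetBucketLocation", "s3:ListBucket"],
         "Resource": ["arn:aws:s3:::<your-bucket>", "arn:aws:s3:::<your-bucket>/*"]},
    ],
}

iam.create_policy(PolicyName='lambda-partition-exec-policy',
                  PolicyDocument=json.dumps(policy_document))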
From the AWS Management Console, create a new Lambda function with Java8 as the runtime. Select the Blank Function blueprint.
On the Configure triggers page, choose the name of the S3 bucket where the preprocessed logs are delivered. Choose Put for the Event Type. For Prefix, type the name of the prefix folder, if any, where the preprocessed logs are delivered by Firehose—for example, out/. For Suffix, type the extension of the compression format in which the Firehose delivery stream (CloudFrontLogsToS3) delivers the preprocessed logs—for example, gz. To invoke Lambda to retrieve the logs from the S3 bucket as they are delivered, select Enable Trigger.
Choose Next and provide a function name to identify this Lambda partitioning function.
Choose Java8 for Runtime for the AWS Lambda function. Choose Upload a .ZIP or .JAR file for the Code entry type, and choose Upload to upload the downloaded aws-lambda-athena-1.0.0.jar file.
Next, create the following environment variables for the Lambda function:
TABLE_NAME – The name of the Athena table (for example, cf_logs).
PARTITION_TYPE – The partition granularity to create for the logs delivered to the subfolders of the S3 bucket: Year, Month, Date, or Hour. Set this to ALL to partition by year, month, date, and hour.
DDB_TABLE_NAME – The name of the DynamoDB table holding partition information (for example, athenapartitiondetails).
ATHENA_REGION – The current AWS Region for the Athena table to construct the JDBC connection string.
S3_STAGING_DIR – The Amazon S3 location where your query output is written. The JDBC driver asks Athena to read the results and provide rows of data back to the user (for example, s3://<bucketname>/<folder>/).
To configure the function handler and IAM, for Handler copy and paste the name of the handler: com.amazonaws.services.lambda.CreateAthenaPartitionsBasedOnS3EventWithDDB::handleRequest. Choose the existing IAM role, lambda-partition-execution-role.
Choose Next and then Create Lambda.
Interactive analysis using Amazon Athena
In this section, we analyze the historical data that has been collected since we added the partitions to the Amazon Athena table for the data delivered to the preprocessed logs bucket.
Scenario 1 is robot traffic by edge location.
SELECT COUNT(*) AS ct, requestip, location FROM cf_logs
WHERE isbot='True'
GROUP BY requestip, location
ORDER BY ct DESC;
Scenario 2 is total bytes transferred per distribution for each edge location for your website.
SELECT distribution, location, SUM(bytes) as totalBytes
FROM cf_logs
GROUP BY location, distribution;
Scenario 3 is the top 50 viewers of your website.
SELECT requestip, COUNT(*) AS ct FROM cf_logs
GROUP BY requestip
ORDER BY ct DESC
LIMIT 50;
Streaming analysis using Amazon Kinesis Analytics
In this section, you deploy a stream processing application using Amazon Kinesis Analytics to analyze the preprocessed Amazon CloudFront log stream. The application reads directly from the Amazon Kinesis Firehose delivery stream as the data is delivered to the preprocessed logs bucket. The stream queries in this section are focused on gaining the following insights:
The IP addresses of bots, identified by Amazon CloudFront edge location, that are currently sending requests to your website. The query also includes the total bytes transferred as part of the response.
The total bytes transferred per distribution for each edge location of your website.
The top 50 viewers of your website.
Before we create the Amazon Kinesis Analytics application, we need to create the IAM role that gives the analytics application permission to read from the Amazon Kinesis Firehose delivery stream.
Let's use the AWS CLI to create the IAM role using the following three steps:
Create the IAM policy (firehose-access-policy) for the Kinesis Analytics execution role to use.
Create the Kinesis Analytics execution role (ka-execution-role) and assign the service to use this role.
Attach the policy created in step 1 to the Kinesis Analytics execution role.
To download the firehose-access-policy.json file, type the following.
Following is the firehose-access-policy.json file, which is the IAM policy that Kinesis Analytics uses to read the Firehose delivery stream.
To create the IAM policy used by the Analytics execution role, type the following command.
aws iam create-policy --policy-name firehose-access-policy --policy-document file://<path>/firehose-access-policy.json
To create the Analytics execution role (ka-execution-role) and allow the Kinesis Analytics service to assume it, type the following command, providing a trust policy document for the kinesisanalytics.amazonaws.com service principal.
aws iam create-role --role-name ka-execution-role --assume-role-policy-document file://<path>/assume-kinesis-analytics-policy.json
To attach the policy (firehose-access-policy) created to the Analytics execution role (ka-execution-role), type the following command.
aws iam attach-role-policy --role-name ka-execution-role --policy-arn arn:aws:iam::<your-account-id>:policy/firehose-access-policy
To deploy the Analytics application, first download the configuration file and then modify ResourceARN and RoleARN for the Amazon Kinesis Firehose input configuration.
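For reference, the same input configuration can also be expressed with boto3. In the sketch below, the application name, SQL types, and column mappings are assumptions; the values you must supply, just as in the downloaded configuration file, are the Firehose delivery stream ResourceARN and the ka-execution-role RoleARN.

import boto3

kinesisanalytics = boto3.client('kinesisanalytics', region_name='us-east-1')

scenario_sql = '...'   # paste one of the scenario queries shown below

kinesisanalytics.create_application(
    ApplicationName='cf-log-streaming-analysis',        # hypothetical name
    ApplicationCode=scenario_sql,
    Inputs=[{
        'NamePrefix': 'CF_LOG_STREAM',                   # yields the in-application stream CF_LOG_STREAM_001
        'KinesisFirehoseInput': {
            'ResourceARN': 'arn:aws:firehose:us-east-1:<your-account-id>:deliverystream/CloudFrontLogsToS3',
            'RoleARN': 'arn:aws:iam::<your-account-id>:role/ka-execution-role',
        },
        'InputSchema': {
            'RecordFormat': {
                'RecordFormatType': 'JSON',
                'MappingParameters': {'JSONMappingParameters': {'RecordRowPath': '$'}},
            },
            'RecordColumns': [                           # adjust mappings and types to your preprocessed JSON
                {'Name': 'request_date', 'SqlType': 'VARCHAR(16)', 'Mapping': '$.requestdate'},
                {'Name': 'request_time', 'SqlType': 'TIME', 'Mapping': '$.requesttime'},
                {'Name': 'request_ip', 'SqlType': 'VARCHAR(64)', 'Mapping': '$.requestip'},
                {'Name': 'edge_location', 'SqlType': 'VARCHAR(64)', 'Mapping': '$.location'},
                {'Name': 'distribution_name', 'SqlType': 'VARCHAR(16)', 'Mapping': '$.distribution'},
                {'Name': 'is_bot', 'SqlType': 'BOOLEAN', 'Mapping': '$.isbot'},
                {'Name': 'bytes', 'SqlType': 'BIGINT', 'Mapping': '$.bytes'},
            ],
        },
    }])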
Scenario 1 is a query for detecting bots that are sending requests to your website.
-- Create output stream, which can be used to send to a destination
CREATE OR REPLACE STREAM "BOT_DETECTION" (requesttime TIME, distribution VARCHAR(16), requestip VARCHAR(64), edgelocation VARCHAR(64), totalBytes BIGINT);
-- Create pump to insert into output
CREATE OR REPLACE PUMP "BOT_DETECTION_PUMP" AS INSERT INTO "BOT_DETECTION"
--
SELECT STREAM
STEP("CF_LOG_STREAM_001"."request_time" BY INTERVAL '1' SECOND) as requesttime,
"distribution_name" as distribution,
"request_ip" as requestip,
"edge_location" as edgelocation,
SUM("bytes") as totalBytes
FROM "CF_LOG_STREAM_001"
WHERE "is_bot" = true
GROUP BY "request_ip", "edge_location", "distribution_name",
STEP("CF_LOG_STREAM_001"."request_time" BY INTERVAL '1' SECOND),
STEP("CF_LOG_STREAM_001".ROWTIME BY INTERVAL '1' SECOND);
Scenario 2 is a query for total bytes transferred per distribution for each edge location for your website.
-- Create output stream, which can be used to send to a destination
CREATE OR REPLACE STREAM "BYTES_TRANSFERRED" (requesttime TIME, distribution VARCHAR(16), edgelocation VARCHAR(64), totalBytes BIGINT);
-- Create pump to insert into output
CREATE OR REPLACE PUMP "BYTES_TRANSFERRED_PUMP" AS INSERT INTO "BYTES_TRANSFERRED"
-- Bytes transferred per second per web distribution by edge location
SELECT STREAM
STEP("CF_LOG_STREAM_001"."request_time" BY INTERVAL '1' SECOND) as requesttime,
"distribution_name" as distribution,
"edge_location" as edgelocation,
SUM("bytes") as totalBytes
FROM "CF_LOG_STREAM_001"
GROUP BY "distribution_name", "edge_location", "request_date",
STEP("CF_LOG_STREAM_001"."request_time" BY INTERVAL '1' SECOND),
STEP("CF_LOG_STREAM_001".ROWTIME BY INTERVAL '1' SECOND);
Scenario 3 is a query for the top 50 viewers for your website.
-- Create output stream, which can be used to send to a destination
CREATE OR REPLACE STREAM "TOP_TALKERS" (requestip VARCHAR(64), requestcount DOUBLE);
-- Create pump to insert into output
CREATE OR REPLACE PUMP "TOP_TALKERS_PUMP" AS INSERT INTO "TOP_TALKERS"
-- Top 50 talkers
SELECT STREAM ITEM as requestip, ITEM_COUNT as requestcount FROM TABLE(TOP_K_ITEMS_TUMBLING(
CURSOR(SELECT STREAM * FROM "CF_LOG_STREAM_001"),
'request_ip', -- name of column in single quotes
50, -- number of top items
60 -- tumbling window size in seconds
)
);
Conclusion
Following the steps in this blog post, you have built an end-to-end serverless architecture to analyze Amazon CloudFront access logs. You analyzed the logs in both interactive and streaming modes, using Amazon Athena and Amazon Kinesis Analytics respectively.
Because partitions are created in Athena for the logs delivered to a centralized bucket, this architecture is optimized for performance and cost when analyzing the large volumes of logs generated by popular websites that receive millions of requests. Here, we have focused on just three common use cases for analysis, sharing the analytic queries as part of the post. However, you can extend this architecture to gain deeper insights and generate usage reports that help you reduce latency and increase availability. This way, you can provide a better experience on websites fronted with Amazon CloudFront.
In this blog post, we focused on building a serverless architecture to analyze Amazon CloudFront access logs. We plan to extend the solution to provide rich visualization in an upcoming blog post.
About the Authors
Rajeev Srinivasan is a Senior Solution Architect for AWS. He works closely with our customers to provide big data and NoSQL solutions leveraging the AWS platform, and he enjoys coding. In his spare time he enjoys riding his motorcycle and reading books.
Sai Sriparasa is a consultant with AWS Professional Services. He works with our customers to provide strategic and tactical big data solutions with an emphasis on automation, operations & security on AWS. In his spare time, he follows sports and current affairs.
We’re excited to co-host the 10th Annual Hadoop Summit, the leading conference for the Apache Hadoop community, taking place on June 13 – 15 at the San Jose Convention Center. In the last few years, the Hadoop Summit has expanded to cover all things data beyond just Apache Hadoop – such as data science, cloud and operations, IoT and applications – and has been aptly renamed the DataWorks Summit. The three-day program is bursting at the seams! Here are just a few of the reasons why you cannot miss this must-attend event:
Familiarize yourself with the cutting edge in Apache project developments from the committers
Learn from your peers and industry experts about innovative and real-world use cases, development and administration tips and tricks, success stories and best practices to leverage all your data – on-premise and in the cloud – to drive predictive analytics, distributed deep-learning and artificial intelligence initiatives
Attend one of our more than 170 technical deep dive breakout sessions from nearly 200 speakers across eight tracks
Check out our keynotes, meetups, trainings, technical crash courses, birds-of-a-feather sessions, Women in Big Data and more
Similar to previous years, we look forward to continuing Yahoo’s decade-long tradition of thought leadership at this year’s summit. Join us for an in-depth look at Yahoo’s Hadoop culture and for the latest in technologies such as Apache Tez, HBase, Hive, Data Highway Rainbow, Mail Data Warehouse and Distributed Deep Learning at the breakout sessions below. Or, stop by Yahoo kiosk #700 at the community showcase.
Also, as a co-host of the event, Yahoo is pleased to offer a 20% discount for the summit with the code MSPO20. Register here for Hadoop Summit, San Jose, California!
Andy Feng – VP Architecture, Big Data and Machine Learning
Lee Yang – Sr. Principal Engineer
In this talk, we will introduce TensorFlowOnSpark, a new framework for scalable TensorFlow learning that was open sourced in Q1 2017. This new framework enables easy experimentation for algorithm designs, and supports scalable training & inferencing on Spark clusters. It supports all TensorFlow functionalities, including synchronous & asynchronous learning, model & data parallelism, and TensorBoard. It provides architectural flexibility for data ingestion to TensorFlow and network protocols for server-to-server communication. With a few lines of code changes, an existing TensorFlow algorithm can be transformed into a scalable application.
2:10 – 2:50 P.M. Handling Kernel Upgrades at Scale – The Dirty Cow Story
Samy Gawande – Sr. Operations Engineer
Savitha Ravikrishnan – Site Reliability Engineer
Apache Hadoop at Yahoo is a massive platform with 36 different clusters spread across YARN, Apache HBase, and Apache Storm deployments, totaling 60,000 servers made up of hundreds of different hardware configurations accumulated over generations, presenting unique operational challenges and a variety of unforeseen corner cases. In this talk, we will share methods, tips, and tricks for handling a large-scale kernel upgrade on heterogeneous platforms within tight timeframes, with 100% uptime and no service or data loss, through the Dirty COW use case (a privilege escalation vulnerability found in the Linux kernel in late 2016).
5:00 – 5:40 P.M. Data Highway Rainbow – Petabyte Scale Event Collection, Transport, and Delivery at Yahoo
Nilam Sharma – Sr. Software Engineer
Huibing Yin – Sr. Software Engineer
This talk presents the architecture and features of Data Highway Rainbow, Yahoo’s hosted multi-tenant infrastructure which offers event collection, transport and aggregated delivery as a service. Data Highway supports collection from multiple data centers & aggregated delivery in primary Yahoo data centers which provide a big data computing cluster. From a delivery perspective, Data Highway supports endpoints/sinks such as HDFS, Storm and Kafka; with Storm & Kafka endpoints tailored towards latency sensitive consumers.
DAY 2. WEDNESDAY June 14, 2017
9:05 – 9:15 A.M. Yahoo General Session – Shaping Data Platform for Lasting Value
Sumeet Singh – Sr. Director, Products
With a long history of open innovation with Hadoop, Yahoo continues to invest in and expand the platform capabilities by pushing the boundaries of what the platform can accomplish for the entire organization. In the last 11 years (yes, it is that old!), the Hadoop platform has shown no signs of giving up or giving in. In this talk, we explore what makes the shared multi-tenant Hadoop platform so special at Yahoo.
12:20 – 1:00 P.M. CaffeOnSpark Update – Recent Enhancements and Use Cases
Mridul Jain – Sr. Principal Engineer
Jun Shi – Principal Engineer
By combining salient features from the deep learning framework Caffe and the big-data frameworks Apache Spark and Apache Hadoop, CaffeOnSpark enables distributed deep learning on a cluster of GPU and CPU servers. We released CaffeOnSpark as an open source project in early 2016, and shared its architecture design and basic usage at Hadoop Summit 2016. In this talk, we will update audiences about the recent development of CaffeOnSpark. We will highlight new features and capabilities: a unified data layer that supports multi-label datasets, distributed LSTM training, interleaved testing with training, a monitoring/profiling framework, and Docker deployment.
12:20 – 1:00 P.M. Tez Shuffle Handler – Shuffling at Scale with Apache Hadoop
Jon Eagles – Principal Engineer
Kuhu Shukla – Software Engineer
In this talk we introduce a new Shuffle Handler for Tez, a YARN Auxiliary Service, that addresses the shortcomings and performance bottlenecks of the legacy MapReduce Shuffle Handler, the default shuffle service in Apache Tez. The Apache Tez Shuffle Handler adds composite fetch, which supports multi-partition fetch to mitigate performance slowdowns, and provides deletion APIs to reduce disk usage for long-running Tez sessions. As this is an emerging technology, we will outline the future roadmap for the Apache Tez Shuffle Handler and provide performance evaluation results from real-world jobs at scale.
2:10 – 2:50 P.M. Achieving HBase Multi-Tenancy with RegionServer Groups and Favored Nodes
Thiruvel Thirumoolan – Principal Engineer
Francis Liu – Sr. Principal Engineer
At Yahoo!, HBase has been running as a hosted multi-tenant service since 2013. In a single HBase cluster, we have around 30 tenants running various types of workloads (batch, near real-time, ad hoc, and so on). We will walk through the multi-tenancy features, explaining our motivation and how they work, as well as our experiences running these multi-tenant clusters. These features will be available in Apache HBase 2.0.
2:10 – 2:50 P.M. Data Driving Yahoo Mail Growth and Evolution with a 50 PB Hadoop Warehouse
Nick Huang – Director, Data Engineering, Yahoo Mail
Saurabh Dixit – Sr. Principal Engineer, Yahoo Mail
Since 2014, the Yahoo Mail Data Engineering team has taken on the task of revamping the Mail data warehouse and analytics infrastructure in order to drive the continued growth and evolution of Yahoo Mail. Along the way we have built a 50 PB Hadoop warehouse and surrounding analytics and machine learning programs that have transformed the role data plays in Yahoo Mail. In this session, we will share our experience from this three-year journey, covering the system architecture, the analytics systems we built, and the lessons learned from development and driving adoption.
DAY 3. THURSDAY June 15, 2017
2:10 – 2:50 P.M. OracleStore – A Highly Performant RawStore Implementation for Hive Metastore
Chris Drome – Sr. Principal Engineer
Jin Sun – Principal Engineer
Today, Yahoo uses Hive in many different spaces, from ETL pipelines to adhoc user queries. Increasingly, we are investigating the practicality of applying Hive to real-time queries, such as those generated by interactive BI reporting systems. In order for Hive to succeed in this space, it must be performant in all aspects of query execution, from query compilation to job execution. One such component is the interaction with the underlying database at the core of the Metastore. As an alternative to ObjectStore, we created OracleStore as a proof-of-concept. Freed of the restrictions imposed by DataNucleus, we were able to design a more performant database schema that better met our needs. Then, we implemented OracleStore with specific goals built-in from the start, such as ensuring the deduplication of data. In this talk we will discuss the details behind OracleStore and the gains that were realized with this alternative implementation. These include a reduction of 97%+ in the storage footprint of multiple tables, as well as query performance that is 13x faster than ObjectStore with DirectSQL and 46x faster than ObjectStore without DirectSQL.
3:00 P.M. – 3:40 P.M. Bullet – A Real Time Data Query Engine
Akshai Sarma – Sr. Software Engineer
Michael Natkovich – Director, Engineering
Bullet is an open sourced, lightweight, pluggable querying system for streaming data without a persistence layer implemented on top of Storm. It allows you to filter, project, and aggregate on data in transit. It includes a UI and WS. Instead of running queries on a finite set of data that arrived and was persisted or running a static query defined at the startup of the stream, our queries can be executed against an arbitrary set of data arriving after the query is submitted. In other words, it is a look-forward system. Bullet is a multi-tenant system that scales independently of the data consumed and the number of simultaneous queries. Bullet is pluggable into any streaming data source. It can be configured to read from systems such as Storm, Kafka, Spark, Flume, etc. Bullet leverages Sketches to perform its aggregate operations such as distinct, count distinct, sum, count, min, max, and average.
3:00 P.M. – 3:40 P.M. Yahoo – Moving Beyond Running 100% of Apache Pig Jobs on Apache Tez
Rohini Palaniswamy – Sr. Principal Engineer
Last year at Yahoo, we spent a great deal of effort scaling and stabilizing Pig on Tez and making it production ready, and by the end of the year we retired running Pig jobs on MapReduce. This talk will detail the performance and resource utilization improvements Yahoo achieved after migrating all Pig jobs to run on Tez. After the successful migration and the improved performance, we shifted our focus to addressing some of the bottlenecks we identified and new optimization ideas that we came up with to make it go even faster. We will go over the new features and work done in Tez to make that happen, such as the custom YARN ShuffleHandler, reworked DAG scheduling order, and serialization changes. We will also cover exciting new features that were added to Pig for performance, such as bloom join and bytecode generation.
4:10 P.M. – 4:50 P.M. Leveraging Docker for Hadoop Build Automation and Big Data Stack Provisioning
Evans Ye, Software Engineer
Apache Bigtop, an open source Hadoop distribution, focuses on developing packaging, testing, and deployment solutions that help infrastructure engineers build their own customized big data platforms as easily as possible. However, packages deployed in production require a solid CI testing framework to ensure their quality, and the many Hadoop components must be verified to work together as well. In this presentation, we'll talk about how Bigtop delivers its containerized CI framework, which can be directly replicated by Bigtop users. The core innovations here are the newly developed Docker Provisioner, which leverages Docker for Hadoop deployment, and the Docker Sandbox, which lets developers quickly start a big data stack. The content of this talk includes the containerized CI framework, technical details of the Docker Provisioner and Docker Sandbox, the hierarchy of Docker images we designed, and several components we developed, such as the Bigtop Toolchain, to achieve build automation.
You have come up with an exciting hypothesis, and now you are keen to find and analyze as much data as possible to prove (or refute) it. There are many datasets that might be applicable, but they have been created at different times by different people and don’t conform to any common standard. They use different names for variables that mean the same thing and the same names for variables that mean different things. They use different units of measurement and different categories. Some have more variables than others. And they all have data quality issues (for example, badly formed dates and times, invalid geographic coordinates, and so on).
You first need a way to harmonize these datasets, to identify the variables that mean the same thing and make sure that these variables have the same names and units. You also need to clean up or remove records with invalid data.
After the datasets are harmonized, you need to search through the data to find the datasets you’re interested in. Not all of them have records that are relevant to your hypothesis, so you want to filter on a number of important variables to narrow down the datasets and verify they contain enough matching records to be significant.
Having identified the datasets of interest, you are ready to run your custom analyses on the data they contain so that you can prove your hypothesis and create beautiful visualizations to share with the world!
In this blog post, we will describe a sample application that illustrates how to solve these problems. You can install our sample app, which will:
Harmonize and index three disparate datasets to make them searchable.
Present a data-driven, customizable UI for searching the datasets to do preliminary analysis and to locate relevant datasets.
The Police Data Initiative seeks to improve community and law enforcement relations through the public availability of data related to police activity. Datasets from participating cities, available through the Public Safety Open Data Portal, have many of the problems just outlined. Despite the commonality of crime and location metadata, there is no standard naming or value scheme. Datasets are stored in various locations and in various formats. There is no central search and discovery engine. To gain insights and value from this data, you have to analyze datasets city by city.
Although the focus of this post is police incident data, the same approach can be used for datasets in other domains, such as IoT, personalized medicine, news, weather, finance, and much more.
Architecture
Our architecture uses the following AWS services:
Amazon EMR (with Apache Spark and Jupyter notebooks) to explore, clean, harmonize (transform), describe, and save multiple, loosely coupled datasets.
Amazon S3 to store both raw and harmonized datasets.
Amazon Elasticsearch Service (Amazon ES) to host secure, searchable indexes of selected dataset variables and the associated dictionary/metadata used to power the search web page.
AWS Identity and Access Management (IAM) policies and instance roles to allow least-privilege access to Amazon ES from the UI containers and from the EMR cluster.
Enter an EC2 key pair and a password you will use to access Jupyter notebooks on your new EMR cluster. (You can create a key pair in the EC2 console.)
The master CloudFormation stack uses several nested stacks to create the following resources in your AWS account:
VPC with public and private subnets in two Availability Zones.
Amazon ES domain (2 x t2.small master nodes and 2 x t2.small data nodes) to store indexes and process search queries. Cost approx. $0.12/hr.
S3 bucket for storing datasets and EMR logs.
EMR cluster (1 x m4.2xlarge master node, 2 x m4.2xlarge core nodes) with Apache Spark, Jupyter Notebooks, and the aws-es-kibana proxy server (courtesy of santthosh) to sign Elasticsearch service requests. Cost approx. $1.80/hr.
ECS cluster (2 x t2.large nodes) and tasks that run the search and discover web service containers and the aws-es-kibana proxy server to sign Amazon ES requests. Cost approx. $0.20/hr.
IAM roles with policies to apply least-privilege principles to instance roles used on ECS and EMR instances.
You will be billed for these resources while they are running. In the us-east-1 region, expect to pay a little more than $2 per hour.
The EMR cluster is the most significant contributor to the compute cost. After you have harmonized the datasets, you can terminate the EMR cluster because it’s used only for harmonization. You can create a new cluster later when you need to harmonize new data.
It can take between 30-60 minutes for CloudFormation to set up the resources. When CREATE_COMPLETE appears in the Status column for the main stack, examine the Outputs tab for the stack. Here you will find the links you need to access the Jupyter harmonization notebooks and the dataset search page UI.
Harmonize sample datasets
Launch the Jupyter Notebook UI with the JupyterURL shown on the Outputs tab.
Type the password you used when you launched the stack.
Open the datasearch-blog folder:
Here you see the harmonization notebooks for the three cities we use to illustrate the sample app: Baltimore, Detroit, and Los Angeles. The lib folder contains Python classes used to abstract common indexing and harmonization methods used by the harmonization notebooks. The html folder contains read-only HTML copies of each notebook created and published during notebook execution. The kibana-content folder holds the definitions for the default search dashboard pane and scripts for importing and exporting the dashboard to and from Amazon Elasticsearch Service.
Now you’re ready to run the harmonization notebooks.
Click Baltimore-notebook.ipynb to open the Baltimore notebook. To modify the sample to work with different datasets or to implement additional features, you need to understand how these files work. For now, we’ll just run the three sample notebooks to download and harmonize some datasets.
You can use Show/Hide Code to toggle the display of cells that contain Python code.
From the Cell menu, choose Run All to run the notebook:
You can see the execution progress by monitoring the cells in the notebook. As each cell completes execution, output is displayed under the cell. The Notebook Execution Complete message appears at the bottom of the notebook:
At this point, the dataset for Baltimore has been downloaded and stored in S3. The data has been harmonized and a dictionary was generated. The dataset and the dictionary were stored in Elasticsearch indexes to power the search page and in S3 to facilitate later detailed analysis.
Scroll through the sections in the notebook and read the embedded documentation to understand what it’s doing. The sample code is written in Python using the PySpark modules to leverage the power of Apache Spark running on the underlying EMR cluster.
PySpark allows you to easily scale the technique to process much larger datasets. R, Scala, and other languages can also be supported, as described in Tom Zeng’s excellent blog post, Run Jupyter Notebook and JupyterHub on Amazon EMR.
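As a condensed illustration of the pattern the notebooks follow (read a raw city file, map its columns onto the shared variable names, derive helper variables, filter bad records, and write Parquet back to S3), a PySpark cell might look like the following sketch. The raw column names and S3 paths are assumptions; the real notebooks handle each city's schema and data-quality rules in much more detail.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("harmonize-city-incidents").getOrCreate()

# Read one city's raw incident file (the path and raw column names are assumptions)
raw = (spark.read
       .option("header", "true")
       .csv("s3://<your-bucket>/raw/baltimore_incidents.csv"))

harmonized = (raw
    # Map the city-specific columns onto the shared variable names
    .withColumn("datetime", F.unix_timestamp(F.col("CrimeDate"), "MM/dd/yyyy").cast("timestamp"))
    .withColumnRenamed("Description", "description")
    .withColumn("city", F.lit("Baltimore"))
    # Derive helper variables used by the search UI
    .withColumn("year", F.year("datetime"))
    .withColumn("dayofweek", F.date_format("datetime", "EEEE"))
    # Basic data-quality filter: drop records whose dates failed to parse
    .filter(F.col("datetime").isNotNull()))

# Save the harmonized dataset as Parquet so Athena and QuickSight can use it later
harmonized.write.mode("overwrite").parquet("s3://<your-bucket>/harmonized/baltimore/")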
To complete the sample dataset harmonization processes, open and execute the other two city notebooks, Detroit-notebook.ipynb and LosAngeles-notebook.ipynb.
Now that the police incident datasets have been harmonized and indexed into Amazon Elasticsearch Service, you can use the solution’s search and discovery UI to visualize and explore the full set of combined data.
Page layout
Launch the search UI using the SearchPageURL on the Outputs tab of the CloudFormation stack.
The search page has two components:
An embedded Kibana dashboard helps you visualize aggregate information from your datasets as you apply filters:
The sample dashboard is designed for the harmonized police incident datasets, but you can easily implement and substitute your own dashboards to provide alternative visualizations to support different datasets and variables.
You can interact with the dashboard elements to zoom in on the map views and apply filters by selecting geographic areas or values from the visualizations.
A filter sidebar with accordion elements shows groups of variables that you can use to apply filters to the dashboard:
The filter sidebar is built dynamically from the dictionary metadata that was indexed in Amazon Elasticsearch Service by the harmonization process.
Use Jupyter to examine the lib/harmonizeCrimeIncidents.py file to see how each harmonized variable is associated with a vargroup, which is used to group associated variables into the accordion folders in the UI. Variables that aren’t explicitly assigned are implicitly assigned to a default vargroup. For example, the accordion labelled Baltimore (Unharmonized) is the default group for the Baltimore dataset. It contains the original dataset variables.
Filter sidebar components
The filter sidebar code dynamically chooses the UI component type to use for each variable based on dictionary metadata. If you look again at the code in lib/harmonizeCrimeIncidents.py, you will see that each harmonized variable is assigned a type that determines the UI component type. Variables that aren’t explicitly typed by the harmonization code are implicitly assigned a type when the dataset dictionary is built (based on whether the variable contains strings or numbers) and on the distribution of values.
In the filter sidebar, open the Date and Time accordion. Select the datetime check box and click in the From or To fields to open the date/time calendar component:
Select the dayofweek check box to see a multi-valued picklist:
Other variable types include:
Ranges:
Boolean:
Text (with as-you-type automatic suggestions):
Take some time to experiment with the filters. The dashboard view is dynamically updated as you apply and remove filters. The count displayed for each dataset changes to reflect the number of records that match your filter criteria. A summary of your current filters is displayed in the query bar at the top of the dashboard.
Dataset documentation/transparency
The example dashboard provides Notebook links for each dataset:
Click the Notebook link for Baltimore to open a browser tab that displays a read-only copy of the Baltimore notebook. Examine the sections listed in Contents:
Use this mechanism to document your datasets and to provide transparency and reproducibility by encapsulating dataset documentation, harmonization code, and output in one accessible artifact.
Search example
Let’s say your hypothesis relates specifically to homicide incidents occurring on weekends during the period 2007-2015.
Use the search UI to apply the search filters on dayofweek, year, and description:
You’ll see the city of Detroit has the largest number of incidents matching your criteria. Baltimore has a few matching records too, so you may want to narrow in on those two datasets.
Analyze
Having used the search UI to locate datasets, you now need to explore further to create rich analyses and custom visualizations to prove and present your hypothesis.
There are many tools you can use for your analysis.
One option is to use Kibana to build new queries, visualizations, and dashboards to show data patterns and trends in support of your hypothesis.
Or you could reuse the harmonization (Jupyter on EMR/Spark) environment to create new Jupyter notebooks for your research, leveraging your knowledge of Python or R to do deep statistical and predictive analytics on the harmonized dataset stored in S3. Create beautiful notebooks that contain your documentation and code, with stunning inline visualizations generated by using libraries like matplotlib or ggplot2.
Two recently launched AWS analytics services, Amazon Athena and Amazon QuickSight, provide an attractive, serverless option for many research and analytics tasks. Let’s explore how you can use these services to analyze our example police incident data.
Amazon Athena
Open the Amazon Athena console in region us-east-1. From DATABASE, choose incidents. You’ll see the tables for our datasets are already loaded into the Athena catalog. Choose the preview icon for any of these tables to see some records.
How did the tables for our datasets get loaded into the Athena catalog? If you explored the harmonization notebooks, you may know the answer. Our sample harmonization notebooks save the harmonized datasets and dictionaries to S3 as Parquet files and register these datasets as external tables with the Amazon Athena service using Athena’s JDBC driver (example).
Amazon QuickSight
Amazon QuickSight can use Athena as a data source to build beautiful, interactive visualizations. To do this:
Open and sign in to the QuickSight console. If this is your first time using QuickSight, create an account.
Note: Your QuickSight account must have permissions to access Amazon Athena and the S3 bucket(s) where your harmonized datasets are stored.
Follow the steps in the console to create an Athena data source.
After the data source is created, choose the incidents database, and then choose Edit/Preview data:
Build a custom query to combine the separate harmonized city datasets into a single dataset for analysis. Open the Tables accordion and click Switch to Custom SQL tool:
The following example combines the harmonized variables datetime, city, and description from all three of our sample datasets:
Explore the data preview, modify the analysis label, then choose Save and Visualize to store your dataset and open the visualization editor:
Select datetime and description to display a default visualization that shows incident counts over time for each incident description:
Take some time to explore QuickSight’s features by building visualizations, stories, and dashboards. You can find tutorials and examples in the QuickSight UI and in other blog posts.
Customization
This post and the companion sample application are intended to illustrate concepts and to provide you with a framework you can customize.
Code repository and continuous build and deployment
The sample solution installs a pipeline in AWS CodePipeline in your account. The pipeline monitors the source code archive in the aws-big-data-blog S3 bucket (s3://aws-bigdata-blog/artifacts/harmonize-search-analyze/src.zip). Any changes to this source archive will trigger AWS CodePipeline to automatically rebuild and redeploy the UI web application in your account.
Fork our GitHub repository and review the README.md file to learn how to create and publish the solution templates and source code to your own S3 artifact bucket.
Use CloudFormation to launch the solution master stack from your new S3 bucket and verify that your pipeline now monitors the source code archive from your bucket.
You are ready to start customizing the solution.
Customize harmonization
Using the examples as a guide, harmonize and index your own datasets by creating your own Jupyter notebooks. Our sample notebooks and Python classes contain embedded notes and comments to guide you as you make changes. Use the harmonization process to control:
Variable names
Variable values, categories, units of measurements
New variables used to create searchable dataset tags
New variables to enrich data
Data quality checks and filters/corrections
The subset of variables that are indexed and made searchable
How variables are grouped and displayed on the search UI
The data dictionary used to describe variables and preserve data lineage
Dataset documentation, publish and make accessible from search UI
Customize search UI
Customize the web search UI dashboard to reflect the variables in your own datasets and embed/link the dashboard page into your own website.
The main search page panel is an embedded Kibana dashboard. You can use Kibana to create your own customized dashboard, which you can link to the search page by editing ./services/webapp/src/config.js to replace the value of dashboardEmbedUrl.
The filter sidebar is a Bootstrap Javascript application. You won’t need to modify the sidebar code to handle new datasets or variables because it is fully data-driven from the dataset dictionary indexes saved to Amazon Elasticsearch Service during harmonization.
For more information about how to build and test the web UI, see the README.md file in our GitHub repository.
Integration with other AWS services
Consider using the recently launched Data Lake Solution on AWS to create a data lake to organize and manage your datasets in S3. The APIs can be used to register and track your harmonized datasets. The solution’s console can be used to manage your dataset lifecycles and access control.
Use Amazon CloudWatch (logs, metrics, alarms) to monitor the web UI container logs, the ECS cluster, the Elasticsearch domain, and the EMR cluster.
You might also want to store your harmonized datasets into an Amazon Redshift data warehouse, run predictive analytics on harmonized variables using Amazon Machine Learning, and much more. Explore the possibilities!
Cleanup
You will be charged an hourly fee while the resources are running, so don’t forget to delete the resources when you’re done!
Delete the master datasearch-blog CloudFormation stack
Use the S3 console to delete the S3 bucket: datasearch-blog-jupyterspark
Use the Athena console to manually delete the incidents database
Finally, use the QuickSight console to remove the data source and visualization that you created to analyze these datasets
Summary
In this blog post, we have described an approach that leverages AWS services to address many of the challenges of integrating search and analysis across multiple, loosely coupled datasets. We have provided a sample application that you can use to kick the tires. Try customizing the approach to meet your own needs.
If you have questions or suggestions, please leave your feedback in the comments. We would love to hear from you!
About the Authors
Oliver Atoa and Bob Strahan are Senior Consultants for AWS Professional Services. They work with our customers to provide leadership on a variety of projects, helping them shorten their time to value when using AWS.
Ryan Jancaitis is the Sr. Product Manager for the Envision Engineering Center at AWS. He works with our customers to enable their cloud journey through rapid prototyping and innovation to solve core business challenges.
Jupyter Notebook (formerly IPython) is one of the most popular user interfaces for running Python, R, Julia, Scala, and other languages to process and visualize data, perform statistical analysis, and train and run machine learning models. Jupyter notebooks are self-contained documents that can include live code, charts, narrative text, and more. The notebooks can be easily converted to HTML, PDF, and other formats for sharing.
Amazon EMR is a popular hosted big data processing service that allows users to easily run Hadoop, Spark, Presto, and other Hadoop ecosystem applications, such as Hive and Pig.
Python, Scala, and R provide support for Spark and Hadoop, and running them in Jupyter on Amazon EMR makes it easy to take advantage of:
the big-data processing capabilities of Hadoop applications.
the large selection of Python and R packages for analytics and visualization.
JupyterHub is a multiple-user environment for Jupyter. You can use the following bootstrap action (BA) to install Jupyter and JupyterHub on Amazon EMR. In addition to the default Python kernel, the BA can install the following kernels:
Apache Toree (which provides the Spark, PySpark, SparkR, and SparkSQL kernels)
Julia
Ruby
JavaScript
CoffeeScript
Torch
The BA will install Jupyter, JupyterHub, and sample notebooks on the master node.
Commonly used Python and R data science and machine learning packages can be optionally installed on all nodes.
The following arguments can be passed to the BA:
--r
Install the IRKernel for R.
--toree
Install the Apache Toree kernel that supports Scala, PySpark, SQL, SparkR for Apache Spark.
--julia
Install the IJulia kernel for Julia.
--torch
Install the iTorch kernel for Torch (machine learning and visualization).
--ruby
Install the iRuby kernel for Ruby.
--ds-packages
Install the Python data science-related packages (scikit-learn pandas statsmodels).
--ml-packages
Install the Python machine learning-related packages (theano keras tensorflow).
--python-packages
Install specific Python packages (for example, ggplot and nilearn).
--port
Set the port for Jupyter notebook. The default is 8888.
--password
Set the password for the Jupyter notebook.
--localhost-only
Restrict Jupyter to listen on localhost only. The default is to listen on all IP addresses.
--jupyterhub
Install JupyterHub.
--jupyterhub-port
Set the port for JupyterHub. The default is 8000.
--notebook-dir
Specify the notebook folder. This could be a local directory or an S3 bucket.
--cached-install
Use some cached dependency artifacts on S3 to speed up installation.
--ssl
Enable SSL. For production, make sure to use your own certificate and key files.
--copy-samples
Copy sample notebooks to the notebook folder.
--spark-opts
User-supplied Spark options to override the default values.
--s3fs
Use s3fs instead of s3nb (the default) for storing notebooks on Amazon S3. This argument can cause slowness if the S3 bucket has lots of files. The Upload file and Create folder menu options do not work with s3nb.
By default (with no --password and --port arguments), Jupyter will run on port 8888 with no password protection; JupyterHub will run on port 8000. The --port and --jupyterhub-port arguments can be used to override the default ports to avoid conflicts with other applications.
The --r option installs the IRKernel for R. It also installs SparkR and sparklyr for R, so make sure Spark is one of the selected EMR applications to be installed. You’ll need the Spark application if you use the --toree argument.
If you used --jupyterhub, use Linux users to sign in to JupyterHub. (Be sure to create passwords for the Linux users first.) The hadoop user, the default admin user for JupyterHub, can be used to set up other users. The --password option sets the password for Jupyter and for the hadoop user for JupyterHub.
Jupyter on EMR allows users to save their work on Amazon S3 rather than on local storage on the EMR cluster (master node).
To store notebooks on S3, use:
--notebook-dir <s3://your-bucket/folder/>
To store notebooks in a directory different from the user’s home directory, use:
--notebook-dir <local directory>
The following example CLI command is used to launch a five-node (c3.4xlarge) EMR 5.2.0 cluster with the bootstrap action. The BA will install all the available kernels. It will also install the ggplot and nilearn Python packages, set the Jupyter and JupyterHub ports, and store notebooks on Amazon S3.
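If you prefer to launch the same kind of cluster programmatically, a boto3 sketch follows. The bootstrap-action script location is a placeholder, and the argument list simply mirrors the options discussed above (all kernels, the ggplot and nilearn packages, Jupyter on port 8880, JupyterHub on port 8001, and notebooks stored in S3).

import boto3

emr = boto3.client('emr', region_name='us-east-1')

response = emr.run_job_flow(
    Name='jupyter-on-emr',
    ReleaseLabel='emr-5.2.0',
    Applications=[{'Name': 'Hadoop'}, {'Name': 'Spark'}, {'Name': 'Hive'}, {'Name': 'Presto'}],
    Instances={
        'MasterInstanceType': 'c3.4xlarge',
        'SlaveInstanceType': 'c3.4xlarge',
        'InstanceCount': 5,
        'Ec2KeyName': '<your-ec2-key>',
        'KeepJobFlowAliveWhenNoSteps': True,
    },
    BootstrapActions=[{
        'Name': 'Install Jupyter and JupyterHub',
        'ScriptBootstrapAction': {
            'Path': 's3://<path-to>/install-jupyter-bootstrap.sh',   # placeholder for the BA script location
            'Args': ['--r', '--toree', '--julia', '--torch', '--ruby',
                     '--ds-packages', '--ml-packages',
                     '--python-packages', 'ggplot nilearn',
                     '--port', '8880', '--jupyterhub', '--jupyterhub-port', '8001',
                     '--copy-samples',
                     '--notebook-dir', 's3://<your-s3-bucket>/notebooks/',
                     '--password', 'jupyter'],                        # placeholder password
        },
    }],
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole',
    VisibleToAllUsers=True,
)
print(response['JobFlowId'])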
Replace <your-ec2-key> with the name of your EC2 key pair and <your-s3-bucket> with the S3 bucket where you store notebooks. You can also change the instance types to suit your needs and budget.
If you are using the EMR console to launch a cluster, you can specify the bootstrap action as follows:
When the cluster is available, set up the SSH tunnel and web proxy. The Jupyter notebook should be available at localhost:8880 (as specified in the example CLI command).
After you have signed in, you will see the home page, which displays the notebook files:
If JupyterHub is installed, the Sign in page should be available at port 8001 (as specified in the CLI example):
After you are signed in, you’ll see the JupyterHub and Jupyter home pages are the same. The JupyterHub URL, however, is /user/<username>/tree instead of /tree.
The JupyterHub Admin page is used for managing users:
You can install Jupyter extensions from the Nbextensions tab:
If you specified the --copy-samples option in the BA, you should see sample notebooks on the home page. To try the samples, first open and run the CopySampleDataToHDFS.ipynb notebook to copy some sample data files to HDFS. In the CLI example, --python-packages,'ggplot nilearn' is used to install the ggplot and nilearn packages. You can verify those packages were installed by running the Py-ggplot and PyNilearn notebooks.
The CreateUser.ipynb notebook contains examples for setting up JupyterHub users.
The PySpark.ipynb and ScalaSpark.ipynb notebooks contain the Python and Scala versions of some machine learning examples from the Spark distribution (Logistic Regression, Neural Networks, Random Forest, and Support Vector Machines):
PyHivePrestoHDFS.ipynb shows how to access Hive, Presto, and HDFS in Python. (Be sure to run the CreateHivePrestoS3Tables.ipynb first to create tables.) The %%time and %%timeit cell magics can be used to benchmark Hive and Presto queries (and other executable code):
Here are some other sample notebooks for you to try.
SparkSQLParquetJSON.ipynb:
plot_separating_hyperplane.ipynb:
R-SVMLinearNonLinear.ipynb:
plot_iris.ipynb:
Julia-IrisPlot.ipynb:
PyIrisPlot.ipynb:
R-RandomForestVisualization.ipynb:
GrangerCausality.ipynb:
SQLite.ipynb:
GraphvizDot.ipynb:
Conclusion
Data scientists who run Jupyter and JupyterHub on Amazon EMR can use Python, R, Julia, and Scala to process, analyze, and visualize big data stored in Amazon S3. Jupyter notebooks can be saved to S3 automatically, so users can shut down and launch new EMR clusters, as needed. EMR makes it easy to spin up clusters with different sizes and CPU/memory configurations to suit different workloads and budgets. This can greatly reduce the cost of data-science investigations.
If you have questions about using Jupyter and JupyterHub on EMR or would like share your use cases, please leave a comment below.
Are you trying to move away from a batch-based ETL pipeline? You might do this, for example, to get real-time insights into your streaming data, such as clickstream, financial transactions, sensor data, customer interactions, and so on. If so, it’s possible that as soon as you get down to requirements, you realize your streaming data doesn’t have all of the fields you need for real-time processing, and you are really missing that JOIN keyword!
You might also have requirements to enrich the streaming data based on a static reference, a dynamic dataset, or even with another streaming data source. How can you achieve this without sacrificing the velocity of the streaming data?
In this blog post, I provide three use cases and approaches for joining and enriching streaming data:
Joining streaming data with a relatively static dataset on Amazon S3 using Amazon Kinesis Analytics
In this use case, Amazon Kinesis Analytics can be used to define a reference data input on S3, and use S3 for enriching a streaming data source.
For example, bike share systems around the world can publish data files about available bikes and docks, at each station, in real time. On bike-share system data feeds that follow the General Bikeshare Feed Specification (GBFS), there is a reference dataset that contains a static list of all stations, their capacities, and locations.
Let’s say you would like to enrich the bike-availability data (which changes throughout the day) with the bike station’s latitude and longitude (which is static) for downstream applications. The architecture would look like this:
To illustrate how you can use Amazon Kinesis Analytics to do this, follow these steps to set up an AWS CloudFormation stack, which will do the following:
Continuously produce sample bike-availability data onto an Amazon Kinesis stream (with, by default, a single shard).
Generate a sample S3 reference data file.
Create an Amazon Kinesis Analytics application that performs the join with the reference data file.
To create the CloudFormation stack
Open the AWS CloudFormation console and choose Create Stack. Make sure the US East (N. Virginia) region is selected.
Choose Specify an Amazon S3 template URL, enter or paste the following URL, and then choose Next.
For the stack name, type demo-data-enrichment-s3-reference-stack. Under the KinesisAnalyticsReferenceDataS3Bucket parameter, type the name of an S3 bucket in the US East (N. Virginia) region where the reference data will be uploaded. This CloudFormation template will create a sample data file under KinesisAnalyticsReferenceDataS3Key. (Make sure the value of the KinesisAnalyticsReferenceDataS3Key parameter does not conflict with existing S3 objects in your bucket.) You'll also see parameters referencing an S3 bucket (LambdaKinesisAnalyticsApplicationCustomResourceCodeS3Bucket) and an associated S3 key. The S3 bucket and the S3 key represent the location of the AWS Lambda package (written in Python) that contains the AWS CloudFormation custom resources required to create an Amazon Kinesis application. Do not modify the values of these parameters.
Follow the remaining steps in the Create Stack wizard (choose Next, and then choose Create). Wait until the status displayed for the stack is CREATE_COMPLETE.
You now have an Amazon Kinesis stream in your AWS account that carries sample bike-availability streaming data (named demo-data-enrichment-bike-availability-input-stream if you used the default parameters), and an Amazon Kinesis Analytics application that performs the join.
You might want to examine the reference data file generated under your bucket (its default name is demo_data_enrichment_s3_reference_data.json). If you are using a JSON formatter/viewer, it will look like the following. Note how the records are put together as top-level objects (no commas separating the records).
In the CloudFormation template, the reference data has been added to the Amazon Kinesis application through the AddApplicationReferenceDataSource API method.
Next, open the Amazon Kinesis Analytics console and examine the application. If you did not modify the default parameters, the application name should be demo-data-enrichment-s3-reference-application.
The Application status should be RUNNING. If its status is still STARTING, wait until it changes to RUNNING. Choose Application details, and then go to SQL results. You should see a page similar to the following:
In the Amazon Kinesis Analytics SQL code, an in-application stream (DESTINATION_SQL_STREAM) is created with five columns. Data is inserted into the stream using a pump (STREAM_PUMP) by joining the source stream (SOURCE_SQL_STREAM_001) and the reference S3 data file (REFERENCE_DATA) using the station_id field. For more information about streams and pumps, see In-Application Streams and Pumps in the Amazon Kinesis Analytics Developer Guide.
The joined (enriched) data fields – station_name, station_lat and station_lon – should appear on the Real-time analytics tab, DESTINATION_SQL_STREAM.
The LEFT OUTER JOIN works just like the ANSI-standard SQL. When you use LEFT OUTER JOIN, the streaming records are preserved even if there is no matching station_id in the reference data. You can delete LEFT OUTER (leaving only JOIN, which means INNER join), choose Save and then run SQL. Only the records with a matching station_id will appear in the output.
You can write the results to an Amazon Kinesis stream or to S3, Amazon Redshift, or Amazon Elasticsearch Service with an Amazon Kinesis Firehose delivery stream by adding a destination on the Destination tab. For more information, see the Writing SQL on Streaming Data with Amazon Kinesis Analytics blog post.
Note: If you have not modified the default parameters in the CloudFormation template, the S3 reference data source should have been added to your application. If you are interested in adding an S3 reference data source to your own application, you can call the AddApplicationReferenceDataSource API operation from Python.
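A minimal sketch of that call, using boto3, follows. The bucket, role ARN, and SQL types are placeholders and assumptions, while the in-application table name and column names match the REFERENCE_DATA fields used in the SQL above.

import boto3

kinesisanalytics = boto3.client('kinesisanalytics', region_name='us-east-1')

# Look up the current application version (required for any update call)
app = kinesisanalytics.describe_application(
    ApplicationName='demo-data-enrichment-s3-reference-application')
version_id = app['ApplicationDetail']['ApplicationVersionId']

kinesisanalytics.add_application_reference_data_source(
    ApplicationName='demo-data-enrichment-s3-reference-application',
    CurrentApplicationVersionId=version_id,
    ReferenceDataSource={
        'TableName': 'REFERENCE_DATA',   # the in-application table name used in the SQL
        'S3ReferenceDataSource': {
            'BucketARN': 'arn:aws:s3:::<your-reference-data-bucket>',
            'FileKey': 'demo_data_enrichment_s3_reference_data.json',
            'ReferenceRoleARN': 'arn:aws:iam::<your-account-id>:role/<role-with-read-access-to-the-bucket>',
        },
        'ReferenceSchema': {
            'RecordFormat': {'RecordFormatType': 'JSON',
                             'MappingParameters': {'JSONMappingParameters': {'RecordRowPath': '$'}}},
            'RecordColumns': [
                {'Name': 'station_id', 'SqlType': 'VARCHAR(16)', 'Mapping': '$.station_id'},
                {'Name': 'station_name', 'SqlType': 'VARCHAR(64)', 'Mapping': '$.station_name'},
                {'Name': 'station_lat', 'SqlType': 'DOUBLE', 'Mapping': '$.station_lat'},
                {'Name': 'station_lon', 'SqlType': 'DOUBLE', 'Mapping': '$.station_lon'},
            ],
        },
    })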
This data enrichment approach works for a relatively static dataset because it requires you to upload the entire reference dataset to S3 whenever there is a change. This approach might not work for cases where the dataset changes too frequently to catch up with the streaming data.
If you change the reference data stored in the S3 bucket, you need to use the UpdateApplication operation (using the API or AWS CLI) to refresh the data in the Amazon Kinesis Analytics in-application table. You can use the following Python script to refresh the data. Although it will not be covered in this blog post, you can automate data refresh by setting up Amazon S3 event notification with AWS Lambda.
#!/usr/bin/env python
import json,boto3

# Name of the Kinesis Analytics Application
KINESIS_ANALYTICS_APPLICATION_NAME='demo-data-enrichment-s3-reference-application'
# AWS region
AWS_REGION='us-east-1'

def main_handler():
    kinesisanalytics_client=boto3.client('kinesisanalytics',AWS_REGION)

    # retrieve the current application version id
    response_describe_application = kinesisanalytics_client.describe_application(
        ApplicationName=KINESIS_ANALYTICS_APPLICATION_NAME
    )
    application_version_id=response_describe_application['ApplicationDetail']['ApplicationVersionId']

    # update the application
    response = kinesisanalytics_client.update_application(
        ApplicationName=KINESIS_ANALYTICS_APPLICATION_NAME,
        CurrentApplicationVersionId=application_version_id,
        ApplicationUpdate={
        }
    )

    print("Done")

if __name__ == "__main__":
    main_handler()
To clean up resources created during this demo
To delete the Amazon Kinesis Analytics application, open the Amazon Kinesis Analytics console, choose the application (demo-data-enrichment-s3-reference-application), choose Actions, and then choose Delete application.
To delete the stack, open the AWS CloudFormation console, choose the stack (demo-data-enrichment-s3-reference-stack), choose Actions, and then choose Delete Stack.
The S3 reference data file should have been removed from your bucket, but your other data files should remain intact.
Enriching streaming data with a dynamic dataset using AWS Lambda and Amazon DynamoDB
In many cases, the data you want to enrich is not static. Imagine a case in which you are capturing user activities from a web or mobile application and have to enrich the activity data with frequently changing fields from a user database table.
A better approach is to store the reference data in a way that supports efficient random reads and writes. Amazon DynamoDB is a fully managed non-relational database that fits this need. AWS Lambda can be set up to automatically read batches of records from your Amazon Kinesis streams, perform data enrichment such as looking up data in DynamoDB, and then produce the enriched records onto another stream.
If you have a stream of user activities and want to look up a user’s birth year from a DynamoDB table, the architecture will look like this:
To set up a demo with this architecture, follow these steps to set up an AWS CloudFormation stack, which continuously produces sample user scores onto an Amazon Kinesis stream. An AWS Lambda function enriches the records with data from a DynamoDB table and produces the results onto another Amazon Kinesis stream.
Open the CloudFormation console and choose Create Stack. Make sure the US East (N. Virginia) region is selected.
Choose Specify an Amazon S3 template URL, enter or paste the following URL, and then choose Next.
For the stack name, type demo-data-enrichment-ddb-reference-stack. Review the parameters. If none of them conflict with your Amazon Kinesis stream names or DynamoDB table name, you can accept the default values and choose Next. The following steps are written with the assumption that you are using the default values.
Complete the remaining steps in the Create Stack wizard. Wait until CREATE_COMPLETE is displayed for the stack status. This CloudFormation template creates three Lambda functions: one for setting up sample data in a DynamoDB table, one for producing sample records continuously, and one for data enrichment. The description for this last function is Data Enrichment Demo – Lambda function to consume and enrich records from Kinesis.
Open the Lambda console, select Data Enrichment Demo – Lambda function to consume and enrich records from Kinesis, and then choose the Triggers tab.
If No records processed is displayed for Last result, wait for a few minutes, and then reload this page. If things are set up correctly, OK will be displayed for Last result. This might take a few minutes.
At this point, the Lambda function is performing the data enrichment and producing records onto an output stream. Lambda is commonly used in this way to preprocess records before they reach an analytics application, for example to handle more complicated data formats.
Although this data enrichment process doesn’t involve Amazon Kinesis Analytics, you can still use the Kinesis Analytics service to examine, or even further process, the enriched records by creating a new Kinesis Analytics application and connecting it to demo-data-enrichment-user-activity-output-stream.
The records on the demo-data-enrichment-user-activity-output-stream will look like the following and will show the enriched birthyear field on the streaming data.
You can use this birthyear value in your Amazon Kinesis Analytics application. Although it’s not covered in this blog post, you can aggregate by age groups and count the user activities.
You can review the code for the Lambda function on the Code tab. The code used here is for demonstration purpose only. You might want to modify and thoroughly test it before applying it to your use case.
Note the following:
The Lambda function receives records in batches (instead of one record per Lambda invocation). You can control this behavior through Batch size on the Triggers tab (in the preceding example, Batch size is set to 100). The batch size you specify is the maximum number of records that you want your Lambda function to receive per invocation.
The retrieval from the reference data source (in this case, DynamoDB) can be done in batch instead of record by record. This example uses the batch_get_item() API method.
Depending on how strict the record sequencing requirement is, the writing of results to the output stream can also be done in batch. This example uses the put_records() API method.
DynamoDB is not the only option. What’s important is to have a data source that supports random reads (and writes) with high efficiency. Because it’s a distributed database with built-in partitioning, DynamoDB is a good choice, but you can also use Amazon RDS as long as the data retrieval is fast enough (perhaps by having a proper index and by spreading the read workload over read replicas). You can also use an in-memory database such as Amazon ElastiCache for Redis or Memcached. A minimal sketch of an enrichment function that illustrates these points follows.
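The sketch below is not the function deployed by the CloudFormation template; it only illustrates the batch-oriented pattern described above. The DynamoDB table name, the user_id key attribute, and the record layout are assumptions, and production code should also retry any UnprocessedKeys returned by batch_get_item.
import base64
import json

import boto3

dynamodb = boto3.client('dynamodb')
kinesis = boto3.client('kinesis')

# Table name and key attribute are assumptions for illustration only.
REFERENCE_TABLE = 'demo-data-enrichment-user-table'
OUTPUT_STREAM = 'demo-data-enrichment-user-activity-output-stream'


def lambda_handler(event, context):
    # The function receives a batch of records (up to the configured batch size).
    activities = [json.loads(base64.b64decode(record['kinesis']['data']).decode('utf-8'))
                  for record in event['Records']]

    # Look up the reference data in one batch call instead of record by record.
    user_ids = list({activity['user_id'] for activity in activities})
    keys = [{'user_id': {'S': user_id}} for user_id in user_ids]
    response = dynamodb.batch_get_item(
        RequestItems={REFERENCE_TABLE: {'Keys': keys}})
    birthyears = {item['user_id']['S']: item['birthyear']['N']
                  for item in response['Responses'].get(REFERENCE_TABLE, [])}
    # Production code should retry response.get('UnprocessedKeys') here.

    # Enrich each activity and write the whole batch to the output stream.
    output_records = []
    for activity in activities:
        activity['birthyear'] = birthyears.get(activity['user_id'])
        output_records.append({'Data': json.dumps(activity),
                               'PartitionKey': activity['user_id']})
    kinesis.put_records(StreamName=OUTPUT_STREAM, Records=output_records)
    return 'Processed {} records'.format(len(output_records))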
To clean up the resources created for this demo
If you created a Kinesis Analytics application: Open the Amazon Kinesis Analytics console, select your application, choose Actions, and then choose Delete application.
Open the CloudFormation console, choose demo-data-enrichment-ddb-reference-stack, choose Actions, and then choose Delete Stack.
Joining multiple sets of streaming data using Amazon Kinesis Analytics
To illustrate this use case, let’s say you have two streams of temperature sensor data coming from a set of machines: one measuring the engine temperature and the other measuring the power supply temperature. For the best prediction of machine failure, you’ve been told (perhaps by your data scientist) that the two temperature readings must be validated against a prediction model ─ not individually, but as a set.
Because this is streaming data, the time window for the data joining should be clearly defined. In this example, the joining must occur on temperature readings within the same window (same minute) so that the temperature of the engine is matched against the temperature of the power supply in the same timeframe.
This data joining can be achieved with Amazon Kinesis Analytics, but has to follow a certain pattern.
First of all, an Amazon Kinesis Analytics application supports no more than one streaming data source. The different sets of streaming data have to be produced onto one Amazon Kinesis stream.
An Amazon Kinesis Analytics application can use this Amazon Kinesis stream as input and can process the data based on a certain field (in this example, sensor_location) into multiple in-application streams.
The joining can now occur on the two in-application streams. In this example, the join fields are the machine_id and the one-minute tumbling window.
The selection of the time field for the windowing criteria is a topic of its own. For information, see Writing SQL on Streaming Data with Amazon Kinesis Analytics. It is important to get a well-aligned window for real-world applications. In this example, the processing time (ROWTIME) will be used for the tumbling window calculation for simplicity.
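Before walking through the setup, here is a minimal sketch of how two kinds of temperature readings can be produced onto a single Amazon Kinesis stream. It is illustrative only; the stream name and reading values are placeholders, and the CloudFormation template in the next steps creates its own stream and producers. The point is the sensor_location field, which the Analytics application later uses to split the records into two in-application streams before joining them on machine_id.
import json
import random
import time

import boto3

kinesis = boto3.client('kinesis', 'us-east-1')

# Illustrative stream name; the CloudFormation template creates its own stream.
STREAM_NAME = 'demo-data-joining-sensor-stream'

while True:
    machine_id = 'machine-%04d' % random.randint(1, 5)
    # Both readings for a machine go onto the same stream; sensor_location is
    # the field used to route records into separate in-application streams.
    for sensor_location in ('engine', 'power-supply'):
        record = {
            'machine_id': machine_id,
            'sensor_location': sensor_location,
            'temperature': round(random.uniform(60.0, 90.0), 1),
        }
        kinesis.put_record(
            StreamName=STREAM_NAME,
            Data=json.dumps(record),
            PartitionKey=machine_id,
        )
    time.sleep(1)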
Again, you can follow these steps to set up an AWS CloudFormation stack, which continuously produces two sets of sample sensor data onto a single Amazon Kinesis stream and creates an Amazon Kinesis Analytics application that performs the joining.
Open the CloudFormation console and choose Create Stack. Make sure the US East (N. Virginia) region is selected.
Choose Specify an Amazon S3 template URL, enter or paste the following URL, and then choose Next.
For the stack name, type demo-data-joining-multiple-streams-stack. Review the parameters. If none of them conflict with your Amazon Kinesis stream names, you can accept the default values and choose Next. The following steps are written with the assumption that you are using the default values.
You will also see parameters referencing an S3 bucket and S3 key. This is an AWS Lambda package (written in Python) that contains the AWS CloudFormation custom resources to create an Amazon Kinesis application.
Complete the remaining steps in the Create Stack wizard. Wait until the status displayed for the stack is CREATE_COMPLETE.
Open the Amazon Kinesis Analytics console and examine the application. If you have not modified the default parameters in the CloudFormation template, the application name should be demo-data-joining-multiple-streams-application.
The Application status should be RUNNING. If its status is still STARTING, wait until it changes to RUNNING.
Choose Application details, and then go to SQL results.
You should see a page similar to the following:
The SQL statements have been populated for you. The script first creates two in-application streams (for engine and power supply temperature readings, respectively). DESTINATION_SQL_STREAM holds the joined results.
On the Real-time analytics tab, you’ll see the average temperature readings of the engine and power supply have been joined together using the machine_id and per-minute tumbling window. The results can be written to an Amazon Kinesis stream or to S3, Amazon Redshift, or Amazon Elasticsearch Service with a Firehose delivery stream.
To clean up resources created for this demo
To delete the Amazon Kinesis Analytics application, open the Amazon Kinesis Analytics console, choose demo-data-joining-multiple-streams-application, choose Actions, and then choose Delete application.
Open the CloudFormation console, choose demo-data-joining-multiple-streams-stack, choose Actions, and then choose Delete Stack.
Summary
In this blog post, I shared three approaches for joining and enriching streaming data on Amazon Kinesis Streams by using Amazon Kinesis Analytics, AWS Lambda, and Amazon DynamoDB. I hope this information will be helpful in situations when your real-time analytics applications require additional data fields from reference data sources, or when real-time insights must be derived from data across multiple streaming data sources. These joining and enrichment techniques will help you get the best business value from your data.
If you have a question or suggestion, please leave a comment below.
About the author
Assaf Mentzer is a Senior Consultant in Big Data & Analytics for AWS Professional Services. He works with enterprise customers to provide leadership on big data projects, helping them reach their full data potential in the cloud. In his spare time, he enjoys watching sports, playing ping pong, and hanging out with family and friends.
Aaron Friedman is a Healthcare and Life Sciences Solutions Architect with Amazon Web Services
The genomics industry is in the midst of a data explosion. Due to the rapid drop in the cost to sequence genomes, genomics is now central to many medical advances. When your genome is sequenced and analyzed, raw sequencing files are processed in a multi-step workflow to identify where your genome differs from a standard reference. Your variations are stored in a Variant Call Format (VCF) file, which is then combined with other individuals to enable population-scale analyses. Many of these datasets are publicly available, and an increasing number are hosted on AWS as part of our Open Data project.
To mine genomic data for new discoveries, researchers in both industry and academia build complex models to analyze populations at scale. When building models, they first explore the datasets-of-interest to understand what questions the data might answer. In this step, interactivity is key, as it allows them to move easily from one question to the next.
Recently, we launched Amazon Athena as an interactive query service to analyze data on Amazon S3. With Amazon Athena there are no clusters to manage and tune, no infrastructure to set up or manage, and customers pay only for the queries they run. Athena is able to query many file types straight from S3. This flexibility gives you the ability to interact easily with your datasets, whether they are in a raw text format (CSV/JSON) or specialized formats (e.g. Parquet). By being able to flexibly query different types of data sources, researchers can more rapidly progress through the data exploration phase for discovery. Additionally, researchers don’t have to know the nuances of managing and running a big data system. This makes Athena an excellent complement to data warehousing on Amazon Redshift and big data analytics on Amazon EMR.
In this post, I discuss how to prepare genomic data for analysis with Amazon Athena and demonstrate how Athena is well-adapted to address common genomics query paradigms. I use the Thousand Genomes dataset hosted on Amazon S3, a seminal genomics study, to demonstrate these approaches. All code used as part of this post is available in our GitHub repository.
Although this post is focused on genomic analysis, similar approaches can be applied to any discipline where large-scale, interactive analysis is required.
Select, aggregate, annotate query pattern in genomics
Genomics researchers may ask different questions of their dataset, such as:
What variations in a genome may increase the risk of developing disease?
What positions in the genome have abnormal levels of variation, suggesting issues in quality of sequencing or errors in the genomic reference?
What variations in a genome influence how an individual may respond to a specific drug treatment?
Does a group of individuals contain a higher frequency of a genomic variant known to alter response to a drug relative to the general population?
All these questions, and more, can be generalized under a common query pattern I like to call “Select, Aggregate, Annotate”. Some of our genomics customers, such as Human Longevity, Inc., routinely use this query pattern in their work.
In each of the above queries, you execute the following steps:
SELECT: Specify the cohort of individuals meeting certain criteria (disease, drug response, age, BMI, entire population, etc.).
AGGREGATE: Generate summary statistics of genomic variants across the cohort that you selected.
ANNOTATE: Assign meaning to each of the variants by joining on known information about each variant.
Dataset preparation
Properly organizing your dataset is one of the most critical decisions for enabling fast, interactive analyses. Based on the query pattern I just described, the table representing your population needs to have the following information:
A unique sample ID corresponding to each sample in your population
Information about each variant, specifically its location in the genome as well as the specific deviation from the reference
Information about how many times in a sample a variant occurs (0, 1, or 2 times) as well as if there are multiple variants in the same site. This is known as a genotype.
The extract, transform, load (ETL) process to generate the appropriate data representation has two main steps. First, you use ADAM, a genomics analysis platform built on top of Spark, to convert the variant information residing in a VCF file to Parquet for easier downstream analytics, in a process similar to the one described in the Will Spark Power the Data behind Precision Medicine? post. Then, you use custom Python code to massage the data and select only the appropriate fields that you need for analysis with Athena.
First, spin up an EMR cluster (version 5.0.3) for the ETL process. I used a c4.xlarge for my master node and m4.4xlarges with 1 TB of scratch for my core nodes.
After you SSH into your master node, clone the git repository. You can also put this in as a bootstrap action when spinning up your cluster.
You then need to install ADAM and configure it to run with Spark 2.0. In your terminal, enter the following:
# Install Maven
wget http://apache.claz.org/maven/maven-3/3.3.9/binaries/apache-maven-3.3.9-bin.tar.gz
tar xvzf apache-maven-3.3.9-bin.tar.gz
export PATH=/home/hadoop/apache-maven-3.3.9/bin:$PATH
# Install ADAM
git clone https://github.com/bigdatagenomics/adam.git
cd adam
sed -i 's/2.6.0/2.7.2/' pom.xml
./scripts/move_to_spark_2.sh
export MAVEN_OPTS="-Xmx512m -XX:MaxPermSize=256m"
mvn clean package -DskipTests
export PATH=/home/hadoop/adam/bin:$PATH
Now that ADAM is installed, enter the working directory of the repo that you first cloned, which contains the code relevant to this post. Your next step is to convert a VCF containing all the information about your population.
For this post, you are going to focus on a single region in the genome, defined as chromosome 22, for all 2,504 samples in the Thousand Genomes dataset. As chromosome 22 was the first chromosome to be sequenced as part of the Human Genome Project, it is your first here as well. You can scale the size of your cluster depending on whether you are looking at just chromosome 22, or the entire genome. In the following lines, replace mytestbucket with your bucket name of choice.
PARQUETS3PATH=s3://<mytestbucket>/thousand_genomes/grch37/chr22.parquet/
cd ~/aws-big-data-blog/aws-blog-athena-genomics/etl/thousand_genomes/
chmod +x ./convert_vcf.sh
./convert_vcf.sh $PARQUETS3PATH
After this conversion completes, you can reduce your Parquet file to only the fields you need.
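The field selection itself is done by the Python code in the aws-blog-athena-genomics repo. Purely as a sketch of that step, the transformation looks roughly like the following PySpark job. The paths are placeholders, and the source column names follow ADAM's genotype schema as commonly documented; treat them as assumptions and confirm them against the repo's ETL script.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName('reduce-genotypes-for-athena').getOrCreate()

# Input is the ADAM-converted Parquet from the previous step; output is the
# trimmed Parquet that the Athena table will point to. Paths are placeholders.
ADAM_PARQUET = 's3://<mytestbucket>/thousand_genomes/grch37/chr22.parquet/'
ATHENA_PARQUET = 's3://<mytestbucket>/thousand_genomes/grch37/chr22.trimmed.parquet/'

genotypes = spark.read.parquet(ADAM_PARQUET)

# Keep only the fields needed for the select/aggregate/annotate queries:
# sample ID, variant location, alleles, and the two genotype calls.
samplevariants = genotypes.select(
    F.col('sampleId').alias('sampleid'),
    F.col('variant.contigName').alias('chromosome'),
    F.col('variant.start').alias('startposition'),
    F.col('variant.end').alias('endposition'),
    F.col('variant.referenceAllele').alias('referenceallele'),
    F.col('variant.alternateAllele').alias('alternateallele'),
    F.col('alleles').getItem(0).alias('genotype0'),
    F.col('alleles').getItem(1).alias('genotype1'),
)

samplevariants.write.mode('overwrite').parquet(ATHENA_PARQUET)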
For this post, use the ClinVar dataset, which is an archive of information about variation in genomes and health. While Parquet is a preferred format for Athena relative to raw text, using the ClinVar TXT file demonstrates the ability of Athena to read multiple file types.
In a Unix terminal, execute the following commands (replace mytestbucket with your bucket name of choice):
ANNOPATH=s3://<mytestbucket>/clinvar/
wget ftp://ftp.ncbi.nlm.nih.gov/pub/clinvar/tab_delimited/variant_summary.txt.gz
# Need to strip out the first line (header)
zcat variant_summary.txt.gz | sed '1d' > temp ; mv -f temp variant_summary.trim.txt ; gzip variant_summary.trim.txt
aws s3 cp variant_summary.trim.txt.gz $ANNOPATH --sse
Creating Tables using Amazon Athena
In the recent Amazon Athena – Interactive SQL Queries for Data in Amazon S3 post, Jeff Barr covered how to get started with Athena and navigate to the console. Athena uses the Hive DDL when you create, alter, or drop a table. This means you can use the standard Hive syntax for creating tables. These commands can also be found in the aws-blog-athena-genomics GitHub repo under sql/setup.
First, create your database:
CREATE DATABASE demo;
Next, define your table with the appropriate mappings from the previous ETL processes. Be sure to change to your appropriate bucket name.
For your population data:
If you want to skip the ETL process described above for your population data, you can use the following path for your analysis: s3://aws-bigdata-blog/artifacts/athena_genomics.parquet/ – be sure to substitute it in for the LOCATION below.
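The full CREATE EXTERNAL TABLE statements for both the samplevariants and clinvar tables are in the aws-blog-athena-genomics repo under sql/setup and can be pasted straight into the Athena console. Purely as an illustration of their shape, here is a sketch of the population table definition submitted through the Athena API with boto3; the column names and types are assumptions inferred from the queries later in this post, and the staging output location is a placeholder.
import boto3

athena = boto3.client('athena', region_name='us-east-1')

# Column names/types are assumptions; the authoritative DDL is in the
# aws-blog-athena-genomics repo under sql/setup.
CREATE_SAMPLEVARIANTS = """
CREATE EXTERNAL TABLE IF NOT EXISTS demo.samplevariants (
  sampleid        string,
  chromosome      string,
  startposition   bigint,
  endposition     bigint,
  referenceallele string,
  alternateallele string,
  genotype0       string,
  genotype1       string
)
STORED AS PARQUET
LOCATION 's3://aws-bigdata-blog/artifacts/athena_genomics.parquet/'
"""

athena.start_query_execution(
    QueryString=CREATE_SAMPLEVARIANTS,
    QueryExecutionContext={'Database': 'demo'},
    ResultConfiguration={'OutputLocation': 's3://<mytestbucket>/athena-staging/'}
)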
You can confirm that both tables have been created by choosing the eye icon next to a table name in the console. This automatically runs a query akin to SELECT * FROM table LIMIT 10. If it succeeds, then your data has been loaded successfully.
Applying the select, aggregate, annotate paradigm
In this section, I walk you through how to use the query pattern described earlier to answer two common questions in genomics. You examine the frequency of different genotypes, which is a distinct combination of all fields in the samplevariants table with the exception of sampleid. These queries can also be found in the aws-blog-athena-genomics GitHub repo under sql/queries.
Population drug response
What small molecules/drugs are most likely to affect a subpopulation of individuals (ancestry, age, etc.) based on their genomic information?
In this query, assume that you have some phenotype data about your population. In this case, also assume that all samples sharing the pattern “NA12” are part of a specific demographic.
In this query, use sampleid as your predicate pushdown. The general steps are:
Filter by the samples in your subpopulation
Aggregate variant frequencies for the subpopulation-of-interest
Join on ClinVar dataset
Filter by variants that have been implicated in drug-response
Order by highest frequency variants
To answer this question, you can craft the following query:
SELECT
count(*)/cast(numsamples AS DOUBLE) AS genotypefrequency
,cv.rsid
,cv.phenotypelist
,sv.chromosome
,sv.startposition
,sv.endposition
,sv.referenceallele
,sv.alternateallele
,sv.genotype0
,sv.genotype1
FROM demo.samplevariants sv
CROSS JOIN
(SELECT count(1) AS numsamples
FROM
(SELECT DISTINCT sampleid
FROM demo.samplevariants
WHERE sampleid LIKE 'NA12%'))
JOIN demo.clinvar cv
ON sv.chromosome = cv.chromosome
AND sv.startposition = cv.startposition - 1
AND sv.endposition = cv.endposition
AND sv.referenceallele = cv.referenceallele
AND sv.alternateallele = cv.alternateallele
WHERE assembly='GRCh37'
AND cv.clinicalsignificance LIKE '%response%'
AND sampleid LIKE 'NA12%'
GROUP BY sv.chromosome
,sv.startposition
,sv.endposition
,sv.referenceallele
,sv.alternateallele
,sv.genotype0
,sv.genotype1
,cv.clinicalsignificance
,cv.phenotypelist
,cv.rsid
,numsamples
ORDER BY genotypefrequency DESC LIMIT 50
Enter the query into the console and you see something like the following:
When you inspect the results, you can quickly see (often in a matter of seconds!) that this population of individuals has a high frequency of variants associated with the metabolism of debrisoquine.
Quality control
Are you systematically finding locations in your reference that are called as variation? In other words, what positions in the genome have abnormal levels of variation, suggesting issues in quality of sequencing or errors in the reference? Are any of these variants clinically implicated? If so, could this be a false positive clinical finding?
In this query, the entire population is needed so the SELECT predicate is not used. The general steps are:
Aggregate variant frequencies for the entire Thousand Genomes population
Join on the ClinVar dataset
Filter by variants that have been implicated in disease
Order by the highest frequency variants
This translates into the following query:
SELECT
count(*)/cast(numsamples AS DOUBLE) AS genotypefrequency
,cv.clinicalsignificance
,cv.phenotypelist
,sv.chromosome
,sv.startposition
,sv.endposition
,sv.referenceallele
,sv.alternateallele
,sv.genotype0
,sv.genotype1
FROM demo.samplevariants sv
CROSS JOIN
(SELECT count(1) AS numsamples
FROM
(SELECT DISTINCT sampleid
FROM demo.samplevariants))
JOIN demo.clinvar cv
ON sv.chromosome = cv.chromosome
AND sv.startposition = cv.startposition - 1
AND sv.endposition = cv.endposition
AND sv.referenceallele = cv.referenceallele
AND sv.alternateallele = cv.alternateallele
WHERE assembly='GRCh37'
AND cv.clinsigsimple='1'
GROUP BY sv.chromosome ,
sv.startposition
,sv.endposition
,sv.referenceallele
,sv.alternateallele
,sv.genotype0
,sv.genotype1
,cv.clinicalsignificance
,cv.phenotypelist
,numsamples
ORDER BY genotypefrequency DESC LIMIT 50
As you can see in the following screenshot, the highest frequency results have conflicting information, being listed both as potentially causing disease and being benign. In genomics, variants with a higher frequency are less likely to cause disease. Your quick analysis allows you to discount the pathogenic clinical significance annotations of these high genotype frequency variants.
Summary
I hope you have seen how you can easily integrate datasets of different types and values, and combine them to quickly derive new meaning from your data. Do you have a dataset you want to explore or want to learn more about Amazon Athena? Check out our Getting Started Guide and happy querying!
If you have questions or suggestions, please comment below.
About the author
Dr. Aaron Friedman is a Healthcare and Life Sciences Partner Solutions Architect at Amazon Web Services. He works with our ISVs and SIs to architect healthcare solutions on AWS, and bring the best possible experience to their customers. His passion is working at the intersection of science, big data, and software. In his spare time, he’s out hiking or learning a new thing to cook.
Big data processing solutions have increasingly been built on AWS Lambda; customers have created solutions such as building metadata indexes for Amazon S3 using Lambda and Amazon DynamoDB, and stream processing of data in S3. In this post, I discuss a serverless architecture to run MapReduce jobs using Lambda and S3.
Big data AWS services
Lambda launched in 2015, enabling customers to execute code on demand without any dedicated infrastructure. Since then, customers have successfully used Lambda for various use cases like event-driven processing for data ingested into S3 or Amazon Kinesis, web API backends, and producer/consumer architectures, among others. Lambda has emerged as a key component in building serverless architectures for these architecture paradigms.
S3 integrates directly into other AWS services, such as providing an easy export facility into Amazon Redshift and Amazon Elasticsearch Service, and providing an underlying file system (EMRFS) for Amazon EMR, making it an ideal data lake in the AWS ecosystem.
Amazon Redshift, EMR and the Hadoop ecosystem offer comprehensive solutions for big data processing. While EMR makes it easy, fast and cost-effective to run Hadoop clusters, it still requires provisioning servers, as well as knowledge of Hadoop and its components.
Serverless MapReduce overview
I wanted to extend some of these solutions and present a serverless architecture along with a customizable framework that enables customers to run ad hoc MapReduce jobs. Apart from the benefit of not having to manage any servers, this framework was significantly cheaper and, in some cases, faster than existing solutions when running the well-known Amplab big data processing benchmark.
This framework allows you to process big data with no operational effort and no prior Hadoop knowledge. While Hadoop is the most popular big data processing framework today, it does have a steep learning curve given the number of components and their inherent complexity. By minimizing the number of components in the framework and abstracting the infrastructure management, it simplifies data processing for developers or data scientists. They can focus on modeling the data processing problem and deriving value from the data stored in S3.
In addition to reduced complexity, this solution is much cheaper than existing solutions for ad hoc MapReduce workloads. Given that the solution is serverless, customers pay only when the MapReduce job is executed. The cost for the solution is the aggregate usage cost of Lambda and S3. Given that both the services are on-demand, you can compute the cost per query, a feature that’s unique to the solution. This is extremely useful as you can now budget your data processing needs precisely to the query.
Customers who want enhanced security can process the data in a VPC by configuring the Lambda functions to access resources in a VPC and creating a VPC endpoint for S3. There are no major performance implications of running multiple variants of the same job, or different jobs, on the same dataset concurrently. The Lambda function resources are independent, thus allowing for fast iterations and development. This technology is exciting for our customers because it enables a truly pay-for-what-you-use, cost-effective, and secure model for big data processing.
Reference architecture for serverless MapReduce
The goals are to:
Abstract infrastructure management
Get close to a "zero" setup time
Provide a pay-per-execution model for every job
Be cheaper than other data processing solutions
Enable multiple executions on the same dataset
The architecture is composed of three main Lambda functions:
Mapper
Reducer
Coordinator
The MapReduce computing paradigm requires maintaining state, so architecturally you require a coordinator or a scheduler to connect the map phase of the processing to the reduce phase. In this architecture, you use a Lambda function as a coordinator to make this completely serverless. The coordinator maintains all its state information by persisting the state data in S3.
Execution workflow:
The workflow starts with the invocation of the driver script that reads in the config file, which includes the mapper and reducer function file paths and the S3 bucket or folder path.
The driver function finds the relevant objects in S3 to process by listing the keys and matching by prefix in the S3 bucket. The keys are aggregated to create batches, which are then passed to the mapper. The batch size is determined by a simple heuristic that tries to achieve maximum parallelism while optimizing the data fit based on the mapper memory size (see the sketch following this list).
Mapper, reducer, and coordinator Lambda functions are created and the code is uploaded.
An S3 folder, called the job folder, is created as a temporary workspace for the current job and is configured as an event source for the coordinator.
The mappers write their outputs to the job folder; after all the mappers finish, the coordinator uses the aforementioned logic to create batches and invoke the reducers.
The coordinator is notified through the S3 event source mapping when each of the reducers end and continues to create subsequent stages of reducers until a single reduced output is created.
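The batching heuristic mentioned above is implemented by the driver in the lambda-refarch-mapreduce repo. The following is only an illustrative sketch of the idea, with placeholder bucket, prefix, and memory numbers: group the S3 keys so that each batch roughly fits within the memory configured for a single mapper invocation.
import boto3

s3 = boto3.client('s3')

# Bucket, prefix, and memory figures are illustrative assumptions; the real
# driver reads them from its config file.
SRC_BUCKET = 'big-data-benchmark'
SRC_PREFIX = 'pavlo/text/1node/uservisits/'
MAPPER_MEMORY_MB = 1536          # Lambda container size configured for the mapper
USABLE_FRACTION = 0.6            # leave headroom for the runtime and buffers


def batch_keys(bucket, prefix):
    """List the input objects and group them into batches that fit one mapper."""
    max_batch_bytes = int(MAPPER_MEMORY_MB * 1024 * 1024 * USABLE_FRACTION)
    batches, current, current_size = [], [], 0

    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get('Contents', []):
            if current and current_size + obj['Size'] > max_batch_bytes:
                batches.append(current)
                current, current_size = [], 0
            current.append(obj['Key'])
            current_size += obj['Size']
    if current:
        batches.append(current)
    return batches


batches = batch_keys(SRC_BUCKET, SRC_PREFIX)
print('%d mapper invocations, one per batch' % len(batches))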
The following diagram shows the overall architecture:
Getting started with serverless MapReduce
In this post, I show you how to build and deploy your first serverless MapReduce job with this framework for data stored in S3. The code used in this post, along with detailed instructions about how to set up and run the application, is available in the awslabs lambda-refarch-mapreduce GitHub repo.
For the first job, you compute an aggregation (that is, the sum of ad revenue for every source IP address) on a dataset that is 25.4 GB in size and contains 155 million individual rows.
Dataset:
s3://big-data-benchmark/pavlo/text/1node/
Schema in CSV:
Each row of the Uservisits dataset is composed of the following:
sourceIP VARCHAR(116)
destURL VARCHAR(100)
visitDate DATE
adRevenue FLOAT
userAgent VARCHAR(256)
countryCode CHAR(3)
languageCode CHAR(6)
searchWord VARCHAR(32)
duration INT
SQL representation of the intended operation:
SELECT SUBSTR(sourceIP, 1, 8), SUM(adRevenue) FROM uservisits GROUP BY SUBSTR(sourceIP, 1, 8)
Prerequisites
You need the following prerequisites for working with this serverless MapReduce framework:
AWS account
S3 bucket
Lambda execution role
IAM credentials with access to create Lambda functions and list S3 buckets
Limits
In its current incarnation, this framework is best suited for ad hoc processing of data in S3. For sustained usage and time sensitive workloads, Amazon Redshift or EMR may be better suited. The framework has the following limits:
By default, each account has a concurrent Lambda execution limit of 100. This is a soft limit and can be increased by opening up a limit increase support ticket.
Lambda currently has a maximum container size limit of 1536 MB.
Components
The application has the following components:
Driver script and config file
Mapper Lambda function
Reducer Lambda function
Coordinator Lambda function
Driver script and config file
The driver script creates and configures the mapper, reducer, and coordinator Lambda functions in your account based on the config file. Here is an example configuration file that the driver script uses.
Mapper Lambda function
This is where you perform your map logic; the data is streamed line by line into your mapper function. In this function, you map individual records to a value or, as an optimization, you can also perform the first reduce step here, which is especially useful when computing aggregations. This is efficient given that you may be reading from multiple input sources in the mapper. In this example, the mapper maps each sourceIP onto its adRevenue and stores in a dictionary a running total of the adRevenue of every sourceIP.
# Download and process all keys
for key in src_keys:
    response = s3_client.get_object(Bucket=src_bucket, Key=key)
    contents = response['Body'].read()

    for line in contents.split('\n')[:-1]:
        line_count += 1
        try:
            data = line.split(',')
            srcIp = data[0][:8]
            if srcIp not in totals:
                totals[srcIp] = 0
            totals[srcIp] += float(data[3])
        except Exception, e:
            print e
Reducer Lambda function
The mapper and reducer functions look identical structurally, but perform different operations on the data. The reducer keeps a dictionary of aggregate sums of adRevenue of every sourceIP. The reducer is also responsible for the termination of the job. When the final reducer runs, it then reduces the intermediate results into a single file stored in the job bucket.
# Download and process all mapper output
for key in reducer_keys:
    response = s3_client.get_object(Bucket=job_bucket, Key=key)
    contents = response['Body'].read()

    try:
        for srcIp, val in json.loads(contents).iteritems():
            line_count += 1
            if srcIp not in results:
                results[srcIp] = 0
            results[srcIp] += float(val)
    except Exception, e:
        print e
Coordinator Lambda function
The coordinator function keeps track of the job state with the job bucket as a Lambda event source. It is invoked every time a mapper or reducer finishes. After all the mappers finish, it then invokes the reducers to compute the final output.
The pseudo code for the coordinator looks like the following:
If all mappers are done:
    If currently in the reduce phase:
        number_of_reducers = compute_number_of_reducers(previous_reducer_step_outputs)
    Else:
        number_of_reducers = compute_number_of_reducers(mapper_outputs)
    If number_of_reducers == 1:
        Invoke single reducer and write the results to S3
        Job done
    Else:
        Create event source for reducer
Else:
    Return
The coordinator doesn’t need to wait for all the mappers to finish in order to start the reducers, but for simplicity, the first version of the framework chooses to wait for all mappers.
Running the job
The driver function is the interface to start the job. It reads the job details, such as the mapper and reducer code sources, to create the Lambda functions in your account for the serverless MapReduce job.
Execute the following command:
$ python driver.py
Intermediate outputs from the mappers and reducers are stored in the specified job bucket and the final result is stored as JobBucket/JobID/result. The contents of the job bucket look like the following:
smallya$ head -n 3 result
67.23.87,5.874290244999999
30.94.22,96.25011190570001
25.77.91,14.262780186000002
Cost Analysis
The cost for this job, which processed over 25 GB of data and took less than 2 minutes, is 2.49 cents. The majority of the cost can be attributed to the Lambda components, and the mapper function in particular, given that the map phase takes the longest and is the most resource-intensive.
A component cost breakdown for the example above is plotted in the following chart.
The cost model is shown to scale almost linearly when the same job is run for a bigger dataset (126.8 GB, 775 million rows) for the Amplab benchmark (more details in the awslabs lambda-refarch-mapreduce GitHub repo), costing around 11 cents and executing in 3.5 minutes.
Summary
In this post, I showed you how to build a simple MapReduce task using a serverless framework for data stored in S3. If you have ad hoc data processing workloads, the cost effectiveness, speed, and price-per-query model makes it very suitable.
The code has been made available in the awslabs lambda-refarch-mapreduce GitHub repo. You can modify or extend this framework to build more complex applications for your data processing needs.
If you have questions or suggestions, please comment below.