Tag Archives: Amazon Athena

How I built a data warehouse using Amazon Redshift and AWS services in record time

Post Syndicated from Stephen Borg original https://aws.amazon.com/blogs/big-data/how-i-built-a-data-warehouse-using-amazon-redshift-and-aws-services-in-record-time/

This is a customer post by Stephen Borg, the Head of Big Data and BI at Cerberus Technologies.

Cerberus Technologies, in their own words: Cerberus is a company founded in 2017 by a team of visionary iGaming veterans. Our mission is simple – to offer the best tech solutions through a data-driven and a customer-first approach, delivering innovative solutions that go against traditional forms of working and process. This mission is based on the solid foundations of reliability, flexibility and security, and we intend to fundamentally change the way iGaming and other industries interact with technology.

Over the years, I have developed and created a number of data warehouses from scratch. Recently, I built a data warehouse for the iGaming industry single-handedly. To do it, I used the power and flexibility of Amazon Redshift and the wider AWS data management ecosystem. In this post, I explain how I was able to build a robust and scalable data warehouse without the large team of experts typically needed.

In two of my recent projects, I ran into challenges when scaling our data warehouse using on-premises infrastructure. Data was growing at many tens of gigabytes per day, and query performance was suffering. Scaling required major capital investment for hardware and software licenses, and also significant operational costs for maintenance and technical staff to keep it running and performing well. Unfortunately, I couldn’t get the resources needed to scale the infrastructure with data growth, and these projects were abandoned. Thanks to cloud data warehousing, the bottleneck of infrastructure resources, capital expense, and operational costs have been significantly reduced or have totally gone away. There is no more excuse for allowing obstacles of the past to delay delivering timely insights to decision makers, no matter how much data you have.

With Amazon Redshift and AWS, I delivered a cloud data warehouse to the business very quickly, and with a small team: me. I didn’t have to order hardware or software, and I no longer needed to install, configure, tune, or keep up with patches and version updates. Instead, I easily set up a robust data processing pipeline and we were quickly ingesting and analyzing data. Now, my data warehouse team can be extremely lean, and focus more time on bringing in new data and delivering insights. In this post, I show you the AWS services and the architecture that I used.

Handling data feeds

I have several different data sources that provide everything needed to run the business. The data includes activity from our iGaming platform, social media posts, clickstream data, marketing and campaign performance, and customer support engagements.

To handle the diversity of data feeds, I developed abstract integration applications using Docker that run on Amazon EC2 Container Service (Amazon ECS) and feed data to Amazon Kinesis Data Streams. These data streams can be used for real time analytics. In my system, each record in Kinesis is preprocessed by an AWS Lambda function to cleanse and aggregate information. My system then routes it to be stored where I need on Amazon S3 by Amazon Kinesis Data Firehose. Suppose that you used an on-premises architecture to accomplish the same task. A team of data engineers would be required to maintain and monitor a Kafka cluster, develop applications to stream data, and maintain a Hadoop cluster and the infrastructure underneath it for data storage. With my stream processing architecture, there are no servers to manage, no disk drives to replace, and no service monitoring to write.

Setting up a Kinesis stream can be done with a few clicks, and the same for Kinesis Firehose. Firehose can be configured to automatically consume data from a Kinesis Data Stream, and then write compressed data every N minutes to Amazon S3. When I want to process a Kinesis data stream, it’s very easy to set up a Lambda function to be executed on each message received. I can just set a trigger from the AWS Lambda Management Console, as shown following.

I also monitor the duration of function execution using Amazon CloudWatch and AWS X-Ray.

Regardless of the format I receive the data from our partners, I can send it to Kinesis as JSON data using my own formatters. After Firehose writes this to Amazon S3, I have everything in nearly the same structure I received but compressed, encrypted, and optimized for reading.

This data is automatically crawled by AWS Glue and placed into the AWS Glue Data Catalog. This means that I can immediately query the data directly on S3 using Amazon Athena or through Amazon Redshift Spectrum. Previously, I used Amazon EMR and an Amazon RDS–based metastore in Apache Hive for catalog management. Now I can avoid the complexity of maintaining Hive Metastore catalogs. Glue takes care of high availability and the operations side so that I know that end users can always be productive.

Working with Amazon Athena and Amazon Redshift for analysis

I found Amazon Athena extremely useful out of the box for ad hoc analysis. Our engineers (me) use Athena to understand new datasets that we receive and to understand what transformations will be needed for long-term query efficiency.

For our data analysts and data scientists, we’ve selected Amazon Redshift. Amazon Redshift has proven to be the right tool for us over and over again. It easily processes 20+ million transactions per day, regardless of the footprint of the tables and the type of analytics required by the business. Latency is low and query performance expectations have been more than met. We use Redshift Spectrum for long-term data retention, which enables me to extend the analytic power of Amazon Redshift beyond local data to anything stored in S3, and without requiring me to load any data. Redshift Spectrum gives me the freedom to store data where I want, in the format I want, and have it available for processing when I need it.

To load data directly into Amazon Redshift, I use AWS Data Pipeline to orchestrate data workflows. I create Amazon EMR clusters on an intra-day basis, which I can easily adjust to run more or less frequently as needed throughout the day. EMR clusters are used together with Amazon RDS, Apache Spark 2.0, and S3 storage. The data pipeline application loads ETL configurations from Spring RESTful services hosted on AWS Elastic Beanstalk. The application then loads data from S3 into memory, aggregates and cleans the data, and then writes the final version of the data to Amazon Redshift. This data is then ready to use for analysis. Spark on EMR also helps with recommendations and personalization use cases for various business users, and I find this easy to set up and deliver what users want. Finally, business users use Amazon QuickSight for self-service BI to slice, dice, and visualize the data depending on their requirements.

Each AWS service in this architecture plays its part in saving precious time that’s crucial for delivery and getting different departments in the business on board. I found the services easy to set up and use, and all have proven to be highly reliable for our use as our production environments. When the architecture was in place, scaling out was either completely handled by the service, or a matter of a simple API call, and crucially doesn’t require me to change one line of code. Increasing shards for Kinesis can be done in a minute by editing a stream. Increasing capacity for Lambda functions can be accomplished by editing the megabytes allocated for processing, and concurrency is handled automatically. EMR cluster capacity can easily be increased by changing the master and slave node types in Data Pipeline, or by using Auto Scaling. Lastly, RDS and Amazon Redshift can be easily upgraded without any major tasks to be performed by our team (again, me).

In the end, using AWS services including Kinesis, Lambda, Data Pipeline, and Amazon Redshift allows me to keep my team lean and highly productive. I eliminated the cost and delays of capital infrastructure, as well as the late night and weekend calls for support. I can now give maximum value to the business while keeping operational costs down. My team pushed out an agile and highly responsive data warehouse solution in record time and we can handle changing business requirements rapidly, and quickly adapt to new data and new user requests.


Additional Reading

If you found this post useful, be sure to check out Deploy a Data Warehouse Quickly with Amazon Redshift, Amazon RDS for PostgreSQL and Tableau Server and Top 8 Best Practices for High-Performance ETL Processing Using Amazon Redshift.


About the Author

Stephen Borg is the Head of Big Data and BI at Cerberus Technologies. He has a background in platform software engineering, and first became involved in data warehousing using the typical RDBMS, SQL, ETL, and BI tools. He quickly became passionate about providing insight to help others optimize the business and add personalization to products. He is now the Head of Big Data and BI at Cerberus Technologies.

 

 

 

Top 8 Best Practices for High-Performance ETL Processing Using Amazon Redshift

Post Syndicated from Thiyagarajan Arumugam original https://aws.amazon.com/blogs/big-data/top-8-best-practices-for-high-performance-etl-processing-using-amazon-redshift/

An ETL (Extract, Transform, Load) process enables you to load data from source systems into your data warehouse. This is typically executed as a batch or near-real-time ingest process to keep the data warehouse current and provide up-to-date analytical data to end users.

Amazon Redshift is a fast, petabyte-scale data warehouse that enables you easily to make data-driven decisions. With Amazon Redshift, you can get insights into your big data in a cost-effective fashion using standard SQL. You can set up any type of data model, from star and snowflake schemas, to simple de-normalized tables for running any analytical queries.

To operate a robust ETL platform and deliver data to Amazon Redshift in a timely manner, design your ETL processes to take account of Amazon Redshift’s architecture. When migrating from a legacy data warehouse to Amazon Redshift, it is tempting to adopt a lift-and-shift approach, but this can result in performance and scale issues long term. This post guides you through the following best practices for ensuring optimal, consistent runtimes for your ETL processes:

  • COPY data from multiple, evenly sized files.
  • Use workload management to improve ETL runtimes.
  • Perform table maintenance regularly.
  • Perform multiple steps in a single transaction.
  • Loading data in bulk.
  • Use UNLOAD to extract large result sets.
  • Use Amazon Redshift Spectrum for ad hoc ETL processing.
  • Monitor daily ETL health using diagnostic queries.

1. COPY data from multiple, evenly sized files

Amazon Redshift is an MPP (massively parallel processing) database, where all the compute nodes divide and parallelize the work of ingesting data. Each node is further subdivided into slices, with each slice having one or more dedicated cores, equally dividing the processing capacity. The number of slices per node depends on the node type of the cluster. For example, each DS2.XLARGE compute node has two slices, whereas each DS2.8XLARGE compute node has 16 slices.

When you load data into Amazon Redshift, you should aim to have each slice do an equal amount of work. When you load the data from a single large file or from files split into uneven sizes, some slices do more work than others. As a result, the process runs only as fast as the slowest, or most heavily loaded, slice. In the example shown below, a single large file is loaded into a two-node cluster, resulting in only one of the nodes, “Compute-0”, performing all the data ingestion:

When splitting your data files, ensure that they are of approximately equal size – between 1 MB and 1 GB after compression. The number of files should be a multiple of the number of slices in your cluster. Also, I strongly recommend that you individually compress the load files using gzip, lzop, or bzip2 to efficiently load large datasets.

When loading multiple files into a single table, use a single COPY command for the table, rather than multiple COPY commands. Amazon Redshift automatically parallelizes the data ingestion. Using a single COPY command to bulk load data into a table ensures optimal use of cluster resources, and quickest possible throughput.

2. Use workload management to improve ETL runtimes

Use Amazon Redshift’s workload management (WLM) to define multiple queues dedicated to different workloads (for example, ETL versus reporting) and to manage the runtimes of queries. As you migrate more workloads into Amazon Redshift, your ETL runtimes can become inconsistent if WLM is not appropriately set up.

I recommend limiting the overall concurrency of WLM across all queues to around 15 or less. This WLM guide helps you organize and monitor the different queues for your Amazon Redshift cluster.

When managing different workloads on your Amazon Redshift cluster, consider the following for the queue setup:

  • Create a queue dedicated to your ETL processes. Configure this queue with a small number of slots (5 or fewer). Amazon Redshift is designed for analytics queries, rather than transaction processing. The cost of COMMIT is relatively high, and excessive use of COMMIT can result in queries waiting for access to the commit queue. Because ETL is a commit-intensive process, having a separate queue with a small number of slots helps mitigate this issue.
  • Claim extra memory available in a queue. When executing an ETL query, you can take advantage of the wlm_query_slot_count to claim the extra memory available in a particular queue. For example, a typical ETL process might involve COPYing raw data into a staging table so that downstream ETL jobs can run transformations that calculate daily, weekly, and monthly aggregates. To speed up the COPY process (so that the downstream tasks can start in parallel sooner), the wlm_query_slot_count can be increased for this step.
  • Create a separate queue for reporting queries. Configure query monitoring rules on this queue to further manage long-running and expensive queries.
  • Take advantage of the dynamic memory parameters. They swap the memory from your ETL to your reporting queue after the ETL job has completed.

3. Perform table maintenance regularly

Amazon Redshift is a columnar database, which enables fast transformations for aggregating data. Performing regular table maintenance ensures that transformation ETLs are predictable and performant. To get the best performance from your Amazon Redshift database, you must ensure that database tables regularly are VACUUMed and ANALYZEd. The Analyze & Vacuum schema utility helps you automate the table maintenance task and have VACUUM & ANALYZE executed in a regular fashion.

  • Use VACUUM to sort tables and remove deleted blocks

During a typical ETL refresh process, tables receive new incoming records using COPY, and unneeded data (cold data) is removed using DELETE. New rows are added to the unsorted region in a table. Deleted rows are simply marked for deletion.

DELETE does not automatically reclaim the space occupied by the deleted rows. Adding and removing large numbers of rows can therefore cause the unsorted region and the number of deleted blocks to grow. This can degrade the performance of queries executed against these tables.

After an ETL process completes, perform VACUUM to ensure that user queries execute in a consistent manner. The complete list of tables that need VACUUMing can be found using the Amazon Redshift Util’s table_info script.

Use the following approaches to ensure that VACCUM is completed in a timely manner:

  • Use wlm_query_slot_count to claim all the memory allocated in the ETL WLM queue during the VACUUM process.
  • DROP or TRUNCATE intermediate or staging tables, thereby eliminating the need to VACUUM them.
  • If your table has a compound sort key with only one sort column, try to load your data in sort key order. This helps reduce or eliminate the need to VACUUM the table.
  • Consider using time series This helps reduce the amount of data you need to VACUUM.
  • Use ANALYZE to update database statistics

Amazon Redshift uses a cost-based query planner and optimizer using statistics about tables to make good decisions about the query plan for the SQL statements. Regular statistics collection after the ETL completion ensures that user queries run fast, and that daily ETL processes are performant. The Amazon Redshift utility table_info script provides insights into the freshness of the statistics. Keeping the statistics off (pct_stats_off) less than 20% ensures effective query plans for the SQL queries.

4. Perform multiple steps in a single transaction

ETL transformation logic often spans multiple steps. Because commits in Amazon Redshift are expensive, if each ETL step performs a commit, multiple concurrent ETL processes can take a long time to execute.

To minimize the number of commits in a process, the steps in an ETL script should be surrounded by a BEGIN…END statement so that a single commit is performed only after all the transformation logic has been executed. For example, here is an example multi-step ETL script that performs one commit at the end:

Begin
CREATE temporary staging_table;
INSERT INTO staging_table SELECT .. FROM source (transformation logic);
DELETE FROM daily_table WHERE dataset_date =?;
INSERT INTO daily_table SELECT .. FROM staging_table (daily aggregate);
DELETE FROM weekly_table WHERE weekending_date=?;
INSERT INTO weekly_table SELECT .. FROM staging_table(weekly aggregate);
Commit

5. Loading data in bulk

Amazon Redshift is designed to store and query petabyte-scale datasets. Using Amazon S3 you can stage and accumulate data from multiple source systems before executing a bulk COPY operation. The following methods allow efficient and fast transfer of these bulk datasets into Amazon Redshift:

  • Use a manifest file to ingest large datasets that span multiple files. The manifest file is a JSON file that lists all the files to be loaded into Amazon Redshift. Using a manifest file ensures that Amazon Redshift has a consistent view of the data to be loaded from S3, while also ensuring that duplicate files do not result in the same data being loaded more than one time.
  • Use temporary staging tables to hold the data for transformation. These tables are automatically dropped after the ETL session is complete. Temporary tables can be created using the CREATE TEMPORARY TABLE syntax, or by issuing a SELECT … INTO #TEMP_TABLE query. Explicitly specifying the CREATE TEMPORARY TABLE statement allows you to control the DISTRIBUTION KEY, SORT KEY, and compression settings to further improve performance.
  • User ALTER table APPEND to swap data from the staging tables to the target table. Data in the source table is moved to matching columns in the target table. Column order doesn’t matter. After data is successfully appended to the target table, the source table is empty. ALTER TABLE APPEND is much faster than a similar CREATE TABLE AS or INSERT INTO operation because it doesn’t involve copying or moving data.

6. Use UNLOAD to extract large result sets

Fetching a large number of rows using SELECT is expensive and takes a long time. When a large amount of data is fetched from the Amazon Redshift cluster, the leader node has to hold the data temporarily until the fetches are complete. Further, data is streamed out sequentially, which results in longer elapsed time. As a result, the leader node can become hot, which not only affects the SELECT that is being executed, but also throttles resources for creating execution plans and managing the overall cluster resources. Here is an example of a large SELECT statement. Notice that the leader node is doing most of the work to stream out the rows:

Use UNLOAD to extract large results sets directly to S3. After it’s in S3, the data can be shared with multiple downstream systems. By default, UNLOAD writes data in parallel to multiple files according to the number of slices in the cluster. All the compute nodes participate to quickly offload the data into S3.

If you are extracting data for use with Amazon Redshift Spectrum, you should make use of the MAXFILESIZE parameter to and keep files are 150 MB. Similar to item 1 above, having many evenly sized files ensures that Redshift Spectrum can do the maximum amount of work in parallel.

7. Use Redshift Spectrum for ad hoc ETL processing

Events such as data backfill, promotional activity, and special calendar days can trigger additional data volumes that affect the data refresh times in your Amazon Redshift cluster. To help address these spikes in data volumes and throughput, I recommend staging data in S3. After data is organized in S3, Redshift Spectrum enables you to query it directly using standard SQL. In this way, you gain the benefits of additional capacity without having to resize your cluster.

For tips on getting started with and optimizing the use of Redshift Spectrum, see the previous post, 10 Best Practices for Amazon Redshift Spectrum.

8. Monitor daily ETL health using diagnostic queries

Monitoring the health of your ETL processes on a regular basis helps identify the early onset of performance issues before they have a significant impact on your cluster. The following monitoring scripts can be used to provide insights into the health of your ETL processes:

Script Use when… Solution
commit_stats.sql – Commit queue statistics from past days, showing largest queue length and queue time first DML statements such as INSERT/UPDATE/COPY/DELETE operations take several times longer to execute when multiple of these operations are in progress Set up separate WLM queues for the ETL process and limit the concurrency to < 5.
copy_performance.sql –  Copy command statistics for the past days Daily COPY operations take longer to execute • Follow the best practices for the COPY command.
• Analyze data growth with the incoming datasets and consider cluster resize to meet the expected SLA.
table_info.sql – Table skew and unsorted statistics along with storage and key information Transformation steps take longer to execute • Set up regular VACCUM jobs to address unsorted rows and claim the deleted blocks so that transformation SQL execute optimally.
• Consider a table redesign to avoid data skewness.
v_check_transaction_locks.sql – Monitor transaction locks INSERT/UPDATE/COPY/DELETE operations on particular tables do not respond back in timely manner, compared to when run after the ETL Multiple DML statements are operating on the same target table at the same moment from different transactions. Set up ETL job dependency so that they execute serially for the same target table.
v_get_schema_priv_by_user.sql – Get the schema that the user has access to Reporting users can view intermediate tables Set up separate database groups for reporting and ETL users, and grants access to objects using GRANT.
v_generate_tbl_ddl.sql – Get the table DDL You need to create an empty table with same structure as target table for data backfill Generate DDL using this script for data backfill.
v_space_used_per_tbl.sql – monitor space used by individual tables Amazon Redshift data warehouse space growth is trending upwards more than normal

Analyze the individual tables that are growing at higher rate than normal. Consider data archival using UNLOAD to S3 and Redshift Spectrum for later analysis.

Use unscanned_table_summary.sql to find unused table and archive or drop them.

top_queries.sql – Return the top 50 time consuming statements aggregated by its text ETL transformations are taking longer to execute Analyze the top transformation SQL and use EXPLAIN to find opportunities for tuning the query plan.

There are several other useful scripts available in the amazon-redshift-utils repository. The AWS Lambda Utility Runner runs a subset of these scripts on a scheduled basis, allowing you to automate much of monitoring of your ETL processes.

Example ETL process

The following ETL process reinforces some of the best practices discussed in this post. Consider the following four-step daily ETL workflow where data from an RDBMS source system is staged in S3 and then loaded into Amazon Redshift. Amazon Redshift is used to calculate daily, weekly, and monthly aggregations, which are then unloaded to S3, where they can be further processed and made available for end-user reporting using a number of different tools, including Redshift Spectrum and Amazon Athena.

Step 1:  Extract from the RDBMS source to a S3 bucket

In this ETL process, the data extract job fetches change data every 1 hour and it is staged into multiple hourly files. For example, the staged S3 folder looks like the following:

 [[email protected] ~]$ aws s3 ls s3://<<S3 Bucket>>/batch/2017/07/02/
2017-07-02 01:59:58   81900220 20170702T01.export.gz
2017-07-02 02:59:56   84926844 20170702T02.export.gz
2017-07-02 03:59:54   78990356 20170702T03.export.gz
…
2017-07-02 22:00:03   75966745 20170702T21.export.gz
2017-07-02 23:00:02   89199874 20170702T22.export.gz
2017-07-02 00:59:59   71161715 20170702T23.export.gz

Organizing the data into multiple, evenly sized files enables the COPY command to ingest this data using all available resources in the Amazon Redshift cluster. Further, the files are compressed (gzipped) to further reduce COPY times.

Step 2: Stage data to the Amazon Redshift table for cleansing

Ingesting the data can be accomplished using a JSON-based manifest file. Using the manifest file ensures that S3 eventual consistency issues can be eliminated and also provides an opportunity to dedupe any files if needed. A sample manifest20170702.json file looks like the following:

{
  "entries": [
    {"url":" s3://<<S3 Bucket>>/batch/2017/07/02/20170702T01.export.gz", "mandatory":true},
    {"url":" s3://<<S3 Bucket>>/batch/2017/07/02/20170702T02.export.gz", "mandatory":true},
    …
    {"url":" s3://<<S3 Bucket>>/batch/2017/07/02/20170702T23.export.gz", "mandatory":true}
  ]
}

The data can be ingested using the following command:

SET wlm_query_slot_count TO <<max available concurrency in the ETL queue>>;
COPY stage_tbl FROM 's3:// <<S3 Bucket>>/batch/manifest20170702.json' iam_role 'arn:aws:iam::0123456789012:role/MyRedshiftRole' manifest;

Because the downstream ETL processes depend on this COPY command to complete, the wlm_query_slot_count is used to claim all the memory available to the queue. This helps the COPY command complete as quickly as possible.

Step 3: Transform data to create daily, weekly, and monthly datasets and load into target tables

Data is staged in the “stage_tbl” from where it can be transformed into the daily, weekly, and monthly aggregates and loaded into target tables. The following job illustrates a typical weekly process:

Begin
INSERT into ETL_LOG (..) values (..);
DELETE from weekly_tbl where dataset_week = <<current week>>;
INSERT into weekly_tbl (..)
  SELECT date_trunc('week', dataset_day) AS week_begin_dataset_date, SUM(C1) AS C1, SUM(C2) AS C2
	FROM   stage_tbl
GROUP BY date_trunc('week', dataset_day);
INSERT into AUDIT_LOG values (..);
COMMIT;
End;

As shown above, multiple steps are combined into one transaction to perform a single commit, reducing contention on the commit queue.

Step 4: Unload the daily dataset to populate the S3 data lake bucket

The transformed results are now unloaded into another S3 bucket, where they can be further processed and made available for end-user reporting using a number of different tools, including Redshift Spectrum and Amazon Athena.

unload ('SELECT * FROM weekly_tbl WHERE dataset_week = <<current week>>’) TO 's3:// <<S3 Bucket>>/datalake/weekly/20170526/' iam_role 'arn:aws:iam::0123456789012:role/MyRedshiftRole';

Summary

Amazon Redshift lets you easily operate petabyte-scale data warehouses on the cloud. This post summarized the best practices for operating scalable ETL natively within Amazon Redshift. I demonstrated efficient ways to ingest and transform data, along with close monitoring. I also demonstrated the best practices being used in a typical sample ETL workload to transform the data into Amazon Redshift.

If you have questions or suggestions, please comment below.

 


About the Author

Thiyagarajan Arumugam is a Big Data Solutions Architect at Amazon Web Services and designs customer architectures to process data at scale. Prior to AWS, he built data warehouse solutions at Amazon.com. In his free time, he enjoys all outdoor sports and practices the Indian classical drum mridangam.

 

AWS Glue Now Supports Scala Scripts

Post Syndicated from Mehul Shah original https://aws.amazon.com/blogs/big-data/aws-glue-now-supports-scala-scripts/

We are excited to announce AWS Glue support for running ETL (extract, transform, and load) scripts in Scala. Scala lovers can rejoice because they now have one more powerful tool in their arsenal. Scala is the native language for Apache Spark, the underlying engine that AWS Glue offers for performing data transformations.

Beyond its elegant language features, writing Scala scripts for AWS Glue has two main advantages over writing scripts in Python. First, Scala is faster for custom transformations that do a lot of heavy lifting because there is no need to shovel data between Python and Apache Spark’s Scala runtime (that is, the Java virtual machine, or JVM). You can build your own transformations or invoke functions in third-party libraries. Second, it’s simpler to call functions in external Java class libraries from Scala because Scala is designed to be Java-compatible. It compiles to the same bytecode, and its data structures don’t need to be converted.

To illustrate these benefits, we walk through an example that analyzes a recent sample of the GitHub public timeline available from the GitHub archive. This site is an archive of public requests to the GitHub service, recording more than 35 event types ranging from commits and forks to issues and comments.

This post shows how to build an example Scala script that identifies highly negative issues in the timeline. It pulls out issue events in the timeline sample, analyzes their titles using the sentiment prediction functions from the Stanford CoreNLP libraries, and surfaces the most negative issues.

Getting started

Before we start writing scripts, we use AWS Glue crawlers to get a sense of the data—its structure and characteristics. We also set up a development endpoint and attach an Apache Zeppelin notebook, so we can interactively explore the data and author the script.

Crawl the data

The dataset used in this example was downloaded from the GitHub archive website into our sample dataset bucket in Amazon S3, and copied to the following locations:

s3://aws-glue-datasets-<region>/examples/scala-blog/githubarchive/data/

Choose the best folder by replacing <region> with the region that you’re working in, for example, us-east-1. Crawl this folder, and put the results into a database named githubarchive in the AWS Glue Data Catalog, as described in the AWS Glue Developer Guide. This folder contains 12 hours of the timeline from January 22, 2017, and is organized hierarchically (that is, partitioned) by year, month, and day.

When finished, use the AWS Glue console to navigate to the table named data in the githubarchive database. Notice that this data has eight top-level columns, which are common to each event type, and three partition columns that correspond to year, month, and day.

Choose the payload column, and you will notice that it has a complex schema—one that reflects the union of the payloads of event types that appear in the crawled data. Also note that the schema that crawlers generate is a subset of the true schema because they sample only a subset of the data.

Set up the library, development endpoint, and notebook

Next, you need to download and set up the libraries that estimate the sentiment in a snippet of text. The Stanford CoreNLP libraries contain a number of human language processing tools, including sentiment prediction.

Download the Stanford CoreNLP libraries. Unzip the .zip file, and you’ll see a directory full of jar files. For this example, the following jars are required:

  • stanford-corenlp-3.8.0.jar
  • stanford-corenlp-3.8.0-models.jar
  • ejml-0.23.jar

Upload these files to an Amazon S3 path that is accessible to AWS Glue so that it can load these libraries when needed. For this example, they are in s3://glue-sample-other/corenlp/.

Development endpoints are static Spark-based environments that can serve as the backend for data exploration. You can attach notebooks to these endpoints to interactively send commands and explore and analyze your data. These endpoints have the same configuration as that of AWS Glue’s job execution system. So, commands and scripts that work there also work the same when registered and run as jobs in AWS Glue.

To set up an endpoint and a Zeppelin notebook to work with that endpoint, follow the instructions in the AWS Glue Developer Guide. When you are creating an endpoint, be sure to specify the locations of the previously mentioned jars in the Dependent jars path as a comma-separated list. Otherwise, the libraries will not be loaded.

After you set up the notebook server, go to the Zeppelin notebook by choosing Dev Endpoints in the left navigation pane on the AWS Glue console. Choose the endpoint that you created. Next, choose the Notebook Server URL, which takes you to the Zeppelin server. Log in using the notebook user name and password that you specified when creating the notebook. Finally, create a new note to try out this example.

Each notebook is a collection of paragraphs, and each paragraph contains a sequence of commands and the output for that command. Moreover, each notebook includes a number of interpreters. If you set up the Zeppelin server using the console, the (Python-based) pyspark and (Scala-based) spark interpreters are already connected to your new development endpoint, with pyspark as the default. Therefore, throughout this example, you need to prepend %spark at the top of your paragraphs. In this example, we omit these for brevity.

Working with the data

In this section, we use AWS Glue extensions to Spark to work with the dataset. We look at the actual schema of the data and filter out the interesting event types for our analysis.

Start with some boilerplate code to import libraries that you need:

%spark

import com.amazonaws.services.glue.DynamicRecord
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.types._
import org.apache.spark.SparkContext

Then, create the Spark and AWS Glue contexts needed for working with the data:

@transient val spark: SparkContext = SparkContext.getOrCreate()
val glueContext: GlueContext = new GlueContext(spark)

You need the transient decorator on the SparkContext when working in Zeppelin; otherwise, you will run into a serialization error when executing commands.

Dynamic frames

This section shows how to create a dynamic frame that contains the GitHub records in the table that you crawled earlier. A dynamic frame is the basic data structure in AWS Glue scripts. It is like an Apache Spark data frame, except that it is designed and optimized for data cleaning and transformation workloads. A dynamic frame is well-suited for representing semi-structured datasets like the GitHub timeline.

A dynamic frame is a collection of dynamic records. In Spark lingo, it is an RDD (resilient distributed dataset) of DynamicRecords. A dynamic record is a self-describing record. Each record encodes its columns and types, so every record can have a schema that is unique from all others in the dynamic frame. This is convenient and often more efficient for datasets like the GitHub timeline, where payloads can vary drastically from one event type to another.

The following creates a dynamic frame, github_events, from your table:

val github_events = glueContext
                    .getCatalogSource(database = "githubarchive", tableName = "data")
                    .getDynamicFrame()

The getCatalogSource() method returns a DataSource, which represents a particular table in the Data Catalog. The getDynamicFrame() method returns a dynamic frame from the source.

Recall that the crawler created a schema from only a sample of the data. You can scan the entire dataset, count the rows, and print the complete schema as follows:

github_events.count
github_events.printSchema()

The result looks like the following:

The data has 414,826 records. As before, notice that there are eight top-level columns, and three partition columns. If you scroll down, you’ll also notice that the payload is the most complex column.

Run functions and filter records

This section describes how you can create your own functions and invoke them seamlessly to filter records. Unlike filtering with Python lambdas, Scala scripts do not need to convert records from one language representation to another, thereby reducing overhead and running much faster.

Let’s create a function that picks only the IssuesEvents from the GitHub timeline. These events are generated whenever someone posts an issue for a particular repository. Each GitHub event record has a field, “type”, that indicates the kind of event it is. The issueFilter() function returns true for records that are IssuesEvents.

def issueFilter(rec: DynamicRecord): Boolean = { 
    rec.getField("type").exists(_ == "IssuesEvent") 
}

Note that the getField() method returns an Option[Any] type, so you first need to check that it exists before checking the type.

You pass this function to the filter transformation, which applies the function on each record and returns a dynamic frame of those records that pass.

val issue_events =  github_events.filter(issueFilter)

Now, let’s look at the size and schema of issue_events.

issue_events.count
issue_events.printSchema()

It’s much smaller (14,063 records), and the payload schema is less complex, reflecting only the schema for issues. Keep a few essential columns for your analysis, and drop the rest using the ApplyMapping() transform:

val issue_titles = issue_events.applyMapping(Seq(("id", "string", "id", "string"),
                                                 ("actor.login", "string", "actor", "string"), 
                                                 ("repo.name", "string", "repo", "string"),
                                                 ("payload.action", "string", "action", "string"),
                                                 ("payload.issue.title", "string", "title", "string")))
issue_titles.show()

The ApplyMapping() transform is quite handy for renaming columns, casting types, and restructuring records. The preceding code snippet tells the transform to select the fields (or columns) that are enumerated in the left half of the tuples and map them to the fields and types in the right half.

Estimating sentiment using Stanford CoreNLP

To focus on the most pressing issues, you might want to isolate the records with the most negative sentiments. The Stanford CoreNLP libraries are Java-based and offer sentiment-prediction functions. Accessing these functions through Python is possible, but quite cumbersome. It requires creating Python surrogate classes and objects for those found on the Java side. Instead, with Scala support, you can use those classes and objects directly and invoke their methods. Let’s see how.

First, import the libraries needed for the analysis:

import java.util.Properties
import edu.stanford.nlp.ling.CoreAnnotations
import edu.stanford.nlp.neural.rnn.RNNCoreAnnotations
import edu.stanford.nlp.pipeline.{Annotation, StanfordCoreNLP}
import edu.stanford.nlp.sentiment.SentimentCoreAnnotations
import scala.collection.convert.wrapAll._

The Stanford CoreNLP libraries have a main driver that orchestrates all of their analysis. The driver setup is heavyweight, setting up threads and data structures that are shared across analyses. Apache Spark runs on a cluster with a main driver process and a collection of backend executor processes that do most of the heavy sifting of the data.

The Stanford CoreNLP shared objects are not serializable, so they cannot be distributed easily across a cluster. Instead, you need to initialize them once for every backend executor process that might need them. Here is how to accomplish that:

val props = new Properties()
props.setProperty("annotators", "tokenize, ssplit, parse, sentiment")
props.setProperty("parse.maxlen", "70")

object myNLP {
    lazy val coreNLP = new StanfordCoreNLP(props)
}

The properties tell the libraries which annotators to execute and how many words to process. The preceding code creates an object, myNLP, with a field coreNLP that is lazily evaluated. This field is initialized only when it is needed, and only once. So, when the backend executors start processing the records, each executor initializes the driver for the Stanford CoreNLP libraries only one time.

Next is a function that estimates the sentiment of a text string. It first calls Stanford CoreNLP to annotate the text. Then, it pulls out the sentences and takes the average sentiment across all the sentences. The sentiment is a double, from 0.0 as the most negative to 4.0 as the most positive.

def estimatedSentiment(text: String): Double = {
    if ((text == null) || (!text.nonEmpty)) { return Double.NaN }
    val annotations = myNLP.coreNLP.process(text)
    val sentences = annotations.get(classOf[CoreAnnotations.SentencesAnnotation])
    sentences.foldLeft(0.0)( (csum, x) => { 
        csum + RNNCoreAnnotations.getPredictedClass(x.get(classOf[SentimentCoreAnnotations.SentimentAnnotatedTree])) 
    }) / sentences.length
}

Now, let’s estimate the sentiment of the issue titles and add that computed field as part of the records. You can accomplish this with the map() method on dynamic frames:

val issue_sentiments = issue_titles.map((rec: DynamicRecord) => { 
    val mbody = rec.getField("title")
    mbody match {
        case Some(mval: String) => { 
            rec.addField("sentiment", ScalarNode(estimatedSentiment(mval)))
            rec }
        case _ => rec
    }
})

The map() method applies the user-provided function on every record. The function takes a DynamicRecord as an argument and returns a DynamicRecord. The code above computes the sentiment, adds it in a top-level field, sentiment, to the record, and returns the record.

Count the records with sentiment and show the schema. This takes a few minutes because Spark must initialize the library and run the sentiment analysis, which can be involved.

issue_sentiments.count
issue_sentiments.printSchema()

Notice that all records were processed (14,063), and the sentiment value was added to the schema.

Finally, let’s pick out the titles that have the lowest sentiment (less than 1.5). Count them and print out a sample to see what some of the titles look like.

val pressing_issues = issue_sentiments.filter(_.getField("sentiment").exists(_.asInstanceOf[Double] < 1.5))
pressing_issues.count
pressing_issues.show(10)

Next, write them all to a file so that you can handle them later. (You’ll need to replace the output path with your own.)

glueContext.getSinkWithFormat(connectionType = "s3", 
                              options = JsonOptions("""{"path": "s3://<bucket>/out/path/"}"""), 
                              format = "json")
            .writeDynamicFrame(pressing_issues)

Take a look in the output path, and you can see the output files.

Putting it all together

Now, let’s create a job from the preceding interactive session. The following script combines all the commands from earlier. It processes the GitHub archive files and writes out the highly negative issues:

import com.amazonaws.services.glue.DynamicRecord
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.types._
import org.apache.spark.SparkContext
import java.util.Properties
import edu.stanford.nlp.ling.CoreAnnotations
import edu.stanford.nlp.neural.rnn.RNNCoreAnnotations
import edu.stanford.nlp.pipeline.{Annotation, StanfordCoreNLP}
import edu.stanford.nlp.sentiment.SentimentCoreAnnotations
import scala.collection.convert.wrapAll._

object GlueApp {

    object myNLP {
        val props = new Properties()
        props.setProperty("annotators", "tokenize, ssplit, parse, sentiment")
        props.setProperty("parse.maxlen", "70")

        lazy val coreNLP = new StanfordCoreNLP(props)
    }

    def estimatedSentiment(text: String): Double = {
        if ((text == null) || (!text.nonEmpty)) { return Double.NaN }
        val annotations = myNLP.coreNLP.process(text)
        val sentences = annotations.get(classOf[CoreAnnotations.SentencesAnnotation])
        sentences.foldLeft(0.0)( (csum, x) => { 
            csum + RNNCoreAnnotations.getPredictedClass(x.get(classOf[SentimentCoreAnnotations.SentimentAnnotatedTree])) 
        }) / sentences.length
    }

    def main(sysArgs: Array[String]) {
        val spark: SparkContext = SparkContext.getOrCreate()
        val glueContext: GlueContext = new GlueContext(spark)

        val dbname = "githubarchive"
        val tblname = "data"
        val outpath = "s3://<bucket>/out/path/"

        val github_events = glueContext
                            .getCatalogSource(database = dbname, tableName = tblname)
                            .getDynamicFrame()

        val issue_events =  github_events.filter((rec: DynamicRecord) => {
            rec.getField("type").exists(_ == "IssuesEvent")
        })

        val issue_titles = issue_events.applyMapping(Seq(("id", "string", "id", "string"),
                                                         ("actor.login", "string", "actor", "string"), 
                                                         ("repo.name", "string", "repo", "string"),
                                                         ("payload.action", "string", "action", "string"),
                                                         ("payload.issue.title", "string", "title", "string")))

        val issue_sentiments = issue_titles.map((rec: DynamicRecord) => { 
            val mbody = rec.getField("title")
            mbody match {
                case Some(mval: String) => { 
                    rec.addField("sentiment", ScalarNode(estimatedSentiment(mval)))
                    rec }
                case _ => rec
            }
        })

        val pressing_issues = issue_sentiments.filter(_.getField("sentiment").exists(_.asInstanceOf[Double] < 1.5))

        glueContext.getSinkWithFormat(connectionType = "s3", 
                              options = JsonOptions(s"""{"path": "$outpath"}"""), 
                              format = "json")
                    .writeDynamicFrame(pressing_issues)
    }
}

Notice that the script is enclosed in a top-level object called GlueApp, which serves as the script’s entry point for the job. (You’ll need to replace the output path with your own.) Upload the script to an Amazon S3 location so that AWS Glue can load it when needed.

To create the job, open the AWS Glue console. Choose Jobs in the left navigation pane, and then choose Add job. Create a name for the job, and specify a role with permissions to access the data. Choose An existing script that you provide, and choose Scala as the language.

For the Scala class name, type GlueApp to indicate the script’s entry point. Specify the Amazon S3 location of the script.

Choose Script libraries and job parameters. In the Dependent jars path field, enter the Amazon S3 locations of the Stanford CoreNLP libraries from earlier as a comma-separated list (without spaces). Then choose Next.

No connections are needed for this job, so choose Next again. Review the job properties, and choose Finish. Finally, choose Run job to execute the job.

You can simply edit the script’s input table and output path to run this job on whatever GitHub timeline datasets that you might have.

Conclusion

In this post, we showed how to write AWS Glue ETL scripts in Scala via notebooks and how to run them as jobs. Scala has the advantage that it is the native language for the Spark runtime. With Scala, it is easier to call Scala or Java functions and third-party libraries for analyses. Moreover, data processing is faster in Scala because there’s no need to convert records from one language runtime to another.

You can find more example of Scala scripts in our GitHub examples repository: https://github.com/awslabs/aws-glue-samples. We encourage you to experiment with Scala scripts and let us know about any interesting ETL flows that you want to share.

Happy Glue-ing!

 


Additional Reading

If you found this post useful, be sure to check out Simplify Querying Nested JSON with the AWS Glue Relationalize Transform and Genomic Analysis with Hail on Amazon EMR and Amazon Athena.

 


About the Authors

Mehul Shah is a senior software manager for AWS Glue. His passion is leveraging the cloud to build smarter, more efficient, and easier to use data systems. He has three girls, and, therefore, he has no spare time.

 

 

 

Ben Sowell is a software development engineer at AWS Glue.

 

 

 

 
Vinay Vivili is a software development engineer for AWS Glue.

 

 

 

Combine Transactional and Analytical Data Using Amazon Aurora and Amazon Redshift

Post Syndicated from Re Alvarez-Parmar original https://aws.amazon.com/blogs/big-data/combine-transactional-and-analytical-data-using-amazon-aurora-and-amazon-redshift/

A few months ago, we published a blog post about capturing data changes in an Amazon Aurora database and sending it to Amazon Athena and Amazon QuickSight for fast analysis and visualization. In this post, I want to demonstrate how easy it can be to take the data in Aurora and combine it with data in Amazon Redshift using Amazon Redshift Spectrum.

With Amazon Redshift, you can build petabyte-scale data warehouses that unify data from a variety of internal and external sources. Because Amazon Redshift is optimized for complex queries (often involving multiple joins) across large tables, it can handle large volumes of retail, inventory, and financial data without breaking a sweat.

In this post, we describe how to combine data in Aurora in Amazon Redshift. Here’s an overview of the solution:

  • Use AWS Lambda functions with Amazon Aurora to capture data changes in a table.
  • Save data in an Amazon S3
  • Query data using Amazon Redshift Spectrum.

We use the following services:

Serverless architecture for capturing and analyzing Aurora data changes

Consider a scenario in which an e-commerce web application uses Amazon Aurora for a transactional database layer. The company has a sales table that captures every single sale, along with a few corresponding data items. This information is stored as immutable data in a table. Business users want to monitor the sales data and then analyze and visualize it.

In this example, you take the changes in data in an Aurora database table and save it in Amazon S3. After the data is captured in Amazon S3, you combine it with data in your existing Amazon Redshift cluster for analysis.

By the end of this post, you will understand how to capture data events in an Aurora table and push them out to other AWS services using AWS Lambda.

The following diagram shows the flow of data as it occurs in this tutorial:

The starting point in this architecture is a database insert operation in Amazon Aurora. When the insert statement is executed, a custom trigger calls a Lambda function and forwards the inserted data. Lambda writes the data that it received from Amazon Aurora to a Kinesis data delivery stream. Kinesis Data Firehose writes the data to an Amazon S3 bucket. Once the data is in an Amazon S3 bucket, it is queried in place using Amazon Redshift Spectrum.

Creating an Aurora database

First, create a database by following these steps in the Amazon RDS console:

  1. Sign in to the AWS Management Console, and open the Amazon RDS console.
  2. Choose Launch a DB instance, and choose Next.
  3. For Engine, choose Amazon Aurora.
  4. Choose a DB instance class. This example uses a small, since this is not a production database.
  5. In Multi-AZ deployment, choose No.
  6. Configure DB instance identifier, Master username, and Master password.
  7. Launch the DB instance.

After you create the database, use MySQL Workbench to connect to the database using the CNAME from the console. For information about connecting to an Aurora database, see Connecting to an Amazon Aurora DB Cluster.

The following screenshot shows the MySQL Workbench configuration:

Next, create a table in the database by running the following SQL statement:

Create Table
CREATE TABLE Sales (
InvoiceID int NOT NULL AUTO_INCREMENT,
ItemID int NOT NULL,
Category varchar(255),
Price double(10,2), 
Quantity int not NULL,
OrderDate timestamp,
DestinationState varchar(2),
ShippingType varchar(255),
Referral varchar(255),
PRIMARY KEY (InvoiceID)
)

You can now populate the table with some sample data. To generate sample data in your table, copy and run the following script. Ensure that the highlighted (bold) variables are replaced with appropriate values.

#!/usr/bin/python
import MySQLdb
import random
import datetime

db = MySQLdb.connect(host="AURORA_CNAME",
                     user="DBUSER",
                     passwd="DBPASSWORD",
                     db="DB")

states = ("AL","AK","AZ","AR","CA","CO","CT","DE","FL","GA","HI","ID","IL","IN",
"IA","KS","KY","LA","ME","MD","MA","MI","MN","MS","MO","MT","NE","NV","NH","NJ",
"NM","NY","NC","ND","OH","OK","OR","PA","RI","SC","SD","TN","TX","UT","VT","VA",
"WA","WV","WI","WY")

shipping_types = ("Free", "3-Day", "2-Day")

product_categories = ("Garden", "Kitchen", "Office", "Household")
referrals = ("Other", "Friend/Colleague", "Repeat Customer", "Online Ad")

for i in range(0,10):
    item_id = random.randint(1,100)
    state = states[random.randint(0,len(states)-1)]
    shipping_type = shipping_types[random.randint(0,len(shipping_types)-1)]
    product_category = product_categories[random.randint(0,len(product_categories)-1)]
    quantity = random.randint(1,4)
    referral = referrals[random.randint(0,len(referrals)-1)]
    price = random.randint(1,100)
    order_date = datetime.date(2016,random.randint(1,12),random.randint(1,30)).isoformat()

    data_order = (item_id, product_category, price, quantity, order_date, state,
    shipping_type, referral)

    add_order = ("INSERT INTO Sales "
                   "(ItemID, Category, Price, Quantity, OrderDate, DestinationState, \
                   ShippingType, Referral) "
                   "VALUES (%s, %s, %s, %s, %s, %s, %s, %s)")

    cursor = db.cursor()
    cursor.execute(add_order, data_order)

    db.commit()

cursor.close()
db.close() 

The following screenshot shows how the table appears with the sample data:

Sending data from Amazon Aurora to Amazon S3

There are two methods available to send data from Amazon Aurora to Amazon S3:

  • Using a Lambda function
  • Using SELECT INTO OUTFILE S3

To demonstrate the ease of setting up integration between multiple AWS services, we use a Lambda function to send data to Amazon S3 using Amazon Kinesis Data Firehose.

Alternatively, you can use a SELECT INTO OUTFILE S3 statement to query data from an Amazon Aurora DB cluster and save it directly in text files that are stored in an Amazon S3 bucket. However, with this method, there is a delay between the time that the database transaction occurs and the time that the data is exported to Amazon S3 because the default file size threshold is 6 GB.

Creating a Kinesis data delivery stream

The next step is to create a Kinesis data delivery stream, since it’s a dependency of the Lambda function.

To create a delivery stream:

  1. Open the Kinesis Data Firehose console
  2. Choose Create delivery stream.
  3. For Delivery stream name, type AuroraChangesToS3.
  4. For Source, choose Direct PUT.
  5. For Record transformation, choose Disabled.
  6. For Destination, choose Amazon S3.
  7. In the S3 bucket drop-down list, choose an existing bucket, or create a new one.
  8. Enter a prefix if needed, and choose Next.
  9. For Data compression, choose GZIP.
  10. In IAM role, choose either an existing role that has access to write to Amazon S3, or choose to generate one automatically. Choose Next.
  11. Review all the details on the screen, and choose Create delivery stream when you’re finished.

 

Creating a Lambda function

Now you can create a Lambda function that is called every time there is a change that needs to be tracked in the database table. This Lambda function passes the data to the Kinesis data delivery stream that you created earlier.

To create the Lambda function:

  1. Open the AWS Lambda console.
  2. Ensure that you are in the AWS Region where your Amazon Aurora database is located.
  3. If you have no Lambda functions yet, choose Get started now. Otherwise, choose Create function.
  4. Choose Author from scratch.
  5. Give your function a name and select Python 3.6 for Runtime
  6. Choose and existing or create a new Role, the role would need to have access to call firehose:PutRecord
  7. Choose Next on the trigger selection screen.
  8. Paste the following code in the code window. Change the stream_name variable to the Kinesis data delivery stream that you created in the previous step.
  9. Choose File -> Save in the code editor and then choose Save.
import boto3
import json

firehose = boto3.client('firehose')
stream_name = ‘AuroraChangesToS3’


def Kinesis_publish_message(event, context):
    
    firehose_data = (("%s,%s,%s,%s,%s,%s,%s,%s\n") %(event['ItemID'], 
    event['Category'], event['Price'], event['Quantity'],
    event['OrderDate'], event['DestinationState'], event['ShippingType'], 
    event['Referral']))
    
    firehose_data = {'Data': str(firehose_data)}
    print(firehose_data)
    
    firehose.put_record(DeliveryStreamName=stream_name,
    Record=firehose_data)

Note the Amazon Resource Name (ARN) of this Lambda function.

Giving Aurora permissions to invoke a Lambda function

To give Amazon Aurora permissions to invoke a Lambda function, you must attach an IAM role with appropriate permissions to the cluster. For more information, see Invoking a Lambda Function from an Amazon Aurora DB Cluster.

Once you are finished, the Amazon Aurora database has access to invoke a Lambda function.

Creating a stored procedure and a trigger in Amazon Aurora

Now, go back to MySQL Workbench, and run the following command to create a new stored procedure. When this stored procedure is called, it invokes the Lambda function you created. Change the ARN in the following code to your Lambda function’s ARN.

DROP PROCEDURE IF EXISTS CDC_TO_FIREHOSE;
DELIMITER ;;
CREATE PROCEDURE CDC_TO_FIREHOSE (IN ItemID VARCHAR(255), 
									IN Category varchar(255), 
									IN Price double(10,2),
                                    IN Quantity int(11),
                                    IN OrderDate timestamp,
                                    IN DestinationState varchar(2),
                                    IN ShippingType varchar(255),
                                    IN Referral  varchar(255)) LANGUAGE SQL 
BEGIN
  CALL mysql.lambda_async('arn:aws:lambda:us-east-1:XXXXXXXXXXXXX:function:CDCFromAuroraToKinesis', 
     CONCAT('{ "ItemID" : "', ItemID, 
            '", "Category" : "', Category,
            '", "Price" : "', Price,
            '", "Quantity" : "', Quantity, 
            '", "OrderDate" : "', OrderDate, 
            '", "DestinationState" : "', DestinationState, 
            '", "ShippingType" : "', ShippingType, 
            '", "Referral" : "', Referral, '"}')
     );
END
;;
DELIMITER ;

Create a trigger TR_Sales_CDC on the Sales table. When a new record is inserted, this trigger calls the CDC_TO_FIREHOSE stored procedure.

DROP TRIGGER IF EXISTS TR_Sales_CDC;
 
DELIMITER ;;
CREATE TRIGGER TR_Sales_CDC
  AFTER INSERT ON Sales
  FOR EACH ROW
BEGIN
  SELECT  NEW.ItemID , NEW.Category, New.Price, New.Quantity, New.OrderDate
  , New.DestinationState, New.ShippingType, New.Referral
  INTO @ItemID , @Category, @Price, @Quantity, @OrderDate
  , @DestinationState, @ShippingType, @Referral;
  CALL  CDC_TO_FIREHOSE(@ItemID , @Category, @Price, @Quantity, @OrderDate
  , @DestinationState, @ShippingType, @Referral);
END
;;
DELIMITER ;

If a new row is inserted in the Sales table, the Lambda function that is mentioned in the stored procedure is invoked.

Verify that data is being sent from the Lambda function to Kinesis Data Firehose to Amazon S3 successfully. You might have to insert a few records, depending on the size of your data, before new records appear in Amazon S3. This is due to Kinesis Data Firehose buffering. To learn more about Kinesis Data Firehose buffering, see the “Amazon S3” section in Amazon Kinesis Data Firehose Data Delivery.

Every time a new record is inserted in the sales table, a stored procedure is called, and it updates data in Amazon S3.

Querying data in Amazon Redshift

In this section, you use the data you produced from Amazon Aurora and consume it as-is in Amazon Redshift. In order to allow you to process your data as-is, where it is, while taking advantage of the power and flexibility of Amazon Redshift, you use Amazon Redshift Spectrum. You can use Redshift Spectrum to run complex queries on data stored in Amazon S3, with no need for loading or other data prep.

Just create a data source and issue your queries to your Amazon Redshift cluster as usual. Behind the scenes, Redshift Spectrum scales to thousands of instances on a per-query basis, ensuring that you get fast, consistent performance even as your dataset grows to beyond an exabyte! Being able to query data that is stored in Amazon S3 means that you can scale your compute and your storage independently. You have the full power of the Amazon Redshift query model and all the reporting and business intelligence tools at your disposal. Your queries can reference any combination of data stored in Amazon Redshift tables and in Amazon S3.

Redshift Spectrum supports open, common data types, including CSV/TSV, Apache Parquet, SequenceFile, and RCFile. Files can be compressed using gzip or Snappy, with other data types and compression methods in the works.

First, create an Amazon Redshift cluster. Follow the steps in Launch a Sample Amazon Redshift Cluster.

Next, create an IAM role that has access to Amazon S3 and Athena. By default, Amazon Redshift Spectrum uses the Amazon Athena data catalog. Your cluster needs authorization to access your external data catalog in AWS Glue or Athena and your data files in Amazon S3.

In the demo setup, I attached AmazonS3FullAccess and AmazonAthenaFullAccess. In a production environment, the IAM roles should follow the standard security of granting least privilege. For more information, see IAM Policies for Amazon Redshift Spectrum.

Attach the newly created role to the Amazon Redshift cluster. For more information, see Associate the IAM Role with Your Cluster.

Next, connect to the Amazon Redshift cluster, and create an external schema and database:

create external schema if not exists spectrum_schema
from data catalog 
database 'spectrum_db' 
region 'us-east-1'
IAM_ROLE 'arn:aws:iam::XXXXXXXXXXXX:role/RedshiftSpectrumRole'
create external database if not exists;

Don’t forget to replace the IAM role in the statement.

Then create an external table within the database:

 CREATE EXTERNAL TABLE IF NOT EXISTS spectrum_schema.ecommerce_sales(
  ItemID int,
  Category varchar,
  Price DOUBLE PRECISION,
  Quantity int,
  OrderDate TIMESTAMP,
  DestinationState varchar,
  ShippingType varchar,
  Referral varchar)
ROW FORMAT DELIMITED
      FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
LOCATION 's3://{BUCKET_NAME}/CDC/'

Query the table, and it should contain data. This is a fact table.

select top 10 * from spectrum_schema.ecommerce_sales

 

Next, create a dimension table. For this example, we create a date/time dimension table. Create the table:

CREATE TABLE date_dimension (
  d_datekey           integer       not null sortkey,
  d_dayofmonth        integer       not null,
  d_monthnum          integer       not null,
  d_dayofweek                varchar(10)   not null,
  d_prettydate        date       not null,
  d_quarter           integer       not null,
  d_half              integer       not null,
  d_year              integer       not null,
  d_season            varchar(10)   not null,
  d_fiscalyear        integer       not null)
diststyle all;

Populate the table with data:

copy date_dimension from 's3://reparmar-lab/2016dates' 
iam_role 'arn:aws:iam::XXXXXXXXXXXX:role/redshiftspectrum'
DELIMITER ','
dateformat 'auto';

The date dimension table should look like the following:

Querying data in local and external tables using Amazon Redshift

Now that you have the fact and dimension table populated with data, you can combine the two and run analysis. For example, if you want to query the total sales amount by weekday, you can run the following:

select sum(quantity*price) as total_sales, date_dimension.d_season
from spectrum_schema.ecommerce_sales 
join date_dimension on spectrum_schema.ecommerce_sales.orderdate = date_dimension.d_prettydate 
group by date_dimension.d_season

You get the following results:

Similarly, you can replace d_season with d_dayofweek to get sales figures by weekday:

With Amazon Redshift Spectrum, you pay only for the queries you run against the data that you actually scan. We encourage you to use file partitioning, columnar data formats, and data compression to significantly minimize the amount of data scanned in Amazon S3. This is important for data warehousing because it dramatically improves query performance and reduces cost.

Partitioning your data in Amazon S3 by date, time, or any other custom keys enables Amazon Redshift Spectrum to dynamically prune nonrelevant partitions to minimize the amount of data processed. If you store data in a columnar format, such as Parquet, Amazon Redshift Spectrum scans only the columns needed by your query, rather than processing entire rows. Similarly, if you compress your data using one of the supported compression algorithms in Amazon Redshift Spectrum, less data is scanned.

Analyzing and visualizing Amazon Redshift data in Amazon QuickSight

Modify the Amazon Redshift security group to allow an Amazon QuickSight connection. For more information, see Authorizing Connections from Amazon QuickSight to Amazon Redshift Clusters.

After modifying the Amazon Redshift security group, go to Amazon QuickSight. Create a new analysis, and choose Amazon Redshift as the data source.

Enter the database connection details, validate the connection, and create the data source.

Choose the schema to be analyzed. In this case, choose spectrum_schema, and then choose the ecommerce_sales table.

Next, we add a custom field for Total Sales = Price*Quantity. In the drop-down list for the ecommerce_sales table, choose Edit analysis data sets.

On the next screen, choose Edit.

In the data prep screen, choose New Field. Add a new calculated field Total Sales $, which is the product of the Price*Quantity fields. Then choose Create. Save and visualize it.

Next, to visualize total sales figures by month, create a graph with Total Sales on the x-axis and Order Data formatted as month on the y-axis.

After you’ve finished, you can use Amazon QuickSight to add different columns from your Amazon Redshift tables and perform different types of visualizations. You can build operational dashboards that continuously monitor your transactional and analytical data. You can publish these dashboards and share them with others.

Final notes

Amazon QuickSight can also read data in Amazon S3 directly. However, with the method demonstrated in this post, you have the option to manipulate, filter, and combine data from multiple sources or Amazon Redshift tables before visualizing it in Amazon QuickSight.

In this example, we dealt with data being inserted, but triggers can be activated in response to an INSERT, UPDATE, or DELETE trigger.

Keep the following in mind:

  • Be careful when invoking a Lambda function from triggers on tables that experience high write traffic. This would result in a large number of calls to your Lambda function. Although calls to the lambda_async procedure are asynchronous, triggers are synchronous.
  • A statement that results in a large number of trigger activations does not wait for the call to the AWS Lambda function to complete. But it does wait for the triggers to complete before returning control to the client.
  • Similarly, you must account for Amazon Kinesis Data Firehose limits. By default, Kinesis Data Firehose is limited to a maximum of 5,000 records/second. For more information, see Monitoring Amazon Kinesis Data Firehose.

In certain cases, it may be optimal to use AWS Database Migration Service (AWS DMS) to capture data changes in Aurora and use Amazon S3 as a target. For example, AWS DMS might be a good option if you don’t need to transform data from Amazon Aurora. The method used in this post gives you the flexibility to transform data from Aurora using Lambda before sending it to Amazon S3. Additionally, the architecture has the benefits of being serverless, whereas AWS DMS requires an Amazon EC2 instance for replication.

For design considerations while using Redshift Spectrum, see Using Amazon Redshift Spectrum to Query External Data.

If you have questions or suggestions, please comment below.


Additional Reading

If you found this post useful, be sure to check out Capturing Data Changes in Amazon Aurora Using AWS Lambda and 10 Best Practices for Amazon Redshift Spectrum


About the Authors

Re Alvarez-Parmar is a solutions architect for Amazon Web Services. He helps enterprises achieve success through technical guidance and thought leadership. In his spare time, he enjoys spending time with his two kids and exploring outdoors.

 

 

 

Now Available: New Digital Training to Help You Learn About AWS Big Data Services

Post Syndicated from Sara Snedeker original https://aws.amazon.com/blogs/big-data/now-available-new-digital-training-to-help-you-learn-about-aws-big-data-services/

AWS Training and Certification recently released free digital training courses that will make it easier for you to build your cloud skills and learn about using AWS Big Data services. This training includes courses like Introduction to Amazon EMR and Introduction to Amazon Athena.

You can get free and unlimited access to more than 100 new digital training courses built by AWS experts at aws.training. It’s easy to access training related to big data. Just choose the Analytics category on our Find Training page to browse through the list of courses. You can also use the keyword filter to search for training for specific AWS offerings.

Recommended training

Just getting started, or looking to learn about a new service? Check out the following digital training courses:

Introduction to Amazon EMR (15 minutes)
Covers the available tools that can be used with Amazon EMR and the process of creating a cluster. It includes a demonstration of how to create an EMR cluster.

Introduction to Amazon Athena (10 minutes)
Introduces the Amazon Athena service along with an overview of its operating environment. It covers the basic steps in implementing Athena and provides a brief demonstration.

Introduction to Amazon QuickSight (10 minutes)
Discusses the benefits of using Amazon QuickSight and how the service works. It also includes a demonstration so that you can see Amazon QuickSight in action.

Introduction to Amazon Redshift (10 minutes)
Walks you through Amazon Redshift and its core features and capabilities. It also includes a quick overview of relevant use cases and a short demonstration.

Introduction to AWS Lambda (10 minutes)
Discusses the rationale for using AWS Lambda, how the service works, and how you can get started using it.

Introduction to Amazon Kinesis Analytics (10 minutes)
Discusses how Amazon Kinesis Analytics collects, processes, and analyzes streaming data in real time. It discusses how to use and monitor the service and explores some use cases.

Introduction to Amazon Kinesis Streams (15 minutes)
Covers how Amazon Kinesis Streams is used to collect, process, and analyze real-time streaming data to create valuable insights.

Introduction to AWS IoT (10 minutes)
Describes how the AWS Internet of Things (IoT) communication architecture works, and the components that make up AWS IoT. It discusses how AWS IoT works with other AWS services and reviews a case study.

Introduction to AWS Data Pipeline (10 minutes)
Covers components like tasks, task runner, and pipeline. It also discusses what a pipeline definition is, and reviews the AWS services that are compatible with AWS Data Pipeline.

Go deeper with classroom training

Want to learn more? Enroll in classroom training to learn best practices, get live feedback, and hear answers to your questions from an instructor.

Big Data on AWS (3 days)
Introduces you to cloud-based big data solutions such as Amazon EMR, Amazon Redshift, Amazon Kinesis, and the rest of the AWS big data platform.

Data Warehousing on AWS (3 days)
Introduces you to concepts, strategies, and best practices for designing a cloud-based data warehousing solution, and demonstrates how to collect, store, and prepare data for the data warehouse.

Building a Serverless Data Lake (1 day)
Teaches you how to design, build, and operate a serverless data lake solution with AWS services. Includes topics such as ingesting data from any data source at large scale, storing the data securely and durably, using the right tool to process large volumes of data, and understanding the options available for analyzing the data in near-real time.

More training coming in 2018

We’re always evaluating and expanding our training portfolio, so stay tuned for more training options in the new year. You can always visit us at aws.training to explore our latest offerings.

Simplify Querying Nested JSON with the AWS Glue Relationalize Transform

Post Syndicated from Trevor Roberts original https://aws.amazon.com/blogs/big-data/simplify-querying-nested-json-with-the-aws-glue-relationalize-transform/

AWS Glue has a transform called Relationalize that simplifies the extract, transform, load (ETL) process by converting nested JSON into columns that you can easily import into relational databases. Relationalize transforms the nested JSON into key-value pairs at the outermost level of the JSON document. The transformed data maintains a list of the original keys from the nested JSON separated by periods.

Let’s look at how Relationalize can help you with a sample use case.

An example of Relationalize in action

Suppose that the developers of a video game want to use a data warehouse like Amazon Redshift to run reports on player behavior based on data that is stored in JSON. Sample 1 shows example user data from the game. The player named “user1” has characteristics such as race, class, and location in nested JSON data. Further down, the player’s arsenal information includes additional nested JSON data. If the developers want to ETL this data into their data warehouse, they might have to resort to nested loops or recursive functions in their code.

Sample 1: Nested JSON

{
	"player": {
		"username": "user1",
		"characteristics": {
			"race": "Human",
			"class": "Warlock",
			"subclass": "Dawnblade",
			"power": 300,
			"playercountry": "USA"
		},
		"arsenal": {
			"kinetic": {
				"name": "Sweet Business",
				"type": "Auto Rifle",
				"power": 300,
				"element": "Kinetic"
			},
			"energy": {
				"name": "MIDA Mini-Tool",
				"type": "Submachine Gun",
				"power": 300,
				"element": "Solar"
			},
			"power": {
				"name": "Play of the Game",
				"type": "Grenade Launcher",
				"power": 300,
				"element": "Arc"
			}
		},
		"armor": {
			"head": "Eye of Another World",
			"arms": "Philomath Gloves",
			"chest": "Philomath Robes",
			"leg": "Philomath Boots",
			"classitem": "Philomath Bond"
		},
		"location": {
			"map": "Titan",
			"waypoint": "The Rig"
		}
	}
}

Instead, the developers can use the Relationalize transform. Sample 2 shows what the transformed data looks like.

Sample 2: Flattened JSON

{
    "player.username": "user1",
    "player.characteristics.race": "Human",
    "player.characteristics.class": "Warlock",
    "player.characteristics.subclass": "Dawnblade",
    "player.characteristics.power": 300,
    "player.characteristics.playercountry": "USA",
    "player.arsenal.kinetic.name": "Sweet Business",
    "player.arsenal.kinetic.type": "Auto Rifle",
    "player.arsenal.kinetic.power": 300,
    "player.arsenal.kinetic.element": "Kinetic",
    "player.arsenal.energy.name": "MIDA Mini-Tool",
    "player.arsenal.energy.type": "Submachine Gun",
    "player.arsenal.energy.power": 300,
    "player.arsenal.energy.element": "Solar",
    "player.arsenal.power.name": "Play of the Game",
    "player.arsenal.power.type": "Grenade Launcher",
    "player.arsenal.power.power": 300,
    "player.arsenal.power.element": "Arc",
    "player.armor.head": "Eye of Another World",
    "player.armor.arms": "Philomath Gloves",
    "player.armor.chest": "Philomath Robes",
    "player.armor.leg": "Philomath Boots",
    "player.armor.classitem": "Philomath Bond",
    "player.location.map": "Titan",
    "player.location.waypoint": "The Rig"
}

You can then write the data to a database or to a data warehouse. You can also write it to delimited text files, such as in comma-separated value (CSV) format, or columnar file formats such as Optimized Row Columnar (ORC) format. You can use either of these format types for long-term storage in Amazon S3. Storing the transformed files in S3 provides the additional benefit of being able to query this data using Amazon Athena or Amazon Redshift Spectrum. You can further extend the usefulness of the data by performing joins between data stored in S3 and the data stored in an Amazon Redshift data warehouse.

Before we get started…

In my example, I took two preparatory steps that save some time in your ETL code development:

  1. I stored my data in an Amazon S3 bucket and used an AWS Glue crawler to make my data available in the AWS Glue data catalog. You can find instructions on how to do that in Cataloging Tables with a Crawler in the AWS Glue documentation. The AWS Glue database name I used was “blog,” and the table name was “players.” You can see these values in use in the sample code that follows.
  2. I deployed a Zeppelin notebook using the automated deployment available within AWS Glue. If you already used an AWS Glue development endpoint to deploy a Zeppelin notebook, you can skip the deployment instructions. Otherwise, let’s quickly review how to deploy Zeppelin.

Deploying a Zeppelin notebook with AWS Glue

The following steps are outlined in the AWS Glue documentation, and I include a few screenshots here for clarity.

First, create two IAM roles:

Next, in the AWS Glue Management Console, choose Dev endpoints, and then choose Add endpoint.

Specify a name for the endpoint and the AWS Glue IAM role that you created.

On the networking screen, choose Skip Networking because our code only communicates with S3.

Complete the development endpoint process by providing a Secure Shell (SSH) public key and confirming your settings.

When your new development endpoint’s Provisioning status changes from PROVISIONING to READY, choose your endpoint, and then for Actions choose Create notebook server.

Enter the notebook server details, including the role you previously created and a security group with inbound access allowed on TCP port 443.

Doing this automatically launches an AWS CloudFormation template. The output specifies the URL that you can use to access your Zeppelin notebook with the username and password you specified in the wizard.

How do we flatten nested JSON?

With my data loaded and my notebook server ready, I accessed Zeppelin, created a new note, and set my interpreter to spark. I used some Python code that AWS Glue previously generated for another job that outputs to ORC. Then I added the Relationalize transform. You can see the resulting Python code in Sample 3.­

Sample 3: Python code to transform the nested JSON and output it to ORC

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
#from awsglue.transforms import Relationalize

# Begin variables to customize with your information
glue_source_database = "blog"
glue_source_table = "players"
glue_temp_storage = "s3://blog-example-edz/temp"
glue_relationalize_output_s3_path = "s3://blog-example-edz/output-flat"
dfc_root_table_name = "root" #default value is "roottable"
# End variables to customize with your information

glueContext = GlueContext(spark.sparkContext)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = glue_source_database, table_name = glue_source_table, transformation_ctx = "datasource0")
dfc = Relationalize.apply(frame = datasource0, staging_path = glue_temp_storage, name = dfc_root_table_name, transformation_ctx = "dfc")
blogdata = dfc.select(dfc_root_table_name)
blogdataoutput = glueContext.write_dynamic_frame.from_options(frame = blogdata, connection_type = "s3", connection_options = {"path": glue_relationalize_output_s3_path}, format = "orc", transformation_ctx = "blogdataoutput")

What exactly is going on in this script?

After the import statements, we instantiate a GlueContext object, which allows us to work with the data in AWS Glue. Next, we create a DynamicFrame (datasource0) from the “players” table in the AWS Glue “blog” database. We use this DynamicFrame to perform any necessary operations on the data structure before it’s written to our desired output format. The source files remain unchanged.

We then run the Relationalize transform (Relationalize.apply()) with our datasource0 as one of the parameters. Another important parameter is the name parameter, which is a key that identifies our data after the transformation completes.

The Relationalize.apply() method returns a DynamicFrameCollection, and this is stored in the dfc variable. Before we can write our data to S3, we need to select the DynamicFrame from the DynamicFrameCollection object. We do this with the dfc.select() method. The correct DynamicFrame is stored in the blogdata variable.

You might be curious why a DynamicFrameCollection was returned when we started with a single DynamicFrame. This return value comes from the way Relationalize treats arrays in the JSON document: A DynamicFrame is created for each array. Together with the root data structure, each generated DynamicFrame is added to a DynamicFrameCollection when Relationalize completes its work. Although we didn’t have any arrays in our data, it’s good to keep this in mind. Finally, we output (blogdataoutput) the root DynamicFrame to ORC files in S3.

Using the transformed data

One of the use cases we discussed earlier was using Amazon Athena or Amazon Redshift Spectrum to query the ORC files.

I used the following SQL DDL statements to create external tables in both services to enable queries of my data stored in Amazon S3.

Sample 4: Amazon Athena DDL

CREATE EXTERNAL TABLE IF NOT EXISTS blog.blog_data_athena_test (
  `characteristics_race` string,
  `characteristics_class` string,
  `characteristics_subclass` string,
  `characteristics_power` int,
  `characteristics_playercountry` string,
  `kinetic_name` string,
  `kinetic_type` string,
  `kinetic_power` int,
  `kinetic_element` string,
  `energy_name` string,
  `energy_type` string,
  `energy_power` int,
  `energy_element` string,
  `power_name` string,
  `power_type` string,
  `power_power` int,
  `power_element` string,
  `armor_head` string,
  `armor_arms` string,
  `armor_chest` string,
  `armor_leg` string,
  `armor_classitem` string,
  `map` string,
  `waypoint` string 
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
) LOCATION 's3://blog-example-edz/output-flat/'
TBLPROPERTIES ('has_encrypted_data'='false');

 

Sample 5: Amazon Redshift Spectrum DDL

-- Create a Schema
-- A single schema can be used with multiple external tables.
-- This step is only required once for the external tables you create.
create external schema spectrum 
from data catalog 
database 'blog' 
iam_role 'arn:aws:iam::0123456789:role/redshift-role'
create external database if not exists;

-- Create an external table in the schema
create external table spectrum.blog(
  username VARCHAR,
  characteristics_race VARCHAR,
  characteristics_class VARCHAR,
  characteristics_subclass VARCHAR,
  characteristics_power INTEGER,
  characteristics_playercountry VARCHAR,
  kinetic_name VARCHAR,
  kinetic_type VARCHAR,
  kinetic_power INTEGER,
  kinetic_element VARCHAR,
  energy_name VARCHAR,
  energy_type VARCHAR,
  energy_power INTEGER,
  energy_element VARCHAR,
  power_name VARCHAR,
  power_type VARCHAR,
  power_power INTEGER,
  power_element VARCHAR,
  armor_head VARCHAR,
  armor_arms VARCHAR,
  armor_chest VARCHAR,
  armor_leg VARCHAR,
  armor_classItem VARCHAR,
  map VARCHAR,
  waypoint VARCHAR)
stored as orc
location 's3://blog-example-edz/output-flat';

I even ran a query, shown in Sample 6, that joined my Redshift Spectrum table (spectrum.playerdata) with data in an Amazon Redshift table (public.raids) to generate advanced reports. In the where clause, I join the two tables based on the username values that are common to both data sources.

Sample 6: Select statement with a join of Redshift Spectrum data with Amazon Redshift data

-- Get Total Raid Completions for the Hunter Class.
select spectrum.playerdata.characteristics_class as class, sum(public.raids."completions.val.raids.leviathan") as "Total Hunter Leviathan Raid Completions" from spectrum.playerdata, public.raids
where spectrum.playerdata.username = public.raids."completions.val.username"
and spectrum.playerdata.characteristics_class = 'Hunter'
group by spectrum.playerdata.characteristics_class;

Summary

This post demonstrated how simple it can be to flatten nested JSON data with AWS Glue, using the Relationalize transform to automate the conversion of nested JSON. AWS Glue also automates the deployment of Zeppelin notebooks that you can use to develop your Python automation script. Finally, AWS Glue can output the transformed data directly to a relational database, or to files in Amazon S3 for further analysis with tools such as Amazon Athena and Amazon Redshift Spectrum.

As great as Relationalize is, it’s not the only transform available with AWS Glue. You can see a complete list of available transforms in Built-In Transforms in the AWS Glue documentation. Try them out today!


Additional Reading

If you found this post useful, be sure to check out Using Amazon Redshift Spectrum, Amazon Athena and AWS Glue with Node.js in Production and Build a Data Lake Foundation with AWS Glue and Amazon S3.


About the Author

Trevor Roberts Jr is a Solutions Architect with AWS. He provides architectural guidance to help customers achieve success in the cloud. In his spare time, Trevor enjoys traveling to new places and spending time with family.

How to Manage Amazon GuardDuty Security Findings Across Multiple Accounts

Post Syndicated from Tom Stickle original https://aws.amazon.com/blogs/security/how-to-manage-amazon-guardduty-security-findings-across-multiple-accounts/

Introduced at AWS re:Invent 2017, Amazon GuardDuty is a managed threat detection service that continuously monitors for malicious or unauthorized behavior to help you protect your AWS accounts and workloads. In an AWS Blog post, Jeff Barr shows you how to enable GuardDuty to monitor your AWS resources continuously. That blog post shows how to get started with a single GuardDuty account and provides an overview of the features of the service. Your security team, though, will probably want to use GuardDuty to monitor a group of AWS accounts continuously.

In this post, I demonstrate how to use GuardDuty to monitor a group of AWS accounts and have their findings routed to another AWS account—the master account—that is owned by a security team. The method I demonstrate in this post is especially useful if your security team is responsible for monitoring a group of AWS accounts over which it does not have direct access—known as member accounts. In this solution, I simplify the work needed to enable GuardDuty in member accounts and configure findings by simplifying the process, which I do by enabling GuardDuty in the master account and inviting member accounts.

Enable GuardDuty in a master account and invite member accounts

To get started, you must enable GuardDuty in the master account, which will receive GuardDuty findings. The master account should be managed by your security team, and it will display the findings from all member accounts. The master account can be reverted later by removing any member accounts you add to it. Adding member accounts is a two-way handshake mechanism to ensure that administrators from both the master and member accounts formally agree to establish the relationship.

To enable GuardDuty in the master account and add member accounts:

  1. Navigate to the GuardDuty console.
  2. In the navigation pane, choose Accounts.
    Screenshot of the Accounts choice in the navigation pane
  1. To designate this account as the GuardDuty master account, start adding member accounts:
    • You can add individual accounts by choosing Add Account, or you can add a list of accounts by choosing Upload List (.csv).
  1. Now, add the account ID and email address of the member account, and choose Add. (If you are uploading a list of accounts, choose Browse, choose the .csv file with the member accounts [one email address and account ID per line], and choose Add accounts.)
    Screenshot of adding an account

For security reasons, AWS checks to make sure each account ID is valid and that you’ve entered each member account’s email address that was used to create the account. If a member account’s account ID and email address do not match, GuardDuty does not send an invitation.
Screenshot showing the Status of Invite

  1. After you add all the member accounts you want to add, you will see them listed in the Member accounts table with a Status of Invite. You don’t have to individually invite each account—you can choose a group of accounts and when you choose to invite one account in the group, all accounts are invited.
  2. When you choose Invite for each member account:
    1. AWS checks to make sure the account ID is valid and the email address provided is the email address of the member account.
    2. AWS sends an email to the member account email address with a link to the GuardDuty console, where the member account owner can accept the invitation. You can add a customized message from your security team. Account owners who receive the invitation must sign in to their AWS account to accept the invitation. The service also sends an invitation through the AWS Personal Health Dashboard in case the member email address is not monitored. This invitation appears in the member account under the AWS Personal Health Dashboard alert bell on the AWS Management Console.
    3. A pending-invitation indicator is shown on the GuardDuty console of the member account, as shown in the following screenshot.
      Screenshot showing the pending-invitation indicator

When the invitation is sent by email, it is sent to the account owner of the GuardDuty member account.
Screenshot of the invitation sent by email

The account owner can click the link in the email invitation or the AWS Personal Health Dashboard message, or the account owner can sign in to their account and navigate to the GuardDuty console. In all cases, the member account displays the pending invitation in the member account’s GuardDuty console with instructions for accepting the invitation. The GuardDuty console walks the account owner through accepting the invitation, including enabling GuardDuty if it is not already enabled.

If you prefer to work in the AWS CLI, you can enable GuardDuty and accept the invitation. To do this, call CreateDetector to enable GuardDuty, and then call AcceptInvitation, which serves the same purpose as accepting the invitation in the GuardDuty console.

  1. After the member account owner accepts the invitation, the Status in the master account is changed to Monitored. The status helps you track the status of each AWS account that you invite.
    Screenshot showing the Status change to Monitored

You have enabled GuardDuty on the member account, and all findings will be forwarded to the master account. You can now monitor the findings about GuardDuty member accounts from the GuardDuty console in the master account.

The member account owner can see GuardDuty findings by default and can control all aspects of the experience in the member account with AWS Identity and Access Management (IAM) permissions. Users with the appropriate permissions can end the multi-account relationship at any time by toggling the Accept button on the Accounts page. Note that ending the relationship changes the Status of the account to Resigned and also triggers a security finding on the side of the master account so that the security team knows the member account is no longer linked to the master account.

Working with GuardDuty findings

Most security teams have ticketing systems, chat operations, security information event management (SIEM) systems, or other security automation systems to which they would like to push GuardDuty findings. For this purpose, GuardDuty sends all findings as JSON-based messages through Amazon CloudWatch Events, a scalable service to which you can subscribe and to which AWS services can stream system events. To access these events, navigate to the CloudWatch Events console and create a rule that subscribes to the GuardDuty-related findings. You then can assign a target such as Amazon Kinesis Data Firehose that can place the findings in a number of services such as Amazon S3. The following screenshot is of the CloudWatch Events console, where I have a rule that pulls all events from GuardDuty and pushes them to a preconfigured AWS Lambda function.

Screenshot of a CloudWatch Events rule

The following example is a subset of GuardDuty findings that includes relevant context and information about the nature of a threat that was detected. In this example, the instanceId, i-00bb62b69b7004a4c, is performing Secure Shell (SSH) brute-force attacks against IP address 172.16.0.28. From a Lambda function, you can access any of the following fields such as the title of the finding and its description, and send those directly to your ticketing system.

Example GuardDuty findings

You can use other AWS services to build custom analytics and visualizations of your security findings. For example, you can connect Kinesis Data Firehose to CloudWatch Events and write events to an S3 bucket in a standard format, which can be encrypted with AWS Key Management Service and then compressed. You also can use Amazon QuickSight to build ad hoc dashboards by using AWS Glue and Amazon Athena. Similarly, you can place the data from Kinesis Data Firehose in Amazon Elasticsearch Service, with which you can use tools such as Kibana to build your own visualizations and dashboards.

Like most other AWS services, GuardDuty is a regional service. This means that when you enable GuardDuty in an AWS Region, all findings are generated and delivered in that region. If you are regulated by a compliance regime, this is often an important requirement to ensure that security findings remain in a specific jurisdiction. Because customers have let us know they would prefer to be able to enable GuardDuty globally and have all findings aggregated in one place, we intend to give the choice of regional or global isolation as we evolve this new service.

Summary

In this blog post, I have demonstrated how to use GuardDuty to monitor a group of GuardDuty member accounts and aggregate security findings in a central master GuardDuty account. You can use this solution whether or not you have direct control over the member accounts.

If you have comments about this blog post, submit them in the “Comments” section below. If you have questions about using GuardDuty, start a thread in the GuardDuty forum or contact AWS Support.

-Tom

Glenn’s Take on re:Invent 2017 – Part 3

Post Syndicated from Glenn Gore original https://aws.amazon.com/blogs/architecture/glenns-take-on-reinvent-2017-part-3/

Glenn Gore here, Chief Architect for AWS. I was in Las Vegas last week — with 43K others — for re:Invent 2017. I checked in to the Architecture blog here and here with my take on what was interesting about some of the bigger announcements from a cloud-architecture perspective.

In the excitement of so many new services being launched, we sometimes overlook feature updates that, while perhaps not as exciting as Amazon DeepLens, have significant impact on how you architect and develop solutions on AWS.

Amazon DynamoDB is used by more than 100,000 customers around the world, handling over a trillion requests every day. From the start, DynamoDB has offered high availability by natively spanning multiple Availability Zones within an AWS Region. As more customers started building and deploying truly-global applications, there was a need to replicate a DynamoDB table to multiple AWS Regions, allowing for read/write operations to occur in any region where the table was replicated. This update is important for providing a globally-consistent view of information — as users may transition from one region to another — or for providing additional levels of availability, allowing for failover between AWS Regions without loss of information.

There are some interesting concurrency-design aspects you need to be aware of and ensure you can handle correctly. For example, we support the “last writer wins” reconciliation where eventual consistency is being used and an application updates the same item in different AWS Regions at the same time. If you require strongly-consistent read/writes then you must perform all of your read/writes in the same AWS Region. The details behind this can be found in the DynamoDB documentation. Providing a globally-distributed, replicated DynamoDB table simplifies many different use cases and allows for the logic of replication, which may have been pushed up into the application layers to be simplified back down into the data layer.

The other big update for DynamoDB is that you can now back up your DynamoDB table on demand with no impact to performance. One of the features I really like is that when you trigger a backup, it is available instantly, regardless of the size of the table. Behind the scenes, we use snapshots and change logs to ensure a consistent backup. While backup is instant, restoring the table could take some time depending on its size and ranges — from minutes to hours for very large tables.

This feature is super important for those of you who work in regulated industries that often have strict requirements around data retention and backups of data, which sometimes limited the use of DynamoDB or required complex workarounds to implement some sort of backup feature in the past. This often incurred significant, additional costs due to increased read transactions on their DynamoDB tables.

Amazon Simple Storage Service (Amazon S3) was our first-released AWS service over 11 years ago, and it proved the simplicity and scalability of true API-driven architectures in the cloud. Today, Amazon S3 stores trillions of objects, with transactional requests per second reaching into the millions! Dealing with data as objects opened up an incredibly diverse array of use cases ranging from libraries of static images, game binary downloads, and application log data, to massive data lakes used for big data analytics and business intelligence. With Amazon S3, when you accessed your data in an object, you effectively had to write/read the object as a whole or use the range feature to retrieve a part of the object — if possible — in your individual use case.

Now, with Amazon S3 Select, an SQL-like query language is used that can work with delimited text and JSON files, as well as work with GZIP compressed files. We don’t support encryption during the preview of Amazon S3 Select.

Amazon S3 Select provides two major benefits:

  • Faster access
  • Lower running costs

Serverless Lambda functions, where every millisecond matters when you are being charged, will benefit greatly from Amazon S3 Select as data retrieval and processing of your Lambda function will experience significant speedups and cost reductions. For example, we have seen 2x speed improvement and 80% cost reduction with the Serverless MapReduce code.

Other AWS services such as Amazon Athena, Amazon Redshift, and Amazon EMR will support Amazon S3 Select as well as partner offerings including Cloudera and Hortonworks. If you are using Amazon Glacier for longer-term data archival, you will be able to use Amazon Glacier Select to retrieve a subset of your content from within Amazon Glacier.

As the volume of data that can be stored within Amazon S3 and Amazon Glacier continues to scale on a daily basis, we will continue to innovate and develop improved and optimized services that will allow you to work with these magnificently-large data sets while reducing your costs (retrieval and processing). I believe this will also allow you to simplify the transformation and storage of incoming data into Amazon S3 in basic, semi-structured formats as a single copy vs. some of the duplication and reformatting of data sometimes required to do upfront optimizations for downstream processing. Amazon S3 Select largely removes the need for this upfront optimization and instead allows you to store data once and process it based on your individual Amazon S3 Select query per application or transaction need.

Thanks for reading!

Glenn contemplating why CSV format is still relevant in 2017 (Italy).

Using Amazon Redshift Spectrum, Amazon Athena, and AWS Glue with Node.js in Production

Post Syndicated from Rafi Ton original https://aws.amazon.com/blogs/big-data/using-amazon-redshift-spectrum-amazon-athena-and-aws-glue-with-node-js-in-production/

This is a guest post by Rafi Ton, founder and CEO of NUVIAD. NUVIAD is, in their own words, “a mobile marketing platform providing professional marketers, agencies and local businesses state of the art tools to promote their products and services through hyper targeting, big data analytics and advanced machine learning tools.”

At NUVIAD, we’ve been using Amazon Redshift as our main data warehouse solution for more than 3 years.

We store massive amounts of ad transaction data that our users and partners analyze to determine ad campaign strategies. When running real-time bidding (RTB) campaigns in large scale, data freshness is critical so that our users can respond rapidly to changes in campaign performance. We chose Amazon Redshift because of its simplicity, scalability, performance, and ability to load new data in near real time.

Over the past three years, our customer base grew significantly and so did our data. We saw our Amazon Redshift cluster grow from three nodes to 65 nodes. To balance cost and analytics performance, we looked for a way to store large amounts of less-frequently analyzed data at a lower cost. Yet, we still wanted to have the data immediately available for user queries and to meet their expectations for fast performance. We turned to Amazon Redshift Spectrum.

In this post, I explain the reasons why we extended Amazon Redshift with Redshift Spectrum as our modern data warehouse. I cover how our data growth and the need to balance cost and performance led us to adopt Redshift Spectrum. I also share key performance metrics in our environment, and discuss the additional AWS services that provide a scalable and fast environment, with data available for immediate querying by our growing user base.

Amazon Redshift as our foundation

The ability to provide fresh, up-to-the-minute data to our customers and partners was always a main goal with our platform. We saw other solutions provide data that was a few hours old, but this was not good enough for us. We insisted on providing the freshest data possible. For us, that meant loading Amazon Redshift in frequent micro batches and allowing our customers to query Amazon Redshift directly to get results in near real time.

The benefits were immediately evident. Our customers could see how their campaigns performed faster than with other solutions, and react sooner to the ever-changing media supply pricing and availability. They were very happy.

However, this approach required Amazon Redshift to store a lot of data for long periods, and our data grew substantially. In our peak, we maintained a cluster running 65 DC1.large nodes. The impact on our Amazon Redshift cluster was evident, and we saw our CPU utilization grow to 90%.

Why we extended Amazon Redshift to Redshift Spectrum

Redshift Spectrum gives us the ability to run SQL queries using the powerful Amazon Redshift query engine against data stored in Amazon S3, without needing to load the data. With Redshift Spectrum, we store data where we want, at the cost that we want. We have the data available for analytics when our users need it with the performance they expect.

Seamless scalability, high performance, and unlimited concurrency

Scaling Redshift Spectrum is a simple process. First, it allows us to leverage Amazon S3 as the storage engine and get practically unlimited data capacity.

Second, if we need more compute power, we can leverage Redshift Spectrum’s distributed compute engine over thousands of nodes to provide superior performance – perfect for complex queries running against massive amounts of data.

Third, all Redshift Spectrum clusters access the same data catalog so that we don’t have to worry about data migration at all, making scaling effortless and seamless.

Lastly, since Redshift Spectrum distributes queries across potentially thousands of nodes, they are not affected by other queries, providing much more stable performance and unlimited concurrency.

Keeping it SQL

Redshift Spectrum uses the same query engine as Amazon Redshift. This means that we did not need to change our BI tools or query syntax, whether we used complex queries across a single table or joins across multiple tables.

An interesting capability introduced recently is the ability to create a view that spans both Amazon Redshift and Redshift Spectrum external tables. With this feature, you can query frequently accessed data in your Amazon Redshift cluster and less-frequently accessed data in Amazon S3, using a single view.

Leveraging Parquet for higher performance

Parquet is a columnar data format that provides superior performance and allows Redshift Spectrum (or Amazon Athena) to scan significantly less data. With less I/O, queries run faster and we pay less per query. You can read all about Parquet at https://parquet.apache.org/ or https://en.wikipedia.org/wiki/Apache_Parquet.

Lower cost

From a cost perspective, we pay standard rates for our data in Amazon S3, and only small amounts per query to analyze data with Redshift Spectrum. Using the Parquet format, we can significantly reduce the amount of data scanned. Our costs are now lower, and our users get fast results even for large complex queries.

What we learned about Amazon Redshift vs. Redshift Spectrum performance

When we first started looking at Redshift Spectrum, we wanted to put it to the test. We wanted to know how it would compare to Amazon Redshift, so we looked at two key questions:

  1. What is the performance difference between Amazon Redshift and Redshift Spectrum on simple and complex queries?
  2. Does the data format impact performance?

During the migration phase, we had our dataset stored in Amazon Redshift and S3 as CSV/GZIP and as Parquet file formats. We tested three configurations:

  • Amazon Redshift cluster with 28 DC1.large nodes
  • Redshift Spectrum using CSV/GZIP
  • Redshift Spectrum using Parquet

We performed benchmarks for simple and complex queries on one month’s worth of data. We tested how much time it took to perform the query, and how consistent the results were when running the same query multiple times. The data we used for the tests was already partitioned by date and hour. Properly partitioning the data improves performance significantly and reduces query times.

Simple query

First, we tested a simple query aggregating billing data across a month:

SELECT 
  user_id, 
  count(*) AS impressions, 
  SUM(billing)::decimal /1000000 AS billing 
FROM <table_name> 
WHERE 
  date >= '2017-08-01' AND 
  date <= '2017-08-31'  
GROUP BY 
  user_id;

We ran the same query seven times and measured the response times (red marking the longest time and green the shortest time):

Execution Time (seconds)
  Amazon Redshift Redshift Spectrum
CSV
Redshift Spectrum Parquet
Run #1 39.65 45.11 11.92
Run #2 15.26 43.13 12.05
Run #3 15.27 46.47 13.38
Run #4 21.22 51.02 12.74
Run #5 17.27 43.35 11.76
Run #6 16.67 44.23 13.67
Run #7 25.37 40.39 12.75
Average 21.53  44.82 12.61

For simple queries, Amazon Redshift performed better than Redshift Spectrum, as we thought, because the data is local to Amazon Redshift.

What was surprising was that using Parquet data format in Redshift Spectrum significantly beat ‘traditional’ Amazon Redshift performance. For our queries, using Parquet data format with Redshift Spectrum delivered an average 40% performance gain over traditional Amazon Redshift. Furthermore, Redshift Spectrum showed high consistency in execution time with a smaller difference between the slowest run and the fastest run.

Comparing the amount of data scanned when using CSV/GZIP and Parquet, the difference was also significant:

Data Scanned (GB)
CSV (Gzip) 135.49
Parquet 2.83

Because we pay only for the data scanned by Redshift Spectrum, the cost saving of using Parquet is evident and substantial.

Complex query

Next, we compared the same three configurations with a complex query.

Execution Time (seconds)
  Amazon Redshift Redshift Spectrum CSV Redshift Spectrum Parquet
Run #1 329.80 84.20 42.40
Run #2 167.60 65.30 35.10
Run #3 165.20 62.20 23.90
Run #4 273.90 74.90 55.90
Run #5 167.70 69.00 58.40
Average 220.84 71.12 43.14

This time, Redshift Spectrum using Parquet cut the average query time by 80% compared to traditional Amazon Redshift!

Bottom line: For complex queries, Redshift Spectrum provided a 67% performance gain over Amazon Redshift. Using the Parquet data format, Redshift Spectrum delivered an 80% performance improvement over Amazon Redshift. For us, this was substantial.

Optimizing the data structure for different workloads

Because the cost of S3 is relatively inexpensive and we pay only for the data scanned by each query, we believe that it makes sense to keep our data in different formats for different workloads and different analytics engines. It is important to note that we can have any number of tables pointing to the same data on S3. It all depends on how we partition the data and update the table partitions.

Data permutations

For example, we have a process that runs every minute and generates statistics for the last minute of data collected. With Amazon Redshift, this would be done by running the query on the table with something as follows:

SELECT 
  user, 
  COUNT(*) 
FROM 
  events_table 
WHERE 
  ts BETWEEN ‘2017-08-01 14:00:00’ AND ‘2017-08-01 14:00:59’ 
GROUP BY 
  user;

(Assuming ‘ts’ is your column storing the time stamp for each event.)

With Redshift Spectrum, we pay for the data scanned in each query. If the data is partitioned by the minute instead of the hour, a query looking at one minute would be 1/60th the cost. If we use a temporary table that points only to the data of the last minute, we save that unnecessary cost.

Creating Parquet data efficiently

On the average, we have 800 instances that process our traffic. Each instance sends events that are eventually loaded into Amazon Redshift. When we started three years ago, we would offload data from each server to S3 and then perform a periodic copy command from S3 to Amazon Redshift.

Recently, Amazon Kinesis Firehose added the capability to offload data directly to Amazon Redshift. While this is now a viable option, we kept the same collection process that worked flawlessly and efficiently for three years.

This changed, however, when we incorporated Redshift Spectrum. With Redshift Spectrum, we needed to find a way to:

  • Collect the event data from the instances.
  • Save the data in Parquet format.
  • Partition the data effectively.

To accomplish this, we save the data as CSV and then transform it to Parquet. The most effective method to generate the Parquet files is to:

  1. Send the data in one-minute intervals from the instances to Kinesis Firehose with an S3 temporary bucket as the destination.
  2. Aggregate hourly data and convert it to Parquet using AWS Lambda and AWS Glue.
  3. Add the Parquet data to S3 by updating the table partitions.

With this new process, we had to give more attention to validating the data before we sent it to Kinesis Firehose, because a single corrupted record in a partition fails queries on that partition.

Data validation

To store our click data in a table, we considered the following SQL create table command:

create external TABLE spectrum.blog_clicks (
    user_id varchar(50),
    campaign_id varchar(50),
    os varchar(50),
    ua varchar(255),
    ts bigint,
    billing float
)
partitioned by (date date, hour smallint)  
stored as parquet
location 's3://nuviad-temp/blog/clicks/';

The above statement defines a new external table (all Redshift Spectrum tables are external tables) with a few attributes. We stored ‘ts’ as a Unix time stamp and not as Timestamp, and billing data is stored as float and not decimal (more on that later). We also said that the data is partitioned by date and hour, and then stored as Parquet on S3.

First, we need to get the table definitions. This can be achieved by running the following query:

SELECT 
  * 
FROM 
  svv_external_columns 
WHERE 
  tablename = 'blog_clicks';

This query lists all the columns in the table with their respective definitions:

schemaname tablename columnname external_type columnnum part_key
spectrum blog_clicks user_id varchar(50) 1 0
spectrum blog_clicks campaign_id varchar(50) 2 0
spectrum blog_clicks os varchar(50) 3 0
spectrum blog_clicks ua varchar(255) 4 0
spectrum blog_clicks ts bigint 5 0
spectrum blog_clicks billing double 6 0
spectrum blog_clicks date date 7 1
spectrum blog_clicks hour smallint 8 2

Now we can use this data to create a validation schema for our data:

const rtb_request_schema = {
    "name": "clicks",
    "items": {
        "user_id": {
            "type": "string",
            "max_length": 100
        },
        "campaign_id": {
            "type": "string",
            "max_length": 50
        },
        "os": {
            "type": "string",
            "max_length": 50            
        },
        "ua": {
            "type": "string",
            "max_length": 255            
        },
        "ts": {
            "type": "integer",
            "min_value": 0,
            "max_value": 9999999999999
        },
        "billing": {
            "type": "float",
            "min_value": 0,
            "max_value": 9999999999999
        }
    }
};

Next, we create a function that uses this schema to validate data:

function valueIsValid(value, item_schema) {
    if (schema.type == 'string') {
        return (typeof value == 'string' && value.length <= schema.max_length);
    }
    else if (schema.type == 'integer') {
        return (typeof value == 'number' && value >= schema.min_value && value <= schema.max_value);
    }
    else if (schema.type == 'float' || schema.type == 'double') {
        return (typeof value == 'number' && value >= schema.min_value && value <= schema.max_value);
    }
    else if (schema.type == 'boolean') {
        return typeof value == 'boolean';
    }
    else if (schema.type == 'timestamp') {
        return (new Date(value)).getTime() > 0;
    }
    else {
        return true;
    }
}

Near real-time data loading with Kinesis Firehose

On Kinesis Firehose, we created a new delivery stream to handle the events as follows:

Delivery stream name: events
Source: Direct PUT
S3 bucket: nuviad-events
S3 prefix: rtb/
IAM role: firehose_delivery_role_1
Data transformation: Disabled
Source record backup: Disabled
S3 buffer size (MB): 100
S3 buffer interval (sec): 60
S3 Compression: GZIP
S3 Encryption: No Encryption
Status: ACTIVE
Error logging: Enabled

This delivery stream aggregates event data every minute, or up to 100 MB, and writes the data to an S3 bucket as a CSV/GZIP compressed file. Next, after we have the data validated, we can safely send it to our Kinesis Firehose API:

if (validated) {
    let itemString = item.join('|')+'\n'; //Sending csv delimited by pipe and adding new line

    let params = {
        DeliveryStreamName: 'events',
        Record: {
            Data: itemString
        }
    };

    firehose.putRecord(params, function(err, data) {
        if (err) {
            console.error(err, err.stack);        
        }
        else {
            // Continue to your next step 
        }
    });
}

Now, we have a single CSV file representing one minute of event data stored in S3. The files are named automatically by Kinesis Firehose by adding a UTC time prefix in the format YYYY/MM/DD/HH before writing objects to S3. Because we use the date and hour as partitions, we need to change the file naming and location to fit our Redshift Spectrum schema.

Automating data distribution using AWS Lambda

We created a simple Lambda function triggered by an S3 put event that copies the file to a different location (or locations), while renaming it to fit our data structure and processing flow. As mentioned before, the files generated by Kinesis Firehose are structured in a pre-defined hierarchy, such as:

S3://your-bucket/your-prefix/2017/08/01/20/events-4-2017-08-01-20-06-06-536f5c40-6893-4ee4-907d-81e4d3b09455.gz

All we need to do is parse the object name and restructure it as we see fit. In our case, we did the following (the event is an object received in the Lambda function with all the data about the object written to S3):

/*
	object key structure in the event object:
your-prefix/2017/08/01/20/event-4-2017-08-01-20-06-06-536f5c40-6893-4ee4-907d-81e4d3b09455.gz
	*/

let key_parts = event.Records[0].s3.object.key.split('/'); 

let event_type = key_parts[0];
let date = key_parts[1] + '-' + key_parts[2] + '-' + key_parts[3];
let hour = key_parts[4];
if (hour.indexOf('0') == 0) {
 		hour = parseInt(hour, 10) + '';
}
    
let parts1 = key_parts[5].split('-');
let minute = parts1[7];
if (minute.indexOf('0') == 0) {
        minute = parseInt(minute, 10) + '';
}

Now, we can redistribute the file to the two destinations we need—one for the minute processing task and the other for hourly aggregation:

    copyObjectToHourlyFolder(event, date, hour, minute)
        .then(copyObjectToMinuteFolder.bind(null, event, date, hour, minute))
        .then(addPartitionToSpectrum.bind(null, event, date, hour, minute))
        .then(deleteOldMinuteObjects.bind(null, event))
        .then(deleteStreamObject.bind(null, event))        
        .then(result => {
            callback(null, { message: 'done' });            
        })
        .catch(err => {
            console.error(err);
            callback(null, { message: err });            
        }); 

Kinesis Firehose stores the data in a temporary folder. We copy the object to another folder that holds the data for the last processed minute. This folder is connected to a small Redshift Spectrum table where the data is being processed without needing to scan a much larger dataset. We also copy the data to a folder that holds the data for the entire hour, to be later aggregated and converted to Parquet.

Because we partition the data by date and hour, we created a new partition on the Redshift Spectrum table if the processed minute is the first minute in the hour (that is, minute 0). We ran the following:

ALTER TABLE 
  spectrum.events 
ADD partition
  (date='2017-08-01', hour=0) 
  LOCATION 's3://nuviad-temp/events/2017-08-01/0/';

After the data is processed and added to the table, we delete the processed data from the temporary Kinesis Firehose storage and from the minute storage folder.

Migrating CSV to Parquet using AWS Glue and Amazon EMR

The simplest way we found to run an hourly job converting our CSV data to Parquet is using Lambda and AWS Glue (and thanks to the awesome AWS Big Data team for their help with this).

Creating AWS Glue jobs

What this simple AWS Glue script does:

  • Gets parameters for the job, date, and hour to be processed
  • Creates a Spark EMR context allowing us to run Spark code
  • Reads CSV data into a DataFrame
  • Writes the data as Parquet to the destination S3 bucket
  • Adds or modifies the Redshift Spectrum / Amazon Athena table partition for the table
import sys
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import boto3

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME','day_partition_key', 'hour_partition_key', 'day_partition_value', 'hour_partition_value' ])

#day_partition_key = "partition_0"
#hour_partition_key = "partition_1"
#day_partition_value = "2017-08-01"
#hour_partition_value = "0"

day_partition_key = args['day_partition_key']
hour_partition_key = args['hour_partition_key']
day_partition_value = args['day_partition_value']
hour_partition_value = args['hour_partition_value']

print("Running for " + day_partition_value + "/" + hour_partition_value)

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

df = spark.read.option("delimiter","|").csv("s3://nuviad-temp/events/"+day_partition_value+"/"+hour_partition_value)
df.registerTempTable("data")

df1 = spark.sql("select _c0 as user_id, _c1 as campaign_id, _c2 as os, _c3 as ua, cast(_c4 as bigint) as ts, cast(_c5 as double) as billing from data")

df1.repartition(1).write.mode("overwrite").parquet("s3://nuviad-temp/parquet/"+day_partition_value+"/hour="+hour_partition_value)

client = boto3.client('athena', region_name='us-east-1')

response = client.start_query_execution(
    QueryString='alter table parquet_events add if not exists partition(' + day_partition_key + '=\'' + day_partition_value + '\',' + hour_partition_key + '=' + hour_partition_value + ')  location \'s3://nuviad-temp/parquet/' + day_partition_value + '/hour=' + hour_partition_value + '\'' ,
    QueryExecutionContext={
        'Database': 'spectrumdb'
    },
    ResultConfiguration={
        'OutputLocation': 's3://nuviad-temp/convertresults'
    }
)

response = client.start_query_execution(
    QueryString='alter table parquet_events partition(' + day_partition_key + '=\'' + day_partition_value + '\',' + hour_partition_key + '=' + hour_partition_value + ') set location \'s3://nuviad-temp/parquet/' + day_partition_value + '/hour=' + hour_partition_value + '\'' ,
    QueryExecutionContext={
        'Database': 'spectrumdb'
    },
    ResultConfiguration={
        'OutputLocation': 's3://nuviad-temp/convertresults'
    }
)

job.commit()

Note: Because Redshift Spectrum and Athena both use the AWS Glue Data Catalog, we could use the Athena client to add the partition to the table.

Here are a few words about float, decimal, and double. Using decimal proved to be more challenging than we expected, as it seems that Redshift Spectrum and Spark use them differently. Whenever we used decimal in Redshift Spectrum and in Spark, we kept getting errors, such as:

S3 Query Exception (Fetch). Task failed due to an internal error. File 'https://s3-external-1.amazonaws.com/nuviad-temp/events/2017-08-01/hour=2/part-00017-48ae5b6b-906e-4875-8cde-bc36c0c6d0ca.c000.snappy.parquet has an incompatible Parquet schema for column 's3://nuviad-events/events.lat'. Column type: DECIMAL(18, 8), Parquet schema:\noptional float lat [i:4 d:1 r:0]\n (https://s3-external-1.amazonaws.com/nuviad-temp/events/2017-08-01/hour=2/part-00017-48ae5b6b-906e-4875-8cde-bc36c0c6d0ca.c000.snappy.parq

We had to experiment with a few floating-point formats until we found that the only combination that worked was to define the column as double in the Spark code and float in Spectrum. This is the reason you see billing defined as float in Spectrum and double in the Spark code.

Creating a Lambda function to trigger conversion

Next, we created a simple Lambda function to trigger the AWS Glue script hourly using a simple Python code:

import boto3
import json
from datetime import datetime, timedelta
 
client = boto3.client('glue')
 
def lambda_handler(event, context):
    last_hour_date_time = datetime.now() - timedelta(hours = 1)
    day_partition_value = last_hour_date_time.strftime("%Y-%m-%d") 
    hour_partition_value = last_hour_date_time.strftime("%-H") 
    response = client.start_job_run(
    JobName='convertEventsParquetHourly',
    Arguments={
         '--day_partition_key': 'date',
         '--hour_partition_key': 'hour',
         '--day_partition_value': day_partition_value,
         '--hour_partition_value': hour_partition_value
         }
    )

Using Amazon CloudWatch Events, we trigger this function hourly. This function triggers an AWS Glue job named ‘convertEventsParquetHourly’ and runs it for the previous hour, passing job names and values of the partitions to process to AWS Glue.

Redshift Spectrum and Node.js

Our development stack is based on Node.js, which is well-suited for high-speed, light servers that need to process a huge number of transactions. However, a few limitations of the Node.js environment required us to create workarounds and use other tools to complete the process.

Node.js and Parquet

The lack of Parquet modules for Node.js required us to implement an AWS Glue/Amazon EMR process to effectively migrate data from CSV to Parquet. We would rather save directly to Parquet, but we couldn’t find an effective way to do it.

One interesting project in the works is the development of a Parquet NPM by Marc Vertes called node-parquet (https://www.npmjs.com/package/node-parquet). It is not in a production state yet, but we think it would be well worth following the progress of this package.

Timestamp data type

According to the Parquet documentation, Timestamp data are stored in Parquet as 64-bit integers. However, JavaScript does not support 64-bit integers, because the native number type is a 64-bit double, giving only 53 bits of integer range.

The result is that you cannot store Timestamp correctly in Parquet using Node.js. The solution is to store Timestamp as string and cast the type to Timestamp in the query. Using this method, we did not witness any performance degradation whatsoever.

Lessons learned

You can benefit from our trial-and-error experience.

Lesson #1: Data validation is critical

As mentioned earlier, a single corrupt entry in a partition can fail queries running against this partition, especially when using Parquet, which is harder to edit than a simple CSV file. Make sure that you validate your data before scanning it with Redshift Spectrum.

Lesson #2: Structure and partition data effectively

One of the biggest benefits of using Redshift Spectrum (or Athena for that matter) is that you don’t need to keep nodes up and running all the time. You pay only for the queries you perform and only for the data scanned per query.

Keeping different permutations of your data for different queries makes a lot of sense in this case. For example, you can partition your data by date and hour to run time-based queries, and also have another set partitioned by user_id and date to run user-based queries. This results in faster and more efficient performance of your data warehouse.

Storing data in the right format

Use Parquet whenever you can. The benefits of Parquet are substantial. Faster performance, less data to scan, and much more efficient columnar format. However, it is not supported out-of-the-box by Kinesis Firehose, so you need to implement your own ETL. AWS Glue is a great option.

Creating small tables for frequent tasks

When we started using Redshift Spectrum, we saw our Amazon Redshift costs jump by hundreds of dollars per day. Then we realized that we were unnecessarily scanning a full day’s worth of data every minute. Take advantage of the ability to define multiple tables on the same S3 bucket or folder, and create temporary and small tables for frequent queries.

Lesson #3: Combine Athena and Redshift Spectrum for optimal performance

Moving to Redshift Spectrum also allowed us to take advantage of Athena as both use the AWS Glue Data Catalog. Run fast and simple queries using Athena while taking advantage of the advanced Amazon Redshift query engine for complex queries using Redshift Spectrum.

Redshift Spectrum excels when running complex queries. It can push many compute-intensive tasks, such as predicate filtering and aggregation, down to the Redshift Spectrum layer, so that queries use much less of your cluster’s processing capacity.

Lesson #4: Sort your Parquet data within the partition

We achieved another performance improvement by sorting data within the partition using sortWithinPartitions(sort_field). For example:

df.repartition(1).sortWithinPartitions("campaign_id")…

Conclusion

We were extremely pleased with using Amazon Redshift as our core data warehouse for over three years. But as our client base and volume of data grew substantially, we extended Amazon Redshift to take advantage of scalability, performance, and cost with Redshift Spectrum.

Redshift Spectrum lets us scale to virtually unlimited storage, scale compute transparently, and deliver super-fast results for our users. With Redshift Spectrum, we store data where we want at the cost we want, and have the data available for analytics when our users need it with the performance they expect.


About the Author

With 7 years of experience in the AdTech industry and 15 years in leading technology companies, Rafi Ton is the founder and CEO of NUVIAD. He enjoys exploring new technologies and putting them to use in cutting edge products and services, in the real world generating real money. Being an experienced entrepreneur, Rafi believes in practical-programming and fast adaptation of new technologies to achieve a significant market advantage.

 

 

Amazon QuickSight Update – Geospatial Visualization, Private VPC Access, and More

Post Syndicated from Jeff Barr original https://aws.amazon.com/blogs/aws/amazon-quicksight-update-geospatial-visualization-private-vpc-access-and-more/

We don’t often recognize or celebrate anniversaries at AWS. With nearly 100 services on our list, we’d be eating cake and drinking champagne several times a week. While that might sound like fun, we’d rather spend our working hours listening to customers and innovating. With that said, Amazon QuickSight has now been generally available for a little over a year and I would like to give you a quick update!

QuickSight in Action
Today, tens of thousands of customers (from startups to enterprises, in industries as varied as transportation, legal, mining, and healthcare) are using QuickSight to analyze and report on their business data.

Here are a couple of examples:

Gemini provides legal evidence procurement for California attorneys who represent injured workers. They have gone from creating custom reports and running one-off queries to creating and sharing dynamic QuickSight dashboards with drill-downs and filtering. QuickSight is used to track sales pipeline, measure order throughput, and to locate bottlenecks in the order processing pipeline.

Jivochat provides a real-time messaging platform to connect visitors to website owners. QuickSight lets them create and share interactive dashboards while also providing access to the underlying datasets. This has allowed them to move beyond the sharing of static spreadsheets, ensuring that everyone is looking at the same and is empowered to make timely decisions based on current data.

Transfix is a tech-powered freight marketplace that matches loads and increases visibility into logistics for Fortune 500 shippers in retail, food and beverage, manufacturing, and other industries. QuickSight has made analytics accessible to both BI engineers and non-technical business users. They scrutinize key business and operational metrics including shipping routes, carrier efficient, and process automation.

Looking Back / Looking Ahead
The feedback on QuickSight has been incredibly helpful. Customers tell us that their employees are using QuickSight to connect to their data, perform analytics, and make high-velocity, data-driven decisions, all without setting up or running their own BI infrastructure. We love all of the feedback that we get, and use it to drive our roadmap, leading to the introduction of over 40 new features in just a year. Here’s a summary:

Looking forward, we are watching an interesting trend develop within our customer base. As these customers take a close look at how they analyze and report on data, they are realizing that a serverless approach offers some tangible benefits. They use Amazon Simple Storage Service (S3) as a data lake and query it using a combination of QuickSight and Amazon Athena, giving them agility and flexibility without static infrastructure. They also make great use of QuickSight’s dashboards feature, monitoring business results and operational metrics, then sharing their insights with hundreds of users. You can read Building a Serverless Analytics Solution for Cleaner Cities and review Serverless Big Data Analytics using Amazon Athena and Amazon QuickSight if you are interested in this approach.

New Features and Enhancements
We’re still doing our best to listen and to learn, and to make sure that QuickSight continues to meet your needs. I’m happy to announce that we are making seven big additions today:

Geospatial Visualization – You can now create geospatial visuals on geographical data sets.

Private VPC Access – You can now sign up to access a preview of a new feature that allows you to securely connect to data within VPCs or on-premises, without the need for public endpoints.

Flat Table Support – In addition to pivot tables, you can now use flat tables for tabular reporting. To learn more, read about Using Tabular Reports.

Calculated SPICE Fields – You can now perform run-time calculations on SPICE data as part of your analysis. Read Adding a Calculated Field to an Analysis for more information.

Wide Table Support – You can now use tables with up to 1000 columns.

Other Buckets – You can summarize the long tail of high-cardinality data into buckets, as described in Working with Visual Types in Amazon QuickSight.

HIPAA Compliance – You can now run HIPAA-compliant workloads on QuickSight.

Geospatial Visualization
Everyone seems to want this feature! You can now take data that contains a geographic identifier (country, city, state, or zip code) and create beautiful visualizations with just a few clicks. QuickSight will geocode the identifier that you supply, and can also accept lat/long map coordinates. You can use this feature to visualize sales by state, map stores to shipping destinations, and so forth. Here’s a sample visualization:

To learn more about this feature, read Using Geospatial Charts (Maps), and Adding Geospatial Data.

Private VPC Access Preview
If you have data in AWS (perhaps in Amazon Redshift, Amazon Relational Database Service (RDS), or on EC2) or on-premises in Teradata or SQL Server on servers without public connectivity, this feature is for you. Private VPC Access for QuickSight uses an Elastic Network Interface (ENI) for secure, private communication with data sources in a VPC. It also allows you to use AWS Direct Connect to create a secure, private link with your on-premises resources. Here’s what it looks like:

If you are ready to join the preview, you can sign up today.

Jeff;

 

Visualize AWS Cloudtrail Logs using AWS Glue and Amazon Quicksight

Post Syndicated from Luis Caro Perez original https://aws.amazon.com/blogs/big-data/streamline-aws-cloudtrail-log-visualization-using-aws-glue-and-amazon-quicksight/

Being able to easily visualize AWS CloudTrail logs gives you a better understanding of how your AWS infrastructure is being used. It can also help you audit and review AWS API calls and detect security anomalies inside your AWS account. To do this, you must be able to perform analytics based on your CloudTrail logs.

In this post, I walk through using AWS Glue and AWS Lambda to convert AWS CloudTrail logs from JSON to a query-optimized format dataset in Amazon S3. I then use Amazon Athena and Amazon QuickSight to query and visualize the data.

Solution overview

To process CloudTrail logs, you must implement the following architecture:

CloudTrail delivers log files in an Amazon S3 bucket folder. To correctly crawl these logs, you modify the file contents and folder structure using an Amazon S3-triggered Lambda function that stores the transformed files in an S3 bucket single folder. When the files are in a single folder, AWS Glue scans the data, converts it into Apache Parquet format, and catalogs it to allow for querying and visualization using Amazon Athena and Amazon QuickSight.

Walkthrough

Let’s look at the steps that are required to build the solution.

Set up CloudTrail logs

First, you need to set up a trail that delivers log files to an S3 bucket. To create a trail in CloudTrail, follow the instructions in Creating a Trail.

When you finish, the trail settings page should look like the following screenshot:

In this example, I set up log files to be delivered to the cloudtraillfcaro bucket.

Consolidate CloudTrail reports into a single folder using Lambda

AWS CloudTrail delivers log files using the following folder structure inside the configured Amazon S3 bucket:

AWSLogs/ACCOUNTID/CloudTrail/REGION/YEAR/MONTH/HOUR/filename.json.gz

Additionally, log files have the following structure:

{
    "Records": [{
        "eventVersion": "1.01",
        "userIdentity": {
            "type": "IAMUser",
            "principalId": "AIDAJDPLRKLG7UEXAMPLE",
            "arn": "arn:aws:iam::123456789012:user/Alice",
            "accountId": "123456789012",
            "accessKeyId": "AKIAIOSFODNN7EXAMPLE",
            "userName": "Alice",
            "sessionContext": {
                "attributes": {
                    "mfaAuthenticated": "false",
                    "creationDate": "2014-03-18T14:29:23Z"
                }
            }
        },
        "eventTime": "2014-03-18T14:30:07Z",
        "eventSource": "cloudtrail.amazonaws.com",
        "eventName": "StartLogging",
        "awsRegion": "us-west-2",
        "sourceIPAddress": "72.21.198.64",
        "userAgent": "signin.amazonaws.com",
        "requestParameters": {
            "name": "Default"
        },
        "responseElements": null,
        "requestID": "cdc73f9d-aea9-11e3-9d5a-835b769c0d9c",
        "eventID": "3074414d-c626-42aa-984b-68ff152d6ab7"
    },
    ... additional entries ...
    ]

If AWS Glue crawlers are used to catalog these files as they are written, the following obstacles arise:

  1. AWS Glue identifies different tables per different folders because they don’t follow a traditional partition format.
  2. Based on the structure of the file content, AWS Glue identifies the tables as having a single column of type array.
  3. CloudTrail logs have JSON attributes that use uppercase letters. According to the Best Practices When Using Athena with AWS Glue, it is recommended that you convert these to lowercase.

To have AWS Glue catalog all log files in a single table with all the columns describing each event, implement the following Lambda function:

from __future__ import print_function
import json
import urllib
import boto3
import gzip

s3 = boto3.resource('s3')
client = boto3.client('s3')

def convertColumntoLowwerCaps(obj):
    for key in obj.keys():
        new_key = key.lower()
        if new_key != key:
            obj[new_key] = obj[key]
            del obj[key]
    return obj


def lambda_handler(event, context):

    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key'].encode('utf8'))
    print(bucket)
    print(key)
    try:
        newKey = 'flatfiles/' + key.replace("/", "")
        client.download_file(bucket, key, '/tmp/file.json.gz')
        with gzip.open('/tmp/out.json.gz', 'w') as output, gzip.open('/tmp/file.json.gz', 'rb') as file:
            i = 0
            for line in file: 
                for record in json.loads(line,object_hook=convertColumntoLowwerCaps)['records']:
            		if i != 0:
            		    output.write("\n")
            		output.write(json.dumps(record))
            		i += 1
        client.upload_file('/tmp/out.json.gz', bucket,newKey)
        return "success"
    except Exception as e:
        print(e)
        print('Error processing object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
        raise e

The function goes over each element of the records array, changes uppercase letters to lowercase in column names, and inserts each element of the array as a single line of a new file. The new file is saved inside a flatfiles folder created by the function without any subfolders in the S3 bucket.

The function should have a role containing a policy with at least the following permissions:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "s3:*"
            ],
            "Resource": [
                "arn:aws:s3:::cloudtraillfcaro/*",
                "arn:aws:s3:::cloudtraillfcaro"
            ],
            "Effect": "Allow"
        }
    ]
}

In this example, CloudTrail delivers logs to the cloudtraillfcaro bucket. Make sure that you replace this name with your bucket name in the policy. For more information about how to work with inline policies, see Working with Inline Policies.

After the Lambda function is created, you can set up the following trigger using the Triggers tab on the AWS Lambda console.

Choose Add trigger, and choose S3 as a source of the trigger.

After choosing the source, configure the following settings:

In the trigger, any file that is written to the path for the log files—which in this case is AWSLogs/119582755581/CloudTrail/—is processed. Make sure that the Enable trigger check box is selected and that the bucket and prefix parameters match your use case.

After you set up the function and receive log files, the bucket (in this case cloudtraillfcaro) should contain the processed files inside the flatfiles folder.

Catalog source data

Once the files are processed by the Lambda function, set up a crawler named cloudtrail to catalog them.

The crawler must point to the flatfiles folder.

All the crawlers and AWS Glue jobs created for this solution must have a role with the AWSGlueServiceRole managed policy and an inline policy with permissions to modify the S3 buckets used on the Lambda function. For more information, see Working with Managed Policies.

The role should look like the following:

In this example, the inline policy named s3perms contains the permissions to modify the S3 buckets.

After you choose the role, you can schedule the crawler to run on demand.

A new database is created, and the crawler is set to use it. In this case, the cloudtrail database is used for all the tables.

After the crawler runs, a single table should be created in the catalog with the following structure:

The table should contain the following columns:

Create and run the AWS Glue job

To convert all the CloudTrail logs to a columnar store in Parquet, set up an AWS Glue job by following these steps.

Upload the following script into a bucket in Amazon S3:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import boto3
import time

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "cloudtrail", table_name = "flatfiles", transformation_ctx = "datasource0")
resolvechoice1 = ResolveChoice.apply(frame = datasource0, choice = "make_struct", transformation_ctx = "resolvechoice1")
relationalized1 = resolvechoice1.relationalize("trail", args["TempDir"]).select("trail")
datasink = glueContext.write_dynamic_frame.from_options(frame = relationalized1, connection_type = "s3", connection_options = {"path": "s3://cloudtraillfcaro/parquettrails"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

In the example, you load the script as a file named cloudtrailtoparquet.py. Make sure that you modify the script and update the “{"path": "s3://cloudtraillfcaro/parquettrails"}” with the destination in which you want to store your results.

After uploading the script, add a new AWS Glue job. Choose a name and role for the job, and choose the option of running the job from An existing script that you provide.

To avoid processing the same data twice, enable the Job bookmark setting in the Advanced properties section of the job properties.

Choose Next twice, and then choose Finish.

If logs are already in the flatfiles folder, you can run the job on demand to generate the first set of results.

Once the job starts running, wait for it to complete.

When the job is finished, its Run status should be Succeeded. After that, you can verify that the Parquet files are written to the Amazon S3 location.

Catalog results

To be able to process results from Athena, you can use an AWS Glue crawler to catalog the results of the AWS Glue job.

In this example, the crawler is set to use the same database as the source named cloudtrail.

You can run the crawler using the console. When the crawler finishes running and has processed the Parquet results, a new table should be created in the AWS Glue Data Catalog. In this example, it’s named parquettrails.

The table should have the classification set to parquet.

It should have the same columns as the flatfiles table, with the exception of the struct type columns, which should be relationalized into several columns:

In this example, notice how the requestparameters column, which was a struct in the original table (flatfiles), was transformed to several columns—one for each key value inside it. This is done using a transformation native to AWS Glue called relationalize.

Query results with Athena

After crawling the results, you can query them using Athena. For example, to query what events took place in the time frame between 2017-10-23t12:00:00 and 2017-10-23t13:00, use the following select statement:

select *
from cloudtrail.parquettrails
where eventtime > '2017-10-23T12:00:00Z' AND eventtime < '2017-10-23T13:00:00Z'
order by eventtime asc;

Be sure to replace cloudtrail.parquettrails with the names of your database and table that references the Parquet results. Replace the datetimes with an hour when your account had activity and was processed by the AWS Glue job.

Visualize results using Amazon QuickSight

Once you can query the data using Athena, you can visualize it using Amazon QuickSight. Before connecting Amazon QuickSight to Athena, be sure to grant QuickSight access to Athena and the associated S3 buckets in your account. For more information, see Managing Amazon QuickSight Permissions to AWS Resources. You can then create a new data set in Amazon QuickSight based on the Athena table that you created.

After setting up permissions, you can create a new analysis in Amazon QuickSight by choosing New analysis.

Then add a new data set.

Choose Athena as the source.

Give the data source a name (in this case, I named it cloudtrail).

Choose the name of the database and the table referencing the Parquet results.

Then choose Visualize.

After that, you should see the following screen:

Now you can create some visualizations. First, search for the sourceipaddress column, and drag it to the AutoGraph section.

You can see a list of the IP addresses that you have used to interact with AWS. To review whether these IP addresses have been used from IAM users, internal AWS services, or roles, use the type value that is inside the useridentity field of the original log files. Thanks to the relationalize transformation, this value is available as the useridentity.type column. After the column is added into the Group/Color box, the visualization should look like the following:

You can now see and distinguish the most used IPs and whether they are used from roles, AWS services, or IAM users.

After following all these steps, you can use Amazon QuickSight to add different columns from CloudTrail and perform different types of visualizations. You can build operational dashboards that continuously monitor AWS infrastructure usage and access. You can share those dashboards with others in your organization who might need to see this data.

Summary

In this post, you saw how you can use a simple Lambda function and an AWS Glue script to convert text files into Parquet to improve Athena query performance and data compression. The post also demonstrated how to use AWS Lambda to preprocess files in Amazon S3 and transform them into a format that is recognizable by AWS Glue crawlers.

This example, used AWS CloudTrail logs, but you can apply the proposed solution to any set of files that after preprocessing, can be cataloged by AWS Glue.


Additional Reading

Learn how to Harmonize, Query, and Visualize Data from Various Providers using AWS Glue, Amazon Athena, and Amazon QuickSight.


About the Authors

Luis Caro is a Big Data Consultant for AWS Professional Services. He works with our customers to provide guidance and technical assistance on big data projects, helping them improving the value of their solutions when using AWS.

 

 

 

Tableau 10.4 Supports Amazon Redshift Spectrum with External Amazon S3 Tables

Post Syndicated from Robin Cottiss original https://aws.amazon.com/blogs/big-data/tableau-10-4-supports-amazon-redshift-spectrum-with-external-amazon-s3-tables/

This is a guest post by Robin Cottiss, strategic customer consultant, Russell Christopher, staff product manager, and Vaidy Krishnan, senior manager of product marketing, at Tableau. Tableau, in their own words, “helps anyone quickly analyze, visualize, and share information. More than 61,000 customer accounts get rapid results with Tableau in the office and on the go. Over 300,000 people use Tableau Public to share public data in their blogs and websites.”

We’re excited to announce today an update to our Amazon Redshift connector with support for Amazon Redshift Spectrum to analyze data in external Amazon S3 tables. This feature, the direct result of joint engineering and testing work performed by the teams at Tableau and AWS, was released as part of Tableau 10.3.3 and will be available broadly in Tableau 10.4.1. With this update, you can quickly and directly connect Tableau to data in Amazon Redshift and analyze it in conjunction with data in Amazon S3—all with drag-and-drop ease.

This connector is yet another in a series of market-leading integrations of Tableau with AWS’s analytics platform, with services such as Amazon Redshift, Amazon EMR, and Amazon Athena. These integrations have allowed Tableau to become the natural choice of tool for analyzing data stored on AWS. Beyond this, Tableau Server runs seamlessly in the AWS Cloud infrastructure. If you prefer to deploy all your applications inside AWS, you have a complete solution offering from Tableau.

How does support for Amazon Redshift Spectrum help you?

If you’re like many Tableau customers, you have large buckets of data stored in Amazon S3. You might need to access this data frequently and store it in a consistent, highly structured format. If so, you can provision it to a data warehouse like Amazon Redshift. You might also want to explore this S3 data on an ad hoc basis. For example, you might want to determine whether or not to provision the data, and where—options might be Hadoop, Impala, Amazon EMR, or Amazon Redshift. To do so, you can use Amazon Athena, a serverless interactive query service from AWS that requires no infrastructure setup and management.

But what if you want to analyze both the frequently accessed data stored locally in Amazon Redshift AND your full datasets stored cost-effectively in Amazon S3? What if you want the throughput of disk and sophisticated query optimization of Amazon Redshift AND a service that combines a serverless scale-out processing capability with the massively reliable and scalable S3 infrastructure? What if you want the super-fast performance of Amazon Redshift AND support for open storage formats (for example, Parquet or ORC) in S3?

To enable these AND and resolve the tyranny of ORs, AWS launched Amazon Redshift Spectrum earlier this year.

Amazon Redshift Spectrum gives you the freedom to store your data where you want, in the format you want, and have it available for processing when you need it. Since the Amazon Redshift Spectrum launch, Tableau has worked tirelessly to provide best-in-class support for this new service. With Tableau and Redshift Spectrum, you can extend your Amazon Redshift analyses out to the entire universe of data in your S3 data lakes.

This latest update has been tested by many customers with very positive feedback. One such customer is the world’s largest food product distributor, Sysco—you can watch their session referencing the Amazon Spectrum integration at Tableau Conference 2017. Sysco also plans to reprise its “Tableau on AWS” story again in a month’s time at AWS re:Invent.

Now, I’d like to use a concrete example to demonstrate how Tableau works with Amazon Redshift Spectrum. In this example, I also show you how and why you might want to connect to your AWS data in different ways.

The setup

I use the pipeline described following to ingest, process, and analyze data with Tableau on an AWS stack. The source data is the New York City Taxi dataset, which has 9 years’ worth of taxi rides activity (including pick-up and drop-off location, amount paid, payment type, and so on) captured in 1.2 billion records.

In this pipeline, this data lands in S3, is cleansed and partitioned by using Amazon EMR, and is then converted to a columnar Parquet format that is analytically optimized. You can point Tableau to the raw data in S3 by using Amazon Athena. You can also access the cleansed data with Tableau using Presto through your Amazon EMR cluster.

Why use Tableau this early in the pipeline? Because sometimes you want to understand what’s there and what questions are worth asking before you even start the analysis.

After you find out what those questions are and determine if this sort of analysis has long-term usefulness, you can automate and optimize that pipeline. You do this to add new data as soon as possible as it arrives, to get it to the processes and people that need it. You might also want to provision this data to a highly performant “hotter” layer (Amazon Redshift or Tableau Extract) for repeated access.

In the illustration preceding, S3 contains the raw denormalized ride data at the timestamp level of granularity. This S3 data is the fact table. Amazon Redshift has the time dimensions broken out by date, month, and year, and also has the taxi zone information.

Now imagine I want to know where and when taxi pickups happen on a certain date in a certain borough. With support for Amazon Redshift Spectrum, I can now join the S3 tables with the Amazon Redshift dimensions, as shown following.

I can next analyze the data in Tableau to produce a borough-by-borough view of New York City ride density on Christmas Day 2015.

Or I can hone in on just Manhattan and identify pickup hotspots, with ride charges way above the average!

With Amazon Redshift Spectrum, you now have a fast, cost-effective engine that minimizes data processed with dynamic partition pruning. You can further improve query performance by reducing the data scanned. You do this by partitioning and compressing data and by using a columnar format for storage.

At the end of the day, which engine you use behind Tableau is a function of what you want to optimize for. Some possible engines are Amazon Athena, Amazon Redshift, and Redshift Spectrum, or you can bring a subset of data into Tableau Extract. Factors in planning optimization include these:

  • Are you comfortable with the serverless cost model of Amazon Athena and potential full scans? Or do you prefer the advantages of no setup?
  • Do you want the throughput of local disk?
  • Effort and time of setup. Are you okay with the lead-time of an Amazon Redshift cluster setup, as opposed to just bringing everything into Tableau Extract?

To meet the many needs of our customers, Tableau’s approach is simple: It’s all about choice. The choice of how you want to connect to and analyze your data. Throughout the history of our product and into the future, we have and will continue to empower choice for customers.

For more on how to deal with choice, as you go about making architecture decisions for your enterprise, watch this big data strategy session my friend Robin Cottiss and I delivered at Tableau Conference 2017. This session includes several customer examples leveraging the Tableau on AWS platform, and also a run-through of the aforementioned demonstration.

If you’re curious to learn more about analyzing data with Tableau on Amazon Redshift we encourage you to check out the following resources:

Hot Startups on AWS – October 2017

Post Syndicated from Tina Barr original https://aws.amazon.com/blogs/aws/hot-startups-on-aws-october-2017/

In 2015, the Centers for Medicare and Medicaid Services (CMS) reported that healthcare spending made up 17.8% of the U.S. GDP – that’s almost $3.2 trillion or $9,990 per person. By 2025, the CMS estimates this number will increase to nearly 20%. As cloud technology evolves in the healthcare and life science industries, we are seeing how companies of all sizes are using AWS to provide powerful and innovative solutions to customers across the globe. This month we are excited to feature the following startups:

  • ClearCare – helping home care agencies operate efficiently and grow their business.
  • DNAnexus – providing a cloud-based global network for sharing and managing genomic data.

ClearCare (San Francisco, CA)

ClearCare envisions a future where home care is the only choice for aging in place. Home care agencies play a critical role in the economy and their communities by significantly lowering the overall cost of care, reducing the number of hospital admissions, and bending the cost curve of aging. Patients receiving home care typically have multiple chronic conditions and functional limitations, driving over $190 billion in healthcare spending in the U.S. each year. To offset these costs, health insurance payers are developing in-home care management programs for patients. ClearCare’s goal is to help home care agencies leverage technology to improve costs, outcomes, and quality of life for the aging population. The company’s powerful software platform is specifically designed for use by non-medical, in-home care agencies to manage their businesses.

Founder and CEO Geoff Nudd created ClearCare because of his own grandmother’s need for care. Keeping family members and caregivers up to date on a loved one’s well being can be difficult, so Geoff created what is now ClearCare’s Family Room, which enables caregivers and agency staff to check schedules and receive real-time updates about what’s happening in the home. Since then, agencies have provided feedback on others areas of their businesses that could be streamlined. ClearCare has now built over 20 modules to help home care agencies optimize operations with services including a telephony service, billing and payroll, and more. ClearCare now serves over 4,000 home care agencies, representing 500,000 caregivers and 400,000 seniors.

Using AWS, ClearCare is able to spin up reliable infrastructure for proofs of concept and iterate on those systems to quickly get value to market. The company runs many AWS services including Amazon Elasticsearch Service, Amazon RDS, and Amazon CloudFront. Amazon EMR and Amazon Athena have enabled ClearCare to build a Hadoop-based ETL and data warehousing system that processes terabytes of data each day. By utilizing these managed services, ClearCare has been able to go from concept to customer delivery in less than three months.

To learn more about ClearCare, check out their website.

DNAnexus (Mountain View, CA)

DNAnexus is accelerating the application of genomic data in precision medicine by providing a cloud-based platform for sharing and managing genomic and biomedical data and analysis tools. The company was founded in 2009 by Stanford graduate student Andreas Sundquist and two Stanford professors Arend Sidow and Serafim Batzoglou, to address the need for scaling secondary analysis of next-generation sequencing (NGS) data in the cloud. The founders quickly learned that users needed a flexible solution to build complex analysis workflows and tools that enable them to share and manage large volumes of data. DNAnexus is optimized to address the challenges of security, scalability, and collaboration for organizations that are pursuing genomic-based approaches to health, both in clinics and research labs. DNAnexus has a global customer base – spanning North America, Europe, Asia-Pacific, South America, and Africa – that runs a million jobs each month and is doubling their storage year-over-year. The company currently stores more than 10 petabytes of biomedical and genomic data. That is equivalent to approximately 100,000 genomes, or in simpler terms, over 50 billion Facebook photos!

DNAnexus is working with its customers to help expand their translational informatics research, which includes expanding into clinical trial genomic services. This will help companies developing different medicines to better stratify clinical trial populations and develop companion tests that enable the right patient to get the right medicine. In collaboration with Janssen Human Microbiome Institute, DNAnexus is also launching Mosaic – a community platform for microbiome research.

AWS provides DNAnexus and its customers the flexibility to grow and scale research programs. Building the technology infrastructure required to manage these projects in-house is expensive and time-consuming. DNAnexus removes that barrier for labs of any size by using AWS scalable cloud resources. The company deploys its customers’ genomic pipelines on Amazon EC2, using Amazon S3 for high-performance, high-durability storage, and Amazon Glacier for low-cost data archiving. DNAnexus is also an AWS Life Sciences Competency Partner.

Learn more about DNAnexus here.

-Tina

Predict Billboard Top 10 Hits Using RStudio, H2O and Amazon Athena

Post Syndicated from Gopal Wunnava original https://aws.amazon.com/blogs/big-data/predict-billboard-top-10-hits-using-rstudio-h2o-and-amazon-athena/

Success in the popular music industry is typically measured in terms of the number of Top 10 hits artists have to their credit. The music industry is a highly competitive multi-billion dollar business, and record labels incur various costs in exchange for a percentage of the profits from sales and concert tickets.

Predicting the success of an artist’s release in the popular music industry can be difficult. One release may be extremely popular, resulting in widespread play on TV, radio and social media, while another single may turn out quite unpopular, and therefore unprofitable. Record labels need to be selective in their decision making, and predictive analytics can help them with decision making around the type of songs and artists they need to promote.

In this walkthrough, you leverage H2O.ai, Amazon Athena, and RStudio to make predictions on whether a song might make it to the Top 10 Billboard charts. You explore the GLM, GBM, and deep learning modeling techniques using H2O’s rapid, distributed and easy-to-use open source parallel processing engine. RStudio is a popular IDE, licensed either commercially or under AGPLv3, for working with R. This is ideal if you don’t want to connect to a server via SSH and use code editors such as vi to do analytics. RStudio is available in a desktop version, or a server version that allows you to access R via a web browser. RStudio’s Notebooks feature is used to demonstrate the execution of code and output. In addition, this post showcases how you can leverage Athena for query and interactive analysis during the modeling phase. A working knowledge of statistics and machine learning would be helpful to interpret the analysis being performed in this post.

Walkthrough

Your goal is to predict whether a song will make it to the Top 10 Billboard charts. For this purpose, you will be using multiple modeling techniques―namely GLM, GBM and deep learning―and choose the model that is the best fit.

This solution involves the following steps:

  • Install and configure RStudio with Athena
  • Log in to RStudio
  • Install R packages
  • Connect to Athena
  • Create a dataset
  • Create models

Install and configure RStudio with Athena

Use the following AWS CloudFormation stack to install, configure, and connect RStudio on an Amazon EC2 instance with Athena.

Launching this stack creates all required resources and prerequisites:

  • Amazon EC2 instance with Amazon Linux (minimum size of t2.large is recommended)
  • Provisioning of the EC2 instance in an existing VPC and public subnet
  • Installation of Java 8
  • Assignment of an IAM role to the EC2 instance with the required permissions for accessing Athena and Amazon S3
  • Security group allowing access to the RStudio and SSH ports from the internet (I recommend restricting access to these ports)
  • S3 staging bucket required for Athena (referenced within RStudio as ATHENABUCKET)
  • RStudio username and password
  • Setup logs in Amazon CloudWatch Logs (if needed for additional troubleshooting)
  • Amazon EC2 Systems Manager agent, which makes it easy to manage and patch

All AWS resources are created in the US-East-1 Region. To avoid cross-region data transfer fees, launch the CloudFormation stack in the same region. To check the availability of Athena in other regions, see Region Table.

Log in to RStudio

The instance security group has been automatically configured to allow incoming connections on the RStudio port 8787 from any source internet address. You can edit the security group to restrict source IP access. If you have trouble connecting, ensure that port 8787 isn’t blocked by subnet network ACLS or by your outgoing proxy/firewall.

  1. In the CloudFormation stack, choose Outputs, Value, and then open the RStudio URL. You might need to wait for a few minutes until the instance has been launched.
  2. Log in to RStudio with the and password you provided during setup.

Install R packages

Next, install the required R packages from the RStudio console. You can download the R notebook file containing just the code.

#install pacman – a handy package manager for managing installs
if("pacman" %in% rownames(installed.packages()) == FALSE)
{install.packages("pacman")}  
library(pacman)
p_load(h2o,rJava,RJDBC,awsjavasdk)
h2o.init(nthreads = -1)
##  Connection successful!
## 
## R is connected to the H2O cluster: 
##     H2O cluster uptime:         2 hours 42 minutes 
##     H2O cluster version:        3.10.4.6 
##     H2O cluster version age:    4 months and 4 days !!! 
##     H2O cluster name:           H2O_started_from_R_rstudio_hjx881 
##     H2O cluster total nodes:    1 
##     H2O cluster total memory:   3.30 GB 
##     H2O cluster total cores:    4 
##     H2O cluster allowed cores:  4 
##     H2O cluster healthy:        TRUE 
##     H2O Connection ip:          localhost 
##     H2O Connection port:        54321 
##     H2O Connection proxy:       NA 
##     H2O Internal Security:      FALSE 
##     R Version:                  R version 3.3.3 (2017-03-06)
## Warning in h2o.clusterInfo(): 
## Your H2O cluster version is too old (4 months and 4 days)!
## Please download and install the latest version from http://h2o.ai/download/
#install aws sdk if not present (pre-requisite for using Athena with an IAM role)
if (!aws_sdk_present()) {
  install_aws_sdk()
}

load_sdk()
## NULL

Connect to Athena

Next, establish a connection to Athena from RStudio, using an IAM role associated with your EC2 instance. Use ATHENABUCKET to specify the S3 staging directory.

URL <- 'https://s3.amazonaws.com/athena-downloads/drivers/AthenaJDBC41-1.0.1.jar'
fil <- basename(URL)
#download the file into current working directory
if (!file.exists(fil)) download.file(URL, fil)
#verify that the file has been downloaded successfully
list.files()
## [1] "AthenaJDBC41-1.0.1.jar"
drv <- JDBC(driverClass="com.amazonaws.athena.jdbc.AthenaDriver", fil, identifier.quote="'")

con <- jdbcConnection <- dbConnect(drv, 'jdbc:awsathena://athena.us-east-1.amazonaws.com:443/',
                                   s3_staging_dir=Sys.getenv("ATHENABUCKET"),
                                   aws_credentials_provider_class="com.amazonaws.auth.DefaultAWSCredentialsProviderChain")

Verify the connection. The results returned depend on your specific Athena setup.

con
## <JDBCConnection>
dbListTables(con)
##  [1] "gdelt"               "wikistats"           "elb_logs_raw_native"
##  [4] "twitter"             "twitter2"            "usermovieratings"   
##  [7] "eventcodes"          "events"              "billboard"          
## [10] "billboardtop10"      "elb_logs"            "gdelthist"          
## [13] "gdeltmaster"         "twitter"             "twitter3"

Create a dataset

For this analysis, you use a sample dataset combining information from Billboard and Wikipedia with Echo Nest data in the Million Songs Dataset. Upload this dataset into your own S3 bucket. The table below provides a description of the fields used in this dataset.

Field Description
year Year that song was released
songtitle Title of the song
artistname Name of the song artist
songid Unique identifier for the song
artistid Unique identifier for the song artist
timesignature Variable estimating the time signature of the song
timesignature_confidence Confidence in the estimate for the timesignature
loudness Continuous variable indicating the average amplitude of the audio in decibels
tempo Variable indicating the estimated beats per minute of the song
tempo_confidence Confidence in the estimate for tempo
key Variable with twelve levels indicating the estimated key of the song (C, C#, B)
key_confidence Confidence in the estimate for key
energy Variable that represents the overall acoustic energy of the song, using a mix of features such as loudness
pitch Continuous variable that indicates the pitch of the song
timbre_0_min thru timbre_11_min Variables that indicate the minimum values over all segments for each of the twelve values in the timbre vector
timbre_0_max thru timbre_11_max Variables that indicate the maximum values over all segments for each of the twelve values in the timbre vector
top10 Indicator for whether or not the song made it to the Top 10 of the Billboard charts (1 if it was in the top 10, and 0 if not)

Create an Athena table based on the dataset

In the Athena console, select the default database, sampled, or create a new database.

Run the following create table statement.

create external table if not exists billboard
(
year int,
songtitle string,
artistname string,
songID string,
artistID string,
timesignature int,
timesignature_confidence double,
loudness double,
tempo double,
tempo_confidence double,
key int,
key_confidence double,
energy double,
pitch double,
timbre_0_min double,
timbre_0_max double,
timbre_1_min double,
timbre_1_max double,
timbre_2_min double,
timbre_2_max double,
timbre_3_min double,
timbre_3_max double,
timbre_4_min double,
timbre_4_max double,
timbre_5_min double,
timbre_5_max double,
timbre_6_min double,
timbre_6_max double,
timbre_7_min double,
timbre_7_max double,
timbre_8_min double,
timbre_8_max double,
timbre_9_min double,
timbre_9_max double,
timbre_10_min double,
timbre_10_max double,
timbre_11_min double,
timbre_11_max double,
Top10 int
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 's3://aws-bigdata-blog/artifacts/predict-billboard/data'
;

Inspect the table definition for the ‘billboard’ table that you have created. If you chose a database other than sampledb, replace that value with your choice.

dbGetQuery(con, "show create table sampledb.billboard")
##                                      createtab_stmt
## 1       CREATE EXTERNAL TABLE `sampledb.billboard`(
## 2                                       `year` int,
## 3                               `songtitle` string,
## 4                              `artistname` string,
## 5                                  `songid` string,
## 6                                `artistid` string,
## 7                              `timesignature` int,
## 8                `timesignature_confidence` double,
## 9                                `loudness` double,
## 10                                  `tempo` double,
## 11                       `tempo_confidence` double,
## 12                                       `key` int,
## 13                         `key_confidence` double,
## 14                                 `energy` double,
## 15                                  `pitch` double,
## 16                           `timbre_0_min` double,
## 17                           `timbre_0_max` double,
## 18                           `timbre_1_min` double,
## 19                           `timbre_1_max` double,
## 20                           `timbre_2_min` double,
## 21                           `timbre_2_max` double,
## 22                           `timbre_3_min` double,
## 23                           `timbre_3_max` double,
## 24                           `timbre_4_min` double,
## 25                           `timbre_4_max` double,
## 26                           `timbre_5_min` double,
## 27                           `timbre_5_max` double,
## 28                           `timbre_6_min` double,
## 29                           `timbre_6_max` double,
## 30                           `timbre_7_min` double,
## 31                           `timbre_7_max` double,
## 32                           `timbre_8_min` double,
## 33                           `timbre_8_max` double,
## 34                           `timbre_9_min` double,
## 35                           `timbre_9_max` double,
## 36                          `timbre_10_min` double,
## 37                          `timbre_10_max` double,
## 38                          `timbre_11_min` double,
## 39                          `timbre_11_max` double,
## 40                                     `top10` int)
## 41                             ROW FORMAT DELIMITED 
## 42                         FIELDS TERMINATED BY ',' 
## 43                            STORED AS INPUTFORMAT 
## 44       'org.apache.hadoop.mapred.TextInputFormat' 
## 45                                     OUTPUTFORMAT 
## 46  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
## 47                                        LOCATION
## 48    's3://aws-bigdata-blog/artifacts/predict-billboard/data'
## 49                                  TBLPROPERTIES (
## 50            'transient_lastDdlTime'='1505484133')

Run a sample query

Next, run a sample query to obtain a list of all songs from Janet Jackson that made it to the Billboard Top 10 charts.

dbGetQuery(con, " SELECT songtitle,artistname,top10   FROM sampledb.billboard WHERE lower(artistname) =     'janet jackson' AND top10 = 1")
##                       songtitle    artistname top10
## 1                       Runaway Janet Jackson     1
## 2               Because Of Love Janet Jackson     1
## 3                         Again Janet Jackson     1
## 4                            If Janet Jackson     1
## 5  Love Will Never Do (Without You) Janet Jackson 1
## 6                     Black Cat Janet Jackson     1
## 7               Come Back To Me Janet Jackson     1
## 8                       Alright Janet Jackson     1
## 9                      Escapade Janet Jackson     1
## 10                Rhythm Nation Janet Jackson     1

Determine how many songs in this dataset are specifically from the year 2010.

dbGetQuery(con, " SELECT count(*)   FROM sampledb.billboard WHERE year = 2010")
##   _col0
## 1   373

The sample dataset provides certain song properties of interest that can be analyzed to gauge the impact to the song’s overall popularity. Look at one such property, timesignature, and determine the value that is the most frequent among songs in the database. Timesignature is a measure of the number of beats and the type of note involved.

Running the query directly may result in an error, as shown in the commented lines below. This error is a result of trying to retrieve a large result set over a JDBC connection, which can cause out-of-memory issues at the client level. To address this, reduce the fetch size and run again.

#t<-dbGetQuery(con, " SELECT timesignature FROM sampledb.billboard")
#Note:  Running the preceding query results in the following error: 
#Error in .jcall(rp, "I", "fetch", stride, block): java.sql.SQLException: The requested #fetchSize is more than the allowed value in Athena. Please reduce the fetchSize and try #again. Refer to the Athena documentation for valid fetchSize values.
# Use the dbSendQuery function, reduce the fetch size, and run again
r <- dbSendQuery(con, " SELECT timesignature     FROM sampledb.billboard")
dftimesignature<- fetch(r, n=-1, block=100)
dbClearResult(r)
## [1] TRUE
table(dftimesignature)
## dftimesignature
##    0    1    3    4    5    7 
##   10  143  503 6787  112   19
nrow(dftimesignature)
## [1] 7574

From the results, observe that 6787 songs have a timesignature of 4.

Next, determine the song with the highest tempo.

dbGetQuery(con, " SELECT songtitle,artistname,tempo   FROM sampledb.billboard WHERE tempo = (SELECT max(tempo) FROM sampledb.billboard) ")
##                   songtitle      artistname   tempo
## 1 Wanna Be Startin' Somethin' Michael Jackson 244.307

Create the training dataset

Your model needs to be trained such that it can learn and make accurate predictions. Split the data into training and test datasets, and create the training dataset first.  This dataset contains all observations from the year 2009 and earlier. You may face the same JDBC connection issue pointed out earlier, so this query uses a fetch size.

#BillboardTrain <- dbGetQuery(con, "SELECT * FROM sampledb.billboard WHERE year <= 2009")
#Running the preceding query results in the following error:-
#Error in .verify.JDBC.result(r, "Unable to retrieve JDBC result set for ", : Unable to retrieve #JDBC result set for SELECT * FROM sampledb.billboard WHERE year <= 2009 (Internal error)
#Follow the same approach as before to address this issue.

r <- dbSendQuery(con, "SELECT * FROM sampledb.billboard WHERE year <= 2009")
BillboardTrain <- fetch(r, n=-1, block=100)
dbClearResult(r)
## [1] TRUE
BillboardTrain[1:2,c(1:3,6:10)]
##   year           songtitle artistname timesignature
## 1 2009 The Awkward Goodbye    Athlete             3
## 2 2009        Rubik's Cube    Athlete             3
##   timesignature_confidence loudness   tempo tempo_confidence
## 1                    0.732   -6.320  89.614   0.652
## 2                    0.906   -9.541 117.742   0.542
nrow(BillboardTrain)
## [1] 7201

Create the test dataset

BillboardTest <- dbGetQuery(con, "SELECT * FROM sampledb.billboard where year = 2010")
BillboardTest[1:2,c(1:3,11:15)]
##   year              songtitle        artistname key
## 1 2010 This Is the House That Doubt Built A Day to Remember  11
## 2 2010        Sticks & Bricks A Day to Remember  10
##   key_confidence    energy pitch timbre_0_min
## 1          0.453 0.9666556 0.024        0.002
## 2          0.469 0.9847095 0.025        0.000
nrow(BillboardTest)
## [1] 373

Convert the training and test datasets into H2O dataframes

train.h2o <- as.h2o(BillboardTrain)
## 
  |                                                                       
  |                                                                 |   0%
  |                                                                       
  |=================================================================| 100%
test.h2o <- as.h2o(BillboardTest)
## 
  |                                                                       
  |                                                                 |   0%
  |                                                                       
  |=================================================================| 100%

Inspect the column names in your H2O dataframes.

colnames(train.h2o)
##  [1] "year"                     "songtitle"               
##  [3] "artistname"               "songid"                  
##  [5] "artistid"                 "timesignature"           
##  [7] "timesignature_confidence" "loudness"                
##  [9] "tempo"                    "tempo_confidence"        
## [11] "key"                      "key_confidence"          
## [13] "energy"                   "pitch"                   
## [15] "timbre_0_min"             "timbre_0_max"            
## [17] "timbre_1_min"             "timbre_1_max"            
## [19] "timbre_2_min"             "timbre_2_max"            
## [21] "timbre_3_min"             "timbre_3_max"            
## [23] "timbre_4_min"             "timbre_4_max"            
## [25] "timbre_5_min"             "timbre_5_max"            
## [27] "timbre_6_min"             "timbre_6_max"            
## [29] "timbre_7_min"             "timbre_7_max"            
## [31] "timbre_8_min"             "timbre_8_max"            
## [33] "timbre_9_min"             "timbre_9_max"            
## [35] "timbre_10_min"            "timbre_10_max"           
## [37] "timbre_11_min"            "timbre_11_max"           
## [39] "top10"

Create models

You need to designate the independent and dependent variables prior to applying your modeling algorithms. Because you’re trying to predict the ‘top10’ field, this would be your dependent variable and everything else would be independent.

Create your first model using GLM. Because GLM works best with numeric data, you create your model by dropping non-numeric variables. You only use the variables in the dataset that describe the numerical attributes of the song in the logistic regression model. You won’t use these variables:  “year”, “songtitle”, “artistname”, “songid”, or “artistid”.

y.dep <- 39
x.indep <- c(6:38)
x.indep
##  [1]  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
## [24] 29 30 31 32 33 34 35 36 37 38

Create Model 1: All numeric variables

Create Model 1 with the training dataset, using GLM as the modeling algorithm and H2O’s built-in h2o.glm function.

modelh1 <- h2o.glm( y = y.dep, x = x.indep, training_frame = train.h2o, family = "binomial")
## 
  |                                                                       
  |                                                                 |   0%
  |                                                                       
  |=====                                                            |   8%
  |                                                                       
  |=================================================================| 100%

Measure the performance of Model 1, using H2O’s built-in performance function.

h2o.performance(model=modelh1,newdata=test.h2o)
## H2OBinomialMetrics: glm
## 
## MSE:  0.09924684
## RMSE:  0.3150347
## LogLoss:  0.3220267
## Mean Per-Class Error:  0.2380168
## AUC:  0.8431394
## Gini:  0.6862787
## R^2:  0.254663
## Null Deviance:  326.0801
## Residual Deviance:  240.2319
## AIC:  308.2319
## 
## Confusion Matrix (vertical: actual; across: predicted) for F1-optimal threshold:
##          0   1    Error     Rate
## 0      255  59 0.187898  =59/314
## 1       17  42 0.288136   =17/59
## Totals 272 101 0.203753  =76/373
## 
## Maximum Metrics: Maximum metrics at their respective thresholds
##                         metric threshold    value idx
## 1                       max f1  0.192772 0.525000 100
## 2                       max f2  0.124912 0.650510 155
## 3                 max f0point5  0.416258 0.612903  23
## 4                 max accuracy  0.416258 0.879357  23
## 5                max precision  0.813396 1.000000   0
## 6                   max recall  0.037579 1.000000 282
## 7              max specificity  0.813396 1.000000   0
## 8             max absolute_mcc  0.416258 0.455251  23
## 9   max min_per_class_accuracy  0.161402 0.738854 125
## 10 max mean_per_class_accuracy  0.124912 0.765006 155
## 
## Gains/Lift Table: Extract with `h2o.gainsLift(<model>, <data>)` or ` 
h2o.auc(h2o.performance(modelh1,test.h2o)) 
## [1] 0.8431394

The AUC metric provides insight into how well the classifier is able to separate the two classes. In this case, the value of 0.8431394 indicates that the classification is good. (A value of 0.5 indicates a worthless test, while a value of 1.0 indicates a perfect test.)

Next, inspect the coefficients of the variables in the dataset.

dfmodelh1 <- as.data.frame(h2o.varimp(modelh1))
dfmodelh1
##                       names coefficients sign
## 1              timbre_0_max  1.290938663  NEG
## 2                  loudness  1.262941934  POS
## 3                     pitch  0.616995941  NEG
## 4              timbre_1_min  0.422323735  POS
## 5              timbre_6_min  0.349016024  NEG
## 6                    energy  0.348092062  NEG
## 7             timbre_11_min  0.307331997  NEG
## 8              timbre_3_max  0.302225619  NEG
## 9             timbre_11_max  0.243632060  POS
## 10             timbre_4_min  0.224233951  POS
## 11             timbre_4_max  0.204134342  POS
## 12             timbre_5_min  0.199149324  NEG
## 13             timbre_0_min  0.195147119  POS
## 14 timesignature_confidence  0.179973904  POS
## 15         tempo_confidence  0.144242598  POS
## 16            timbre_10_max  0.137644568  POS
## 17             timbre_7_min  0.126995955  NEG
## 18            timbre_10_min  0.123851179  POS
## 19             timbre_7_max  0.100031481  NEG
## 20             timbre_2_min  0.096127636  NEG
## 21           key_confidence  0.083115820  POS
## 22             timbre_6_max  0.073712419  POS
## 23            timesignature  0.067241917  POS
## 24             timbre_8_min  0.061301881  POS
## 25             timbre_8_max  0.060041698  POS
## 26                      key  0.056158445  POS
## 27             timbre_3_min  0.050825116  POS
## 28             timbre_9_max  0.033733561  POS
## 29             timbre_2_max  0.030939072  POS
## 30             timbre_9_min  0.020708113  POS
## 31             timbre_1_max  0.014228818  NEG
## 32                    tempo  0.008199861  POS
## 33             timbre_5_max  0.004837870  POS
## 34                                    NA <NA>

Typically, songs with heavier instrumentation tend to be louder (have higher values in the variable “loudness”) and more energetic (have higher values in the variable “energy”). This knowledge is helpful for interpreting the modeling results.

You can make the following observations from the results:

  • The coefficient estimates for the confidence values associated with the time signature, key, and tempo variables are positive. This suggests that higher confidence leads to a higher predicted probability of a Top 10 hit.
  • The coefficient estimate for loudness is positive, meaning that mainstream listeners prefer louder songs with heavier instrumentation.
  • The coefficient estimate for energy is negative, meaning that mainstream listeners prefer songs that are less energetic, which are those songs with light instrumentation.

These coefficients lead to contradictory conclusions for Model 1. This could be due to multicollinearity issues. Inspect the correlation between the variables “loudness” and “energy” in the training set.

cor(train.h2o$loudness,train.h2o$energy)
## [1] 0.7399067

This number indicates that these two variables are highly correlated, and Model 1 does indeed suffer from multicollinearity. Typically, you associate a value of -1.0 to -0.5 or 1.0 to 0.5 to indicate strong correlation, and a value of 0.1 to 0.1 to indicate weak correlation. To avoid this correlation issue, omit one of these two variables and re-create the models.

You build two variations of the original model:

  • Model 2, in which you keep “energy” and omit “loudness”
  • Model 3, in which you keep “loudness” and omit “energy”

You compare these two models and choose the model with a better fit for this use case.

Create Model 2: Keep energy and omit loudness

colnames(train.h2o)
##  [1] "year"                     "songtitle"               
##  [3] "artistname"               "songid"                  
##  [5] "artistid"                 "timesignature"           
##  [7] "timesignature_confidence" "loudness"                
##  [9] "tempo"                    "tempo_confidence"        
## [11] "key"                      "key_confidence"          
## [13] "energy"                   "pitch"                   
## [15] "timbre_0_min"             "timbre_0_max"            
## [17] "timbre_1_min"             "timbre_1_max"            
## [19] "timbre_2_min"             "timbre_2_max"            
## [21] "timbre_3_min"             "timbre_3_max"            
## [23] "timbre_4_min"             "timbre_4_max"            
## [25] "timbre_5_min"             "timbre_5_max"            
## [27] "timbre_6_min"             "timbre_6_max"            
## [29] "timbre_7_min"             "timbre_7_max"            
## [31] "timbre_8_min"             "timbre_8_max"            
## [33] "timbre_9_min"             "timbre_9_max"            
## [35] "timbre_10_min"            "timbre_10_max"           
## [37] "timbre_11_min"            "timbre_11_max"           
## [39] "top10"
y.dep <- 39
x.indep <- c(6:7,9:38)
x.indep
##  [1]  6  7  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
## [24] 30 31 32 33 34 35 36 37 38
modelh2 <- h2o.glm( y = y.dep, x = x.indep, training_frame = train.h2o, family = "binomial")
## 
  |                                                                       
  |                                                                 |   0%
  |                                                                       
  |=======                                                          |  10%
  |                                                                       
  |=================================================================| 100%

Measure the performance of Model 2.

h2o.performance(model=modelh2,newdata=test.h2o)
## H2OBinomialMetrics: glm
## 
## MSE:  0.09922606
## RMSE:  0.3150017
## LogLoss:  0.3228213
## Mean Per-Class Error:  0.2490554
## AUC:  0.8431933
## Gini:  0.6863867
## R^2:  0.2548191
## Null Deviance:  326.0801
## Residual Deviance:  240.8247
## AIC:  306.8247
## 
## Confusion Matrix (vertical: actual; across: predicted) for F1-optimal threshold:
##          0  1    Error     Rate
## 0      280 34 0.108280  =34/314
## 1       23 36 0.389831   =23/59
## Totals 303 70 0.152815  =57/373
## 
## Maximum Metrics: Maximum metrics at their respective thresholds
##                         metric threshold    value idx
## 1                       max f1  0.254391 0.558140  69
## 2                       max f2  0.113031 0.647208 157
## 3                 max f0point5  0.413999 0.596026  22
## 4                 max accuracy  0.446250 0.876676  18
## 5                max precision  0.811739 1.000000   0
## 6                   max recall  0.037682 1.000000 283
## 7              max specificity  0.811739 1.000000   0
## 8             max absolute_mcc  0.254391 0.469060  69
## 9   max min_per_class_accuracy  0.141051 0.716561 131
## 10 max mean_per_class_accuracy  0.113031 0.761821 157
## 
## Gains/Lift Table: Extract with `h2o.gainsLift(<model>, <data>)` or `h2o.gainsLift(<model>, valid=<T/F>, xval=<T/F>)`
dfmodelh2 <- as.data.frame(h2o.varimp(modelh2))
dfmodelh2
##                       names coefficients sign
## 1                     pitch  0.700331511  NEG
## 2              timbre_1_min  0.510270513  POS
## 3              timbre_0_max  0.402059546  NEG
## 4              timbre_6_min  0.333316236  NEG
## 5             timbre_11_min  0.331647383  NEG
## 6              timbre_3_max  0.252425901  NEG
## 7             timbre_11_max  0.227500308  POS
## 8              timbre_4_max  0.210663865  POS
## 9              timbre_0_min  0.208516163  POS
## 10             timbre_5_min  0.202748055  NEG
## 11             timbre_4_min  0.197246582  POS
## 12            timbre_10_max  0.172729619  POS
## 13         tempo_confidence  0.167523934  POS
## 14 timesignature_confidence  0.167398830  POS
## 15             timbre_7_min  0.142450727  NEG
## 16             timbre_8_max  0.093377516  POS
## 17            timbre_10_min  0.090333426  POS
## 18            timesignature  0.085851625  POS
## 19             timbre_7_max  0.083948442  NEG
## 20           key_confidence  0.079657073  POS
## 21             timbre_6_max  0.076426046  POS
## 22             timbre_2_min  0.071957831  NEG
## 23             timbre_9_max  0.071393189  POS
## 24             timbre_8_min  0.070225578  POS
## 25                      key  0.061394702  POS
## 26             timbre_3_min  0.048384697  POS
## 27             timbre_1_max  0.044721121  NEG
## 28                   energy  0.039698433  POS
## 29             timbre_5_max  0.039469064  POS
## 30             timbre_2_max  0.018461133  POS
## 31                    tempo  0.013279926  POS
## 32             timbre_9_min  0.005282143  NEG
## 33                                    NA <NA>

h2o.auc(h2o.performance(modelh2,test.h2o)) 
## [1] 0.8431933

You can make the following observations:

  • The AUC metric is 0.8431933.
  • Inspecting the coefficient of the variable energy, Model 2 suggests that songs with high energy levels tend to be more popular. This is as per expectation.
  • As H2O orders variables by significance, the variable energy is not significant in this model.

You can conclude that Model 2 is not ideal for this use , as energy is not significant.

CreateModel 3: Keep loudness but omit energy

colnames(train.h2o)
##  [1] "year"                     "songtitle"               
##  [3] "artistname"               "songid"                  
##  [5] "artistid"                 "timesignature"           
##  [7] "timesignature_confidence" "loudness"                
##  [9] "tempo"                    "tempo_confidence"        
## [11] "key"                      "key_confidence"          
## [13] "energy"                   "pitch"                   
## [15] "timbre_0_min"             "timbre_0_max"            
## [17] "timbre_1_min"             "timbre_1_max"            
## [19] "timbre_2_min"             "timbre_2_max"            
## [21] "timbre_3_min"             "timbre_3_max"            
## [23] "timbre_4_min"             "timbre_4_max"            
## [25] "timbre_5_min"             "timbre_5_max"            
## [27] "timbre_6_min"             "timbre_6_max"            
## [29] "timbre_7_min"             "timbre_7_max"            
## [31] "timbre_8_min"             "timbre_8_max"            
## [33] "timbre_9_min"             "timbre_9_max"            
## [35] "timbre_10_min"            "timbre_10_max"           
## [37] "timbre_11_min"            "timbre_11_max"           
## [39] "top10"
y.dep <- 39
x.indep <- c(6:12,14:38)
x.indep
##  [1]  6  7  8  9 10 11 12 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
## [24] 30 31 32 33 34 35 36 37 38
modelh3 <- h2o.glm( y = y.dep, x = x.indep, training_frame = train.h2o, family = "binomial")
## 
  |                                                                       
  |                                                                 |   0%
  |                                                                       
  |========                                                         |  12%
  |                                                                       
  |=================================================================| 100%
perfh3<-h2o.performance(model=modelh3,newdata=test.h2o)
perfh3
## H2OBinomialMetrics: glm
## 
## MSE:  0.0978859
## RMSE:  0.3128672
## LogLoss:  0.3178367
## Mean Per-Class Error:  0.264925
## AUC:  0.8492389
## Gini:  0.6984778
## R^2:  0.2648836
## Null Deviance:  326.0801
## Residual Deviance:  237.1062
## AIC:  303.1062
## 
## Confusion Matrix (vertical: actual; across: predicted) for F1-optimal threshold:
##          0  1    Error     Rate
## 0      286 28 0.089172  =28/314
## 1       26 33 0.440678   =26/59
## Totals 312 61 0.144772  =54/373
## 
## Maximum Metrics: Maximum metrics at their respective thresholds
##                         metric threshold    value idx
## 1                       max f1  0.273799 0.550000  60
## 2                       max f2  0.125503 0.663265 155
## 3                 max f0point5  0.435479 0.628931  24
## 4                 max accuracy  0.435479 0.882038  24
## 5                max precision  0.821606 1.000000   0
## 6                   max recall  0.038328 1.000000 280
## 7              max specificity  0.821606 1.000000   0
## 8             max absolute_mcc  0.435479 0.471426  24
## 9   max min_per_class_accuracy  0.173693 0.745763 120
## 10 max mean_per_class_accuracy  0.125503 0.775073 155
## 
## Gains/Lift Table: Extract with `h2o.gainsLift(<model>, <data>)` or `h2o.gainsLift(<model>, valid=<T/F>, xval=<T/F>)`
dfmodelh3 <- as.data.frame(h2o.varimp(modelh3))
dfmodelh3
##                       names coefficients sign
## 1              timbre_0_max 1.216621e+00  NEG
## 2                  loudness 9.780973e-01  POS
## 3                     pitch 7.249788e-01  NEG
## 4              timbre_1_min 3.891197e-01  POS
## 5              timbre_6_min 3.689193e-01  NEG
## 6             timbre_11_min 3.086673e-01  NEG
## 7              timbre_3_max 3.025593e-01  NEG
## 8             timbre_11_max 2.459081e-01  POS
## 9              timbre_4_min 2.379749e-01  POS
## 10             timbre_4_max 2.157627e-01  POS
## 11             timbre_0_min 1.859531e-01  POS
## 12             timbre_5_min 1.846128e-01  NEG
## 13 timesignature_confidence 1.729658e-01  POS
## 14             timbre_7_min 1.431871e-01  NEG
## 15            timbre_10_max 1.366703e-01  POS
## 16            timbre_10_min 1.215954e-01  POS
## 17         tempo_confidence 1.183698e-01  POS
## 18             timbre_2_min 1.019149e-01  NEG
## 19           key_confidence 9.109701e-02  POS
## 20             timbre_7_max 8.987908e-02  NEG
## 21             timbre_6_max 6.935132e-02  POS
## 22             timbre_8_max 6.878241e-02  POS
## 23            timesignature 6.120105e-02  POS
## 24                      key 5.814805e-02  POS
## 25             timbre_8_min 5.759228e-02  POS
## 26             timbre_1_max 2.930285e-02  NEG
## 27             timbre_9_max 2.843755e-02  POS
## 28             timbre_3_min 2.380245e-02  POS
## 29             timbre_2_max 1.917035e-02  POS
## 30             timbre_5_max 1.715813e-02  POS
## 31                    tempo 1.364418e-02  NEG
## 32             timbre_9_min 8.463143e-05  NEG
## 33                                    NA <NA>
h2o.sensitivity(perfh3,0.5)
## Warning in h2o.find_row_by_threshold(object, t): Could not find exact
## threshold: 0.5 for this set of metrics; using closest threshold found:
## 0.501855569251422. Run `h2o.predict` and apply your desired threshold on a
## probability column.
## [[1]]
## [1] 0.2033898
h2o.auc(perfh3)
## [1] 0.8492389

You can make the following observations:

  • The AUC metric is 0.8492389.
  • From the confusion matrix, the model correctly predicts that 33 songs will be top 10 hits (true positives). However, it has 26 false positives (songs that the model predicted would be Top 10 hits, but ended up not being Top 10 hits).
  • Loudness has a positive coefficient estimate, meaning that this model predicts that songs with heavier instrumentation tend to be more popular. This is the same conclusion from Model 2.
  • Loudness is significant in this model.

Overall, Model 3 predicts a higher number of top 10 hits with an accuracy rate that is acceptable. To choose the best fit for production runs, record labels should consider the following factors:

  • Desired model accuracy at a given threshold
  • Number of correct predictions for top10 hits
  • Tolerable number of false positives or false negatives

Next, make predictions using Model 3 on the test dataset.

predict.regh <- h2o.predict(modelh3, test.h2o)
## 
  |                                                                       
  |                                                                 |   0%
  |                                                                       
  |=================================================================| 100%
print(predict.regh)
##   predict        p0          p1
## 1       0 0.9654739 0.034526052
## 2       0 0.9654748 0.034525236
## 3       0 0.9635547 0.036445318
## 4       0 0.9343579 0.065642149
## 5       0 0.9978334 0.002166601
## 6       0 0.9779949 0.022005078
## 
## [373 rows x 3 columns]
predict.regh$predict
##   predict
## 1       0
## 2       0
## 3       0
## 4       0
## 5       0
## 6       0
## 
## [373 rows x 1 column]
dpr<-as.data.frame(predict.regh)
#Rename the predicted column 
colnames(dpr)[colnames(dpr) == 'predict'] <- 'predict_top10'
table(dpr$predict_top10)
## 
##   0   1 
## 312  61

The first set of output results specifies the probabilities associated with each predicted observation.  For example, observation 1 is 96.54739% likely to not be a Top 10 hit, and 3.4526052% likely to be a Top 10 hit (predict=1 indicates Top 10 hit and predict=0 indicates not a Top 10 hit).  The second set of results list the actual predictions made.  From the third set of results, this model predicts that 61 songs will be top 10 hits.

Compute the baseline accuracy, by assuming that the baseline predicts the most frequent outcome, which is that most songs are not Top 10 hits.

table(BillboardTest$top10)
## 
##   0   1 
## 314  59

Now observe that the baseline model would get 314 observations correct, and 59 wrong, for an accuracy of 314/(314+59) = 0.8418231.

It seems that Model 3, with an accuracy of 0.8552, provides you with a small improvement over the baseline model. But is this model useful for record labels?

View the two models from an investment perspective:

  • A production company is interested in investing in songs that are more likely to make it to the Top 10. The company’s objective is to minimize the risk of financial losses attributed to investing in songs that end up unpopular.
  • How many songs does Model 3 correctly predict as a Top 10 hit in 2010? Looking at the confusion matrix, you see that it predicts 33 top 10 hits correctly at an optimal threshold, which is more than half the number
  • It will be more useful to the record label if you can provide the production company with a list of songs that are highly likely to end up in the Top 10.
  • The baseline model is not useful, as it simply does not label any song as a hit.

Considering the three models built so far, you can conclude that Model 3 proves to be the best investment choice for the record label.

GBM model

H2O provides you with the ability to explore other learning models, such as GBM and deep learning. Explore building a model using the GBM technique, using the built-in h2o.gbm function.

Before you do this, you need to convert the target variable to a factor for multinomial classification techniques.

train.h2o$top10=as.factor(train.h2o$top10)
gbm.modelh <- h2o.gbm(y=y.dep, x=x.indep, training_frame = train.h2o, ntrees = 500, max_depth = 4, learn_rate = 0.01, seed = 1122,distribution="multinomial")
## 
  |                                                                       
  |                                                                 |   0%
  |                                                                       
  |===                                                              |   5%
  |                                                                       
  |=====                                                            |   7%
  |                                                                       
  |======                                                           |   9%
  |                                                                       
  |=======                                                          |  10%
  |                                                                       
  |======================                                           |  33%
  |                                                                       
  |=====================================                            |  56%
  |                                                                       
  |====================================================             |  79%
  |                                                                       
  |================================================================ |  98%
  |                                                                       
  |=================================================================| 100%
perf.gbmh<-h2o.performance(gbm.modelh,test.h2o)
perf.gbmh
## H2OBinomialMetrics: gbm
## 
## MSE:  0.09860778
## RMSE:  0.3140188
## LogLoss:  0.3206876
## Mean Per-Class Error:  0.2120263
## AUC:  0.8630573
## Gini:  0.7261146
## 
## Confusion Matrix (vertical: actual; across: predicted) for F1-optimal threshold:
##          0  1    Error     Rate
## 0      266 48 0.152866  =48/314
## 1       16 43 0.271186   =16/59
## Totals 282 91 0.171582  =64/373
## 
## Maximum Metrics: Maximum metrics at their respective thresholds
##                       metric threshold    value idx
## 1                     max f1  0.189757 0.573333  90
## 2                     max f2  0.130895 0.693717 145
## 3               max f0point5  0.327346 0.598802  26
## 4               max accuracy  0.442757 0.876676  14
## 5              max precision  0.802184 1.000000   0
## 6                 max recall  0.049990 1.000000 284
## 7            max specificity  0.802184 1.000000   0
## 8           max absolute_mcc  0.169135 0.496486 104
## 9 max min_per_class_accuracy  0.169135 0.796610 104
## 10 max mean_per_class_accuracy  0.169135 0.805948 104
## 
## Gains/Lift Table: Extract with `h2o.gainsLift(<model>, <data>)` or `
h2o.sensitivity(perf.gbmh,0.5)
## Warning in h2o.find_row_by_threshold(object, t): Could not find exact
## threshold: 0.5 for this set of metrics; using closest threshold found:
## 0.501205344484314. Run `h2o.predict` and apply your desired threshold on a
## probability column.
## [[1]]
## [1] 0.1355932
h2o.auc(perf.gbmh)
## [1] 0.8630573

This model correctly predicts 43 top 10 hits, which is 10 more than the number predicted by Model 3. Moreover, the AUC metric is higher than the one obtained from Model 3.

As seen above, H2O’s API provides the ability to obtain key statistical measures required to analyze the models easily, using several built-in functions. The record label can experiment with different parameters to arrive at the model that predicts the maximum number of Top 10 hits at the desired level of accuracy and threshold.

H2O also allows you to experiment with deep learning models. Deep learning models have the ability to learn features implicitly, but can be more expensive computationally.

Now, create a deep learning model with the h2o.deeplearning function, using the same training and test datasets created before. The time taken to run this model depends on the type of EC2 instance chosen for this purpose.  For models that require more computation, consider using accelerated computing instances such as the P2 instance type.

system.time(
  dlearning.modelh <- h2o.deeplearning(y = y.dep,
                                      x = x.indep,
                                      training_frame = train.h2o,
                                      epoch = 250,
                                      hidden = c(250,250),
                                      activation = "Rectifier",
                                      seed = 1122,
                                      distribution="multinomial"
  )
)
## 
  |                                                                       
  |                                                                 |   0%
  |                                                                       
  |===                                                              |   4%
  |                                                                       
  |=====                                                            |   8%
  |                                                                       
  |========                                                         |  12%
  |                                                                       
  |==========                                                       |  16%
  |                                                                       
  |=============                                                    |  20%
  |                                                                       
  |================                                                 |  24%
  |                                                                       
  |==================                                               |  28%
  |                                                                       
  |=====================                                            |  32%
  |                                                                       
  |=======================                                          |  36%
  |                                                                       
  |==========================                                       |  40%
  |                                                                       
  |=============================                                    |  44%
  |                                                                       
  |===============================                                  |  48%
  |                                                                       
  |==================================                               |  52%
  |                                                                       
  |====================================                             |  56%
  |                                                                       
  |=======================================                          |  60%
  |                                                                       
  |==========================================                       |  64%
  |                                                                       
  |============================================                     |  68%
  |                                                                       
  |===============================================                  |  72%
  |                                                                       
  |=================================================                |  76%
  |                                                                       
  |====================================================             |  80%
  |                                                                       
  |=======================================================          |  84%
  |                                                                       
  |=========================================================        |  88%
  |                                                                       
  |============================================================     |  92%
  |                                                                       
  |==============================================================   |  96%
  |                                                                       
  |=================================================================| 100%
##    user  system elapsed 
##   1.216   0.020 166.508
perf.dl<-h2o.performance(model=dlearning.modelh,newdata=test.h2o)
perf.dl
## H2OBinomialMetrics: deeplearning
## 
## MSE:  0.1678359
## RMSE:  0.4096778
## LogLoss:  1.86509
## Mean Per-Class Error:  0.3433013
## AUC:  0.7568822
## Gini:  0.5137644
## 
## Confusion Matrix (vertical: actual; across: predicted) for F1-optimal threshold:
##          0  1    Error     Rate
## 0      290 24 0.076433  =24/314
## 1       36 23 0.610169   =36/59
## Totals 326 47 0.160858  =60/373
## 
## Maximum Metrics: Maximum metrics at their respective thresholds
##                       metric threshold    value idx
## 1                     max f1  0.826267 0.433962  46
## 2                     max f2  0.000000 0.588235 239
## 3               max f0point5  0.999929 0.511811  16
## 4               max accuracy  0.999999 0.865952  10
## 5              max precision  1.000000 1.000000   0
## 6                 max recall  0.000000 1.000000 326
## 7            max specificity  1.000000 1.000000   0
## 8           max absolute_mcc  0.999929 0.363219  16
## 9 max min_per_class_accuracy  0.000004 0.662420 145
## 10 max mean_per_class_accuracy  0.000000 0.685334 224
## 
## Gains/Lift Table: Extract with `h2o.gainsLift(<model>, <data>)` or `h2o.gainsLift(<model>, valid=<T/F>, xval=<T/F>)`
h2o.sensitivity(perf.dl,0.5)
## Warning in h2o.find_row_by_threshold(object, t): Could not find exact
## threshold: 0.5 for this set of metrics; using closest threshold found:
## 0.496293348880151. Run `h2o.predict` and apply your desired threshold on a
## probability column.
## [[1]]
## [1] 0.3898305
h2o.auc(perf.dl)
## [1] 0.7568822

The AUC metric for this model is 0.7568822, which is less than what you got from the earlier models. I recommend further experimentation using different hyper parameters, such as the learning rate, epoch or the number of hidden layers.

H2O’s built-in functions provide many key statistical measures that can help measure model performance. Here are some of these key terms.

Metric Description
Sensitivity Measures the proportion of positives that have been correctly identified. It is also called the true positive rate, or recall.
Specificity Measures the proportion of negatives that have been correctly identified. It is also called the true negative rate.
Threshold Cutoff point that maximizes specificity and sensitivity. While the model may not provide the highest prediction at this point, it would not be biased towards positives or negatives.
Precision The fraction of the documents retrieved that are relevant to the information needed, for example, how many of the positively classified are relevant
AUC

Provides insight into how well the classifier is able to separate the two classes. The implicit goal is to deal with situations where the sample distribution is highly skewed, with a tendency to overfit to a single class.

0.90 – 1 = excellent (A)

0.8 – 0.9 = good (B)

0.7 – 0.8 = fair (C)

.6 – 0.7 = poor (D)

0.5 – 0.5 = fail (F)

Here’s a summary of the metrics generated from H2O’s built-in functions for the three models that produced useful results.

Metric Model 3 GBM Model Deep Learning Model

Accuracy

(max)

0.882038

(t=0.435479)

0.876676

(t=0.442757)

0.865952

(t=0.999999)

Precision

(max)

1.0

(t=0.821606)

1.0

(t=0802184)

1.0

(t=1.0)

Recall

(max)

1.0 1.0

1.0

(t=0)

Specificity

(max)

1.0 1.0

1.0

(t=1)

Sensitivity

 

0.2033898 0.1355932

0.3898305

(t=0.5)

AUC 0.8492389 0.8630573 0.756882

Note: ‘t’ denotes threshold.

Your options at this point could be narrowed down to Model 3 and the GBM model, based on the AUC and accuracy metrics observed earlier.  If the slightly lower accuracy of the GBM model is deemed acceptable, the record label can choose to go to production with the GBM model, as it can predict a higher number of Top 10 hits.  The AUC metric for the GBM model is also higher than that of Model 3.

Record labels can experiment with different learning techniques and parameters before arriving at a model that proves to be the best fit for their business. Because deep learning models can be computationally expensive, record labels can choose more powerful EC2 instances on AWS to run their experiments faster.

Conclusion

In this post, I showed how the popular music industry can use analytics to predict the type of songs that make the Top 10 Billboard charts. By running H2O’s scalable machine learning platform on AWS, data scientists can easily experiment with multiple modeling techniques and interactively query the data using Amazon Athena, without having to manage the underlying infrastructure. This helps record labels make critical decisions on the type of artists and songs to promote in a timely fashion, thereby increasing sales and revenue.

If you have questions or suggestions, please comment below.


Additional Reading

Learn how to build and explore a simple geospita simple GEOINT application using SparkR.


About the Authors

gopalGopal Wunnava is a Partner Solution Architect with the AWS GSI Team. He works with partners and customers on big data engagements, and is passionate about building analytical solutions that drive business capabilities and decision making. In his spare time, he loves all things sports and movies related and is fond of old classics like Asterix, Obelix comics and Hitchcock movies.

 

 

Bob Strahan, a Senior Consultant with AWS Professional Services, contributed to this post.