Tag Archives: bigdata

Connect, collaborate, and learn at AWS Global Summits in 2018

Post Syndicated from Tina Kelleher original https://aws.amazon.com/blogs/big-data/connect-collaborate-and-learn-at-aws-global-summits-in-2018/

Regardless of your career path, there’s no denying that attending industry events can provide helpful career development opportunities — not only for improving and expanding your skill sets, but for networking as well. According to this article from PayScale.com, experts estimate that somewhere between 70-85% of new positions are landed through networking.

Narrowing our focus to networking opportunities with cloud computing professionals who’re working on tackling some of today’s most innovative and exciting big data solutions, attending big data-focused sessions at an AWS Global Summit is a great place to start.

AWS Global Summits are free events that bring the cloud computing community together to connect, collaborate, and learn about AWS. As the name suggests, these summits are held in major cities around the world, and attract technologists from all industries and skill levels who’re interested in hearing from AWS leaders, experts, partners, and customers.

In addition to networking opportunities with top cloud technology providers, consultants and your peers in our Partner and Solutions Expo, you’ll also hone your AWS skills by attending and participating in a multitude of education and training opportunities.

Here’s a brief sampling of some of the upcoming sessions relevant to big data professionals:

May 31st : Big Data Architectural Patterns and Best Practices on AWS | AWS Summit – Mexico City

June 6th-7th: Various (click on the “Big Data & Analytics” header) | AWS Summit – Berlin

June 20-21st : [email protected] | Public Sector Summit – Washington DC

June 21st: Enabling Self Service for Data Scientists with AWS Service Catalog | AWS Summit – Sao Paulo

Be sure to check out the main page for AWS Global Summits, where you can see which cities have AWS Summits planned for 2018, register to attend an upcoming event, or provide your information to be notified when registration opens for a future event.

Predict Billboard Top 10 Hits Using RStudio, H2O and Amazon Athena

Post Syndicated from Gopal Wunnava original https://aws.amazon.com/blogs/big-data/predict-billboard-top-10-hits-using-rstudio-h2o-and-amazon-athena/

Success in the popular music industry is typically measured in terms of the number of Top 10 hits artists have to their credit. The music industry is a highly competitive multi-billion dollar business, and record labels incur various costs in exchange for a percentage of the profits from sales and concert tickets.

Predicting the success of an artist’s release in the popular music industry can be difficult. One release may be extremely popular, resulting in widespread play on TV, radio and social media, while another single may turn out quite unpopular, and therefore unprofitable. Record labels need to be selective in their decision making, and predictive analytics can help them with decision making around the type of songs and artists they need to promote.

In this walkthrough, you leverage H2O.ai, Amazon Athena, and RStudio to make predictions on whether a song might make it to the Top 10 Billboard charts. You explore the GLM, GBM, and deep learning modeling techniques using H2O’s rapid, distributed and easy-to-use open source parallel processing engine. RStudio is a popular IDE, licensed either commercially or under AGPLv3, for working with R. This is ideal if you don’t want to connect to a server via SSH and use code editors such as vi to do analytics. RStudio is available in a desktop version, or a server version that allows you to access R via a web browser. RStudio’s Notebooks feature is used to demonstrate the execution of code and output. In addition, this post showcases how you can leverage Athena for query and interactive analysis during the modeling phase. A working knowledge of statistics and machine learning would be helpful to interpret the analysis being performed in this post.

Walkthrough

Your goal is to predict whether a song will make it to the Top 10 Billboard charts. For this purpose, you will be using multiple modeling techniques―namely GLM, GBM and deep learning―and choose the model that is the best fit.

This solution involves the following steps:

  • Install and configure RStudio with Athena
  • Log in to RStudio
  • Install R packages
  • Connect to Athena
  • Create a dataset
  • Create models

Install and configure RStudio with Athena

Use the following AWS CloudFormation stack to install, configure, and connect RStudio on an Amazon EC2 instance with Athena.

Launching this stack creates all required resources and prerequisites:

  • Amazon EC2 instance with Amazon Linux (minimum size of t2.large is recommended)
  • Provisioning of the EC2 instance in an existing VPC and public subnet
  • Installation of Java 8
  • Assignment of an IAM role to the EC2 instance with the required permissions for accessing Athena and Amazon S3
  • Security group allowing access to the RStudio and SSH ports from the internet (I recommend restricting access to these ports)
  • S3 staging bucket required for Athena (referenced within RStudio as ATHENABUCKET)
  • RStudio username and password
  • Setup logs in Amazon CloudWatch Logs (if needed for additional troubleshooting)
  • Amazon EC2 Systems Manager agent, which makes it easy to manage and patch

All AWS resources are created in the US-East-1 Region. To avoid cross-region data transfer fees, launch the CloudFormation stack in the same region. To check the availability of Athena in other regions, see Region Table.

Log in to RStudio

The instance security group has been automatically configured to allow incoming connections on the RStudio port 8787 from any source internet address. You can edit the security group to restrict source IP access. If you have trouble connecting, ensure that port 8787 isn’t blocked by subnet network ACLS or by your outgoing proxy/firewall.

  1. In the CloudFormation stack, choose Outputs, Value, and then open the RStudio URL. You might need to wait for a few minutes until the instance has been launched.
  2. Log in to RStudio with the and password you provided during setup.

Install R packages

Next, install the required R packages from the RStudio console. You can download the R notebook file containing just the code.

#install pacman – a handy package manager for managing installs
if("pacman" %in% rownames(installed.packages()) == FALSE)
{install.packages("pacman")}  
library(pacman)
p_load(h2o,rJava,RJDBC,awsjavasdk)
h2o.init(nthreads = -1)
##  Connection successful!
## 
## R is connected to the H2O cluster: 
##     H2O cluster uptime:         2 hours 42 minutes 
##     H2O cluster version:        3.10.4.6 
##     H2O cluster version age:    4 months and 4 days !!! 
##     H2O cluster name:           H2O_started_from_R_rstudio_hjx881 
##     H2O cluster total nodes:    1 
##     H2O cluster total memory:   3.30 GB 
##     H2O cluster total cores:    4 
##     H2O cluster allowed cores:  4 
##     H2O cluster healthy:        TRUE 
##     H2O Connection ip:          localhost 
##     H2O Connection port:        54321 
##     H2O Connection proxy:       NA 
##     H2O Internal Security:      FALSE 
##     R Version:                  R version 3.3.3 (2017-03-06)
## Warning in h2o.clusterInfo(): 
## Your H2O cluster version is too old (4 months and 4 days)!
## Please download and install the latest version from http://h2o.ai/download/
#install aws sdk if not present (pre-requisite for using Athena with an IAM role)
if (!aws_sdk_present()) {
  install_aws_sdk()
}

load_sdk()
## NULL

Connect to Athena

Next, establish a connection to Athena from RStudio, using an IAM role associated with your EC2 instance. Use ATHENABUCKET to specify the S3 staging directory.

URL <- 'https://s3.amazonaws.com/athena-downloads/drivers/AthenaJDBC41-1.0.1.jar'
fil <- basename(URL)
#download the file into current working directory
if (!file.exists(fil)) download.file(URL, fil)
#verify that the file has been downloaded successfully
list.files()
## [1] "AthenaJDBC41-1.0.1.jar"
drv <- JDBC(driverClass="com.amazonaws.athena.jdbc.AthenaDriver", fil, identifier.quote="'")

con <- jdbcConnection <- dbConnect(drv, 'jdbc:awsathena://athena.us-east-1.amazonaws.com:443/',
                                   s3_staging_dir=Sys.getenv("ATHENABUCKET"),
                                   aws_credentials_provider_class="com.amazonaws.auth.DefaultAWSCredentialsProviderChain")

Verify the connection. The results returned depend on your specific Athena setup.

con
## <JDBCConnection>
dbListTables(con)
##  [1] "gdelt"               "wikistats"           "elb_logs_raw_native"
##  [4] "twitter"             "twitter2"            "usermovieratings"   
##  [7] "eventcodes"          "events"              "billboard"          
## [10] "billboardtop10"      "elb_logs"            "gdelthist"          
## [13] "gdeltmaster"         "twitter"             "twitter3"

Create a dataset

For this analysis, you use a sample dataset combining information from Billboard and Wikipedia with Echo Nest data in the Million Songs Dataset. Upload this dataset into your own S3 bucket. The table below provides a description of the fields used in this dataset.

Field Description
yearYear that song was released
songtitleTitle of the song
artistnameName of the song artist
songidUnique identifier for the song
artistidUnique identifier for the song artist
timesignatureVariable estimating the time signature of the song
timesignature_confidenceConfidence in the estimate for the timesignature
loudnessContinuous variable indicating the average amplitude of the audio in decibels
tempoVariable indicating the estimated beats per minute of the song
tempo_confidenceConfidence in the estimate for tempo
keyVariable with twelve levels indicating the estimated key of the song (C, C#, B)
key_confidenceConfidence in the estimate for key
energyVariable that represents the overall acoustic energy of the song, using a mix of features such as loudness
pitchContinuous variable that indicates the pitch of the song
timbre_0_min thru timbre_11_minVariables that indicate the minimum values over all segments for each of the twelve values in the timbre vector
timbre_0_max thru timbre_11_maxVariables that indicate the maximum values over all segments for each of the twelve values in the timbre vector
top10Indicator for whether or not the song made it to the Top 10 of the Billboard charts (1 if it was in the top 10, and 0 if not)

Create an Athena table based on the dataset

In the Athena console, select the default database, sampled, or create a new database.

Run the following create table statement.

create external table if not exists billboard
(
year int,
songtitle string,
artistname string,
songID string,
artistID string,
timesignature int,
timesignature_confidence double,
loudness double,
tempo double,
tempo_confidence double,
key int,
key_confidence double,
energy double,
pitch double,
timbre_0_min double,
timbre_0_max double,
timbre_1_min double,
timbre_1_max double,
timbre_2_min double,
timbre_2_max double,
timbre_3_min double,
timbre_3_max double,
timbre_4_min double,
timbre_4_max double,
timbre_5_min double,
timbre_5_max double,
timbre_6_min double,
timbre_6_max double,
timbre_7_min double,
timbre_7_max double,
timbre_8_min double,
timbre_8_max double,
timbre_9_min double,
timbre_9_max double,
timbre_10_min double,
timbre_10_max double,
timbre_11_min double,
timbre_11_max double,
Top10 int
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 's3://aws-bigdata-blog/artifacts/predict-billboard/data'
;

Inspect the table definition for the ‘billboard’ table that you have created. If you chose a database other than sampledb, replace that value with your choice.

dbGetQuery(con, "show create table sampledb.billboard")
##                                      createtab_stmt
## 1       CREATE EXTERNAL TABLE `sampledb.billboard`(
## 2                                       `year` int,
## 3                               `songtitle` string,
## 4                              `artistname` string,
## 5                                  `songid` string,
## 6                                `artistid` string,
## 7                              `timesignature` int,
## 8                `timesignature_confidence` double,
## 9                                `loudness` double,
## 10                                  `tempo` double,
## 11                       `tempo_confidence` double,
## 12                                       `key` int,
## 13                         `key_confidence` double,
## 14                                 `energy` double,
## 15                                  `pitch` double,
## 16                           `timbre_0_min` double,
## 17                           `timbre_0_max` double,
## 18                           `timbre_1_min` double,
## 19                           `timbre_1_max` double,
## 20                           `timbre_2_min` double,
## 21                           `timbre_2_max` double,
## 22                           `timbre_3_min` double,
## 23                           `timbre_3_max` double,
## 24                           `timbre_4_min` double,
## 25                           `timbre_4_max` double,
## 26                           `timbre_5_min` double,
## 27                           `timbre_5_max` double,
## 28                           `timbre_6_min` double,
## 29                           `timbre_6_max` double,
## 30                           `timbre_7_min` double,
## 31                           `timbre_7_max` double,
## 32                           `timbre_8_min` double,
## 33                           `timbre_8_max` double,
## 34                           `timbre_9_min` double,
## 35                           `timbre_9_max` double,
## 36                          `timbre_10_min` double,
## 37                          `timbre_10_max` double,
## 38                          `timbre_11_min` double,
## 39                          `timbre_11_max` double,
## 40                                     `top10` int)
## 41                             ROW FORMAT DELIMITED 
## 42                         FIELDS TERMINATED BY ',' 
## 43                            STORED AS INPUTFORMAT 
## 44       'org.apache.hadoop.mapred.TextInputFormat' 
## 45                                     OUTPUTFORMAT 
## 46  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
## 47                                        LOCATION
## 48    's3://aws-bigdata-blog/artifacts/predict-billboard/data'
## 49                                  TBLPROPERTIES (
## 50            'transient_lastDdlTime'='1505484133')

Run a sample query

Next, run a sample query to obtain a list of all songs from Janet Jackson that made it to the Billboard Top 10 charts.

dbGetQuery(con, " SELECT songtitle,artistname,top10   FROM sampledb.billboard WHERE lower(artistname) =     'janet jackson' AND top10 = 1")
##                       songtitle    artistname top10
## 1                       Runaway Janet Jackson     1
## 2               Because Of Love Janet Jackson     1
## 3                         Again Janet Jackson     1
## 4                            If Janet Jackson     1
## 5  Love Will Never Do (Without You) Janet Jackson 1
## 6                     Black Cat Janet Jackson     1
## 7               Come Back To Me Janet Jackson     1
## 8                       Alright Janet Jackson     1
## 9                      Escapade Janet Jackson     1
## 10                Rhythm Nation Janet Jackson     1

Determine how many songs in this dataset are specifically from the year 2010.

dbGetQuery(con, " SELECT count(*)   FROM sampledb.billboard WHERE year = 2010")
##   _col0
## 1   373

The sample dataset provides certain song properties of interest that can be analyzed to gauge the impact to the song’s overall popularity. Look at one such property, timesignature, and determine the value that is the most frequent among songs in the database. Timesignature is a measure of the number of beats and the type of note involved.

Running the query directly may result in an error, as shown in the commented lines below. This error is a result of trying to retrieve a large result set over a JDBC connection, which can cause out-of-memory issues at the client level. To address this, reduce the fetch size and run again.

#t<-dbGetQuery(con, " SELECT timesignature FROM sampledb.billboard")
#Note:  Running the preceding query results in the following error: 
#Error in .jcall(rp, "I", "fetch", stride, block): java.sql.SQLException: The requested #fetchSize is more than the allowed value in Athena. Please reduce the fetchSize and try #again. Refer to the Athena documentation for valid fetchSize values.
# Use the dbSendQuery function, reduce the fetch size, and run again
r <- dbSendQuery(con, " SELECT timesignature     FROM sampledb.billboard")
dftimesignature<- fetch(r, n=-1, block=100)
dbClearResult(r)
## [1] TRUE
table(dftimesignature)
## dftimesignature
##    0    1    3    4    5    7 
##   10  143  503 6787  112   19
nrow(dftimesignature)
## [1] 7574

From the results, observe that 6787 songs have a timesignature of 4.

Next, determine the song with the highest tempo.

dbGetQuery(con, " SELECT songtitle,artistname,tempo   FROM sampledb.billboard WHERE tempo = (SELECT max(tempo) FROM sampledb.billboard) ")
##                   songtitle      artistname   tempo
## 1 Wanna Be Startin' Somethin' Michael Jackson 244.307

Create the training dataset

Your model needs to be trained such that it can learn and make accurate predictions. Split the data into training and test datasets, and create the training dataset first.  This dataset contains all observations from the year 2009 and earlier. You may face the same JDBC connection issue pointed out earlier, so this query uses a fetch size.

#BillboardTrain <- dbGetQuery(con, "SELECT * FROM sampledb.billboard WHERE year <= 2009")
#Running the preceding query results in the following error:-
#Error in .verify.JDBC.result(r, "Unable to retrieve JDBC result set for ", : Unable to retrieve #JDBC result set for SELECT * FROM sampledb.billboard WHERE year <= 2009 (Internal error)
#Follow the same approach as before to address this issue.

r <- dbSendQuery(con, "SELECT * FROM sampledb.billboard WHERE year <= 2009")
BillboardTrain <- fetch(r, n=-1, block=100)
dbClearResult(r)
## [1] TRUE
BillboardTrain[1:2,c(1:3,6:10)]
##   year           songtitle artistname timesignature
## 1 2009 The Awkward Goodbye    Athlete             3
## 2 2009        Rubik's Cube    Athlete             3
##   timesignature_confidence loudness   tempo tempo_confidence
## 1                    0.732   -6.320  89.614   0.652
## 2                    0.906   -9.541 117.742   0.542
nrow(BillboardTrain)
## [1] 7201

Create the test dataset

BillboardTest <- dbGetQuery(con, "SELECT * FROM sampledb.billboard where year = 2010")
BillboardTest[1:2,c(1:3,11:15)]
##   year              songtitle        artistname key
## 1 2010 This Is the House That Doubt Built A Day to Remember  11
## 2 2010        Sticks & Bricks A Day to Remember  10
##   key_confidence    energy pitch timbre_0_min
## 1          0.453 0.9666556 0.024        0.002
## 2          0.469 0.9847095 0.025        0.000
nrow(BillboardTest)
## [1] 373

Convert the training and test datasets into H2O dataframes

train.h2o <- as.h2o(BillboardTrain)
## 
  |                                                                       
  |                                                                 |   0%
  |                                                                       
  |=================================================================| 100%
test.h2o <- as.h2o(BillboardTest)
## 
  |                                                                       
  |                                                                 |   0%
  |                                                                       
  |=================================================================| 100%

Inspect the column names in your H2O dataframes.

colnames(train.h2o)
##  [1] "year"                     "songtitle"               
##  [3] "artistname"               "songid"                  
##  [5] "artistid"                 "timesignature"           
##  [7] "timesignature_confidence" "loudness"                
##  [9] "tempo"                    "tempo_confidence"        
## [11] "key"                      "key_confidence"          
## [13] "energy"                   "pitch"                   
## [15] "timbre_0_min"             "timbre_0_max"            
## [17] "timbre_1_min"             "timbre_1_max"            
## [19] "timbre_2_min"             "timbre_2_max"            
## [21] "timbre_3_min"             "timbre_3_max"            
## [23] "timbre_4_min"             "timbre_4_max"            
## [25] "timbre_5_min"             "timbre_5_max"            
## [27] "timbre_6_min"             "timbre_6_max"            
## [29] "timbre_7_min"             "timbre_7_max"            
## [31] "timbre_8_min"             "timbre_8_max"            
## [33] "timbre_9_min"             "timbre_9_max"            
## [35] "timbre_10_min"            "timbre_10_max"           
## [37] "timbre_11_min"            "timbre_11_max"           
## [39] "top10"

Create models

You need to designate the independent and dependent variables prior to applying your modeling algorithms. Because you’re trying to predict the ‘top10’ field, this would be your dependent variable and everything else would be independent.

Create your first model using GLM. Because GLM works best with numeric data, you create your model by dropping non-numeric variables. You only use the variables in the dataset that describe the numerical attributes of the song in the logistic regression model. You won’t use these variables:  “year”, “songtitle”, “artistname”, “songid”, or “artistid”.

y.dep <- 39
x.indep <- c(6:38)
x.indep
##  [1]  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
## [24] 29 30 31 32 33 34 35 36 37 38

Create Model 1: All numeric variables

Create Model 1 with the training dataset, using GLM as the modeling algorithm and H2O’s built-in h2o.glm function.

modelh1 <- h2o.glm( y = y.dep, x = x.indep, training_frame = train.h2o, family = "binomial")
## 
  |                                                                       
  |                                                                 |   0%
  |                                                                       
  |=====                                                            |   8%
  |                                                                       
  |=================================================================| 100%

Measure the performance of Model 1, using H2O’s built-in performance function.

h2o.performance(model=modelh1,newdata=test.h2o)
## H2OBinomialMetrics: glm
## 
## MSE:  0.09924684
## RMSE:  0.3150347
## LogLoss:  0.3220267
## Mean Per-Class Error:  0.2380168
## AUC:  0.8431394
## Gini:  0.6862787
## R^2:  0.254663
## Null Deviance:  326.0801
## Residual Deviance:  240.2319
## AIC:  308.2319
## 
## Confusion Matrix (vertical: actual; across: predicted) for F1-optimal threshold:
##          0   1    Error     Rate
## 0      255  59 0.187898  =59/314
## 1       17  42 0.288136   =17/59
## Totals 272 101 0.203753  =76/373
## 
## Maximum Metrics: Maximum metrics at their respective thresholds
##                         metric threshold    value idx
## 1                       max f1  0.192772 0.525000 100
## 2                       max f2  0.124912 0.650510 155
## 3                 max f0point5  0.416258 0.612903  23
## 4                 max accuracy  0.416258 0.879357  23
## 5                max precision  0.813396 1.000000   0
## 6                   max recall  0.037579 1.000000 282
## 7              max specificity  0.813396 1.000000   0
## 8             max absolute_mcc  0.416258 0.455251  23
## 9   max min_per_class_accuracy  0.161402 0.738854 125
## 10 max mean_per_class_accuracy  0.124912 0.765006 155
## 
## Gains/Lift Table: Extract with `h2o.gainsLift(<model>, <data>)` or ` 
h2o.auc(h2o.performance(modelh1,test.h2o)) 
## [1] 0.8431394

The AUC metric provides insight into how well the classifier is able to separate the two classes. In this case, the value of 0.8431394 indicates that the classification is good. (A value of 0.5 indicates a worthless test, while a value of 1.0 indicates a perfect test.)

Next, inspect the coefficients of the variables in the dataset.

dfmodelh1 <- as.data.frame(h2o.varimp(modelh1))
dfmodelh1
##                       names coefficients sign
## 1              timbre_0_max  1.290938663  NEG
## 2                  loudness  1.262941934  POS
## 3                     pitch  0.616995941  NEG
## 4              timbre_1_min  0.422323735  POS
## 5              timbre_6_min  0.349016024  NEG
## 6                    energy  0.348092062  NEG
## 7             timbre_11_min  0.307331997  NEG
## 8              timbre_3_max  0.302225619  NEG
## 9             timbre_11_max  0.243632060  POS
## 10             timbre_4_min  0.224233951  POS
## 11             timbre_4_max  0.204134342  POS
## 12             timbre_5_min  0.199149324  NEG
## 13             timbre_0_min  0.195147119  POS
## 14 timesignature_confidence  0.179973904  POS
## 15         tempo_confidence  0.144242598  POS
## 16            timbre_10_max  0.137644568  POS
## 17             timbre_7_min  0.126995955  NEG
## 18            timbre_10_min  0.123851179  POS
## 19             timbre_7_max  0.100031481  NEG
## 20             timbre_2_min  0.096127636  NEG
## 21           key_confidence  0.083115820  POS
## 22             timbre_6_max  0.073712419  POS
## 23            timesignature  0.067241917  POS
## 24             timbre_8_min  0.061301881  POS
## 25             timbre_8_max  0.060041698  POS
## 26                      key  0.056158445  POS
## 27             timbre_3_min  0.050825116  POS
## 28             timbre_9_max  0.033733561  POS
## 29             timbre_2_max  0.030939072  POS
## 30             timbre_9_min  0.020708113  POS
## 31             timbre_1_max  0.014228818  NEG
## 32                    tempo  0.008199861  POS
## 33             timbre_5_max  0.004837870  POS
## 34                                    NA <NA>

Typically, songs with heavier instrumentation tend to be louder (have higher values in the variable “loudness”) and more energetic (have higher values in the variable “energy”). This knowledge is helpful for interpreting the modeling results.

You can make the following observations from the results:

  • The coefficient estimates for the confidence values associated with the time signature, key, and tempo variables are positive. This suggests that higher confidence leads to a higher predicted probability of a Top 10 hit.
  • The coefficient estimate for loudness is positive, meaning that mainstream listeners prefer louder songs with heavier instrumentation.
  • The coefficient estimate for energy is negative, meaning that mainstream listeners prefer songs that are less energetic, which are those songs with light instrumentation.

These coefficients lead to contradictory conclusions for Model 1. This could be due to multicollinearity issues. Inspect the correlation between the variables “loudness” and “energy” in the training set.

cor(train.h2o$loudness,train.h2o$energy)
## [1] 0.7399067

This number indicates that these two variables are highly correlated, and Model 1 does indeed suffer from multicollinearity. Typically, you associate a value of -1.0 to -0.5 or 1.0 to 0.5 to indicate strong correlation, and a value of 0.1 to 0.1 to indicate weak correlation. To avoid this correlation issue, omit one of these two variables and re-create the models.

You build two variations of the original model:

  • Model 2, in which you keep “energy” and omit “loudness”
  • Model 3, in which you keep “loudness” and omit “energy”

You compare these two models and choose the model with a better fit for this use case.

Create Model 2: Keep energy and omit loudness

colnames(train.h2o)
##  [1] "year"                     "songtitle"               
##  [3] "artistname"               "songid"                  
##  [5] "artistid"                 "timesignature"           
##  [7] "timesignature_confidence" "loudness"                
##  [9] "tempo"                    "tempo_confidence"        
## [11] "key"                      "key_confidence"          
## [13] "energy"                   "pitch"                   
## [15] "timbre_0_min"             "timbre_0_max"            
## [17] "timbre_1_min"             "timbre_1_max"            
## [19] "timbre_2_min"             "timbre_2_max"            
## [21] "timbre_3_min"             "timbre_3_max"            
## [23] "timbre_4_min"             "timbre_4_max"            
## [25] "timbre_5_min"             "timbre_5_max"            
## [27] "timbre_6_min"             "timbre_6_max"            
## [29] "timbre_7_min"             "timbre_7_max"            
## [31] "timbre_8_min"             "timbre_8_max"            
## [33] "timbre_9_min"             "timbre_9_max"            
## [35] "timbre_10_min"            "timbre_10_max"           
## [37] "timbre_11_min"            "timbre_11_max"           
## [39] "top10"
y.dep <- 39
x.indep <- c(6:7,9:38)
x.indep
##  [1]  6  7  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
## [24] 30 31 32 33 34 35 36 37 38
modelh2 <- h2o.glm( y = y.dep, x = x.indep, training_frame = train.h2o, family = "binomial")
## 
  |                                                                       
  |                                                                 |   0%
  |                                                                       
  |=======                                                          |  10%
  |                                                                       
  |=================================================================| 100%

Measure the performance of Model 2.

h2o.performance(model=modelh2,newdata=test.h2o)
## H2OBinomialMetrics: glm
## 
## MSE:  0.09922606
## RMSE:  0.3150017
## LogLoss:  0.3228213
## Mean Per-Class Error:  0.2490554
## AUC:  0.8431933
## Gini:  0.6863867
## R^2:  0.2548191
## Null Deviance:  326.0801
## Residual Deviance:  240.8247
## AIC:  306.8247
## 
## Confusion Matrix (vertical: actual; across: predicted) for F1-optimal threshold:
##          0  1    Error     Rate
## 0      280 34 0.108280  =34/314
## 1       23 36 0.389831   =23/59
## Totals 303 70 0.152815  =57/373
## 
## Maximum Metrics: Maximum metrics at their respective thresholds
##                         metric threshold    value idx
## 1                       max f1  0.254391 0.558140  69
## 2                       max f2  0.113031 0.647208 157
## 3                 max f0point5  0.413999 0.596026  22
## 4                 max accuracy  0.446250 0.876676  18
## 5                max precision  0.811739 1.000000   0
## 6                   max recall  0.037682 1.000000 283
## 7              max specificity  0.811739 1.000000   0
## 8             max absolute_mcc  0.254391 0.469060  69
## 9   max min_per_class_accuracy  0.141051 0.716561 131
## 10 max mean_per_class_accuracy  0.113031 0.761821 157
## 
## Gains/Lift Table: Extract with `h2o.gainsLift(<model>, <data>)` or `h2o.gainsLift(<model>, valid=<T/F>, xval=<T/F>)`
dfmodelh2 <- as.data.frame(h2o.varimp(modelh2))
dfmodelh2
##                       names coefficients sign
## 1                     pitch  0.700331511  NEG
## 2              timbre_1_min  0.510270513  POS
## 3              timbre_0_max  0.402059546  NEG
## 4              timbre_6_min  0.333316236  NEG
## 5             timbre_11_min  0.331647383  NEG
## 6              timbre_3_max  0.252425901  NEG
## 7             timbre_11_max  0.227500308  POS
## 8              timbre_4_max  0.210663865  POS
## 9              timbre_0_min  0.208516163  POS
## 10             timbre_5_min  0.202748055  NEG
## 11             timbre_4_min  0.197246582  POS
## 12            timbre_10_max  0.172729619  POS
## 13         tempo_confidence  0.167523934  POS
## 14 timesignature_confidence  0.167398830  POS
## 15             timbre_7_min  0.142450727  NEG
## 16             timbre_8_max  0.093377516  POS
## 17            timbre_10_min  0.090333426  POS
## 18            timesignature  0.085851625  POS
## 19             timbre_7_max  0.083948442  NEG
## 20           key_confidence  0.079657073  POS
## 21             timbre_6_max  0.076426046  POS
## 22             timbre_2_min  0.071957831  NEG
## 23             timbre_9_max  0.071393189  POS
## 24             timbre_8_min  0.070225578  POS
## 25                      key  0.061394702  POS
## 26             timbre_3_min  0.048384697  POS
## 27             timbre_1_max  0.044721121  NEG
## 28                   energy  0.039698433  POS
## 29             timbre_5_max  0.039469064  POS
## 30             timbre_2_max  0.018461133  POS
## 31                    tempo  0.013279926  POS
## 32             timbre_9_min  0.005282143  NEG
## 33                                    NA <NA>

h2o.auc(h2o.performance(modelh2,test.h2o)) 
## [1] 0.8431933

You can make the following observations:

  • The AUC metric is 0.8431933.
  • Inspecting the coefficient of the variable energy, Model 2 suggests that songs with high energy levels tend to be more popular. This is as per expectation.
  • As H2O orders variables by significance, the variable energy is not significant in this model.

You can conclude that Model 2 is not ideal for this use , as energy is not significant.

CreateModel 3: Keep loudness but omit energy

colnames(train.h2o)
##  [1] "year"                     "songtitle"               
##  [3] "artistname"               "songid"                  
##  [5] "artistid"                 "timesignature"           
##  [7] "timesignature_confidence" "loudness"                
##  [9] "tempo"                    "tempo_confidence"        
## [11] "key"                      "key_confidence"          
## [13] "energy"                   "pitch"                   
## [15] "timbre_0_min"             "timbre_0_max"            
## [17] "timbre_1_min"             "timbre_1_max"            
## [19] "timbre_2_min"             "timbre_2_max"            
## [21] "timbre_3_min"             "timbre_3_max"            
## [23] "timbre_4_min"             "timbre_4_max"            
## [25] "timbre_5_min"             "timbre_5_max"            
## [27] "timbre_6_min"             "timbre_6_max"            
## [29] "timbre_7_min"             "timbre_7_max"            
## [31] "timbre_8_min"             "timbre_8_max"            
## [33] "timbre_9_min"             "timbre_9_max"            
## [35] "timbre_10_min"            "timbre_10_max"           
## [37] "timbre_11_min"            "timbre_11_max"           
## [39] "top10"
y.dep <- 39
x.indep <- c(6:12,14:38)
x.indep
##  [1]  6  7  8  9 10 11 12 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
## [24] 30 31 32 33 34 35 36 37 38
modelh3 <- h2o.glm( y = y.dep, x = x.indep, training_frame = train.h2o, family = "binomial")
## 
  |                                                                       
  |                                                                 |   0%
  |                                                                       
  |========                                                         |  12%
  |                                                                       
  |=================================================================| 100%
perfh3<-h2o.performance(model=modelh3,newdata=test.h2o)
perfh3
## H2OBinomialMetrics: glm
## 
## MSE:  0.0978859
## RMSE:  0.3128672
## LogLoss:  0.3178367
## Mean Per-Class Error:  0.264925
## AUC:  0.8492389
## Gini:  0.6984778
## R^2:  0.2648836
## Null Deviance:  326.0801
## Residual Deviance:  237.1062
## AIC:  303.1062
## 
## Confusion Matrix (vertical: actual; across: predicted) for F1-optimal threshold:
##          0  1    Error     Rate
## 0      286 28 0.089172  =28/314
## 1       26 33 0.440678   =26/59
## Totals 312 61 0.144772  =54/373
## 
## Maximum Metrics: Maximum metrics at their respective thresholds
##                         metric threshold    value idx
## 1                       max f1  0.273799 0.550000  60
## 2                       max f2  0.125503 0.663265 155
## 3                 max f0point5  0.435479 0.628931  24
## 4                 max accuracy  0.435479 0.882038  24
## 5                max precision  0.821606 1.000000   0
## 6                   max recall  0.038328 1.000000 280
## 7              max specificity  0.821606 1.000000   0
## 8             max absolute_mcc  0.435479 0.471426  24
## 9   max min_per_class_accuracy  0.173693 0.745763 120
## 10 max mean_per_class_accuracy  0.125503 0.775073 155
## 
## Gains/Lift Table: Extract with `h2o.gainsLift(<model>, <data>)` or `h2o.gainsLift(<model>, valid=<T/F>, xval=<T/F>)`
dfmodelh3 <- as.data.frame(h2o.varimp(modelh3))
dfmodelh3
##                       names coefficients sign
## 1              timbre_0_max 1.216621e+00  NEG
## 2                  loudness 9.780973e-01  POS
## 3                     pitch 7.249788e-01  NEG
## 4              timbre_1_min 3.891197e-01  POS
## 5              timbre_6_min 3.689193e-01  NEG
## 6             timbre_11_min 3.086673e-01  NEG
## 7              timbre_3_max 3.025593e-01  NEG
## 8             timbre_11_max 2.459081e-01  POS
## 9              timbre_4_min 2.379749e-01  POS
## 10             timbre_4_max 2.157627e-01  POS
## 11             timbre_0_min 1.859531e-01  POS
## 12             timbre_5_min 1.846128e-01  NEG
## 13 timesignature_confidence 1.729658e-01  POS
## 14             timbre_7_min 1.431871e-01  NEG
## 15            timbre_10_max 1.366703e-01  POS
## 16            timbre_10_min 1.215954e-01  POS
## 17         tempo_confidence 1.183698e-01  POS
## 18             timbre_2_min 1.019149e-01  NEG
## 19           key_confidence 9.109701e-02  POS
## 20             timbre_7_max 8.987908e-02  NEG
## 21             timbre_6_max 6.935132e-02  POS
## 22             timbre_8_max 6.878241e-02  POS
## 23            timesignature 6.120105e-02  POS
## 24                      key 5.814805e-02  POS
## 25             timbre_8_min 5.759228e-02  POS
## 26             timbre_1_max 2.930285e-02  NEG
## 27             timbre_9_max 2.843755e-02  POS
## 28             timbre_3_min 2.380245e-02  POS
## 29             timbre_2_max 1.917035e-02  POS
## 30             timbre_5_max 1.715813e-02  POS
## 31                    tempo 1.364418e-02  NEG
## 32             timbre_9_min 8.463143e-05  NEG
## 33                                    NA <NA>
h2o.sensitivity(perfh3,0.5)
## Warning in h2o.find_row_by_threshold(object, t): Could not find exact
## threshold: 0.5 for this set of metrics; using closest threshold found:
## 0.501855569251422. Run `h2o.predict` and apply your desired threshold on a
## probability column.
## [[1]]
## [1] 0.2033898
h2o.auc(perfh3)
## [1] 0.8492389

You can make the following observations:

  • The AUC metric is 0.8492389.
  • From the confusion matrix, the model correctly predicts that 33 songs will be top 10 hits (true positives). However, it has 26 false positives (songs that the model predicted would be Top 10 hits, but ended up not being Top 10 hits).
  • Loudness has a positive coefficient estimate, meaning that this model predicts that songs with heavier instrumentation tend to be more popular. This is the same conclusion from Model 2.
  • Loudness is significant in this model.

Overall, Model 3 predicts a higher number of top 10 hits with an accuracy rate that is acceptable. To choose the best fit for production runs, record labels should consider the following factors:

  • Desired model accuracy at a given threshold
  • Number of correct predictions for top10 hits
  • Tolerable number of false positives or false negatives

Next, make predictions using Model 3 on the test dataset.

predict.regh <- h2o.predict(modelh3, test.h2o)
## 
  |                                                                       
  |                                                                 |   0%
  |                                                                       
  |=================================================================| 100%
print(predict.regh)
##   predict        p0          p1
## 1       0 0.9654739 0.034526052
## 2       0 0.9654748 0.034525236
## 3       0 0.9635547 0.036445318
## 4       0 0.9343579 0.065642149
## 5       0 0.9978334 0.002166601
## 6       0 0.9779949 0.022005078
## 
## [373 rows x 3 columns]
predict.regh$predict
##   predict
## 1       0
## 2       0
## 3       0
## 4       0
## 5       0
## 6       0
## 
## [373 rows x 1 column]
dpr<-as.data.frame(predict.regh)
#Rename the predicted column 
colnames(dpr)[colnames(dpr) == 'predict'] <- 'predict_top10'
table(dpr$predict_top10)
## 
##   0   1 
## 312  61

The first set of output results specifies the probabilities associated with each predicted observation.  For example, observation 1 is 96.54739% likely to not be a Top 10 hit, and 3.4526052% likely to be a Top 10 hit (predict=1 indicates Top 10 hit and predict=0 indicates not a Top 10 hit).  The second set of results list the actual predictions made.  From the third set of results, this model predicts that 61 songs will be top 10 hits.

Compute the baseline accuracy, by assuming that the baseline predicts the most frequent outcome, which is that most songs are not Top 10 hits.

table(BillboardTest$top10)
## 
##   0   1 
## 314  59

Now observe that the baseline model would get 314 observations correct, and 59 wrong, for an accuracy of 314/(314+59) = 0.8418231.

It seems that Model 3, with an accuracy of 0.8552, provides you with a small improvement over the baseline model. But is this model useful for record labels?

View the two models from an investment perspective:

  • A production company is interested in investing in songs that are more likely to make it to the Top 10. The company’s objective is to minimize the risk of financial losses attributed to investing in songs that end up unpopular.
  • How many songs does Model 3 correctly predict as a Top 10 hit in 2010? Looking at the confusion matrix, you see that it predicts 33 top 10 hits correctly at an optimal threshold, which is more than half the number
  • It will be more useful to the record label if you can provide the production company with a list of songs that are highly likely to end up in the Top 10.
  • The baseline model is not useful, as it simply does not label any song as a hit.

Considering the three models built so far, you can conclude that Model 3 proves to be the best investment choice for the record label.

GBM model

H2O provides you with the ability to explore other learning models, such as GBM and deep learning. Explore building a model using the GBM technique, using the built-in h2o.gbm function.

Before you do this, you need to convert the target variable to a factor for multinomial classification techniques.

train.h2o$top10=as.factor(train.h2o$top10)
gbm.modelh <- h2o.gbm(y=y.dep, x=x.indep, training_frame = train.h2o, ntrees = 500, max_depth = 4, learn_rate = 0.01, seed = 1122,distribution="multinomial")
## 
  |                                                                       
  |                                                                 |   0%
  |                                                                       
  |===                                                              |   5%
  |                                                                       
  |=====                                                            |   7%
  |                                                                       
  |======                                                           |   9%
  |                                                                       
  |=======                                                          |  10%
  |                                                                       
  |======================                                           |  33%
  |                                                                       
  |=====================================                            |  56%
  |                                                                       
  |====================================================             |  79%
  |                                                                       
  |================================================================ |  98%
  |                                                                       
  |=================================================================| 100%
perf.gbmh<-h2o.performance(gbm.modelh,test.h2o)
perf.gbmh
## H2OBinomialMetrics: gbm
## 
## MSE:  0.09860778
## RMSE:  0.3140188
## LogLoss:  0.3206876
## Mean Per-Class Error:  0.2120263
## AUC:  0.8630573
## Gini:  0.7261146
## 
## Confusion Matrix (vertical: actual; across: predicted) for F1-optimal threshold:
##          0  1    Error     Rate
## 0      266 48 0.152866  =48/314
## 1       16 43 0.271186   =16/59
## Totals 282 91 0.171582  =64/373
## 
## Maximum Metrics: Maximum metrics at their respective thresholds
##                       metric threshold    value idx
## 1                     max f1  0.189757 0.573333  90
## 2                     max f2  0.130895 0.693717 145
## 3               max f0point5  0.327346 0.598802  26
## 4               max accuracy  0.442757 0.876676  14
## 5              max precision  0.802184 1.000000   0
## 6                 max recall  0.049990 1.000000 284
## 7            max specificity  0.802184 1.000000   0
## 8           max absolute_mcc  0.169135 0.496486 104
## 9 max min_per_class_accuracy  0.169135 0.796610 104
## 10 max mean_per_class_accuracy  0.169135 0.805948 104
## 
## Gains/Lift Table: Extract with `h2o.gainsLift(<model>, <data>)` or `
h2o.sensitivity(perf.gbmh,0.5)
## Warning in h2o.find_row_by_threshold(object, t): Could not find exact
## threshold: 0.5 for this set of metrics; using closest threshold found:
## 0.501205344484314. Run `h2o.predict` and apply your desired threshold on a
## probability column.
## [[1]]
## [1] 0.1355932
h2o.auc(perf.gbmh)
## [1] 0.8630573

This model correctly predicts 43 top 10 hits, which is 10 more than the number predicted by Model 3. Moreover, the AUC metric is higher than the one obtained from Model 3.

As seen above, H2O’s API provides the ability to obtain key statistical measures required to analyze the models easily, using several built-in functions. The record label can experiment with different parameters to arrive at the model that predicts the maximum number of Top 10 hits at the desired level of accuracy and threshold.

H2O also allows you to experiment with deep learning models. Deep learning models have the ability to learn features implicitly, but can be more expensive computationally.

Now, create a deep learning model with the h2o.deeplearning function, using the same training and test datasets created before. The time taken to run this model depends on the type of EC2 instance chosen for this purpose.  For models that require more computation, consider using accelerated computing instances such as the P2 instance type.

system.time(
  dlearning.modelh <- h2o.deeplearning(y = y.dep,
                                      x = x.indep,
                                      training_frame = train.h2o,
                                      epoch = 250,
                                      hidden = c(250,250),
                                      activation = "Rectifier",
                                      seed = 1122,
                                      distribution="multinomial"
  )
)
## 
  |                                                                       
  |                                                                 |   0%
  |                                                                       
  |===                                                              |   4%
  |                                                                       
  |=====                                                            |   8%
  |                                                                       
  |========                                                         |  12%
  |                                                                       
  |==========                                                       |  16%
  |                                                                       
  |=============                                                    |  20%
  |                                                                       
  |================                                                 |  24%
  |                                                                       
  |==================                                               |  28%
  |                                                                       
  |=====================                                            |  32%
  |                                                                       
  |=======================                                          |  36%
  |                                                                       
  |==========================                                       |  40%
  |                                                                       
  |=============================                                    |  44%
  |                                                                       
  |===============================                                  |  48%
  |                                                                       
  |==================================                               |  52%
  |                                                                       
  |====================================                             |  56%
  |                                                                       
  |=======================================                          |  60%
  |                                                                       
  |==========================================                       |  64%
  |                                                                       
  |============================================                     |  68%
  |                                                                       
  |===============================================                  |  72%
  |                                                                       
  |=================================================                |  76%
  |                                                                       
  |====================================================             |  80%
  |                                                                       
  |=======================================================          |  84%
  |                                                                       
  |=========================================================        |  88%
  |                                                                       
  |============================================================     |  92%
  |                                                                       
  |==============================================================   |  96%
  |                                                                       
  |=================================================================| 100%
##    user  system elapsed 
##   1.216   0.020 166.508
perf.dl<-h2o.performance(model=dlearning.modelh,newdata=test.h2o)
perf.dl
## H2OBinomialMetrics: deeplearning
## 
## MSE:  0.1678359
## RMSE:  0.4096778
## LogLoss:  1.86509
## Mean Per-Class Error:  0.3433013
## AUC:  0.7568822
## Gini:  0.5137644
## 
## Confusion Matrix (vertical: actual; across: predicted) for F1-optimal threshold:
##          0  1    Error     Rate
## 0      290 24 0.076433  =24/314
## 1       36 23 0.610169   =36/59
## Totals 326 47 0.160858  =60/373
## 
## Maximum Metrics: Maximum metrics at their respective thresholds
##                       metric threshold    value idx
## 1                     max f1  0.826267 0.433962  46
## 2                     max f2  0.000000 0.588235 239
## 3               max f0point5  0.999929 0.511811  16
## 4               max accuracy  0.999999 0.865952  10
## 5              max precision  1.000000 1.000000   0
## 6                 max recall  0.000000 1.000000 326
## 7            max specificity  1.000000 1.000000   0
## 8           max absolute_mcc  0.999929 0.363219  16
## 9 max min_per_class_accuracy  0.000004 0.662420 145
## 10 max mean_per_class_accuracy  0.000000 0.685334 224
## 
## Gains/Lift Table: Extract with `h2o.gainsLift(<model>, <data>)` or `h2o.gainsLift(<model>, valid=<T/F>, xval=<T/F>)`
h2o.sensitivity(perf.dl,0.5)
## Warning in h2o.find_row_by_threshold(object, t): Could not find exact
## threshold: 0.5 for this set of metrics; using closest threshold found:
## 0.496293348880151. Run `h2o.predict` and apply your desired threshold on a
## probability column.
## [[1]]
## [1] 0.3898305
h2o.auc(perf.dl)
## [1] 0.7568822

The AUC metric for this model is 0.7568822, which is less than what you got from the earlier models. I recommend further experimentation using different hyper parameters, such as the learning rate, epoch or the number of hidden layers.

H2O’s built-in functions provide many key statistical measures that can help measure model performance. Here are some of these key terms.

MetricDescription
SensitivityMeasures the proportion of positives that have been correctly identified. It is also called the true positive rate, or recall.
SpecificityMeasures the proportion of negatives that have been correctly identified. It is also called the true negative rate.
ThresholdCutoff point that maximizes specificity and sensitivity. While the model may not provide the highest prediction at this point, it would not be biased towards positives or negatives.
PrecisionThe fraction of the documents retrieved that are relevant to the information needed, for example, how many of the positively classified are relevant
AUC

Provides insight into how well the classifier is able to separate the two classes. The implicit goal is to deal with situations where the sample distribution is highly skewed, with a tendency to overfit to a single class.

0.90 – 1 = excellent (A)

0.8 – 0.9 = good (B)

0.7 – 0.8 = fair (C)

.6 – 0.7 = poor (D)

0.5 – 0.5 = fail (F)

Here’s a summary of the metrics generated from H2O’s built-in functions for the three models that produced useful results.

Metric Model 3GBM ModelDeep Learning Model

Accuracy

(max)

0.882038

(t=0.435479)

0.876676

(t=0.442757)

0.865952

(t=0.999999)

Precision

(max)

1.0

(t=0.821606)

1.0

(t=0802184)

1.0

(t=1.0)

Recall

(max)

1.01.0

1.0

(t=0)

Specificity

(max)

1.01.0

1.0

(t=1)

Sensitivity

 

0.20338980.1355932

0.3898305

(t=0.5)

AUC0.84923890.86305730.756882

Note: ‘t’ denotes threshold.

Your options at this point could be narrowed down to Model 3 and the GBM model, based on the AUC and accuracy metrics observed earlier.  If the slightly lower accuracy of the GBM model is deemed acceptable, the record label can choose to go to production with the GBM model, as it can predict a higher number of Top 10 hits.  The AUC metric for the GBM model is also higher than that of Model 3.

Record labels can experiment with different learning techniques and parameters before arriving at a model that proves to be the best fit for their business. Because deep learning models can be computationally expensive, record labels can choose more powerful EC2 instances on AWS to run their experiments faster.

Conclusion

In this post, I showed how the popular music industry can use analytics to predict the type of songs that make the Top 10 Billboard charts. By running H2O’s scalable machine learning platform on AWS, data scientists can easily experiment with multiple modeling techniques and interactively query the data using Amazon Athena, without having to manage the underlying infrastructure. This helps record labels make critical decisions on the type of artists and songs to promote in a timely fashion, thereby increasing sales and revenue.

If you have questions or suggestions, please comment below.


Additional Reading

Learn how to build and explore a simple geospita simple GEOINT application using SparkR.


About the Authors

gopalGopal Wunnava is a Partner Solution Architect with the AWS GSI Team. He works with partners and customers on big data engagements, and is passionate about building analytical solutions that drive business capabilities and decision making. In his spare time, he loves all things sports and movies related and is fond of old classics like Asterix, Obelix comics and Hitchcock movies.

 

 

Bob Strahan, a Senior Consultant with AWS Professional Services, contributed to this post.

 

 

Implement Continuous Integration and Delivery of Apache Spark Applications using AWS

Post Syndicated from Luis Caro Perez original https://aws.amazon.com/blogs/big-data/implement-continuous-integration-and-delivery-of-apache-spark-applications-using-aws/

When you develop Apache Spark–based applications, you might face some additional challenges when dealing with continuous integration and deployment pipelines, such as the following common issues:

  • Applications must be tested on real clusters using automation tools (live test)
  • Any user or developer must be able to easily deploy and use different versions of both the application and infrastructure to be able to debug, experiment on, and test different functionality.
  • Infrastructure needs to be evaluated and tested along with the application that uses it.

In this post, we walk you through a solution that implements a continuous integration and deployment pipeline supported by AWS services. The pipeline offers the following workflow:

  • Deploy the application to a QA stage after a commit is performed to the source code.
  • Perform a unit test using Spark local mode.
  • Deploy to a dynamically provisioned Amazon EMR cluster and test the Spark application on it
  • Update the application as an AWS Service Catalog product version, allowing a user to deploy any version (commit) of the application on demand.

Solution overview

The following diagram shows the pipeline workflow.

The solution uses AWS CodePipeline, which allows users to orchestrate and automate the build, test, and deploy stages for application source code. The solution consists of a pipeline that contains the following stages:

  • Source: Both the Spark application source code in addition to the AWS CloudFormation template file for deploying the application are committed to version control. In this example, we use AWS CodeCommit. For an example of the application source code, see zip. 
  • Build: In this stage, you use Apache Maven both to generate the application .jar binaries and to execute all of the application unit tests that end with *Spec.scala. In this example, we use AWS CodeBuild, which runs the unit tests given that they are designed to use Spark local mode.
  • QADeploy: In this stage, the .jar file built previously is deployed using the CloudFormation template included with the application source code. All the resources are created in this stage, such as networks, EMR clusters, and so on. 
  • LiveTest: In this stage, you use Apache Maven to execute all the application tests that end with *SpecLive.scala. The tests submit EMR steps to the cluster created as part of the QADeploy step. The tests verify that the steps ran successfully and their results. 
  • LiveTestApproval: This stage is included in case a pipeline administrator approval is required to deploy the application to the next stages. The pipeline pauses in this stage until an administrator manually approves the release. 
  • QACleanup: In this stage, you use an AWS Lambda function to delete the CloudFormation template deployed as part of the QADeploy stage. The function does not affect any resources other than those deployed as part of the QADeploy stage. 
  • DeployProduct: In this stage, you use a Lambda function that creates or updates an AWS Service Catalog product and portfolio. Every time the pipeline releases a change to the application, the AWS Service Catalog product gets a new version, with the commit of the change as the version description. 

Try it out!

Use the provided sample template to get started using this solution. This template creates the pipeline described earlier with all of its stages. It performs an initial commit of the sample Spark application in order to trigger the first release change. To deploy the template, use the following AWS CLI command:

aws cloudformation create-stack  --template-url https://s3.amazonaws.com/aws-bigdata-blog/artifacts/sparkAppDemoForPipeline/emrSparkpipeline.yaml --stack-name emr-spark-pipeline --capabilities CAPABILITY_NAMED_IAM

After the template finishes creating resources, you see the pipeline name on the stack Outputs tab. After that, open the AWS CodePipeline console and select the newly created pipeline.

After a couple of minutes, AWS CodePipeline detects the initial commit applied by the CloudFormation stack and starts the first release.

You can watch how the pipeline goes through the Build, QADeploy, and LiveTest stages until it finally reaches the LiveTestApproval stage.

At this point, you can check the results of the test in the log files of the Build and LiveTest stage jobs on AWS CodeBuild. If you check the CloudFormation console, you see that a new template has been deployed as part of the QADeploy stage.

You can also visit the EMR console and view how the LiveTest stage submitted steps to the EMR cluster.

After performing the review, manually approve the revision on the LiveTestApproval stage by using the AWS CodePipeline console.

After the revision is approved, the pipeline proceeds to use a Lambda function that destroys the resources deployed on the QAdeploy stage. Finally, it creates or updates a product and portfolio in AWS Service Catalog. After the final stage of the pipeline is complete, you can check that the product is created successfully on the AWS Service Catalog console.

You can check the product versions and notice that the first version is the initial commit performed by the CloudFormation template.

You can proceed to share the created portfolio with any users in your AWS account and allow them to deploy any version of the Spark application. You can also perform a commit on the AWS CodeCommit repository. The pipeline is triggered automatically and repeats the pipeline execution to deploy a new version of the product.

To destroy all of the resources created by the stack, make sure all the deployed stacks using AWS Service Catalog or the QAdeploy stage are destroyed. Then, destroy the pipeline template using the following AWS CLI command:

 

aws cloudformation delete-stack --stack-name emr-spark-pipeline

Conclusion

You can use the sample template and Spark application shared in this post and adapt them for the specific needs of your own application. The pipeline can have as many stages as needed and it can be used to automatically deploy to AWS Service Catalog or a production environment using CloudFormation.

If you have questions or suggestions, please comment below.


Additional Reading

Learn how to implement authorization and auditing on Amazon EMR using Apache Ranger.

 


About the Authors

Luis Caro is a Big Data Consultant for AWS Professional Services. He works with our customers to provide guidance and technical assistance on big data projects, helping them improving the value of their solutions when using AWS.

 

 

Samuel Schmidt is a Big Data Consultant for AWS Professional Services. He works with our customers to provide guidance and technical assistance on big data projects, helping them improving the value of their solutions when using AWS.

 

 

 

Turbocharge your Apache Hive queries on Amazon EMR using LLAP

Post Syndicated from Jigar Mistry original https://aws.amazon.com/blogs/big-data/turbocharge-your-apache-hive-queries-on-amazon-emr-using-llap/

Apache Hive is one of the most popular tools for analyzing large datasets stored in a Hadoop cluster using SQL. Data analysts and scientists use Hive to query, summarize, explore, and analyze big data.

With the introduction of Hive LLAP (Low Latency Analytical Processing), the notion of Hive being just a batch processing tool has changed. LLAP uses long-lived daemons with intelligent in-memory caching to circumvent batch-oriented latency and provide sub-second query response times.

This post provides an overview of Hive LLAP, including its architecture and common use cases for boosting query performance. You will learn how to install and configure Hive LLAP on an Amazon EMR cluster and run queries on LLAP daemons.

What is Hive LLAP?

Hive LLAP was introduced in Apache Hive 2.0, which provides very fast processing of queries. It uses persistent daemons that are deployed on a Hadoop YARN cluster using Apache Slider. These daemons are long-running and provide functionality such as I/O with DataNode, in-memory caching, query processing, and fine-grained access control. And since the daemons are always running in the cluster, it saves substantial overhead of launching new YARN containers for every new Hive session, thereby avoiding long startup times.

When Hive is configured in hybrid execution mode, small and short queries execute directly on LLAP daemons. Heavy lifting (like large shuffles in the reduce stage) is performed in YARN containers that belong to the application. Resources (CPU, memory, etc.) are obtained in a traditional fashion using YARN. After the resources are obtained, the execution engine can decide which resources are to be allocated to LLAP, or it can launch Apache Tez processors in separate YARN containers. You can also configure Hive to run all the processing workloads on LLAP daemons for querying small datasets at lightning fast speeds.

LLAP daemons are launched under YARN management to ensure that the nodes don’t get overloaded with the compute resources of these daemons. You can use scheduling queues to make sure that there is enough compute capacity for other YARN applications to run.

Why use Hive LLAP?

With many options available in the market (Presto, Spark SQL, etc.) for doing interactive SQL  over data that is stored in Amazon S3 and HDFS, there are several reasons why using Hive and LLAP might be a good choice:

  • For those who are heavily invested in the Hive ecosystem and have external BI tools that connect to Hive over JDBC/ODBC connections, LLAP plugs in to their existing architecture without a steep learning curve.
  • It’s compatible with existing Hive SQL and other Hive tools, like HiveServer2, and JDBC drivers for Hive.
  • It has native support for security features with authentication and authorization (SQL standards-based authorization) using HiveServer2.
  • LLAP daemons are aware about of the columns and records that are being processed which enables you to enforce fine-grained access control.
  • It can use Hive’s vectorization capabilities to speed up queries, and Hive has better support for Parquet file format when vectorization is enabled.
  • It can take advantage of a number of Hive optimizations like merging multiple small files for query results, automatically determining the number of reducers for joins and groupbys, etc.
  • It’s optional and modular so it can be turned on or off depending on the compute and resource requirements of the cluster. This lets you to run other YARN applications concurrently without reserving a cluster specifically for LLAP.

How do you install Hive LLAP in Amazon EMR?

To install and configure LLAP on an EMR cluster, use the following bootstrap action (BA):

s3://aws-bigdata-blog/artifacts/Turbocharge_Apache_Hive_on_EMR/configure-Hive-LLAP.sh

This BA downloads and installs Apache Slider on the cluster and configures LLAP so that it works with EMR Hive. For LLAP to work, the EMR cluster must have Hive, Tez, and Apache Zookeeper installed.

You can pass the following arguments to the BA.

ArgumentDefinitionDefault value
--instancesNumber of instances of LLAP daemonNumber of core/task nodes of the cluster
--cacheCache size per instance20% of physical memory of the node
--executorsNumber of executors per instanceNumber of CPU cores of the node
--iothreadsNumber of IO threads per instanceNumber of CPU cores of the node
--sizeContainer size per instance50% of physical memory of the node
--xmxWorking memory size50% of container size
--log-levelLog levels for the LLAP instanceINFO

LLAP example

This section describes how you can try the faster Hive queries with LLAP using the TPC-DS testbench for Hive on Amazon EMR.

Use the following AWS command line interface (AWS CLI) command to launch a 1+3 nodes m4.xlarge EMR 5.6.0 cluster with the bootstrap action to install LLAP:

aws emr create-cluster --release-label emr-5.6.0 \
--applications Name=Hadoop Name=Hive Name=Hue Name=ZooKeeper Name=Tez \
--bootstrap-actions '[{"Path":"s3://aws-bigdata-blog/artifacts/Turbocharge_Apache_Hive_on_EMR/configure-Hive-LLAP.sh","Name":"Custom action"}]' \ 
--ec2-attributes '{"KeyName":"<YOUR-KEY-PAIR>","InstanceProfile":"EMR_EC2_DefaultRole","SubnetId":"subnet-xxxxxxxx","EmrManagedSlaveSecurityGroup":"sg-xxxxxxxx","EmrManagedMasterSecurityGroup":"sg-xxxxxxxx"}' 
--service-role EMR_DefaultRole \
--enable-debugging \
--log-uri 's3n://<YOUR-BUCKET/' --name 'test-hive-llap' \
--instance-groups '[{"InstanceCount":1,"EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":32,"VolumeType":"gp2"},"VolumesPerInstance":1}],"EbsOptimized":true},"InstanceGroupType":"MASTER","InstanceType":"m4.xlarge","Name":"Master - 1"},{"InstanceCount":3,"EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":32,"VolumeType":"gp2"},"VolumesPerInstance":1}],"EbsOptimized":true},"InstanceGroupType":"CORE","InstanceType":"m4.xlarge","Name":"Core - 2"}]' 
--region us-east-1

After the cluster is launched, log in to the master node using SSH, and do the following:

  1. Open the hive-tpcds folder:
    cd /home/hadoop/hive-tpcds/
  2. Start Hive CLI using the testbench configuration, create the required tables, and run the sample query:

    hive –i testbench.settings
    hive> source create_tables.sql;
    hive> source query55.sql;

    This sample query runs on a 40 GB dataset that is stored on Amazon S3. The dataset is generated using the data generation tool in the TPC-DS testbench for Hive.It results in output like the following:
  3. This screenshot shows that the query finished in about 47 seconds for LLAP mode. Now, to compare this to the execution time without LLAP, you can run the same workload using only Tez containers:
    hive> set hive.llap.execution.mode=none;
    hive> source query55.sql;


    This query finished in about 80 seconds.

The difference in query execution time is almost 1.7 times when using just YARN containers in contrast to running the query on LLAP daemons. And with every rerun of the query, you notice that the execution time substantially decreases by the virtue of in-memory caching by LLAP daemons.

Conclusion

In this post, I introduced Hive LLAP as a way to boost Hive query performance. I discussed its architecture and described several use cases for the component. I showed how you can install and configure Hive LLAP on an Amazon EMR cluster and how you can run queries on LLAP daemons.

If you have questions about using Hive LLAP on Amazon EMR or would like to share your use cases, please leave a comment below.


Additional Reading

Learn how to to automatically partition Hive external tables with AWS.


About the Author

Jigar Mistry is a Hadoop Systems Engineer with Amazon Web Services. He works with customers to provide them architectural guidance and technical support for processing large datasets in the cloud using open-source applications. In his spare time, he enjoys going for camping and exploring different restaurants in the Seattle area.

 

 

 

 

Run Common Data Science Packages on Anaconda and Oozie with Amazon EMR

Post Syndicated from John Ohle original https://aws.amazon.com/blogs/big-data/run-common-data-science-packages-on-anaconda-and-oozie-with-amazon-emr/

In the world of data science, users must often sacrifice cluster set-up time to allow for complex usability scenarios. Amazon EMR allows data scientists to spin up complex cluster configurations easily, and to be up and running with complex queries in a matter of minutes.

Data scientists often use scheduling applications such as Oozie to run jobs overnight. However, Oozie can be difficult to configure when you are trying to use popular Python packages (such as “pandas,” “numpy,” and “statsmodels”), which are not included by default.

One such popular platform that contains these types of packages (and more) is Anaconda. This post focuses on setting up an Anaconda platform on EMR, with an intent to use its packages with Oozie. I describe how to run jobs using a popular open source scheduler like Oozie.

Walkthrough

For this post, you walk through the following tasks:

  • Create an EMR cluster.
  • Download Anaconda on your master node.
  • Configure Oozie.
  • Test the steps.

Create an EMR cluster

Spin up an Amazon EMR cluster using the console or the AWS CLI. Use the latest release, and include Apache Hadoop, Apache Spark, Apache Hive, and Oozie.

To create a three-node cluster in the us-east-1 region, issue an AWS CLI command such as the following. This command must be typed as one line, as shown below. It is shown here separated for readability purposes only.

aws emr create-cluster \ 
--release-label emr-5.7.0 \ 
 --name '<YOUR-CLUSTER-NAME>' \
 --applications Name=Hadoop Name=Oozie Name=Spark Name=Hive \ 
 --ec2-attributes '{"KeyName":"<YOUR-KEY-PAIR>","SubnetId":"<YOUR-SUBNET-ID>","EmrManagedSlaveSecurityGroup":"<YOUR-EMR-SLAVE-SECURITY-GROUP>","EmrManagedMasterSecurityGroup":"<YOUR-EMR-MASTER-SECURITY-GROUP>"}' \ 
 --use-default-roles \ 
 --instance-groups '[{"InstanceCount":1,"InstanceGroupType":"MASTER","InstanceType":"<YOUR-INSTANCE-TYPE>","Name":"Master - 1"},{"InstanceCount":<YOUR-CORE-INSTANCE-COUNT>,"InstanceGroupType":"CORE","InstanceType":"<YOUR-INSTANCE-TYPE>","Name":"Core - 2"}]'

One-line version for reference:

aws emr create-cluster --release-label emr-5.7.0 --name '<YOUR-CLUSTER-NAME>' --applications Name=Hadoop Name=Oozie Name=Spark Name=Hive --ec2-attributes '{"KeyName":"<YOUR-KEY-PAIR>","SubnetId":"<YOUR-SUBNET-ID>","EmrManagedSlaveSecurityGroup":"<YOUR-EMR-SLAVE-SECURITY-GROUP>","EmrManagedMasterSecurityGroup":"<YOUR-EMR-MASTER-SECURITY-GROUP>"}' --use-default-roles --instance-groups '[{"InstanceCount":1,"InstanceGroupType":"MASTER","InstanceType":"<YOUR-INSTANCE-TYPE>","Name":"Master - 1"},{"InstanceCount":<YOUR-CORE-INSTANCE-COUNT>,"InstanceGroupType":"CORE","InstanceType":"<YOUR-INSTANCE-TYPE>","Name":"Core - 2"}]'

Download Anaconda

SSH into your EMR master node instance and download the official Anaconda installer:

wget https://repo.continuum.io/archive/Anaconda2-4.4.0-Linux-x86_64.sh

At the time of publication, Anaconda 4.4 is the most current version available. For the download link location for the latest Python 2.7 version (Python 3.6 may encounter issues), see https://www.continuum.io/downloads.  Open the context (right-click) menu for the Python 2.7 download link, choose Copy Link Location, and use this value in the previous wget command.

This post used the Anaconda 4.4 installation. If you have a later version, it is shown in the downloaded filename:  “anaconda2-<version number>-Linux-x86_64.sh”.

Run this downloaded script and follow the on-screen installer prompts.

chmod u+x Anaconda2-4.4.0-Linux-x86_64.sh
./Anaconda2-4.4.0-Linux-x86_64.sh

For an installation directory, select somewhere with enough space on your cluster, such as “/mnt/anaconda/”.

The process should take approximately 1–2 minutes to install. When prompted if you “wish the installer to prepend the Anaconda2 install location”, select the default option of [no].

After you are done, export the PATH to include this new Anaconda installation:

export PATH=/mnt/anaconda/bin:$PATH

Zip up the Anaconda installation:

cd /mnt/anaconda/
zip -r anaconda.zip .

The zip process may take 4–5 minutes to complete.

(Optional) Upload this anaconda.zip file to your S3 bucket for easier inclusion into future EMR clusters. This removes the need to repeat the previous steps for future EMR clusters.

Configure Oozie

Next, you configure Oozie to use Pyspark and the Anaconda platform.

Get the location of your Oozie sharelibupdate folder. Issue the following command and take note of the “sharelibDirNew” value:

oozie admin -sharelibupdate

For this post, this value is “hdfs://ip-192-168-4-200.us-east-1.compute.internal:8020/user/oozie/share/lib/lib_20170616133136”.

Pass in the required Pyspark files into Oozies sharelibupdate location. The following files are required for Oozie to be able to run Pyspark commands:

  • pyspark.zip
  • py4j-0.10.4-src.zip

These are located on the EMR master instance in the location “/usr/lib/spark/python/lib/”, and must be put into the Oozie sharelib spark directory. This location is the value of the sharelibDirNew parameter value (shown above) with “/spark/” appended, that is, “hdfs://ip-192-168-4-200.us-east-1.compute.internal:8020/user/oozie/share/lib/lib_20170616133136/spark/”.

To do this, issue the following commands:

hdfs dfs -put /usr/lib/spark/python/lib/py4j-0.10.4-src.zip hdfs://ip-192-168-4-200.us-east-1.compute.internal:8020/user/oozie/share/lib/lib_20170616133136/spark/
hdfs dfs -put /usr/lib/spark/python/lib/pyspark.zip hdfs://ip-192-168-4-200.us-east-1.compute.internal:8020/user/oozie/share/lib/lib_20170616133136/spark/

After you’re done, Oozie can use Pyspark in its processes.

Pass the anaconda.zip file into HDFS as follows:

hdfs dfs -put /mnt/anaconda/anaconda.zip /tmp/myLocation/anaconda.zip

(Optional) Verify that it was transferred successfully with the following command:

hdfs dfs -ls /tmp/myLocation/

On your master node, execute the following command:

export PYSPARK_PYTHON=/mnt/anaconda/bin/python

Set the PYSPARK_PYTHON environment variable on the executor nodes. Put the following configurations in your “spark-opts” values in your Oozie workflow.xml file:

–conf spark.executorEnv.PYSPARK_PYTHON=./anaconda_remote/bin/python
–conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./anaconda_remote/bin/python

This is referenced from the Oozie job in the following line in your workflow.xml file, also included as part of your “spark-opts”:

--archives hdfs:///tmp/myLocation/anaconda.zip#anaconda_remote

Your Oozie workflow.xml file should now look something like the following:

<workflow-app name="spark-wf" xmlns="uri:oozie:workflow:0.5">
<start to="start_spark" />
<action name="start_spark">
    <spark xmlns="uri:oozie:spark-action:0.1">
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <prepare>
            <delete path="/tmp/test/spark_oozie_test_out3"/>
        </prepare>
        <master>yarn-cluster</master>
        <mode>cluster</mode>
        <name>SparkJob</name>
        <class>clear</class>
        <jar>hdfs:///user/oozie/apps/myPysparkProgram.py</jar>
        <spark-opts>--queue default
            --conf spark.ui.view.acls=*
            --executor-memory 2G --num-executors 2 --executor-cores 2 --driver-memory 3g
            --conf spark.executorEnv.PYSPARK_PYTHON=./anaconda_remote/bin/python
            --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./anaconda_remote/bin/python
            --archives hdfs:///tmp/myLocation/anaconda.zip#anaconda_remote
        </spark-opts>
    </spark>
    <ok to="end"/>
    <error to="kill"/>
</action>
        <kill name="kill">
                <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
        </kill>
        <end name="end"/>
</workflow-app>

Test steps

To test this out, you can use the following job.properties and myPysparkProgram.py file, along with the following steps:

job.properties

masterNode ip-xxx-xxx-xxx-xxx.us-east-1.compute.internal
nameNode hdfs://${masterNode}:8020
jobTracker ${masterNode}:8032
master yarn
mode cluster
queueName default
oozie.libpath ${nameNode}/user/oozie/share/lib
oozie.use.system.libpath true
oozie.wf.application.path ${nameNode}/user/oozie/apps/

Note: You can get your master node IP address (denoted as “ip-xxx-xxx-xxx-xxx” here) from the value for the sharelibDirNew parameter noted earlier.

myPysparkProgram.py

from pyspark import SparkContext, SparkConf
import numpy
import sys

conf = SparkConf().setAppName('myPysparkProgram')
sc = SparkContext(conf=conf)

rdd = sc.textFile("/user/hadoop/input.txt")

x = numpy.sum([3,4,5]) #total = 12

rdd = rdd.map(lambda line: line + str(x))
rdd.saveAsTextFile("/user/hadoop/output")

Put the “myPysparkProgram.py” into the location mentioned between the “<jar>xxxxx</jar>” tags in your workflow.xml. In this example, the location is “hdfs:///user/oozie/apps/”. Use the following command to move the “myPysparkProgram.py” file to the correct location:

hdfs dfs -put myPysparkProgram.py /user/oozie/apps/

Put the above workflow.xml file into the “/user/oozie/apps/” location in hdfs:

hdfs dfs –put workflow.xml /user/oozie/apps/

Note: The job.properties file is run locally from the EMR master node.

Create a sample input.txt file with some data in it. For example:

input.txt

This is a sentence.
So is this. 
This is also a sentence.

Put this file into hdfs:

hdfs dfs -put input.txt /user/hadoop/

Execute the job in Oozie with the following command. This creates an Oozie job ID.

oozie job -config job.properties -run

You can check the Oozie job state with the command:

oozie job -info <Oozie job ID>

  1. When the job is successfully finished, the results are located at:
/user/hadoop/output/part-00000
/user/hadoop/output/part-00001

  1. Run the following commands to view the output:
hdfs dfs -cat /user/hadoop/output/part-00000
hdfs dfs -cat /user/hadoop/output/part-00001

The output will be:

This is a sentence. 12
So is this 12
This is also a sentence 12

Summary

The myPysparkProgram.py has successfully imported the numpy library from the Anaconda platform and has produced some output with it. If you tried to run this using standard Python, you’d encounter the following error:

Now when your Python job runs in Oozie, any imported packages that are implicitly imported by your Pyspark script are imported into your job within Oozie directly from the Anaconda platform. Simple!

If you have questions or suggestions, please leave a comment below.


Additional Reading

Learn how to use Apache Oozie workflows to automate Apache Spark jobs on Amazon EMR.

 


About the Author

John Ohle is an AWS BigData Cloud Support Engineer II for the BigData team in Dublin. He works to provide advice and solutions to our customers on their Big Data projects and workflows on AWS. In his spare time, he likes to play music, learn, develop tools and write documentation to further help others – both colleagues and customers alike.

 

 

 

Build a Serverless Architecture to Analyze Amazon CloudFront Access Logs Using AWS Lambda, Amazon Athena, and Amazon Kinesis Analytics

Post Syndicated from Rajeev Srinivasan original https://aws.amazon.com/blogs/big-data/build-a-serverless-architecture-to-analyze-amazon-cloudfront-access-logs-using-aws-lambda-amazon-athena-and-amazon-kinesis-analytics/

Nowadays, it’s common for a web server to be fronted by a global content delivery service, like Amazon CloudFront. This type of front end accelerates delivery of websites, APIs, media content, and other web assets to provide a better experience to users across the globe.

The insights gained by analysis of Amazon CloudFront access logs helps improve website availability through bot detection and mitigation, optimizing web content based on the devices and browser used to view your webpages, reducing perceived latency by caching of popular object closer to its viewer, and so on. This results in a significant improvement in the overall perceived experience for the user.

This blog post provides a way to build a serverless architecture to generate some of these insights. To do so, we analyze Amazon CloudFront access logs both at rest and in transit through the stream. This serverless architecture uses Amazon Athena to analyze large volumes of CloudFront access logs (on the scale of terabytes per day), and Amazon Kinesis Analytics for streaming analysis.

The analytic queries in this blog post focus on three common use cases:

  1. Detection of common bots using the user agent string
  2. Calculation of current bandwidth usage per Amazon CloudFront distribution per edge location
  3. Determination of the current top 50 viewers

However, you can easily extend the architecture described to power dashboards for monitoring, reporting, and trigger alarms based on deeper insights gained by processing and analyzing the logs. Some examples are dashboards for cache performance, usage and viewer patterns, and so on.

Following we show a diagram of this architecture.

Prerequisites

Before you set up this architecture, install the AWS Command Line Interface (AWS CLI) tool on your local machine, if you don’t have it already.

Setup summary

The following steps are involved in setting up the serverless architecture on the AWS platform:

  1. Create an Amazon S3 bucket for your Amazon CloudFront access logs to be delivered to and stored in.
  2. Create a second Amazon S3 bucket to receive processed logs and store the partitioned data for interactive analysis.
  3. Create an Amazon Kinesis Firehose delivery stream to batch, compress, and deliver the preprocessed logs for analysis.
  4. Create an AWS Lambda function to preprocess the logs for analysis.
  5. Configure Amazon S3 event notification on the CloudFront access logs bucket, which contains the raw logs, to trigger the Lambda preprocessing function.
  6. Create an Amazon DynamoDB table to look up partition details, such as partition specification and partition location.
  7. Create an Amazon Athena table for interactive analysis.
  8. Create a second AWS Lambda function to add new partitions to the Athena table based on the log delivered to the processed logs bucket.
  9. Configure Amazon S3 event notification on the processed logs bucket to trigger the Lambda partitioning function.
  10. Configure Amazon Kinesis Analytics application for analysis of the logs directly from the stream.

ETL and preprocessing

In this section, we parse the CloudFront access logs as they are delivered, which occurs multiple times in an hour. We filter out commented records and use the user agent string to decipher the browser name, the name of the operating system, and whether the request has been made by a bot. For more details on how to decipher the preceding information based on the user agent string, see user-agents 1.1.0 in the Python documentation.

We use the Lambda preprocessing function to perform these tasks on individual rows of the access log. On successful completion, the rows are pushed to an Amazon Kinesis Firehose delivery stream to be persistently stored in an Amazon S3 bucket, the processed logs bucket.

To create a Firehose delivery stream with a new or existing S3 bucket as the destination, follow the steps described in Create a Firehose Delivery Stream to Amazon S3 in the S3 documentation. Keep most of the default settings, but select an AWS Identity and Access Management (IAM) role that has write access to your S3 bucket and specify GZIP compression. Name the delivery stream CloudFrontLogsToS3.

Another pre-requisite for this setup is to create an IAM role that provides the necessary permissions our AWS Lambda function to get the data from S3, process it, and deliver it to the CloudFrontLogsToS3 delivery stream.

Let’s use the AWS CLI to create the IAM role using the following the steps:

  1. Create the IAM policy (lambda-exec-policy) for the Lambda execution role to use.
  2. Create the Lambda execution role (lambda-cflogs-exec-role) and assign the service to use this role.
  3. Attach the policy created in step 1 to the Lambda execution role.

To download the policy document to your local machine, type the following command.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/preprocessiong-lambda/lambda-exec-policy.json  <path_on_your_local_machine>

To download the assume policy document to your local machine, type the following command.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/preprocessiong-lambda/assume-lambda-policy.json  <path_on_your_local_machine>

Following is the lambda-exec-policy.json file, which is the IAM policy used by the Lambda execution role.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "CloudWatchAccess",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        },
        {
            "Sid": "S3Access",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject"
            ],
            "Resource": [
                "arn:aws:s3:::*"
            ]
        },
        {
            "Sid": "FirehoseAccess",
            "Effect": "Allow",
            "Action": [
                "firehose:ListDeliveryStreams",
                "firehose:PutRecord",
                "firehose:PutRecordBatch"
            ],
            "Resource": [
                "arn:aws:firehose:*:*:deliverystream/CloudFrontLogsToS3"
            ]
        }
    ]
}

To create the IAM policy used by Lambda execution role, type the following command.

aws iam create-policy --policy-name lambda-exec-policy --policy-document file://<path>/lambda-exec-policy.json

To create the AWS Lambda execution role and assign the service to use this role, type the following command.

aws iam create-role --role-name lambda-cflogs-exec-role --assume-role-policy-document file://<path>/assume-lambda-policy.json

Following is the assume-lambda-policy.json file, to grant Lambda permission to assume a role.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

To attach the policy (lambda-exec-policy) created to the AWS Lambda execution role (lambda-cflogs-exec-role), type the following command.

aws iam attach-role-policy --role-name lambda-cflogs-exec-role --policy-arn arn:aws:iam::<your-account-id>:policy/lambda-exec-policy

Now that we have created the CloudFrontLogsToS3 Firehose delivery stream and the lambda-cflogs-exec-role IAM role for Lambda, the next step is to create a Lambda preprocessing function.

This Lambda preprocessing function parses the CloudFront access logs delivered into the S3 bucket and performs a few transformation and mapping operations on the data. The Lambda function adds descriptive information, such as the browser and the operating system that were used to make this request based on the user agent string found in the logs. The Lambda function also adds information about the web distribution to support scenarios where CloudFront access logs are delivered to a centralized S3 bucket from multiple distributions. With the solution in this blog post, you can get insights across distributions and their edge locations.

Use the Lambda Management Console to create a new Lambda function with a Python 2.7 runtime and the s3-get-object-python blueprint. Open the console, and on the Configure triggers page, choose the name of the S3 bucket where the CloudFront access logs are delivered. Choose Put for Event type. For Prefix, type the name of the prefix, if any, for the folder where CloudFront access logs are delivered, for example cloudfront-logs/. To invoke Lambda to retrieve the logs from the S3 bucket as they are delivered, select Enable trigger.

Choose Next and provide a function name to identify this Lambda preprocessing function.

For Code entry type, choose Upload a file from Amazon S3. For S3 link URL, type https.amazonaws.com//preprocessing-lambda/pre-data.zip. In the section, also create an environment variable with the key KINESIS_FIREHOSE_STREAM and a value with the name of the Firehose delivery stream as CloudFrontLogsToS3.

Choose lambda-cflogs-exec-role as the IAM role for the Lambda function, and type prep-data.lambda_handler for the value for Handler.

Choose Next, and then choose Create Lambda.

Table creation in Amazon Athena

In this step, we will build the Athena table. Use the Athena console in the same region and create the table using the query editor.

CREATE EXTERNAL TABLE IF NOT EXISTS cf_logs (
  logdate date,
  logtime string,
  location string,
  bytes bigint,
  requestip string,
  method string,
  host string,
  uri string,
  status bigint,
  referrer string,
  useragent string,
  uriquery string,
  cookie string,
  resulttype string,
  requestid string,
  header string,
  csprotocol string,
  csbytes string,
  timetaken bigint,
  forwardedfor string,
  sslprotocol string,
  sslcipher string,
  responseresulttype string,
  protocolversion string,
  browserfamily string,
  osfamily string,
  isbot string,
  filename string,
  distribution string
)
PARTITIONED BY(year string, month string, day string, hour string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LOCATION 's3://<pre-processing-log-bucket>/prefix/';

Creation of the Athena partition

A popular website with millions of requests each day routed using Amazon CloudFront can generate a large volume of logs, on the order of a few terabytes a day. We strongly recommend that you partition your data to effectively restrict the amount of data scanned by each query. Partitioning significantly improves query performance and substantially reduces cost. The Lambda partitioning function adds the partition information to the Athena table for the data delivered to the preprocessed logs bucket.

Before delivering the preprocessed Amazon CloudFront logs file into the preprocessed logs bucket, Amazon Kinesis Firehose adds a UTC time prefix in the format YYYY/MM/DD/HH. This approach supports multilevel partitioning of the data by year, month, date, and hour. You can invoke the Lambda partitioning function every time a new processed Amazon CloudFront log is delivered to the preprocessed logs bucket. To do so, configure the Lambda partitioning function to be triggered by an S3 Put event.

For a website with millions of requests, a large number of preprocessed logs can be delivered multiple times in an hour—for example, at the interval of one each second. To avoid querying the Athena table for partition information every time a preprocessed log file is delivered, you can create an Amazon DynamoDB table for fast lookup.

Based on the year, month, data and hour in the prefix of the delivered log, the Lambda partitioning function checks if the partition specification exists in the Amazon DynamoDB table. If it doesn’t, it’s added to the table using an atomic operation, and then the Athena table is updated.

Type the following command to create the Amazon DynamoDB table.

aws dynamodb create-table --table-name athenapartitiondetails \
--attribute-definitions AttributeName=PartitionSpec,AttributeType=S \
--key-schema AttributeName=PartitionSpec,KeyType=HASH \
--provisioned-throughput ReadCapacityUnits=100,WriteCapacityUnits=100

Here the following is true:

  • PartitionSpec is the hash key and is a representation of the partition signature—for example, year=”2017”; month=”05”; day=”15”; hour=”10”.
  • Depending on the rate at which the processed log files are delivered to the processed log bucket, you might have to increase the ReadCapacityUnits and WriteCapacityUnits values, if these are throttled.

The other attributes besides PartitionSpec are the following:

  • PartitionPath – The S3 path associated with the partition.
  • PartitionType – The type of partition used (Hour, Month, Date, Year, or ALL). In this case, ALL is used.

Next step is to create the IAM role to provide permissions for the Lambda partitioning function. You require permissions to do the following:

  1. Look up and write partition information to DynamoDB.
  2. Alter the Athena table with new partition information.
  3. Perform Amazon CloudWatch logs operations.
  4. Perform Amazon S3 operations.

To download the policy document to your local machine, type following command.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/partitioning-lambda/lambda-partition-function-execution-policy.json  <path_on_your_local_machine>

To download the assume policy document to your local machine, type the following command.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/partitioning-lambda/assume-lambda-policy.json <path_on_your_local_machine>

To create the Lambda execution role and assign the service to use this role, type the following command.

aws iam create-role --role-name lambda-cflogs-exec-role --assume-role-policy-document file://<path>/assume-lambda-policy.json

Let’s use the AWS CLI to create the IAM role using the following three steps:

  1. Create the IAM policy(lambda-partition-exec-policy) used by the Lambda execution role.
  2. Create the Lambda execution role (lambda-partition-execution-role)and assign the service to use this role.
  3. Attach the policy created in step 1 to the Lambda execution role.

To create the IAM policy used by Lambda execution role, type the following command.

aws iam create-policy --policy-name lambda-partition-exec-policy --policy-document file://<path>/lambda-partition-function-execution-policy.json

To create the Lambda execution role and assign the service to use this role, type the following command.

aws iam create-role --role-name lambda-partition-execution-role --assume-role-policy-document file://<path>/assume-lambda-policy.json

To attach the policy (lambda-partition-exec-policy) created to the AWS Lambda execution role (lambda-partition-execution-role), type the following command.

aws iam attach-role-policy --role-name lambda-partition-execution-role --policy-arn arn:aws:iam::<your-account-id>:policy/lambda-partition-exec-policy

Following is the lambda-partition-function-execution-policy.json file, which is the IAM policy used by the Lambda execution role.

{
    "Version": "2012-10-17",
    "Statement": [
      	{
            	"Sid": "DDBTableAccess",
            	"Effect": "Allow",
            	"Action": "dynamodb:PutItem"
            	"Resource": "arn:aws:dynamodb*:*:table/athenapartitiondetails"
        	},
        	{
            	"Sid": "S3Access",
            	"Effect": "Allow",
            	"Action": [
                		"s3:GetBucketLocation",
                		"s3:GetObject",
                		"s3:ListBucket",
                		"s3:ListBucketMultipartUploads",
                		"s3:ListMultipartUploadParts",
                		"s3:AbortMultipartUpload",
                		"s3:PutObject"
            	],
          		"Resource":"arn:aws:s3:::*"
		},
	              {
		      "Sid": "AthenaAccess",
      		"Effect": "Allow",
      		"Action": [ "athena:*" ],
      		"Resource": [ "*" ]
	      },
        	{
            	"Sid": "CloudWatchLogsAccess",
            	"Effect": "Allow",
            	"Action": [
                		"logs:CreateLogGroup",
                		"logs:CreateLogStream",
             	   	"logs:PutLogEvents"
            	],
            	"Resource": "arn:aws:logs:*:*:*"
        	}
    ]
}

Download the .jar file containing the Java deployment package to your local machine.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/partitioning-lambda/aws-lambda-athena-1.0.0.jar <path_on_your_local_machine>

From the AWS Management Console, create a new Lambda function with Java8 as the runtime. Select the Blank Function blueprint.

On the Configure triggers page, choose the name of the S3 bucket where the preprocessed logs are delivered. Choose Put for the Event Type. For Prefix, type the name of the prefix folder, if any, where preprocessed logs are delivered by Firehose—for example, out/. For Suffix, type the name of the compression format that the Firehose stream (CloudFrontLogToS3) delivers the preprocessed logs —for example, gz. To invoke Lambda to retrieve the logs from the S3 bucket as they are delivered, select Enable Trigger.

Choose Next and provide a function name to identify this Lambda partitioning function.

Choose Java8 for Runtime for the AWS Lambda function. Choose Upload a .ZIP or .JAR file for the Code entry type, and choose Upload to upload the downloaded aws-lambda-athena-1.0.0.jar file.

Next, create the following environment variables for the Lambda function:

  • TABLE_NAME – The name of the Athena table (for example, cf_logs).
  • PARTITION_TYPE – The partition to be created based on the Athena table for the logs delivered to the sub folders in S3 bucket based on Year, Month, Date, Hour, or Set this to ALL to use Year, Month, Date, and Hour.
  • DDB_TABLE_NAME – The name of the DynamoDB table holding partition information (for example, athenapartitiondetails).
  • ATHENA_REGION – The current AWS Region for the Athena table to construct the JDBC connection string.
  • S3_STAGING_DIR – The Amazon S3 location where your query output is written. The JDBC driver asks Athena to read the results and provide rows of data back to the user (for example, s3://<bucketname>/<folder>/).

To configure the function handler and IAM, for Handler copy and paste the name of the handler: com.amazonaws.services.lambda.CreateAthenaPartitionsBasedOnS3EventWithDDB::handleRequest. Choose the existing IAM role, lambda-partition-execution-role.

Choose Next and then Create Lambda.

Interactive analysis using Amazon Athena

In this section, we analyze the historical data that’s been collected since we added the partitions to the Amazon Athena table for data delivered to the preprocessing logs bucket.

Scenario 1 is robot traffic by edge location.

SELECT COUNT(*) AS ct, requestip, location FROM cf_logs
WHERE isbot='True'
GROUP BY requestip, location
ORDER BY ct DESC;

Scenario 2 is total bytes transferred per distribution for each edge location for your website.

SELECT distribution, location, SUM(bytes) as totalBytes
FROM cf_logs
GROUP BY location, distribution;

Scenario 3 is the top 50 viewers of your website.

SELECT requestip, COUNT(*) AS ct  FROM cf_logs
GROUP BY requestip
ORDER BY ct DESC;

Streaming analysis using Amazon Kinesis Analytics

In this section, you deploy a stream processing application using Amazon Kinesis Analytics to analyze the preprocessed Amazon CloudFront log streams. This application analyzes directly from the Amazon Kinesis Stream as it is delivered to the preprocessing logs bucket. The stream queries in section are focused on gaining the following insights:

  • The IP address of the bot, identified by its Amazon CloudFront edge location, that is currently sending requests to your website. The query also includes the total bytes transferred as part of the response.
  • The total bytes served per distribution per population for your website.
  • The top 10 viewers of your website.

To download the firehose-access-policy.json file, type the following.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/kinesisanalytics/firehose-access-policy.json  <path_on_your_local_machine>

To download the kinesisanalytics-policy.json file, type the following.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis/kinesisanalytics/assume-kinesisanalytics-policy.json <path_on_your_local_machine>

Before we create the Amazon Kinesis Analytics application, we need to create the IAM role to provide permission for the analytics application to access Amazon Kinesis Firehose stream.

Let’s use the AWS CLI to create the IAM role using the following three steps:

  1. Create the IAM policy(firehose-access-policy) for the Lambda execution role to use.
  2. Create the Lambda execution role (ka-execution-role) and assign the service to use this role.
  3. Attach the policy created in step 1 to the Lambda execution role.

Following is the firehose-access-policy.json file, which is the IAM policy used by Kinesis Analytics to read Firehose delivery stream.

{
    "Version": "2012-10-17",
    "Statement": [
      	{
    	"Sid": "AmazonFirehoseAccess",
    	"Effect": "Allow",
    	"Action": [
       	"firehose:DescribeDeliveryStream",
        	"firehose:Get*"
    	],
    	"Resource": [
              "arn:aws:firehose:*:*:deliverystream/CloudFrontLogsToS3”
       ]
     }
}

Following is the assume-kinesisanalytics-policy.json file, to grant Amazon Kinesis Analytics permissions to assume a role.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "kinesisanalytics.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

To create the IAM policy used by Analytics access role, type the following command.

aws iam create-policy --policy-name firehose-access-policy --policy-document file://<path>/firehose-access-policy.json

To create the Analytics execution role and assign the service to use this role, type the following command.

aws iam attach-role-policy --role-name ka-execution-role --policy-arn arn:aws:iam::<your-account-id>:policy/firehose-access-policy

To attach the policy (irehose-access-policy) created to the Analytics execution role (ka-execution-role), type the following command.

aws iam attach-role-policy --role-name ka-execution-role --policy-arn arn:aws:iam::<your-account-id>:policy/firehose-access-policy

To deploy the Analytics application, first download the configuration file and then modify ResourceARN and RoleARN for the Amazon Kinesis Firehose input configuration.

"KinesisFirehoseInput": { 
    "ResourceARN": "arn:aws:firehose:<region>:<account-id>:deliverystream/CloudFrontLogsToS3", 
    "RoleARN": "arn:aws:iam:<account-id>:role/ka-execution-role"
}

To download the Analytics application configuration file, type the following command.

aws s3 cp s3://aws-bigdata-blog/artifacts/Serverless-CF-Analysis//kinesisanalytics/kinesis-analytics-app-configuration.json <path_on_your_local_machine>

To deploy the application, type the following command.

aws kinesisanalytics create-application --application-name "cf-log-analysis" --cli-input-json file://<path>/kinesis-analytics-app-configuration.json

To start the application, type the following command.

aws kinesisanalytics start-application --application-name "cf-log-analysis" --input-configuration Id="1.1",InputStartingPositionConfiguration={InputStartingPosition="NOW"}

SQL queries using Amazon Kinesis Analytics

Scenario 1 is a query for detecting bots for sending request to your website detection for your website.

-- Create output stream, which can be used to send to a destination
CREATE OR REPLACE STREAM "BOT_DETECTION" (requesttime TIME, destribution VARCHAR(16), requestip VARCHAR(64), edgelocation VARCHAR(64), totalBytes BIGINT);
-- Create pump to insert into output 
CREATE OR REPLACE PUMP "BOT_DETECTION_PUMP" AS INSERT INTO "BOT_DETECTION"
--
SELECT STREAM 
    STEP("CF_LOG_STREAM_001"."request_time" BY INTERVAL '1' SECOND) as requesttime,
    "distribution_name" as distribution,
    "request_ip" as requestip, 
    "edge_location" as edgelocation, 
    SUM("bytes") as totalBytes
FROM "CF_LOG_STREAM_001"
WHERE "is_bot" = true
GROUP BY "request_ip", "edge_location", "distribution_name",
STEP("CF_LOG_STREAM_001"."request_time" BY INTERVAL '1' SECOND),
STEP("CF_LOG_STREAM_001".ROWTIME BY INTERVAL '1' SECOND);

Scenario 2 is a query for total bytes transferred per distribution for each edge location for your website.

-- Create output stream, which can be used to send to a destination
CREATE OR REPLACE STREAM "BYTES_TRANSFFERED" (requesttime TIME, destribution VARCHAR(16), edgelocation VARCHAR(64), totalBytes BIGINT);
-- Create pump to insert into output 
CREATE OR REPLACE PUMP "BYTES_TRANSFFERED_PUMP" AS INSERT INTO "BYTES_TRANSFFERED"
-- Bytes Transffered per second per web destribution by edge location
SELECT STREAM 
    STEP("CF_LOG_STREAM_001"."request_time" BY INTERVAL '1' SECOND) as requesttime,
    "distribution_name" as distribution,
    "edge_location" as edgelocation, 
    SUM("bytes") as totalBytes
FROM "CF_LOG_STREAM_001"
GROUP BY "distribution_name", "edge_location", "request_date",
STEP("CF_LOG_STREAM_001"."request_time" BY INTERVAL '1' SECOND),
STEP("CF_LOG_STREAM_001".ROWTIME BY INTERVAL '1' SECOND);

Scenario 3 is a query for the top 50 viewers for your website.

-- Create output stream, which can be used to send to a destination
CREATE OR REPLACE STREAM "TOP_TALKERS" (requestip VARCHAR(64), requestcount DOUBLE);
-- Create pump to insert into output 
CREATE OR REPLACE PUMP "TOP_TALKERS_PUMP" AS INSERT INTO "TOP_TALKERS"
-- Top Ten Talker
SELECT STREAM ITEM as requestip, ITEM_COUNT as requestcount FROM TABLE(TOP_K_ITEMS_TUMBLING(
  CURSOR(SELECT STREAM * FROM "CF_LOG_STREAM_001"),
  'request_ip', -- name of column in single quotes
  50, -- number of top items
  60 -- tumbling window size in seconds
  )
);

Conclusion

Following the steps in this blog post, you just built an end-to-end serverless architecture to analyze Amazon CloudFront access logs. You analyzed these both in interactive and streaming mode, using Amazon Athena and Amazon Kinesis Analytics respectively.

By creating a partition in Athena for the logs delivered to a centralized bucket, this architecture is optimized for performance and cost when analyzing large volumes of logs for popular websites that receive millions of requests. Here, we have focused on just three common use cases for analysis, sharing the analytic queries as part of the post. However, you can extend this architecture to gain deeper insights and generate usage reports to reduce latency and increase availability. This way, you can provide a better experience on your websites fronted with Amazon CloudFront.

In this blog post, we focused on building serverless architecture to analyze Amazon CloudFront access logs. Our plan is to extend the solution to provide rich visualization as part of our next blog post.


About the Authors

Rajeev Srinivasan is a Senior Solution Architect for AWS. He works very close with our customers to provide big data and NoSQL solution leveraging the AWS platform and enjoys coding . In his spare time he enjoys riding his motorcycle and reading books.

 

Sai Sriparasa is a consultant with AWS Professional Services. He works with our customers to provide strategic and tactical big data solutions with an emphasis on automation, operations & security on AWS. In his spare time, he follows sports and current affairs.

 

 


Related

Analyzing VPC Flow Logs with Amazon Kinesis Firehose, Amazon Athena, and Amazon QuickSight

Join Us at the 10th Annual Hadoop Summit / DataWorks Summit, San Jose (Jun 13-15)

Post Syndicated from mikesefanov original https://yahooeng.tumblr.com/post/160966148886

yahoohadoop:

image

We’re excited to co-host the 10th Annual Hadoop Summit, the leading conference for the Apache Hadoop community, taking place on June 13 – 15 at the San Jose Convention Center. In the last few years, the Hadoop Summit has expanded to cover all things data beyond just Apache Hadoop – such as data science, cloud and operations, IoT and applications – and has been aptly renamed the DataWorks Summit. The three-day program is bursting at the seams! Here are just a few of the reasons why you cannot miss this must-attend event:

  • Familiarize yourself with the cutting edge in Apache project developments from the committers
  • Learn from your peers and industry experts about innovative and real-world use cases, development and administration tips and tricks, success stories and best practices to leverage all your data – on-premise and in the cloud – to drive predictive analytics, distributed deep-learning and artificial intelligence initiatives
  • Attend one of our more than 170 technical deep dive breakout sessions from nearly 200 speakers across eight tracks
  • Check out our keynotes, meetups, trainings, technical crash courses, birds-of-a-feather sessions, Women in Big Data and more
  • Attend the community showcase where you can network with sponsors and industry experts, including a host of startups and large companies like Microsoft, IBM, Oracle, HP, Dell EMC and Teradata

Similar to previous years, we look forward to continuing Yahoo’s decade-long tradition of thought leadership at this year’s summit. Join us for an in-depth look at Yahoo’s Hadoop culture and for the latest in technologies such as Apache Tez, HBase, Hive, Data Highway Rainbow, Mail Data Warehouse and Distributed Deep Learning at the breakout sessions below. Or, stop by Yahoo kiosk #700 at the community showcase.

Also, as a co-host of the event, Yahoo is pleased to offer a 20% discount for the summit with the code MSPO20. Register here for Hadoop Summit, San Jose, California!


DAY 1. TUESDAY June 13, 2017


12:20 – 1:00 P.M. TensorFlowOnSpark – Scalable TensorFlow Learning On Spark Clusters

Andy Feng – VP Architecture, Big Data and Machine Learning

Lee Yang – Sr. Principal Engineer

In this talk, we will introduce a new framework, TensorFlowOnSpark, for scalable TensorFlow learning, that was open sourced in Q1 2017. This new framework enables easy experimentation for algorithm designs, and supports scalable training & inferencing on Spark clusters. It supports all TensorFlow functionalities including synchronous & asynchronous learning, model & data parallelism, and TensorBoard. It provides architectural flexibility for data ingestion to TensorFlow and network protocols for server-to-server communication. With a few lines of code changes, an existing TensorFlow algorithm can be transformed into a scalable application.

2:10 – 2:50 P.M. Handling Kernel Upgrades at Scale – The Dirty Cow Story

Samy Gawande – Sr. Operations Engineer

Savitha Ravikrishnan – Site Reliability Engineer

Apache Hadoop at Yahoo is a massive platform with 36 different clusters spread across YARN, Apache HBase, and Apache Storm deployments, totaling 60,000 servers made up of 100s of different hardware configurations accumulated over generations, presenting unique operational challenges and a variety of unforeseen corner cases. In this talk, we will share methods, tips and tricks to deal with large scale kernel upgrade on heterogeneous platforms within tight timeframes with 100% uptime and no service or data loss through the Dirty COW use case (privilege escalation vulnerability found in the Linux Kernel in late 2016).

5:00 – 5:40 P.M. Data Highway Rainbow –  Petabyte Scale Event Collection, Transport, and Delivery at Yahoo

Nilam Sharma – Sr. Software Engineer

Huibing Yin – Sr. Software Engineer

This talk presents the architecture and features of Data Highway Rainbow, Yahoo’s hosted multi-tenant infrastructure which offers event collection, transport and aggregated delivery as a service. Data Highway supports collection from multiple data centers & aggregated delivery in primary Yahoo data centers which provide a big data computing cluster. From a delivery perspective, Data Highway supports endpoints/sinks such as HDFS, Storm and Kafka; with Storm & Kafka endpoints tailored towards latency sensitive consumers.


DAY 2. WEDNESDAY June 14, 2017


9:05 – 9:15 A.M. Yahoo General Session – Shaping Data Platform for Lasting Value

Sumeet Singh  – Sr. Director, Products

With a long history of open innovation with Hadoop, Yahoo continues to invest in and expand the platform capabilities by pushing the boundaries of what the platform can accomplish for the entire organization. In the last 11 years (yes, it is that old!), the Hadoop platform has shown no signs of giving up or giving in. In this talk, we explore what makes the shared multi-tenant Hadoop platform so special at Yahoo.

12:20 – 1:00 P.M. CaffeOnSpark Update – Recent Enhancements and Use Cases

Mridul Jain – Sr. Principal Engineer

Jun Shi – Principal Engineer

By combining salient features from deep learning framework Caffe and big-data frameworks Apache Spark and Apache Hadoop, CaffeOnSpark enables distributed deep learning on a cluster of GPU and CPU servers. We released CaffeOnSpark as an open source project in early 2016, and shared its architecture design and basic usage at Hadoop Summit 2016. In this talk, we will update audiences about the recent development of CaffeOnSpark. We will highlight new features and capabilities: unified data layer which multi-label datasets, distributed LSTM training, interleave testing with training, monitoring/profiling framework, and docker deployment.

12:20 – 1:00 P.M. Tez Shuffle Handler – Shuffling at Scale with Apache Hadoop

Jon Eagles – Principal Engineer  

Kuhu Shukla – Software Engineer

In this talk we introduce a new Shuffle Handler for Tez, a YARN Auxiliary Service, that addresses the shortcomings and performance bottlenecks of the legacy MapReduce Shuffle Handler, the default shuffle service in Apache Tez. The Apache Tez Shuffle Handler adds composite fetch which has support for multi-partition fetch to mitigate performance slow down and provides deletion APIs to reduce disk usage for long running Tez sessions. As an emerging technology we will outline future roadmap for the Apache Tez Shuffle Handler and provide performance evaluation results from real world jobs at scale.

2:10 – 2:50 P.M. Achieving HBase Multi-Tenancy with RegionServer Groups and Favored Nodes

Thiruvel Thirumoolan – Principal Engineer

Francis Liu – Sr. Principal Engineer

At Yahoo! HBase has been running as a hosted multi-tenant service since 2013. In a single HBase cluster we have around 30 tenants running various types of workloads (ie batch, near real-time, ad-hoc, etc). We will walk through multi-tenancy features explaining our motivation, how they work as well as our experiences running these multi-tenant clusters. These features will be available in Apache HBase 2.0.

2:10 – 2:50 P.M. Data Driving Yahoo Mail Growth and Evolution with a 50 PB Hadoop Warehouse

Nick Huang – Director, Data Engineering, Yahoo Mail  

Saurabh Dixit – Sr. Principal Engineer, Yahoo Mail

Since 2014, the Yahoo Mail Data Engineering team took on the task of revamping the Mail data warehouse and analytics infrastructure in order to drive the continued growth and evolution of Yahoo Mail. Along the way we have built a 50 PB Hadoop warehouse, and surrounding analytics and machine learning programs that have transformed the way data plays in Yahoo Mail. In this session we will share our experience from this 3 year journey, from the system architecture, analytics systems built, to the learnings from development and drive for adoption.

DAY3. THURSDAY June 15, 2017


2:10 – 2:50 P.M. OracleStore – A Highly Performant RawStore Implementation for Hive Metastore

Chris Drome – Sr. Principal Engineer  

Jin Sun – Principal Engineer

Today, Yahoo uses Hive in many different spaces, from ETL pipelines to adhoc user queries. Increasingly, we are investigating the practicality of applying Hive to real-time queries, such as those generated by interactive BI reporting systems. In order for Hive to succeed in this space, it must be performant in all aspects of query execution, from query compilation to job execution. One such component is the interaction with the underlying database at the core of the Metastore. As an alternative to ObjectStore, we created OracleStore as a proof-of-concept. Freed of the restrictions imposed by DataNucleus, we were able to design a more performant database schema that better met our needs. Then, we implemented OracleStore with specific goals built-in from the start, such as ensuring the deduplication of data. In this talk we will discuss the details behind OracleStore and the gains that were realized with this alternative implementation. These include a reduction of 97%+ in the storage footprint of multiple tables, as well as query performance that is 13x faster than ObjectStore with DirectSQL and 46x faster than ObjectStore without DirectSQL.

3:00 P.M. – 3:40 P.M. Bullet – A Real Time Data Query Engine

Akshai Sarma – Sr. Software Engineer

Michael Natkovich – Director, Engineering

Bullet is an open sourced, lightweight, pluggable querying system for streaming data without a persistence layer implemented on top of Storm. It allows you to filter, project, and aggregate on data in transit. It includes a UI and WS. Instead of running queries on a finite set of data that arrived and was persisted or running a static query defined at the startup of the stream, our queries can be executed against an arbitrary set of data arriving after the query is submitted. In other words, it is a look-forward system. Bullet is a multi-tenant system that scales independently of the data consumed and the number of simultaneous queries. Bullet is pluggable into any streaming data source. It can be configured to read from systems such as Storm, Kafka, Spark, Flume, etc. Bullet leverages Sketches to perform its aggregate operations such as distinct, count distinct, sum, count, min, max, and average.

3:00 P.M. – 3:40 P.M. Yahoo – Moving Beyond Running 100% of Apache Pig Jobs on Apache Tez

Rohini Palaniswamy – Sr. Principal Engineer

Last year at Yahoo, we spent great effort in scaling, stabilizing and making Pig on Tez production ready and by the end of the year retired running Pig jobs on Mapreduce. This talk will detail the performance and resource utilization improvements Yahoo achieved after migrating all Pig jobs to run on Tez. After successful migration and the improved performance we shifted our focus to addressing some of the bottlenecks we identified and new optimization ideas that we came up with to make it go even faster. We will go over the new features and work done in Tez to make that happen like custom YARN ShuffleHandler, reworking DAG scheduling order, serialization changes, etc. We will also cover exciting new features that were added to Pig for performance such as bloom join and byte code generation.

4:10 P.M. – 4:50 P.M. Leveraging Docker for Hadoop Build Automation and Big Data Stack Provisioning

Evans Ye,  Software Engineer

Apache Bigtop as an open source Hadoop distribution, focuses on developing packaging, testing and deployment solutions that help infrastructure engineers to build up their own customized big data platform as easy as possible. However, packages deployed in production require a solid CI testing framework to ensure its quality. Numbers of Hadoop component must be ensured to work perfectly together as well. In this presentation, we’ll talk about how Bigtop deliver its containerized CI framework which can be directly replicated by Bigtop users. The core revolution here are the newly developed Docker Provisioner that leveraged Docker for Hadoop deployment and Docker Sandbox for developer to quickly start a big data stack. The content of this talk includes the containerized CI framework, technical detail of Docker Provisioner and Docker Sandbox, a hierarchy of docker images we designed, and several components we developed such as Bigtop Toolchain to achieve build automation.

Register here for Hadoop Summit, San Jose, California with a 20% discount code MSPO20

Questions? Feel free to reach out to us at [email protected] Hope to see you there!

Running Jupyter Notebook and JupyterHub on Amazon EMR

Post Syndicated from Tom Zeng original https://aws.amazon.com/blogs/big-data/running-jupyter-notebook-and-jupyterhub-on-amazon-emr/

Tom Zeng is a Solutions Architect for Amazon EMR

Jupyter Notebook (formerly IPython) is one of the most popular user interfaces for running Python, R, Julia, Scala, and other languages to process and visualize data, perform statistical analysis, and train and run machine learning models. Jupyter notebooks are self-contained documents that can include live code, charts, narrative text, and more. The notebooks can be easily converted to HTML, PDF, and other formats for sharing.

Amazon EMR is a popular hosted big data processing service that allows users to easily run Hadoop, Spark, Presto, and other Hadoop ecosystem applications, such as Hive and Pig.

Python, Scala, and R provide support for Spark and Hadoop, and running them in Jupyter on Amazon EMR makes it easy to take advantage of:

  • the big-data processing capabilities of Hadoop applications.
  • the large selection of Python and R packages for analytics and visualization.

JupyterHub is a multiple-user environment for Jupyter. You can use the following bootstrap action (BA) to install Jupyter and JupyterHub on Amazon EMR:

s3://aws-bigdata-blog/artifacts/aws-blog-emr-jupyter/install-jupyter-emr5.sh

These are the supported Jupyter kernels:

  • Python
  • R
  • Scala
  • Apache Toree (which provides the Spark, PySpark, SparkR, and SparkSQL kernels)
  • Julia
  • Ruby
  • JavaScript
  • CoffeeScript
  • Torch

The BA will install Jupyter, JupyterHub, and sample notebooks on the master node.

Commonly used Python and R data science and machine learning packages can be optionally installed on all nodes.

The following arguments can be passed to the BA:

--rInstall the IRKernel for R.
--toreeInstall the Apache Toree kernel that supports Scala, PySpark, SQL, SparkR for Apache Spark.
--juliaInstall the IJulia kernel for Julia.
--torchInstall the iTorch kernel for Torch (machine learning and visualization).
--rubyInstall the iRuby kernel for Ruby.
--ds-packagesInstall the Python data science-related packages (scikit-learn pandas statsmodels).
--ml-packagesInstall the Python machine learning-related packages (theano keras tensorflow).
--python-packagesInstall specific Python packages (for example, ggplot and nilearn).
--portSet the port for Jupyter notebook. The default is 8888.
--passwordSet the password for the Jupyter notebook.
--localhost-onlyRestrict Jupyter to listen on localhost only. The default is to listen on all IP addresses.
--jupyterhubInstall JupyterHub.
--jupyterhub-portSet the port for JuputerHub. The default is 8000.
--notebook-dirSpecify the notebook folder. This could be a local directory or an S3 bucket.
--cached-installUse some cached dependency artifacts on S3 to speed up installation.
--sslEnable SSL. For production, make sure to use your own certificate and key files.
--copy-samplesCopy sample notebooks to the notebook folder.
--spark-optsUser-supplied Spark options to override the default values.
--s3fsUse instead of s3nb (the default) for storing notebooks on Amazon S3. This argument can cause slowness if the S3 bucket has lots of files. The Upload file and Create folder menu options do not work with s3nb.

By default (with no --password and --port arguments), Jupyter will run on port 8888 with no password protection; JupyterHub will run on port 8000.  The --port and --jupyterhub-port arguments can be used to override the default ports to avoid conflicts with other applications.

The --r option installs the IRKernel for R. It also installs SparkR and sparklyr for R, so make sure Spark is one of the selected EMR applications to be installed. You’ll need the Spark application if you use the --toree argument.

If you used --jupyterhub, use Linux users to sign in to JupyterHub. (Be sure to create passwords for the Linux users first.)  hadoop, the default admin user for JupyterHub, can be used to set up other users. The –password option sets the password for Jupyter and for the hadoop user for JupyterHub.

Jupyter on EMR allows users to save their work on Amazon S3 rather than on local storage on the EMR cluster (master node).

To store notebooks on S3, use:

--notebook-dir <s3://your-bucket/folder/>

To store notebooks in a directory different from the user’s home directory, use:

--notebook-dir <local directory>

The following example CLI command is used to launch a five-node (c3.4xlarge) EMR 5.2.0 cluster with the bootstrap action. The BA will install all the available kernels. It will also install the ggplot and nilearn Python packages and set:

  • the Jupyter port to 8880
  • the password to jupyter
  • the JupyterHub port to 8001
aws emr create-cluster --release-label emr-5.2.0 \
  --name 'emr-5.2.0 sparklyr + jupyter cli example' \
  --applications Name=Hadoop Name=Hive Name=Spark Name=Pig Name=Tez Name=Ganglia Name=Presto \
  --ec2-attributes KeyName=<your-ec2-key>,InstanceProfile=EMR_EC2_DefaultRole \
  --service-role EMR_DefaultRole \
  --instance-groups \
    InstanceGroupType=MASTER,InstanceCount=1,InstanceType=c3.4xlarge \
    InstanceGroupType=CORE,InstanceCount=4,InstanceType=c3.4xlarge \
  --region us-east-1 \
  --log-uri s3://<your-s3-bucket>/emr-logs/ \
  --bootstrap-actions \
    Name='Install Jupyter notebook',Path="s3://aws-bigdata-blog/artifacts/aws-blog-emr-jupyter/install-jupyter-emr5.sh",Args=[--r,--julia,--toree,--torch,--ruby,--ds-packages,--ml-packages,--python-packages,'ggplot nilearn',--port,8880,--password,jupyter,--jupyterhub,--jupyterhub-port,8001,--cached-install,--notebook-dir,s3://<your-s3-bucket>/notebooks/,--copy-samples]

Replace <your-ec2-key> with your AWS access key and <your-s3-bucket> with the S3 bucket where you store notebooks. You can also change the instance types to suit your needs and budget.

If you are using the EMR console to launch a cluster, you can specify the bootstrap action as follows:

o_jupyter_1

When the cluster is available, set up the SSH tunnel and web proxy. The Jupyter notebook should be available at localhost:8880 (as specified in the example CLI command).

o_jupyter_2

After you have signed in, you will see the home page, which displays the notebook files:

o_jupyter_3

If JupyterHub is installed, the Sign in page should be available at port 8001 (as specified in the CLI example):

o_jupyter_4

After you are signed in, you’ll see the JupyterHub and Jupyter home pages are the same. The JupyterHub URL, however, is /user/<username>/tree instead of /tree.

o_jupyter_5

The JupyterHub Admin page is used for managing users:

o_jupyter_6

You can install Jupyter extensions from the Nbextensions tab:

o_jupyter_7

If you specified the --copy-samples option in the BA, you should see sample notebooks on the home page. To try the samples, first open and run the CopySampleDataToHDFS.ipynb notebook to copy some sample data files to HDFS. In the CLI example, --python-packages,'ggplot nilearn' is used to install the ggplot and nilearn packages. You can verify those packages were installed by running the Py-ggplot and PyNilearn notebooks.

The CreateUser.ipynb notebook contains examples for setting up JupyterHub users.

The PySpark.ipynb and ScalaSpark.ipynb notebooks contain the Python and Scala versions of some machine learning examples from the Spark distribution (Logistic Regression, Neural Networks, Random Forest, and Support Vector Machines):

o_jupyter_8

PyHivePrestoHDFS.ipynb shows how to access Hive, Presto, and HDFS in Python. (Be sure to run the CreateHivePrestoS3Tables.ipynb first to create tables.) The %%time and %%timeit cell magics can be used to benchmark Hive and Presto queries (and other executable code):

o_jupyter_9

Here are some other sample notebooks for you to try.

SparkSQLParquetJSON.ipynb:

o_SparkSQLParquetJSON

plot_separating_hyperplane.ipynb:

o_plot_separating_hyperplane

R-SVMLinearNonLinear.ipynb:

o_R-SVMLinearNonLinear

plot_iris.ipynb:

o_plot_iris

Julia-IrisPlot.ipynb:

o_Julia-IrisPlot

PyIrisPlot.ipynb:

o_PyIrisPlot

R-RandomForestVisualization.ipynb:

o_R-RandomForestVisualization

GrangerCausality.ipynb:

o_GrangerCausality

SQLite.ipynb:

o_SQLite

GraphvizDot.ipynb:

o_GraphvizDot

Conclusion

Data scientists who run Jupyter and JupyterHub on Amazon EMR can use Python, R, Julia, and Scala to process, analyze, and visualize big data stored in Amazon S3. Jupyter notebooks can be saved to S3 automatically, so users can shut down and launch new EMR clusters, as needed. EMR makes it easy to spin up clusters with different sizes and CPU/memory configurations to suit different workloads and budgets. This can greatly reduce the cost of data-science investigations.

If you have questions about using Jupyter and JupyterHub on EMR or would like share your use cases, please leave a comment below.


Related

Running sparklyr – RStudio’s R Interface to Spark on Amazon EMR

sparklyr_2

Joining and Enriching Streaming Data on Amazon Kinesis

Post Syndicated from Assaf Mentzer original https://aws.amazon.com/blogs/big-data/joining-and-enriching-streaming-data-on-amazon-kinesis/

Are you trying to move away from a batch-based ETL pipeline? You might do this, for example, to get real-time insights into your streaming data, such as clickstream, financial transactions, sensor data, customer interactions, and so on.  If so, it’s possible that as soon as you get down to requirements, you realize your streaming data doesn’t have all of the fields you need for real-time processing, and you are really missing that JOIN keyword!

You might also have requirements to enrich the streaming data based on a static reference, a dynamic dataset, or even with another streaming data source.  How can you achieve this without sacrificing the velocity of the streaming data?

In this blog post, I provide three use cases and approaches for joining and enriching streaming data:

Joining streaming data with a relatively static dataset on Amazon S3 using Amazon Kinesis Analytics

In this use case, Amazon Kinesis Analytics can be used to define a reference data input on S3, and use S3 for enriching a streaming data source.

For example, bike share systems around the world can publish data files about available bikes and docks, at each station, in real time.  On bike-share system data feeds that follow the General Bikeshare Feed Specification (GBFS), there is a reference dataset that contains a static list of all stations, their capacities, and locations.

Let’s say you would like to enrich the bike-availability data (which changes throughout the day) with the bike station’s latitude and longitude (which is static) for downstream applications.  The architecture would look like this:

o_joiningenriching_1

To illustrate how you can use Amazon Kinesis Analytics to do this, follow these steps to set up an AWS CloudFormation stack, which will do the following:

  • Continuously produce sample bike-availability data onto an Amazon Kinesis stream (with, by default, a single shard).
  • Generate a sample S3 reference data file.
  • Create an Amazon Kinesis Analytics application that performs the join with the reference data file.

To create the CloudFormation stack

  1. Open the AWS CloudFormation console and choose Create Stack. Make sure the US East (N. Virginia) region is selected.
  2. Choose Specify an Amazon S3 template URL, enter or paste the following URL, and then choose Next.
    https://s3.amazonaws.com/aws-bigdata-blog/artifacts/Joining-and-Enriching-Streaming-Data-on-Amazon-Kinesis/demo-data-enrichment-s3-reference-cfn-template.json
  1. For the stack name, type demo-data-enrichment-s3-reference-stack. Under the KinesisAnalyticsReferenceDataS3Bucket parameter, type the name of an S3 bucket in the US East (N. Virginia) region where the reference data will be uploaded.  This CloudFormation template will create a sample data file under KinesisAnalyticsReferenceDataS3Key. (Make sure the value of the KinesisAnalyticsReferenceDataS3Key parameter does not conflict with existing S3 objects in your bucket).You’ll also see parameters referencing an S3 bucket (LambdaKinesisAnalyticsApplicationCustomResourceCodeS3Bucket) and an associated S3 key. The S3 bucket and the S3 key represent the location of the AWS Lambda package (written in Python) that contains the AWS CloudFormation custom resources required to create an Amazon Kinesis application. Do not modify the values of these parameters.
  1. Follow the remaining steps in the Create Stack wizard (click “Next” button, and then the “Create” button). Wait until the status displayed for the stack is CREATE_COMPLETE.

You now have an Amazon Kinesis stream in your AWS account that has sample bike-availability streaming data which, if you followed the template, is named demo-data-enrichment-bike-availability-input-stream and an Amazon Kinesis Analytics application that performs the joining.

You might want to examine the reference data file generated under your bucket (its default name is demo_data_enrichment_s3_reference_data.json).  If you are using a JSON formatter/viewer, it will look like the following.  Note how the records are put together as top-level objects (no commas separating the records).

{
  "station_id": "s001",
  "name": "Liberty Ave",
  "lat": 40.6892,
  "lon": -74.0445
}
{
  "station_id": "s002",
  "name": "Empire St",
  "lat": 40.7484,
  "lon": -73.9857
}

In the CloudFormation template, the reference data has been added to the Amazon Kinesis application through the AddApplicationReferenceDataSource API method.

Next, open the Amazon Kinesis Analytics console and examine the application.  If you did not modify the default parameters, the application name should be demo-data-enrichment-s3-reference-application.

The Application status should be RUNNING.  If its status is still STARTING, wait until it changes to RUNNING. Choose Application details, and then go to SQL results. You should see a page similar to the following:

o_joiningenriching_2

In the Amazon Kinesis Analytics SQL code, an in-application stream (DESTINATION_SQL_STREAM) is created with five columns. Data is inserted into the stream using a pump (STREAM_PUMP) by joining the source stream (SOURCE_SQL_STREAM_001) and the reference S3 data file (REFERENCE_DATA) using the station_id field. For more information about streams and pumps, see In-Application Streams and Pumps in the Amazon Kinesis Analytics Developer Guide.

The joined (enriched) data fields – station_name, station_lat and station_lon – should appear on the Real-time analytics tab, DESTINATION_SQL_STREAM.

o_joiningenriching_3

The LEFT OUTER JOIN works just like the ANSI-standard SQL. When you use LEFT OUTER JOIN, the streaming records are preserved even if there is no matching station_id in the reference data.  You can delete LEFT OUTER (leaving only JOIN, which means INNER join), choose Save and then run SQL. Only the records with a matching station_id will appear in the output.

o_joiningenriching_4

You can write the results to an Amazon Kinesis stream or to S3, Amazon Redshift, or Amazon Elasticsearch Service with an Amazon Kinesis Firehose delivery stream by adding a destination on the Destination tab.  For more information, see the Writing SQL on Streaming Data with Amazon Kinesis Analytics blog post.

Note: If you have not modified the default parameters in the CloudFormation template, S3 reference data source should have been added to your application.  If you are interested in adding S3 reference data source on your own streams, the following code snippet shows what it looks like in Python:

import boto3
kinesisanalytics_client = boto3.client('kinesisanalytics')

application_name = 'YOUR KINESIS ANALYTICS APPLICATION NAME'
reference_data_s3_bucket = 'YOUR REFERENCE DATA S3 BUCKET'
reference_data_s3_key = 'YOUR REFERENCE DATA S3 KEY' #example: folder/sub-folder/reference.json
reference_data_role_arn = 'YOUR IAM ROLE ARN' #with s3:GetObject permission on the reference data s3 key. Example: arn:aws:iam::123456789012:role/service-role/role-name

response = kinesisanalytics_client.describe_application(
    ApplicationName=application_name
)
application_version_id = response['ApplicationDetail']['ApplicationVersionId']

kinesisanalytics_client.add_application_reference_data_source(
    ApplicationName=application_name,
    CurrentApplicationVersionId=application_version_id,
    ReferenceDataSource={
        'TableName': 'REFERENCE_DATA',
        'S3ReferenceDataSource': {
            'BucketARN': 'arn:aws:s3:::%s' % reference_data_s3_bucket,
            'FileKey': reference_data_s3_key,
            'ReferenceRoleARN': reference_data_role_arn
        },
        'ReferenceSchema': {
            'RecordFormat': {
                'RecordFormatType': 'JSON',
                'MappingParameters': {
                    'JSONMappingParameters': {
                        'RecordRowPath': '$'
                    }
                }
            },
            'RecordEncoding': 'UTF-8',
            'RecordColumns': [
                {
                    'Name': 'station_id',
                    'Mapping': '$.station_id',
                    'SqlType': 'VARCHAR(4)'
                },
                {
                    'Name': 'name',
                    'Mapping': '$.name',
                    'SqlType': 'VARCHAR(64)'
                },
                {
                    'Name': 'lat',
                    'Mapping': '$.lat',
                    'SqlType': 'REAL'
                },
                {
                    'Name': 'lon',
                    'Mapping': '$.lon',
                    'SqlType': 'REAL'
                },
            ]
        }
    }
)

 

This data enrichment approach works for a relatively static dataset because it requires you to upload the entire reference dataset to S3 whenever there is a change.  This approach might not work for cases where the dataset changes too frequently to catch up with the streaming data.

If you change the reference data stored in the S3 bucket, you need to use the UpdateApplication operation (using the API or AWS CLI) to refresh the data in the Amazon Kinesis Analytics in-application table.  You can use the following Python script to refresh the data. Although it will not be covered in this blog post, you can automate data refresh by setting up Amazon S3 event notification with AWS Lambda.

#!/usr/bin/env python
import json,boto3

# Name of the Kinesis Analytics Application
KINESIS_ANALYTICS_APPLICATION_NAME='demo-data-enrichment-s3-reference-application'

# AWS region
AWS_REGION='us-east-1'


def main_handler():

    kinesisanalytics_client=boto3.client('kinesisanalytics',AWS_REGION)

    # retrieve the current application version id
    response_describe_application = kinesisanalytics_client.describe_application(
        ApplicationName=KINESIS_ANALYTICS_APPLICATION_NAME
    )
    application_version_id=response_describe_application['ApplicationDetail']['ApplicationVersionId']

    # update the application
    response = kinesisanalytics_client.update_application(
        ApplicationName=KINESIS_ANALYTICS_APPLICATION_NAME,
        CurrentApplicationVersionId=application_version_id,
        ApplicationUpdate={
        }
    )

    print("Done")


if __name__ == "__main__":
    main_handler()

 

To clean up resources created during this demo

  1. To delete the Amazon Kinesis Analytics application, open the Amazon Kinesis Analytics console, choose the application (demo-data-enrichment-s3-reference-application), choose Actions, and then choose Delete application.
  2. To delete the stack, open the AWS CloudFormation console, choose the stack (demo-data-enrichment-s3-reference-stack), choose Actions, and then choose Delete Stack.

The S3 reference data file should have been removed from your bucket, but your other data files should remain intact.

Enriching streaming data with a dynamic dataset using AWS Lambda and Amazon DynamoDB

In many cases, the data you want to enrich is not static. Imagine a case in which you are capturing user activities from a web or mobile application and have to enrich the activity data with frequently changing fields from a user database table.

A better approach is to store reference data in a way that can support random reads and writes efficiently.  Amazon DynamoDB is a fully managed non-relational database that can be used in this case.  AWS Lambda can be set up to automatically read batches of records from your Amazon Kinesis streams, which can perform data enrichment like looking up data from DynamoDB, and then produce the enriched data onto another stream.

If you have a stream of user activities and want to look up a user’s birth year from a DynamoDB table, the architecture will look like this:

o_joiningenriching_5

To set up a demo with this architecture, follow these steps to set up an AWS CloudFormation stack, which continuously produces sample user scores onto an Amazon Kinesis stream. An AWS Lambda function enriches the records with data from a DynamoDB table and produces the results onto another Amazon Kinesis stream.

  1. Open the CloudFormation console and choose Create Stack. Make sure the US East (N. Virginia) region is selected.
  2. Choose Specify an Amazon S3 template URL, enter or paste the following URL, and then choose Next.
    https://s3.amazonaws.com/aws-bigdata-blog/artifacts/Joining-and-Enriching-Streaming-Data-on-Amazon-Kinesis/demo-data-enrichment-ddb-reference-cfn-template.json
  3. For the stack name, type demo-data-enrichment-ddb-reference-stack. Review the parameters. If none of them conflict with your Amazon Kinesis stream names or DynamoDB table name, you can accept the default values and choose Next.  The following steps are written with the assumption that you are using the default values.
  4. Complete the remaining steps in the Create Stack wizard. Wait until CREATE_COMPLETE is displayed for the stack status.This CloudFormation template creates three Lambda functions: one for setting up sample data in a DynamoDB table, one for producing sample records continuously, and one for data enrichment. The description for this last function is Data Enrichment Demo – Lambda function to consume and enrich records from Kinesis.
  1. Open the Lambda console, select Data Enrichment Demo – Lambda function to consume and enrich records from Kinesis, and then choose the Triggers
  2. If No records processed is displayed for Last result, wait for a few minutes, and then reload this page. If things are set up correctly, OK will be displayed for Last result.  This might take a few minutes.

o_joiningenriching_6

At this point, the Lambda function is performing the data enrichment and producing records onto an output stream. Lambda is commonly used for preprocessing the analytics app to handle more complicated data formats.

Although this data enrichment process doesn’t involve Amazon Kinesis Analytics, you can still use the Kinesis Analytics service to examine, or even process, the enriched records, by creating a new Kinesis Analytics application and connect it to demo-data-enrichment-user-activity-output-stream.

The records on the demo-data-enrichment-user-activity-output-stream will look like the following and will show the enriched birthyear field on the streaming data.

o_joiningenriching_7

You can use this birthyear value in your Amazon Kinesis Analytics application. Although it’s not covered in this blog post, you can aggregate by age groups and count the user activities.

You can review the code for the Lambda function on the Code tab. The code used here is for demonstration purpose only. You might want to modify and thoroughly test it before applying it to your use case.

Note the following:

  • The Lambda function receives records in batches (instead of one record per Lambda invocation). You can control this behavior through Batch size on the Triggers tab (in the preceding example, Batch size is set to 100). The batch size you specify is the maximum number of records that you want your Lambda function to receive per invocation.
  • The retrieval from the reference data source (in this case, DynamoDB) can be done in batch instead of record by record. This example uses the batch_get_item() API method.
  • Depending on how strict the record sequencing requirement is, the writing of results to the output stream can also be done in batch. This example uses the put_records() API method.

DynamoDB is not the only option.  What’s important is to have a data source that supports random data read (and write) at a high efficiency.  Because it’s a distributed database with built-in partitioning, DynamoDB is a good choice, but you can also use Amazon RDS as long as the data retrieval is fast enough (perhaps by having a proper index and by spreading the read workload over read replicas). You can also use an in-memory database like Amazon ElastiCache for Redis or Memcached.

To clean up the resources created for this demo

  1. If you created a Kinesis Analytics application: Open the Amazon Kinesis Analytics console, select your application, choose Actions, and then choose Delete application.
  2. Open the CloudFormation console, choose demo-data-enrichment-ddb-reference-stack, choose Actions, and then choose Delete Stack.

Joining multiple sets of streaming data using Amazon Kinesis Analytics

To illustrate this use case, let’s say you have two streams of temperature sensor data coming from a set of machines: one measuring the engine temperature and the other measuring the power supply temperature.  For the best prediction of machine failure, you’ve been told (perhaps by your data scientist) that the two temperature readings must be validated against a prediction model ─ not individually, but as a set.

Because this is streaming data, the time window for the data joining should be clearly defined.  In this example, the joining must occur on temperature readings within the same window (same minute) so that the temperature of the engine is matched against the temperature of the power supply in the same timeframe.

This data joining can be achieved with Amazon Kinesis Analytics, but has to follow a certain pattern.

First of all, an Amazon Kinesis Analytics application supports no more than one streaming data source. The different sets of streaming data have to be produced onto one Amazon Kinesis stream.

An Amazon Kinesis Analytics application can use this Amazon Kinesis stream as input and can process the data based on a certain field (in this example, sensor_location) into multiple in-application streams.

The joining can now occur on the two in-application streams.  In this example, the join fields are the machine_id and the one-minute tumbling window.

o_joiningenriching_8

The selection of the time field for the windowing criteria is a topic of its own.  For information, see Writing SQL on Streaming Data with Amazon Kinesis Analytics. It is important to get a well-aligned window for real-world applications.  In this example, the processing time (ROWTIME) will be used for the tumbling window calculation for simplicity.

Again, you can follow these steps to set up an AWS CloudFormation stack, which continuously produces two sets of sample sensor data onto a single Amazon Kinesis stream and creates an Amazon Kinesis Analytics application that performs the joining.

  1. Open the CloudFormation console and choose Create Stack. Make sure the US East (N. Virginia) region is selected.
  2. Choose Specify an Amazon S3 template URL, enter or paste the following URL, and then choose Next.
    https://s3.amazonaws.com/aws-bigdata-blog/artifacts/Joining-and-Enriching-Streaming-Data-on-Amazon-Kinesis/demo-data-joining-multiple-streams-cfn-template.json
  3. For the stack name, type demo-data-joining-multiple-streams-stack. Review the parameters. If none of them conflict with your Amazon Kinesis stream, you can accept the default values and choose Next. The following steps are written with the assumption that you are using the default values.
  4. You will also see parameters referencing an S3 bucket and S3 key. This is an AWS Lambda package (written in Python) that contains the AWS CloudFormation custom resources to create an Amazon Kinesis application.
  5. Complete the remaining steps in the Create Stack wizard. Wait until the status displayed for the stack is CREATE_COMPLETE.

Open the Amazon Kinesis Analytics console and examine the application.  If you have not modified the default parameters in the CloudFormation template, the application name should be demo-data-joining-multiple-streams-application.

The Application status should be RUNNING.  If its status is still STARTING, wait until it changes to RUNNING.

Choose Application details, and then go to SQL results.

You should see a page similar to the following:

o_joiningenriching_9

The SQL statements have been populated for you. The script first creates two in-application streams (for engine and power supply temperature readings, respectively).  DESTINATION_SQL_STREAM holds the joined results.

On the Real-time analytics tab, you’ll see the average temperature readings of the engine and power supply have been joined together using the machine_id and per-minute tumbling window.  The results can be written to an Amazon Kinesis stream or to S3, Amazon Redshift, or Amazon Elastisearch Service with a Firehose delivery stream.

o_joiningenriching_10

To clean up resources created for this demo

  1. To delete the Amazon Kinesis Analytics application, open the Amazon Kinesis Analytics console, choose demo-data-joining-multiple-streams-application, choose Actions, and then choose Delete application.
  2. Open the CloudFormation console, choose demo-data-joining-multiple-streams-stack, choose Actions, and then choose Delete Stack.

Summary

In this blog post, I shared three approaches for joining and enriching streaming data on Amazon Kinesis Streams by using Amazon Kinesis Analytics, AWS Lambda, and Amazon DynamoDB.  I hope this information will be helpful in situations when your real-time analytics applications require additional data fields from reference data sources or the real-time insights must be derived from data across multiple streaming data sources.  These joining and enrichment techniques will help you can get the best business value from your data.

If you have a question or suggestion, please leave a comment below.


About the author

 

assaf_mentzer_90Assaf Mentzer is a Senior Consultant in Big Data & Analytics for AWS Professional Services. He works with enterprise customers to provide leadership on big data projects, helping them reach their full data potential in the cloud. In his spare time, he enjoys watching sports, playing ping pong, and hanging out with family and friends.

 

 


Related

 

Writing SQL on Streaming Data with Amazon Kinesis Analytics

writingsql_image1


Interactive Analysis of Genomic Datasets Using Amazon Athena

Post Syndicated from Aaron Friedman original https://aws.amazon.com/blogs/big-data/interactive-analysis-of-genomic-datasets-using-amazon-athena/

Aaron Friedman is a Healthcare and Life Sciences Solutions Architect with Amazon Web Services

The genomics industry is in the midst of a data explosion. Due to the rapid drop in the cost to sequence genomes, genomics is now central to many medical advances. When your genome is sequenced and analyzed, raw sequencing files are processed in a multi-step workflow to identify where your genome differs from a standard reference. Your variations are stored in a Variant Call Format (VCF) file, which is then combined with other individuals to enable population-scale analyses. Many of these datasets are publicly available, and an increasing number are hosted on AWS as part of our Open Data project.

To mine genomic data for new discoveries, researchers in both industry and academia build complex models to analyze populations at scale. When building models, they first explore the datasets-of-interest to understand what questions the data might answer. In this step, interactivity is key, as it allows them to move easily from one question to the next.

Recently, we launched Amazon Athena as an interactive query service to analyze data on Amazon S3. With Amazon Athena there are no clusters to manage and tune, no infrastructure to setup or manage, and customers pay only for the queries they run. Athena is able to query many file types straight from S3. This flexibility gives you the ability to interact easily with your datasets, whether they are in a raw text format (CSV/JSON) or specialized formats (e.g. Parquet). By being able to flexibly query different types of data sources, researchers can more rapidly progress through the data exploration phase for discovery. Additionally, researchers don’t have to know nuances of managing and running a big data system. This makes Athena an excellent complement to data warehousing on Amazon Redshift and big data analytics on Amazon EMR 

In this post, I discuss how to prepare genomic data for analysis with Amazon Athena as well as demonstrating how Athena is well-adapted to address common genomics query paradigms.  I use the Thousand Genomes dataset hosted on Amazon S3, a seminal genomics study, to demonstrate these approaches. All code that is used as part of this post is available in our GitHub repository.

Although this post is focused on genomic analysis, similar approaches can be applied to any discipline where large-scale, interactive analysis is required.

 

Select, aggregate, annotate query pattern in genomics

Genomics researchers may ask different questions of their dataset, such as:

  • What variations in a genome may increase the risk of developing disease?
  • What positions in the genome have abnormal levels of variation, suggesting issues in quality of sequencing or errors in the genomic reference?
  • What variations in a genome influence how an individual may respond to a specific drug treatment?
  • Does a group of individuals contain a higher frequency of a genomic variant known to alter response to a drug relative to the general population?

All these questions, and more, can be generalized under a common query pattern I like to call “Select, Aggregate, Annotate”. Some of our genomics customers, such as Human Longevity, Inc., routinely use this query pattern in their work.

In each of the above queries, you execute the following steps:

SELECT: Specify the cohort of individuals meeting certain criteria (disease, drug response, age, BMI, entire population, etc.).

AGGREGATE: Generate summary statistics of genomic variants across the cohort that you selected.

ANNOTATE: Assign meaning to each of the variants by joining on known information about each variant.

Dataset preparation

Properly organizing your dataset is one of the most critical decisions for enabling fast, interactive analysies. Based on the query pattern I just described, the table representing your population needs to have the following information:

  • A unique sample ID corresponding to each sample in your population
  • Information about each variant, specifically its location in the genome as well as the specific deviation from the reference
  • Information about how many times in a sample a variant occurs (0, 1, or 2 times) as well as if there are multiple variants in the same site. This is known as a genotype.

The extract, transform, load (ETL) process to generate the appropriate data representation has two main steps. First, you use ADAM, a genomics analysis platform built on top of Spark, to convert the variant information residing a VCF file to Parquet for easier downstream analytics, in a process similar to the one described in the Will Spark Power the Data behind Precision Medicine? post. Then, you use custom Python code to massage the data and select only the appropriate fields that you need for analysis with Athena.

First, spin up an EMR cluster (version 5.0.3) for the ETL process. I used a c4.xlarge for my master node and m4.4xlarges with 1 TB of scratch for my core nodes.

After you SSH into your master node, clone the git repository. You can also put this in as a bootstrap action when spinning up your cluster.

sudo yum –y install git
git clone https://github.com/awslabs/aws-big-data-blog.git 

You then need to install ADAM and configure it to run with Spark 2.0. In your terminal, enter the following:

# Install Maven
wget http://apache.claz.org/maven/maven-3/3.3.9/binaries/apache-maven-3.3.9-bin.tar.gz
tar xvzf apache-maven-3.3.9-bin.tar.gz
export PATH=/home/hadoop/apache-maven-3.3.9/bin:$PATH

# Install ADAM
git clone https://github.com/bigdatagenomics/adam.git
cd adam
sed -i 's/2.6.0/2.7.2/' pom.xml
./scripts/move_to_spark_2.sh
export MAVEN_OPTS="-Xmx512m -XX:MaxPermSize=256m"
mvn clean package -DskipTests

export PATH=/home/hadoop/adam/bin:$PATH

Now that ADAM is installed, enter the working directory of the repo that you first cloned, which contains the code relevant to this post. Your next step is to convert a VCF containing all the information about your population.

For this post, you are going to focus on a single region in the genome, defined as chromosome 22, for all 2,504 samples in the Thousand Genomes dataset. As chromosome 22 was the first chromosome to be sequenced as part of the Human Genome Project, it is your first here as well. You can scale the size of your cluster depending on whether you are looking at just chromosome 22, or the entire genome. In the following lines, replace mytestbucket with your bucket name of choice.

PARQUETS3PATH=s3://<mytestbucket>/thousand_genomes/grch37/chr22.parquet/
cd ~/aws-big-data-blog/aws-blog-athena-genomics/etl/thousand_genomes/
chmod +x ./convert_vcf.sh
./convert_vcf.sh $PARQUETS3PATH

After this conversion completes, you can reduce your Parquet file to only the fields you need.

TRIMMEDS3PATH=s3://<mytestbucket>/thousand_genomes/grch37_trimmed/chr22.parquet/
spark-submit --master yarn --executor-memory 40G --deploy-mode cluster create_trimmed_parquet.py --input_s3_path $PARQUETS3PATH --output_s3_path $TRIMMEDS3PATH 

Annotation data preparation

For this post, use the ClinVar dataset, which is an archive of information about variation in genomes and health. While Parquet is a preferred format for Athena relative to raw text, using the ClinVar TXT file demonstrates the ability of Athena to read multiple file types.

In a Unix terminal, execute the following commands (replace mytestbucket with your bucket name of choice):

ANNOPATH=s3://<mytestbucket>/clinvar/

wget ftp://ftp.ncbi.nlm.nih.gov/pub/clinvar/tab_delimited/variant_summary.txt.gz

# Need to strip out the first line (header)
zcat variant_summary.txt.gz | sed '1d' > temp ; mv -f temp variant_summary.trim.txt ; gzip variant_summary.trim.txt

aws s3 cp variant_summary.trim.txt.gz $ANNOPATH --sse

Creating Tables using Amazon Athena

In the recent Amazon Athena – Interactive SQL Queries for Data in Amazon S3 post, Jeff Barr covered how to get started with Athena and navigate to the console. Athena uses the Hive DDL when you create, alter, or drop a table. This means you can use the standard Hive syntax for creating tables. These commands can also be found in the aws-blog-athena-genomics GitHub repo under sql/setup.

First, create your database:

CREATE DATABASE kg;

Next, define your table with the appropriate mappings from the previous ETL processes. Be sure to change to your appropriate bucket name.

For your population data:

If you want to skip the ETL process described above for your population data, you can use the following path for your analysis: s3://aws-bigdata-blog/artifacts/athena_genomics.parquet/ – be sure to substitute it in for the LOCATION below.

CREATE EXTERNAL TABLE demo.samplevariants
(
  alternateallele STRING
  ,chromosome STRING
  ,endposition BIGINT
  ,genotype0 STRING
  ,genotype1 STRING
  ,referenceallele STRING
  ,sampleid STRING
  ,startposition BIGINT
)
STORED AS PARQUET
LOCATION ' s3://<mytestbucket>/thousand_genomes/grch37_trimmed/chr22.parquet/';

For your annotation data:

CREATE EXTERNAL TABLE demo.clinvar
(
    alleleId STRING
    ,variantType STRING
    ,hgvsName STRING
    ,geneID STRING
    ,geneSymbol STRING
    ,hgncId STRING
    ,clinicalSignificance STRING
    ,clinSigSimple STRING
    ,lastEvaluated STRING
    ,rsId STRING
    ,dbVarId STRING
    ,rcvAccession STRING
    ,phenotypeIds STRING
    ,phenotypeList STRING
    ,origin STRING
    ,originSimple STRING
    ,assembly STRING
    ,chromosomeAccession STRING
    ,chromosome STRING
    ,startPosition INT
    ,endPosition INT
    ,referenceAllele STRING
    ,alternateAllele STRING
    ,cytogenetic STRING
    ,reviewStatus STRING
    ,numberSubmitters STRING
    ,guidelines STRING
    ,testedInGtr STRING
    ,otherIds STRING
    ,submitterCategories STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION 's3://<mytestbucket>/clinvar/';

You can confirm that both tables have been created by choosing the eye icon next to a table name in the console. This automatically runs a query akin to SELECT * FROM table LIMIT 10. If it succeeds, then your data has been loaded successfully.

o_athena_genomes_1

Applying the select, aggregate, annotate paradigm

In this section, I walk you through how to use the query pattern described earlier to answer two common questions in genomics. You examine the frequency of different genotypes, which is a distinct combination of all fields in the samplevariants table with the exception of sampleid. These queries can also be found in the aws-blog-athena-genomics GitHub repo under sql/queries.

Population drug response

What small molecules/drugs are most likely to affect a subpopulation of individuals (ancestry, age, etc.) based on their genomic information?

In this query, assume that you have some phenotype data about your population. In this case, also assume that all samples sharing the pattern “NA12” are part of a specific demographic.

In this query, use sampleid as your predicate pushdown. The general steps are:

  1. Filter by the samples in your subpopulation
  2. Aggregate variant frequencies for the subpopulation-of-interest
  3. Join on ClinVar dataset
  4. Filter by variants that have been implicated in drug-response
  5. Order by highest frequency variants

To answer this question, you can craft the following query:

SELECT
    count(*)/cast(numsamples AS DOUBLE) AS genotypefrequency
    ,cv.rsid
    ,cv.phenotypelist
    ,sv.chromosome
    ,sv.startposition
    ,sv.endposition
    ,sv.referenceallele
    ,sv.alternateallele
    ,sv.genotype0
    ,sv.genotype1
FROM demo.samplevariants sv
CROSS JOIN
    (SELECT count(1) AS numsamples
    FROM
        (SELECT DISTINCT sampleid
        FROM demo.samplevariants
        WHERE sampleid LIKE 'NA12%'))
JOIN demo.clinvar cv
ON sv.chromosome = cv.chromosome
    AND sv.startposition = cv.startposition - 1
    AND sv.endposition = cv.endposition
    AND sv.referenceallele = cv.referenceallele
    AND sv.alternateallele = cv.alternateallele
WHERE assembly='GRCh37'
    AND cv.clinicalsignificance LIKE '%response%'
    AND sampleid LIKE 'NA12%'
GROUP BY  sv.chromosome
          ,sv.startposition
          ,sv.endposition
          ,sv.referenceallele
          ,sv.alternateallele
          ,sv.genotype0
          ,sv.genotype1
          ,cv.clinicalsignificance
          ,cv.phenotypelist
          ,cv.rsid
          ,numsamples
ORDER BY  genotypefrequency DESC LIMIT 50

Enter the query into the console and you see something like the following:

o_athena_genomes_2

When you inspect the results, you can quickly see (often in a matter of seconds!) that this population of individuals has a high frequency of variants associated with the metabolism of debrisoquine.

o_athena_genomes_3

Quality control

Are you systematically finding locations in your reference that are called as variation? In other words, what positions in the genome have abnormal levels of variation, suggesting issues in quality of sequencing or errors in the reference? Are any of these variants implicated in clinically? If so, could this be a false positive clinical finding?

In this query, the entire population is needed so the SELECT predicate is not used. The general steps are:

  1. Aggregate variant frequencies for the entire Thousand Genomes population
  2. Join on the ClinVar dataset
  3. Filter by variants that have been implicated in disease
  4. Order by the highest frequency variants

This translates into the following query:

SELECT
    count(*)/cast(numsamples AS DOUBLE) AS genotypefrequency
    ,cv.clinicalsignificance
    ,cv.phenotypelist
    ,sv.chromosome
    ,sv.startposition
    ,sv.endposition
    ,sv.referenceallele
    ,sv.alternateallele
    ,sv.genotype0
    ,sv.genotype1
FROM demo.samplevariants sv
CROSS JOIN
    (SELECT count(1) AS numsamples
    FROM
        (SELECT DISTINCT sampleid
        FROM demo.samplevariants))
    JOIN demo.clinvar cv
    ON sv.chromosome = cv.chromosome
        AND sv.startposition = cv.startposition - 1
        AND sv.endposition = cv.endposition
        AND sv.referenceallele = cv.referenceallele
        AND sv.alternateallele = cv.alternateallele
WHERE assembly='GRCh37'
        AND cv.clinsigsimple='1'
GROUP BY  sv.chromosome ,
          sv.startposition
          ,sv.endposition
          ,sv.referenceallele
          ,sv.alternateallele
          ,sv.genotype0
          ,sv.genotype1
          ,cv.clinicalsignificance
          ,cv.phenotypelist
          ,numsamples
ORDER BY  genotypefrequency DESC LIMIT 50

As you can see in the following screenshot, the highest frequency results have conflicting information, being listed both as potentially causing disease and being benign. In genomics, variants with a higher frequency are less likely to cause disease. Your quick analysis allows you to discount the pathogenic clinical significance annotations of these high genotype frequency variants.

o_athena_genomes_4

 

Summary

I hope you have seen how you can easily integrate datasets of different types and values, and combine them to quickly derive new meaning from your data. Do you have a dataset you want to explore or want to learn more about Amazon Athena? Check out our Getting Started Guide and happy querying!

If you have questions or suggestions, please comment below.


About the author

friedman_90Dr. Aaron Friedman a Healthcare and Life Sciences Partner Solutions Architect at Amazon Web Services. He works with our ISVs and SIs to architect healthcare solutions on AWS, and bring the best possible experience to their customers. His passion is working at the intersection of science, big data, and software. In his spare time, he’s out hiking or learning a new thing to cook.

 

 


Related

Analyzing Data in S3 using Amazon Athena

o_athena_10

Implementing Authorization and Auditing using Apache Ranger on Amazon EMR

Post Syndicated from Varun Rao Bhamidimarri original https://aws.amazon.com/blogs/big-data/implementing-authorization-and-auditing-using-apache-ranger-on-amazon-emr/

Varun Rao is a Big Data Architect for AWS Professional Services

Role-based access control (RBAC) is an important security requirement for multi-tenant Hadoop clusters. Enforcing this across always-on and transient clusters can be hard to set up and maintain.

Imagine an organization that has an RBAC matrix using Active Directory users and groups. They would like to manage it on a central security policy server and enforce it on all Hadoop clusters that are spun up on AWS. This policy server should also store access and audit information for compliance needs.

In this post, I provide the steps to enable authorization and audit for Amazon EMR clusters using Apache Ranger.

Apache Ranger

Apache Ranger is a framework to enable, monitor, and manage comprehensive data security across the Hadoop platform. Features include centralized security administration, fine-grained authorization across many Hadoop components (Hadoop, Hive, HBase, Storm, Knox, Solr, Kafka, and YARN) and central auditing. It uses agents to sync policies and users, and plugins that run within the same process as the Hadoop component, like NameNode and HiveServer2.

Architecture

Using the setup in the following diagram, multiple EMR clusters can sync policies with a standalone security policy server. The idea is similar to a shared Hive metastore that can be used across EMR clusters.

EMRRanger_1

Walkthrough

In this walkthrough, three users—analyst1, analyst2, and admin1—are set up for the initial authorization, as shown in the following diagram. Using the Ranger Admin UI, I show how to modify these access permissions. These changes are propagated to the EMR cluster and validated through Hue.

o_EMRRanger_2

To manage users/groups/credentials, we will use Simple AD, a managed directory service offered by AWS Directory Service. A Windows EC2 instance will be setup to join the SimpleAD domain and load users/groups using a PowerShell script. A stand-alone security policy server (Ranger) and EMR cluster will be setup and configured. Finally, we will update the security policies and test the changes.

Prerequisites

The following steps assume that you have a VPC with at least two subnets, with NAT configured for private subnets. Also, verify that DNS Resolution (enableDnsSupport) and DNS Hostnames (enableDnsHostnames) are set to Yes on the VPC. The EC2 instance created in the steps below can be used as bastion if launched in a public subnet. If no public subnets are selected, you will need a bastion host or a VPN connection to login to the windows instance and access Web UI links (Hue, Ranger).

I have created AWS CloudFormation templates for each step and a nested CloudFormation template for single-click deployment launch_stack. If you use this nested Cloudformation template, skip to the “Testing the cluster” step after the stack has been successfully created.

To create each component individually, follow the steps below.

IMPORTANT: The templates use hard-coded username and passwords, and open security groups. They are not intended for production use without modification.

Setting up a SimpleAD server

Using this CloudFormation template, set up a SimpleAD server. To launch the stack directly through the console, use launch_stack. It takes the following parameters:

EMRRanger_1_1

CloudFormation output:

EMRRanger_Grid2

NOTE: SimpleAD creates two servers for high availability. For the following steps, you can use either of the two IP addresses.

Creating a Windows EC2 instance

To manage the SimpleAD server, set up a Windows instance. It is used to load LDAP users required to test the access policies. On instance startup, a PowerShell script is executed automatically to load users (analyst1, analyst2, admin1).

Using this CloudFormation template, set up this Windows instance. Select a public subnet if you want to use this as a bastion host to access Web UI (Hue, Ranger). To launch the stack directly through the console, use launch_stack. It takes the following parameters:

EMRRanger_3_2

You can specify either of the two SimpleAD IP addresses.

CloudFormation output:

EMRRanger_Grid4

Once stack creation is complete, Remote desktop into this instance using the SimpleAD username (EmrSimpleAD\Administrator) and password ([email protected]) before moving to the next step.

NOTE: The instance initialization is longer than usual because of the SimpleAD Join and PowerShell scripts that need to be executed after the join.

Setting up the Ranger server

Now that SimpleAD has been created and the users loaded, you are ready to set up the security policy server (Ranger). This runs on a standard Amazon Linux instance and Ranger is installed and configured on startup.

Using this CloudFormation template, set up the Ranger server. To launch the stack directly through the console, use launch_stack. It takes the following parameters:

EMRRanger_5_1

CloudFormation output:

EMRRanger_Grid6

NOTE: The Ranger server syncs users with SimpleAD and enables LDAP authentication for the Admin UI. The default Ranger Admin password is not changed.

Creating an EMR cluster

Finally, it’s time to create the EMR cluster and configure it with the required plugins. You can use the AWS CLI or CloudFormation to create and configure the cluster. EMR security configurations are not currently supported by CloudFormation.

Using the AWS CLI to create a cluster

aws emr create-cluster --applications Name=Hive Name=Spark Name=Hue --tags 'Name=EMR-Security' \
--release-label emr-5.0.0 \
--ec2-attributes 'SubnetId=<subnet-xxxxx>,InstanceProfile=EMR_EC2_DefaultRole,KeyName=<key name>' \
--service-role EMR_DefaultRole \
--instance-count 4 \
--instance-type m3.2xlarge \
--log-uri '<s3 location for logging>' \
--name 'SecurityPOCCluster' --region us-east-1 \
--bootstrap-actions '[{"Path":"s3://aws-bigdata-blog/artifacts/aws-blog-emr-ranger/scripts/download-scripts.sh","Args":["s3://aws-bigdata-blog/artifacts/aws-blog-emr-ranger"],"Name":"Download scripts"}]' \
--steps '[{"Args":["/mnt/tmp/aws-blog-emr-ranger/scripts/emr-steps/updateHueLdapUrl.sh","<ip address of simple ad server>"],"Type":"CUSTOM_JAR","MainClass":"","ActionOnFailure":"CONTINUE","Jar":"s3://elasticmapreduce/libs/script-runner/script-runner.jar","Properties":"","Name":"UpdateHueLdapServer"},{"Args":["/mnt/tmp/aws-blog-emr-ranger/scripts/emr-steps/install-hive-hdfs-ranger-policies.sh","<ranger host ip>","s3://aws-bigdata-blog/artifacts/aws-blog-emr-ranger/inputdata"],"Type":"CUSTOM_JAR","MainClass":"","ActionOnFailure":"CONTINUE","Jar":"s3://elasticmapreduce/libs/script-runner/script-runner.jar","Properties":"","Name":"InstallRangerPolicies"},{"Args":["spark-submit","--deploy-mode","cluster","--class","org.apache.spark.examples.SparkPi","/usr/lib/spark/examples/jars/spark-examples.jar","10"],"Type":"CUSTOM_JAR","MainClass":"","ActionOnFailure":"CONTINUE","Jar":"command-runner.jar","Properties":"","Name":"SparkStep"},{"Args":["/mnt/tmp/aws-blog-emr-ranger/scripts/emr-steps/install-hive-hdfs-ranger-plugin.sh","<ranger host ip>","0.6","s3://aws-bigdata-blog/artifacts/aws-blog-emr-ranger"],"Type":"CUSTOM_JAR","MainClass":"","ActionOnFailure":"CONTINUE","Jar":"s3://elasticmapreduce/libs/script-runner/script-runner.jar","Properties":"","Name":"InstallRangerPlugin"},{"Args":["/mnt/tmp/aws-blog-emr-ranger/scripts/emr-steps/loadDataIntoHDFS.sh","us-east-1"],"Type":"CUSTOM_JAR","MainClass":"","ActionOnFailure":"CONTINUE","Jar":"s3://elasticmapreduce/libs/script-runner/script-runner.jar","Properties":"","Name":"LoadHDFSData"},{"Args":["/mnt/tmp/aws-blog-emr-ranger/scripts/emr-steps/createHiveTables.sh","us-east-1"],"Type":"CUSTOM_JAR","MainClass":"","ActionOnFailure":"CONTINUE","Jar":"s3://elasticmapreduce/libs/script-runner/script-runner.jar","Properties":"","Name":"CreateHiveTables"}]' \
--configurations '[{"Classification":"hue-ini","Properties":{},"Configurations":[{"Classification":"desktop","Properties":{},"Configurations":[{"Classification":"auth","Properties":{"backend":"desktop.auth.backend.LdapBackend"},"Configurations":[]},{"Classification":"ldap","Properties":{"bind_dn":"binduser","trace_level":"0","search_bind_authentication":"false","debug":"true","base_dn":"dc=corp,dc=emr,dc=local","bind_password":"[email protected]","ignore_username_case":"true","create_users_on_login":"true","ldap_username_pattern":"uid=<username>,cn=users,dc=corp,dc=emr,dc=local","force_username_lowercase":"true","ldap_url":"ldap://<ip address of simple ad server>","nt_domain":"corp.emr.local"},"Configurations":[{"Classification":"groups","Properties":{"group_filter":"objectclass=*","group_name_attr":"cn"},"Configurations":[]},{"Classification":"users","Properties":{"user_name_attr":"sAMAccountName","user_filter":"objectclass=*"},"Configurations":[]}]}]}]}]'

The LDAP-related configuration for HUE is passed using the --configurations option. For more information, see Configure Hue for LDAP Users and the EMR create-cluster CLI reference.

Using a CloudFormation template to create a cluster

This step requires some Hue configuration changes in the CloudFormation template. The IP address of the LDAP server (SimpleAD) needs to be updated.

  1. Open the template in CloudFormation Designer. For more information about how to modify a CloudFormation template, see Walkthrough: Use AWS CloudFormation Designer to Modify a Stack’s Template.
  2. Choose EMRSampleCluster.
  3. On the Properties section, update the value of ldap_url with the IP address of the SimpleAD server:
    "ldap_url": "ldap://<change it to the SimpleAD IP address>",
  4. On the Designer toolbar, choose Validate template to check for syntax errors in your template.
  5. Choose Create Stack.
  6. Update Stack name and the stack parameters.

CloudFormation parameters:

EMRRanger_7_1

CloudFormation output:

EMRRanger_Grid8

EMR steps are used to perform the following:

  • Install and configure Ranger HDFS and Hive plugins
  • Use the Ranger REST API to update repository and authorization policies.
    NOTE: This step needs to be executed the first time. New clusters do not need to include this step action.
  • Create Hive tables (tblAnalyst1 and tblAnalyst2) and copy sample data.
  • Create HDFS folders (/user/analyst1 and /user/analyst2) and copy sample data.
  • Run a SparkPi job using the spark submit action to verify the cluster setup.

To validate that all the step actions were executed successfully, view the Step section for the EMR cluster.

o_EMRRanger_3

NOTE: Cluster creation can take anywhere between 10-15 minutes.

Testing the cluster

Congratulations! You have successfully configured the EMR cluster with the ability to manage authorization policies, using Ranger. How do you know if it actually works? You can test this by accessing HDFS files and running Hive queries.

Using HDFS

Log in to Hue (URL: http://<master DNS or IP>:8888) as “analyst1” and try to delete a file owned by “analyst2”. For more information about how to access Hue, see Launch the Hue Web Interface. The Windows EC2 instance created in the previous steps can be used to access this without having to setup a SSH tunnel.

  1. Log in as user “analyst1” (password: [email protected]).
  2. Browse to the /user/analyst2 HDFS directory and move the file “football_coach_position.tsv” to trash.
  3. You should see a “Permission denied” error, which is expected.
    o_EMRRanger_4

Using Hive queries

Using the HUE SQL Editor, execute the following query.

These queries use external tables, and Hive leverages EMRFS to access the data stored in S3. Because HiveServer2 (where Hue is submitting these queries) is checking with Ranger to grant or deny before accessing any data in S3, you can create fine-grained SQL-based permissions for users even though there is a single EC2 role specified for the cluster (which is used by all requests the cluster makes to S3). For more information, see Additional Features of Hive on Amazon EMR.

SELECT * FROM default.tblanalyst1

This should return the results as expected. Now, run the following query:

SELECT * FROM default.tblanalyst2

You should see the following error:

o_EMRRanger_5

This makes sense. User analyst1 does not have table SELECT permissions on table tblanalyst2.

User analyst2 (default password: [email protected]) should see a similar error when accessing table tblanalyst1. User admin1 (default password: [email protected]) should be able to run both queries.

Updating the security policies

You have verified that the policies are being enforced. Now, let’s try to update them.

  1. Log in to the Ranger Admin UI server
    • URL: http:://<ip address of the ranger server>:6080/login.jsp
    • Default admin username/password: admin/admin.
  2. View all the Ranger Hive policies by selecting “hivedev”
    o_EMRRanger_6
  3. Select the policy named “Analyst2Policy”
  4. Edit the policy by adding “analyst1” user with “select” permissions for table “tblanalyst2”
    EMRRanger_7
  5. Save the changes.

This policy change is pulled in by the Hive plugin on the EMR cluster. Give it at least 60 seconds for the policy refresh to happen.

Go back to Hue to test if this change has been propagated.

  1. Log back in to the Hue UI as user “analyst1” (see earlier steps).
  2. In the Hive SQL Editor, run the query that failed earlier:
    SELECT * FROM default.tblanalyst2

This query should now run successfully.

o_EMRRanger_8

Audits

Can you now find those who tried to access the Hive tables and see if they were “denied” or “allowed”?

  1. Log back in to the Ranger UI as admin (see earlier steps).
    URL: http://<ip address of the ranger server>:6080/login.jsp
  2. Choose Audit and filter by “analyst1”.
    • Analyst1 was denied SELECT access to the tblanalyst2 table.
      o_EMRRanger_9
    • After the policy change, the access was granted and logged.
      o_EMRRanger_10

The same audit information is also stored in SOLR for performing more complex and full test searches. The SOLR instance is installed on the same instance as the Ranger server.

  • Open Solr UI:
    http://<ip-address-of-ranger-server>:8983/solr/#/ranger_audits/query
  • Perform a document search
    o_EMRRanger_11

Direct URL: http:// <ip-address-of-ranger-server>:8983/solr/ranger_audits/select?q=*%3A*&wt=json&indent=true

Conclusion

In this post, I walked through the steps required to enable authorization and audit capabilities on EMR using Apache Ranger, with a centrally managed security policy server. I also covered the steps to automate this using CloudFormation templates.

Stay tuned for more posts about security on EMR. If you have questions or suggestions, please comment below.

For information about other EMR security aspects, see Jeff Barr’s posts:


About the author


varun_90Varun Rao is a Big Data Architect for AWS Professional Services.
He works with enterprise customers to define data strategy in the cloud. In his spare time, he tries to keep up with his 2-year-old.

 

 


Related

Encrypt Data At-Rest and In-Flight on Amazon EMR with Security Configurations

security_config